Why do we need a hot (connectable) Observable in the reactive paradigm (RxJava)?

Imagine you are implementing a stock price ticker that streams real-time prices to multiple users. Every user should see the same prices, in real time, from the moment they start observing, rather than a private sequence that restarts from the beginning for each of them.

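// Imports assume RxJava 3; in RxJava 2 the same classes live under io.reactivex (without the rxjava3 segment).
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class StockPriceTicker {
    public static void main(String[] args) throws InterruptedException {
        // interval() emits 0, 1, 2, ... once per second; the counter stands in for a price
        ConnectableObservable<Long> stockPriceObservable = Observable.interval(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .take(5)
                .publish(); // Convert to ConnectableObservable (hot observable)

        // First subscriber
        stockPriceObservable.subscribe(price ->
                System.out.println("Subscriber 1: " + price));

        // Pause between the two subscriptions; nothing is emitted yet because connect() has not been called
        Thread.sleep(2000);

        // Second subscriber
        stockPriceObservable.subscribe(price ->
                System.out.println("Subscriber 2: " + price));

        // Connect to start emitting to both subscribers
        stockPriceObservable.connect();

        // Keep the main thread alive until the five emissions (about 5 seconds) have arrived
        Thread.sleep(6000);
    }
}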

Output:
Subscriber 1: 0
Subscriber 2: 0

Subscriber 1: 1
Subscriber 2: 1

Subscriber 1: 2
Subscriber 2: 2

Subscriber 1: 3
Subscriber 2: 3

Subscriber 1: 4
Subscriber 2: 4

  • Observable Creation: Observable.interval emits an incrementing Long every second; the counter stands in for a stock price here, and take(5) ends the stream after five values.

  • Conversion to ConnectableObservable: publish() converts the Observable to a ConnectableObservable. Subscribing to it does not start the source, so we control when emissions begin.

  • Subscribers: Two subscribers are added to the ConnectableObservable. The 2-second pause between them costs nothing, because no value has been emitted yet.

  • Starting Emissions: connect() subscribes to the source once and starts the emissions. Both subscribers receive the same sequence of values from the moment connect() is called.
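The steps above can be modelled in a few lines of plain Java. This is only a sketch of the idea, not RxJava's actual implementation: ConnectableSketch, tick() and demo() are hypothetical names invented for illustration, and tick() stands in for one timer emission. It shows that a single shared counter feeds every subscriber, that nothing is delivered before connect(), and that a subscriber who joins after connect() only sees the values emitted from then on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical teaching class: models publish()/connect() semantics, not RxJava's real implementation.
class ConnectableSketch {
    private final List<Consumer<Long>> subscribers = new ArrayList<>();
    private boolean connected = false;
    private long next = 0; // one counter shared by every subscriber

    void subscribe(Consumer<Long> subscriber) {
        subscribers.add(subscriber);
    }

    // Nothing is delivered until connect() has been called.
    void connect() {
        connected = true;
    }

    // Stands in for one timer tick of the source.
    void tick() {
        if (!connected) {
            return;
        }
        long value = next++;
        for (Consumer<Long> subscriber : subscribers) {
            subscriber.accept(value);
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ConnectableSketch ticker = new ConnectableSketch();
        ticker.subscribe(price -> log.add("Subscriber 1: " + price));
        ticker.subscribe(price -> log.add("Subscriber 2: " + price));
        ticker.tick();    // ignored: not connected yet
        ticker.connect();
        ticker.tick();    // both subscribers receive 0
        ticker.tick();    // both subscribers receive 1
        ticker.subscribe(price -> log.add("Subscriber 3: " + price)); // joins late
        ticker.tick();    // all three receive 2; Subscriber 3 never sees 0 or 1
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Running main prints the pairs "Subscriber 1" / "Subscriber 2" for 0, 1 and 2, followed by a single "Subscriber 3: 2" line, which is the late-joiner behaviour you would expect from a hot source.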

What if we use a cold Observable instead of a ConnectableObservable?

By default, observables in RxJava are cold: each subscription starts its own new sequence of emissions. If two subscribers subscribe almost at the same time (that is, without the Thread.sleep between them that the code below uses), it may look as if they are receiving the same sequence, but each subscriber actually has its own independent sequence.

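// Imports assume RxJava 3; in RxJava 2 the same classes live under io.reactivex (without the rxjava3 segment).
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

public class StockPriceTicker {
    public static void main(String[] args) throws InterruptedException {
        // Cold observable: every subscribe() call starts its own interval counter
        Observable<Long> stockPriceObservable = Observable.interval(1, TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io())
                .take(5);

        // First subscriber: its sequence starts immediately
        stockPriceObservable.subscribe(price ->
                System.out.println("Subscriber 1: " + price));

        // Pause so that subscriber 1 is about two values ahead before subscriber 2 joins
        Thread.sleep(2000);

        // Second subscriber: gets a fresh sequence starting from 0
        stockPriceObservable.subscribe(price ->
                System.out.println("Subscriber 2: " + price));

        // Keep the main thread alive until subscriber 2 has received its five values
        Thread.sleep(6000);
    }
}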

Output:
Subscriber 1: 0
Subscriber 1: 1
Subscriber 2: 0
Subscriber 1: 2
Subscriber 2: 1
Subscriber 1: 3
Subscriber 2: 2
Subscriber 1: 4
Subscriber 2: 3
Subscriber 2: 4
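
The interleaving above follows from each subscription owning its own counter. A minimal plain-Java sketch of that idea, assuming a hypothetical ColdSketch class (not an RxJava type) in which tick() stands in for one second passing and the constructor argument plays the role of take(n):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical teaching class: models cold-subscription semantics, not RxJava's real implementation.
class ColdSketch {
    // Each subscription owns its own counter, so every subscriber starts from 0.
    private static final class Subscription {
        final Consumer<Long> subscriber;
        long next = 0;

        Subscription(Consumer<Long> subscriber) {
            this.subscriber = subscriber;
        }
    }

    private final List<Subscription> subscriptions = new ArrayList<>();
    private final long limit; // plays the role of take(n)

    ColdSketch(long limit) {
        this.limit = limit;
    }

    void subscribe(Consumer<Long> subscriber) {
        subscriptions.add(new Subscription(subscriber));
    }

    // Stands in for one second passing: every unfinished subscription advances independently.
    void tick() {
        for (Subscription s : subscriptions) {
            if (s.next < limit) {
                s.subscriber.accept(s.next++);
            }
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ColdSketch ticker = new ColdSketch(5);
        ticker.subscribe(price -> log.add("Subscriber 1: " + price));
        ticker.tick(); // Subscriber 1 receives 0
        ticker.tick(); // Subscriber 1 receives 1
        ticker.subscribe(price -> log.add("Subscriber 2: " + price)); // joins two ticks late
        for (int i = 0; i < 5; i++) {
            ticker.tick(); // Subscriber 2 starts at 0 while Subscriber 1 continues from 2
        }
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model the order of the two lines within a single tick is fixed by the list order, whereas with real timers it depends on scheduling; what matters is that Subscriber 2 always begins at 0 and finishes two ticks after Subscriber 1.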

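Key Differences between Cold and Connectable Observables

  • Cold Observable (Default): Each subscriber gets its own independent sequence, which starts from the first item at the moment that subscriber subscribes.

  • Hot Observable (Using ConnectableObservable): All subscribers share one sequence, which starts when connect() is called. A subscriber that joins after connect() sees only the items emitted from then on.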