All Articles

Lifecycle Management of Reactive Subscriptions

Lets get deeper into the Rx, the another core key of Rx is the Reactive Subscription

Reactive Subscription

The intent behind Rx is that it is unknown when an observable sequence emits values or terminates, but you can still have a control over when you begin and stop accepting values. so,when an observer subscribes to the observable it returns a Subscription, it basically allocates the required resources for you and its always best practice to free up the resources once we are done and Rx provides control over your subscriptions to enable you to do that.

please note that RxJava 1.0 had Subscription but in RxJava 2.0 we have Disposable

public interface Disposable {
  void dispose();

  boolean isDisposed();
}

As we have seen in the last post Observer interface has 3 defined methods onNext, onError, onComplete so, while subscribing to the Observable we pass these actions which constructs a subscriber and returns the Disposable

Observable observableEmitter = Observable.create(emitter-> {
  emitter.onNext(1);
});

Disposable disposable = observableEmitter.subscribe(
        v -> System.out.println(v), // passing these functions constructs Observer
        e -> System.err.println(e));

There are several overloads to Observable.subscribe, which are shorthands for the same thing.

Disposable  subscribe()
Disposable  subscribe(Consumer<? super T> onNext)
Disposable  subscribe(Consumer<? super T> onNext, Consumer<java.lang.Throwable> onError)
Disposable  subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)
Disposable  subscribe(Observer<? super T> observer)
Disposable  subscribe(Subscriber<? super T> subscriber)

subscribe() consumes events but performs no actions. The overloads that take one or more Action will construct a Subscriber with the functions that you provide. Where you don’t give an action, the event is practically ignored.

Lets see the following example which handles error

Subject<Integer> subject = ReplaySubject.create();
subject.subscribe(
  v -> System.out.println(v),
  e -> System.err.println(e));
subject.onNext(0);
subject.onError(new Exception("Oops"));
subject.onNext(1);

// Output
// 0
// java.lang.Exception: Oops

If we do not provide a function for error handling, an OnErrorNotImplementedException will be thrown at the point where subject.onError is called, which is the producer’s side. It happens here that the producer and the consumer are side-by-side, so we could do a traditional try-catch. However, on a modularised system, the producer(Observable) and the consumer(Observer) very often are in different places. Unless the consumer provides a handle for errors to subscribe, they will never know that an error has occured and that the sequence was terminated.

Also, notice that after exception Subject/Observable is continue to send the data but Observer didn’t print anything the reason is, in our very first post we saw if error signal occurs through the error channel then no data will be flown through the data channel

Unsubscribing

As I mentioned Rx provides a way to control the subscriptions so that we can free up the resources. So, you can stop receiving values from a Observable sequence. Every subscribe overload returns an instance of Disposable, which is an interface with 2 methods:

public interface Disposable {
    void dispose();

    boolean isDisposed();
}

Calling dispose will stop events from being pushed to your observer.

Subject<Integer>  values = ReplaySubject.create();
Disposable disposable = values.subscribe(
    v -> System.out.println(v),
    e -> System.err.println(e),
    () -> System.out.println("Done")
);
values.onNext(0);
values.onNext(1);
disposable.dispose();
values.onNext(2); // observer will not receive this value

// Output
// 0
// 1

Also, Unsubscribing one observer does not interfere with other observers on the same observable.

Subject<Integer>  values = ReplaySubject.create();
Disposable disposable1 = values.subscribe(
    v -> System.out.println("First: " + v)
);
Disposable disposable2 = values.subscribe(
  v -> System.out.println("Second: " + v)
);

values.onNext(0);
values.onNext(1);

disposable1.unsubscribe();
System.out.println("Unsubscribed first");

values.onNext(2); //second observer will still receive this value


// Output
// First: 0
// Second: 0
// First: 1
// Second: 1
// Unsubscribed first
// Second: 2

Regardless of first subscriber is disposed, second subscriber is still receving the values

Handling onError and onComplete events

onError and onCompleted mean the termination of a sequence. An observable that complies with the Rx contract will not emit anything after either of those events. This is something to note both when consuming in Rx and when implementing your own observables.

Subject<Integer>  values = ReplaySubject.create();
Disposable disposable = values.subscribe(
    v -> System.out.println("First: " + v),
    e -> System.out.println("First: " + e),
    () -> System.out.println("Completed")
);
values.onNext(0);
values.onNext(1);
values.onCompleted();
values.onNext(2);


// Output
// First: 0
// First: 1
// Completed

So, once observable has sent complete signal, no data will be flown from the data channel

Freeing the resources

So, its always a best practice to free up the resources when it is not in use, for example if an Observable has multiple Subscribers, sharing the same resources with each of them, we can release these resources on dispose of each of the subscriber, we can create the binding between a Disposable and the necessary resources using the Disposables factory. Lets have a look at the below example

Observable observableEmitter = Observable.create(emitter -> {
  BufferedReader reader = new BufferedReader(new FileReader("sample.txt"));
  String line;
  while ((line = reader.readLine()) != null)
  {
      emitter.onNext(line);
  }
  emitter.setDisposable(Disposables.fromAction(() -> {
      reader.close();
      System.out.println("resource disposed");

  }));
});

Disposable disposable1 = observableEmitter.subscribe(
        v -> System.out.println("subscriber first " + v),
        e -> System.err.println(e));
disposable1.dispose();

Disposable disposable2 = observableEmitter.subscribe(
        v -> System.out.println("subscriber second " + v),
        e -> System.err.println(e));
disposable2.dispose();


// output
// subscriber first sample
// resource disposed
// subscriber second sample
// resource disposed

In the above example both the observers subscribed to the same observable, while defining observable we set the disposable through Disposables.fromAction which returns Disposable. so, when any observer gets disposed all the resources mentioned in Disposables.fromAction will get released.

In the next post we will be looking in to the Observable and the strategies to create them.