In the previous posts we learned what an Observable is and how it differs from a Subject.
Also, in previous examples we used Subjects and manually pushed values into them to create an Observable sequence, so that we could understand key concepts such as the most important Rx method, subscribe. In most cases, Subjects are not the best way to create a new Observable. We will now see some tidier ways to create observable sequences using simple factory methods.
Observable.just
The just method creates an Observable that will emit a predefined sequence of values, supplied on creation, and then terminate.
Observable<String> values = Observable.just("one", "two", "three");
Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
// Output
// Received: one
// Received: two
// Received: three
// Completed
Observable.empty
This observable will emit a single onCompleted and nothing else.
Observable<String> values = Observable.empty();
Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
// Output
// Completed
Observable.never
This observable will never emit anything.
Observable<String> values = Observable.never();
Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
The code above will print nothing. Note that this doesn’t mean that the program is blocking. In fact, it will terminate immediately.
Observable.error
This observable will emit a single error event and terminate.
Observable<String> values = Observable.error(new Exception("Oops"));
Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
// Output
// Error: java.lang.Exception: Oops
You may wonder why we need Observables that do almost nothing. The Empty, Never, and Throw operators (Observable.empty, Observable.never, and Observable.error in RxJava) generate Observables with very specific and limited behavior. They are useful for testing, and sometimes for combining with other Observables or as arguments to operators that expect an Observable.
The most powerful and most widely used function for creating an Observable is Observable.create.
Observable.create
static <T> Observable<T> create(Observable.OnSubscribe<T> f)
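The Observable.OnSubscribe<T> parameter is essentially a function that receives a Subscriber<T>. Inside it we decide which events are pushed to that subscriber.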
Observable<String> values = Observable.create(subscriber -> {
    subscriber.onNext("Hello");
    subscriber.onCompleted();
});
Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
// Output
// Received: Hello
// Completed
When an observer subscribes to the observable (here: values), the corresponding Subscriber instance is passed to your function. As the code executes, values are pushed to the subscriber. Note that you have to call onCompleted yourself at the end if you want the sequence to signal its completion.
The key difference compared to a Subject is that the code is executed lazily, when and if an observer subscribes. In the example above, the code is run not when the observable is created (because there is no Subscriber yet), but each time subscribe is called. This means that every value is generated again for each subscriber.
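To make the lazy, per-subscriber behavior concrete, here is a minimal sketch that uses only the JDK. LazySource is a hypothetical toy class invented for this illustration, not an RxJava type; it models only the idea that the producer function is stored on creation and run once for every subscribe call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical toy class, NOT part of RxJava: it only models the laziness of create.
final class LazySource<T> {
    private final Consumer<Consumer<T>> onSubscribe;
    private int runs = 0; // how many times the producer code has been executed

    private LazySource(Consumer<Consumer<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    // Stores the producer function without running it.
    static <T> LazySource<T> create(Consumer<Consumer<T>> onSubscribe) {
        return new LazySource<>(onSubscribe);
    }

    // Runs the producer function from scratch for this particular observer.
    void subscribe(Consumer<T> observer) {
        runs++;
        onSubscribe.accept(observer);
    }

    int runs() {
        return runs;
    }

    public static void main(String[] args) {
        LazySource<String> values = LazySource.create(observer -> observer.accept("Hello"));
        System.out.println("Runs after create: " + values.runs()); // 0, nothing has run yet

        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        values.subscribe(first::add);
        values.subscribe(second::add);

        System.out.println("Runs after two subscribes: " + values.runs()); // 2
        System.out.println("First saw " + first + ", second saw " + second);
    }
}
```

Each observer gets its own "Hello" because the producer ran twice, once per subscription. A Subject, in contrast, holds one shared sequence that exists independently of its subscribers.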
Observable.create is the preferred way of creating a custom observable when none of the existing shorthands serve your purpose. The code is similar to how we created a Subject and pushed values to it, but there are a few important differences. First of all, the source of the events is neatly encapsulated and separated from unrelated code. That means we can place the data source inside the Observable, like this:
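Observable<String> values = Observable.create(subscriber -> {
    String data;
    // getDataFromTwitter() is a placeholder; assume it returns null when no data is left
    while (!subscriber.isUnsubscribed() && (data = getDataFromTwitter()) != null) {
        subscriber.onNext(data);
    }
    subscriber.onCompleted();
});
// values.onNext("..."); // does not compile: Observable defines no onNext method,
// so nobody can push values into the Observable from outside once it is created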
With a Subject, by contrast, values can be pushed after creation, which carries dangers that are not obvious: with a Subject you are managing state, and anyone with access to the instance can push values into it and alter the sequence.
PublishSubject<Double> subject = PublishSubject.create();
subject.subscribe(v -> System.out.println("First: " + v));
subject.onNext(1.0); // values can still be pushed after the subject is created
subject.subscribe(v -> System.out.println("Second: " + v));
subject.onNext(2.0);
// Output
// First: 1.0
// First: 2.0
// Second: 2.0
Observable.from
As we have seen, you can turn any kind of input into an Rx Observable with create. Observable.from provides shorthands for converting some common types of input.
Futures are part of the Java standard library (java.util.concurrent), and you may have used them for concurrency. Because a Future yields a single value once its task completes, it can be turned into an Observable.
FutureTask<Integer> f = new FutureTask<Integer>(() -> {
    Thread.sleep(2000);
    return 21;
});
new Thread(f).start();
Observable<Integer> values = Observable.from(f);
Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
// Output
// Received: 21
// Completed
The observable emits the result of the FutureTask when it is available and then terminates. If the task is canceled, the observable will emit a java.util.concurrent.CancellationException error.
If you’re interested in the result of the Future only for a limited amount of time, you can provide a timeout period like this:
Observable<Integer> values = Observable.from(f, 1000, TimeUnit.MILLISECONDS);
If the Future has not completed in the specified amount of time, the observable will ignore it and fail with a TimeoutException.
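The timeout behavior has a direct counterpart in the JDK itself: Future.get(timeout, unit) throws a TimeoutException when the result is not ready in time. The sketch below uses only the JDK, not RxJava, and the class name FutureTimeoutSketch and its await helper are made up for illustration. It assumes the timed overload of Observable.from behaves analogously, delivering the exception as an error event rather than throwing it.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper class for illustration; it does not use RxJava.
final class FutureTimeoutSketch {
    // Waits for the task for at most timeoutMillis and reports the outcome as text.
    static String await(FutureTask<Integer> task, long timeoutMillis) {
        try {
            return "Received: " + task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "Error: " + e.getClass().getName();
        } catch (InterruptedException | ExecutionException e) {
            return "Error: " + e;
        }
    }

    public static void main(String[] args) {
        FutureTask<Integer> slow = new FutureTask<>(() -> {
            Thread.sleep(500);
            return 21;
        });
        Thread worker = new Thread(slow);
        worker.setDaemon(true);
        worker.start();

        System.out.println(await(slow, 100));  // gives up before the task finishes
        System.out.println(await(slow, 2000)); // long enough for the task to finish
    }
}
```

Note that timing out does not cancel the task: the second call still receives the value once the task completes.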
You can also turn any collection into an observable using the overloads of Observable.from that take arrays and iterables. Every item in the collection is emitted, followed by a final onCompleted event.
Integer[] is = {1, 2, 3};
Observable<Integer> values = Observable.from(is);
Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
// Output
// Received: 1
// Received: 2
// Received: 3
// Completed
Also note that an Observable is not interchangeable with an Iterable or a Stream. Observables are push-based: they push each value to all of their subscribers once it is available. Iterables and Streams are pull-based: the consumer requests values as soon as possible, and execution blocks until the result is returned.
Map<String, Integer> items = new HashMap<>();
items.put("A", 10);
items.put("B", 20);
items.put("C", 30);
items.forEach((k, v) -> {
    // pulling the values, which are already available in items
    System.out.println("Item : " + k + " Count : " + v);
});
Functional Constructs
In functional programming it is common to create sequences of restricted or infinite length. RxJava has factory methods that create such observable sequences.
Observable.range
A straightforward method, familiar to any functional programmer. It emits the specified range of integers: the first argument is the start value and the second is the number of items to emit.
Observable<Integer> values = Observable.range(10, 15);
values.subscribe(v -> System.out.println("Received: " + v));
// range(start, count): emits 15 values, from 10 to 24, in sequence.
// Output
// Received: 10
// Received: 11
// Received: 12
// Received: 13
// ...
// ...
// Received: 24
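The second argument of range is easy to misread as an end value. As a rough JDK-only analogue (RangeSketch and its range helper are hypothetical names, not RxJava API), the same start-and-count semantics can be expressed with IntStream.range, which takes an exclusive end instead of a count:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper class for illustration; it does not use RxJava.
final class RangeSketch {
    // IntStream.range takes an exclusive END, so a (start, count) pair
    // has to be converted to (start, start + count).
    static List<Integer> range(int start, int count) {
        return IntStream.range(start, start + count)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> values = range(10, 15);
        System.out.println(values.size());                 // 15
        System.out.println(values.get(0));                 // 10
        System.out.println(values.get(values.size() - 1)); // 24
    }
}
```

With a start of 10 and a count of 15, the last value is 10 + 15 - 1 = 24.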
Observable.interval
This function will create an infinite sequence of ticks, separated by the specified time duration.
Observable<Long> values = Observable.interval(1000, TimeUnit.MILLISECONDS);
Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
System.in.read();
// Output
// Received: 0
// Received: 1
// Received: 2
// Received: 3
// ...
// ...
This sequence will not terminate until we unsubscribe.
Please note that the blocking System.in.read() at the end is necessary here. Without it, the program terminates without printing anything. That’s because the timer that produces the ticks runs on its own thread and our operations are non-blocking. We create an observable that will emit values over time, then register a subscriber that processes each value when it arrives. None of that blocks, so the main thread proceeds to terminate. The blocking read at the end simply keeps the main thread alive so that we can see the values coming from the Observable.
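The same threading situation can be reproduced with the JDK alone. The sketch below is not RxJava: TickSketch is a hypothetical class that stands in for interval with a ScheduledExecutorService, and it models the timer as a daemon thread, which is an assumption made here so that the timer cannot keep the JVM alive by itself. Instead of blocking on System.in.read(), it blocks the calling thread with a CountDownLatch until a fixed number of ticks has arrived.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a ticking source; it does not use RxJava.
final class TickSketch {
    // Collects the first `count` ticks produced on a background daemon thread.
    static List<Long> firstTicks(int count, long periodMillis) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "tick-thread");
            t.setDaemon(true); // a daemon thread does not keep the JVM alive
            return t;
        });
        List<Long> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        AtomicLong next = new AtomicLong();

        timer.scheduleAtFixedRate(() -> {
            if (done.getCount() > 0) { // ignore ticks beyond the requested count
                received.add(next.getAndIncrement());
                done.countDown();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);

        done.await();        // block the calling thread until enough ticks arrived
        timer.shutdownNow(); // stop the timer, comparable to unsubscribing
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(firstTicks(3, 100)); // [0, 1, 2]
    }
}
```

If the done.await() call is removed, firstTicks returns almost immediately with an empty or incomplete list, which mirrors the program exiting before any tick is printed.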
In the next post, we will explore hot and cold Observables.