Reactive is certainly not for the faint of heart. It can be broadly categorized into reactive programming, reactive systems, and the Reactive Streams specification. Fundamentally, reactive addresses one of the most challenging areas in computing, namely distributed event-driven systems. In return, it promises higher performance, better responsiveness, and a lower memory footprint by using asynchronous, non-blocking calls and operating on streams of data. We will not go into all of the concepts and theories in this article; if you are new to reactive, a good place to start is reactive programming with the operators from ReactiveX.
"ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences."
While ReactiveX is not a specification per se, it is polyglot, as evidenced by its availability in many languages, with RxJava being especially popular in Android applications. The exact usage and names of the operators may differ across language implementations, but the concepts carry over.
It would not be possible to discuss all of the available ReactiveX operators, as there are well over 450, and RxJava alone has 248 based on this list. To align with the current Advent season (an anticipation of a time of creation), during which this article is being published, we will look at the observer pattern and a few creating operators with the help of marble diagrams. It is worth noting a difference between RxJava 1 and RxJava 2: from version 2 onward, back-pressure is handled by a dedicated type, Flowable, rather than by Observable.
Observables
It is important to first discuss observables in RxJava, as an Observable can be thought of as the “backbone” of a reactive operation: the items it emits are operated on (or transformed) by the operator. The marble diagram also lets us visualize a difference between reactive and imperative programming, namely that time is captured as well: the top observable timeline is transformed, and the updated items appear on the resulting timeline below.
Observables essentially represent the “sources” of data, while observers, or subscribers, are the ones that normally end up consuming the resulting data. Together, the two form the observer pattern.
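To make the source/consumer relationship concrete, here is a minimal plain-Java sketch of the observer pattern. The names `SimpleObservable`, `SimpleObserver`, and `collect` are made up for illustration, and this is not how RxJava implements it; it only shows the shape of the interaction, in which nothing is emitted until an observer subscribes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Minimal observer-pattern sketch; all names are hypothetical, not RxJava's API. */
public class ObserverPatternSketch {

    /** The consuming side: receives items, then a single completion signal. */
    interface SimpleObserver<T> {
        void onNext(T item);
        void onComplete();
    }

    /** The source side: pushes a fixed list of items to whoever subscribes. */
    static final class SimpleObservable<T> {
        private final List<T> items;

        SimpleObservable(List<T> items) {
            this.items = items;
        }

        /** Emission starts only when an observer subscribes. */
        void subscribe(SimpleObserver<T> observer) {
            for (T item : items) {
                observer.onNext(item);
            }
            observer.onComplete();
        }
    }

    /** Subscribes a collecting observer and returns every signal it received. */
    public static List<String> collect(List<String> source) {
        List<String> received = new ArrayList<>();
        new SimpleObservable<>(source).subscribe(new SimpleObserver<String>() {
            @Override
            public void onNext(String item) {
                received.add(item);
            }

            @Override
            public void onComplete() {
                received.add("<complete>");
            }
        });
        return received;
    }

    public static void main(String[] args) {
        // prints [a, b, c, <complete>]
        System.out.println(collect(Arrays.asList("a", "b", "c")));
    }
}
```

RxJava's Observer additionally has onSubscribe and onError, which are left out here to keep the sketch short.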
In RxJava 2, Observable continues to be available but does not support back-pressure; if you need Reactive Streams compliance and back-pressure support, you can turn to Flowable.
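Back-pressure is easier to grasp with a small demonstration. Flowable implements the Reactive Streams Publisher interface, and the JDK (9 and later) ships equivalent interfaces in `java.util.concurrent.Flow`, so the sketch below uses those and needs no extra dependency. `RangePublisher` and `run` are hypothetical names, and the publisher is deliberately simplified (single-threaded, not fully spec-compliant). The point is only that the source never emits more than the subscriber has requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Back-pressure sketch on the JDK Flow interfaces (Java 9+); names are hypothetical. */
public class BackPressureSketch {

    /** Emits 1..count, but never more items than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done || n <= 0) {
                        return; // simplification: the spec asks for onError on n <= 0
                    }
                    demand += n;
                    if (emitting) {
                        return; // re-entrant call from onNext; the loop below picks it up
                    }
                    emitting = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes a consumer that asks for `batch` items at a time and logs every signal. */
    public static List<String> run(int count, int batch) {
        List<String> log = new ArrayList<>();
        new RangePublisher(count).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int receivedInBatch = 0;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request " + batch);
                s.request(batch);
            }

            @Override
            public void onNext(Integer item) {
                log.add("item " + item);
                if (++receivedInBatch == batch) {
                    receivedInBatch = 0;
                    log.add("request " + batch);
                    subscription.request(batch);
                }
            }

            @Override
            public void onError(Throwable t) {
                log.add("error");
            }

            @Override
            public void onComplete() {
                log.add("complete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        // prints [request 2, item 1, item 2, request 2, item 3, complete]
        System.out.println(run(3, 2));
    }
}
```

The consumer sets the pace: the third item is delivered only after the second `request`. With an Observable there is no such request channel, which is why Flowable exists.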
The Creating Operators
Creating from scratch
The create operator allows for the creation of an observable from scratch. It essentially bridges the reactive world with the bygone callback-style days.
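    // Event, Callback, and api are placeholders for an existing callback-style API.
    Observable.<Event>create(emitter -> {
        Callback listener = new Callback() {
            @Override
            public void onEvent(Event e) {
                // Forward each callback event downstream.
                emitter.onNext(e);
                if (e.isLast()) {
                    emitter.onComplete();
                }
            }

            @Override
            public void onFailure(Exception e) {
                emitter.onError(e);
            }
        };
        // Register the listener; close the resource when the subscriber disposes.
        AutoCloseable c = api.someMethod(listener);
        emitter.setCancellable(c::close);
    });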
Types of Observables
TYPE | ITEMS EMITTED | TERMINATES WITH |
---|---|---|
Flowable<T> | 0 to n | Completion or error event |
Observable<T> | 0 to n | Completion or error event |
Single<T> | Exactly 1, or an error | Success with 1 item, or error event |
Maybe<T> | 1, none, or an error | Success with 1 item, completion with none, or error event |
Completable | None | Completion or error event |
Operators for Creating Observables
OPERATOR | DESCRIPTION |
---|---|
just | Constructs a reactive type from one or more pre-existing objects and emits them to the downstream consumer upon subscription |
fromArray | Takes an array and emits its elements in order |
fromCallable | Invokes a java.util.concurrent.Callable<V> upon subscription and relays the resulting value to the consumer |
fromFuture | Takes a java.util.concurrent.Future and relays its eventual result to the consumer |
fromIterable | Converts an Iterable into an Observable that emits the items in the sequence |
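As a quick illustration of three of the operators in the table, here is a small sketch. It assumes RxJava 2 (the `io.reactivex` package) is on the classpath; the class and method names are made up for this example, and the blocking calls are used only to get the results back onto the calling thread for printing.

```java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Sketch of fromArray, fromIterable, and fromFuture; assumes RxJava 2 on the classpath. */
public class CreatingOperatorsSketch {

    /** fromArray: emits the array elements in order. */
    public static List<String> viaFromArray() {
        String[] names = {"a", "b", "c"};
        return Observable.fromArray(names).toList().blockingGet();
    }

    /** fromIterable: emits each item of the Iterable; map then transforms it. */
    public static List<Integer> viaFromIterable() {
        return Observable.fromIterable(Arrays.asList(1, 2, 3))
                .map(n -> n * 10)
                .toList()
                .blockingGet();
    }

    /** fromFuture: relays the Future's result once it is available. */
    public static String viaFromFuture() {
        CompletableFuture<String> future = CompletableFuture.completedFuture("done");
        return Observable.fromFuture(future).blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(viaFromArray());    // prints [a, b, c]
        System.out.println(viaFromIterable()); // prints [10, 20, 30]
        System.out.println(viaFromFuture());   // prints done
    }
}
```

In a real reactive pipeline you would normally subscribe rather than block; `blockingGet` and `blockingFirst` just keep the sketch easy to run from `main`.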
Let’s look at a few simplified examples.
Transforming an Observable that emits a sequence of Strings:
    import io.reactivex.Observable;

    public class Main {
        public static void main(String[] args) {
            Observable.just("hello", "friends", "twitch", "LostAlgorithm", "timeless_potato", "appboy", "hedi")
                    .map(String::toUpperCase)
                    .subscribe(System.out::println);
        }
    }
- Converts each emitted String to upper case
- Expected results (println writes one item per line):
    HELLO
    FRIENDS
    TWITCH
    LOSTALGORITHM
    TIMELESS_POTATO
    APPBOY
    HEDI
Creating an Observable from a Callable:
    import io.reactivex.Observable;
    import java.util.concurrent.Callable;

    public class FromCallableObservableTest {
        public static void main(String[] argv) {
            final StringBuilder result = new StringBuilder();
            // Nothing runs yet; the Callable is invoked on the subscribing thread at subscribe().
            Observable<String> observable = Observable.fromCallable(new SampleCallable());
            // fromCallable emits at most one item, so take(3) simply passes it through.
            observable.take(3)
                    .subscribe(c -> result.append(c));
            System.out.println(result.toString());
        }
    }

    class SampleCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
            Thread.sleep(1000); // simulate slow work
            return Thread.currentThread().getName();
        }
    }
- An Observable is created from a Callable, implemented here as a separate class in the same file. Since no scheduler is specified, the Callable runs on the subscribing thread and returns that thread's name, which is relayed back to the subscriber
- Expected result:
    main
This article is a simplified introduction to reactive programming, based on ReactiveX and RxJava and their extensive set of operators. Many readers have asked me what the best way to start learning reactive would be, and I believe that learning and practicing with the reactive operators goes a long way toward making the paradigm shift. I anticipate writing more blog posts in the new year, with more in-depth discussions and some code katas for you to try out. Please stay tuned. Happy Holidays!