JVM Advent

The JVM Programming Advent Calendar

Reactive OBSERVABLES and CREATING operators

Reactive is certainly not for the faint of heart. It can be broadly divided into reactive programming, reactive systems, and the Reactive Streams specification. Fundamentally, reactive addresses one of the most challenging areas in computing: distributed event-driven systems. In return, it promises higher performance, better responsiveness, and a lower memory footprint by using asynchronous, non-blocking calls and operating on streams of data. We will not go into all of the concepts and theory in this article. If you are new to reactive, a good place to start is reactive programming, using the reactive operators from ReactiveX.

"ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences."

While ReactiveX is not a specification per se, it is a polyglot implementation, as is evident from its availability in many languages, with RxJava being especially popular in Android applications. The exact usage and names of the operators may differ between language implementations, but the concepts carry over.

It is not possible to discuss all of the available ReactiveX operators here: there are well over 450, and RxJava alone has 248 based on this list. In keeping with the Advent season in which this article is being published (a time of anticipating creation), we will look at the observer pattern and a few creating operators, with the help of marble diagrams. It is worth noting one difference between RxJava 1 and RxJava 2: from version 2 onward, back-pressure is handled by a dedicated type, Flowable, which implements the Reactive Streams specification.

Observables

Source: https://reactivex.io/documentation/observable.html

It is important to discuss observables first, because in RxJava an Observable can be thought of as the “backbone” of a reactive operation: it emits the items that an operator then acts on (or transforms). The diagram also helps us visualize a difference between reactive and imperative programming, namely that time is part of the picture. The top “time-line” shows the items emitted by the source observable, and the line below shows the resulting items after the operator has transformed them.

Observables essentially represent the “sources” of data, while observers, or subscribers, are the ones that normally end up consuming the resulting data. Together, the two form the observer pattern.
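To make the observer pattern concrete, here is a minimal sketch in plain Java that uses no RxJava at all. The names MiniObservable, MiniObserver, and ObserverPatternSketch are hypothetical and exist only for illustration. The sketch mirrors the shape of the contract: zero or more onNext calls, followed by a single terminal signal.

```java
import java.util.ArrayList;
import java.util.List;

class ObserverPatternSketch {

    // Subscribes a recording observer and returns the signals it received.
    static List<String> run(List<String> source) {
        List<String> log = new ArrayList<>();
        new MiniObservable<>(source).subscribe(new MiniObserver<String>() {
            @Override public void onNext(String item) { log.add("next:" + item); }
            @Override public void onError(Throwable error) { log.add("error"); }
            @Override public void onComplete() { log.add("complete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b")));
    }
}

// Hypothetical illustration types; these are NOT RxJava classes.
interface MiniObserver<T> {
    void onNext(T item);            // called once per emitted item
    void onError(Throwable error);  // terminal signal: the source failed
    void onComplete();              // terminal signal: the source finished normally
}

final class MiniObservable<T> {
    private final List<T> items;

    MiniObservable(List<T> items) {
        this.items = items;
    }

    // Nothing is emitted until an observer subscribes.
    void subscribe(MiniObserver<T> observer) {
        for (T item : items) {
            observer.onNext(item);
        }
        // An in-memory list cannot fail, so this sketch never calls onError.
        observer.onComplete();
    }
}
```

With the two-item list in main, the observer records next:a, next:b, and then complete. RxJava's real Observable adds disposal, error propagation, and threading on top of this basic shape.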

In RxJava 1, Observable had only partial, retrofitted support for back-pressure. In RxJava 2, Observable is still available but is not back-pressured; if you need Reactive Streams and back-pressure support, use Flowable.
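Back-pressure is easier to see with a pull-style example. The sketch below does not use RxJava; it uses the JDK's own java.util.concurrent.Flow interfaces (Java 9 and later), which mirror the Reactive Streams interfaces that Flowable implements. The range publisher and the BackPressureSketch class are hypothetical names written for this illustration, and the publisher is deliberately simplified: it is synchronous and assumes count is at least 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BackPressureSketch {

    // Hypothetical publisher of 1..count that emits only what was requested.
    // Simplified for illustration: synchronous, single subscriber, count >= 1.
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private boolean done;

            @Override
            public void request(long n) {
                // Emit at most n items, stopping early once the range is exhausted.
                for (long i = 0; i < n && !done; i++) {
                    subscriber.onNext(next++);
                    if (next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Requests items in two batches and records the order of events.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Flow.Subscription[] held = new Flow.Subscription[1];
        range(5).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { held[0] = s; }
            @Override public void onNext(Integer item) { log.add("next:" + item); }
            @Override public void onError(Throwable t) { log.add("error"); }
            @Override public void onComplete() { log.add("complete"); }
        });
        log.add("requested:2");
        held[0].request(2);   // only two items arrive
        log.add("requested:3");
        held[0].request(3);   // the remaining three arrive, then completion
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the design is that the consumer, not the producer, decides how many items are in flight: nothing is emitted after subscription until request is called, and each call releases at most that many items.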

The Creating Operators

Creating from scratch

Source: http://reactivex.io/documentation/operators/create.html

The create operator allows for the creation of an observable from scratch. It essentially bridges the reactive world and older callback-style APIs.

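Observable.<Event>create(emitter -> {
    // Event, Callback, and api stand in for an existing callback-style API;
    // they are placeholders, not RxJava types.
    Callback listener = new Callback() {
        @Override
        public void onEvent(Event e) {
            // Forward each callback event to the subscriber.
            emitter.onNext(e);
            if (e.isLast()) {
                emitter.onComplete();
            }
        }

        @Override
        public void onFailure(Exception e) {
            // Forward a callback failure as the terminal error signal.
            emitter.onError(e);
        }
    };

    // Register the listener and keep the handle that the API returns.
    AutoCloseable c = api.someMethod(listener);

    // Release the handle when the subscriber disposes of the stream.
    emitter.setCancellable(c::close);
});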

Types of Observables

| Observable type | Number of items emitted | Terminates with |
|---|---|---|
| Flowable<T> | 0 or n | Success or Error event |
| Observable<T> | 0 or n | Success or Error event |
| Single<T> | 1 or an error | Success with 1 or Error event |
| Maybe<T> | 1 or None or an error | Success with 1 or none, or Error event |
| Completable | None | Success or Error event |

Types of Observables as expressed in their respective base classes
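The three narrower types in the table can be compared side by side. This is a small sketch, assuming RxJava 2 (the io.reactivex package used elsewhere in this article) is on the classpath; ReactiveTypesSketch and describe are hypothetical names chosen for the example. No scheduler is involved, so every callback runs synchronously on the calling thread.

```java
import io.reactivex.Completable;
import io.reactivex.Maybe;
import io.reactivex.Single;

class ReactiveTypesSketch {

    // Subscribes to one instance of each type and records which callback fired.
    static String describe() {
        StringBuilder out = new StringBuilder();

        // Single: exactly one item (or an error).
        Single.just("one")
                .subscribe(v -> out.append("single:").append(v).append(' '));

        // Maybe: one item, no item, or an error. Here it is empty,
        // so only the completion callback runs.
        Maybe.<String>empty().subscribe(
                v -> out.append("maybe:").append(v).append(' '),
                e -> out.append("maybe-error "),
                () -> out.append("maybe:empty "));

        // Completable: no items at all, only completion (or an error).
        Completable.complete()
                .subscribe(() -> out.append("completable:done"));

        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Choosing the narrowest type that fits documents the intent in the method signature: a caller of a method returning Single knows there will be exactly one value or an error.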

Operators for Creating Observables

| Operator | Description |
|---|---|
| just | Constructs a reactive type from a pre-existing object and emits that object to the downstream consumer upon subscription |
| fromArray | Takes an array and emits its values in order |
| fromCallable | Takes a java.util.concurrent.Callable<V>, invokes it, and relays the resulting value to the consumer |
| fromFuture | Takes a java.util.concurrent.Future and relays its eventual result to the consumer |
| fromIterable | Converts an Iterable sequence into an ObservableSource that emits the items in the sequence |

Commonly used operators for creating Observables
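The operators in the table that are not covered by the longer examples in this article can be tried in a few lines. This is a sketch, assuming RxJava 2 (io.reactivex) is on the classpath; CreatingOperatorsSketch and collect are hypothetical helper names, and an already-completed CompletableFuture is used so that fromFuture does not have to wait.

```java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CreatingOperatorsSketch {

    // Gathers everything the source emits into a list, blocking until it completes.
    static List<String> collect(Observable<String> source) {
        return source.toList().blockingGet();
    }

    public static void main(String[] args) {
        // fromArray: emits the array elements in index order.
        String[] array = {"a", "b"};
        System.out.println(collect(Observable.fromArray(array)));

        // fromIterable: emits the items in iteration order.
        System.out.println(collect(Observable.fromIterable(Arrays.asList("c", "d"))));

        // fromFuture: emits the future's result once it is available.
        System.out.println(collect(
                Observable.fromFuture(CompletableFuture.completedFuture("e"))));
    }
}
```

Blocking with blockingGet is convenient in a demo or a test, but it defeats the purpose of a non-blocking pipeline in production code, where you would subscribe instead.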

Let’s look at a few simplified examples.

Transforming an Observable that emits several Strings:
import io.reactivex.Observable;

public class Main {

    public static void main(String[] args) {
        Observable.just("hello", "friends", "twitch", "LostAlgorithm", "timeless_potato", "appboy", "hedi")
                .map(String::toUpperCase)
                .subscribe(System.out::println);
    }
}
  • Converts each emitted String to upper case
  • Expected results:
HELLO
FRIENDS
TWITCH
LOSTALGORITHM
TIMELESS_POTATO
APPBOY
HEDI
Creating an Observable from a Callable:
import io.reactivex.Observable;
import java.util.concurrent.Callable;

public class FromCallableObservableTest {

    public static void main(String[] argv) {
        final StringBuilder result = new StringBuilder();
        // fromCallable defers the call until subscription and emits its single result.
        Observable<String> observable = Observable.fromCallable(new SampleCallable());
        // No scheduler is specified, so the Callable runs on the subscribing (main)
        // thread and subscribe() returns only after the value has been appended.
        // take(3) has no visible effect here: fromCallable emits at most one item.
        observable.take(3)
                .subscribe(c -> result.append(c));
        System.out.println(result.toString());
    }
}

class SampleCallable implements Callable<String> {

    @Override
    public String call() throws Exception {
        Thread.sleep(1000);
        return Thread.currentThread().getName();
    }
}

  • An Observable is created from a Callable, implemented here as a separate class, that returns the name of the thread it runs on. Because no scheduler is specified, that is the main thread, and its name is relayed to the subscriber
  • Expected result:
main

This article is a simplified, introductory discussion of reactive programming, based on ReactiveX and RxJava and their extensive set of operators. Many in my audience have asked me what the best way to start learning reactive would be. I believe that learning and practicing with the reactive operators goes a long way toward helping us all with the paradigm shift. I anticipate writing more blog posts in the new year, with more in-depth discussions and some code katas for you to try out. Please stay tuned. Happy Holidays!

Author: Mary Grygleski

Currently the AI Practice Lead at Callibrity. Will always be a passionate globe-trotting technical advocate in topic areas such as Java, Open Source, Cloud, Event “stuff” such as Streams, Data pipelines, and now also AI/ML that includes GenAI. By night, you will find her busy as an active tech community builder, the President of the Chicago JUG, and an assistant organizer of the Chicago chapter of the GenAI Collective. Grateful to be a Java Champion since 2021.


© 2024 JVM Advent