JVM Advent

Are Kotlin Coroutines Enough to Replace RxJava?

When JetBrains first announced coroutines as Kotlin’s asynchronous programming solution, many developers were intrigued but doubtful whether this shiny new gem would be enough to solve all their asynchronous problems. At that time, the de facto standard in the Java world was the reactive library RxJava (which also has equivalents in other languages: RxSwift, RxJS, Rx.NET, etc.).

It seemed like the suspend modifier and the Flow couldn’t hold a candle to the old and proven Rx with its complexity and many operators. Apart from that, RxJava was a stable, battle-tested library, while coroutines had just become stable with many of their APIs still marked as experimental.

Quite some time has passed since coroutines became stable in Kotlin 1.3; at the moment of writing this article, the team at JetBrains has just released Kotlin 1.6. And although we still essentially have only the suspend modifier and Flow to compete with all the functionality of RxJava, in this article I would like to explore the question: “Are Kotlin coroutines enough to replace RxJava?”

Contents

1. Comparing the stream types
    1.1. Stream types in RxJava
    1.2. Their equivalents in coroutines
2. Comparing the basic implementation
3. Comparing the operators
    3.1. Completable, Single, Maybe
    3.2. Flow operators vs Rx operators
        3.2.1. .map()
        3.2.2. .flatMap()
        3.2.3. .filter()
        3.2.4. Backpressure operators
        3.2.5. Other operators
4. Subjects / Hot streams
5. Other considerations
6. Conclusion

Comparing the stream types

This article assumes at least basic knowledge of RxJava (or any other Rx library) and a basic understanding of how coroutines work. If either is lacking, some of the concepts and examples might prove challenging.

Coroutines are fundamentally very different from RxJava, so it is tricky to compare them directly. However, what we can do is compare the solutions they offer to everyday asynchronous problems.

Stream types in RxJava

Before we jump into specifics, let’s look at the stream types offered by the RxJava library and compare them to corresponding solutions in coroutines. Later in the article, we will look at how to use these types in more detail.

Starting from RxJava version 2, the basic stream types we can work with are as follows:

1. Observable<T> // a flow of 0..N items, **without** any backpressure management
2. Flowable<T> // a flow of 0..N items, **with** backpressure management
3. Single<T> // a flow of exactly 1 item or an error
4. Maybe<T> // a flow of either exactly 1 item, no items, or an error
5. Completable // a flow without items that only signals either completion or an error

Their equivalents in coroutines

In coroutines, a Completable would be a simple suspend function that doesn’t return anything, or, to be precise, returns Unit:

suspend fun completableOperation() {
    //it will return when it's done
    //or throw an error
}

Single<T> would be a suspend function that returns a non-nullable value:

suspend fun getUser(): User {
    //it will either return a User
    //or throw an error
}

Maybe<T> would be a suspend function that returns a nullable value:

suspend fun getUser(): User? {
    //it will return either a User or a null
    //or throw an error
}

Now let’s look at the Observable and the Flowable. Both are streams of 0..N events, and the main difference between the two is that the Flowable is backpressure-aware while the Observable is not. In coroutines, both are covered by Flow, an asynchronous data stream that sequentially emits values and completes normally or with an exception.

Later on, we will look into how we can create and use flows, but on a basic level, all we need to use flows is a function that returns a Flow<T>:

fun getDataFlow(): Flow<Data> {
    // returns a flow
}

Note that the function itself doesn’t require the suspend modifier, since Flows are cold and won’t start emitting data until there is a collector/subscriber. That said, collect is itself a suspend function, so calling it on a Flow requires a coroutine (for example, one launched from a CoroutineScope).
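To make “cold” concrete, here is a small sketch that records when the body of a flow actually runs. The helper name coldFlowLog is purely illustrative: creating the flow executes nothing, and every collection runs the body again from the start.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Illustrative helper: logs when a cold flow's body executes.
fun coldFlowLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val numbers = flow {
        log += "started" // runs only when someone collects
        emit(1)
    }
    log += "created" // building the flow executed nothing so far
    numbers.collect { log += "collected $it" }
    numbers.collect { log += "collected $it" } // each collection restarts the body
    log
}

fun main() {
    // prints: created, started, collected 1, started, collected 1 (one per line)
    coldFlowLog().forEach(::println)
}
```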

The main reason Flow is enough to replace both the Observable and the Flowable is how it handles backpressure.

The solution for backpressure in Flow comes naturally from the design and philosophy of the coroutines library. It doesn’t need a cleverly engineered mechanism to handle backpressure explicitly. The core of Flow’s API (emit on the producer side, collect on the consumer side) consists of suspend functions, which suspend the caller without blocking the thread. Therefore, when a Flow<T> emits and collects in the same coroutine and the collector cannot keep up, emit simply doesn’t return until the collector has processed the previous element, so the emitter stays suspended until the collector is ready to receive more.

That is why coroutines don’t need separate solutions for streams with and without backpressure management. The nature of coroutines makes the problem of backpressure itself almost non-existent, while still giving you different strategies to handle fast producer/slow consumer situations (more on that in the operators section).
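The suspension-based backpressure described above can be observed with a minimal sketch: a slow collector and a fast emitter with no buffer in between. The helper emissionOrder is hypothetical; it only logs the order of events, which shows that the emitter never gets ahead of the collector.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical helper: logs emissions and collections of an unbuffered flow.
// emit() returns only after the collector has processed the value,
// so the emitter is suspended (not blocked) in the meantime.
fun emissionOrder(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow {
        for (i in 1..3) {
            log += "emit $i"
            emit(i) // suspends while the collector handles i
        }
    }.collect { value ->
        delay(100) // simulate a slow consumer
        log += "collected $value"
    }
    log
}

fun main() = emissionOrder().forEach(::println)
```

The log strictly alternates (emit 1, collected 1, emit 2, ...), with no element ever waiting in a queue.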

You can find more information about this topic in Roman Elizarov’s article on the design of Kotlin Flow.

Comparing the basic implementation

With that out of the way, let’s now compare the actual implementations of different use cases.

Asynchronous tasks that complete without a result and might throw an error

In RxJava we use a Completable. A CompletableEmitter has a method onComplete to signal completion and a method onError to pass an exception.

import io.reactivex.rxjava3.core.Completable // assuming RxJava 3

/**
 * Completable in RxJava
 */
fun main() {
    completableRequest()
        .subscribe(
            { println("I'm done") },
            { println("Got an exception!") }
        )
}

fun completableRequest(): Completable {
    return Completable.create { emitter ->
        try {
            // process a request
            emitter.onComplete()
        } catch (e: Exception) {
            emitter.onError(e)
        }
    }
}

In coroutines, we call a suspend function that doesn’t return anything (returns Unit), like we would a normal function:

import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

/**
 * Completable equivalent in coroutines
 */
fun main() = runBlocking {
    try {
        doRequest()
        println("I'm done")
    } catch (e: Exception) {
        println("Got an exception!")
    }
}

suspend fun doRequest() {
    // process a request
    delay(500)
}

Don’t use runBlocking in production code; use a proper CoroutineScope instead. However, we will use it throughout this article to keep the examples short and self-contained.

Also, I will not discuss error handling in coroutines in this article, since it is a different topic. But keep in mind that you don’t necessarily need to wrap all your suspend functions in try/catch to handle exceptions.

Asynchronous tasks that must return a value or throw an error

In RxJava we use a Single. A SingleEmitter has a method onSuccess to pass a return value and a method onError to pass an exception.

import io.reactivex.rxjava3.core.Single // assuming RxJava 3

/**
 * Single in RxJava
 */
fun main() {
    singleResult()
        .subscribe(
            { result -> println(result) },
            { println("Got an exception") }
        )
}

fun singleResult(): Single<String> {
    return Single.create { emitter ->
        try {
            // process a request
            emitter.onSuccess("Some result")
        } catch (e: Exception) {
            emitter.onError(e)
        }
    }
}

In coroutines, we call a suspend function that returns a non-nullable value:

import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

/**
 * Single equivalent in coroutines
 */
fun main() = runBlocking {
    try {
        val result = getResult()
        println(result)
    } catch (e: Exception) {
        println("Got an exception")
    }
}

suspend fun getResult(): String {
    // process a request
    delay(100)
    return "Some result"
}

Asynchronous tasks that might return a result or throw an error

In RxJava we use a Maybe. A MaybeEmitter has a method onSuccess to pass a return value, a method onComplete to signal a completion without a value, and a method onError to pass an exception.

import io.reactivex.rxjava3.core.Maybe // assuming RxJava 3
import kotlin.random.Random

/**
 * Maybe in RxJava
 */
fun main() {
    maybeResult()
        .subscribe(
            { result -> println(result) },
            { println("Got an exception") },
            { println("Completed without a value!") }
        )
}

fun maybeResult(): Maybe<String> {
    return Maybe.create { emitter ->
        try {
            // process a request
            if (Random.nextBoolean()) {
                emitter.onSuccess("Some value")
            } else {
                emitter.onComplete()
            }
        } catch (e: Exception) {
            emitter.onError(e)
        }
    }
}

In coroutines, we call a suspend function that returns a nullable value:

import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.random.Random

/**
 * Maybe equivalent in coroutines
 */
fun main() = runBlocking {
    try {
        val result = getNullableResult()
        if (result != null) {
            println(result)
        } else {
            println("Completed without a value!")
        }
    } catch (e: Exception) {
        println("Got an exception")
    }
}

suspend fun getNullableResult(): String? {
    // process a request
    delay(100)
    return if (Random.nextBoolean()) {
        "Some value"
    } else {
        null
    }
}

I am keeping the Kotlin code as Java-like as possible so as not to complicate things or alienate people new to Kotlin.

In idiomatic Kotlin we could write the code inside the try/catch block like this:

getNullableResult()
    ?.let(::println)
    ?: println("Completed without a value!")

Asynchronous streams of 0..N events

To avoid repeating ourselves, for RxJava we will only look at Flowable, since Observable is pretty much the same, only without backpressure management.

Both FlowableEmitter and ObservableEmitter have a method onNext to emit the next stream value, a method onComplete to signal completion of the stream, and a method onError to pass an exception.

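import io.reactivex.rxjava3.core.BackpressureStrategy // assuming RxJava 3
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.FlowableEmitter

/**
 * Flowable in RxJava
 */
fun main() {
    flowableValues()
        .subscribe(
            { value -> println(value) },
            { println("Got an exception") },
            { println("I'm done") }
        )
}

fun flowableValues(): Flowable<Int> {
    val flowableEmitter = { emitter: FlowableEmitter<Int> ->
        try {
            for (i in 1..10) {
                emitter.onNext(i)
            }
            // complete only on success: a stream must not
            // signal onComplete after onError
            emitter.onComplete()
        } catch (e: Exception) {
            emitter.onError(e)
        }
    }

    return Flowable.create(flowableEmitter, BackpressureStrategy.BUFFER)
}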

In coroutines, we just create a Flow. There are many ways to do it, but the most common is using the flow { } builder function:

import kotlinx.coroutines.flow.* // flow { }, collect
import kotlinx.coroutines.runBlocking

/**
 * Flow in Kotlin
 */
fun main() = runBlocking {
    try {
        eventFlow().collect { value ->
            println(value)
        }
        println("I'm done")
    } catch (e: Exception) {
        println("Got an exception")
    }
}

fun eventFlow() = flow {
    for (i in 1..10) {
        emit(i)
    }
}

In idiomatic Kotlin, one of the ways to create the above flow would be: fun eventFlow() = (1..10).asFlow()

As you can see, we can easily cover all the primary use cases of RxJava in coroutines. Moreover, the design of coroutines allows us to write typical sequential code using all the standard Kotlin features.

It also removes the need for onComplete or onError callbacks. We can catch errors as we would in normal code or set a coroutine exception handler. And since the coroutine continues to execute sequentially once a suspend function completes, we can simply write our “completion logic” on the next line.

Note that this is also true when collecting flows, as we have seen in our example:
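As a sketch of the exception handler option, the snippet below installs a CoroutineExceptionHandler on a scope instead of wrapping the call in try/catch. The function failingRequest is a hypothetical stand-in for a request that fails, and reportedMessages exists only to make the outcome inspectable.

```kotlin
import kotlinx.coroutines.*

// Hypothetical request that always fails, for illustration only.
suspend fun failingRequest() {
    delay(100)
    throw IllegalStateException("request failed")
}

fun reportedMessages(): List<String> = runBlocking {
    val messages = mutableListOf<String>()
    // Invoked for exceptions that are not caught inside launched coroutines.
    val handler = CoroutineExceptionHandler { _, e ->
        messages += "Got an exception: ${e.message}"
    }
    // With a SupervisorJob, a failing child does not cancel the scope, and
    // its exception goes to the handler instead of being rethrown here.
    val scope = CoroutineScope(SupervisorJob() + handler)
    scope.launch {
        failingRequest()
        messages += "I'm done" // not reached: failingRequest() throws
    }.join()
    scope.cancel()
    messages
}

fun main() = reportedMessages().forEach(::println)
```

Note that a handler like this only applies to coroutines started with launch in that scope; a suspend function called directly still propagates its exception to the caller.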

eventFlow().collect { value ->
    println(value)
}
println("I'm done")

“I’m done” will be printed after all the elements of the Flow are collected.

Comparing the operators

However, the main strength of RxJava lies in its operators, which allow us to transform and combine streams in multiple ways. There are so many that it is impossible to cover all of them in the scope of one article, so I will only look at the most common ones.

Completable, Single, Maybe

First of all, I need to mention that although Completable, Single, and Maybe in RxJava have many of the same operators as Flowable and Observable (Completable has considerably fewer, due to its nature), any kind of operators on their coroutine counterparts would simply be redundant.

Let’s take a simple .map() operator on a Single stream as an example:

/**
 * Maps Single<String> to
 * Single<User> synchronously
 */
fun main() {
    getUsername()
        .map { username ->
            User(username)
        }
        .subscribe(
            { user -> println(user) },
            { println("Got an exception") }
        )
}

This is something you see quite a lot in RxJava – we take a value and transform it into another value.

That said, once again, given the nature of coroutines, we don’t need a special .map() operator for those kinds of use cases:

/**
 * In coroutines we can write
 * normal sequential code
 */
fun main() = runBlocking {
    try {
        val username = getUsername() // suspend fun
        val user = User(username)
        println(user)
    } catch (e: Exception) {
        println("Got an exception")
    }
}

We can just write normal sequential code using both suspend and normal functions.

Flow operators vs Rx operators

With that out of the way, let’s take a look at what we can do with our Flows and how it compares to the functionality of RxJava.

.map()

First of all, we have a .map() operator that we have looked at in the example above. The Flow has the same operator:

/**
 * Maps Flow<String> to Flow<User>
 */
fun main() = runBlocking {
    usernameFlow()
        .map { username ->
            User(username)
        }
        .collect { user ->
            println(user)
        }
}

Looking inside the implementation of this .map() reveals a major advantage Flow has over RxJava.

After a little simplification for illustration purposes, the basic declaration of .map() looks like this:

fun <T, R> Flow<T>.map(transform: suspend (T) -> R): Flow<R> = flow {
    collect { value -> emit(transform(value)) }
}

We just create a new Flow that collects values from the upstream Flow and emits those values after applying the transform function.

And that’s it.

Most of Flow’s operators work this way, and there is no need to follow strict protocols, as is the case with RxJava. If you open, for example, the code for FlowableMap, you will see why writing custom operators for RxJava requires a very good knowledge of the Reactive Streams protocol.

Standard Flow operators will be enough for most use cases, but if not, writing a custom operator is straightforward and intuitive.
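To illustrate, here is a sketch of a custom operator written in the same style as the simplified .map() above: a new flow that collects the upstream and decides what to emit. The operator takeEveryNth is hypothetical and not part of kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical operator: emits the 1st, (n+1)th, (2n+1)th, ... upstream values.
fun <T> Flow<T>.takeEveryNth(n: Int): Flow<T> {
    require(n > 0) { "n must be positive" }
    return flow {
        var index = 0 // created inside the builder, so each collection starts fresh
        collect { value ->
            if (index++ % n == 0) emit(value)
        }
    }
}

fun main() = runBlocking {
    println((1..10).asFlow().takeEveryNth(3).toList()) // [1, 4, 7, 10]
}
```

Keeping the counter inside flow { } matters: a counter declared outside the builder would be shared between collectors.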

.flatMap()

The next operator we see a lot in RxJava is .flatMap(). There are many variants of this operator, such as .flatMapSingle(), .flatMapObservable(), .flatMapIterable(), etc.

In RxJava, if we need to make a synchronous transformation on a value, we use .map(). An asynchronous transformation, however, requires one of the .flatMap() variants.

On the other hand, Flow doesn’t require different operators for synchronous and asynchronous code. The lambdas passed to Flow’s operators are marked with the suspend modifier, so you can call both suspend and regular functions inside them.

Here is an example in RxJava:

/**
 * FlatMaps Flowable<String> to
 * Flowable<User> with an asynchronous
 * Single<User> call
 */
fun main() {
    getUsernames() //Flowable<String>
        .flatMapSingle { username ->
            getUserFromNetwork(username) // Single<User>
        }
        .subscribe(
            { user -> println(user) },
            { println("Got an exception") }
        )
}

Translated to Flow, this example doesn’t require a different operator. If we need to transform the value in any way, we just use .map(), regardless of whether the transformation is synchronous or asynchronous:

/**
 * Flow's .map() operator can be
 * used for both synchronous
 * and asynchronous transformations
 */
fun main() = runBlocking {
    usernameFlow()
        .map { username ->
            getUserFromNetwork(username) // suspend fun
        }
        .collect { user ->
            println(user)
        }
}

That said, there are also a few .flatMap() operators on a Flow: .flatMapMerge(), .flatMapConcat(), and .flatMapLatest(). They convert values emitted by the upstream flow into new flows and then flatten all those flows into one.

These operators mirror the functionality of .flatMap(), .concatMap(), and .switchMap() when they are called on a Flowable or an Observable.
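A short sketch of two of them, assuming kotlinx.coroutines with @OptIn to acknowledge their preview/experimental status. The helper names concatenated and latestOnly are illustrative only: flatMapConcat collects inner flows one after another, while flatMapLatest cancels the inner flow of a previous value as soon as a new one arrives.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// flatMapConcat (like concatMap): inner flows are collected sequentially.
@OptIn(FlowPreview::class)
fun concatenated(): List<String> = runBlocking {
    flowOf("a", "b")
        .flatMapConcat { letter -> flowOf("${letter}1", "${letter}2") }
        .toList()
}

// flatMapLatest (like switchMap): a new upstream value cancels
// the inner flow still running for the previous value.
@OptIn(ExperimentalCoroutinesApi::class)
fun latestOnly(): List<String> = runBlocking {
    flow {
        emit("a")
        delay(50)
        emit("b")
    }.flatMapLatest { letter ->
        flow {
            emit("${letter}1")
            delay(100) // "b" arrives before this ends, so "a2" is never emitted
            emit("${letter}2")
        }
    }.toList()
}

fun main() {
    println(concatenated()) // [a1, a2, b1, b2]
    println(latestOnly())   // [a1, b1, b2]
}
```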

However, keep in mind that at the time of writing this article (Kotlin 1.6), these operators are marked in kotlinx.coroutines as either experimental (@ExperimentalCoroutinesApi) or preview (@FlowPreview) APIs.

.filter()

With the .filter() operator we stumble upon one more disadvantage of RxJava.

It takes a predicate (T) -> Boolean and there is no intuitive way to perform asynchronous filtering. For that you need to get creative and, for example, do something like this:

/**
 * One of the ways to achieve
 * asynchronous filtering in
 * RxJava
 */
fun main() {
    getUsernames() // Flowable<String>
        .flatMapMaybe { username ->
            isCorrectUsername(username) // Single<Boolean>
                .flatMapMaybe { isCorrect ->
                    // an empty Maybe filters the item out; Single.never() here
                    // would keep the resulting stream from ever completing
                    if (isCorrect) {
                        Maybe.just(username)
                    } else {
                        Maybe.empty<String>()
                    }
                }
        }.subscribe {
            println(it)
        }
}

In the Flow API, as you might have guessed, .filter() takes a suspend (T) -> Boolean predicate, which allows us to easily filter streams based on both synchronous and asynchronous calls:

/**
 * Flow's .filter() can filter both
 * synchronously and asynchronously
 */
fun main() = runBlocking {
    usernameFlow()
        .filter { username ->
            isCorrectUsername(username) // suspend fun
        }
        .collect { user ->
            println(user)
        }
}

Backpressure operators

Backpressure is a very complex topic that deserves a separate article. Therefore, this article will only look at the basics and compare the solutions to the most common backpressure situations.

Given the differences in the implementation of RxJava and the coroutines, the actual outputs will not be the same in every example. The goal of these examples is to illustrate different strategies that deal with backpressure.

.onBackpressureDrop()

This strategy discards newly emitted items while the consumer cannot keep up and the buffer is full.

Here is an example from RxJava:

import io.reactivex.rxjava3.core.Flowable // assuming RxJava 3
import io.reactivex.rxjava3.schedulers.Schedulers

/**
 * An example of .onBackpressureDrop() in RxJava
 */
fun main() {
    Flowable.range(1, 1_000_000)
        .onBackpressureDrop()
        // change the scheduler and override the default buffer size
        .observeOn(Schedulers.single(), false, 1)
        .subscribe { value ->
            Thread.sleep(100)
            println("Got value: $value")
        }

    Thread.sleep(1000)
}

Output:
Got value: 1

All the values after 1 are dropped since the consumer cannot process them.

There are a couple of things to keep in mind here.

First of all, if we hadn’t changed the Scheduler, we would get all the items since this code would run synchronously, and the consumer would block the producer.

Similarly, by default the emitter and the collector of a Flow run in the same coroutine, so the Flow sequentially alternates between emission and collection until it is done. The difference, however, is that coroutines are non-blocking: the emitter is suspended, not blocked.

Also, keep in mind that .observeOn() uses a default buffer size of 128, which is why we specify our own size of 1. With the default buffer size, we would get the first 128 items if main() ran long enough.

The equivalent strategy using Flow looks like this:

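import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

/**
 * An example of .onBackpressureDrop() in Flow
 */
fun main() = runBlocking {
    (1..1_000_000).asFlow()
        .buffer(capacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST)
        .collect { value ->
            delay(100)
            println("Got value: $value")
        }
}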

Output:
Got value: 1
Got value: 2

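Given the nature of coroutines, the collector processes two items here: the first one goes straight to the waiting collector, and the second one fills the single buffer slot. The idea is the same, though: every item that arrives while the buffer is full is dropped.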

We have discussed that when calling .collect() on a Flow, by default, the emission and the collection will run sequentially in the same coroutine. In other words, the collector will suspend the emitter until it is ready to receive more items.

So what is happening here?

For buffering and backpressure handling to work correctly, we need the emitter and the collector to run in separate coroutines. That is where the .buffer() operator comes in: it runs the upstream emitter in a separate coroutine and sends all emitted items to the collector through a Channel.

It also lets us configure the buffer itself:

public fun <T> Flow<T>.buffer(
    capacity: Int = BUFFERED, 
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>

We can specify the capacity of our buffer and a strategy for handling onBufferOverflow.

The example above used one of those strategies; now let’s quickly look at the others.

.onBackpressureLatest()

The idea of this strategy is to only emit the latest items in the case of backpressure. However, there is again a difference in how the solutions work in Rx and coroutines.

First, let’s take a look at the Rx example:

import io.reactivex.rxjava3.core.Flowable // assuming RxJava 3
import io.reactivex.rxjava3.schedulers.Schedulers

/**
 * An example of .onBackpressureLatest() in RxJava
 */
fun main() {
    Flowable.range(1, 1_000_000)
        .onBackpressureLatest()
        .observeOn(Schedulers.single(), false, 2)
        .subscribe { value ->
            Thread.sleep(100)
            println("Got value: $value")
        }

    Thread.sleep(1000)
}

Output:
Got value: 1
Got value: 2
Got value: 1000000

To illustrate an important point, we have increased the buffer size to 2. The buffer, in this case, is filled with the oldest values, and all the subsequent values are dropped, except the last one.

In Flow, however, the equivalent solution fills the buffer with the latest values:

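import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

/**
 * An example of .onBackpressureLatest() in Flow
 */
fun main() = runBlocking {
    (1..1_000_000).asFlow()
        .buffer(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST)
        .collect { value ->
            delay(100)
            println("Got value: $value")
        }
}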

Output:
Got value: 1
Got value: 999999
Got value: 1000000
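Flow also has a dedicated conflate() operator, which is a shortcut for buffer(capacity = Channel.CONFLATED): a one-element buffer that keeps only the latest value while the collector is busy. A sketch with a smaller range so it finishes quickly; the helper name conflatedValues is illustrative, and the exact intermediate values depend on scheduling, so only the general shape is worth relying on.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// conflate(): while the collector is busy, only the most recent value is kept.
fun conflatedValues(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    (1..1_000).asFlow()
        .conflate()
        .collect { value ->
            delay(10) // slow consumer
            received += value
        }
    received
}

fun main() {
    // the last emitted value (1000) always arrives; most others are skipped
    println(conflatedValues())
}
```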

.onBackpressureBuffer()

For this scenario, we can only draw parallels and achieve an equivalent solution for the use case itself: we want to receive all emitted events, even if the consumer cannot keep up.

RxJava and coroutines go about this problem in different ways.

RxJava can buffer the items until the consumer is ready to process them (which can lead to an OutOfMemoryError), while coroutines can suspend the emitter.

The example in RxJava looks like this:

import io.reactivex.rxjava3.core.Flowable // assuming RxJava 3
import io.reactivex.rxjava3.schedulers.Schedulers

/**
 * An example of .onBackpressureBuffer() in RxJava
 */
fun main() {
    Flowable.range(1, 1_000_000)
        .onBackpressureBuffer()
        .observeOn(Schedulers.single())
        .subscribe { value ->
            Thread.sleep(100)
            println("Got value: $value")
        }

    Thread.sleep(1000)
}

Output:
// All the items, while main() is running

One of the ways to go about this in Flow is this:

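import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

/**
 * An example of .onBackpressureBuffer() in Flow
 */
fun main() = runBlocking {
    (1..1_000_000).asFlow()
        .buffer(capacity = 0, onBufferOverflow = BufferOverflow.SUSPEND)
        .collect { value ->
            delay(100)
            println("Got value: $value")
        }
}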

You might ask: isn’t this example the same as having no buffer at all, since Flow suspends the emitter by default anyway?

That is a good observation, but keep in mind that when the emitter and collector are running in separate coroutines, they can run in parallel, which is faster if both of them take some time to complete.
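A rough timing sketch makes this visible. It assumes a producer and a consumer that each spend about 100 ms per item; slowProducer and timeCollection are illustrative helpers. Without a buffer, the delays add up sequentially (at least 600 ms for three items); with .buffer(capacity = 0), producing the next item overlaps with consuming the current one, so the run should take roughly 400 ms.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Illustrative producer that needs ~100 ms to create each item.
fun slowProducer() = flow {
    for (i in 1..3) {
        delay(100) // simulate work needed to produce an item
        emit(i)
    }
}

fun timeCollection(withBuffer: Boolean): Long = runBlocking {
    val upstream = if (withBuffer) slowProducer().buffer(capacity = 0) else slowProducer()
    measureTimeMillis {
        upstream.collect {
            delay(100) // simulate work needed to consume an item
        }
    }
}

fun main() {
    println("No buffer:   ${timeCollection(withBuffer = false)} ms")
    println("With buffer: ${timeCollection(withBuffer = true)} ms")
}
```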

Final thoughts on buffering and backpressure

The RxJava library has a lot more functionality for handling backpressure, which we won’t cover in this article. That can be good in terms of flexibility and choice but bad in terms of the learning curve: using these operators correctly is not a trivial task for most developers.

In my opinion, Kotlin’s Flow is much more beginner-friendly and easy to use while providing solutions for most problems you might encounter.

Other operators

By this time you should have a good understanding of the philosophy behind Flow operators. To avoid bloating this article, I will just mention that some of the most popular Rx operators: .take(), .retry(), .doOnEach(), .doOnSubscribe(), .debounce(), .zip(), .reduce(), .count(), .onErrorReturn() – all have their counterparts in the Flow API, as well as many others.
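As a quick sketch of a few of those counterparts (helper names are illustrative; comments name the Rx operator each one stands in for): .zip() maps to zip, .take() to take, .doOnEach() to onEach, .reduce() and .count() to the terminal reduce and count, .onErrorReturn() to catch with a fallback emit, and .retry() to retry. Others not shown here include onStart for .doOnSubscribe() and debounce.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Rx .zip(): pairs items until the shorter flow completes
suspend fun zipped(): List<Int> =
    (1..5).asFlow().zip(flowOf(10, 20, 30)) { a, b -> a + b }.toList()

// Rx .doOnEach() and .take(): onEach runs a side effect per item
suspend fun firstTwo(): List<Int> =
    (1..10).asFlow().onEach { println("Saw $it") }.take(2).toList()

// Rx .reduce() and .count()
suspend fun sum(): Int = (1..4).asFlow().reduce { acc, value -> acc + value }
suspend fun evenCount(): Int = (1..10).asFlow().count { it % 2 == 0 }

// Rx .onErrorReturn(): catch an upstream error and emit a fallback value
suspend fun withFallback(): List<Int> =
    flow<Int> {
        emit(1)
        throw IllegalStateException("boom")
    }.catch { emit(-1) }.toList()

// Rx .retry(): re-collect the upstream up to 2 more times on failure
suspend fun retried(): List<Int> {
    var attempts = 0
    return flow<Int> {
        attempts++
        if (attempts < 3) throw IllegalStateException("attempt $attempts failed")
        emit(attempts)
    }.retry(2).toList()
}

fun main() = runBlocking {
    println(zipped())       // [11, 22, 33]
    println(firstTwo())     // [1, 2] (after "Saw 1" and "Saw 2")
    println(sum())          // 10
    println(evenCount())    // 5
    println(withFallback()) // [1, -1]
    println(retried())      // [3]
}
```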

Subjects / Hot streams

Now I would like to take a look at Subjects from RxJava and how we can replace them using Kotlin’s Flow. This article is getting lengthy, however, so we will keep it short.

Subjects are hot versions of Observable types – they do not need subscribers to start emitting items.

The most popular Subjects in RxJava are:

  1. PublishSubject – a Subject that emits items to all current subscribers, without caching.
  2. BehaviorSubject – a Subject that emits the last item and all subsequent items to subscribers.
  3. ReplaySubject – a Subject that replays (caches) all the previous items and also emits subsequent items to all subscribers.

There are a couple more Subjects available in RxJava, but in a nutshell, they are hot versions of RxJava types (SingleSubject, MaybeSubject, CompletableSubject), with some added functionality in some cases (AsyncSubject, UnicastSubject).

In Kotlin, we have two “hot” versions of the Flow: StateFlow and SharedFlow. However, for most use cases, SharedFlow will be more than enough. If we look at the MutableSharedFlow() function, which creates an instance of MutableSharedFlow<T> that we would use as a backing property for a publicly exposed SharedFlow, its signature reads:

public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

The replay parameter is what makes SharedFlow very flexible. We can set it to 0 to make our flow behave like a PublishSubject, to 1 to replay only the last item like a BehaviorSubject, and to as many items as we like to basically turn it into a ReplaySubject.

A SharedFlow with replay set to 1 (and onBufferOverflow = BufferOverflow.DROP_OLDEST) behaves much like a StateFlow, which can also be used to replace BehaviorSubject use cases. The main difference is that a StateFlow requires an initial value, while a SharedFlow doesn’t. A StateFlow also ignores updates equal to its current value and exposes that value through its value property.
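A minimal sketch of that mapping, assuming a hypothetical helper lateSubscriberReceives: it emits 1, 2, and 3 before anyone subscribes, then starts a late subscriber and emits 4. CoroutineStart.UNDISPATCHED is used so the subscriber is registered before 4 is emitted. The replay value alone decides what the latecomer sees.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: what does a late subscriber receive for a given replay?
fun lateSubscriberReceives(replay: Int): List<Int> = runBlocking {
    val shared = MutableSharedFlow<Int>(replay = replay)
    // No subscribers yet: only the last `replay` values are kept
    for (i in 1..3) shared.emit(i)
    // UNDISPATCHED: the subscriber subscribes before we continue below
    val subscriber = async(start = CoroutineStart.UNDISPATCHED) {
        shared.take(replay + 1).toList() // replayed values plus the next one
    }
    shared.emit(4) // emitted while the subscriber is active
    subscriber.await()
}

fun main() {
    println(lateSubscriberReceives(replay = 0)) // [4]: like a PublishSubject
    println(lateSubscriberReceives(replay = 1)) // [3, 4]: like a BehaviorSubject
    println(lateSubscriberReceives(replay = 3)) // [1, 2, 3, 4]: like a ReplaySubject
}
```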

Apart from that, SharedFlow has the extraBufferCapacity, which allows buffering items in addition to replay, and the onBufferOverflow strategy, which we have already discussed earlier.

Other considerations

Both RxJava and coroutines are enormous topics, and we can go on forever comparing their different aspects. Going on forever, however, is not my goal for this article. Therefore, let’s briefly discuss some final considerations before wrapping it up.

These are some points that I feel should be mentioned, but don’t merit a dedicated section.

Conclusion

Before Kotlin gained widespread acceptance, I used RxJava extensively as the primary asynchronous solution in all my Java projects. So even after I had switched to Kotlin, I kept using Rx for quite some time.

In the beginning, it seemed a little tricky to switch from writing asynchronous code in a functional style of Rx to writing it imperatively, as you would most of your other code. In addition, I needed some time to wrap my head around the idea of coroutines.

However, at some point, I committed to using coroutines for all my asynchronous needs. Then, of course, there were some growing pains, primarily because of my unwillingness to spend a couple of days dissecting documentation. Still, eventually, I got to the point where I was actively migrating all RxJava code to coroutines without looking back.

During that process, I never encountered a use case where coroutines weren’t enough. Moreover, almost every single time, the code became more readable and easier to use.

Before sitting down to write this article, I hadn’t thought extensively about RxJava for years. It even made me feel a little nostalgic. RxJava is a great library, and I enjoyed using it. That said, I am confident that, as long as I am using Kotlin, I will never use RxJava in production again.

It is not a statement against RxJava.

It is a statement of how great the coroutines are.

That is my experience and my opinion. I understand that things might be different for some projects, in which case I would love to hear about it. If your experience differs – don’t hesitate to share it.

For now, however, I will take my leave and play with coroutines a little more.

There is still so much to learn.

Talk to you next time,

Your friend,

Max

Author: Max Kim
