When JetBrains first announced coroutines as Kotlin’s asynchronous programming solution, many developers were intrigued but doubtful whether or not this shiny new gem would be enough to solve all their asynchronous problems. At that time, in the Java world, the market standard was the reactive library RxJava (which also has equivalents in other languages – RxSwift, RxJS, Rx.NET, etc.).
It seemed like the suspend
modifier and the Flow
couldn’t hold a candle to the old and proven Rx with its complexity and many operators. Apart from that, RxJava was a stable, battle-tested library, while coroutines had just become stable with many of their APIs still marked as experimental.
Quite some time had passed since the coroutines became stable in Kotlin 1.3 – at the moment of writing this article, the team at JetBrains has just released Kotlin 1.6. And although we still pretty much have the suspend
modifier and the Flow
to compete with all the functionality of RxJava, in this article, I would like to explore the question: “Are Kotlin coroutines enough to replace RxJava?”
Contents
1. Comparing the stream types 1.1. Stream types in RxJava 1.2. Their equivalents in coroutines 2. Comparing the basic implementation 3. Comparing the operators 3.1. Completable, Single, Maybe 3.2. Flow operators vs Rx operators 3.2.1. .map() 3.2.2. .flatMap() 3.2.3. .filter() 3.2.4. Backpressure operators 3.2.5. Other operators 4. Subjects / Hot streams 5. Other considerations 6. Conclusion
Comparing the stream types
This article assumes at least the basic knowledge of RxJava (or any other Rx library) and the basic understanding of how coroutines work. If one or the other is lacking, some of the concepts and examples might prove challenging.
The coroutines are fundamentally very different from RxJava, so it is tricky to compare them directly. However, what we can do, is compare the solutions they offer to everyday asynchronous problems.
Stream types in RxJava
Before we jump into specifics, let’s look at the stream types offered by the RxJava library and compare them to corresponding solutions in coroutines. Later in the article, we will look at how to use these types in more detail.
Starting from RxJava version 2, the basic stream types we can work with are as follows:
1. Observable<T> // a flow of 0..N items, **without** any back-pressure management 2. Flowable<T> // a flow of 0..N items, **with** back-pressure management 3. Single<T> // a flow of exactly 1 item or an error 4. Maybe<T> // a flow of exactly 1 item, no items, or an error 5. Completable // a flow without items, that only signals either completion or an error
Their equivalents in coroutines
Completable
in coroutines would be a simple suspend
function that doesn’t return anything, or to be precise – returns Unit
:
suspend fun completableOperation() { //it will return when it's done //or throw an error }
Single<T>
would be a suspend
function that returns a non-nullable value:
suspend fun getUser(): User { //it will either return a User //or throw an error }
Maybe<T>
would be a suspend
function that returns a nullable value:
suspend fun getUser(): User? { //it will return either a User or a null //or throw an error }
Now let’s look at the Observable
and the Flowable
. Both are a stream of 0..N events and the main difference between the two is that the Flowable
is backpressure aware while the Observable
is not. In coroutines, both are covered by the Flow
which is an asynchronous data stream that sequentially emits values and completes normally or with an exception.
Later on, we will look into how we can create and use flows, but on a basic level, all we need to use flows is a function that returns a Flow<T>
:
fun getDataFlow(): Flow<Data> { // returns a flow }
Take note that the function itself doesn’t require the
suspend
modifier, sinceFlow
s are cold and won’t start emitting data until there is a collector/subscriber. That said, callingcollect
on aFlow
does require a coroutine scope.
The main reason why Flow
is enough to replace both the Observable
and the Flowable
is because of how it handles backpressure.
The solution for backpressure in Flow
comes naturally from the design and philosophy of the coroutines library. It doesn’t need some cleverly engineered solution to handle the backpressure explicitly. All the elements in Flow
‘s API are marked with the suspend
modifier, which is designed to suspend the execution of the caller without blocking the thread. Therefore, when the Flow<T>
is emitting and collecting in the same coroutine, if the collector cannot keep up with the data flow, it can simply suspend the emission of elements until it is ready to receive more.
That is why there is no need to have separate solutions for streams with and without backpressure management in coroutines. The nature of coroutines makes the problem of backpressure itself almost non-existent, while at the same time giving you different strategies to handle fast producer/slow consumer situations – more on that in the operator section.
You can find more information about this topic in Roman Elizarov’s article on the design of Kotlin Flow.
Comparing the basic implementation
With that out of the way, let’s now compare the actual implementations of different use cases.
Asynchronous tasks that complete without a result and might throw an error
In RxJava we use a Completable
. A CompletableEmitter
has a method onComplete
to signal completion and a method onError
to pass an exception.
/** * Completable in RxJava */ fun main() { completableRequest() .subscribe( { println("I'm done") }, { println("Got an exception!") } ) } fun completableRequest(): Completable { return Completable.create { emitter -> try { // process a request emitter.onComplete() } catch (e: Exception) { emitter.onError(e) } } }
In coroutines, we call a suspend function that doesn’t return anything (returns Unit
), like we would a normal function:
/** * Completable equivalent in coroutines */ fun main() = runBlocking { try { doRequest() println("I'm done") } catch (e: Exception) { println("Got an exception!") } } suspend fun doRequest() { // process a request delay(500) }
Don’t use
runBlocking
in the production code, you should have a properCoroutineScope
. However, we will use it throughout this article to illustrate test scenarios.
Also, I will not discuss error handling in coroutines in this article, since it is a different topic. But keep in mind that you don’t necessarily need to wrap all your suspend functions in
try/catch
to handle exceptions.
Asynchronous tasks that must return a value or throw an error
In RxJava we use a Single
. A SingleEmitter
has a method onSuccess
to pass a return value and a method onError
to pass an exception.
/** * Single in RxJava */ fun main() { singleResult() .subscribe( { result -> println(result) }, { println("Got an exception") } ) } fun singleResult(): Single<String> { return Single.create { emitter -> try { // process a request emitter.onSuccess("Some result") } catch (e: Exception) { emitter.onError(e) } } }
In coroutines, we call a suspend function that returns a non-nullable value:
/** * Single equivalent in coroutines */ fun main() = runBlocking { try { val result = getResult() println(result) } catch (e: Exception) { println("Got an exception") } } suspend fun getResult(): String { // process a request delay(100) return "Some result" }
Asynchronous tasks that might return a result or throw an error
In RxJava we use a Maybe
. A MaybeEmitter
has a method onSuccess
to pass a return value, a method onComplete
to signal a completion without a value, and a method onError
to pass an exception.
/** * Maybe in RxJava */ fun main() { maybeResult() .subscribe( { result -> println(result) }, { println("Got an exception") }, { println("Completed without a value!") } ) } fun maybeResult(): Maybe<String> { return Maybe.create { emitter -> try { // process a request if (Random.nextBoolean()) { emitter.onSuccess("Some value") } else { emitter.onComplete() } } catch (e: Exception) { emitter.onError(e) } } }
In coroutines, we call a suspend function that returns a nullable value:
/** * Maybe equivalent in coroutines */ fun main() = runBlocking { try { val result = getNullableResult() if (result != null) { println(result) } else { println("Completed without a value!") } } catch (e: Exception) { println("Got an exception") } } suspend fun getNullableResult(): String? { // process a request delay(100) return if (Random.nextBoolean()) { "Some value" } else { null } }
I am keeping the Kotlin code as Java-like as possible not to complicate things and alienate people new to Kotlin.
In idiomatic Kotlin we could write the code inside the try/catch
block like this:
getNullableResult() ?.let(::println) ?: println("Completed without a value!")
Asynchronous streams of 0..N events
Not to repeat ourselves, for RxJava, we will only look at Flowable
, since Observable
is pretty much the same, only without the backpressure management.
Both FlowableEmitter
and ObservableEmiter
have a method onNext
to emit the next stream value, a method onComplete
to signal completion of the stream, and a method onError
to pass an exception.
/** * Flowable in RxJava */ fun main() { flowableValues() .subscribe( { value -> println(value) }, { println("Got an exception") }, { println("I'm done") } ) } fun flowableValues(): Flowable<Int> { val flowableEmitter = { emitter: FlowableEmitter<Int> -> try { for (i in 1..10) { emitter.onNext(i) } } catch (e: Exception) { emitter.onError(e) } finally { emitter.onComplete() } } return Flowable.create(flowableEmitter, BackpressureStrategy.BUFFER) }
In coroutines, we just create a Flow
. There are many ways to do it, but the most common is using the flow { }
builder function:
/** * Flow in Kotlin */ fun main() = runBlocking { try { eventFlow().collect { value -> println(value) } println("I'm done") } catch (e: Exception) { println("Got an exception") } } fun eventFlow() = flow { for (i in 1..10) { emit(i) } }
In idiomatic Kotlin, one of the ways to create the above flow would be:
fun eventFlow() = (1..10).asFlow()
As you can see, we can easily cover all the primary use cases of RxJava in coroutines. Moreover, the design of coroutines allows us to write typical sequential code using all the standard Kotlin features.
It also removes the need for onComplete
or onError
callbacks. We can catch errors as we would in normal code or set a coroutine exception handler. And, taking into account that when a suspend function completes, the coroutines continues to execute sequentially, we can continue writing our “completion logic” on the next line.
Please note, that this is also true when collecting flows, as we have seen in our example:
eventFlow().collect { value -> println(value) } println("I'm done")
“I’m done” will be printed after all the elements of the Flow
are collected.
Comparing the operators
However, the main strength of RxJava is its operators that allow us to transform and combine streams in multiple ways. There are so many that it is impossible to cover all of them in the scope of one article, so I will only look at the most common ones.
Completable, Single, Maybe
First of all, I need to mention that although Completable
, Single
, and Maybe
streams in RxJava have many of the same operators that Flowable
and Observable
have (Completable
has quite a few less, due to its nature), having any kind of operators in their coroutine counterparts is simply redundant.
Let’s take at a simple .map()
operator on a Single
stream as an example:
/** * Maps Single<String> to * Single<User> synchronously */ fun main() { getUsername() .map { username -> User(username) } .subscribe( { user -> println(user) }, { println("Got an exception") } ) }
This is something you see quite a lot in RxJava – we take a value and transform it into another value.
That said, once again, given the nature of coroutines, we don’t need a special .map()
operator for those kinds of use cases:
/** * In coroutines we can write * normal sequential code */ fun main() = runBlocking { try { val username = getUsername() // suspend fun val user = User(username) println(user) } catch (e: Exception) { println("Got an exception") } }
We can just write normal sequential code using both suspend
and normal functions.
Flow operators vs Rx operators
With that out of the way, let’s take a look at what we can do with our Flow
s and how it compares to the functionality of RxJava.
.map()
First of all, we have a .map()
operator that we have looked at in the example above. The Flow
has the same operator:
/** * Maps Flow<String> to Flow<User> */ fun main() = runBlocking { usernameFlow() .map { username -> User(username) } .collect { user -> println(user) } }
Looking inside the implementation of this .map()
reveals a major advantage Flow
has over RxJava.
After a little simplification for illustration purposes, the basic declaration of .map()
looks like:
fun <T, R> Flow<T>.map(transform: suspend (T) -> R): Flow<R> = flow { collect { value -> emit(transform(value)) } }
We just create a new Flow
that collects values from the upstream Flow
and emits those values after applying the transform
function.
And that’s it.
Most of Flow
‘s operators work this way and there is no need to follow strict protocols, as is the case with RxJava. If you open, for example, the code for FlowableMap
you will see why writing custom operators for RxJava requires a very good knowledge of Reactive Streams protocols.
Standard Flow
operators will be enough for most use cases, but if not, writing a custom operator is straightforward and intuitive.
.flatMap()
The next operator we see a lot in RxJava is .flatMap()
. The are many variants of this operator, such as .flatMapSingle()
, .flatMapObservable()
, .flatMapIterable()
, etc.
In RxJava, if we need to make a synchronous transformation on a value, we use .map()
. An asynchronous transformation, however, requires one of the .flatMap()
variants.
On the other hand, Flow
doesn’t require different operators for synchronous and asynchronous code. Lambdas that are passed to Flow
‘s operators are marked with the suspend
modifier, and therefore you can use both suspend and regular functions inside them.
Here is an example in RxJava:
/** * FlatMaps Flowable<String> to * Flowable<User> with an asynchronous * Single<User> call */ fun main() { getUsernames() //Flowable<String> .flatMapSingle { username -> getUserFromNetwork(username) // Single<User> } .subscribe( { user -> println(user) }, { println("Got an exception") } ) }
Translated to Flow
this example doesn’t require a different operator. If we need to transform the value in any way we just use .map()
, regardless of synchronicity:
/** * Flow's .map() operator can be * used for both synchronous * and asynchronous transformations */ fun main() = runBlocking { usernameFlow() .map { username -> getUserFromNetwork(username) // suspend fun } .collect { user -> println(user) } }
That said, there are also a couple of .flatMap()
operators on a Flow
– .flatMapMerge()
, .flatMapConcat()
, and .flatMapLatest()
. They convert values emitted by the upstream flow into new flows and then flatten all the flows into one.
These operators mirror the functionality of .flatMap()
, .concatMap()
, and .switchMap()
when they are called on a Flowable
or an Observable
.
However, keep in mind that at the moment of writing this article, in Kotlin version 1.6, these operators are marked as either experimental or flow preview.
.filter()
With the .filter()
operator we stumble upon one more disadvantage of RxJava.
It takes a predicate (T) -> Boolean
and there is no intuitive way to perform asynchronous filtering. For that you need to get creative and, for example, do something like this:
/** * One of the ways to achieve * asynchronous filtering in * RxJava */ fun main() { getUsernames() // Flowable<String> .flatMapSingle { username -> isCorrectUsername(username) // Single<Boolean> .flatMap { isCorrect -> if (isCorrect) { Single.just(username) } else { Single.never() } } }.subscribe { println(it) } }
In the Flow
API, as you might have guessed, the .filter()
predicate takes a suspend (T) -> Boolean
lambda, which allows us to easily filter streams based on both synchronous and asynchronous calls:
/** * Flow's .filter() can filter both * synchronously and asynchronously */ fun main() = runBlocking { usernameFlow() .filter { username -> isCorrectUsername(username) // suspend fun } .collect { user -> println(user) } }
Backpressure operators
Backpressure is a very complex topic that deserves a separate article. Therefore, this article will only look at the basics and compare the solutions to the most common backpressure situations.
Given the differences in the implementation of RxJava and the coroutines, the actual outputs will not be the same in every example. The goal of these examples is to illustrate different strategies that deal with backpressure.
.onBackpressureDrop()
This strategy discards all the emitted items if the consumer cannot keep up and the buffer is full.
Here is an example from RxJava:
/** * An example of .onBackpressureDrop() in RxJava */ fun main() { Flowable.range(1, 1_000_000) .onBackpressureDrop() // change the scheduler and overwrite the default buffer size .observeOn(Schedulers.single(), false, 1) .subscribe { value -> Thread.sleep(100) println("Got value: $value") } Thread.sleep(1000) } Output: Got value: 1
All the values after 1
are dropped since the consumer cannot process them.
There are a couple of things to keep in mind here.
First of all, if we hadn’t changed the Scheduler
, we would get all the items since this code would run synchronously, and the consumer would block the producer.
Similarly, by default a Flow
is running in the same coroutine, it will sequentially alternate between the emission and the collection until the Flow
is done. The difference, however, is that coroutines are non-blocking.
Also, keep in mind, that .observeOn()
uses the default buffer size of 128
, therefore we are specifying our own size of 1
. With the default buffer size we would get the first 128
items if the main()
would run long enough.
The equivalent strategy using Flow
looks like this:
/** * An example of .onBackpressureDrop() in Flow */ fun main() = runBlocking { (1..1_000_000).asFlow() .buffer(capacity = 1, onBufferOverflow = BufferOverflow.DROP_LATEST) .collect { value -> delay(100) println("Got value: $value") } } Output: Got value: 1 Got value: 2
Given the nature of coroutines, the collector processes the first two items, but the idea is the same – all unprocessed items are dropped.
We have discussed that when calling .collect()
on a Flow
, by default, the emission and the collection will run sequentially in the same coroutine. In other words, the collector will suspend the emitter until it is ready to receive more items.
So what is happening here?
For buffering and backpressure handling to work correctly, we need to run the collector in a separate coroutine. That is where the .buffer()
operator comes in. It sends all emitted items through a Channel
to a collector that is running in a separate coroutine.
It also gives us a buffering functionality:
public fun <T> Flow<T>.buffer( capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ): Flow<T>
We can specify the capacity
of our buffer and a strategy for handling onBufferOverflow
.
In this example, we have looked at one of those strategies, and now let’s quickly look at the others.
.onBackpressureLatest()
The idea of this strategy is to only emit the latest items in the case of backpressure. However, there is again a difference in how the solutions work in Rx and coroutines.
First, let’s take a look at the Rx example:
/** * An example of .onBackpressureLatest() in RxJava */ fun main() { Flowable.range(1, 1_000_000) .onBackpressureLatest() .observeOn(Schedulers.single(), false, 2) .subscribe { value -> Thread.sleep(100) println("Got value: $value") } Thread.sleep(1000) } Output: Got value: 1 Got value: 2 Got value: 1000000
To illustrate an important point, we have increased the buffer size to 2
. The buffer, in this case, is filled with the oldest values, and all the subsequent values are dropped, except the last one.
In Flow
, however, the equivalent solution fills the buffer with the latest values:
/** * An example of .onBackpressureLatest() in Flow */ fun main() = runBlocking { (1..1_000_000).asFlow() .buffer(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST) .collect { value -> delay(100) println("Got value: $value") } } Output: Got value: 1 Got value: 999999 Got value: 1000000
.onBackpressureBuffer()
With this scenario, we can only draw parallels and achieve an equivalent solution for our use case. Which is – we would like to receive the emitted events, even if the consumer cannot keep up.
RxJava and coroutines go about this problem in different ways.
The RxJava can buffer the items until the consumer is ready to process them (which can lead to OutOfMemoryException
), while the coroutines can suspend the emitter.
The example in RxJava looks like this:
/** * An example of .onBackpressureBuffer() in RxJava */ fun main() { Flowable.range(1, 1_000_000) .onBackpressureBuffer() .observeOn(Schedulers.single()) .subscribe { value -> Thread.sleep(100) println("Got value: $value") } Thread.sleep(1000) } Output: // All the items, while main() is running
One of the ways to go about this in Flow
is this:
/** * An example of .onBackpressureBuffer() in Flow */ fun main() = runBlocking { (1..1_000_000).asFlow() .buffer(capacity = 0, onBufferOverflow = BufferOverflow.SUSPEND) .collect { value -> delay(100) println("Got value: $value") } }
You might ask, isn’t this example the same as without any buffer at all, since the Flow
will suspend the emitter by default.
That is a good observation, but keep in mind that when the emitter and collector are running in separate coroutines, they can run in parallel, which is faster if both of them take some time to complete.
Final thoughts on buffering and backpressure
The RxJava library has a lot more functionality for handling backpressure, which we won’t cover in this article. It can be good in terms of flexibility and choice but also bad in terms of the learning curve – to use these operators correctly is not a trivial task for most developers.
In my opinion, Kotlin’s Flow
is much more beginner-friendly and easy to use while providing solutions for most problems you might encounter.
Other operators
By this time you should have a good understanding of the philosophy behind Flow
operators. To avoid bloating this article, I will just mention that some of the most popular Rx operators: .take()
, .retry()
, .doOnEach()
, .doOnSubscribe()
, .debounce()
, .zip()
, .reduce()
, .count()
, .onErrorReturn()
– all have their counterparts in the Flow
API, as well as many others.
Subjects / Hot streams
Now I would like to take a look at Subject
s from RxJava and how we can replace them using Kotlin’s Flow
. This article is getting lengthy, however, so we will keep it short.
Subject
s are hot versions of Observable
types – they do not need subscribers to start emitting items.
The most popular Subject
s in RxJava are:
PublishSubject
– aSubject
that emits items to all current subscribers, without caching.BehaviorSubject
– aSubject
that emits the last item and all subsequent items to subscribers.ReplaySubject
– aSubject
that replays (caches) all the previous items and also emits subsequent items to all subscribers.
There are a couple more Subject
s available in RxJava, but in a nutshell, they are hot versions of RxJava types (SingleSubject
, MaybeSubject
, CompletableSubject
), with some added functionality in some cases (AsyncSubject
, UnicastSubject
).
In Kotlin, we have two “hot” versions of the Flow
– StateFlow
and SharedFlow
, however, for most use cases SharedFlow
will be more than enough. If we look at the MutableSharedFlow()
function that creates an instance of MutableSharedFlow<T>
, which we would use as a backing property for a publicly exposed SharedFlow
, it reads:
public fun<T>MutableSharedFlow( replay: Int = 0, extraBufferCapacity: Int = 0, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ): MutableSharedFlow<T>
The replay
parameter is what makes SharedFlow
very flexible. We can specify it as 0
to make our flow behave like a PublishSubject
, 1
to replay only the last item like a BehaviorSubject
, and as many as we like to basically become a ReplaySubject
.
SharedFlow
withreplay
value set to1
behaves just like aStateFlow
, which also can be used to replaceBehaviorSubject
use cases. The main difference is that aStateFlow
requires an initial value, while aSharedFlow
doesn’t.
Apart from that, SharedFlow
has the extraBufferCapacity
, which allows buffering items in addition to replay, and the onBufferOverflow
strategy, which we have already discussed earlier.
Other considerations
Both RxJava and coroutines are enormous topics, and we can go on forever comparing their different aspects. Going on forever, however, is not my goal for this article. Therefore, let’s briefly discuss some final considerations before wrapping it up.
These are some points that I feel should be mentioned, but don’t merit a dedicated section.
- Both approaches provide easy ways to switch between threads;
- Starting from RxJava2 you cannot send
null
values through your streams, withFlow
, however, this is not an issue; kotlinx.coroutines
library provides interops from and to RxJava types for easy migrations: for RxJava2, for RxJava3;- According to the Reactive Scrabble benchmark,
Flow
performs noticeably faster than RxJava.
Conclusion
Before Kotlin got widespread acceptance, I had very extensively used RxJava as the primary asynchronous solution in all my Java projects. So even after I had switched to Kotlin, I was still using Rx for quite some time.
In the beginning, it seemed a little tricky to switch from writing asynchronous code in a functional style of Rx to writing it imperatively, as you would most of your other code. In addition, I needed some time to wrap my head around the idea of coroutines.
However, at some point, I committed to using coroutines for all my asynchronous needs. Then, of course, there were some growing pains, primarily because of my unwillingness to spend a couple of days dissecting documentation. Still, eventually, I got to the point where I was actively migrating all RxJava code to coroutines without looking back.
During that process, I had never encountered a use case where coroutines weren’t enough. Moreover, almost every single time, code had become more readable and easier to use.
Before sitting down to write this article, I haven’t thought extensively about RxJava for years. It even made me feel a little nostalgic. RxJava is a great library, and I enjoyed using it. That said, I am confident that, as long as I am using Kotlin, I will never use RxJava in production again.
It is not a statement against RxJava.
It is a statement of how great the coroutines are.
That is my experience and my opinion. I understand that things might be different for some projects, in which case I would love to hear about it. If your experience differs – don’t hesitate to share it.
For now, however, I will take my leave and play with coroutines a little more.
There is still so much to learn.
Talk to you next time,
Your friend,
Max