Kotlin coroutine flows come in two kinds: cold flows and hot flows. SharedFlow and StateFlow are both hot flows. They can broadcast values to several receivers, so they can be used to implement the publisher-subscriber pattern.
The complete code for this chapter can be found in .
Both SharedFlow and StateFlow are hot flows, and Flow is part of Kotlin coroutines. Before reading this article, you should therefore be familiar with Kotlin coroutines and Flow.
SharedFlow & MutableSharedFlow
Publisher-Subscriber Pattern
SharedFlow can broadcast values to multiple receivers. The side that sends values is called the publisher, and the side that receives them is called a subscriber. So we can use SharedFlow to replace RxJava’s PublishSubject.
The publisher calls emit() to send values. Subscribers receive them by collecting the flow, for example with onEach() followed by launchIn(), as in the examples below. A SharedFlow never completes, so collection does not end on its own; a subscriber stops receiving values by calling cancel() on the Job that collects the flow.
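The emit, subscribe, and cancel steps described above can be sketched in a small, deterministic form. This is only an illustration under stated assumptions: the function name broadcastDemo and the two result lists are hypothetical, and runBlocking is used so that everything runs on a single thread.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns what each of two subscribers received.
fun broadcastDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val events = MutableSharedFlow<Int>() // replay = 0, no extra buffer
    val seenByA = mutableListOf<Int>()
    val seenByB = mutableListOf<Int>()

    // Two subscribers; each launchIn() returns the Job that collects the flow.
    val jobA = events.onEach { seenByA += it }.launchIn(this)
    val jobB = events.onEach { seenByB += it }.launchIn(this)

    // Wait until both subscribers are registered, otherwise the
    // first emissions would be lost (there is no replay cache).
    events.subscriptionCount.first { it == 2 }

    events.emit(1)        // delivered to A and B
    events.emit(2)        // delivered to A and B
    jobA.cancelAndJoin()  // A unsubscribes
    events.emit(3)        // delivered to B only
    jobB.cancelAndJoin()  // B unsubscribes so runBlocking can finish

    seenByA to seenByB
}

fun main() {
    println(broadcastDemo()) // ([1, 2], [1, 2, 3])
}
```

Waiting on subscriptionCount is one way to avoid racing the publisher against subscribers that have been launched but have not started collecting yet.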
Replay Cache
SharedFlow stores the most recently sent values in the replay cache. For example, if the size of the replay cache is 3, it holds the last 3 values that have been sent. A new subscriber first receives all the values in the replay cache, that is, those values are replayed to it, which is where the name comes from.
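As a minimal sketch of the cache described above (set through the replay parameter of MutableSharedFlow), the snippet below emits five values into a flow whose cache size is 3 and then subscribes late. The function name lateSubscriberValues is hypothetical.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: what does a subscriber see if it arrives after five emissions?
fun lateSubscriberValues(): List<Int> = runBlocking {
    val flow = MutableSharedFlow<Int>(replay = 3)

    // There are no subscribers yet, so emit() returns immediately
    // and only the last 3 values are kept.
    for (i in 1..5) flow.emit(i)

    // replayCache is a snapshot of the values currently held.
    check(flow.replayCache == listOf(3, 4, 5))

    // A new subscriber receives the cached values first.
    // take(3) ends the collection, because a SharedFlow never completes by itself.
    flow.take(3).toList()
}

fun main() {
    println(lateSubscriberValues()) // [3, 4, 5]
}
```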
Example with Replay Cache 0
The following demonstrates SharedFlow with a replay cache of 0. In the example, we declare a MutableSharedFlow rather than a SharedFlow. This is because SharedFlow is read-only, and only MutableSharedFlow has emit() to send values.
In the example, we will first send -3, -2, -1, and then create two subscribers.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

suspend fun main(args: Array<String>) {
    // No replay argument, so the replay cache size is 0.
    val sharedFlow = MutableSharedFlow<Int>()

    // Sent before anyone has subscribed.
    sharedFlow.emit(-3)
    sharedFlow.emit(-2)
    sharedFlow.emit(-1)

    val subscriber1 = sharedFlow
        .onEach { println("#1 got $it (${currentCoroutineContext()})") }
        .launchIn(scope)

    val subscriber2 = sharedFlow
        .onEach { println("#2 got $it (${currentCoroutineContext()})") }
        .launchIn(scope)

    // Publisher: sends one value per second.
    val publisher2 = scope.launch {
        repeat(100) {
            println("Emitting $it (${currentCoroutineContext()})")
            sharedFlow.emit(it)
            delay(1000)
        }
    }

    println("Enter any key to cancel subscriber1")
    readln()
    subscriber1.cancel()

    println("Enter any key to cancel subscriber2")
    readln()
    subscriber2.cancel()

    println("Enter any key to stop emitting values")
    readln()
    publisher2.cancel()
}
This example outputs the following. We can see that subscriber1 and subscriber2 did not receive the values sent before they subscribed, because the replay cache size is 0.
Enter any key to cancel subscriber1
Emitting 0 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])
#1 got 0 ([StandaloneCoroutine{Active}@6edf1369, Dispatchers.IO])
#2 got 0 ([StandaloneCoroutine{Active}@ec121f5, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])
#1 got 1 ([StandaloneCoroutine{Active}@6edf1369, Dispatchers.IO])
#2 got 1 ([StandaloneCoroutine{Active}@ec121f5, Dispatchers.IO])
Enter any key to cancel subscriber2
Emitting 2 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])
#2 got 2 ([StandaloneCoroutine{Active}@ec121f5, Dispatchers.IO])
Enter any key to stop emitting values
Emitting 3 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])
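Because SharedFlow is read-only while MutableSharedFlow can emit, one common arrangement is to keep the mutable flow private and hand out only the read-only view. The sketch below assumes a hypothetical TickPublisher class; asSharedFlow() is the library function that wraps the mutable flow.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical publisher: only this class can send values.
class TickPublisher {
    private val _ticks = MutableSharedFlow<Int>(replay = 1)

    // Read-only view for subscribers; it has no emit().
    val ticks: SharedFlow<Int> = _ticks.asSharedFlow()

    suspend fun publish(value: Int) = _ticks.emit(value)
}

// Hypothetical helper: publish one value, then read it back through the read-only view.
fun latestTick(): Int = runBlocking {
    val publisher = TickPublisher()
    publisher.publish(7)
    publisher.ticks.first() // replay = 1, so the last value is delivered to a late subscriber
}

fun main() {
    println(latestTick()) // 7
}
```

The wrapper returned by asSharedFlow() also prevents callers from casting the view back to MutableSharedFlow.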
Example with Replay Cache 2
The following example shows SharedFlow with a replay cache of 2.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

suspend fun main(args: Array<String>) {
    // Keep the last 2 values for new subscribers.
    val sharedFlow = MutableSharedFlow<Int>(replay = 2)

    // Sent before anyone has subscribed; only -2 and -1 stay in the cache.
    sharedFlow.emit(-3)
    sharedFlow.emit(-2)
    sharedFlow.emit(-1)

    val subscribe1 = sharedFlow
        .onEach { println("#1 got $it (${currentCoroutineContext()})") }
        .launchIn(scope)

    val subscribe2 = sharedFlow
        .onEach { println("#2 got $it (${currentCoroutineContext()})") }
        .launchIn(scope)

    // Publisher: sends one value per second.
    val publisher1 = scope.launch {
        repeat(100) {
            println("Emitting $it (${currentCoroutineContext()})")
            sharedFlow.emit(it)
            delay(1000)
        }
    }

    println("Enter any key to cancel subscribe1")
    readln()
    subscribe1.cancel()

    println("Enter any key to cancel subscribe2")
    readln()
    subscribe2.cancel()

    println("Enter any key to stop emitting values")
    readln()
    publisher1.cancel()
}
From the output below, we can see that both subscribers received -2 and -1, but not -3. This is because the replay cache holds only the last 2 values.
Enter any key to cancel subscribe1
Emitting 0 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])
#1 got -2 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got -2 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
#1 got -1 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got -1 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
#1 got 0 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got 0 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])
#1 got 1 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got 1 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
Enter any key to cancel subscribe2
Emitting 2 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])
#2 got 2 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
Enter any key to stop emitting values
Emitting 4 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])
BroadcastChannel is very similar to SharedFlow, and SharedFlow was designed to replace it. If you want to know the difference between them, you can find it here.
StateFlow & MutableStateFlow
StateFlow is like a SharedFlow with a replay cache of 1. In addition, a StateFlow must be given an initial value when it is created, and that value fills the replay cache. Subscribers are not notified if the sent value is equal to the value currently held. So we can use StateFlow to replace RxJava’s BehaviorSubject.
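The rule that equal values are not delivered can be checked with a small deterministic sketch. The function name distinctStates is hypothetical, and the snippet relies on runBlocking being single-threaded so that yield() hands control to the subscriber after each assignment.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: records every value a subscriber is notified about.
fun distinctStates(): List<Int> = runBlocking {
    val state = MutableStateFlow(0) // an initial value is mandatory
    val seen = mutableListOf<Int>()

    // UNDISPATCHED runs the collector right away, up to its first
    // suspension, so it has already received the initial value 0.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { seen += it }
    }

    for (v in listOf(0, 1, 1, 2)) {
        state.value = v // assigning an equal value is a no-op
        yield()         // let the subscriber run before the next assignment
    }

    job.cancelAndJoin() // a StateFlow never completes, so cancel explicitly
    seen
}

fun main() {
    println(distinctStates()) // [0, 1, 2]
}
```

Without the yield() calls, a slow subscriber could also miss intermediate values, because a StateFlow keeps only the latest one.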
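The following code shows how to reproduce the behavior of StateFlow with SharedFlow.
// Requires kotlinx.coroutines.channels.BufferOverflow and kotlinx.coroutines.flow.*
val sharedFlow = MutableSharedFlow<Int>(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
sharedFlow.tryEmit(initialValue) // put the initial value (an Int defined elsewhere) into the replay cache
val stateFlow = sharedFlow.distinctUntilChanged() // skip values equal to the previous one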
Let’s see how to use StateFlow.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

suspend fun main(args: Array<String>) {
    // The initial value is required.
    val stateFlow = MutableStateFlow(-100)

    val subscriber1 = stateFlow
        .onEach { println("#1 got $it (${currentCoroutineContext()})") }
        .launchIn(scope)

    val subscriber2 = stateFlow
        .onEach { println("#2 got $it (${currentCoroutineContext()})") }
        .launchIn(scope)

    // Publisher: sends 0, 0, 1, 1, 2, 2, ... so every value is sent twice.
    val job = scope.launch {
        delay(1000)
        repeat(100) {
            val value = it / 2
            println("Emitting $value (${currentCoroutineContext()})")
            stateFlow.emit(value)
            delay(1000)
        }
    }

    println("Enter any key to cancel subscriber1")
    readln()
    subscriber1.cancel()

    println("Enter any key to cancel subscriber2")
    readln()
    subscriber2.cancel()

    println("Enter any key to stop emitting values")
    readln()
    job.cancel()
}
As you can see from the output below, subscribers will only be notified if the sent value is different from the value currently in the StateFlow.
Enter any key to cancel subscriber1
#2 got -100 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
#1 got -100 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
Emitting 0 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 0 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 0 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 0 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 1 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 1 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 2 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 2 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 2 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 2 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 3 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 3 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 3 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Enter any key to cancel subscriber2
Emitting 3 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 4 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#2 got 4 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 4 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Enter any key to stop emitting values
Emitting 5 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Replacing ConflatedBroadcastChannel with StateFlow
ConflatedBroadcastChannel is very similar to StateFlow, and StateFlow was designed to replace it. If you want to know the difference between them, you can find it here.
When writing programs, we sometimes need to monitor a certain state. SharedFlow and StateFlow provide a simpler way to implement the publisher-subscriber pattern than BroadcastChannel and ConflatedBroadcastChannel.
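As a closing sketch of monitoring a state, the snippet below keeps a counter in a StateFlow and exposes it read-only. The Counter class and the countAfter function are hypothetical names used only for illustration; asStateFlow() and update() are library functions.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update

// Hypothetical state holder: the outside world can observe the state but not modify it.
class Counter {
    private val _count = MutableStateFlow(0)

    // Read-only view; subscribers can collect it or read count.value.
    val count: StateFlow<Int> = _count.asStateFlow()

    // update() applies the change atomically, so concurrent calls do not lose increments.
    fun increment() = _count.update { it + 1 }
}

// Hypothetical helper: increment n times and read the current state.
fun countAfter(n: Int): Int {
    val counter = Counter()
    repeat(n) { counter.increment() }
    return counter.count.value
}

fun main() {
    println(countAfter(3)) // 3
}
```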