Kotlin Coroutine Flow: SharedFlow & StateFlow

Photo by Hans-Jurgen Mager on Unsplash
Kotlin coroutine flows are either cold or hot. SharedFlow and StateFlow are two hot flows. They can broadcast values to several receivers, so they can be used to implement the publisher-subscriber pattern.

The complete code for this chapter can be found in .

Flow

Both SharedFlow and StateFlow are hot flows, and Flow is one of the features of Kotlin coroutines. So before you start reading this article, you should understand Kotlin coroutines and Flow. If you are not familiar with them, read an introduction to Kotlin coroutines and Flow first.

SharedFlow & MutableSharedFlow

Publisher-Subscriber Pattern

SharedFlow can broadcast values to multiple receivers. The side that sends values is called the publisher, and the side that receives them is called a subscriber. So we can use SharedFlow to replace RxJava’s PublishSubject.

The publisher calls emit() to send values. Subscribers receive values by collecting the flow, for example with onEach() followed by launchIn(), which starts the collection in a coroutine scope and returns a Job. A SharedFlow never completes, so collecting it never finishes on its own. A subscriber stops receiving values by calling cancel() on its Job.
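As a minimal sketch of this pattern, the following runs one publisher and two subscribers on a single thread with runBlocking, so the result is predictable. The function name broadcastDemo and the list names are made up for illustration. It waits on subscriptionCount before emitting, because by default values emitted while nobody is subscribed are not delivered to later subscribers.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative helper: returns what each of the two subscribers received.
fun broadcastDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val events = MutableSharedFlow<Int>()
    val seenBy1 = mutableListOf<Int>()
    val seenBy2 = mutableListOf<Int>()

    // onEach() only registers the action; launchIn() starts collecting and returns a Job.
    val job1 = events.onEach { seenBy1 += it }.launchIn(this)
    val job2 = events.onEach { seenBy2 += it }.launchIn(this)

    // Suspend until both collectors have actually subscribed.
    events.subscriptionCount.first { it == 2 }

    events.emit(1)          // delivered to both subscribers
    job1.cancelAndJoin()    // subscriber 1 stops receiving
    events.emit(2)          // delivered to subscriber 2 only
    job2.cancelAndJoin()

    seenBy1 to seenBy2
}

fun main() {
    val (first, second) = broadcastDemo()
    println("#1 got $first, #2 got $second")
}
```

In this sketch subscriber 1 is cancelled before the second value is emitted, so only subscriber 2 should see it.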

Replay Cache

SharedFlow stores the most recently emitted values in its replay cache. For example, if the size of the replay cache is 3, it holds the last 3 values that have been emitted. A new subscriber first receives all the values in the replay cache, which is why it is called replay.
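The cache described above is configured with the replay parameter of MutableSharedFlow. The sketch below is only an illustration (replayDemo is a made-up name): it emits five values before anyone subscribes and then checks what a late subscriber receives.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Illustrative helper: returns the values a late subscriber receives first.
fun replayDemo(): List<Int> = runBlocking {
    val flow = MutableSharedFlow<Int>(replay = 3)

    // With no subscribers, emit() returns immediately and only the cache is updated.
    for (value in 1..5) flow.emit(value)

    // At this point flow.replayCache holds the last three values.
    // take(3) ends the collection after the replayed values, since a SharedFlow never completes.
    flow.take(3).toList()
}

fun main() {
    println(replayDemo()) // prints [3, 4, 5]
}
```

The values 1 and 2 are gone because the cache keeps only the three most recent emissions.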

Example with Replay Cache 0

The following example demonstrates a SharedFlow with a replay cache of 0. Note that we declare a MutableSharedFlow rather than a SharedFlow. This is because SharedFlow is read-only, and only MutableSharedFlow provides emit() to send values.

In the example, we will first send -3, -2, -1, and then create two subscribers.

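import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Scope that runs the publisher and both subscribers on the IO dispatcher.
val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)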

suspend fun main(args: Array<String>) {
    val sharedFlow = MutableSharedFlow<Int>()

    sharedFlow.emit(-3)
    sharedFlow.emit(-2)
    sharedFlow.emit(-1)

    val subscriber1 = sharedFlow
        .onEach {
            println("#1 got $it (${currentCoroutineContext()})")
        }
        .launchIn(scope)

    val subscriber2 = sharedFlow
        .onEach {
            println("#2 got $it (${currentCoroutineContext()})")
        }
        .launchIn(scope)

    val publisher2 = scope.launch {
        repeat(100) {
            println("Emitting $it (${currentCoroutineContext()})")
            sharedFlow.emit(it)
            delay(1000)
        }
    }

    println("Enter any key to cancel subscriber1")
    readln()
    subscriber1.cancel()

    println("Enter any key to cancel subscriber2")
    readln()
    subscriber2.cancel()

    println("Enter any key to stop emitting values")
    readln()
    publisher2.cancel()
}

This example outputs the following. We can see that subscriber1 and subscriber2 did not receive the values sent before they subscribed, because the replay cache size is 0.

Enter any key to cancel subscriber1
Emitting 0 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])
#1 got 0 ([StandaloneCoroutine{Active}@6edf1369, Dispatchers.IO])
#2 got 0 ([StandaloneCoroutine{Active}@ec121f5, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])
#1 got 1 ([StandaloneCoroutine{Active}@6edf1369, Dispatchers.IO])
#2 got 1 ([StandaloneCoroutine{Active}@ec121f5, Dispatchers.IO])

Enter any key to cancel subscriber2
Emitting 2 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])
#2 got 2 ([StandaloneCoroutine{Active}@ec121f5, Dispatchers.IO])

Enter any key to stop emitting values
Emitting 3 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])
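Because SharedFlow is read-only, a common way to use the two types together is to keep the MutableSharedFlow private and expose it through asSharedFlow(). The following is a sketch under that assumption; TickSource, publish, and latestTick are hypothetical names and not part of the example above. It uses replay = 1 only so that a value published before anyone subscribes can still be read back.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical publisher class: only this class can emit.
class TickSource {
    private val _ticks = MutableSharedFlow<Int>(replay = 1)

    // Read-only view for subscribers; it does not expose emit().
    val ticks: SharedFlow<Int> = _ticks.asSharedFlow()

    suspend fun publish(value: Int) = _ticks.emit(value)
}

// Illustrative helper: publishes two values, then subscribes and reads the first one delivered.
fun latestTick(): Int = runBlocking {
    val source = TickSource()
    source.publish(1)
    source.publish(2)
    source.ticks.first() // the single replayed value
}

fun main() {
    println(latestTick()) // prints 2
}
```

Subscribers that only hold the ticks property can collect values but cannot send any, which keeps the publisher role in one place.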

Example with Replay Cache 2

The following example shows a SharedFlow with a replay cache of 2.

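import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Scope that runs the publisher and both subscribers on the IO dispatcher.
val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)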

suspend fun main(args: Array<String>) {
    val sharedFlow = MutableSharedFlow<Int>(replay = 2)

    sharedFlow.emit(-3)
    sharedFlow.emit(-2)
    sharedFlow.emit(-1)

    val subscribe1 = sharedFlow
        .onEach {
            println("#1 got $it (${currentCoroutineContext()})")
        }
        .launchIn(scope)

    val subscribe2 = sharedFlow
        .onEach {
            println("#2 got $it (${currentCoroutineContext()})")
        }
        .launchIn(scope)

    val publisher1 = scope.launch {
        repeat(100) {
            println("Emitting $it (${currentCoroutineContext()})")
            sharedFlow.emit(it)
            delay(1000)
        }
    }

    println("Enter any key to cancel subscribe1")
    readln()
    subscribe1.cancel()

    println("Enter any key to cancel subscribe2")
    readln()
    subscribe2.cancel()

    println("Enter any key to stop emitting values")
    readln()
    publisher1.cancel()
}

From the output below, we can see that both subscribers received -2 and -1, but not -3. This is because the replay cache holds only the last 2 values.

Enter any key to cancel subscribe1
Emitting 0 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])
#1 got -2 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got -2 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
#1 got -1 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got -1 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
#1 got 0 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got 0 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])
#1 got 1 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got 1 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])

Enter any key to cancel subscribe2
Emitting 2 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])
#2 got 2 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])

Enter any key to stop emitting values
Emitting 4 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])

Replacing BroadcastChannel with SharedFlow

BroadcastChannel is very similar to SharedFlow, and SharedFlow was designed to replace it. If you want to know the differences between them, you can find them here.

StateFlow & MutableStateFlow

StateFlow is like a SharedFlow with a replay cache of 1. Moreover, when creating a StateFlow, you must give it an initial value, which fills the replay cache. Also, subscribers are not notified if the sent value is equal to the value currently in the replay cache. So we can use StateFlow to replace RxJava’s BehaviorSubject.

The following code shows how to use SharedFlow to get the same behavior as StateFlow.

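// initialValue is assumed to be an Int defined elsewhere.
val sharedFlow = MutableSharedFlow<Int>(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
sharedFlow.tryEmit(initialValue) // seed the replay cache with the initial value
val stateFlow = sharedFlow.distinctUntilChanged() // skip consecutive equal values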

Let’s see how to use StateFlow.

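import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Scope that runs the publisher and both subscribers on the IO dispatcher.
val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)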

suspend fun main(args: Array<String>) {
    val stateFlow = MutableStateFlow(-100)

    val subscriber1 = stateFlow
        .onEach {
            println("#1 got $it (${currentCoroutineContext()})")
        }
        .launchIn(scope)

    val subscriber2 = stateFlow
        .onEach {
            println("#2 got $it (${currentCoroutineContext()})")
        }
        .launchIn(scope)

    val job = scope.launch {
        delay(1000)
        repeat(100) {
            val value = it / 2
            println("Emitting $value (${currentCoroutineContext()})")
            stateFlow.emit(value)
            delay(1000)
        }
    }

    println("Enter any key to cancel subscriber1")
    readln()
    subscriber1.cancel()

    println("Enter any key to cancel subscriber2")
    readln()
    subscriber2.cancel()

    println("Enter any key to stop emitting values")
    readln()
    job.cancel()
}

As you can see from the output below, subscribers will only be notified if the sent value is different from the value currently in the StateFlow.

Enter any key to cancel subscriber1
#2 got -100 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
#1 got -100 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
Emitting 0 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 0 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 0 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 0 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 1 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 1 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 2 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 2 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 2 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 2 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 3 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 3 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 3 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])

Enter any key to cancel subscriber2
Emitting 3 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 4 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#2 got 4 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 4 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])

Enter any key to stop emitting values
Emitting 5 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
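The output above depends on timing across threads. As a smaller sketch of the same rule, the following runs everything on one thread so that each update is handed to the subscriber before the next one is made. The name distinctDemo is made up for illustration, and it sets the value property instead of calling emit(), which updates the StateFlow in the same way without suspending.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative helper: returns every value the single subscriber was notified about.
fun distinctDemo(): List<Int> = runBlocking {
    val state = MutableStateFlow(-100)
    val received = mutableListOf<Int>()

    // UNDISPATCHED runs the collector right away until it first suspends,
    // so it has already received the initial value when launch() returns.
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { received += it }
    }

    for (value in listOf(0, 0, 1, 1, 2)) {
        state.value = value // an equal value changes nothing and notifies nobody
        yield()             // give the collector a chance to run
    }

    job.cancelAndJoin()
    received
}

fun main() {
    println(distinctDemo())
}
```

Each repeated value is skipped, so the subscriber should see the initial value followed by 0, 1, and 2 exactly once each.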

Replacing ConflatedBroadcastChannel with StateFlow

ConflatedBroadcastChannel is very similar to StateFlow, and StateFlow was designed to replace it. If you want to know the differences between them, you can find them here.

Conclusion

When writing programs, we sometimes need to monitor a certain state. SharedFlow and StateFlow provide a simpler way than BroadcastChannel and ConflatedBroadcastChannel to implement the publisher-subscriber pattern.
