Kotlin Coroutine Flow:SharedFlow 和 StateFlow

Photo by Hans-Jurgen Mager on Unsplash
Photo by Hans-Jurgen Mager on Unsplash
Kotlin coroutine flow 包含 cold flow 和 hot flow 兩種。而,SharedFlow 和 StateFlow 是其中兩種 hot flow。它們可以 broadcast 值給數個接收者,所以可以被用來實作 publisher-subscriber 模式。

Kotlin coroutine flow 包含 cold flow 和 hot flow 兩種。而,SharedFlowStateFlow 是其中兩種 hot flow。它們可以 broadcast 值給數個接收者,所以可以被用來實作 publisher-subscriber 模式

完整程式碼可以在 下載。

Flow

SharedFlow 和 StateFlow 都是 hot flow,而 flow 又是 Kotlin coroutine 的其中一種功能。所以在開始閱讀本文章之前,必須要了解 Kotlin coroutine 和 flow。如果你不熟悉 Kotlin coroutine 和 flow 的話,可以先閱讀下面的文章。

SharedFlow & MutableSharedFlow

Publisher-Subscriber 模式

SharedFlow 可以 broadcast 值給多個接收者。發送值的那一方稱之為 publisher,接收值的那一方則稱之為 subscriber。所以我們可以用 SharedFlow 來取代 RxJava 的 PublishSubject

Publisher 呼叫 emit() 來發送值,而 subscriber 呼叫 onEach() 來接收值。另外,我們不需要呼叫 collect() 來 terminate 目前的 flow,因為 SharedFlow 永遠不會 complete。不過,subscriber 可以呼叫 cancel() 來取消接收值。

Relay Cache

SharedFlow 會將最近發送的值存到 relay cache。如,relay cache 的大小是 3 的話,那 relay cache 會存儲最近 3 個已經發送過的值。新的 subscriber 一開始就會先接收到 relay cache 裡面所有的值,所以才會稱為 relay。

範例:Relay Cache = 0

以下是一個 relay cache 為 0 的範例。在範例中,我們宣告的不是 SharedFlow,而是 MutableSharedFlow。這是因為 SharedFlow 是 readonly,只有 MutableSharedFlow 才可以呼叫 emit() 來發送值。

在範例中,我們會先發送 -3、-2、-1,然後在建立兩個 subscriber。

val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

suspend fun main(args: Array<String>) {
    val sharedFlow = MutableSharedFlow<Int>()

    sharedFlow.emit(-3)
    sharedFlow.emit(-2)
    sharedFlow.emit(-1)

    val subscriber1 = sharedFlow
        .onEach {
            println("#1 got $it (${currentCoroutineContext()})")
        }
        .launchIn(scope)

    val subscriber2 = sharedFlow
        .onEach {
            println("#2 got $it (${currentCoroutineContext()})")
        }
        .launchIn(scope)

    val publisher2 = scope.launch {
        repeat(100) {
            println("Emitting $it (${currentCoroutineContext()})")
            sharedFlow.emit(it)
            delay(1000)
        }
    }

    println("Enter any key to cancel subscriber1")
    readln()
    subscriber1.cancel()

    println("Enter any key to cancel subscriber2")
    readln()
    subscriber2.cancel()

    println("Enter any key to stop emitting values")
    readln()
    publisher2.cancel()
}

此範例會產生以下的輸出。我們可以看到 subscriber1subscriber2 並沒有收到在它們 subscribe 之前就發送出來的值,因為 relay cache 設為 0。

Enter any key to cancel subscriber1
Emitting 0 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])
#1 got 0 ([StandaloneCoroutine{Active}@6edf1369, Dispatchers.IO])
#2 got 0 ([StandaloneCoroutine{Active}@ec121f5, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])
#1 got 1 ([StandaloneCoroutine{Active}@6edf1369, Dispatchers.IO])
#2 got 1 ([StandaloneCoroutine{Active}@ec121f5, Dispatchers.IO])

Enter any key to cancel subscriber2
Emitting 2 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])
#2 got 2 ([StandaloneCoroutine{Active}@ec121f5, Dispatchers.IO])

Enter any key to stop emitting values
Emitting 3 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])

範例:Relay Cache = 2

以下的範例是 relay cache 為 2。

val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

suspend fun main(args: Array<String>) {
    val sharedFlow = MutableSharedFlow<Int>(replay = 2)

    sharedFlow.emit(-3)
    sharedFlow.emit(-2)
    sharedFlow.emit(-1)

    val subscribe1 = sharedFlow
        .onEach {
            println("#1 got $it (${currentCoroutineContext()})")
        }
        .launchIn(scope)

    val subscribe2 = sharedFlow
        .onEach {
            println("#2 got $it (${currentCoroutineContext()})")
        }
        .launchIn(scope)

    val publisher1 = scope.launch {
        repeat(100) {
            println("Emitting $it (${currentCoroutineContext()})")
            sharedFlow.emit(it)
            delay(1000)
        }
    }

    println("Enter any key to cancel subscribe1")
    readln()
    subscribe1.cancel()

    println("Enter any key to cancel subscribe2")
    readln()
    subscribe2.cancel()

    println("Enter any key to stop emitting values")
    readln()
    publisher1.cancel()
}

從以下的輸出中,我們可以看到,subscriber1subscriber2 有接收到 -2 和 -1,但是沒有接收到 -3。這是因為 relay cache 為 2。

Enter any key to cancel subscribe1
Emitting 0 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])
#1 got -2 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got -2 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
#1 got -1 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got -1 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
#1 got 0 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got 0 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])
#1 got 1 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got 1 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])

Enter any key to cancel subscribe2
Emitting 2 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])
#2 got 2 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])

Enter any key to stop emitting values
Emitting 4 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])

用 SharedFlow 來取代 BroadcastChannel

BroadcastChannel 與 SharedFlow 非常地相似。重要的是 SharedFlow 是被設計出來取代 BroadcastChannel。如果你想知道它們之間的不同之處,可以在這裡找到。

StateFlow & MutableStateFlow

StateFlow 像是一個 replay cache 為 1 的 SharedFlow。而且,在初始化一個 StateFlow 時,就要先給定一個初始值到 replay cache。另外,如果發送的值和目前在 replay cache 裡的值一樣時,subscribes 不會被通知。所以我們用 StateFlow 來取代 RxJava 的的 BehaviorSubject

下面的程式碼顯示,如何用 SharedFlow 來製作出和 StateFlow 一樣的行為。

val sharedFlow = MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(initialValue)
val stateFlow = sharedFlow.distinctUntilChanged()

讓我們來看一下要如何使用 StateFlow。

val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

suspend fun main(args: Array<String>) {
    val stateFlow = MutableStateFlow(-100)

    val subscriber1 = stateFlow
        .onEach {
            println("#1 got $it (${currentCoroutineContext()})")
        }
        .launchIn(scope)

    val subscriber2 = stateFlow
        .onEach {
            println("#2 got $it (${currentCoroutineContext()})")
        }
        .launchIn(scope)

    val job = scope.launch {
        delay(1000)
        repeat(100) {
            val value = it / 2
            println("Emitting $value (${currentCoroutineContext()})")
            stateFlow.emit(value)
            delay(1000)
        }
    }

    println("Enter any key to cancel subscriber1")
    readln()
    subscriber1.cancel()

    println("Enter any key to cancel subscriber2")
    readln()
    subscriber2.cancel()

    println("Enter any key to stop emitting values")
    readln()
    job.cancel()
}

從以下的輸出可以看到,只有當發送的值和目前在 StateFlow 裡面的值不一樣的時候,subscribers 才會被通知。

Enter any key to cancel subscriber1
#2 got -100 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
#1 got -100 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
Emitting 0 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 0 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 0 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 0 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 1 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 1 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 2 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 2 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 2 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 2 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 3 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 3 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 3 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])

Enter any key to cancel subscriber2
Emitting 3 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 4 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#2 got 4 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 4 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])

Enter any key to stop emitting values
Emitting 5 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])

用 StateFlow 來取代 ConflatedBroadcastChannel

ConflatedBroadcastChannel 與 StateFlow 非常地相似。重要的是 StateFlow 是被設計出來取代 ConflatedBroadcastChannel。如果你想知道它們之間的不同之處,可以在這裡找到。

結語

在寫程式的時候,我們有時會需要監聽某個狀態。SharedFlow 和 StateFlow 提供了一個比 BroadcastChannel 和 ConflatedBroadcastChannel 更為簡單的方式,讓我們可以使用 publisher-subscriber pattern。

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *

You May Also Like
Photo by Gemma Evans on Unsplash
Read More

Android:Hilt 依賴注入

Hilt 是基於 Dagger 且設計在 Android 上使用的 dependency injection library。所以在開發 Android 時,使用 Hilt 會比使用 Dagger 更加地方便。本文章將介紹如何使用 Hilt。
Read More