Kotlin Coroutine Flow:SharedFlow 和 StateFlow

Photo by Hans-Jurgen Mager on Unsplash
Photo by Hans-Jurgen Mager on Unsplash
Kotlin coroutine flow 包含 cold flow 和 hot flow 兩種。而,SharedFlow 和 StateFlow 是其中兩種 hot flow。它們可以 broadcast 值給數個接收者,所以可以被用來實作 publisher-subscriber 模式。

Kotlin coroutine flow 包含 cold flow 和 hot flow 兩種。而,SharedFlowStateFlow 是其中兩種 hot flow。它們可以 broadcast 值給數個接收者,所以可以被用來實作 publisher-subscriber 模式

完整程式碼可以在 下載。

Flow

SharedFlow 和 StateFlow 都是 hot flow,而 flow 又是 Kotlin coroutine 的其中一種功能。所以在開始閱讀本文章之前,必須要了解 Kotlin coroutine 和 flow。如果你不熟悉 Kotlin coroutine 和 flow 的話,可以先閱讀下面的文章。

SharedFlow & MutableSharedFlow

Publisher-Subscriber 模式

SharedFlow 可以 broadcast 值給多個接收者。發送值的那一方稱之為 publisher,接收值的那一方則稱之為 subscriber。所以我們可以用 SharedFlow 來取代 RxJava 的 PublishSubject

Publisher 呼叫 emit() 來發送值,而 subscriber 呼叫 onEach() 來接收值。另外,我們不需要呼叫 collect() 來 terminate 目前的 flow,因為 SharedFlow 永遠不會 complete。不過,subscriber 可以呼叫 cancel() 來取消接收值。

Relay Cache

SharedFlow 會將最近發送的值存到 relay cache。如,relay cache 的大小是 3 的話,那 relay cache 會存儲最近 3 個已經發送過的值。新的 subscriber 一開始就會先接收到 relay cache 裡面所有的值,所以才會稱為 relay。

範例:Relay Cache = 0

以下是一個 relay cache 為 0 的範例。在範例中,我們宣告的不是 SharedFlow,而是 MutableSharedFlow。這是因為 SharedFlow 是 readonly,只有 MutableSharedFlow 才可以呼叫 emit() 來發送值。

在範例中,我們會先發送 -3、-2、-1,然後在建立兩個 subscriber。

val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

suspend fun main(args: Array<String>) {
    val sharedFlow = MutableSharedFlow<Int>()

    sharedFlow.emit(-3)
    sharedFlow.emit(-2)
    sharedFlow.emit(-1)

    val subscriber1 = sharedFlow
        .onEach {
            println("#1 got $it (${currentCoroutineContext()})")
        }
        .launchIn(scope)

    val subscriber2 = sharedFlow
        .onEach {
            println("#2 got $it (${currentCoroutineContext()})")
        }
        .launchIn(scope)

    val publisher2 = scope.launch {
        repeat(100) {
            println("Emitting $it (${currentCoroutineContext()})")
            sharedFlow.emit(it)
            delay(1000)
        }
    }

    println("Enter any key to cancel subscriber1")
    readln()
    subscriber1.cancel()

    println("Enter any key to cancel subscriber2")
    readln()
    subscriber2.cancel()

    println("Enter any key to stop emitting values")
    readln()
    publisher2.cancel()
}

此範例會產生以下的輸出。我們可以看到 subscriber1subscriber2 並沒有收到在它們 subscribe 之前就發送出來的值,因為 relay cache 設為 0。

Enter any key to cancel subscriber1
Emitting 0 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])
#1 got 0 ([StandaloneCoroutine{Active}@6edf1369, Dispatchers.IO])
#2 got 0 ([StandaloneCoroutine{Active}@ec121f5, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])
#1 got 1 ([StandaloneCoroutine{Active}@6edf1369, Dispatchers.IO])
#2 got 1 ([StandaloneCoroutine{Active}@ec121f5, Dispatchers.IO])

Enter any key to cancel subscriber2
Emitting 2 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])
#2 got 2 ([StandaloneCoroutine{Active}@ec121f5, Dispatchers.IO])

Enter any key to stop emitting values
Emitting 3 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])

範例:Relay Cache = 2

以下的範例是 relay cache 為 2。

val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

suspend fun main(args: Array<String>) {
    val sharedFlow = MutableSharedFlow<Int>(replay = 2)

    sharedFlow.emit(-3)
    sharedFlow.emit(-2)
    sharedFlow.emit(-1)

    val subscribe1 = sharedFlow
        .onEach {
            println("#1 got $it (${currentCoroutineContext()})")
        }
        .launchIn(scope)

    val subscribe2 = sharedFlow
        .onEach {
            println("#2 got $it (${currentCoroutineContext()})")
        }
        .launchIn(scope)

    val publisher1 = scope.launch {
        repeat(100) {
            println("Emitting $it (${currentCoroutineContext()})")
            sharedFlow.emit(it)
            delay(1000)
        }
    }

    println("Enter any key to cancel subscribe1")
    readln()
    subscribe1.cancel()

    println("Enter any key to cancel subscribe2")
    readln()
    subscribe2.cancel()

    println("Enter any key to stop emitting values")
    readln()
    publisher1.cancel()
}

從以下的輸出中,我們可以看到,subscriber1subscriber2 有接收到 -2 和 -1,但是沒有接收到 -3。這是因為 relay cache 為 2。

Enter any key to cancel subscribe1
Emitting 0 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])
#1 got -2 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got -2 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
#1 got -1 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got -1 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
#1 got 0 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got 0 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])
#1 got 1 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got 1 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])

Enter any key to cancel subscribe2
Emitting 2 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])
#2 got 2 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])

Enter any key to stop emitting values
Emitting 4 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])

用 SharedFlow 來取代 BroadcastChannel

BroadcastChannel 與 SharedFlow 非常地相似。重要的是 SharedFlow 是被設計出來取代 BroadcastChannel。如果你想知道它們之間的不同之處,可以在這裡找到。

StateFlow & MutableStateFlow

StateFlow 像是一個 replay cache 為 1 的 SharedFlow。而且,在初始化一個 StateFlow 時,就要先給定一個初始值到 replay cache。另外,如果發送的值和目前在 replay cache 裡的值一樣時,subscribes 不會被通知。所以我們用 StateFlow 來取代 RxJava 的的 BehaviorSubject

下面的程式碼顯示,如何用 SharedFlow 來製作出和 StateFlow 一樣的行為。

val sharedFlow = MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(initialValue)
val stateFlow = sharedFlow.distinctUntilChanged()

讓我們來看一下要如何使用 StateFlow。

val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

suspend fun main(args: Array<String>) {
    val stateFlow = MutableStateFlow(-100)

    val subscriber1 = stateFlow
        .onEach {
            println("#1 got $it (${currentCoroutineContext()})")
        }
        .launchIn(scope)

    val subscriber2 = stateFlow
        .onEach {
            println("#2 got $it (${currentCoroutineContext()})")
        }
        .launchIn(scope)

    val job = scope.launch {
        delay(1000)
        repeat(100) {
            val value = it / 2
            println("Emitting $value (${currentCoroutineContext()})")
            stateFlow.emit(value)
            delay(1000)
        }
    }

    println("Enter any key to cancel subscriber1")
    readln()
    subscriber1.cancel()

    println("Enter any key to cancel subscriber2")
    readln()
    subscriber2.cancel()

    println("Enter any key to stop emitting values")
    readln()
    job.cancel()
}

從以下的輸出可以看到,只有當發送的值和目前在 StateFlow 裡面的值不一樣的時候,subscribers 才會被通知。

Enter any key to cancel subscriber1
#2 got -100 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
#1 got -100 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
Emitting 0 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 0 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 0 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 0 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 1 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 1 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 2 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 2 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 2 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 2 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 3 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 3 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 3 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])

Enter any key to cancel subscriber2
Emitting 3 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 4 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#2 got 4 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 4 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])

Enter any key to stop emitting values
Emitting 5 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])

用 StateFlow 來取代 ConflatedBroadcastChannel

ConflatedBroadcastChannel 與 StateFlow 非常地相似。重要的是 StateFlow 是被設計出來取代 ConflatedBroadcastChannel。如果你想知道它們之間的不同之處,可以在這裡找到。

結語

在寫程式的時候,我們有時會需要監聽某個狀態。SharedFlow 和 StateFlow 提供了一個比 BroadcastChannel 和 ConflatedBroadcastChannel 更為簡單的方式,讓我們可以使用 publisher-subscriber pattern。

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *

You May Also Like
Photo by Gemma Evans on Unsplash
Read More

Android:Hilt 依賴注入

Hilt 是基於 Dagger 且設計在 Android 上使用的 dependency injection library。所以在開發 Android 時,使用 Hilt 會比使用 Dagger 更加地方便。本文章將介紹如何使用 Hilt。
Read More
Photo by Amal Abas on Unsplash
Read More

Android Service 教學

Android Service 是 Android 的四個 application components 中的其中一個。它可以在背景處理 longer-running 工作,如播放音樂、下載檔案等。所以 Service 不提供使用者介面。本文章將介紹 Service 基本概念。
Read More