Kotlin coroutine flow 包含 cold flow 和 hot flow 兩種。而,SharedFlow 和 StateFlow 是其中兩種 hot flow。它們可以 broadcast 值給數個接收者,所以可以被用來實作 publisher-subscriber 模式。
Table of Contents
Flow
SharedFlow 和 StateFlow 都是 hot flow,而 flow 又是 Kotlin coroutine 的其中一種功能。所以在開始閱讀本文章之前,必須要了解 Kotlin coroutine 和 flow。如果你不熟悉 Kotlin coroutine 和 flow 的話,可以先閱讀下面的文章。
SharedFlow & MutableSharedFlow
Publisher-Subscriber 模式
SharedFlow 可以 broadcast 值給多個接收者。發送值的那一方稱之為 publisher,接收值的那一方則稱之為 subscriber。所以我們可以用 SharedFlow 來取代 RxJava 的 PublishSubject。
Publisher 呼叫 emit()
來發送值,而 subscriber 呼叫 onEach()
來接收值。另外,我們不需要呼叫 collect()
來 terminate 目前的 flow,因為 SharedFlow 永遠不會 complete。不過,subscriber 可以呼叫 cancel()
來取消接收值。
Relay Cache
SharedFlow 會將最近發送的值存到 relay cache。如,relay cache 的大小是 3 的話,那 relay cache 會存儲最近 3 個已經發送過的值。新的 subscriber 一開始就會先接收到 relay cache 裡面所有的值,所以才會稱為 relay。
範例:Relay Cache = 0
以下是一個 relay cache 為 0 的範例。在範例中,我們宣告的不是 SharedFlow,而是 MutableSharedFlow。這是因為 SharedFlow 是 readonly,只有 MutableSharedFlow 才可以呼叫 emit()
來發送值。
在範例中,我們會先發送 -3、-2、-1,然後在建立兩個 subscriber。
val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) suspend fun main(args: Array<String>) { val sharedFlow = MutableSharedFlow<Int>() sharedFlow.emit(-3) sharedFlow.emit(-2) sharedFlow.emit(-1) val subscriber1 = sharedFlow .onEach { println("#1 got $it (${currentCoroutineContext()})") } .launchIn(scope) val subscriber2 = sharedFlow .onEach { println("#2 got $it (${currentCoroutineContext()})") } .launchIn(scope) val publisher2 = scope.launch { repeat(100) { println("Emitting $it (${currentCoroutineContext()})") sharedFlow.emit(it) delay(1000) } } println("Enter any key to cancel subscriber1") readln() subscriber1.cancel() println("Enter any key to cancel subscriber2") readln() subscriber2.cancel() println("Enter any key to stop emitting values") readln() publisher2.cancel() }
此範例會產生以下的輸出。我們可以看到 subscriber1
和 subscriber2
並沒有收到在它們 subscribe 之前就發送出來的值,因為 relay cache 設為 0。
Enter any key to cancel subscriber1 Emitting 0 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO]) #1 got 0 ([StandaloneCoroutine{Active}@6edf1369, Dispatchers.IO]) #2 got 0 ([StandaloneCoroutine{Active}@ec121f5, Dispatchers.IO]) Emitting 1 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO]) #1 got 1 ([StandaloneCoroutine{Active}@6edf1369, Dispatchers.IO]) #2 got 1 ([StandaloneCoroutine{Active}@ec121f5, Dispatchers.IO]) Enter any key to cancel subscriber2 Emitting 2 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO]) #2 got 2 ([StandaloneCoroutine{Active}@ec121f5, Dispatchers.IO]) Enter any key to stop emitting values Emitting 3 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])
範例:Relay Cache = 2
以下的範例是 relay cache 為 2。
val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) suspend fun main(args: Array<String>) { val sharedFlow = MutableSharedFlow<Int>(replay = 2) sharedFlow.emit(-3) sharedFlow.emit(-2) sharedFlow.emit(-1) val subscribe1 = sharedFlow .onEach { println("#1 got $it (${currentCoroutineContext()})") } .launchIn(scope) val subscribe2 = sharedFlow .onEach { println("#2 got $it (${currentCoroutineContext()})") } .launchIn(scope) val publisher1 = scope.launch { repeat(100) { println("Emitting $it (${currentCoroutineContext()})") sharedFlow.emit(it) delay(1000) } } println("Enter any key to cancel subscribe1") readln() subscribe1.cancel() println("Enter any key to cancel subscribe2") readln() subscribe2.cancel() println("Enter any key to stop emitting values") readln() publisher1.cancel() }
從以下的輸出中,我們可以看到,subscriber1
和 subscriber2
有接收到 -2 和 -1,但是沒有接收到 -3。這是因為 relay cache 為 2。
Enter any key to cancel subscribe1 Emitting 0 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO]) #1 got -2 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO]) #2 got -2 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO]) #1 got -1 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO]) #2 got -1 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO]) #1 got 0 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO]) #2 got 0 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO]) Emitting 1 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO]) #1 got 1 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO]) #2 got 1 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO]) Enter any key to cancel subscribe2 Emitting 2 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO]) #2 got 2 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO]) Enter any key to stop emitting values Emitting 4 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])
用 SharedFlow 來取代 BroadcastChannel
BroadcastChannel 與 SharedFlow 非常地相似。重要的是 SharedFlow 是被設計出來取代 BroadcastChannel。如果你想知道它們之間的不同之處,可以在這裡找到。
StateFlow & MutableStateFlow
StateFlow 像是一個 replay cache 為 1 的 SharedFlow。而且,在初始化一個 StateFlow 時,就要先給定一個初始值到 replay cache。另外,如果發送的值和目前在 replay cache 裡的值一樣時,subscribes 不會被通知。所以我們用 StateFlow 來取代 RxJava 的的 BehaviorSubject。
下面的程式碼顯示,如何用 SharedFlow 來製作出和 StateFlow 一樣的行為。
val sharedFlow = MutableSharedFlow( replay = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST ) shared.tryEmit(initialValue) val stateFlow = sharedFlow.distinctUntilChanged()
讓我們來看一下要如何使用 StateFlow。
val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) suspend fun main(args: Array<String>) { val stateFlow = MutableStateFlow(-100) val subscriber1 = stateFlow .onEach { println("#1 got $it (${currentCoroutineContext()})") } .launchIn(scope) val subscriber2 = stateFlow .onEach { println("#2 got $it (${currentCoroutineContext()})") } .launchIn(scope) val job = scope.launch { delay(1000) repeat(100) { val value = it / 2 println("Emitting $value (${currentCoroutineContext()})") stateFlow.emit(value) delay(1000) } } println("Enter any key to cancel subscriber1") readln() subscriber1.cancel() println("Enter any key to cancel subscriber2") readln() subscriber2.cancel() println("Enter any key to stop emitting values") readln() job.cancel() }
從以下的輸出可以看到,只有當發送的值和目前在 StateFlow 裡面的值不一樣的時候,subscribers 才會被通知。
Enter any key to cancel subscriber1 #2 got -100 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO]) #1 got -100 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO]) Emitting 0 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO]) #1 got 0 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO]) #2 got 0 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO]) Emitting 0 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO]) Emitting 1 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO]) #1 got 1 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO]) #2 got 1 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO]) Emitting 1 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO]) Emitting 2 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO]) #1 got 2 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO]) #2 got 2 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO]) Emitting 2 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO]) Emitting 3 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO]) #1 got 3 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO]) #2 got 3 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO]) Enter any key to cancel subscriber2 Emitting 3 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO]) Emitting 4 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO]) #2 got 4 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO]) Emitting 4 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO]) Enter any key to stop emitting values Emitting 5 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
用 StateFlow 來取代 ConflatedBroadcastChannel
ConflatedBroadcastChannel 與 StateFlow 非常地相似。重要的是 StateFlow 是被設計出來取代 ConflatedBroadcastChannel。如果你想知道它們之間的不同之處,可以在這裡找到。
結語
在寫程式的時候,我們有時會需要監聽某個狀態。SharedFlow 和 StateFlow 提供了一個比 BroadcastChannel 和 ConflatedBroadcastChannel 更為簡單的方式,讓我們可以使用 publisher-subscriber pattern。