Swift Combine 是 Apple 用來實現 reactive programming 的函式庫。在 Combine 還沒出來之前,我們一般是使用 RxSwift。不過,使用 Combine 的話,我們就不需要再引入額外的函式庫。而且與 RxSwift 相比,Combine 的效能更好。
Table of Contents
Publisher & Subscriber
在 Swift Combine 中,有兩個很重要的觀念,就是 publishers 和 subscribers。Publishers 是用來發送資料,而 subscribers 是用來訂閱某個 publisher。也就是說,subscriber 是用來從 publisher 中接收資料。這樣就會產生出一個 stream。
protocol Publisher<Output, Failure> protocol Subscriber<Input, Failure> : CustomCombineIdentifierConvertible
以下範例中,我們建立一個 TimerPublisher。它每一秒會發送當前的時間。然後,我們呼叫 sink(receiveValue:) 來接收 TimerPublisher 發送的資料。sink(receiveValue:) 會建立一個 subscriber 來訂閱當前的 publisher。
import UIKit import Combine class ViewController: UIViewController { @IBOutlet weak var timeLabel: UILabel! private var cancellable: AnyCancellable? override func viewDidLoad() { super.viewDidLoad() cancellable = Timer.publish(every: 1, on: RunLoop.main, in: .common) .autoconnect() .sink { date in timeLabel.text = date.description } } }
如果我們是想要將接收到的資料,直接 assign 給某個變數,那麼我們可以使用 assign(to:on:) 來將收到的資料直接 assign 給某個變數,如下範例。
import UIKit import Combine class ViewController: UIViewController { @IBOutlet weak var timeLabel: UILabel! private var cancellable: AnyCancellable? override func viewDidLoad() { super.viewDidLoad() cancellable = Timer.publish(every: 1, on: RunLoop.main, in: .common) .autoconnect() .map { $0.description } .assign(to: \.text, on: timeLabel) } }
@Published
@Published 可以將一個 property 變成 publisher。只要 assign 值給它,它就會將值發送出去。當存取 property 的 publisher 時,要在前面加上 $
operator。
import UIKit import Combine class ViewController: UIViewController { @IBOutlet weak var timeLabel: UILabel! @IBAction func clickButton(_ sender: Any) { text = Date().description } private var cancellable: AnyCancellable? @Published private var text: String? = "" override func viewDidLoad() { super.viewDidLoad() cancellable = $text.assign(to: \.text, on: timeLabel) } }
Publishers
Future
Future publisher 最終會產生一個值,然後結束。它是用來將非同步的程式碼包裝成一個 publisher。
func login() -> Future<String, Never> { Future { promise in DispatchQueue.global().async { // do login ... let token = result.token promise(Result.success(token)) } } } let cancellable = login() .sink { token in print(token) }
Just
Just publisher 會發送一個值,然後結束。它常被用來開始一連串的 publisher。
let cancellable = Just(1) .sink(receiveCompletion: { completion in print(completion) }, receiveValue: { value in print(value) }) // Output: // 1 // finished
Record
Record publisher 和 Just 很像,但是 Record 可以發送一系列的值。
let cancellable = Record(output: [1, 2, 3, 4], completion: .finished) .sink(receiveValue: { value in print(value) }) // Output: // 1 // 2 // 3 // 4
Deferred
Deferred publisher 利用傳入的 closure 建立 publisher。但是,它要被訂閱後,才會建立 publisher。
func login() -> Deferred<Future<String, Never>> { Deferred { Future { promise in DispatchQueue.global().async { // do login ... let token = result.token promise(Result.success(token)) } } } }
Subjects
Subject 是一個 Publisher,此外還提供 Subject.send() 讓 callers 發送資料。
PassthroughSubject
PassthroughSubject 可以 broadcast 資料至所有的 subscribers。而且,subscribers 只會接收到它們 subscribe 之後的資料。
let subject = PassthroughSubject<String, Error>() subject.send("1") subject.send("2") let cancellable = subject .sink(receiveCompletion: { completion in print(completion) }, receiveValue: { value in print(value) }) subject.send("3") subject.send("4") subject.send(completion: .finished) // Output: // 3 // 4 // finished
CurrentValueSubject
如同 PassthroughSubject,CurrentValueSubject 可以 broadcast 資料至所有的 subscribers。不過 CurrentValueSubject 會維護一個 buffer 來儲存最後一次發送的值。所以,在初始化時,必須要傳入一個預設值給 buffer。Subscribers 在 subscribe 之後,他們只會收到 buffer 裡的值和之後發送的資料。
let subject = CurrentValueSubject<String, Error>("0") subject.send("1") subject.send("2") let cancellable = subject .sink(receiveCompletion: { _ in print("complete") }, receiveValue: { value in print(value) }) subject.send("3") subject.send("4") subject.send(completion: .finished) // Output: // 2 // 3 // 4 // complete
Specifying Scheduler
如果我們不指定 scheduler 時,Combine 會用和發送資料時一樣的 thread 來接收資料。也就是說,你在 background thread 裡發送資料,那 sink() 裡接收資料的 closure 就會在 background thread 裡被呼叫。
// Main thread let subject = PassthroughSubject<Int, Never>() let cancellable = subject.sink(receiveValue: { value in print("\(value) sent from \(Thread.current)") }) subject.send(1) DispatchQueue.global().async { subject.send(2) } // Output: // 1 sent from <_NSMainThread: 0x6000011043c0>{number = 1, name = main} // 2 sent from <NSThread: 0x600001134000>{number = 4, name = (null)}
receive(on:)
當你用 receive(on:) 指定一個 scheduler 後,在它之後的 operators 都會在那個 scheduler 裡接收資料。
// Main thread let subject = PassthroughSubject<Int, Never>() let cancellable = subject .map { print("map1: \($0) sent from \(Thread.current)") return $0 } .receive(on: DispatchQueue.global()) .map { print("map2: \($0) sent from \(Thread.current)") return $0 } .sink(receiveValue: { value in print("sink: \(value) sent from \(Thread.current)") }) subject.send(1) // Output: // map1: 1 sent from <_NSMainThread: 0x600003d343c0>{number = 1, name = main} // map2: 1 sent from <NSThread: 0x600003d14240>{number = 5, name = (null)} // sink: 1 sent from <NSThread: 0x600003d14240>{number = 5, name = (null)}
subscribe(on:)
當你用 subscribe(on:options:) 指定一個 scheduler 後,所有的 operators 都會在那個 scheduler 裡接收資料,直到有一個 receive(on:) 指定其他的 scheduler。
// Main thread let subject = PassthroughSubject<Int, Never>() let cancellable = subject .subscribe(on: DispatchQueue.global()) .map { print("map1: \($0) sent from \(Thread.current)") return $0 } .receive(on: RunLoop.main) .map { print("map2: \($0) sent from \(Thread.current)") return $0 } .sink(receiveValue: { value in print("sink: \(value) sent from \(Thread.current)") }) subject.send(1) // Output: // map1: 1 sent from <NSThread: 0x600000a19100>{number = 4, name = (null)} // map2: 1 sent from <_NSMainThread: 0x600000a18180>{number = 1, name = main} // sink: 1 sent from <_NSMainThread: 0x600000a18180>{number = 1, name = main}
Operators
以下將介紹一些常用的 operators。Publishers 還有很多其他 operators,請參照官網。
map() & tryMap()
func map<T>(_ transform: @escaping (Self.Output) -> T) -> Publishers.Map<Self, T> func tryMap<T>(_ transform: @escaping (Self.Output) throws -> T) -> Publishers.TryMap<Self, T>
map() 和 tryMap() 用 transform
對 upstream publisher 的每一個值做轉換,回傳一個新值到 downstream publisher。
let cancellable = Record(output: [1, 2, 3, 4], completion: .finished) .map { "Hello \($0)" } .sink(receiveValue: { value in print(value) }) // Output: // Hello 1 // Hello 2 // Hello 3 // Hello 4
filter() & tryFilter()
func filter(_ isIncluded: @escaping (Self.Output) -> Bool) -> Publishers.Filter<Self> func tryFilter(_ isIncluded: @escaping (Self.Output) throws -> Bool) -> Publishers.TryFilter<Self>
filter() 和 tryFilter() 會過濾從 upstream 進來的值。只有 inIncluded
回傳 true 時,才會重新發送到 downstream。
let cancellable = Record(output: [1, 2, 3, 4], completion: .finished) .filter { $0 > 2 } .sink(receiveValue: { value in print(value) }) // Output: // 3 // 4
compactMap() & tryCompactMap()
func compactMap<T>(_ transform: @escaping (Self.Output) -> T?) -> Publishers.CompactMap<Self, T> https://developer.apple.com/documentation/combine/publisher/trycompactmap(_:)
compactMap() 和 tryCompactMap() 會從 upstream 接收每一個值,並且回傳一個 optional 值。只有當回傳的 optional 值不是 nil 時,才會被發送到 downstream。
let cancellable = Record(output: [1, 2, nil, 4], completion: .finished) .compactMap { $0 } .sink(receiveValue: { value in print(value) }) // Output: // 1 // 2 // 4
reduce() & tryReduce()
func reduce<T>( _ initialResult: T, _ nextPartialResult: @escaping (T, Self.Output) -> T ) -> Publishers.Reduce<Self, T> func tryReduce<T>( _ initialResult: T, _ nextPartialResult: @escaping (T, Self.Output) throws -> T ) -> Publishers.TryReduce<Self, T>
reduce() 和 tryReduce() 有兩個參數。第二個參數 nextPartialResult
的第一個參數 accumulator
,一開始會是 initialResult
的值。然候,nextPartialResult
會回傳一個值。等下一個值從 upstream 進來時,accumulator
會是上一次 nextPartialResult
回傳的值。
以下範例顯示如何用 reduce() 來計算 1 到 4 的和。
let cancellable = Record(output: [1, 2, 3, 4], completion: .finished) .reduce(0) { accumulator, value in print("acc=\(accumulator), value=\(value)") return accumulator + value } .sink(receiveValue: { value in print(value) }) // Output // acc=0, value=1 // acc=1, value=2 // acc=3, value=3 // acc=6, value=4 // 10
merge()
func merge(with other: Self) -> Publishers.MergeMany<Self>
merge() 可以合併另外一個相同型態的 publisher。值會依照發送的時間順序送到 downstream。
let subject1 = PassthroughSubject<Int, Never>() let subject2 = PassthroughSubject<Int, Never>() let cancellable = subject1 .merge(with: subject2) .sink(receiveValue: { value in print(value) }) subject1.send(1) subject2.send(11) subject1.send(2) subject1.send(3) subject1.send(12) subject1.send(4) // Output: // 1 // 11 // 2 // 3 // 12 // 4
zip()
func zip<P>(_ other: P) -> Publishers.Zip<Self, P> where P : Publisher, Self.Failure == P.Failure
zip() 會結合另外一個 publisher 發送出來的值,以 tuple 將一對值發送到 downstream。
let _ = Record(output: [1, 2, 3, 4], completion: .finished) .zip(Record(output: [5, 6, 7, 8], completion: .finished)) .sink(receiveValue: { value in print(value) }) // Output: // (1, 5) // (2, 6) // (3, 7) // (4, 8)
flatMap()
func flatMap<T, P>( maxPublishers: Subscribers.Demand = .unlimited, _ transform: @escaping (Self.Output) -> P ) -> Publishers.FlatMap<P, Self> where T == P.Output, P : Publisher, Self.Failure == P.Failure
和 map() 一樣,flatMap() 用 transform
對 upstream publisher 的每一個值做轉換,但是它回傳一個新的 publisher。
let _ = Just([1, 2, 3, 4]) .flatMap { array in Just(array.map { $0 * 10 }) } .sink(receiveValue: { value in print(value) }) // Output: // [10, 20, 30, 40]
結語
Swift Combine 可以讓我們更優雅地處理非同步的程式碼。但是,在 Swift async/await 出現之後,我們可能會比較偏向使用 async/await 來實作非同步的程式碼,而不是使用 Combine。但是,我們依然會利用 Combine 來實作 Observer pattern。這在 SwiftUI 中被大量地使用。
參考
- Combine, Apple Developer.
- Using Combine.
- RunLoop.main vs DispatchQueue.main: The differences explained, SwiftLee.