Swift Combine 是 Apple 用來實現 reactive programming 的函式庫。在 Combine 還沒出來之前,我們一般是使用 RxSwift。不過,使用 Combine 的話,我們就不需要再引入額外的函式庫。而且與 RxSwift 相比,Combine 的效能更好。
Table of Contents
Publisher & Subscriber
在 Swift Combine 中,有兩個很重要的觀念,就是 publishers 和 subscribers。Publishers 是用來發送資料,而 subscribers 是用來訂閱某個 publisher。也就是說,subscriber 是用來從 publisher 中接收資料。這樣就會產生出一個 stream。
protocol Publisher<Output, Failure> protocol Subscriber<Input, Failure> : CustomCombineIdentifierConvertible
以下範例中,我們建立一個 TimerPublisher。它每一秒會發送當前的時間。然後,我們呼叫 sink(receiveValue:) 來接收 TimerPublisher 發送的資料。sink(receiveValue:) 會建立一個 subscriber 來訂閱當前的 publisher。
import UIKit
import Combine
class ViewController: UIViewController {
@IBOutlet weak var timeLabel: UILabel!
private var cancellable: AnyCancellable?
override func viewDidLoad() {
super.viewDidLoad()
cancellable = Timer.publish(every: 1, on: RunLoop.main, in: .common)
.autoconnect()
.sink { date in
timeLabel.text = date.description
}
}
}如果我們是想要將接收到的資料,直接 assign 給某個變數,那麼我們可以使用 assign(to:on:) 來將收到的資料直接 assign 給某個變數,如下範例。
import UIKit
import Combine
class ViewController: UIViewController {
@IBOutlet weak var timeLabel: UILabel!
private var cancellable: AnyCancellable?
override func viewDidLoad() {
super.viewDidLoad()
cancellable = Timer.publish(every: 1, on: RunLoop.main, in: .common)
.autoconnect()
.map { $0.description }
.assign(to: \.text, on: timeLabel)
}
}@Published
@Published 可以將一個 property 變成 publisher。只要 assign 值給它,它就會將值發送出去。當存取 property 的 publisher 時,要在前面加上 $ operator。
import UIKit
import Combine
class ViewController: UIViewController {
@IBOutlet weak var timeLabel: UILabel!
@IBAction func clickButton(_ sender: Any) {
text = Date().description
}
private var cancellable: AnyCancellable?
@Published private var text: String? = ""
override func viewDidLoad() {
super.viewDidLoad()
cancellable = $text.assign(to: \.text, on: timeLabel)
}
}Publishers
Future
Future publisher 最終會產生一個值,然後結束。它是用來將非同步的程式碼包裝成一個 publisher。
func login() -> Future<String, Never> {
Future { promise in
DispatchQueue.global().async {
// do login ...
let token = result.token
promise(Result.success(token))
}
}
}
let cancellable = login()
.sink { token in
print(token)
}Just
Just publisher 會發送一個值,然後結束。它常被用來開始一連串的 publisher。
let cancellable = Just(1)
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { value in
print(value)
})
// Output:
// 1
// finishedRecord
Record publisher 和 Just 很像,但是 Record 可以發送一系列的值。
let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
.sink(receiveValue: { value in
print(value)
})
// Output:
// 1
// 2
// 3
// 4Deferred
Deferred publisher 利用傳入的 closure 建立 publisher。但是,它要被訂閱後,才會建立 publisher。
func login() -> Deferred<Future<String, Never>> {
Deferred {
Future { promise in
DispatchQueue.global().async {
// do login ...
let token = result.token
promise(Result.success(token))
}
}
}
}Subjects
Subject 是一個 Publisher,此外還提供 Subject.send() 讓 callers 發送資料。
PassthroughSubject
PassthroughSubject 可以 broadcast 資料至所有的 subscribers。而且,subscribers 只會接收到它們 subscribe 之後的資料。
let subject = PassthroughSubject<String, Error>()
subject.send("1")
subject.send("2")
let cancellable = subject
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { value in
print(value)
})
subject.send("3")
subject.send("4")
subject.send(completion: .finished)
// Output:
// 3
// 4
// finishedCurrentValueSubject
如同 PassthroughSubject,CurrentValueSubject 可以 broadcast 資料至所有的 subscribers。不過 CurrentValueSubject 會維護一個 buffer 來儲存最後一次發送的值。所以,在初始化時,必須要傳入一個預設值給 buffer。Subscribers 在 subscribe 之後,他們只會收到 buffer 裡的值和之後發送的資料。
let subject = CurrentValueSubject<String, Error>("0")
subject.send("1")
subject.send("2")
let cancellable = subject
.sink(receiveCompletion: { _ in
print("complete")
}, receiveValue: { value in
print(value)
})
subject.send("3")
subject.send("4")
subject.send(completion: .finished)
// Output:
// 2
// 3
// 4
// completeSpecifying Scheduler
如果我們不指定 scheduler 時,Combine 會用和發送資料時一樣的 thread 來接收資料。也就是說,你在 background thread 裡發送資料,那 sink() 裡接收資料的 closure 就會在 background thread 裡被呼叫。
// Main thread
let subject = PassthroughSubject<Int, Never>()
let cancellable = subject.sink(receiveValue: { value in
print("\(value) sent from \(Thread.current)")
})
subject.send(1)
DispatchQueue.global().async {
subject.send(2)
}
// Output:
// 1 sent from <_NSMainThread: 0x6000011043c0>{number = 1, name = main}
// 2 sent from <NSThread: 0x600001134000>{number = 4, name = (null)}receive(on:)
當你用 receive(on:) 指定一個 scheduler 後,在它之後的 operators 都會在那個 scheduler 裡接收資料。
// Main thread
let subject = PassthroughSubject<Int, Never>()
let cancellable = subject
.map {
print("map1: \($0) sent from \(Thread.current)")
return $0
}
.receive(on: DispatchQueue.global())
.map {
print("map2: \($0) sent from \(Thread.current)")
return $0
}
.sink(receiveValue: { value in
print("sink: \(value) sent from \(Thread.current)")
})
subject.send(1)
// Output:
// map1: 1 sent from <_NSMainThread: 0x600003d343c0>{number = 1, name = main}
// map2: 1 sent from <NSThread: 0x600003d14240>{number = 5, name = (null)}
// sink: 1 sent from <NSThread: 0x600003d14240>{number = 5, name = (null)}subscribe(on:)
當你用 subscribe(on:options:) 指定一個 scheduler 後,所有的 operators 都會在那個 scheduler 裡接收資料,直到有一個 receive(on:) 指定其他的 scheduler。
// Main thread
let subject = PassthroughSubject<Int, Never>()
let cancellable = subject
.subscribe(on: DispatchQueue.global())
.map {
print("map1: \($0) sent from \(Thread.current)")
return $0
}
.receive(on: RunLoop.main)
.map {
print("map2: \($0) sent from \(Thread.current)")
return $0
}
.sink(receiveValue: { value in
print("sink: \(value) sent from \(Thread.current)")
})
subject.send(1)
// Output:
// map1: 1 sent from <NSThread: 0x600000a19100>{number = 4, name = (null)}
// map2: 1 sent from <_NSMainThread: 0x600000a18180>{number = 1, name = main}
// sink: 1 sent from <_NSMainThread: 0x600000a18180>{number = 1, name = main}Operators
以下將介紹一些常用的 operators。Publishers 還有很多其他 operators,請參照官網。
map() & tryMap()
func map<T>(_ transform: @escaping (Self.Output) -> T) -> Publishers.Map<Self, T> func tryMap<T>(_ transform: @escaping (Self.Output) throws -> T) -> Publishers.TryMap<Self, T>
map() 和 tryMap() 用 transform 對 upstream publisher 的每一個值做轉換,回傳一個新值到 downstream publisher。
let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
.map {
"Hello \($0)"
}
.sink(receiveValue: { value in
print(value)
})
// Output:
// Hello 1
// Hello 2
// Hello 3
// Hello 4filter() & tryFilter()
func filter(_ isIncluded: @escaping (Self.Output) -> Bool) -> Publishers.Filter<Self> func tryFilter(_ isIncluded: @escaping (Self.Output) throws -> Bool) -> Publishers.TryFilter<Self>
filter() 和 tryFilter() 會過濾從 upstream 進來的值。只有 inIncluded 回傳 true 時,才會重新發送到 downstream。
let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
.filter {
$0 > 2
}
.sink(receiveValue: { value in
print(value)
})
// Output:
// 3
// 4compactMap() & tryCompactMap()
func compactMap<T>(_ transform: @escaping (Self.Output) -> T?) -> Publishers.CompactMap<Self, T> https://developer.apple.com/documentation/combine/publisher/trycompactmap(_:)
compactMap() 和 tryCompactMap() 會從 upstream 接收每一個值,並且回傳一個 optional 值。只有當回傳的 optional 值不是 nil 時,才會被發送到 downstream。
let cancellable = Record(output: [1, 2, nil, 4], completion: .finished)
.compactMap {
$0
}
.sink(receiveValue: { value in
print(value)
})
// Output:
// 1
// 2
// 4reduce() & tryReduce()
func reduce<T>(
_ initialResult: T,
_ nextPartialResult: @escaping (T, Self.Output) -> T
) -> Publishers.Reduce<Self, T>
func tryReduce<T>(
_ initialResult: T,
_ nextPartialResult: @escaping (T, Self.Output) throws -> T
) -> Publishers.TryReduce<Self, T>reduce() 和 tryReduce() 有兩個參數。第二個參數 nextPartialResult 的第一個參數 accumulator,一開始會是 initialResult 的值。然候,nextPartialResult 會回傳一個值。等下一個值從 upstream 進來時,accumulator 會是上一次 nextPartialResult 回傳的值。
以下範例顯示如何用 reduce() 來計算 1 到 4 的和。
let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
.reduce(0) { accumulator, value in
print("acc=\(accumulator), value=\(value)")
return accumulator + value
}
.sink(receiveValue: { value in
print(value)
})
// Output
// acc=0, value=1
// acc=1, value=2
// acc=3, value=3
// acc=6, value=4
// 10merge()
func merge(with other: Self) -> Publishers.MergeMany<Self>
merge() 可以合併另外一個相同型態的 publisher。值會依照發送的時間順序送到 downstream。
let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<Int, Never>()
let cancellable = subject1
.merge(with: subject2)
.sink(receiveValue: { value in
print(value)
})
subject1.send(1)
subject2.send(11)
subject1.send(2)
subject1.send(3)
subject1.send(12)
subject1.send(4)
// Output:
// 1
// 11
// 2
// 3
// 12
// 4zip()
func zip<P>(_ other: P) -> Publishers.Zip<Self, P> where P : Publisher, Self.Failure == P.Failure
zip() 會結合另外一個 publisher 發送出來的值,以 tuple 將一對值發送到 downstream。
let _ = Record(output: [1, 2, 3, 4], completion: .finished)
.zip(Record(output: [5, 6, 7, 8], completion: .finished))
.sink(receiveValue: { value in
print(value)
})
// Output:
// (1, 5)
// (2, 6)
// (3, 7)
// (4, 8)flatMap()
func flatMap<T, P>(
maxPublishers: Subscribers.Demand = .unlimited,
_ transform: @escaping (Self.Output) -> P
) -> Publishers.FlatMap<P, Self> where T == P.Output, P : Publisher, Self.Failure == P.Failure和 map() 一樣,flatMap() 用 transform 對 upstream publisher 的每一個值做轉換,但是它回傳一個新的 publisher。
let _ = Just([1, 2, 3, 4])
.flatMap { array in
Just(array.map { $0 * 10 })
}
.sink(receiveValue: { value in
print(value)
})
// Output:
// [10, 20, 30, 40]結語
Swift Combine 可以讓我們更優雅地處理非同步的程式碼。但是,在 Swift async/await 出現之後,我們可能會比較偏向使用 async/await 來實作非同步的程式碼,而不是使用 Combine。但是,我們依然會利用 Combine 來實作 Observer pattern。這在 SwiftUI 中被大量地使用。
參考
- Combine, Apple Developer.
- Using Combine.
- RunLoop.main vs DispatchQueue.main: The differences explained, SwiftLee.









