Swift Combine 教學

Photo by Chris Murray on Unsplash
Photo by Chris Murray on Unsplash
Swift Combine 是 Apple 用來實現 reactive programming 的函式庫。在 Combine 還沒出來之前,我們一般是使用 RxSwift。不過,使用 Combine 的話,我們就不需要再引入額外的函式庫。而且與 RxSwift 相比,Combine 的效能更好。

Swift Combine 是 Apple 用來實現 reactive programming 的函式庫。在 Combine 還沒出來之前,我們一般是使用 RxSwift。不過,使用 Combine 的話,我們就不需要再引入額外的函式庫。而且與 RxSwift 相比,Combine 的效能更好。

Publisher & Subscriber

在 Swift Combine 中,有兩個很重要的觀念,就是 publishers 和 subscribers。Publishers 是用來發送資料,而 subscribers 是用來訂閱某個 publisher。也就是說,subscriber 是用來從 publisher 中接收資料。這樣就會產生出一個 stream。

protocol Publisher<Output, Failure>

protocol Subscriber<Input, Failure> : CustomCombineIdentifierConvertible

以下範例中,我們建立一個 TimerPublisher。它每一秒會發送當前的時間。然後,我們呼叫 sink(receiveValue:) 來接收 TimerPublisher 發送的資料。sink(receiveValue:) 會建立一個 subscriber 來訂閱當前的 publisher。

import UIKit
import Combine

class ViewController: UIViewController {
    @IBOutlet weak var timeLabel: UILabel!
    private var cancellable: AnyCancellable?

    override func viewDidLoad() {
        super.viewDidLoad()

        cancellable = Timer.publish(every: 1, on: RunLoop.main, in: .common)
            .autoconnect()
            .sink { date in
                timeLabel.text = date.description
            }
    }
}

如果我們是想要將接收到的資料,直接 assign 給某個變數,那麼我們可以使用 assign(to:on:) 來將收到的資料直接 assign 給某個變數,如下範例。

import UIKit
import Combine

class ViewController: UIViewController {
    @IBOutlet weak var timeLabel: UILabel!
    private var cancellable: AnyCancellable?

    override func viewDidLoad() {
        super.viewDidLoad()

        cancellable = Timer.publish(every: 1, on: RunLoop.main, in: .common)
            .autoconnect()
            .map { $0.description }
            .assign(to: \.text, on: timeLabel)
    }
}

@Published

@Published 可以將一個 property 變成 publisher。只要 assign 值給它,它就會將值發送出去。當存取 property 的 publisher 時,要在前面加上 $ operator。

import UIKit
import Combine

class ViewController: UIViewController {
    @IBOutlet weak var timeLabel: UILabel!
    
    @IBAction func clickButton(_ sender: Any) {
        text = Date().description
    }
    
    private var cancellable: AnyCancellable?
    @Published private var text: String? = ""

    override func viewDidLoad() {
        super.viewDidLoad()
        cancellable = $text.assign(to: \.text, on: timeLabel)
    }
}

Publishers

Future

Future publisher 最終會產生一個值,然後結束。它是用來將非同步的程式碼包裝成一個 publisher。

func login() -> Future<String, Never> {
    Future { promise in
        DispatchQueue.global().async {
            // do login ...
            let token = result.token
            promise(Result.success(token))
        }
    }
}

let cancellable = login()
    .sink { token in
        print(token)
    }

Just

Just publisher 會發送一個值,然後結束。它常被用來開始一連串的 publisher。

let cancellable = Just(1)
    .sink(receiveCompletion: { completion in
        print(completion)
    }, receiveValue: { value in
        print(value)
    })

// Output:
// 1
// finished

Record

Record publisher 和 Just 很像,但是 Record 可以發送一系列的值。

let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
    .sink(receiveValue: { value in
        print(value)
    })

// Output:
// 1
// 2
// 3
// 4

Deferred

Deferred publisher 利用傳入的 closure 建立 publisher。但是,它要被訂閱後,才會建立 publisher。

func login() -> Deferred<Future<String, Never>> {
    Deferred {
        Future { promise in
            DispatchQueue.global().async {
                // do login ...
                let token = result.token
                promise(Result.success(token))
            }
        }
    }
}

Subjects

Subject 是一個 Publisher,此外還提供 Subject.send() 讓 callers 發送資料。

PassthroughSubject

PassthroughSubject 可以 broadcast 資料至所有的 subscribers。而且,subscribers 只會接收到它們 subscribe 之後的資料。

let subject = PassthroughSubject<String, Error>()

subject.send("1")
subject.send("2")
        
let cancellable = subject
    .sink(receiveCompletion: { completion in
        print(completion)
    }, receiveValue: { value in
        print(value)
    })
        
subject.send("3")
subject.send("4")
subject.send(completion: .finished)

// Output:
// 3
// 4
// finished

CurrentValueSubject

如同 PassthroughSubject,CurrentValueSubject 可以 broadcast 資料至所有的 subscribers。不過 CurrentValueSubject 會維護一個 buffer 來儲存最後一次發送的值。所以,在初始化時,必須要傳入一個預設值給 buffer。Subscribers 在 subscribe 之後,他們只會收到 buffer 裡的值和之後發送的資料。

let subject = CurrentValueSubject<String, Error>("0")

subject.send("1")
subject.send("2")
        
let cancellable = subject
    .sink(receiveCompletion: { _ in
        print("complete")
    }, receiveValue: { value in
        print(value)
    })
        
subject.send("3")
subject.send("4")
subject.send(completion: .finished)

// Output:
// 2
// 3
// 4
// complete

Specifying Scheduler

如果我們不指定 scheduler 時,Combine 會用和發送資料時一樣的 thread 來接收資料。也就是說,你在 background thread 裡發送資料,那 sink() 裡接收資料的 closure 就會在 background thread 裡被呼叫。

// Main thread
let subject = PassthroughSubject<Int, Never>()
let cancellable = subject.sink(receiveValue: { value in
    print("\(value) sent from \(Thread.current)")
})
subject.send(1)
DispatchQueue.global().async {
    subject.send(2)
}

// Output:
// 1 sent from <_NSMainThread: 0x6000011043c0>{number = 1, name = main}
// 2 sent from <NSThread: 0x600001134000>{number = 4, name = (null)}

receive(on:)

當你用 receive(on:) 指定一個 scheduler 後,在它之後的 operators 都會在那個 scheduler 裡接收資料。

// Main thread
let subject = PassthroughSubject<Int, Never>()
let cancellable = subject
    .map {
        print("map1: \($0) sent from \(Thread.current)")
        return $0
    }
    .receive(on: DispatchQueue.global())
    .map {
        print("map2: \($0) sent from \(Thread.current)")
        return $0
    }
    .sink(receiveValue: { value in
        print("sink: \(value) sent from \(Thread.current)")
    })
subject.send(1)

// Output:
// map1: 1 sent from <_NSMainThread: 0x600003d343c0>{number = 1, name = main}
// map2: 1 sent from <NSThread: 0x600003d14240>{number = 5, name = (null)}
// sink: 1 sent from <NSThread: 0x600003d14240>{number = 5, name = (null)}

subscribe(on:)

當你用 subscribe(on:options:) 指定一個 scheduler 後,所有的 operators 都會在那個 scheduler 裡接收資料,直到有一個 receive(on:) 指定其他的 scheduler。

// Main thread
let subject = PassthroughSubject<Int, Never>()
let cancellable = subject
    .subscribe(on: DispatchQueue.global())
    .map {
        print("map1: \($0) sent from \(Thread.current)")
        return $0
    }
    .receive(on: RunLoop.main)
    .map {
        print("map2: \($0) sent from \(Thread.current)")
        return $0
    }
    .sink(receiveValue: { value in
        print("sink: \(value) sent from \(Thread.current)")
    })
subject.send(1)

// Output:
// map1: 1 sent from <NSThread: 0x600000a19100>{number = 4, name = (null)}
// map2: 1 sent from <_NSMainThread: 0x600000a18180>{number = 1, name = main}
// sink: 1 sent from <_NSMainThread: 0x600000a18180>{number = 1, name = main}

Operators

以下將介紹一些常用的 operators。Publishers 還有很多其他 operators,請參照官網

map() & tryMap()

func map<T>(_ transform: @escaping (Self.Output) -> T) -> Publishers.Map<Self, T>

func tryMap<T>(_ transform: @escaping (Self.Output) throws -> T) -> Publishers.TryMap<Self, T>

map()tryMap()transform 對 upstream publisher 的每一個值做轉換,回傳一個新值到 downstream publisher。

let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
    .map {
        "Hello \($0)"
    }
    .sink(receiveValue: { value in
        print(value)
    })

// Output:
// Hello 1
// Hello 2
// Hello 3
// Hello 4

filter() & tryFilter()

func filter(_ isIncluded: @escaping (Self.Output) -> Bool) -> Publishers.Filter<Self>

func tryFilter(_ isIncluded: @escaping (Self.Output) throws -> Bool) -> Publishers.TryFilter<Self>

filter()tryFilter() 會過濾從 upstream 進來的值。只有 inIncluded 回傳 true 時,才會重新發送到 downstream。

let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
    .filter {
        $0 > 2
    }
    .sink(receiveValue: { value in
        print(value)
    })

// Output:
// 3
// 4

compactMap() & tryCompactMap()

func compactMap<T>(_ transform: @escaping (Self.Output) -> T?) -> Publishers.CompactMap<Self, T>

https://developer.apple.com/documentation/combine/publisher/trycompactmap(_:)

compactMap()tryCompactMap() 會從 upstream 接收每一個值,並且回傳一個 optional 值。只有當回傳的 optional 值不是 nil 時,才會被發送到 downstream。

let cancellable = Record(output: [1, 2, nil, 4], completion: .finished)
    .compactMap { 
        $0
    }
    .sink(receiveValue: { value in
        print(value)
    })

// Output:
// 1
// 2
// 4

reduce() & tryReduce()

func reduce<T>(
    _ initialResult: T,
    _ nextPartialResult: @escaping (T, Self.Output) -> T
) -> Publishers.Reduce<Self, T>

func tryReduce<T>(
    _ initialResult: T,
    _ nextPartialResult: @escaping (T, Self.Output) throws -> T
) -> Publishers.TryReduce<Self, T>

reduce()tryReduce() 有兩個參數。第二個參數 nextPartialResult 的第一個參數 accumulator,一開始會是 initialResult 的值。然候,nextPartialResult 會回傳一個值。等下一個值從 upstream 進來時,accumulator 會是上一次 nextPartialResult 回傳的值。

以下範例顯示如何用 reduce() 來計算 1 到 4 的和。

let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
    .reduce(0) { accumulator, value in
        print("acc=\(accumulator), value=\(value)")
        return accumulator + value
    }
    .sink(receiveValue: { value in
        print(value)
    })

// Output
// acc=0, value=1
// acc=1, value=2
// acc=3, value=3
// acc=6, value=4
// 10

merge()

func merge(with other: Self) -> Publishers.MergeMany<Self>

merge() 可以合併另外一個相同型態的 publisher。值會依照發送的時間順序送到 downstream。

let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<Int, Never>()
let cancellable = subject1
    .merge(with: subject2)
    .sink(receiveValue: { value in
        print(value)
    })
subject1.send(1)
subject2.send(11)
subject1.send(2)
subject1.send(3)
subject1.send(12)
subject1.send(4)

// Output:
// 1
// 11
// 2
// 3
// 12
// 4

zip()

func zip<P>(_ other: P) -> Publishers.Zip<Self, P> where P : Publisher, Self.Failure == P.Failure

zip() 會結合另外一個 publisher 發送出來的值,以 tuple 將一對值發送到 downstream。

let _ = Record(output: [1, 2, 3, 4], completion: .finished)
    .zip(Record(output: [5, 6, 7, 8], completion: .finished))
    .sink(receiveValue: { value in
        print(value)
    })

// Output:
// (1, 5)
// (2, 6)
// (3, 7)
// (4, 8)

flatMap()

func flatMap<T, P>(
    maxPublishers: Subscribers.Demand = .unlimited,
    _ transform: @escaping (Self.Output) -> P
) -> Publishers.FlatMap<P, Self> where T == P.Output, P : Publisher, Self.Failure == P.Failure

和 map() 一樣,flatMap()transform 對 upstream publisher 的每一個值做轉換,但是它回傳一個新的 publisher。

let _ = Just([1, 2, 3, 4])
    .flatMap { array in
        Just(array.map { $0 * 10 })
    }
    .sink(receiveValue: { value in
        print(value)
    })

// Output:
// [10, 20, 30, 40]

結語

Swift Combine 可以讓我們更優雅地處理非同步的程式碼。但是,在 Swift async/await 出現之後,我們可能會比較偏向使用 async/await 來實作非同步的程式碼,而不是使用 Combine。但是,我們依然會利用 Combine 來實作 Observer pattern。這在 SwiftUI 中被大量地使用。

參考

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *

You May Also Like
Photo by Ben White on Unsplash
Read More

Swift Concurrency 教學

Swift 5.5 推出了 Swift concurrency。它讓我們用 synchronous 的方式來完成 asynchronous code。大大地降低 asynchronous code 的複雜度。本文章將介紹 Swift concurrency 的基本知識。
Read More
Photo by Heather Barnes on Unsplash
Read More

如何建立並發佈 Swift Packages

Swift packages 是可重複使用的程式碼元件。它可包含程式碼、二進位檔、和資源檔。我們可以很容易地在我們的 app 專案中使用 Swift packages。本文章將介紹如何建立並發佈 Swift packages。
Read More