Swift Combine is a library created by Apple implementing reactive programming. Before Combine came out, we generally used RxSwift. However, using Combine, we don’t need to introduce additional libraries. And compared to RxSwift, Combine has better performance.
Table of Contents
Publisher & Subscriber
In Swift Combine, there are two very important concepts, publishers and subscribers. Publishers are used to send data, and subscribers are used to subscribe to a publisher. That is, a subscriber is used to receive data from a publisher. This creates a stream.
protocol Publisher<Output, Failure> protocol Subscriber<Input, Failure> : CustomCombineIdentifierConvertible
In the following example, we create a TimerPublisher. It sends the current time every second. Then, we call sink(receiveValue:) to receive the data sent by TimerPublisher. sink(receiveValue:) will create a subscriber to subscribe to the current publisher.
import UIKit
import Combine
class ViewController: UIViewController {
@IBOutlet weak var timeLabel: UILabel!
private var cancellable: AnyCancellable?
override func viewDidLoad() {
super.viewDidLoad()
cancellable = Timer.publish(every: 1, on: RunLoop.main, in: .common)
.autoconnect()
.sink { date in
timeLabel.text = date.description
}
}
}If we want to directly assign the received data to a certain variable, then we can use assign(to:on:) to directly assign the received data to a certain variable, as shown in the following example.
import UIKit
import Combine
class ViewController: UIViewController {
@IBOutlet weak var timeLabel: UILabel!
private var cancellable: AnyCancellable?
override func viewDidLoad() {
super.viewDidLoad()
cancellable = Timer.publish(every: 1, on: RunLoop.main, in: .common)
.autoconnect()
.map { $0.description }
.assign(to: \.text, on: timeLabel)
}
}@Published
@Published can turn a property into a publisher. As soon as you assign a value to it, it will ㄖpublish the value. When accessing the property’s publisher, add $ operator in front.
import UIKit
import Combine
class ViewController: UIViewController {
@IBOutlet weak var timeLabel: UILabel!
@IBAction func clickButton(_ sender: Any) {
text = Date().description
}
private var cancellable: AnyCancellable?
@Published private var text: String? = ""
override func viewDidLoad() {
super.viewDidLoad()
cancellable = $text.assign(to: \.text, on: timeLabel)
}
}Publishers
Future
The Future publisher eventually produces a value, and finishes. It is used to wrap asynchronous code into a publisher.
func login() -> Future<String, Never> {
Future { promise in
DispatchQueue.global().async {
// do login ...
let token = result.token
promise(Result.success(token))
}
}
}
let cancellable = login()
.sink { token in
print(token)
}Just
Just publisher produces a value and finishes. It is often used to start a chain of publishers.
let cancellable = Just(1)
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { value in
print(value)
})
// Output:
// 1
// finishedRecord
Record publisher is similar to Just, but Record can produce a series of values.
let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
.sink(receiveValue: { value in
print(value)
})
// Output:
// 1
// 2
// 3
// 4Deferred
Deferred publisher uses the closure to create a publisher, but it will not create the publisher until it is subscribed.
func login() -> Deferred<Future<String, Never>> {
Deferred {
Future { promise in
DispatchQueue.global().async {
// do login ...
let token = result.token
promise(Result.success(token))
}
}
}
}Subjects
Subject is a Publisher, and it also provides Subject.send() to let the callers send data.
PassthroughSubject
PassthroughSubject can broadcast data to all subscribers. Moreover, subscribers will only receive the data after they subscribe.
let subject = PassthroughSubject<String, Error>()
subject.send("1")
subject.send("2")
let cancellable = subject
.sink(receiveCompletion: { completion in
print(completion)
}, receiveValue: { value in
print(value)
})
subject.send("3")
subject.send("4")
subject.send(completion: .finished)
// Output:
// 3
// 4
// finishedCurrentValueSubject
Like PassthroughSubject, CurrentValueSubject can broadcast data to all subscribers. However, CurrentValueSubject maintains a buffer to store the last value. Therefore, when initializing, a default value must be passed in to the buffer. After Subscribers subscribe, they will only receive the value in the buffer and the data sent later.
let subject = CurrentValueSubject<String, Error>("0")
subject.send("1")
subject.send("2")
let cancellable = subject
.sink(receiveCompletion: { _ in
print("complete")
}, receiveValue: { value in
print(value)
})
subject.send("3")
subject.send("4")
subject.send(completion: .finished)
// Output:
// 2
// 3
// 4
// completeSpecifying Scheduler
If we do not specify a scheduler, Combine uses the same thread as when sending data to receive data. That is to say, if you send data in the background thread, the closure that receives the data in sink() will be called in the background thread.
// Main thread
let subject = PassthroughSubject<Int, Never>()
let cancellable = subject.sink(receiveValue: { value in
print("\(value) sent from \(Thread.current)")
})
subject.send(1)
DispatchQueue.global().async {
subject.send(2)
}
// Output:
// 1 sent from <_NSMainThread: 0x6000011043c0>{number = 1, name = main}
// 2 sent from <NSThread: 0x600001134000>{number = 4, name = (null)}receive(on:)
When you use receive(on:) to specify a scheduler, all operators after it will receive data in that scheduler.
// Main thread
let subject = PassthroughSubject<Int, Never>()
let cancellable = subject
.map {
print("map1: \($0) sent from \(Thread.current)")
return $0
}
.receive(on: DispatchQueue.global())
.map {
print("map2: \($0) sent from \(Thread.current)")
return $0
}
.sink(receiveValue: { value in
print("sink: \(value) sent from \(Thread.current)")
})
subject.send(1)
// Output:
// map1: 1 sent from <_NSMainThread: 0x600003d343c0>{number = 1, name = main}
// map2: 1 sent from <NSThread: 0x600003d14240>{number = 5, name = (null)}
// sink: 1 sent from <NSThread: 0x600003d14240>{number = 5, name = (null)}subscribe(on:)
When you use subscribe(on:options:) to specify a scheduler, all operators will receive data in that scheduler until a receive(on:) specifies other schedulers.
// Main thread
let subject = PassthroughSubject<Int, Never>()
let cancellable = subject
.subscribe(on: DispatchQueue.global())
.map {
print("map1: \($0) sent from \(Thread.current)")
return $0
}
.receive(on: RunLoop.main)
.map {
print("map2: \($0) sent from \(Thread.current)")
return $0
}
.sink(receiveValue: { value in
print("sink: \(value) sent from \(Thread.current)")
})
subject.send(1)
// Output:
// map1: 1 sent from <NSThread: 0x600000a19100>{number = 4, name = (null)}
// map2: 1 sent from <_NSMainThread: 0x600000a18180>{number = 1, name = main}
// sink: 1 sent from <_NSMainThread: 0x600000a18180>{number = 1, name = main}Operators
The following will introduce some commonly used operators. Publishers has many other operators, please refer to the official website.
map() & tryMap()
func map<T>(_ transform: @escaping (Self.Output) -> T) -> Publishers.Map<Self, T> func tryMap<T>(_ transform: @escaping (Self.Output) throws -> T) -> Publishers.TryMap<Self, T>
map() and tryMap() use transform to transform each value from the upstream publisher, and return a new value to the downstream publisher.
let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
.map {
"Hello \($0)"
}
.sink(receiveValue: { value in
print(value)
})
// Output:
// Hello 1
// Hello 2
// Hello 3
// Hello 4filter() & tryFilter()
func filter(_ isIncluded: @escaping (Self.Output) -> Bool) -> Publishers.Filter<Self> func tryFilter(_ isIncluded: @escaping (Self.Output) throws -> Bool) -> Publishers.TryFilter<Self>
filter() and tryFilter() filter the values from upstream. Only when inIncluded returns true will it be resent to downstream.
let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
.filter {
$0 > 2
}
.sink(receiveValue: { value in
print(value)
})
// Output:
// 3
// 4compactMap() & tryCompactMap()
func compactMap<T>(_ transform: @escaping (Self.Output) -> T?) -> Publishers.CompactMap<Self, T> https://developer.apple.com/documentation/combine/publisher/trycompactmap(_:)
compactMap() and tryCompactMap() receive each value from upstream and return an optional value. Only when the returned optional value is not nil will it be sent downstream.
let cancellable = Record(output: [1, 2, nil, 4], completion: .finished)
.compactMap {
$0
}
.sink(receiveValue: { value in
print(value)
})
// Output:
// 1
// 2
// 4reduce() & tryReduce()
func reduce<T>(
_ initialResult: T,
_ nextPartialResult: @escaping (T, Self.Output) -> T
) -> Publishers.Reduce<Self, T>
func tryReduce<T>(
_ initialResult: T,
_ nextPartialResult: @escaping (T, Self.Output) throws -> T
) -> Publishers.TryReduce<Self, T>reduce() and tryReduce() take two parameters. The first parameter accumulator of the second parameter nextPartialResult will initially be the value of initialResult. Then, nextPartialResult will return a value. When the next value comes in from upstream, the accumulator will be the value returned by nextPartialResult last time.
The following example shows how to use reduce() to calculate the sum from 1 to 4.
let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
.reduce(0) { accumulator, value in
print("acc=\(accumulator), value=\(value)")
return accumulator + value
}
.sink(receiveValue: { value in
print(value)
})
// Output
// acc=0, value=1
// acc=1, value=2
// acc=3, value=3
// acc=6, value=4
// 10merge()
func merge(with other: Self) -> Publishers.MergeMany<Self>
merge() can merge another publisher of the same type. Values will be sent to downstream in the order of time they were sent.
let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<Int, Never>()
let cancellable = subject1
.merge(with: subject2)
.sink(receiveValue: { value in
print(value)
})
subject1.send(1)
subject2.send(11)
subject1.send(2)
subject1.send(3)
subject1.send(12)
subject1.send(4)
// Output:
// 1
// 11
// 2
// 3
// 12
// 4zip()
func zip<P>(_ other: P) -> Publishers.Zip<Self, P> where P : Publisher, Self.Failure == P.Failure
zip() combines the values sent by another publisher, and send a pair of values to downstream in tuple.
let _ = Record(output: [1, 2, 3, 4], completion: .finished)
.zip(Record(output: [5, 6, 7, 8], completion: .finished))
.sink(receiveValue: { value in
print(value)
})
// Output:
// (1, 5)
// (2, 6)
// (3, 7)
// (4, 8)flatMap()
func flatMap<T, P>(
maxPublishers: Subscribers.Demand = .unlimited,
_ transform: @escaping (Self.Output) -> P
) -> Publishers.FlatMap<P, Self> where T == P.Output, P : Publisher, Self.Failure == P.FailureLike map(), flatMap() transforms each value of the upstream publisher with transform, but it returns a new publisher.
let _ = Just([1, 2, 3, 4])
.flatMap { array in
Just(array.map { $0 * 10 })
}
.sink(receiveValue: { value in
print(value)
})
// Output:
// [10, 20, 30, 40]Conclusion
Swift Combine allows us to handle asynchronous code more gracefully. However, after Swift async/await is released, we may prefer to use async/await to implement asynchronous code instead of using Combine. However, we still use Combine to implement the Observer pattern. This is heavily used in SwiftUI.
Reference
- Combine, Apple Developer.
- Using Combine.
- RunLoop.main vs DispatchQueue.main: The differences explained, SwiftLee.









