[Swift] Reactive Programming Combine - 2: Publishers & Subscribers
Publishers & Subscribers
Cancellable
Subscriber가 완료될 때, 더 이상 publisher로부터 값을 받지 않으려면 subscription을 취소하여 리소스를 확보하고 네트워크 호출과 같은 해당 활동이 발생하지 않도록 하는 것이 좋습니다. Subcription은 cancellation token으로 AnyCancellable 인스턴스를 반환하므로, 완료되면 취소할 수 있습니다. AnyCancellable은 Cancellable protocol을 준수하며, cancel()메서드는 정확히 그 목적을 위해 필요합니다.
Subsciption에서 cancel()을 명시적으로 호출하지 않으면, publisher가 완료하거나 정상적인 메모리 관리로 인해 저장된 subscription이 초기화될 때까지 계속되며, 그 시점에 취소됩니다.
Custom subscriber
public protocol Subscriber: CustomCombineIdentifierConvertible {
// 수신할 수 있는 값의 유형
associatedtype Input
// 수신할 수 있는 오류 유형 또는 Never
associatedtype Failure: Error
// publisher가 subscriber에게 이 메서드를 호출하여 subscription을 제공
func receive(subscription: Subscription)
// subsriber에게 publish한 값을 발송
func receive(_ input: Self.Input) -> Subscribers.Demand
// 정상적으로 값을 생성하거나 오류가 생성되었다는 것을 알림
func receive(completion: Subscribers.Completion<Self.Failure>)
}
Subscriber를 준수하여 아래와 같이 만듭니다.
Demand를 최대 3으로 지정했고 receive(_:)에서 반환을 .none으로 했기에 수신되는 값은 3개만 출력됩니다.
.none을 .unlimited로 변경하게 되면 완료 이벤트와 함께 모든 값이 수신되는 것을 확인할 수 있습니다.
final class IntSubscriber: Subscriber {
typealias Input = Int
typealias Failure = Never
func receive(subscription: Subscription) {
subscription.request(.max(3))
}
func receive(_ input: Int) -> Subscribers.Demand {
print("Received value", input)
return .none
}
func receive(completion: Subscribers.Completion<Never>) {
print("Received completion", completion)
}
}
let publisher = (1...6).publisher
let subscriber = IntSubscriber()
publisher.subscribe(subscriber)
// Received value 1
// Received value 2
// Received value 3
또한, Publisher가 Input의 유형과 동일하지 않다면 오류가 발생합니다.
Subscription을 만들기 위해서는 Input 및 Failure 유형을 일치시켜야 합니다.
Future
비동기적으로 단일 결과를 생성하고 완료할 수 있는 Publisher
Future에서 promise는 게시한 단일 값 또는 오류가 포함된 결과를 수신하는 클로저의 유형입니다.
func futureIncrement(integer: Int, afterDelay delay: TimeInterval) -> Future<Int, Never> {
Future<Int, Never> { promise in
DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
promise(.success(integer + 1))
}
}
}
var subscriptions = Set<AnyCancellable>()
let future = futureIncrement(integer: 1, afterDelay: 3)
future
.sink(receiveCompletion: { print($0) },
receiveValue: { print($0) })
.store(in: &subscriptions)
위 코드를 유추하면 3초 뒤에 입력받은 숫자보다 1 커진 값을 출력하는 것을 알 수 있습니다.
let future = futureIncrement(integer: 1, afterDelay: 3)
future
.sink(receiveCompletion: { print($0) },
receiveValue: { print($0) })
.store(in: &subscriptions)
future
.sink(receiveCompletion: { print("Second", $0) },
receiveValue: { print("Second", $0) })
.store(in: &subscriptions)
3초 뒤에 처음의 값을 출력하고 다음 Second 관련 출력이 바로 발생됩니다.
비동기적으로 게시를 하도록 만들었기 때문에 6초가 아닌 3초만 지연되는 것을 확인할 수 있습니다.
Subject
결합하지 않은 명령형 코드가 Combine subscribers에게 값을 보낼 수 있도록 하는 중개자 역할입니다.
enum MyError: Error {
case test
}
final class StringSubscriber: Subscriber {
typealias Input = String
typealias Failure = MyError
func receive(subscription: Subscription) {
subscription.request(.max(2))
}
func receive(_ input: String) -> Subscribers.Demand {
print("Received value", input)
// 입력이 "World"라면 최대값을 1 늘립니다.
return input == "World" ? .max(1) : .none
}
func receive(completion: Subscribers.Completion<MyError>) {
print("Received completion", completion)
}
}
let subscriber = StringSubscriber()
let subject = PassthroughSubject<String, MyError>()
subject.subscribe(subscriber)
let subscription = subject
.sink(
receiveCompletion: { completion in
print("Received completion (sink)", completion)
},
receiveValue: { value in
print("Received value (sink)", value)
}
)
subject.send("Hello")
subject.send("World")
// PassthroughSubject의 출력
// Received value (sink) Hello
// Received value Hello
// Received value (sink) World
// Received value World
PassthroughSubject는 초기 값이나 가장 최근에 게시된(published) 요소의 버퍼를 갖지 않습니다.
PassthroughSubject는 2가지 경우에 값을 삭제합니다.
- subscribers가 없는 경우
- 현재 demand가 0인 경우
예제에서는 cancel()이 되거나 완료에 대한 내용을 다룹니다.
구독을 cancel한 경우라면 subscriber는 "Still there?" 문장을 받을 수 없습니다.
subscription.cancel()
subject.send("Still there?")
// 출력
// Received value Still there?
구독을 완료했다면?
// 완료를 이미했기 때문에 문장을 받을 수 없습니다.
subject.send(completion: .finished)
subject.send("How about another one?")
// 출력
// Received completion finished
/* ------------ 종료가 아닌 실패로 발생시켰다면 ?? ------------ */
// 실패로 완료했기에 에러가 발생합니다.
subject.send(completion: .failure(MyError.test))
// 출력
// Received completion failure
각 subscription을 값으로 저장하는 대신 AnyCancellable 컬렉션에 여러 subscription을 저장할 수 있습니다.
그러면 컬렉션이 초기화될 때 컬렉션에 추가된 각 subscription은 자동으로 취소됩니다.
CurrentValueSubject는 가장 최근에 게시된 요소의 버퍼를 유지합니다.
초기 값을 설정해야하고 가장 최근에 게시된 요소를 받을 수도 있습니다.
example(of: "CurrentValueSubject") {
let subject = CurrentValueSubject<Int, Never>(0)
subject
.sink(receiveValue: { print($0) })
.store(in: &subscriptions)
subject.send(1)
subject.send(2)
print("CurrentValueSubject's value is \"\(subject.value)\"")
subject.value = 3
print("CurrentValueSubject's value is \"\(subject.value)\"")
}
// CurrentValueSubject의 출력
// 0
// 1
// 2
// CurrentValueSubject's value is "2"
// 3
// CurrentValueSubject's value is "3"
Dynamically adjusting demand
특정 값을 받은 경우에 동적으로 Demand 의 값을 변경할 수 있습니다.
example(of: "Dynamically adjusting Demand") {
final class IntSubscriber: Subscriber {
typealias Input = Int
typealias Failure = Never
func receive(subscription: Subscription) {
subscription.request(.max(2))
}
func receive(_ input: Int) -> Subscribers.Demand {
print("Received value", input)
switch input {
case 1:
return .max(2)
case 3:
return .max(1)
default:
return .none
}
}
func receive(completion: Subscribers.Completion<Never>) {
print("Received completion", completion)
}
}
let subscriber = IntSubscriber()
let subject = PassthroughSubject<Int, Never>()
subject.subscribe(subscriber)
subject.send(1) // Demand의 최대값을 2 증가
subject.send(2)
subject.send(3) // Demand의 최대값을 1 증가
subject.send(4)
subject.send(5)
subject.send(6)
}
// 출력
// Received value 1
// Received value 2
// Received value 3
// Received value 4
// Received value 5
Type erasure
eraseToAnyPublisher()를 사용하면 추상화되어 AnyPublisher로 노출합니다.
추상화를 한다는 것은 제네릭 타입을 숨길 수 있습니다. 유형이 삭제되면 기존 클라이언트에 영향을 주지 않고 시간이 지남에 따라 기본 publisher 구현을 변경할 수 있습니다. 아직 자세한 의미를 모르겠지만, 진행을 하면서 모르는 부분에 대해서는 추가적인 링크로 연결할 예정입니다.
AnyPublisher에는 send(_:) 연산자가 없으므로 새 값을 추가할 수 없습니다. Subject를 추상화하여 표현하게 되면 값을 전달하는 역할만 하게 되는 거 같습니다.
let subject = PassthroughSubject<Int, Never>()
let publisher = subject.eraseToAnyPublisher()
publisher
.sink(receiveValue: { print($0) })
.store(in: &subscriptions)
subject.send(0)
publisher.send(1) // Error
Key Points
- Publishers는 시간이 지남에 따라 일련의 값을 하나 이상의 subscribers에게 동기식 또는 비동기식으로 전송
- Subscriber는 publisher를 구독하여 값을 받을 수 있지만 Input과 failure 유형이 publisher의 output과 failure와 동일
- Publisher를 구독하는데 사용하는 기본 연산자는 sink와 assign이 존재
- Subscriber는 demand를 늘릴 수 있지만 줄이는 건 불가
- 리소스를 확보하고 원치않는 부작용을 방지하기 위해서는 구독이 완료되면 각 구독을 취소
- 구독을 AnyCancellable의 인스턴스 또는 컬렉션에 저장하여 초기화시 자동 취소 가능
- Subjects는 외부 호출자가 시작 값의 유무에 관계없이 여러 값을 subscribers에게 비동기적으로 보낼 수 있도록 하는 publisher