Language/Swift

[Swift] Reactive Programming Combine - 2: Publishers & Subscribers

jaewpark 2024. 5. 4. 00:00

Publishers & Subscribers

Cancellable

Subscriber가 완료될 때, 더 이상 publisher로부터 값을 받지 않으려면 subscription을 취소하여 리소스를 확보하고 네트워크 호출과 같은 해당 활동이 발생하지 않도록 하는 것이 좋습니다. Subcription은 cancellation token으로 AnyCancellable 인스턴스를 반환하므로, 완료되면 취소할 수 있습니다. AnyCancellable은 Cancellable protocol을 준수하며, cancel()메서드는 정확히 그 목적을 위해 필요합니다. 

 

Subsciption에서 cancel()을 명시적으로 호출하지 않으면, publisher가 완료하거나 정상적인 메모리 관리로 인해 저장된 subscription이 초기화될 때까지 계속되며, 그 시점에 취소됩니다.

 

Custom subscriber

public protocol Subscriber: CustomCombineIdentifierConvertible {
  // 수신할 수 있는 값의 유형
  associatedtype Input
  // 수신할 수 있는 오류 유형 또는 Never
  associatedtype Failure: Error
  // publisher가 subscriber에게 이 메서드를 호출하여 subscription을 제공
  func receive(subscription: Subscription)
  // subsriber에게 publish한 값을 발송
  func receive(_ input: Self.Input) -> Subscribers.Demand
  // 정상적으로 값을 생성하거나 오류가 생성되었다는 것을 알림
  func receive(completion: Subscribers.Completion<Self.Failure>)
}

 

Subscriber를 준수하여 아래와 같이 만듭니다.

Demand를 최대 3으로 지정했고 receive(_:)에서 반환을 .none으로 했기에 수신되는 값은 3개만 출력됩니다.

.none을 .unlimited로 변경하게 되면 완료 이벤트와 함께 모든 값이 수신되는 것을 확인할 수 있습니다.

final class IntSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    func receive(subscription: Subscription) {
        subscription.request(.max(3))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        print("Received value", input)
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        print("Received completion", completion)
    }
}

let publisher = (1...6).publisher
let subscriber = IntSubscriber()

publisher.subscribe(subscriber)

// Received value 1
// Received value 2
// Received value 3

 

또한, Publisher가 Input의 유형과 동일하지 않다면 오류가 발생합니다.

Subscription을 만들기 위해서는 Input 및 Failure 유형을 일치시켜야 합니다.

 

Future

비동기적으로 단일 결과를 생성하고 완료할 수 있는 Publisher

Future에서 promise는 게시한 단일 값 또는 오류가 포함된 결과를 수신하는 클로저의 유형입니다.

func futureIncrement(integer: Int, afterDelay delay: TimeInterval) -> Future<Int, Never> {
    Future<Int, Never> { promise in
        DispatchQueue.global().asyncAfter(deadline: .now() + delay) {
            promise(.success(integer + 1))
        }
    }
}

var subscriptions = Set<AnyCancellable>()
let future = futureIncrement(integer: 1, afterDelay: 3)

future
  .sink(receiveCompletion: { print($0) },
        receiveValue: { print($0) })
  .store(in: &subscriptions)

 

위 코드를 유추하면 3초 뒤에 입력받은 숫자보다 1 커진 값을 출력하는 것을 알 수 있습니다.

let future = futureIncrement(integer: 1, afterDelay: 3)

future
  .sink(receiveCompletion: { print($0) },
        receiveValue: { print($0) })
  .store(in: &subscriptions)

future
  .sink(receiveCompletion: { print("Second", $0) },
        receiveValue: { print("Second", $0) })
  .store(in: &subscriptions)

 

3초 뒤에 처음의 값을 출력하고 다음 Second 관련 출력이 바로 발생됩니다.

비동기적으로 게시를 하도록 만들었기 때문에 6초가 아닌 3초만 지연되는 것을 확인할 수 있습니다.

 

Subject

결합하지 않은 명령형 코드가 Combine subscribers에게 값을 보낼 수 있도록 하는 중개자 역할입니다.

 

enum MyError: Error {
    case test
}

final class StringSubscriber: Subscriber {
    typealias Input = String
    typealias Failure = MyError
    func receive(subscription: Subscription) {
        subscription.request(.max(2))
    }
    func receive(_ input: String) -> Subscribers.Demand {
        print("Received value", input)
        // 입력이 "World"라면 최대값을 1 늘립니다.
        return input == "World" ? .max(1) : .none
    }
    func receive(completion: Subscribers.Completion<MyError>) {
        print("Received completion", completion)
    }
}

let subscriber = StringSubscriber()
let subject = PassthroughSubject<String, MyError>()
subject.subscribe(subscriber)

let subscription = subject
  .sink(
    receiveCompletion: { completion in
      print("Received completion (sink)", completion)
    },
    receiveValue: { value in
      print("Received value (sink)", value)
    }
)

subject.send("Hello")
subject.send("World")

// PassthroughSubject의 출력
// Received value (sink) Hello
// Received value Hello
// Received value (sink) World
// Received value World

 

PassthroughSubject는 초기 값이나 가장 최근에 게시된(published) 요소의 버퍼를 갖지 않습니다.

PassthroughSubject는 2가지 경우에 값을 삭제합니다.

  • subscribers가 없는 경우
  • 현재 demand가 0인 경우

예제에서는 cancel()이 되거나 완료에 대한 내용을 다룹니다.

 

구독을 cancel한 경우라면 subscriber는 "Still there?" 문장을 받을 수 없습니다.

subscription.cancel()
subject.send("Still there?")

// 출력
// Received value Still there?

 

구독을 완료했다면?

// 완료를 이미했기 때문에 문장을 받을 수 없습니다.
subject.send(completion: .finished)
subject.send("How about another one?")

// 출력
// Received completion finished

/* ------------ 종료가 아닌 실패로 발생시켰다면 ?? ------------ */

// 실패로 완료했기에 에러가 발생합니다.
subject.send(completion: .failure(MyError.test))

// 출력
// Received completion failure

 

각 subscription을 값으로 저장하는 대신 AnyCancellable 컬렉션에 여러 subscription을 저장할 수 있습니다.

그러면 컬렉션이 초기화될 때 컬렉션에 추가된 각 subscription은 자동으로 취소됩니다.

 

CurrentValueSubject는 가장 최근에 게시된 요소의 버퍼를 유지합니다.

초기 값을 설정해야하고 가장 최근에 게시된 요소를 받을 수도 있습니다.

example(of: "CurrentValueSubject") {

    let subject = CurrentValueSubject<Int, Never>(0)

    subject
        .sink(receiveValue: { print($0) })
        .store(in: &subscriptions)

    subject.send(1)
    subject.send(2)

    print("CurrentValueSubject's value is \"\(subject.value)\"")

    subject.value = 3
    print("CurrentValueSubject's value is \"\(subject.value)\"")
}

// CurrentValueSubject의 출력
// 0
// 1
// 2
// CurrentValueSubject's value is "2"
// 3
// CurrentValueSubject's value is "3"

 

Dynamically adjusting demand

특정 값을 받은 경우에 동적으로 Demand 의 값을 변경할 수 있습니다.

example(of: "Dynamically adjusting Demand") {
    final class IntSubscriber: Subscriber {
        typealias Input = Int
        typealias Failure = Never
        
        func receive(subscription: Subscription) {
            subscription.request(.max(2))
        }
        
        func receive(_ input: Int) -> Subscribers.Demand {
            print("Received value", input)
            switch input {
            case 1:
                return .max(2)
            case 3:
                return .max(1)
            default:
                return .none
            }
        }
        
        func receive(completion: Subscribers.Completion<Never>) {
            print("Received completion", completion)
        }
    }
    
    let subscriber = IntSubscriber()
    let subject = PassthroughSubject<Int, Never>()
    subject.subscribe(subscriber)
    
    subject.send(1)  // Demand의 최대값을 2 증가
    subject.send(2)
    subject.send(3)  // Demand의 최대값을 1 증가
    subject.send(4)
    subject.send(5)
    subject.send(6)
}

// 출력
// Received value 1
// Received value 2
// Received value 3
// Received value 4
// Received value 5

 

Type erasure

eraseToAnyPublisher()를 사용하면 추상화되어 AnyPublisher로 노출합니다.

추상화를 한다는 것은 제네릭 타입을 숨길 수 있습니다. 유형이 삭제되면 기존 클라이언트에 영향을 주지 않고 시간이 지남에 따라 기본 publisher 구현을 변경할 수 있습니다. 아직 자세한 의미를 모르겠지만, 진행을 하면서 모르는 부분에 대해서는 추가적인 링크로 연결할 예정입니다.

AnyPublisher에는 send(_:) 연산자가 없으므로 새 값을 추가할 수 없습니다. Subject를 추상화하여 표현하게 되면 값을 전달하는 역할만 하게 되는 거 같습니다.

 

let subject = PassthroughSubject<Int, Never>()
let publisher = subject.eraseToAnyPublisher()

publisher
.sink(receiveValue: { print($0) })
.store(in: &subscriptions)

subject.send(0)

publisher.send(1)  // Error

 


Key Points

  • Publishers는 시간이 지남에 따라 일련의 값을 하나 이상의 subscribers에게 동기식 또는 비동기식으로 전송
  • Subscriber는 publisher를 구독하여 값을 받을 수 있지만 Input과 failure 유형이 publisher의 output과 failure와 동일
  • Publisher를 구독하는데 사용하는 기본 연산자는 sinkassign이 존재
  • Subscriber는 demand를 늘릴 수 있지만 줄이는 건 불가 
  • 리소스를 확보하고 원치않는 부작용을 방지하기 위해서는 구독이 완료되면 각 구독을 취소
  • 구독을 AnyCancellable의 인스턴스 또는 컬렉션에 저장하여 초기화시 자동 취소 가능
  • Subjects는 외부 호출자가 시작 값의 유무에 관계없이 여러 값을 subscribers에게 비동기적으로 보낼 수 있도록 하는 publisher