iOS/RxSwift

RxSwift - Unicast, Multicast

ios-hans 2023. 11. 6. 17:28

이번 시간엔 unicast, multicast의 개념에 대해 알아보자.

Observable(unicast) vs Subject(multicast)

Observable(unicast)

- 1:1 연결방식의 특성을 가지고 있어서 독립적인 실행을 가지며, sequence가 내부적으로 공유되지 않는다.

Subject(multicast)

- 1:N 연결 방식의 특성을 가지고 있어서 해당 sequence가 내부적으로 공유된다.

사실 설명이나 그림만 보면 선뜻 잘 이해가 가지 않을 수 있다. 그래서 예시 코드로 어떻게 출력이 되는지 확인해보면 훨씬 직관일 것이라 생각이 든다. 한번 코드를 살펴보자.

// 1. Observable
func aboutUnicast() {
    let random = Observable<Int>.create { value in
        value.onNext(Int.random(in: 1...100))
        return Disposables.create()
    }
    
    random
        .subscribe(with: self) { owner, value in
            print(value, "Unicast")
        }
        .disposed(by: disposeBag)
    
    random
        .subscribe(with: self) { owner, value in
            print(value, "Unicast")
        }
        .disposed(by: disposeBag)
    
    random
        .subscribe(with: self) { owner, value in
            print(value, "Unicast")
        }
        .disposed(by: disposeBag)
}

// 2. Subject
func aboutMulticast() { 
    let random = BehaviorSubject(value: 100)
    random.on(.next(Int.random(in: 1...100)))
    
    random
        .subscribe(with: self) { owner, value in
            print(value, "Multicast")
        }
        .disposed(by: disposeBag)
    
    random
        .subscribe(with: self) { owner, value in
            print(value, "Multicast")
        }
        .disposed(by: disposeBag)
    
    random
        .subscribe(with: self) { owner, value in
            print(value, "Multicast")
        }
        .disposed(by: disposeBag)
}

/*
93 Unicast
60 Unicast
7 Unicast
75 Multicast
75 Multicast
75 Multicast
*/

Creating Observables

💡 create

여기서 제 블로그 글에는 작성하지 않은 개념이 나오는데 Observable을 create하고 있다. 이것이 뭔지 잠깐만 살펴보자.

원래 Observable을 사용할 때 just, of, from 같은 operator로 이벤트를 방출하는 역할만 사용해왔다. 하지만 create메서드를 활용해 Observable의 sequence를 직접 구성할 때 사용하는 연산자라고 알고 넘어가면 되겠다. 이 부분은 또 기회가 있다면 다른 글에서 자세히 다루도록 하고 본록으로 넘어가도록 하자.

Observable을 생성할 수 있는 연산자는 이외에도 많으니 한번 살펴보면 좋을 것 같다.

🔗 References

- RxSwift - Creating Observables

https://reactivex.io/documentation/operators.html#creating

 

ReactiveX - Operators

Introduction Each language-specific implementation of ReactiveX implements a set of operators. Although there is much overlap between implementations, there are also some operators that are only implemented in certain implementations. Also, each implementa

reactivex.io

 

자 위 코드를 실행하면 unicast에서는 random값이 각 subscribe별로 다른 random int값이 출력되었고, multicast 방식에서는 같은 random int값이 출력된 모습을 볼 수 있다. 그래서 이게 뭐? 라고 생각할 수 있지만 이 개념을 모른다면 프로젝트를 하면서 분명 리소스가 낭비되는 현상이 발생할 것 같다는 추측을 해볼 수 있다.

 

예를 든다면 URLSession을 통해 API를 호출하는 코드를 구성하였는데 Observable(unicast)를 통해 여러 subscribe를 한다면 분명 한번의 호출을 통한 데이터를 활용하면 될 것을 다수의 호출을 하면서 Request를 여러번 하면서 리소스를 낭비하게 될 것이다. 이 부분도 예시를 살펴보면서 정확히 어떻게 리소스가 낭비되는지 살펴보자.

 

Example URLSession Observable

URLSession을 사용하여 네트워크 통신을 하는 코드에서 Observable이 unicast인 방식이었던 것을 한번 더 확인해보고 어떤 문제가 있는지, 또한 나아가서 어떻게 해결할 수 있는지 살펴보자.

- BasicAPIManager.swift

enum APIError: Error {
    case invalidURL
    case unknown
    case statusError
}

final class BasicAPIManager {
    static let shared = BasicAPIManager()
    private init() {}
    
    func fetchData() -> Observable<SearchAppModel> {
        return Observable<SearchAppModel>.create { value in
            let urlString = "https://itunes.apple.com/search?term=todo&country=KR&media=software&lang=ko_KR&limit=10"
            guard let url = URL(string: urlString) else {
                value.onError(APIError.invalidURL)
                return Disposables.create()
            }
            
            URLSession.shared.dataTask(with: url) { data, response, error in
                
                print("URLSession Succeed")
                
                if let _ = error {
                    value.onError(APIError.unknown)
                    return
                }
                
                guard let response = response as? HTTPURLResponse, (200...299).contains(response.statusCode) else {
                    value.onError(APIError.statusError)
                    return
                }
                
                if let data = data, let appData = try? JSONDecoder().decode(SearchAppModel.self, from: data) {
                    value.onNext(appData)
                }
            }.resume()
            
            return Disposables.create()
        }
    }
}

fetchData() 메서드는 Observable 연산자를 생성해서 return하고 있다. URLSession을 활용하여 네트워크 통신하는 기본적인 코드여서 힘들지 않게 이해할 수 있을 것이다. 물론 RxSwift의 Observable연산자를 생성해 return 하는 코드라 조금 헷갈릴 수 있지만 그렇게 어렵지 않다. 

- SearchAppModel.swift

import Foundation

struct SearchAppModel: Codable {
    let resultCount: Int
    let results: [AppInfo]
}

struct AppInfo: Codable {
    let screenshotUrls: [String]
    let trackName: String // 이름
    let genres: [String] // 장르
    let trackContentRating: String // 연령제한
    let description: String // 설명
    let price: Double // 가격
    let sellerName: String // 개발자 이름
    let formattedPrice: String // 가격(무료/유료)
    let userRatingCount: Int // 평가자 수
    let averageUserRating: Double // 평균 평점
    let artworkUrl512: String // 아이콘 이미지
    let languageCodesISO2A: [String] // 언어 지원
    let trackId: Int
    let version: String
    let releaseNotes: String
}

- SearchViewController.swift

class SearchViewController: UIViewController {

	let disposeBag = DisposeBag()
    
    override func viewDidLoad() {
        super.viewDidLoad()
        bind()
    }
    
    func bind() {
    	let request = BasicAPIManager.shared.fetchData()

        request
            .subscribe(with: self) { owner, result in
                print("첫번째 Observable subscribe")
                owner.items.onNext(result.results)
            }
            .disposed(by: disposeBag)

        request
            .map { data in
                "\(data.results.count)개의 검색 결과"
            }
            .observe(on: MainScheduler.instance)
            .subscribe(with: self) { owner, result in
                print("두번째 Observable subscribe")
                owner.navigationItem.title = result
            }
            .disposed(by: disposeBag)
    }    
}

/*
URLSession Succeed
첫번째 Observable subscribe
URLSession Succeed
두번째 Observable subscribe
*/

출력값을 확인해보면 예상했던대로 2번의 Request가 일어나고 Succeed가 2번 출력되는 것을 볼 수 있다. 잘 생각해보면 2번할 필요가 있을까? 당연히 없다. 호출 url이 변경된 것도 아니고 완전히 똑같은 response를 2번 받는 것 뿐이다. response가 똑같으니 당연히 그 데이터를 가지고 활용할 수 있는 곳에서도 같은 response를 가지고 활용하면된다.

 

그렇다면 어떻게 해결할 수 있을까? 물론 어떤 문제가 발생해서 해결하는 개념이 아니다. 이것의 개념을 명확히 알고 모르고 사용하는 것에 대한 문제가 있다면 효율적인 방안을 모색해볼 수 있기 때문에 그 방법을 알아보자.

Solution

1. Share

share() operator를 활용하면 Observable의 독립적인 실행의 sequence를 공유할 수 있다.

let request = BasicAPIManager.shared
    .fetchData()
    .share() // replay와 scope을 통해 버퍼 사이즈와 유지 상태를 결정할 수 있습니다.

request
    .subscribe(with: self) { owner, result in
        print("첫번째 Observable subscribe")
        owner.items.onNext(result.results)
    }
    .disposed(by: disposeBag)

request
    .map { data in
        "\(data.results.count)개의 검색 결과"
    }
    .observe(on: MainScheduler.instance)
    .subscribe(with: self) { owner, result in
        print("두번째 Observable subscribe")
        owner.navigationItem.title = result
    }
    .disposed(by: disposeBag)
    
/*
URLSession Succeed
첫번째 Observable subscribe
두번째 Observable subscribe
*/

Request요청이 이번에는 한번만 일어나서 Succeed print문이 한번만 출력된 것을 볼 수 있다.

 

subscribe할 때마다, 새로운 시퀀스가 생성되고 같은 Observable을 subscribe, bind하는 곳이 여러 곳이라면 각각의 Stream이 생겨나게 된다. 그렇다면 위에 보았듯이 request을 여러번 하는 것처럼 불필요한 리소스가 발생할 수 있게되고 이것을 해결하기 위해 모든 subscribe가 하나의 subscribe를 공유할 수 있도록 하는 작업이 필요할 것 같다.

 

그럼 2번째로 어떤 방법을 통해 subscribe를 공유할 수 있는지 알아보자.

2. Driver

Driver라는 개념이 등장하는데 큰 줄기로 간단하게 설명한다면 지금까지 Observable을 구독하기 위해 subscribe, bind를 사용해왔는데 이 글에서는 조금 더 UI작업에 특화되어 있다는 개념 정도만 알고 넘어가도 될 것 같다. 그런데 왜 UI에 특화되어 있다는건지정도는 설명을 해보겠다.

Driver의 특징

- Main Thread에서 동작하도록 보장

- bind와 달리 Stream이 공유됨

- subscribe만 할 수 있고 값을 변경할 수는 없음

// driver
let request = BasicAPIManager.shared
    .fetchData()
    .asDriver(onErrorJustReturn: SearchAppModel(resultCount: 0, results: []))

request
    .drive(with: self) { owner, result in
        owner.items.onNext(result.results)
    }
    .disposed(by: disposeBag)

request
    .map { data in
        "\(data.results.count)개의 검색 결과"
    }
    .drive(navigationItem.rx.title)
    .disposed(by: disposeBag)
    
/* Prints
URLSession Succeed
*/

우선 share 대신에 asDriver 메서드를 사용한 다음 초기값을 설정했다. 이후 subscribe나 bind가 아닌 drive를 사용해 구독을 시켰다. drive의 특성에 따라 메인 스레드의 동작을 보장하기 때문에 observe(on:) 메서드를 통해 메인 스레드에 동작하도록 하는 코드를 생략해도 된다.

- drive의 내부 코드를 살펴보게 되면 subscribe를 하는 코드를 wrapping 하도록 설계

- Main Thread에서 동작하도록 보장되어 있는 코드로 설계

- share Operator 또한 내부적으로 동작하도록 설계

Drive는 Rxswift Traits의 한 종류인데 이것 또한 다른 글에서 자세하게 알아볼텐데, 개념이 나왔으므로 간단하게 살펴보면 다음과 같다.

Traits

  • UI처리에 특화된 Observable의 한 종류
    • RxSwift(Observable)
    • RxCocoa(Observable)
  • Traits에 해당하는 Observable은 Main Thread에서 동작하도록 보장하는 특성을 가짐
  • Error 이벤트가 없음(UI에 특화된 Observable이므로..)
  • Traits 중 Signal을 제외하면 Share() Operator가 내부적으로 구현되어 있어 시퀀스를 공유함

Traits의 종류

  • RxSwift
    • Single
    • Completable
    • Maybe
  • RxCocoa
    • Driver
    • Signal
    • ControlProperty, ControlEvent