iOSエンジニアのつぶやき

毎朝8:30に iOS 関連の技術について1つぶやいています。まれに釣りについてつぶやく可能性があります。

RxSwift で非同期処理を合成しよう

今回は同じ型の非同期処理を RxSwift でまとめる際に使用する concatmerge の使い方と挙動を簡単にまとめとこうと思います。

concat

f:id:yum_fishing:20200831205414p:plain

concat() は複数の非同期処理を渡された順番で順次処理を行っていきます。サンプルは次のようになります。

    let ob1 = Observable<String>.create { observer -> Disposable in
        // 3秒後にイベントを流す
        DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
            observer.onNext("ob1")
            observer.onCompleted()
        }
        return Disposables.create()
    }
    let ob2 = Observable<String>.create { observer -> Disposable in
        // 2秒後にイベントを流す
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            observer.onNext("ob2")
            observer.onCompleted()
        }
        return Disposables.create()
    }

    Observable.of(ob1, ob2)
        .concat()
        .subscribe(onNext: { str in
            print(str)
        })
        .disposed(by: disposeBag)
    
    // 出力:
    //
    // ob1
    // ob2
    //

また、Observable の合成は下記のように書くこともできます。

    Observable.concat(ob1, ob2)

merge

f:id:yum_fishing:20200831205430p:plain

merge() は複数の非同期処理を並列に実行することができます。つまり、Observable の渡される順番などが関係なく処理が早く終わった順にストリームに流れます。下記がサンプルコードになります。

    let ob1 = Observable<String>.create { observer -> Disposable in
        // 3秒後にイベントを流す
        DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
            observer.onNext("ob1")
            observer.onCompleted()
        }
        return Disposables.create()
    }
    let ob2 = Observable<String>.create { observer -> Disposable in
        // 2秒後にイベントを流す
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            observer.onNext("ob2")
            observer.onCompleted()
        }
        return Disposables.create()
    }

    Observable.of(ob1, ob2)
        .merge()
        .subscribe(onNext: { str in
            print(str)
        })
        .disposed(by: disposeBag)

    // 出力:
    //
    // ob2
    // ob1
    //

じゃあ異なる型の Observable はどうなるの?

基本的には、ストリームのイベントを逐次検知する必要がある場合は Observable の型を統一して、concat なり merge なりを使用する必要があります。並列で処理を実行して全ての処理が完了したタイミングで値を参照する場合は、zip という関数が用意されていますが、直列で実行が完了した値を参照したい場合は flatMap なり、concat なりを使って実装する感じでしょうか🤔(こんな方法があるよってやつがあれば教えてください🥺)

直列で逐次イベントを検知

    enum Container {
        case string(String)
        case int(Int)
    }
    let ob1 = Observable<String>.create { observer -> Disposable in
        DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
            observer.onNext("ob1")
            observer.onCompleted()
        }
        return Disposables.create()
    }.map { Container.string($0) }
    
    let ob2 = Observable<Int>.create { observer -> Disposable in
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            observer.onNext(2)
            observer.onCompleted()
        }
        return Disposables.create()
    }.map { Container.int($0) }

    Observable.of(ob1, ob2)
        .concat()
        .subscribe(onNext: { c in
            switch c {
            case .string(let str):
                print("string: \(str)")
            case .int(let num):
                print("int: \(num)")
            }
        })
        .disposed(by: disposeBag)


    // 出力:
    //
    // string: ob1
    // int: 2
    //

並列で逐次イベントを検知

    enum Container {
        case string(String)
        case int(Int)
    }
    let ob1 = Observable<String>.create { observer -> Disposable in
        DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
            observer.onNext("ob1")
            observer.onCompleted()
        }
        return Disposables.create()
    }.map { Container.string($0) }
    
    let ob2 = Observable<Int>.create { observer -> Disposable in
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            observer.onNext(2)
            observer.onCompleted()
        }
        return Disposables.create()
    }.map { Container.int($0) }

    Observable.of(ob1, ob2)
        .merge()
        .subscribe(onNext: { c in
            switch c {
            case .string(let str):
                print("string: \(str)")
            case .int(let num):
                print("int: \(num)")
            }
        })
        .disposed(by: disposeBag)


    // 出力:
    //
    // int: 2
    // string: ob1
    //

並列で完了イベントを検知

zip を使うと上記2つの方法とは違い型を統一する必要がないので、よりシンプルに実装することができます。

    let ob1 = Observable<String>.create { observer -> Disposable in
        DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
            observer.onNext("ob1")
            observer.onCompleted()
        }
        return Disposables.create()
    }
    
    let ob2 = Observable<Int>.create { observer -> Disposable in
        DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
            observer.onNext(2)
            observer.onCompleted()
        }
        return Disposables.create()
    }

    Observable.zip(ob1, ob2)
        .subscribe(onNext: { str, num in
            print("string: \(str), int: \(num)")
        })
        .disposed(by: disposeBag)

    // 出力:
    //
    // string: ob1, int: 2
    //

参考

その他の記事

yamato8010.hatenablog.com

yamato8010.hatenablog.com

yamato8010.hatenablog.com