iOSエンジニアのつぶやき

毎朝8:30に iOS 関連の技術について1つぶやいています。まれに釣りについてつぶやく可能性があります。

RxSwift における subscribeOn と observeOn の違い

RxSwift などで処理をつなげて書いていると、ここはメインスレッドで、ここはバックグランドスレッドでなどと、処理によってスレッドを切り替えたい場合があるかと思います。そんな時は、Observable のプロパティとして提供されている subscribeOn または、observeOn を使用することでスレッドを切り替えて処理を実行することができます。今回はそんな subscribeOnobserveOn の違いについて簡単にまとめていこうかと思います。

f:id:yum_fishing:20200903210908p:plain

その前に Scheduler

RxSwift では Observable や subscribe の処理をどのスレッドでどのよう(直列・並列)に処理をさばくかを決定する役割を Scheduler と呼びます。これについては後で触れますが、RxSwift を使用し、マルチスレッドで処理を行う際にはとても重要な知識です。

subscribeOn

subscribeOn は参照するストリームの元となる Observable の実行スレッドとそれに続くメソッドのスレッドを決定します。SerialDispatchQueueScheduler で Scheduler を生成し、バックグランドスレッドの直列処理で実行したサンプルが下記のようになります(呼び出しはメインスレッド)。どのように出力されるか結果を見る前に予想してみてください👨‍💻

    let ob = Observable<Int>.create { observer -> Disposable in
        for i in 0...10000 {
            if i == 10000 {
                print("observer called!")
                observer.onNext(1)
                observer.onCompleted()
            }
        }
        return Disposables.create()
    }
    let scheduler = SerialDispatchQueueScheduler(qos: .background)
    ob.subscribeOn(scheduler)
        .subscribe(onNext: { _ in
            print(1)
            for i in 0...100 {
                if i == 100 {
                    print(2)
                }
            }
        })
        .disposed(by: disposeBag)
    ob.subscribeOn(scheduler)
        .subscribe(onNext: { _ in
            print(3)
        })
        .disposed(by: disposeBag)

出力

// 全てバックグランドスレッド
observer called!
1
2
observer called!
3

上から順番にバックグランドスレッドにタスクが追加されていき、直列でそれぞれの処理が実行されているのが分かります。また、基本的には、同じスレッドで実行したい場合には Schedulerインスタンスは統一する必要があるかと思います。(試しに同じ Schedler を別々のインスタンスで実行したところ直列で実行がなされませんでした🤔原因がわかる人教えください🙇‍♂️)

observeOn

observeOn はどの Scheduler で次の Observer にイベントを流すのかを決定します。MainScheduler.instanceSchedler を生成して、購読部分のみメインスレッドで行ったサンプルは下記のようになります。

    DispatchQueue.global(qos: .background).async {
        let ob = Observable<Int>.create { observer -> Disposable in
            for i in 0...10000 {
                if i == 10000 {
                    print("observer called!")
                    observer.onNext(1)
                    observer.onCompleted()
                }
            }
            return Disposables.create()
        }
        ob.observeOn(MainScheduler.instance)
            .subscribe(onNext: { _ in
                print(1)
                for i in 0...100 {
                    if i == 100 {
                        print(2)
                    }
                }
            })
            .disposed(by: self.disposeBag)
        ob.observeOn(MainScheduler.instance)
            .subscribe(onNext: { _ in
                print(3)
            })
            .disposed(by: self.disposeBag)
    }

出力

observer called! // バックグランドスレッド
1 // メインスレッド
2 // メインスレッド
observer called! // バックグランドスレッド
3 // メインスレッド

こちらはバックグランドの処理により、全体の順番が変わることはありますが、基本的にはメインスレッドでの処理は直列処理なので、順番が変わることはありません。

Scheduler の種類

先でもスレッドや処理の実行順などを指定するのに Schedler を使用しましたが、RxSwift では現在下記のような Scheduler が用意されています。

Class 内容
CurrentThreadScheduler (Serial scheduler) Observable.create などで指定されるデフォルトのスケジューラにはこれがしてされていて、スレッドを切り替えず現在のスレッドで処理を実行します。
MainScheduler (Serial scheduler) これは Swift の UI などを更新するためのデフォルトのスレッドで、このスケジューラはメインメソッドから呼ばれた場合はスケジューリングなしですぐに処理が実行されます
SerialDispatchQueueScheduler (Serial scheduler) バックグランドで処理を直列に実行すれ場合に使用するスケジューラで、Qos を引数にとり最適なスレッドを選択することができます。
ConcurrentDispatchQueueScheduler (Concurrent scheduler) バックグランドで処理を並列に実行する場合に使用するスケジューラで、SerialDispatchQueueScheduler と同様に Qos を指定して使用できます。
OperationQueueScheduler (Concurrent scheduler) NSOperationQueue を使った非同期処理を行う時に使用します。また、maxConcurrentOperationCount などで同時実行数を制御したい時などに便利なスケジューラです。

github.com

参考