RxSwift などで処理をつなげて書いていると、ここはメインスレッドで、ここはバックグランドスレッドでなどと、処理によってスレッドを切り替えたい場合があるかと思います。そんな時は、Observable
のプロパティとして提供されている subscribeOn
または、observeOn
を使用することでスレッドを切り替えて処理を実行することができます。今回はそんな subscribeOn
と observeOn
の違いについて簡単にまとめていこうかと思います。
その前に Scheduler
RxSwift では Observable や subscribe の処理をどのスレッドでどのよう(直列・並列)に処理をさばくかを決定する役割を Scheduler
と呼びます。これについては後で触れますが、RxSwift を使用し、マルチスレッドで処理を行う際にはとても重要な知識です。
subscribeOn
subscribeOn
は参照するストリームの元となる Observable の実行スレッドとそれに続くメソッドのスレッドを決定します。SerialDispatchQueueScheduler
で Scheduler を生成し、バックグランドスレッドの直列処理で実行したサンプルが下記のようになります(呼び出しはメインスレッド)。どのように出力されるか結果を見る前に予想してみてください👨💻
let ob = Observable<Int>.create { observer -> Disposable in for i in 0...10000 { if i == 10000 { print("observer called!") observer.onNext(1) observer.onCompleted() } } return Disposables.create() } let scheduler = SerialDispatchQueueScheduler(qos: .background) ob.subscribeOn(scheduler) .subscribe(onNext: { _ in print(1) for i in 0...100 { if i == 100 { print(2) } } }) .disposed(by: disposeBag) ob.subscribeOn(scheduler) .subscribe(onNext: { _ in print(3) }) .disposed(by: disposeBag)
出力
// 全てバックグランドスレッド observer called! 1 2 observer called! 3
上から順番にバックグランドスレッドにタスクが追加されていき、直列でそれぞれの処理が実行されているのが分かります。また、基本的には、同じスレッドで実行したい場合には Scheduler
のインスタンスは統一する必要があるかと思います。(試しに同じ Schedler
を別々のインスタンスで実行したところ直列で実行がなされませんでした🤔原因がわかる人教えください🙇♂️)
observeOn
observeOn
はどの Scheduler
で次の Observer にイベントを流すのかを決定します。MainScheduler.instance
で Schedler
を生成して、購読部分のみメインスレッドで行ったサンプルは下記のようになります。
DispatchQueue.global(qos: .background).async { let ob = Observable<Int>.create { observer -> Disposable in for i in 0...10000 { if i == 10000 { print("observer called!") observer.onNext(1) observer.onCompleted() } } return Disposables.create() } ob.observeOn(MainScheduler.instance) .subscribe(onNext: { _ in print(1) for i in 0...100 { if i == 100 { print(2) } } }) .disposed(by: self.disposeBag) ob.observeOn(MainScheduler.instance) .subscribe(onNext: { _ in print(3) }) .disposed(by: self.disposeBag) }
出力
observer called! // バックグランドスレッド 1 // メインスレッド 2 // メインスレッド observer called! // バックグランドスレッド 3 // メインスレッド
こちらはバックグランドの処理により、全体の順番が変わることはありますが、基本的にはメインスレッドでの処理は直列処理なので、順番が変わることはありません。
Scheduler の種類
先でもスレッドや処理の実行順などを指定するのに Schedler
を使用しましたが、RxSwift では現在下記のような Scheduler
が用意されています。
Class | 内容 |
---|---|
CurrentThreadScheduler (Serial scheduler) | Observable.create などで指定されるデフォルトのスケジューラにはこれがしてされていて、スレッドを切り替えず現在のスレッドで処理を実行します。 |
MainScheduler (Serial scheduler) | これは Swift の UI などを更新するためのデフォルトのスレッドで、このスケジューラはメインメソッドから呼ばれた場合はスケジューリングなしですぐに処理が実行されます |
SerialDispatchQueueScheduler (Serial scheduler) | バックグランドで処理を直列に実行すれ場合に使用するスケジューラで、Qos を引数にとり最適なスレッドを選択することができます。 |
ConcurrentDispatchQueueScheduler (Concurrent scheduler) | バックグランドで処理を並列に実行する場合に使用するスケジューラで、SerialDispatchQueueScheduler と同様に Qos を指定して使用できます。 |
OperationQueueScheduler (Concurrent scheduler) | NSOperationQueue を使った非同期処理を行う時に使用します。また、maxConcurrentOperationCount などで同時実行数を制御したい時などに便利なスケジューラです。 |
参考
- https://github.com/ReactiveX/RxSwift/blob/master/Documentation/Schedulers.md
- https://qiita.com/k5n/items/e80ab6bff4bbb170122d
- http://reactivex.io/documentation/scheduler.html
- https://medium.com/eureka-engineering/rxswift%E3%81%AB%E3%81%8A%E3%81%91%E3%82%8B%E3%83%9E%E3%83%AB%E3%83%81%E3%82%B9%E3%83%AC%E3%83%83%E3%83%89%E3%81%AE%E7%90%86%E8%A7%A3%E3%82%92%E6%B7%B1%E3%82%81%E3%82%8B-scheduler%E3%81%AB%E3%81%A4%E3%81%84%E3%81%A6-2471ec76e518