この記事は、Reactive Extensions再入門とは関係ありません。メモです。
ずっと挙動が個人的に謎だったSubscribeOnメソッドですが、調べてみたところ単純な機能を持ったメソッドっぽいです。
どういう時に使うのかイメージは沸きませんが、Windows PhoneのMSDNのヘルプには、こう記載されています。
スケジューラを使用して、オブザーバーを非同期的にサブスクリプションおよび サブスクリプション取り消しします。
そう、つまりSubscribeメソッドによるIObserver
たとえば以下のようなクラスを用意します。これは意図的にSubscribe処理で2秒スリープしています。
class MySubject : ISubject<int> { private Subject<int> source = new Subject<int>(); #region IObserver<int> メンバー public void OnCompleted() { this.source.OnCompleted(); } public void OnError(Exception error) { this.source.OnError(error); } public void OnNext(int value) { this.source.OnNext(value); } #endregion #region IObservable<int> メンバー public IDisposable Subscribe(IObserver<int> observer) { // Subscribeが奇跡的に重い場合!! Console.WriteLine("{0:HH:mm:ss.FFF} MySubject#Subscribe start", DateTime.Now); try { Console.WriteLine("Subscribeが重いよ"); Thread.Sleep(2000); return this.source.Subscribe(observer); } finally { Console.WriteLine("{0:HH:mm:ss.FFF} MySubject#Subscribe end", DateTime.Now); } } #endregion }
これを使って簡単なプログラムを書いてみます。
var source = new MySubject(); source .Subscribe(i => { Console.WriteLine("{0:HH:mm:ss.FFF} value: {1}", DateTime.Now, i); }); Console.WriteLine("OnNext(1)"); source.OnNext(1); Console.WriteLine("OnNext(2)"); source.OnNext(2); Console.WriteLine("Main Thread.Sleep(5000)"); Thread.Sleep(5000); Console.WriteLine("OnNext(3)"); source.OnNext(3);
これを実行すると以下のような実行結果になります。
23:12:06.474 MySubject#Subscribe start Subscribeが重いよ 23:12:08.486 MySubject#Subscribe end OnNext(1) 23:12:08.487 value: 1 OnNext(2) 23:12:08.487 value: 2 Main Thread.Sleep(5000) OnNext(3) 23:12:13.487 value: 3
このプログラムはSubscribe処理で2秒ほどプログラムが固まっています。(1行目と3行目のタイムスタンプを見てください)そのあと、OnNextに処理が流れていきます。これをSubscribeOnを使うとどうなるか見ていきます。プログラムを以下のように書き換えます。
var source = new MySubject(); source // Sbscribeをスレッドプールで行う .SubscribeOn(Scheduler.ThreadPool) .Subscribe(i => { Console.WriteLine("{0:HH:mm:ss.FFF} value: {1}", DateTime.Now, i); }); Console.WriteLine("OnNext(1)"); source.OnNext(1); Console.WriteLine("OnNext(2)"); source.OnNext(2); Console.WriteLine("Main Thread.Sleep(5000)"); Thread.Sleep(5000); Console.WriteLine("OnNext(3)"); source.OnNext(3);
コメントにもあるようにSubscribeOnメソッドの呼び出しを追加しています。これでSubscribeの処理を指定したスケジューラ上で実行するように指定しています。このコードを実行すると以下のような結果になります。
OnNext(1) OnNext(2) Main Thread.Sleep(5000) 23:15:10.16 MySubject#Subscribe start Subscribeが重いよ 23:15:12.168 MySubject#Subscribe end OnNext(3) 23:15:15.158 value: 3
SubscribeがThreadPoolで実行されるようになったため、メインスレッドからの値の発行はブロックされません。そのため、Subscribeメソッドで渡したデリゲートの処理が呼び出されているのは最後のOnNext(3)の呼び出しだけになります。(OnNext(3)の呼び出し前に5秒メインスレッドをスリープしているため、2秒かかるSubscribe処理が裏で完了しているため)
以上が、今日調べてわかったことでした。ということで詳しい人の降臨を待ちます。