過去記事インデックス
Observable.FromEventメソッド
では、HotなIObservable
// イベントを発行するクラス var eventSource = new EventSource(); var source = Observable.FromEvent<EventHandler, EventArgs>( h => (s, e) => h(e), // 普通は h => eventSource.Raised += h だけでいい h => { Console.WriteLine("add handler"); eventSource.Raised += h; }, // 普通は h => eventSource.Raised -= h だけでいい h => { Console.WriteLine("remove handler"); eventSource.Raised -= h; }); // 2回購読 var subscription1 = source.Subscribe( i => Console.WriteLine("1##OnNext({0})", i), ex => Console.WriteLine("1##OnError({0})", ex.Message), () => Console.WriteLine("1##Completed()")); var subscription2 = source.Subscribe( i => Console.WriteLine("2##OnNext({0})", i), ex => Console.WriteLine("2##OnError({0})", ex.Message), () => Console.WriteLine("2##Completed()")); // 2回呼び出してみる // 合計4回のOnNextが呼ばれるはず eventSource.OnRaised(); eventSource.OnRaised(); // Observableが発行する値の購読を停止 Console.WriteLine("dispose method call."); subscription1.Dispose(); subscription2.Dispose();
この例で使用しているEventSourceクラスは、RaisedというイベントとOnRaisedというイベントを発行するメソッドだけを持ったクラスで下記のように定義しています。
// イベント発行クラス class EventSource { public event EventHandler Raised; public void OnRaised() { var h = this.Raised; if (h != null) { h(this, EventArgs.Empty); } } }
このコードの実行結果を下記に示します。
add handler add handler 1##OnNext(System.EventArgs) 2##OnNext(System.EventArgs) 1##OnNext(System.EventArgs) 2##OnNext(System.EventArgs) dispose method call. remove handler remove handler
FromEventメソッドのイベントハンドラ登録処理とイベントハンドラの登録解除処理にログを出力するように仕込んだものが表示されています。このことから、Disposeを呼ぶときちんとイベントハンドラの登録解除が行われることがわかります。また、イベントが発行されたタイミングで2つ登録したObserverの両方に対して通知がいっていることも確認できます。
Observable.Startメソッド
次に、簡単にバックグラウンドの処理を記述できるStartメソッドについて説明します。StartメソッドはActionかFunc
// バックグラウンドで処理を開始 var source = Observable.Start(() => { Console.WriteLine("background task start."); Thread.Sleep(2000); Console.WriteLine("background task end."); return 1; }); // 購読 Console.WriteLine("subscribe1"); var subscription1 = source.Subscribe( i => Console.WriteLine("1##OnNext({0})", i), ex => Console.WriteLine("1##OnError({0})", ex.Message), () => Console.WriteLine("1##Completed()")); // 処理が確実に終わるように5秒待つ Console.WriteLine("sleep 5sec."); Thread.Sleep(5000); // Observableが発行する値の購読を停止 Console.WriteLine("dispose method call."); subscription1.Dispose(); // 購読 Console.WriteLine("subscribe2"); var subscription2 = source.Subscribe( i => Console.WriteLine("2##OnNext({0})", i), ex => Console.WriteLine("2##OnError({0})", ex.Message), () => Console.WriteLine("2##Completed()")); subscription2.Dispose();
このサンプルで特徴的なのが、Startメソッド内の処理が2秒で終わるにも関わらず5秒スリープした後にSubscribeをしている点です。通常の感覚では、既に処理が完了して値が発行された後なのでSubscribeしても何も起きないと考えられます。しかし、Startメソッドの戻り値のIObservable
この例の実行結果では、最初のSubscribeと二回目のSubscribeそれぞれで、OnNextとOnCompletedの処理が呼ばれます。実行結果を下記に示します。
subscribe1 sleep 5sec. ← ここで5秒スリープしている background task start. background task end. 1##OnNext(1) 1##Completed() dispose method call. subscribe2 ← このタイミングではStartメソッドの処理は終了している 2##OnNext(1) ← OnNextとOnCompletedが通知される 2##Completed()
Observable.ToAsyncメソッド
次は、ToAsyncメソッドを紹介します。このメソッドもStartメソッドと同様に重たい処理をバックグラウンドでやるために使用できます。Startメソッドとの違いはToAsyncの戻り値にあらわれています。シグネチャを下記に示します。
public static Func
引数に重たいことをやる処理を渡して、戻り値がIObservable
ここで示したメソッドのシグネチャは数十個あるオーバーロードの1つになります。ToAsyncにはほかにも戻り値が無いケースや引数が大量にあるケースに備えて膨大な数のオーバーロードがあります。完全なオーバーロードのリストについては下記のMSDNのリファレンスを参照してください。
Observable.ToAsync Method
コード例を下記に示します。
// 戻り値はFunc<IObservable<T>> var source = Observable.ToAsync(() => { Console.WriteLine("background task start."); Thread.Sleep(2000); Console.WriteLine("background task end."); return 1; }); // ToAsyncはデリゲートを返すのでInvoke() or ()をしないと処理が開始されない Console.WriteLine("source() call."); var invokedSource = source.Invoke(); var subscription1 = invokedSource.Subscribe( i => Console.WriteLine("1##OnNext({0})", i), ex => Console.WriteLine("1##OnError({0})", ex.Message), () => Console.WriteLine("1##Completed()")); // 処理が確実に終わるように5秒待つ Console.WriteLine("sleep 5sec."); Thread.Sleep(5000); // Observableが発行する値の購読を停止 Console.WriteLine("dispose method call."); subscription1.Dispose(); // 購読 Console.WriteLine("subscribe2"); var subscription2 = invokedSource.Subscribe( i => Console.WriteLine("2##OnNext({0})", i), ex => Console.WriteLine("2##OnError({0})", ex.Message), () => Console.WriteLine("2##Completed()")); subscription2.Dispose();
ポイントは、ToAsyncの戻り値に対してInvokeをしているところです。ここで初めてToAsyncに渡した処理の実行がはじまります。実行結果を下記に示します。
source() call. background task start. sleep 5sec. background task end. 1##OnNext(1) 1##Completed() dispose method call. subscribe2 2##OnNext(1) 2##Completed()
ここでもStartメソッドの例と同じようにToAsyncの処理が終わった後にSubscribeしているにも関わらずOnNextとOnCompletedが呼ばれていることがわかります。非同期で処理を行うIObservable
Observable.FromAsyncPatternメソッド
これで、一連のファクトリメソッドの紹介は最後になります。最後を飾るのはFromAsyncPatternメソッドです。このメソッドは名前が示す通り.NET Frameworkで使われている非同期呼び出しのパターンからIObservable
// 重たい処理 Func<int, int, int> asyncProcess = (x, y) => { Console.WriteLine("process start."); Thread.Sleep(2000); Console.WriteLine("process end."); return x + y; }; // 因みに非同期呼び出しは普通に書くとこんな感じ // asyncProcess.BeginInvoke( // 10, 2, // ar => // { // var ret = asyncProcess.EndInvoke(ar); // // do something // }, // null); var asyncPattern = Observable.FromAsyncPattern<int, int, int>( asyncProcess.BeginInvoke, asyncProcess.EndInvoke); var source = asyncPattern(10, 2); // 処理中に購読開始 Console.WriteLine("subscribe2"); var subscription1 = source.Subscribe( i => Console.WriteLine("1##OnNext({0})", i), ex => Console.WriteLine("1##OnError({0})", ex.Message), () => Console.WriteLine("1##Completed()")); // 確実に処理が終わるように5秒待つ Console.WriteLine("sleep 5sec"); Thread.Sleep(5000); // Observableが発行する値の購読を停止 Console.WriteLine("dispose method call."); subscription1.Dispose(); // 処理が完了したあとに購読 Console.WriteLine("subscribe2"); var subscription2 = source.Subscribe( i => Console.WriteLine("2##OnNext({0})", i), ex => Console.WriteLine("2##OnError({0})", ex.Message), () => Console.WriteLine("2##Completed()")); // 購読解除 Console.WriteLine("dispose method call."); subscription2.Dispose();
普通はコールバックで書く非同期呼び出しをIObservable
process start. subscribe2 sleep 5sec process end. 1##OnNext(12) 1##Completed() dispose method call. subscribe2 2##OnNext(12) 2##Completed() dispose method call.
ここでも、処理が終わった後にSubscribeをしてもOnNextとOnCompletedが呼ばれていることがわかります。
次回
次回からは、IObservableに定義されている拡張メソッドについて紹介します。この拡張メソッドがあるからこそReactive Extensionsを強力たらしめていると思っています。少しずつ楽しい説明が出来るように頑張ります!