過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
前回盛大に間違えてid:neueccさんに指摘していただいたホットなObservableですが、教えてもらった内容を踏まえて自分なりに書いてみました。また間違ってる理解の可能性があるので注意深く読んでください!
ColdなIObservableとHotなIObservable
これまでに紹介したIObservable
// 1秒間隔で値を発行するIObservable<long>を作成する var source = Observable.Timer( TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)); // 購読 var subscription1 = source.Subscribe( i => Console.WriteLine("{0:yyyy/MM/dd HH:mm:ss.FFF} 1##OnNext({1})", DateTime.Now, i), ex => Console.WriteLine("1##OnError({0})", ex.Message), () => Console.WriteLine("1##Completed()")); // 3秒後にもう一度購読 Thread.Sleep(3000); // 購読 var subscription2 = source.Subscribe( i => Console.WriteLine("{0:yyyy/MM/dd HH:mm:ss.FFF} 2##OnNext({1})", DateTime.Now, i), ex => Console.WriteLine("2##OnError({0})", ex.Message), () => Console.WriteLine("2##Completed()")); Console.ReadLine(); subscription1.Dispose(); subscription2.Dispose();
上記のコードは1秒間隔で値を発行するIObservable
2011/11/07 23:05:01.883 1##OnNext(0) 2011/11/07 23:05:02.884 1##OnNext(1) 2011/11/07 23:05:03.879 1##OnNext(2) 2011/11/07 23:05:04.877 1##OnNext(3) 2011/11/07 23:05:04.893 2##OnNext(0) 2011/11/07 23:05:05.89 1##OnNext(4) 2011/11/07 23:05:05.894 2##OnNext(1) 2011/11/07 23:05:06.877 1##OnNext(5) 2011/11/07 23:05:06.892 2##OnNext(2) 2011/11/07 23:05:07.873 1##OnNext(6) 2011/11/07 23:05:07.894 2##OnNext(3)
最初にSubscribeした方にはyyyy/MM/dd HH:mm:ss.FFF 1##OnNext(値)の形式で出力しています。二番目にSubscribeした方にはyyyy/MM/dd HH:mm:ss.FFF 2##OnNext(値)の形式で出力しています。これを見ると、最初にSubscribeしたものと二回目にSubscribeしたものでは、タイムスタンプが微妙にずれていることから、内部的には別のタイマーが割り当てられていることが見て取れます。また、発行される値もSubscribeした時点で0からカウントされていて最初にSubscribeしたものと二回目にSubscribeしたものの間に関係性は全くありません。このように、ColdなObservableは「複数回Subscribeした時にObserverごとに発行する値のシーケンスから値を発行する」という特徴があります。
では次に、Coldと対となるものとしてHotなIObservable
public static IObservable<TEventArgs> FromEvent<TDelegate, TEventArgs>( // Action<TEventArgs>からイベントハンドラの形へと変換する処理 // Action<TEventArgs>はSubscribeしたときのOnNextの処理にあたる。 Func<Action<TEventArgs>, TDelegate> conversion, // イベントハンドラを登録する処理 Action<TDelegate> addHandler, // イベントハンドラの登録を解除する処理 Action<TDelegate> removeHandler)
今まで出てきたもののなかではかなり異質の引数ですがイベントハンドラの形にあったデリゲートを作成してハンドラの登録処理と削除処理を渡します。あとはIObservable
// 1秒間隔で値を発行するTimer var timer = new System.Timers.Timer(1000); var source = Observable.FromEvent<ElapsedEventHandler, ElapsedEventArgs>( h => (s, e) => h(e), h => timer.Elapsed += h, h => timer.Elapsed -= h); // タイマー開始 timer.Start(); // 購読 var subscription1 = source.Subscribe( e => Console.WriteLine("{0:yyyy/MM/dd HH:mm:ss.FFF} 1##OnNext({1:yyyy/MM/dd HH:mm:ss.FFF})",DateTime.Now, e.SignalTime), ex => Console.WriteLine("1##OnError({0})", ex.Message), () => Console.WriteLine("1##Completed()")); // 3秒後にもう一度購読 Thread.Sleep(3000); // 購読 var subscription2 = source.Subscribe( e => Console.WriteLine("{0:yyyy/MM/dd HH:mm:ss.FFF} 2##OnNext({1:yyyy/MM/dd HH:mm:ss.FFF})",DateTime.Now, e.SignalTime), ex => Console.WriteLine("2##OnError({0})", ex.Message), () => Console.WriteLine("2##Completed()")); Console.ReadLine(); subscription1.Dispose(); subscription2.Dispose(); timer.Stop();
ColdなObservableで示した処理と基本的には同じ処理を行っています。実行結果を下記に示します。
2011/11/07 23:37:41.146 1##OnNext(2011/11/07 23:37:41.146) 2011/11/07 23:37:42.16 1##OnNext(2011/11/07 23:37:42.16) 2011/11/07 23:37:43.174 1##OnNext(2011/11/07 23:37:43.174) 2011/11/07 23:37:43.174 2##OnNext(2011/11/07 23:37:43.174) 2011/11/07 23:37:44.188 1##OnNext(2011/11/07 23:37:44.188) 2011/11/07 23:37:44.188 2##OnNext(2011/11/07 23:37:44.188) 2011/11/07 23:37:45.202 1##OnNext(2011/11/07 23:37:45.202) 2011/11/07 23:37:45.202 2##OnNext(2011/11/07 23:37:45.202) 2011/11/07 23:37:46.216 1##OnNext(2011/11/07 23:37:46.216) 2011/11/07 23:37:46.216 2##OnNext(2011/11/07 23:37:46.216)
上記の結果で興味深い点は、最初にSubscribeしたものと、二番目にSubscribeしたものの出力結果のタイムスタンプと、OnNextに渡ってきている値が同じという点です。
つまり、HotなIObservable
次回からは、HotなIObservable