かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その5「HotとCold」

過去記事インデックス

前回盛大に間違えてid:neueccさんに指摘していただいたホットなObservableですが、教えてもらった内容を踏まえて自分なりに書いてみました。また間違ってる理解の可能性があるので注意深く読んでください!

ColdなIObservableとHotなIObservable

これまでに紹介したIObservableのファクトリメソッドは全て共通の特徴があります。それは複数回Subscribeすると、それぞれのIObserverにIObservableが個別に値を発行します。IObservableの作り方にもよりますが、基本的には同じ値が発行されていきます。文章で説明するよりもコードの動作例で示します。

// 1秒間隔で値を発行するIObservable<long>を作成する
var source = Observable.Timer(
    TimeSpan.FromSeconds(1),
    TimeSpan.FromSeconds(1));
// 購読
var subscription1 = source.Subscribe(
    i => Console.WriteLine("{0:yyyy/MM/dd HH:mm:ss.FFF} 1##OnNext({1})", DateTime.Now, i),
    ex => Console.WriteLine("1##OnError({0})", ex.Message),
    () => Console.WriteLine("1##Completed()"));

// 3秒後にもう一度購読
Thread.Sleep(3000);
// 購読
var subscription2 = source.Subscribe(
    i => Console.WriteLine("{0:yyyy/MM/dd HH:mm:ss.FFF} 2##OnNext({1})", DateTime.Now, i),
    ex => Console.WriteLine("2##OnError({0})", ex.Message),
    () => Console.WriteLine("2##Completed()"));

Console.ReadLine();
subscription1.Dispose();
subscription2.Dispose();

上記のコードは1秒間隔で値を発行するIObservableを作成し、1回Subscribeしたあとに3秒待ってもう一度Subscribeしています。この実行結果を下記に示します。

2011/11/07 23:05:01.883 1##OnNext(0)
2011/11/07 23:05:02.884 1##OnNext(1)
2011/11/07 23:05:03.879 1##OnNext(2)
2011/11/07 23:05:04.877 1##OnNext(3)
2011/11/07 23:05:04.893 2##OnNext(0)
2011/11/07 23:05:05.89 1##OnNext(4)
2011/11/07 23:05:05.894 2##OnNext(1)
2011/11/07 23:05:06.877 1##OnNext(5)
2011/11/07 23:05:06.892 2##OnNext(2)
2011/11/07 23:05:07.873 1##OnNext(6)
2011/11/07 23:05:07.894 2##OnNext(3)

最初にSubscribeした方にはyyyy/MM/dd HH:mm:ss.FFF 1##OnNext(値)の形式で出力しています。二番目にSubscribeした方にはyyyy/MM/dd HH:mm:ss.FFF 2##OnNext(値)の形式で出力しています。これを見ると、最初にSubscribeしたものと二回目にSubscribeしたものでは、タイムスタンプが微妙にずれていることから、内部的には別のタイマーが割り当てられていることが見て取れます。また、発行される値もSubscribeした時点で0からカウントされていて最初にSubscribeしたものと二回目にSubscribeしたものの間に関係性は全くありません。このように、ColdなObservableは「複数回Subscribeした時にObserverごとに発行する値のシーケンスから値を発行する」という特徴があります。
では次に、Coldと対となるものとしてHotなIObservableを見ていきます。HotなObservableの代表として.NET組み込みのObserverパターンであるイベントからIObservableを生成するObservable.FromEvent(…)というメソッドがあります。このメソッドのシグネチャを下記に示します。

public static IObservable<TEventArgs> FromEvent<TDelegate, TEventArgs>(
    // Action<TEventArgs>からイベントハンドラの形へと変換する処理
    // Action<TEventArgs>はSubscribeしたときのOnNextの処理にあたる。
    Func<Action<TEventArgs>, TDelegate> conversion,
    // イベントハンドラを登録する処理
    Action<TDelegate> addHandler,
    // イベントハンドラの登録を解除する処理
    Action<TDelegate> removeHandler)

今まで出てきたもののなかではかなり異質の引数ですがイベントハンドラの形にあったデリゲートを作成してハンドラの登録処理と削除処理を渡します。あとはIObservableをSubscribeしたタイミングやDisposeしたタイミングで適切にイベントの登録・登録解除が行われます。自前でやるとイベントハンドラの登録解除は忘れがちな処理なので地味に有りがたい機能です。コード例を下記に示します。

// 1秒間隔で値を発行するTimer
var timer = new System.Timers.Timer(1000);
var source = Observable.FromEvent<ElapsedEventHandler, ElapsedEventArgs>(
    h => (s, e) => h(e),
    h => timer.Elapsed += h,
    h => timer.Elapsed -= h);
// タイマー開始
timer.Start();

// 購読
var subscription1 = source.Subscribe(
    e => Console.WriteLine("{0:yyyy/MM/dd HH:mm:ss.FFF} 1##OnNext({1:yyyy/MM/dd HH:mm:ss.FFF})",DateTime.Now, e.SignalTime),
    ex => Console.WriteLine("1##OnError({0})", ex.Message),
    () => Console.WriteLine("1##Completed()"));

// 3秒後にもう一度購読
Thread.Sleep(3000);
// 購読
var subscription2 = source.Subscribe(
    e => Console.WriteLine("{0:yyyy/MM/dd HH:mm:ss.FFF} 2##OnNext({1:yyyy/MM/dd HH:mm:ss.FFF})",DateTime.Now, e.SignalTime),
    ex => Console.WriteLine("2##OnError({0})", ex.Message),
    () => Console.WriteLine("2##Completed()"));

Console.ReadLine();
subscription1.Dispose();
subscription2.Dispose();
timer.Stop();

ColdなObservableで示した処理と基本的には同じ処理を行っています。実行結果を下記に示します。

2011/11/07 23:37:41.146 1##OnNext(2011/11/07 23:37:41.146)
2011/11/07 23:37:42.16 1##OnNext(2011/11/07 23:37:42.16)
2011/11/07 23:37:43.174 1##OnNext(2011/11/07 23:37:43.174)
2011/11/07 23:37:43.174 2##OnNext(2011/11/07 23:37:43.174)
2011/11/07 23:37:44.188 1##OnNext(2011/11/07 23:37:44.188)
2011/11/07 23:37:44.188 2##OnNext(2011/11/07 23:37:44.188)
2011/11/07 23:37:45.202 1##OnNext(2011/11/07 23:37:45.202)
2011/11/07 23:37:45.202 2##OnNext(2011/11/07 23:37:45.202)
2011/11/07 23:37:46.216 1##OnNext(2011/11/07 23:37:46.216)
2011/11/07 23:37:46.216 2##OnNext(2011/11/07 23:37:46.216)

上記の結果で興味深い点は、最初にSubscribeしたものと、二番目にSubscribeしたものの出力結果のタイムスタンプと、OnNextに渡ってきている値が同じという点です。
つまり、HotなIObservableとはColdなIObservableと違って同じ値を同時にIObserverに通知するものということになります。

次回からは、HotなIObservableを作成するメソッドをいくつか確認していこうと思います。