過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
- Reactive Extensions再入門 その5「HotとCold」
- Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
- Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
- Reactive Extensions再入門 その8「SkipとTakeメソッド」
- Reactive Extensions再入門 その9「Skip + Take + Repeat = ドラッグ」
- Reactive Extensions再入門 その10「Doメソッド」
- Reactive Extensions再入門 その11「Catchメソッド」
- Reactive Extensions再入門 その12「Finallyメソッドとリソース解放」
- Reactive Extensions再入門 その13「最後の値を取得するLatestとMostRecentメソッド」
- Reactive Extensions再入門 その14「Nextメソッド」
- Reactive Extensions再入門 その15「To*****系メソッド」
- Reactive Extensions再入門 その16「最大、最少、平均を求めるメソッド」
- Reactive Extensions再入門 その17「集計するメソッド」
- Reactive Extensions再入門 その18「CountメソッドとLongCountメソッド」
- Reactive Extensions再入門 その19「AnyメソッドとAllメソッド」
- Reactive Extensions再入門 その20「GroupByメソッドでグルーピングしてみよう」
- Reactive Extensions再入門 その21「GroupByUntilメソッド」
- Reactive Extensions再入門 その22「単一の値を取得するメソッド」
- Reactive Extensions再入門 その23「重複を排除するメソッド」
- Reactive Extensions再入門 その24「単一の値を取得するメソッド その2」
- Reactive Extensions再入門 その25「値をまとめるBufferメソッド」
- Reactive Extensions再入門 その26「値をまとめるWindowメソッド」
- Reactive Extensions再入門 その27「時間でフィルタリング?Sampleメソッド」
- Reactive Extensions再入門 その28「落ち着いたら流すThrottleメソッド」
- Reactive Extensions再入門 その29「値を指定した時間だけ遅延させるDelayメソッド」
- Reactive Extensions再入門 その30「もう待ちきれない!を表現するTimeoutメソッド」
- Reactive Extensions再入門 その31「時間に関する情報を付与するTimestampとTimeIntervalメソッド」
- Reactive Extensions再入門 その32「型変換を行うCastとOfTypeメソッド」
- Reactive Extensions再入門 その33「シーケンスの最後を起点にSkipとTake」
- Reactive Extensions再入門 その34「ダメなら次の人!を実現するOnErrorResumeNextメソッド」
- Reactive Extensions再入門 その35「駄目ならやり直す!を実現するRetryメソッド」
はじめに
はるか昔にやったColdとHotなObservable。今回はColdからHotへ変換して遊んでみました。
ColdからHotへ変換する拡張メソッド
ここでは、ColdなIObservable
Publishメソッド
ここでは、Publishメソッドについて説明します。Publishメソッドは、ColdなIObservable
public static IConnectableObservable<T> Publish<T>(this IObservable<T> source);
Publishメソッドの戻り値のIConnectableObservable
IDisposable Connect();
Connectメソッドは、戻り値のIDisposableに対してDisposeメソッドが呼び出されるまでHotなIObservable
コード例を下記に示します。
// coldなObservableの作成 var source = Observable.Range(1, 3); // 2回購読 Console.WriteLine("# Subscribe1"); source.Subscribe( i => Console.WriteLine("Subscribe1#OnNext: {0}", i), () => Console.WriteLine("Subscribe1#OnCompleted")); Console.WriteLine("# Subscribe2"); source.Subscribe( i => Console.WriteLine("Subscribe2#OnNext: {0}", i), () => Console.WriteLine("Subscribe2#OnCompleted")); Console.WriteLine("--"); // PublishでHotなObservableに Console.WriteLine("# Publish"); var connectableSource = source.Publish(); // 2回購読 Console.WriteLine("# Subscribe1"); connectableSource.Subscribe( i => Console.WriteLine("Subscribe1#OnNext: {0}", i), () => Console.WriteLine("Subscribe1#OnCompleted")); Console.WriteLine("# Subscribe2"); connectableSource.Subscribe( i => Console.WriteLine("Subscribe2#OnNext: {0}", i), () => Console.WriteLine("Subscribe2#OnCompleted")); // Connectで購読しているObserverに値を流す Console.WriteLine("# Connect"); connectableSource.Connect(); // Connect後に購読したときの動作確認 Console.WriteLine("# Subscribe3"); connectableSource.Subscribe( i => Console.WriteLine("Subscribe3#OnNext: {0}", i), () => Console.WriteLine("Subscribe3#OnCompleted"));
まず、Rangeメソッドで0〜2の値を発行するColdなIObservable
上記コードの実行結果を以下に示します。
# Subscribe1 Subscribe1#OnNext: 1 Subscribe1#OnNext: 2 Subscribe1#OnNext: 3 Subscribe1#OnCompleted # Subscribe2 Subscribe2#OnNext: 1 Subscribe2#OnNext: 2 Subscribe2#OnNext: 3 Subscribe2#OnCompleted -- # Publish # Subscribe1 # Subscribe2 # Connect Subscribe1#OnNext: 1 Subscribe2#OnNext: 1 Subscribe1#OnNext: 2 Subscribe2#OnNext: 2 Subscribe1#OnNext: 3 Subscribe2#OnNext: 3 Subscribe1#OnCompleted Subscribe2#OnCompleted # Subscribe3 Subscribe3#OnCompleted
IConnectableObservable
Publishの動作を確認するために、もう1つコード例を見ていきます。今度はIntervalメソッドを使って500msごとに0からカウントアップしていくColdなIObservable
// 500ms間隔で0からカウントアップしていく値を発行するIObservableを作成 Console.WriteLine("# Create source(ConnectableObservable)"); var source = Observable .Interval(TimeSpan.FromMilliseconds(500)) .Publish(); // Connectで値の放流開始 Console.WriteLine("# Connect"); using (source.Connect()) { // 2秒間何もせずに待つ Console.WriteLine("Sleep 2sec..."); Thread.Sleep(2000); // 2秒間購読 Console.WriteLine("# Subscribe1"); using ( source.Subscribe( i => Console.WriteLine("Subscribe1#OnNext: {0}", i), () => Console.WriteLine("Subscribe1#OnCompleted"))) { Console.WriteLine("Sleep 2sec..."); Thread.Sleep(2000); } Console.WriteLine("# UnSubscribe1"); } // Disposeが呼ばれるのでConnectが解除される Console.WriteLine("# DisConnect"); // Connectが解除された状態で2秒間購読する Console.WriteLine("# Subscribe2"); using ( source.Subscribe( i => Console.WriteLine("Subscribe2#OnNext: {0}", i), () => Console.WriteLine("Subscribe2#OnCompleted"))) { Console.WriteLine("Sleep 2sec..."); Thread.Sleep(2000); } Console.WriteLine("# UnSubscribe2"); // 再接続 Console.WriteLine("# Connect"); using (source.Connect()) { // 再接続で発行される値を確認するため2秒間購読する Console.WriteLine("# Subscribe3"); using ( source.Subscribe( i => Console.WriteLine("Subscribe3#OnNext: {0}", i), () => Console.WriteLine("Subscribe3#OnCompleted"))) { Console.WriteLine("Sleep 2sec..."); Thread.Sleep(2000); } Console.WriteLine("# UnSubscribe3"); } // Disposeが呼ばれるので接続解除 Console.WriteLine("# DisConnect"); // 接続解除状態で2秒間待機して様子を見る Console.WriteLine("Sleep 2sec..."); Thread.Sleep(2000);
実行結果を以下に示します。
# Create source(ConnectableObservable) # Connect Sleep 2sec... # Subscribe1 Sleep 2sec... Subscribe1#OnNext: 4 Subscribe1#OnNext: 5 Subscribe1#OnNext: 6 # UnSubscribe1 # DisConnect # Subscribe2 Sleep 2sec... # UnSubscribe2 # Connect # Subscribe3 Sleep 2sec... Subscribe3#OnNext: 0 Subscribe3#OnNext: 1 Subscribe3#OnNext: 2 # UnSubscribe3 # DisConnect Sleep 2sec...
Connectをして2秒経過してからの購読(Subscribe1)で値が4から始まっていることが確認できます。このことから、Connectを呼び出したタイミングでIntervalから値が発行されていることがわかります。また、Connectを解除してからの購読(Subscribe2)では値が発行されていないことも確認できます。そして最後に、再度Connectしてから購読(Subscribe3)では、値が0から始まっていることが確認できます。このことから、再Connect時には元になるColdなIObservable
RefCountメソッド
ここでは、IConnectableObservable
public static IObservable<T> RefCount<T>(this IConnectableObservable<T> source);
このメソッドの使用例を下記に示します。
// 500ms間隔で0から値をカウントアップしていくIObservableを作成 Console.WriteLine("# Create source(RefCount)"); var source = Observable .Interval(TimeSpan.FromMilliseconds(500)) // Hot .Publish() // 購読者がいる間Connect状態を保つ .RefCount(); // 2秒待機して様子身 Console.WriteLine("Sleep 2sec..."); Thread.Sleep(2000); // 購読開始 Console.WriteLine("# Subscribe1"); using ( source.Subscribe( i => Console.WriteLine("Subscribe1#OnNext: {0}", i), () => Console.WriteLine("Subscribe1#OnCompleted"))) { // 購読状態で2秒待機 Console.WriteLine("Sleep 2sec..."); Thread.Sleep(2000); // 追加で1秒間購読 Console.WriteLine("# Subscribe2"); using (source.Subscribe( i => Console.WriteLine("Subscribe2#OnNext: {0}", i), () => Console.WriteLine("Subscribe2#OnCompleted"))) { Console.WriteLine("Sleep 1sec..."); Thread.Sleep(1000); } // 1つ購読解除 Console.WriteLine("# UnSubscribe2"); // 1つ購読解除した状態で2秒待機 Console.WriteLine("Sleep 2sec..."); Thread.Sleep(2000); } // 購読解除(ここで購読者数が0になるのでConnectが解除される) Console.WriteLine("# UnSubscribe1"); // 2秒待機 Console.WriteLine("Sleep 2sec..."); Thread.Sleep(2000); // 新たに購読開始(Connect状態になる) Console.WriteLine("# Subscribe3"); using (source.Subscribe( i => Console.WriteLine("Subscribe3#OnNext: {0}", i), () => Console.WriteLine("Subscribe3#OnCompleted"))) { // 2秒待機 Console.WriteLine("Sleep 2sec..."); Thread.Sleep(2000); } // 購読解除 Console.WriteLine("# UnSubscribe3");
usingブロックを使って購読と購読解除を複数回行っています。最初の購読(Subscribe1)の後2秒待機して、次の購読(Subscribe2)をしています。Subscribe2は1秒後に購読を解除して、その2秒後にSubscribe1も購読を解除しています。そして、2秒待機したあと再び購読(Subscribe3)をしています。実行結果を以下に示します。
# Create source(RefCount) Sleep 2sec... # Subscribe1 Sleep 2sec... Subscribe1#OnNext: 0 Subscribe1#OnNext: 1 Subscribe1#OnNext: 2 # Subscribe2 Sleep 1sec... Subscribe1#OnNext: 3 Subscribe2#OnNext: 3 Subscribe1#OnNext: 4 Subscribe2#OnNext: 4 # UnSubscribe2 Sleep 2sec... Subscribe1#OnNext: 5 Subscribe1#OnNext: 6 Subscribe1#OnNext: 7 Subscribe1#OnNext: 8 # UnSubscribe1 Sleep 2sec... # Subscribe3 Sleep 2sec... Subscribe3#OnNext: 0 Subscribe3#OnNext: 1 Subscribe3#OnNext: 2 # UnSubscribe3
Subscribe1で値を出力している途中でSubscribe2も購読を開始しています。このときの出力を見ると同じ値が同時に複数の購読者に対して発行されているのでHotなIObservable