過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
- Reactive Extensions再入門 その5「HotとCold」
- Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
- Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
- Reactive Extensions再入門 その8「SkipとTakeメソッド」
- Reactive Extensions再入門 その9「Skip + Take + Repeat = ドラッグ」
- Reactive Extensions再入門 その10「Doメソッド」
- Reactive Extensions再入門 その11「Catchメソッド」
- Reactive Extensions再入門 その12「Finallyメソッドとリソース解放」
- Reactive Extensions再入門 その13「最後の値を取得するLatestとMostRecentメソッド」
- Reactive Extensions再入門 その14「Nextメソッド」
- Reactive Extensions再入門 その15「To*****系メソッド」
- Reactive Extensions再入門 その16「最大、最少、平均を求めるメソッド」
- Reactive Extensions再入門 その17「集計するメソッド」
- Reactive Extensions再入門 その18「CountメソッドとLongCountメソッド」
- Reactive Extensions再入門 その19「AnyメソッドとAllメソッド」
- Reactive Extensions再入門 その20「GroupByメソッドでグルーピングしてみよう」
- Reactive Extensions再入門 その21「GroupByUntilメソッド」
- Reactive Extensions再入門 その22「単一の値を取得するメソッド」
- Reactive Extensions再入門 その23「重複を排除するメソッド」
- Reactive Extensions再入門 その24「単一の値を取得するメソッド その2」
- Reactive Extensions再入門 その25「値をまとめるBufferメソッド」
- Reactive Extensions再入門 その26「値をまとめるWindowメソッド」
- Reactive Extensions再入門 その27「時間でフィルタリング?Sampleメソッド」
- Reactive Extensions再入門 その28「落ち着いたら流すThrottleメソッド」
- Reactive Extensions再入門 その29「値を指定した時間だけ遅延させるDelayメソッド」
- Reactive Extensions再入門 その30「もう待ちきれない!を表現するTimeoutメソッド」
- Reactive Extensions再入門 その31「時間に関する情報を付与するTimestampとTimeIntervalメソッド」
- Reactive Extensions再入門 その32「型変換を行うCastとOfTypeメソッド」
- Reactive Extensions再入門 その33「シーケンスの最後を起点にSkipとTake」
- Reactive Extensions再入門 その34「ダメなら次の人!を実現するOnErrorResumeNextメソッド」
- Reactive Extensions再入門 その35「駄目ならやり直す!を実現するRetryメソッド」
- Reactive Extensions再入門 その36「ColdからHotへ!Publishメソッドと参照カウンタ?RefCountメソッド」
- Reactive Extensions再入門 その37「ColdからHotへ!その他のPublish系メソッド」
はじめに
えっと、Publish系のColdからHotへ変換するメソッドで1つ忘れてました…。Replayメソッド。こいつだけ名前違うんだもんっ!!
Replayメソッド
ここではReplayメソッドについて説明します。Replayメソッドは、購読した対象に今まで発行した値を全て流すという特徴があります。このため、Connect後暫く放置してSubscribeをすると一気に値が流れてきます。このメソッドのシグネチャを以下に示します。
public static IConnectableObservable<T> Replay<T>(this IObservable<T> source);
このメソッドの使用例を下記に示します。
// 500ms間隔で0から値をカウントアップしていく var source = Observable .Interval(TimeSpan.FromMilliseconds(500)) // ReplayでHotに変換 .Replay(); // 購読 Console.WriteLine("# Subscribe1"); using ( source.Subscribe( i => Console.WriteLine("Subscribe1#OnNext: {0}", i))) { // 接続 Console.WriteLine("# Connect"); source.Connect(); // 2秒待機 Console.WriteLine("Sleep 2sec..."); Thread.Sleep(2000); // 購読2 Console.WriteLine("# Subscribe2"); using ( source.Subscribe( i => Console.WriteLine("Subscribe1#OnNext: {0}", i))) { // 1秒待機 Console.WriteLine("Sleep 1sec..."); Thread.Sleep(1000); } // 購読2解除 Console.WriteLine("# UnSubscribe2"); // 2秒待機 Console.WriteLine("Sleep 2sec..."); Thread.Sleep(2000); } // 購読解除 Console.WriteLine("# UnSubscribe1");
Replayメソッドで作成したIConnectableObservable
# Subscribe1 # Connect Sleep 2sec... Subscribe1#OnNext: 0 Subscribe1#OnNext: 1 Subscribe1#OnNext: 2 Subscribe1#OnNext: 3 # Subscribe2 Subscribe1#OnNext: 0 Subscribe1#OnNext: 1 Subscribe1#OnNext: 2 Subscribe1#OnNext: 3 Sleep 1sec... Subscribe1#OnNext: 4 Subscribe1#OnNext: 4 Subscribe1#OnNext: 5 Subscribe1#OnNext: 5 # UnSubscribe2 Sleep 2sec... Subscribe1#OnNext: 6 Subscribe1#OnNext: 7 Subscribe1#OnNext: 8 Subscribe1#OnNext: 9 # UnSubscribe1
2回目のSubscribe(# Subscribe2が表示されている箇所)でどのようにOnNextに値が渡っているかに注目をしてください。Subscribeと同時に一気に0〜3の値が発行されていることが確認できます。このことから、Replayメソッドの挙動である、発行済みの値を購読した時点で一気に流すということが確認できます。
MulticastメソッドでのReplayメソッドの再現
ほかのPublish系メソッドと同様にReplayメソッドもMulticastメソッドで再現することができます。ReplaySubject
Console.WriteLine("# Multicast(new ReplaySubject<long>())"); var source = Observable .Interval(TimeSpan.FromMilliseconds(500)) // Replayと一緒 .Multicast(new ReplaySubject<long>()); // 接続 Console.WriteLine("# Connect"); source.Connect(); // 2秒待機 Console.WriteLine("Sleep 2sec..."); Thread.Sleep(2000); // 購読 Console.WriteLine("# Subscribe"); source.Subscribe( i => Console.WriteLine("OnNext: {0}", i), () => Console.WriteLine("OnCompleted"));
ReplaySubject
# Multicast(new ReplaySubject<long>()) # Connect Sleep 2sec... # Subscribe OnNext: 0 OnNext: 1 OnNext: 2 OnNext: 3
Subscribeをした時点で、これまで発行された値が一気にOnNextに渡っていることが確認できます。