過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
- Reactive Extensions再入門 その5「HotとCold」
- Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
- Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
- Reactive Extensions再入門 その8「SkipとTakeメソッド」
- Reactive Extensions再入門 その9「Skip + Take + Repeat = ドラッグ」
- Reactive Extensions再入門 その10「Doメソッド」
- Reactive Extensions再入門 その11「Catchメソッド」
- Reactive Extensions再入門 その12「Finallyメソッドとリソース解放」
- Reactive Extensions再入門 その13「最後の値を取得するLatestとMostRecentメソッド」
- Reactive Extensions再入門 その14「Nextメソッド」
- Reactive Extensions再入門 その15「To*****系メソッド」
- Reactive Extensions再入門 その16「最大、最少、平均を求めるメソッド」
- Reactive Extensions再入門 その17「集計するメソッド」
- Reactive Extensions再入門 その18「CountメソッドとLongCountメソッド」
- Reactive Extensions再入門 その19「AnyメソッドとAllメソッド」
- Reactive Extensions再入門 その20「GroupByメソッドでグルーピングしてみよう」
- Reactive Extensions再入門 その21「GroupByUntilメソッド」
- Reactive Extensions再入門 その22「単一の値を取得するメソッド」
- Reactive Extensions再入門 その23「重複を排除するメソッド」
- Reactive Extensions再入門 その24「単一の値を取得するメソッド その2」
- Reactive Extensions再入門 その25「値をまとめるBufferメソッド」
- Reactive Extensions再入門 その26「値をまとめるWindowメソッド」
- Reactive Extensions再入門 その27「時間でフィルタリング?Sampleメソッド」
- Reactive Extensions再入門 その28「落ち着いたら流すThrottleメソッド」
- Reactive Extensions再入門 その29「値を指定した時間だけ遅延させるDelayメソッド」
- Reactive Extensions再入門 その30「もう待ちきれない!を表現するTimeoutメソッド」
- Reactive Extensions再入門 その31「時間に関する情報を付与するTimestampとTimeIntervalメソッド」
- Reactive Extensions再入門 その32「型変換を行うCastとOfTypeメソッド」
- Reactive Extensions再入門 その33「シーケンスの最後を起点にSkipとTake」
- Reactive Extensions再入門 その34「ダメなら次の人!を実現するOnErrorResumeNextメソッド」
- Reactive Extensions再入門 その35「駄目ならやり直す!を実現するRetryメソッド」
- Reactive Extensions再入門 その36「ColdからHotへ!Publishメソッドと参照カウンタ?RefCountメソッド」
- Reactive Extensions再入門 その37「ColdからHotへ!その他のPublish系メソッド」
- Reactive Extensions再入門 その38「ColdからHotへ!その他のPublish系メソッド2」
- Reactive Extensions再入門 その39「Subject系クラス」
- Reactive Extensions再入門 その40「IObservableの合成はじめました」
はじめに
前回に引き続き、どんどんIObservable
Concatメソッド
ここでは、Concatメソッドについて説明します。Concatメソッドは、その名前の通り複数のIObservable
// 引数で渡した全てのIObservable<T>のシーケンスを繋ぐ public static IObservable<T> Concat<T>(params IObservable<T>[] sources); // IEnumerable<IObservable<T>>から取得できるすべてのIObservable<T>を繋ぐ public static IObservable<T> Concat<T>(this IEnumerable<IObservable<T>> sources); // IObservable<IObservable<T>>から発行されるすべてのIObservable<T>を繋ぐ public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> sources); // firstとsecondを繋ぐ public static IObservable<T> Concat<T>(this IObservable<T> first, IObservable<T> second);
Concatメソッドの動作を示すためのコード例を下記に示します。
Observable // IObservable<T>のシーケンスを直列につなげる .Concat( // "1st: 0" 〜 "1st: 2"までの値を1秒間隔で発行する Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => "1st: " + i).Take(3), // "2nd: 0" 〜 "2nd: 2"までの値を1秒間隔で発行する Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => "2nd: " + i).Take(3), // "3rd: 0" 〜 "3rd: 2"までの値を1秒間隔で発行する Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => "3rd: " + i).Take(3)) // 購読 .Subscribe( s => Console.WriteLine("OnNext: {0}", s), () => Console.WriteLine("OnCompleted")); Console.ReadLine();
Observable.Intervalメソッドを使って1秒間隔で値を発行するIObservable
OnNext: 1st: 0 OnNext: 1st: 1 OnNext: 1st: 2 OnNext: 2nd: 0 OnNext: 2nd: 1 OnNext: 2nd: 2 OnNext: 3rd: 0 OnNext: 3rd: 1 OnNext: 3rd: 2 OnCompleted
前のIObservable
// "1st: 0" 〜 "1st: 2"までの値を1秒間隔で発行する Observable.Interval(TimeSpan.FromSeconds(1)) .Select(i => "1st: " + i) .Take(3) .Concat( // "2nd: 0" 〜 "2nd: 2"までの値を1秒間隔で発行する Observable.Interval(TimeSpan.FromSeconds(1)) .Select(i => "2nd: " + i) .Take(3)) .Concat( // "3rd: 0" 〜 "3rd: 2"までの値を1秒間隔で発行する Observable.Interval(TimeSpan.FromSeconds(1)) .Select(i => "3rd: " + i) .Take(3)) // 購読 .Subscribe( s => Console.WriteLine("OnNext: {0}", s), () => Console.WriteLine("OnCompleted")); Console.ReadLine();
実行結果は同じため省略します。
Zipメソッド
ここでは、Zipメソッドについて説明します。Zipメソッドは、2つのIObservable
public static IObservable<R> Zip<T, U, R>( this IObservable<T> first, IObservable<U> second, Func<T, U, R> resultSelector);
firstから発行された値とsecondから発行された値をresultSelectorで変換して後ろに流します。また、firstかsecondのどちらかのIObservable
// 1秒間隔で0から値をカウントアップしていくIObservable<T>のシーケンスの最初の3つ Observable.Interval(TimeSpan.FromSeconds(1)).Take(3) // Zipでまとめる .Zip( // 100, 101, 102を発行するIObservable<T>のシーケンス Observable.Range(100, 3), // 渡された値を元に文字列化 (l, r) => string.Format("{0}-{1}", l, r)) // 購読 .Subscribe( s => Console.WriteLine("OnNext: {0}", s), () => Console.WriteLine("OnCompleted")); Console.ReadLine();
1秒間隔で0, 1, 2の値を発行するIObservable
OnNext: 0-100 OnNext: 1-101 OnNext: 2-102 OnCompleted
Zipメソッドで合成された3つの値がOnNextに渡ってきていることが確認できます。上記のプログラムを少し変更して片方のIObservable
// 1秒間隔で0から値をカウントアップしていくIObservable<T>のシーケンスの最初の3つ Observable.Interval(TimeSpan.FromSeconds(1)).Take(3) // Zipでまとめる .Zip( // 100を発行するIObservable<T>のシーケンス Observable.Return(100), // 渡された値を元に文字列化 (l, r) => string.Format("{0}-{1}", l, r)) // 購読 .Subscribe( s => Console.WriteLine("OnNext: {0}", s), () => Console.WriteLine("OnCompleted")); Console.ReadLine();
IntervalメソッドからTake(3)で3つの値を発行するIObservable
OnNext: 0-100 OnCompleted
OnNextが1回しか実行されていないことが確認できます。このことからZipメソッドが、合成対象のIObservable
Ambメソッド
ここではAmbメソッドについて説明します。Ambメソッドは複数のIObservable
// 引数で渡した全てのIObservable<T>から最初に値を発行したIObservable<T>の値を後ろに流す public static IObservable<T> Amb<T>(params IObservable<T>[] sources); // sources内の全てのIObservable<T>から最初に値を発行したIObservable<T>の値を後ろに流す public static IObservable<T> Amb<T>(this IEnumerable<IObservable<T>> sources); // firstとsecondのうち最初に値を発行したものの値を後ろに流す public static IObservable<T> Amb<T>(this IObservable<T> first, IObservable<T> second);
他の合成系メソッドと同様に可変長引数やIEnumerable
Observable.Amb( // 3秒後に値を発行するIO<T> Observable.Timer(TimeSpan.FromSeconds(3)).Select(_ => "3sec"), // 10秒後に値を発行するIO<T> Observable.Timer(TimeSpan.FromSeconds(10)).Select(_ => "10sec"), // 2秒後に値を発行するIO<T> Observable.Timer(TimeSpan.FromSeconds(2)).Select(_ => "2sec"), // 6秒後に値を発行するIO<T> Observable.Timer(TimeSpan.FromSeconds(6)).Select(_ => "6sec"), // 22秒後に値を発行するIO<T> Observable.Timer(TimeSpan.FromSeconds(22)).Select(_ => "22sec")) .Subscribe( s => Console.WriteLine("OnNext: {0}", s), () => Console.WriteLine("OnCompleted"));
Timerメソッドを使って、指定した時間が経過した後に、値を発行するIObservable
OnNext: 2sec OnCompleted
引数で渡した中で一番早く値を発行するObservable.Timer(TimeSpan.FromSecond(2)).Select(_ => “2sec”)の結果がOnNextやOnCompletedに渡っていることが確認できます。
CombineLatestメソッド
ここでは、CombineLatestメソッドについて説明します。CombineLatestメソッドはZipメソッドのように2つのIObservable
public static IObservable<R> CombineLatest<F, S, R>( this IObservable<F> first, IObservable<S> second, Func<F, S, R> resultSelector);
このメソッドのコード例を下記に示します。
var source1 = new Subject<string>(); var source2 = new Subject<int>(); source1 // source1とsource2を合成 .CombineLatest( source2, // 発行された値を結合して文字列化 (l, r) => string.Format("{0}-{1}", l, r)) // 購読 .Subscribe( s => Console.WriteLine("OnNext: {0}", s), () => Console.WriteLine("OnCompleted")); // 適当に値を発行する Console.WriteLine("source1.OnNext(foo)"); source1.OnNext("foo"); Console.WriteLine("source2.OnNext(100)"); source2.OnNext(100); Console.WriteLine("source2.OnNext(200)"); source2.OnNext(200); Console.WriteLine("source1.OnNext(bar)"); source1.OnNext("bar"); Console.WriteLine("source1.OnNext(boo)"); source1.OnNext("boo"); // source1完了 Console.WriteLine("source1.OnCompleted()"); source1.OnCompleted(); // source1完了後にsource2から値を発行する Console.WriteLine("source2.OnNext(200)"); source2.OnNext(999); // source2完了 Console.WriteLine("source2.OnCompleted()"); source2.OnCompleted();
このコードの実行結果を以下に示します。
source1.OnNext(foo) source2.OnNext(100) OnNext: foo-100 source2.OnNext(200) OnNext: foo-200 source1.OnNext(bar) OnNext: bar-200 source1.OnNext(boo) OnNext: boo-200 source1.OnCompleted() source2.OnNext(200) OnNext: boo-999 source2.OnCompleted() OnCompleted
最初のOnNextでは、source1とsource2の値が両方発行されるまで実行されませんが、それ以降は、source1かsource2のどちらかから値が発行されるとOnNextに値が発行されていることが確認できます。このことから、CombineLatestメソッドの特徴である、最後に発行された値をキャッシュしているということがわかります。そして、CombineLatestメソッドは両方のシーケンスが完了状態になった時点で完了通知を行うという動作も確認できます。これは、片方が完了しても、もう一方から値が発行されたら最後に発行された値を元に処理を継続できるためです。