過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
- Reactive Extensions再入門 その5「HotとCold」
- Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
- Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
- Reactive Extensions再入門 その8「SkipとTakeメソッド」
- Reactive Extensions再入門 その9「Skip + Take + Repeat = ドラッグ」
- Reactive Extensions再入門 その10「Doメソッド」
- Reactive Extensions再入門 その11「Catchメソッド」
- Reactive Extensions再入門 その12「Finallyメソッドとリソース解放」
- Reactive Extensions再入門 その13「最後の値を取得するLatestとMostRecentメソッド」
- Reactive Extensions再入門 その14「Nextメソッド」
- Reactive Extensions再入門 その15「To*****系メソッド」
- Reactive Extensions再入門 その16「最大、最少、平均を求めるメソッド」
- Reactive Extensions再入門 その17「集計するメソッド」
- Reactive Extensions再入門 その18「CountメソッドとLongCountメソッド」
- Reactive Extensions再入門 その19「AnyメソッドとAllメソッド」
- Reactive Extensions再入門 その20「GroupByメソッドでグルーピングしてみよう」
- Reactive Extensions再入門 その21「GroupByUntilメソッド」
- Reactive Extensions再入門 その22「単一の値を取得するメソッド」
- Reactive Extensions再入門 その23「重複を排除するメソッド」
- Reactive Extensions再入門 その24「単一の値を取得するメソッド その2」
- Reactive Extensions再入門 その25「値をまとめるBufferメソッド」
- Reactive Extensions再入門 その26「値をまとめるWindowメソッド」
- Reactive Extensions再入門 その27「時間でフィルタリング?Sampleメソッド」
- Reactive Extensions再入門 その28「落ち着いたら流すThrottleメソッド」
- Reactive Extensions再入門 その29「値を指定した時間だけ遅延させるDelayメソッド」
- Reactive Extensions再入門 その30「もう待ちきれない!を表現するTimeoutメソッド」
- Reactive Extensions再入門 その31「時間に関する情報を付与するTimestampとTimeIntervalメソッド」
- Reactive Extensions再入門 その32「型変換を行うCastとOfTypeメソッド」
- Reactive Extensions再入門 その33「シーケンスの最後を起点にSkipとTake」
- Reactive Extensions再入門 その34「ダメなら次の人!を実現するOnErrorResumeNextメソッド」
- Reactive Extensions再入門 その35「駄目ならやり直す!を実現するRetryメソッド」
- Reactive Extensions再入門 その36「ColdからHotへ!Publishメソッドと参照カウンタ?RefCountメソッド」
- Reactive Extensions再入門 その37「ColdからHotへ!その他のPublish系メソッド」
- Reactive Extensions再入門 その38「ColdからHotへ!その他のPublish系メソッド2」
- Reactive Extensions再入門 その39「Subject系クラス」
IObservableの合成
ここでは、IObservable
Reactive Extensionsでは、時間、イベント、非同期などをIObservable
Mergeメソッド
ここでは、Mergeメソッドについて説明します。Mergeメソッドは名前の通り複数のIObservable
public static IObservable<T> Merge<T>( this IObservable<T> first, IObservable<T> second);
このメソッドは、左辺値のIObservable
Console.WriteLine("# 2つのIObservable<T>の統合"); Observable // 0〜2の値を1秒間隔で発行 .Interval(TimeSpan.FromSeconds(1)).Take(3) // 1〜3の値に変換 .Select(i => i + 1) // マージ(統合) .Merge( Observable // 0〜2の値を1秒間隔で発行 .Interval(TimeSpan.FromSeconds(1)).Take(3) // 10, 20, 30に変換 .Select(i => (i + 1) * 10)) // 購読 .Subscribe( i => Console.WriteLine("OnNext: {0}", i), () => Console.WriteLine("OnCompleted")); // Enter押すまで待機 Console.ReadLine();
1秒間隔で1, 2, 3の値を発行するIObservable
# 2つのIObservable<T>の統合 OnNext: 1 OnNext: 10 OnNext: 2 OnNext: 20 OnNext: 3 OnNext: 30 OnCompleted
上記の結果からMergeメソッドではleftとrightで渡したIObservable
次に、複数のIObservable
public static IObservable<T> Merge<T>(params IObservable<T>[] sources);
引数のparamsで渡したIObservable
Console.WriteLine("# 複数のIObservable<T>の統合"); // Observable.Merge<T>(params IObservable<T>[] sources) Observable.Merge( // 3つのIObservable<T>をマージ Observable.Range(0, 3), Observable.Range(10, 3), Observable.Range(100, 3)) // 購読 .Subscribe( i => Console.WriteLine("OnNext: {0}", i), () => Console.WriteLine("OnCompleted")); // Enter押すまで待機 Console.ReadLine();
上記のコードは3つのIObservable
# 複数のIObservable<T>の統合 OnNext: 0 OnNext: 10 OnNext: 100 OnNext: 1 OnNext: 11 OnNext: 101 OnNext: 2 OnNext: 12 OnNext: 102 OnCompleted
3つのIObservable
このメソッドで興味深いオーバーロードとして以下のようなものがあります。
public static IObservable<T> Merge<T>(this IObservable<IObservable<T>> sources);
IObservable
Console.WriteLine("# 複数のIObservable<T>の統合2"); // IObservable<T> Merge<T>(IObservable<IObservable<T>> source) var source = new Subject<int>(); source // IObservable<int>からIObservable<IObservable<int>>に変換 .Select(i => Observable // 0〜2の値を1秒間隔で3つ発行 .Interval(TimeSpan.FromSeconds(1)).Take(3) // 値を変換 .Select(l => (l + 1) * i)) // IObservable<IObservable<T>>からIObservable<T>へマージ .Merge() // 購読 .Subscribe( i => Console.WriteLine("OnNext: {0}", i), () => Console.WriteLine("OnCompleted")); // 値の発行から完了 Console.WriteLine("# OnNext(10)"); source.OnNext(10); Console.WriteLine("# OnNext(100)"); source.OnNext(100); Console.WriteLine("# OnNext(1000)"); source.OnNext(1000); Console.WriteLine("# OnCompleted"); source.OnCompleted(); // Enter押すまで待機 Console.ReadLine();
Subject
# 複数のIObservable<T>の統合2 # OnNext(10) # OnNext(100) # OnNext(1000) # OnCompleted OnNext: 10 OnNext: 100 OnNext: 1000 OnNext: 200 OnNext: 20 OnNext: 2000 OnNext: 300 OnNext: 3000 OnNext: 30 OnCompleted
3つのIObservable
SelectManyメソッド
ここではSelectManyメソッドについて説明します。SelectManyメソッドには様々なオーバーロードがありますが、一番使用頻度の高いオーバーロードは以下のものになります。
public static IObservable<R> SelectMany<T, R>( this IObservable<T> source, Func<T, IObservable<R>> selector);
sourceから発行された値を受け取ってselectorで渡したデリゲート処理を行いIObservable
Console.WriteLine("# 複数のIObservable<T>の統合(SelectMany)"); // IObservable<T> Merge<T>(IObservable<IObservable<T>> source) var source = new Subject<int>(); source // 発行された値からIObservable<T>を作成してマージ(統合)する .SelectMany(i => Observable // 0〜2の値を1秒間隔で3つ発行 .Interval(TimeSpan.FromSeconds(1)).Take(3) // 値を変換 .Select(l => (l + 1) * i)) // 購読 .Subscribe( i => Console.WriteLine("OnNext: {0}", i), () => Console.WriteLine("OnCompleted")); // 値の発行から完了 Console.WriteLine("# OnNext(10)"); source.OnNext(10); Console.WriteLine("# OnNext(100)"); source.OnNext(100); Console.WriteLine("# OnNext(1000)"); source.OnNext(1000); Console.WriteLine("# OnCompleted"); source.OnCompleted(); // Enter押すまで待機 Console.ReadLine();
IObservable
SelectMany(i => Observable.Return(i)) → Select(i => Observable.Return(i)).Merge()
実行結果を以下に示します。
>|||
# 複数のIObservable
# OnNext(10)
# OnNext(100)
# OnNext(1000)
# OnCompleted
OnNext: 10
OnNext: 100
OnNext: 1000
OnNext: 20
OnNext: 200
OnNext: 2000
OnNext: 30
OnNext: 3000
OnNext: 300
OnCompleted
|
Mergeメソッドと同様の結果になっていることが確認できます。
ここで説明したSelectManyメソッドのオーバーライド以外に以下のようなオーバーライドがあります。
// IObservable<T>がIEnumerable<T>になった感じ public static IObservable<R> SelectMany<T, R>( this IObservable<T> source, Func<T, IEnumerable<R>> selector); // sourceから発行された値に関係なくIObservable<T>を作成する場合 public static IObservable<O> SelectMany<T, O>( this IObservable<T> source, IObservable<O> other); // sourceから発行された値を元にIEnumerable<TCollection>を作り、sourceから発行された値と // IEnumerable<TCollection>の値を元に結果を作成する。その結果を後続に流す。 public static IObservable<R> SelectMany<T, TCollection, R>( this IObservable<T> source, Func<T, IEnumerable<TCollection>> collectionSelector, Func<T, TCollection, R> resultSelector); // sourceから発行された値を元にIObservable<TCollection>を作り、sourceから発行された値と // IObservable<TCollection>の値を元に結果を作成する。その結果を後続に流す。 public static IObservable<R> SelectMany<T, TCollection, R>( this IObservable<T> source, Func<T, IObservable<TCollection>> collectionSelector, Func<T, TCollection, R> resultSelector); // onNext, onError, onCompletedのそれぞれでIObservable<R>を作成して、マージして返す。 public static IObservable<R> SelectMany<T, R>( this IObservable<T> source, Func<T, IObservable<R>> onNext, Func<Exception, IObservable<R>> onError, Func<IObservable<R>> onCompleted);
全てのオーバーロードについてのコード例はここでは割愛します。ここでは上記のメソッドの4番目のコードの使用例を下記に示します。
Console.WriteLine("# 複数のIObservable<T>の統合(SelectManyのオーバーロード)"); var source = new Subject<int>(); source .SelectMany( // sourceから発行された値を元にi, i+1, i+2の値を発行するIObservable<int>を返す i => Observable.Range(i, 3), // sourceから発行された値とi => Observable.Range(i, 3)で生成された値を // 使って最終的に発行される値を作成する。ここでは匿名型にしただけ。 (x, y) => new { x, y }) .Subscribe(Console.WriteLine); // 値の発行から完了 Console.WriteLine("# OnNext(10)"); source.OnNext(10); Console.WriteLine("# OnNext(100)"); source.OnNext(100); Console.WriteLine("# OnNext(1000)"); source.OnNext(1000); Console.WriteLine("# OnCompleted"); source.OnCompleted(); // Enter押すまで待機 Console.ReadLine();
上記のコードの実行結果を以下に示します。
# 複数のIObservable<T>の統合(SelectManyのオーバーロード) # OnNext(10) { x = 10, y = 10 } { x = 10, y = 11 } { x = 10, y = 12 } # OnNext(100) { x = 100, y = 100 } { x = 100, y = 101 } { x = 100, y = 102 } # OnNext(1000) { x = 1000, y = 1000 } { x = 1000, y = 1001 } { x = 1000, y = 1002 } # OnCompleted
sourceから発行された値と、sourceから発行された値を元に作成した結果のIObservable
Switchメソッド
ここではSwitchメソッドについて説明します。Switchメソッドは、IObservable
public static IObservable<T> Switch<T>(this IObservable<IObservable<T>> sources);
Mergeメソッドとの違いを確認するために、MergeメソッドとSwitchメソッドで同じ処理を書いて比較します。まず、Mergeメソッドのコード例を下記に示します。
Console.WriteLine("# 複数のIObservable<T>の統合(Merge)"); // IObservable<T> Merge<T>(IObservable<IObservable<T>> source) var source = new Subject<int>(); source .Select(i => Observable // 1 * i, 2 * i, 3 * iの値を1秒間隔で発行 .Interval(TimeSpan.FromSeconds(1)).Take(3) // 値を変換 .Select(l => (l + 1) * i)) // 統合 .Merge() .Subscribe( i => Console.WriteLine("OnNext: {0}", i), () => Console.WriteLine("OnCompleted")); // 値の発行から完了 Console.WriteLine("# OnNext(10)"); source.OnNext(10); Console.WriteLine("Sleep 2000ms..."); Thread.Sleep(2000); Console.WriteLine("# OnNext(100)"); source.OnNext(100); Console.WriteLine("Sleep 2000ms..."); Thread.Sleep(2000); Console.WriteLine("# OnNext(1000)"); source.OnNext(1000); Console.WriteLine("Sleep 2000ms..."); Thread.Sleep(2000); Console.WriteLine("# OnCompleted"); source.OnCompleted(); Console.ReadLine();
若干複雑ですが、時間間隔をあけて複数のIObservable
# 複数のIObservable<T>の統合(Merge) # OnNext(10) Sleep 2000ms... OnNext: 10 OnNext: 20 # OnNext(100) Sleep 2000ms... OnNext: 30 OnNext: 100 # OnNext(1000) Sleep 2000ms... OnNext: 200 OnNext: 1000 OnNext: 300 # OnCompleted OnNext: 2000 OnNext: 3000 OnCompleted
10, 100, 1000の3つの値を発行して、そこからさらに3つの値を発行するIObservable
Console.WriteLine("# 複数のIObservable<T>の統合(Switch)"); // IObservable<T> Merge<T>(IObservable<IObservable<T>> source) var source = new Subject<int>(); source .Select(i => Observable // 1 * i, 2 * i, 3 * iの値を1秒間隔で発行 .Interval(TimeSpan.FromSeconds(1)).Take(3) .Select(l => (l + 1) * i)) // 最後 .Switch() .Subscribe( i => Console.WriteLine("OnNext: {0}", i), () => Console.WriteLine("OnCompleted")); // 値の発行から完了 Console.WriteLine("# OnNext(10)"); source.OnNext(10); Console.WriteLine("Sleep 2000ms..."); Thread.Sleep(2000); Console.WriteLine("# OnNext(100)"); source.OnNext(100); Console.WriteLine("Sleep 2000ms..."); Thread.Sleep(2000); Console.WriteLine("# OnNext(1000)"); source.OnNext(1000); Console.WriteLine("Sleep 2000ms..."); Thread.Sleep(2000); Console.WriteLine("# OnCompleted"); source.OnCompleted(); Console.ReadLine();
このコードの実行結果を以下に示します。
# 複数のIObservable<T>の統合(Switch) # OnNext(10) Sleep 2000ms... OnNext: 10 OnNext: 20 # OnNext(100) Sleep 2000ms... OnNext: 100 # OnNext(1000) Sleep 2000ms... OnNext: 1000 # OnCompleted OnNext: 2000 OnNext: 3000 OnCompleted
上記の実行結果から確認できるように、Mergeの時と異なって9個の値を発行しているにも関わらず、最終的にOnNextまで値が流れているのが6つになっています。これは、Mergeメソッドが元になるIObservable