かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その42「StartWithメソッドとJoinメソッド」

過去記事インデックス

はじめに

今回も、前回に続いて合成する関係のメソッドを使ってみようと思います。

StartWithメソッド

ここでは、StartWithメソッドについて説明します。StartWithメソッドのシグネチャを以下に示します。

public static IObservable<T> StartWith<T>(this IObservable<T> source, params T[] values);

このメソッドは、sourceで渡したIObservableのシーケンスの先頭にvaluesで指定した要素を合成して1つのIObservableのシーケンスにします。このメソッドの使用例を下記に示します。

Observable
    // 1〜3の値を発行するIObservable<T>のシーケンス
    .Range(1, 3)
    // 頭に10, 20, 30をつける
    .StartWith(10, 20, 30)
    // 購読
    .Subscribe(
        i => Console.WriteLine("OnNext: {0}", i));

Rangeメソッドを使って1〜3の値を発行するIObservableのシーケンスを作成して、StartWithメソッドで10, 20, 30の値を先頭に追加して、Subscribeメソッドで購読をしています。このメソッドの実行例を以下に示します。

OnNext: 10
OnNext: 20
OnNext: 30
OnNext: 1
OnNext: 2
OnNext: 3

Joinメソッド

ここでは、Joinメソッドについて説明します。Joinメソッドは2つのIObservableのシーケンスから発行された値の全ての組み合わせをもとに、値を生成して発行するIObservableのシーケンスを作成します。このメソッドのシグネチャを以下に示します。

public static IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
    this IObservable<TLeft> left, 
    IObservable<TRight> right, 
    Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, 
    Func<TRight, IObservable<TRightDuration>> rightDurationSelector, 
    Func<TLeft, TRight, TResult> resultSelector);

Joinメソッドは、leftとrightから値が発行されると、それぞれleftDurationSelector引数とrightDurationSelector引数で指定したデリゲートが呼ばれて、その値の有効期間を示すIObservableが生成されます。この有効期間を示すIObservableから値が発行されたらleftとrightから発行された対象となる値はJoinメソッドで使われなくなります。そして、leftとrightから値が発行されると、現在有効な値の全てを使ってresultSelectorが呼び出されます。例えばleftから発行された値で有効なものが(1, 2, 3)でrightから発行された値で有効なものが(10, 20, 30)の場合は、resultSelectorは(1, 10)(1, 20)(1, 30)(2, 10)(2, 20)(2, 30)(3, 10)(3, 20)(3, 30)の組み合わせの引数で呼び出されます。ここで生成された値がJoinメソッドの結果のIObservableから発行される値になります。
このメソッドのコード例を下記に示します。

// Joinで合成するIObservable<T>
var left = new Subject<int>();
var right = new Subject<int>();

left.Join(
    right,
    // leftから発行される値の有効期間は永久
    _ => Observable.Never<Unit>(),
    // rightから発行される値の有効期間は永久
    _ => Observable.Never<Unit>(),
    // 発行された値の組を作る
    Tuple.Create)
    // 購読
    .Subscribe(
        // 組を表示
        tuple => Console.WriteLine("Left: {0}, Right: {1}", tuple.Item1, tuple.Item2),
        // 完了を表示
        () => Console.WriteLine("OnCompleted"));

// 値の発行
Console.WriteLine("left.OnNext(1)");
left.OnNext(1);
Console.WriteLine("right.OnNext(10)");
right.OnNext(10);
Console.WriteLine("right.OnNext(100)");
right.OnNext(100);
Console.WriteLine("left.OnNext(2)");
left.OnNext(2);

// 終了
Console.WriteLine("left.OnCompleted()");
left.OnCompleted();
Console.WriteLine("right.OnCompleted()");
right.OnCompleted();

Joinメソッドを使って2つのIObservableのシーケンスから発行される値を組(Tuple)にしています。Joinの引数で渡したIObservableのシーケンスから発行される値の有効期間は、Observable.Neverメソッドを使って永遠にOnNextの呼ばれないIObservableのシーケンスを指定しているため、永遠に無効にならないようにしています。Subscribeでは組の値を表示しています。このコードの実行結果を以下に示します。

left.OnNext(1)	← この段階ではleftに1があるだけ
right.OnNext(10)	← この段階でleftに1, rightに10があるため(1, 10)の組が生成される
Left: 1, Right: 10
right.OnNext(100)	← この段階でleftに1, rightに10, 100があるため (1, 100)の組が追加で生成される
Left: 1, Right: 100
left.OnNext(2)	← この段階でleftに1, 2, rightに10, 100があるため(2, 10), (2, 100)の組が追加で生成される
Left: 2, Right: 10
Left: 2, Right: 100
left.OnCompleted()
right.OnCompleted()
OnCompleted	← rightとleftが両方終了したため、これ以降新たな組み合わせが出来ないため終了

この例のように値の有効期間にObservable.Neverを使うと、leftとrightから発行された全ての組合わせを生成することが出来ます。Never以外の値の有効期間を指定したときのコード例を下記に示します。

// Joinで合成するIObservable<T>
var left = new Subject<int>();
var right = new Subject<int>();

left.Join(
    right,
    // leftから発行される値の有効期間は永久
    _ => Observable.Never<Unit>(),
    // rightから発行される値の有効期間は一瞬
    _ => Observable.Empty<Unit>(),
    // 発行された値の組を作る
    Tuple.Create)
    // 購読
    .Subscribe(
    // 組を表示
        tuple => Console.WriteLine("Left: {0}, Right: {1}", tuple.Item1, tuple.Item2),
    // 完了を表示
        () => Console.WriteLine("OnCompleted"));

// 値の発行
Console.WriteLine("left.OnNext(1)");
left.OnNext(1);
Console.WriteLine("right.OnNext(10)");
right.OnNext(10);
Console.WriteLine("right.OnNext(100)");
right.OnNext(100);
Console.WriteLine("left.OnNext(2)");
left.OnNext(2);
Console.WriteLine("right.OnNext(1000)");
right.OnNext(1000);

// 終了
Console.WriteLine("left.OnCompleted()");
left.OnCompleted();
Console.WriteLine("right.OnCompleted()");
right.OnCompleted();

上記のコードはrightから発行された値の有効期間をObservable.Empty()で指定しているため、値が発行された瞬間しか有効期間が無いようにしています。このプログラムの実行結果を以下に示します。

left.OnNext(1)
right.OnNext(10)
Left: 1, Right: 10
right.OnNext(100)
Left: 1, Right: 100
left.OnNext(2)
right.OnNext(1000)
Left: 1, Right: 1000
Left: 2, Right: 1000
left.OnCompleted()
right.OnCompleted()
OnCompleted

rightから値が発行されると、今までleftから発行された値との組み合わせが生成されていることが確認できます。逆にleftから値が発行されてもrightから発行された値には有効なものが1つもないため、resultSelectorで指定したデリゲートは呼ばれません。
ここでは、コード例を示しませんが、値の有効期限にタイマーやイベントなどから生成したIObservableを指定することで、よりきめ細やかに値の有効期限を指定することが出来ます。