かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その43「GroupJoinメソッド」

過去記事インデックス

はじめに

今回はGroupJoinメソッドについてみていきます。使いどころは、まだわからないですが動きはとりあえず把握しておきましょう!

GroupJoinメソッド

ここでは、GroupJoinメソッドについて説明します。GroupJoinメソッドは、2つのIObservableのシーケンスのうち左辺の値に対して右辺の値のIObservableのシーケンスの組み合わせを合成します。このメソッドのシグネチャを以下に示します。

public static IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
    this IObservable<TLeft> left, 
    IObservable<TRight> right, 
    Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, 
    Func<TRight, IObservable<TRightDuration>> rightDurationSelector, 
    Func<TLeft, IObservable<TRight>, TResult> resultSelector);

Joinメソッドと、ほぼ同じシグネチャですが、resultSelectorデリゲートの引数に違いがあります。resultSelectorデリゲートの第二引数がJoinメソッドではTRight型だったのに対してGroupJoinメソッドではIObservableになります。GroupJoinメソッドのresultSelectorは、left引数から値が発行されたタイミングで呼び出されます。この時のresultSelectorの第一引数が、leftから発行される値で、第二引数がrightから発行された有効期限内の値を発行するIObservableになります。このメソッドの使用例を下記に示します。

// センサー名
var sensors = new Subject<string>();
// センサーが受信する値
var values = new Subject<int>();

// 値のリセット用Subject
var valueReset = new Subject<Unit>();

sensors.GroupJoin(
    values,
    // センサーは有効期限無し
    _ => Observable.Never<Unit>(),
    // センサーの値はvalueResetのOnNextで無効に出来る
    _ => valueReset,
    // センサーの名前と、センサーが受け取った値の現在の合計値を発行するLogにして後続に流す
    (l, r) => new { Name = l, Log = r.Scan((x, y) => x + y) })
    .Subscribe(
        sensor =>
        {
            // Logを表示する
            sensor
                .Log
                .Subscribe(i => Console.WriteLine("{0}: {1}", sensor.Name, i));
        },
        // 完了
        () => Console.WriteLine("OnCompleted"));

// センサーを2つ登録
Console.WriteLine("sensors.OnNext(sensor1)");
sensors.OnNext("sensor1");
Console.WriteLine("sensors.OnNext(sensor2)");
sensors.OnNext("sensor2");

// 値を3つ発行
Console.WriteLine("values.OnNext(100)");
values.OnNext(100);
Console.WriteLine("values.OnNext(10)");
values.OnNext(10);
Console.WriteLine("values.OnNext(1)");
values.OnNext(1);

// センサーの値を一旦リセット
Console.WriteLine("valueReset.OnNext()");
valueReset.OnNext(Unit.Default);

// 新しいセンサーを追加
Console.WriteLine("sensors.OnNext(sensor3)");
sensors.OnNext("sensor3");
// 値を3つ発行
Console.WriteLine("values.OnNext(1)");
values.OnNext(1);
Console.WriteLine("values.OnNext(2)");
values.OnNext(2);
Console.WriteLine("values.OnNext(3)");
values.OnNext(3);

// 終了
Console.WriteLine("values.OnCompleted()");
values.OnCompleted();
Console.WriteLine("sensors.OnCompleted()");
sensors.OnCompleted();

コードのイメージはセンサーと、センサーが受信した値の合計値をリアルタイムで表示するプログラムです。センサーが受信する値は任意のタイミング(valueReset変数のOnNext)でリセットできるようにしています。このメソッドの実行結果を以下に示します。

sensors.OnNext(sensor1)
sensors.OnNext(sensor2)
values.OnNext(100)
sensor1: 100
sensor2: 100
values.OnNext(10)
sensor1: 110
sensor2: 110
values.OnNext(1)
sensor1: 111
sensor2: 111
valueReset.OnNext()
sensors.OnNext(sensor3)
values.OnNext(1)
sensor1: 112
sensor2: 112
sensor3: 1
values.OnNext(2)
sensor1: 114
sensor2: 114
sensor3: 3
values.OnNext(3)
sensor1: 117
sensor2: 117
sensor3: 6
values.OnCompleted()
sensors.OnCompleted()
OnCompleted

sensor1とsensor2は、発行された値の合計を全て保持していることが確認できますが、valueResetのOnNextを呼んだ後に追加したsensor3は、途中からの値の合計しか集計していないことが確認できます。