かずきのBlog@hatena

日本マイクロソフトに勤めています。XAML + C#の組み合わせをメインに、たまにASP.NETやJavaなどの.NET系以外のことも書いています。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

Reactive Extensions再入門 その43「GroupJoinメソッド」

過去記事インデックス

はじめに

今回はGroupJoinメソッドについてみていきます。使いどころは、まだわからないですが動きはとりあえず把握しておきましょう!

GroupJoinメソッド

ここでは、GroupJoinメソッドについて説明します。GroupJoinメソッドは、2つのIObservableのシーケンスのうち左辺の値に対して右辺の値のIObservableのシーケンスの組み合わせを合成します。このメソッドのシグネチャを以下に示します。

public static IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
    this IObservable<TLeft> left, 
    IObservable<TRight> right, 
    Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, 
    Func<TRight, IObservable<TRightDuration>> rightDurationSelector, 
    Func<TLeft, IObservable<TRight>, TResult> resultSelector);

Joinメソッドと、ほぼ同じシグネチャですが、resultSelectorデリゲートの引数に違いがあります。resultSelectorデリゲートの第二引数がJoinメソッドではTRight型だったのに対してGroupJoinメソッドではIObservableになります。GroupJoinメソッドのresultSelectorは、left引数から値が発行されたタイミングで呼び出されます。この時のresultSelectorの第一引数が、leftから発行される値で、第二引数がrightから発行された有効期限内の値を発行するIObservableになります。このメソッドの使用例を下記に示します。

// センサー名
var sensors = new Subject<string>();
// センサーが受信する値
var values = new Subject<int>();

// 値のリセット用Subject
var valueReset = new Subject<Unit>();

sensors.GroupJoin(
    values,
    // センサーは有効期限無し
    _ => Observable.Never<Unit>(),
    // センサーの値はvalueResetのOnNextで無効に出来る
    _ => valueReset,
    // センサーの名前と、センサーが受け取った値の現在の合計値を発行するLogにして後続に流す
    (l, r) => new { Name = l, Log = r.Scan((x, y) => x + y) })
    .Subscribe(
        sensor =>
        {
            // Logを表示する
            sensor
                .Log
                .Subscribe(i => Console.WriteLine("{0}: {1}", sensor.Name, i));
        },
        // 完了
        () => Console.WriteLine("OnCompleted"));

// センサーを2つ登録
Console.WriteLine("sensors.OnNext(sensor1)");
sensors.OnNext("sensor1");
Console.WriteLine("sensors.OnNext(sensor2)");
sensors.OnNext("sensor2");

// 値を3つ発行
Console.WriteLine("values.OnNext(100)");
values.OnNext(100);
Console.WriteLine("values.OnNext(10)");
values.OnNext(10);
Console.WriteLine("values.OnNext(1)");
values.OnNext(1);

// センサーの値を一旦リセット
Console.WriteLine("valueReset.OnNext()");
valueReset.OnNext(Unit.Default);

// 新しいセンサーを追加
Console.WriteLine("sensors.OnNext(sensor3)");
sensors.OnNext("sensor3");
// 値を3つ発行
Console.WriteLine("values.OnNext(1)");
values.OnNext(1);
Console.WriteLine("values.OnNext(2)");
values.OnNext(2);
Console.WriteLine("values.OnNext(3)");
values.OnNext(3);

// 終了
Console.WriteLine("values.OnCompleted()");
values.OnCompleted();
Console.WriteLine("sensors.OnCompleted()");
sensors.OnCompleted();

コードのイメージはセンサーと、センサーが受信した値の合計値をリアルタイムで表示するプログラムです。センサーが受信する値は任意のタイミング(valueReset変数のOnNext)でリセットできるようにしています。このメソッドの実行結果を以下に示します。

sensors.OnNext(sensor1)
sensors.OnNext(sensor2)
values.OnNext(100)
sensor1: 100
sensor2: 100
values.OnNext(10)
sensor1: 110
sensor2: 110
values.OnNext(1)
sensor1: 111
sensor2: 111
valueReset.OnNext()
sensors.OnNext(sensor3)
values.OnNext(1)
sensor1: 112
sensor2: 112
sensor3: 1
values.OnNext(2)
sensor1: 114
sensor2: 114
sensor3: 3
values.OnNext(3)
sensor1: 117
sensor2: 117
sensor3: 6
values.OnCompleted()
sensors.OnCompleted()
OnCompleted

sensor1とsensor2は、発行された値の合計を全て保持していることが確認できますが、valueResetのOnNextを呼んだ後に追加したsensor3は、途中からの値の合計しか集計していないことが確認できます。