かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その20「GroupByメソッドでグルーピングしてみよう」

過去記事インデックス

はじめに

ここでは、GroupByメソッドについて説明します。これは、SQL文のGROUP BYのように特定の値でグルーピングしてくれる機能を提供します。値のグルーピング等は自分で書いてもたいしたことない!というケースもあると思いますが、提供されている機能なので車輪の再発明をするより、積極的に使っていけばいいように思います。個人的には、Timerからリアルタイムで上がってくる値をグルーピングする処理とかは自前では書きたくない今日この頃です。

使ってみよう

GroupByメソッドのシグネチャとしては、IObservableのTからグルーピングのキーとなる値を抽出するデリゲートを渡します。戻り値は、IObservable>になります。少し、IGroupedObservableについて説明します。
IGroupedObservableは、IObservableを拡張したインターフェースでTKey型のKeyプロパティを追加しています。このKeyプロパティで、何でグルーピングされたかを表す以外は通常のIObservableと変わりはありません。コード例を下記に示します。

var s = new Subject<int>();
// 10で割った余りでグルーピングする。(つまり1の位の数字でグルーピング)
s.GroupBy(i => i % 10)
    // IGroupedObservable<int, int>がSubscribeのOnNextに渡ってくる
    .Subscribe(
        g =>
        {
            // GroupByのOnNextの処理
            Console.WriteLine("集計開始 {0}", g.Key);
            // 値を集計 配列にまとめて
            g.ToArray()
                // ,区切りの文字列にして
                .Select(array => string.Join(", ", array))
                // 結果を表示
                .Subscribe(
                    values => Console.WriteLine("集計結果 Key: {0}, Values: {{{1}}}", g.Key, values),
                    () => Console.WriteLine("集計終了 Key: {0}", g.Key));
        },
        ex =>
        {
            // エラー処理(今回の例ではエラーは起きないけど・・・
            Console.WriteLine("GroupBy OnError {0}", ex.Message);
        },
        () =>
        {
            // GroupByの結果がOnCompletedになった時の処理
            Console.WriteLine("GroupBy OnCompleted");
        });

// 値をいくつか発行して終了
Console.WriteLine("OnNext(13)");
s.OnNext(13);
Console.WriteLine("OnNext(1)");
s.OnNext(1);
Console.WriteLine("OnNext(11)");
s.OnNext(11);
Console.WriteLine("OnNext(42)");
s.OnNext(42);
Console.WriteLine("OnNext(21)");
s.OnNext(21);
Console.WriteLine("OnNext(12)");
s.OnNext(12);
Console.WriteLine("OnNext(23)");
s.OnNext(23);
Console.WriteLine("OnCompleted()");
s.OnCompleted();

多少複雑ですが、コードを理解するポイントはGroupByの結果がIObservable>になっている点です。これをSubscribeするとOnNextにはIGroupedObservableが渡ってきます。IGroupedObservableは、Keyプロパティがある以外はIObservableと同じなので、これまで使ってきた拡張メソッドがすべて使えます。ここの例では、発行された値をすべて”, ”区切りの文字列にして出力しています。実行結果を下記に示します。

OnNext(13)
集計開始 3
OnNext(1)
集計開始 1
OnNext(11)
OnNext(42)
集計開始 2
OnNext(21)
OnNext(12)
OnNext(23)
OnCompleted()
集計結果 Key: 3, Values: {13, 23}
集計終了 Key: 3
集計結果 Key: 1, Values: {1, 11, 21}
集計終了 Key: 1
集計結果 Key: 2, Values: {42, 12}
集計終了 Key: 2
GroupBy OnCompleted

新しいグループに属する値が発行される度に、“集計開始“という文字列が出力されます。これはGroupByをした結果のOnNext内で出力しているものです。新しいグループに属する値が出てくると即座にIGroupedObservableが作成されてSubscribeしている箇所に流れてくることがわかります。そして、値を発行しおわったあとに、“集計結果”という文字列が出力されます。これは、もとになるIObservableのシーケンスが完了したため、IGroupedObservableのToArrayが値をまとめた結果を後続の処理に流し文字列として加工して出力しているものになります。最後に、IGroupedObservableのOnCompletedの”集計終了”という文字列が出力されます。そして、最後にGroupByの結果を購読しているところのOnCompletedが呼ばれて”GroupBy OnCompleted”と表示されています。
このコードはSubscribeしている中で、さらにSubscribeしていたりToArrayなどの他のメソッドの挙動もかかわってくるため若干複雑になっています。処理の流れがイメージできない場合は、コードを実際に書いてみてデバッガで止めて処理の流れを追うなどして動作を確認してみましょう。