かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その21「GroupByUntilメソッド」

過去記事インデックス

はじめに

前回は、GroupByメソッドを使ってみました。今回はGroupByUntilメソッドを使ってみようと思います。このメソッドを使うとGroupByとは違った一定の間隔や何かが起きたタイミングでグルーピングをいったん締めたりといったことができるので、はまるところで使うととても強力そうなやつです。ということで見ていきましょう。

GroupByUntilメソッド

ここでは、GroupByUntilメソッドについて説明します。これは「SkipUntilとTakeUntilメソッド」で説明したSkipUntilメソッドやTakeUntilメソッドと同じような動きをします。SkipUntilは引数で渡したIObservableのOnNextが発行されるまで値をスキップします。TakeUntilは引数で渡したIObservableのOnNextが発行されるまで値を後ろに流したりします。このようにGroupByUntilメソッドでも、IObservableを使ってメソッドの処理の開始や終了のタイミングを制御することが出来ます。
動きの詳しい説明の前にGroupByUntilメソッドのシグネチャを下記に示します。

public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(
	this IObservable<TSource> source,
	Func<TSource, TKey> keySelector,
	Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector
)

第三引数にFunc, IObservable>という型のデリゲートを受け取っているところが、理解に苦しむ点だと思います。これは、第二引数のkeySelectorで選択された値で新しいグループが出来たときに呼び出されるデリゲートになります。そのため引数がグループを表すIGroupedObservableになります。そして、戻り値で、このグループの集計を終了するきっかけをOnNextで通知するIObservableを返します。コード例を下記に示します。

// 値をランダムで出したいので
var r = new Random();
// グループピングの終了を通知するためのIObservable
var duration = new Subject<Unit>();

// GroupByUntilの使用例
var subscriber = Observable.Interval(TimeSpan.FromMilliseconds(500))
    // 500ミリ秒間隔で乱数発生
    .Select(_ => r.Next(1000))
    .GroupByUntil(
        // 1の位の数でグルーピング
        l => l % 10,
        // durationでOnNextが発行されるまでの間グルーピングをする。
        // 全てのグループに対して、同じIObservableを返しているので、同じタイミングで
        // グルーピングが終わる。
        l => duration.AsObservable())
    // 購読
    .Subscribe(
        g =>
        {
            // GroupByのOnNextの処理
            Console.WriteLine("集計開始 {0}", g.Key);
            // 値を集計 配列にまとめて
            g.ToArray()
                // ,区切りの文字列にして
                .Select(array => string.Join(", ", array))
                // 結果を表示
                .Subscribe(
                    values => Console.WriteLine("集計結果 Key: {0}, Values: {{{1}}}", g.Key, values),
                    () => Console.WriteLine("集計終了 Key: {0}", g.Key));
        },
        ex =>
        {
            // エラー処理(今回の例ではエラーは起きないけど・・・
            Console.WriteLine("GroupBy OnError {0}", ex.Message);
        },
        () =>
        {
            // GroupByの結果がOnCompletedになった時の処理
            Console.WriteLine("GroupBy OnCompleted");
        });

while(true)
{
    // 待ち
    Console.WriteLine("Enterを押すまで集計します。(終了したい場合はendと入力してください)");
    if (Console.ReadLine() == "end")
    {
        break;
    }

    // durationのOnNextでいったんグルーピングの集計が一区切り
    duration.OnNext(Unit.Default);
}
// 後始末
subscriber.Dispose();

このコードでは0.5秒(500ms)間隔でランダムに0〜999の値を発行しています。そして、発行された値の1の位でグルーピングをしています。GroupByUntilでは、グルーピングの終了のタイミングを通知するIObservableを返すデリゲートで全て同じIObservableを返しています。そのため、全てのグルーピングの終了のタイミングが同じになります。グルーピングの終了のタイミングは、コンソールでEnterを押したタイミングになります。Enterを押すと、今までのグルーピングの結果を表示して、新たにグルーピングを行います。endと入力してEnterを押すと処理が終了します。実行結果を下記に示します。

Enterを押すまで集計します。(終了したい場合はendと入力してください)
集計開始 8
集計開始 4
集計開始 1
集計開始 6
集計開始 9
集計開始 7
集計開始 0
集計開始 5
集計開始 2
集計開始 3

集計結果 Key: 8, Values: {358, 808, 48, 308}
集計終了 Key: 8
集計結果 Key: 4, Values: {724, 544, 594, 604}
集計終了 Key: 4
集計結果 Key: 1, Values: {371, 161, 81, 141, 221}
集計終了 Key: 1
集計結果 Key: 6, Values: {426, 666, 456}
集計終了 Key: 6
集計結果 Key: 9, Values: {309, 379, 689, 19}
集計終了 Key: 9
集計結果 Key: 7, Values: {967, 57, 557, 477}
集計終了 Key: 7
集計結果 Key: 0, Values: {650, 330, 590, 570, 780, 550}
集計終了 Key: 0
集計結果 Key: 5, Values: {545}
集計終了 Key: 5
集計結果 Key: 2, Values: {462, 532, 492, 902, 252, 542, 692, 852}
集計終了 Key: 2
集計結果 Key: 3, Values: {663, 433, 763}
集計終了 Key: 3
Enterを押すまで集計します。(終了したい場合はendと入力してください)
集計開始 2
集計開始 7
集計開始 8
集計開始 0
集計開始 4
集計開始 9
end
集計結果 Key: 2, Values: {112}
集計終了 Key: 2
集計結果 Key: 7, Values: {357}
集計終了 Key: 7
集計結果 Key: 8, Values: {598}
集計終了 Key: 8
集計結果 Key: 0, Values: {850}
集計終了 Key: 0
集計結果 Key: 4, Values: {644}
集計終了 Key: 4
集計結果 Key: 9, Values: {699}
集計終了 Key: 9

長い実行結果ですが、実行後暫く放置してEnterを押して、その後endと入力してEnterを押した実行結果になります。一度Enterを押してグルーピングが終了したあとにも、新たにグルーピングが行われています。これはもとになるObservable.Intervalが終了していないためです。グルーピングが一旦終了したあとに、新たに値が発行されるため再度グルーピングが行われます。このように、GroupByでは素直に書けない一定間隔でのグルーピングなどがGroupByUntilでは行えます。また、今回の例では全てのIGroupedObservableに対して同じIObservableを返していたため、全てのグルーピングの終了タイミングが同じになっていましたが、IGroupedObservableごとに異なるIObservableを返すことで、グルーピングの終了タイミングをグループごとに指定することも出来ます。