かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その25「値をまとめるBufferメソッド」

過去記事インデックス

はじめに

前回から、1週間程度間があいてしまいました。ちょっとPrismのコードおいかけたりして遊んでましたが、こっちのことも忘れてません。ということで今回はBufferメソッドです。

Bufferメソッド

ここでは、Bufferメソッドについて説明します。Bufferメソッドは、もとになるIObservableのシーケンスから指定した数、時間、タイミングで値をまとめてIObservable>のシーケンスとして後続に流すメソッドになります。動作は単純ですが、オーバーロードが多数存在します。ここでは、代表的なものをいくつかピックアップして紹介します。オーバーロードの一覧は、Reactive Extensionsのドキュメントを参照してください。

数でまとめるBufferメソッドのオーバーロード

まず、最初に一番直感的に使える指定した数で値をまとめるオーバーロードについて説明します。メソッドのシグネチャを以下に示します。

public static IObservable<IList<TSource>> Buffer<TSource>(
    this IObservable<TSource> source, 
    int count)

countで、値をまとめる個数を指定します。このメソッドの使用例を書きに示します。

// 1〜10の値を発行するIObservable<int>のシーケンス
Observable.Range(1, 10)
    // 3つずつの値に分ける
    .Buffer(3)
    .Subscribe(
        l =>
        {
            // IList<int>の内容を出力
            Console.WriteLine("-- Buffer start");
            foreach (var i in l)
            {
                Console.WriteLine(i);
            }
        },
        // 完了
        () => Console.WriteLine("OnCompleted"));

Observable.Rangeを使って1〜10の10個の値を発行するIObservableのシーケンスを作成してBuffer(3)で3つずつ値をまとめています。順当にいくと、1つ値が余ってしまいます。その値がどのように扱われるのかに注意して下記実行結果を確認してください。

-- Buffer start
1
2
3
-- Buffer start
4
5
6
-- Buffer start
7
8
9
-- Buffer start
10
OnCompleted

結果からわかるように、(1,2,3), (4,5,6), (7,8,9), (10)のようにまとめられています。最後のあまった数は、無視されるのではなく、残ったものだけをIListにまとめて、後続に流します。
数を指定するBufferメソッドのオーバーロードには、下記のシグネチャのように2つのint型の数を受け取るものがあります。

public static IObservable<IList<TSource>> Buffer<TSource>(
    this IObservable<TSource> source, 
    int count, 
    int skip)

引数のcountは、値をまとめる数でskipは、Bufferでまとめ終わった後に次の値を収集するスタート地点を何処にするのか指定する引数です。skipで指定した数だけ、発行される値を無視したあとにcount個の値を集め始めます。最初に使用した引数が1つのBufferメソッドは、count個ずつ値をまとめたあとの、次に値をまとめ始める起点がcount個の値を飛ばした個所になるため、countとskipに同じ値を指定した場合と等しくなります。
このオーバーロードのコード例を下記に示します。

// 1〜10の値を発行するIObservable<int>のシーケンス
Observable.Range(1, 10)
    // 3つずつの値に分けて、値は2つ飛ばし
    .Buffer(3, 2)
    .Subscribe(
        l =>
        {
            // IList<int>の内容を出力
            Console.WriteLine("-- Buffer start");
            foreach (var i in l)
            {
                Console.WriteLine(i);
            }
        },
        // 完了
        () => Console.WriteLine("OnCompleted"));

Observable.Rangeメソッドを使用して1〜10の10個の値を発行するIObservableのシーケンスを作成して、Buffer(3, 2)の呼び出しで3こずつ、値は2つ飛ばしでまとめています。実行結果を下記に示します。

-- Buffer start
1
2
3
-- Buffer start
3
4
5
-- Buffer start
5
6
7
-- Buffer start
7
8
9
-- Buffer start
9
10
OnCompleted

実行結果からわかるように(1,2,3), (3,4,5), (5,6,7), (7,8,9), (9, 10)という形で値がまとめられています。まとめられた値の最初の数を見ると1, 3, 5, 7, 9というようにskip引数で指定した2が公差になっている数列になっていることが確認できます。
skip引数にcount引数より大きな値を指定することも可能です。コード例と実行結果を下記に示します。

// 1〜10の値を発行するIObservable<int>のシーケンス
Observable.Range(1, 10)
    // 3つずつの値に分けて、値は5つ飛ばし
    .Buffer(3, 5)
    .Subscribe(
        l =>
        {
            // IList<int>の内容を出力
            Console.WriteLine("-- Buffer start");
            foreach (var i in l)
            {
                Console.WriteLine(i);
            }
        },
        // 完了
        () => Console.WriteLine("OnCompleted"));

このコードではcountに3, skipに5を指定しています。実行結果を下記に示します。

-- Buffer start
1
2
3
-- Buffer start
6
7
8
OnCompleted

時間でまとめるBufferメソッドのオーバーロード

次に、時間で値をまとめるBufferメソッドのオーバーロードについて説明します。Reactive Extensionsが得意とする時間を扱うオーバーロードなので、単純に数を指定するだけのよりも実践では使う機会が多いかもしれません。
まず、一番単純な時間の間隔を指定するオーバーロードのメソッドのシグネチャを下記に示します。

public static IObservable<IList<T>> Buffer<TSource>(
    this IObservable<T> source, 
    TimeSpan timeSpan);

timeSpan引数で指定した間隔で値をためて、たまった値のIListを後続に流します。コード例を下記に示します。

var gate = new EventWaitHandle(false, EventResetMode.AutoReset);
Observable
    // 500msごとに値を発行する
    .Interval(TimeSpan.FromMilliseconds(500))
    // 3秒間値を溜める
    .Buffer(TimeSpan.FromSeconds(3))
    // 最初の3つを後続に流す
    .Take(3)
    .Subscribe(
        l =>
        {
            // 値を表示
            Console.WriteLine("--Buffer {0:HH:mm:ss}", DateTime.Now);
            foreach (var i in l)
            {
                Console.WriteLine(i);
            }
        },
        () =>
        {
            // 完了
            Console.WriteLine("OnCompleted");
            gate.Set();
        });

// OnCompleted待ち
Console.WriteLine("WaitOne");
gate.WaitOne();
Console.WriteLine("WaitOne Completed");

500msごとに値を発行して、それを3秒スパンでまとめて出力しています。実行結果を下記に示します。

WaitOne
--Buffer 20:35:26
0
1
2
3
4
5
--Buffer 20:35:29
6
7
8
9
10
11
--Buffer 20:35:32
12
13
14
15
16
17
OnCompleted
WaitOne Completed

3秒間隔で6個ずつ値がまとめられていることが確認できます。
時間指定のオーバーロードにも、第二引数で次の値の塊を作るために待機する時間を指定するオーバーロードがあります。シグネチャを下記に示します。

public static IObservable<IList<TSource>> Buffer<T>(this IObservable<T> source, 
    TimeSpan timeSpan, 
    TimeSpan timeShift);

timeShift引数が、次の値を集め始めるまでの時間を指定する箇所になります。このオーバーロードを使用したコード例を下記に示します。

var gate = new EventWaitHandle(false, EventResetMode.AutoReset);
Observable
    // 500msごとに値を発行する
    .Interval(TimeSpan.FromMilliseconds(500))
    // 3秒間値を溜める。次の値をためるための待機時間は2秒
    .Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(2))
    // 最初の3つを後続に流す
    .Take(3)
    .Subscribe(
        l =>
        {
            // 値を表示
            Console.WriteLine("--Buffer {0:HH:mm:ss}", DateTime.Now);
            foreach (var i in l)
            {
                Console.WriteLine(i);
            }
        },
        () =>
        {
            // 完了
            Console.WriteLine("OnCompleted");
            gate.Set();
        });

// OnCompleted待ち
Console.WriteLine("WaitOne");
gate.WaitOne();
Console.WriteLine("WaitOne Completed");

このコードでは、3秒間値を溜めるとともに次の値を溜め始めるまでの間隔を2秒にしています。もととなるIObservableのシーケンスでは500msごとに値が発行されているため、1秒分の値が1つ前の値の塊と重複します。実行結果を以下に示します。

WaitOne
--Buffer 20:44:00
0
1
2
3
4
5
--Buffer 20:44:02
4                         ← 1秒分(500msスパンなのでこの場合2個)の値がかぶってる
5                         ← 
6
7
8
9
--Buffer 20:44:04
8                         ← 1秒分(500msスパンなのでこの場合2個)の値がかぶってる
9                         ←
10
11
12
13
OnCompleted

実行結果内にも示していますが、1秒分の値が重複していることが確認できます。

任意のタイミングで値をまとめるBufferメソッドのオーバーロード

これまで個数と時間で値をまとめるBufferメソッドのオーバーロードを見てきましたが、任意のタイミングで値をまとめるのを区切るオーバーロードもあります。シグネチャを下記に示します。

public static IObservable<IList<TSource>> Buffer<T, TBufferClosing>(
    this IObservable<T> source, 
    Func<IObservable<TBufferClosing>> bufferClosingSelector);

bufferClosingSelectorデリゲートが返すIObservableがOnNextを発行したタイミングで値をまとめるのを辞めます。コード例を下記に示します。

var gate = new EventWaitHandle(false, EventResetMode.AutoReset);
Observable
    // 500ms間隔で値を発行
    .Interval(TimeSpan.FromMilliseconds(500))
    // 任意の値で値をまとめるのを辞める(この場合3秒間隔)
    .Buffer(() => Observable.Interval(TimeSpan.FromSeconds(3)))
    // 最初の3つだけ後続に流す
    .Take(3)
    .Subscribe(
            l =>
            {
                // 値を表示
                Console.WriteLine("--Buffer {0:HH:mm:ss}", DateTime.Now);
                foreach (var i in l)
                {
                    Console.WriteLine(i);
                }
            },
            () =>
            {
                // 完了
                Console.WriteLine("OnCompleted");
                gate.Set();
            });
// OnCompleted待ち
Console.WriteLine("WaitOne");
gate.WaitOne();
Console.WriteLine("WaitOne Completed");

このコード例では、Buffer(() => Observable.Interval(TimeSpan.FromSeconds(3)))のようにして、3秒で値をまとめるのを終了するようにしています。実行結果を下記に示します。

WaitOne
--Buffer 21:08:55
0
1
2
3
4
5
--Buffer 21:08:58
6
7
8
9
10
11
--Buffer 21:09:01
12
13
14
15
16
17
OnCompleted
WaitOne Completed

指定した通り、3秒間隔で値をまとめていることが確認できます。
このBufferの引数に渡したデリゲートは、Bufferを開始するタイミングで毎回評価されるので、その時の状況に応じた終了タイミングを表すIObservableを返すことが出来ます。例えば、ボタンのクリックイベントをIObservableにしたものを渡すと、クリックのタイミングで値のグルーピングを終了させるといった使い方も出来ます。
また、IObservableを受け取るオーバーロードには値を集めるタイミングと、値を集めるのを終了するタイミングを任意に指定するオーバーロードも定義されています。シグネチャを下記に示します。

public static IObservable<IList<T>> Buffer<T, TBufferOpening, TBufferClosing>(
    this IObservable<T> source, 
    IObservable<TBufferOpening> bufferOpenings, 
    Func<TBufferOpening, IObservable<TBufferClosing>> bufferClosingSelector);

bufferOpeningsは、もとになるIObservableのシーケンスから値をあつめはじめるタイミングを指定します。bufferOpeningsから値が発行されるとbufferClosingSelectorに、その値が渡され、値の収集を終了するタイミングを通知するIObservableが作成されます。このオーバーロードの使用例を下記に示します。

var gate = new EventWaitHandle(false, EventResetMode.AutoReset);
// クリックをエミュレート
var clickEmuration = new Subject<Unit>();
Observable
    // 500ms間隔で値を発行
    .Interval(TimeSpan.FromMilliseconds(500))
    // 任意の値で値をまとめるのを辞める(この場合3秒間隔)
    .Buffer(
        // clickEmurationから通知がきたら
        clickEmuration.AsObservable(),
        // 2秒間値を集める
        _ => Observable.Interval(TimeSpan.FromSeconds(2)))
    // 最初の2つだけ後続に流す
    .Take(2)
    .Subscribe(
            l =>
            {
                // 値を表示
                Console.WriteLine("--Buffer {0:HH:mm:ss}", DateTime.Now);
                foreach (var i in l)
                {
                    Console.WriteLine(i);
                }
            },
            () =>
            {
                // 完了
                Console.WriteLine("OnCompleted");
                gate.Set();
            });

// Enterを押すとクリックを模したSubjectから通知を上げる
Console.ReadLine();
Console.WriteLine("{0:HH:mm:ss} Click emurate", DateTime.Now);
clickEmuration.OnNext(Unit.Default);

// Enterを押すとクリックを模したSubjectから通知を上げる
Console.ReadLine();
Console.WriteLine("{0:HH:mm:ss} Click emurate", DateTime.Now);
clickEmuration.OnNext(Unit.Default);

// OnCompleted待ち
gate.WaitOne();
Console.WriteLine("WaitOne Completed");

このコードは、clickEmurationから値が発行されてから2秒間IObservableのシーケンスから値を収集して出力します。このコード例ではSubjectを使用していますが、これをボタンのクリックイベントや、何か別の契機を表すIObservableに変えることができることを考えると、かなり柔軟性の高いオーバーロードだと言えます。実行結果を下記に示します。

21:22:02 Click emurate
--Buffer 21:22:04
5
6
7
8

21:22:08 Click emurate
--Buffer 21:22:10
17
18
19
20
OnCompleted
WaitOne Completed

時間と数でまとめるBufferメソッドのオーバーロード

最後に、紹介するのは時間と数の両方でまとめるBufferメソッドのオーバーロードです。これは、「数でまとめるBufferメソッドのオーバーロード」と「時間でまとめるBufferメソッドのオーバーロード」で紹介した内容をあわせたもので、指定した時間か指定した数のどちらかがそろったタイミングで後続にまとめた値を流します。メソッドのシグネチャを下記に示します。

public static IObservable<IList<T>> Buffer<T>(
    this IObservable<T> source, 
    TimeSpan timeSpan, 
    int count);

timeSpanで時間間隔を指定し、countでまとめる個数を指定します。コード例を下記に示します。

var gate = new EventWaitHandle(false, EventResetMode.AutoReset);
Observable
    // 0-9の値をi * 200 ms間隔で発行する
    .Generate(
        0,
        i => i < 10,
        i => ++i,
        i => i,
        i => TimeSpan.FromMilliseconds(i * 200))
    // 2秒間隔か2つ値がくるまでまとめる
    .Buffer(TimeSpan.FromSeconds(2), 2)
    .Subscribe(
            l =>
            {
                // 値を表示
                Console.WriteLine("--Buffer {0:HH:mm:ss}", DateTime.Now);
                foreach (var i in l)
                {
                    Console.WriteLine(i);
                }
            },
            () =>
            {
                // 完了
                Console.WriteLine("OnCompleted");
                gate.Set();
            });
gate.WaitOne();

実行結果を下記に示します。

--Buffer 21:44:37
0
1
--Buffer 21:44:38
2
3
--Buffer 21:44:40
4
5
--Buffer 21:44:42
6
--Buffer 21:44:44
7
--Buffer 21:44:46
8
9
--Buffer 21:44:46
OnCompleted

最初の方は、値の発行間隔が短いためcount引数で指定した2つの値が流れてきていますが、後半になってくると、値の発行間隔が長くなってくるため、timeSpanで指定した間隔で値がまとめられていることが確認できます。