過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
- Reactive Extensions再入門 その5「HotとCold」
- Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
- Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
- Reactive Extensions再入門 その8「SkipとTakeメソッド」
- Reactive Extensions再入門 その9「Skip + Take + Repeat = ドラッグ」
- Reactive Extensions再入門 その10「Doメソッド」
- Reactive Extensions再入門 その11「Catchメソッド」
- Reactive Extensions再入門 その12「Finallyメソッドとリソース解放」
- Reactive Extensions再入門 その13「最後の値を取得するLatestとMostRecentメソッド」
- Reactive Extensions再入門 その14「Nextメソッド」
- Reactive Extensions再入門 その15「To*****系メソッド」
- Reactive Extensions再入門 その16「最大、最少、平均を求めるメソッド」
- Reactive Extensions再入門 その17「集計するメソッド」
- Reactive Extensions再入門 その18「CountメソッドとLongCountメソッド」
- Reactive Extensions再入門 その19「AnyメソッドとAllメソッド」
- Reactive Extensions再入門 その20「GroupByメソッドでグルーピングしてみよう」
- Reactive Extensions再入門 その21「GroupByUntilメソッド」
- Reactive Extensions再入門 その22「単一の値を取得するメソッド」
- Reactive Extensions再入門 その23「重複を排除するメソッド」
- Reactive Extensions再入門 その24「単一の値を取得するメソッド その2」
はじめに
前回から、1週間程度間があいてしまいました。ちょっとPrismのコードおいかけたりして遊んでましたが、こっちのことも忘れてません。ということで今回はBufferメソッドです。
Bufferメソッド
ここでは、Bufferメソッドについて説明します。Bufferメソッドは、もとになるIObservable
数でまとめるBufferメソッドのオーバーロード
まず、最初に一番直感的に使える指定した数で値をまとめるオーバーロードについて説明します。メソッドのシグネチャを以下に示します。
public static IObservable<IList<TSource>> Buffer<TSource>( this IObservable<TSource> source, int count)
countで、値をまとめる個数を指定します。このメソッドの使用例を書きに示します。
// 1〜10の値を発行するIObservable<int>のシーケンス Observable.Range(1, 10) // 3つずつの値に分ける .Buffer(3) .Subscribe( l => { // IList<int>の内容を出力 Console.WriteLine("-- Buffer start"); foreach (var i in l) { Console.WriteLine(i); } }, // 完了 () => Console.WriteLine("OnCompleted"));
Observable.Rangeを使って1〜10の10個の値を発行するIObservable
-- Buffer start 1 2 3 -- Buffer start 4 5 6 -- Buffer start 7 8 9 -- Buffer start 10 OnCompleted
結果からわかるように、(1,2,3), (4,5,6), (7,8,9), (10)のようにまとめられています。最後のあまった数は、無視されるのではなく、残ったものだけをIList
数を指定するBufferメソッドのオーバーロードには、下記のシグネチャのように2つのint型の数を受け取るものがあります。
public static IObservable<IList<TSource>> Buffer<TSource>( this IObservable<TSource> source, int count, int skip)
引数のcountは、値をまとめる数でskipは、Bufferでまとめ終わった後に次の値を収集するスタート地点を何処にするのか指定する引数です。skipで指定した数だけ、発行される値を無視したあとにcount個の値を集め始めます。最初に使用した引数が1つのBufferメソッドは、count個ずつ値をまとめたあとの、次に値をまとめ始める起点がcount個の値を飛ばした個所になるため、countとskipに同じ値を指定した場合と等しくなります。
このオーバーロードのコード例を下記に示します。
// 1〜10の値を発行するIObservable<int>のシーケンス Observable.Range(1, 10) // 3つずつの値に分けて、値は2つ飛ばし .Buffer(3, 2) .Subscribe( l => { // IList<int>の内容を出力 Console.WriteLine("-- Buffer start"); foreach (var i in l) { Console.WriteLine(i); } }, // 完了 () => Console.WriteLine("OnCompleted"));
Observable.Rangeメソッドを使用して1〜10の10個の値を発行するIObservable
-- Buffer start 1 2 3 -- Buffer start 3 4 5 -- Buffer start 5 6 7 -- Buffer start 7 8 9 -- Buffer start 9 10 OnCompleted
実行結果からわかるように(1,2,3), (3,4,5), (5,6,7), (7,8,9), (9, 10)という形で値がまとめられています。まとめられた値の最初の数を見ると1, 3, 5, 7, 9というようにskip引数で指定した2が公差になっている数列になっていることが確認できます。
skip引数にcount引数より大きな値を指定することも可能です。コード例と実行結果を下記に示します。
// 1〜10の値を発行するIObservable<int>のシーケンス Observable.Range(1, 10) // 3つずつの値に分けて、値は5つ飛ばし .Buffer(3, 5) .Subscribe( l => { // IList<int>の内容を出力 Console.WriteLine("-- Buffer start"); foreach (var i in l) { Console.WriteLine(i); } }, // 完了 () => Console.WriteLine("OnCompleted"));
このコードではcountに3, skipに5を指定しています。実行結果を下記に示します。
-- Buffer start 1 2 3 -- Buffer start 6 7 8 OnCompleted
時間でまとめるBufferメソッドのオーバーロード
次に、時間で値をまとめるBufferメソッドのオーバーロードについて説明します。Reactive Extensionsが得意とする時間を扱うオーバーロードなので、単純に数を指定するだけのよりも実践では使う機会が多いかもしれません。
まず、一番単純な時間の間隔を指定するオーバーロードのメソッドのシグネチャを下記に示します。
public static IObservable<IList<T>> Buffer<TSource>( this IObservable<T> source, TimeSpan timeSpan);
timeSpan引数で指定した間隔で値をためて、たまった値のIList
var gate = new EventWaitHandle(false, EventResetMode.AutoReset); Observable // 500msごとに値を発行する .Interval(TimeSpan.FromMilliseconds(500)) // 3秒間値を溜める .Buffer(TimeSpan.FromSeconds(3)) // 最初の3つを後続に流す .Take(3) .Subscribe( l => { // 値を表示 Console.WriteLine("--Buffer {0:HH:mm:ss}", DateTime.Now); foreach (var i in l) { Console.WriteLine(i); } }, () => { // 完了 Console.WriteLine("OnCompleted"); gate.Set(); }); // OnCompleted待ち Console.WriteLine("WaitOne"); gate.WaitOne(); Console.WriteLine("WaitOne Completed");
500msごとに値を発行して、それを3秒スパンでまとめて出力しています。実行結果を下記に示します。
WaitOne --Buffer 20:35:26 0 1 2 3 4 5 --Buffer 20:35:29 6 7 8 9 10 11 --Buffer 20:35:32 12 13 14 15 16 17 OnCompleted WaitOne Completed
3秒間隔で6個ずつ値がまとめられていることが確認できます。
時間指定のオーバーロードにも、第二引数で次の値の塊を作るために待機する時間を指定するオーバーロードがあります。シグネチャを下記に示します。
public static IObservable<IList<TSource>> Buffer<T>(this IObservable<T> source, TimeSpan timeSpan, TimeSpan timeShift);
timeShift引数が、次の値を集め始めるまでの時間を指定する箇所になります。このオーバーロードを使用したコード例を下記に示します。
var gate = new EventWaitHandle(false, EventResetMode.AutoReset); Observable // 500msごとに値を発行する .Interval(TimeSpan.FromMilliseconds(500)) // 3秒間値を溜める。次の値をためるための待機時間は2秒 .Buffer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(2)) // 最初の3つを後続に流す .Take(3) .Subscribe( l => { // 値を表示 Console.WriteLine("--Buffer {0:HH:mm:ss}", DateTime.Now); foreach (var i in l) { Console.WriteLine(i); } }, () => { // 完了 Console.WriteLine("OnCompleted"); gate.Set(); }); // OnCompleted待ち Console.WriteLine("WaitOne"); gate.WaitOne(); Console.WriteLine("WaitOne Completed");
このコードでは、3秒間値を溜めるとともに次の値を溜め始めるまでの間隔を2秒にしています。もととなるIObservableのシーケンスでは500msごとに値が発行されているため、1秒分の値が1つ前の値の塊と重複します。実行結果を以下に示します。
WaitOne --Buffer 20:44:00 0 1 2 3 4 5 --Buffer 20:44:02 4 ← 1秒分(500msスパンなのでこの場合2個)の値がかぶってる 5 ← 6 7 8 9 --Buffer 20:44:04 8 ← 1秒分(500msスパンなのでこの場合2個)の値がかぶってる 9 ← 10 11 12 13 OnCompleted
実行結果内にも示していますが、1秒分の値が重複していることが確認できます。
任意のタイミングで値をまとめるBufferメソッドのオーバーロード
これまで個数と時間で値をまとめるBufferメソッドのオーバーロードを見てきましたが、任意のタイミングで値をまとめるのを区切るオーバーロードもあります。シグネチャを下記に示します。
public static IObservable<IList<TSource>> Buffer<T, TBufferClosing>( this IObservable<T> source, Func<IObservable<TBufferClosing>> bufferClosingSelector);
bufferClosingSelectorデリゲートが返すIObservable
var gate = new EventWaitHandle(false, EventResetMode.AutoReset); Observable // 500ms間隔で値を発行 .Interval(TimeSpan.FromMilliseconds(500)) // 任意の値で値をまとめるのを辞める(この場合3秒間隔) .Buffer(() => Observable.Interval(TimeSpan.FromSeconds(3))) // 最初の3つだけ後続に流す .Take(3) .Subscribe( l => { // 値を表示 Console.WriteLine("--Buffer {0:HH:mm:ss}", DateTime.Now); foreach (var i in l) { Console.WriteLine(i); } }, () => { // 完了 Console.WriteLine("OnCompleted"); gate.Set(); }); // OnCompleted待ち Console.WriteLine("WaitOne"); gate.WaitOne(); Console.WriteLine("WaitOne Completed");
このコード例では、Buffer(() => Observable.Interval(TimeSpan.FromSeconds(3)))のようにして、3秒で値をまとめるのを終了するようにしています。実行結果を下記に示します。
WaitOne --Buffer 21:08:55 0 1 2 3 4 5 --Buffer 21:08:58 6 7 8 9 10 11 --Buffer 21:09:01 12 13 14 15 16 17 OnCompleted WaitOne Completed
指定した通り、3秒間隔で値をまとめていることが確認できます。
このBufferの引数に渡したデリゲートは、Bufferを開始するタイミングで毎回評価されるので、その時の状況に応じた終了タイミングを表すIObservable
また、IObservableを受け取るオーバーロードには値を集めるタイミングと、値を集めるのを終了するタイミングを任意に指定するオーバーロードも定義されています。シグネチャを下記に示します。
public static IObservable<IList<T>> Buffer<T, TBufferOpening, TBufferClosing>( this IObservable<T> source, IObservable<TBufferOpening> bufferOpenings, Func<TBufferOpening, IObservable<TBufferClosing>> bufferClosingSelector);
bufferOpeningsは、もとになるIObservable
var gate = new EventWaitHandle(false, EventResetMode.AutoReset); // クリックをエミュレート var clickEmuration = new Subject<Unit>(); Observable // 500ms間隔で値を発行 .Interval(TimeSpan.FromMilliseconds(500)) // 任意の値で値をまとめるのを辞める(この場合3秒間隔) .Buffer( // clickEmurationから通知がきたら clickEmuration.AsObservable(), // 2秒間値を集める _ => Observable.Interval(TimeSpan.FromSeconds(2))) // 最初の2つだけ後続に流す .Take(2) .Subscribe( l => { // 値を表示 Console.WriteLine("--Buffer {0:HH:mm:ss}", DateTime.Now); foreach (var i in l) { Console.WriteLine(i); } }, () => { // 完了 Console.WriteLine("OnCompleted"); gate.Set(); }); // Enterを押すとクリックを模したSubjectから通知を上げる Console.ReadLine(); Console.WriteLine("{0:HH:mm:ss} Click emurate", DateTime.Now); clickEmuration.OnNext(Unit.Default); // Enterを押すとクリックを模したSubjectから通知を上げる Console.ReadLine(); Console.WriteLine("{0:HH:mm:ss} Click emurate", DateTime.Now); clickEmuration.OnNext(Unit.Default); // OnCompleted待ち gate.WaitOne(); Console.WriteLine("WaitOne Completed");
このコードは、clickEmurationから値が発行されてから2秒間IObservable
21:22:02 Click emurate --Buffer 21:22:04 5 6 7 8 21:22:08 Click emurate --Buffer 21:22:10 17 18 19 20 OnCompleted WaitOne Completed
時間と数でまとめるBufferメソッドのオーバーロード
最後に、紹介するのは時間と数の両方でまとめるBufferメソッドのオーバーロードです。これは、「数でまとめるBufferメソッドのオーバーロード」と「時間でまとめるBufferメソッドのオーバーロード」で紹介した内容をあわせたもので、指定した時間か指定した数のどちらかがそろったタイミングで後続にまとめた値を流します。メソッドのシグネチャを下記に示します。
public static IObservable<IList<T>> Buffer<T>( this IObservable<T> source, TimeSpan timeSpan, int count);
timeSpanで時間間隔を指定し、countでまとめる個数を指定します。コード例を下記に示します。
var gate = new EventWaitHandle(false, EventResetMode.AutoReset); Observable // 0-9の値をi * 200 ms間隔で発行する .Generate( 0, i => i < 10, i => ++i, i => i, i => TimeSpan.FromMilliseconds(i * 200)) // 2秒間隔か2つ値がくるまでまとめる .Buffer(TimeSpan.FromSeconds(2), 2) .Subscribe( l => { // 値を表示 Console.WriteLine("--Buffer {0:HH:mm:ss}", DateTime.Now); foreach (var i in l) { Console.WriteLine(i); } }, () => { // 完了 Console.WriteLine("OnCompleted"); gate.Set(); }); gate.WaitOne();
実行結果を下記に示します。
--Buffer 21:44:37 0 1 --Buffer 21:44:38 2 3 --Buffer 21:44:40 4 5 --Buffer 21:44:42 6 --Buffer 21:44:44 7 --Buffer 21:44:46 8 9 --Buffer 21:44:46 OnCompleted
最初の方は、値の発行間隔が短いためcount引数で指定した2つの値が流れてきていますが、後半になってくると、値の発行間隔が長くなってくるため、timeSpanで指定した間隔で値がまとめられていることが確認できます。