過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
- Reactive Extensions再入門 その5「HotとCold」
- Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
- Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
- Reactive Extensions再入門 その8「SkipとTakeメソッド」
- Reactive Extensions再入門 その9「Skip + Take + Repeat = ドラッグ」
- Reactive Extensions再入門 その10「Doメソッド」
- Reactive Extensions再入門 その11「Catchメソッド」
- Reactive Extensions再入門 その12「Finallyメソッドとリソース解放」
- Reactive Extensions再入門 その13「最後の値を取得するLatestとMostRecentメソッド」
- Reactive Extensions再入門 その14「Nextメソッド」
- Reactive Extensions再入門 その15「To*****系メソッド」
- Reactive Extensions再入門 その16「最大、最少、平均を求めるメソッド」
- Reactive Extensions再入門 その17「集計するメソッド」
- Reactive Extensions再入門 その18「CountメソッドとLongCountメソッド」
- Reactive Extensions再入門 その19「AnyメソッドとAllメソッド」
- Reactive Extensions再入門 その20「GroupByメソッドでグルーピングしてみよう」
はじめに
前回は、GroupByメソッドを使ってみました。今回はGroupByUntilメソッドを使ってみようと思います。このメソッドを使うとGroupByとは違った一定の間隔や何かが起きたタイミングでグルーピングをいったん締めたりといったことができるので、はまるところで使うととても強力そうなやつです。ということで見ていきましょう。
GroupByUntilメソッド
ここでは、GroupByUntilメソッドについて説明します。これは「SkipUntilとTakeUntilメソッド」で説明したSkipUntilメソッドやTakeUntilメソッドと同じような動きをします。SkipUntilは引数で渡したIObservable
動きの詳しい説明の前にGroupByUntilメソッドのシグネチャを下記に示します。
public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>( this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector )
第三引数にFunc
// 値をランダムで出したいので var r = new Random(); // グループピングの終了を通知するためのIObservable var duration = new Subject<Unit>(); // GroupByUntilの使用例 var subscriber = Observable.Interval(TimeSpan.FromMilliseconds(500)) // 500ミリ秒間隔で乱数発生 .Select(_ => r.Next(1000)) .GroupByUntil( // 1の位の数でグルーピング l => l % 10, // durationでOnNextが発行されるまでの間グルーピングをする。 // 全てのグループに対して、同じIObservableを返しているので、同じタイミングで // グルーピングが終わる。 l => duration.AsObservable()) // 購読 .Subscribe( g => { // GroupByのOnNextの処理 Console.WriteLine("集計開始 {0}", g.Key); // 値を集計 配列にまとめて g.ToArray() // ,区切りの文字列にして .Select(array => string.Join(", ", array)) // 結果を表示 .Subscribe( values => Console.WriteLine("集計結果 Key: {0}, Values: {{{1}}}", g.Key, values), () => Console.WriteLine("集計終了 Key: {0}", g.Key)); }, ex => { // エラー処理(今回の例ではエラーは起きないけど・・・ Console.WriteLine("GroupBy OnError {0}", ex.Message); }, () => { // GroupByの結果がOnCompletedになった時の処理 Console.WriteLine("GroupBy OnCompleted"); }); while(true) { // 待ち Console.WriteLine("Enterを押すまで集計します。(終了したい場合はendと入力してください)"); if (Console.ReadLine() == "end") { break; } // durationのOnNextでいったんグルーピングの集計が一区切り duration.OnNext(Unit.Default); } // 後始末 subscriber.Dispose();
このコードでは0.5秒(500ms)間隔でランダムに0〜999の値を発行しています。そして、発行された値の1の位でグルーピングをしています。GroupByUntilでは、グルーピングの終了のタイミングを通知するIObservableを返すデリゲートで全て同じIObservableを返しています。そのため、全てのグルーピングの終了のタイミングが同じになります。グルーピングの終了のタイミングは、コンソールでEnterを押したタイミングになります。Enterを押すと、今までのグルーピングの結果を表示して、新たにグルーピングを行います。endと入力してEnterを押すと処理が終了します。実行結果を下記に示します。
Enterを押すまで集計します。(終了したい場合はendと入力してください) 集計開始 8 集計開始 4 集計開始 1 集計開始 6 集計開始 9 集計開始 7 集計開始 0 集計開始 5 集計開始 2 集計開始 3 集計結果 Key: 8, Values: {358, 808, 48, 308} 集計終了 Key: 8 集計結果 Key: 4, Values: {724, 544, 594, 604} 集計終了 Key: 4 集計結果 Key: 1, Values: {371, 161, 81, 141, 221} 集計終了 Key: 1 集計結果 Key: 6, Values: {426, 666, 456} 集計終了 Key: 6 集計結果 Key: 9, Values: {309, 379, 689, 19} 集計終了 Key: 9 集計結果 Key: 7, Values: {967, 57, 557, 477} 集計終了 Key: 7 集計結果 Key: 0, Values: {650, 330, 590, 570, 780, 550} 集計終了 Key: 0 集計結果 Key: 5, Values: {545} 集計終了 Key: 5 集計結果 Key: 2, Values: {462, 532, 492, 902, 252, 542, 692, 852} 集計終了 Key: 2 集計結果 Key: 3, Values: {663, 433, 763} 集計終了 Key: 3 Enterを押すまで集計します。(終了したい場合はendと入力してください) 集計開始 2 集計開始 7 集計開始 8 集計開始 0 集計開始 4 集計開始 9 end 集計結果 Key: 2, Values: {112} 集計終了 Key: 2 集計結果 Key: 7, Values: {357} 集計終了 Key: 7 集計結果 Key: 8, Values: {598} 集計終了 Key: 8 集計結果 Key: 0, Values: {850} 集計終了 Key: 0 集計結果 Key: 4, Values: {644} 集計終了 Key: 4 集計結果 Key: 9, Values: {699} 集計終了 Key: 9
長い実行結果ですが、実行後暫く放置してEnterを押して、その後endと入力してEnterを押した実行結果になります。一度Enterを押してグルーピングが終了したあとにも、新たにグルーピングが行われています。これはもとになるObservable.Intervalが終了していないためです。グルーピングが一旦終了したあとに、新たに値が発行されるため再度グルーピングが行われます。このように、GroupByでは素直に書けない一定間隔でのグルーピングなどがGroupByUntilでは行えます。また、今回の例では全てのIGroupedObservableに対して同じIObservableを返していたため、全てのグルーピングの終了タイミングが同じになっていましたが、IGroupedObservableごとに異なるIObservableを返すことで、グルーピングの終了タイミングをグループごとに指定することも出来ます。