過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
- Reactive Extensions再入門 その5「HotとCold」
- Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
- Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
- Reactive Extensions再入門 その8「SkipとTakeメソッド」
- Reactive Extensions再入門 その9「Skip + Take + Repeat = ドラッグ」
- Reactive Extensions再入門 その10「Doメソッド」
- Reactive Extensions再入門 その11「Catchメソッド」
- Reactive Extensions再入門 その12「Finallyメソッドとリソース解放」
- Reactive Extensions再入門 その13「最後の値を取得するLatestとMostRecentメソッド」
- Reactive Extensions再入門 その14「Nextメソッド」
- Reactive Extensions再入門 その15「To*****系メソッド」
- Reactive Extensions再入門 その16「最大、最少、平均を求めるメソッド」
はじめに
ここでは、IObservable
Aggregateメソッド
ここで紹介するAggregateメソッドは汎用的にIObservable
var s = new Subject<int>(); // x, yから値の大きい方を返す s.Aggregate((x, y) => x > y ? x : y).Subscribe( i => Console.WriteLine("Aggregate OnNext({0})", i), () => Console.WriteLine("Aggregate OnCompleted()")); // 値の発行〜完了通知 Console.WriteLine("OnNext(1)"); s.OnNext(1); Console.WriteLine("OnNext(10)"); s.OnNext(10); Console.WriteLine("OnNext(100)"); s.OnNext(100); Console.WriteLine("OnNext(50)"); s.OnNext(50); Console.WriteLine("OnNext(1)"); s.OnNext(1); Console.WriteLine("OnCompleted()"); s.OnCompleted();
実行結果を下記に示します。
OnNext(1) OnNext(10) OnNext(100) OnNext(50) OnNext(1) OnCompleted() Aggregate OnNext(100) Aggregate OnCompleted()
Aggregateメソッドは、2つの値を受け取り1つの値を返すデリゲートを引数に受け取ります。2つの値のうちの第一引数が直前までの集計値で、第二引数が新たにIObservable
var s = new Subject<int>(); // IObservable<T>のシーケンスから渡される値の合計を求める s.Aggregate((x, y) => { // Aggregateの引数に渡されるデリゲートがどのように動くのか確認するためのログ Console.WriteLine("log({0}, {1})", x, y); // 今までの合計(x)と、新たな値(y)を足して新たな合計値として返す。 return x + y; }) // 結果を購読 .Subscribe( i => Console.WriteLine("Aggregate OnNext({0})", i), () => Console.WriteLine("Aggregate OnCompleted()")); // 値の発行〜完了通知 Console.WriteLine("OnNext(1)"); s.OnNext(1); Console.WriteLine("OnNext(10)"); s.OnNext(10); Console.WriteLine("OnNext(100)"); s.OnNext(100); Console.WriteLine("OnCompleted()"); s.OnCompleted();
Aggregateメソッドの引数に渡しているデリゲート内でlog(x, y)の形式で値を標準出力に出力して、どのようにメソッドが呼ばれているか確認できるようにしています。実行結果を下記に示します。
OnNext(1) OnNext(10) log(1, 10) OnNext(100) log(11, 100) OnCompleted() Aggregate OnNext(111) Aggregate OnCompleted()
注意して見てほしい点として、最初のOnNextではAggregateの引数で渡したデリゲートが実行されない点です。2つ目のOnNextの呼び出しではじめてAggregateの引数に渡したデリゲートが実行されています。そして、最初のOnNextで渡した値の1が第一引数に、2つ目のOnNextで渡した値の10が第二引数に渡っていることが確認できます。次のOnNextで100を発行すると、第一引数が1 + 10の結果の11で、第二引数に100が渡されてデリゲートが実行されていることが、実行結果からわかります。そして、シーケンスが終了(OnCompleted)すると、Aggregateの返すIObservable
このAggregateメソッドには初期値を受け取るオーバーロードがあります。初期値を受け取るオーバーロードでは、最初のOnNextでAggregateに渡したデリゲートが呼ばれます。その時の第一引数は、初期値で渡した値で、第二引数が、OnNextで発行された値になります。
コード例を下記に示します。
var s = new Subject<int>(); // IObservable<T>のシーケンスから渡される値の合計を求める // ただし、初期値として5を使用する s.Aggregate(5, (x, y) => { // Aggregateの引数に渡されるデリゲートがどのように動くのか確認するためのログ Console.WriteLine("log({0}, {1})", x, y); // 今までの合計(x)と、新たな値(y)を足して新たな合計値として返す。 return x + y; }) // 結果を購読 .Subscribe( i => Console.WriteLine("Aggregate OnNext({0})", i), () => Console.WriteLine("Aggregate OnCompleted()")); // 値の発行〜完了通知 Console.WriteLine("OnNext(1)"); s.OnNext(1); Console.WriteLine("OnNext(10)"); s.OnNext(10); Console.WriteLine("OnNext(100)"); s.OnNext(100); Console.WriteLine("OnCompleted()"); s.OnCompleted();
初期値に5を渡している点が1つ前のサンプルコードと異なります。実行結果を下記に示します。
OnNext(1) log(5, 1) OnNext(10) log(6, 10) OnNext(100) log(16, 100) OnCompleted() Aggregate OnNext(116) Aggregate OnCompleted()
最初のOnNextからデリゲートが呼ばれていることが確認できます。
因みに、このAggregateメソッドを使うと数値の合計といった集計だけではなく、値の収集も可能です。コード例は下記のようになります。
var s = new Subject<int>(); // IObservable<T>のシーケンスから渡される値を集める // 初期値が空のリスト s.Aggregate(new List<int>(), (list, value) => { // 通知された値をリストの保持しておく list.Add(value); return list; }) // 購読。リストの内容を表示 .Subscribe(list => { foreach (var i in list) { Console.WriteLine("value : {0}", i); } }, () => Console.WriteLine("Aggregate OnCompleted()")); // 値の発行〜完了通知 Console.WriteLine("OnNext(1)"); s.OnNext(1); Console.WriteLine("OnNext(10)"); s.OnNext(10); Console.WriteLine("OnNext(100)"); s.OnNext(100); Console.WriteLine("OnCompleted()"); s.OnCompleted();
実行結果を下記に示します。
OnNext(1) OnNext(10) OnNext(100) OnCompleted() value : 1 value : 10 value : 100 Aggregate OnCompleted()
AggregateメソッドでToListメソッドと同様の処理ができていることが確認できます。
Scanメソッド
次は、Scanメソッドについて説明します。ScanメソッドはAggregateメソッドと同じシグネチャを持ちます。違いは、Aggregateが集計結果しか後続に流さないのに対してScanメソッドは集計経過を後続に流します。Aggregateと同じコードをScanメソッドを使って書き換えたコード例を下記に示します。
var s = new Subject<int>(); // Aggregateで合計値を求めるのと同じ処理をScanメソッドで行う。 s.Scan((x, y) => { Console.WriteLine("log({0}, {1})", x, y); return x + y; }) // 購読 .Subscribe( i => Console.WriteLine("Scan OnNext({0})", i), () => Console.WriteLine("Scan OnCompleted()")); // 値の発行〜完了通知 Console.WriteLine("OnNext(1)"); s.OnNext(1); Console.WriteLine("OnNext(10)"); s.OnNext(10); Console.WriteLine("OnNext(100)"); s.OnNext(100); Console.WriteLine("OnCompleted()"); s.OnCompleted();
実行結果を下記に示します。
OnNext(1) Scan OnNext(1) OnNext(10) log(1, 10) Scan OnNext(11) OnNext(100) log(11, 100) Scan OnNext(111) OnCompleted() Scan OnCompleted()
OnNextが呼び出される度に、Subscribeで購読しているところに値が流れていることが確認できます。これがScanメソッドの挙動になります。Aggregateと同様に初期値を渡すオーバーロードもあります。使用例を下記に示します。
var s = new Subject<int>(); // 初期値5でScanを使い合計を求める s.Scan(5, (x, y) => { Console.WriteLine("log({0}, {1})", x, y); return x + y; }) // 購読 .Subscribe( i => Console.WriteLine("Scan OnNext({0})", i), () => Console.WriteLine("Scan OnCompleted()")); // 値の発行〜完了通知 Console.WriteLine("OnNext(1)"); s.OnNext(1); Console.WriteLine("OnNext(10)"); s.OnNext(10); Console.WriteLine("OnNext(100)"); s.OnNext(100); Console.WriteLine("OnCompleted()"); s.OnCompleted();
実行結果を下記に示します。
OnNext(1) log(5, 1) Scan OnNext(6) OnNext(10) log(6, 10) Scan OnNext(16) OnNext(100) log(16, 100) Scan OnNext(116) OnCompleted() Scan OnCompleted()
実行結果から、最初のOnNextで値が発行されたタイミングでScanメソッドに渡したデリゲートが実行されて、結果がSubscribeで購読しているところにながれていることが確認できます。