かずきのBlog@hatena

日本マイクロソフトに勤めています。XAML + C#の組み合わせをメインに、たまにASP.NETやJavaなどの.NET系以外のことも書いています。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

Reactive Extensions再入門 その17「集計するメソッド」

はじめに

ここでは、IObservableのシーケンスに対して集計を行うメソッドについて説明します。

Aggregateメソッド

ここで紹介するAggregateメソッドは汎用的にIObservableのシーケンスに対して収集・集計するメソッドになります。Aggregateメソッドを使ってIObservableのシーケンスから最大の値を返すMaxメソッドと同等のメソッドは、下記のようになります。

var s = new Subject<int>();

// x, yから値の大きい方を返す
s.Aggregate((x, y) => x > y ? x : y).Subscribe(
    i => Console.WriteLine("Aggregate OnNext({0})", i),
    () => Console.WriteLine("Aggregate OnCompleted()"));

// 値の発行〜完了通知
Console.WriteLine("OnNext(1)");
s.OnNext(1);
Console.WriteLine("OnNext(10)");
s.OnNext(10);
Console.WriteLine("OnNext(100)");
s.OnNext(100);
Console.WriteLine("OnNext(50)");
s.OnNext(50);
Console.WriteLine("OnNext(1)");
s.OnNext(1);
Console.WriteLine("OnCompleted()");
s.OnCompleted();

実行結果を下記に示します。

OnNext(1)
OnNext(10)
OnNext(100)
OnNext(50)
OnNext(1)
OnCompleted()
Aggregate OnNext(100)
Aggregate OnCompleted()

Aggregateメソッドは、2つの値を受け取り1つの値を返すデリゲートを引数に受け取ります。2つの値のうちの第一引数が直前までの集計値で、第二引数が新たにIObservableシーケンスから発行された値になります。この2つの値を使って、任意の集計処理を行い集計結果を戻り値として返します。IObservableから新たな値が発行されると、直前までの集計値と、新たに発行された値を使って集計処理を行います。これをシーケンスの完了まで繰り返し、シーケンスが終了した時点の集計結果を後続に流します。動作確認をするためのコードを下記に示します。

var s = new Subject<int>();

// IObservable<T>のシーケンスから渡される値の合計を求める
s.Aggregate((x, y) =>
{
    // Aggregateの引数に渡されるデリゲートがどのように動くのか確認するためのログ
    Console.WriteLine("log({0}, {1})", x, y);
    // 今までの合計(x)と、新たな値(y)を足して新たな合計値として返す。
    return x + y;
})
// 結果を購読
.Subscribe(
    i => Console.WriteLine("Aggregate OnNext({0})", i),
    () => Console.WriteLine("Aggregate OnCompleted()"));

// 値の発行〜完了通知
Console.WriteLine("OnNext(1)");
s.OnNext(1);
Console.WriteLine("OnNext(10)");
s.OnNext(10);
Console.WriteLine("OnNext(100)");
s.OnNext(100);
Console.WriteLine("OnCompleted()");
s.OnCompleted();

Aggregateメソッドの引数に渡しているデリゲート内でlog(x, y)の形式で値を標準出力に出力して、どのようにメソッドが呼ばれているか確認できるようにしています。実行結果を下記に示します。

OnNext(1)
OnNext(10)
log(1, 10)
OnNext(100)
log(11, 100)
OnCompleted()
Aggregate OnNext(111)
Aggregate OnCompleted()

注意して見てほしい点として、最初のOnNextではAggregateの引数で渡したデリゲートが実行されない点です。2つ目のOnNextの呼び出しではじめてAggregateの引数に渡したデリゲートが実行されています。そして、最初のOnNextで渡した値の1が第一引数に、2つ目のOnNextで渡した値の10が第二引数に渡っていることが確認できます。次のOnNextで100を発行すると、第一引数が1 + 10の結果の11で、第二引数に100が渡されてデリゲートが実行されていることが、実行結果からわかります。そして、シーケンスが終了(OnCompleted)すると、Aggregateの返すIObservableから値が後続に流れてSubscribeしているところに1 + 10 + 100の結果である111が渡ってきていることが確認できます。
このAggregateメソッドには初期値を受け取るオーバーロードがあります。初期値を受け取るオーバーロードでは、最初のOnNextでAggregateに渡したデリゲートが呼ばれます。その時の第一引数は、初期値で渡した値で、第二引数が、OnNextで発行された値になります。
コード例を下記に示します。

var s = new Subject<int>();
// IObservable<T>のシーケンスから渡される値の合計を求める
// ただし、初期値として5を使用する
s.Aggregate(5, (x, y) =>
{
    // Aggregateの引数に渡されるデリゲートがどのように動くのか確認するためのログ
    Console.WriteLine("log({0}, {1})", x, y);
    // 今までの合計(x)と、新たな値(y)を足して新たな合計値として返す。
    return x + y;
})
// 結果を購読
.Subscribe(
    i => Console.WriteLine("Aggregate OnNext({0})", i),
    () => Console.WriteLine("Aggregate OnCompleted()"));

// 値の発行〜完了通知
Console.WriteLine("OnNext(1)");
s.OnNext(1);
Console.WriteLine("OnNext(10)");
s.OnNext(10);
Console.WriteLine("OnNext(100)");
s.OnNext(100);
Console.WriteLine("OnCompleted()");
s.OnCompleted();

初期値に5を渡している点が1つ前のサンプルコードと異なります。実行結果を下記に示します。

OnNext(1)
log(5, 1)
OnNext(10)
log(6, 10)
OnNext(100)
log(16, 100)
OnCompleted()
Aggregate OnNext(116)
Aggregate OnCompleted()

最初のOnNextからデリゲートが呼ばれていることが確認できます。
因みに、このAggregateメソッドを使うと数値の合計といった集計だけではなく、値の収集も可能です。コード例は下記のようになります。

var s = new Subject<int>();
// IObservable<T>のシーケンスから渡される値を集める
// 初期値が空のリスト
s.Aggregate(new List<int>(), (list, value) =>
{
    // 通知された値をリストの保持しておく
    list.Add(value);
    return list;
})
// 購読。リストの内容を表示
.Subscribe(list =>
{
    foreach (var i in list)
    {
        Console.WriteLine("value : {0}", i);
    }
},
() => Console.WriteLine("Aggregate OnCompleted()"));

// 値の発行〜完了通知
Console.WriteLine("OnNext(1)");
s.OnNext(1);
Console.WriteLine("OnNext(10)");
s.OnNext(10);
Console.WriteLine("OnNext(100)");
s.OnNext(100);
Console.WriteLine("OnCompleted()");
s.OnCompleted();

実行結果を下記に示します。

OnNext(1)
OnNext(10)
OnNext(100)
OnCompleted()
value : 1
value : 10
value : 100
Aggregate OnCompleted()

AggregateメソッドでToListメソッドと同様の処理ができていることが確認できます。

Scanメソッド

次は、Scanメソッドについて説明します。ScanメソッドはAggregateメソッドと同じシグネチャを持ちます。違いは、Aggregateが集計結果しか後続に流さないのに対してScanメソッドは集計経過を後続に流します。Aggregateと同じコードをScanメソッドを使って書き換えたコード例を下記に示します。

var s = new Subject<int>();
// Aggregateで合計値を求めるのと同じ処理をScanメソッドで行う。
s.Scan((x, y) =>
{
    Console.WriteLine("log({0}, {1})", x, y);
    return x + y;
})
// 購読
.Subscribe(
    i => Console.WriteLine("Scan OnNext({0})", i),
    () => Console.WriteLine("Scan OnCompleted()"));

// 値の発行〜完了通知
Console.WriteLine("OnNext(1)");
s.OnNext(1);
Console.WriteLine("OnNext(10)");
s.OnNext(10);
Console.WriteLine("OnNext(100)");
s.OnNext(100);
Console.WriteLine("OnCompleted()");
s.OnCompleted();

実行結果を下記に示します。

OnNext(1)
Scan OnNext(1)
OnNext(10)
log(1, 10)
Scan OnNext(11)
OnNext(100)
log(11, 100)
Scan OnNext(111)
OnCompleted()
Scan OnCompleted()

OnNextが呼び出される度に、Subscribeで購読しているところに値が流れていることが確認できます。これがScanメソッドの挙動になります。Aggregateと同様に初期値を渡すオーバーロードもあります。使用例を下記に示します。

var s = new Subject<int>();
// 初期値5でScanを使い合計を求める
s.Scan(5, (x, y) =>
{
    Console.WriteLine("log({0}, {1})", x, y);
    return x + y;
})
// 購読
.Subscribe(
    i => Console.WriteLine("Scan OnNext({0})", i),
    () => Console.WriteLine("Scan OnCompleted()"));

// 値の発行〜完了通知
Console.WriteLine("OnNext(1)");
s.OnNext(1);
Console.WriteLine("OnNext(10)");
s.OnNext(10);
Console.WriteLine("OnNext(100)");
s.OnNext(100);
Console.WriteLine("OnCompleted()");
s.OnCompleted();

実行結果を下記に示します。

OnNext(1)
log(5, 1)
Scan OnNext(6)
OnNext(10)
log(6, 10)
Scan OnNext(16)
OnNext(100)
log(16, 100)
Scan OnNext(116)
OnCompleted()
Scan OnCompleted()

実行結果から、最初のOnNextで値が発行されたタイミングでScanメソッドに渡したデリゲートが実行されて、結果がSubscribeで購読しているところにながれていることが確認できます。