かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その44「And, Then, Whenメソッド」

過去記事インデックス

はじめに

合成系メソッドの紹介も今回で最後の予定です!今回は、個人的に理解にてこずったWhenメソッドと、その周辺メソッド達です。

Whenメソッド

ここではWhenメソッドについて説明します。Whenメソッドのシグネチャを以下に示します。

public static IObservable<TResult> When<TResult>(params Plan<TResult>[] plans);
public static IObservable<TResult> When<TResult>(this IEnumerable<Plan<TResult>> plans);

Whenメソッドには2つのオーバーロードがあり、片方がPlan型の可変長引数で、もう一方がIEnumerable>の拡張メソッドです。どちらも複数のPlanを纏めるという意味合いになります。

Planクラスの作成

Whenメソッドの引数に渡すPlanクラスの作成方法について説明します。Planクラスのインスタンスを作成する前に、まずPatternというクラスのインスタンスを用意します。PatternクラスはAndというメソッドで作成します。AndメソッドはIObservableのシーケンスを引数に取る拡張メソッドとして定義されています。シグネチャを以下に示します。

public static Pattern<TLeft, TRight> And<TLeft, TRight>(
    this IObservable<TLeft> left, 
    IObservable<TRight> right);

この戻り値のPatternクラスにはIObservableを受け取るAndというメソッドが定義されています。シグネチャを以下に示します。

public Pattern<T1, T2, T3> And<T3>(IObservable<T3> other);

このように、このPatternにもIObservableを受け取ってPattern型の値を返すメソッドが定義されています。このように、Andメソッドを使って複数のIObservableのシーケンスを繋げてPatternクラスにまとめることが出来ます。Patternクラスの型引数は、16個まで定義されているので事実上数を気にすることなくつなげていくことが出来ます。
Patternクラスには型引数の数の引数を受け取るデリゲートを受け取るThenメソッドが定義されています。Patternクラスの場合のThenメソッドのシグネチャを以下に示します。

public Plan<TResult> Then<TResult>(Func<T1, T2, T3, TResult> selector);

このメソッドからPlanクラスのインスタンスが作成されます。Thenメソッドで渡したデリゲートは、それまでのAndで繋いだIObservableのシーケンスの全てから値が発行されたタイミングで実行されます。

Whenメソッドの使用例

では、上記のAndメソッドとThenメソッドを使ってPlanクラスのインスタンスを作成するコード例を下記に示します。

var plan = Observable
    // plan1という文字列を10個発行するIObservable<string>
    .Return("plan1").Repeat(10)
    // 1秒間隔で0からのカウントアップにタイムスタンプをつけるIObservable<Timestamped<long>>
    .And(Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp())
    // 100〜110の値を発行するIObservable<int>
    .And(Observable.Range(100, 10))
    // 3つの値を文字列として纏める
    .Then((planName, timestamped, value) => 
        string.Format("{0} {1} {2}", planName, timestamped, value));
// WhenでPlan<string>からIObservable<string>にして購読
Observable.When(plan).Subscribe(
    s => Console.WriteLine("OnNext: {0}", s),
    () => Console.WriteLine("OnCompleted"));
Console.ReadLine();

コメントにある通りですが、無限に値を発行し続けるIObservableのシーケンスから10個の値を発行するIObservableのシーケンスをAndで繋いでThenメソッドで文字列化しています。そして、Whenメソッドに渡して購読しています。Thenメソッドは、全てのAndメソッドから値が発行されたタイミングで実行されるため、もっとも短い10個しか値を発行しないIObservableのシーケンスが終了したタイミングでThenメソッドも呼ばれなくなります。そのため、このPlanから生成したIObservableのシーケンスは10個しか値を発行しないということになります。実行結果を以下に示します。

OnNext: plan1 0@2012/02/22 23:47:09 +09:00 100
OnNext: plan1 1@2012/02/22 23:47:10 +09:00 101
OnNext: plan1 2@2012/02/22 23:47:11 +09:00 102
OnNext: plan1 3@2012/02/22 23:47:12 +09:00 103
OnNext: plan1 4@2012/02/22 23:47:13 +09:00 104
OnNext: plan1 5@2012/02/22 23:47:14 +09:00 105
OnNext: plan1 6@2012/02/22 23:47:15 +09:00 106
OnNext: plan1 7@2012/02/22 23:47:16 +09:00 107
OnNext: plan1 8@2012/02/22 23:47:17 +09:00 108
OnNext: plan1 9@2012/02/22 23:47:18 +09:00 109
OnCompleted

Andで連結したIObservableのシーケンスから発行された値をThenで加工した結果が表示されていることが確認できます。また、10個の値が発行されたらOnCompletedが呼ばれていることも確認できます。
次に、複数のPlanをWhenメソッドに渡した場合のコード例を下記に示します。

var plan1 = Observable
    // plan1という文字列を10個発行するIObservable<string>
    .Return("plan1").Repeat(10)
    // 1秒間隔で0からのカウントアップにタイムスタンプをつけるIObservable<Timestamped<long>>
    .And(Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp())
    // 100〜110の値を発行するIObservable<int>
    .And(Observable.Range(100, 10))
    // 3つの値を文字列として纏める
    .Then((planName, timestamped, value) =>
        string.Format("{0} {1} {2}", planName, timestamped, value));
                
var plan2 = Observable
    // plan2という文字列を20個発行するIObservable<string>
    .Return("plan2").Repeat(20)
    // 0.5s間隔で0から値を発行していくIObservable<long>
    .And(Observable.Interval(TimeSpan.FromSeconds(0.5)))
    // Thenで文字列に纏める
    .Then((s, l) => string.Format("{0} {1}", s, l));

Observable.When(plan1, plan2).Subscribe(
    s => Console.WriteLine("OnNext: {0}", s),
    () => Console.WriteLine("OnCompleted"));

plan1は、最初の例と同じように作成しています。plan2は、20個のplan2という文字列と、0.5秒間隔で0から値をカウントアップしていくIObservableをAndで繋いでThen内で文字列化しています。この2つのWhenで繋いでIObservableに変換して購読しています。実行結果を以下に示します。

OnNext: plan2 0
OnNext: plan1 0@2012/02/22 23:53:21 +09:00 100
OnNext: plan2 1
OnNext: plan2 2
OnNext: plan2 3
OnNext: plan1 1@2012/02/22 23:53:22 +09:00 101
OnNext: plan2 4
OnNext: plan2 5
OnNext: plan1 2@2012/02/22 23:53:23 +09:00 102
OnNext: plan2 6
OnNext: plan2 7
OnNext: plan1 3@2012/02/22 23:53:24 +09:00 103
OnNext: plan2 8
OnNext: plan2 9
OnNext: plan1 4@2012/02/22 23:53:25 +09:00 104
OnNext: plan2 10
OnNext: plan2 11
OnNext: plan1 5@2012/02/22 23:53:26 +09:00 105
OnNext: plan2 12
OnNext: plan2 13
OnNext: plan1 6@2012/02/22 23:53:27 +09:00 106
OnNext: plan2 14
OnNext: plan2 15
OnNext: plan1 7@2012/02/22 23:53:28 +09:00 107
OnNext: plan2 16
OnNext: plan2 17
OnNext: plan1 8@2012/02/22 23:53:29 +09:00 108
OnNext: plan2 18
OnNext: plan1 9@2012/02/22 23:53:30 +09:00 109
OnNext: plan2 19
OnCompleted

plan1から発行された値と、plan2から発行された値が混在して表示されていることが確認できます。このことからWhenで繋いだPlanから発行された値はMergeメソッドで合成したように、値が発行された順番で後続に流されていることがわかります。

まとめ

このようにAndメソッドは使ってZipメソッドのように複数のIObservableのシーケンスを合成してThenメソッドで値の変換を行うPlanクラスのインスタンスを作成します。そしてWhenメソッドにPlanを複数渡すことでAndで繋いだものをMergeメソッドで合成したように繋げることが出来ます。
恐らく合成系のメソッドの中で一番自由度が高く複雑な合成が出来るメソッドになります。