かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その41「どんどん合成するよ」

過去記事インデックス

はじめに

前回に引き続き、どんどんIObservableを合成するメソッドを試してみようと思います!!

Concatメソッド

ここでは、Concatメソッドについて説明します。Concatメソッドは、その名前の通り複数のIObservableのシーケンスを直列で繋ぎます。例えば、3つのIObservableのシーケンスを渡すと1つ目のIObservableのシーケンスが完了するまでの間は1つ目のIObservableのシーケンスを後続に流します。1つ目のIObservableのシーケンスが完了すると、2つ目のIObservableのシーケンスの値を流します。2つ目のIObservableのシーケンスが完了すると、3つ目のIObservableのシーケンスの値を流します。そして3つ目のIObservableのシーケンスが完了すると、完了したということを後続に通知します。Concatメソッドには2つのシーケンスを繋ぐことに特化したオーバーロードと、複数のシーケンスを繋ぐためのメソッドのオーバーロードがあります。このメソッドのオーバーロードを以下に示します。

// 引数で渡した全てのIObservable<T>のシーケンスを繋ぐ
public static IObservable<T> Concat<T>(params IObservable<T>[] sources);
// IEnumerable<IObservable<T>>から取得できるすべてのIObservable<T>を繋ぐ
public static IObservable<T> Concat<T>(this IEnumerable<IObservable<T>> sources);
// IObservable<IObservable<T>>から発行されるすべてのIObservable<T>を繋ぐ
public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> sources);
// firstとsecondを繋ぐ
public static IObservable<T> Concat<T>(this IObservable<T> first, IObservable<T> second);

Concatメソッドの動作を示すためのコード例を下記に示します。

Observable
    // IObservable<T>のシーケンスを直列につなげる
    .Concat(
        // "1st: 0" 〜 "1st: 2"までの値を1秒間隔で発行する
        Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => "1st: " + i).Take(3),
        // "2nd: 0" 〜 "2nd: 2"までの値を1秒間隔で発行する
        Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => "2nd: " + i).Take(3),
        // "3rd: 0" 〜 "3rd: 2"までの値を1秒間隔で発行する
        Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => "3rd: " + i).Take(3))
    // 購読
    .Subscribe(
        s => Console.WriteLine("OnNext: {0}", s),
        () => Console.WriteLine("OnCompleted"));

Console.ReadLine();

Observable.Intervalメソッドを使って1秒間隔で値を発行するIObservableのシーケンスを3つ作成して、Concatメソッドで繋いでいます。1秒間隔で値を発行するため、Mergeメソッドでは1つ目と2つ目と3つ目のIObservableのシーケンスの発行する値が混在しますが、Concatは前のIObservableのシーケンスが完了しない限り後ろのIObservableのシーケンスが発行する値を、後続に流さないため上記のサンプルではSubscribeのOnNextに値が渡る順番が保障されています。実行結果を以下に示します。

OnNext: 1st: 0
OnNext: 1st: 1
OnNext: 1st: 2
OnNext: 2nd: 0
OnNext: 2nd: 1
OnNext: 2nd: 2
OnNext: 3rd: 0
OnNext: 3rd: 1
OnNext: 3rd: 2
OnCompleted

前のIObservableのシーケンスが完了するまで次のIObservableのシーケンスが発行される値がOnNextに流れていないことが確認できます。このプログラムは、可変長引数のオーバーロードを使いましたが、下記のように別のオーバーロードを使っても同じように書くことができます。

// "1st: 0" 〜 "1st: 2"までの値を1秒間隔で発行する
Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(i => "1st: " + i)
    .Take(3)
    .Concat(
        // "2nd: 0" 〜 "2nd: 2"までの値を1秒間隔で発行する
        Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(i => "2nd: " + i)
            .Take(3))
        .Concat(
        // "3rd: 0" 〜 "3rd: 2"までの値を1秒間隔で発行する
        Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(i => "3rd: " + i)
            .Take(3))
// 購読
.Subscribe(
    s => Console.WriteLine("OnNext: {0}", s),
    () => Console.WriteLine("OnCompleted"));

Console.ReadLine();

実行結果は同じため省略します。

Zipメソッド

ここでは、Zipメソッドについて説明します。Zipメソッドは、2つのIObservableのシーケンスから値が発行されるのを待って、2つの値を変換して、変換した結果を後ろに流すメソッドです。メソッドのシグネチャを下記に示します。

public static IObservable<R> Zip<T, U, R>(
    this IObservable<T> first, 
    IObservable<U> second, 
    Func<T, U, R> resultSelector);

firstから発行された値とsecondから発行された値をresultSelectorで変換して後ろに流します。また、firstかsecondのどちらかのIObservableのシーケンスが完了状態になった段階で後続に完了通知を発行します。このメソッドのコード例を下記に示します。

// 1秒間隔で0から値をカウントアップしていくIObservable<T>のシーケンスの最初の3つ
Observable.Interval(TimeSpan.FromSeconds(1)).Take(3)
    // Zipでまとめる
    .Zip(
        // 100, 101, 102を発行するIObservable<T>のシーケンス
        Observable.Range(100, 3), 
        // 渡された値を元に文字列化
        (l, r) => string.Format("{0}-{1}", l, r))
    // 購読
    .Subscribe(
        s => Console.WriteLine("OnNext: {0}", s),
        () => Console.WriteLine("OnCompleted"));

Console.ReadLine();

1秒間隔で0, 1, 2の値を発行するIObservableのシーケンスと100, 101, 102の値を発行するIObservableのシーケンスをZipメソッドで合成しています。実行結果を以下に示します。

OnNext: 0-100
OnNext: 1-101
OnNext: 2-102
OnCompleted

Zipメソッドで合成された3つの値がOnNextに渡ってきていることが確認できます。上記のプログラムを少し変更して片方のIObservableのシーケンスが発行する値の数を1つにしたコードを下記に示します。

// 1秒間隔で0から値をカウントアップしていくIObservable<T>のシーケンスの最初の3つ
Observable.Interval(TimeSpan.FromSeconds(1)).Take(3)
    // Zipでまとめる
    .Zip(
        // 100を発行するIObservable<T>のシーケンス
        Observable.Return(100), 
        // 渡された値を元に文字列化
        (l, r) => string.Format("{0}-{1}", l, r))
    // 購読
    .Subscribe(
        s => Console.WriteLine("OnNext: {0}", s),
        () => Console.WriteLine("OnCompleted"));

Console.ReadLine();

IntervalメソッドからTake(3)で3つの値を発行するIObservableのシーケンスとReturnメソッドで1つの値しか発行しないIObservableのシーケンスをZipメソッドで合成しています。実行結果を下記に示します。

OnNext: 0-100
OnCompleted

OnNextが1回しか実行されていないことが確認できます。このことからZipメソッドが、合成対象のIObservableのシーケンスのどちらか片方が終了した段階で、後続に完了通知を発行することが確認できます。

Ambメソッド

ここではAmbメソッドについて説明します。Ambメソッドは複数のIObservableのシーケンスの中から一番最初に値を発行したIObservableのシーケンスの値を後続に流すメソッドです。このメソッドのオーバーロードを以下に示します。

// 引数で渡した全てのIObservable<T>から最初に値を発行したIObservable<T>の値を後ろに流す
public static IObservable<T> Amb<T>(params IObservable<T>[] sources);
// sources内の全てのIObservable<T>から最初に値を発行したIObservable<T>の値を後ろに流す
public static IObservable<T> Amb<T>(this IEnumerable<IObservable<T>> sources);
// firstとsecondのうち最初に値を発行したものの値を後ろに流す
public static IObservable<T> Amb<T>(this IObservable<T> first, IObservable<T> second);

他の合成系メソッドと同様に可変長引数やIEnumerable>の拡張メソッドや2つの合成に特化したオーバーロードが用意されています。Ambメソッドのコード例を下記に示します。

Observable.Amb(
    // 3秒後に値を発行するIO<T>
    Observable.Timer(TimeSpan.FromSeconds(3)).Select(_ => "3sec"),
    // 10秒後に値を発行するIO<T>
    Observable.Timer(TimeSpan.FromSeconds(10)).Select(_ => "10sec"),
    // 2秒後に値を発行するIO<T>
    Observable.Timer(TimeSpan.FromSeconds(2)).Select(_ => "2sec"),
    // 6秒後に値を発行するIO<T>
    Observable.Timer(TimeSpan.FromSeconds(6)).Select(_ => "6sec"),
    // 22秒後に値を発行するIO<T>
    Observable.Timer(TimeSpan.FromSeconds(22)).Select(_ => "22sec"))
    .Subscribe(
        s => Console.WriteLine("OnNext: {0}", s),
        () => Console.WriteLine("OnCompleted"));

Timerメソッドを使って、指定した時間が経過した後に、値を発行するIObservableのシーケンスをAmbメソッドで合成しています。実行結果を以下に示します。

OnNext: 2sec
OnCompleted

引数で渡した中で一番早く値を発行するObservable.Timer(TimeSpan.FromSecond(2)).Select(_ => “2sec”)の結果がOnNextやOnCompletedに渡っていることが確認できます。

CombineLatestメソッド

ここでは、CombineLatestメソッドについて説明します。CombineLatestメソッドはZipメソッドのように2つのIObservableのシーケンスを合成するメソッドです。Zipメソッドが両方のIObservableのシーケンスから値が発行されるのを待つのに対してCombineLatestメソッドは、どちらか一方から値が発行されると、もう一方の最後に発行した値があるか確認し、あればそれを使って合成処理を行い、後続に値を流します。このメソッドのシグネチャを以下に示します。

public static IObservable<R> CombineLatest<F, S, R>(
    this IObservable<F> first, 
    IObservable<S> second, 
    Func<F, S, R> resultSelector);

このメソッドのコード例を下記に示します。

var source1 = new Subject<string>();
var source2 = new Subject<int>();

source1
    // source1とsource2を合成
    .CombineLatest(
        source2, 
        // 発行された値を結合して文字列化
        (l, r) => string.Format("{0}-{1}", l, r))
    // 購読
    .Subscribe(
        s => Console.WriteLine("OnNext: {0}", s),
        () => Console.WriteLine("OnCompleted"));

// 適当に値を発行する
Console.WriteLine("source1.OnNext(foo)");
source1.OnNext("foo");
Console.WriteLine("source2.OnNext(100)");
source2.OnNext(100);
Console.WriteLine("source2.OnNext(200)");
source2.OnNext(200);
Console.WriteLine("source1.OnNext(bar)");
source1.OnNext("bar");
Console.WriteLine("source1.OnNext(boo)");
source1.OnNext("boo");

// source1完了
Console.WriteLine("source1.OnCompleted()");
source1.OnCompleted();
// source1完了後にsource2から値を発行する
Console.WriteLine("source2.OnNext(200)");
source2.OnNext(999);
// source2完了
Console.WriteLine("source2.OnCompleted()");
source2.OnCompleted();

このコードの実行結果を以下に示します。

source1.OnNext(foo)
source2.OnNext(100)
OnNext: foo-100
source2.OnNext(200)
OnNext: foo-200
source1.OnNext(bar)
OnNext: bar-200
source1.OnNext(boo)
OnNext: boo-200
source1.OnCompleted()
source2.OnNext(200)
OnNext: boo-999
source2.OnCompleted()
OnCompleted

最初のOnNextでは、source1とsource2の値が両方発行されるまで実行されませんが、それ以降は、source1かsource2のどちらかから値が発行されるとOnNextに値が発行されていることが確認できます。このことから、CombineLatestメソッドの特徴である、最後に発行された値をキャッシュしているということがわかります。そして、CombineLatestメソッドは両方のシーケンスが完了状態になった時点で完了通知を行うという動作も確認できます。これは、片方が完了しても、もう一方から値が発行されたら最後に発行された値を元に処理を継続できるためです。