かずきのBlog@hatena

日本マイクロソフトに勤めています。XAML + C#の組み合わせをメインに、たまにASP.NETやJavaなどの.NET系以外のことも書いています。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

Reactive Extensions再入門 その40「IObservableの合成はじめました」

過去記事インデックス

IObservableの合成

ここでは、IObservableクラスの合成について説明します。いままで説明してきたメソッドは単一のIObservableのシーケンスに対して操作を行うものでした。それだけでもThrottleやSkip, Take、BufferメソッドやDistictメソッドなどReactive Extensionsを使う上で重要かつ特徴的なメソッドです。これから紹介するメソッドは、複数のIObservableシーケンスを合成して1つのIObservableのシーケンスを作成するメソッド群です。
Reactive Extensionsでは、時間、イベント、非同期などをIObservableのシーケンスとして扱うことができます。つまり、時間、イベント、非同期などを合成して単一のシーケンスにしたあと、Distinctで重複を排除したりBufferなどでまとめたり、Scanメソッドなどで集計をとったりすることが出来ます。これは、Reactive Extensionsの特徴の1つになります。

Mergeメソッド

ここでは、Mergeメソッドについて説明します。Mergeメソッドは名前の通り複数のIObservableのシーケンスにするメソッドです。オーバーロードはいくつかありますが一番シンプルな2つのIObservableのシーケンスを合成するメソッドのシグネチャを以下に示します。

public static IObservable<T> Merge<T>(
    this IObservable<T> first, 
    IObservable<T> second);

このメソッドは、左辺値のIObservableのシーケンスと右辺値のIObservableのシーケンスを1つのシーケンスとしてマージします。コード例を下記に示します。

Console.WriteLine("# 2つのIObservable<T>の統合");
Observable
    // 0〜2の値を1秒間隔で発行
    .Interval(TimeSpan.FromSeconds(1)).Take(3)
    // 1〜3の値に変換
    .Select(i => i + 1)
    // マージ(統合)
    .Merge(
        Observable
            // 0〜2の値を1秒間隔で発行
            .Interval(TimeSpan.FromSeconds(1)).Take(3)
            // 10, 20, 30に変換
            .Select(i => (i + 1) * 10))
    // 購読
    .Subscribe(
        i => Console.WriteLine("OnNext: {0}", i),
        () => Console.WriteLine("OnCompleted"));
// Enter押すまで待機
Console.ReadLine();

1秒間隔で1, 2, 3の値を発行するIObservableのシーケンスと1秒間隔で10, 20, 30の値を発行するIObservableのシーケンスをMergeメソッドで合成して購読しています。実行結果を以下に示します。

# 2つのIObservable<T>の統合
OnNext: 1
OnNext: 10
OnNext: 2
OnNext: 20
OnNext: 3
OnNext: 30
OnCompleted

上記の結果からMergeメソッドではleftとrightで渡したIObservableのシーケンスから発行された値を単純に後続に流していることが確認出来ます。
次に、複数のIObservableのシーケンスを1つに統合するMergeメソッドのオーバーロードについてみていきます。メソッドのシグネチャを以下に示します。

public static IObservable<T> Merge<T>(params IObservable<T>[] sources);

引数のparamsで渡したIObservableを全てマージします。コード例を下記に示します。

Console.WriteLine("# 複数のIObservable<T>の統合");
// Observable.Merge<T>(params IObservable<T>[] sources)
Observable.Merge(
    // 3つのIObservable<T>をマージ
    Observable.Range(0, 3),
    Observable.Range(10, 3),
    Observable.Range(100, 3))
    // 購読
    .Subscribe(
        i => Console.WriteLine("OnNext: {0}", i),
        () => Console.WriteLine("OnCompleted"));
// Enter押すまで待機
Console.ReadLine();

上記のコードは3つのIObservableのシーケンスをMergeメソッドで合成して結果を出力しています。実行結果を以下に示します。

# 複数のIObservable<T>の統合
OnNext: 0
OnNext: 10
OnNext: 100
OnNext: 1
OnNext: 11
OnNext: 101
OnNext: 2
OnNext: 12
OnNext: 102
OnCompleted

3つのIObservableのシーケンスが1つに統合されていることが確認できます。
このメソッドで興味深いオーバーロードとして以下のようなものがあります。

public static IObservable<T> Merge<T>(this IObservable<IObservable<T>> sources);

IObservable>の拡張メソッドとして定義されています。このメソッドを使うと、下記のようにSubjectなどのような型からIObservable>を作成してMergeして1つにつなげるということが出来るようになります。コード例を下記に示します。

Console.WriteLine("# 複数のIObservable<T>の統合2");
// IObservable<T> Merge<T>(IObservable<IObservable<T>> source)
var source = new Subject<int>();
source
    // IObservable<int>からIObservable<IObservable<int>>に変換
    .Select(i => Observable
        // 0〜2の値を1秒間隔で3つ発行
        .Interval(TimeSpan.FromSeconds(1)).Take(3)
        // 値を変換
        .Select(l => (l + 1) * i))
    // IObservable<IObservable<T>>からIObservable<T>へマージ
    .Merge()
    // 購読
    .Subscribe(
        i => Console.WriteLine("OnNext: {0}", i),
        () => Console.WriteLine("OnCompleted"));

// 値の発行から完了
Console.WriteLine("# OnNext(10)");
source.OnNext(10);
Console.WriteLine("# OnNext(100)");
source.OnNext(100);
Console.WriteLine("# OnNext(1000)");
source.OnNext(1000);
Console.WriteLine("# OnCompleted");
source.OnCompleted();

// Enter押すまで待機
Console.ReadLine();

Subjectから発行された値を元にIntervalメソッドを使って3つの値を発行するIObservable型を生成しています。これに対してMergeメソッドを呼び出しIObservable>を1つのIObservableにならしています。実行結果を以下に示します。

# 複数のIObservable<T>の統合2
# OnNext(10)
# OnNext(100)
# OnNext(1000)
# OnCompleted
OnNext: 10
OnNext: 100
OnNext: 1000
OnNext: 200
OnNext: 20
OnNext: 2000
OnNext: 300
OnNext: 3000
OnNext: 30
OnCompleted

3つのIObservableから発行された値が順不同(実行するごとにOnNext: ***の箇所の順番が変わる)で表示されます。このようにMergeメソッドを使用することで、例えば、あるイベントが発行されたタイミングでWebAPIを非同期に実行して結果を受け取って処理を行うという処理を書くことが出来ます。

SelectManyメソッド

ここではSelectManyメソッドについて説明します。SelectManyメソッドには様々なオーバーロードがありますが、一番使用頻度の高いオーバーロードは以下のものになります。

public static IObservable<R> SelectMany<T, R>(
    this IObservable<T> source, 
    Func<T, IObservable<R>> selector);

sourceから発行された値を受け取ってselectorで渡したデリゲート処理を行いIObservableを取得します。sourceから複数の値が発行されると複数のIObservableが内部で作成されますが、それらをすべて1つのIObservableにマージして後続に流します。つまり、Mergeメソッドの最後のサンプルで示したIObservable>の引数を受け取るオーバーロードと同様のことを内部でしています。コード例を下記に示します。

Console.WriteLine("# 複数のIObservable<T>の統合(SelectMany)");
// IObservable<T> Merge<T>(IObservable<IObservable<T>> source)
var source = new Subject<int>();
source
    // 発行された値からIObservable<T>を作成してマージ(統合)する
    .SelectMany(i => Observable
        // 0〜2の値を1秒間隔で3つ発行
        .Interval(TimeSpan.FromSeconds(1)).Take(3)
        // 値を変換
        .Select(l => (l + 1) * i))
    // 購読
    .Subscribe(
        i => Console.WriteLine("OnNext: {0}", i),
        () => Console.WriteLine("OnCompleted"));

// 値の発行から完了
Console.WriteLine("# OnNext(10)");
source.OnNext(10);
Console.WriteLine("# OnNext(100)");
source.OnNext(100);
Console.WriteLine("# OnNext(1000)");
source.OnNext(1000);
Console.WriteLine("# OnCompleted");
source.OnCompleted();

// Enter押すまで待機
Console.ReadLine();

IObservable>型の引数を受け取るMergeメソッドのオーバーロードと同様の処理をSelectManyメソッドで記述しています。実質SelectManyは下記のようにSelectメソッドとMergeメソッドを使って置き換え出来ます。

SelectMany(i => Observable.Return(i)) → Select(i => Observable.Return(i)).Merge()

実行結果を以下に示します。
>|||
# 複数のIObservableの統合(SelectMany)
# OnNext(10)
# OnNext(100)
# OnNext(1000)
# OnCompleted
OnNext: 10
OnNext: 100
OnNext: 1000
OnNext: 20
OnNext: 200
OnNext: 2000
OnNext: 30
OnNext: 3000
OnNext: 300
OnCompleted
|

Mergeメソッドと同様の結果になっていることが確認できます。
ここで説明したSelectManyメソッドのオーバーライド以外に以下のようなオーバーライドがあります。

// IObservable<T>がIEnumerable<T>になった感じ
public static IObservable<R> SelectMany<T, R>(
    this IObservable<T> source, 
    Func<T, IEnumerable<R>> selector);

// sourceから発行された値に関係なくIObservable<T>を作成する場合
public static IObservable<O> SelectMany<T, O>(
    this IObservable<T> source, 
    IObservable<O> other);

// sourceから発行された値を元にIEnumerable<TCollection>を作り、sourceから発行された値と
// IEnumerable<TCollection>の値を元に結果を作成する。その結果を後続に流す。
public static IObservable<R> SelectMany<T, TCollection, R>(
    this IObservable<T> source, 
    Func<T, IEnumerable<TCollection>> collectionSelector, 
    Func<T, TCollection, R> resultSelector);

// sourceから発行された値を元にIObservable<TCollection>を作り、sourceから発行された値と
// IObservable<TCollection>の値を元に結果を作成する。その結果を後続に流す。
public static IObservable<R> SelectMany<T, TCollection, R>(
    this IObservable<T> source, 
    Func<T, IObservable<TCollection>> collectionSelector, 
    Func<T, TCollection, R> resultSelector);

// onNext, onError, onCompletedのそれぞれでIObservable<R>を作成して、マージして返す。
public static IObservable<R> SelectMany<T, R>(
    this IObservable<T> source, 
    Func<T, IObservable<R>> onNext, 
    Func<Exception, IObservable<R>> onError, 
    Func<IObservable<R>> onCompleted);

全てのオーバーロードについてのコード例はここでは割愛します。ここでは上記のメソッドの4番目のコードの使用例を下記に示します。

Console.WriteLine("# 複数のIObservable<T>の統合(SelectManyのオーバーロード)");
var source = new Subject<int>();
source
    .SelectMany(
        // sourceから発行された値を元にi, i+1, i+2の値を発行するIObservable<int>を返す
        i => Observable.Range(i, 3),
        // sourceから発行された値とi => Observable.Range(i, 3)で生成された値を
        // 使って最終的に発行される値を作成する。ここでは匿名型にしただけ。
        (x, y) => new { x, y })
    .Subscribe(Console.WriteLine);

// 値の発行から完了
Console.WriteLine("# OnNext(10)");
source.OnNext(10);
Console.WriteLine("# OnNext(100)");
source.OnNext(100);
Console.WriteLine("# OnNext(1000)");
source.OnNext(1000);
Console.WriteLine("# OnCompleted");
source.OnCompleted();

// Enter押すまで待機
Console.ReadLine();

上記のコードの実行結果を以下に示します。

# 複数のIObservable<T>の統合(SelectManyのオーバーロード)
# OnNext(10)
{ x = 10, y = 10 }
{ x = 10, y = 11 }
{ x = 10, y = 12 }
# OnNext(100)
{ x = 100, y = 100 }
{ x = 100, y = 101 }
{ x = 100, y = 102 }
# OnNext(1000)
{ x = 1000, y = 1000 }
{ x = 1000, y = 1001 }
{ x = 1000, y = 1002 }
# OnCompleted

sourceから発行された値と、sourceから発行された値を元に作成した結果のIObservableから発行される値を組み合わせるケースで使用します。

Switchメソッド

ここではSwitchメソッドについて説明します。Switchメソッドは、IObservable Merge(self IObservable> source)と同じシグネチャを持ちます。メソッドの定義を以下に示します。

public static IObservable<T> Switch<T>(this IObservable<IObservable<T>> sources);

Mergeメソッドとの違いを確認するために、MergeメソッドとSwitchメソッドで同じ処理を書いて比較します。まず、Mergeメソッドのコード例を下記に示します。

Console.WriteLine("# 複数のIObservable<T>の統合(Merge)");
// IObservable<T> Merge<T>(IObservable<IObservable<T>> source)
var source = new Subject<int>();
source
    .Select(i => Observable
        // 1 * i, 2 * i, 3 * iの値を1秒間隔で発行
        .Interval(TimeSpan.FromSeconds(1)).Take(3)
        // 値を変換
        .Select(l => (l + 1) * i))
    // 統合
    .Merge()
    .Subscribe(
        i => Console.WriteLine("OnNext: {0}", i),
        () => Console.WriteLine("OnCompleted"));

// 値の発行から完了
Console.WriteLine("# OnNext(10)");
source.OnNext(10);
Console.WriteLine("Sleep 2000ms...");
Thread.Sleep(2000);

Console.WriteLine("# OnNext(100)");
source.OnNext(100);
Console.WriteLine("Sleep 2000ms...");
Thread.Sleep(2000);

Console.WriteLine("# OnNext(1000)");
source.OnNext(1000);
Console.WriteLine("Sleep 2000ms...");
Thread.Sleep(2000);

Console.WriteLine("# OnCompleted");
source.OnCompleted();

Console.ReadLine();

若干複雑ですが、時間間隔をあけて複数のIObservableのシーケンスが入り乱れて値を発行するようにしています。実行結果を以下に示します。

# 複数のIObservable<T>の統合(Merge)
# OnNext(10)
Sleep 2000ms...
OnNext: 10
OnNext: 20
# OnNext(100)
Sleep 2000ms...
OnNext: 30
OnNext: 100
# OnNext(1000)
Sleep 2000ms...
OnNext: 200
OnNext: 1000
OnNext: 300
# OnCompleted
OnNext: 2000
OnNext: 3000
OnCompleted

10, 100, 1000の3つの値を発行して、そこからさらに3つの値を発行するIObservableのシーケンスを作成しているので、合計で9つの値が発行されます。このコードのMerge部分をSwitchメソッドに書き換えたコードを下記に示します。

Console.WriteLine("# 複数のIObservable<T>の統合(Switch)");
// IObservable<T> Merge<T>(IObservable<IObservable<T>> source)
var source = new Subject<int>();
source
    .Select(i => Observable
        // 1 * i, 2 * i, 3 * iの値を1秒間隔で発行
        .Interval(TimeSpan.FromSeconds(1)).Take(3)
        .Select(l => (l + 1) * i))
    // 最後
    .Switch()
    .Subscribe(
        i => Console.WriteLine("OnNext: {0}", i),
        () => Console.WriteLine("OnCompleted"));

// 値の発行から完了
Console.WriteLine("# OnNext(10)");
source.OnNext(10);
Console.WriteLine("Sleep 2000ms...");
Thread.Sleep(2000);

Console.WriteLine("# OnNext(100)");
source.OnNext(100);
Console.WriteLine("Sleep 2000ms...");
Thread.Sleep(2000);

Console.WriteLine("# OnNext(1000)");
source.OnNext(1000);
Console.WriteLine("Sleep 2000ms...");
Thread.Sleep(2000);

Console.WriteLine("# OnCompleted");
source.OnCompleted();

Console.ReadLine();

このコードの実行結果を以下に示します。

# 複数のIObservable<T>の統合(Switch)
# OnNext(10)
Sleep 2000ms...
OnNext: 10
OnNext: 20
# OnNext(100)
Sleep 2000ms...
OnNext: 100
# OnNext(1000)
Sleep 2000ms...
OnNext: 1000
# OnCompleted
OnNext: 2000
OnNext: 3000
OnCompleted

上記の実行結果から確認できるように、Mergeの時と異なって9個の値を発行しているにも関わらず、最終的にOnNextまで値が流れているのが6つになっています。これは、Mergeメソッドが元になるIObservableが発行した値を全て後続に流すのに対して、Switchメソッドは、最後に値を発行したIObservableの値を後続に流します。Switchメソッドは、このような特徴から一定間隔で非同期にデータを取得する処理を実行して、最後に応答が返ってきたところからデータを抽出したいといったケースで利用できます。