かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その3「IObservableのファクトリメソッド」

修正 2011/11/06
次回にHotなIObservableを説明するといいつつ説明できないため該当箇所を削除しました。

IObservableのファクトリメソッド

ここでは、Reactive Extensionsで提供されるIObservableを作成するファクトリメソッドを紹介します。通常のReactive Extensionsを使ったプログラミングでは、提供されている様々なファクトリメソッドからIObservableを生成して使用します。そのため、どのようなIObservableの生成方法があるかを把握していることはReactive Extensionsを使う上でとても重要な要素になります。ファクトリメソッドは基本的にSystem.Reactive.Linq.Observableクラスのstaticメソッドとして提供されています。メソッドの一覧は下記のMSDNのドキュメントを参照してください。

基本的なファクトリメソッドの使用

ここでは、数多くあるObservableクラスのファクトリメソッドから基本的なものをいくつか紹介します。

指定された値を返すIObservableを返すファクトリメソッド

Observable.Returnメソッド

まず、一番動作を理解しやすい指定した値を通知するIObservableを作成するメソッドから使用します。最初に紹介するメソッドはReturnメソッドになります。これは引数に指定した値を通知するIObservableを作成します。コード例を以下に示します。

// 10を発行するIObservable<int>を作成する
var source = Observable.Return(10);
// 購読
var subscription = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
// 購読の停止(この場合意味はない)
subscription.Dispose();

コメントにあるように、このコードは10という値を発行するIObservableを作成しています。実行結果は下記のようになります。

OnNext(10)
Completed()

Returnで作成したIObservableは値を1つ発行すると、それ以降発行する値が無いため終了状態になります。そのためSubscribeをすると実行結果のようにOnNextの後にCompletedが呼ばれます。また、ここでは示していませんが2回Subscribeを行うと値の発行と終了の通知を再び行います。上記のコード例で言うと、2回目のSubscribeの呼び出しでも、OnNext(10)とCompleted()が表示されます。

Observable.Repeatメソッド

次は、同じ値を指定した回数発行するIObservableを返すメソッドになります。コード例を下記に示します。

// 2を5回発行するIObservable<int>を作成する
var source = Observable.Repeat(2, 5);
// 購読
var subscription = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
// 購読停止(この場合意味はない)
subscription.Dispose();

これを実行すると下記のようになります。

OnNext(2)
OnNext(2)
OnNext(2)
OnNext(2)
OnNext(2)
Completed()

同じ値をひたすら発行しているのがわかります。

Observable.Rangeメソッド

このメソッドは、指定した値から1ずつカウントアップした値を指定した個数だけ返します。コード例を下記に示します。

// 1から始まる値を10個発行するIObservable<int>を作成する
var source = Observable.Range(1, 10);
// 購読
var subscription = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
// 購読停止(この場合意味はない)
subscription.Dispose();

1からカウントアップする値を10個発行するので1〜10の値を発行するIObservableを作成しています。実行例を下記に示します。

OnNext(1)
OnNext(2)
OnNext(3)
OnNext(4)
OnNext(5)
OnNext(6)
OnNext(7)
OnNext(8)
OnNext(9)
OnNext(10)
Completed()
IObservableの拡張メソッドのRepeat

ここまではObservableクラスに定義されたIObservableを返すメソッドを使ってきましたが、ここで少し横道に逸れてIObservableの拡張メソッドとして定義されたRepeatメソッドを紹介したいと思います。この拡張メソッドもObservableクラスに定義されています。これまでのメソッドとの違いは純粋なstaticメソッドではなく、IObservableの拡張メソッドとして定義されている点です。このRepeat拡張メソッドは、IObservableが発行する値を指定した回数繰り返すIObservableを作成します。Rangeメソッドと組み合わせて使用した例を下記に示します。

// 1から始まる値を3個発行するIObservable<int>を作成する
var source = Observable.Range(1, 3);
// そして、それを3回繰り返すIObservable<int>を作成する
source = source.Repeat(3);
// 購読
var subscription = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
// 購読停止(この場合意味はない)
subscription.Dispose();

実行結果は下記のようになります。1〜3の値が3回発行されているのがわかります。

OnNext(1)
OnNext(2)
OnNext(3)
OnNext(1)
OnNext(2)
OnNext(3)
OnNext(1)
OnNext(2)
OnNext(3)
Completed()
Observable.Generateメソッド

次は、Generateメソッドです。このメソッドはfor文に近い使用感のメソッドになっています。第一引数で初期値状態、第二引数で継続の条件、第三引数で更新処理、第四引数で発行する値の生成を行う処理を渡します。コードを見ていただくとイメージがわきやすいと思うので下記にコード例を示します。

// 初期値0, 値が10より小さい間, 値は1ずつインクリメントして, 値を二乗したものを発行する
// IObservable<int>を作成する。
// for (int i = 0; i < 10; i++) { yield return i * i; }のようなイメージ
var source = Observable.Generate(0, i => i < 10, i => ++i, i => i * i);
// 購読
var subscription = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
// 購読停止(この場合意味はない)
subscription.Dispose();

このコードを実行すると0〜9までの値を二乗したものを発行するIObservableが作成されます。実行結果は下記のようになります。

OnNext(0)
OnNext(1)
OnNext(4)
OnNext(9)
OnNext(16)
OnNext(25)
OnNext(36)
OnNext(49)
OnNext(64)
OnNext(81)
Completed()

0〜9までの値を二乗した値が発行されているのが確認出来ます。

Observable.Deferメソッド

次に紹介するメソッドはDeferメソッドです。このメソッドはIObservableを直接返すラムダ式を引数に渡します。Subscribeメソッドが呼ばれる度に、Deferメソッドが実行されてIObservableが作成されます。コード例を以下に示します。

// 1, 2, 3と順番に値を発行して終了するIObservable<int>を生成する
var source = Observable.Defer<int>(() =>
{
    Console.WriteLine("# Defar method called.");
    // ReplaySubject<T>はSubject<T>の亜種でSubscribeされると
    // 今まで行われた操作を全てリプレイする。
    var s = new ReplaySubject<int>();
    s.OnNext(1);
    s.OnNext(2);
    s.OnNext(3);
    s.OnCompleted();
    // AsObservableでIObservable<T>へ変換できる。
    return s.AsObservable();
});
// 購読(sourceはReplaySubjectで作っているのでDeferメソッド内でした操作が再生される)
var subscription1 = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
var subscription2 = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
// 購読停止(この場合意味はない)
subscription1.Dispose();
subscription2.Dispose();

実行結果は下記のようになります。2回Subscribeしているので2回Deferメソッドが呼ばれていることが確認出来ます。

# Defar method called.
OnNext(1)
OnNext(2)
OnNext(3)
Completed()
# Defar method called.
OnNext(1)
OnNext(2)
OnNext(3)
Completed()

Deferメソッド内で使用しているReplaySubjectクラスは、コメント内にあるようにSubjectクラスの親戚のクラスです。SubjectクラスがSubscribeされる前の操作は通知しないのに対してReplaySubjectクラスは、Subscribeされる前の操作も通知する点が異なります。試しにDeferメソッド内のReplaySubjectを普通のSubjectに変更して実行すると下記のような結果になります。

# Defar method called.
Completed()
# Defar method called.
Completed()

Deferメソッドで返されたIObservableは既に通知する値が無く完了した状態になっているため、OnCompletedだけが実行されます。

Observable.Createメソッド

次は、Createメソッドを紹介します。このメソッドは、引数の形が特殊でIObserverを受け取ってActionを返すラムダ式を引数に受け取ります。引数で受け取るIObserverにはOnNextやOnErrrorやOnCompletedなどの操作を行います。ここで行った操作に応じた値がCreateメソッドの戻り値のIObservableで発行される値になります。最期に戻り値のActionですが、これはDisposeされた時に実行される処理になります。Createメソッド内でリソースやイベントの購読などをしていた場合に解放する処理を行うと良いと思います。では、コード例を下記に示します。

// 1, 2, 3と順番に値を発行して終了するIObservable<int>を生成する
var source = Observable.Create<int>(observer =>
{
    Console.WriteLine("# Create method called.");
    // 引数のIObserver<int>に対してOn****メソッドを呼ぶ
    observer.OnNext(1);
    observer.OnNext(2);
    observer.OnNext(3);
    observer.OnCompleted();
    // Disposeが呼ばれた時の処理を返す。
    // リソースを確保していた場合は、ここで解放すると良い。
    return () => Console.WriteLine("Disposable action");
});
// 購読
var subscription1 = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
var subscription2 = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
// 購読停止(この場合意味はない)
Console.WriteLine("# Dispose method call.");
subscription1.Dispose();
subscription2.Dispose();

実行結果は、下記のようになります。

## CreateSample
# Create method called.
OnNext(1)
OnNext(2)
OnNext(3)
Completed()
Disposable action
# Create method called.
OnNext(1)
OnNext(2)
OnNext(3)
Completed()
Disposable action
# Dispose method call.

注意したいのはDisposable actionの表示されているタイミングです。通常の考えだとsubscription1.Dispose();の呼び出しタイミングでDisposable actionと表示されるように思いますが動作を確認するとDisposeメソッドを呼ぶ前に自動で呼び出されています。これは、Completedの後で、もう購読していても値が来ないためDisposeが自動で行われていることを示しています。これまでのサンプルでもDisposeに特に意味がないと書いていたのはこのためです。

Observable.Throwメソッド

一連の基本的なIObservableを作成するメソッドの締めくくりとして最後にThrowメソッドを紹介します。これは引数に例外を渡します。疑似的にエラーを起こしたいときに使う以外に使用方法が思いつきません。コード例を下記に示します。

// エラーを発行するだけのIObservable<int>を生成
var source = Observable.Throw<int>(new Exception("Error message"));
// 購読
var subscription = source.Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("Completed()"));
// 購読停止(この場合意味はない)
subscription.Dispose();

実行結果は下記のようになります。OnErrorが呼ばれているのがわかります。

OnError(Error message)

ここまでに紹介したIObservableの動作

ここまで紹介してきたIObservableは、全て下記のような特徴があります。

  1. ファクトリで発行する値を指定してIObservableを作成する。
  2. Subscribeが呼び出されると以下のような動きをする
    1. 値を全て発行する
    2. 終わったらOnCompletedを呼ぶ
    3. 購読を解除する
  3. Subscribeが行われる度に2の動作を行う。

この動作を表した図を以下に示します。

上記の図ではOnErrorと二度目のSubscribeについて書いていませんが、基本的にOnErrorの時には例外がIObserverに通知されます。また、一度のSubscribeでIObservableが空になるように見えますがデータの流れを示すために空にしているだけで実際には再度Subscribeを行うと値が発行されます。
2011/11/06削除上記のような特徴をもつIObservableをColdなIObservableと言います。Coldと対となるものとしてHotなIObservableも存在します。次回は、HotなIObservableについてみていこうと思います。