修正 2011/11/06 次回にHotなIObservableを説明するといいつつ説明できないため該当箇所を削除しました。
過去記事インデックス
IObservableのファクトリメソッド
ここでは、Reactive Extensionsで提供されるIObservable
基本的なファクトリメソッドの使用
ここでは、数多くあるObservableクラスのファクトリメソッドから基本的なものをいくつか紹介します。
指定された値を返すIObservableを返すファクトリメソッド
Observable.Returnメソッド
まず、一番動作を理解しやすい指定した値を通知するIObservable
// 10を発行するIObservable<int>を作成する var source = Observable.Return(10); // 購読 var subscription = source.Subscribe( i => Console.WriteLine("OnNext({0})", i), ex => Console.WriteLine("OnError({0})", ex.Message), () => Console.WriteLine("Completed()")); // 購読の停止(この場合意味はない) subscription.Dispose();
コメントにあるように、このコードは10という値を発行するIObservable
OnNext(10) Completed()
Returnで作成したIObservable
Observable.Repeatメソッド
次は、同じ値を指定した回数発行するIObservable
// 2を5回発行するIObservable<int>を作成する var source = Observable.Repeat(2, 5); // 購読 var subscription = source.Subscribe( i => Console.WriteLine("OnNext({0})", i), ex => Console.WriteLine("OnError({0})", ex.Message), () => Console.WriteLine("Completed()")); // 購読停止(この場合意味はない) subscription.Dispose();
これを実行すると下記のようになります。
OnNext(2) OnNext(2) OnNext(2) OnNext(2) OnNext(2) Completed()
同じ値をひたすら発行しているのがわかります。
Observable.Rangeメソッド
このメソッドは、指定した値から1ずつカウントアップした値を指定した個数だけ返します。コード例を下記に示します。
// 1から始まる値を10個発行するIObservable<int>を作成する var source = Observable.Range(1, 10); // 購読 var subscription = source.Subscribe( i => Console.WriteLine("OnNext({0})", i), ex => Console.WriteLine("OnError({0})", ex.Message), () => Console.WriteLine("Completed()")); // 購読停止(この場合意味はない) subscription.Dispose();
1からカウントアップする値を10個発行するので1〜10の値を発行するIObservable
OnNext(1) OnNext(2) OnNext(3) OnNext(4) OnNext(5) OnNext(6) OnNext(7) OnNext(8) OnNext(9) OnNext(10) Completed()
IObservableの拡張メソッドのRepeat
ここまではObservableクラスに定義されたIObservable
// 1から始まる値を3個発行するIObservable<int>を作成する var source = Observable.Range(1, 3); // そして、それを3回繰り返すIObservable<int>を作成する source = source.Repeat(3); // 購読 var subscription = source.Subscribe( i => Console.WriteLine("OnNext({0})", i), ex => Console.WriteLine("OnError({0})", ex.Message), () => Console.WriteLine("Completed()")); // 購読停止(この場合意味はない) subscription.Dispose();
実行結果は下記のようになります。1〜3の値が3回発行されているのがわかります。
OnNext(1) OnNext(2) OnNext(3) OnNext(1) OnNext(2) OnNext(3) OnNext(1) OnNext(2) OnNext(3) Completed()
Observable.Generateメソッド
次は、Generateメソッドです。このメソッドはfor文に近い使用感のメソッドになっています。第一引数で初期値状態、第二引数で継続の条件、第三引数で更新処理、第四引数で発行する値の生成を行う処理を渡します。コードを見ていただくとイメージがわきやすいと思うので下記にコード例を示します。
// 初期値0, 値が10より小さい間, 値は1ずつインクリメントして, 値を二乗したものを発行する // IObservable<int>を作成する。 // for (int i = 0; i < 10; i++) { yield return i * i; }のようなイメージ var source = Observable.Generate(0, i => i < 10, i => ++i, i => i * i); // 購読 var subscription = source.Subscribe( i => Console.WriteLine("OnNext({0})", i), ex => Console.WriteLine("OnError({0})", ex.Message), () => Console.WriteLine("Completed()")); // 購読停止(この場合意味はない) subscription.Dispose();
このコードを実行すると0〜9までの値を二乗したものを発行するIObservable
OnNext(0) OnNext(1) OnNext(4) OnNext(9) OnNext(16) OnNext(25) OnNext(36) OnNext(49) OnNext(64) OnNext(81) Completed()
0〜9までの値を二乗した値が発行されているのが確認出来ます。
Observable.Deferメソッド
次に紹介するメソッドはDeferメソッドです。このメソッドはIObservable
// 1, 2, 3と順番に値を発行して終了するIObservable<int>を生成する var source = Observable.Defer<int>(() => { Console.WriteLine("# Defar method called."); // ReplaySubject<T>はSubject<T>の亜種でSubscribeされると // 今まで行われた操作を全てリプレイする。 var s = new ReplaySubject<int>(); s.OnNext(1); s.OnNext(2); s.OnNext(3); s.OnCompleted(); // AsObservableでIObservable<T>へ変換できる。 return s.AsObservable(); }); // 購読(sourceはReplaySubjectで作っているのでDeferメソッド内でした操作が再生される) var subscription1 = source.Subscribe( i => Console.WriteLine("OnNext({0})", i), ex => Console.WriteLine("OnError({0})", ex.Message), () => Console.WriteLine("Completed()")); var subscription2 = source.Subscribe( i => Console.WriteLine("OnNext({0})", i), ex => Console.WriteLine("OnError({0})", ex.Message), () => Console.WriteLine("Completed()")); // 購読停止(この場合意味はない) subscription1.Dispose(); subscription2.Dispose();
実行結果は下記のようになります。2回Subscribeしているので2回Deferメソッドが呼ばれていることが確認出来ます。
# Defar method called. OnNext(1) OnNext(2) OnNext(3) Completed() # Defar method called. OnNext(1) OnNext(2) OnNext(3) Completed()
Deferメソッド内で使用しているReplaySubject
# Defar method called. Completed() # Defar method called. Completed()
Deferメソッドで返されたIObservable
Observable.Createメソッド
次は、Createメソッドを紹介します。このメソッドは、引数の形が特殊でIObserver
// 1, 2, 3と順番に値を発行して終了するIObservable<int>を生成する var source = Observable.Create<int>(observer => { Console.WriteLine("# Create method called."); // 引数のIObserver<int>に対してOn****メソッドを呼ぶ observer.OnNext(1); observer.OnNext(2); observer.OnNext(3); observer.OnCompleted(); // Disposeが呼ばれた時の処理を返す。 // リソースを確保していた場合は、ここで解放すると良い。 return () => Console.WriteLine("Disposable action"); }); // 購読 var subscription1 = source.Subscribe( i => Console.WriteLine("OnNext({0})", i), ex => Console.WriteLine("OnError({0})", ex.Message), () => Console.WriteLine("Completed()")); var subscription2 = source.Subscribe( i => Console.WriteLine("OnNext({0})", i), ex => Console.WriteLine("OnError({0})", ex.Message), () => Console.WriteLine("Completed()")); // 購読停止(この場合意味はない) Console.WriteLine("# Dispose method call."); subscription1.Dispose(); subscription2.Dispose();
実行結果は、下記のようになります。
## CreateSample # Create method called. OnNext(1) OnNext(2) OnNext(3) Completed() Disposable action # Create method called. OnNext(1) OnNext(2) OnNext(3) Completed() Disposable action # Dispose method call.
注意したいのはDisposable actionの表示されているタイミングです。通常の考えだとsubscription1.Dispose();の呼び出しタイミングでDisposable actionと表示されるように思いますが動作を確認するとDisposeメソッドを呼ぶ前に自動で呼び出されています。これは、Completedの後で、もう購読していても値が来ないためDisposeが自動で行われていることを示しています。これまでのサンプルでもDisposeに特に意味がないと書いていたのはこのためです。
Observable.Throwメソッド
一連の基本的なIObservable
// エラーを発行するだけのIObservable<int>を生成 var source = Observable.Throw<int>(new Exception("Error message")); // 購読 var subscription = source.Subscribe( i => Console.WriteLine("OnNext({0})", i), ex => Console.WriteLine("OnError({0})", ex.Message), () => Console.WriteLine("Completed()")); // 購読停止(この場合意味はない) subscription.Dispose();
実行結果は下記のようになります。OnErrorが呼ばれているのがわかります。
OnError(Error message)
ここまでに紹介したIObservableの動作
ここまで紹介してきたIObservable
- ファクトリで発行する値を指定してIObservable
を作成する。 - Subscribeが呼び出されると以下のような動きをする
- 値を全て発行する
- 終わったらOnCompletedを呼ぶ
- 購読を解除する
- Subscribeが行われる度に2の動作を行う。
上記の図ではOnErrorと二度目のSubscribeについて書いていませんが、基本的にOnErrorの時には例外がIObserver
2011/11/06削除上記のような特徴をもつIObservable