過去記事インデックス
IObservableインターフェースとIObserverインターフェース
前回の宣言通りに、Reactive Extensionsの機能の要となるIObservable
namespace IObservableIObserverImpl { using System; // 監視する人 class PrintObserver : IObserver<int> { // 監視対象から通知が来たときの処理 public void OnNext(int value) { Console.WriteLine("OnNext({0}) called.", value); } // 完了通知が来たときの処理 public void OnCompleted() { Console.WriteLine("OnCompleted called."); } // エラー通知が来たときの処理 public void OnError(Exception error) { Console.WriteLine("OnError({0}) called.", error.Message); } } }
単純に、OnNextとOnCompletedとOnErrorで標準出力に値を出力しています。次にIObservable
namespace IObservableIObserverImpl { using System; using System.Collections.Generic; /// <summary> /// 監視されるクラス /// </summary> class NumberObservable : IObservable<int> { // 自分を監視してる人を管理するリスト private List<IObserver<int>> observers = new List<IObserver<int>>(); // 自分を監視してる人に通知を行う // 0を渡したらエラー通知 public void Execute(int value) { if (value == 0) { foreach (var obs in observers) { obs.OnError(new Exception("value is 0")); } // エラーが起きたので処理は終了 this.observers.Clear(); return; } foreach (var obs in observers) { obs.OnNext(value); } } // 完了通知 public void Completed() { foreach (var obs in observers) { obs.OnCompleted(); } // 完了したので監視してる人たちをクリア this.observers.Clear(); } // 監視してる人を追加する。 // 戻り値のIDisposableをDisposeすると監視から外れる。 public IDisposable Subscribe(IObserver<int> observer) { this.observers.Add(observer); return new RemoveListDisposable(observers, observer); } // Disposeが呼ばれたらobserverを監視対象から削除する private class RemoveListDisposable : IDisposable { private List<IObserver<int>> observers = new List<IObserver<int>>(); private IObserver<int> observer; public RemoveListDisposable(List<IObserver<int>> observers, IObserver<int> observer) { this.observers = observers; this.observer = observer; } public void Dispose() { if (this.observers == null) { return; } if (observers.IndexOf(observer) != -1) { this.observers.Remove(observer); } this.observers = null; this.observer = null; } } } }
IObservable
namespace IObservableIObserverImpl { using System; class Program { static void Main(string[] args) { // 監視される人を作成 var source = new NumberObservable(); // 監視役を2つ登録 var sbscriber1 = source.Subscribe(new PrintObserver()); var sbscriber2 = source.Subscribe(new PrintObserver()); // 監視される人の処理を実行 Console.WriteLine("## Execute(1)"); source.Execute(1); // 片方を監視する人から解雇 Console.WriteLine("## Dispose"); sbscriber2.Dispose(); // 再度処理を実行 Console.WriteLine("## Execute(2)"); source.Execute(2); // エラーを起こしてみる Console.WriteLine("## Execute(0)"); source.Execute(0); // 完了通知 // もう1つ監視役を追加して完了通知を行う var sbscriber3 = source.Subscribe(new PrintObserver()); Console.WriteLine("## Completed"); source.Completed(); } } }
上記のプログラムは、NumberObservable型の変数sourceにPrintObserverを2つ登録しています。その状態でExecuteメソッドを呼んだりDisposeを読んだりエラーを起こしたりして出力を確認しています。このプログラムを動かすと下記のような結果になります。
## Execute(1) OnNext(1) called. OnNext(1) called. ## Dispose ## Execute(2) OnNext(2) called. ## Execute(0) OnError(value is 0) called. ## Completed OnCompleted called.
最初のExecuteでは、PrintObserverを2つ登録しているので2回OnNextが呼ばれていることが確認出来ます。次に片方をDisposeした後ではExecuteを読んでも1回しかOnNextが呼ばれません。Executeメソッドの引数に0を渡してエラーを起こした場合と処理を完了させたときも、PrintObserverに処理が伝わっていることが確認できます。
Reactive Extensionsの機能を使った書き直し
ここまで書いてきたプログラムは、Reactive Extensionsの説明というよりはIObservable
Reactive Extensionsを使ったプログラムでは、頻繁にIObservable
- void Subscribe
(this IObservable source, Action onNext) - void Subscribe
( this IObservable source, Action onNext, Action onCompleted) - void Subscribe
( this IObservable source, Action onNext, Action onError, Action onCompleted)
この拡張メソッドを使うことでデリゲートでIObserver
// 監視される人を作成 var source = new NumberObservable(); // 2つ監視役を登録 var subscriber1 = source.Subscribe( // OnNext value => Console.WriteLine("OnNext({0}) called.", value), // OnError ex => Console.WriteLine("OnError({0}) called.", ex.Message), // OnCompleted () => Console.WriteLine("OnCompleted() called.")); var subscriber2 = source.Subscribe( // OnNext value => Console.WriteLine("OnNext({0}) called.", value), // OnError ex => Console.WriteLine("OnError({0}) called.", ex.Message), // OnCompleted () => Console.WriteLine("OnCompleted() called.")); // 監視される人の処理を実行 Console.WriteLine("## Execute(1)"); source.Execute(1); // 1つを監視する人から解雇 Console.WriteLine("## Dispose"); subscriber2.Dispose(); // 再度処理を実行 Console.WriteLine("## Execute(2)"); source.Execute(2); // エラーを起こしてみる Console.WriteLine("## Execute(0)"); source.Execute(0); // もう1つ監視役を追加して完了通知を行う var sbscriber3 = source.Subscribe( // OnNext value => Console.WriteLine("OnNext({0}) called.", value), // OnError ex => Console.WriteLine("OnError({0}) called.", ex.Message), // OnCompleted () => Console.WriteLine("OnCompleted() called.")); Console.WriteLine("## Completed"); source.Completed();
引数を3つ受け取るタイプのSubscribe拡張メソッドを使用しているためPrintObserverクラスは不要になります。今回の例では、同じ処理を何度もSubscribeしているのでメソッドとして切り出すなりしたほうがコードの重複が無くなりますが、サンプルとしての見通しのためにあえて1メソッド内に重複コードを書いています。
次にIObservable
namespace UseSubscribeMethod { using System; using System.Reactive.Subjects; class NumberObservable : IObservable<int> { // IObservable<T>とIObserver<T>の両方を兼ねるクラス private Subject<int> source = new Subject<int>(); // 自分を監視してる人に通知を行う // 0を渡したらエラー通知 public void Execute(int value) { if (value == 0) { this.source.OnError(new Exception("value is 0")); // エラー状態じゃないまっさらなSubjectを再作成 this.source = new Subject<int>(); return; } this.source.OnNext(value); } // 完了通知 public void Completed() { this.source.OnCompleted(); } // 監視してる人を追加する。 // 戻り値のIDisposableをDisposeすると監視から外れる。 public IDisposable Subscribe(IObserver<int> observer) { return this.source.Subscribe(observer); } } }
これで、初期のプログラムと同じ動作を実装出来ました。ただし、Reactive Extensionsを普通に使っている範囲では、Subject
以上で、今回は終了です。次回以降では、IObservable