かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」

過去記事インデックス

IObservableインターフェースとIObserverインターフェース

前回の宣言通りに、Reactive Extensionsの機能の要となるIObservableとIObserverを実装して、その動作を確認します。まずは、Observerパターンでの監視役となるIObserverから実装を行います。IObserverインターフェースは、先に示したようにOnNextとOnErrorとOnCompletedの3つのメソッドからなります。この3種類のメソッド内にIObservableから通知された値を受け取った時の処理を行います。ここでは、IObservableから通知された値を単純にコンソールに出力するものを作成します。

namespace IObservableIObserverImpl
{
    using System;

    // 監視する人
    class PrintObserver : IObserver<int>
    {
        // 監視対象から通知が来たときの処理
        public void OnNext(int value)
        {
            Console.WriteLine("OnNext({0}) called.", value);
        }

        // 完了通知が来たときの処理
        public void OnCompleted()
        {
            Console.WriteLine("OnCompleted called.");
        }

        // エラー通知が来たときの処理
        public void OnError(Exception error)
        {
            Console.WriteLine("OnError({0}) called.", error.Message);
        }
    }
}

単純に、OnNextとOnCompletedとOnErrorで標準出力に値を出力しています。次にIObservableを実装したコードを示します。

namespace IObservableIObserverImpl
{
    using System;
    using System.Collections.Generic;

    /// <summary>
    /// 監視されるクラス
    /// </summary>
    class NumberObservable : IObservable<int>
    {
        // 自分を監視してる人を管理するリスト
        private List<IObserver<int>> observers = new List<IObserver<int>>();

        // 自分を監視してる人に通知を行う
        // 0を渡したらエラー通知
        public void Execute(int value)
        {
            if (value == 0)
            {
                foreach (var obs in observers)
                {
                    obs.OnError(new Exception("value is 0"));
                }

                // エラーが起きたので処理は終了
                this.observers.Clear();
                return;
            }

            foreach (var obs in observers)
            {
                obs.OnNext(value);
            }
        }

        // 完了通知
        public void Completed()
        {
            foreach (var obs in observers)
            {
                obs.OnCompleted();
            }
            // 完了したので監視してる人たちをクリア
            this.observers.Clear();
        }

        // 監視してる人を追加する。
        // 戻り値のIDisposableをDisposeすると監視から外れる。
        public IDisposable Subscribe(IObserver<int> observer)
        {
            this.observers.Add(observer);
            return new RemoveListDisposable(observers, observer);
        }

        // Disposeが呼ばれたらobserverを監視対象から削除する
        private class RemoveListDisposable : IDisposable
        {
            private List<IObserver<int>> observers = new List<IObserver<int>>();
            private IObserver<int> observer;

            public RemoveListDisposable(List<IObserver<int>> observers, IObserver<int> observer)
            {
                this.observers = observers;
                this.observer = observer;
            }

            public void Dispose()
            {
                if (this.observers == null)
                {
                    return;
                }

                if (observers.IndexOf(observer) != -1)
                {
                    this.observers.Remove(observer);
                }

                this.observers = null;
                this.observer = null;
            }
        }
    }
}

IObservableの実装はIObserverに比べて複雑になっています。これは、IObservableインターフェースがIObserverを自分自身の監視役として登録するSubscribeメソッドしか提供していないため、その他の監視役のIObserverの保持や、Subscribeメソッドの戻り値のIDosposableのDisposeを呼び出したときの監視役解除の処理を作りこんでいるためです。どちらも一般的なC#によるプログラミングの範囲の内容になるので詳細は割愛します。最期に、このPrintObserverクラスとNumberObservableクラスを使ったサンプルプログラムを以下に示します。

namespace IObservableIObserverImpl
{
    using System;

    class Program
    {
        static void Main(string[] args)
        {
            // 監視される人を作成
            var source = new NumberObservable();
            // 監視役を2つ登録
            var sbscriber1 = source.Subscribe(new PrintObserver());
            var sbscriber2 = source.Subscribe(new PrintObserver());

            // 監視される人の処理を実行
            Console.WriteLine("## Execute(1)");
            source.Execute(1);
            // 片方を監視する人から解雇
            Console.WriteLine("## Dispose");
            sbscriber2.Dispose();
            // 再度処理を実行
            Console.WriteLine("## Execute(2)");
            source.Execute(2);
            // エラーを起こしてみる
            Console.WriteLine("## Execute(0)");
            source.Execute(0);
            // 完了通知
            // もう1つ監視役を追加して完了通知を行う
            var sbscriber3 = source.Subscribe(new PrintObserver());
            Console.WriteLine("## Completed");
            source.Completed();
        }
    }
}

上記のプログラムは、NumberObservable型の変数sourceにPrintObserverを2つ登録しています。その状態でExecuteメソッドを呼んだりDisposeを読んだりエラーを起こしたりして出力を確認しています。このプログラムを動かすと下記のような結果になります。

## Execute(1)
OnNext(1) called.
OnNext(1) called.
## Dispose
## Execute(2)
OnNext(2) called.
## Execute(0)
OnError(value is 0) called.
## Completed
OnCompleted called.

最初のExecuteでは、PrintObserverを2つ登録しているので2回OnNextが呼ばれていることが確認出来ます。次に片方をDisposeした後ではExecuteを読んでも1回しかOnNextが呼ばれません。Executeメソッドの引数に0を渡してエラーを起こした場合と処理を完了させたときも、PrintObserverに処理が伝わっていることが確認できます。

Reactive Extensionsの機能を使った書き直し

ここまで書いてきたプログラムは、Reactive Extensionsの説明というよりはIObservableインターフェースとIObserverインターフェースを実装して使用しただけのObserverパターンの1実装例です。ここでは、このプログラムをReactive Extensionsが提供する便利な拡張メソッドやクラスを使って書き換えを行います。
Reactive Extensionsを使ったプログラムでは、頻繁にIObservableインターフェースのSubscribeメソッドを使用して通知に対するアクションを設定します。この時、実行したい処理の単位でクラスを作成するのは現実的ではありません。そのため、Reactive ExtensionsではSystem.ObservableExtensionsクラスでIObservableインターフェースの拡張メソッドを定義しています。主な拡張メソッドは下記の3つになります。どれもデリゲートを受け取るタイプになります。

  • void Subscribe(this IObservable source, Action onNext)
  • void Subscribe( this IObservable source, Action onNext, Action onCompleted)
  • void Subscribe( this IObservable source, Action onNext, Action onError, Action onCompleted)

この拡張メソッドを使うことでデリゲートでIObserverインターフェースのOnNextメソッドとOnErrorメソッドとOnCompletedメソッドを指定するだけで内部でIObserverを継承したクラスを作成してIObservableインターフェースのSubscribe(IObserver)メソッドへ渡してくれます。このため、Reactive Extensionsを使う上ではIObserverインターフェースを実装することは、ほぼ無くなります。(私は今まで実際の処理を書いていて実装したことは有りません)上記の拡張メソッドを使うためにはReactive Extensionsをプロジェクトの参照に追加します。NugetでRx-Mainという名前のパッケージをプロジェクトに追加します。(Express Editionの方はnugetコマンドラインをインストールしてnuget install rx-mainでダウンロードされるので手動で参照に追加してください)そして、Mainのプログラムを下記のように変更します。

// 監視される人を作成
var source = new NumberObservable();
// 2つ監視役を登録
var subscriber1 = source.Subscribe(
    // OnNext
    value => Console.WriteLine("OnNext({0}) called.", value),
    // OnError
    ex => Console.WriteLine("OnError({0}) called.", ex.Message),
    // OnCompleted
    () => Console.WriteLine("OnCompleted() called."));
var subscriber2 = source.Subscribe(
    // OnNext
    value => Console.WriteLine("OnNext({0}) called.", value),
    // OnError
    ex => Console.WriteLine("OnError({0}) called.", ex.Message),
    // OnCompleted
    () => Console.WriteLine("OnCompleted() called."));

// 監視される人の処理を実行
Console.WriteLine("## Execute(1)");
source.Execute(1);
// 1つを監視する人から解雇
Console.WriteLine("## Dispose");
subscriber2.Dispose();
// 再度処理を実行
Console.WriteLine("## Execute(2)");
source.Execute(2);
// エラーを起こしてみる
Console.WriteLine("## Execute(0)");
source.Execute(0);
// もう1つ監視役を追加して完了通知を行う
var sbscriber3 = source.Subscribe(
    // OnNext
    value => Console.WriteLine("OnNext({0}) called.", value),
    // OnError
    ex => Console.WriteLine("OnError({0}) called.", ex.Message),
    // OnCompleted
    () => Console.WriteLine("OnCompleted() called."));
Console.WriteLine("## Completed");
source.Completed();

引数を3つ受け取るタイプのSubscribe拡張メソッドを使用しているためPrintObserverクラスは不要になります。今回の例では、同じ処理を何度もSubscribeしているのでメソッドとして切り出すなりしたほうがコードの重複が無くなりますが、サンプルとしての見通しのためにあえて1メソッド内に重複コードを書いています。
次にIObservableの実装クラスですが、これもIObservableを実装したクラスが提供されています。System.Reactive.Subjects.Subjectクラスがそれにあたります。このクラスはIObservableインターフェースとIObserverインターフェースの両方を実装していてOnNextやOnErrorやOnCompletedなどのIObserverインターフェースで提供されているメソッドを呼び出すとIObservableインターフェースのSubscribeメソッドで監視対象として追加されているIObserverに処理を通知します。このクラスをラップする形で使うと簡単にIObservableインターフェースを実装することが出来ます。

namespace UseSubscribeMethod
{
    using System;
    using System.Reactive.Subjects;

    class NumberObservable : IObservable<int>
    {
        // IObservable<T>とIObserver<T>の両方を兼ねるクラス
        private Subject<int> source = new Subject<int>();

        // 自分を監視してる人に通知を行う
        // 0を渡したらエラー通知
        public void Execute(int value)
        {
            if (value == 0)
            {
                this.source.OnError(new Exception("value is 0"));
                // エラー状態じゃないまっさらなSubjectを再作成
                this.source = new Subject<int>();
                return;
            }

            this.source.OnNext(value);
        }

        // 完了通知
        public void Completed()
        {
            this.source.OnCompleted();
        }

        // 監視してる人を追加する。
        // 戻り値のIDisposableをDisposeすると監視から外れる。
        public IDisposable Subscribe(IObserver<int> observer)
        {
            return this.source.Subscribe(observer);
        }
    }
}

これで、初期のプログラムと同じ動作を実装出来ました。ただし、Reactive Extensionsを普通に使っている範囲では、Subjectクラスは使用することは少ないです。本来は、IObservableを実装したクラスを生成するためのファクトリメソッドが用意されているので、そちらを利用することのほうが多いです。しかし、動作確認をするためには自分で値の発行などが細かく制御できるSubjectクラスを、これから先のサンプルで使用するために紹介しました。
以上で、今回は終了です。次回以降では、IObservableを生成するためのファクトリメソッドを使っていこうと思います。