かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その39「Subject系クラス」

過去記事インデックス

はじめに

Reactive Extensionsには、これまでのサンプル内で使ってきたように様々なSubject系のクラスがあります。Reactive Extensions再入門 その14「Nextメソッド」でBehaviorSubjectクラスについて説明したので、今回はその他のSubjectクラス、AsyncSubjectクラス、ReplaySubjectクラスについて説明します。

Subjectクラス

SubjectクラスはIObservableとIObserverの両方のインターフェースを実装するクラスです。これまでのサンプルプログラム内で使ってきた通り、OnNext, OnError, OnCompletedの各種メソッドを呼ぶことで、値の発行、エラーの発行、完了の通知を行います。各種あるSubject系のクラスの中でも一番素直な挙動を行うクラスです。コード例下記に示します。

var source = new Subject<int>();
// 購読1
Console.WriteLine("# Subscribe1");
source.Subscribe(
    i => Console.WriteLine("Subscribe1#OnNext: {0}", i),
    ex => Console.WriteLine("Subscribe1#OnError: {0}", ex),
    () => Console.WriteLine("Subscribe1#OnCompleted"));

// 購読2
Console.WriteLine("# Subscribe2");
source.Subscribe(
    i => Console.WriteLine("Subscribe2#OnNext: {0}", i),
    ex => Console.WriteLine("Subscribe2#OnError: {0}", ex),
    () => Console.WriteLine("Subscribe2#OnCompleted"));

// 値の発行〜完了
Console.WriteLine("OnNext(1)");
source.OnNext(1);
Console.WriteLine("OnNext(2)");
source.OnNext(2);
Console.WriteLine("OnNext(3)");
source.OnNext(3);
Console.WriteLine("OnCompleted()");
source.OnCompleted();

Subjectに対して2回Subscribeをした状態で値の発行から完了を行っています。実行結果を以下に示します。

# Subscribe1
# Subscribe2
OnNext(1)
Subscribe1#OnNext: 1
Subscribe2#OnNext: 1
OnNext(2)
Subscribe1#OnNext: 2
Subscribe2#OnNext: 2
OnNext(3)
Subscribe1#OnNext: 3
Subscribe2#OnNext: 3
OnCompleted()
Subscribe1#OnCompleted
Subscribe2#OnCompleted

Subjectから値が発行される度に2回OnNextに値が渡っていることが確認できます。Subjectクラスではエラーを通知するために、OnErrorで例外を発行することも出来ます。コード例を下記に示します。

var source = new Subject<int>();

// 購読
Console.WriteLine("# Subscribe");
source.Subscribe(
    i => Console.WriteLine("Subscribe1#OnNext: {0}", i),
    ex => Console.WriteLine("Subscribe1#OnError: {0}", ex),
    () => Console.WriteLine("Subscribe1#OnCompleted"));

// 例外
source.OnError(new Exception("Error!!"));

実行結果を以下に示します。

# Subscribe
Subscribe1#OnError: System.Exception: Error!!

AsyncSubjectクラス

ここでは、AsyncSubjectクラスについて説明します。AsyncSubjectクラスは名前の通り非同期処理をラップする際によく使用します。このクラスの特徴は、OnNextを何回発行してもOnCompletedを行うまで値が発行されないという点です。そしてOnCompletedが呼ばれると、最後のOnNextの値とOnCompletedをObserverに通知します。また、OnCompletedが呼ばれた後に購読された場合にも、最後のOnNextの値とOnCompletedをObserverに通知します。コード例を下記に示します。

var source = new AsyncSubject<int>();

// 購読
Console.WriteLine("# Subscribe1");
source.Subscribe(
    i => Console.WriteLine("Subscribe1#OnNext: {0}", i),
    ex => Console.WriteLine("Subscribe1#OnError: {0}", ex),
    () => Console.WriteLine("Subscribe1#OnCompleted"));

// 値の発行〜完了
Console.WriteLine("OnNext(1)");
source.OnNext(1);
Console.WriteLine("OnNext(2)");
source.OnNext(2);
Console.WriteLine("OnNext(3)");
source.OnNext(3);
Console.WriteLine("OnCompleted()");
source.OnCompleted();

// OnCompleted後の購読
Console.WriteLine("# Subscribe2");
source.Subscribe(
    i => Console.WriteLine("Subscribe2#OnNext: {0}", i),
    ex => Console.WriteLine("Subscribe2#OnError: {0}", ex),
    () => Console.WriteLine("Subscribe2#OnCompleted"));

実行結果を以下に示します。

# Subscribe1
OnNext(1)
OnNext(2)
OnNext(3)
OnCompleted()
Subscribe1#OnNext: 3
Subscribe1#OnCompleted
# Subscribe2
Subscribe2#OnNext: 3
Subscribe2#OnCompleted

OnCompletedメソッドが呼ばれたタイミングでの値の発行と、OnCompletedメソッドが呼ばれた後のSubscribeでも値が発行されていることが確認できます。

ReplaySubjectクラス

ここでは、ReplaySubjectクラスについて説明します。ReplaySubjectクラスは、発行した全ての値をキャッシュしてSubscribeされたタイミングでキャッシュしている値を全て発行するという特徴があります。それ以外は、通常のSubjectクラスと同じ動きをします。コード例を下記に示します。

var source = new ReplaySubject<int>();

// 購読
Console.WriteLine("# Subscribe1");
source.Subscribe(
    i => Console.WriteLine("Subscribe1#OnNext: {0}", i),
    ex => Console.WriteLine("Subscribe1#OnError: {0}", ex),
    () => Console.WriteLine("Subscribe1#OnCompleted"));

// 値の発行〜完了
Console.WriteLine("OnNext(1)");
source.OnNext(1);
Console.WriteLine("OnNext(2)");
source.OnNext(2);
Console.WriteLine("OnNext(3)");
source.OnNext(3);
Console.WriteLine("OnCompleted()");
source.OnCompleted();

// OnCompleted後の購読
Console.WriteLine("# Subscribe2");
source.Subscribe(
    i => Console.WriteLine("Subscribe2#OnNext: {0}", i),
    ex => Console.WriteLine("Subscribe2#OnError: {0}", ex),
    () => Console.WriteLine("Subscribe2#OnCompleted"));

実行結果を以下に示します。

# Subscribe1
OnNext(1)
Subscribe1#OnNext: 1
OnNext(2)
Subscribe1#OnNext: 2
OnNext(3)
Subscribe1#OnNext: 3
OnCompleted()
Subscribe1#OnCompleted
# Subscribe2
Subscribe2#OnNext: 1
Subscribe2#OnNext: 2
Subscribe2#OnNext: 3
Subscribe2#OnCompleted

最初の購読から値の発行、完了通知の流れではSubjectクラスと同様の動きをしています。完了後の購読時に、今まで発行した値と完了通知が名前の通りリプレイされていることが確認できます。