かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その36「ColdからHotへ!Publishメソッドと参照カウンタ?RefCountメソッド」

過去記事インデックス

はじめに

はるか昔にやったColdとHotなObservable。今回はColdからHotへ変換して遊んでみました。

ColdからHotへ変換する拡張メソッド

ここでは、ColdなIObservableのシーケンスをHotなIObservableのシーケンスに変換するメソッドについて説明します。ColdからHotに変換することで、複数回Subscribeしたときに、毎回値を発行するのか、まとめて値を発行するのかといったことを細かく制御することが可能になります。

Publishメソッド

ここでは、Publishメソッドについて説明します。Publishメソッドは、ColdなIObservableをHotなIObservableに変換するメソッドです。ColdなIObservableは、Subscribeで追加されたObserverごとに独立して値を流し込みます。しかし、このPublishメソッドを使うことで、複数のObserverに対して同時に同一の値を流すことが出来ます。Publishメソッドのシグネチャを以下に示します。

public static IConnectableObservable<T> Publish<T>(this IObservable<T> source);

Publishメソッドの戻り値のIConnectableObservableは、Connectというメソッドを持ったIObservableになります。このConnectメソッドを呼び出すまで、元になったIObservableの値を発行するのを遅延させます。このConnectメソッドのシグネチャを以下に示します。

IDisposable Connect();

Connectメソッドは、戻り値のIDisposableに対してDisposeメソッドが呼び出されるまでHotなIObservableとして振る舞います。
コード例を下記に示します。

// coldなObservableの作成
var source = Observable.Range(1, 3);

// 2回購読
Console.WriteLine("# Subscribe1");
source.Subscribe(
    i => Console.WriteLine("Subscribe1#OnNext: {0}", i),
    () => Console.WriteLine("Subscribe1#OnCompleted"));
Console.WriteLine("# Subscribe2");
source.Subscribe(
    i => Console.WriteLine("Subscribe2#OnNext: {0}", i),
    () => Console.WriteLine("Subscribe2#OnCompleted"));

Console.WriteLine("--");

// PublishでHotなObservableに
Console.WriteLine("# Publish");
var connectableSource = source.Publish();

// 2回購読
Console.WriteLine("# Subscribe1");
connectableSource.Subscribe(
    i => Console.WriteLine("Subscribe1#OnNext: {0}", i),
    () => Console.WriteLine("Subscribe1#OnCompleted"));
Console.WriteLine("# Subscribe2");
connectableSource.Subscribe(
    i => Console.WriteLine("Subscribe2#OnNext: {0}", i),
    () => Console.WriteLine("Subscribe2#OnCompleted"));

// Connectで購読しているObserverに値を流す
Console.WriteLine("# Connect");
connectableSource.Connect();

// Connect後に購読したときの動作確認
Console.WriteLine("# Subscribe3");
connectableSource.Subscribe(
    i => Console.WriteLine("Subscribe3#OnNext: {0}", i),
    () => Console.WriteLine("Subscribe3#OnCompleted"));

まず、Rangeメソッドで0〜2の値を発行するColdなIObservableを作成しています。Publishをした時としてない時とで動きを比較するために、Publishをする前に2回Subscribeをしています。その後、Publishメソッドを呼び出してHotなIObservableへ変換しています。そして、2回購読を行ってConnectメソッドで値の発行を開始しています。
上記コードの実行結果を以下に示します。

# Subscribe1
Subscribe1#OnNext: 1
Subscribe1#OnNext: 2
Subscribe1#OnNext: 3
Subscribe1#OnCompleted
# Subscribe2
Subscribe2#OnNext: 1
Subscribe2#OnNext: 2
Subscribe2#OnNext: 3
Subscribe2#OnCompleted
--
# Publish
# Subscribe1
# Subscribe2
# Connect
Subscribe1#OnNext: 1
Subscribe2#OnNext: 1
Subscribe1#OnNext: 2
Subscribe2#OnNext: 2
Subscribe1#OnNext: 3
Subscribe2#OnNext: 3
Subscribe1#OnCompleted
Subscribe2#OnCompleted
# Subscribe3
Subscribe3#OnCompleted

IConnectableObservableのConnectメソッドが呼ばれるまで、値を購読しているところまで値が流れていないことが確認できます。値の発行が終わった後に、SubscribeしたときにはIObservableは完了している状態のため、OnCompletedが実行されていることが確認できます。
Publishの動作を確認するために、もう1つコード例を見ていきます。今度はIntervalメソッドを使って500msごとに0からカウントアップしていくColdなIObservableに対してPublishメソッドを呼び出しています。購読者のいない状態でのConnectと、Connectしたものの途中から購読した場合の挙動について確認しています。また、Connectメソッドの戻り値のIDisposableを使ってConnectの解除を行っています。コードを下記に示します。

// 500ms間隔で0からカウントアップしていく値を発行するIObservableを作成
Console.WriteLine("# Create source(ConnectableObservable)");
var source = Observable
    .Interval(TimeSpan.FromMilliseconds(500))
    .Publish();

// Connectで値の放流開始
Console.WriteLine("# Connect");
using (source.Connect())
{
    // 2秒間何もせずに待つ
    Console.WriteLine("Sleep 2sec...");
    Thread.Sleep(2000);

    // 2秒間購読
    Console.WriteLine("# Subscribe1");
    using (
        source.Subscribe(
            i => Console.WriteLine("Subscribe1#OnNext: {0}", i),
            () => Console.WriteLine("Subscribe1#OnCompleted")))
    {
        Console.WriteLine("Sleep 2sec...");
        Thread.Sleep(2000);
    }
    Console.WriteLine("# UnSubscribe1");
}
// Disposeが呼ばれるのでConnectが解除される
Console.WriteLine("# DisConnect");

// Connectが解除された状態で2秒間購読する
Console.WriteLine("# Subscribe2");
using (
    source.Subscribe(
        i => Console.WriteLine("Subscribe2#OnNext: {0}", i),
        () => Console.WriteLine("Subscribe2#OnCompleted")))
{
    Console.WriteLine("Sleep 2sec...");
    Thread.Sleep(2000);
}
Console.WriteLine("# UnSubscribe2");

// 再接続
Console.WriteLine("# Connect");
using (source.Connect())
{
    // 再接続で発行される値を確認するため2秒間購読する
    Console.WriteLine("# Subscribe3");
    using (
        source.Subscribe(
            i => Console.WriteLine("Subscribe3#OnNext: {0}", i),
            () => Console.WriteLine("Subscribe3#OnCompleted")))
    {
        Console.WriteLine("Sleep 2sec...");
        Thread.Sleep(2000);
    }
    Console.WriteLine("# UnSubscribe3");
}
// Disposeが呼ばれるので接続解除
Console.WriteLine("# DisConnect");

// 接続解除状態で2秒間待機して様子を見る
Console.WriteLine("Sleep 2sec...");
Thread.Sleep(2000);

実行結果を以下に示します。

# Create source(ConnectableObservable)
# Connect
Sleep 2sec...
# Subscribe1
Sleep 2sec...
Subscribe1#OnNext: 4
Subscribe1#OnNext: 5
Subscribe1#OnNext: 6
# UnSubscribe1
# DisConnect
# Subscribe2
Sleep 2sec...
# UnSubscribe2
# Connect
# Subscribe3
Sleep 2sec...
Subscribe3#OnNext: 0
Subscribe3#OnNext: 1
Subscribe3#OnNext: 2
# UnSubscribe3
# DisConnect
Sleep 2sec...

Connectをして2秒経過してからの購読(Subscribe1)で値が4から始まっていることが確認できます。このことから、Connectを呼び出したタイミングでIntervalから値が発行されていることがわかります。また、Connectを解除してからの購読(Subscribe2)では値が発行されていないことも確認できます。そして最後に、再度Connectしてから購読(Subscribe3)では、値が0から始まっていることが確認できます。このことから、再Connect時には元になるColdなIObservableから再度値が発行されていることがわかります。

RefCountメソッド

ここでは、IConnectableObservableの拡張メソッドとして定義されているRefCountメソッドについて説明します。RefCountメソッドは、購読者が1つ以上いる時はConnect状態を維持して購読者が0になったらConnectを解除するIObservableを返すメソッドです。このメソッドを使うことでIConnectableObservableのConnectメソッドと戻り値のIDisposableの管理を省略することが出来ます。RefCountメソッドのシグネチャを以下に示します。

public static IObservable<T> RefCount<T>(this IConnectableObservable<T> source);

このメソッドの使用例を下記に示します。

// 500ms間隔で0から値をカウントアップしていくIObservableを作成
Console.WriteLine("# Create source(RefCount)");
var source = Observable
    .Interval(TimeSpan.FromMilliseconds(500))
    // Hot
    .Publish()
    // 購読者がいる間Connect状態を保つ
    .RefCount();

// 2秒待機して様子身
Console.WriteLine("Sleep 2sec...");
Thread.Sleep(2000);

// 購読開始
Console.WriteLine("# Subscribe1");
using (
    source.Subscribe(
        i => Console.WriteLine("Subscribe1#OnNext: {0}", i),
        () => Console.WriteLine("Subscribe1#OnCompleted")))
{
    // 購読状態で2秒待機
    Console.WriteLine("Sleep 2sec...");
    Thread.Sleep(2000);

    // 追加で1秒間購読
    Console.WriteLine("# Subscribe2");
    using (source.Subscribe(
            i => Console.WriteLine("Subscribe2#OnNext: {0}", i),
            () => Console.WriteLine("Subscribe2#OnCompleted")))
    {
        Console.WriteLine("Sleep 1sec...");
        Thread.Sleep(1000);
    }
    // 1つ購読解除
    Console.WriteLine("# UnSubscribe2");

    // 1つ購読解除した状態で2秒待機
    Console.WriteLine("Sleep 2sec...");
    Thread.Sleep(2000);
}
// 購読解除(ここで購読者数が0になるのでConnectが解除される)
Console.WriteLine("# UnSubscribe1");

// 2秒待機
Console.WriteLine("Sleep 2sec...");
Thread.Sleep(2000);

// 新たに購読開始(Connect状態になる)
Console.WriteLine("# Subscribe3");
using (source.Subscribe(
        i => Console.WriteLine("Subscribe3#OnNext: {0}", i),
        () => Console.WriteLine("Subscribe3#OnCompleted")))
{
    // 2秒待機
    Console.WriteLine("Sleep 2sec...");
    Thread.Sleep(2000);
}
// 購読解除
Console.WriteLine("# UnSubscribe3");

usingブロックを使って購読と購読解除を複数回行っています。最初の購読(Subscribe1)の後2秒待機して、次の購読(Subscribe2)をしています。Subscribe2は1秒後に購読を解除して、その2秒後にSubscribe1も購読を解除しています。そして、2秒待機したあと再び購読(Subscribe3)をしています。実行結果を以下に示します。

# Create source(RefCount)
Sleep 2sec...
# Subscribe1
Sleep 2sec...
Subscribe1#OnNext: 0
Subscribe1#OnNext: 1
Subscribe1#OnNext: 2
# Subscribe2
Sleep 1sec...
Subscribe1#OnNext: 3
Subscribe2#OnNext: 3
Subscribe1#OnNext: 4
Subscribe2#OnNext: 4
# UnSubscribe2
Sleep 2sec...
Subscribe1#OnNext: 5
Subscribe1#OnNext: 6
Subscribe1#OnNext: 7
Subscribe1#OnNext: 8
# UnSubscribe1
Sleep 2sec...
# Subscribe3
Sleep 2sec...
Subscribe3#OnNext: 0
Subscribe3#OnNext: 1
Subscribe3#OnNext: 2
# UnSubscribe3

Subscribe1で値を出力している途中でSubscribe2も購読を開始しています。このときの出力を見ると同じ値が同時に複数の購読者に対して発行されているのでHotなIObservableになっていることが確認できます。そして、Subscribe1とSubscribe2の購読を解除したあと、再度購読(Subscribe3)をすると、値が0からはじまっていることが確認できます。このことから、購読者数が0になったタイミングで、Connectが解除されていることがわかります。