かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その38「ColdからHotへ!その他のPublish系メソッド2」

過去記事インデックス

はじめに

えっと、Publish系のColdからHotへ変換するメソッドで1つ忘れてました…。Replayメソッド。こいつだけ名前違うんだもんっ!!

Replayメソッド

ここではReplayメソッドについて説明します。Replayメソッドは、購読した対象に今まで発行した値を全て流すという特徴があります。このため、Connect後暫く放置してSubscribeをすると一気に値が流れてきます。このメソッドのシグネチャを以下に示します。

public static IConnectableObservable<T> Replay<T>(this IObservable<T> source);

このメソッドの使用例を下記に示します。

// 500ms間隔で0から値をカウントアップしていく
var source = Observable
    .Interval(TimeSpan.FromMilliseconds(500))
    // ReplayでHotに変換
    .Replay();

// 購読
Console.WriteLine("# Subscribe1");
using (
    source.Subscribe(
        i => Console.WriteLine("Subscribe1#OnNext: {0}", i)))
{
    // 接続
    Console.WriteLine("# Connect");
    source.Connect();

    // 2秒待機
    Console.WriteLine("Sleep 2sec...");
    Thread.Sleep(2000);
                    
    // 購読2
    Console.WriteLine("# Subscribe2");
    using (
        source.Subscribe(
            i => Console.WriteLine("Subscribe1#OnNext: {0}", i)))
    {
        // 1秒待機
        Console.WriteLine("Sleep 1sec...");
        Thread.Sleep(1000);
    }
    // 購読2解除
    Console.WriteLine("# UnSubscribe2");

    // 2秒待機
    Console.WriteLine("Sleep 2sec...");
    Thread.Sleep(2000);
}
// 購読解除
Console.WriteLine("# UnSubscribe1");

Replayメソッドで作成したIConnectableObservableに対して、2回Subscribeを行っています。実行結果を以下に示します。

# Subscribe1
# Connect
Sleep 2sec...
Subscribe1#OnNext: 0
Subscribe1#OnNext: 1
Subscribe1#OnNext: 2
Subscribe1#OnNext: 3
# Subscribe2
Subscribe1#OnNext: 0
Subscribe1#OnNext: 1
Subscribe1#OnNext: 2
Subscribe1#OnNext: 3
Sleep 1sec...
Subscribe1#OnNext: 4
Subscribe1#OnNext: 4
Subscribe1#OnNext: 5
Subscribe1#OnNext: 5
# UnSubscribe2
Sleep 2sec...
Subscribe1#OnNext: 6
Subscribe1#OnNext: 7
Subscribe1#OnNext: 8
Subscribe1#OnNext: 9
# UnSubscribe1

2回目のSubscribe(# Subscribe2が表示されている箇所)でどのようにOnNextに値が渡っているかに注目をしてください。Subscribeと同時に一気に0〜3の値が発行されていることが確認できます。このことから、Replayメソッドの挙動である、発行済みの値を購読した時点で一気に流すということが確認できます。

MulticastメソッドでのReplayメソッドの再現

ほかのPublish系メソッドと同様にReplayメソッドもMulticastメソッドで再現することができます。ReplaySubjectクラスをMulticastメソッドに渡すことで同じ挙動になります。ReplaySubjectを渡したコード例を下記に示します。

Console.WriteLine("# Multicast(new ReplaySubject<long>())");
var source = Observable
    .Interval(TimeSpan.FromMilliseconds(500))
    // Replayと一緒
    .Multicast(new ReplaySubject<long>());

// 接続
Console.WriteLine("# Connect");
source.Connect();

// 2秒待機
Console.WriteLine("Sleep 2sec...");
Thread.Sleep(2000);

// 購読
Console.WriteLine("# Subscribe");
source.Subscribe(
    i => Console.WriteLine("OnNext: {0}", i),
    () => Console.WriteLine("OnCompleted"));

ReplaySubjectクラスは、今まで発行した値を全てキャッシュしていて購読時に一気に発行しなおす挙動をします。上記コードではConnect後に2秒待機してIntervalメソッドから0〜3の4つの値が発行された状態でSubscribeしています。実行結果を以下に示します。

# Multicast(new ReplaySubject<long>())
# Connect
Sleep 2sec...
# Subscribe
OnNext: 0
OnNext: 1
OnNext: 2
OnNext: 3

Subscribeをした時点で、これまで発行された値が一気にOnNextに渡っていることが確認できます。