かずきのBlog@hatena

日本マイクロソフトに勤めています。XAML + C#の組み合わせをメインに、たまにASP.NETやJavaなどの.NET系以外のことも書いています。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

Reactive Extensions再入門 その37「ColdからHotへ!その他のPublish系メソッド」

過去記事インデックス

はじめに

前の記事でPublishの基本動作は詳しくやったので残りはさ〜っと流していきます!

引数を受け取るPublishメソッドのオーバーロード

ここでは、引数を受け取るPublishメソッドのオーバーロードについて説明します。このオーバーロードは、Subscribeしたタイミングで引数で渡した値を流します。その後、の動作は引数無しのPublishメソッドと同様にConnectメソッドを呼び出したタイミングで値を流します。このメソッドのシグネチャを以下に示します。

public static IConnectableObservable<T> Publish<T>(
    this IObservable<T> source, 
    TSource initialValue);

このメソッドの使用例を下記に示します。

// Publishのintの引数を受け取るオーバーロード
var source = Observable
    // 1〜3の値を発行するColdなObservable
    .Range(1, 3)
    // initialValueに100を指定してPublishを呼ぶ
    .Publish(100);

// 購読
Console.WriteLine("# Subscribe");
source.Subscribe(
    i => Console.WriteLine("OnNext: {0}", i),
    () => Console.WriteLine("OnCompleted"));

// 接続
Console.WriteLine("# Connect");
source.Connect();

Publishメソッドの引数に100を渡してIConnectableObservableを作成しています。その後、購読してConnectメソッドを呼び出しています。SubscribeのOnNextの呼び出されるタイミングに注目して実行結果を確認してください。このコードの実行結果を以下に示します。

# Subscribe
OnNext: 100
# Connect
OnNext: 1
OnNext: 2
OnNext: 3
OnCompleted

Connectメソッドを呼び出す前にOnNext: 100が表示されていることが確認できます。このように、Publishメソッドの引数で渡した値がSubscribe直後に発行されていることがわかります。

PublishLastメソッド

ここでは、PublishLastメソッドについて説明します。PublishLastメソッドは最後に発行された値をConnect後に流します。このメソッドのシグネチャを以下に示します。

public static IConnectableObservable<T> PublishLast<T>(this IObservable<T> source);

このメソッドの使用例を下記に示します。

// PublishLast
var source = Observable
    // 1〜3の値を発行するColdなObservable
    .Range(1, 3)
    // PublishLastでHot化
    .PublishLast();

// 購読
Console.WriteLine("# Subscribe1");
source.Subscribe(
    i => Console.WriteLine("Subscribe1#OnNext: {0}", i),
    () => Console.WriteLine("Subscribe1#OnCompleted"));

// 接続
Console.WriteLine("# Connect");
source.Connect();

// 接続後にもう一度購読
Console.WriteLine("# Subscribe2");
source.Subscribe(
    i => Console.WriteLine("Subscribe2#OnNext: {0}", i),
    () => Console.WriteLine("Subscribe2#OnCompleted"));

Connect前に1回購読(Subscribe1)をして、Connect後にもう一度購読(Subscribe2)しています。このコードの実行結果を以下に示します。

# Subscribe1
# Connect
Subscribe1#OnNext: 3
Subscribe1#OnCompleted
# Subscribe2
Subscribe2#OnNext: 3
Subscribe2#OnCompleted

OnNextに、元になるIObservableから発行される1〜3のうち最後の3が表示されていることが確認できます。このことから、最後(OnCompleted直前)に発行された値のみ後続に流していることがわかります。また、このメソッドはConnect後に購読しても最後の状態をキャッシュしているため、Connect前に購読したものと同じ結果が表示されています。

Multicastメソッド

ここでは、Multicastメソッドについて説明します。Multicastメソッドは引数に応じて、Publishメソッド、引数のあるPublishメソッド、PublishLastメソッドと同じ動きをするIConnectableObservableを返すメソッドになります。このメソッドのシグネチャを以下に示します。

public static IConnectableObservable<T> Multicast<T, U>(
    this IObservable<T> source, 
    ISubject<T, U> subject);

引数のsubjectに応じてPublish系の各メソッドと同じ動作をさせることが出来ます。ISubjectインターフェースはSubjectクラス等のSubject系クラスが実装しているインターフェースでSubjectクラスはISubjectを実装しています。そのため、基本的に型引数のTとUは同じケースが現実問題としては多いので下記のようなメソッドの定義だと思っていて差支え無いと思います。

public static IConnectableObservable<T> Multicast<T >(
    this IObservable<T> source, 
    ISubject<T > subject);

このメソッドの使用例を下記に示します。

Console.WriteLine("# Multicast(new Subject<int>())");
var source = Observable
    .Range(1, 3)
    // Publishと一緒
    .Multicast(new Subject<int>());

// 購読
Console.WriteLine("# Subscribe");
source.Subscribe(
    i => Console.WriteLine("OnNext: {0}", i),
    () => Console.WriteLine("OnCompleted"));

// 接続
Console.WriteLine("# Connect");
source.Connect();

上記コードではMulticastメソッドにSubjectクラスを渡しています。コメント内にもある通り、これは引数の無いPublishメソッドと同じ動きになります。実行結果を以下に示します。

# Multicast(new Subject<int>())
# Subscribe
# Connect
OnNext: 1
OnNext: 2
OnNext: 3
OnCompleted

Publishメソッドと同様にConnect後に、値が発行されていることが確認できます。
次に、BehaviorSubjectを渡したコード例を下記に示します。

Console.WriteLine("# Multicast(new BehaviorSubject<int>(100))");
var source = Observable
    .Range(1, 3)
    // Publish(100)と一緒
    .Multicast(new BehaviorSubject<int>(100));

// 購読
Console.WriteLine("# Subscribe");
source.Subscribe(
    i => Console.WriteLine("OnNext: {0}", i),
    () => Console.WriteLine("OnCompleted"));

// 接続
Console.WriteLine("# Connect");
source.Connect();

BehaviorSubjectは、「5. Subject系クラス」で説明するクラスで、初期値を持つSubjectクラスになります。このコードの実行結果を以下に示します。

# Multicast(new BehaviorSubject<int>(100))
# Subscribe
OnNext: 100
# Connect
OnNext: 1
OnNext: 2
OnNext: 3
OnCompleted

引数のあるPublishメソッドと同様にSubscribe直後に値が発行されて、Connect後に元になるIObservableのシーケンスの値が発行されていることが確認できます。
次に、AsyncSubjectを渡したコード例を下記に示します。

Console.WriteLine("# Multicast(new AsyncSubject<int>())");
var source = Observable
    .Range(1, 3)
    // PublishLastと一緒
    .Multicast(new AsyncSubject<int>());

// 購読
Console.WriteLine("# Subscribe");
source.Subscribe(
    i => Console.WriteLine("OnNext: {0}", i),
    () => Console.WriteLine("OnCompleted"));

// 接続
Console.WriteLine("# Connect");
source.Connect();

AsyncSubjectクラスは、OnCompletedが発行される前の最後の値をキャッシュして購読者に流すSubjectです。このコードの実行結果を以下に示します。

# Multicast(new AsyncSubject<int>())
# Subscribe
# Connect
OnNext: 3
OnCompleted

上記の実行結果からもわかるとおり、最後の値だけが発行されています。このことから、AsyncSubjectクラスを渡した場合はPublishLastと同様の動きを行います。
このMulticastメソッドは、任意のSubjectを渡すことによってColdなIObservableからHotなIObservableへの変換を行います。その際の挙動をSubjectクラスを差し替えることで様々な形にカスタマイズできます。