かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

SubscribeOnの挙動

この記事は、Reactive Extensions再入門とは関係ありません。メモです。
ずっと挙動が個人的に謎だったSubscribeOnメソッドですが、調べてみたところ単純な機能を持ったメソッドっぽいです。
どういう時に使うのかイメージは沸きませんが、Windows PhoneのMSDNのヘルプには、こう記載されています。

スケジューラを使用して、オブザーバーを非同期的にサブスクリプションおよび
サブスクリプション取り消しします。

そう、つまりSubscribeメソッドによるIObserverの登録やIObserverの登録解除が重かった時(どういう時にそうなるのか謎いですが)に非同期で処理をおこなってくれるみたいです。

たとえば以下のようなクラスを用意します。これは意図的にSubscribe処理で2秒スリープしています。

class MySubject : ISubject<int>
{
    private Subject<int> source = new Subject<int>();

    #region IObserver<int> メンバー

    public void OnCompleted()
    {
        this.source.OnCompleted();
    }

    public void OnError(Exception error)
    {
        this.source.OnError(error);
    }

    public void OnNext(int value)
    {
        this.source.OnNext(value);
    }

    #endregion

    #region IObservable<int> メンバー

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Subscribeが奇跡的に重い場合!!
        Console.WriteLine("{0:HH:mm:ss.FFF} MySubject#Subscribe start", DateTime.Now);
        try
        {
            Console.WriteLine("Subscribeが重いよ");
            Thread.Sleep(2000);
            return this.source.Subscribe(observer);
        }
        finally
        {
            Console.WriteLine("{0:HH:mm:ss.FFF} MySubject#Subscribe end", DateTime.Now);
        }
    }

    #endregion
}

これを使って簡単なプログラムを書いてみます。

var source = new MySubject();
source
    .Subscribe(i =>
    {
        Console.WriteLine("{0:HH:mm:ss.FFF} value: {1}", DateTime.Now, i);
    });
Console.WriteLine("OnNext(1)");
source.OnNext(1);
Console.WriteLine("OnNext(2)");
source.OnNext(2);

Console.WriteLine("Main Thread.Sleep(5000)");
Thread.Sleep(5000);

Console.WriteLine("OnNext(3)");
source.OnNext(3);

これを実行すると以下のような実行結果になります。

23:12:06.474 MySubject#Subscribe start
Subscribeが重いよ
23:12:08.486 MySubject#Subscribe end
OnNext(1)
23:12:08.487 value: 1
OnNext(2)
23:12:08.487 value: 2
Main Thread.Sleep(5000)
OnNext(3)
23:12:13.487 value: 3

このプログラムはSubscribe処理で2秒ほどプログラムが固まっています。(1行目と3行目のタイムスタンプを見てください)そのあと、OnNextに処理が流れていきます。これをSubscribeOnを使うとどうなるか見ていきます。プログラムを以下のように書き換えます。

var source = new MySubject();
source
    // Sbscribeをスレッドプールで行う
    .SubscribeOn(Scheduler.ThreadPool)
    .Subscribe(i =>
    {
        Console.WriteLine("{0:HH:mm:ss.FFF} value: {1}", DateTime.Now, i);
    });
Console.WriteLine("OnNext(1)");
source.OnNext(1);
Console.WriteLine("OnNext(2)");
source.OnNext(2);

Console.WriteLine("Main Thread.Sleep(5000)");
Thread.Sleep(5000);

Console.WriteLine("OnNext(3)");
source.OnNext(3);

コメントにもあるようにSubscribeOnメソッドの呼び出しを追加しています。これでSubscribeの処理を指定したスケジューラ上で実行するように指定しています。このコードを実行すると以下のような結果になります。

OnNext(1)
OnNext(2)
Main Thread.Sleep(5000)
23:15:10.16 MySubject#Subscribe start
Subscribeが重いよ
23:15:12.168 MySubject#Subscribe end
OnNext(3)
23:15:15.158 value: 3

SubscribeがThreadPoolで実行されるようになったため、メインスレッドからの値の発行はブロックされません。そのため、Subscribeメソッドで渡したデリゲートの処理が呼び出されているのは最後のOnNext(3)の呼び出しだけになります。(OnNext(3)の呼び出し前に5秒メインスレッドをスリープしているため、2秒かかるSubscribe処理が裏で完了しているため)

以上が、今日調べてわかったことでした。ということで詳しい人の降臨を待ちます。