かずきのBlog@hatena

日本マイクロソフトに勤めています。XAML + C#の組み合わせをメインに、たまにASP.NETやJavaなどの.NET系以外のことも書いています。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

SubscribeOnの挙動

この記事は、Reactive Extensions再入門とは関係ありません。メモです。
ずっと挙動が個人的に謎だったSubscribeOnメソッドですが、調べてみたところ単純な機能を持ったメソッドっぽいです。
どういう時に使うのかイメージは沸きませんが、Windows PhoneのMSDNのヘルプには、こう記載されています。

スケジューラを使用して、オブザーバーを非同期的にサブスクリプションおよび
サブスクリプション取り消しします。

そう、つまりSubscribeメソッドによるIObserverの登録やIObserverの登録解除が重かった時(どういう時にそうなるのか謎いですが)に非同期で処理をおこなってくれるみたいです。

たとえば以下のようなクラスを用意します。これは意図的にSubscribe処理で2秒スリープしています。

class MySubject : ISubject<int>
{
    private Subject<int> source = new Subject<int>();

    #region IObserver<int> メンバー

    public void OnCompleted()
    {
        this.source.OnCompleted();
    }

    public void OnError(Exception error)
    {
        this.source.OnError(error);
    }

    public void OnNext(int value)
    {
        this.source.OnNext(value);
    }

    #endregion

    #region IObservable<int> メンバー

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Subscribeが奇跡的に重い場合!!
        Console.WriteLine("{0:HH:mm:ss.FFF} MySubject#Subscribe start", DateTime.Now);
        try
        {
            Console.WriteLine("Subscribeが重いよ");
            Thread.Sleep(2000);
            return this.source.Subscribe(observer);
        }
        finally
        {
            Console.WriteLine("{0:HH:mm:ss.FFF} MySubject#Subscribe end", DateTime.Now);
        }
    }

    #endregion
}

これを使って簡単なプログラムを書いてみます。

var source = new MySubject();
source
    .Subscribe(i =>
    {
        Console.WriteLine("{0:HH:mm:ss.FFF} value: {1}", DateTime.Now, i);
    });
Console.WriteLine("OnNext(1)");
source.OnNext(1);
Console.WriteLine("OnNext(2)");
source.OnNext(2);

Console.WriteLine("Main Thread.Sleep(5000)");
Thread.Sleep(5000);

Console.WriteLine("OnNext(3)");
source.OnNext(3);

これを実行すると以下のような実行結果になります。

23:12:06.474 MySubject#Subscribe start
Subscribeが重いよ
23:12:08.486 MySubject#Subscribe end
OnNext(1)
23:12:08.487 value: 1
OnNext(2)
23:12:08.487 value: 2
Main Thread.Sleep(5000)
OnNext(3)
23:12:13.487 value: 3

このプログラムはSubscribe処理で2秒ほどプログラムが固まっています。(1行目と3行目のタイムスタンプを見てください)そのあと、OnNextに処理が流れていきます。これをSubscribeOnを使うとどうなるか見ていきます。プログラムを以下のように書き換えます。

var source = new MySubject();
source
    // Sbscribeをスレッドプールで行う
    .SubscribeOn(Scheduler.ThreadPool)
    .Subscribe(i =>
    {
        Console.WriteLine("{0:HH:mm:ss.FFF} value: {1}", DateTime.Now, i);
    });
Console.WriteLine("OnNext(1)");
source.OnNext(1);
Console.WriteLine("OnNext(2)");
source.OnNext(2);

Console.WriteLine("Main Thread.Sleep(5000)");
Thread.Sleep(5000);

Console.WriteLine("OnNext(3)");
source.OnNext(3);

コメントにもあるようにSubscribeOnメソッドの呼び出しを追加しています。これでSubscribeの処理を指定したスケジューラ上で実行するように指定しています。このコードを実行すると以下のような結果になります。

OnNext(1)
OnNext(2)
Main Thread.Sleep(5000)
23:15:10.16 MySubject#Subscribe start
Subscribeが重いよ
23:15:12.168 MySubject#Subscribe end
OnNext(3)
23:15:15.158 value: 3

SubscribeがThreadPoolで実行されるようになったため、メインスレッドからの値の発行はブロックされません。そのため、Subscribeメソッドで渡したデリゲートの処理が呼び出されているのは最後のOnNext(3)の呼び出しだけになります。(OnNext(3)の呼び出し前に5秒メインスレッドをスリープしているため、2秒かかるSubscribe処理が裏で完了しているため)

以上が、今日調べてわかったことでした。ということで詳しい人の降臨を待ちます。