かずきのBlog@hatena

日本マイクロソフトに勤めています。XAML + C#の組み合わせをメインに、たまにASP.NETやJavaなどの.NET系以外のことも書いています。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

Reactive Extensions入門 11「非同期処理用のSubject」

その名もAsyncSubjectです。非同期処理を自前で実装してIObservableを返すようなメソッドを作るときに使えます。
こいつは、OnCompleted()メソッドメソッドを呼び出すと、最後にOnNextした値を通知するようになります。特徴としては、OnCompleted()を呼ぶ前にSubscribeしたもの以外にも、OnCompleted()を呼んだ後にSubscribeした奴に対しても値の通知をしてくれます。


簡単に動きのわかるプログラムをさくっと書いてみました。

using System;
using System.Collections.Generic;

class Program
{
    static void Main(string[] args)
    {
        var s = new AsyncSubject<int>();
        // 複数OnNextしても最後のだけが通知される(この場合20になる)
        s.OnNext(10);
        s.OnNext(20);
        // OnNextしただけじゃSubscribeしても何も起きない
        s.Subscribe(i => Console.WriteLine("最初の : " + i));

        // OnCompletedして初めてSubscribeしたところに通知がいく
        Console.WriteLine("OnCompleted前");
        s.OnCompleted();
        Console.WriteLine("OnCompleted後");

        // いったん通知がとんだ後でも、Subscribeすれば通知がくる
        s.Subscribe(i => Console.WriteLine("後からでもOK : " + i));

        Console.ReadKey();
    }
}

実行結果は以下のようになります。

OnCompleted前
最初の : 20
OnCompleted後
後からでもOK : 20

こいつの使いどころは、以下のような時間のかかる処理を行うメソッドを自作するときだと思います。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        SayHello().Subscribe(Console.WriteLine);
        Console.ReadKey();
    }

    static IObservable<string> SayHello()
    {
        var s = new AsyncSubject<string>();
        new Task(() =>
            {
                // 時間のかかる処理
                Thread.Sleep(1000);
                s.OnNext("Hello world");
                s.OnCompleted();
            }).Start();
        // AsyncSubjectをIObservableにして返す
        return s.AsObservable();
    }
}

まぁでも、正直これを実装するならObservable.Startが素直でいいです。