かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions入門 11「非同期処理用のSubject」

その名もAsyncSubjectです。非同期処理を自前で実装してIObservableを返すようなメソッドを作るときに使えます。
こいつは、OnCompleted()メソッドメソッドを呼び出すと、最後にOnNextした値を通知するようになります。特徴としては、OnCompleted()を呼ぶ前にSubscribeしたもの以外にも、OnCompleted()を呼んだ後にSubscribeした奴に対しても値の通知をしてくれます。


簡単に動きのわかるプログラムをさくっと書いてみました。

using System;
using System.Collections.Generic;

class Program
{
    static void Main(string[] args)
    {
        var s = new AsyncSubject<int>();
        // 複数OnNextしても最後のだけが通知される(この場合20になる)
        s.OnNext(10);
        s.OnNext(20);
        // OnNextしただけじゃSubscribeしても何も起きない
        s.Subscribe(i => Console.WriteLine("最初の : " + i));

        // OnCompletedして初めてSubscribeしたところに通知がいく
        Console.WriteLine("OnCompleted前");
        s.OnCompleted();
        Console.WriteLine("OnCompleted後");

        // いったん通知がとんだ後でも、Subscribeすれば通知がくる
        s.Subscribe(i => Console.WriteLine("後からでもOK : " + i));

        Console.ReadKey();
    }
}

実行結果は以下のようになります。

OnCompleted前
最初の : 20
OnCompleted後
後からでもOK : 20

こいつの使いどころは、以下のような時間のかかる処理を行うメソッドを自作するときだと思います。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        SayHello().Subscribe(Console.WriteLine);
        Console.ReadKey();
    }

    static IObservable<string> SayHello()
    {
        var s = new AsyncSubject<string>();
        new Task(() =>
            {
                // 時間のかかる処理
                Thread.Sleep(1000);
                s.OnNext("Hello world");
                s.OnCompleted();
            }).Start();
        // AsyncSubjectをIObservableにして返す
        return s.AsObservable();
    }
}

まぁでも、正直これを実装するならObservable.Startが素直でいいです。