かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」

Observable.FromEventメソッド

では、HotなIObservableを作成するメソッドのトップバッターとして前回、登場したFromEventメソッドを紹介します。FromEventメソッドのシグネチャは既に示したので、より詳しく動作を確認するためのコードを下記に示します。

// イベントを発行するクラス
var eventSource = new EventSource();
var source = Observable.FromEvent<EventHandler, EventArgs>(
    h => (s, e) => h(e),
    // 普通は h => eventSource.Raised += h だけでいい
    h => 
    {
        Console.WriteLine("add handler");
        eventSource.Raised += h;
    },
    // 普通は h => eventSource.Raised -= h だけでいい
    h => 
    {
        Console.WriteLine("remove handler");
        eventSource.Raised -= h;
    });

// 2回購読
var subscription1 = source.Subscribe(
    i => Console.WriteLine("1##OnNext({0})", i),
    ex => Console.WriteLine("1##OnError({0})", ex.Message),
    () => Console.WriteLine("1##Completed()"));
var subscription2 = source.Subscribe(
    i => Console.WriteLine("2##OnNext({0})", i),
    ex => Console.WriteLine("2##OnError({0})", ex.Message),
    () => Console.WriteLine("2##Completed()"));

// 2回呼び出してみる
// 合計4回のOnNextが呼ばれるはず
eventSource.OnRaised();
eventSource.OnRaised();

// Observableが発行する値の購読を停止
Console.WriteLine("dispose method call.");
subscription1.Dispose();
subscription2.Dispose();

この例で使用しているEventSourceクラスは、RaisedというイベントとOnRaisedというイベントを発行するメソッドだけを持ったクラスで下記のように定義しています。

// イベント発行クラス
class EventSource
{
    public event EventHandler Raised;
    public void OnRaised()
    {
        var h = this.Raised;
        if (h != null)
        {
            h(this, EventArgs.Empty);
        }
    }
}

このコードの実行結果を下記に示します。

add handler
add handler
1##OnNext(System.EventArgs)
2##OnNext(System.EventArgs)
1##OnNext(System.EventArgs)
2##OnNext(System.EventArgs)
dispose method call.
remove handler
remove handler

FromEventメソッドのイベントハンドラ登録処理とイベントハンドラの登録解除処理にログを出力するように仕込んだものが表示されています。このことから、Disposeを呼ぶときちんとイベントハンドラの登録解除が行われることがわかります。また、イベントが発行されたタイミングで2つ登録したObserverの両方に対して通知がいっていることも確認できます。

Observable.Startメソッド

次に、簡単にバックグラウンドの処理を記述できるStartメソッドについて説明します。StartメソッドはActionかFuncを引数に受け取りIObservableかIObservableを返します。引数で受け取ったデリゲートの処理が終わるとIObservableから結果が発行されます。コード例を下記に示します。

// バックグラウンドで処理を開始
var source = Observable.Start(() =>
{
    Console.WriteLine("background task start.");
    Thread.Sleep(2000);
    Console.WriteLine("background task end.");
    return 1;
});
// 購読
Console.WriteLine("subscribe1");
var subscription1 = source.Subscribe(
    i => Console.WriteLine("1##OnNext({0})", i),
    ex => Console.WriteLine("1##OnError({0})", ex.Message),
    () => Console.WriteLine("1##Completed()"));

// 処理が確実に終わるように5秒待つ
Console.WriteLine("sleep 5sec.");
Thread.Sleep(5000);

// Observableが発行する値の購読を停止
Console.WriteLine("dispose method call.");
subscription1.Dispose();

// 購読
Console.WriteLine("subscribe2");
var subscription2 = source.Subscribe(
    i => Console.WriteLine("2##OnNext({0})", i),
    ex => Console.WriteLine("2##OnError({0})", ex.Message),
    () => Console.WriteLine("2##Completed()"));
subscription2.Dispose();

このサンプルで特徴的なのが、Startメソッド内の処理が2秒で終わるにも関わらず5秒スリープした後にSubscribeをしている点です。通常の感覚では、既に処理が完了して値が発行された後なのでSubscribeしても何も起きないと考えられます。しかし、Startメソッドの戻り値のIObservableは最後の処理結果をキャッシュしています。そのため、Subscribeされるとキャッシュしている値とOnCompletedを発行します。
この例の実行結果では、最初のSubscribeと二回目のSubscribeそれぞれで、OnNextとOnCompletedの処理が呼ばれます。実行結果を下記に示します。

subscribe1
sleep 5sec.                     ← ここで5秒スリープしている
background task start.
background task end.
1##OnNext(1)
1##Completed()
dispose method call.
subscribe2                     ← このタイミングではStartメソッドの処理は終了している
2##OnNext(1)                  ← OnNextとOnCompletedが通知される
2##Completed()

Observable.ToAsyncメソッド

次は、ToAsyncメソッドを紹介します。このメソッドもStartメソッドと同様に重たい処理をバックグラウンドでやるために使用できます。Startメソッドとの違いはToAsyncの戻り値にあらわれています。シグネチャを下記に示します。
public static Func> ToAsync(Func function)
引数に重たいことをやる処理を渡して、戻り値がIObservableを返すデリゲートになっています。この戻り値のデリゲートを呼び出すことでToAsyncの引数に渡した処理がはじめて実行されます。Startは、Startメソッドを呼び出した直後から処理が開始されましたが、ToAsyncを使うと処理の開始のタイミングを柔軟に制御できます。
ここで示したメソッドのシグネチャは数十個あるオーバーロードの1つになります。ToAsyncにはほかにも戻り値が無いケースや引数が大量にあるケースに備えて膨大な数のオーバーロードがあります。完全なオーバーロードのリストについては下記のMSDNのリファレンスを参照してください。
Observable.ToAsync Method
コード例を下記に示します。

// 戻り値はFunc<IObservable<T>>
var source = Observable.ToAsync(() =>
{
    Console.WriteLine("background task start.");
    Thread.Sleep(2000);
    Console.WriteLine("background task end.");
    return 1;
});

// ToAsyncはデリゲートを返すのでInvoke() or ()をしないと処理が開始されない
Console.WriteLine("source() call.");
var invokedSource = source.Invoke();
var subscription1 = invokedSource.Subscribe(
    i => Console.WriteLine("1##OnNext({0})", i),
    ex => Console.WriteLine("1##OnError({0})", ex.Message),
    () => Console.WriteLine("1##Completed()"));
// 処理が確実に終わるように5秒待つ
Console.WriteLine("sleep 5sec.");
Thread.Sleep(5000);

// Observableが発行する値の購読を停止
Console.WriteLine("dispose method call.");
subscription1.Dispose();

// 購読
Console.WriteLine("subscribe2");
var subscription2 = invokedSource.Subscribe(
    i => Console.WriteLine("2##OnNext({0})", i),
    ex => Console.WriteLine("2##OnError({0})", ex.Message),
    () => Console.WriteLine("2##Completed()"));
subscription2.Dispose();

ポイントは、ToAsyncの戻り値に対してInvokeをしているところです。ここで初めてToAsyncに渡した処理の実行がはじまります。実行結果を下記に示します。

source() call.
background task start.
sleep 5sec.
background task end.
1##OnNext(1)
1##Completed()
dispose method call.
subscribe2
2##OnNext(1)
2##Completed()

ここでもStartメソッドの例と同じようにToAsyncの処理が終わった後にSubscribeしているにも関わらずOnNextとOnCompletedが呼ばれていることがわかります。非同期で処理を行うIObservableは全体的にこのような動きを行うので覚えておきましょう。

Observable.FromAsyncPatternメソッド

これで、一連のファクトリメソッドの紹介は最後になります。最後を飾るのはFromAsyncPatternメソッドです。このメソッドは名前が示す通り.NET Frameworkで使われている非同期呼び出しのパターンからIObservableを作成します。

// 重たい処理
Func<int, int, int> asyncProcess = (x, y) =>
{
    Console.WriteLine("process start.");
    Thread.Sleep(2000);
    Console.WriteLine("process end.");
    return x + y;
};

// 因みに非同期呼び出しは普通に書くとこんな感じ
// asyncProcess.BeginInvoke(
//     10, 2,
//     ar =>
//     {
//         var ret = asyncProcess.EndInvoke(ar);
//         // do something
//     },
//     null);
var asyncPattern = Observable.FromAsyncPattern<int, int, int>(
    asyncProcess.BeginInvoke,
    asyncProcess.EndInvoke);

var source = asyncPattern(10, 2);

// 処理中に購読開始
Console.WriteLine("subscribe2");
var subscription1 = source.Subscribe(
    i => Console.WriteLine("1##OnNext({0})", i),
    ex => Console.WriteLine("1##OnError({0})", ex.Message),
    () => Console.WriteLine("1##Completed()"));

// 確実に処理が終わるように5秒待つ
Console.WriteLine("sleep 5sec");
Thread.Sleep(5000);
// Observableが発行する値の購読を停止
Console.WriteLine("dispose method call.");
subscription1.Dispose();

// 処理が完了したあとに購読
Console.WriteLine("subscribe2");
var subscription2 = source.Subscribe(
    i => Console.WriteLine("2##OnNext({0})", i),
    ex => Console.WriteLine("2##OnError({0})", ex.Message),
    () => Console.WriteLine("2##Completed()"));

// 購読解除
Console.WriteLine("dispose method call.");
subscription2.Dispose();

普通はコールバックで書く非同期呼び出しをIObservableにラッピングします。このメソッドもToAsyncと同様に戻り値がデリゲートなので、デリゲートを呼び出すことで非同期処理が開始されます。実行結果を下記に示します。

process start.
subscribe2
sleep 5sec
process end.
1##OnNext(12)
1##Completed()
dispose method call.
subscribe2
2##OnNext(12)
2##Completed()
dispose method call.

ここでも、処理が終わった後にSubscribeをしてもOnNextとOnCompletedが呼ばれていることがわかります。

次回

次回からは、IObservableに定義されている拡張メソッドについて紹介します。この拡張メソッドがあるからこそReactive Extensionsを強力たらしめていると思っています。少しずつ楽しい説明が出来るように頑張ります!