かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions v2.0 betaの記事が凄い

Reactive Extensions Team BlogReactive Extensions v2.0 Beta available now!の記事が凄くアツイです。
前半は、まぁ置いといて途中の「Rx v2.0 and .NET 4.5 “async” / “await” – a better together story」というところからasyncとawaitを使ったプログラムの例がガンガン出てきますが、これが凄い!目から鱗がポロポロしてました。
かる〜く見るだけでも、今後のC#での非同期プログラミングの方向性がなんとなく見えてくるかもしれません!

Reactive Extensions再入門 その47「 Reactive Extensions 入門 ソースコードスニペット集」

ここまでのBlog記事を書くときにコードを書き溜めたソリューションを公開します。きちんと整理されているものではないですが、実際に動かして試せるコードのベースになると思います。Code Recipeからダウンロードできます。

過去記事インデックス

Reactive Extensions再入門 その46「 Reactive Extensions 入門 」

ここまで、自分のReactive Extensionsの復習もかねて基本的なメソッドの動作を確認するためのプログラムの記録を淡々と書いてきました。わかりにくかったり、図が少なかったり文章が拙かったりするところはあると思いますが、個人的に一区切りだと思ってるところまで書けたので、Blogの記事をまとめてPDFにしたものを公開したいと思います。

Reactive extensions入門v0.1

過去記事インデックス

Reactive Extensions再入門 その45「Scheduler」

過去記事インデックス

はじめに

ここまでは、IObservableの作成や合成やLINQのメソッドなどについてざ〜っと見てきましたが、今回は毛色を変えてSchedulerという機能についてみていこうと思います。

Scheduler

ここでは、Reactive Extensionsの特徴的な機能の1つSchedulerについて説明します。Reactive Extensionsでは、Schedulerという仕組みを使ってIObservableのシーケンスの処理の実行場所と時間を自在に制御することができます。この機能を提供するためにReactive Extensionsでは、ISchedulerというインターフェースを定義しています。ISchedulerインターフェースは下記のような1つのプロパティと3つのメソッドを持つシンプルなインターフェースです。

public interface IScheduler
{
    DateTimeOffset Now { get; }

    public IDisposable Schedule<TState>(
        TState state, 
        DateTimeOffset dueTime, 
        Func<IScheduler, TState, IDisposable> action);

    public IDisposable Schedule<TState>(
        TState state, 
        TimeSpan dueTime, 
        Func<IScheduler, TState, IDisposable> action);

    public IDisposable Schedule<TState>(
        TState state, 
        Func<IScheduler, TState, IDisposable> action);
}

ISchedulerインターフェースのNowプロパティがスケジューラ上の現在の時間を表し、ScheduleメソッドでSchedulerで実行する処理を設定します。実際にISchedulerインターフェースを実装することや、ここで定義されているメソッドを使用することは、ほとんどありませんが、基本としては上記のようなものを使用しているという点はなんとなく頭の片隅に入れておいてください。

実行場所の切り替え

Reactive Extensionsは実行場所を制御するためのISchedulerインターフェースの実装クラスがいくつか定義されています。Reactive Extensionsで定義されている実装クラスを以下に示します。

  • ImmediateScheduler
    • 現在のスレッド上で即座に処理を実行するSchedulerです。
  • CurrentThreadScheduler
    • 現在のスレッド上で処理を逐次実行するSchedulerです。
  • ThreadPoolScheduler
    • ThreadPool上で処理を実行するSchedulerです。
  • NewThreadScheduler
    • 新規スレッドを作り、その上で処理を実行するSchedulerです。
  • TaskPoolScheduler
    • TPLのTask上で処理を実行するSchedulerです。

上記のSchedulerはSystem.Reactive.Concurrency.Schedulerクラスのstaticプロパティとしてインスタンスが提供されています。ImmediateSchedulerとCurrentThreadSchedulerは、どちらも現在のスレッド上で処理を実行しますが、ImmediateSchedulerが、依頼された処理をその場で実行するのに対して、CurrentThreadSchedulerは、依頼された処理をキューに追加して順次実行していきます。動作の違いを示すコード例を下記に示します。

Console.WriteLine("CurrentThread");
// ChrrentThreadSchedulerで処理を実行
Scheduler.CurrentThread.Schedule(() =>
{
    Console.WriteLine("Start");
    // 処理の内部でさらに処理をCurrentThreadSchedulerに依頼
    Scheduler.CurrentThread.Schedule(() =>
    {
        Console.WriteLine("    Start");
        Console.WriteLine("    End");
    });
    Console.WriteLine("End");
});

Console.WriteLine("Immediate");
// ImmediateSchedulerで処理を実行
Scheduler.Immediate.Schedule(() =>
{
    Console.WriteLine("Start");
    // 処理の内部でさらに処理をImmediateSchedulerに依頼
    Scheduler.Immediate.Schedule(() =>
    {
        Console.WriteLine("    Start");
        Console.WriteLine("    End");
    });
    Console.WriteLine("End");
});

CurrentThreadSchedulerとImmediateSchedulerの双方で、Scheduleメソッドを再起呼び出ししています。このコードの実行結果を以下に示します。

CurrentThread
Start
End
    Start
    End
Immediate
Start
    Start
    End
End

CurrentThreadSchedulerが、Scheduleメソッドで登録した処理を現在実行中の処理が終了してから実行しているのに対して、ImmediateSchedulerが、Scheduleメソッドで登録した処理を現在実行中の処理を中断して実行している(通常のメソッド呼び出しをしたのと同じ)ことが確認できます。

IObservable生成時のSchedulerの指定方法

IObservableのシーケンスを生成するメソッドの大半にはISchedulerインターフェースを受け取るオーバーロードがあります。このオーバーロードにISchedulerの実装クラスのインスタンスを渡すことで実行場所を設定できます。Timerメソッドを例にスケジューラの指定方法を示します。コード例を下記に示します。

Console.WriteLine("Normal");
// 開始のログ
Console.WriteLine("{0:HH:mm:ss}: Start!!", DateTime.Now);
// 1秒後に値を発行する
Observable.Timer(TimeSpan.FromSeconds(1))
    .Subscribe(
        i => Console.WriteLine("OnNext({0})", i),
        () => Console.WriteLine("OnCompleted"));
// 終了のログ
Console.WriteLine("{0:HH:mm:ss}: End!!", DateTime.Now);
// Enterが押されるまで待つ
Console.ReadLine();

Console.WriteLine("CurrentThreadScheduler");
// 開始のログ
Console.WriteLine("{0:HH:mm:ss}: Start!!", DateTime.Now);
// 1秒後に値を発行する, 実行場所は現在のスレッド
Observable.Timer(TimeSpan.FromSeconds(1), Scheduler.CurrentThread)
    .Subscribe(
        i => Console.WriteLine("OnNext({0})", i),
        () => Console.WriteLine("OnCompleted"));
// 終了のログ
Console.WriteLine("{0:HH:mm:ss}: End!!", DateTime.Now);
// Enterが押されるまで待つ
Console.ReadLine();

Timerメソッドを2回呼び出して発行された値をコンソールに出力しています。最初のTimerメソッドの呼び出しは、Schedulerを指定しないものです。2つ目のTimerメソッドの呼び出しではScheduler.CurrentThreadを指定しています。このようにメソッドの最後の引数にSchedulerを渡すことで、生成されたIObservableのシーケンスが何処で実行されるのか指定できます。実行結果を以下に示します。

Normal
20:25:22: Start!!
20:25:22: End!!
OnNext(0)
OnCompleted

CurrentThreadScheduler
20:25:26: Start!!
OnNext(0)
OnCompleted
20:25:27: End!!

CurrentThreadSchedulerを指定したほうでは、OnNextとOnCompletedが現在のスレッド上で実行されるため、Start!!とEnd!!の実行順序がSchedulerを指定しない場合と異なっていることが確認できます。

デフォルトのScheduler

TimerメソッドやIntervalメソッドのように、現在のスレッドをブロックしない形のIObservableのシーケンスを生成するメソッドではデフォルトのSchedulerはThreadPoolSchedulerが指定されています。また、現在のスレッドをブロックするIObservableのシーケンスを生成するメソッドではImmediateSchedulerが指定されています。

Schedulerの切り替え

ここでは、IObservableのシーケンスの処理中にSchedulerを切り替える方法について説明します。

ObserveOnメソッド

IObservableのシーケンスの処理中にSchedulerを切り替えるにはObserveOnメソッドを使用します。ObserveOnメソッドはIObservableの拡張メソッドとして定義されています。メソッドのシグネチャを以下に示します。

public static IObservable<TSource> ObserveOn<T>(
    this IObservable<T> source, 
    IScheduler scheduler);

public static IObservable<T> ObserveOn<TSource>(
    this IObservable<T> source, 
    SynchronizationContext context);

ISchedulerを受け取るオーバーロードは、後続の処理を指定したScheduler経由で実行するメソッドです。SynchronizationContextを受け取るオーバーロードは、SynchronizationContext経由で後続の処理を実行するメソッドです。実際には、SynchronizationContextSchedulerというクラスがReactive Extensionsで定義されているので、実際にはObserveOn(new SynchronizationContextScheduler())へのショートカットのメソッドです。ObserveOnメソッドのコード例を下記に示します。

// 1秒間隔で3つ値を発行する
Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Take(3)
    // 現在のスレッドIDを表示
    .Do(_ => Console.WriteLine("ThreadId: {0}", Thread.CurrentThread.ManagedThreadId))
    // 実行スレッドを新しいスレッドに切り替える
    .ObserveOn(Scheduler.NewThread)
    // 現在のスレッドIDを表示
    .Do(_ => Console.WriteLine("ThreadId: {0}", Thread.CurrentThread.ManagedThreadId))
    // 購読して、発行された値とスレッドIDを表示
    .Subscribe(
        i => Console.WriteLine("OnNext({0}), ThreadId: {1}", i,
Thread.CurrentThread.ManagedThreadId),
        () => Console.WriteLine("OnCompleted, ThreadId: {0}",
Thread.CurrentThread.ManagedThreadId));

Console.ReadLine();

TimerメソッドとTakeメソッドを使って1秒間隔で3つの値を発行しています。そしてObserveOnメソッドを使ってスレッドを新規スレッドに切り替えています。スレッドの切り替え前後でDoメソッドを使ってスレッドIDを表示しています。SubscribeメソッドでもスレッドIDを表示して実行スレッドがどうなっているのか確認しています。実行結果を以下に示します。

ThreadId: 4
ThreadId: 5
OnNext(0), ThreadId: 5
ThreadId: 4
ThreadId: 7
OnNext(1), ThreadId: 7
ThreadId: 6
ThreadId: 8
OnNext(2), ThreadId: 8
OnCompleted, ThreadId: 9

スレッドIDが切り替わっていることが確認できます。このことから、IObservableのシーケンスの途中で実行スレッドの切り替えができることがわかります。

ObserveOnの使用用途

実行場所(ここでの例では実行スレッド)の切り替えが、どのような用途で使えるのかについて説明します。代表的な用途では、Windows FormsやWPFなどのようなUIを操作するケースです。たとえば、ボタンクリックのイベントなどをトリガーにバックグラウンドでWeb APIを呼び出して、結果を画面に反映させる操作があるとします。
このとき、イベントの購読とイベントの値の加工程度まではUIスレッドで行い、Web APIを呼び出す箇所はバックグラウンドで行うのがUIをフリーズさせないようにするため最近では一般的な方法です。そして、最後のUIへの結果の反映で、再びUIスレッドに切り替えて処理を行います。これは、UIフレームワークが一般的にUIスレッド以外からの操作に対応していないために必要になります。このように「イベントの発火→非同期での処理の呼び出し→結果の処理」といったReactive Extensionsで1シーケンスで記述できるような処理内でのスレッドの切り替えにObserveOnメソッドは効果を発揮します。

時間を制御するScheduler

Schedulerには、実行場所を制御するもののほかに、時間を制御するタイプのSchedulerがあります。実際の処理内部では、あまり出てくる頻度は高くないですがテスト時に24時間かかる処理を24時間待たずに一瞬で24時間後の状態にするといったことも可能なため、このようなものもあるのだと把握しておいてください。

HistoricalSchedulerクラス

時間を制御するSchedulerクラスとしてHistoricalSchedulerを紹介します。HistoricalSchedulerはAdvancedByメソッドやAdvancedToメソッドを使ってScheduler内の時間を制御します。AdvancedByがTimeSpan型でAdvancedToメソッドがDateTimeOffcetを使って時間を指定します。HistoricalSchedulerクラスの使用例を下記に示します。

var scheduler = new HistoricalScheduler();
// 1秒間隔で値を発行する
Observable.Interval(TimeSpan.FromSeconds(1), scheduler)
    // 購読
    .Subscribe(
        i => Console.WriteLine("OnNext({0})", i),
        () => Console.WriteLine("OnCompleted"));

// HistoricalSchedulerを使ってプログラム内でSchedulerの時間を進める
// 0.5sec時間を進める
Console.WriteLine("AdvanceBy(0.5sec)");
scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5));
// 0.5sec時間を進める
Console.WriteLine("AdvanceBy(0.5sec)");
scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5));
// 0.5sec時間を進める
Console.WriteLine("AdvanceBy(0.5sec)");
scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5));
// 0.5sec時間を進める
Console.WriteLine("AdvanceBy(0.5sec)");
scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5));
// 0.5sec時間を進める
Console.WriteLine("AdvanceBy(0.5sec)");
scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5));

Observable.Intervalメソッドを使って1秒間隔で値を発行するIObservableのシーケンスを作成しています。このときのSchedulerにHistoricalSchedulerを指定しています。そしてAdvanceByメソッドを使ってプログラム内でSchedulerの時間を進めています。0.5秒×5回の時間を進めているので合計で2.5秒の時間が進んでいることになります。実行結果を以下に示します。

AdvanceBy(0.5sec)
AdvanceBy(0.5sec)
OnNext(0)
AdvanceBy(0.5sec)
AdvanceBy(0.5sec)
OnNext(1)
AdvanceBy(0.5sec)

AdvanceByを2回呼んだ時点で値が発行され、さらにAdvanceByを2回呼んだ時点で値が発行されていることが確認できます。このようにReactive Extensionsでは時間をScheduler経由で取得するようにすることで、プログラムから柔軟に時間を制御することができるようになっています。

Reactive Extensions再入門 その44「And, Then, Whenメソッド」

過去記事インデックス

はじめに

合成系メソッドの紹介も今回で最後の予定です!今回は、個人的に理解にてこずったWhenメソッドと、その周辺メソッド達です。

Whenメソッド

ここではWhenメソッドについて説明します。Whenメソッドのシグネチャを以下に示します。

public static IObservable<TResult> When<TResult>(params Plan<TResult>[] plans);
public static IObservable<TResult> When<TResult>(this IEnumerable<Plan<TResult>> plans);

Whenメソッドには2つのオーバーロードがあり、片方がPlan型の可変長引数で、もう一方がIEnumerable>の拡張メソッドです。どちらも複数のPlanを纏めるという意味合いになります。

Planクラスの作成

Whenメソッドの引数に渡すPlanクラスの作成方法について説明します。Planクラスのインスタンスを作成する前に、まずPatternというクラスのインスタンスを用意します。PatternクラスはAndというメソッドで作成します。AndメソッドはIObservableのシーケンスを引数に取る拡張メソッドとして定義されています。シグネチャを以下に示します。

public static Pattern<TLeft, TRight> And<TLeft, TRight>(
    this IObservable<TLeft> left, 
    IObservable<TRight> right);

この戻り値のPatternクラスにはIObservableを受け取るAndというメソッドが定義されています。シグネチャを以下に示します。

public Pattern<T1, T2, T3> And<T3>(IObservable<T3> other);

このように、このPatternにもIObservableを受け取ってPattern型の値を返すメソッドが定義されています。このように、Andメソッドを使って複数のIObservableのシーケンスを繋げてPatternクラスにまとめることが出来ます。Patternクラスの型引数は、16個まで定義されているので事実上数を気にすることなくつなげていくことが出来ます。
Patternクラスには型引数の数の引数を受け取るデリゲートを受け取るThenメソッドが定義されています。Patternクラスの場合のThenメソッドのシグネチャを以下に示します。

public Plan<TResult> Then<TResult>(Func<T1, T2, T3, TResult> selector);

このメソッドからPlanクラスのインスタンスが作成されます。Thenメソッドで渡したデリゲートは、それまでのAndで繋いだIObservableのシーケンスの全てから値が発行されたタイミングで実行されます。

Whenメソッドの使用例

では、上記のAndメソッドとThenメソッドを使ってPlanクラスのインスタンスを作成するコード例を下記に示します。

var plan = Observable
    // plan1という文字列を10個発行するIObservable<string>
    .Return("plan1").Repeat(10)
    // 1秒間隔で0からのカウントアップにタイムスタンプをつけるIObservable<Timestamped<long>>
    .And(Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp())
    // 100〜110の値を発行するIObservable<int>
    .And(Observable.Range(100, 10))
    // 3つの値を文字列として纏める
    .Then((planName, timestamped, value) => 
        string.Format("{0} {1} {2}", planName, timestamped, value));
// WhenでPlan<string>からIObservable<string>にして購読
Observable.When(plan).Subscribe(
    s => Console.WriteLine("OnNext: {0}", s),
    () => Console.WriteLine("OnCompleted"));
Console.ReadLine();

コメントにある通りですが、無限に値を発行し続けるIObservableのシーケンスから10個の値を発行するIObservableのシーケンスをAndで繋いでThenメソッドで文字列化しています。そして、Whenメソッドに渡して購読しています。Thenメソッドは、全てのAndメソッドから値が発行されたタイミングで実行されるため、もっとも短い10個しか値を発行しないIObservableのシーケンスが終了したタイミングでThenメソッドも呼ばれなくなります。そのため、このPlanから生成したIObservableのシーケンスは10個しか値を発行しないということになります。実行結果を以下に示します。

OnNext: plan1 0@2012/02/22 23:47:09 +09:00 100
OnNext: plan1 1@2012/02/22 23:47:10 +09:00 101
OnNext: plan1 2@2012/02/22 23:47:11 +09:00 102
OnNext: plan1 3@2012/02/22 23:47:12 +09:00 103
OnNext: plan1 4@2012/02/22 23:47:13 +09:00 104
OnNext: plan1 5@2012/02/22 23:47:14 +09:00 105
OnNext: plan1 6@2012/02/22 23:47:15 +09:00 106
OnNext: plan1 7@2012/02/22 23:47:16 +09:00 107
OnNext: plan1 8@2012/02/22 23:47:17 +09:00 108
OnNext: plan1 9@2012/02/22 23:47:18 +09:00 109
OnCompleted

Andで連結したIObservableのシーケンスから発行された値をThenで加工した結果が表示されていることが確認できます。また、10個の値が発行されたらOnCompletedが呼ばれていることも確認できます。
次に、複数のPlanをWhenメソッドに渡した場合のコード例を下記に示します。

var plan1 = Observable
    // plan1という文字列を10個発行するIObservable<string>
    .Return("plan1").Repeat(10)
    // 1秒間隔で0からのカウントアップにタイムスタンプをつけるIObservable<Timestamped<long>>
    .And(Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp())
    // 100〜110の値を発行するIObservable<int>
    .And(Observable.Range(100, 10))
    // 3つの値を文字列として纏める
    .Then((planName, timestamped, value) =>
        string.Format("{0} {1} {2}", planName, timestamped, value));
                
var plan2 = Observable
    // plan2という文字列を20個発行するIObservable<string>
    .Return("plan2").Repeat(20)
    // 0.5s間隔で0から値を発行していくIObservable<long>
    .And(Observable.Interval(TimeSpan.FromSeconds(0.5)))
    // Thenで文字列に纏める
    .Then((s, l) => string.Format("{0} {1}", s, l));

Observable.When(plan1, plan2).Subscribe(
    s => Console.WriteLine("OnNext: {0}", s),
    () => Console.WriteLine("OnCompleted"));

plan1は、最初の例と同じように作成しています。plan2は、20個のplan2という文字列と、0.5秒間隔で0から値をカウントアップしていくIObservableをAndで繋いでThen内で文字列化しています。この2つのWhenで繋いでIObservableに変換して購読しています。実行結果を以下に示します。

OnNext: plan2 0
OnNext: plan1 0@2012/02/22 23:53:21 +09:00 100
OnNext: plan2 1
OnNext: plan2 2
OnNext: plan2 3
OnNext: plan1 1@2012/02/22 23:53:22 +09:00 101
OnNext: plan2 4
OnNext: plan2 5
OnNext: plan1 2@2012/02/22 23:53:23 +09:00 102
OnNext: plan2 6
OnNext: plan2 7
OnNext: plan1 3@2012/02/22 23:53:24 +09:00 103
OnNext: plan2 8
OnNext: plan2 9
OnNext: plan1 4@2012/02/22 23:53:25 +09:00 104
OnNext: plan2 10
OnNext: plan2 11
OnNext: plan1 5@2012/02/22 23:53:26 +09:00 105
OnNext: plan2 12
OnNext: plan2 13
OnNext: plan1 6@2012/02/22 23:53:27 +09:00 106
OnNext: plan2 14
OnNext: plan2 15
OnNext: plan1 7@2012/02/22 23:53:28 +09:00 107
OnNext: plan2 16
OnNext: plan2 17
OnNext: plan1 8@2012/02/22 23:53:29 +09:00 108
OnNext: plan2 18
OnNext: plan1 9@2012/02/22 23:53:30 +09:00 109
OnNext: plan2 19
OnCompleted

plan1から発行された値と、plan2から発行された値が混在して表示されていることが確認できます。このことからWhenで繋いだPlanから発行された値はMergeメソッドで合成したように、値が発行された順番で後続に流されていることがわかります。

まとめ

このようにAndメソッドは使ってZipメソッドのように複数のIObservableのシーケンスを合成してThenメソッドで値の変換を行うPlanクラスのインスタンスを作成します。そしてWhenメソッドにPlanを複数渡すことでAndで繋いだものをMergeメソッドで合成したように繋げることが出来ます。
恐らく合成系のメソッドの中で一番自由度が高く複雑な合成が出来るメソッドになります。

Reactive Extensions再入門 その43「GroupJoinメソッド」

過去記事インデックス

はじめに

今回はGroupJoinメソッドについてみていきます。使いどころは、まだわからないですが動きはとりあえず把握しておきましょう!

GroupJoinメソッド

ここでは、GroupJoinメソッドについて説明します。GroupJoinメソッドは、2つのIObservableのシーケンスのうち左辺の値に対して右辺の値のIObservableのシーケンスの組み合わせを合成します。このメソッドのシグネチャを以下に示します。

public static IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
    this IObservable<TLeft> left, 
    IObservable<TRight> right, 
    Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, 
    Func<TRight, IObservable<TRightDuration>> rightDurationSelector, 
    Func<TLeft, IObservable<TRight>, TResult> resultSelector);

Joinメソッドと、ほぼ同じシグネチャですが、resultSelectorデリゲートの引数に違いがあります。resultSelectorデリゲートの第二引数がJoinメソッドではTRight型だったのに対してGroupJoinメソッドではIObservableになります。GroupJoinメソッドのresultSelectorは、left引数から値が発行されたタイミングで呼び出されます。この時のresultSelectorの第一引数が、leftから発行される値で、第二引数がrightから発行された有効期限内の値を発行するIObservableになります。このメソッドの使用例を下記に示します。

// センサー名
var sensors = new Subject<string>();
// センサーが受信する値
var values = new Subject<int>();

// 値のリセット用Subject
var valueReset = new Subject<Unit>();

sensors.GroupJoin(
    values,
    // センサーは有効期限無し
    _ => Observable.Never<Unit>(),
    // センサーの値はvalueResetのOnNextで無効に出来る
    _ => valueReset,
    // センサーの名前と、センサーが受け取った値の現在の合計値を発行するLogにして後続に流す
    (l, r) => new { Name = l, Log = r.Scan((x, y) => x + y) })
    .Subscribe(
        sensor =>
        {
            // Logを表示する
            sensor
                .Log
                .Subscribe(i => Console.WriteLine("{0}: {1}", sensor.Name, i));
        },
        // 完了
        () => Console.WriteLine("OnCompleted"));

// センサーを2つ登録
Console.WriteLine("sensors.OnNext(sensor1)");
sensors.OnNext("sensor1");
Console.WriteLine("sensors.OnNext(sensor2)");
sensors.OnNext("sensor2");

// 値を3つ発行
Console.WriteLine("values.OnNext(100)");
values.OnNext(100);
Console.WriteLine("values.OnNext(10)");
values.OnNext(10);
Console.WriteLine("values.OnNext(1)");
values.OnNext(1);

// センサーの値を一旦リセット
Console.WriteLine("valueReset.OnNext()");
valueReset.OnNext(Unit.Default);

// 新しいセンサーを追加
Console.WriteLine("sensors.OnNext(sensor3)");
sensors.OnNext("sensor3");
// 値を3つ発行
Console.WriteLine("values.OnNext(1)");
values.OnNext(1);
Console.WriteLine("values.OnNext(2)");
values.OnNext(2);
Console.WriteLine("values.OnNext(3)");
values.OnNext(3);

// 終了
Console.WriteLine("values.OnCompleted()");
values.OnCompleted();
Console.WriteLine("sensors.OnCompleted()");
sensors.OnCompleted();

コードのイメージはセンサーと、センサーが受信した値の合計値をリアルタイムで表示するプログラムです。センサーが受信する値は任意のタイミング(valueReset変数のOnNext)でリセットできるようにしています。このメソッドの実行結果を以下に示します。

sensors.OnNext(sensor1)
sensors.OnNext(sensor2)
values.OnNext(100)
sensor1: 100
sensor2: 100
values.OnNext(10)
sensor1: 110
sensor2: 110
values.OnNext(1)
sensor1: 111
sensor2: 111
valueReset.OnNext()
sensors.OnNext(sensor3)
values.OnNext(1)
sensor1: 112
sensor2: 112
sensor3: 1
values.OnNext(2)
sensor1: 114
sensor2: 114
sensor3: 3
values.OnNext(3)
sensor1: 117
sensor2: 117
sensor3: 6
values.OnCompleted()
sensors.OnCompleted()
OnCompleted

sensor1とsensor2は、発行された値の合計を全て保持していることが確認できますが、valueResetのOnNextを呼んだ後に追加したsensor3は、途中からの値の合計しか集計していないことが確認できます。

Reactive Extensions再入門 その42「StartWithメソッドとJoinメソッド」

過去記事インデックス

はじめに

今回も、前回に続いて合成する関係のメソッドを使ってみようと思います。

StartWithメソッド

ここでは、StartWithメソッドについて説明します。StartWithメソッドのシグネチャを以下に示します。

public static IObservable<T> StartWith<T>(this IObservable<T> source, params T[] values);

このメソッドは、sourceで渡したIObservableのシーケンスの先頭にvaluesで指定した要素を合成して1つのIObservableのシーケンスにします。このメソッドの使用例を下記に示します。

Observable
    // 1〜3の値を発行するIObservable<T>のシーケンス
    .Range(1, 3)
    // 頭に10, 20, 30をつける
    .StartWith(10, 20, 30)
    // 購読
    .Subscribe(
        i => Console.WriteLine("OnNext: {0}", i));

Rangeメソッドを使って1〜3の値を発行するIObservableのシーケンスを作成して、StartWithメソッドで10, 20, 30の値を先頭に追加して、Subscribeメソッドで購読をしています。このメソッドの実行例を以下に示します。

OnNext: 10
OnNext: 20
OnNext: 30
OnNext: 1
OnNext: 2
OnNext: 3

Joinメソッド

ここでは、Joinメソッドについて説明します。Joinメソッドは2つのIObservableのシーケンスから発行された値の全ての組み合わせをもとに、値を生成して発行するIObservableのシーケンスを作成します。このメソッドのシグネチャを以下に示します。

public static IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(
    this IObservable<TLeft> left, 
    IObservable<TRight> right, 
    Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, 
    Func<TRight, IObservable<TRightDuration>> rightDurationSelector, 
    Func<TLeft, TRight, TResult> resultSelector);

Joinメソッドは、leftとrightから値が発行されると、それぞれleftDurationSelector引数とrightDurationSelector引数で指定したデリゲートが呼ばれて、その値の有効期間を示すIObservableが生成されます。この有効期間を示すIObservableから値が発行されたらleftとrightから発行された対象となる値はJoinメソッドで使われなくなります。そして、leftとrightから値が発行されると、現在有効な値の全てを使ってresultSelectorが呼び出されます。例えばleftから発行された値で有効なものが(1, 2, 3)でrightから発行された値で有効なものが(10, 20, 30)の場合は、resultSelectorは(1, 10)(1, 20)(1, 30)(2, 10)(2, 20)(2, 30)(3, 10)(3, 20)(3, 30)の組み合わせの引数で呼び出されます。ここで生成された値がJoinメソッドの結果のIObservableから発行される値になります。
このメソッドのコード例を下記に示します。

// Joinで合成するIObservable<T>
var left = new Subject<int>();
var right = new Subject<int>();

left.Join(
    right,
    // leftから発行される値の有効期間は永久
    _ => Observable.Never<Unit>(),
    // rightから発行される値の有効期間は永久
    _ => Observable.Never<Unit>(),
    // 発行された値の組を作る
    Tuple.Create)
    // 購読
    .Subscribe(
        // 組を表示
        tuple => Console.WriteLine("Left: {0}, Right: {1}", tuple.Item1, tuple.Item2),
        // 完了を表示
        () => Console.WriteLine("OnCompleted"));

// 値の発行
Console.WriteLine("left.OnNext(1)");
left.OnNext(1);
Console.WriteLine("right.OnNext(10)");
right.OnNext(10);
Console.WriteLine("right.OnNext(100)");
right.OnNext(100);
Console.WriteLine("left.OnNext(2)");
left.OnNext(2);

// 終了
Console.WriteLine("left.OnCompleted()");
left.OnCompleted();
Console.WriteLine("right.OnCompleted()");
right.OnCompleted();

Joinメソッドを使って2つのIObservableのシーケンスから発行される値を組(Tuple)にしています。Joinの引数で渡したIObservableのシーケンスから発行される値の有効期間は、Observable.Neverメソッドを使って永遠にOnNextの呼ばれないIObservableのシーケンスを指定しているため、永遠に無効にならないようにしています。Subscribeでは組の値を表示しています。このコードの実行結果を以下に示します。

left.OnNext(1)	← この段階ではleftに1があるだけ
right.OnNext(10)	← この段階でleftに1, rightに10があるため(1, 10)の組が生成される
Left: 1, Right: 10
right.OnNext(100)	← この段階でleftに1, rightに10, 100があるため (1, 100)の組が追加で生成される
Left: 1, Right: 100
left.OnNext(2)	← この段階でleftに1, 2, rightに10, 100があるため(2, 10), (2, 100)の組が追加で生成される
Left: 2, Right: 10
Left: 2, Right: 100
left.OnCompleted()
right.OnCompleted()
OnCompleted	← rightとleftが両方終了したため、これ以降新たな組み合わせが出来ないため終了

この例のように値の有効期間にObservable.Neverを使うと、leftとrightから発行された全ての組合わせを生成することが出来ます。Never以外の値の有効期間を指定したときのコード例を下記に示します。

// Joinで合成するIObservable<T>
var left = new Subject<int>();
var right = new Subject<int>();

left.Join(
    right,
    // leftから発行される値の有効期間は永久
    _ => Observable.Never<Unit>(),
    // rightから発行される値の有効期間は一瞬
    _ => Observable.Empty<Unit>(),
    // 発行された値の組を作る
    Tuple.Create)
    // 購読
    .Subscribe(
    // 組を表示
        tuple => Console.WriteLine("Left: {0}, Right: {1}", tuple.Item1, tuple.Item2),
    // 完了を表示
        () => Console.WriteLine("OnCompleted"));

// 値の発行
Console.WriteLine("left.OnNext(1)");
left.OnNext(1);
Console.WriteLine("right.OnNext(10)");
right.OnNext(10);
Console.WriteLine("right.OnNext(100)");
right.OnNext(100);
Console.WriteLine("left.OnNext(2)");
left.OnNext(2);
Console.WriteLine("right.OnNext(1000)");
right.OnNext(1000);

// 終了
Console.WriteLine("left.OnCompleted()");
left.OnCompleted();
Console.WriteLine("right.OnCompleted()");
right.OnCompleted();

上記のコードはrightから発行された値の有効期間をObservable.Empty()で指定しているため、値が発行された瞬間しか有効期間が無いようにしています。このプログラムの実行結果を以下に示します。

left.OnNext(1)
right.OnNext(10)
Left: 1, Right: 10
right.OnNext(100)
Left: 1, Right: 100
left.OnNext(2)
right.OnNext(1000)
Left: 1, Right: 1000
Left: 2, Right: 1000
left.OnCompleted()
right.OnCompleted()
OnCompleted

rightから値が発行されると、今までleftから発行された値との組み合わせが生成されていることが確認できます。逆にleftから値が発行されてもrightから発行された値には有効なものが1つもないため、resultSelectorで指定したデリゲートは呼ばれません。
ここでは、コード例を示しませんが、値の有効期限にタイマーやイベントなどから生成したIObservableを指定することで、よりきめ細やかに値の有効期限を指定することが出来ます。

Reactive Extensions再入門 その41「どんどん合成するよ」

過去記事インデックス

はじめに

前回に引き続き、どんどんIObservableを合成するメソッドを試してみようと思います!!

Concatメソッド

ここでは、Concatメソッドについて説明します。Concatメソッドは、その名前の通り複数のIObservableのシーケンスを直列で繋ぎます。例えば、3つのIObservableのシーケンスを渡すと1つ目のIObservableのシーケンスが完了するまでの間は1つ目のIObservableのシーケンスを後続に流します。1つ目のIObservableのシーケンスが完了すると、2つ目のIObservableのシーケンスの値を流します。2つ目のIObservableのシーケンスが完了すると、3つ目のIObservableのシーケンスの値を流します。そして3つ目のIObservableのシーケンスが完了すると、完了したということを後続に通知します。Concatメソッドには2つのシーケンスを繋ぐことに特化したオーバーロードと、複数のシーケンスを繋ぐためのメソッドのオーバーロードがあります。このメソッドのオーバーロードを以下に示します。

// 引数で渡した全てのIObservable<T>のシーケンスを繋ぐ
public static IObservable<T> Concat<T>(params IObservable<T>[] sources);
// IEnumerable<IObservable<T>>から取得できるすべてのIObservable<T>を繋ぐ
public static IObservable<T> Concat<T>(this IEnumerable<IObservable<T>> sources);
// IObservable<IObservable<T>>から発行されるすべてのIObservable<T>を繋ぐ
public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> sources);
// firstとsecondを繋ぐ
public static IObservable<T> Concat<T>(this IObservable<T> first, IObservable<T> second);

Concatメソッドの動作を示すためのコード例を下記に示します。

Observable
    // IObservable<T>のシーケンスを直列につなげる
    .Concat(
        // "1st: 0" 〜 "1st: 2"までの値を1秒間隔で発行する
        Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => "1st: " + i).Take(3),
        // "2nd: 0" 〜 "2nd: 2"までの値を1秒間隔で発行する
        Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => "2nd: " + i).Take(3),
        // "3rd: 0" 〜 "3rd: 2"までの値を1秒間隔で発行する
        Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => "3rd: " + i).Take(3))
    // 購読
    .Subscribe(
        s => Console.WriteLine("OnNext: {0}", s),
        () => Console.WriteLine("OnCompleted"));

Console.ReadLine();

Observable.Intervalメソッドを使って1秒間隔で値を発行するIObservableのシーケンスを3つ作成して、Concatメソッドで繋いでいます。1秒間隔で値を発行するため、Mergeメソッドでは1つ目と2つ目と3つ目のIObservableのシーケンスの発行する値が混在しますが、Concatは前のIObservableのシーケンスが完了しない限り後ろのIObservableのシーケンスが発行する値を、後続に流さないため上記のサンプルではSubscribeのOnNextに値が渡る順番が保障されています。実行結果を以下に示します。

OnNext: 1st: 0
OnNext: 1st: 1
OnNext: 1st: 2
OnNext: 2nd: 0
OnNext: 2nd: 1
OnNext: 2nd: 2
OnNext: 3rd: 0
OnNext: 3rd: 1
OnNext: 3rd: 2
OnCompleted

前のIObservableのシーケンスが完了するまで次のIObservableのシーケンスが発行される値がOnNextに流れていないことが確認できます。このプログラムは、可変長引数のオーバーロードを使いましたが、下記のように別のオーバーロードを使っても同じように書くことができます。

// "1st: 0" 〜 "1st: 2"までの値を1秒間隔で発行する
Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(i => "1st: " + i)
    .Take(3)
    .Concat(
        // "2nd: 0" 〜 "2nd: 2"までの値を1秒間隔で発行する
        Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(i => "2nd: " + i)
            .Take(3))
        .Concat(
        // "3rd: 0" 〜 "3rd: 2"までの値を1秒間隔で発行する
        Observable.Interval(TimeSpan.FromSeconds(1))
            .Select(i => "3rd: " + i)
            .Take(3))
// 購読
.Subscribe(
    s => Console.WriteLine("OnNext: {0}", s),
    () => Console.WriteLine("OnCompleted"));

Console.ReadLine();

実行結果は同じため省略します。

Zipメソッド

ここでは、Zipメソッドについて説明します。Zipメソッドは、2つのIObservableのシーケンスから値が発行されるのを待って、2つの値を変換して、変換した結果を後ろに流すメソッドです。メソッドのシグネチャを下記に示します。

public static IObservable<R> Zip<T, U, R>(
    this IObservable<T> first, 
    IObservable<U> second, 
    Func<T, U, R> resultSelector);

firstから発行された値とsecondから発行された値をresultSelectorで変換して後ろに流します。また、firstかsecondのどちらかのIObservableのシーケンスが完了状態になった段階で後続に完了通知を発行します。このメソッドのコード例を下記に示します。

// 1秒間隔で0から値をカウントアップしていくIObservable<T>のシーケンスの最初の3つ
Observable.Interval(TimeSpan.FromSeconds(1)).Take(3)
    // Zipでまとめる
    .Zip(
        // 100, 101, 102を発行するIObservable<T>のシーケンス
        Observable.Range(100, 3), 
        // 渡された値を元に文字列化
        (l, r) => string.Format("{0}-{1}", l, r))
    // 購読
    .Subscribe(
        s => Console.WriteLine("OnNext: {0}", s),
        () => Console.WriteLine("OnCompleted"));

Console.ReadLine();

1秒間隔で0, 1, 2の値を発行するIObservableのシーケンスと100, 101, 102の値を発行するIObservableのシーケンスをZipメソッドで合成しています。実行結果を以下に示します。

OnNext: 0-100
OnNext: 1-101
OnNext: 2-102
OnCompleted

Zipメソッドで合成された3つの値がOnNextに渡ってきていることが確認できます。上記のプログラムを少し変更して片方のIObservableのシーケンスが発行する値の数を1つにしたコードを下記に示します。

// 1秒間隔で0から値をカウントアップしていくIObservable<T>のシーケンスの最初の3つ
Observable.Interval(TimeSpan.FromSeconds(1)).Take(3)
    // Zipでまとめる
    .Zip(
        // 100を発行するIObservable<T>のシーケンス
        Observable.Return(100), 
        // 渡された値を元に文字列化
        (l, r) => string.Format("{0}-{1}", l, r))
    // 購読
    .Subscribe(
        s => Console.WriteLine("OnNext: {0}", s),
        () => Console.WriteLine("OnCompleted"));

Console.ReadLine();

IntervalメソッドからTake(3)で3つの値を発行するIObservableのシーケンスとReturnメソッドで1つの値しか発行しないIObservableのシーケンスをZipメソッドで合成しています。実行結果を下記に示します。

OnNext: 0-100
OnCompleted

OnNextが1回しか実行されていないことが確認できます。このことからZipメソッドが、合成対象のIObservableのシーケンスのどちらか片方が終了した段階で、後続に完了通知を発行することが確認できます。

Ambメソッド

ここではAmbメソッドについて説明します。Ambメソッドは複数のIObservableのシーケンスの中から一番最初に値を発行したIObservableのシーケンスの値を後続に流すメソッドです。このメソッドのオーバーロードを以下に示します。

// 引数で渡した全てのIObservable<T>から最初に値を発行したIObservable<T>の値を後ろに流す
public static IObservable<T> Amb<T>(params IObservable<T>[] sources);
// sources内の全てのIObservable<T>から最初に値を発行したIObservable<T>の値を後ろに流す
public static IObservable<T> Amb<T>(this IEnumerable<IObservable<T>> sources);
// firstとsecondのうち最初に値を発行したものの値を後ろに流す
public static IObservable<T> Amb<T>(this IObservable<T> first, IObservable<T> second);

他の合成系メソッドと同様に可変長引数やIEnumerable>の拡張メソッドや2つの合成に特化したオーバーロードが用意されています。Ambメソッドのコード例を下記に示します。

Observable.Amb(
    // 3秒後に値を発行するIO<T>
    Observable.Timer(TimeSpan.FromSeconds(3)).Select(_ => "3sec"),
    // 10秒後に値を発行するIO<T>
    Observable.Timer(TimeSpan.FromSeconds(10)).Select(_ => "10sec"),
    // 2秒後に値を発行するIO<T>
    Observable.Timer(TimeSpan.FromSeconds(2)).Select(_ => "2sec"),
    // 6秒後に値を発行するIO<T>
    Observable.Timer(TimeSpan.FromSeconds(6)).Select(_ => "6sec"),
    // 22秒後に値を発行するIO<T>
    Observable.Timer(TimeSpan.FromSeconds(22)).Select(_ => "22sec"))
    .Subscribe(
        s => Console.WriteLine("OnNext: {0}", s),
        () => Console.WriteLine("OnCompleted"));

Timerメソッドを使って、指定した時間が経過した後に、値を発行するIObservableのシーケンスをAmbメソッドで合成しています。実行結果を以下に示します。

OnNext: 2sec
OnCompleted

引数で渡した中で一番早く値を発行するObservable.Timer(TimeSpan.FromSecond(2)).Select(_ => “2sec”)の結果がOnNextやOnCompletedに渡っていることが確認できます。

CombineLatestメソッド

ここでは、CombineLatestメソッドについて説明します。CombineLatestメソッドはZipメソッドのように2つのIObservableのシーケンスを合成するメソッドです。Zipメソッドが両方のIObservableのシーケンスから値が発行されるのを待つのに対してCombineLatestメソッドは、どちらか一方から値が発行されると、もう一方の最後に発行した値があるか確認し、あればそれを使って合成処理を行い、後続に値を流します。このメソッドのシグネチャを以下に示します。

public static IObservable<R> CombineLatest<F, S, R>(
    this IObservable<F> first, 
    IObservable<S> second, 
    Func<F, S, R> resultSelector);

このメソッドのコード例を下記に示します。

var source1 = new Subject<string>();
var source2 = new Subject<int>();

source1
    // source1とsource2を合成
    .CombineLatest(
        source2, 
        // 発行された値を結合して文字列化
        (l, r) => string.Format("{0}-{1}", l, r))
    // 購読
    .Subscribe(
        s => Console.WriteLine("OnNext: {0}", s),
        () => Console.WriteLine("OnCompleted"));

// 適当に値を発行する
Console.WriteLine("source1.OnNext(foo)");
source1.OnNext("foo");
Console.WriteLine("source2.OnNext(100)");
source2.OnNext(100);
Console.WriteLine("source2.OnNext(200)");
source2.OnNext(200);
Console.WriteLine("source1.OnNext(bar)");
source1.OnNext("bar");
Console.WriteLine("source1.OnNext(boo)");
source1.OnNext("boo");

// source1完了
Console.WriteLine("source1.OnCompleted()");
source1.OnCompleted();
// source1完了後にsource2から値を発行する
Console.WriteLine("source2.OnNext(200)");
source2.OnNext(999);
// source2完了
Console.WriteLine("source2.OnCompleted()");
source2.OnCompleted();

このコードの実行結果を以下に示します。

source1.OnNext(foo)
source2.OnNext(100)
OnNext: foo-100
source2.OnNext(200)
OnNext: foo-200
source1.OnNext(bar)
OnNext: bar-200
source1.OnNext(boo)
OnNext: boo-200
source1.OnCompleted()
source2.OnNext(200)
OnNext: boo-999
source2.OnCompleted()
OnCompleted

最初のOnNextでは、source1とsource2の値が両方発行されるまで実行されませんが、それ以降は、source1かsource2のどちらかから値が発行されるとOnNextに値が発行されていることが確認できます。このことから、CombineLatestメソッドの特徴である、最後に発行された値をキャッシュしているということがわかります。そして、CombineLatestメソッドは両方のシーケンスが完了状態になった時点で完了通知を行うという動作も確認できます。これは、片方が完了しても、もう一方から値が発行されたら最後に発行された値を元に処理を継続できるためです。

Reactive Extensions再入門 その40「IObservableの合成はじめました」

過去記事インデックス

IObservableの合成

ここでは、IObservableクラスの合成について説明します。いままで説明してきたメソッドは単一のIObservableのシーケンスに対して操作を行うものでした。それだけでもThrottleやSkip, Take、BufferメソッドやDistictメソッドなどReactive Extensionsを使う上で重要かつ特徴的なメソッドです。これから紹介するメソッドは、複数のIObservableシーケンスを合成して1つのIObservableのシーケンスを作成するメソッド群です。
Reactive Extensionsでは、時間、イベント、非同期などをIObservableのシーケンスとして扱うことができます。つまり、時間、イベント、非同期などを合成して単一のシーケンスにしたあと、Distinctで重複を排除したりBufferなどでまとめたり、Scanメソッドなどで集計をとったりすることが出来ます。これは、Reactive Extensionsの特徴の1つになります。

Mergeメソッド

ここでは、Mergeメソッドについて説明します。Mergeメソッドは名前の通り複数のIObservableのシーケンスにするメソッドです。オーバーロードはいくつかありますが一番シンプルな2つのIObservableのシーケンスを合成するメソッドのシグネチャを以下に示します。

public static IObservable<T> Merge<T>(
    this IObservable<T> first, 
    IObservable<T> second);

このメソッドは、左辺値のIObservableのシーケンスと右辺値のIObservableのシーケンスを1つのシーケンスとしてマージします。コード例を下記に示します。

Console.WriteLine("# 2つのIObservable<T>の統合");
Observable
    // 0〜2の値を1秒間隔で発行
    .Interval(TimeSpan.FromSeconds(1)).Take(3)
    // 1〜3の値に変換
    .Select(i => i + 1)
    // マージ(統合)
    .Merge(
        Observable
            // 0〜2の値を1秒間隔で発行
            .Interval(TimeSpan.FromSeconds(1)).Take(3)
            // 10, 20, 30に変換
            .Select(i => (i + 1) * 10))
    // 購読
    .Subscribe(
        i => Console.WriteLine("OnNext: {0}", i),
        () => Console.WriteLine("OnCompleted"));
// Enter押すまで待機
Console.ReadLine();

1秒間隔で1, 2, 3の値を発行するIObservableのシーケンスと1秒間隔で10, 20, 30の値を発行するIObservableのシーケンスをMergeメソッドで合成して購読しています。実行結果を以下に示します。

# 2つのIObservable<T>の統合
OnNext: 1
OnNext: 10
OnNext: 2
OnNext: 20
OnNext: 3
OnNext: 30
OnCompleted

上記の結果からMergeメソッドではleftとrightで渡したIObservableのシーケンスから発行された値を単純に後続に流していることが確認出来ます。
次に、複数のIObservableのシーケンスを1つに統合するMergeメソッドのオーバーロードについてみていきます。メソッドのシグネチャを以下に示します。

public static IObservable<T> Merge<T>(params IObservable<T>[] sources);

引数のparamsで渡したIObservableを全てマージします。コード例を下記に示します。

Console.WriteLine("# 複数のIObservable<T>の統合");
// Observable.Merge<T>(params IObservable<T>[] sources)
Observable.Merge(
    // 3つのIObservable<T>をマージ
    Observable.Range(0, 3),
    Observable.Range(10, 3),
    Observable.Range(100, 3))
    // 購読
    .Subscribe(
        i => Console.WriteLine("OnNext: {0}", i),
        () => Console.WriteLine("OnCompleted"));
// Enter押すまで待機
Console.ReadLine();

上記のコードは3つのIObservableのシーケンスをMergeメソッドで合成して結果を出力しています。実行結果を以下に示します。

# 複数のIObservable<T>の統合
OnNext: 0
OnNext: 10
OnNext: 100
OnNext: 1
OnNext: 11
OnNext: 101
OnNext: 2
OnNext: 12
OnNext: 102
OnCompleted

3つのIObservableのシーケンスが1つに統合されていることが確認できます。
このメソッドで興味深いオーバーロードとして以下のようなものがあります。

public static IObservable<T> Merge<T>(this IObservable<IObservable<T>> sources);

IObservable>の拡張メソッドとして定義されています。このメソッドを使うと、下記のようにSubjectなどのような型からIObservable>を作成してMergeして1つにつなげるということが出来るようになります。コード例を下記に示します。

Console.WriteLine("# 複数のIObservable<T>の統合2");
// IObservable<T> Merge<T>(IObservable<IObservable<T>> source)
var source = new Subject<int>();
source
    // IObservable<int>からIObservable<IObservable<int>>に変換
    .Select(i => Observable
        // 0〜2の値を1秒間隔で3つ発行
        .Interval(TimeSpan.FromSeconds(1)).Take(3)
        // 値を変換
        .Select(l => (l + 1) * i))
    // IObservable<IObservable<T>>からIObservable<T>へマージ
    .Merge()
    // 購読
    .Subscribe(
        i => Console.WriteLine("OnNext: {0}", i),
        () => Console.WriteLine("OnCompleted"));

// 値の発行から完了
Console.WriteLine("# OnNext(10)");
source.OnNext(10);
Console.WriteLine("# OnNext(100)");
source.OnNext(100);
Console.WriteLine("# OnNext(1000)");
source.OnNext(1000);
Console.WriteLine("# OnCompleted");
source.OnCompleted();

// Enter押すまで待機
Console.ReadLine();

Subjectから発行された値を元にIntervalメソッドを使って3つの値を発行するIObservable型を生成しています。これに対してMergeメソッドを呼び出しIObservable>を1つのIObservableにならしています。実行結果を以下に示します。

# 複数のIObservable<T>の統合2
# OnNext(10)
# OnNext(100)
# OnNext(1000)
# OnCompleted
OnNext: 10
OnNext: 100
OnNext: 1000
OnNext: 200
OnNext: 20
OnNext: 2000
OnNext: 300
OnNext: 3000
OnNext: 30
OnCompleted

3つのIObservableから発行された値が順不同(実行するごとにOnNext: ***の箇所の順番が変わる)で表示されます。このようにMergeメソッドを使用することで、例えば、あるイベントが発行されたタイミングでWebAPIを非同期に実行して結果を受け取って処理を行うという処理を書くことが出来ます。

SelectManyメソッド

ここではSelectManyメソッドについて説明します。SelectManyメソッドには様々なオーバーロードがありますが、一番使用頻度の高いオーバーロードは以下のものになります。

public static IObservable<R> SelectMany<T, R>(
    this IObservable<T> source, 
    Func<T, IObservable<R>> selector);

sourceから発行された値を受け取ってselectorで渡したデリゲート処理を行いIObservableを取得します。sourceから複数の値が発行されると複数のIObservableが内部で作成されますが、それらをすべて1つのIObservableにマージして後続に流します。つまり、Mergeメソッドの最後のサンプルで示したIObservable>の引数を受け取るオーバーロードと同様のことを内部でしています。コード例を下記に示します。

Console.WriteLine("# 複数のIObservable<T>の統合(SelectMany)");
// IObservable<T> Merge<T>(IObservable<IObservable<T>> source)
var source = new Subject<int>();
source
    // 発行された値からIObservable<T>を作成してマージ(統合)する
    .SelectMany(i => Observable
        // 0〜2の値を1秒間隔で3つ発行
        .Interval(TimeSpan.FromSeconds(1)).Take(3)
        // 値を変換
        .Select(l => (l + 1) * i))
    // 購読
    .Subscribe(
        i => Console.WriteLine("OnNext: {0}", i),
        () => Console.WriteLine("OnCompleted"));

// 値の発行から完了
Console.WriteLine("# OnNext(10)");
source.OnNext(10);
Console.WriteLine("# OnNext(100)");
source.OnNext(100);
Console.WriteLine("# OnNext(1000)");
source.OnNext(1000);
Console.WriteLine("# OnCompleted");
source.OnCompleted();

// Enter押すまで待機
Console.ReadLine();

IObservable>型の引数を受け取るMergeメソッドのオーバーロードと同様の処理をSelectManyメソッドで記述しています。実質SelectManyは下記のようにSelectメソッドとMergeメソッドを使って置き換え出来ます。

SelectMany(i => Observable.Return(i)) → Select(i => Observable.Return(i)).Merge()

実行結果を以下に示します。
>|||
# 複数のIObservableの統合(SelectMany)
# OnNext(10)
# OnNext(100)
# OnNext(1000)
# OnCompleted
OnNext: 10
OnNext: 100
OnNext: 1000
OnNext: 20
OnNext: 200
OnNext: 2000
OnNext: 30
OnNext: 3000
OnNext: 300
OnCompleted
|

Mergeメソッドと同様の結果になっていることが確認できます。
ここで説明したSelectManyメソッドのオーバーライド以外に以下のようなオーバーライドがあります。

// IObservable<T>がIEnumerable<T>になった感じ
public static IObservable<R> SelectMany<T, R>(
    this IObservable<T> source, 
    Func<T, IEnumerable<R>> selector);

// sourceから発行された値に関係なくIObservable<T>を作成する場合
public static IObservable<O> SelectMany<T, O>(
    this IObservable<T> source, 
    IObservable<O> other);

// sourceから発行された値を元にIEnumerable<TCollection>を作り、sourceから発行された値と
// IEnumerable<TCollection>の値を元に結果を作成する。その結果を後続に流す。
public static IObservable<R> SelectMany<T, TCollection, R>(
    this IObservable<T> source, 
    Func<T, IEnumerable<TCollection>> collectionSelector, 
    Func<T, TCollection, R> resultSelector);

// sourceから発行された値を元にIObservable<TCollection>を作り、sourceから発行された値と
// IObservable<TCollection>の値を元に結果を作成する。その結果を後続に流す。
public static IObservable<R> SelectMany<T, TCollection, R>(
    this IObservable<T> source, 
    Func<T, IObservable<TCollection>> collectionSelector, 
    Func<T, TCollection, R> resultSelector);

// onNext, onError, onCompletedのそれぞれでIObservable<R>を作成して、マージして返す。
public static IObservable<R> SelectMany<T, R>(
    this IObservable<T> source, 
    Func<T, IObservable<R>> onNext, 
    Func<Exception, IObservable<R>> onError, 
    Func<IObservable<R>> onCompleted);

全てのオーバーロードについてのコード例はここでは割愛します。ここでは上記のメソッドの4番目のコードの使用例を下記に示します。

Console.WriteLine("# 複数のIObservable<T>の統合(SelectManyのオーバーロード)");
var source = new Subject<int>();
source
    .SelectMany(
        // sourceから発行された値を元にi, i+1, i+2の値を発行するIObservable<int>を返す
        i => Observable.Range(i, 3),
        // sourceから発行された値とi => Observable.Range(i, 3)で生成された値を
        // 使って最終的に発行される値を作成する。ここでは匿名型にしただけ。
        (x, y) => new { x, y })
    .Subscribe(Console.WriteLine);

// 値の発行から完了
Console.WriteLine("# OnNext(10)");
source.OnNext(10);
Console.WriteLine("# OnNext(100)");
source.OnNext(100);
Console.WriteLine("# OnNext(1000)");
source.OnNext(1000);
Console.WriteLine("# OnCompleted");
source.OnCompleted();

// Enter押すまで待機
Console.ReadLine();

上記のコードの実行結果を以下に示します。

# 複数のIObservable<T>の統合(SelectManyのオーバーロード)
# OnNext(10)
{ x = 10, y = 10 }
{ x = 10, y = 11 }
{ x = 10, y = 12 }
# OnNext(100)
{ x = 100, y = 100 }
{ x = 100, y = 101 }
{ x = 100, y = 102 }
# OnNext(1000)
{ x = 1000, y = 1000 }
{ x = 1000, y = 1001 }
{ x = 1000, y = 1002 }
# OnCompleted

sourceから発行された値と、sourceから発行された値を元に作成した結果のIObservableから発行される値を組み合わせるケースで使用します。

Switchメソッド

ここではSwitchメソッドについて説明します。Switchメソッドは、IObservable Merge(self IObservable> source)と同じシグネチャを持ちます。メソッドの定義を以下に示します。

public static IObservable<T> Switch<T>(this IObservable<IObservable<T>> sources);

Mergeメソッドとの違いを確認するために、MergeメソッドとSwitchメソッドで同じ処理を書いて比較します。まず、Mergeメソッドのコード例を下記に示します。

Console.WriteLine("# 複数のIObservable<T>の統合(Merge)");
// IObservable<T> Merge<T>(IObservable<IObservable<T>> source)
var source = new Subject<int>();
source
    .Select(i => Observable
        // 1 * i, 2 * i, 3 * iの値を1秒間隔で発行
        .Interval(TimeSpan.FromSeconds(1)).Take(3)
        // 値を変換
        .Select(l => (l + 1) * i))
    // 統合
    .Merge()
    .Subscribe(
        i => Console.WriteLine("OnNext: {0}", i),
        () => Console.WriteLine("OnCompleted"));

// 値の発行から完了
Console.WriteLine("# OnNext(10)");
source.OnNext(10);
Console.WriteLine("Sleep 2000ms...");
Thread.Sleep(2000);

Console.WriteLine("# OnNext(100)");
source.OnNext(100);
Console.WriteLine("Sleep 2000ms...");
Thread.Sleep(2000);

Console.WriteLine("# OnNext(1000)");
source.OnNext(1000);
Console.WriteLine("Sleep 2000ms...");
Thread.Sleep(2000);

Console.WriteLine("# OnCompleted");
source.OnCompleted();

Console.ReadLine();

若干複雑ですが、時間間隔をあけて複数のIObservableのシーケンスが入り乱れて値を発行するようにしています。実行結果を以下に示します。

# 複数のIObservable<T>の統合(Merge)
# OnNext(10)
Sleep 2000ms...
OnNext: 10
OnNext: 20
# OnNext(100)
Sleep 2000ms...
OnNext: 30
OnNext: 100
# OnNext(1000)
Sleep 2000ms...
OnNext: 200
OnNext: 1000
OnNext: 300
# OnCompleted
OnNext: 2000
OnNext: 3000
OnCompleted

10, 100, 1000の3つの値を発行して、そこからさらに3つの値を発行するIObservableのシーケンスを作成しているので、合計で9つの値が発行されます。このコードのMerge部分をSwitchメソッドに書き換えたコードを下記に示します。

Console.WriteLine("# 複数のIObservable<T>の統合(Switch)");
// IObservable<T> Merge<T>(IObservable<IObservable<T>> source)
var source = new Subject<int>();
source
    .Select(i => Observable
        // 1 * i, 2 * i, 3 * iの値を1秒間隔で発行
        .Interval(TimeSpan.FromSeconds(1)).Take(3)
        .Select(l => (l + 1) * i))
    // 最後
    .Switch()
    .Subscribe(
        i => Console.WriteLine("OnNext: {0}", i),
        () => Console.WriteLine("OnCompleted"));

// 値の発行から完了
Console.WriteLine("# OnNext(10)");
source.OnNext(10);
Console.WriteLine("Sleep 2000ms...");
Thread.Sleep(2000);

Console.WriteLine("# OnNext(100)");
source.OnNext(100);
Console.WriteLine("Sleep 2000ms...");
Thread.Sleep(2000);

Console.WriteLine("# OnNext(1000)");
source.OnNext(1000);
Console.WriteLine("Sleep 2000ms...");
Thread.Sleep(2000);

Console.WriteLine("# OnCompleted");
source.OnCompleted();

Console.ReadLine();

このコードの実行結果を以下に示します。

# 複数のIObservable<T>の統合(Switch)
# OnNext(10)
Sleep 2000ms...
OnNext: 10
OnNext: 20
# OnNext(100)
Sleep 2000ms...
OnNext: 100
# OnNext(1000)
Sleep 2000ms...
OnNext: 1000
# OnCompleted
OnNext: 2000
OnNext: 3000
OnCompleted

上記の実行結果から確認できるように、Mergeの時と異なって9個の値を発行しているにも関わらず、最終的にOnNextまで値が流れているのが6つになっています。これは、Mergeメソッドが元になるIObservableが発行した値を全て後続に流すのに対して、Switchメソッドは、最後に値を発行したIObservableの値を後続に流します。Switchメソッドは、このような特徴から一定間隔で非同期にデータを取得する処理を実行して、最後に応答が返ってきたところからデータを抽出したいといったケースで利用できます。

MVVM Light + ReactiveProperty to @kamebuchi さん

今日はデベロッパーサミット2012にいってきたのですが、そこで抱かれたい男No1で有名な@kamebuchiさんにお会いできました。

訓練されたあじゅらーの人にとっては有名なBlogだと思います。私もAzureの情報を探してていきつくことが多々あります。そんな@kamebuchiさんがMVVM Light + Reactive Propertyで画面にバインドできないということだったので帰宅して試してみます!!

プロジェクトの準備

  • WP7アプリケーションを作成します
  • MVVM LightをNuGetから導入します。
  • Reactive Property for Rx StableをNuGetから導入します

MainViewModelのコードを以下のようにします

using GalaSoft.MvvmLight;
using Codeplex.Reactive;

namespace PhoneApp1.ViewModel
{
    public class MainViewModel : ViewModelBase
    {
        public ReactiveProperty<string> Name { get; private set; }

        public MainViewModel()
        {
            this.Name = new ReactiveProperty<string>("kamebuchi");
        }
    }
}
ビルドするとこんなエラーがデザイナで起きるようになります

これはきっとVisual Studio側の不具合と思われ

ほっといてバインドします。

デザイナが死ぬので手打ち(インテリセンスも死ぬので実力が試される)

<phone:PhoneApplicationPage 
    x:Class="PhoneApp1.MainPage"
    xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
    xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
    xmlns:phone="clr-namespace:Microsoft.Phone.Controls;assembly=Microsoft.Phone"
    xmlns:shell="clr-namespace:Microsoft.Phone.Shell;assembly=Microsoft.Phone"
    xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
    xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
    mc:Ignorable="d" d:DesignWidth="480" d:DesignHeight="768"
    FontFamily="{StaticResource PhoneFontFamilyNormal}"
    FontSize="{StaticResource PhoneFontSizeNormal}"
    Foreground="{StaticResource PhoneForegroundBrush}"
    SupportedOrientations="Portrait" Orientation="Portrait"
    shell:SystemTray.IsVisible="True"
    DataContext="{Binding Source={StaticResource Locator}, Path=Main}">
    <!-- ↑ ViewModelLocatorからViewModelをDataContextにBinding -->
    
    <!--LayoutRoot は、すべてのページ コンテンツが配置されるルート グリッドです-->
    <Grid x:Name="LayoutRoot" Background="Transparent">
        <Grid.RowDefinitions>
            <RowDefinition Height="Auto"/>
            <RowDefinition Height="*"/>
        </Grid.RowDefinitions>

        <!--TitlePanel は、アプリケーション名とページ タイトルを格納します-->
        <StackPanel x:Name="TitlePanel" Grid.Row="0" Margin="12,17,0,28">
            <TextBlock x:Name="ApplicationTitle" Text="マイ アプリケーション" Style="{StaticResource PhoneTextNormalStyle}"/>
            <TextBlock x:Name="PageTitle" Text="ページ名" Margin="9,-7,0,0" Style="{StaticResource PhoneTextTitle1Style}"/>
        </StackPanel>

        <!--ContentPanel - 追加コンテンツをここに入力します-->
        <Grid x:Name="ContentPanel" Grid.Row="1" Margin="12,0,12,0">
            <!-- ReactivePropertyの値をバインド -->
            <TextBlock Text="{Binding Name.Value}"/>
        </Grid>
    </Grid>
</phone:PhoneApplicationPage>
実行するとバインドできてます!

まとめ

ということで、問題なくバインドできました。(IDataErrorInfo継承してるとデザイナが死ぬという問題はありますが…)