Reactive Extensions Team BlogのReactive Extensions v2.0 Beta available now!の記事が凄くアツイです。
前半は、まぁ置いといて途中の「Rx v2.0 and .NET 4.5 “async” / “await” – a better together story」というところからasyncとawaitを使ったプログラムの例がガンガン出てきますが、これが凄い!目から鱗がポロポロしてました。
かる〜く見るだけでも、今後のC#での非同期プログラミングの方向性がなんとなく見えてくるかもしれません!
Reactive Extensions再入門 その47「 Reactive Extensions 入門 ソースコードスニペット集」
ここまでのBlog記事を書くときにコードを書き溜めたソリューションを公開します。きちんと整理されているものではないですが、実際に動かして試せるコードのベースになると思います。Code Recipeからダウンロードできます。
過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
- Reactive Extensions再入門 その5「HotとCold」
- Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
- Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
- Reactive Extensions再入門 その8「SkipとTakeメソッド」
- Reactive Extensions再入門 その9「Skip + Take + Repeat = ドラッグ」
- Reactive Extensions再入門 その10「Doメソッド」
- Reactive Extensions再入門 その11「Catchメソッド」
- Reactive Extensions再入門 その12「Finallyメソッドとリソース解放」
- Reactive Extensions再入門 その13「最後の値を取得するLatestとMostRecentメソッド」
- Reactive Extensions再入門 その14「Nextメソッド」
- Reactive Extensions再入門 その15「To*****系メソッド」
- Reactive Extensions再入門 その16「最大、最少、平均を求めるメソッド」
- Reactive Extensions再入門 その17「集計するメソッド」
- Reactive Extensions再入門 その18「CountメソッドとLongCountメソッド」
- Reactive Extensions再入門 その19「AnyメソッドとAllメソッド」
- Reactive Extensions再入門 その20「GroupByメソッドでグルーピングしてみよう」
- Reactive Extensions再入門 その21「GroupByUntilメソッド」
- Reactive Extensions再入門 その22「単一の値を取得するメソッド」
- Reactive Extensions再入門 その23「重複を排除するメソッド」
- Reactive Extensions再入門 その24「単一の値を取得するメソッド その2」
- Reactive Extensions再入門 その25「値をまとめるBufferメソッド」
- Reactive Extensions再入門 その26「値をまとめるWindowメソッド」
- Reactive Extensions再入門 その27「時間でフィルタリング?Sampleメソッド」
- Reactive Extensions再入門 その28「落ち着いたら流すThrottleメソッド」
- Reactive Extensions再入門 その29「値を指定した時間だけ遅延させるDelayメソッド」
- Reactive Extensions再入門 その30「もう待ちきれない!を表現するTimeoutメソッド」
- Reactive Extensions再入門 その31「時間に関する情報を付与するTimestampとTimeIntervalメソッド」
- Reactive Extensions再入門 その32「型変換を行うCastとOfTypeメソッド」
- Reactive Extensions再入門 その33「シーケンスの最後を起点にSkipとTake」
- Reactive Extensions再入門 その34「ダメなら次の人!を実現するOnErrorResumeNextメソッド」
- Reactive Extensions再入門 その35「駄目ならやり直す!を実現するRetryメソッド」
- Reactive Extensions再入門 その36「ColdからHotへ!Publishメソッドと参照カウンタ?RefCountメソッド」
- Reactive Extensions再入門 その37「ColdからHotへ!その他のPublish系メソッド」
- Reactive Extensions再入門 その38「ColdからHotへ!その他のPublish系メソッド2」
- Reactive Extensions再入門 その39「Subject系クラス」
- Reactive Extensions再入門 その40「IObservableの合成はじめました」
- Reactive Extensions再入門 その41「どんどん合成するよ」
- Reactive Extensions再入門 その42「StartWithメソッドとJoinメソッド」
- Reactive Extensions再入門 その43「GroupJoinメソッド」
- Reactive Extensions再入門 その44「And, Then, Whenメソッド」
- Reactive Extensions再入門 その45「Scheduler」
- Reactive Extensions再入門 その46「 Reactive Extensions 入門 」
Reactive Extensions再入門 その46「 Reactive Extensions 入門 」
ここまで、自分のReactive Extensionsの復習もかねて基本的なメソッドの動作を確認するためのプログラムの記録を淡々と書いてきました。わかりにくかったり、図が少なかったり文章が拙かったりするところはあると思いますが、個人的に一区切りだと思ってるところまで書けたので、Blogの記事をまとめてPDFにしたものを公開したいと思います。
Reactive extensions入門v0.1
過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
- Reactive Extensions再入門 その5「HotとCold」
- Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
- Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
- Reactive Extensions再入門 その8「SkipとTakeメソッド」
- Reactive Extensions再入門 その9「Skip + Take + Repeat = ドラッグ」
- Reactive Extensions再入門 その10「Doメソッド」
- Reactive Extensions再入門 その11「Catchメソッド」
- Reactive Extensions再入門 その12「Finallyメソッドとリソース解放」
- Reactive Extensions再入門 その13「最後の値を取得するLatestとMostRecentメソッド」
- Reactive Extensions再入門 その14「Nextメソッド」
- Reactive Extensions再入門 その15「To*****系メソッド」
- Reactive Extensions再入門 その16「最大、最少、平均を求めるメソッド」
- Reactive Extensions再入門 その17「集計するメソッド」
- Reactive Extensions再入門 その18「CountメソッドとLongCountメソッド」
- Reactive Extensions再入門 その19「AnyメソッドとAllメソッド」
- Reactive Extensions再入門 その20「GroupByメソッドでグルーピングしてみよう」
- Reactive Extensions再入門 その21「GroupByUntilメソッド」
- Reactive Extensions再入門 その22「単一の値を取得するメソッド」
- Reactive Extensions再入門 その23「重複を排除するメソッド」
- Reactive Extensions再入門 その24「単一の値を取得するメソッド その2」
- Reactive Extensions再入門 その25「値をまとめるBufferメソッド」
- Reactive Extensions再入門 その26「値をまとめるWindowメソッド」
- Reactive Extensions再入門 その27「時間でフィルタリング?Sampleメソッド」
- Reactive Extensions再入門 その28「落ち着いたら流すThrottleメソッド」
- Reactive Extensions再入門 その29「値を指定した時間だけ遅延させるDelayメソッド」
- Reactive Extensions再入門 その30「もう待ちきれない!を表現するTimeoutメソッド」
- Reactive Extensions再入門 その31「時間に関する情報を付与するTimestampとTimeIntervalメソッド」
- Reactive Extensions再入門 その32「型変換を行うCastとOfTypeメソッド」
- Reactive Extensions再入門 その33「シーケンスの最後を起点にSkipとTake」
- Reactive Extensions再入門 その34「ダメなら次の人!を実現するOnErrorResumeNextメソッド」
- Reactive Extensions再入門 その35「駄目ならやり直す!を実現するRetryメソッド」
- Reactive Extensions再入門 その36「ColdからHotへ!Publishメソッドと参照カウンタ?RefCountメソッド」
- Reactive Extensions再入門 その37「ColdからHotへ!その他のPublish系メソッド」
- Reactive Extensions再入門 その38「ColdからHotへ!その他のPublish系メソッド2」
- Reactive Extensions再入門 その39「Subject系クラス」
- Reactive Extensions再入門 その40「IObservableの合成はじめました」
- Reactive Extensions再入門 その41「どんどん合成するよ」
- Reactive Extensions再入門 その42「StartWithメソッドとJoinメソッド」
- Reactive Extensions再入門 その43「GroupJoinメソッド」
- Reactive Extensions再入門 その44「And, Then, Whenメソッド」
- Reactive Extensions再入門 その45「Scheduler」
Reactive Extensions再入門 その45「Scheduler」
過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
- Reactive Extensions再入門 その5「HotとCold」
- Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
- Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
- Reactive Extensions再入門 その8「SkipとTakeメソッド」
- Reactive Extensions再入門 その9「Skip + Take + Repeat = ドラッグ」
- Reactive Extensions再入門 その10「Doメソッド」
- Reactive Extensions再入門 その11「Catchメソッド」
- Reactive Extensions再入門 その12「Finallyメソッドとリソース解放」
- Reactive Extensions再入門 その13「最後の値を取得するLatestとMostRecentメソッド」
- Reactive Extensions再入門 その14「Nextメソッド」
- Reactive Extensions再入門 その15「To*****系メソッド」
- Reactive Extensions再入門 その16「最大、最少、平均を求めるメソッド」
- Reactive Extensions再入門 その17「集計するメソッド」
- Reactive Extensions再入門 その18「CountメソッドとLongCountメソッド」
- Reactive Extensions再入門 その19「AnyメソッドとAllメソッド」
- Reactive Extensions再入門 その20「GroupByメソッドでグルーピングしてみよう」
- Reactive Extensions再入門 その21「GroupByUntilメソッド」
- Reactive Extensions再入門 その22「単一の値を取得するメソッド」
- Reactive Extensions再入門 その23「重複を排除するメソッド」
- Reactive Extensions再入門 その24「単一の値を取得するメソッド その2」
- Reactive Extensions再入門 その25「値をまとめるBufferメソッド」
- Reactive Extensions再入門 その26「値をまとめるWindowメソッド」
- Reactive Extensions再入門 その27「時間でフィルタリング?Sampleメソッド」
- Reactive Extensions再入門 その28「落ち着いたら流すThrottleメソッド」
- Reactive Extensions再入門 その29「値を指定した時間だけ遅延させるDelayメソッド」
- Reactive Extensions再入門 その30「もう待ちきれない!を表現するTimeoutメソッド」
- Reactive Extensions再入門 その31「時間に関する情報を付与するTimestampとTimeIntervalメソッド」
- Reactive Extensions再入門 その32「型変換を行うCastとOfTypeメソッド」
- Reactive Extensions再入門 その33「シーケンスの最後を起点にSkipとTake」
- Reactive Extensions再入門 その34「ダメなら次の人!を実現するOnErrorResumeNextメソッド」
- Reactive Extensions再入門 その35「駄目ならやり直す!を実現するRetryメソッド」
- Reactive Extensions再入門 その36「ColdからHotへ!Publishメソッドと参照カウンタ?RefCountメソッド」
- Reactive Extensions再入門 その37「ColdからHotへ!その他のPublish系メソッド」
- Reactive Extensions再入門 その38「ColdからHotへ!その他のPublish系メソッド2」
- Reactive Extensions再入門 その39「Subject系クラス」
- Reactive Extensions再入門 その40「IObservableの合成はじめました」
- Reactive Extensions再入門 その41「どんどん合成するよ」
- Reactive Extensions再入門 その42「StartWithメソッドとJoinメソッド」
- Reactive Extensions再入門 その43「GroupJoinメソッド」
- Reactive Extensions再入門 その44「And, Then, Whenメソッド」
はじめに
ここまでは、IObservableの作成や合成やLINQのメソッドなどについてざ〜っと見てきましたが、今回は毛色を変えてSchedulerという機能についてみていこうと思います。
Scheduler
ここでは、Reactive Extensionsの特徴的な機能の1つSchedulerについて説明します。Reactive Extensionsでは、Schedulerという仕組みを使ってIObservable
public interface IScheduler { DateTimeOffset Now { get; } public IDisposable Schedule<TState>( TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action); public IDisposable Schedule<TState>( TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action); public IDisposable Schedule<TState>( TState state, Func<IScheduler, TState, IDisposable> action); }
ISchedulerインターフェースのNowプロパティがスケジューラ上の現在の時間を表し、ScheduleメソッドでSchedulerで実行する処理を設定します。実際にISchedulerインターフェースを実装することや、ここで定義されているメソッドを使用することは、ほとんどありませんが、基本としては上記のようなものを使用しているという点はなんとなく頭の片隅に入れておいてください。
実行場所の切り替え
Reactive Extensionsは実行場所を制御するためのISchedulerインターフェースの実装クラスがいくつか定義されています。Reactive Extensionsで定義されている実装クラスを以下に示します。
- ImmediateScheduler
- 現在のスレッド上で即座に処理を実行するSchedulerです。
- CurrentThreadScheduler
- 現在のスレッド上で処理を逐次実行するSchedulerです。
- ThreadPoolScheduler
- ThreadPool上で処理を実行するSchedulerです。
- NewThreadScheduler
- 新規スレッドを作り、その上で処理を実行するSchedulerです。
- TaskPoolScheduler
- TPLのTask上で処理を実行するSchedulerです。
上記のSchedulerはSystem.Reactive.Concurrency.Schedulerクラスのstaticプロパティとしてインスタンスが提供されています。ImmediateSchedulerとCurrentThreadSchedulerは、どちらも現在のスレッド上で処理を実行しますが、ImmediateSchedulerが、依頼された処理をその場で実行するのに対して、CurrentThreadSchedulerは、依頼された処理をキューに追加して順次実行していきます。動作の違いを示すコード例を下記に示します。
Console.WriteLine("CurrentThread"); // ChrrentThreadSchedulerで処理を実行 Scheduler.CurrentThread.Schedule(() => { Console.WriteLine("Start"); // 処理の内部でさらに処理をCurrentThreadSchedulerに依頼 Scheduler.CurrentThread.Schedule(() => { Console.WriteLine(" Start"); Console.WriteLine(" End"); }); Console.WriteLine("End"); }); Console.WriteLine("Immediate"); // ImmediateSchedulerで処理を実行 Scheduler.Immediate.Schedule(() => { Console.WriteLine("Start"); // 処理の内部でさらに処理をImmediateSchedulerに依頼 Scheduler.Immediate.Schedule(() => { Console.WriteLine(" Start"); Console.WriteLine(" End"); }); Console.WriteLine("End"); });
CurrentThreadSchedulerとImmediateSchedulerの双方で、Scheduleメソッドを再起呼び出ししています。このコードの実行結果を以下に示します。
CurrentThread Start End Start End Immediate Start Start End End
CurrentThreadSchedulerが、Scheduleメソッドで登録した処理を現在実行中の処理が終了してから実行しているのに対して、ImmediateSchedulerが、Scheduleメソッドで登録した処理を現在実行中の処理を中断して実行している(通常のメソッド呼び出しをしたのと同じ)ことが確認できます。
IObservable生成時のSchedulerの指定方法
IObservable
Console.WriteLine("Normal"); // 開始のログ Console.WriteLine("{0:HH:mm:ss}: Start!!", DateTime.Now); // 1秒後に値を発行する Observable.Timer(TimeSpan.FromSeconds(1)) .Subscribe( i => Console.WriteLine("OnNext({0})", i), () => Console.WriteLine("OnCompleted")); // 終了のログ Console.WriteLine("{0:HH:mm:ss}: End!!", DateTime.Now); // Enterが押されるまで待つ Console.ReadLine(); Console.WriteLine("CurrentThreadScheduler"); // 開始のログ Console.WriteLine("{0:HH:mm:ss}: Start!!", DateTime.Now); // 1秒後に値を発行する, 実行場所は現在のスレッド Observable.Timer(TimeSpan.FromSeconds(1), Scheduler.CurrentThread) .Subscribe( i => Console.WriteLine("OnNext({0})", i), () => Console.WriteLine("OnCompleted")); // 終了のログ Console.WriteLine("{0:HH:mm:ss}: End!!", DateTime.Now); // Enterが押されるまで待つ Console.ReadLine();
Timerメソッドを2回呼び出して発行された値をコンソールに出力しています。最初のTimerメソッドの呼び出しは、Schedulerを指定しないものです。2つ目のTimerメソッドの呼び出しではScheduler.CurrentThreadを指定しています。このようにメソッドの最後の引数にSchedulerを渡すことで、生成されたIObservable
Normal 20:25:22: Start!! 20:25:22: End!! OnNext(0) OnCompleted CurrentThreadScheduler 20:25:26: Start!! OnNext(0) OnCompleted 20:25:27: End!!
CurrentThreadSchedulerを指定したほうでは、OnNextとOnCompletedが現在のスレッド上で実行されるため、Start!!とEnd!!の実行順序がSchedulerを指定しない場合と異なっていることが確認できます。
デフォルトのScheduler
TimerメソッドやIntervalメソッドのように、現在のスレッドをブロックしない形のIObservable
Schedulerの切り替え
ここでは、IObservable
ObserveOnメソッド
IObservable
public static IObservable<TSource> ObserveOn<T>( this IObservable<T> source, IScheduler scheduler); public static IObservable<T> ObserveOn<TSource>( this IObservable<T> source, SynchronizationContext context);
ISchedulerを受け取るオーバーロードは、後続の処理を指定したScheduler経由で実行するメソッドです。SynchronizationContextを受け取るオーバーロードは、SynchronizationContext経由で後続の処理を実行するメソッドです。実際には、SynchronizationContextSchedulerというクラスがReactive Extensionsで定義されているので、実際にはObserveOn(new SynchronizationContextScheduler(
// 1秒間隔で3つ値を発行する Observable .Interval(TimeSpan.FromSeconds(1)) .Take(3) // 現在のスレッドIDを表示 .Do(_ => Console.WriteLine("ThreadId: {0}", Thread.CurrentThread.ManagedThreadId)) // 実行スレッドを新しいスレッドに切り替える .ObserveOn(Scheduler.NewThread) // 現在のスレッドIDを表示 .Do(_ => Console.WriteLine("ThreadId: {0}", Thread.CurrentThread.ManagedThreadId)) // 購読して、発行された値とスレッドIDを表示 .Subscribe( i => Console.WriteLine("OnNext({0}), ThreadId: {1}", i, Thread.CurrentThread.ManagedThreadId), () => Console.WriteLine("OnCompleted, ThreadId: {0}", Thread.CurrentThread.ManagedThreadId)); Console.ReadLine();
TimerメソッドとTakeメソッドを使って1秒間隔で3つの値を発行しています。そしてObserveOnメソッドを使ってスレッドを新規スレッドに切り替えています。スレッドの切り替え前後でDoメソッドを使ってスレッドIDを表示しています。SubscribeメソッドでもスレッドIDを表示して実行スレッドがどうなっているのか確認しています。実行結果を以下に示します。
ThreadId: 4 ThreadId: 5 OnNext(0), ThreadId: 5 ThreadId: 4 ThreadId: 7 OnNext(1), ThreadId: 7 ThreadId: 6 ThreadId: 8 OnNext(2), ThreadId: 8 OnCompleted, ThreadId: 9
スレッドIDが切り替わっていることが確認できます。このことから、IObservable
ObserveOnの使用用途
実行場所(ここでの例では実行スレッド)の切り替えが、どのような用途で使えるのかについて説明します。代表的な用途では、Windows FormsやWPFなどのようなUIを操作するケースです。たとえば、ボタンクリックのイベントなどをトリガーにバックグラウンドでWeb APIを呼び出して、結果を画面に反映させる操作があるとします。
このとき、イベントの購読とイベントの値の加工程度まではUIスレッドで行い、Web APIを呼び出す箇所はバックグラウンドで行うのがUIをフリーズさせないようにするため最近では一般的な方法です。そして、最後のUIへの結果の反映で、再びUIスレッドに切り替えて処理を行います。これは、UIフレームワークが一般的にUIスレッド以外からの操作に対応していないために必要になります。このように「イベントの発火→非同期での処理の呼び出し→結果の処理」といったReactive Extensionsで1シーケンスで記述できるような処理内でのスレッドの切り替えにObserveOnメソッドは効果を発揮します。
時間を制御するScheduler
Schedulerには、実行場所を制御するもののほかに、時間を制御するタイプのSchedulerがあります。実際の処理内部では、あまり出てくる頻度は高くないですがテスト時に24時間かかる処理を24時間待たずに一瞬で24時間後の状態にするといったことも可能なため、このようなものもあるのだと把握しておいてください。
HistoricalSchedulerクラス
時間を制御するSchedulerクラスとしてHistoricalSchedulerを紹介します。HistoricalSchedulerはAdvancedByメソッドやAdvancedToメソッドを使ってScheduler内の時間を制御します。AdvancedByがTimeSpan型でAdvancedToメソッドがDateTimeOffcetを使って時間を指定します。HistoricalSchedulerクラスの使用例を下記に示します。
var scheduler = new HistoricalScheduler(); // 1秒間隔で値を発行する Observable.Interval(TimeSpan.FromSeconds(1), scheduler) // 購読 .Subscribe( i => Console.WriteLine("OnNext({0})", i), () => Console.WriteLine("OnCompleted")); // HistoricalSchedulerを使ってプログラム内でSchedulerの時間を進める // 0.5sec時間を進める Console.WriteLine("AdvanceBy(0.5sec)"); scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5)); // 0.5sec時間を進める Console.WriteLine("AdvanceBy(0.5sec)"); scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5)); // 0.5sec時間を進める Console.WriteLine("AdvanceBy(0.5sec)"); scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5)); // 0.5sec時間を進める Console.WriteLine("AdvanceBy(0.5sec)"); scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5)); // 0.5sec時間を進める Console.WriteLine("AdvanceBy(0.5sec)"); scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5));
Observable.Intervalメソッドを使って1秒間隔で値を発行するIObservable
AdvanceBy(0.5sec) AdvanceBy(0.5sec) OnNext(0) AdvanceBy(0.5sec) AdvanceBy(0.5sec) OnNext(1) AdvanceBy(0.5sec)
AdvanceByを2回呼んだ時点で値が発行され、さらにAdvanceByを2回呼んだ時点で値が発行されていることが確認できます。このようにReactive Extensionsでは時間をScheduler経由で取得するようにすることで、プログラムから柔軟に時間を制御することができるようになっています。
Reactive Extensions再入門 その44「And, Then, Whenメソッド」
過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
- Reactive Extensions再入門 その5「HotとCold」
- Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
- Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
- Reactive Extensions再入門 その8「SkipとTakeメソッド」
- Reactive Extensions再入門 その9「Skip + Take + Repeat = ドラッグ」
- Reactive Extensions再入門 その10「Doメソッド」
- Reactive Extensions再入門 その11「Catchメソッド」
- Reactive Extensions再入門 その12「Finallyメソッドとリソース解放」
- Reactive Extensions再入門 その13「最後の値を取得するLatestとMostRecentメソッド」
- Reactive Extensions再入門 その14「Nextメソッド」
- Reactive Extensions再入門 その15「To*****系メソッド」
- Reactive Extensions再入門 その16「最大、最少、平均を求めるメソッド」
- Reactive Extensions再入門 その17「集計するメソッド」
- Reactive Extensions再入門 その18「CountメソッドとLongCountメソッド」
- Reactive Extensions再入門 その19「AnyメソッドとAllメソッド」
- Reactive Extensions再入門 その20「GroupByメソッドでグルーピングしてみよう」
- Reactive Extensions再入門 その21「GroupByUntilメソッド」
- Reactive Extensions再入門 その22「単一の値を取得するメソッド」
- Reactive Extensions再入門 その23「重複を排除するメソッド」
- Reactive Extensions再入門 その24「単一の値を取得するメソッド その2」
- Reactive Extensions再入門 その25「値をまとめるBufferメソッド」
- Reactive Extensions再入門 その26「値をまとめるWindowメソッド」
- Reactive Extensions再入門 その27「時間でフィルタリング?Sampleメソッド」
- Reactive Extensions再入門 その28「落ち着いたら流すThrottleメソッド」
- Reactive Extensions再入門 その29「値を指定した時間だけ遅延させるDelayメソッド」
- Reactive Extensions再入門 その30「もう待ちきれない!を表現するTimeoutメソッド」
- Reactive Extensions再入門 その31「時間に関する情報を付与するTimestampとTimeIntervalメソッド」
- Reactive Extensions再入門 その32「型変換を行うCastとOfTypeメソッド」
- Reactive Extensions再入門 その33「シーケンスの最後を起点にSkipとTake」
- Reactive Extensions再入門 その34「ダメなら次の人!を実現するOnErrorResumeNextメソッド」
- Reactive Extensions再入門 その35「駄目ならやり直す!を実現するRetryメソッド」
- Reactive Extensions再入門 その36「ColdからHotへ!Publishメソッドと参照カウンタ?RefCountメソッド」
- Reactive Extensions再入門 その37「ColdからHotへ!その他のPublish系メソッド」
- Reactive Extensions再入門 その38「ColdからHotへ!その他のPublish系メソッド2」
- Reactive Extensions再入門 その39「Subject系クラス」
- Reactive Extensions再入門 その40「IObservableの合成はじめました」
- Reactive Extensions再入門 その41「どんどん合成するよ」
- Reactive Extensions再入門 その42「StartWithメソッドとJoinメソッド」
- Reactive Extensions再入門 その43「GroupJoinメソッド」
はじめに
合成系メソッドの紹介も今回で最後の予定です!今回は、個人的に理解にてこずったWhenメソッドと、その周辺メソッド達です。
Whenメソッド
ここではWhenメソッドについて説明します。Whenメソッドのシグネチャを以下に示します。
public static IObservable<TResult> When<TResult>(params Plan<TResult>[] plans); public static IObservable<TResult> When<TResult>(this IEnumerable<Plan<TResult>> plans);
Whenメソッドには2つのオーバーロードがあり、片方がPlan
Planクラスの作成
Whenメソッドの引数に渡すPlan
public static Pattern<TLeft, TRight> And<TLeft, TRight>( this IObservable<TLeft> left, IObservable<TRight> right);
この戻り値のPattern
public Pattern<T1, T2, T3> And<T3>(IObservable<T3> other);
このように、このPattern
Patternクラスには型引数の数の引数を受け取るデリゲートを受け取るThenメソッドが定義されています。Pattern
public Plan<TResult> Then<TResult>(Func<T1, T2, T3, TResult> selector);
このメソッドからPlan
Whenメソッドの使用例
では、上記のAndメソッドとThenメソッドを使ってPlan
var plan = Observable // plan1という文字列を10個発行するIObservable<string> .Return("plan1").Repeat(10) // 1秒間隔で0からのカウントアップにタイムスタンプをつけるIObservable<Timestamped<long>> .And(Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp()) // 100〜110の値を発行するIObservable<int> .And(Observable.Range(100, 10)) // 3つの値を文字列として纏める .Then((planName, timestamped, value) => string.Format("{0} {1} {2}", planName, timestamped, value)); // WhenでPlan<string>からIObservable<string>にして購読 Observable.When(plan).Subscribe( s => Console.WriteLine("OnNext: {0}", s), () => Console.WriteLine("OnCompleted")); Console.ReadLine();
コメントにある通りですが、無限に値を発行し続けるIObservable
OnNext: plan1 0@2012/02/22 23:47:09 +09:00 100 OnNext: plan1 1@2012/02/22 23:47:10 +09:00 101 OnNext: plan1 2@2012/02/22 23:47:11 +09:00 102 OnNext: plan1 3@2012/02/22 23:47:12 +09:00 103 OnNext: plan1 4@2012/02/22 23:47:13 +09:00 104 OnNext: plan1 5@2012/02/22 23:47:14 +09:00 105 OnNext: plan1 6@2012/02/22 23:47:15 +09:00 106 OnNext: plan1 7@2012/02/22 23:47:16 +09:00 107 OnNext: plan1 8@2012/02/22 23:47:17 +09:00 108 OnNext: plan1 9@2012/02/22 23:47:18 +09:00 109 OnCompleted
Andで連結したIObservable
次に、複数のPlan
var plan1 = Observable // plan1という文字列を10個発行するIObservable<string> .Return("plan1").Repeat(10) // 1秒間隔で0からのカウントアップにタイムスタンプをつけるIObservable<Timestamped<long>> .And(Observable.Interval(TimeSpan.FromSeconds(1)).Timestamp()) // 100〜110の値を発行するIObservable<int> .And(Observable.Range(100, 10)) // 3つの値を文字列として纏める .Then((planName, timestamped, value) => string.Format("{0} {1} {2}", planName, timestamped, value)); var plan2 = Observable // plan2という文字列を20個発行するIObservable<string> .Return("plan2").Repeat(20) // 0.5s間隔で0から値を発行していくIObservable<long> .And(Observable.Interval(TimeSpan.FromSeconds(0.5))) // Thenで文字列に纏める .Then((s, l) => string.Format("{0} {1}", s, l)); Observable.When(plan1, plan2).Subscribe( s => Console.WriteLine("OnNext: {0}", s), () => Console.WriteLine("OnCompleted"));
plan1は、最初の例と同じように作成しています。plan2は、20個のplan2という文字列と、0.5秒間隔で0から値をカウントアップしていくIObservable
OnNext: plan2 0 OnNext: plan1 0@2012/02/22 23:53:21 +09:00 100 OnNext: plan2 1 OnNext: plan2 2 OnNext: plan2 3 OnNext: plan1 1@2012/02/22 23:53:22 +09:00 101 OnNext: plan2 4 OnNext: plan2 5 OnNext: plan1 2@2012/02/22 23:53:23 +09:00 102 OnNext: plan2 6 OnNext: plan2 7 OnNext: plan1 3@2012/02/22 23:53:24 +09:00 103 OnNext: plan2 8 OnNext: plan2 9 OnNext: plan1 4@2012/02/22 23:53:25 +09:00 104 OnNext: plan2 10 OnNext: plan2 11 OnNext: plan1 5@2012/02/22 23:53:26 +09:00 105 OnNext: plan2 12 OnNext: plan2 13 OnNext: plan1 6@2012/02/22 23:53:27 +09:00 106 OnNext: plan2 14 OnNext: plan2 15 OnNext: plan1 7@2012/02/22 23:53:28 +09:00 107 OnNext: plan2 16 OnNext: plan2 17 OnNext: plan1 8@2012/02/22 23:53:29 +09:00 108 OnNext: plan2 18 OnNext: plan1 9@2012/02/22 23:53:30 +09:00 109 OnNext: plan2 19 OnCompleted
plan1から発行された値と、plan2から発行された値が混在して表示されていることが確認できます。このことからWhenで繋いだPlan
まとめ
このようにAndメソッドは使ってZipメソッドのように複数のIObservable
恐らく合成系のメソッドの中で一番自由度が高く複雑な合成が出来るメソッドになります。
Reactive Extensions再入門 その43「GroupJoinメソッド」
過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
- Reactive Extensions再入門 その5「HotとCold」
- Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
- Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
- Reactive Extensions再入門 その8「SkipとTakeメソッド」
- Reactive Extensions再入門 その9「Skip + Take + Repeat = ドラッグ」
- Reactive Extensions再入門 その10「Doメソッド」
- Reactive Extensions再入門 その11「Catchメソッド」
- Reactive Extensions再入門 その12「Finallyメソッドとリソース解放」
- Reactive Extensions再入門 その13「最後の値を取得するLatestとMostRecentメソッド」
- Reactive Extensions再入門 その14「Nextメソッド」
- Reactive Extensions再入門 その15「To*****系メソッド」
- Reactive Extensions再入門 その16「最大、最少、平均を求めるメソッド」
- Reactive Extensions再入門 その17「集計するメソッド」
- Reactive Extensions再入門 その18「CountメソッドとLongCountメソッド」
- Reactive Extensions再入門 その19「AnyメソッドとAllメソッド」
- Reactive Extensions再入門 その20「GroupByメソッドでグルーピングしてみよう」
- Reactive Extensions再入門 その21「GroupByUntilメソッド」
- Reactive Extensions再入門 その22「単一の値を取得するメソッド」
- Reactive Extensions再入門 その23「重複を排除するメソッド」
- Reactive Extensions再入門 その24「単一の値を取得するメソッド その2」
- Reactive Extensions再入門 その25「値をまとめるBufferメソッド」
- Reactive Extensions再入門 その26「値をまとめるWindowメソッド」
- Reactive Extensions再入門 その27「時間でフィルタリング?Sampleメソッド」
- Reactive Extensions再入門 その28「落ち着いたら流すThrottleメソッド」
- Reactive Extensions再入門 その29「値を指定した時間だけ遅延させるDelayメソッド」
- Reactive Extensions再入門 その30「もう待ちきれない!を表現するTimeoutメソッド」
- Reactive Extensions再入門 その31「時間に関する情報を付与するTimestampとTimeIntervalメソッド」
- Reactive Extensions再入門 その32「型変換を行うCastとOfTypeメソッド」
- Reactive Extensions再入門 その33「シーケンスの最後を起点にSkipとTake」
- Reactive Extensions再入門 その34「ダメなら次の人!を実現するOnErrorResumeNextメソッド」
- Reactive Extensions再入門 その35「駄目ならやり直す!を実現するRetryメソッド」
- Reactive Extensions再入門 その36「ColdからHotへ!Publishメソッドと参照カウンタ?RefCountメソッド」
- Reactive Extensions再入門 その37「ColdからHotへ!その他のPublish系メソッド」
- Reactive Extensions再入門 その38「ColdからHotへ!その他のPublish系メソッド2」
- Reactive Extensions再入門 その39「Subject系クラス」
- Reactive Extensions再入門 その40「IObservableの合成はじめました」
- Reactive Extensions再入門 その41「どんどん合成するよ」
- Reactive Extensions再入門 その42「StartWithメソッドとJoinメソッド」
はじめに
今回はGroupJoinメソッドについてみていきます。使いどころは、まだわからないですが動きはとりあえず把握しておきましょう!
GroupJoinメソッド
ここでは、GroupJoinメソッドについて説明します。GroupJoinメソッドは、2つのIObservable
public static IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>( this IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, IObservable<TRight>, TResult> resultSelector);
Joinメソッドと、ほぼ同じシグネチャですが、resultSelectorデリゲートの引数に違いがあります。resultSelectorデリゲートの第二引数がJoinメソッドではTRight型だったのに対してGroupJoinメソッドではIObservable
// センサー名 var sensors = new Subject<string>(); // センサーが受信する値 var values = new Subject<int>(); // 値のリセット用Subject var valueReset = new Subject<Unit>(); sensors.GroupJoin( values, // センサーは有効期限無し _ => Observable.Never<Unit>(), // センサーの値はvalueResetのOnNextで無効に出来る _ => valueReset, // センサーの名前と、センサーが受け取った値の現在の合計値を発行するLogにして後続に流す (l, r) => new { Name = l, Log = r.Scan((x, y) => x + y) }) .Subscribe( sensor => { // Logを表示する sensor .Log .Subscribe(i => Console.WriteLine("{0}: {1}", sensor.Name, i)); }, // 完了 () => Console.WriteLine("OnCompleted")); // センサーを2つ登録 Console.WriteLine("sensors.OnNext(sensor1)"); sensors.OnNext("sensor1"); Console.WriteLine("sensors.OnNext(sensor2)"); sensors.OnNext("sensor2"); // 値を3つ発行 Console.WriteLine("values.OnNext(100)"); values.OnNext(100); Console.WriteLine("values.OnNext(10)"); values.OnNext(10); Console.WriteLine("values.OnNext(1)"); values.OnNext(1); // センサーの値を一旦リセット Console.WriteLine("valueReset.OnNext()"); valueReset.OnNext(Unit.Default); // 新しいセンサーを追加 Console.WriteLine("sensors.OnNext(sensor3)"); sensors.OnNext("sensor3"); // 値を3つ発行 Console.WriteLine("values.OnNext(1)"); values.OnNext(1); Console.WriteLine("values.OnNext(2)"); values.OnNext(2); Console.WriteLine("values.OnNext(3)"); values.OnNext(3); // 終了 Console.WriteLine("values.OnCompleted()"); values.OnCompleted(); Console.WriteLine("sensors.OnCompleted()"); sensors.OnCompleted();
コードのイメージはセンサーと、センサーが受信した値の合計値をリアルタイムで表示するプログラムです。センサーが受信する値は任意のタイミング(valueReset変数のOnNext)でリセットできるようにしています。このメソッドの実行結果を以下に示します。
sensors.OnNext(sensor1) sensors.OnNext(sensor2) values.OnNext(100) sensor1: 100 sensor2: 100 values.OnNext(10) sensor1: 110 sensor2: 110 values.OnNext(1) sensor1: 111 sensor2: 111 valueReset.OnNext() sensors.OnNext(sensor3) values.OnNext(1) sensor1: 112 sensor2: 112 sensor3: 1 values.OnNext(2) sensor1: 114 sensor2: 114 sensor3: 3 values.OnNext(3) sensor1: 117 sensor2: 117 sensor3: 6 values.OnCompleted() sensors.OnCompleted() OnCompleted
sensor1とsensor2は、発行された値の合計を全て保持していることが確認できますが、valueResetのOnNextを呼んだ後に追加したsensor3は、途中からの値の合計しか集計していないことが確認できます。
Reactive Extensions再入門 その42「StartWithメソッドとJoinメソッド」
過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
- Reactive Extensions再入門 その5「HotとCold」
- Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
- Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
- Reactive Extensions再入門 その8「SkipとTakeメソッド」
- Reactive Extensions再入門 その9「Skip + Take + Repeat = ドラッグ」
- Reactive Extensions再入門 その10「Doメソッド」
- Reactive Extensions再入門 その11「Catchメソッド」
- Reactive Extensions再入門 その12「Finallyメソッドとリソース解放」
- Reactive Extensions再入門 その13「最後の値を取得するLatestとMostRecentメソッド」
- Reactive Extensions再入門 その14「Nextメソッド」
- Reactive Extensions再入門 その15「To*****系メソッド」
- Reactive Extensions再入門 その16「最大、最少、平均を求めるメソッド」
- Reactive Extensions再入門 その17「集計するメソッド」
- Reactive Extensions再入門 その18「CountメソッドとLongCountメソッド」
- Reactive Extensions再入門 その19「AnyメソッドとAllメソッド」
- Reactive Extensions再入門 その20「GroupByメソッドでグルーピングしてみよう」
- Reactive Extensions再入門 その21「GroupByUntilメソッド」
- Reactive Extensions再入門 その22「単一の値を取得するメソッド」
- Reactive Extensions再入門 その23「重複を排除するメソッド」
- Reactive Extensions再入門 その24「単一の値を取得するメソッド その2」
- Reactive Extensions再入門 その25「値をまとめるBufferメソッド」
- Reactive Extensions再入門 その26「値をまとめるWindowメソッド」
- Reactive Extensions再入門 その27「時間でフィルタリング?Sampleメソッド」
- Reactive Extensions再入門 その28「落ち着いたら流すThrottleメソッド」
- Reactive Extensions再入門 その29「値を指定した時間だけ遅延させるDelayメソッド」
- Reactive Extensions再入門 その30「もう待ちきれない!を表現するTimeoutメソッド」
- Reactive Extensions再入門 その31「時間に関する情報を付与するTimestampとTimeIntervalメソッド」
- Reactive Extensions再入門 その32「型変換を行うCastとOfTypeメソッド」
- Reactive Extensions再入門 その33「シーケンスの最後を起点にSkipとTake」
- Reactive Extensions再入門 その34「ダメなら次の人!を実現するOnErrorResumeNextメソッド」
- Reactive Extensions再入門 その35「駄目ならやり直す!を実現するRetryメソッド」
- Reactive Extensions再入門 その36「ColdからHotへ!Publishメソッドと参照カウンタ?RefCountメソッド」
- Reactive Extensions再入門 その37「ColdからHotへ!その他のPublish系メソッド」
- Reactive Extensions再入門 その38「ColdからHotへ!その他のPublish系メソッド2」
- Reactive Extensions再入門 その39「Subject系クラス」
- Reactive Extensions再入門 その40「IObservableの合成はじめました」
- Reactive Extensions再入門 その41「どんどん合成するよ」
はじめに
今回も、前回に続いて合成する関係のメソッドを使ってみようと思います。
StartWithメソッド
ここでは、StartWithメソッドについて説明します。StartWithメソッドのシグネチャを以下に示します。
public static IObservable<T> StartWith<T>(this IObservable<T> source, params T[] values);
このメソッドは、sourceで渡したIObservable
Observable // 1〜3の値を発行するIObservable<T>のシーケンス .Range(1, 3) // 頭に10, 20, 30をつける .StartWith(10, 20, 30) // 購読 .Subscribe( i => Console.WriteLine("OnNext: {0}", i));
Rangeメソッドを使って1〜3の値を発行するIObservable
OnNext: 10 OnNext: 20 OnNext: 30 OnNext: 1 OnNext: 2 OnNext: 3
Joinメソッド
ここでは、Joinメソッドについて説明します。Joinメソッドは2つのIObservable
public static IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>( this IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, TRight, TResult> resultSelector);
Joinメソッドは、leftとrightから値が発行されると、それぞれleftDurationSelector引数とrightDurationSelector引数で指定したデリゲートが呼ばれて、その値の有効期間を示すIObservable
このメソッドのコード例を下記に示します。
// Joinで合成するIObservable<T> var left = new Subject<int>(); var right = new Subject<int>(); left.Join( right, // leftから発行される値の有効期間は永久 _ => Observable.Never<Unit>(), // rightから発行される値の有効期間は永久 _ => Observable.Never<Unit>(), // 発行された値の組を作る Tuple.Create) // 購読 .Subscribe( // 組を表示 tuple => Console.WriteLine("Left: {0}, Right: {1}", tuple.Item1, tuple.Item2), // 完了を表示 () => Console.WriteLine("OnCompleted")); // 値の発行 Console.WriteLine("left.OnNext(1)"); left.OnNext(1); Console.WriteLine("right.OnNext(10)"); right.OnNext(10); Console.WriteLine("right.OnNext(100)"); right.OnNext(100); Console.WriteLine("left.OnNext(2)"); left.OnNext(2); // 終了 Console.WriteLine("left.OnCompleted()"); left.OnCompleted(); Console.WriteLine("right.OnCompleted()"); right.OnCompleted();
Joinメソッドを使って2つのIObservable
left.OnNext(1) ← この段階ではleftに1があるだけ right.OnNext(10) ← この段階でleftに1, rightに10があるため(1, 10)の組が生成される Left: 1, Right: 10 right.OnNext(100) ← この段階でleftに1, rightに10, 100があるため (1, 100)の組が追加で生成される Left: 1, Right: 100 left.OnNext(2) ← この段階でleftに1, 2, rightに10, 100があるため(2, 10), (2, 100)の組が追加で生成される Left: 2, Right: 10 Left: 2, Right: 100 left.OnCompleted() right.OnCompleted() OnCompleted ← rightとleftが両方終了したため、これ以降新たな組み合わせが出来ないため終了
この例のように値の有効期間にObservable.Neverを使うと、leftとrightから発行された全ての組合わせを生成することが出来ます。Never以外の値の有効期間を指定したときのコード例を下記に示します。
// Joinで合成するIObservable<T> var left = new Subject<int>(); var right = new Subject<int>(); left.Join( right, // leftから発行される値の有効期間は永久 _ => Observable.Never<Unit>(), // rightから発行される値の有効期間は一瞬 _ => Observable.Empty<Unit>(), // 発行された値の組を作る Tuple.Create) // 購読 .Subscribe( // 組を表示 tuple => Console.WriteLine("Left: {0}, Right: {1}", tuple.Item1, tuple.Item2), // 完了を表示 () => Console.WriteLine("OnCompleted")); // 値の発行 Console.WriteLine("left.OnNext(1)"); left.OnNext(1); Console.WriteLine("right.OnNext(10)"); right.OnNext(10); Console.WriteLine("right.OnNext(100)"); right.OnNext(100); Console.WriteLine("left.OnNext(2)"); left.OnNext(2); Console.WriteLine("right.OnNext(1000)"); right.OnNext(1000); // 終了 Console.WriteLine("left.OnCompleted()"); left.OnCompleted(); Console.WriteLine("right.OnCompleted()"); right.OnCompleted();
上記のコードはrightから発行された値の有効期間をObservable.Empty
left.OnNext(1) right.OnNext(10) Left: 1, Right: 10 right.OnNext(100) Left: 1, Right: 100 left.OnNext(2) right.OnNext(1000) Left: 1, Right: 1000 Left: 2, Right: 1000 left.OnCompleted() right.OnCompleted() OnCompleted
rightから値が発行されると、今までleftから発行された値との組み合わせが生成されていることが確認できます。逆にleftから値が発行されてもrightから発行された値には有効なものが1つもないため、resultSelectorで指定したデリゲートは呼ばれません。
ここでは、コード例を示しませんが、値の有効期限にタイマーやイベントなどから生成したIObservable
Reactive Extensions再入門 その41「どんどん合成するよ」
過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
- Reactive Extensions再入門 その5「HotとCold」
- Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
- Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
- Reactive Extensions再入門 その8「SkipとTakeメソッド」
- Reactive Extensions再入門 その9「Skip + Take + Repeat = ドラッグ」
- Reactive Extensions再入門 その10「Doメソッド」
- Reactive Extensions再入門 その11「Catchメソッド」
- Reactive Extensions再入門 その12「Finallyメソッドとリソース解放」
- Reactive Extensions再入門 その13「最後の値を取得するLatestとMostRecentメソッド」
- Reactive Extensions再入門 その14「Nextメソッド」
- Reactive Extensions再入門 その15「To*****系メソッド」
- Reactive Extensions再入門 その16「最大、最少、平均を求めるメソッド」
- Reactive Extensions再入門 その17「集計するメソッド」
- Reactive Extensions再入門 その18「CountメソッドとLongCountメソッド」
- Reactive Extensions再入門 その19「AnyメソッドとAllメソッド」
- Reactive Extensions再入門 その20「GroupByメソッドでグルーピングしてみよう」
- Reactive Extensions再入門 その21「GroupByUntilメソッド」
- Reactive Extensions再入門 その22「単一の値を取得するメソッド」
- Reactive Extensions再入門 その23「重複を排除するメソッド」
- Reactive Extensions再入門 その24「単一の値を取得するメソッド その2」
- Reactive Extensions再入門 その25「値をまとめるBufferメソッド」
- Reactive Extensions再入門 その26「値をまとめるWindowメソッド」
- Reactive Extensions再入門 その27「時間でフィルタリング?Sampleメソッド」
- Reactive Extensions再入門 その28「落ち着いたら流すThrottleメソッド」
- Reactive Extensions再入門 その29「値を指定した時間だけ遅延させるDelayメソッド」
- Reactive Extensions再入門 その30「もう待ちきれない!を表現するTimeoutメソッド」
- Reactive Extensions再入門 その31「時間に関する情報を付与するTimestampとTimeIntervalメソッド」
- Reactive Extensions再入門 その32「型変換を行うCastとOfTypeメソッド」
- Reactive Extensions再入門 その33「シーケンスの最後を起点にSkipとTake」
- Reactive Extensions再入門 その34「ダメなら次の人!を実現するOnErrorResumeNextメソッド」
- Reactive Extensions再入門 その35「駄目ならやり直す!を実現するRetryメソッド」
- Reactive Extensions再入門 その36「ColdからHotへ!Publishメソッドと参照カウンタ?RefCountメソッド」
- Reactive Extensions再入門 その37「ColdからHotへ!その他のPublish系メソッド」
- Reactive Extensions再入門 その38「ColdからHotへ!その他のPublish系メソッド2」
- Reactive Extensions再入門 その39「Subject系クラス」
- Reactive Extensions再入門 その40「IObservableの合成はじめました」
はじめに
前回に引き続き、どんどんIObservable
Concatメソッド
ここでは、Concatメソッドについて説明します。Concatメソッドは、その名前の通り複数のIObservable
// 引数で渡した全てのIObservable<T>のシーケンスを繋ぐ public static IObservable<T> Concat<T>(params IObservable<T>[] sources); // IEnumerable<IObservable<T>>から取得できるすべてのIObservable<T>を繋ぐ public static IObservable<T> Concat<T>(this IEnumerable<IObservable<T>> sources); // IObservable<IObservable<T>>から発行されるすべてのIObservable<T>を繋ぐ public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> sources); // firstとsecondを繋ぐ public static IObservable<T> Concat<T>(this IObservable<T> first, IObservable<T> second);
Concatメソッドの動作を示すためのコード例を下記に示します。
Observable // IObservable<T>のシーケンスを直列につなげる .Concat( // "1st: 0" 〜 "1st: 2"までの値を1秒間隔で発行する Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => "1st: " + i).Take(3), // "2nd: 0" 〜 "2nd: 2"までの値を1秒間隔で発行する Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => "2nd: " + i).Take(3), // "3rd: 0" 〜 "3rd: 2"までの値を1秒間隔で発行する Observable.Interval(TimeSpan.FromSeconds(1)).Select(i => "3rd: " + i).Take(3)) // 購読 .Subscribe( s => Console.WriteLine("OnNext: {0}", s), () => Console.WriteLine("OnCompleted")); Console.ReadLine();
Observable.Intervalメソッドを使って1秒間隔で値を発行するIObservable
OnNext: 1st: 0 OnNext: 1st: 1 OnNext: 1st: 2 OnNext: 2nd: 0 OnNext: 2nd: 1 OnNext: 2nd: 2 OnNext: 3rd: 0 OnNext: 3rd: 1 OnNext: 3rd: 2 OnCompleted
前のIObservable
// "1st: 0" 〜 "1st: 2"までの値を1秒間隔で発行する Observable.Interval(TimeSpan.FromSeconds(1)) .Select(i => "1st: " + i) .Take(3) .Concat( // "2nd: 0" 〜 "2nd: 2"までの値を1秒間隔で発行する Observable.Interval(TimeSpan.FromSeconds(1)) .Select(i => "2nd: " + i) .Take(3)) .Concat( // "3rd: 0" 〜 "3rd: 2"までの値を1秒間隔で発行する Observable.Interval(TimeSpan.FromSeconds(1)) .Select(i => "3rd: " + i) .Take(3)) // 購読 .Subscribe( s => Console.WriteLine("OnNext: {0}", s), () => Console.WriteLine("OnCompleted")); Console.ReadLine();
実行結果は同じため省略します。
Zipメソッド
ここでは、Zipメソッドについて説明します。Zipメソッドは、2つのIObservable
public static IObservable<R> Zip<T, U, R>( this IObservable<T> first, IObservable<U> second, Func<T, U, R> resultSelector);
firstから発行された値とsecondから発行された値をresultSelectorで変換して後ろに流します。また、firstかsecondのどちらかのIObservable
// 1秒間隔で0から値をカウントアップしていくIObservable<T>のシーケンスの最初の3つ Observable.Interval(TimeSpan.FromSeconds(1)).Take(3) // Zipでまとめる .Zip( // 100, 101, 102を発行するIObservable<T>のシーケンス Observable.Range(100, 3), // 渡された値を元に文字列化 (l, r) => string.Format("{0}-{1}", l, r)) // 購読 .Subscribe( s => Console.WriteLine("OnNext: {0}", s), () => Console.WriteLine("OnCompleted")); Console.ReadLine();
1秒間隔で0, 1, 2の値を発行するIObservable
OnNext: 0-100 OnNext: 1-101 OnNext: 2-102 OnCompleted
Zipメソッドで合成された3つの値がOnNextに渡ってきていることが確認できます。上記のプログラムを少し変更して片方のIObservable
// 1秒間隔で0から値をカウントアップしていくIObservable<T>のシーケンスの最初の3つ Observable.Interval(TimeSpan.FromSeconds(1)).Take(3) // Zipでまとめる .Zip( // 100を発行するIObservable<T>のシーケンス Observable.Return(100), // 渡された値を元に文字列化 (l, r) => string.Format("{0}-{1}", l, r)) // 購読 .Subscribe( s => Console.WriteLine("OnNext: {0}", s), () => Console.WriteLine("OnCompleted")); Console.ReadLine();
IntervalメソッドからTake(3)で3つの値を発行するIObservable
OnNext: 0-100 OnCompleted
OnNextが1回しか実行されていないことが確認できます。このことからZipメソッドが、合成対象のIObservable
Ambメソッド
ここではAmbメソッドについて説明します。Ambメソッドは複数のIObservable
// 引数で渡した全てのIObservable<T>から最初に値を発行したIObservable<T>の値を後ろに流す public static IObservable<T> Amb<T>(params IObservable<T>[] sources); // sources内の全てのIObservable<T>から最初に値を発行したIObservable<T>の値を後ろに流す public static IObservable<T> Amb<T>(this IEnumerable<IObservable<T>> sources); // firstとsecondのうち最初に値を発行したものの値を後ろに流す public static IObservable<T> Amb<T>(this IObservable<T> first, IObservable<T> second);
他の合成系メソッドと同様に可変長引数やIEnumerable
Observable.Amb( // 3秒後に値を発行するIO<T> Observable.Timer(TimeSpan.FromSeconds(3)).Select(_ => "3sec"), // 10秒後に値を発行するIO<T> Observable.Timer(TimeSpan.FromSeconds(10)).Select(_ => "10sec"), // 2秒後に値を発行するIO<T> Observable.Timer(TimeSpan.FromSeconds(2)).Select(_ => "2sec"), // 6秒後に値を発行するIO<T> Observable.Timer(TimeSpan.FromSeconds(6)).Select(_ => "6sec"), // 22秒後に値を発行するIO<T> Observable.Timer(TimeSpan.FromSeconds(22)).Select(_ => "22sec")) .Subscribe( s => Console.WriteLine("OnNext: {0}", s), () => Console.WriteLine("OnCompleted"));
Timerメソッドを使って、指定した時間が経過した後に、値を発行するIObservable
OnNext: 2sec OnCompleted
引数で渡した中で一番早く値を発行するObservable.Timer(TimeSpan.FromSecond(2)).Select(_ => “2sec”)の結果がOnNextやOnCompletedに渡っていることが確認できます。
CombineLatestメソッド
ここでは、CombineLatestメソッドについて説明します。CombineLatestメソッドはZipメソッドのように2つのIObservable
public static IObservable<R> CombineLatest<F, S, R>( this IObservable<F> first, IObservable<S> second, Func<F, S, R> resultSelector);
このメソッドのコード例を下記に示します。
var source1 = new Subject<string>(); var source2 = new Subject<int>(); source1 // source1とsource2を合成 .CombineLatest( source2, // 発行された値を結合して文字列化 (l, r) => string.Format("{0}-{1}", l, r)) // 購読 .Subscribe( s => Console.WriteLine("OnNext: {0}", s), () => Console.WriteLine("OnCompleted")); // 適当に値を発行する Console.WriteLine("source1.OnNext(foo)"); source1.OnNext("foo"); Console.WriteLine("source2.OnNext(100)"); source2.OnNext(100); Console.WriteLine("source2.OnNext(200)"); source2.OnNext(200); Console.WriteLine("source1.OnNext(bar)"); source1.OnNext("bar"); Console.WriteLine("source1.OnNext(boo)"); source1.OnNext("boo"); // source1完了 Console.WriteLine("source1.OnCompleted()"); source1.OnCompleted(); // source1完了後にsource2から値を発行する Console.WriteLine("source2.OnNext(200)"); source2.OnNext(999); // source2完了 Console.WriteLine("source2.OnCompleted()"); source2.OnCompleted();
このコードの実行結果を以下に示します。
source1.OnNext(foo) source2.OnNext(100) OnNext: foo-100 source2.OnNext(200) OnNext: foo-200 source1.OnNext(bar) OnNext: bar-200 source1.OnNext(boo) OnNext: boo-200 source1.OnCompleted() source2.OnNext(200) OnNext: boo-999 source2.OnCompleted() OnCompleted
最初のOnNextでは、source1とsource2の値が両方発行されるまで実行されませんが、それ以降は、source1かsource2のどちらかから値が発行されるとOnNextに値が発行されていることが確認できます。このことから、CombineLatestメソッドの特徴である、最後に発行された値をキャッシュしているということがわかります。そして、CombineLatestメソッドは両方のシーケンスが完了状態になった時点で完了通知を行うという動作も確認できます。これは、片方が完了しても、もう一方から値が発行されたら最後に発行された値を元に処理を継続できるためです。
Reactive Extensions再入門 その40「IObservableの合成はじめました」
過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
- Reactive Extensions再入門 その5「HotとCold」
- Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
- Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
- Reactive Extensions再入門 その8「SkipとTakeメソッド」
- Reactive Extensions再入門 その9「Skip + Take + Repeat = ドラッグ」
- Reactive Extensions再入門 その10「Doメソッド」
- Reactive Extensions再入門 その11「Catchメソッド」
- Reactive Extensions再入門 その12「Finallyメソッドとリソース解放」
- Reactive Extensions再入門 その13「最後の値を取得するLatestとMostRecentメソッド」
- Reactive Extensions再入門 その14「Nextメソッド」
- Reactive Extensions再入門 その15「To*****系メソッド」
- Reactive Extensions再入門 その16「最大、最少、平均を求めるメソッド」
- Reactive Extensions再入門 その17「集計するメソッド」
- Reactive Extensions再入門 その18「CountメソッドとLongCountメソッド」
- Reactive Extensions再入門 その19「AnyメソッドとAllメソッド」
- Reactive Extensions再入門 その20「GroupByメソッドでグルーピングしてみよう」
- Reactive Extensions再入門 その21「GroupByUntilメソッド」
- Reactive Extensions再入門 その22「単一の値を取得するメソッド」
- Reactive Extensions再入門 その23「重複を排除するメソッド」
- Reactive Extensions再入門 その24「単一の値を取得するメソッド その2」
- Reactive Extensions再入門 その25「値をまとめるBufferメソッド」
- Reactive Extensions再入門 その26「値をまとめるWindowメソッド」
- Reactive Extensions再入門 その27「時間でフィルタリング?Sampleメソッド」
- Reactive Extensions再入門 その28「落ち着いたら流すThrottleメソッド」
- Reactive Extensions再入門 その29「値を指定した時間だけ遅延させるDelayメソッド」
- Reactive Extensions再入門 その30「もう待ちきれない!を表現するTimeoutメソッド」
- Reactive Extensions再入門 その31「時間に関する情報を付与するTimestampとTimeIntervalメソッド」
- Reactive Extensions再入門 その32「型変換を行うCastとOfTypeメソッド」
- Reactive Extensions再入門 その33「シーケンスの最後を起点にSkipとTake」
- Reactive Extensions再入門 その34「ダメなら次の人!を実現するOnErrorResumeNextメソッド」
- Reactive Extensions再入門 その35「駄目ならやり直す!を実現するRetryメソッド」
- Reactive Extensions再入門 その36「ColdからHotへ!Publishメソッドと参照カウンタ?RefCountメソッド」
- Reactive Extensions再入門 その37「ColdからHotへ!その他のPublish系メソッド」
- Reactive Extensions再入門 その38「ColdからHotへ!その他のPublish系メソッド2」
- Reactive Extensions再入門 その39「Subject系クラス」
IObservableの合成
ここでは、IObservable
Reactive Extensionsでは、時間、イベント、非同期などをIObservable
Mergeメソッド
ここでは、Mergeメソッドについて説明します。Mergeメソッドは名前の通り複数のIObservable
public static IObservable<T> Merge<T>( this IObservable<T> first, IObservable<T> second);
このメソッドは、左辺値のIObservable
Console.WriteLine("# 2つのIObservable<T>の統合"); Observable // 0〜2の値を1秒間隔で発行 .Interval(TimeSpan.FromSeconds(1)).Take(3) // 1〜3の値に変換 .Select(i => i + 1) // マージ(統合) .Merge( Observable // 0〜2の値を1秒間隔で発行 .Interval(TimeSpan.FromSeconds(1)).Take(3) // 10, 20, 30に変換 .Select(i => (i + 1) * 10)) // 購読 .Subscribe( i => Console.WriteLine("OnNext: {0}", i), () => Console.WriteLine("OnCompleted")); // Enter押すまで待機 Console.ReadLine();
1秒間隔で1, 2, 3の値を発行するIObservable
# 2つのIObservable<T>の統合 OnNext: 1 OnNext: 10 OnNext: 2 OnNext: 20 OnNext: 3 OnNext: 30 OnCompleted
上記の結果からMergeメソッドではleftとrightで渡したIObservable
次に、複数のIObservable
public static IObservable<T> Merge<T>(params IObservable<T>[] sources);
引数のparamsで渡したIObservable
Console.WriteLine("# 複数のIObservable<T>の統合"); // Observable.Merge<T>(params IObservable<T>[] sources) Observable.Merge( // 3つのIObservable<T>をマージ Observable.Range(0, 3), Observable.Range(10, 3), Observable.Range(100, 3)) // 購読 .Subscribe( i => Console.WriteLine("OnNext: {0}", i), () => Console.WriteLine("OnCompleted")); // Enter押すまで待機 Console.ReadLine();
上記のコードは3つのIObservable
# 複数のIObservable<T>の統合 OnNext: 0 OnNext: 10 OnNext: 100 OnNext: 1 OnNext: 11 OnNext: 101 OnNext: 2 OnNext: 12 OnNext: 102 OnCompleted
3つのIObservable
このメソッドで興味深いオーバーロードとして以下のようなものがあります。
public static IObservable<T> Merge<T>(this IObservable<IObservable<T>> sources);
IObservable
Console.WriteLine("# 複数のIObservable<T>の統合2"); // IObservable<T> Merge<T>(IObservable<IObservable<T>> source) var source = new Subject<int>(); source // IObservable<int>からIObservable<IObservable<int>>に変換 .Select(i => Observable // 0〜2の値を1秒間隔で3つ発行 .Interval(TimeSpan.FromSeconds(1)).Take(3) // 値を変換 .Select(l => (l + 1) * i)) // IObservable<IObservable<T>>からIObservable<T>へマージ .Merge() // 購読 .Subscribe( i => Console.WriteLine("OnNext: {0}", i), () => Console.WriteLine("OnCompleted")); // 値の発行から完了 Console.WriteLine("# OnNext(10)"); source.OnNext(10); Console.WriteLine("# OnNext(100)"); source.OnNext(100); Console.WriteLine("# OnNext(1000)"); source.OnNext(1000); Console.WriteLine("# OnCompleted"); source.OnCompleted(); // Enter押すまで待機 Console.ReadLine();
Subject
# 複数のIObservable<T>の統合2 # OnNext(10) # OnNext(100) # OnNext(1000) # OnCompleted OnNext: 10 OnNext: 100 OnNext: 1000 OnNext: 200 OnNext: 20 OnNext: 2000 OnNext: 300 OnNext: 3000 OnNext: 30 OnCompleted
3つのIObservable
SelectManyメソッド
ここではSelectManyメソッドについて説明します。SelectManyメソッドには様々なオーバーロードがありますが、一番使用頻度の高いオーバーロードは以下のものになります。
public static IObservable<R> SelectMany<T, R>( this IObservable<T> source, Func<T, IObservable<R>> selector);
sourceから発行された値を受け取ってselectorで渡したデリゲート処理を行いIObservable
Console.WriteLine("# 複数のIObservable<T>の統合(SelectMany)"); // IObservable<T> Merge<T>(IObservable<IObservable<T>> source) var source = new Subject<int>(); source // 発行された値からIObservable<T>を作成してマージ(統合)する .SelectMany(i => Observable // 0〜2の値を1秒間隔で3つ発行 .Interval(TimeSpan.FromSeconds(1)).Take(3) // 値を変換 .Select(l => (l + 1) * i)) // 購読 .Subscribe( i => Console.WriteLine("OnNext: {0}", i), () => Console.WriteLine("OnCompleted")); // 値の発行から完了 Console.WriteLine("# OnNext(10)"); source.OnNext(10); Console.WriteLine("# OnNext(100)"); source.OnNext(100); Console.WriteLine("# OnNext(1000)"); source.OnNext(1000); Console.WriteLine("# OnCompleted"); source.OnCompleted(); // Enter押すまで待機 Console.ReadLine();
IObservable
SelectMany(i => Observable.Return(i)) → Select(i => Observable.Return(i)).Merge()
実行結果を以下に示します。
>|||
# 複数のIObservable
# OnNext(10)
# OnNext(100)
# OnNext(1000)
# OnCompleted
OnNext: 10
OnNext: 100
OnNext: 1000
OnNext: 20
OnNext: 200
OnNext: 2000
OnNext: 30
OnNext: 3000
OnNext: 300
OnCompleted
|
Mergeメソッドと同様の結果になっていることが確認できます。
ここで説明したSelectManyメソッドのオーバーライド以外に以下のようなオーバーライドがあります。
// IObservable<T>がIEnumerable<T>になった感じ public static IObservable<R> SelectMany<T, R>( this IObservable<T> source, Func<T, IEnumerable<R>> selector); // sourceから発行された値に関係なくIObservable<T>を作成する場合 public static IObservable<O> SelectMany<T, O>( this IObservable<T> source, IObservable<O> other); // sourceから発行された値を元にIEnumerable<TCollection>を作り、sourceから発行された値と // IEnumerable<TCollection>の値を元に結果を作成する。その結果を後続に流す。 public static IObservable<R> SelectMany<T, TCollection, R>( this IObservable<T> source, Func<T, IEnumerable<TCollection>> collectionSelector, Func<T, TCollection, R> resultSelector); // sourceから発行された値を元にIObservable<TCollection>を作り、sourceから発行された値と // IObservable<TCollection>の値を元に結果を作成する。その結果を後続に流す。 public static IObservable<R> SelectMany<T, TCollection, R>( this IObservable<T> source, Func<T, IObservable<TCollection>> collectionSelector, Func<T, TCollection, R> resultSelector); // onNext, onError, onCompletedのそれぞれでIObservable<R>を作成して、マージして返す。 public static IObservable<R> SelectMany<T, R>( this IObservable<T> source, Func<T, IObservable<R>> onNext, Func<Exception, IObservable<R>> onError, Func<IObservable<R>> onCompleted);
全てのオーバーロードについてのコード例はここでは割愛します。ここでは上記のメソッドの4番目のコードの使用例を下記に示します。
Console.WriteLine("# 複数のIObservable<T>の統合(SelectManyのオーバーロード)"); var source = new Subject<int>(); source .SelectMany( // sourceから発行された値を元にi, i+1, i+2の値を発行するIObservable<int>を返す i => Observable.Range(i, 3), // sourceから発行された値とi => Observable.Range(i, 3)で生成された値を // 使って最終的に発行される値を作成する。ここでは匿名型にしただけ。 (x, y) => new { x, y }) .Subscribe(Console.WriteLine); // 値の発行から完了 Console.WriteLine("# OnNext(10)"); source.OnNext(10); Console.WriteLine("# OnNext(100)"); source.OnNext(100); Console.WriteLine("# OnNext(1000)"); source.OnNext(1000); Console.WriteLine("# OnCompleted"); source.OnCompleted(); // Enter押すまで待機 Console.ReadLine();
上記のコードの実行結果を以下に示します。
# 複数のIObservable<T>の統合(SelectManyのオーバーロード) # OnNext(10) { x = 10, y = 10 } { x = 10, y = 11 } { x = 10, y = 12 } # OnNext(100) { x = 100, y = 100 } { x = 100, y = 101 } { x = 100, y = 102 } # OnNext(1000) { x = 1000, y = 1000 } { x = 1000, y = 1001 } { x = 1000, y = 1002 } # OnCompleted
sourceから発行された値と、sourceから発行された値を元に作成した結果のIObservable
Switchメソッド
ここではSwitchメソッドについて説明します。Switchメソッドは、IObservable
public static IObservable<T> Switch<T>(this IObservable<IObservable<T>> sources);
Mergeメソッドとの違いを確認するために、MergeメソッドとSwitchメソッドで同じ処理を書いて比較します。まず、Mergeメソッドのコード例を下記に示します。
Console.WriteLine("# 複数のIObservable<T>の統合(Merge)"); // IObservable<T> Merge<T>(IObservable<IObservable<T>> source) var source = new Subject<int>(); source .Select(i => Observable // 1 * i, 2 * i, 3 * iの値を1秒間隔で発行 .Interval(TimeSpan.FromSeconds(1)).Take(3) // 値を変換 .Select(l => (l + 1) * i)) // 統合 .Merge() .Subscribe( i => Console.WriteLine("OnNext: {0}", i), () => Console.WriteLine("OnCompleted")); // 値の発行から完了 Console.WriteLine("# OnNext(10)"); source.OnNext(10); Console.WriteLine("Sleep 2000ms..."); Thread.Sleep(2000); Console.WriteLine("# OnNext(100)"); source.OnNext(100); Console.WriteLine("Sleep 2000ms..."); Thread.Sleep(2000); Console.WriteLine("# OnNext(1000)"); source.OnNext(1000); Console.WriteLine("Sleep 2000ms..."); Thread.Sleep(2000); Console.WriteLine("# OnCompleted"); source.OnCompleted(); Console.ReadLine();
若干複雑ですが、時間間隔をあけて複数のIObservable
# 複数のIObservable<T>の統合(Merge) # OnNext(10) Sleep 2000ms... OnNext: 10 OnNext: 20 # OnNext(100) Sleep 2000ms... OnNext: 30 OnNext: 100 # OnNext(1000) Sleep 2000ms... OnNext: 200 OnNext: 1000 OnNext: 300 # OnCompleted OnNext: 2000 OnNext: 3000 OnCompleted
10, 100, 1000の3つの値を発行して、そこからさらに3つの値を発行するIObservable
Console.WriteLine("# 複数のIObservable<T>の統合(Switch)"); // IObservable<T> Merge<T>(IObservable<IObservable<T>> source) var source = new Subject<int>(); source .Select(i => Observable // 1 * i, 2 * i, 3 * iの値を1秒間隔で発行 .Interval(TimeSpan.FromSeconds(1)).Take(3) .Select(l => (l + 1) * i)) // 最後 .Switch() .Subscribe( i => Console.WriteLine("OnNext: {0}", i), () => Console.WriteLine("OnCompleted")); // 値の発行から完了 Console.WriteLine("# OnNext(10)"); source.OnNext(10); Console.WriteLine("Sleep 2000ms..."); Thread.Sleep(2000); Console.WriteLine("# OnNext(100)"); source.OnNext(100); Console.WriteLine("Sleep 2000ms..."); Thread.Sleep(2000); Console.WriteLine("# OnNext(1000)"); source.OnNext(1000); Console.WriteLine("Sleep 2000ms..."); Thread.Sleep(2000); Console.WriteLine("# OnCompleted"); source.OnCompleted(); Console.ReadLine();
このコードの実行結果を以下に示します。
# 複数のIObservable<T>の統合(Switch) # OnNext(10) Sleep 2000ms... OnNext: 10 OnNext: 20 # OnNext(100) Sleep 2000ms... OnNext: 100 # OnNext(1000) Sleep 2000ms... OnNext: 1000 # OnCompleted OnNext: 2000 OnNext: 3000 OnCompleted
上記の実行結果から確認できるように、Mergeの時と異なって9個の値を発行しているにも関わらず、最終的にOnNextまで値が流れているのが6つになっています。これは、Mergeメソッドが元になるIObservable
MVVM Light + ReactiveProperty to @kamebuchi さん
今日はデベロッパーサミット2012にいってきたのですが、そこで抱かれたい男No1で有名な@kamebuchiさんにお会いできました。
- ブログ:ブチザッキ
訓練されたあじゅらーの人にとっては有名なBlogだと思います。私もAzureの情報を探してていきつくことが多々あります。そんな@kamebuchiさんがMVVM Light + Reactive Propertyで画面にバインドできないということだったので帰宅して試してみます!!
プロジェクトの準備
- WP7アプリケーションを作成します
- MVVM LightをNuGetから導入します。
- Reactive Property for Rx StableをNuGetから導入します
MainViewModelのコードを以下のようにします
using GalaSoft.MvvmLight; using Codeplex.Reactive; namespace PhoneApp1.ViewModel { public class MainViewModel : ViewModelBase { public ReactiveProperty<string> Name { get; private set; } public MainViewModel() { this.Name = new ReactiveProperty<string>("kamebuchi"); } } }
ビルドするとこんなエラーがデザイナで起きるようになります
ほっといてバインドします。
デザイナが死ぬので手打ち(インテリセンスも死ぬので実力が試される)
<phone:PhoneApplicationPage x:Class="PhoneApp1.MainPage" xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:phone="clr-namespace:Microsoft.Phone.Controls;assembly=Microsoft.Phone" xmlns:shell="clr-namespace:Microsoft.Phone.Shell;assembly=Microsoft.Phone" xmlns:d="http://schemas.microsoft.com/expression/blend/2008" xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" mc:Ignorable="d" d:DesignWidth="480" d:DesignHeight="768" FontFamily="{StaticResource PhoneFontFamilyNormal}" FontSize="{StaticResource PhoneFontSizeNormal}" Foreground="{StaticResource PhoneForegroundBrush}" SupportedOrientations="Portrait" Orientation="Portrait" shell:SystemTray.IsVisible="True" DataContext="{Binding Source={StaticResource Locator}, Path=Main}"> <!-- ↑ ViewModelLocatorからViewModelをDataContextにBinding --> <!--LayoutRoot は、すべてのページ コンテンツが配置されるルート グリッドです--> <Grid x:Name="LayoutRoot" Background="Transparent"> <Grid.RowDefinitions> <RowDefinition Height="Auto"/> <RowDefinition Height="*"/> </Grid.RowDefinitions> <!--TitlePanel は、アプリケーション名とページ タイトルを格納します--> <StackPanel x:Name="TitlePanel" Grid.Row="0" Margin="12,17,0,28"> <TextBlock x:Name="ApplicationTitle" Text="マイ アプリケーション" Style="{StaticResource PhoneTextNormalStyle}"/> <TextBlock x:Name="PageTitle" Text="ページ名" Margin="9,-7,0,0" Style="{StaticResource PhoneTextTitle1Style}"/> </StackPanel> <!--ContentPanel - 追加コンテンツをここに入力します--> <Grid x:Name="ContentPanel" Grid.Row="1" Margin="12,0,12,0"> <!-- ReactivePropertyの値をバインド --> <TextBlock Text="{Binding Name.Value}"/> </Grid> </Grid> </phone:PhoneApplicationPage>
まとめ
ということで、問題なくバインドできました。(IDataErrorInfo継承してるとデザイナが死ぬという問題はありますが…)