過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
- Reactive Extensions再入門 その5「HotとCold」
- Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
- Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
- Reactive Extensions再入門 その8「SkipとTakeメソッド」
- Reactive Extensions再入門 その9「Skip + Take + Repeat = ドラッグ」
- Reactive Extensions再入門 その10「Doメソッド」
- Reactive Extensions再入門 その11「Catchメソッド」
- Reactive Extensions再入門 その12「Finallyメソッドとリソース解放」
- Reactive Extensions再入門 その13「最後の値を取得するLatestとMostRecentメソッド」
- Reactive Extensions再入門 その14「Nextメソッド」
- Reactive Extensions再入門 その15「To*****系メソッド」
- Reactive Extensions再入門 その16「最大、最少、平均を求めるメソッド」
- Reactive Extensions再入門 その17「集計するメソッド」
- Reactive Extensions再入門 その18「CountメソッドとLongCountメソッド」
- Reactive Extensions再入門 その19「AnyメソッドとAllメソッド」
- Reactive Extensions再入門 その20「GroupByメソッドでグルーピングしてみよう」
- Reactive Extensions再入門 その21「GroupByUntilメソッド」
- Reactive Extensions再入門 その22「単一の値を取得するメソッド」
- Reactive Extensions再入門 その23「重複を排除するメソッド」
- Reactive Extensions再入門 その24「単一の値を取得するメソッド その2」
- Reactive Extensions再入門 その25「値をまとめるBufferメソッド」
- Reactive Extensions再入門 その26「値をまとめるWindowメソッド」
- Reactive Extensions再入門 その27「時間でフィルタリング?Sampleメソッド」
- Reactive Extensions再入門 その28「落ち着いたら流すThrottleメソッド」
- Reactive Extensions再入門 その29「値を指定した時間だけ遅延させるDelayメソッド」
- Reactive Extensions再入門 その30「もう待ちきれない!を表現するTimeoutメソッド」
- Reactive Extensions再入門 その31「時間に関する情報を付与するTimestampとTimeIntervalメソッド」
- Reactive Extensions再入門 その32「型変換を行うCastとOfTypeメソッド」
- Reactive Extensions再入門 その33「シーケンスの最後を起点にSkipとTake」
- Reactive Extensions再入門 その34「ダメなら次の人!を実現するOnErrorResumeNextメソッド」
- Reactive Extensions再入門 その35「駄目ならやり直す!を実現するRetryメソッド」
- Reactive Extensions再入門 その36「ColdからHotへ!Publishメソッドと参照カウンタ?RefCountメソッド」
- Reactive Extensions再入門 その37「ColdからHotへ!その他のPublish系メソッド」
- Reactive Extensions再入門 その38「ColdからHotへ!その他のPublish系メソッド2」
- Reactive Extensions再入門 その39「Subject系クラス」
- Reactive Extensions再入門 その40「IObservableの合成はじめました」
- Reactive Extensions再入門 その41「どんどん合成するよ」
- Reactive Extensions再入門 その42「StartWithメソッドとJoinメソッド」
- Reactive Extensions再入門 その43「GroupJoinメソッド」
- Reactive Extensions再入門 その44「And, Then, Whenメソッド」
はじめに
ここまでは、IObservableの作成や合成やLINQのメソッドなどについてざ〜っと見てきましたが、今回は毛色を変えてSchedulerという機能についてみていこうと思います。
Scheduler
ここでは、Reactive Extensionsの特徴的な機能の1つSchedulerについて説明します。Reactive Extensionsでは、Schedulerという仕組みを使ってIObservable
public interface IScheduler { DateTimeOffset Now { get; } public IDisposable Schedule<TState>( TState state, DateTimeOffset dueTime, Func<IScheduler, TState, IDisposable> action); public IDisposable Schedule<TState>( TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action); public IDisposable Schedule<TState>( TState state, Func<IScheduler, TState, IDisposable> action); }
ISchedulerインターフェースのNowプロパティがスケジューラ上の現在の時間を表し、ScheduleメソッドでSchedulerで実行する処理を設定します。実際にISchedulerインターフェースを実装することや、ここで定義されているメソッドを使用することは、ほとんどありませんが、基本としては上記のようなものを使用しているという点はなんとなく頭の片隅に入れておいてください。
実行場所の切り替え
Reactive Extensionsは実行場所を制御するためのISchedulerインターフェースの実装クラスがいくつか定義されています。Reactive Extensionsで定義されている実装クラスを以下に示します。
- ImmediateScheduler
- 現在のスレッド上で即座に処理を実行するSchedulerです。
- CurrentThreadScheduler
- 現在のスレッド上で処理を逐次実行するSchedulerです。
- ThreadPoolScheduler
- ThreadPool上で処理を実行するSchedulerです。
- NewThreadScheduler
- 新規スレッドを作り、その上で処理を実行するSchedulerです。
- TaskPoolScheduler
- TPLのTask上で処理を実行するSchedulerです。
上記のSchedulerはSystem.Reactive.Concurrency.Schedulerクラスのstaticプロパティとしてインスタンスが提供されています。ImmediateSchedulerとCurrentThreadSchedulerは、どちらも現在のスレッド上で処理を実行しますが、ImmediateSchedulerが、依頼された処理をその場で実行するのに対して、CurrentThreadSchedulerは、依頼された処理をキューに追加して順次実行していきます。動作の違いを示すコード例を下記に示します。
Console.WriteLine("CurrentThread"); // ChrrentThreadSchedulerで処理を実行 Scheduler.CurrentThread.Schedule(() => { Console.WriteLine("Start"); // 処理の内部でさらに処理をCurrentThreadSchedulerに依頼 Scheduler.CurrentThread.Schedule(() => { Console.WriteLine(" Start"); Console.WriteLine(" End"); }); Console.WriteLine("End"); }); Console.WriteLine("Immediate"); // ImmediateSchedulerで処理を実行 Scheduler.Immediate.Schedule(() => { Console.WriteLine("Start"); // 処理の内部でさらに処理をImmediateSchedulerに依頼 Scheduler.Immediate.Schedule(() => { Console.WriteLine(" Start"); Console.WriteLine(" End"); }); Console.WriteLine("End"); });
CurrentThreadSchedulerとImmediateSchedulerの双方で、Scheduleメソッドを再起呼び出ししています。このコードの実行結果を以下に示します。
CurrentThread Start End Start End Immediate Start Start End End
CurrentThreadSchedulerが、Scheduleメソッドで登録した処理を現在実行中の処理が終了してから実行しているのに対して、ImmediateSchedulerが、Scheduleメソッドで登録した処理を現在実行中の処理を中断して実行している(通常のメソッド呼び出しをしたのと同じ)ことが確認できます。
IObservable生成時のSchedulerの指定方法
IObservable
Console.WriteLine("Normal"); // 開始のログ Console.WriteLine("{0:HH:mm:ss}: Start!!", DateTime.Now); // 1秒後に値を発行する Observable.Timer(TimeSpan.FromSeconds(1)) .Subscribe( i => Console.WriteLine("OnNext({0})", i), () => Console.WriteLine("OnCompleted")); // 終了のログ Console.WriteLine("{0:HH:mm:ss}: End!!", DateTime.Now); // Enterが押されるまで待つ Console.ReadLine(); Console.WriteLine("CurrentThreadScheduler"); // 開始のログ Console.WriteLine("{0:HH:mm:ss}: Start!!", DateTime.Now); // 1秒後に値を発行する, 実行場所は現在のスレッド Observable.Timer(TimeSpan.FromSeconds(1), Scheduler.CurrentThread) .Subscribe( i => Console.WriteLine("OnNext({0})", i), () => Console.WriteLine("OnCompleted")); // 終了のログ Console.WriteLine("{0:HH:mm:ss}: End!!", DateTime.Now); // Enterが押されるまで待つ Console.ReadLine();
Timerメソッドを2回呼び出して発行された値をコンソールに出力しています。最初のTimerメソッドの呼び出しは、Schedulerを指定しないものです。2つ目のTimerメソッドの呼び出しではScheduler.CurrentThreadを指定しています。このようにメソッドの最後の引数にSchedulerを渡すことで、生成されたIObservable
Normal 20:25:22: Start!! 20:25:22: End!! OnNext(0) OnCompleted CurrentThreadScheduler 20:25:26: Start!! OnNext(0) OnCompleted 20:25:27: End!!
CurrentThreadSchedulerを指定したほうでは、OnNextとOnCompletedが現在のスレッド上で実行されるため、Start!!とEnd!!の実行順序がSchedulerを指定しない場合と異なっていることが確認できます。
デフォルトのScheduler
TimerメソッドやIntervalメソッドのように、現在のスレッドをブロックしない形のIObservable
Schedulerの切り替え
ここでは、IObservable
ObserveOnメソッド
IObservable
public static IObservable<TSource> ObserveOn<T>( this IObservable<T> source, IScheduler scheduler); public static IObservable<T> ObserveOn<TSource>( this IObservable<T> source, SynchronizationContext context);
ISchedulerを受け取るオーバーロードは、後続の処理を指定したScheduler経由で実行するメソッドです。SynchronizationContextを受け取るオーバーロードは、SynchronizationContext経由で後続の処理を実行するメソッドです。実際には、SynchronizationContextSchedulerというクラスがReactive Extensionsで定義されているので、実際にはObserveOn(new SynchronizationContextScheduler(
// 1秒間隔で3つ値を発行する Observable .Interval(TimeSpan.FromSeconds(1)) .Take(3) // 現在のスレッドIDを表示 .Do(_ => Console.WriteLine("ThreadId: {0}", Thread.CurrentThread.ManagedThreadId)) // 実行スレッドを新しいスレッドに切り替える .ObserveOn(Scheduler.NewThread) // 現在のスレッドIDを表示 .Do(_ => Console.WriteLine("ThreadId: {0}", Thread.CurrentThread.ManagedThreadId)) // 購読して、発行された値とスレッドIDを表示 .Subscribe( i => Console.WriteLine("OnNext({0}), ThreadId: {1}", i, Thread.CurrentThread.ManagedThreadId), () => Console.WriteLine("OnCompleted, ThreadId: {0}", Thread.CurrentThread.ManagedThreadId)); Console.ReadLine();
TimerメソッドとTakeメソッドを使って1秒間隔で3つの値を発行しています。そしてObserveOnメソッドを使ってスレッドを新規スレッドに切り替えています。スレッドの切り替え前後でDoメソッドを使ってスレッドIDを表示しています。SubscribeメソッドでもスレッドIDを表示して実行スレッドがどうなっているのか確認しています。実行結果を以下に示します。
ThreadId: 4 ThreadId: 5 OnNext(0), ThreadId: 5 ThreadId: 4 ThreadId: 7 OnNext(1), ThreadId: 7 ThreadId: 6 ThreadId: 8 OnNext(2), ThreadId: 8 OnCompleted, ThreadId: 9
スレッドIDが切り替わっていることが確認できます。このことから、IObservable
ObserveOnの使用用途
実行場所(ここでの例では実行スレッド)の切り替えが、どのような用途で使えるのかについて説明します。代表的な用途では、Windows FormsやWPFなどのようなUIを操作するケースです。たとえば、ボタンクリックのイベントなどをトリガーにバックグラウンドでWeb APIを呼び出して、結果を画面に反映させる操作があるとします。
このとき、イベントの購読とイベントの値の加工程度まではUIスレッドで行い、Web APIを呼び出す箇所はバックグラウンドで行うのがUIをフリーズさせないようにするため最近では一般的な方法です。そして、最後のUIへの結果の反映で、再びUIスレッドに切り替えて処理を行います。これは、UIフレームワークが一般的にUIスレッド以外からの操作に対応していないために必要になります。このように「イベントの発火→非同期での処理の呼び出し→結果の処理」といったReactive Extensionsで1シーケンスで記述できるような処理内でのスレッドの切り替えにObserveOnメソッドは効果を発揮します。
時間を制御するScheduler
Schedulerには、実行場所を制御するもののほかに、時間を制御するタイプのSchedulerがあります。実際の処理内部では、あまり出てくる頻度は高くないですがテスト時に24時間かかる処理を24時間待たずに一瞬で24時間後の状態にするといったことも可能なため、このようなものもあるのだと把握しておいてください。
HistoricalSchedulerクラス
時間を制御するSchedulerクラスとしてHistoricalSchedulerを紹介します。HistoricalSchedulerはAdvancedByメソッドやAdvancedToメソッドを使ってScheduler内の時間を制御します。AdvancedByがTimeSpan型でAdvancedToメソッドがDateTimeOffcetを使って時間を指定します。HistoricalSchedulerクラスの使用例を下記に示します。
var scheduler = new HistoricalScheduler(); // 1秒間隔で値を発行する Observable.Interval(TimeSpan.FromSeconds(1), scheduler) // 購読 .Subscribe( i => Console.WriteLine("OnNext({0})", i), () => Console.WriteLine("OnCompleted")); // HistoricalSchedulerを使ってプログラム内でSchedulerの時間を進める // 0.5sec時間を進める Console.WriteLine("AdvanceBy(0.5sec)"); scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5)); // 0.5sec時間を進める Console.WriteLine("AdvanceBy(0.5sec)"); scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5)); // 0.5sec時間を進める Console.WriteLine("AdvanceBy(0.5sec)"); scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5)); // 0.5sec時間を進める Console.WriteLine("AdvanceBy(0.5sec)"); scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5)); // 0.5sec時間を進める Console.WriteLine("AdvanceBy(0.5sec)"); scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5));
Observable.Intervalメソッドを使って1秒間隔で値を発行するIObservable
AdvanceBy(0.5sec) AdvanceBy(0.5sec) OnNext(0) AdvanceBy(0.5sec) AdvanceBy(0.5sec) OnNext(1) AdvanceBy(0.5sec)
AdvanceByを2回呼んだ時点で値が発行され、さらにAdvanceByを2回呼んだ時点で値が発行されていることが確認できます。このようにReactive Extensionsでは時間をScheduler経由で取得するようにすることで、プログラムから柔軟に時間を制御することができるようになっています。