かずきのBlog@hatena

日本マイクロソフトに勤めています。XAML + C#の組み合わせをメインに、たまにASP.NETやJavaなどの.NET系以外のことも書いています。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

Reactive Extensions再入門 その45「Scheduler」

過去記事インデックス

はじめに

ここまでは、IObservableの作成や合成やLINQのメソッドなどについてざ〜っと見てきましたが、今回は毛色を変えてSchedulerという機能についてみていこうと思います。

Scheduler

ここでは、Reactive Extensionsの特徴的な機能の1つSchedulerについて説明します。Reactive Extensionsでは、Schedulerという仕組みを使ってIObservableのシーケンスの処理の実行場所と時間を自在に制御することができます。この機能を提供するためにReactive Extensionsでは、ISchedulerというインターフェースを定義しています。ISchedulerインターフェースは下記のような1つのプロパティと3つのメソッドを持つシンプルなインターフェースです。

public interface IScheduler
{
    DateTimeOffset Now { get; }

    public IDisposable Schedule<TState>(
        TState state, 
        DateTimeOffset dueTime, 
        Func<IScheduler, TState, IDisposable> action);

    public IDisposable Schedule<TState>(
        TState state, 
        TimeSpan dueTime, 
        Func<IScheduler, TState, IDisposable> action);

    public IDisposable Schedule<TState>(
        TState state, 
        Func<IScheduler, TState, IDisposable> action);
}

ISchedulerインターフェースのNowプロパティがスケジューラ上の現在の時間を表し、ScheduleメソッドでSchedulerで実行する処理を設定します。実際にISchedulerインターフェースを実装することや、ここで定義されているメソッドを使用することは、ほとんどありませんが、基本としては上記のようなものを使用しているという点はなんとなく頭の片隅に入れておいてください。

実行場所の切り替え

Reactive Extensionsは実行場所を制御するためのISchedulerインターフェースの実装クラスがいくつか定義されています。Reactive Extensionsで定義されている実装クラスを以下に示します。

  • ImmediateScheduler
    • 現在のスレッド上で即座に処理を実行するSchedulerです。
  • CurrentThreadScheduler
    • 現在のスレッド上で処理を逐次実行するSchedulerです。
  • ThreadPoolScheduler
    • ThreadPool上で処理を実行するSchedulerです。
  • NewThreadScheduler
    • 新規スレッドを作り、その上で処理を実行するSchedulerです。
  • TaskPoolScheduler
    • TPLのTask上で処理を実行するSchedulerです。

上記のSchedulerはSystem.Reactive.Concurrency.Schedulerクラスのstaticプロパティとしてインスタンスが提供されています。ImmediateSchedulerとCurrentThreadSchedulerは、どちらも現在のスレッド上で処理を実行しますが、ImmediateSchedulerが、依頼された処理をその場で実行するのに対して、CurrentThreadSchedulerは、依頼された処理をキューに追加して順次実行していきます。動作の違いを示すコード例を下記に示します。

Console.WriteLine("CurrentThread");
// ChrrentThreadSchedulerで処理を実行
Scheduler.CurrentThread.Schedule(() =>
{
    Console.WriteLine("Start");
    // 処理の内部でさらに処理をCurrentThreadSchedulerに依頼
    Scheduler.CurrentThread.Schedule(() =>
    {
        Console.WriteLine("    Start");
        Console.WriteLine("    End");
    });
    Console.WriteLine("End");
});

Console.WriteLine("Immediate");
// ImmediateSchedulerで処理を実行
Scheduler.Immediate.Schedule(() =>
{
    Console.WriteLine("Start");
    // 処理の内部でさらに処理をImmediateSchedulerに依頼
    Scheduler.Immediate.Schedule(() =>
    {
        Console.WriteLine("    Start");
        Console.WriteLine("    End");
    });
    Console.WriteLine("End");
});

CurrentThreadSchedulerとImmediateSchedulerの双方で、Scheduleメソッドを再起呼び出ししています。このコードの実行結果を以下に示します。

CurrentThread
Start
End
    Start
    End
Immediate
Start
    Start
    End
End

CurrentThreadSchedulerが、Scheduleメソッドで登録した処理を現在実行中の処理が終了してから実行しているのに対して、ImmediateSchedulerが、Scheduleメソッドで登録した処理を現在実行中の処理を中断して実行している(通常のメソッド呼び出しをしたのと同じ)ことが確認できます。

IObservable生成時のSchedulerの指定方法

IObservableのシーケンスを生成するメソッドの大半にはISchedulerインターフェースを受け取るオーバーロードがあります。このオーバーロードにISchedulerの実装クラスのインスタンスを渡すことで実行場所を設定できます。Timerメソッドを例にスケジューラの指定方法を示します。コード例を下記に示します。

Console.WriteLine("Normal");
// 開始のログ
Console.WriteLine("{0:HH:mm:ss}: Start!!", DateTime.Now);
// 1秒後に値を発行する
Observable.Timer(TimeSpan.FromSeconds(1))
    .Subscribe(
        i => Console.WriteLine("OnNext({0})", i),
        () => Console.WriteLine("OnCompleted"));
// 終了のログ
Console.WriteLine("{0:HH:mm:ss}: End!!", DateTime.Now);
// Enterが押されるまで待つ
Console.ReadLine();

Console.WriteLine("CurrentThreadScheduler");
// 開始のログ
Console.WriteLine("{0:HH:mm:ss}: Start!!", DateTime.Now);
// 1秒後に値を発行する, 実行場所は現在のスレッド
Observable.Timer(TimeSpan.FromSeconds(1), Scheduler.CurrentThread)
    .Subscribe(
        i => Console.WriteLine("OnNext({0})", i),
        () => Console.WriteLine("OnCompleted"));
// 終了のログ
Console.WriteLine("{0:HH:mm:ss}: End!!", DateTime.Now);
// Enterが押されるまで待つ
Console.ReadLine();

Timerメソッドを2回呼び出して発行された値をコンソールに出力しています。最初のTimerメソッドの呼び出しは、Schedulerを指定しないものです。2つ目のTimerメソッドの呼び出しではScheduler.CurrentThreadを指定しています。このようにメソッドの最後の引数にSchedulerを渡すことで、生成されたIObservableのシーケンスが何処で実行されるのか指定できます。実行結果を以下に示します。

Normal
20:25:22: Start!!
20:25:22: End!!
OnNext(0)
OnCompleted

CurrentThreadScheduler
20:25:26: Start!!
OnNext(0)
OnCompleted
20:25:27: End!!

CurrentThreadSchedulerを指定したほうでは、OnNextとOnCompletedが現在のスレッド上で実行されるため、Start!!とEnd!!の実行順序がSchedulerを指定しない場合と異なっていることが確認できます。

デフォルトのScheduler

TimerメソッドやIntervalメソッドのように、現在のスレッドをブロックしない形のIObservableのシーケンスを生成するメソッドではデフォルトのSchedulerはThreadPoolSchedulerが指定されています。また、現在のスレッドをブロックするIObservableのシーケンスを生成するメソッドではImmediateSchedulerが指定されています。

Schedulerの切り替え

ここでは、IObservableのシーケンスの処理中にSchedulerを切り替える方法について説明します。

ObserveOnメソッド

IObservableのシーケンスの処理中にSchedulerを切り替えるにはObserveOnメソッドを使用します。ObserveOnメソッドはIObservableの拡張メソッドとして定義されています。メソッドのシグネチャを以下に示します。

public static IObservable<TSource> ObserveOn<T>(
    this IObservable<T> source, 
    IScheduler scheduler);

public static IObservable<T> ObserveOn<TSource>(
    this IObservable<T> source, 
    SynchronizationContext context);

ISchedulerを受け取るオーバーロードは、後続の処理を指定したScheduler経由で実行するメソッドです。SynchronizationContextを受け取るオーバーロードは、SynchronizationContext経由で後続の処理を実行するメソッドです。実際には、SynchronizationContextSchedulerというクラスがReactive Extensionsで定義されているので、実際にはObserveOn(new SynchronizationContextScheduler())へのショートカットのメソッドです。ObserveOnメソッドのコード例を下記に示します。

// 1秒間隔で3つ値を発行する
Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Take(3)
    // 現在のスレッドIDを表示
    .Do(_ => Console.WriteLine("ThreadId: {0}", Thread.CurrentThread.ManagedThreadId))
    // 実行スレッドを新しいスレッドに切り替える
    .ObserveOn(Scheduler.NewThread)
    // 現在のスレッドIDを表示
    .Do(_ => Console.WriteLine("ThreadId: {0}", Thread.CurrentThread.ManagedThreadId))
    // 購読して、発行された値とスレッドIDを表示
    .Subscribe(
        i => Console.WriteLine("OnNext({0}), ThreadId: {1}", i,
Thread.CurrentThread.ManagedThreadId),
        () => Console.WriteLine("OnCompleted, ThreadId: {0}",
Thread.CurrentThread.ManagedThreadId));

Console.ReadLine();

TimerメソッドとTakeメソッドを使って1秒間隔で3つの値を発行しています。そしてObserveOnメソッドを使ってスレッドを新規スレッドに切り替えています。スレッドの切り替え前後でDoメソッドを使ってスレッドIDを表示しています。SubscribeメソッドでもスレッドIDを表示して実行スレッドがどうなっているのか確認しています。実行結果を以下に示します。

ThreadId: 4
ThreadId: 5
OnNext(0), ThreadId: 5
ThreadId: 4
ThreadId: 7
OnNext(1), ThreadId: 7
ThreadId: 6
ThreadId: 8
OnNext(2), ThreadId: 8
OnCompleted, ThreadId: 9

スレッドIDが切り替わっていることが確認できます。このことから、IObservableのシーケンスの途中で実行スレッドの切り替えができることがわかります。

ObserveOnの使用用途

実行場所(ここでの例では実行スレッド)の切り替えが、どのような用途で使えるのかについて説明します。代表的な用途では、Windows FormsやWPFなどのようなUIを操作するケースです。たとえば、ボタンクリックのイベントなどをトリガーにバックグラウンドでWeb APIを呼び出して、結果を画面に反映させる操作があるとします。
このとき、イベントの購読とイベントの値の加工程度まではUIスレッドで行い、Web APIを呼び出す箇所はバックグラウンドで行うのがUIをフリーズさせないようにするため最近では一般的な方法です。そして、最後のUIへの結果の反映で、再びUIスレッドに切り替えて処理を行います。これは、UIフレームワークが一般的にUIスレッド以外からの操作に対応していないために必要になります。このように「イベントの発火→非同期での処理の呼び出し→結果の処理」といったReactive Extensionsで1シーケンスで記述できるような処理内でのスレッドの切り替えにObserveOnメソッドは効果を発揮します。

時間を制御するScheduler

Schedulerには、実行場所を制御するもののほかに、時間を制御するタイプのSchedulerがあります。実際の処理内部では、あまり出てくる頻度は高くないですがテスト時に24時間かかる処理を24時間待たずに一瞬で24時間後の状態にするといったことも可能なため、このようなものもあるのだと把握しておいてください。

HistoricalSchedulerクラス

時間を制御するSchedulerクラスとしてHistoricalSchedulerを紹介します。HistoricalSchedulerはAdvancedByメソッドやAdvancedToメソッドを使ってScheduler内の時間を制御します。AdvancedByがTimeSpan型でAdvancedToメソッドがDateTimeOffcetを使って時間を指定します。HistoricalSchedulerクラスの使用例を下記に示します。

var scheduler = new HistoricalScheduler();
// 1秒間隔で値を発行する
Observable.Interval(TimeSpan.FromSeconds(1), scheduler)
    // 購読
    .Subscribe(
        i => Console.WriteLine("OnNext({0})", i),
        () => Console.WriteLine("OnCompleted"));

// HistoricalSchedulerを使ってプログラム内でSchedulerの時間を進める
// 0.5sec時間を進める
Console.WriteLine("AdvanceBy(0.5sec)");
scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5));
// 0.5sec時間を進める
Console.WriteLine("AdvanceBy(0.5sec)");
scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5));
// 0.5sec時間を進める
Console.WriteLine("AdvanceBy(0.5sec)");
scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5));
// 0.5sec時間を進める
Console.WriteLine("AdvanceBy(0.5sec)");
scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5));
// 0.5sec時間を進める
Console.WriteLine("AdvanceBy(0.5sec)");
scheduler.AdvanceBy(TimeSpan.FromSeconds(0.5));

Observable.Intervalメソッドを使って1秒間隔で値を発行するIObservableのシーケンスを作成しています。このときのSchedulerにHistoricalSchedulerを指定しています。そしてAdvanceByメソッドを使ってプログラム内でSchedulerの時間を進めています。0.5秒×5回の時間を進めているので合計で2.5秒の時間が進んでいることになります。実行結果を以下に示します。

AdvanceBy(0.5sec)
AdvanceBy(0.5sec)
OnNext(0)
AdvanceBy(0.5sec)
AdvanceBy(0.5sec)
OnNext(1)
AdvanceBy(0.5sec)

AdvanceByを2回呼んだ時点で値が発行され、さらにAdvanceByを2回呼んだ時点で値が発行されていることが確認できます。このようにReactive Extensionsでは時間をScheduler経由で取得するようにすることで、プログラムから柔軟に時間を制御することができるようになっています。