かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その30「もう待ちきれない!を表現するTimeoutメソッド」

過去記事インデックス

はじめに

長いもので30回目です!今回はTimeoutメソッドを紹介したいと思います。

Timeoutメソッド

ここでは、Timeoutメソッドについて説明します。Timeoutメソッドは名前の通りIObservableのシーケンスから指定した時間、値が発行されなかった場合にエラーにするメソッドです。シグネチャを以下に示します。

public static IObservable<T> Timeout<T>(
    this IObservable<T> source, 
    TimeSpan dueTime);

dueTimeでタイムアウトの時間を指定します。TimeSpan以外にもDateTimeOffsetでタイムアウトを指定するオーバーロードがありますが、ここでは紹介を割愛します。このメソッドのコード例を下記に示します。

var subscriber = Observable
    // 0〜4の値をi秒間隔で発行する
    .Generate(0, i => i < 5, i => i + 1, i => i, i => TimeSpan.FromSeconds(i))
    // 3500ms以上間隔があくとタイムアウト
    .Timeout(TimeSpan.FromMilliseconds(3500))
    // 購読
    .Subscribe(
        i => Console.WriteLine("{0:HH:mm:ss} OnNext({1})", DateTime.Now, i),
        ex => Console.WriteLine("{0:HH:mm:ss} OnError({1})", DateTime.Now, ex),
        () => Console.WriteLine("{0:HH:mm:ss} OnCompleted()", DateTime.Now));
// Enterを押すと購読終了
Console.ReadLine();
subscriber.Dispose();

このコードでは、Generateメソッドを使って0, 1, 2, 3, 4の値を0s, 1s, 2s, 3s, 4s間隔で発行しています。それに対してTimeoutメソッドで3.5s(3500ms)を指定しています。このコードの実行結果を以下に示します。

12:39:48 OnNext(0)
12:39:49 OnNext(1)
12:39:51 OnNext(2)
12:39:54 OnNext(3)
12:39:58 OnError(System.TimeoutException: 操作がタイムアウトしました。)

最後の4が発行されるのに4秒間が空いてしまうので、TimeoutExceptionが発行されSubscribeのOnErrorに処理がいきます。このように、時間を要する処理のタイムアウトを簡単に指定することが出来ます。
このTimeoutメソッドにはタイムアウト時の動作をカスタマイズすることが出来るオーバーロードがあります。そのメソッドのシグネチャを以下に示します。

public static IObservable<T> Timeout<T>(
    this IObservable<T> source, 
    TimeSpan dueTime, 
    IObservable<T> other);

第三引数のotherで、タイムアウトが起きたときに代わりに値を発行するIObservableのシーケンスを指定します。このオーバーロードの使用例を下記に示します。

var subscriber = Observable
    // 0〜4の値をi秒間隔で発行する
    .Generate(0, i => i < 5, i => i + 1, i => i, i => TimeSpan.FromSeconds(i))
    // 3500ms以上間隔があくとタイムアウト
    .Timeout(
        TimeSpan.FromMilliseconds(3500), 
        // タイムアウトの時に流す値を指定
        Observable.Create<int>(o =>
        {
            Console.WriteLine("{0:HH:mm:ss} Error Action", DateTime.Now);
            // -1を流して完了
            o.OnNext(-1);
            o.OnCompleted();
            return Disposable.Empty;
        }))
    // 購読
    .Subscribe(
        i => Console.WriteLine("{0:HH:mm:ss} OnNext({1})", DateTime.Now, i),
        ex => Console.WriteLine("{0:HH:mm:ss} OnError({1})", DateTime.Now, ex),
        () => Console.WriteLine("{0:HH:mm:ss} OnCompleted()", DateTime.Now));
            
// Enterを押すと購読終了
Console.ReadLine();
subscriber.Dispose();

TimeoutメソッドでObservable.Createメソッドを使ってタイムアウトの時に-1の値を流してIObservableのシーケンスを完了させるようにしています。このコードの実行結果を以下に示します。

13:08:09 OnNext(0)
13:08:10 OnNext(1)
13:08:12 OnNext(2)
13:08:15 OnNext(3)
13:08:19 Error Action
13:08:19 OnNext(-1)
13:08:19 OnCompleted()

最初のコード例ではTimeoutExceptionが発生していましたが、この例ではObservable.Createで作成したIObservableから発行された値が後続に流れていることが確認できます。このようにタイムアウトとタイムアウトに伴って発行する値を差し替えたりすることが出来ます。