かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その34「ダメなら次の人!を実現するOnErrorResumeNextメソッド」

過去記事インデックス

はじめに

今回は、エラーに対応するためのメソッドの1つOnErrorResumeNextというメソッドについてみていきます。

OnErrorResumeNextメソッド

ここでは、OnErrorResumeNextメソッドについて説明します。このメソッドはIObservableのシーケンスで例外が発生した際に、どのようにIObservableのシーケンスを再開するのかを指定します。IObservableの拡張メソッドとして定義されているものは「Catchメソッド」で説明したものと同じように、例外発生時に別のIObservableで処理を続けるということが出来ます。
コード例を下記に示します。

Observable
    // 例外を出す
    .Throw<string>(new Exception())
    // OnErrorResumeNextでエラーになったときの代わりを指定しておく
    .OnErrorResumeNext(Observable.Return("OK"))
    // 購読
    .Subscribe(
        s => Console.WriteLine("OnNext: {0}", s),
        ex => Console.WriteLine("OnError: {0}", ex),
        () => Console.WriteLine("OnCompleted"));

最初にThrowメソッドを使って例外を発生させています。それに対してOnErrorResumeNextメソッドを使ってOKという文字列を流し込んでいます。実行結果を以下に示します。

OnNext: OK
OnCompleted

Throwメソッドで発生された例外は、Subscribeまで届いていないことが確認できます。このように例外発生時に、別のIObservableで処理を再開することが出来ます。因みにCatchメソッドでは例外の種類を指定できましたがOnErrorResumeNextでは例外の種類を指定できないので、この点だけを見るとCatchメソッドのほうが柔軟な対応が出来ます。
OnErrorResumeNextは、IObservableの拡張メソッドの他にIEnumerable>の拡張メソッドや、引数にparams IObservable[]を受け取るオーバーロードが定義されています。メソッドのシグネチャを下記に示します。

public static IObservable<T> OnErrorResumeNext<T>(this IEnumerable<IObservable<T>> sources);
public static IObservable<T> OnErrorResumeNext<T>(params IObservable<T>[] sources);

このメソッドを使うと、最初のIObservableで例外が発生したら、次のIObservableで再開して、そこでも例外が発生したら、次のIObservableで再開して・・・ということが実現できます。コード例を下記に示します。

// 4番目にOK
new[] { "NG", "Error", "Abort", "OK" }
    // インデックスと値のペアに変換
    .Select((s, i) => new { index = i, value = s })
    // OK以外は例外を飛ばすIO<string>を返す(IEnumerable<IObservable<T>>へ変換)
    .Select(s => s.value != "OK" ?
        Observable.Throw<string>(new Exception(s.ToString())) :
        Observable.Return(s.ToString()))
    .OnErrorResumeNext()
    // 購読
    .Subscribe(
        s => Console.WriteLine("OnNext: {0}", s),
        ex => Console.WriteLine("OnError: {0}", ex),
        () => Console.WriteLine("OnCompleted"));

多少複雑ですが、{“NG”, “Error”, “Abort”, “OK”}という文字列の配列から、IEnumerable>へ変換してOnErrorResumeNextを呼び出しています。内容としては”OK”という文字列以外が渡ってきた場合はObservable.Throwメソッドを使って例外を発生させるIObservableを作成しています。実行結果を以下に示します。

OnNext: { index = 3, value = OK }
OnCompleted

NGやError, Abortの文字列から生成される例外は全て無視してOKがSubscribeに渡ってきていることが確認できます。動作確認のためOKの文字列の場所を下記のように2番目にくるように書き直します。

// 2番目にOK
new[] { "NG", "OK", "Abort", "Error" }
    // インデックスと値のペアに変換
    .Select((s, i) => new { index = i, value = s })
    // OK以外は例外を飛ばすIO<string>を返す(IEnumerable<IObservable<T>>へ変換)
    .Select(s => s.value != "OK" ?
        Observable.Throw<string>(new Exception(s.ToString())) :
        Observable.Return(s.ToString()))
    .OnErrorResumeNext()
    // 購読
    .Subscribe(
        s => Console.WriteLine("OnNext: {0}", s),
        ex => Console.WriteLine("OnError: {0}", ex),
        () => Console.WriteLine("OnCompleted"));

実行結果を以下に示します。

OnNext: { index = 1, value = OK }
OnCompleted

今度はindexが1になっていることが確認できます。全て例外で終わった場合(このプログラム例では最初の文字列の配列にOKが無い場合)は以下のようにOnCompletedになります。

OnCompleted

これまでのサンプルではOnErrorResumeNextの挙動を確認するために単純なプログラムを書いていましたが、このメソッドを使うと例えば、プライマリのサーバに対して非同期にデータを要求して例外が発生した場合にはセカンダリのサーバに対してデータを要求するというプログラムを書くことができます。