かずきのBlog@hatena

日本マイクロソフトに勤めています。XAML + C#の組み合わせをメインに、たまにASP.NETやJavaなどの.NET系以外のことも書いています。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

Reactive Extensions再入門 その34「ダメなら次の人!を実現するOnErrorResumeNextメソッド」

過去記事インデックス

はじめに

今回は、エラーに対応するためのメソッドの1つOnErrorResumeNextというメソッドについてみていきます。

OnErrorResumeNextメソッド

ここでは、OnErrorResumeNextメソッドについて説明します。このメソッドはIObservableのシーケンスで例外が発生した際に、どのようにIObservableのシーケンスを再開するのかを指定します。IObservableの拡張メソッドとして定義されているものは「Catchメソッド」で説明したものと同じように、例外発生時に別のIObservableで処理を続けるということが出来ます。
コード例を下記に示します。

Observable
    // 例外を出す
    .Throw<string>(new Exception())
    // OnErrorResumeNextでエラーになったときの代わりを指定しておく
    .OnErrorResumeNext(Observable.Return("OK"))
    // 購読
    .Subscribe(
        s => Console.WriteLine("OnNext: {0}", s),
        ex => Console.WriteLine("OnError: {0}", ex),
        () => Console.WriteLine("OnCompleted"));

最初にThrowメソッドを使って例外を発生させています。それに対してOnErrorResumeNextメソッドを使ってOKという文字列を流し込んでいます。実行結果を以下に示します。

OnNext: OK
OnCompleted

Throwメソッドで発生された例外は、Subscribeまで届いていないことが確認できます。このように例外発生時に、別のIObservableで処理を再開することが出来ます。因みにCatchメソッドでは例外の種類を指定できましたがOnErrorResumeNextでは例外の種類を指定できないので、この点だけを見るとCatchメソッドのほうが柔軟な対応が出来ます。
OnErrorResumeNextは、IObservableの拡張メソッドの他にIEnumerable>の拡張メソッドや、引数にparams IObservable[]を受け取るオーバーロードが定義されています。メソッドのシグネチャを下記に示します。

public static IObservable<T> OnErrorResumeNext<T>(this IEnumerable<IObservable<T>> sources);
public static IObservable<T> OnErrorResumeNext<T>(params IObservable<T>[] sources);

このメソッドを使うと、最初のIObservableで例外が発生したら、次のIObservableで再開して、そこでも例外が発生したら、次のIObservableで再開して・・・ということが実現できます。コード例を下記に示します。

// 4番目にOK
new[] { "NG", "Error", "Abort", "OK" }
    // インデックスと値のペアに変換
    .Select((s, i) => new { index = i, value = s })
    // OK以外は例外を飛ばすIO<string>を返す(IEnumerable<IObservable<T>>へ変換)
    .Select(s => s.value != "OK" ?
        Observable.Throw<string>(new Exception(s.ToString())) :
        Observable.Return(s.ToString()))
    .OnErrorResumeNext()
    // 購読
    .Subscribe(
        s => Console.WriteLine("OnNext: {0}", s),
        ex => Console.WriteLine("OnError: {0}", ex),
        () => Console.WriteLine("OnCompleted"));

多少複雑ですが、{“NG”, “Error”, “Abort”, “OK”}という文字列の配列から、IEnumerable>へ変換してOnErrorResumeNextを呼び出しています。内容としては”OK”という文字列以外が渡ってきた場合はObservable.Throwメソッドを使って例外を発生させるIObservableを作成しています。実行結果を以下に示します。

OnNext: { index = 3, value = OK }
OnCompleted

NGやError, Abortの文字列から生成される例外は全て無視してOKがSubscribeに渡ってきていることが確認できます。動作確認のためOKの文字列の場所を下記のように2番目にくるように書き直します。

// 2番目にOK
new[] { "NG", "OK", "Abort", "Error" }
    // インデックスと値のペアに変換
    .Select((s, i) => new { index = i, value = s })
    // OK以外は例外を飛ばすIO<string>を返す(IEnumerable<IObservable<T>>へ変換)
    .Select(s => s.value != "OK" ?
        Observable.Throw<string>(new Exception(s.ToString())) :
        Observable.Return(s.ToString()))
    .OnErrorResumeNext()
    // 購読
    .Subscribe(
        s => Console.WriteLine("OnNext: {0}", s),
        ex => Console.WriteLine("OnError: {0}", ex),
        () => Console.WriteLine("OnCompleted"));

実行結果を以下に示します。

OnNext: { index = 1, value = OK }
OnCompleted

今度はindexが1になっていることが確認できます。全て例外で終わった場合(このプログラム例では最初の文字列の配列にOKが無い場合)は以下のようにOnCompletedになります。

OnCompleted

これまでのサンプルではOnErrorResumeNextの挙動を確認するために単純なプログラムを書いていましたが、このメソッドを使うと例えば、プライマリのサーバに対して非同期にデータを要求して例外が発生した場合にはセカンダリのサーバに対してデータを要求するというプログラムを書くことができます。