読者です 読者をやめる 読者になる 読者になる

かずきのBlog@hatena

日本マイクロソフトに勤めています。XAML + C#の組み合わせをメインに、たまにASP.NETやJavaなどの.NET系以外のことも書いています。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

7つのサンプルプログラムで学ぶRxJavaの挙動のコードをC#にポーティング

2記事続けて人の記事にのっかった記事になります。

techlife.cookpad.com

解説は元記事がとても丁寧なのでそちらを一読することをお勧めします!C#固有の話とかあったらこちらで補足します。

基本

1~10の数字を渡し、偶数だけにフィルタリングしたうえ、値を10倍にしてログ衆力するというプログラムです。

new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }
    .ToObservable()
    .Where(x => x % 2 == 0)
    .Select(x => x * 10)
    .Subscribe(
        x => Console.WriteLine(x),
        () => Console.WriteLine("completed"));

C#では、拡張メソッドがあるので配列からIObservableへの変換はToObservable拡張メソッドでやるところが特徴的です。

フィルタがWhere、値の加工がSelectという名前なところもC#のLINQの文化ですね。そして、Subscribeが、ラムダ式で行けるという点も見逃せません。これでかなりシンプルに書けます。

サンプルプログラム1

Rxのメソッドの基本的な実行順番を理解するためのプログラムです。

var sb = new StringBuilder();

Observable.Return(1)
    .Select(x =>
    {
        sb.Append("1");
        return x;
    })
    .Subscribe(
        x => sb.Append("2"),   // OnNext
        ex => sb.Append("3"),  // OnError
        () => sb.Append("4")); // OnCompleted

sb.Append("5");

Console.WriteLine(sb);

RxJava側のコードとの違いは、Java側はSubscribeでonNext, onCompleted, onErrorの順番でメソッドを定義していましたが、C#ではコメントにもある通りOnNext, OnError, OnCompletedの順番でラムダ式を指定します。それに伴い実行結果も以下のようにオリジナルとは異なります。

1245

サンプルプログラム2

Sleepを入れて動きを見てみるコードになります。

var sb = new StringBuilder();

Observable.Return(1)
    .Select(x =>
    {
        Thread.Sleep(500);
        sb.Append("1");
        return x;
    })
    .Subscribe(
        x => sb.Append("2"),   // OnNext
        ex => sb.Append("3"),  // OnError
        () => sb.Append("4")); // OnCompleted

sb.Append("5");

Console.WriteLine(sb);

実行結果もオリジナルと同じです。

1245

サンプルプログラム3

SubscribeOnの動作確認になります。SubscribeOnはSubscribeするスレッドを指定します。

var sb = new StringBuilder();

Observable.Return(1)
    .SubscribeOn(NewThreadScheduler.Default)
    .Select(x =>
    {
        Thread.Sleep(500);
        sb.Append("1");
        return x;
    })
    .Subscribe(
        x => sb.Append("2"),   // OnNext
        ex => sb.Append("3"),  // OnError
        () => sb.Append("4")); // OnCompleted

sb.Append("5");

Thread.Sleep(10000);
Console.WriteLine(sb);

Schedulerは、Schedulersクラスから生えてる各種Schedulerへのインスタンスを取得するというのは非推奨になってるものが多いので、それっぽい名前のSchedulerのDefaultプロパティなどを使って指定します。今回は新しいスレッドを指定したかったので、NewThreadScheduler.Defaultを指定しています。

実行すると、500ms待ってからRxの中のストリームの処理が行われるのでオリジナル同様(OnCompletedの順番が違うのを除いて)になります。

5124

サンプルプログラム4

何処の処理がどのスレッドで実行するか確認するものになります。C#では、スレッド名が特についてないので、ManagedThreadIdを取得するようにしてみました。あとListに突っ込むのではなくてStringBuilderにAppendLineしてます。

var sb = new StringBuilder();

Observable.Return(1)
    .SubscribeOn(NewThreadScheduler.Default)
    .Select(x =>
    {
        sb.AppendLine("1:" + Thread.CurrentThread.ManagedThreadId);
        return x;
    })
    .Subscribe(
        x => sb.AppendLine("2:" + Thread.CurrentThread.ManagedThreadId),   // OnNext
        ex => sb.AppendLine("3:" + Thread.CurrentThread.ManagedThreadId),  // OnError
        () => sb.AppendLine("4:" + Thread.CurrentThread.ManagedThreadId)); // OnCompleted

実行すると以下のような結果になります。

1:3
2:3
4:3

スレッドのIDは実行環境によって変わると思います。

サンプルプログラム5

次にObserveOnを使ったコードのスレッドの切り替えをやります。RxJavaでは、mainスレッドに戻すSchedulerが提供されていますが、素のRxにはそのようなSchedulerがないのでC#のコードでは、SynchronizationContextを使用して実行スレッドをもとに戻しています。ここからはUIスレッドが存在することが前提となっているっぽいのでWPFのボタンクリックイベントハンドラにコードを書いています。

var sb = new StringBuilder();

Observable.Return(1)
    .SubscribeOn(NewThreadScheduler.Default)
    .Select(x =>
    {
        sb.AppendLine("1:" + Thread.CurrentThread.ManagedThreadId);
        return x;
    })
    .ObserveOn(SynchronizationContext.Current)
    .Subscribe(
        x => sb.AppendLine("2:" + Thread.CurrentThread.ManagedThreadId),   // OnNext
        ex => sb.AppendLine("3:" + Thread.CurrentThread.ManagedThreadId),  // OnError
        () => sb.AppendLine("4:" + Thread.CurrentThread.ManagedThreadId)); // OnCompleted

await Task.Delay(5000);
Debug.WriteLine(sb);

実行結果は以下のようになります。

1:12
2:10
4:10

因みにWPFやWinRTなどのようなUIスレッド上で処理を行うためのSchedulerは素のRxにはありません。ReactivePropertyなどのようなUI向けの機能を提供した別ライブラリなどで提供されています。

サンプルプログラム6

SubscribeOnを2回やったときの挙動の確認のプログラムになります。

var sb = new StringBuilder();

Observable.Return(1)
    .SubscribeOn(SynchronizationContext.Current)
    .Select(x =>
    {
        sb.AppendLine("1:" + Thread.CurrentThread.ManagedThreadId);
        return x;
    })
    .SubscribeOn(NewThreadScheduler.Default)
    .Select(x =>
    {
        sb.AppendLine("2:" + Thread.CurrentThread.ManagedThreadId);
        return x;
    })
    .ObserveOn(SynchronizationContext.Current)
    .Subscribe(
        x => sb.AppendLine("2:" + Thread.CurrentThread.ManagedThreadId),   // OnNext
        ex => sb.AppendLine("3:" + Thread.CurrentThread.ManagedThreadId),  // OnError
        () => sb.AppendLine("4:" + Thread.CurrentThread.ManagedThreadId)); // OnCompleted

await Task.Delay(5000);
Debug.WriteLine(sb);

実行結果は以下のようになります。最初にSubscribeOnしたスレッドで処理が実行されます。

1:9
2:9
2:9
4:9

サンプルプログラム7

最後は、途中からSubscribeOnした時の挙動です。

var sb = new StringBuilder();

Observable.Return(1)
    .Select(x =>
    {
        sb.AppendLine("1:" + Thread.CurrentThread.ManagedThreadId);
        return x;
    })
    .SubscribeOn(NewThreadScheduler.Default)
    .Select(x =>
    {
        sb.AppendLine("2:" + Thread.CurrentThread.ManagedThreadId);
        return x;
    })
    .Subscribe(
        x => sb.AppendLine("2:" + Thread.CurrentThread.ManagedThreadId),   // OnNext
        ex => sb.AppendLine("3:" + Thread.CurrentThread.ManagedThreadId),  // OnError
        () => sb.AppendLine("4:" + Thread.CurrentThread.ManagedThreadId)); // OnCompleted

await Task.Delay(5000);
Debug.WriteLine(sb);

オリジナルと同じで途中でSubscribeOnしても最初でしても同じ結果になってます。

1:11
2:11
2:11
4:11

まとめ

ちょっとくらい違う動きするかと思ったらRxもRxJavaも同じ動きをしました。よく移植されてる。

techlife.cookpad.com