かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensionsのはじめかた

最近Reactive Extensionsがバズってきてて、2年以上前に公開したReactive Extensionsのv1のメソッドを大体網羅したPDFが割と参照されてます。

www.slideshare.net

個人的な入門は、そこに全部書いたので今更感もありますが、Reactive Extensionsを今から入門する人向けに色々書いてみたいと思います。

Reactive Extensionsとは

IObservable<T>とIObserver<T>がコアのインターフェースになります。名前からわかるようにこれは、GoFのデザインパターンの1つ、オブザーバーパターンを表したインターフェースになります。

Observerパターンを簡単に説明すると、IObservableの状態に何かしら変化があったら、IObserverに変更通知を発行します。IObserverは、変更を受け取り何らかの処理を行います。

o: 状態変化
x: 処理
|: データの流れ
/: 終端
Observable ----o------o------o------o-------o----/
               |      |      |      |       |
Observer   ----x------x------x------x-------x----/

コードで表すと以下のようになります。

// Observable監視される側
class Observable : IObservable<string>
{
  private List<IObserver<string>> obs = new List<IObserver<string>>();

  public IDisposable Subscribe(IObserver<string> o)
  {
    this.obs.Add(o);
    return null; // 本当は購読解除するDisposableを返す
  }

  // 変更があったことを通知するメソッド
  public void Execute(string x)
  {
    foreach (var o in this.obs)
    {
      o.OnNext(x);
    }
  }
}
// Observer監視する側
class Observer : IObserver<string>
{
  // 変更通知があったときに呼ばれるメソッド
  public void OnNext(string x)
  {
    // とりあえず標準出力に出力するだけ
    Console.WriteLine(x);
  }

  public void OnCompleted() {} // no use
  public void OnError(Exception ex) {} // no use
}

この2つを繋げます。

var observable = new Observable();
observable.Subscribe(new Observer()); // 監視する人を一人追加する
observable.Execute("Hello"); // Hello と出力される

observable.Subscribe(new Observer()); // 監視する人をもう一人追加する
observable.Execute("World"); // Worldが2回出力される

図にすると大体以下のような感じです。

observable -----o(Hello)----------------------o(World)----------/
                |                             |\
observer1  -----x-----------------------------x-|---------------/
                                               /
observer2                             --------x-----------------/

これまで書いてきた図について

Marble Diagramsという図を崩して書いたものになります。(AAだから許して!) この図は、右に行くと時間軸。縦方向が上がInputで下がOutputという非常に単純な図です。oとかでデータの発生を表します。

何故ObserverパターンをMarble Diagramsで描いてきたかというと、この図をイメージ出来るようになることが大事だからです。Reactive Extensionsの処理は、往々にして、このMarble Diagramsで描かれます。つまり時系列に応じて発生する値を入力として受け取って、何らかの処理を行い出力するのがReactive Extensionsでやっていることになります。

ObserverパターンとPush型のコレクション

ここまでReactive Extensionsの記事なのにObserverパターンについて延々と語ってきました。ObserverパターンのObservableは、時間の経過に応じてデータを出力するものとみなすことができます。時間と入出力の関係を図示するMarble DiagramsでObserverパターンの処理の流れを描くことが出来たのはそのためです。 このように時間に応じてデータを発行するものをPush型のデータ、複数発行する場合はPush型のコレクションと言ったりします。

Push型のコレクション操作

時系列に沿って何かしらの状態変化を起こして値を発行するものを、コレクションと見ることが出来たら後は、コレクション操作をするだけです。幸いにもC#にはLINQという強力なコレクション操作ライブラリがあります。このLINQをIObservableにまで拡張+αしたものがReactive Extensionsになります。(他言語にも大体コレクション操作ライブラリがあって、それに合うような形でObservableに対してコレクション操作を行うノリでReactive Extensionsがポーティングされています。)

Where + Select

一番よく使うLINQのメソッドのWhereやSelectを使ってReactive Extensionsのコード例を示します。1~10の値から、偶数のみを抽出して2倍した結果を出力してみます。Marble Diagramsで描くと以下のような処理です。

source  --(1)--------(2)-------(3)-----(4)------(5)-----...
                      |                 |
Where   -------------(2)---------------(4)--------------...
                      |                 |
Select  -------------(4)---------------(16)-------------...
                      |                 |
処理     ------------(出力)-------------(出力)-----------...
var source = Observable.Range(1, 10);

source
    .Where(x => x % 2 == 0)
    .Select(x => x * x)
    .Subscribe(x => Console.WriteLine("value = {0}", x));

実行すると以下のような結果になります。

value = 4
value = 16
value = 36
value = 64
value = 100

通常のLINQと違うのは、LINQがコレクションに入っている値を処理するのに対して、Reactive Extensionsは、sourceから発行された値を処理している点です。微妙ですが大事な点です。

Reactive Extensionsのソースになるもの

これまでの例では、単純なコレクションの処理を示してきました。しかし、Reactive Extensionsは、時間の経過に伴って値を発行するものなら何でも処理できる点が強みです。このような特徴を持つものの代表例を以下に示します。

  • タイマー
  • イベント
  • 普通のコレクション(0秒で一気に値を発行する)
  • 非同期処理(要素が1つのコレクション)

この中でもイベントを扱えるのは非常に強力で、イベントに対してフィルタリング、バッファリング、変換を行って処理をするといったことが簡単にできるようになります。

応用的な内容

上記が個人的に思ってる基本的な内容です。応用的な内容としては以下のようなものがあります。簡単に触りだけ。

合成

色々なものがReactive Extensionsで値を発行するソースになることを簡単に紹介しました。これらのデータは、Reactive Extensionsで提供されている様々な合成系のメソッドを使って合体させることが出来ます。

時間の操作

Reactive Extensionsで扱うデータは時間軸に応じて、値を発行するものだということは何度も言ってきました。その特徴から、Reactive Extensionsでは、時間に関する操作を行う処理もたくさん提供されています。

参考情報

基本を押さえたうえで読むととても理解が捗ると思われる資料

大作。利用シーンが書かれてて素敵。

www.slideshare.net

安定のぐらばくさん。マーブルダイアグラムとコードがならべて書いてあってとてもわかりやすい。

grabacr.net

あと、上記資料からリンクされてるところも超参考になります。(リンク集め手抜き)

以上!良いRxライフを!