かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その15「To*****系メソッド」

!!注記!!
コメントにあるneueccさんのコメントまでがこの記事です!(キリッ

はじめに

今回は、Observableクラスに定義されているTo****という名前のメソッドについてみていきます。基本的にこれらのメソッドはIObservableを別の型に変換する際に使用されます。変換結果には配列やList, Dictionary, Lookupなどがあります。

ToArrayメソッド

まず、最初にIObservableからIObservableへ変換するToArrayメソッドについて説明します。これは、単純なSubjectを使ったシーケンスに対して適用するとOnNext〜OnCompletedまでの値を収集して配列にします。コード例を下記に示します。

// IO<T> -> IO<T[]>へ変換して購読
s.ToArray().Subscribe(array =>
{
    // 内容表示
    Console.WriteLine("start array dump");
    foreach (var i in array)
    {
        Console.WriteLine("  array value : {0}", i);
    }
});

// 値の発行からCompleted
Console.WriteLine("OnNext(1)");
s.OnNext(1);
Console.WriteLine("OnNext(2)");
s.OnNext(2);
Console.WriteLine("OnNext(3)");
s.OnNext(3);
Console.WriteLine("OnCompleted()");
s.OnCompleted();

実行結果を下記に示します。

OnNext(1)
OnNext(2)
OnNext(3)
OnCompleted()
start array dump
  array value : 1
  array value : 2
  array value : 3

実行結果からもわかる通り、1つのIObservableシーケンスをまとめて配列にしてIObservableへ変換しています。

ToDictionaryメソッド

このToDictionaryメソッドも、ToArrayと同じような動きをします。IObservableからIObservableへの変換を行います。TKeyはToDictionaryのメソッド引数にKeyを選択するラムダ式を渡すことで指定します。コード例を下記に示します。

var s = new Subject<Tuple<string, int>>();
// IO<T> -> IO<IDictionary<TKey, T>>へ変換して購読
// Keyを選択するラムダ式を渡す。
s.ToDictionary(t => t.Item1).Subscribe(dict =>
{
    Console.WriteLine("one : {0}", dict["one"]);
    Console.WriteLine("two : {0}", dict["two"]);
    Console.WriteLine("three : {0}", dict["three"]);
});

// 値の発行からCompleted
Console.WriteLine("OnNext(one)");
s.OnNext(Tuple.Create("one", 1));
Console.WriteLine("OnNext(two)");
s.OnNext(Tuple.Create("two", 2));
Console.WriteLine("OnNext(three)");
s.OnNext(Tuple.Create("three", 3));
Console.WriteLine("OnCompleted()");
s.OnCompleted();

実行結果を下記に示します。

OnNext(one)
OnNext(two)
OnNext(three)
OnCompleted()
one : (one, 1)
two : (two, 2)
three : (three, 3)

Tupleを発行して、TupleのItem1をキーにしてIDictionaryを作成しています。実行結果からも、one, two, threeなどのTupleの最初の要素がキーになっていることが確認できます。

ToEnumerableメソッド

このメソッドは、単純にIObservableからIEnumerableへ変換します。ToEnumerableでIEnumerableに変換した後に発行された値がIEnumerableに流れて、OnCompletedでIEnumerableのシーケンスも終了します。コード例を下記に示します。

var s = new Subject<int>();
// 進捗状況を管理するためのWaitHandle
var gate = new EventWaitHandle(false, EventResetMode.AutoReset);
Observable.Start(() =>
{
    // IO<T> -> IE<T>への変換
    var e = s.ToEnumerable();
    // 待ち解除
    gate.Set();
    // 値の表示
    foreach (var i in e)
    {
        Console.WriteLine("value : {0}", i);
    }
})
// 最後にWaitHandleを発火
.Finally(() => gate.Set())
.Subscribe();

// ToEnumerableされるまで待つ
gate.WaitOne();

// 値の発行からCompleted
Console.WriteLine("OnNext(1)");
s.OnNext(1);
Console.WriteLine("OnNext(2)");
s.OnNext(2);
Console.WriteLine("OnNext(3)");
s.OnNext(3);
Console.WriteLine("OnCompleted()");
s.OnCompleted();

// 列挙が終わるのを待つ
gate.WaitOne();

実行結果を下記に示します。

OnNext(1)
OnNext(2)
OnNext(3)
OnCompleted()
value : 1
value : 2
value : 3

今回のプログラムは複数のスレッドから値の発行と列挙を行っているため、毎回同じ実行結果になるとは限りません出力が前後することがあります。複数スレッドに分けたのはToEnumerableの結果をIEnumerableでループを行うと、値が発行されるまでブロックするという挙動のためです。因みに、この条件はHotなIObservableを使用している場合に起きる問題でColdなIObservableを使用している場合は問題になることは、ありません。コード例を下記に示します。

// ColdなIObservable<int>を作成
var s = Observable.Range(1, 5);
// IEnumerable<int>に変換して列挙
foreach (var i in s.ToEnumerable())
{
    Console.WriteLine("value : {0}", i);
}

実行結果を下記に示します。

value : 1
value : 2
value : 3
value : 4
value : 5

普通に値の列挙が行われていることが確認できます。

ToEventメソッド

ToEventメソッドは、IObservableからIEventSourceという型に変換を行います。このIEventSourceという型はOnNextイベントを持つ型で、IObservableから値が発行される度にOnNextイベントを発火させます。コード例を下記に示します。

var s = new Subject<int>();
// IO<T>からOnNextイベントを発行するIEventSourct<T>へ変換
var evt = s.ToEvent();
// OnNextをイベントで受け取れる
evt.OnNext += i =>
{
    Console.WriteLine("value : {0}", i);
};

// 値の発行
Console.WriteLine("OnNext(1)");
s.OnNext(1);
Console.WriteLine("OnNext(2)");
s.OnNext(2);
Console.WriteLine("OnNext(3)");
s.OnNext(3);

実行結果を下記に示します。

OnNext(1)
value : 1
OnNext(2)
value : 2
OnNext(3)
value : 3

SubjectでOnNextが実行される度にOnNextイベントが発行されていることが確認できます。

ToListメソッド

ToListメソッドはToArrayメソッドと、ほぼ同様の動きを行います。ToArrayがIObservableを返すのに対してToListメソッドはIObservable>を返します。コード例を下記に示します。

var s = new Subject<int>();
// IO<T> -> IO<IList<T>>へ変換
s.ToList().Subscribe(list =>
{
    // 値を表示
    foreach (var i in list)
    {
        Console.WriteLine("value : {0}", i);
    }
});

// 値の発行からCompleted
Console.WriteLine("OnNext(1)");
s.OnNext(1);
Console.WriteLine("OnNext(2)");
s.OnNext(2);
Console.WriteLine("OnNext(3)");
s.OnNext(3);
Console.WriteLine("OnCompleted()");
s.OnCompleted();

実行結果を下記に示します。

OnNext(1)
OnNext(2)
OnNext(3)
OnCompleted()
value : 1
value : 2
value : 3

ToArrayと同様にOnNext〜OnCompletedまでの間の値を格納したIListが作成されていることが確認できます。

ToLookupメソッド

次に、ToLookupメソッドについて説明します。これはToDictionaryメソッドと同様にキーを選択するラムダ式を渡します。そうすると、キー値でグルーピングされたIObservable>が返されます。コード例を下記に示します。

var s = new Subject<Tuple<string, string>>();
// IO<T>からIO<ILookup<TKey, T>>へ変換
// Keyを選択するラムダ式を渡す
s.ToLookup(t => t.Item1).Subscribe(l =>
{
    // グループ単位に表示
    foreach (var g in l)
    {
        Console.WriteLine("Key : {0}", g.Key);
        foreach (var i in g)
        {
            Console.WriteLine("  item : {0}", i);
        }
    }
});

Console.WriteLine("OnNext(group A)");
s.OnNext(Tuple.Create("group A", "taro"));
s.OnNext(Tuple.Create("group A", "jiro"));

Console.WriteLine("OnNext(group B)");
s.OnNext(Tuple.Create("group B", "foo"));
s.OnNext(Tuple.Create("group B", "hoge"));
s.OnNext(Tuple.Create("group B", "bar"));

Console.WriteLine("OnCompleted()");
s.OnCompleted();

実行結果を下記に示します。

OnNext(group A)
OnNext(group B)
OnCompleted()
Key : group A
  item : (group A, taro)
  item : (group A, jiro)
Key : group B
  item : (group B, foo)
  item : (group B, hoge)
  item : (group B, bar)

IObservableから発行された値がKey値ごとにグルーピングされていることが確認できます。