かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive Extensions再入門 その8「SkipとTakeメソッド」

SkipとTakeメソッド

LINQの基本のメソッドのWhereとSelectの動作を確認したので、次はSkipとTakeを使用したいと思います。Skipは、指定した数だけ値を読み飛ばして、Takeは指定した数だけ値を通過させるイメージです。コード例を下記に示します。

// 値の発行元になるSubject
var s = new Subject<int>();

// 最初の3つをスキップして、その後3つを通知する
s.Skip(3).Take(3).Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("OnCompleted()"));

// 1-10の値を流し込む
Observable.Range(1, 10).ForEach(i => s.OnNext(i));

Skip(3)をしてTake(3)をしているところに1〜10の値を発行しているので4, 5, 6がSubscribeしているところに通知されます。実行結果を下記に示します。

OnNext(4)
OnNext(5)
OnNext(6)
OnCompleted()

注目するのは、6が発行されたあとにOnCompletedが呼ばれている点です。Take(3)で値が3つ通過した後には、もう値が発行されないので、完了通知が発行されます。

Repeatメソッドとの組み合わせ

SkipとTakeの動作自体は非常にシンプルなので、これにRepeatを組み合わせてみようと思います。Repeat自体は以前、紹介しているとおり、指定した回数繰り返しを行うメソッドです。SkipとTakeと組み合わせることで下記のような動作をさせることが出来ます。コードを下記に示します。

// 値の発行元になるSubject
var s = new Subject<int>();

// 最初の3つをスキップして、その後3つを通知することを繰り返す
s.Skip(3).Take(3).Repeat().Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("OnCompleted()"));

// 1-20の値を流し込む
Observable.Range(1, 20).ForEach(i => s.OnNext(i));

先ほどのコードのTakeの後にRepeatを追加しています。引数の無いRepeatメソッドは、無限に繰り返しを行います。どのような動きになるのか実行結果を下記に示します。

OnNext(4)   <- 最初のSkip(3).Take(3)
OnNext(5)   <- 最初のSkip(3).Take(3)
OnNext(6)   <- 最初のSkip(3).Take(3)
OnNext(10)  <- 2回目のSkip(3).Take(3)
OnNext(11)  <- 2回目のSkip(3).Take(3)
OnNext(12)  <- 2回目のSkip(3).Take(3)
OnNext(16)  <- 3回目のSkip(3).Take(3)
OnNext(17)  <- 3回目のSkip(3).Take(3)
OnNext(18)  <- 3回目のSkip(3).Take(3)

Repeatを追加することで3つ値をスキップして3つ値を発行するという動作を繰り返しています。このようにIObservableの拡張メソッドを組み合わせることで様々なIObservableを定義することが出来ます。

SkipWhileとTakeWhileメソッド

次は、SkipWhileとTakeWhileメソッドについてみていきます。このメソッドはFuncのデリゲートを受け取ります。このデリゲートがtrueを返す間はSkipしたりTakeをします。動作確認のためのコードを下記に示します。

// 値の発行元になるSubject
var s = new Subject<int>();

// 発行される値が5より小さい間はスキップして10より小さい間は通過させる
s.SkipWhile(i => i < 5).TakeWhile(i => i < 10).Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("OnCompleted()"));

// 1-20の値を流し込む
Console.WriteLine("1-20 OnNext start.");
Observable.Range(1, 20).ForEach(i => s.OnNext(i));
Console.WriteLine("1-20 OnNext end.");
Console.WriteLine();

5より小さい間はスキップして10より小さい間は通過させるので5,6,7,8,9がSubscribeまで通知されます。実行結果を下記に示します。

1-20 OnNext start.
OnNext(5)
OnNext(6)
OnNext(7)
OnNext(8)
OnNext(9)
OnCompleted()
1-20 OnNext end.

この例も、9が発行された段階で値が発行されなくなるためOnCompletedが通知されていることが確認できます。コードでは示しませんが、この例でもRepeatメソッドを組み合わせることで任意の回数この挙動を繰り返すことが出来ます。

SkipUntilとTakeUntilメソッド

次はLINQには定義されていないReactive Extensions固有のメソッドのSkipUntilとTakeUntilメソッドについてみていきます。SkipUntilとTakeUntilメソッドのシグネチャを下記に示します。

public IObservable<T> SkipUntil<T, TOther>(IObservable<TOther> other);
public IObservable<T> TakeUntil<T, TOther>(IObservable<TOther> other);

どちらも引数にIObservableを受け取ります。SkipUntilは引数で渡されたIObservableから値が発行されるまで、値のスキップを行います。TakeUntilは引数で渡されたIObservableから値が発行されるまで、値を通過させます。コード例を下記に示します。

// 値の発行元になるSubject
var s = new Subject<int>();
// 値のスキップを辞めるきっかけ
var startTrigger = new Subject<Unit>();
// 値を後続に流すことを辞めるきっかけ
var endTrigger = new Subject<Unit>();

// startTriggerのOnNextが呼ばれるまで値をスキップしてendTriggerのOnNextが
// 呼ばれるまで後続に値を流す。
s.SkipUntil(startTrigger).TakeUntil(endTrigger).Subscribe(
    i => Console.WriteLine("OnNext({0})", i),
    ex => Console.WriteLine("OnError({0})", ex.Message),
    () => Console.WriteLine("OnCompleted()"));

// 1-5の値を流し込む
Console.WriteLine("1-5 OnNext start.");
Observable.Range(1, 5).ForEach(i => s.OnNext(i));
Console.WriteLine("1-5 OnNext end.");
Console.WriteLine();

// startTriggerのOnNextを発行してから1-5の値を流し込む
Console.WriteLine("startTrigger.OnNext called.");
startTrigger.OnNext(Unit.Default);
Console.WriteLine("1-5 OnNext start.");
Observable.Range(1, 5).ForEach(i => s.OnNext(i));
Console.WriteLine("1-5 OnNext end.");
Console.WriteLine();

// endTriggerのOnNextを発行してから1-5の値を流し込む
Console.WriteLine("endTrigger.OnNext called.");
endTrigger.OnNext(Unit.Default);
Console.WriteLine("1-5 OnNext start.");
Observable.Range(1, 5).ForEach(i => s.OnNext(i));
Console.WriteLine("1-5 OnNext end.");
Console.WriteLine();

コードは長いですがポイントは、startTriggerがSkipUntilに渡すIObservableで、endTriggerがTakeUntilに渡すIObservableになる点です。そして、startTriggerとendTriggerのOnNextが呼ばれた際に発行された値がSubscribeに通知されるようになったり、通知されなくなったりする点に注目してください。実行結果を下記に示します。

1-5 OnNext start.
1-5 OnNext end.

startTrigger.OnNext called.
1-5 OnNext start.
OnNext(1)
OnNext(2)
OnNext(3)
OnNext(4)
OnNext(5)
1-5 OnNext end.

endTrigger.OnNext called.
OnCompleted()
1-5 OnNext start.
1-5 OnNext end.

startTriggerのOnNextを呼ぶ前に発行した値は全てスキップされて、startTriggerのOnNextを呼ぶと値が通知されてOnNext(1)〜OnNext(5)までが表示されています。endTriggerのOnNextを呼ぶとOnCompletedが呼ばれて終了状態になり、以降に発行された値はSubscribeまで通知が届いていないことが確認できます。

ということで

今回はSkip***とTake***系のメソッドを見ました。特にSkipUntilとTakeUntilは強力で非同期通信の処理結果を、途中で止めるためにTakeUntilを使用したり、マウスダウン、マウスアップ、マウスムーブのイベントを組み合わせてドラッグ中のみ値が発行されるIObservableを作成したりと応用が効きます。ということで、次回はそこらへんを見ていけたらいいなと思います。