過去記事インデックス
- Reactive Extensions再入門 その1
- Reactive Extensions再入門 その2「IObservableインターフェースとIObserverインターフェース」
- Reactive Extensions再入門 その3「IObservableのファクトリメソッド」
- Reactive Extensions再入門 その4「Timer系のファクトリメソッド」
- Reactive Extensions再入門 その5「HotとCold」
- Reactive Extensions再入門 その6「HotなIObservableを作成するファクトリ」
- Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
SkipとTakeメソッド
LINQの基本のメソッドのWhereとSelectの動作を確認したので、次はSkipとTakeを使用したいと思います。Skipは、指定した数だけ値を読み飛ばして、Takeは指定した数だけ値を通過させるイメージです。コード例を下記に示します。
// 値の発行元になるSubject var s = new Subject<int>(); // 最初の3つをスキップして、その後3つを通知する s.Skip(3).Take(3).Subscribe( i => Console.WriteLine("OnNext({0})", i), ex => Console.WriteLine("OnError({0})", ex.Message), () => Console.WriteLine("OnCompleted()")); // 1-10の値を流し込む Observable.Range(1, 10).ForEach(i => s.OnNext(i));
Skip(3)をしてTake(3)をしているところに1〜10の値を発行しているので4, 5, 6がSubscribeしているところに通知されます。実行結果を下記に示します。
OnNext(4) OnNext(5) OnNext(6) OnCompleted()
注目するのは、6が発行されたあとにOnCompletedが呼ばれている点です。Take(3)で値が3つ通過した後には、もう値が発行されないので、完了通知が発行されます。
Repeatメソッドとの組み合わせ
SkipとTakeの動作自体は非常にシンプルなので、これにRepeatを組み合わせてみようと思います。Repeat自体は以前、紹介しているとおり、指定した回数繰り返しを行うメソッドです。SkipとTakeと組み合わせることで下記のような動作をさせることが出来ます。コードを下記に示します。
// 値の発行元になるSubject var s = new Subject<int>(); // 最初の3つをスキップして、その後3つを通知することを繰り返す s.Skip(3).Take(3).Repeat().Subscribe( i => Console.WriteLine("OnNext({0})", i), ex => Console.WriteLine("OnError({0})", ex.Message), () => Console.WriteLine("OnCompleted()")); // 1-20の値を流し込む Observable.Range(1, 20).ForEach(i => s.OnNext(i));
先ほどのコードのTakeの後にRepeatを追加しています。引数の無いRepeatメソッドは、無限に繰り返しを行います。どのような動きになるのか実行結果を下記に示します。
OnNext(4) <- 最初のSkip(3).Take(3) OnNext(5) <- 最初のSkip(3).Take(3) OnNext(6) <- 最初のSkip(3).Take(3) OnNext(10) <- 2回目のSkip(3).Take(3) OnNext(11) <- 2回目のSkip(3).Take(3) OnNext(12) <- 2回目のSkip(3).Take(3) OnNext(16) <- 3回目のSkip(3).Take(3) OnNext(17) <- 3回目のSkip(3).Take(3) OnNext(18) <- 3回目のSkip(3).Take(3)
Repeatを追加することで3つ値をスキップして3つ値を発行するという動作を繰り返しています。このようにIObservable
SkipWhileとTakeWhileメソッド
次は、SkipWhileとTakeWhileメソッドについてみていきます。このメソッドはFunc
// 値の発行元になるSubject var s = new Subject<int>(); // 発行される値が5より小さい間はスキップして10より小さい間は通過させる s.SkipWhile(i => i < 5).TakeWhile(i => i < 10).Subscribe( i => Console.WriteLine("OnNext({0})", i), ex => Console.WriteLine("OnError({0})", ex.Message), () => Console.WriteLine("OnCompleted()")); // 1-20の値を流し込む Console.WriteLine("1-20 OnNext start."); Observable.Range(1, 20).ForEach(i => s.OnNext(i)); Console.WriteLine("1-20 OnNext end."); Console.WriteLine();
5より小さい間はスキップして10より小さい間は通過させるので5,6,7,8,9がSubscribeまで通知されます。実行結果を下記に示します。
1-20 OnNext start. OnNext(5) OnNext(6) OnNext(7) OnNext(8) OnNext(9) OnCompleted() 1-20 OnNext end.
この例も、9が発行された段階で値が発行されなくなるためOnCompletedが通知されていることが確認できます。コードでは示しませんが、この例でもRepeatメソッドを組み合わせることで任意の回数この挙動を繰り返すことが出来ます。
SkipUntilとTakeUntilメソッド
次はLINQには定義されていないReactive Extensions固有のメソッドのSkipUntilとTakeUntilメソッドについてみていきます。SkipUntilとTakeUntilメソッドのシグネチャを下記に示します。
public IObservable<T> SkipUntil<T, TOther>(IObservable<TOther> other); public IObservable<T> TakeUntil<T, TOther>(IObservable<TOther> other);
どちらも引数にIObservable
// 値の発行元になるSubject var s = new Subject<int>(); // 値のスキップを辞めるきっかけ var startTrigger = new Subject<Unit>(); // 値を後続に流すことを辞めるきっかけ var endTrigger = new Subject<Unit>(); // startTriggerのOnNextが呼ばれるまで値をスキップしてendTriggerのOnNextが // 呼ばれるまで後続に値を流す。 s.SkipUntil(startTrigger).TakeUntil(endTrigger).Subscribe( i => Console.WriteLine("OnNext({0})", i), ex => Console.WriteLine("OnError({0})", ex.Message), () => Console.WriteLine("OnCompleted()")); // 1-5の値を流し込む Console.WriteLine("1-5 OnNext start."); Observable.Range(1, 5).ForEach(i => s.OnNext(i)); Console.WriteLine("1-5 OnNext end."); Console.WriteLine(); // startTriggerのOnNextを発行してから1-5の値を流し込む Console.WriteLine("startTrigger.OnNext called."); startTrigger.OnNext(Unit.Default); Console.WriteLine("1-5 OnNext start."); Observable.Range(1, 5).ForEach(i => s.OnNext(i)); Console.WriteLine("1-5 OnNext end."); Console.WriteLine(); // endTriggerのOnNextを発行してから1-5の値を流し込む Console.WriteLine("endTrigger.OnNext called."); endTrigger.OnNext(Unit.Default); Console.WriteLine("1-5 OnNext start."); Observable.Range(1, 5).ForEach(i => s.OnNext(i)); Console.WriteLine("1-5 OnNext end."); Console.WriteLine();
コードは長いですがポイントは、startTriggerがSkipUntilに渡すIObservable
1-5 OnNext start. 1-5 OnNext end. startTrigger.OnNext called. 1-5 OnNext start. OnNext(1) OnNext(2) OnNext(3) OnNext(4) OnNext(5) 1-5 OnNext end. endTrigger.OnNext called. OnCompleted() 1-5 OnNext start. 1-5 OnNext end.
startTriggerのOnNextを呼ぶ前に発行した値は全てスキップされて、startTriggerのOnNextを呼ぶと値が通知されてOnNext(1)〜OnNext(5)までが表示されています。endTriggerのOnNextを呼ぶとOnCompletedが呼ばれて終了状態になり、以降に発行された値はSubscribeまで通知が届いていないことが確認できます。
ということで
今回はSkip***とTake***系のメソッドを見ました。特にSkipUntilとTakeUntilは強力で非同期通信の処理結果を、途中で止めるためにTakeUntilを使用したり、マウスダウン、マウスアップ、マウスムーブのイベントを組み合わせてドラッグ中のみ値が発行されるIObservableを作成したりと応用が効きます。ということで、次回はそこらへんを見ていけたらいいなと思います。