かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

ReactiveProperty v1.0.2をリリースしました

間にv1.0.1も挟んでますが、こんな感じの変更をしています。

ReadOnlyReactiveCollectionの生成を簡略化

ObservableCollectionからReadOnlyReactiveCollectionを作ろうと思うと今まで以下のようなコードが必要でした。

var models = new ObservableCollection<Model>();

var changes = models
    .ToCollectionChanged()
    .Select(x => new CollectionChanged<ViewModel>
    {
        Action = x.Action,
        Index = x.Index,
        Value = x.Value == null ? null : new ViewModel(x.Value)
    });
var source = new ObservableCollection<ViewModel>(
    models.Select(m => new ViewModel(m));

var viewModels = new ReadOnlyReactiveCollection<ViewModel>(changes, source);

めんどくさいので下のように書けるようにしました。

var models = new ObservableCollection<Model>();
var viewModels = models.ToReadOnlyReactiveCollection(m => new ViewModel(m));

IDisposableExtensions.AddToメソッドに戻り値を追加

地味にReactivePropertyやReadOnlyReactiveCollectionやReactiveCommandはIDisposableを実装しています。こいつらは、時がきたらDisposeを呼んでやる必要があります。個別にDisposeを呼ぶのがだるいので、CompositeDisposableあたりに纏めておいて呼ぶのが楽なので、今までこんな感じに書いてました。

var d = new CompositeDisposable();
var p1 = ...何か重いIObservable<T>...().ToReactiveProperty();
p1.AddTo(d);

// 不要になったタイミングで
d.Dispose();

AddTo拡張メソッドの戻り値がvoidだったので、メソッドチェインの中に入れなかったのですが、AddTo拡張メソッドの戻り値で第一引数を戻り値として返すようにしました。なので、以下のように破棄予定のReactiveProperty等はメソッドチェイン内でCompositeDisposableに追加できるようになりました。

var d = new CompositeDisposable();
var p1 = ...何か重いIObservable<T>...().ToReactiveProperty().AddTo(d);

// 不要になったタイミングで
d.Dispose();

接続中のみ処理したい…!

ほんとは、whileループの中はHttpClient使った処理あたり・・・?う~ん、無理やり感があるかなぁ?というメモ。

using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {
            var connectionSubject = new Subject<bool>();
            var connectObservable = connectionSubject.Where(b => b).AsObservable();
            var disconnectObservable = connectionSubject.Where(b => !b).AsObservable();
            var src = connectObservable.SelectMany(_ =>
                {
                    return Observable.Create<string>(async o =>
                    {
                        Console.WriteLine("connection start!");
                        var connected = true;
                        while (connected)
                        {
                            o.OnNext("foo");
                            await Task.Delay(1000);
                            o.OnNext("ooo");
                            await Task.Delay(1000);
                        }
                    });
                })
                .TakeUntil(disconnectObservable)
                .Repeat();

            src.ObserveOn(Scheduler.Default).Subscribe(Console.WriteLine);
            while (true)
            {
                Console.ReadKey();
                Console.WriteLine("OnNext(true)");
                connectionSubject.OnNext(true);
                Console.ReadKey();
                Console.WriteLine("OnNext(false)");
                connectionSubject.OnNext(false);
            }
        }
    }
}

連続して発生するイベントをなんとかしたい

Reactive Extensionsを使えば、楽勝なんですけどね…でも、Reactive Extensionsの導入を許されてない悲しい職場もありますよね。Reactive Extensionsが使える時はおとなしくThrottleメソッドを使ってましょう。

そうじゃない場合…あまり考えたくないけど.NET Framework 4.5時代になると単純なケースの場合は、そんなに悲しまなくてもいいかも。

private DateTimeOffset movedTime;
private async void Hogehoge_Moved()
{
    this.movedTime = DateTimeOffset.Now;
    await Task.Delay(200);
    if (DateTimeOffset.Now - this.movedTime < TimeSpan.FromMilliseconds(150))
    {
        return;
    }

    // 何かやる
}

こんな具合でいけるかな?

Taskだけなら・・・?

これでもいいのだろうか?(未確認)

private DateTimeOffset movedTime;
private void Hogehoge_Moved()
{
    this.movedTime = DateTimeOffset.Now;
    Task.Factory.StartNew(() =>
    {
        Thread.Sleep(200);
    }).ContinueWith(_ =>
    {
        if (DateTimeOffset.Now - this.movedTime < TimeSpan.FromMilliseconds(150))
        {
            return;
        }
        // 何かやる
    }, TaskScheduler.FromCurrentSynchronizationContext());
}

Reactive Extensions再入門 その48「連続して発生する値の監視」

過去記事インデックス

はじめに

しばらく間があいたRx関連の記事ですがぼっちぼっち思い出したようにやっていこうと思います。Rx2.0の正式版が出たら、また加速するかも?ということで、今回はRxを使ったら簡単にこんな処理かけるよ!というのをやってみようと思います。Rx使わない版はだれか書いてくれるとうれしいな♪

連続して発生する値の監視

下記のようなケースを考えます。

0〜1秒間隔で0〜9の値を発行するものがあります。発行された値を1秒間隔で10秒単位にまとめて発行された値の総和が100を超えた場合は異常と表示して、100より小さい場合は平常と表示しなさい。

非常に単純な例ですが、時間がからんでくるので非常に厄介です。Reactive Extensionsは、このような時間とともに発生する値を処理するのが非常に得意です。コード例を下記に示します。

// ダミーデータ作成メソッド
static IEnumerable<int> GetSource()
{
    var r = new Random();
    while (true)
    {
        yield return r.Next(10);
        Thread.Sleep(r.Next(1000));
    }
}

まずは、要件を満たす値を生成するメソッドを定義しました。これをベースに処理を書いていきます。

var subscriber = GetSource()
    // 別スレッドで値を発行するIObservableのシーケンスにする
    .ToObservable(Scheduler.NewThread)
    // 1秒間隔で10秒間値をためる
    .Window(TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(1))
    // 10秒ためた値の合計をとって
    .SelectMany(o => o.Sum())
    // 異常か正常か判断
    .Select(i => new { Value = i, Message = i < 100 ? "正常" : "異常" })
    // 結果を表示
    .Subscribe(v => 
        Console.WriteLine("{0:HH:mm:ss.fff} {1}", DateTime.Now, v));

// Enterを押すまで待機
Console.ReadLine();
// 購読解除
subscriber.Dispose();

1秒間隔で10秒間の値を抜き出すという厄介な処理は、Reactive ExtensionsではWindowメソッドの時間を指定するメソッドで一発で行えます。そのあとSelectManyで合計に変換して異常か正常か判定して表示しています。実行結果を以下に示します。

22:00:06.758 { Value = 84, Message = 正常 }
22:00:07.771 { Value = 85, Message = 正常 }
22:00:08.785 { Value = 96, Message = 正常 }
22:00:09.799 { Value = 99, Message = 正常 }
22:00:10.813 { Value = 95, Message = 正常 }
22:00:11.827 { Value = 102, Message = 異常 }
22:00:12.841 { Value = 106, Message = 異常 }
22:00:13.855 { Value = 88, Message = 正常 }
22:00:14.869 { Value = 92, Message = 正常 }
22:00:15.883 { Value = 109, Message = 異常 }

期待通りに動いていることが確認できます。Reactive Extensionsを使うと受け取った値に対してリアルタイムに集計して判定を行い結果を表示するという処理がとてもシンプルに記述できます。

FileSystemWatcherの連続して発生する同じ種類のイベントを無視る

id:trapemiyaさんのBlog記事のFileSystemWatcherですが、こいつを扱うのは厄介らしいです。

ということでid:trapemiyaさんの対処方法はChangedイベント発生後10秒間は次のイベントを無視るというものでした。ちょっと対処方法は変わりますが、Rxを使うと10秒間イベントが発生しなかったら・・・といった方法での対応がさくっとできます。

using System;
using System.IO;
using System.Reactive.Linq;

namespace FileWatchTest
{
    class Program
    {
        static void Main(string[] args)
        {
            // 監視対象のディレクトリ
            var dir = @"C:\Users\Kazuki\Documents\IISExpress";
            
            // とりあえず適当に監視する
            var watcher = new FileSystemWatcher(dir);
            watcher.Filter = "";
            watcher.NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.LastAccess | NotifyFilters.FileName | NotifyFilters.Security;
            
            // Rx本領発揮
            watcher.ChangedAsObservable()
                // 10秒待つ
                .Throttle(TimeSpan.FromSeconds(10))
                .Subscribe(e => Console.WriteLine("{0}: {1}.", e.FullPath, e.ChangeType));
            
            // 監視開始
            watcher.EnableRaisingEvents = true;
            // 待ち
            Console.ReadLine();
        }
    }

    static class FileSystemWatcherExtensions
    {
        // ChangedイベントをIObservable<TEventArgs>にするヘルパーメソッド
        public static IObservable<FileSystemEventArgs> ChangedAsObservable(this FileSystemWatcher self)
        {
            return Observable.FromEvent<FileSystemEventHandler, FileSystemEventArgs>(
                h => (_, e) => h(e),
                h => self.Changed += h,
                h => self.Changed -= h);
        }
    }
}

厳密に同じにしたかったら、IObservableのシーケンスに対して一度値を発行したら10秒間は値が発行されても無視るみたいな拡張メソッド定義してもよさげですね。
ビバRx!!

Hokuriku.NETのRxのセッション資料が公開されてます!

@xin9leさんのReactive Extensionsのセッション資料とコードが公開されてます。

Blogに埋め込まれたものを見るよりも、SkyDriveに飛んで(右下のフルサイズで見るボタン)見たほうがアニメーションも再生されてぐっといい感じです!!お勧めです。Reactive Extensionsのオーバービューみたいに全体像を理解できる内容になってると思います。Rxの最初の一歩におすすめします!