読者です 読者をやめる 読者になる 読者になる

かずきのBlog@hatena

日本マイクロソフトに勤めています。XAML + C#の組み合わせをメインに、たまにASP.NETやJavaなどの.NET系以外のことも書いています。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

Reactive ExtensionsでRateLimitを実装してみる

こんな感じ??Queueにいったん詰め込んで、Scheduler上でどんどん処理していく感じ。 1つ処理をするとカウントアップして、指定時間後にカウントダウンする。

static class RateLimitExtensions
{
    public static IObservable<T> RateLimit<T>(this IObservable<T> self, TimeSpan span, int threshold)
    {
        return self.RateLimit(span, threshold, Scheduler.Default);
    }

    public static IObservable<T> RateLimit<T>(this IObservable<T> self, TimeSpan span, int threshold, IScheduler scheduler)
    {
        var queue = new Queue<T>();
        var current = 0;
        var d = new CompositeDisposable();

        return Observable.Create<T>(ox =>
        {
            Action processQueue = null;
            processQueue = () =>
            {
                if (d.IsDisposed)
                {
                    return;
                }

                if (queue.Count != 0)
                {
                    lock (queue)
                    {
                        while (queue.Count != 0 && current < threshold)
                        {
                            ox.OnNext(queue.Dequeue());
                            Interlocked.Increment(ref current);
                            Observable.Return(0, scheduler)
                                .Delay(span)
                                .Subscribe(_ =>
                                {
                                    Interlocked.Decrement(ref current);
                                });
                        }
                    }
                }
                if (queue.Count != 0)
                {
                    // 100ms後にもう一度Queueの中身を吐き出してみる
                    scheduler.Schedule(TimeSpan.FromMilliseconds(100), processQueue);
                }
            };

            scheduler.Schedule(processQueue);
            d.Add(self.Subscribe(x =>
            {
                lock (queue)
                {
                    queue.Enqueue(x);
                }
                scheduler.Schedule(processQueue);
            }));

            return d;
        });
    }
}

どうだろう??

こんな感じで使う

var s = new Subject<int>();
// 1分間に15回まで
var d = s.RateLimit(TimeSpan.FromMinutes(1), 15)
    .Subscribe(Console.WriteLine);

for (int i = 0; i < 10000; i++)
{
    s.OnNext(i);
}