かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Reactive ExtensionsでRateLimitを実装してみる

こんな感じ??Queueにいったん詰め込んで、Scheduler上でどんどん処理していく感じ。 1つ処理をするとカウントアップして、指定時間後にカウントダウンする。

static class RateLimitExtensions
{
    public static IObservable<T> RateLimit<T>(this IObservable<T> self, TimeSpan span, int threshold)
    {
        return self.RateLimit(span, threshold, Scheduler.Default);
    }

    public static IObservable<T> RateLimit<T>(this IObservable<T> self, TimeSpan span, int threshold, IScheduler scheduler)
    {
        var queue = new Queue<T>();
        var current = 0;
        var d = new CompositeDisposable();

        return Observable.Create<T>(ox =>
        {
            Action processQueue = null;
            processQueue = () =>
            {
                if (d.IsDisposed)
                {
                    return;
                }

                if (queue.Count != 0)
                {
                    lock (queue)
                    {
                        while (queue.Count != 0 && current < threshold)
                        {
                            ox.OnNext(queue.Dequeue());
                            Interlocked.Increment(ref current);
                            Observable.Return(0, scheduler)
                                .Delay(span)
                                .Subscribe(_ =>
                                {
                                    Interlocked.Decrement(ref current);
                                });
                        }
                    }
                }
                if (queue.Count != 0)
                {
                    // 100ms後にもう一度Queueの中身を吐き出してみる
                    scheduler.Schedule(TimeSpan.FromMilliseconds(100), processQueue);
                }
            };

            scheduler.Schedule(processQueue);
            d.Add(self.Subscribe(x =>
            {
                lock (queue)
                {
                    queue.Enqueue(x);
                }
                scheduler.Schedule(processQueue);
            }));

            return d;
        });
    }
}

どうだろう??

こんな感じで使う

var s = new Subject<int>();
// 1分間に15回まで
var d = s.RateLimit(TimeSpan.FromMinutes(1), 15)
    .Subscribe(Console.WriteLine);

for (int i = 0; i < 10000; i++)
{
    s.OnNext(i);
}