かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

非同期でのリクエスト処理をRxでということの取り組みのメモ その2

追記
DelegateのBeginInvokeとEndInvoke使ってるので、ThreadPoolに無駄なスレッド作ってるのがいけてないです。
IOCPのうま味ゼロ?

前回の続きです。前回のはいまいち感がぬぐいきれなかったのですがなんとなく動いてました。ということでIObservableの拡張メソッドとして切り出して、重複のあった無駄なSubject系クラスをとっぱらってうんぬんかんぬんして以下のようになりました。

namespace WebApplication1
{
    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Threading;
    using System.Web.UI;

    public static class ObservableExtensions
    {
        /// <summary>
        /// IObservableのシーケンスの処理をASP.NETの非同期ページの仕組みの上で実行する
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="self"></param>
        /// <param name="page"></param>
        /// <returns></returns>
        public static IObservable<T> ToPreRenderAsync<T>(this IObservable<T> self, Page page)
        {
            var subject = new AsyncSubject<T>();
            Action action = () =>
            {
                using (var gate = new EventWaitHandle(false, EventResetMode.AutoReset))
                {
                    self.Subscribe(
                        subject.OnNext,
                        subject.OnError,
                        () =>
                        {
                            subject.OnCompleted();
                            gate.Set();
                        });
                    // 完了まで待機
                    gate.WaitOne();
                }
            };

            page.AddOnPreRenderCompleteAsync(
                (sender, e, callback, extraData) =>
                {
                    return action.BeginInvoke(callback, extraData);
                },
                ar =>
                {
                    try
                    {
                        action.EndInvoke(ar);
                    }
                    catch (Exception ex)
                    {
                        subject.OnError(ex);
                    }
                });

            return subject.Zip(
                Observable.FromEvent<EventHandler, EventArgs>(
                    h => (_, e) => h(e),
                    h => page.PreRenderComplete += h,
                    h => page.PreRenderComplete -= h).Take(1),
                    (x, _) => x);
        }

    }
}

使う側のWebページでは、こんな具合です。

using System;
using System.Reactive.Linq;
using System.Threading;

public partial class _Default : System.Web.UI.Page
{
    protected void Page_Load(object sender, EventArgs e)
    {
    }

    protected void Button1_Click(object sender, EventArgs e)
    {
        var dac = new Dac();
        dac.HeavyProcess()
            // 非同期ページの仕組みの上にのっける
            .ToPreRenderAsync(this)
            .Subscribe(i =>
            {
                // ここでUIに反映
                Label1.Text = i.ToString();
            });
    }
}

public class Dac
{
    // おも〜い処理
    public IObservable<int> HeavyProcess()
    {
        return Observable.ToAsync(() =>
        {
            Thread.Sleep(5000);
            return new Random().Next(100);
        }).Invoke();
    }
}

前よりスッキリ。でも、あれですよね。実行場所を変えるという意味ではSchedulerを実装するのが筋なのかな?