読者です 読者をやめる 読者になる 読者になる

かずきのBlog@hatena

日本マイクロソフトに勤めています。XAML + C#の組み合わせをメインに、たまにASP.NETやJavaなどの.NET系以外のことも書いています。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

非同期でのリクエスト処理をRxでということの取り組みのメモ その2

ASP.NET Reactive Extensions
追記
DelegateのBeginInvokeとEndInvoke使ってるので、ThreadPoolに無駄なスレッド作ってるのがいけてないです。
IOCPのうま味ゼロ?

前回の続きです。前回のはいまいち感がぬぐいきれなかったのですがなんとなく動いてました。ということでIObservableの拡張メソッドとして切り出して、重複のあった無駄なSubject系クラスをとっぱらってうんぬんかんぬんして以下のようになりました。

namespace WebApplication1
{
    using System;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Threading;
    using System.Web.UI;

    public static class ObservableExtensions
    {
        /// <summary>
        /// IObservableのシーケンスの処理をASP.NETの非同期ページの仕組みの上で実行する
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="self"></param>
        /// <param name="page"></param>
        /// <returns></returns>
        public static IObservable<T> ToPreRenderAsync<T>(this IObservable<T> self, Page page)
        {
            var subject = new AsyncSubject<T>();
            Action action = () =>
            {
                using (var gate = new EventWaitHandle(false, EventResetMode.AutoReset))
                {
                    self.Subscribe(
                        subject.OnNext,
                        subject.OnError,
                        () =>
                        {
                            subject.OnCompleted();
                            gate.Set();
                        });
                    // 完了まで待機
                    gate.WaitOne();
                }
            };

            page.AddOnPreRenderCompleteAsync(
                (sender, e, callback, extraData) =>
                {
                    return action.BeginInvoke(callback, extraData);
                },
                ar =>
                {
                    try
                    {
                        action.EndInvoke(ar);
                    }
                    catch (Exception ex)
                    {
                        subject.OnError(ex);
                    }
                });

            return subject.Zip(
                Observable.FromEvent<EventHandler, EventArgs>(
                    h => (_, e) => h(e),
                    h => page.PreRenderComplete += h,
                    h => page.PreRenderComplete -= h).Take(1),
                    (x, _) => x);
        }

    }
}

使う側のWebページでは、こんな具合です。

using System;
using System.Reactive.Linq;
using System.Threading;

public partial class _Default : System.Web.UI.Page
{
    protected void Page_Load(object sender, EventArgs e)
    {
    }

    protected void Button1_Click(object sender, EventArgs e)
    {
        var dac = new Dac();
        dac.HeavyProcess()
            // 非同期ページの仕組みの上にのっける
            .ToPreRenderAsync(this)
            .Subscribe(i =>
            {
                // ここでUIに反映
                Label1.Text = i.ToString();
            });
    }
}

public class Dac
{
    // おも〜い処理
    public IObservable<int> HeavyProcess()
    {
        return Observable.ToAsync(() =>
        {
            Thread.Sleep(5000);
            return new Random().Next(100);
        }).Invoke();
    }
}

前よりスッキリ。でも、あれですよね。実行場所を変えるという意味ではSchedulerを実装するのが筋なのかな?