かずきのBlog@hatena

日本マイクロソフトに勤めています。XAML + C#の組み合わせをメインに、たまにASP.NETやJavaなどの.NET系以外のことも書いています。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

非同期でのリクエスト処理をRxでということの取り組みのメモ

id:neueccさんとお話ししてて出てきた話題です。いまいちな感じはひしひしとしますが、とりあえず寝る前に今日書いたコードだけはっとこうと思う。aspxはボタンとラベルを置いただけ。

using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

namespace WebApplication1
{
    public partial class _Default : System.Web.UI.Page
    {
        protected void Page_Load(object sender, EventArgs e)
        {
        }

        protected void Button1_Click(object sender, EventArgs e)
        {
            // 重いIO処理のつもり
            Func<int> heavey = () =>
            {
                Thread.Sleep(5000);
                return new Random().Next(100);
            };

            PreRenderAsync(
                // ここらへんにRxのメソッドチェインが入るはず
                Observable.FromAsyncPattern<int>(heavey.BeginInvoke, heavey.EndInvoke).Invoke())
                .Subscribe(i =>
                {
                    Debug.WriteLine("Label set");
                    Label1.Text = i.ToString();
                });
            // 最終目標
            // Rxのメソッドチェイン.ObserveOn(new AsynRequestScheduler(this)).Subscribe(v =>
            // {
            //   レンダリング処理
            // });
        }

        private IObservable<T> PreRenderAsync<T>(IObservable<T> seq)
        {
            var source = new ReplaySubject<T>();
            this.AddOnPreRenderCompleteAsync(
                (sender, e, cb, extraData) =>
                {
                    Debug.WriteLine("OK1");
                    return new ObservableAsyncResult<T>(seq, cb);
                },
                ar =>
                {
                    Debug.WriteLine("OK2");
                    var oar = ar as ObservableAsyncResult<T>;
                    oar.Zip(Observable.FromEvent<EventHandler, EventArgs>(
                            h => (s, e) => h(e),
                            h => this.PreRenderComplete += h,
                            h => this.PreRenderComplete -= h).Take(1),
                            (x, y) =>
                            {
                                Debug.WriteLine("Render");
                                return x;
                            })
                        .Subscribe(v =>
                        {
                            Debug.WriteLine("OnNext: " + v);
                            source.OnNext(v);
                        },
                        ex =>
                        {
                            Debug.WriteLine("OnError");
                        },
                        () =>
                        {
                            Debug.WriteLine("OnCompleted");
                            source.OnCompleted();
                        });
                });
            return source.AsObservable();
        }
    }

    class ObservableAsyncResult<T> : IObservable<T>, IAsyncResult
    {
        private ReplaySubject<T> source = new ReplaySubject<T>();

        public ObservableAsyncResult(IObservable<T> seq, AsyncCallback callback)
        {
            seq.Subscribe(source.OnNext,
                source.OnError,
                () =>
                {
                    source.OnCompleted();
                    callback(this);
                });
        }

        #region IAsyncResult メンバー

        public T AsyncState
        {
            get;
            set;
        }

        object IAsyncResult.AsyncState
        {
            get { return this.AsyncState; }
        }

        public System.Threading.WaitHandle AsyncWaitHandle
        {
            get;
            set;
        }

        public bool CompletedSynchronously
        {
            get;
            set;
        }

        public bool IsCompleted
        {
            get;
            set;
        }

        #endregion

        #region IObservable<T> メンバー

        public IDisposable Subscribe(IObserver<T> observer)
        {
            return this.source.Subscribe(observer);
        }

        #endregion
    }

}

無駄なコードが多いので本気でメモブログ!間違っても本番用に引用しないようにね!