かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

非同期でのリクエスト処理をRxでということの取り組みのメモ

id:neueccさんとお話ししてて出てきた話題です。いまいちな感じはひしひしとしますが、とりあえず寝る前に今日書いたコードだけはっとこうと思う。aspxはボタンとラベルを置いただけ。

using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

namespace WebApplication1
{
    public partial class _Default : System.Web.UI.Page
    {
        protected void Page_Load(object sender, EventArgs e)
        {
        }

        protected void Button1_Click(object sender, EventArgs e)
        {
            // 重いIO処理のつもり
            Func<int> heavey = () =>
            {
                Thread.Sleep(5000);
                return new Random().Next(100);
            };

            PreRenderAsync(
                // ここらへんにRxのメソッドチェインが入るはず
                Observable.FromAsyncPattern<int>(heavey.BeginInvoke, heavey.EndInvoke).Invoke())
                .Subscribe(i =>
                {
                    Debug.WriteLine("Label set");
                    Label1.Text = i.ToString();
                });
            // 最終目標
            // Rxのメソッドチェイン.ObserveOn(new AsynRequestScheduler(this)).Subscribe(v =>
            // {
            //   レンダリング処理
            // });
        }

        private IObservable<T> PreRenderAsync<T>(IObservable<T> seq)
        {
            var source = new ReplaySubject<T>();
            this.AddOnPreRenderCompleteAsync(
                (sender, e, cb, extraData) =>
                {
                    Debug.WriteLine("OK1");
                    return new ObservableAsyncResult<T>(seq, cb);
                },
                ar =>
                {
                    Debug.WriteLine("OK2");
                    var oar = ar as ObservableAsyncResult<T>;
                    oar.Zip(Observable.FromEvent<EventHandler, EventArgs>(
                            h => (s, e) => h(e),
                            h => this.PreRenderComplete += h,
                            h => this.PreRenderComplete -= h).Take(1),
                            (x, y) =>
                            {
                                Debug.WriteLine("Render");
                                return x;
                            })
                        .Subscribe(v =>
                        {
                            Debug.WriteLine("OnNext: " + v);
                            source.OnNext(v);
                        },
                        ex =>
                        {
                            Debug.WriteLine("OnError");
                        },
                        () =>
                        {
                            Debug.WriteLine("OnCompleted");
                            source.OnCompleted();
                        });
                });
            return source.AsObservable();
        }
    }

    class ObservableAsyncResult<T> : IObservable<T>, IAsyncResult
    {
        private ReplaySubject<T> source = new ReplaySubject<T>();

        public ObservableAsyncResult(IObservable<T> seq, AsyncCallback callback)
        {
            seq.Subscribe(source.OnNext,
                source.OnError,
                () =>
                {
                    source.OnCompleted();
                    callback(this);
                });
        }

        #region IAsyncResult メンバー

        public T AsyncState
        {
            get;
            set;
        }

        object IAsyncResult.AsyncState
        {
            get { return this.AsyncState; }
        }

        public System.Threading.WaitHandle AsyncWaitHandle
        {
            get;
            set;
        }

        public bool CompletedSynchronously
        {
            get;
            set;
        }

        public bool IsCompleted
        {
            get;
            set;
        }

        #endregion

        #region IObservable<T> メンバー

        public IDisposable Subscribe(IObserver<T> observer)
        {
            return this.source.Subscribe(observer);
        }

        #endregion
    }

}

無駄なコードが多いので本気でメモブログ!間違っても本番用に引用しないようにね!