かずきのBlog@hatena

日本マイクロソフトに勤めています。XAML + C#の組み合わせをメインに、たまにASP.NETやJavaなどの.NET系以外のことも書いています。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

Reactive ExtensionsのForkJoinは何処へ消えた?

(not yet) 101 Rx Samplesでも序盤のほうに出てきたり、id:neueccさんのBlogにも「それForkJoinでできるよ?」と暗に言ってるように聞こえるくらいバンバン出てきてたForkJoinですが、Rx-MainのStableでやってみるか!!と思ってみるとMSDN内のメソッドのドキュメントにもないしインテリセンスにも出てきませんorz
どういうこと?とみてみるとフォーラムにこんなスレッドが立ってました。

これによるとStable版ではなくて、Experimental版に入ってるとのこと。oh...。でも、このフォーラムの記事からリンクされてるフォーラムのスレッドの面白いこと面白いこと。

最初はTupleで値まとめるようなZipメソッド使ってしのいでる風なノリから、こんな感じの拡張メソッドでよくない?という風に興味深いコードがポコポコ出てきてます。ここら辺のコードになってくるといったい何をしてるんだ?と考えちゃいます。

/// <summary>
/// Multicasts the source observable using the provided subject Factory. 
/// Each subscription to the outer observable will start a new subscription to the source.
/// The outer return observable controls the lifetime of the subscription to the source.
/// (Similar to the Connect method on an IConnectableObservable, except with multiple source subscriptions)
/// The inner observable is the subject that has been created. 
/// (Equivalent to the Subscribe method on an IConnectableObservable)
/// </summary>
/// <typeparam name="TSource">The type of the source.</typeparam>
/// <typeparam name="TResult">The type of the result.</typeparam>
/// <param name="source">The source obsevable.</param>
/// <param name="subjectFactory">The subject factory.</param>
/// <returns>An observable whose lifetime is the same as the original source, and a subject which is immediately returned on subscription.</returns>
public static IObservable<IObservable<TResult>> MulticastEmbedded<TSource, TResult>(
    this IObservable<TSource> source, 
    Func<ISubject<TSource, TResult>> subjectFactory)
{
    return Observable.Create<IObservable<TResult>>(
        o =>
            {
                var s = subjectFactory();

                // Ensure that the subject is sent out prior to subscribing to the source
                o.OnNext(s);

                return source.Subscribe(
                    s.OnNext,
                    ex =>
                        {
                            // Ensure inner observable completes before us
                            s.OnError(ex);
                            o.OnError(ex);
                        },
                    () =>
                        {
                            // Ensure inner observable completes before us
                            s.OnCompleted();
                            o.OnCompleted();
                        });
            });
}

private static IObservable<TSource[]> ForkJoinWithStable3<TSource>(params IObservable<TSource>[] sources)
{
    return sources.Select(o => o.MulticastEmbedded(() => new AsyncSubject<TSource>()))
                    .Merge() // Subscribe to all the wrapped observables, which internally connects to the source
                    .Concat() // Subscribe to the AsyncSubject in order, to output the results
                    .ToArray()
                    .Where(a => a.Length == sources.Length);
}

う〜んパズル・・・。