読者です 読者をやめる 読者になる 読者になる

かずきのBlog@hatena

日本マイクロソフトに勤めています。XAML + C#の組み合わせをメインに、たまにASP.NETやJavaなどの.NET系以外のことも書いています。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

AzureのEventHub -> Stream Analytics -> ServiceBus Queueのスルーパスでどれくらいのスピードなのか見てみた

@xin9leさんと話しをしてて、EventHubに突っ込んだデータがStream Analyticsが処理してQueueに突っ込んだ結果どれくらいで返ってくるものなのかという話しになりました。 なので、まぁ適当にプログラム組んでみました。

Service Bus名前空間の作成

まず、EventHubとQueueのServiceBusの名前空間を作成します。

今回はokazuki-servicebusという名前で作成しました。

f:id:okazuki:20151012220020p:plain

EventHubの作成

イベントハブ作ります。

okazuki-eventhubという名前で作りました。パーティション数とかは最小の値にしました。

f:id:okazuki:20151012220152p:plain

f:id:okazuki:20151012220159p:plain

共有アクセスポリシーをIOという名前でリッスンと送信で作ります。

f:id:okazuki:20151012222246p:plain

Queueの作成

キューを作ります。

okazuki-queueという名前で作りました。設定はよくわからんのでデフォルトで。

f:id:okazuki:20151012220434p:plain

f:id:okazuki:20151012220442p:plain

共有アクセスポリシーをIOという名前でリッスンと送信で作りました。

f:id:okazuki:20151012220556p:plain

接続文字列の取得

後でプログラムで使うので、EventHubとQueueのダッシュボードから作榮した共有アクセスポリシーに対する接続文字列を取得しておきます。

Stream Analyticsの作成

作成は適当に。

f:id:okazuki:20151012221029p:plain

入力を作ります。先ほど作ったイベントハブを使いたいのでデータストリームのイベントハブを選びます。共有アクセスポリシーもIOを選びます。

f:id:okazuki:20151012222447p:plain

あとはUTF-8のJSONを選んでおきます。

出力は、ServiceBus キューを選んで先ほど作ったQueueを選びます。共有アクセスポリシーはIOを選びます。

f:id:okazuki:20151012221508p:plain

次のページはJSONのUTF-8の改行区切りを選んで作成します。

クエリは右から左への垂れ流しです。

SELECT
    *
INTO
    [Output]
FROM
    [Input]

保存して実行しましょう。

プログラムの作成

NuGetから以下のものを参照に追加します。

  • WindowsAzure.ServiceBus
  • Newtonsoft.Json

そして、さくっとEventHubへのデータの送信とQueueからのデータの受信の処理を書きます。キーはさっき取得したやつを使います。

using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;
using System;
using System.Text;

namespace ConsoleApplication11
{
    class Program
    {
        static void Main(string[] args)
        {
            var qc = QueueClient.CreateFromConnectionString(
                "Endpoint=sb://okazuki-servicebus.servicebus.windows.net/;SharedAccessKeyName=IO;SharedAccessKey=/yXU2i/FO5ZEVsijSscJGYXi5KPBRT4m10cU7y9pgu8=",
                "okazuki-queue");
            qc.OnMessage(x =>
            {
                var receiveData = JsonConvert.DeserializeObject<Data>(x.GetBody<string>());
                Console.WriteLine(DateTime.Now - receiveData.Time);
            });

            var ec = EventHubClient.CreateFromConnectionString(
                "Endpoint=sb://okazuki-servicebus.servicebus.windows.net/;SharedAccessKeyName=Input;SharedAccessKey=xbw5s/+EMgfb05abvHMKGe/7lto9+TIWlQ4v4zKOAK4=",
                "okazuki-eventhub");
            for (int i = 0; i < 10; i++)
            {
                var data = new EventData(
                    Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new Data { Time = DateTime.Now })));
                ec.Send(data);
            }

            Console.ReadKey();
        }
    }

    class Data
    {
        public DateTime Time { get; set; }
    }
}

DateTimeつっこんだデータをEventHubに渡して、Queueで受け取ったときの現在時間と比較してます。

実行結果はこんな感じになりました。

f:id:okazuki:20151012222818p:plain

まぁ、クラウドにいって色々経由してる割にはいい感じ?