@xin9leさんと話しをしてて、EventHubに突っ込んだデータがStream Analyticsが処理してQueueに突っ込んだ結果どれくらいで返ってくるものなのかという話しになりました。 なので、まぁ適当にプログラム組んでみました。
Service Bus名前空間の作成
まず、EventHubとQueueのServiceBusの名前空間を作成します。
今回はokazuki-servicebusという名前で作成しました。
EventHubの作成
イベントハブ作ります。
okazuki-eventhubという名前で作りました。パーティション数とかは最小の値にしました。
共有アクセスポリシーをIOという名前でリッスンと送信で作ります。
Queueの作成
キューを作ります。
okazuki-queueという名前で作りました。設定はよくわからんのでデフォルトで。
共有アクセスポリシーをIOという名前でリッスンと送信で作りました。
接続文字列の取得
後でプログラムで使うので、EventHubとQueueのダッシュボードから作榮した共有アクセスポリシーに対する接続文字列を取得しておきます。
Stream Analyticsの作成
作成は適当に。
入力を作ります。先ほど作ったイベントハブを使いたいのでデータストリームのイベントハブを選びます。共有アクセスポリシーもIOを選びます。
あとはUTF-8のJSONを選んでおきます。
出力は、ServiceBus キューを選んで先ほど作ったQueueを選びます。共有アクセスポリシーはIOを選びます。
次のページはJSONのUTF-8の改行区切りを選んで作成します。
クエリは右から左への垂れ流しです。
SELECT * INTO [Output] FROM [Input]
保存して実行しましょう。
プログラムの作成
NuGetから以下のものを参照に追加します。
- WindowsAzure.ServiceBus
- Newtonsoft.Json
そして、さくっとEventHubへのデータの送信とQueueからのデータの受信の処理を書きます。キーはさっき取得したやつを使います。
using Microsoft.ServiceBus.Messaging; using Newtonsoft.Json; using System; using System.Text; namespace ConsoleApplication11 { class Program { static void Main(string[] args) { var qc = QueueClient.CreateFromConnectionString( "Endpoint=sb://okazuki-servicebus.servicebus.windows.net/;SharedAccessKeyName=IO;SharedAccessKey=/yXU2i/FO5ZEVsijSscJGYXi5KPBRT4m10cU7y9pgu8=", "okazuki-queue"); qc.OnMessage(x => { var receiveData = JsonConvert.DeserializeObject<Data>(x.GetBody<string>()); Console.WriteLine(DateTime.Now - receiveData.Time); }); var ec = EventHubClient.CreateFromConnectionString( "Endpoint=sb://okazuki-servicebus.servicebus.windows.net/;SharedAccessKeyName=Input;SharedAccessKey=xbw5s/+EMgfb05abvHMKGe/7lto9+TIWlQ4v4zKOAK4=", "okazuki-eventhub"); for (int i = 0; i < 10; i++) { var data = new EventData( Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new Data { Time = DateTime.Now }))); ec.Send(data); } Console.ReadKey(); } } class Data { public DateTime Time { get; set; } } }
DateTimeつっこんだデータをEventHubに渡して、Queueで受け取ったときの現在時間と比較してます。
実行結果はこんな感じになりました。
まぁ、クラウドにいって色々経由してる割にはいい感じ?