かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Microsoft AzureのEventHubに叩き込んだデータをStream Analyticsで加工してTableStorageで突っ込む

時代はIoTですよね。

ということで、データの受け口として推されてるっぽいEventHubからStream Analyticsを使ってTableStorageで出力してみたいと思います。

受け口の作成

EventHubを作成します。Azureのポータル(クラシック使い)からService BusのEventHubを作成します。

f:id:okazuki:20150826215621p:plain

InputHubという名前で作成しました。

f:id:okazuki:20150826215734p:plain

あとの設定はとりあえず適当で。

f:id:okazuki:20150826215824p:plain

とりあえずデータを投げこむための口を作っておきます。イベントハブの構成から共有アクセスポリシーで送信できる口を作ります。今回はInputという名前で作りました。

f:id:okazuki:20150826220608p:plain

出力先の作成

とりあえず適当にストレージサービスを作っておきます。

f:id:okazuki:20150826220005p:plain

Stream Analyticsの作成

こいつもサクサクと作っていきます。

f:id:okazuki:20150826220110p:plain

f:id:okazuki:20150826220332p:plain

入力の作成

そして、作成したら"入力"を作ります。入力のタブにいくと入力がないので作るか?というような感じのリンクがあるのでクリックします。EventHubを入力にするので、データストリームを選択します。

f:id:okazuki:20150826220818p:plain

イベントハブを選びます。

f:id:okazuki:20150826220903p:plain

先ほど作ったイベントハブを選択します。入力のエイリアスは、クエリの中で使うときのテーブル名みたいなものです。

f:id:okazuki:20150826221124p:plain

次のシリアル化とかのページはそのままJSONとUTF-8でいきます。

f:id:okazuki:20150826221256p:plain

出力の作成

次にStream Analyticsでの変換結果を出力するためのテーブルストレージの設定を行います。出力のタブに行くと、いろいろな出力先が選べますが、今回はテーブルストレージを選んで次へ行きます。

f:id:okazuki:20150826221405p:plain

テーブルストレージのアカウントを選んで出力先のテーブルを作成するようにします。エイリアスはクエリの中で使う名前です。

f:id:okazuki:20150826221611p:plain

そして、PartitionKeyとRowKeyに突っ込むデータを決めます。今回はDeviceIdとTimeという列をそれぞれ設定するようにしました。これは後で作成するクエリの中で作る列になります。

f:id:okazuki:20150826221736p:plain

クエリの作成

ついに本番のクエリ作成です。Stream AnalyticsはSQLライクな言語で入力を出力に変換できます。よく使う例は以下のページから参照できます。

azure.microsoft.com

クエリ言語のリファレンスはここになります。

Stream Analytics Query Language Reference

今回は、以下のようなデータを処理することを仮定します。

  • DeviceId : デバイスの識別子
  • Value : デバイスから送られてきた値
  • Time : デバイスから送られてきたデータの日時

これを、処理して10秒ごとの最大値を抜き取るようにしてみました。先ほどテーブルストレージの設定でDeviceIdとTimeという列をPartitionKeyとRowKeyに指定したので、最低でもその列を含む必要がある点に注意です。

SELECT
    DeviceId,
    Max(Value),
    System.Timestamp as Time
INTO
    [OkazukiOutput] 
FROM
    [OkazukiInput] TIMESTAMP BY Time
GROUP BY 
    DeviceId,
    TumblingWindow(second, 10)

このクエリを保存します。テスト用のJSONデータを作れば簡単にクエリのテストができるようになってる点ちょっと気に入りました。

ダッシュボードから開始を選択してStream Analyticsの処理を開始します。これをしてないと動かないので要注意。

f:id:okazuki:20150826222512p:plain

データを投げ込む処理の作成

コンソールアプリケーションを作成してEventHubにデータを投げ込むアプリを作ります。ここらへんめんどいですね。本番だとラズパイとかからセンサーデータ拾ってきてREST APIたたく感じになるんでしょうが、今回はEventHubの.NETのAPIを使ってさくっといきます。

EventHubでNuGetを検索してEventHubに必要なライブラリを参照します。

  • Microsoft.Azure.ServiceBus.EventProcessorHost

追加したら、以下のようなプログラムを書きます。詳細な説明は本題じゃないので省きます。接続文字列は、ポータルから接続情報でとってこれます。

using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Json;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApplication8
{
    class Program
    {
        static void Main(string[] args)
        {
            var client = EventHubClient.CreateFromConnectionString(
                "Endpoint=sb://inputhub-ns.servicebus.windows.net/;SharedAccessKeyName=Input;SharedAccessKey=dHS+OgxrBH+brIruyum+vHJhvlF93eMQxnsiqAxLxjg=", 
                "inputhub");
            var random = new Random();
            while (true)
            {
                var d = new DeviceInfo
                {
                    DeviceId = random.Next(5),
                    Value = random.Next(10),
                    Time = DateTimeOffset.UtcNow.ToString("yyyy/MM/ddTHH:mm:ss.fffffffzzz"),
                };
                client.Send(new EventData(d, new DataContractJsonSerializer(typeof(DeviceInfo))));
            }
        }
    }

    public class DeviceInfo
    {
        public int DeviceId { get; set; }
        public int Value { get; set; }
        public string Time { get; set; }
    }
}

プログラムの実行

このプログラムを実行してイベントハブにデータを叩き込み始めます。しばらくするとStream Analyticsの監視タブで出力があったことが確認できるようになります。

f:id:okazuki:20150826223412p:plain

きりのいいところでプログラムを止めます。

アウトプットの確認

Azureのテーブルストレージが見れるツール(ここではVS使ってます)を使ってOkazukiOutputテーブルの中身を見ます。

f:id:okazuki:20150826223520p:plain

ちゃんとデータが入ってることが確認できます。

f:id:okazuki:20150826223645p:plain

感想

以外とお手軽なのでStream Analyticsのクエリでできる処理の範囲内なら使ってみるのもいいかもと思いました。