かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

Universal WIndows Platform appからセンサーの値を拾ってイベントハブに叩き込んでStream Analyticsで処理してSQL Databaseに突っ込む

これまでの集大成です。

okazuki.hatenablog.com

okazuki.hatenablog.com

okazuki.hatenablog.com

Unviersal Windows Platform appからBluetoothで接続したSensorTagから温度を拾ってきてAzureのイベントハブに叩き込みます。イベントハブに叩き込んだデータは、1分単位で平均値に集計してSQL Databaseに突っ込みます。センサーの値って無慈悲に飛んでくるので、ある程度の時間単位に丸めてからSQL Databaseに突っ込まないとあっという間にデータたまっていっちゃいますからね。

SQL Databaseの準備

普通にSQL Databaseを作って以下のようなテーブルを作りました。

CREATE TABLE [dbo].[SensorTags]
(
    [Id] INT NOT NULL IDENTITY(1,1) PRIMARY KEY,
    [SensorId] NVARCHAR(256) NOT NULL, 
    [Value] REAL NOT NULL, 
    [StartTimestamp] DATETIME NOT NULL, 
    [EndTimestamp] DATETIME NOT NULL
)

EventHubとStreamAnalyticsの準備

okazukihub-nsという名前空間でokazukihubというイベントハブを作りました。SensorTagという名前で送信を受け付ける口も作っておきます。

次にokazukistreamという名前で、Stream Analyticsを作って入力にInputという名前で先ほどのokazukihubを紐づけて出力にOutputという名前でSQL DatabaseのSensorTagsテーブルを紐づけます。

クエリは以下のようなクエリにしました。

SELECT
    SensorId AS SensorId,
    AVG(Value) AS Value,
    MIN(Time) AS StartTimestamp,
    MAX(Time) AS EndTimestamp
FROM
    Input TIMESTAMP BY Time
GROUP BY TumblingWindow(minute, 1), SensorId

UWPの準備

UWPではセンサータグから値をとってきてデータに詰め込んでイベントハブに叩き込んでいます。データの入れ物になるクラスを準備します。

namespace App30
{
    internal class DeviceInfo
    {
        public string SensorId { get; set; }
        public double Value { get; set; }
        public string Time { get; set; }
    }
}

そして、ページに以下のコードを書きます。SensorTagからセンサーデータを読み込んで、読み込んだ値をイベントハブに突っ込んでいます。

public sealed partial class MainPage : Page
{
    private IRTemperatureSensor tempSensor;

    public MainPage()
    {
        this.InitializeComponent();
    }

    private string CreateToken(string resourceUri, string keyName, string key)
    {
        TimeSpan sinceEpoch = DateTime.UtcNow - new DateTime(1970, 1, 1);
        var expiry = Convert.ToString((int)sinceEpoch.TotalSeconds + 3600); //EXPIRES in 1h 
        string stringToSign = WebUtility.UrlEncode(resourceUri) + "\n" + expiry;
        var signature = ComputeSignature(stringToSign, key);
        var sasToken = String.Format(CultureInfo.InvariantCulture,
            "SharedAccessSignature sr={0}&sig={1}&se={2}&skn={3}",
            WebUtility.UrlEncode(resourceUri), WebUtility.UrlEncode(signature), expiry, keyName);

        return sasToken;
    }

    public string ComputeSignature(string content, string key, BinaryStringEncoding encoding = BinaryStringEncoding.Utf8)
    {
        var algorithmProvider = MacAlgorithmProvider.OpenAlgorithm(MacAlgorithmNames.HmacSha256);
        var contentBuffer = CryptographicBuffer.ConvertStringToBinary(content, encoding);
        var keyBuffer = CryptographicBuffer.ConvertStringToBinary(key, encoding);
        var signatureKey = algorithmProvider.CreateKey(keyBuffer);
        var signedBuffer = CryptographicEngine.Sign(signatureKey, contentBuffer);
        return CryptographicBuffer.EncodeToBase64String(signedBuffer);
    }

    private async void Page_Loaded(object sender, RoutedEventArgs e)
    {
        this.tempSensor = new IRTemperatureSensor();
        this.tempSensor.SensorValueChanged += this.TemSensorValueChanged;
        try
        {
            await this.tempSensor.Initialize();
            await this.tempSensor.EnableSensor();
            await this.tempSensor.EnableNotifications();
        }
        catch(Exception ex)
        {
            Debug.WriteLine("Dispose: " + ex);
            this.tempSensor?.Dispose();
        }
    }

    private readonly string sensorId = Guid.NewGuid().ToString();

    private async void TemSensorValueChanged(object sender, SensorValueChangedEventArgs e)
    {
        var value = IRTemperatureSensor.CalculateAmbientTemperature(e.RawData, TemperatureScale.Celsius);
        var client = new HttpClient();
        var data = new DeviceInfo
        {
            SensorId = sensorId,
            Value = value,
            Time = e.Timestamp.ToString("yyyy-MM-ddTHH:mm:ss.fffffzzz")
        };
        var content = new ObjectContent<DeviceInfo>(data, new JsonMediaTypeFormatter());

        var sas = this.CreateToken("https://okazukihub-ns.servicebus.windows.net/okazukihub/messages", "SensorTag", "eNxUBXIsBlCPXG0aZ+Mm8bnyqr0RPVsbBNfhHc2DhZ0=");
        client.DefaultRequestHeaders.TryAddWithoutValidation("Authorization", sas);
        content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/atom+xml")
        {
            CharSet = "utf-8",
        };
        content.Headers.ContentType.Parameters.Add(new System.Net.Http.Headers.NameValueHeaderValue("type", "entry"));
        await client.PostAsync("https://okazukihub-ns.servicebus.windows.net/okazukihub/messages", content);

        Debug.WriteLine($"senddata: {data.SensorId}, {data.Value}, {data.Time}");
    }
}

実行して結果の確認

SensorTagとWindowsをペアリングしてアプリを暫く動かしているとStream Analyticsに出力イベントが上がってきます。SQL Databaseを確認すると以下のような感じでデータが投入されていました。

f:id:okazuki:20150829150944p:plain

これはいい感じですね。