読者です 読者をやめる 読者になる 読者になる

かずきのBlog@hatena

日本マイクロソフトに勤めています。XAML + C#の組み合わせをメインに、たまにASP.NETやJavaなどの.NET系以外のことも書いています。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

Universal WIndows Platform appからセンサーの値を拾ってイベントハブに叩き込んでStream Analyticsで処理してSQL Databaseに突っ込む

これまでの集大成です。

okazuki.hatenablog.com

okazuki.hatenablog.com

okazuki.hatenablog.com

Unviersal Windows Platform appからBluetoothで接続したSensorTagから温度を拾ってきてAzureのイベントハブに叩き込みます。イベントハブに叩き込んだデータは、1分単位で平均値に集計してSQL Databaseに突っ込みます。センサーの値って無慈悲に飛んでくるので、ある程度の時間単位に丸めてからSQL Databaseに突っ込まないとあっという間にデータたまっていっちゃいますからね。

SQL Databaseの準備

普通にSQL Databaseを作って以下のようなテーブルを作りました。

CREATE TABLE [dbo].[SensorTags]
(
    [Id] INT NOT NULL IDENTITY(1,1) PRIMARY KEY,
    [SensorId] NVARCHAR(256) NOT NULL, 
    [Value] REAL NOT NULL, 
    [StartTimestamp] DATETIME NOT NULL, 
    [EndTimestamp] DATETIME NOT NULL
)

EventHubとStreamAnalyticsの準備

okazukihub-nsという名前空間でokazukihubというイベントハブを作りました。SensorTagという名前で送信を受け付ける口も作っておきます。

次にokazukistreamという名前で、Stream Analyticsを作って入力にInputという名前で先ほどのokazukihubを紐づけて出力にOutputという名前でSQL DatabaseのSensorTagsテーブルを紐づけます。

クエリは以下のようなクエリにしました。

SELECT
    SensorId AS SensorId,
    AVG(Value) AS Value,
    MIN(Time) AS StartTimestamp,
    MAX(Time) AS EndTimestamp
FROM
    Input TIMESTAMP BY Time
GROUP BY TumblingWindow(minute, 1), SensorId

UWPの準備

UWPではセンサータグから値をとってきてデータに詰め込んでイベントハブに叩き込んでいます。データの入れ物になるクラスを準備します。

namespace App30
{
    internal class DeviceInfo
    {
        public string SensorId { get; set; }
        public double Value { get; set; }
        public string Time { get; set; }
    }
}

そして、ページに以下のコードを書きます。SensorTagからセンサーデータを読み込んで、読み込んだ値をイベントハブに突っ込んでいます。

public sealed partial class MainPage : Page
{
    private IRTemperatureSensor tempSensor;

    public MainPage()
    {
        this.InitializeComponent();
    }

    private string CreateToken(string resourceUri, string keyName, string key)
    {
        TimeSpan sinceEpoch = DateTime.UtcNow - new DateTime(1970, 1, 1);
        var expiry = Convert.ToString((int)sinceEpoch.TotalSeconds + 3600); //EXPIRES in 1h 
        string stringToSign = WebUtility.UrlEncode(resourceUri) + "\n" + expiry;
        var signature = ComputeSignature(stringToSign, key);
        var sasToken = String.Format(CultureInfo.InvariantCulture,
            "SharedAccessSignature sr={0}&sig={1}&se={2}&skn={3}",
            WebUtility.UrlEncode(resourceUri), WebUtility.UrlEncode(signature), expiry, keyName);

        return sasToken;
    }

    public string ComputeSignature(string content, string key, BinaryStringEncoding encoding = BinaryStringEncoding.Utf8)
    {
        var algorithmProvider = MacAlgorithmProvider.OpenAlgorithm(MacAlgorithmNames.HmacSha256);
        var contentBuffer = CryptographicBuffer.ConvertStringToBinary(content, encoding);
        var keyBuffer = CryptographicBuffer.ConvertStringToBinary(key, encoding);
        var signatureKey = algorithmProvider.CreateKey(keyBuffer);
        var signedBuffer = CryptographicEngine.Sign(signatureKey, contentBuffer);
        return CryptographicBuffer.EncodeToBase64String(signedBuffer);
    }

    private async void Page_Loaded(object sender, RoutedEventArgs e)
    {
        this.tempSensor = new IRTemperatureSensor();
        this.tempSensor.SensorValueChanged += this.TemSensorValueChanged;
        try
        {
            await this.tempSensor.Initialize();
            await this.tempSensor.EnableSensor();
            await this.tempSensor.EnableNotifications();
        }
        catch(Exception ex)
        {
            Debug.WriteLine("Dispose: " + ex);
            this.tempSensor?.Dispose();
        }
    }

    private readonly string sensorId = Guid.NewGuid().ToString();

    private async void TemSensorValueChanged(object sender, SensorValueChangedEventArgs e)
    {
        var value = IRTemperatureSensor.CalculateAmbientTemperature(e.RawData, TemperatureScale.Celsius);
        var client = new HttpClient();
        var data = new DeviceInfo
        {
            SensorId = sensorId,
            Value = value,
            Time = e.Timestamp.ToString("yyyy-MM-ddTHH:mm:ss.fffffzzz")
        };
        var content = new ObjectContent<DeviceInfo>(data, new JsonMediaTypeFormatter());

        var sas = this.CreateToken("https://okazukihub-ns.servicebus.windows.net/okazukihub/messages", "SensorTag", "eNxUBXIsBlCPXG0aZ+Mm8bnyqr0RPVsbBNfhHc2DhZ0=");
        client.DefaultRequestHeaders.TryAddWithoutValidation("Authorization", sas);
        content.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/atom+xml")
        {
            CharSet = "utf-8",
        };
        content.Headers.ContentType.Parameters.Add(new System.Net.Http.Headers.NameValueHeaderValue("type", "entry"));
        await client.PostAsync("https://okazukihub-ns.servicebus.windows.net/okazukihub/messages", content);

        Debug.WriteLine($"senddata: {data.SensorId}, {data.Value}, {data.Time}");
    }
}

実行して結果の確認

SensorTagとWindowsをペアリングしてアプリを暫く動かしているとStream Analyticsに出力イベントが上がってきます。SQL Databaseを確認すると以下のような感じでデータが投入されていました。

f:id:okazuki:20150829150944p:plain

これはいい感じですね。