かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

MS AzureのEvent Hubsの使い方

IoTでデータ集めるときはWebAPIに直接叩き込むんじゃなくて、間にEventHubsを挟むとスケールしやすくていいぜHAHAHA。ということを聞いたのでぐぐってみたが、コード例があまりにも少ない…!!

まぁ英語読めばいいんですが、TOEICの点数が確率論の世界の人には辛い今日この頃。ということで自分用メモを残しておきます。

EventHubを作る

AzureのポータルのServiceBusのところで、名前空間を作ります。 イベントハブを使うにはSTANDARDじゃないといけないので、要注意(あとで変えれますが)。 ここでは、okazukiという名前で作成しました。

しばらく待っていると、名前空間が作成されます。名前空間ができたらEventHubを作成します。画面上部のイベントハブを選択して、画面左下の+ボタンで新しいイベントハブを作成します。入力項目は、イベントハブの名前だけです。ここではsampleにしました。

接続文字列の確認

ハブを作っただけでは何もできないので、ハブに送信や受信をするためのアクセスポリシーを作成します。構成からできます。ここでは送信とリッスンができるmessagingという名前のアクセスポリシーを作成しました。

この状態で接続文字列を表示すると、messagingへの接続文字列が表示されるのでこぴっておきます。

データの送信

EventHubClientを作成して、Sendメソッドを呼ぶだけです。非同期版のSendAsyncや一括送信するためのSendBatchなどもあります。

クライアントは接続文字列をイベントハブの名前から作れます。

// クライアント作って
var client = EventHubClient.CreateFromConnectionString(
    "Endpoint=sb://okazuki.servicebus.windows.net/;SharedAccessKeyName=messaging;SharedAccessKey=xHJuv7BQM1atBJfhKdmGI/SXUJjuANUVwo0D/Teq9WA=", 
    "sample");
Task.Run(() =>
{
    while (true)
    {
        // とりあえず延々と同じデータを投げ続ける
        var data = new EventData(new byte[] { 1, 2, 3 });
        client.Send(data);
    }
});

データの受信

データの受信には、ICheckpointManagerを実装したクラス

class MyCheckpointManager : ICheckpointManager
{
    public Task CheckpointAsync(Lease lease, string offset, long sequenceNumber)
    {
        throw new NotImplementedException();
    }
}

class MyEventProcessor : IEventProcessor
{
    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        return Task.FromResult(0);
    }

    public Task OpenAsync(PartitionContext context)
    {
        return Task.FromResult(0);
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var message in messages)
        {
            var data = message.GetBytes();
            Console.WriteLine("{0} {1} {2}", data[0], data[1], data[2]);
            Console.WriteLine(message.PartitionKey);
        }

        return Task.FromResult(0);
    }
}

CheckpointManagerはちょっと調べてないので謎だけど、空実装でいいみたいです。IEventProcessorが本命で、OpenAsyncで開始時、CloseAsyncで終了時、ProcessEventsAsyncでイベント受信時の処理をやります。ここでは単純にデータを表示しています。

あとは、これをPatition単位に登録してやるだけです。

var g = client.GetDefaultConsumerGroup();
foreach (var p in client.GetRuntimeInformation().PartitionIds)
{
    g.RegisterProcessor<MyEventProcessor>(new Lease { PartitionId = p }, new MyCheckpointManager());
}

簡単だ。