かずきのBlog@hatena

日本マイクロソフトに勤めています。XAML + C#の組み合わせをメインに、たまにASP.NETやJavaなどの.NET系以外のことも書いています。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

MS AzureのEvent Hubsの使い方

IoTでデータ集めるときはWebAPIに直接叩き込むんじゃなくて、間にEventHubsを挟むとスケールしやすくていいぜHAHAHA。ということを聞いたのでぐぐってみたが、コード例があまりにも少ない…!!

まぁ英語読めばいいんですが、TOEICの点数が確率論の世界の人には辛い今日この頃。ということで自分用メモを残しておきます。

EventHubを作る

AzureのポータルのServiceBusのところで、名前空間を作ります。 イベントハブを使うにはSTANDARDじゃないといけないので、要注意(あとで変えれますが)。 ここでは、okazukiという名前で作成しました。

しばらく待っていると、名前空間が作成されます。名前空間ができたらEventHubを作成します。画面上部のイベントハブを選択して、画面左下の+ボタンで新しいイベントハブを作成します。入力項目は、イベントハブの名前だけです。ここではsampleにしました。

接続文字列の確認

ハブを作っただけでは何もできないので、ハブに送信や受信をするためのアクセスポリシーを作成します。構成からできます。ここでは送信とリッスンができるmessagingという名前のアクセスポリシーを作成しました。

この状態で接続文字列を表示すると、messagingへの接続文字列が表示されるのでこぴっておきます。

データの送信

EventHubClientを作成して、Sendメソッドを呼ぶだけです。非同期版のSendAsyncや一括送信するためのSendBatchなどもあります。

クライアントは接続文字列をイベントハブの名前から作れます。

// クライアント作って
var client = EventHubClient.CreateFromConnectionString(
    "Endpoint=sb://okazuki.servicebus.windows.net/;SharedAccessKeyName=messaging;SharedAccessKey=xHJuv7BQM1atBJfhKdmGI/SXUJjuANUVwo0D/Teq9WA=", 
    "sample");
Task.Run(() =>
{
    while (true)
    {
        // とりあえず延々と同じデータを投げ続ける
        var data = new EventData(new byte[] { 1, 2, 3 });
        client.Send(data);
    }
});

データの受信

データの受信には、ICheckpointManagerを実装したクラス

class MyCheckpointManager : ICheckpointManager
{
    public Task CheckpointAsync(Lease lease, string offset, long sequenceNumber)
    {
        throw new NotImplementedException();
    }
}

class MyEventProcessor : IEventProcessor
{
    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        return Task.FromResult(0);
    }

    public Task OpenAsync(PartitionContext context)
    {
        return Task.FromResult(0);
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var message in messages)
        {
            var data = message.GetBytes();
            Console.WriteLine("{0} {1} {2}", data[0], data[1], data[2]);
            Console.WriteLine(message.PartitionKey);
        }

        return Task.FromResult(0);
    }
}

CheckpointManagerはちょっと調べてないので謎だけど、空実装でいいみたいです。IEventProcessorが本命で、OpenAsyncで開始時、CloseAsyncで終了時、ProcessEventsAsyncでイベント受信時の処理をやります。ここでは単純にデータを表示しています。

あとは、これをPatition単位に登録してやるだけです。

var g = client.GetDefaultConsumerGroup();
foreach (var p in client.GetRuntimeInformation().PartitionIds)
{
    g.RegisterProcessor<MyEventProcessor>(new Lease { PartitionId = p }, new MyCheckpointManager());
}

簡単だ。