かずきのBlog@hatena

すきな言語は C# + XAML の組み合わせ。Azure Functions も好き。最近は Go 言語勉強中。日本マイクロソフトで働いていますが、ここに書いていることは個人的なメモなので会社の公式見解ではありません。

IoT HubでEventProcessorHostを使ってメッセージを受信する

IEventProcessorを作って。

/// <summary>
/// Event processor that decodes each received message as UTF-8 JSON into a
/// <see cref="ReceiveData"/>, writes its fields to the console, and
/// checkpoints the partition periodically.
/// </summary>
class EventProcessor : IEventProcessor
{
    // Minimum time between two periodic checkpoints in ProcessEventsAsync.
    private static readonly TimeSpan CheckpointInterval = TimeSpan.FromMinutes(5);

    // Measures the time since the processor was opened or since the last
    // periodic checkpoint, whichever happened most recently.
    private Stopwatch stopwatch;

    /// <summary>
    /// Called when the processor is closed. Checkpoints only when the close
    /// reason is <see cref="CloseReason.Shutdown"/>.
    /// </summary>
    /// <param name="context">Partition context used to write the checkpoint.</param>
    /// <param name="reason">Why the processor is being closed.</param>
    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    /// <summary>
    /// Called when the processor is opened; starts the checkpoint timer.
    /// </summary>
    /// <param name="context">Partition context (unused here).</param>
    public Task OpenAsync(PartitionContext context)
    {
        stopwatch = Stopwatch.StartNew();
        return Task.FromResult(0);
    }

    /// <summary>
    /// Prints the name and age carried by every message in the batch, then
    /// checkpoints if at least <see cref="CheckpointInterval"/> has elapsed
    /// since the previous periodic checkpoint.
    /// </summary>
    /// <param name="context">Partition context used to write the checkpoint.</param>
    /// <param name="messages">Batch of received events.</param>
    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        var convertedMessages = messages
            .Select(x => x.GetBytes())
            .Select(x => Encoding.UTF8.GetString(x))
            .Select(x => JsonConvert.DeserializeObject<ReceiveData>(x));

        foreach (var p in convertedMessages)
        {
            Console.WriteLine($"{p.Name} {p.Age}");
        }

        if (stopwatch.Elapsed >= CheckpointInterval)
        {
            await context.CheckpointAsync();

            // Restart the timer; otherwise, once the interval has passed,
            // every subsequent batch would trigger a checkpoint.
            stopwatch.Restart();
        }
    }
}

/// <summary>
/// Shape of the JSON payload that EventProcessor.ProcessEventsAsync
/// deserializes from each received message.
/// </summary>
class ReceiveData
{
    /// <summary>Name value from the payload; written to the console by the processor.</summary>
    public string Name { get; set; }
    /// <summary>Age value from the payload; written to the console by the processor.</summary>
    public int Age { get; set; }
}

あとは EventProcessorHost を作るだけです。基本的には Event Hubs の場合と同じですが、イベントハブのパスには「messages/events」を指定します(IoT Hub ではこの値は固定のようです)。

// Connection string passed to EventProcessorHost as the event hub connection
// string. The literal is a placeholder: replace it with a real value loaded
// from configuration rather than committing a secret to source.
const string IoTHubConnectionString = "IoTHubの接続文字列";
// Connection string of the storage account passed to EventProcessorHost.
// Placeholder as well.
static readonly string StorageConnectionString = "Storageの接続文字列";

// Entry point: registers EventProcessor with a new EventProcessorHost, then
// blocks in the WebJobs host until the job is stopped.
static void Main()
{
    var eventProcessorHost = new EventProcessorHost(
        Guid.NewGuid().ToString(),
        "messages/events",
        EventHubConsumerGroup.DefaultGroupName,
        IoTHubConnectionString,
        StorageConnectionString,
        "messages-events");

    // GetAwaiter().GetResult() surfaces the original exception instead of
    // wrapping it in an AggregateException as Wait() does.
    eventProcessorHost.RegisterEventProcessorAsync<EventProcessor>().GetAwaiter().GetResult();

    try
    {
        var host = new JobHost();
        // The following code ensures that the WebJob will be running continuously
        host.RunAndBlock();
    }
    finally
    {
        // Unregister on the way out so the processors are closed gracefully;
        // EventProcessor.CloseAsync only checkpoints for CloseReason.Shutdown.
        eventProcessorHost.UnregisterEventProcessorAsync().GetAwaiter().GetResult();
    }
}