かずきのBlog@hatena

日本マイクロソフトに勤めています。XAML + C#の組み合わせをメインに、たまにASP.NETやJavaなどの.NET系以外のことも書いています。掲載内容は個人の見解であり、所属する企業を代表するものではありません。

IoT HubでEventProcessorHostを使ってメッセージを受信する

IEventProcessorを作って。

class EventProcessor : IEventProcessor
{
    private Stopwatch stopwatch;

    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    public Task OpenAsync(PartitionContext context)
    {
        stopwatch = Stopwatch.StartNew();
        return Task.FromResult(0);
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        var convertedMessages = messages
            .Select(x => x.GetBytes())
            .Select(x => Encoding.UTF8.GetString(x))
            .Select(x => JsonConvert.DeserializeObject<ReceiveData>(x));

        foreach (var p in convertedMessages)
        {
            Console.WriteLine($"{p.Name} {p.Age}");
        }

        if (this.stopwatch.Elapsed.TotalMinutes >= 5)
        {
            await context.CheckpointAsync();
        }
    }
}

class ReceiveData
{
    public string Name { get; set; }
    public int Age { get; set; }
}

EventProcessorHostを作るだけ。基本的にEventHubと一緒です。messages/eventsが固定っぽいかな。

const string IoTHubConnectionString = "IoTHubの接続文字列"
static readonly string StorageConnectionString = "Storageの接続文字列"

static void Main()
{
    var eventProcessorHost = new EventProcessorHost(
        Guid.NewGuid().ToString(),
        "messages/events",
        EventHubConsumerGroup.DefaultGroupName,
        IoTHubConnectionString,
        StorageConnectionString,
        "messages-events");
    eventProcessorHost.RegisterEventProcessorAsync<EventProcessor>().Wait();

    var host = new JobHost();
    // The following code ensures that the WebJob will be running continuously
    host.RunAndBlock();
}