Задача. Получение данных о смещении

Версия 2.3 от Alexandr Fokin на 2020/08/23 13:15

Задание:
Получение ифнормации о текущем состоянии параметров:
1) Общее кол-во сообщений в topic
2) Общее кол-во прочитанных(commited) сообщений в topic (по определенной consumerGroup)
3) Определение общего значения lag в topic(Разность между общей длиной и кол-во прочитанных сообщений) (по определенной consumerGroup)
Опционально по каждому partition.
Данные по topic можно определить по сумме совокупности данных по каждом partition, принажлежащему данному topic.

https://github.com/confluentinc/confluent-kafka-dotnet/issues/1161
https://stackoverflow.com/questions/57302244/aug-2019-kafka-consumer-lag-programmatically

Решение:
Данные по общему кол-ву сообщений и прочитанным сообщениям можно получить через Consumer. Причем для указанного partition.
Данные по topic формируются через сумму данных всех partitions, входящих в него.

  //номер смещения последнего сообщения в очереди
 consumer
    .QueryWatermarkOffsets(TopicPartitionInfo(topicName, partitionId),ConnectionParamsEntity.ActionTimeout)
    .High

 //Получить активное смещение (последнее прочитанное сообщение)
 consumer
    .Committed(
     new TopicPartition[] { TopicPartitionInfo(topicName, partitionId) },
      ConnectionParamsEntity.ActionTimeout
    )
    .FirstOrDefault()
    .Offset
    .Value;