Задача. Получение данных о смещении
Задание:
Получение ифнормации о текущем состоянии параметров:
1) Общее кол-во сообщений в topic
2) Общее кол-во прочитанных(commited) сообщений в topic (по определенной consumerGroup)
3) Определение общего значения lag в topic(Разность между общей длиной и кол-во прочитанных сообщений) (по определенной consumerGroup)
Опционально по каждому partition.
Данные по topic можно определить по сумме совокупности данных по каждом partition, принажлежащему данному topic.
https://github.com/confluentinc/confluent-kafka-dotnet/issues/1161
https://stackoverflow.com/questions/57302244/aug-2019-kafka-consumer-lag-programmatically
Решение:
Данные по общему кол-ву сообщений и прочитанным сообщениям можно получить через Consumer. Причем для указанного partition.
Данные по topic формируются через сумму данных всех partitions, входящих в него.
consumer
.QueryWatermarkOffsets(TopicPartitionInfo(topicName, partitionId),ConnectionParamsEntity.ActionTimeout)
.High
//Получить активное смещение (последнее прочитанное сообщение)
consumer
.Committed(
new TopicPartition[] { TopicPartitionInfo(topicName, partitionId) },
ConnectionParamsEntity.ActionTimeout
)
.FirstOrDefault()
.Offset
.Value;