Версия 2.12 от Alexandr Fokin на 2022/11/27 23:41

Последние авторы
1
2 Задание:
3 Получение ифнормации о текущем состоянии параметров:
4 1) Общее кол-во сообщений в topic
5 2) Общее кол-во прочитанных(commited) сообщений в topic (по определенной consumerGroup)
6 3) Определение общего значения lag в topic(Разность между общей длиной и кол-во прочитанных сообщений) (по определенной consumerGroup)
7 Опционально по каждому partition.
8 Данные по topic можно определить по сумме совокупности данных по каждом partition, принажлежащему данному topic.
9
10 https://github.com/confluentinc/confluent-kafka-dotnet/issues/1161
11 https://stackoverflow.com/questions/57302244/aug-2019-kafka-consumer-lag-programmatically
12
13
14 Решение:
15 Данные по общему кол-ву сообщений и прочитанным сообщениям можно получить через Consumer. Причем для указанного partition.
16 Данные по topic формируются через сумму данных всех partitions, входящих в него.
17
18 {{code language="c#"}}
19 //номер смещения последнего сообщения в очереди
20 consumer
21 .QueryWatermarkOffsets(TopicPartitionInfo(topicName, partitionId),ConnectionParamsEntity.ActionTimeout)
22 .High
23
24 //Получить активное смещение (последнее прочитанное сообщение)
25 consumer
26 .Committed(
27 new TopicPartition[] { TopicPartitionInfo(topicName, partitionId) },
28 ConnectionParamsEntity.ActionTimeout
29 )
30 .FirstOrDefault()
31 .Offset
32 .Value;
33 {{/code}}