Исходный код вики Задача. Получение данных о смещении
Версия 2.13 от Alexandr Fokin на 2022/11/27 23:41
Скрыть последних авторов
| author | version | line-number | content |
|---|---|---|---|
| |
2.1 | 1 | |
| 2 | Задание: | ||
| 3 | Получение ифнормации о текущем состоянии параметров: | ||
| 4 | 1) Общее кол-во сообщений в topic | ||
| 5 | 2) Общее кол-во прочитанных(commited) сообщений в topic (по определенной consumerGroup) | ||
| 6 | 3) Определение общего значения lag в topic(Разность между общей длиной и кол-во прочитанных сообщений) (по определенной consumerGroup) | ||
| 7 | Опционально по каждому partition. | ||
| 8 | Данные по topic можно определить по сумме совокупности данных по каждом partition, принажлежащему данному topic. | ||
| 9 | |||
| 10 | https://github.com/confluentinc/confluent-kafka-dotnet/issues/1161 | ||
| 11 | https://stackoverflow.com/questions/57302244/aug-2019-kafka-consumer-lag-programmatically | ||
| 12 | |||
| 13 | |||
| 14 | Решение: | ||
| 15 | Данные по общему кол-ву сообщений и прочитанным сообщениям можно получить через Consumer. Причем для указанного partition. | ||
| 16 | Данные по topic формируются через сумму данных всех partitions, входящих в него. | ||
| 17 | |||
| 18 | {{code language="c#"}} | ||
| 19 | //номер смещения последнего сообщения в очереди | ||
| 20 | consumer | ||
| 21 | .QueryWatermarkOffsets(TopicPartitionInfo(topicName, partitionId),ConnectionParamsEntity.ActionTimeout) | ||
| 22 | .High | ||
| 23 | |||
| 24 | //Получить активное смещение (последнее прочитанное сообщение) | ||
| 25 | consumer | ||
| 26 | .Committed( | ||
| 27 | new TopicPartition[] { TopicPartitionInfo(topicName, partitionId) }, | ||
| 28 | ConnectionParamsEntity.ActionTimeout | ||
| 29 | ) | ||
| 30 | .FirstOrDefault() | ||
| 31 | .Offset | ||
| 32 | .Value; | ||
| 33 | {{/code}} |