Исходный код вики Получение данных о смещении
Редактировал(а) Alexandr Fokin 2023/05/10 11:40
Последние авторы
| author | version | line-number | content |
|---|---|---|---|
| 1 | |Задание: | ||
| 2 | Получение информации о текущем состоянии параметров:((( | ||
| 3 | 1. Общее кол-во сообщений в topic | ||
| 4 | 1. Общее кол-во прочитанных(commited) сообщений в topic (по определенной consumerGroup) | ||
| 5 | 1. Определение общего значения lag в topic(Разность между общей длиной и кол-во прочитанных сообщений) (по определенной consumerGroup) | ||
| 6 | |||
| 7 | Опционально по каждому partition. | ||
| 8 | Данные по topic можно определить по сумме совокупности данных по каждом partition, принадлежащему данному topic. | ||
| 9 | ))) | ||
| 10 | |((( | ||
| 11 | Get Consumer lag using LibrdkafkaHandle.OutQueueLength | ||
| 12 | https://github.com/confluentinc/confluent-kafka-dotnet/issues/1161 | ||
| 13 | |||
| 14 | Aug 2019 - Kafka Consumer Lag programmatically | ||
| 15 | https://stackoverflow.com/questions/57302244/aug-2019-kafka-consumer-lag-programmatically | ||
| 16 | ))) | ||
| 17 | |Решение: | ||
| 18 | Данные по общему кол-ву сообщений и прочитанным сообщениям можно получить через Consumer. Причем для указанного partition. | ||
| 19 | Данные по topic формируются через сумму данных всех partitions, входящих в него. | ||
| 20 | |{{code language="c#"}} //номер смещения последнего сообщения в очереди | ||
| 21 | consumer | ||
| 22 | .QueryWatermarkOffsets(TopicPartitionInfo(topicName, partitionId),ConnectionParamsEntity.ActionTimeout) | ||
| 23 | .High | ||
| 24 | |||
| 25 | //Получить активное смещение (последнее прочитанное сообщение) | ||
| 26 | consumer | ||
| 27 | .Committed( | ||
| 28 | new TopicPartition[] { TopicPartitionInfo(topicName, partitionId) }, | ||
| 29 | ConnectionParamsEntity.ActionTimeout | ||
| 30 | ) | ||
| 31 | .FirstOrDefault() | ||
| 32 | .Offset | ||
| 33 | .Value;{{/code}} |