Исходный код вики Получение данных о смещении
Редактировал(а) Alexandr Fokin 2023/05/10 11:40
Последние авторы
author | version | line-number | content |
---|---|---|---|
1 | |Задание: | ||
2 | Получение информации о текущем состоянии параметров:((( | ||
3 | 1. Общее кол-во сообщений в topic | ||
4 | 1. Общее кол-во прочитанных(commited) сообщений в topic (по определенной consumerGroup) | ||
5 | 1. Определение общего значения lag в topic(Разность между общей длиной и кол-во прочитанных сообщений) (по определенной consumerGroup) | ||
6 | |||
7 | Опционально по каждому partition. | ||
8 | Данные по topic можно определить по сумме совокупности данных по каждом partition, принадлежащему данному topic. | ||
9 | ))) | ||
10 | |((( | ||
11 | Get Consumer lag using LibrdkafkaHandle.OutQueueLength | ||
12 | https://github.com/confluentinc/confluent-kafka-dotnet/issues/1161 | ||
13 | |||
14 | Aug 2019 - Kafka Consumer Lag programmatically | ||
15 | https://stackoverflow.com/questions/57302244/aug-2019-kafka-consumer-lag-programmatically | ||
16 | ))) | ||
17 | |Решение: | ||
18 | Данные по общему кол-ву сообщений и прочитанным сообщениям можно получить через Consumer. Причем для указанного partition. | ||
19 | Данные по topic формируются через сумму данных всех partitions, входящих в него. | ||
20 | |{{code language="c#"}} //номер смещения последнего сообщения в очереди | ||
21 | consumer | ||
22 | .QueryWatermarkOffsets(TopicPartitionInfo(topicName, partitionId),ConnectionParamsEntity.ActionTimeout) | ||
23 | .High | ||
24 | |||
25 | //Получить активное смещение (последнее прочитанное сообщение) | ||
26 | consumer | ||
27 | .Committed( | ||
28 | new TopicPartition[] { TopicPartitionInfo(topicName, partitionId) }, | ||
29 | ConnectionParamsEntity.ActionTimeout | ||
30 | ) | ||
31 | .FirstOrDefault() | ||
32 | .Offset | ||
33 | .Value;{{/code}} |