Редактировал(а) Alexandr Fokin 2023/05/10 11:40

Последние авторы
1 |Задание:
2 Получение информации о текущем состоянии параметров:(((
3 1. Общее кол-во сообщений в topic
4 1. Общее кол-во прочитанных(commited) сообщений в topic (по определенной consumerGroup)
5 1. Определение общего значения lag в topic(Разность между общей длиной и кол-во прочитанных сообщений) (по определенной consumerGroup)
6
7 Опционально по каждому partition.
8 Данные по topic можно определить по сумме совокупности данных по каждом partition, принадлежащему данному topic.
9 )))
10 |(((
11 Get Consumer lag using LibrdkafkaHandle.OutQueueLength
12 https://github.com/confluentinc/confluent-kafka-dotnet/issues/1161
13
14 Aug 2019 - Kafka Consumer Lag programmatically
15 https://stackoverflow.com/questions/57302244/aug-2019-kafka-consumer-lag-programmatically
16 )))
17 |Решение:
18 Данные по общему кол-ву сообщений и прочитанным сообщениям можно получить через Consumer. Причем для указанного partition.
19 Данные по topic формируются через сумму данных всех partitions, входящих в него.
20 |{{code language="c#"}} //номер смещения последнего сообщения в очереди
21 consumer
22 .QueryWatermarkOffsets(TopicPartitionInfo(topicName, partitionId),ConnectionParamsEntity.ActionTimeout)
23 .High
24
25 //Получить активное смещение (последнее прочитанное сообщение)
26 consumer
27 .Committed(
28 new TopicPartition[] { TopicPartitionInfo(topicName, partitionId) },
29 ConnectionParamsEntity.ActionTimeout
30 )
31 .FirstOrDefault()
32 .Offset
33 .Value;{{/code}}