Изменения документа Получение данных о смещении
Редактировал(а) Alexandr Fokin 2023/05/10 11:40
От версии 1.1
отредактировано Alexandr Fokin
на 2020/08/17 11:43
на 2020/08/17 11:43
Изменить комментарий:
К данной версии нет комментариев
К версии 2.16
отредактировано Alexandr Fokin
на 2023/05/10 11:40
на 2023/05/10 11:40
Изменить комментарий:
К данной версии нет комментариев
Сводка
-
Свойства страницы (3 изменено, 0 добавлено, 0 удалено)
Подробности
- Свойства страницы
-
- Название
-
... ... @@ -1,1 +1,1 @@ 1 - Задача.Получение данных о смещении1 +Получение данных о смещении - Родительский документ
-
... ... @@ -1,1 +1,1 @@ 1 -Разработка.NET. Библиотеки.Confluentinc\.Confluent-kafka-dotnet.WebHome1 +Разработка.NET.Работа с брокерами сообщений.Confluentinc\. Confluent-kafka-dotnet.Сценарии.WebHome - Содержимое
-
... ... @@ -1,0 +1,33 @@ 1 +|Задание: 2 +Получение информации о текущем состоянии параметров:((( 3 +1. Общее кол-во сообщений в topic 4 +1. Общее кол-во прочитанных(commited) сообщений в topic (по определенной consumerGroup) 5 +1. Определение общего значения lag в topic(Разность между общей длиной и кол-во прочитанных сообщений) (по определенной consumerGroup) 6 + 7 +Опционально по каждому partition. 8 +Данные по topic можно определить по сумме совокупности данных по каждом partition, принадлежащему данному topic. 9 +))) 10 +|((( 11 +Get Consumer lag using LibrdkafkaHandle.OutQueueLength 12 +https://github.com/confluentinc/confluent-kafka-dotnet/issues/1161 13 + 14 +Aug 2019 - Kafka Consumer Lag programmatically 15 +https://stackoverflow.com/questions/57302244/aug-2019-kafka-consumer-lag-programmatically 16 +))) 17 +|Решение: 18 +Данные по общему кол-ву сообщений и прочитанным сообщениям можно получить через Consumer. Причем для указанного partition. 19 +Данные по topic формируются через сумму данных всех partitions, входящих в него. 20 +|{{code language="c#"}} //номер смещения последнего сообщения в очереди 21 + consumer 22 + .QueryWatermarkOffsets(TopicPartitionInfo(topicName, partitionId),ConnectionParamsEntity.ActionTimeout) 23 + .High 24 + 25 + //Получить активное смещение (последнее прочитанное сообщение) 26 + consumer 27 + .Committed( 28 + new TopicPartition[] { TopicPartitionInfo(topicName, partitionId) }, 29 + ConnectionParamsEntity.ActionTimeout 30 + ) 31 + .FirstOrDefault() 32 + .Offset 33 + .Value;{{/code}}