Изменения документа Получение данных о смещении
Редактировал(а) Alexandr Fokin 2023/05/10 11:40
От версии 1.1
отредактировано Alexandr Fokin
на 2020/08/17 11:43
на 2020/08/17 11:43
Изменить комментарий:
К данной версии нет комментариев
К версии 2.2
отредактировано Alexandr Fokin
на 2020/08/23 13:15
на 2020/08/23 13:15
Изменить комментарий:
Updated parent field.
Сводка
-
Свойства страницы (2 изменено, 0 добавлено, 0 удалено)
Подробности
- Свойства страницы
-
- Родительский документ
-
... ... @@ -1,1 +1,1 @@ 1 -Разработка.NET.Библиотеки.Confluentinc\.Confluent-kafka-dotnet.WebHome 1 +xwiki:Разработка.NET.Библиотеки.Провайдеры.Confluentinc\.Confluent-kafka-dotnet.WebHome - Содержимое
-
... ... @@ -1,0 +1,33 @@ 1 + 2 +Задание: 3 +Получение ифнормации о текущем состоянии параметров: 4 +1) Общее кол-во сообщений в topic 5 +2) Общее кол-во прочитанных(commited) сообщений в topic (по определенной consumerGroup) 6 +3) Определение общего значения lag в topic(Разность между общей длиной и кол-во прочитанных сообщений) (по определенной consumerGroup) 7 +Опционально по каждому partition. 8 +Данные по topic можно определить по сумме совокупности данных по каждом partition, принажлежащему данному topic. 9 + 10 +https://github.com/confluentinc/confluent-kafka-dotnet/issues/1161 11 +https://stackoverflow.com/questions/57302244/aug-2019-kafka-consumer-lag-programmatically 12 + 13 + 14 +Решение: 15 +Данные по общему кол-ву сообщений и прочитанным сообщениям можно получить через Consumer. Причем для указанного partition. 16 +Данные по topic формируются через сумму данных всех partitions, входящих в него. 17 + 18 +{{code language="c#"}} 19 + //номер смещения последнего сообщения в очереди 20 + consumer 21 + .QueryWatermarkOffsets(TopicPartitionInfo(topicName, partitionId),ConnectionParamsEntity.ActionTimeout) 22 + .High 23 + 24 + //Получить активное смещение (последнее прочитанное сообщение) 25 + consumer 26 + .Committed( 27 + new TopicPartition[] { TopicPartitionInfo(topicName, partitionId) }, 28 + ConnectionParamsEntity.ActionTimeout 29 + ) 30 + .FirstOrDefault() 31 + .Offset 32 + .Value; 33 +{{/code}}