Изменения документа Получение данных о смещении

Редактировал(а) Alexandr Fokin 2023/05/10 11:40

От версии 1.1
отредактировано Alexandr Fokin
на 2020/08/17 11:43
Изменить комментарий: К данной версии нет комментариев
К версии 2.6
отредактировано Alexandr Fokin
на 2022/11/07 00:41
Изменить комментарий: Updated parent field.

Сводка

Подробности

Свойства страницы
Родительский документ
... ... @@ -1,1 +1,1 @@
1 -Разработка.NET.Библиотеки.Confluentinc\.Confluent-kafka-dotnet.WebHome
1 +xwiki:Разработка.Брокеры сообщений.Apache Kafka.Клиент.Confluentinc\.Confluent-kafka-dotnet.WebHome
Содержимое
... ... @@ -1,0 +1,33 @@
1 +
2 +Задание:
3 +Получение ифнормации о текущем состоянии параметров:
4 +1) Общее кол-во сообщений в topic
5 +2) Общее кол-во прочитанных(commited) сообщений в topic (по определенной consumerGroup)
6 +3) Определение общего значения lag в topic(Разность между общей длиной и кол-во прочитанных сообщений) (по определенной consumerGroup)
7 +Опционально по каждому partition.
8 +Данные по topic можно определить по сумме совокупности данных по каждом partition, принажлежащему данному topic.
9 +
10 +https://github.com/confluentinc/confluent-kafka-dotnet/issues/1161
11 +https://stackoverflow.com/questions/57302244/aug-2019-kafka-consumer-lag-programmatically
12 +
13 +
14 +Решение:
15 +Данные по общему кол-ву сообщений и прочитанным сообщениям можно получить через Consumer. Причем для указанного partition.
16 +Данные по topic формируются через сумму данных всех partitions, входящих в него.
17 +
18 +{{code language="c#"}}
19 + //номер смещения последнего сообщения в очереди
20 + consumer
21 + .QueryWatermarkOffsets(TopicPartitionInfo(topicName, partitionId),ConnectionParamsEntity.ActionTimeout)
22 + .High
23 +
24 + //Получить активное смещение (последнее прочитанное сообщение)
25 + consumer
26 + .Committed(
27 + new TopicPartition[] { TopicPartitionInfo(topicName, partitionId) },
28 + ConnectionParamsEntity.ActionTimeout
29 + )
30 + .FirstOrDefault()
31 + .Offset
32 + .Value;
33 +{{/code}}