Изменения документа Получение данных о смещении

Редактировал(а) Alexandr Fokin 2023/05/10 11:40

От версии 1.1
отредактировано Alexandr Fokin
на 2020/08/17 11:43
Изменить комментарий: К данной версии нет комментариев
К версии 2.16
отредактировано Alexandr Fokin
на 2023/05/10 11:40
Изменить комментарий: К данной версии нет комментариев

Сводка

Подробности

Свойства страницы
Название
... ... @@ -1,1 +1,1 @@
1 -Задача. Получение данных о смещении
1 +Получение данных о смещении
Родительский документ
... ... @@ -1,1 +1,1 @@
1 -Разработка.NET.Библиотеки.Confluentinc\.Confluent-kafka-dotnet.WebHome
1 +Разработка.NET.Работа с брокерами сообщений.Confluentinc\. Confluent-kafka-dotnet.Сценарии.WebHome
Содержимое
... ... @@ -1,0 +1,33 @@
1 +|Задание:
2 +Получение информации о текущем состоянии параметров:(((
3 +1. Общее кол-во сообщений в topic
4 +1. Общее кол-во прочитанных(commited) сообщений в topic (по определенной consumerGroup)
5 +1. Определение общего значения lag в topic(Разность между общей длиной и кол-во прочитанных сообщений) (по определенной consumerGroup)
6 +
7 +Опционально по каждому partition.
8 +Данные по topic можно определить по сумме совокупности данных по каждом partition, принадлежащему данному topic.
9 +)))
10 +|(((
11 +Get Consumer lag using LibrdkafkaHandle.OutQueueLength
12 +https://github.com/confluentinc/confluent-kafka-dotnet/issues/1161
13 +
14 +Aug 2019 - Kafka Consumer Lag programmatically
15 +https://stackoverflow.com/questions/57302244/aug-2019-kafka-consumer-lag-programmatically
16 +)))
17 +|Решение:
18 +Данные по общему кол-ву сообщений и прочитанным сообщениям можно получить через Consumer. Причем для указанного partition.
19 +Данные по topic формируются через сумму данных всех partitions, входящих в него.
20 +|{{code language="c#"}} //номер смещения последнего сообщения в очереди
21 + consumer
22 + .QueryWatermarkOffsets(TopicPartitionInfo(topicName, partitionId),ConnectionParamsEntity.ActionTimeout)
23 + .High
24 +
25 + //Получить активное смещение (последнее прочитанное сообщение)
26 + consumer
27 + .Committed(
28 + new TopicPartition[] { TopicPartitionInfo(topicName, partitionId) },
29 + ConnectionParamsEntity.ActionTimeout
30 + )
31 + .FirstOrDefault()
32 + .Offset
33 + .Value;{{/code}}