Исходный код вики Примеры

Версия 7.1 от Alexandr Fokin на 2026/04/27 13:26

Скрыть последних авторов
Alexandr Fokin 1.2 1 |1|Родительский процесс, N дочерних процессов.|(((
2 |(((
3 В данном примере имеется в виду, что дочерние процессы могут выполняться параллельно другу и независимо друг от друга, но в конце должны оповестить родительский процесс о необходимости продолжения обработки.
4 Если речь идет о каких-либо зависимостях порядка выполнения в дочерних процессах, то это может контролировать дочерний процесс (выделяя группу, которую сейчас можно запустить и ожидая окончания).
5 )))
6 |(((
7 1. Родительский процесс создает триггер со счетчиком N, создает и запускает дочерние процессы, засыпает.
8 1. Дочерние процесс при завершении публикует TriggerEvent.
9 1. TriggerConsumerRunner периодически считывает батч TriggerEvent, уменьшает считчик триггера и делает запись в БД. За счет агрегации событий завершения процессов мы уменьшаем нагрузку на БД.
10 1. Когда все дочерние процессы отработали TriggerConsumerRunner получает значение счетчика 0 и взводит триггер.
11 1. Триггер пробуждает родительский процесс для дальнейшего выполнения.
12 )))
13 |TriggerEvent публикуются без использования TransactionOutbox напрямую в брокер после коммита транзакции (иначе мы бы нагружали БД).
14 |(((
15 Предполагаем, что основную часть времени система работает стабильно, но допускается ситуация, что транзакция закоммитилась, но TriggerEvent не смогли опубликоваться (остановка сервиса без graceful shutdown, проблемы соединения или работы с брокером сообщений).
16
17 Для таких случаев создается страхующий триггер (1 общий на тип процесса). Этот триггер запускается периодически и проходится по всем ожидающим процессам, проверяя условие (в реализации можно использовать keyset пагинацию) (в реализации можно использовать join для проверки условия).
18 Этот триггер выполняется периодически с более крупной временной задержкой. В случае обнаружения потери TriggerEvent, он поднимет заклинивший родительский процесс и он будет обработан (но позже). Можно установить этому триггеру низкий приоритет.
19 )))
20 |
21 |(((
22
23 )))
24 )))
25 |2|Transaction outbox stream process.|
26 |3|Stream trigger|(((
27 | |(((
28 * Позволяет убрать лишние запросы пробуждения процесса (когда он и так запущен).
29 * Позволяет полностью убрать задержку после остановки процесса (если есть новое сообщения, то он сразу же будет пробужден).
30 За счет того, что триггер точно знает, что есть новые сообщения и процесс только что уснул.
31 * Вводит 2 типа события, 1 сигнал о новом сообщении (содержит offset значение), 2 - процесс идет спать (содержит offset значение).
32 * Вводит дополнительное состояние в триггер: максимальный offset сообщения, максимальный offset обработанного процессом сообщения, флаг состояния сна процесса.
33 )))
34 |Алгоритм триггера.|(((
35 * При получении события о засыпании процесса:
36 Фиксирует смещение процесса обработки и сравнивает со смещением сообщения.
37 Если все сообщения обработаны, то не пробуждает процесс, иначе пробуждает процесс.
38 * При получении события о новом сообщении:
39 Фиксирует новое наибольшее смещение.
40 Если процесс не спит (по флагу в триггере), то ничего не делает.
41 Если процесс спит (по флагу), то пробуждает процесс.
42
43 Отслеживает смещение обработки процесса и последнего события.
44 Ожидает от процесса события о том, что он все обработал, его последнее смещение и он идет спать.
45 Если есть сообщения со смещением больше чем указал процесс, то делает гарантированное пробуждение процесса.
46 Когда поступает сигнал о новом сообщении (от отправителя сообщения), то обновляет данные о максимальном смещении и пробуждает процесс, если он спит
47 )))
48 | |TODO:
49 )))