Исходный код вики Примеры

Версия 8.6 от Alexandr Fokin на 2026/04/29 11:34

Скрыть последних авторов
Alexandr Fokin 8.5 1 |1|(% style="width:188px" %)1 родительский процесс и N дочерних процессов.|(% style="width:1268px" %)(((
Alexandr Fokin 8.4 2 |В данном примере имеется в виду, что дочерние процессы могут выполняться параллельно другу и независимо друг от друга, но в конце должны оповестить родительский процесс о необходимости продолжения обработки.
3 Если речь идет о каких-либо зависимостях порядка выполнения в дочерних процессах, то это может контролировать дочерний процесс (выделяя группу, которую сейчас можно запустить и ожидая окончания).
Alexandr Fokin 1.2 4 |(((
Alexandr Fokin 8.4 5 |(((
6 Вариант 1: CounterTrigger.
Alexandr Fokin 1.2 7 )))
8 |(((
9 1. Родительский процесс создает триггер со счетчиком N, создает и запускает дочерние процессы, засыпает.
10 1. Дочерние процесс при завершении публикует TriggerEvent.
11 1. TriggerConsumerRunner периодически считывает батч TriggerEvent, уменьшает считчик триггера и делает запись в БД. За счет агрегации событий завершения процессов мы уменьшаем нагрузку на БД.
12 1. Когда все дочерние процессы отработали TriggerConsumerRunner получает значение счетчика 0 и взводит триггер.
13 1. Триггер пробуждает родительский процесс для дальнейшего выполнения.
14 )))
15 |TriggerEvent публикуются без использования TransactionOutbox напрямую в брокер после коммита транзакции (иначе мы бы нагружали БД).
16 |(((
17 Предполагаем, что основную часть времени система работает стабильно, но допускается ситуация, что транзакция закоммитилась, но TriggerEvent не смогли опубликоваться (остановка сервиса без graceful shutdown, проблемы соединения или работы с брокером сообщений).
18
19 Для таких случаев создается страхующий триггер (1 общий на тип процесса). Этот триггер запускается периодически и проходится по всем ожидающим процессам, проверяя условие (в реализации можно использовать keyset пагинацию) (в реализации можно использовать join для проверки условия).
20 Этот триггер выполняется периодически с более крупной временной задержкой. В случае обнаружения потери TriggerEvent, он поднимет заклинивший родительский процесс и он будет обработан (но позже). Можно установить этому триггеру низкий приоритет.
21 )))
Alexandr Fokin 8.1 22 |[[image:Родительский дочерний процесс. Sequence.jpg]]
Alexandr Fokin 1.2 23 )))
Alexandr Fokin 8.4 24 |(((
25 Возможен вариант №2:
26
27 Мы просто ставит timerTrigger на условно 1-5-10 минут (насколько важна задержка) и перепроверяем условие завершения.
28 В этом случае будет
29
30 * Из минус: что родительский процесс узнает о завершении дочерних процессов с задержкой (хотя в задержке можно использовать функцию от количества необработанных дочерних процессов, но тогда нужно считать количество или хотя бы что оно не больше N).
31 * Из плюсов: будет меньше пишущей нагрузки на БД (но больше читающей - на проверку) т.к. у нас не будет CounterTrigger, но будет периодический запрос на проверку завершения всех дочерних процессов (аналогично страхующему триггер). \
32 )))
33 )))
Alexandr Fokin 8.5 34 |2|(% style="width:188px" %)Transaction outbox stream process.|(% style="width:1268px" %)[[image:TransactionOutbox. Sequence.jpg]]
35 |3|(% style="width:188px" %)Stream trigger|(% style="width:1268px" %)(((
Alexandr Fokin 1.2 36 | |(((
37 * Позволяет убрать лишние запросы пробуждения процесса (когда он и так запущен).
38 * Позволяет полностью убрать задержку после остановки процесса (если есть новое сообщения, то он сразу же будет пробужден).
39 За счет того, что триггер точно знает, что есть новые сообщения и процесс только что уснул.
40 * Вводит 2 типа события, 1 сигнал о новом сообщении (содержит offset значение), 2 - процесс идет спать (содержит offset значение).
41 * Вводит дополнительное состояние в триггер: максимальный offset сообщения, максимальный offset обработанного процессом сообщения, флаг состояния сна процесса.
42 )))
43 |Алгоритм триггера.|(((
44 * При получении события о засыпании процесса:
45 Фиксирует смещение процесса обработки и сравнивает со смещением сообщения.
46 Если все сообщения обработаны, то не пробуждает процесс, иначе пробуждает процесс.
47 * При получении события о новом сообщении:
48 Фиксирует новое наибольшее смещение.
49 Если процесс не спит (по флагу в триггере), то ничего не делает.
50 Если процесс спит (по флагу), то пробуждает процесс.
51
52 Отслеживает смещение обработки процесса и последнего события.
53 Ожидает от процесса события о том, что он все обработал, его последнее смещение и он идет спать.
54 Если есть сообщения со смещением больше чем указал процесс, то делает гарантированное пробуждение процесса.
55 Когда поступает сигнал о новом сообщении (от отправителя сообщения), то обновляет данные о максимальном смещении и пробуждает процесс, если он спит
56 )))
57 | |TODO:
58 )))
Alexandr Fokin 8.5 59 |4|(% style="width:188px" %)Групповое действие|(% style="width:1268px" %)(((
Alexandr Fokin 8.2 60 | |Действие, которое нужно применить к диапазону строк, независимо для каждой строки.
Alexandr Fokin 8.6 61 Наличие у строк упорядоченного столбца (для выделения диапазонов).
Alexandr Fokin 8.2 62 | |(((
Alexandr Fokin 8.3 63 |(% style="width:888px" %)Родительские процесс определяет границы диапазона [min, max].|(% style="width:266px" %){{code language="none"}}select min(), max()
64 where condition(){{/code}}
65 |(% style="width:888px" %)Родительский процесс нарезает диапазон [min, max] на поддиапазоны. На каждый поддиапазон создается дочерний процесс.|(% style="width:266px" %)
66 |(% style="width:888px" %)Каждый дочерний процесс обрабатывает свой поддиапазон строк (параллельно).|(% style="width:266px" %)Внутри поддиапазона может использоваться keyset пагинация.
67 |(% style="width:888px" %)Родительский процесс ожидает завершения дочерних процессов.|(% style="width:266px" %)
Alexandr Fokin 8.2 68 )))
69 )))