Исходный код вики Примеры

Версия 8.13 от Alexandr Fokin на 2026/05/01 19:37

Скрыть последних авторов
Alexandr Fokin 8.5 1 |1|(% style="width:188px" %)1 родительский процесс и N дочерних процессов.|(% style="width:1268px" %)(((
Alexandr Fokin 8.4 2 |В данном примере имеется в виду, что дочерние процессы могут выполняться параллельно другу и независимо друг от друга, но в конце должны оповестить родительский процесс о необходимости продолжения обработки.
3 Если речь идет о каких-либо зависимостях порядка выполнения в дочерних процессах, то это может контролировать дочерний процесс (выделяя группу, которую сейчас можно запустить и ожидая окончания).
Alexandr Fokin 1.2 4 |(((
Alexandr Fokin 8.4 5 |(((
6 Вариант 1: CounterTrigger.
Alexandr Fokin 1.2 7 )))
8 |(((
9 1. Родительский процесс создает триггер со счетчиком N, создает и запускает дочерние процессы, засыпает.
10 1. Дочерние процесс при завершении публикует TriggerEvent.
11 1. TriggerConsumerRunner периодически считывает батч TriggerEvent, уменьшает считчик триггера и делает запись в БД. За счет агрегации событий завершения процессов мы уменьшаем нагрузку на БД.
12 1. Когда все дочерние процессы отработали TriggerConsumerRunner получает значение счетчика 0 и взводит триггер.
13 1. Триггер пробуждает родительский процесс для дальнейшего выполнения.
14 )))
15 |TriggerEvent публикуются без использования TransactionOutbox напрямую в брокер после коммита транзакции (иначе мы бы нагружали БД).
16 |(((
17 Предполагаем, что основную часть времени система работает стабильно, но допускается ситуация, что транзакция закоммитилась, но TriggerEvent не смогли опубликоваться (остановка сервиса без graceful shutdown, проблемы соединения или работы с брокером сообщений).
18
19 Для таких случаев создается страхующий триггер (1 общий на тип процесса). Этот триггер запускается периодически и проходится по всем ожидающим процессам, проверяя условие (в реализации можно использовать keyset пагинацию) (в реализации можно использовать join для проверки условия).
20 Этот триггер выполняется периодически с более крупной временной задержкой. В случае обнаружения потери TriggerEvent, он поднимет заклинивший родительский процесс и он будет обработан (но позже). Можно установить этому триггеру низкий приоритет.
21 )))
Alexandr Fokin 8.1 22 |[[image:Родительский дочерний процесс. Sequence.jpg]]
Alexandr Fokin 1.2 23 )))
Alexandr Fokin 8.4 24 |(((
25 Возможен вариант №2:
26
27 Мы просто ставит timerTrigger на условно 1-5-10 минут (насколько важна задержка) и перепроверяем условие завершения.
28 В этом случае будет
29
30 * Из минус: что родительский процесс узнает о завершении дочерних процессов с задержкой (хотя в задержке можно использовать функцию от количества необработанных дочерних процессов, но тогда нужно считать количество или хотя бы что оно не больше N).
31 * Из плюсов: будет меньше пишущей нагрузки на БД (но больше читающей - на проверку) т.к. у нас не будет CounterTrigger, но будет периодический запрос на проверку завершения всех дочерних процессов (аналогично страхующему триггер). \
32 )))
33 )))
Alexandr Fokin 8.5 34 |2|(% style="width:188px" %)Transaction outbox stream process.|(% style="width:1268px" %)[[image:TransactionOutbox. Sequence.jpg]]
35 |3|(% style="width:188px" %)Stream trigger|(% style="width:1268px" %)(((
Alexandr Fokin 1.2 36 | |(((
37 * Позволяет убрать лишние запросы пробуждения процесса (когда он и так запущен).
Alexandr Fokin 8.10 38 * __Позволяет полностью убрать задержку после остановки процесса__ (если есть новое сообщения, то он сразу же будет пробужден).
Alexandr Fokin 1.2 39 За счет того, что триггер точно знает, что есть новые сообщения и процесс только что уснул.
40 * Вводит 2 типа события, 1 сигнал о новом сообщении (содержит offset значение), 2 - процесс идет спать (содержит offset значение).
41 * Вводит дополнительное состояние в триггер: максимальный offset сообщения, максимальный offset обработанного процессом сообщения, флаг состояния сна процесса.
Alexandr Fokin 8.10 42 * В некоторых случаях позволяет не выполнять wakeup код в конце сессии обработки (если отключить wakeup, оставить только stream trigger)
43 (блокировка и обновление wakeup entity, проверка wakeup условия), __улучшает перформанс такта работы__.
Alexandr Fokin 1.2 44 )))
45 |Алгоритм триггера.|(((
46 * При получении события о засыпании процесса:
47 Фиксирует смещение процесса обработки и сравнивает со смещением сообщения.
48 Если все сообщения обработаны, то не пробуждает процесс, иначе пробуждает процесс.
49 * При получении события о новом сообщении:
50 Фиксирует новое наибольшее смещение.
51 Если процесс не спит (по флагу в триггере), то ничего не делает.
52 Если процесс спит (по флагу), то пробуждает процесс.
53
54 Отслеживает смещение обработки процесса и последнего события.
55 Ожидает от процесса события о том, что он все обработал, его последнее смещение и он идет спать.
56 Если есть сообщения со смещением больше чем указал процесс, то делает гарантированное пробуждение процесса.
57 Когда поступает сигнал о новом сообщении (от отправителя сообщения), то обновляет данные о максимальном смещении и пробуждает процесс, если он спит
58 )))
Alexandr Fokin 8.9 59 |Заготовка|[[https:~~/~~/github.com/cccc1808/cccc1808.ProcessEngine/tree/cccc1808/feature/trigger_stream_trigger>>https://github.com/cccc1808/cccc1808.ProcessEngine/tree/cccc1808/feature/trigger_stream_trigger]]
Alexandr Fokin 1.2 60 )))
Alexandr Fokin 8.5 61 |4|(% style="width:188px" %)Групповое действие|(% style="width:1268px" %)(((
Alexandr Fokin 8.7 62 | |Действие, которое нужно применить к диапазону строк (сравнительно большому), независимо для каждой строки.
Alexandr Fokin 8.6 63 Наличие у строк упорядоченного столбца (для выделения диапазонов).
Alexandr Fokin 8.2 64 | |(((
Alexandr Fokin 8.3 65 |(% style="width:888px" %)Родительские процесс определяет границы диапазона [min, max].|(% style="width:266px" %){{code language="none"}}select min(), max()
66 where condition(){{/code}}
67 |(% style="width:888px" %)Родительский процесс нарезает диапазон [min, max] на поддиапазоны. На каждый поддиапазон создается дочерний процесс.|(% style="width:266px" %)
68 |(% style="width:888px" %)Каждый дочерний процесс обрабатывает свой поддиапазон строк (параллельно).|(% style="width:266px" %)Внутри поддиапазона может использоваться keyset пагинация.
Alexandr Fokin 8.8 69 |(% style="width:888px" %)Родительский процесс ожидает завершения дочерних процессов (см. пример 1).|(% style="width:266px" %)
Alexandr Fokin 8.2 70 )))
71 )))