Исходный код вики Примеры

Версия 8.14 от Alexandr Fokin на 2026/05/05 18:21

Последние авторы
1 |1|(% style="width:188px" %)1 родительский процесс и N дочерних процессов.|(% style="width:1268px" %)(((
2 |В данном примере имеется в виду, что дочерние процессы могут выполняться параллельно другу и независимо друг от друга, но в конце должны оповестить родительский процесс о необходимости продолжения обработки.
3 Если речь идет о каких-либо зависимостях порядка выполнения в дочерних процессах, то это может контролировать дочерний процесс (выделяя группу, которую сейчас можно запустить и ожидая окончания).
4 |(((
5 |(((
6 Вариант 1: CounterTrigger.
7 )))
8 |(((
9 1. Родительский процесс создает триггер со счетчиком N, создает и запускает дочерние процессы, засыпает.
10 1. Дочерние процесс при завершении публикует TriggerEvent.
11 1. TriggerConsumerRunner периодически считывает батч TriggerEvent, уменьшает считчик триггера и делает запись в БД. За счет агрегации событий завершения процессов мы уменьшаем нагрузку на БД.
12 1. Когда все дочерние процессы отработали TriggerConsumerRunner получает значение счетчика 0 и взводит триггер.
13 1. Триггер пробуждает родительский процесс для дальнейшего выполнения.
14 )))
15 |TriggerEvent публикуются без использования TransactionOutbox напрямую в брокер после коммита транзакции (иначе мы бы нагружали БД).
16 |(((
17 Предполагаем, что основную часть времени система работает стабильно, но допускается ситуация, что транзакция закоммитилась, но TriggerEvent не смогли опубликоваться (остановка сервиса без graceful shutdown, проблемы соединения или работы с брокером сообщений).
18
19 Для таких случаев создается страхующий триггер (1 общий на тип процесса). Этот триггер запускается периодически и проходится по всем ожидающим процессам, проверяя условие (в реализации можно использовать keyset пагинацию) (в реализации можно использовать join для проверки условия).
20 Этот триггер выполняется периодически с более крупной временной задержкой. В случае обнаружения потери TriggerEvent, он поднимет заклинивший родительский процесс и он будет обработан (но позже). Можно установить этому триггеру низкий приоритет.
21 )))
22 |[[image:Родительский дочерний процесс. Sequence.jpg]]
23 )))
24 |(((
25 Вариант №2:
26
27 Мы просто ставит timerTrigger на условно 1-5-10 минут (насколько важна задержка) и перепроверяем условие завершения.
28 В этом случае будет
29
30 * Из минус: что родительский процесс узнает о завершении дочерних процессов с задержкой (хотя в задержке можно использовать функцию от количества необработанных дочерних процессов, но тогда нужно считать количество или хотя бы что оно не больше N).
31 * Из плюсов: будет меньше пишущей нагрузки на БД (но больше читающей - на проверку) т.к. у нас не будет CounterTrigger, но будет периодический запрос на проверку завершения всех дочерних процессов (аналогично страхующему триггер). \
32 )))
33 |(((
34 Вариант №3:
35
36 Дочерние процессы выполняются через родительский (ограничение в рамках одной ноды).
37 Точкой выполнения является родительский процесс, который внутри себя (параллельно или последовательно) выполняет дочерние процессы.
38 За счет такого способа у нас также отсутствует конкуренция передачи сигнала в родительский процесс.
39 Но мы ограничены выполнением дочерних процессов одной одной сервиса.
40 Также решает проблему, если дочерний процесс содержит ожидание (например асинхронный запрос-ответ).
41 )))
42 )))
43 |2|(% style="width:188px" %)Transaction outbox stream process.|(% style="width:1268px" %)[[image:TransactionOutbox. Sequence.jpg]]
44 |3|(% style="width:188px" %)Stream trigger|(% style="width:1268px" %)(((
45 | |(((
46 * Позволяет убрать лишние запросы пробуждения процесса (когда он и так запущен).
47 * __Позволяет полностью убрать задержку после остановки процесса__ (если есть новое сообщения, то он сразу же будет пробужден).
48 За счет того, что триггер точно знает, что есть новые сообщения и процесс только что уснул.
49 * Вводит 2 типа события, 1 сигнал о новом сообщении (содержит offset значение), 2 - процесс идет спать (содержит offset значение).
50 * Вводит дополнительное состояние в триггер: максимальный offset сообщения, максимальный offset обработанного процессом сообщения, флаг состояния сна процесса.
51 * В некоторых случаях позволяет не выполнять wakeup код в конце сессии обработки (если отключить wakeup, оставить только stream trigger)
52 (блокировка и обновление wakeup entity, проверка wakeup условия), __улучшает перформанс такта работы__.
53 )))
54 |Алгоритм триггера.|(((
55 * При получении события о засыпании процесса:
56 Фиксирует смещение процесса обработки и сравнивает со смещением сообщения.
57 Если все сообщения обработаны, то не пробуждает процесс, иначе пробуждает процесс.
58 * При получении события о новом сообщении:
59 Фиксирует новое наибольшее смещение.
60 Если процесс не спит (по флагу в триггере), то ничего не делает.
61 Если процесс спит (по флагу), то пробуждает процесс.
62
63 Отслеживает смещение обработки процесса и последнего события.
64 Ожидает от процесса события о том, что он все обработал, его последнее смещение и он идет спать.
65 Если есть сообщения со смещением больше чем указал процесс, то делает гарантированное пробуждение процесса.
66 Когда поступает сигнал о новом сообщении (от отправителя сообщения), то обновляет данные о максимальном смещении и пробуждает процесс, если он спит
67 )))
68 |Заготовка|[[https:~~/~~/github.com/cccc1808/cccc1808.ProcessEngine/tree/cccc1808/feature/trigger_stream_trigger>>https://github.com/cccc1808/cccc1808.ProcessEngine/tree/cccc1808/feature/trigger_stream_trigger]]
69 )))
70 |4|(% style="width:188px" %)Групповое действие|(% style="width:1268px" %)(((
71 | |Действие, которое нужно применить к диапазону строк (сравнительно большому), независимо для каждой строки.
72 Наличие у строк упорядоченного столбца (для выделения диапазонов).
73 | |(((
74 |(% style="width:888px" %)Родительские процесс определяет границы диапазона [min, max].|(% style="width:266px" %){{code language="none"}}select min(), max()
75 where condition(){{/code}}
76 |(% style="width:888px" %)Родительский процесс нарезает диапазон [min, max] на поддиапазоны. На каждый поддиапазон создается дочерний процесс.|(% style="width:266px" %)
77 |(% style="width:888px" %)Каждый дочерний процесс обрабатывает свой поддиапазон строк (параллельно).|(% style="width:266px" %)Внутри поддиапазона может использоваться keyset пагинация.
78 |(% style="width:888px" %)Родительский процесс ожидает завершения дочерних процессов (см. пример 1).|(% style="width:266px" %)
79 )))
80 )))