Исходный код вики Примеры

Редактировал(а) Alexandr Fokin 2026/05/09 02:00

Скрыть последних авторов
Alexandr Fokin 8.5 1 |1|(% style="width:188px" %)1 родительский процесс и N дочерних процессов.|(% style="width:1268px" %)(((
Alexandr Fokin 8.4 2 |В данном примере имеется в виду, что дочерние процессы могут выполняться параллельно другу и независимо друг от друга, но в конце должны оповестить родительский процесс о необходимости продолжения обработки.
3 Если речь идет о каких-либо зависимостях порядка выполнения в дочерних процессах, то это может контролировать дочерний процесс (выделяя группу, которую сейчас можно запустить и ожидая окончания).
Alexandr Fokin 1.2 4 |(((
Alexandr Fokin 8.4 5 |(((
6 Вариант 1: CounterTrigger.
Alexandr Fokin 1.2 7 )))
8 |(((
9 1. Родительский процесс создает триггер со счетчиком N, создает и запускает дочерние процессы, засыпает.
10 1. Дочерние процесс при завершении публикует TriggerEvent.
11 1. TriggerConsumerRunner периодически считывает батч TriggerEvent, уменьшает считчик триггера и делает запись в БД. За счет агрегации событий завершения процессов мы уменьшаем нагрузку на БД.
12 1. Когда все дочерние процессы отработали TriggerConsumerRunner получает значение счетчика 0 и взводит триггер.
13 1. Триггер пробуждает родительский процесс для дальнейшего выполнения.
14 )))
15 |TriggerEvent публикуются без использования TransactionOutbox напрямую в брокер после коммита транзакции (иначе мы бы нагружали БД).
16 |(((
17 Предполагаем, что основную часть времени система работает стабильно, но допускается ситуация, что транзакция закоммитилась, но TriggerEvent не смогли опубликоваться (остановка сервиса без graceful shutdown, проблемы соединения или работы с брокером сообщений).
18
19 Для таких случаев создается страхующий триггер (1 общий на тип процесса). Этот триггер запускается периодически и проходится по всем ожидающим процессам, проверяя условие (в реализации можно использовать keyset пагинацию) (в реализации можно использовать join для проверки условия).
20 Этот триггер выполняется периодически с более крупной временной задержкой. В случае обнаружения потери TriggerEvent, он поднимет заклинивший родительский процесс и он будет обработан (но позже). Можно установить этому триггеру низкий приоритет.
21 )))
Alexandr Fokin 8.1 22 |[[image:Родительский дочерний процесс. Sequence.jpg]]
Alexandr Fokin 1.2 23 )))
Alexandr Fokin 8.4 24 |(((
Alexandr Fokin 8.14 25 Вариант №2:
Alexandr Fokin 8.4 26
Alexandr Fokin 8.17 27 Мы просто ставим TimerTrigger на условно 1-5-10 минут (насколько важна задержка) и перепроверяем условие завершения.
Alexandr Fokin 8.4 28 В этом случае будет
29
Alexandr Fokin 8.17 30 * Из минус: что родительский процесс узнает о завершении дочерних процессов с задержкой.
31 Если дочерний процесс падает в ошибку, TimerTrigger все равно будет крутиться и создавать пустую нагрузку.
Alexandr Fokin 8.22 32 * Из плюсов: будет меньше пишущей нагрузки на БД чем в варианте 1 (но больше читающей - на проверку) т.к. у нас не будет CounterTrigger, но будет периодический запрос на проверку завершения всех дочерних процессов (аналогично страхующему триггер).
Alexandr Fokin 8.17 33 * [Расширенный]: Дочерние процессы в блоке wakeup condition проверяют наличие незавершенных процессов.
34 ** Если все процессы завершены или (незавершенных процессов мало и нет процессов с ошибкой), то можно опубликовать событие на TimerTrigger на сброс или установку минимальной задержки.
Alexandr Fokin 8.4 35 )))
Alexandr Fokin 8.14 36 |(((
37 Вариант №3:
38
39 Дочерние процессы выполняются через родительский (ограничение в рамках одной ноды).
40 Точкой выполнения является родительский процесс, который внутри себя (параллельно или последовательно) выполняет дочерние процессы.
41 За счет такого способа у нас также отсутствует конкуренция передачи сигнала в родительский процесс.
42 Но мы ограничены выполнением дочерних процессов одной одной сервиса.
Alexandr Fokin 8.15 43 Сложнее контролировать распределение нагрузки, если будет вложенный параллелизм.
Alexandr Fokin 8.16 44 Также решает проблему, если дочерний процесс содержит ожидание (например асинхронный запрос-ответ), тут будет конкуренция сигнала от хендлера ответа к родительскому процессу.
Alexandr Fokin 8.4 45 )))
Alexandr Fokin 8.17 46 |(((
47 Вариант N4:
48
49 SimpleStreamTrigger + Timer (один из лучших вариантов).
50
51 * Триггер проверяет условие завершения всех дочерних процессов (можно прикинуть количество незавершенных дочерних процессов).
52 ** Если все обработано, то пробуждает процесс и деактивируется.
Alexandr Fokin 8.19 53 ** Иначе:
54 *** деактивируется (до поступления хотя бы одного сигнала),
55 *** взводит признак стрима - процесс ожидает,
56 *** взводит флаг новых сигналов на 0,
57 *** выставляет задержку от оценки количества необработанных процессов (< N - малая задержка, иначе большая задержка).
Alexandr Fokin 8.17 58 * [Расширенный]: Дочерние процессы в блоке wakeup condition проверяют наличие незавершенных процессов.
Alexandr Fokin 8.18 59 ** Если все процессы завершены или (незавершенных процессов мало и нет процессов с ошибкой), то можно опубликовать событие на SimpleStreamTrigger на сброс или установку минимальной задержки (в дополнение к сигналу).
Alexandr Fokin 8.21 60 * Читающей нагрузки будет немного больше чем в варианте 2 (чтение триггера на поступлении сигнала),
61 но пишущей нагрузки будет меньше чем в варианте 1 (запись - только на активации новым сигналом).
62 * Если сигналов нет, то нет пустых срабатываний в отличие от варианта 2 (т.к. нет поступления сигнала от дочерних процессов).
Alexandr Fokin 8.14 63 )))
Alexandr Fokin 8.17 64 )))
Alexandr Fokin 8.5 65 |2|(% style="width:188px" %)Transaction outbox stream process.|(% style="width:1268px" %)[[image:TransactionOutbox. Sequence.jpg]]
66 |3|(% style="width:188px" %)Stream trigger|(% style="width:1268px" %)(((
Alexandr Fokin 1.2 67 | |(((
68 * Позволяет убрать лишние запросы пробуждения процесса (когда он и так запущен).
Alexandr Fokin 8.10 69 * __Позволяет полностью убрать задержку после остановки процесса__ (если есть новое сообщения, то он сразу же будет пробужден).
Alexandr Fokin 1.2 70 За счет того, что триггер точно знает, что есть новые сообщения и процесс только что уснул.
71 * Вводит 2 типа события, 1 сигнал о новом сообщении (содержит offset значение), 2 - процесс идет спать (содержит offset значение).
72 * Вводит дополнительное состояние в триггер: максимальный offset сообщения, максимальный offset обработанного процессом сообщения, флаг состояния сна процесса.
Alexandr Fokin 8.10 73 * В некоторых случаях позволяет не выполнять wakeup код в конце сессии обработки (если отключить wakeup, оставить только stream trigger)
74 (блокировка и обновление wakeup entity, проверка wakeup условия), __улучшает перформанс такта работы__.
Alexandr Fokin 1.2 75 )))
76 |Алгоритм триггера.|(((
77 * При получении события о засыпании процесса:
78 Фиксирует смещение процесса обработки и сравнивает со смещением сообщения.
79 Если все сообщения обработаны, то не пробуждает процесс, иначе пробуждает процесс.
80 * При получении события о новом сообщении:
81 Фиксирует новое наибольшее смещение.
82 Если процесс не спит (по флагу в триггере), то ничего не делает.
83 Если процесс спит (по флагу), то пробуждает процесс.
84
85 Отслеживает смещение обработки процесса и последнего события.
86 Ожидает от процесса события о том, что он все обработал, его последнее смещение и он идет спать.
87 Если есть сообщения со смещением больше чем указал процесс, то делает гарантированное пробуждение процесса.
88 Когда поступает сигнал о новом сообщении (от отправителя сообщения), то обновляет данные о максимальном смещении и пробуждает процесс, если он спит
89 )))
Alexandr Fokin 8.9 90 |Заготовка|[[https:~~/~~/github.com/cccc1808/cccc1808.ProcessEngine/tree/cccc1808/feature/trigger_stream_trigger>>https://github.com/cccc1808/cccc1808.ProcessEngine/tree/cccc1808/feature/trigger_stream_trigger]]
Alexandr Fokin 1.2 91 )))
Alexandr Fokin 8.5 92 |4|(% style="width:188px" %)Групповое действие|(% style="width:1268px" %)(((
Alexandr Fokin 8.7 93 | |Действие, которое нужно применить к диапазону строк (сравнительно большому), независимо для каждой строки.
Alexandr Fokin 8.6 94 Наличие у строк упорядоченного столбца (для выделения диапазонов).
Alexandr Fokin 8.2 95 | |(((
Alexandr Fokin 8.3 96 |(% style="width:888px" %)Родительские процесс определяет границы диапазона [min, max].|(% style="width:266px" %){{code language="none"}}select min(), max()
97 where condition(){{/code}}
98 |(% style="width:888px" %)Родительский процесс нарезает диапазон [min, max] на поддиапазоны. На каждый поддиапазон создается дочерний процесс.|(% style="width:266px" %)
99 |(% style="width:888px" %)Каждый дочерний процесс обрабатывает свой поддиапазон строк (параллельно).|(% style="width:266px" %)Внутри поддиапазона может использоваться keyset пагинация.
Alexandr Fokin 8.8 100 |(% style="width:888px" %)Родительский процесс ожидает завершения дочерних процессов (см. пример 1).|(% style="width:266px" %)
Alexandr Fokin 8.2 101 )))
102 )))