Исходный код вики Примеры

Версия 8.21 от Alexandr Fokin на 2026/05/09 01:59

Последние авторы
1 |1|(% style="width:188px" %)1 родительский процесс и N дочерних процессов.|(% style="width:1268px" %)(((
2 |В данном примере имеется в виду, что дочерние процессы могут выполняться параллельно другу и независимо друг от друга, но в конце должны оповестить родительский процесс о необходимости продолжения обработки.
3 Если речь идет о каких-либо зависимостях порядка выполнения в дочерних процессах, то это может контролировать дочерний процесс (выделяя группу, которую сейчас можно запустить и ожидая окончания).
4 |(((
5 |(((
6 Вариант 1: CounterTrigger.
7 )))
8 |(((
9 1. Родительский процесс создает триггер со счетчиком N, создает и запускает дочерние процессы, засыпает.
10 1. Дочерние процесс при завершении публикует TriggerEvent.
11 1. TriggerConsumerRunner периодически считывает батч TriggerEvent, уменьшает считчик триггера и делает запись в БД. За счет агрегации событий завершения процессов мы уменьшаем нагрузку на БД.
12 1. Когда все дочерние процессы отработали TriggerConsumerRunner получает значение счетчика 0 и взводит триггер.
13 1. Триггер пробуждает родительский процесс для дальнейшего выполнения.
14 )))
15 |TriggerEvent публикуются без использования TransactionOutbox напрямую в брокер после коммита транзакции (иначе мы бы нагружали БД).
16 |(((
17 Предполагаем, что основную часть времени система работает стабильно, но допускается ситуация, что транзакция закоммитилась, но TriggerEvent не смогли опубликоваться (остановка сервиса без graceful shutdown, проблемы соединения или работы с брокером сообщений).
18
19 Для таких случаев создается страхующий триггер (1 общий на тип процесса). Этот триггер запускается периодически и проходится по всем ожидающим процессам, проверяя условие (в реализации можно использовать keyset пагинацию) (в реализации можно использовать join для проверки условия).
20 Этот триггер выполняется периодически с более крупной временной задержкой. В случае обнаружения потери TriggerEvent, он поднимет заклинивший родительский процесс и он будет обработан (но позже). Можно установить этому триггеру низкий приоритет.
21 )))
22 |[[image:Родительский дочерний процесс. Sequence.jpg]]
23 )))
24 |(((
25 Вариант №2:
26
27 Мы просто ставим TimerTrigger на условно 1-5-10 минут (насколько важна задержка) и перепроверяем условие завершения.
28 В этом случае будет
29
30 * Из минус: что родительский процесс узнает о завершении дочерних процессов с задержкой.
31 Если дочерний процесс падает в ошибку, TimerTrigger все равно будет крутиться и создавать пустую нагрузку.
32 * Из плюсов: будет меньше пишущей нагрузки на БД (но больше читающей - на проверку) т.к. у нас не будет CounterTrigger, но будет периодический запрос на проверку завершения всех дочерних процессов (аналогично страхующему триггер).
33 * [Расширенный]: Дочерние процессы в блоке wakeup condition проверяют наличие незавершенных процессов.
34 ** Если все процессы завершены или (незавершенных процессов мало и нет процессов с ошибкой), то можно опубликовать событие на TimerTrigger на сброс или установку минимальной задержки.
35 )))
36 |(((
37 Вариант №3:
38
39 Дочерние процессы выполняются через родительский (ограничение в рамках одной ноды).
40 Точкой выполнения является родительский процесс, который внутри себя (параллельно или последовательно) выполняет дочерние процессы.
41 За счет такого способа у нас также отсутствует конкуренция передачи сигнала в родительский процесс.
42 Но мы ограничены выполнением дочерних процессов одной одной сервиса.
43 Сложнее контролировать распределение нагрузки, если будет вложенный параллелизм.
44 Также решает проблему, если дочерний процесс содержит ожидание (например асинхронный запрос-ответ), тут будет конкуренция сигнала от хендлера ответа к родительскому процессу.
45 )))
46 |(((
47 Вариант N4:
48
49 SimpleStreamTrigger + Timer (один из лучших вариантов).
50
51 * Триггер проверяет условие завершения всех дочерних процессов (можно прикинуть количество незавершенных дочерних процессов).
52 ** Если все обработано, то пробуждает процесс и деактивируется.
53 ** Иначе:
54 *** деактивируется (до поступления хотя бы одного сигнала),
55 *** взводит признак стрима - процесс ожидает,
56 *** взводит флаг новых сигналов на 0,
57 *** выставляет задержку от оценки количества необработанных процессов (< N - малая задержка, иначе большая задержка).
58 * [Расширенный]: Дочерние процессы в блоке wakeup condition проверяют наличие незавершенных процессов.
59 ** Если все процессы завершены или (незавершенных процессов мало и нет процессов с ошибкой), то можно опубликовать событие на SimpleStreamTrigger на сброс или установку минимальной задержки (в дополнение к сигналу).
60 * Читающей нагрузки будет немного больше чем в варианте 2 (чтение триггера на поступлении сигнала),
61 но пишущей нагрузки будет меньше чем в варианте 1 (запись - только на активации новым сигналом).
62 * Если сигналов нет, то нет пустых срабатываний в отличие от варианта 2 (т.к. нет поступления сигнала от дочерних процессов).
63 )))
64 )))
65 |2|(% style="width:188px" %)Transaction outbox stream process.|(% style="width:1268px" %)[[image:TransactionOutbox. Sequence.jpg]]
66 |3|(% style="width:188px" %)Stream trigger|(% style="width:1268px" %)(((
67 | |(((
68 * Позволяет убрать лишние запросы пробуждения процесса (когда он и так запущен).
69 * __Позволяет полностью убрать задержку после остановки процесса__ (если есть новое сообщения, то он сразу же будет пробужден).
70 За счет того, что триггер точно знает, что есть новые сообщения и процесс только что уснул.
71 * Вводит 2 типа события, 1 сигнал о новом сообщении (содержит offset значение), 2 - процесс идет спать (содержит offset значение).
72 * Вводит дополнительное состояние в триггер: максимальный offset сообщения, максимальный offset обработанного процессом сообщения, флаг состояния сна процесса.
73 * В некоторых случаях позволяет не выполнять wakeup код в конце сессии обработки (если отключить wakeup, оставить только stream trigger)
74 (блокировка и обновление wakeup entity, проверка wakeup условия), __улучшает перформанс такта работы__.
75 )))
76 |Алгоритм триггера.|(((
77 * При получении события о засыпании процесса:
78 Фиксирует смещение процесса обработки и сравнивает со смещением сообщения.
79 Если все сообщения обработаны, то не пробуждает процесс, иначе пробуждает процесс.
80 * При получении события о новом сообщении:
81 Фиксирует новое наибольшее смещение.
82 Если процесс не спит (по флагу в триггере), то ничего не делает.
83 Если процесс спит (по флагу), то пробуждает процесс.
84
85 Отслеживает смещение обработки процесса и последнего события.
86 Ожидает от процесса события о том, что он все обработал, его последнее смещение и он идет спать.
87 Если есть сообщения со смещением больше чем указал процесс, то делает гарантированное пробуждение процесса.
88 Когда поступает сигнал о новом сообщении (от отправителя сообщения), то обновляет данные о максимальном смещении и пробуждает процесс, если он спит
89 )))
90 |Заготовка|[[https:~~/~~/github.com/cccc1808/cccc1808.ProcessEngine/tree/cccc1808/feature/trigger_stream_trigger>>https://github.com/cccc1808/cccc1808.ProcessEngine/tree/cccc1808/feature/trigger_stream_trigger]]
91 )))
92 |4|(% style="width:188px" %)Групповое действие|(% style="width:1268px" %)(((
93 | |Действие, которое нужно применить к диапазону строк (сравнительно большому), независимо для каждой строки.
94 Наличие у строк упорядоченного столбца (для выделения диапазонов).
95 | |(((
96 |(% style="width:888px" %)Родительские процесс определяет границы диапазона [min, max].|(% style="width:266px" %){{code language="none"}}select min(), max()
97 where condition(){{/code}}
98 |(% style="width:888px" %)Родительский процесс нарезает диапазон [min, max] на поддиапазоны. На каждый поддиапазон создается дочерний процесс.|(% style="width:266px" %)
99 |(% style="width:888px" %)Каждый дочерний процесс обрабатывает свой поддиапазон строк (параллельно).|(% style="width:266px" %)Внутри поддиапазона может использоваться keyset пагинация.
100 |(% style="width:888px" %)Родительский процесс ожидает завершения дочерних процессов (см. пример 1).|(% style="width:266px" %)
101 )))
102 )))