Примеры

Версия 8.5 от Alexandr Fokin на 2026/04/29 11:33

11 родительский процесс и N дочерних процессов.
В данном примере имеется в виду, что дочерние процессы могут выполняться параллельно другу и независимо друг от друга, но в конце должны оповестить родительский процесс о необходимости продолжения обработки.
Если речь идет о каких-либо зависимостях порядка выполнения в дочерних процессах, то это может контролировать дочерний процесс (выделяя группу, которую сейчас можно запустить и ожидая окончания).

Вариант 1: CounterTrigger.

  1. Родительский процесс создает триггер со счетчиком N, создает и запускает дочерние процессы, засыпает.
  2. Дочерние процесс при завершении публикует TriggerEvent.
  3. TriggerConsumerRunner периодически считывает батч TriggerEvent, уменьшает считчик триггера и делает запись в БД. За счет агрегации событий завершения процессов мы уменьшаем нагрузку на БД.
  4. Когда все дочерние процессы отработали TriggerConsumerRunner получает значение счетчика 0 и взводит триггер.
  5. Триггер пробуждает родительский процесс для дальнейшего выполнения.
TriggerEvent публикуются без использования TransactionOutbox напрямую в брокер после коммита транзакции (иначе мы бы нагружали БД).

Предполагаем, что основную часть времени система работает стабильно, но допускается ситуация, что транзакция закоммитилась, но TriggerEvent не смогли опубликоваться (остановка сервиса без graceful shutdown, проблемы соединения или работы с брокером сообщений).

Для таких случаев создается страхующий триггер (1 общий на тип процесса). Этот триггер запускается периодически и проходится по всем ожидающим процессам, проверяя условие (в реализации можно использовать keyset пагинацию) (в реализации можно использовать join для проверки условия).
Этот триггер выполняется периодически с более крупной временной задержкой. В случае обнаружения потери TriggerEvent, он поднимет заклинивший родительский процесс и он будет обработан (но позже). Можно установить этому триггеру низкий приоритет.

Родительский дочерний процесс. Sequence.jpg

Возможен вариант №2:

Мы просто ставит timerTrigger на условно 1-5-10 минут (насколько важна задержка) и перепроверяем условие завершения.
В этом случае будет 

  • Из минус: что родительский процесс узнает о завершении дочерних процессов с задержкой (хотя в задержке можно использовать функцию от количества необработанных дочерних процессов, но тогда нужно считать количество или хотя бы что оно не больше N).
  • Из плюсов: будет меньше пишущей нагрузки на БД (но больше читающей - на проверку) т.к. у нас не будет CounterTrigger, но будет периодический запрос на проверку завершения всех дочерних процессов (аналогично страхующему триггер). \
2Transaction outbox stream process.TransactionOutbox. Sequence.jpg
3Stream trigger
 
  • Позволяет убрать лишние запросы пробуждения процесса (когда он и так запущен).
  • Позволяет полностью убрать задержку после остановки процесса (если есть новое сообщения, то он сразу же будет пробужден).
    За счет того, что триггер точно знает, что есть новые сообщения и процесс только что уснул.
  • Вводит 2 типа события, 1 сигнал о новом сообщении (содержит offset значение), 2 - процесс идет спать (содержит offset значение).
  • Вводит дополнительное состояние в триггер: максимальный offset сообщения, максимальный offset обработанного процессом сообщения, флаг состояния сна процесса.
Алгоритм триггера.
  • При получении события о засыпании процесса:
    Фиксирует смещение процесса обработки и сравнивает со смещением сообщения.
    Если все сообщения обработаны, то не пробуждает процесс, иначе пробуждает процесс.
  • При получении события о новом сообщении:
    Фиксирует новое наибольшее смещение.
    Если процесс не спит (по флагу в триггере), то ничего не делает.
    Если процесс спит (по флагу), то пробуждает процесс.

Отслеживает смещение обработки процесса и последнего события.
Ожидает от процесса события о том, что он все обработал, его последнее смещение и он идет спать.
Если есть сообщения со смещением больше чем указал процесс, то делает гарантированное пробуждение процесса.
Когда поступает сигнал о новом сообщении (от отправителя сообщения), то обновляет данные о максимальном смещении и пробуждает процесс, если он спит

 TODO:
4Групповое действие
 Действие, которое нужно применить к диапазону строк, независимо для каждой строки.
Наличие у строк упорядоченного столбца.
 
Родительские процесс определяет границы диапазона [min, max].select min(), max()
where condition()
Родительский процесс нарезает диапазон [min, max] на поддиапазоны. На каждый поддиапазон создается дочерний процесс. 
Каждый дочерний процесс обрабатывает свой поддиапазон строк (параллельно).Внутри поддиапазона может использоваться keyset пагинация.
Родительский процесс ожидает завершения дочерних процессов.