Примеры

Версия 7.1 от Alexandr Fokin на 2026/04/27 13:26

1Родительский процесс, N дочерних процессов.

В данном примере имеется в виду, что дочерние процессы могут выполняться параллельно другу и независимо друг от друга, но в конце должны оповестить родительский процесс о необходимости продолжения обработки.
Если речь идет о каких-либо зависимостях порядка выполнения в дочерних процессах, то это может контролировать дочерний процесс (выделяя группу, которую сейчас можно запустить и ожидая окончания).

  1. Родительский процесс создает триггер со счетчиком N, создает и запускает дочерние процессы, засыпает.
  2. Дочерние процесс при завершении публикует TriggerEvent.
  3. TriggerConsumerRunner периодически считывает батч TriggerEvent, уменьшает считчик триггера и делает запись в БД. За счет агрегации событий завершения процессов мы уменьшаем нагрузку на БД.
  4. Когда все дочерние процессы отработали TriggerConsumerRunner получает значение счетчика 0 и взводит триггер.
  5. Триггер пробуждает родительский процесс для дальнейшего выполнения.
TriggerEvent публикуются без использования TransactionOutbox напрямую в брокер после коммита транзакции (иначе мы бы нагружали БД).

Предполагаем, что основную часть времени система работает стабильно, но допускается ситуация, что транзакция закоммитилась, но TriggerEvent не смогли опубликоваться (остановка сервиса без graceful shutdown, проблемы соединения или работы с брокером сообщений).

Для таких случаев создается страхующий триггер (1 общий на тип процесса). Этот триггер запускается периодически и проходится по всем ожидающим процессам, проверяя условие (в реализации можно использовать keyset пагинацию) (в реализации можно использовать join для проверки условия).
Этот триггер выполняется периодически с более крупной временной задержкой. В случае обнаружения потери TriggerEvent, он поднимет заклинивший родительский процесс и он будет обработан (но позже). Можно установить этому триггеру низкий приоритет.

 

 

2Transaction outbox stream process. 
3Stream trigger
 
  • Позволяет убрать лишние запросы пробуждения процесса (когда он и так запущен).
  • Позволяет полностью убрать задержку после остановки процесса (если есть новое сообщения, то он сразу же будет пробужден).
    За счет того, что триггер точно знает, что есть новые сообщения и процесс только что уснул.
  • Вводит 2 типа события, 1 сигнал о новом сообщении (содержит offset значение), 2 - процесс идет спать (содержит offset значение).
  • Вводит дополнительное состояние в триггер: максимальный offset сообщения, максимальный offset обработанного процессом сообщения, флаг состояния сна процесса.
Алгоритм триггера.
  • При получении события о засыпании процесса:
    Фиксирует смещение процесса обработки и сравнивает со смещением сообщения.
    Если все сообщения обработаны, то не пробуждает процесс, иначе пробуждает процесс.
  • При получении события о новом сообщении:
    Фиксирует новое наибольшее смещение.
    Если процесс не спит (по флагу в триггере), то ничего не делает.
    Если процесс спит (по флагу), то пробуждает процесс.

Отслеживает смещение обработки процесса и последнего события.
Ожидает от процесса события о том, что он все обработал, его последнее смещение и он идет спать.
Если есть сообщения со смещением больше чем указал процесс, то делает гарантированное пробуждение процесса.
Когда поступает сигнал о новом сообщении (от отправителя сообщения), то обновляет данные о максимальном смещении и пробуждает процесс, если он спит

 TODO: