Движок cccc1808. ProcessEngine

Версия 27.11 от Alexandr Fokin на 2026/04/11 00:30
Предупреждение: Из соображений безопасности документ отображается в ограниченном режиме, поскольку это не текущая версия. Из-за этого могут быть расхождения и ошибки.

Теги поиска

cccc1808. ProcessEngine, cccc1808.ProcessEngine
Очередь задач, Система обработки процессов, Движок обработки процессов.
Процесс

 Универсальный движок для выполнения процессов и очередей задач, позволяющий комбинировать несколько подходов к обработке (см особенности).
 Процесс является единицей исполнения. В реализации может содержать машину состояний.
Система триггеров используется для таймеров и передачи сигналов для процессов (с оптимизацией нагрузки). 
Разветывание
База данных

Для надежного хранения данных процессов и триггеров.

  • Сейчас есть реализация под EntityFramework.
  • Но модель позволяет сделать реализацию под другие решения (linq2db или чистый Ado.Net) (за счет наличия IProcessContainer и методов Update).

Для текущей реализации в качестве хранилище может выступать БД, поддерживающая:

  • Транзакции: транзакции.
  • Транзакции: savepoint.
    (если используются, можно обрабатывать каждый шаг отдельной транзакцией или весь процесс без savepoint изоляции ошибок между шагами).
  • Блокировка: updatelock.
  • Блокировка: updatelock skip locked.
    (частично можно обойтись без него).
  • Блокировка: sharelock
    (можно обойтись без него без сильного влияния)
  • Уровень изоляции: работает на read committed, то что нужно блокируется руками.
  • Для некоторых кейсов желательно возможность выполнить upsert (insert on conflict).
Брокер сообщений

Используется для накопления и доставки TriggerEvent.

  • Сейчас есть реализация под Apache Kafka
  • Скорее всего можно сделать под RabbitMq.
  • Если запускать в одном экземпляре, то можно обойтись inmemory реализаций.
Особенности
Батчинг при выполнении.

Возможность использовать и комбинировать разные типы выполнения как

  • (1 транзакция - 1 процесс),
  • (1 транзакция - N процессов).
Изоляция шагов и процессов (внутри оной транзакции)
  • Изоляция через db savepoint.
  • Для EntityFramework | EntityFrameworkCore возможность делать InMemory снимок Создание снимка ChangeTracker.
  • Допустима реализация на основе того, что InMemory состояние процесса (и задействованных бизнес сущностей) реализовано в виде Immutable компонентов. В этом случае достаточно просто сохранять и восстанавливать ссылку.
    Но предполагаю, что в большинстве бизнес моделей не так и это не не будет востребованным.
Передача сигналов для процессов через систему триггеров.

В том числе для передачи сигналов реализована система триггеров.
Она позволяет оптимизировать операции с БД и уменьшить конкуренцию между процессами за общее состояние (родительский процесс).

См. пример 1.

Stream process

Возможность реализации stream процессов. Которые обрабатываются некоторый поток сообщений.
Через использование триггеров (при поступлении сообщения публикуется TriggerEvent) и системы гарантированного пробуждения (процесс гарантировано не уснет, если есть необработанные сообщения).

См. пример 2.

Перехват ошибокПерехват и обработка ошибок, если процесс выкинул exception в движок. Реализацию простого retry с задержкой (создается триггер на следующую попытку).
Параллельное выполнениеДопускается запуск нескольких раннеров (на разных нодах), работающих с одной таблицей процессов для распределения нагрузки между ними.
Допускается фильтрация типов процессов между нодами (чтобы нода выполняла только определенные типы процессов, в том числе по приоритету).
Доступно для раннеров процессов и триггеров.

Также возможна частичная поставка новой версии процесса и этот процесс будет браться в обработку только ограниченным количеством новых нод исполнителей.

В перспективе возможны решения с шардированием.
Soft timeout

Возможность указывать soft timeout, который будет мягко приостанавливать цикл (внутри шага процесса), чтобы ограничить общее время транзакции (не делать транзакции долгими (например горизонт postgres)).

Например:

  1. родительский процесс может создать
    1) N/2 дочерних процессов за первую сессию выполнения (транзакцию)
    2) N/2 за вторую сессию выполнения (транзакцию) и уснуть до окончания дочерних процессов.
  2. Процесс - групповое действие (когда нужно применить действие к строкам таблицы по указанному фильтру).
Range preload process data(Необязательно) возможность в начале сессии обработки пакетно предзагружать данные и бизнес сущности для процессов (Range query) из обрабатываемого батча для оптимизации чтения (если используется обработка пакета процессов в одной транзакции).
Примеры
1Родительский процесс, N дочерних процессов.

В данном примере имеется в виду, что дочерние процессы могут выполняться параллельно другу и независимо друг от друга, но в конце должны оповестить родительский процесс о необходимости продолжения обработки.
Если речь идет о каких-либо зависимостях порядка выполнения в дочерних процессах, то это может контролировать дочерний процесс (выделяя группу, которую сейчас можно запустить и ожидая окончания).

  1. Родительский процесс создает триггер со счетчиком N, создает и запускает дочерние процессы, засыпает.
  2. Дочерние процесс при завершении публикует TriggerEvent.
  3. TriggerConsumerRunner периодически считывает батч TriggerEvent, уменьшает считчик триггера и делает запись в БД. За счет агрегации событий завершения процессов мы уменьшаем нагрузку на БД.
  4. Когда все дочерние процессы отработали TriggerConsumerRunner получает значение счетчика 0 и взводит триггер.
  5. Триггер пробуждает родительский процесс для дальнейшего выполнения.
TriggerEvent публикуются без использования TransactionOutbox напрямую в брокер после коммита транзакции (иначе мы бы нагружали БД).

Предполагаем, что основную часть времени система работает стабильно, но допускается ситуация, что транзакция закоммитилась, но TriggerEvent не смогли опубликоваться (остановка сервиса без graceful shutdown, проблемы соединения или работы с брокером сообщений).

Для таких случаев создается страхующий триггер (1 общий на тип процесса). Этот триггер запускается периодически и проходится по всем ожидающим процессам, проверяя условие (в реализации можно использовать keyset пагинацию) (в реализации можно использовать join для проверки условия).
Этот триггер выполняется периодически с более крупной временной задержкой. В случае обнаружения потери TriggerEvent, он поднимет заклинивший родительский процесс и он будет обработан (но позже). Можно установить этому триггеру низкий приоритет.

Родительский дочерний процесс. Sequence.jpg

Возможен вариант №2, когда мы просто ставит timerTrigger на условно 1-5-10 минуту (насколько важна задержка) и перепроверяем условие завершения. Из минус, что родительский процесс узнает о завершении дочерних процессов с задержкой (хотя в задержке можно использовать функцию от количества необработанных дочерних процессов, но тогда нужно считать количество или хотя бы что оно не больше N).

  • Но тут будет join нагрузка на БД (если шаг проверки выполняется пакетно), иначе будет просто много одиночны запросов на чтение (условно раз в минуту).
  • Если дочерний процесс остановиться в ошибке, то родительский либо также продолжит крутиться в проверке (впустую доя разрешения проблемы), либо должен также пробросить ошибку в себя чтобы приостановиться.
    При это если родительский процесс приостановиться при обнаружении хотя бы одной ошибки в дочернем процессе, то стартануть нужно будет и родительский (он сам не узнает), а в случае с сигналом останавливать родительский процесс не нужно.
    В случае с решением 1, со страхующим триггером это можно обойти через фильтрующий индекс если мы начинаем идти сразу с таблицы процессов (т.е. процессы с ошибкой сразу будут игнорироваться).
  • Но все равно, именно данный движек может позволить настроить 2 процесса таким образом, что 1 процесс будет исполняться (1 процесс - 1 транзакция) в параллельном режиме (пока создаются множественные дочерние процессы), 2 процесс будет исполняться в пакетном режиме (N процессов - 1 транзакция) чтобы проверять выполнение условия завершения дочерних процессов через запрос (один пакетный).

плюсы: меньше пишущей нагрузки (т.к. триггер со счетчиком будет делать условно одну запись на счетчик триггер в 5-20 секунд), а тут будет одна запись в 1-5 минуту на обновление таймера.
минусы: больше читающей нагрузки с join (раз 1-5 минуту нужно будет выполнить join незавершенных процессов с дочерними). У решения 1 тоже есть такая нагрузка, но на страхующем триггер (условно раз 10-30 минут) и у решения 1 процессы упавшие в ошибку не будут генерировать нагрузку (если использовать фильтрующий индекс).

2Transaction outbox stream process.TransactionOutbox. Sequence.jpg
3Stream trigger
 
  • Позволяет убрать лишние запросы пробуждения процесса (когда он и так запущен).
  • Позволяет полностью убрать задержку после остановки процесса (если есть новое сообщения, то он сразу же будет пробужден).
    За счет того, что триггер точно знает, что есть новые сообщения и процесс только что уснул.
  • Вводит 2 типа события, 1 сигнал о новом сообщении (содержит offset значение), 2 - процесс идет спать (содержит offset значение).
  • Вводит дополнительное состояние в триггер: максимальный offset сообщения, максимальный offset обработанного процессом сообщения, флаг состояния сна процесса.
Алгоритм триггера.
  • При получении события о засыпании процесса:
    Фиксирует смещение процесса обработки и сравнивает со смещением сообщения.
    Если все сообщения обработаны, то не пробуждает процесс, иначе пробуждает процесс.
  • При получении события о новом сообщении:
    Фиксирует новое наибольшее смещение.
    Если процесс не спит (по флагу в триггере), то ничего не делает.
    Если процесс спит (по флагу), то пробуждает процесс.

Отслеживает смещение обработки процесса и последнего события.
Ожидает от процесса события о том, что он все обработал, его последнее смещение и он идет спать.
Если есть сообщения со смещением больше чем указал процесс, то делает гарантированное пробуждение процесса.
Когда поступает сигнал о новом сообщении (от отправителя сообщения), то обновляет данные о максимальном смещении и пробуждает процесс, если он спит

 TODO:

Внутренние ссылки:

Дочерние страницы:
Обратные ссылки:

Не удалось выполнить макрос [velocity]. Причина: [The execution of the [velocity] script macro is not allowed in [xwiki:Проекты и репозитории.Библиотеки.Движок cccc1808\. ProcessEngine.WebHome]. Check the rights of its last author or the parameters if it's rendered from another script.]. Нажмите на это сообщение для получения подробной информации.