Исходный код вики Движок cccc1808. ProcessEngine

Версия 30.2 от Alexandr Fokin на 2026/04/27 13:25

Скрыть последних авторов
Alexandr Fokin 27.16 1 |(% style="width:132px" %) |(% style="width:1301px" %)(((
2 |Теги поиска|cccc1808. ProcessEngine, cccc1808.ProcessEngine, Process engine
Alexandr Fokin 1.1 3 Очередь задач, Система обработки процессов, Движок обработки процессов.
Alexandr Fokin 1.2 4 [[Процесс>>doc:Архитектура и модели.Модели.Процесс.WebHome]]
Alexandr Fokin 27.16 5 |Описание|Универсальный движок для выполнения процессов и очередей задач, позволяющий комбинировать несколько подходов к обработке (см особенности).
6 |Термины|Процесс является единицей исполнения. В реализации может содержать машину состояний.
7 Система триггеров используется для таймеров и передачи сигналов для процессов (с оптимизацией нагрузки).
8 |Репозиторий|[[https:~~/~~/github.com/cccc1808/cccc1808.ProcessEngine>>https://github.com/cccc1808/cccc1808.ProcessEngine]]
Alexandr Fokin 1.2 9 )))
Alexandr Fokin 1.18 10 |(% style="width:132px" %)Разветывание|(% style="width:1301px" %)(((
11 |(% style="width:150px" %)База данных|(% style="width:1177px" %)(((
12 Для надежного хранения данных процессов и триггеров.
13
14 * Сейчас есть реализация под EntityFramework.
Alexandr Fokin 1.34 15 * Но модель позволяет сделать реализацию под другие решения ([[linq2db>>doc:Разработка.NET.Работа с БД.linq2db.WebHome]] или чистый Ado.Net) (за счет наличия IProcessContainer и методов Update).
Alexandr Fokin 1.18 16
17 Для текущей реализации в качестве хранилище может выступать БД, поддерживающая:
18
Alexandr Fokin 27.17 19 * Транзакции:
20 ** Транзакции.
21 ** Savepoint.
Alexandr Fokin 1.41 22 (если используются, можно обрабатывать каждый шаг отдельной транзакцией или весь процесс без savepoint изоляции ошибок между шагами).
Alexandr Fokin 27.17 23 * [[Блокировки>>doc:Разработка.Базы данных.SQL.Механизмы.Транзакции и блокировки.WebHome]]:
24 ** updatelock.
25 ** updatelock skip locked.
Alexandr Fokin 1.41 26 (частично можно обойтись без него).
Alexandr Fokin 27.17 27 ** sharelock
Alexandr Fokin 1.33 28 (можно обойтись без него без сильного влияния)
Alexandr Fokin 27.13 29 * [[Уровни изоляции>>doc:Разработка.Базы данных.SQL.Механизмы.Транзакции и блокировки.Уровни изоляции | Isolation levels.WebHome]]: работает на read committed, то что нужно блокируется руками.
30 * Для некоторых кейсов желательно возможность выполнить [[Upsert>>doc:Разработка.Базы данных.SQL.Сценарии и вопросы.Insert or update\. Upsert.WebHome]] (insert on conflict).
Alexandr Fokin 1.18 31 )))
32 |(% style="width:150px" %)Брокер сообщений|(% style="width:1177px" %)(((
33 Используется для накопления и доставки TriggerEvent.
34
35 * Сейчас есть реализация под [[Apache Kafka>>doc:Разработка.Брокеры сообщений.Apache Kafka.WebHome]]
Alexandr Fokin 1.44 36 * Скорее всего можно сделать под RabbitMq.
Alexandr Fokin 1.19 37 * Если запускать в одном экземпляре, то можно обойтись inmemory реализаций.
Alexandr Fokin 1.18 38 )))
39 )))
Alexandr Fokin 1.1 40 |(% style="width:132px" %)Особенности|(% style="width:1301px" %)(((
Alexandr Fokin 27.14 41 |(% style="width:159px" %)Пакетные транзакции (батчинг).|(% style="width:1168px" %)(((
42 Возможность использовать и комбинировать типы выполнения для разных типов процессов:
Alexandr Fokin 1.1 43
44 * (1 транзакция - 1 процесс),
45 * (1 транзакция - N процессов).
46 )))
Alexandr Fokin 1.6 47 |(% style="width:159px" %)Изоляция шагов и процессов (внутри оной транзакции)|(% style="width:1168px" %)(((
Alexandr Fokin 1.1 48 * Изоляция через db savepoint.
49 * Для [[EntityFramework ~| EntityFrameworkCore>>doc:Разработка.NET.Работа с БД.EntityFramework | EntityFrameworkCore.WebHome]] возможность делать InMemory снимок [[Создание снимка ChangeTracker>>doc:Разработка.NET.Работа с БД.EntityFramework | EntityFrameworkCore.Сценарии и проблемы.Создание снимка ChangeTracker.WebHome]].
Alexandr Fokin 1.38 50 * Допустима реализация на основе того, что InMemory состояние процесса (и задействованных бизнес сущностей) реализовано в виде Immutable компонентов. В этом случае достаточно просто сохранять и восстанавливать ссылку.
Alexandr Fokin 1.39 51 Но предполагаю, что в большинстве бизнес моделей не так и это не не будет востребованным.
Alexandr Fokin 1.1 52 )))
Alexandr Fokin 1.6 53 |(% style="width:159px" %)Передача сигналов для процессов через систему триггеров.|(% style="width:1168px" %)(((
Alexandr Fokin 1.1 54 В том числе для передачи сигналов реализована система триггеров.
Alexandr Fokin 1.14 55 Она позволяет оптимизировать операции с БД и уменьшить конкуренцию между процессами за общее состояние (родительский процесс).
Alexandr Fokin 1.1 56
Alexandr Fokin 6.2 57 См. пример 1.
Alexandr Fokin 1.1 58 )))
Alexandr Fokin 6.2 59 |(% style="width:159px" %)Stream process|(% style="width:1168px" %)(((
60 Возможность реализации stream процессов. Которые обрабатываются некоторый поток сообщений.
61 Через использование триггеров (при поступлении сообщения публикуется TriggerEvent) и системы гарантированного пробуждения (процесс гарантировано не уснет, если есть необработанные сообщения).
62
63 См. пример 2.
64 )))
Alexandr Fokin 1.11 65 |(% style="width:159px" %)Перехват ошибок|(% style="width:1168px" %)Перехват и обработка ошибок, если процесс выкинул exception в движок. Реализацию простого retry с задержкой (создается триггер на следующую попытку).
Alexandr Fokin 27.15 66 В случае пакетной транзакции движок не знает какой конкретно из процессов породил ошибку (если она не перехвачена вручную), то ошибка выставляется на все незавершенные процессы.
Alexandr Fokin 1.6 67 |(% style="width:159px" %)Параллельное выполнение|(% style="width:1168px" %)Допускается запуск нескольких раннеров (на разных нодах), работающих с одной таблицей процессов для распределения нагрузки между ними.
Alexandr Fokin 1.9 68 Допускается фильтрация типов процессов между нодами (чтобы нода выполняла только определенные типы процессов, в том числе по приоритету).
Alexandr Fokin 1.6 69 Доступно для раннеров процессов и триггеров.
Alexandr Fokin 1.9 70 \\Также возможна частичная поставка новой версии процесса и этот процесс будет браться в обработку только ограниченным количеством новых нод исполнителей.
71 \\В перспективе возможны решения с шардированием.
Alexandr Fokin 1.33 72 |(% style="width:159px" %)Soft timeout|(% style="width:1168px" %)(((
73 Возможность указывать soft timeout, который будет мягко приостанавливать цикл (внутри шага процесса), чтобы ограничить общее время транзакции (не делать транзакции долгими (например горизонт postgres)).
74
75 Например:
76
77 1. родительский процесс может создать
78 1) N/2 дочерних процессов за первую сессию выполнения (транзакцию)
79 2) N/2 за вторую сессию выполнения (транзакцию) и уснуть до окончания дочерних процессов.
80 1. Процесс - групповое действие (когда нужно применить действие к строкам таблицы по указанному фильтру).
81 )))
Alexandr Fokin 1.20 82 |(% style="width:159px" %)Range preload process data|(% style="width:1168px" %)(Необязательно) возможность в начале сессии обработки пакетно предзагружать данные и бизнес сущности для процессов (Range query) из обрабатываемого батча для оптимизации чтения (если используется обработка пакета процессов в одной транзакции).
Alexandr Fokin 1.1 83 )))
84 |(% style="width:132px" %)Примеры|(% style="width:1301px" %)(((
85 |(% style="width:32px" %)1|(% style="width:171px" %)Родительский процесс, N дочерних процессов.|(% style="width:1066px" %)(((
86 |(% style="width:870px" %)(((
Alexandr Fokin 1.29 87 В данном примере имеется в виду, что дочерние процессы могут выполняться параллельно другу и независимо друг от друга, но в конце должны оповестить родительский процесс о необходимости продолжения обработки.
88 Если речь идет о каких-либо зависимостях порядка выполнения в дочерних процессах, то это может контролировать дочерний процесс (выделяя группу, которую сейчас можно запустить и ожидая окончания).
89 )))
90 |(% style="width:870px" %)(((
Alexandr Fokin 1.1 91 1. Родительский процесс создает триггер со счетчиком N, создает и запускает дочерние процессы, засыпает.
92 1. Дочерние процесс при завершении публикует TriggerEvent.
93 1. TriggerConsumerRunner периодически считывает батч TriggerEvent, уменьшает считчик триггера и делает запись в БД. За счет агрегации событий завершения процессов мы уменьшаем нагрузку на БД.
94 1. Когда все дочерние процессы отработали TriggerConsumerRunner получает значение счетчика 0 и взводит триггер.
95 1. Триггер пробуждает родительский процесс для дальнейшего выполнения.
96 )))
97 |(% style="width:870px" %)TriggerEvent публикуются без использования TransactionOutbox напрямую в брокер после коммита транзакции (иначе мы бы нагружали БД).
98 |(% style="width:870px" %)(((
Alexandr Fokin 1.8 99 Предполагаем, что основную часть времени система работает стабильно, но допускается ситуация, что транзакция закоммитилась, но TriggerEvent не смогли опубликоваться (остановка сервиса без graceful shutdown, проблемы соединения или работы с брокером сообщений).
Alexandr Fokin 1.1 100
101 Для таких случаев создается страхующий триггер (1 общий на тип процесса). Этот триггер запускается периодически и проходится по всем ожидающим процессам, проверяя условие (в реализации можно использовать keyset пагинацию) (в реализации можно использовать join для проверки условия).
102 Этот триггер выполняется периодически с более крупной временной задержкой. В случае обнаружения потери TriggerEvent, он поднимет заклинивший родительский процесс и он будет обработан (но позже). Можно установить этому триггеру низкий приоритет.
103 )))
Alexandr Fokin 4.2 104 |(% style="width:870px" %)[[image:Родительский дочерний процесс. Sequence.jpg]]
Alexandr Fokin 27.6 105 |(% style="width:870px" %)(((
Alexandr Fokin 27.12 106
Alexandr Fokin 1.1 107 )))
Alexandr Fokin 27.6 108 )))
Alexandr Fokin 30.2 109 |(% style="width:32px" %)2|(% style="width:171px" %)Transaction outbox stream process.|(% style="width:1066px" %)[[image:Проекты и репозитории.Библиотеки.Движок cccc1808\. ProcessEngine.Примеры.WebHome@TransactionOutbox. Sequence.jpg]]
Alexandr Fokin 6.2 110 |(% style="width:32px" %)3|(% style="width:171px" %)Stream trigger|(% style="width:1066px" %)(((
Alexandr Fokin 27.2 111 |(% style="width:94px" %) |(% style="width:1002px" %)(((
112 * Позволяет убрать лишние запросы пробуждения процесса (когда он и так запущен).
Alexandr Fokin 27.3 113 * Позволяет полностью убрать задержку после остановки процесса (если есть новое сообщения, то он сразу же будет пробужден).
Alexandr Fokin 27.4 114 За счет того, что триггер точно знает, что есть новые сообщения и процесс только что уснул.
Alexandr Fokin 27.2 115 * Вводит 2 типа события, 1 сигнал о новом сообщении (содержит offset значение), 2 - процесс идет спать (содержит offset значение).
116 * Вводит дополнительное состояние в триггер: максимальный offset сообщения, максимальный offset обработанного процессом сообщения, флаг состояния сна процесса.
117 )))
Alexandr Fokin 6.2 118 |(% style="width:94px" %)Алгоритм триггера.|(% style="width:1002px" %)(((
119 * При получении события о засыпании процесса:
120 Фиксирует смещение процесса обработки и сравнивает со смещением сообщения.
121 Если все сообщения обработаны, то не пробуждает процесс, иначе пробуждает процесс.
122 * При получении события о новом сообщении:
123 Фиксирует новое наибольшее смещение.
124 Если процесс не спит (по флагу в триггере), то ничего не делает.
125 Если процесс спит (по флагу), то пробуждает процесс.
126
127 Отслеживает смещение обработки процесса и последнего события.
128 Ожидает от процесса события о том, что он все обработал, его последнее смещение и он идет спать.
129 Если есть сообщения со смещением больше чем указал процесс, то делает гарантированное пробуждение процесса.
130 Когда поступает сигнал о новом сообщении (от отправителя сообщения), то обновляет данные о максимальном смещении и пробуждает процесс, если он спит
Alexandr Fokin 1.1 131 )))
Alexandr Fokin 6.2 132 |(% style="width:94px" %) |(% style="width:1002px" %)TODO:
133 )))
134 )))
Alexandr Fokin 1.1 135
Alexandr Fokin 27.5 136 ----
137
138 ==== Внутренние ссылки: ====
139
140 ====== Дочерние страницы: ======
141
142 {{children/}}
143
144 ====== Обратные ссылки: ======
145
146 {{velocity}}
147 #set ($links = $doc.getBacklinks())
148 #if ($links.size() > 0)
149 #foreach ($docname in $links)
150 #set ($rdoc = $xwiki.getDocument($docname).getTranslatedDocument())
151 * [[$escapetool.xml($rdoc.fullName)]]
152 #end
153 #else
154 No back links for this page!
155 #end
156 {{/velocity}}
157
158 ----