Версия 5.3 от Alexandr Fokin на 2023/01/11 15:18

Скрыть последних авторов
Alexandr Fokin 5.1 1 [[Асинхронность. Async Await>>doc:Разработка.NET.C#.Многопоточность и асинхронность.Асинхронность\. Async Await.WebHome]]
Alexandr Fokin 1.1 2
Alexandr Fokin 3.4 3
4 ----
5
Alexandr Fokin 3.1 6 Используемые инструменты
7 1) Пакетная обработка
8 2) LongPooling
9
Alexandr Fokin 1.1 10
Alexandr Fokin 3.6 11 |(% style="width:105px" %)Исполнитель|(% style="width:1370px" %)Алгоритм
12 |(% style="width:105px" %)Клиент|(% style="width:1370px" %)1) От клиентов поступают заявки на некоторую обработку.
13 На основе заявки создается запись (State) в потокобезопасной коллекции ([[ConcurrentDictionary>>doc:Разработка.NET.C#.Коллекции.System\. Collections\. Concurrent.ConcurrentDictionary.WebHome]]). (В записи также содержится CancalidationToken, позволяющий прервать await клиента)
14 Запрос встает на ожидание обработки с await. (LongPooling не разрываем соединение с клиентом)
15 |(% style="width:105px" %)Воркер|(% style="width:1370px" %)2) Через некоторые промежутки времени запускается некоторый [[Концепция Worker (Worker and WorkerParallelWrapper)>>doc:Архитектура и модели.Модели.Концепция Worker (Worker and WorkerParallelWrapper).WebHome]], который берет группу заявок из потокобезопасной коллекции и начинает их обрабатывать.
16 По окончанию обработки Worker записывает результаты в соответствущие State и вызывает CancalidationToken, чтобы разблокировать ожидающего клиента.
17 |(% style="width:105px" %)Клиент|(% style="width:1370px" %)3) Клиент выходит с await и забирает свой State, в котором содержится некоторый результат обработки.
Alexandr Fokin 2.1 18 (
Alexandr Fokin 3.6 19 Статус успешно и результат
Alexandr Fokin 2.1 20 либо статус ошибка и сообщенеи об ошибке
Alexandr Fokin 3.6 21 )
Alexandr Fokin 3.5 22
Alexandr Fokin 3.6 23 ----
24
Alexandr Fokin 3.7 25 {{code language="C#"}}
Alexandr Fokin 3.8 26 public class BatchProcessingContainer<TRequest, TResponse>
Alexandr Fokin 3.7 27 {
28 private readonly ConcurrentDictionary<Guid, (TRequest Request, TResponse? Response, CancelationTokenSource WaitToken) _Buffer
29
Alexandr Fokin 3.9 30 public async Task<TResponse?> RequestAsync(
Alexandr Fokin 3.7 31 TRequest request,
32 CancelationToken token = default
33 );
34
35
36 public async ValueTask<(Guid RequestId, TRequest Request)[]> GetForProcessingAsync(
37 int count? = null
38 );
39
40 public async ValueTask SetResultAsync(
41 Guid requestId,
42 TResponse response
43 )
44 }
45 {{/code}}