Исходный код вики Пакетная обработка заявок с асинхронностью (Async)
Версия 5.1 от Alexandr Fokin на 2023/01/11 15:18
Последние авторы
author | version | line-number | content |
---|---|---|---|
1 | [[Асинхронность. Async Await>>doc:Разработка.NET.C#.Многопоточность и асинхронность.Асинхронность\. Async Await.WebHome]] | ||
2 | |||
3 | |||
4 | ---- | ||
5 | |||
6 | Используемые инструменты | ||
7 | 1) Пакетная обработка | ||
8 | 2) LongPooling | ||
9 | |||
10 | |||
11 | |(% style="width:105px" %)Исполнитель|(% style="width:1370px" %)Алгоритм | ||
12 | |(% style="width:105px" %)Клиент|(% style="width:1370px" %)1) От клиентов поступают заявки на некоторую обработку. | ||
13 | На основе заявки создается запись (State) в потокобезопасной коллекции ([[ConcurrentDictionary>>doc:Разработка.NET.C#.Коллекции.System\. Collections\. Concurrent.ConcurrentDictionary.WebHome]]). (В записи также содержится CancalidationToken, позволяющий прервать await клиента) | ||
14 | Запрос встает на ожидание обработки с await. (LongPooling не разрываем соединение с клиентом) | ||
15 | |(% style="width:105px" %)Воркер|(% style="width:1370px" %)2) Через некоторые промежутки времени запускается некоторый [[Концепция Worker (Worker and WorkerParallelWrapper)>>doc:Архитектура и модели.Модели.Концепция Worker (Worker and WorkerParallelWrapper).WebHome]], который берет группу заявок из потокобезопасной коллекции и начинает их обрабатывать. | ||
16 | По окончанию обработки Worker записывает результаты в соответствущие State и вызывает CancalidationToken, чтобы разблокировать ожидающего клиента. | ||
17 | |(% style="width:105px" %)Клиент|(% style="width:1370px" %)3) Клиент выходит с await и забирает свой State, в котором содержится некоторый результат обработки. | ||
18 | ( | ||
19 | Статус успешно и результат | ||
20 | либо статус ошибка и сообщенеи об ошибке | ||
21 | ) | ||
22 | |||
23 | ---- | ||
24 | |||
25 | {{code language="C#"}} | ||
26 | public class BatchProcessingContainer<TRequest, TResponse> | ||
27 | { | ||
28 | private readonly ConcurrentDictionary<Guid, (TRequest Request, TResponse? Response, CancelationTokenSource WaitToken) _Buffer | ||
29 | |||
30 | public async Task<TResponse?> RequestAsync( | ||
31 | TRequest request, | ||
32 | CancelationToken token = default | ||
33 | ); | ||
34 | |||
35 | |||
36 | public async ValueTask<(Guid RequestId, TRequest Request)[]> GetForProcessingAsync( | ||
37 | int count? = null | ||
38 | ); | ||
39 | |||
40 | public async ValueTask SetResultAsync( | ||
41 | Guid requestId, | ||
42 | TResponse response | ||
43 | ) | ||
44 | } | ||
45 | {{/code}} |