Пакетная обработка заявок с асинхронностью (Async)
Версия 3.9 от Alexandr Fokin на 2023/01/11 15:10
Используемые инструменты
1) Пакетная обработка
2) LongPooling
Исполнитель | Алгоритм |
Клиент | 1) От клиентов поступают заявки на некоторую обработку. На основе заявки создается запись (State) в потокобезопасной коллекции (ConcurrentDictionary). (В записи также содержится CancalidationToken, позволяющий прервать await клиента) Запрос встает на ожидание обработки с await. (LongPooling не разрываем соединение с клиентом) |
Воркер | 2) Через некоторые промежутки времени запускается некоторый Концепция Worker (Worker and WorkerParallelWrapper), который берет группу заявок из потокобезопасной коллекции и начинает их обрабатывать. По окончанию обработки Worker записывает результаты в соответствущие State и вызывает CancalidationToken, чтобы разблокировать ожидающего клиента. |
Клиент | 3) Клиент выходит с await и забирает свой State, в котором содержится некоторый результат обработки. ( Статус успешно и результат либо статус ошибка и сообщенеи об ошибке ) |
public class BatchProcessingContainer<TRequest, TResponse>
{
private readonly ConcurrentDictionary<Guid, (TRequest Request, TResponse? Response, CancelationTokenSource WaitToken) _Buffer
public async Task<TResponse?> RequestAsync(
TRequest request,
CancelationToken token = default
);
public async ValueTask<(Guid RequestId, TRequest Request)[]> GetForProcessingAsync(
int count? = null
);
public async ValueTask SetResultAsync(
Guid requestId,
TResponse response
)
}
{
private readonly ConcurrentDictionary<Guid, (TRequest Request, TResponse? Response, CancelationTokenSource WaitToken) _Buffer
public async Task<TResponse?> RequestAsync(
TRequest request,
CancelationToken token = default
);
public async ValueTask<(Guid RequestId, TRequest Request)[]> GetForProcessingAsync(
int count? = null
);
public async ValueTask SetResultAsync(
Guid requestId,
TResponse response
)
}