Пакетная обработка заявок с асинхронностью (Async)

Редактировал(а) Alexandr Fokin 2023/01/23 23:00

Асинхронность. Async Await

Модели


Используемые инструменты
1) Пакетная обработка
2) LongPooling

ИсполнительАлгоритм
Клиент1) От клиентов поступают заявки на некоторую обработку.
На основе заявки создается запись (State) в потокобезопасной коллекции (ConcurrentDictionary). (В записи также содержится CancalidationToken, позволяющий прервать await клиента)
Запрос встает на ожидание обработки с await. (LongPooling не разрываем соединение с клиентом)
Воркер2) Через некоторые промежутки времени запускается некоторый Концепция Worker (Worker and WorkerParallelWrapper), который берет группу заявок из потокобезопасной коллекции и начинает их обрабатывать.
По окончанию обработки Worker записывает результаты в соответствущие State и вызывает CancalidationToken, чтобы разблокировать ожидающего клиента.
Клиент3) Клиент выходит с await и забирает свой State, в котором содержится некоторый результат обработки.
(
  Статус успешно и результат
  либо статус ошибка и сообщенеи об ошибке
)

public class BatchProcessingContainer<TRequest, TResponse>
{
 private readonly ConcurrentDictionary<Guid, (TRequest Request, TResponse? Response, CancelationTokenSource WaitToken) _Buffer

 public async Task<TResponse?> RequestAsync(
    TRequest request,
    CancelationToken token = default
  );


 public async ValueTask<(Guid RequestId, TRequest Request)[]> GetForProcessingAsync(
   int count? = null
  );

 public async ValueTask SetResultAsync(
    Guid requestId,
    TResponse response
  )
}
Теги: