Long-running tasks

This service runs long-running tasks in the background so that the HTTP request can return 200 immediately. The caller does not have to wait for the task to finish: the work is handed to a background worker through an async queue.


LongRunningTaskService and LongRunningTaskQueueWorker in the core module

/// <summary>
/// Queue of asynchronous work items. Producers add items with <see cref="QueueAsync"/>;
/// a consumer takes them out with <see cref="DequeueAsync"/>.
/// </summary>
public interface ILongRunningTaskQueueWorker
{
    /// <summary>Adds a work item to the queue.</summary>
    /// <param name="workItem">
    /// Delegate to run later. It receives a <see cref="CancellationToken"/> supplied by whoever
    /// executes it.
    /// </param>
    /// <returns>A <see cref="ValueTask"/> that completes when the enqueue operation has finished.</returns>
    ValueTask QueueAsync(Func<CancellationToken, ValueTask> workItem);

    /// <summary>Removes and returns a work item from the queue.</summary>
    /// <param name="cancellationToken">Token passed to the dequeue operation.</param>
    /// <returns>The dequeued work item.</returns>
    // NOTE(review): ordering and blocking behaviour are decided by the implementation — confirm there.
    ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken);
}

/// <summary>
/// Background worker that takes work items from an <see cref="ILongRunningTaskQueueWorker"/>
/// and runs them one at a time until the stopping token is signalled.
/// </summary>
public class LongRunningTaskService : BackgroundService
{
    private readonly ILongRunningTaskQueueWorker _queue;

    /// <summary>Creates the worker.</summary>
    /// <param name="queue">Queue the worker reads its work items from.</param>
    /// <exception cref="ArgumentNullException"><paramref name="queue"/> is <c>null</c>.</exception>
    public LongRunningTaskService(ILongRunningTaskQueueWorker queue)
    {
        _queue = queue ?? throw new ArgumentNullException(nameof(queue));
    }

    /// <summary>
    /// Dequeues and executes work items in a loop. A failing work item is reported and skipped
    /// so that the items queued after it are still processed.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the service is asked to stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            Func<CancellationToken, ValueTask> workItem;
            try
            {
                workItem = await _queue.DequeueAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // The wait for the next item was cancelled because the service is stopping.
                break;
            }

            try
            {
                await workItem(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // The running item observed the stop request; exit without treating it as a failure.
                break;
            }
            catch (Exception ex)
            {
                // Without this catch a single failing item would end the loop for good and every
                // later item would stay in the queue unprocessed. Replace the console output with
                // the application's logger where one is available.
                Console.Error.WriteLine($"Long-running task failed: {ex}");
            }
        }
    }
}

/// <summary>
/// <see cref="ILongRunningTaskQueueWorker"/> implementation backed by a bounded
/// <see cref="Channel{T}"/> of work-item delegates.
/// </summary>
public class LongRunningTaskQueueWorker : ILongRunningTaskQueueWorker
{
    private readonly Channel<Func<CancellationToken, ValueTask>> _channel;

    /// <summary>Creates a queue that holds at most <paramref name="capacity"/> pending work items.</summary>
    /// <param name="capacity">
    /// Maximum number of queued items. Choose it from the expected application load and the
    /// number of concurrent producers.
    /// </param>
    public LongRunningTaskQueueWorker(int capacity)
    {
        // With BoundedChannelFullMode.Wait, a write to a full channel does not complete until
        // space becomes available. Producers are therefore slowed down (backpressure) instead
        // of piling up an unbounded backlog.
        _channel = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(
            new BoundedChannelOptions(capacity)
            {
                FullMode = BoundedChannelFullMode.Wait
            });
    }

    /// <summary>Writes <paramref name="workItem"/> to the channel.</summary>
    /// <param name="workItem">Delegate to enqueue; must not be <c>null</c>.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is <c>null</c>.</exception>
    public async ValueTask QueueAsync(Func<CancellationToken, ValueTask> workItem)
    {
        _ = workItem ?? throw new ArgumentNullException(nameof(workItem));

        await _channel.Writer.WriteAsync(workItem);
    }

    /// <summary>Reads the next work item from the channel.</summary>
    /// <param name="cancellationToken">Token passed to the channel read.</param>
    /// <returns>The work item that was read.</returns>
    public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken)
        => await _channel.Reader.ReadAsync(cancellationToken);
}


How to register LongRunningTaskService and LongRunningTaskQueueWorker for dependency injection

// Register the worker as a hosted service so the host starts it and its dequeue loop runs.
// A plain AddScoped registration only makes the type resolvable; nothing would start it.
services.AddHostedService<LongRunningTaskService>();
// A single shared queue instance: producers and the worker must see the same channel.
// 100 is the maximum number of pending work items.
services.AddSingleton<ILongRunningTaskQueueWorker>(ctx => new LongRunningTaskQueueWorker(100));


How to use LongRunningTaskQueueWorker

/// <summary>
/// Sample controller showing how to hand work to <see cref="ILongRunningTaskQueueWorker"/>
/// from a request handler.
/// </summary>
[ApiController]
[Route("test")]
public class TestController : ControllerBase
{
    private readonly ILongRunningTaskQueueWorker _taskQueue;

    /// <summary>Creates the controller.</summary>
    /// <param name="longRunningTaskQueueWorker">Queue that receives the background work items.</param>
    public TestController(ILongRunningTaskQueueWorker longRunningTaskQueueWorker)
        => _taskQueue = longRunningTaskQueueWorker;

    /// <summary>
    /// Queues a simulated 10-second job. The action completes once the work item has been
    /// handed to the queue; it does not wait for the work item to run.
    /// </summary>
    [HttpPost("call")]
    public async Task CallSlowApi()
    {
        Console.WriteLine($"Starting at {DateTime.UtcNow.TimeOfDay}");

        await _taskQueue.QueueAsync(
            async token =>
            {
                // Call your logic here
                await Task.Delay(10000, token);
                Console.WriteLine($"Done at {DateTime.UtcNow.TimeOfDay}");
            });
    }
}


References

results matching ""

    No results matching ""