Seguro que alguna vez has tenido que diseñar e implementar el patrón Productor / Consumidor para alguna funcionalidad asíncrona. En este post vamos a ver una pequeña y sencilla introducción a los Channels en C# y cómo nos ayudan a implementar este patrón.

¿Qué significa este patrón?

El patrón Productor / Consumidor es un patrón de sistemas distribuidos y programación concurrente que separa por un lado la responsabilidad del productor o productores, que son los encargados de añadir un nuevo trabajo, y, por otro lado, la del consumidor o los consumidores que ejecutan este trabajo.

Llevado a un escenario orientado a eventos podríamos decir que el productor es el que añade un evento y el consumidor es el que lo consume y procesa.

graph TD; P1(Producer 1)-->Channel[/Channel/]; PN(Producer N)-->Channel; Channel-->C1(Consumer 1); Channel-->CN(Consumer N);

Vale pero… ¿qué son los Channels?

Un Channel es una estructura de datos thread-safe que permite añadir y consumir elementos de forma asíncrona. Para entenderlo fácilmente podríamos decir que son colas o buffers que permiten leer y escribir de manera asíncrona. Podemos definirlos con una capacidad limitada Channel.CreateBounded<T>(int capacity) o ilimitada Channel.CreateUnbounded<T>(). Con esta última debemos tener cuidado si tenemos un desfase muy grande entre los elementos producidos y los consumidos, pues podemos tener muchos elementos en memoria y provocar un Out Of Memory.

Una vez creados podemos acceder al Writer o al Reader que serán las dos propiedades que exponga el Channel y que nos ayudarán a separar la responsabilidad del productor (Writer) y el consumidor (Reader).

Dentro del Writer encontramos el método WriteAsync(T item), que usaremos para añadir el elemento y esperar en caso de que sea necesario, y el método TryComplete(), que marcará el Channel como completado y que permitirá a los consumidores saber que no se añadirán más datos a este Channel.

En el lado del Reader encontramos el método ReadAsync(), que leerá un elemento de la cola (y esperará en caso necesario), y el método ReadAllAsync(), que devuelve un IAsyncEnumerable y que podemos estar consumiendo de manera indefinida hasta que el Channel se complete. Este último método me parece muy interesante y lo usaré después en el ejemplo.

Cuando el Channel se complete, los métodos ReadAsync() y WriteAsync() lanzarán una excepción de tipo ChannelClosedException mientras que el bucle que estaba leyendo de ReadAllAsync() terminará sin lanzar excepción (es decir, sale del bucle y listo).

Show me the code!

El ejemplo más sencillo que se me ha ocurrido para ilustrar todo esto es una API que recibe y envía mensajes al Channel (productor) y un BackgroundService que los procesa (consumidor). Para hacerlo todavía más sencillo estoy registrando directamente el Channel como un servicio singleton que será inyectado tanto en el método POST de la API como en el BackgroundService.

Esta sería la definición del productor junto con el registro de dependencias usando minimal API:

using System.Threading.Channels;
using Microsoft.AspNetCore.Mvc;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddSingleton(Channel.CreateUnbounded<string>());
builder.Services.AddHostedService<MessageConsumer>();
var app = builder.Build();

// Productor
app.MapPost("/message", async ([FromServices]Channel<string> channel, [FromBody]string message) => 
    await channel.Writer.WriteAsync(message));

app.Run();

Y este sería el consumidor:

using System.Threading.Channels;

public class MessageConsumer : BackgroundService
{
    private readonly Channel<string> _channel;
    private readonly ILogger<MessageConsumer> _logger;

    public MessageConsumer(Channel<string> channel, ILogger<MessageConsumer> logger)
    {
        _channel = channel;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Consumidor
        await foreach (var message in _channel.Reader.ReadAllAsync(stoppingToken))
        {
            _logger.LogInformation("Message received: {Message}", message);
        }
    }
}

Como vemos, la API de Channels es súper intuitiva y sencilla y hace que sea la opción casi por defecto a tener en cuenta cuando nos enfrentamos a este tipo de problemas. Aunque no creo que sea un caso muy común y puede que no encuentres muchas oportunidades de usarlo en tu día a día, siempre es bueno conocer las alternativas que tenemos por si ese momento llega.

P.D. El ejemplo lo podéis descargar de GitHub si queréis probarlo por vuestra cuenta.

Bibliografía