Przejdź do treści

.NET Channels

  • przez

Wraz z nadejściem .NET Core 3.0 otrzymaliśmy do rąk potężne narzędzie – bibliotekę System.Threading.Channels. Mimo że od premiery minęło już sporo czasu, wiele zespołów wciąż polega na przestarzałych mechanizmach jak BlockingCollection lub ryzykownych operacjach na wątkach puli, by obsłużyć asynchroniczne przetwarzanie danych.

Channels to nowoczesna implementacja wzorca Producer-Consumer, zoptymalizowana pod kątem wysokiej przepustowości i niskiej alokacji pamięci. W połączeniu z wbudowanym w .NET mechanizmem IHostedService (BackgroundService), pozwala na tworzenie robustnych procesów przetwarzania w tle bezpośrednio w instancji aplikacji.

Dziś przejdziemy przez implementację, która zapewnia nie tylko asynchroniczność, ale także mechanizm Backpressure – chroniący Twój serwer przed zalaniem zbyt dużą liczbą zadań. Zobaczmy, jak to zrobić dobrze.

Użycie System.Threading.Channels w połączeniu z .NET Core BackgroundService (usługami w tle) to jeden z najwydajniejszych i najnowocześniejszych wzorców do obsługi asynchronicznego przetwarzania zadań w aplikacjach .NET.

Jest to realizacja wzorca Producer-Consumer (Producent-Konsument), która pozwala na bezpieczne i szybkie przesyłanie danych między wątkami.

Oto szczegółowe wyjaśnienie tego, jak te dwa elementy współpracują i dlaczego warto ich używać.

1. Kluczowe koncepcje

Czym są Channels (System.Threading.Channels)?

To struktury danych wprowadzone w .NET Core 3.0, które działają jak „inteligentne kolejki”. Są zoptymalizowane pod kątem wysokiej wydajności i bezpieczeństwa wątkowego (thread-safety).

  • Producer (Writer): Zapisuje dane do kanału (np. kontroler API odbierający żądanie).
  • Consumer (Reader): Odczytuje dane z kanału (zazwyczaj usługa w tle).

Dzielimy je na dwa główne typy:

  • Unbounded (Nieograniczone): Może przyjąć nieskończoną liczbę elementów. Ryzykowne, bo jeśli konsument nie nadąża, pamięć RAM może się wyczerpać.
  • Bounded (Ograniczone): Ma ustaloną pojemność (np. 100 elementów). Jeśli kanał jest pełny, producent musi poczekać (asynchronicznie), aż zwolni się miejsce. To zapewnia tzw. Backpressure (kontrolę przepływu).+1

Czym jest BackgroundService?

To klasa bazowa w .NET (implementująca IHostedService), przeznaczona do tworzenia długo działających procesów w tle. Uruchamia się ona wraz ze startem aplikacji i działa aż do jej zamknięcia.

2. Jak to działa razem? (Architektura)

W typowym scenariuszu (np. Web API) wygląda to tak:

  1. Rejestracja: W kontenerze DI rejestrujesz Kanał jako Singleton. Dzięki temu zarówno Producent, jak i Konsument mają dostęp do tej samej instancji.
  2. Produkcja (API): Użytkownik wysyła żądanie HTTP (np. „Wyślij e-mail”). Kontener wstrzykuje Kanał do kontrolera. Kontroler „wrzuca” zadanie do Kanału i natychmiast zwraca odpowiedź „202 Accepted” użytkownikowi (nie czekając na wysyłkę).
  3. Konsumpcja (Worker): BackgroundService działa w pętli while. Nasłuchuje na nowe wiadomości w Kanale. Gdy tylko coś się pojawi, pobiera to i przetwarza (np. faktycznie wysyła e-mail).

3. Przykład implementacji (Code Snippet)

Poniżej znajdziesz uproszczony, ale w pełni funkcjonalny przykład.

Krok A: Definicja kanału (Wrapper)

Zaleca się opakowanie czystego Channel<T> w prostą klasę, aby ukryć szczegóły implementacyjne przed resztą aplikacji.

using System.Threading.Channels;

public class BackgroundTaskChannel
{
    // Używamy Bounded dla bezpieczeństwa pamięci
    private readonly Channel<string> _channel;

    public BackgroundTaskChannel()
    {
        // Opcje: Pojemność 100. Jeśli pełny -> czekaj (Wait).
        var options = new BoundedChannelOptions(100)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _channel = Channel.CreateBounded<string>(options);
    }

    // Metoda dla Producenta
    public async ValueTask AddTaskAsync(string taskItem, CancellationToken ct)
    {
        await _channel.Writer.WriteAsync(taskItem, ct);
    }

    // Metoda dla Konsumenta
    public IAsyncEnumerable<string> ReadAllAsync(CancellationToken ct)
    {
        return _channel.Reader.ReadAllAsync(ct);
    }
}

Krok B: Usługa w tle (Consumer)

public class WorkerService : BackgroundService
{
    private readonly BackgroundTaskChannel _channel;
    private readonly ILogger<WorkerService> _logger;

    public WorkerService(BackgroundTaskChannel channel, ILogger<WorkerService> logger)
    {
        _channel = channel;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Pętla odczytująca z kanału. Zakończy się automatycznie, gdy kanał zostanie zamknięty lub token anulowany.
        await foreach (var taskItem in _channel.ReadAllAsync(stoppingToken))
        {
            try
            {
                _logger.LogInformation($"Przetwarzanie zadania: {taskItem}");
                
                // Symulacja ciężkiej pracy
                await Task.Delay(1000, stoppingToken); 
                
                _logger.LogInformation("Zakończono.");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Błąd podczas przetwarzania zadania.");
            }
        }
    }
}

Krok C: Rejestracja w Program.cs

var builder = WebApplication.CreateBuilder(args);

// 1. Rejestrujemy kanał jako Singleton (Współdzielony stan)
builder.Services.AddSingleton<BackgroundTaskChannel>();

// 2. Rejestrujemy nasz Worker Service (Hosted Service)
builder.Services.AddHostedService<WorkerService>();

var app = builder.Build();

// 3. Symulacja Producenta (Minimal API)
app.MapPost("/send", async (BackgroundTaskChannel channel, string message) =>
{
    // "Fire and forget" z perspektywy HTTP, ale bezpiecznie zakolejkowane
    await channel.AddTaskAsync(message, CancellationToken.None);
    return Results.Accepted(value: "Zadanie przyjęte do realizacji.");
});

app.Run();

4. Zalety tego rozwiązania

  1. Wydajność: System.Threading.Channels jest znacznie lżejsze i szybsze niż tradycyjne blokady (lock) czy BlockingCollection.
  2. Backpressure: Dzięki opcji Bounded, jeśli Twój Worker nie nadąża przetwarzać zadań, API zwolni przyjmowanie nowych (zamiast doprowadzić do wycieku pamięci przez nieskończone kolejkowanie obiektów).
  3. Separacja odpowiedzialności: Kontrolery API zajmują się tylko przyjmowaniem żądań HTTP, a logika biznesowa (często ciężka) jest przeniesiona do tła.
  4. Skalowalność: Łatwo zmienić liczbę Consumerów (Workerów) czytających z tego samego kanału, jeśli potrzebujesz przetwarzać zadania równolegle.

Kiedy NIE używać tego rozwiązania?

Channels są kolejkami in-memory (w pamięci RAM).

  • Jeśli aplikacja się zrestartuje (awaria, deployment) – tracisz wszystkie nieprzetworzone wiadomości.
  • Jeśli potrzebujesz gwarancji dostarczenia (durability) i przetrwania restartów, musisz użyć zewnętrznego brokera wiadomości, takiego jak RabbitMQ, Azure Service Bus lub bazy danych (np. biblioteka Hangfire).
Tagi: