Coroutines


Async Streams, Channels & Coroutine Patterns in C#

IAsyncEnumerable<T> Fundamentals

Q: What is IAsyncEnumerable<T> and why was it introduced?

A: IAsyncEnumerable<T> (C# 8 / .NET Core 3.0) is the asynchronous counterpart to IEnumerable<T>. It enables pull-based, lazy, asynchronous iteration -- the consumer requests items one at a time, and each item can involve an await. This is critical for streaming data from databases, APIs, or files without buffering the entire result set into memory.

public interface IAsyncEnumerable<out T>
{
    IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken ct = default);
}

public interface IAsyncEnumerator<out T> : IAsyncDisposable
{
    T Current { get; }
    ValueTask<bool> MoveNextAsync();
}

Key details: MoveNextAsync() returns ValueTask<bool> (not Task<bool>) to avoid heap allocations when the result is synchronously available. The enumerator extends IAsyncDisposable so resources like database connections are properly cleaned up.
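
To make these interfaces concrete, here is roughly what an await foreach loop compiles into -- a sketch, with source and ct as placeholders:

```csharp
// Roughly what `await foreach (var item in source)` expands to:
IAsyncEnumerator<T> enumerator = source.GetAsyncEnumerator(ct);
try
{
    while (await enumerator.MoveNextAsync())
    {
        T item = enumerator.Current;
        // ... loop body ...
    }
}
finally
{
    await enumerator.DisposeAsync(); // IAsyncDisposable cleanup, even on early exit
}
```

The finally block is why IAsyncDisposable matters: breaking out of the loop still disposes the enumerator and releases resources like open database cursors.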

Async yield return

Q: How does async yield return work in C#?

A: When a method is declared async, returns IAsyncEnumerable<T>, and contains yield return, the compiler generates a state machine that handles both await suspension (for I/O) and yield return suspension (to hand a value to the consumer).

public async IAsyncEnumerable<string> ReadLinesAsync(string path)
{
    using var reader = new StreamReader(path);
    string? line;
    while ((line = await reader.ReadLineAsync()) is not null)
    {
        yield return line; // suspends, hands line to caller
    }
}

// Consumption
await foreach (var line in ReadLinesAsync("huge-file.csv"))
{
    Process(line); // only one line in memory at a time
}

Q: Can you use yield break inside an async iterator?

A: Yes. yield break terminates the sequence early. This is useful for applying limits or short-circuiting on a condition.

public async IAsyncEnumerable<Order> StreamUntilCutoff(
    IAsyncEnumerable<Order> source, DateTime cutoff)
{
    await foreach (var order in source)
    {
        if (order.CreatedAt > cutoff)
            yield break; // stop producing items

        yield return order;
    }
}

Comparison: IAsyncEnumerable<T> vs Task<IEnumerable<T>>

| Aspect | Task<IEnumerable<T>> | IAsyncEnumerable<T> |
| --- | --- | --- |
| Memory | Entire collection buffered | One item at a time |
| Time to first item | Must wait for all items | Immediate on first yield |
| Backpressure | None -- all or nothing | Natural -- consumer pulls |
| Cancellation | At the Task level | Per-element via CancellationToken |

Q: What is the difference between returning Task<IEnumerable<T>> and IAsyncEnumerable<T>?

A: Task<IEnumerable<T>> buffers all results into memory before returning, so the caller waits for the entire collection. IAsyncEnumerable<T> streams results one at a time -- the producer resumes only when the consumer pulls the next item. This means lower memory usage, faster time-to-first-item, and natural backpressure.

// Buffered: loads ALL orders into memory, then returns
public async Task<IEnumerable<Order>> GetAllOrdersBuffered()
{
    return await _dbContext.Orders.ToListAsync(); // entire table in RAM
}

// Streaming: yields one order at a time
public async IAsyncEnumerable<Order> GetAllOrdersStreaming()
{
    await foreach (var order in _dbContext.Orders.AsAsyncEnumerable())
    {
        yield return order; // one row at a time
    }
}

CancellationToken with [EnumeratorCancellation]

Q: How do you wire cancellation into an async iterator?

A: Apply [EnumeratorCancellation] to a CancellationToken parameter. The await foreach statement automatically passes its WithCancellation token to the enumerator. Inside the method, check the token or pass it to async calls.

public async IAsyncEnumerable<SensorReading> StreamReadingsAsync(
    string sensorId,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    while (!ct.IsCancellationRequested)
    {
        var reading = await _sensorClient.ReadAsync(sensorId, ct);
        yield return reading;
        await Task.Delay(TimeSpan.FromSeconds(1), ct);
    }
}

// Caller passes cancellation via WithCancellation
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
await foreach (var reading in StreamReadingsAsync("temp-01")
    .WithCancellation(cts.Token))
{
    Console.WriteLine(reading.Value);
}

Q: Why is [EnumeratorCancellation] needed? Why not just pass the token normally?

A: The await foreach desugars to GetAsyncEnumerator(cancellationToken). The [EnumeratorCancellation] attribute tells the compiler to merge the token from WithCancellation() with the parameter so the iterator method receives it. Without it, the token passed via WithCancellation is ignored inside the method body.
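
With the attribute in place, both delivery routes work. A sketch reusing StreamReadingsAsync from above (the linked-token behavior is what the compiler-generated code does when both routes supply a token):

```csharp
// Two equivalent ways to deliver a token to the iterator:
await foreach (var r in StreamReadingsAsync("temp-01", cts.Token)) { }  // direct argument

await foreach (var r in StreamReadingsAsync("temp-01")
    .WithCancellation(cts.Token)) { }                                   // via the enumerator

// If both are supplied, the generated code links the two tokens,
// so cancelling either one cancels the iteration.
```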

Async Streams in ASP.NET Core

Q: How do you return an async stream from a controller action?

A: Return IAsyncEnumerable<T> from a controller method. ASP.NET Core serializes items into the response as they arrive, flushing chunks over the wire. This keeps server memory flat even for large datasets.

[HttpGet("orders/stream")]
public async IAsyncEnumerable<OrderDto> StreamOrders(
    [EnumeratorCancellation] CancellationToken ct)
{
    await foreach (var order in _orderService.GetAllAsync(ct))
    {
        yield return _mapper.Map<OrderDto>(order);
    }
}

The response is serialized as a JSON array by System.Text.Json, with each element flushed as it becomes available. The HttpContext.RequestAborted token propagates into ct.

Q: How does SignalR use async streams for server-to-client streaming?

A: A hub method that returns IAsyncEnumerable<T> becomes a streaming endpoint. The client receives items as they are yielded.

public class StockHub : Hub
{
    public async IAsyncEnumerable<StockPrice> StreamPrices(
        string symbol,
        [EnumeratorCancellation] CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            yield return await _priceService.GetLatestAsync(symbol, ct);
            await Task.Delay(250, ct);
        }
    }
}

Channel<T> for Producer-Consumer

Q: What is Channel<T> and how does it relate to async streams?

A: Channel<T> (from System.Threading.Channels) is a high-performance, thread-safe producer-consumer queue. It bridges producers and consumers that run at different speeds. A channel's Reader exposes ReadAllAsync() which returns IAsyncEnumerable<T>, making it composable with async stream patterns.

// Bounded channel: backpressure kicks in at capacity
var channel = Channel.CreateBounded<LogEntry>(new BoundedChannelOptions(1000)
{
    FullMode = BoundedChannelFullMode.Wait // producer waits when full
});

// Producer (e.g., request middleware)
await channel.Writer.WriteAsync(new LogEntry("Request received"), ct);

// Consumer (background service)
public class LogConsumer : BackgroundService
{
    private readonly ChannelReader<LogEntry> _reader;

    public LogConsumer(Channel<LogEntry> channel) => _reader = channel.Reader;

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        await foreach (var entry in _reader.ReadAllAsync(ct))
        {
            await _sink.WriteAsync(entry);
        }
    }
}

| Option | Bounded | Unbounded |
| --- | --- | --- |
| Memory safety | Capped at capacity | Grows without limit |
| Backpressure | WriteAsync blocks at limit | Never blocks |
| Use case | High-throughput pipelines | Low-volume, trusted producers |

Q: When would you choose a bounded vs unbounded channel?

A: Use bounded channels when the producer can outrun the consumer -- the capacity limit provides backpressure. Use unbounded channels when the producer rate is naturally constrained or you accept unbounded memory growth. In most real-world scenarios, bounded channels are safer.
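
As a sketch, the two variants are created like this (the SingleReader/SingleWriter hints are optional optimizations, not required):

```csharp
// Bounded: the safe default for pipelines
var bounded = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = true,  // enables internal fast paths when only one consumer reads
    SingleWriter = false
});

// Unbounded: only when the production rate is known to be low
var unbounded = Channel.CreateUnbounded<int>();
```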

System.Linq.Async

Q: What is System.Linq.Async and why is it needed?

A: Standard LINQ operators (Where, Select, Take, etc.) work on IEnumerable<T>. They do not work on IAsyncEnumerable<T>. The NuGet package System.Linq.Async provides async LINQ operators so you can compose async streams declaratively.

// NuGet: dotnet add package System.Linq.Async

var results = _repository.StreamOrdersAsync()
    .WhereAwait(async o => await _validator.IsValidAsync(o))
    .Select(o => new OrderDto(o.Id, o.Total))
    .Take(100);

await foreach (var dto in results)
{
    Console.WriteLine(dto);
}

Key operators: WhereAwait, SelectAwait (when the projection itself is async), Where, Select (for synchronous lambdas), Take, Skip, ToListAsync, FirstOrDefaultAsync, AnyAsync, CountAsync.
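
A short sketch of the terminal operators, reusing the repository from the example above (the Order properties and OrderStatus enum are illustrative):

```csharp
// Terminal operators consume the stream and produce a single result
var firstBig = await _repository.StreamOrdersAsync()
    .FirstOrDefaultAsync(o => o.Total > 1000, ct);

var pendingCount = await _repository.StreamOrdersAsync()
    .CountAsync(o => o.Status == OrderStatus.Pending, ct);
```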

Backpressure

Q: What is backpressure and how do async streams provide it?

A: Backpressure is a mechanism where a slow consumer signals a fast producer to slow down, preventing memory exhaustion. IAsyncEnumerable<T> provides natural backpressure because the producer only runs when the consumer calls MoveNextAsync(). If the consumer is slow, the producer is idle.

With Channel<T>, backpressure is explicit: a bounded channel's WriteAsync blocks when the buffer is full, forcing the producer to wait.

// Natural backpressure: producer yields only when consumer asks
public async IAsyncEnumerable<DataChunk> ProduceAsync(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    await foreach (var raw in _source.ReadAllAsync(ct))
    {
        var chunk = await TransformAsync(raw, ct); // only runs when consumer is ready
        yield return chunk;
    }
}

Q: What happens if you need to decouple producer and consumer speeds entirely?

A: Use a bounded Channel<T> as a buffer between them. The producer writes at its own pace (up to the channel capacity), and the consumer reads at its own pace. When the channel is full, the producer awaits until space opens. This decouples the two without unbounded memory growth.
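
A minimal sketch of that decoupling, using an int payload for brevity:

```csharp
var channel = Channel.CreateBounded<int>(16);

var producer = Task.Run(async () =>
{
    for (var i = 0; i < 1_000; i++)
        await channel.Writer.WriteAsync(i); // awaits whenever the buffer is full
    channel.Writer.Complete();              // signal end-of-stream
});

var consumer = Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
        await Task.Delay(10); // simulate slow processing
});

await Task.WhenAll(producer, consumer);
```

The producer never gets more than 16 items ahead of the consumer, so memory stays bounded no matter how far apart their speeds are.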

Real-World Use Cases

Q: Name five real-world scenarios where IAsyncEnumerable<T> shines.

A: (1) Large database result sets -- stream rows without loading the entire table via EF Core's AsAsyncEnumerable(). (2) File processing -- read and process lines from multi-GB files one at a time. (3) API pagination -- fetch pages from an external API and yield items across page boundaries transparently. (4) gRPC server streaming -- gRPC uses IAsyncEnumerable<T> for server-streaming RPCs. (5) Real-time data feeds -- combine with Channel<T> to push market data, IoT telemetry, or log entries to consumers.

// Transparent pagination over an external API
public async IAsyncEnumerable<User> GetAllUsersAsync(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    string? cursor = null;
    do
    {
        var page = await _httpClient.GetFromJsonAsync<PagedResult<User>>(
            $"/users?cursor={cursor}", ct);
        foreach (var user in page!.Items)
            yield return user;
        cursor = page.NextCursor;
    } while (cursor is not null);
}

Performance Considerations

Q: What are the performance characteristics of IAsyncEnumerable<T> vs buffered collections?

A: Async streams reduce peak memory by avoiding full materialization. The ValueTask<bool> return from MoveNextAsync() avoids Task heap allocations in the common synchronous-completion case. However, there is per-element overhead from the state machine. For small collections (under ~100 items), Task<List<T>> can be faster due to fewer state transitions. For large or unbounded sequences, async streams win on memory and time-to-first-item.

Q: How does ConfigureAwait(false) work with await foreach?

A: Use ConfigureAwait(false) on the enumerable to avoid capturing the synchronization context on each iteration. This is important in library code.

await foreach (var item in StreamDataAsync().ConfigureAwait(false))
{
    Process(item); // runs on thread-pool thread, not original context
}

Q: Are there allocation concerns with async iterators?

A: The compiler-generated state machine for an async iterator is allocated once on the heap. Each MoveNextAsync() call reuses the same object. The ValueTask<bool> avoids per-call Task allocations when the next item is immediately available. For hot paths, Channel<T> can outperform async iterators because its internal ring buffer is optimized for concurrent access with minimal allocations.

Questions & Answers

Q1: What is the difference between IAsyncEnumerable<T> and IObservable<T> (Rx)?

A: IAsyncEnumerable<T> is pull-based -- the consumer requests items via MoveNextAsync(). IObservable<T> is push-based -- the producer pushes items to subscribers. Use async enumerables when the consumer controls the pace; use Rx when the producer controls the pace and you need operators like Throttle, Buffer, or CombineLatest.

Q2: Can EF Core return IAsyncEnumerable<T> directly?

A: Yes. Call AsAsyncEnumerable() on any IQueryable<T> to get streaming results. This avoids ToListAsync() buffering. The database cursor remains open during iteration, so keep the DbContext alive for the duration.

Q3: How do you test async iterators?

A: Consume them with await foreach in a test and collect results into a list. Use System.Linq.Async's ToListAsync() for convenience. For cancellation tests, pass a pre-cancelled token and assert the sequence terminates.

[Fact]
public async Task StreamOrders_respects_cancellation()
{
    using var cts = new CancellationTokenSource();
    var items = new List<Order>();

    try
    {
        await foreach (var order in _service.StreamOrdersAsync()
            .WithCancellation(cts.Token))
        {
            items.Add(order);
            if (items.Count == 3) cts.Cancel();
        }
    }
    catch (OperationCanceledException)
    {
        // expected: cancellation typically surfaces as an
        // OperationCanceledException on the next MoveNextAsync
    }

    Assert.Equal(3, items.Count);
}

Q4: What is ChannelWriter.Complete() and why is it important?

A: Calling Complete() on the writer signals that no more items will be produced. The reader's ReadAllAsync() loop will finish gracefully once all buffered items are consumed. Forgetting to call Complete() causes the consumer to hang indefinitely.

Q5: How do you handle errors in an async stream?

A: Exceptions thrown inside an async iterator propagate to the consumer on the next MoveNextAsync() call. For Channel<T>, call Writer.Complete(exception) to signal an error. The consumer's ReadAllAsync() will throw that exception after draining buffered items.
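
A sketch of the error path on the channel side (RunProducerAsync is a placeholder name):

```csharp
try
{
    await RunProducerAsync(channel.Writer, ct);
    channel.Writer.Complete();   // normal end-of-stream
}
catch (Exception ex)
{
    channel.Writer.Complete(ex); // consumer's ReadAllAsync rethrows ex
                                 // after all buffered items are drained
}
```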

Q6: What is the role of BoundedChannelFullMode?

A: It determines what happens when a bounded channel is full: Wait (default) blocks the producer, DropNewest discards the newest item, DropOldest discards the oldest, and DropWrite discards the item being written. Choose based on your domain -- Wait for lossless processing, DropOldest for live telemetry where stale data is irrelevant.
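
For example, a live-telemetry channel that prefers fresh data might be configured like this (a sketch; the capacity is arbitrary):

```csharp
var telemetry = Channel.CreateBounded<SensorReading>(new BoundedChannelOptions(500)
{
    FullMode = BoundedChannelFullMode.DropOldest // shed stale readings under load
});
```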