I have a producer-consumer scenario¹ based on a bounded Channel<T>.
Channel<Item> channel = Channel.CreateBounded<Item>(10);
The items come from an RDBMS, to which the producer connects and fetches them one by one. The peculiarity is that I don't have the luxury of keeping the DB connection alive for the whole lifetime of the producer. I have to close the connection when the channel is full, and reopen it when the channel again has space available for a new item. So I implemented the producer like this:
// Producer
while (true)
{
    await channel.Writer.WaitToWriteAsync();
    connection.Open();
    Item item;
    while (true)
    {
        item = connection.GetNextItem(); // This is fast
        if (!channel.Writer.TryWrite(item)) break;
    }
    connection.Close();
    await channel.Writer.WriteAsync(item);
}
The producer waits until the channel.Writer.WaitToWriteAsync() task completes, then opens the DB connection, writes as many items to the channel as it can until one is rejected, closes the DB connection, writes the rejected item asynchronously, and loops back to waiting.
The consumer is pretty standard:
// Consumer
await foreach (Item item in channel.Reader.ReadAllAsync())
{
    // Process the item (this is slow)
}
My problem with this design is that the DB connection is opened and closed too often. Both opening and closing the connection have a non-negligible overhead, so I would like to minimize how frequently this happens. Although the capacity of the channel is 10, I would prefer the WaitToWriteAsync task to complete when the channel is half-full (5 items), not immediately when the stored items drop from 10 to 9.
My question is: How can I modify my producer so that it connects to the database when there are 5 or fewer items in the channel, and closes the connection when the channel is full with 10 items?
Below is the output from a minimal example that I wrote, that reproduces the undesirable behavior:
19:20:55.811 [4] > Opening connection -->
19:20:55.933 [4] > Produced #1
19:20:55.934 [4] > Produced #2
19:20:55.934 [4] > Produced #3
19:20:55.934 [4] > Produced #4
19:20:55.934 [4] > Produced #5
19:20:55.934 [4] > Produced #6
19:20:55.935 [4] > Produced #7
19:20:55.935 [4] > Produced #8
19:20:55.935 [4] > Produced #9
19:20:55.935 [4] > Produced #10
19:20:55.935 [4] > Produced #11
19:20:55.935 [4] > Closing connection <--
19:20:55.936 [6] > Consuming: 1
19:20:56.037 [4] > Consuming: 2
19:20:56.037 [6] > Opening connection -->
19:20:56.137 [6] > Produced #12
19:20:56.137 [6] > Produced #13
19:20:56.137 [6] > Closing connection <--
19:20:56.137 [4] > Consuming: 3
19:20:56.238 [6] > Consuming: 4
19:20:56.238 [4] > Opening connection -->
19:20:56.338 [4] > Produced #14
19:20:56.338 [4] > Produced #15
19:20:56.338 [4] > Closing connection <--
19:20:56.338 [6] > Consuming: 5
19:20:56.439 [4] > Consuming: 6
19:20:56.439 [6] > Opening connection -->
19:20:56.539 [6] > Produced #16
19:20:56.539 [6] > Produced #17
19:20:56.539 [6] > Closing connection <--
19:20:56.539 [4] > Consuming: 7
19:20:56.644 [6] > Consuming: 8
19:20:56.644 [4] > Opening connection -->
19:20:56.744 [4] > Produced #18
19:20:56.745 [7] > Consuming: 9
19:20:56.745 [4] > Produced #19
19:20:56.745 [4] > Produced #20
19:20:56.745 [4] > Closing connection <--
19:20:56.846 [7] > Consuming: 10
19:20:56.847 [4] > Producer completed
19:20:56.946 [4] > Consuming: 11
19:20:57.046 [4] > Consuming: 12
19:20:57.147 [4] > Consuming: 13
19:20:57.247 [4] > Consuming: 14
19:20:57.347 [4] > Consuming: 15
19:20:57.452 [4] > Consuming: 16
19:20:57.552 [4] > Consuming: 17
19:20:57.653 [4] > Consuming: 18
19:20:57.753 [4] > Consuming: 19
19:20:57.854 [4] > Consuming: 20
19:20:57.955 [1] > Finished
As you can see, there is a lot of "Opening/Closing connection" going on.
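For anyone who wants to reproduce the pattern, a minimal sketch along the following lines should show the same behavior. It is not the original example: the simulated connection (a Task.Delay standing in for the cost of opening it), the total of 20 items, the 20 ms delays, and the names ReproSketch and RunAsync are all assumptions made for illustration.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ReproSketch
{
    // Runs the producer/consumer pair once and reports how many items were
    // consumed and how many times the simulated connection was opened.
    public static async Task<(int Consumed, int Opened)> RunAsync()
    {
        Channel<int> channel = Channel.CreateBounded<int>(10);
        const int total = 20; // Assumed number of rows in the simulated DB
        int opened = 0;

        Task producer = Task.Run(async () =>
        {
            int next = 0; // Last item fetched from the simulated DB
            while (next < total)
            {
                await channel.Writer.WaitToWriteAsync();
                opened++;
                await Task.Delay(20); // Simulated cost of opening the connection
                int item = 0;
                bool rejected = false;
                while (next < total)
                {
                    item = ++next; // Simulated fast fetch
                    if (!channel.Writer.TryWrite(item)) { rejected = true; break; }
                }
                // The connection would be closed here, before the slow write
                if (rejected) await channel.Writer.WriteAsync(item);
            }
            channel.Writer.Complete();
        });

        int consumed = 0;
        await foreach (int item in channel.Reader.ReadAllAsync())
        {
            await Task.Delay(20); // Simulated slow processing
            consumed++;
        }
        await producer;
        return (consumed, opened);
    }

    public static async Task Main()
    {
        var (consumed, opened) = await RunAsync();
        Console.WriteLine($"Consumed {consumed} items, opened the connection {opened} times");
    }
}
```

The number of openings depends on timing, so it will vary between runs; with a plain bounded channel it tends to be well above the two or three openings that a half-full threshold would need.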
My question has similarities with this older question:
Given an external producer API that can be stopped and started, efficiently stop the producer when local buffer is full.
The difference is that in my case the producer is a loop, and not the event handler of a service, as in the other question.
¹ This scenario is contrived. It was inspired by a relatively recent GitHub API proposal.
Clarification: The channel should not be drained completely before reconnecting to the DB. That's because opening the connection takes some time, and I don't want the consumer to be idle during this time. So the producer should reconnect when the channel has dropped to 5 items or fewer, not when it is completely empty.
One way to solve this problem is to use two channels: a bounded Channel<T> with the desired capacity, and a second bounded Channel<int> with capacity 1 that is used only for the WaitToWriteAsync functionality. Synchronizing the two channels is not trivial, so I wrote a custom Channel<T> implementation that wraps these two channels and does the synchronization internally:
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public sealed class DoubleCapacityChannel<T> : Channel<T>
{
    private readonly Channel<T> _channel;      // Holds the actual items
    private readonly Channel<int> _channelLow; // Holds one token while writers should wait
    private readonly int _lowCapacity;
    public DoubleCapacityChannel(int highCapacity, int lowCapacity)
    {
        if (highCapacity < 1)
            throw new ArgumentOutOfRangeException(nameof(highCapacity));
        if (lowCapacity < 1 || lowCapacity > highCapacity)
            throw new ArgumentOutOfRangeException(nameof(lowCapacity));
        _lowCapacity = lowCapacity;
        _channel = Channel.CreateBounded<T>(highCapacity);
        Debug.Assert(_channel.Reader.CanCount);
        _channelLow = Channel.CreateBounded<int>(1);
        this.Writer = new ChannelWriter(this);
        this.Reader = new ChannelReader(this);
    }
    private class ChannelWriter : ChannelWriter<T>
    {
        private readonly DoubleCapacityChannel<T> _parent;
        public ChannelWriter(DoubleCapacityChannel<T> parent) => _parent = parent;
        public override bool TryComplete(Exception error = null)
        {
            // The lock keeps the two inner channels in sync with each other
            lock (_parent._channel)
            {
                bool success = _parent._channel.Writer.TryComplete(error);
                if (success) _parent._channelLow.Writer.TryComplete(error);
                return success;
            }
        }
        public override bool TryWrite(T item)
        {
            lock (_parent._channel)
            {
                bool success = _parent._channel.Writer.TryWrite(item);
                // Fill the token channel once the low threshold is reached,
                // so that WaitToWriteAsync starts waiting
                if (!success || _parent._channel.Reader.Count >= _parent._lowCapacity)
                    _parent._channelLow.Writer.TryWrite(0);
                return success;
            }
        }
        public override async ValueTask WriteAsync(T item,
            CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            while (true)
            {
                if (this.TryWrite(item)) break;
                try
                {
                    if (await _parent._channel.Writer.WaitToWriteAsync(
                        cancellationToken).ConfigureAwait(false)) continue;
                }
                // Let cancellation propagate as is; wrap only completion errors
                catch (Exception ex) when (!(ex is OperationCanceledException))
                {
                    throw new ChannelClosedException(ex);
                }
                throw new ChannelClosedException();
            }
        }
        public override ValueTask<bool> WaitToWriteAsync(
            CancellationToken cancellationToken = default)
            => _parent._channelLow.Writer.WaitToWriteAsync(cancellationToken);
    }
    private class ChannelReader : ChannelReader<T>
    {
        private readonly DoubleCapacityChannel<T> _parent;
        public ChannelReader(DoubleCapacityChannel<T> parent) => _parent = parent;
        public override Task Completion => _parent._channel.Reader.Completion;
        public override bool CanCount => _parent._channel.Reader.CanCount;
        public override int Count => _parent._channel.Reader.Count;
        public override bool TryRead(out T item)
        {
            lock (_parent._channel)
            {
                bool success = _parent._channel.Reader.TryRead(out item);
                // Drain the token channel once the count drops below the low
                // threshold, so that WaitToWriteAsync can complete
                if (!success || _parent._channel.Reader.Count < _parent._lowCapacity)
                    _parent._channelLow.Reader.TryRead(out _);
                return success;
            }
        }
        // ReadAsync is deliberately not overridden. The base implementation loops over
        // WaitToReadAsync and TryRead, so every read keeps _channelLow in sync.
        public override ValueTask<bool> WaitToReadAsync(
            CancellationToken cancellationToken = default)
            => _parent._channel.Reader.WaitToReadAsync(cancellationToken);
        public override bool CanPeek => _parent._channel.Reader.CanPeek;
        public override bool TryPeek(out T item)
            => _parent._channel.Reader.TryPeek(out item);
    }
}
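The lowCapacity affects only the WaitToWriteAsync method: the task it returns completes only while the channel holds fewer than lowCapacity items. The TryWrite and WriteAsync methods keep accepting items up to the highCapacity.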
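To make the two thresholds concrete, the gating rule used by TryWrite and TryRead in the class above can be modeled with a plain counter. This is only a sketch of the rule, not part of the class: the names WatermarkSketch and Simulate are hypothetical, and the single-threaded fill-then-drain sequence is an assumption chosen to keep the example deterministic.

```csharp
using System;
using System.Collections.Generic;

public static class WatermarkSketch
{
    // Fills a counter until a write is rejected, then drains it to zero,
    // recording the item count at which the WaitToWriteAsync gate would
    // close ("closed@N") and reopen ("open@N").
    public static List<string> Simulate(int highCapacity, int lowCapacity)
    {
        var events = new List<string>();
        int count = 0;
        bool gateOpen = true;

        // Producer phase: mirrors the condition in TryWrite
        while (true)
        {
            bool accepted = count < highCapacity;
            if (accepted) count++;
            if ((!accepted || count >= lowCapacity) && gateOpen)
            {
                gateOpen = false;
                events.Add($"closed@{count}");
            }
            if (!accepted) break;
        }

        // Consumer phase: mirrors the condition in TryRead
        while (count > 0)
        {
            count--;
            if (count < lowCapacity && !gateOpen)
            {
                gateOpen = true;
                events.Add($"open@{count}");
            }
        }
        return events;
    }

    public static void Main()
    {
        // prints "closed@5, open@4"
        Console.WriteLine(string.Join(", ", Simulate(10, 5)));
    }
}
```

With capacities 10 and 5, the gate closes as soon as the fifth item is stored, writes continue to succeed up to ten items, and the gate reopens when the count drops to four. This matches the log below, where the connection is reopened right after the seventh of eleven produced items is consumed.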
The DoubleCapacityChannel<T> class can solve the problem in the question by changing the line:
Channel<Item> channel = Channel.CreateBounded<Item>(10);
to
Channel<Item> channel = new DoubleCapacityChannel<Item>(10, 5);
Below is the output from the original minimal example, modified to use the DoubleCapacityChannel<T>:
19:26:19.119 [4] > Opening connection -->
19:26:19.241 [4] > Produced #1
19:26:19.243 [4] > Produced #2
19:26:19.243 [4] > Produced #3
19:26:19.243 [4] > Produced #4
19:26:19.243 [4] > Produced #5
19:26:19.243 [4] > Produced #6
19:26:19.243 [4] > Produced #7
19:26:19.243 [4] > Produced #8
19:26:19.243 [4] > Produced #9
19:26:19.243 [4] > Produced #10
19:26:19.243 [4] > Produced #11
19:26:19.243 [4] > Closing connection <--
19:26:19.244 [6] > Consuming: 1
19:26:19.345 [6] > Consuming: 2
19:26:19.446 [6] > Consuming: 3
19:26:19.547 [6] > Consuming: 4
19:26:19.651 [6] > Consuming: 5
19:26:19.752 [6] > Consuming: 6
19:26:19.852 [6] > Consuming: 7
19:26:19.853 [4] > Opening connection -->
19:26:19.953 [6] > Consuming: 8
19:26:19.953 [4] > Produced #12
19:26:19.953 [4] > Produced #13
19:26:19.953 [4] > Produced #14
19:26:19.953 [4] > Produced #15
19:26:19.953 [4] > Produced #16
19:26:19.953 [4] > Produced #17
19:26:19.953 [4] > Produced #18
19:26:19.953 [4] > Produced #19
19:26:19.953 [4] > Closing connection <--
19:26:20.053 [6] > Consuming: 9
19:26:20.154 [4] > Consuming: 10
19:26:20.254 [4] > Consuming: 11
19:26:20.355 [4] > Consuming: 12
19:26:20.455 [4] > Consuming: 13
19:26:20.556 [4] > Consuming: 14
19:26:20.656 [4] > Consuming: 15
19:26:20.656 [6] > Opening connection -->
19:26:20.757 [6] > Produced #20
19:26:20.757 [6] > Produced #21
19:26:20.757 [6] > Produced #22
19:26:20.757 [6] > Produced #23
19:26:20.757 [6] > Produced #24
19:26:20.757 [6] > Produced #25
19:26:20.757 [6] > Produced #26
19:26:20.757 [6] > Produced #27
19:26:20.757 [6] > Closing connection <--
19:26:20.757 [4] > Consuming: 16
19:26:20.858 [4] > Consuming: 17
19:26:20.859 [6] > Producer completed
19:26:20.959 [6] > Consuming: 18
19:26:21.059 [6] > Consuming: 19
19:26:21.160 [6] > Consuming: 20
19:26:21.260 [6] > Consuming: 21
19:26:21.361 [6] > Consuming: 22
19:26:21.461 [6] > Consuming: 23
19:26:21.562 [6] > Consuming: 24
19:26:21.662 [6] > Consuming: 25
19:26:21.763 [6] > Consuming: 26
19:26:21.863 [7] > Consuming: 27
19:26:21.964 [1] > Finished