Search code examples
c#multithreadingasynchronoustask-parallel-libraryproducer-consumer

How to expose a class implementing producer consumer pipeline to client code in an API?


I'm a bit confused on how to expose a class that implements a producer consumer pipeline to client code.

Let's say I have two classes that represents the producer and consumer, like this:

public class Consumer
{
  ...

  public void Cosume();

  ...
}

Then the producer:

public class Producer
{
  ...

  public void Produce();

  ...
}

The there is a third class that orchestrate the producer and the consumer (and here comes the problem, by the way)

public class ProducerConsumer
{
  ...

  private Producer producer;
  private Consumer consumer;

  ...

  public void Start()
  {
     ...
  }
}

How should I implement start?

I was thinking to call producer.Produce() and consumer.Consume() wrapped in Task.Run and the await for them to complete, like so:

public async Task Start()
{
     await Task.WhenAll(Task.Run(() => producer.Produce(), Task.Run(() => consumer.Consume());
}

But I've read that having Task.Run() in the implementation is not a very good practice, so I've discarded it.

I've also thought of Parallel.Invoke(), and leave the responsibility to offload to a different Thread the blocking wait of Parallel.Invoke() to the client code, something like this:

public void Start()
{
      Parallel.Invoke(() => producer.Produce(), () => consumer.Consume());
}

And the the client code would do something like:

public async void ButtonHandler(object sender, RoutedEventArgs e)
{
   await Task.Run(() => producerConsumer.Start());
}

But offloading to a different Thread just to wait two other thread to finish looked a bit strange to me because I'm wasting a Thread just to wait something.

After reading other questions on StackOverflow many suggested to leave the responsibility on how to call the parallel code to the calling code, so I thought to expose the Producer and the Consumer from the ProducerConsumer class and let the client code call Producer.Produce() and Consumer.Consume() the way it wants, more or less like:

public async void ButtonHandler(object sender, RoutedEventArgs e)
{
   Task producerTask = Task.Run(() => producerConsumer.Producer.Produce());
   Task consumerTask = Task.Run(() => producerConsumer.Consumer.Consumer());

   await Task.WhenAll(producerTask, consumerTask);
}

But this last implementation looks awkward to me because the caller code is responsible to call Produce() and Consume() which could lead to errors if one of the two method is omitted.

I know about TPL.DataFlow to implement a producer consumer pipeline but I can't add external dependencies to the library.

So how should I write the Start() method?

And more in general: how should I expose code that is parallel by its nature to library client code?


Solution

  • If you have an async-compatible producer/consumer queue (e.g., BufferBlock<T> from TPL Dataflow), and your producer is throttled (e.g., the queue has a reasonable maximum number of elements, or the producer's data comes from some I/O operation), then you can make your producer and consumer async and call them directly like this:

    public async Task ExecuteAsync()
    {
      await Task.WhenAll(producer.ProduceAsync(), consumer.ConsumeAsync())
          .ConfigureAwait(false);
    }
    

    Otherwise, you'll have to "give" somewhere. If your producer and consumer are synchronous, then your containing class - by definition - controls two threads. And in that case it's fine to use Task.Run:

    public async Task ExecuteAsync()
    {
      await Task.WhenAll(Task.Run(() => producer.Produce()), Task.Run(() => consumer.Consume()))
          .ConfigureAwait(false);
    }