How to consume an unbuffered Go channel in a "pooling" way


To better illustrate my question, here is an example of the code with comments:

(Note: I'm using Go v1.20.5)

// start to consume a queue
// deliveries is an unbuffered, receive-only Go channel of type <-chan Message
deliveries, err := qb.pubSubSubscriber.Consume(ctx, qName)
if err != nil {
    return err
}

// infinite loop to consume the messages
for msg := range deliveries {
    // and for every msg I execute a function
    result := myFunc()
}

The idea here is to consume the messages with a pool of n workers, where a worker only takes a new message when it is free.

To be clearer, the example below is not a valid solution:

// for the workerPool in this situation I would use the
// tunny worker pool Go package
workerPool := newWorkerPoolWithNWorkers()
for msg := range deliveries {
    go func(){
         result:=workerPool(myFunc)
    }()   
}

This is not valid because, as I see it, this code fetches every message at once and then lets the workerPool process them with n workers at a time. My question is: how do I fetch a new message only when a worker becomes "free", in an infinite loop?

So let's say we have a queue with 100 messages and n = 3. The wanted solution fetches 3 messages at first, and each time one of the fetched messages has been processed, the code fetches one new message, in an infinite for loop.

I was trying to do something like this:

wg := new(sync.WaitGroup)
counter := 0
for msg := range deliveries {
    wg.Wait()
    go func() {
        counter++
        if counter == n { // n could be any integer, used to limit the pool size
            // this way a new message would block at wg.Wait() if all n goroutines are busy
            wg.Add(1)
        }
        result := myFunc()
        counter--
        wg.Done() // one of the n "workers" is free, so we can ask for one more message
    }()
}

But it seems too complicated, and I don't think it works.

If someone could help me, I'd be very grateful!


Solution

  • I think you are overthinking this a bit. To consume messages from a channel using a worker pool, you can:

    for i := 0; i < nWorkers; i++ {
        go func() {
            for msg := range deliveries {
                myFunc(msg)
            }
        }()
    }
    

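    In other words, you create n goroutines that all receive from the same channel. The runtime decides which waiting goroutine receives each message. Because a worker only receives the next message after it has finished the previous one, at most n messages are being processed at any time. When the deliveries channel is closed, all workers terminate.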
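
    If you also need to wait for the workers to finish, a sync.WaitGroup can be added around the same loop. Here is a minimal, self-contained sketch of that idea. Note that Message, process, and consume are hypothetical names I made up for illustration (process stands in for your myFunc), the in-flight counter exists only to show that the limit holds, and the producer goroutine in main just simulates your queue:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Message is a hypothetical stand-in for the queue library's message type.
type Message struct {
	ID int
}

// process is a hypothetical stand-in for myFunc; it simulates some work.
func process(msg Message) int {
	time.Sleep(time.Millisecond)
	return msg.ID * 2
}

// consume starts nWorkers goroutines that all receive from deliveries and
// blocks until the channel is closed and every worker has finished.
// It returns how many messages were processed and the highest number of
// messages that were being processed at the same time.
func consume(deliveries <-chan Message, nWorkers int) (processed, maxInFlight int) {
	var wg sync.WaitGroup
	var mu sync.Mutex // protects inFlight, processed and maxInFlight
	inFlight := 0

	for i := 0; i < nWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// A worker only receives the next message after it has
			// finished processing the previous one.
			for msg := range deliveries {
				mu.Lock()
				inFlight++
				if inFlight > maxInFlight {
					maxInFlight = inFlight
				}
				mu.Unlock()

				process(msg)

				mu.Lock()
				inFlight--
				processed++
				mu.Unlock()
			}
		}()
	}

	wg.Wait() // returns once deliveries is closed and drained
	return processed, maxInFlight
}

func main() {
	deliveries := make(chan Message) // unbuffered, as in the question

	// Simulated producer: sends 100 messages, then closes the channel.
	go func() {
		defer close(deliveries)
		for i := 0; i < 100; i++ {
			deliveries <- Message{ID: i}
		}
	}()

	processed, maxInFlight := consume(deliveries, 3)
	fmt.Println("processed:", processed, "max in flight:", maxInFlight)
}
```

    With 3 workers, the reported maximum never exceeds 3: the producer's send on the unbuffered channel blocks until one of the workers is ready to receive. Keep in mind that this only limits what your code pulls from the channel; whether the subscriber library prefetches messages internally before handing them to the channel is a separate setting of that library.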