So I am very new to Go! But I had an idea for something I wanted to try.
I would like to have a goroutine that accepts strings from a channel, but it should only act on them after it has received N strings.
I looked around for similar questions or cases, but I only found ones where the idea was to execute several goroutines in parallel and wait to aggregate the results.
I thought about creating an array and just passing it to a goroutine once its length was sufficient. However, I want to keep a certain separation of concerns and control this on the receiving end.
My questions are:
Is there a better way to do this? If so, what is it?
```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan string)
	go func() {
		tasks := []string{}
		for {
			tasks = append(tasks, <-ch)
			if len(tasks) < 3 {
				fmt.Println("Queue still too small")
			}
			if len(tasks) >= 3 {
				for i := 0; i < len(tasks); i++ {
					fmt.Println(tasks[i])
				}
				tasks = nil // start collecting a new batch
			}
		}
	}()
	ch <- "Msg 1"
	time.Sleep(time.Second)
	ch <- "Msg 2"
	time.Sleep(time.Second)
	ch <- "Msg 3"
	time.Sleep(time.Second)
	ch <- "Msg 4"
	time.Sleep(time.Second)
}
```
Edit: simplified the example to make it more accurate.
Based on a few comments, it looks like what you are looking for is some form of batching.
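Batching has a few scenarios when you would want to take the batch and send it along:

1. The batch has reached the desired size (3 in your example).
2. No new item has arrived for some period of time, so the partial batch should be flushed instead of waiting indefinitely.

Your example does not account for the second scenario. That can lead to awkward behavior: if the load stops before a batch fills up, the remaining items are never flushed.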
Therefore I would recommend either looking into a library (e.g., cloudfoundry/go-batching) or simply using channels, a Timer, and a select statement.
```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan string)
	go func() {
		tasks := []string{}
		timer := time.NewTimer(time.Second) // Adjust this based on a reasonable user experience
		for {
			select {
			case <-timer.C:
				// No new data for a while: flush the partial batch, if any.
				if len(tasks) > 0 {
					fmt.Println("Flush partial batch due to time")
					flush(tasks)
					tasks = nil
				}
				timer.Reset(time.Second)
			case data := <-ch:
				tasks = append(tasks, data)
				// Reset the timer for each data point so that we only flush
				// partial batches when we stop receiving data.
				if !timer.Stop() {
					<-timer.C
				}
				timer.Reset(time.Second)
				// Guard clause for batch size
				if len(tasks) < 3 {
					fmt.Println("Queue still too small")
					continue
				}
				flush(tasks)
				tasks = nil // reset tasks
			}
		}
	}()
	ch <- "Msg 1"
	time.Sleep(time.Second)
	ch <- "Msg 2"
	time.Sleep(time.Second)
	ch <- "Msg 3"
	time.Sleep(time.Second)
	ch <- "Msg 4"
	// Wait longer than the timer so the partial batch ("Msg 4") is flushed before main exits.
	time.Sleep(2 * time.Second)
}

func flush(tasks []string) {
	// Guard against empty flushes
	if len(tasks) == 0 {
		return
	}
	fmt.Println("Flush")
	for _, t := range tasks {
		fmt.Println(t)
	}
}
```