I want to use concurrency in Go to send data through a channel to a goroutine for processing and computation. The data points arrive one after another in a function, which can be the main function or some sendData function. If possible, I would like the sending to be done from main.
I want to send the data from the sending function to a goroutine (let's call it getData) that stores it in a slice and runs certain computations on that slice. When a certain condition that depends on the slice is reached, I want the goroutine to signal sendData that processing is complete for that batch of data points. Meanwhile sendData keeps sending data points through the channel to getData, where a new slice starts being built. When the condition is reached again, the signal is sent again, and the whole sequence repeats.
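A minimal sketch of that kind of signal, assuming a buffered channel of empty structs: the worker posts a value when a batch completes, and the sender checks for it with a select that has a default case, so the sender never waits. trySignal is a hypothetical helper name, not a library function.

```go
package main

import "fmt"

// trySignal (hypothetical helper) reports whether a completion signal is
// waiting on done, without blocking when there is none.
func trySignal(done <-chan struct{}) bool {
	select {
	case <-done:
		return true
	default:
		// Nothing pending; the caller goes straight back to reading data.
		return false
	}
}

func main() {
	// Capacity 1 lets the worker post one signal without waiting for the sender.
	done := make(chan struct{}, 1)

	fmt.Println(trySignal(done)) // false: no batch finished yet
	done <- struct{}{}           // what the worker would do when a batch completes
	fmt.Println(trySignal(done)) // true
	fmt.Println(trySignal(done)) // false: the signal was consumed
}
```

A capacity of 1 is enough here only because the sender polls after every data point; if several batches could finish between two polls, the worker would block on its second send.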
As an example, imagine that sendData sends numbers to getData, and the condition is that the running mean of the numbers received by getData equals 4. Take the sequence []int{3, 2, 3, 8, 2, 1, 1, 1, 15}. The first batch is {3, 2, 3, 8}: after these numbers are sent in order, getData finds that the running mean is 4 once the 8 arrives (3 + 2 + 3 + 8 = 16, and 16 / 4 = 4), so it signals sendData. Sending then continues with the next batch, {2, 1, 1, 1, 15}: after the 15 arrives, the running mean is 4 again (20 / 5 = 4), and another signal is sent to sendData.
(This is a very basic example; in my real use case the input data and the condition are more complex. The data is read live in sendData, one point at a time, with each point arriving a few microseconds after the previous one. Because the data arrives so fast, I don't want to do much processing or computation in that function. I also want to keep the work concurrent, because I don't want any reads in the data-reading function to be missed while the processing goroutine is busy.)
Here is how I have tried to structure the code:
package main

import (
	"fmt"
)

func main() {
	ch := make(chan int)
	go sendData(ch)
	go getData(ch)
}

func sendData(ch chan int) {
	syntheticData := []int{3, 2, 3, 8, 2, 1, 1, 1, 15}
	for _, data := range syntheticData {
		ch <- data
	}
}

func getData(ch chan int) {
	dataArr := []int{}
	dataArr = append(dataArr, <-ch)
	fmt.Println(dataArr)
	if mean(dataArr) == 4 {
		close(ch)
	}
}

func sum(array []int) int {
	var result int = 0
	for _, v := range array {
		result += v
	}
	return result
}

func mean(array []int) float64 {
	sumArr := float64(sum(array)) / float64(len(array))
	return sumArr
}
I didn't achieve the functionality that I wanted to with the above code. How can I achieve the desired functionality in Go?
You only need one extra receiving goroutine, e.g. getData, while the main goroutine sends the data over a channel named ch as it arrives. Add one buffered channel for signalling, e.g. batchCompleted, and a sync.WaitGroup so that main can wait for getData to finish.
That is all; try it:
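package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := &sync.WaitGroup{}
	ch := make(chan int)
	// Capacity 1 lets getData post a signal without waiting for main;
	// main polls it with a non-blocking select in check.
	batchCompleted := make(chan struct{}, 1)
	wg.Add(1)
	go getData(ch, batchCompleted, wg)

	syntheticData := []int{3, 2, 3, 8, 2, 1, 1, 1, 15}
	i := 0
	// check reports a completed batch if one is pending, without blocking.
	check := func() {
		select {
		case <-batchCompleted:
			i++
			fmt.Println(i, "batch completed")
		default:
		}
	}
	for _, data := range syntheticData {
		ch <- data
		check()
	}
	close(ch) // no more data: lets the range loop in getData end
	wg.Wait()
	check() // pick up a signal sent for the final batch
}

func getData(ch chan int, batchCompleted chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	a := []int{}
	sum, n := 0, 0
	for v := range ch {
		sum += v
		n++
		a = append(a, v)
		// Running mean == 4, checked in integers: sum/n == 4 <=> sum == 4*n.
		if sum == 4*n {
			fmt.Println(a) // print before signalling so the output order is fixed
			batchCompleted <- struct{}{}
			sum, n = 0, 0
			a = a[:0] // reuse the backing array for the next batch
		}
	}
	if len(a) > 0 {
		fmt.Println("remaining data:", a)
	}
}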
Output:
[3 2 3 8]
1 batch completed
[2 1 1 1 15]
2 batch completed