Tags: go, concurrency, channel, goroutine

How to receive data and send back a signal using goroutines


I want to use concurrency in Go to send data over channels to a goroutine for processing and computation. The data points arrive one after another in a function, which can be the main function or a separate sendData function. If possible, I would like the data to be sent from the main function.

The sending function should pass the data to a goroutine (let's call it getData) that stores it in a slice and performs certain computations on that slice. When a condition that depends on the slice is met, getData should signal the sending function that processing is complete for that batch of data points. Meanwhile, the sending function keeps sending data points through the channel to getData, which starts building a new slice; when the condition is met again, it sends the signal again, and the whole sequence repeats.

As an example, imagine that the data are numbers sent from sendData to getData, and the condition is that the running mean of the numbers received by getData equals 4. Take the sequence []int{3, 2, 3, 8, 2, 1, 1, 1, 15} as the data. The first batch is {3, 2, 3, 8}: after these numbers are sent in order, getData finds that the running mean equals 4 once it receives 8, and sends a signal to sendData. Sending then continues with the next batch, {2, 1, 1, 1, 15}: after getData receives 15, the running mean is again 4, and the signal is sent to sendData again.

(This is a very basic example; in my real use case, the input data and the condition are more complex. The data will be read live in sendData, one point at a time, with each point arriving a few microseconds after the previous one. Because the data arrives this fast, I don't want to do much processing or computation in that function. I also want to keep the work concurrent, so that reading in sendData never misses a data point because getData is busy processing.)
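The batch boundaries in the example can be checked by hand. Writing S_n for the sum of the first n points of the current batch, the running mean is S_n / n; since n > 0, the condition "mean equals 4" is equivalent to the integer test S_n = 4n, which avoids comparing floating-point values. In both batches the mean first reaches 4 exactly at the last element:

```latex
\begin{aligned}
\bar{x}_n = \frac{S_n}{n} = 4 &\iff S_n = 4n \qquad (n > 0)\\
\text{Batch 1 } (3, 2, 3, 8):&\quad \bar{x}_1, \dots, \bar{x}_4 = 3,\ \tfrac{5}{2},\ \tfrac{8}{3},\ \tfrac{16}{4} = 4\\
\text{Batch 2 } (2, 1, 1, 1, 15):&\quad \bar{x}_1, \dots, \bar{x}_5 = 2,\ \tfrac{3}{2},\ \tfrac{4}{3},\ \tfrac{5}{4},\ \tfrac{20}{5} = 4
\end{aligned}
```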

Here is how I have tried to structure the code:

package main

import (
    "fmt"
)

func main() {
    ch := make(chan int)
    go sendData(ch)
    go getData(ch)
}

func sendData(ch chan int) {
    syntheticData := []int{3, 2, 3, 8, 2, 1, 1, 1, 15}
    for _, data := range syntheticData {
        ch <- data
    }
}

func getData(ch chan int) {
    dataArr := []int{}
    dataArr = append(dataArr, <-ch)
    fmt.Println(dataArr)

    if mean(dataArr) == 4 {
        close(ch)
    }
}

func sum(array []int) int {
    result := 0
    for _, v := range array {
        result += v
    }

    return result
}

func mean(array []int) float64 {
    sumArr := float64(sum(array)) / float64(len(array))
    return sumArr
}

The code above doesn't achieve the functionality I want. How can I achieve it in Go?


Solution

  • You only need one extra receiving goroutine, e.g. getData; the main goroutine sends the data as it arrives over a channel named ch. You also need one buffered channel for signalling, e.g. batchCompleted, and a sync.WaitGroup so that main can wait for getData to finish.
    That is all. Try it:

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    func main() {
        wg := &sync.WaitGroup{}
        ch := make(chan int)
        batchCompleted := make(chan struct{}, 1) // buffered (capacity 1) so getData rarely waits; main polls it without blocking
        wg.Add(1)
        go getData(ch, batchCompleted, wg)
    
        syntheticData := []int{3, 2, 3, 8, 2, 1, 1, 1, 15}
        i := 0
        check := func() {
            select {
            case <-batchCompleted:
                i++
                fmt.Println(i, " batch completed")
            default:
            }
        }
        for _, data := range syntheticData {
            ch <- data
            check()
        }
        close(ch)
        wg.Wait()
        check()
    }
    
    func getData(ch chan int, batchCompleted chan struct{}, wg *sync.WaitGroup) {
        defer wg.Done()
        a := []int{}
        sum, n := 0, 0
        for v := range ch {
            sum += v
            n++
            a = append(a, v)
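            if sum == 4*n { // running mean == 4, tested in integer arithmetic
                batchCompleted <- struct{}{}
                fmt.Println(a)
                sum, n = 0, 0
                a = a[:0] // start the next batch, reusing the backing array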
            }
        }
        if len(a) > 0 {
            fmt.Println("remaining data:", a)
        }
    }
    

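    Output (the bracketed lines are printed by getData and the "batch completed" lines by main, so their relative order can vary between runs):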

    [3 2 3 8]
    1  batch completed
    [2 1 1 1 15]
    2  batch completed