Search code examples
gogoroutine

Bufio Scanner Goroutine - truncated/unordered output


I'm writing a program that reads stderr from a CLI command and streams the stderr logs via a gRPC stream to the client.

The cmd is instantiated as below (the CLI command requires configuration which I pass as stdin):

ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Duration(t)*time.Second)

defer cancel()

cmd := exec.CommandContext(ctxTimeout, "java",
    "-jar", "/opt/myapp/myapp.jar", "scan", "-config", "-",
)

cmd.Stdin = config

I'm using two separate buffers: one to stream the stderr "live" to a client, the other to persist the logs in a DB. For that, I use io.MultiWriter and map it to the cmd stdin:

bufStream := bytes.NewBuffer(make([]byte, 0, 4096))

bufPersist := new(bytes.Buffer)


stderr := io.MultiWriter(bufStream, bufPersist)

// Map the command Standard Error Output to the multiwriter
cmd.Stderr = stderr

Finally, before launching the command I have a goroutine below which uses bufio.Scanner in order to read the stderr buffer and stream line by line via gRPC:

// Go Routine to stream the scan job logs
go func() {
    for {
        select {
        case <-done:
            return
        default:
            scanner := bufio.NewScanner(bufStream)
            for scanner.Scan() {
                time.Sleep(3 * time.Second)
                logging.MyAppLog("warning", "%v", scanner.Text())
                _ = stream.Send(&agentpb.ScanResultsResponse{
                    ScanLogsWebsocket: &agentpb.ScanLogFileResponseWB{ScanLogs: scanner.Bytes()},
                },
                )
            }

        }
    }

}()
err := cmd.Run()

done <- true

My issue is I have to use time.sleep(time.Seconds * 3) in the goroutine in order to get the right output. If not, I'm getting the output in incorrect order and truncated.

I believe it is due to the io.multiwriter and bufio.scanner not "in sync" but I would like some guidance on the best approach.

Thanks in advance.


Solution

  • From the Scanner docs:

    Bytes returns the most recent token generated by a call to Scan. The underlying array may point to data that will be overwritten by a subsequent call to Scan. It does no allocation.

    gRPC does its own buffering. This means that when Send returns, the bytes have not necessarily been written to the wire yet, and the next Scan call modifies the yet-to-be-written bytes.

    Copy the bytes returned by Scan and you should be fine:

    for scanner.Scan() {
        b := append([]byte(nil), scanner.Bytes()...)
        stream.Send(&agentpb.ScanResultsResponse{
            ScanLogsWebsocket: &agentpb.ScanLogFileResponseWB{
                ScanLogs: b,
            },
        })
    }