Go: Copy data from io.Reader to io.Writer implementations with sleep timeout, empty Writes


-edit-

This was not caused by typos and it is reproducible, contrary to the designation given by StackOverflow. The code provided was written specifically as a self-contained example of this issue (the issue is right there in the code; all you have to do is run it to reproduce it).

Furthermore, this is the exact question I had and the exact solution I needed that I did not find on StackOverflow prior to asking the question. The answer was derived from a conversation on IRC and I have forwarded the solution here for others like myself who may experience this same problem. This is very much a relevant Q/A and certainly can help future readers.

Thank you

-edit-

I am trying to copy data from an io.Reader implementation to an io.Writer implementation with a delay (time.Sleep) before the next iteration. Ideally I want to control the process myself (io.Copy is not ideal because I may want to perform some action between the Read and the Write).

In any case, the code below attempts 4 approaches, and it should run in go.dev/play. I was unable to get any of the approaches to Write anything but an empty string, even though every Write that ran reported the correct number of written bytes (the same number reported by Read). I am probably missing something fundamental, but any help or explanation is greatly appreciated, as I am quite confused.

The following code contains comments and functions for 4 different approaches to copying and logging data from custom implementations of io.Reader to an io.Writer with a specified delay (time.Sleep) before the next iteration. You may comment/uncomment the desired function below to observe the results.

package main

import (
    "bufio"
    "fmt"
    "io"
    "log"
    "math/rand"
    "time"
)

// READ
type MyReader struct{}

func (r *MyReader) Read(p []byte) (read int, err error) {

    someString := fmt.Sprintf("testing-%d", rand.Int())

    p = []byte(someString)
    read = len(p)

    log.Printf("io read %d bytes: %s\n", read, string(p))

    return read, io.EOF
}

// WRITE
type MyWriter struct{}

func (w *MyWriter) Write(p []byte) (wrote int, err error) {

    wrote = len(p)
    log.Printf("io wrote %d bytes: %s\n", wrote, string(p))

    return
}

// MAIN
func main() {

    // The following contains comments and functions for 4 different approaches to copying and logging data from
    // custom implementations of io.Reader to an io.Writer with a specified delay (time.Sleep) before the next
    // iteration. You may comment/uncomment the desired function below to observe the results.

    // AutoIoCopy - Approach 1) io.Copy
    //
    // Expected is to read and log the correct value (generated in the method)
    // then write the value (output another log)
    //
    // Actual is that the MyWriter.Write method of the io.Writer implementation is executed (io.Copy calls it
    // directly, no bufio is involved), but the output logged by MyWriter.Write is empty instead of the expected
    // string reported by the MyReader.Read log.
    //
    AutoIoCopy()

    // ReadBytesApproach - Approach 2) Using ReadBytes('\n')
    //
    // Expected is to read and log the correct value (generated in the method)
    // then write the value (output another log)
    //
    // Actual is that the bufio.Write method is called and reports written bytes, but the Write method of MyWriter
    // io.Writer implementation is never executed, it is skipped and another Read occurs
    //
    //ReadBytesApproach()

    // ReadLineApproach - Approach 3) Using ReadLine()
    //
    // Expected is to read and log the correct value (generated in the method)
    // then write the value (output another log)
    //
    // Actual is that the bufio.Write method is called and reports written bytes, but the Write method of MyWriter
    // io.Writer implementation is never executed, it is skipped and another Read occurs
    //
    //ReadLineApproach()

    // WriteToApproach - Approach 4) Using WriteTo()
    //
    // Expected is to read and log the correct value (generated in the method)
    // then write the value (output another log)
    //
    // Actual is that the bufio.Write method is called and reports written bytes, but the Write method of MyWriter
    // io.Writer implementation is never executed, it is skipped and another Read occurs
    //
    //WriteToApproach()

}

// Approaches:

// AutoIoCopy - Approach 1) io.Copy
//
// Expected is to read and log the correct value (generated in the method)
// then write the value (output another log)
//
// Actual is that the MyWriter.Write method of the io.Writer implementation is executed (io.Copy calls it
// directly, no bufio is involved), but the output logged by MyWriter.Write is empty instead of the expected
// string reported by the MyReader.Read log.
//
func AutoIoCopy() {

    reader := MyReader{}
    writer := MyWriter{}

    for {
        _, _ = io.Copy(&writer, &reader)
        time.Sleep(1000 * time.Millisecond)
    }
}

// ReadBytesApproach - Approach 2) Using ReadBytes('\n')
//
// Expected is to read and log the correct value (generated in the method)
// then write the value (output another log)
//
// Actual is that the bufio.Write method is called but the Write method of MyWriter io.Writer implementation
// is never executed, it is skipped and another Read occurs
//
func ReadBytesApproach() {

    reader := MyReader{}
    writer := MyWriter{}

    bufRead := bufio.NewReader(&reader)
    bufWrite := bufio.NewWriter(&writer)

    for {

        // Using ReadBytes('\n')
        readBytes, err := bufRead.ReadBytes('\n')

        if err != nil {

            switch err {
            case io.EOF:

                log.Printf("io.EOF detected\n")
                wrote, err := bufWrite.Write(readBytes)
                if err != nil {
                    log.Printf("error writing: %s\n", err)
                }
                convertedValue := string(readBytes)
                log.Printf("bufio wrote %d bytes: %s\n", wrote, convertedValue)

            default:
                log.Printf("bufio error reading: %s\n", err.Error())
            }

        } else {
            log.Printf("no error, continue to read\n")
        }

        time.Sleep(1000 * time.Millisecond)
    }
}

// ReadLineApproach - Approach 3) Using ReadLine()
//
// Expected is to read and log the correct value (generated in the method)
// then write the value (output another log)
//
// Actual is that the bufio.Write method is called but the Write method of MyWriter io.Writer implementation
// is never executed, it is skipped and another Read occurs
//
func ReadLineApproach() {

    reader := MyReader{}
    writer := MyWriter{}

    bufRead := bufio.NewReader(&reader)
    bufWrite := bufio.NewWriter(&writer)

    for {

        // Using ReadLine()
        readBytes, _, err := bufRead.ReadLine()

        if err != nil {

            switch err {
            case io.EOF:

                log.Printf("io.EOF detected\n")
                wrote, err := bufWrite.Write(readBytes)
                if err != nil {
                    log.Printf("error writing: %s\n", err)
                }
                convertedValue := string(readBytes)
                log.Printf("bufio wrote %d bytes: %s\n", wrote, convertedValue)

            default:
                log.Printf("bufio error reading: %s\n", err.Error())
            }

        } else {
            log.Printf("no error, continue to read\n")
        }

        time.Sleep(1000 * time.Millisecond)
    }

}

// WriteToApproach - Approach 4) Using WriteTo()
//
// Expected is to read and log the correct value (generated in the method)
// then write the value (output another log)
//
// Actual is that neither the bufio.Write nor the Write method of the MyWriter io.Writer implementation is
// executed; both are skipped and another Read occurs
//
func WriteToApproach() {

    reader := MyReader{}
    writer := MyWriter{}

    bufRead := bufio.NewReader(&reader)
    bufWrite := bufio.NewWriter(&writer)

    for {
        wrote, _ := bufRead.WriteTo(bufWrite)
        log.Printf("bufio wrote %d bytes\n", wrote)
        time.Sleep(1000 * time.Millisecond)
    }
}

Solution

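  • The issue was in the MyReader.Read method.

    The line

    p = []byte(someString)

    only rebinds the local variable p to a newly allocated slice. The caller's buffer is never written to, so the writer receives the reported number of bytes, all of them still zero, which prints as an empty string.

    It should be

    read = copy(p, someString)

    which copies the string into the caller's buffer and returns the number of bytes copied. The next line, read = len(p), must then be removed, because len(p) is now the length of the caller's buffer rather than the number of bytes copied. For the same reason the log call should print string(p[:read]) instead of string(p).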