Search code examples
gokuberneteskubernetes-go-client

Stream output from remote kubernetes command with golang client


I'm using the golang kubernetes client to create kubernetes pods and execute remote commands in them. However, I'm finding that I can't get feedback on the status of the remote execution until it's finished because I can't figure out how to stream the logs of the remote command. Here's my current implementation for executing a remote command:

func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
    req := k.clientset.CoreV1().RESTClient().Post().Resource("pods").Name(args.ContainerId).Namespace(k.namespace).SubResource("exec").Param("container", CONTAINER_NAME)
    scheme := runtime.NewScheme()
    if err := v1.AddToScheme(scheme); err != nil {
        return "", 0, fmt.Errorf("could not add to scheme: %w", err)
    }
    parameterCodec := runtime.NewParameterCodec(scheme)
    req.VersionedParams(&v1.PodExecOptions{
        Stdin:     false,
        Stdout:    true,
        Stderr:    true,
        TTY:       false,
        Container: args.ContainerId,
        Command:   []string{"sh", "-c", args.Command},
    }, parameterCodec)
    exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
    if err != nil {
        return "", 0, fmt.Errorf("could not exec command: %w", err)
    }
    var stdout, stderr bytes.Buffer
    var streamErr error
    streamErr = exec.Stream(remotecommand.StreamOptions{
        Stdin:  nil,
        Stdout: &stdout,
        Stderr: &stderr,
        Tty:    false,
    })

    if streamErr != nil {
        if strings.Contains(streamErr.Error(), "command terminated with exit code") {
            return stderr.String(), 1, nil
        } else {
            return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
        }
    }
    return stdout.String(), 0, nil
}

In this implementation I don't get to know the state of the remote command until it's finished executing at which point I get all of the output logs at once.

Is there a way to read stdout/stderr while they're being written by the call to exec.Stream? In an ideal world I'd like to be able to print the output of the remote command line by line. I noticed that bytes.Buffer has a ReadString method which accepts a delimiter. That looks like a useful method but I haven't been able to figure out how to use it.


Solution

  • This is only a partial answer but if I set use the following PodExecOptions and StreamOptions then I see each log line get printed in real time (note that Tty is true and I'm using stdin and stdout, not custom buffers):

    v1.PodExecOptions{
            Stdin:     true,
            Stdout:    true,
            Stderr:    false,
            TTY:       true,
            Container: args.ContainerId,
            Command:   []string{"sh", "-c", args.Command},
        }
    

    and

    remotecommand.StreamOptions{
                Stdin:  os.Stdin,
                Stdout: os.Stdout,
                Stderr: nil,
                Tty:    true,
            }
    

    However, if I try to use something other than os.Stdin and os.Stdout then I never get any log lines. For example, the following usage doesn't print anything:

        var stdout, stdin bytes.Buffer
        var streamErr error
    
        go func() {
            streamErr = exec.Stream(remotecommand.StreamOptions{
                Stdin:  &stdin,
                Stdout: &stdout,
                Stderr: nil,
                Tty:    true,
            })
        }()
        time.Sleep(5*time.Second)
        log.Info("doing raw string calls on both buffers")
        log.Info(stdin.String())
        log.Info(stdout.String())
        log.Info("starting scan of stdin")
        scanner := bufio.NewScanner(&stdin)
        scanner.Split(bufio.ScanLines)
        for scanner.Scan() {
            m := scanner.Text()
            fmt.Println(m)
        }
        log.Info("starting scan of stdout")
        scanner = bufio.NewScanner(&stdout)
        scanner.Split(bufio.ScanLines)
        for scanner.Scan() {
            m := scanner.Text()
            fmt.Println(m)
        }
        log.Info("finished scanning of stdout")
    

    I'm still trying to figure out how to use custom buffers so I can manage what's written to my logs instead of piping directly to stdout (I want to attach some custom fields to each line that gets logged).


    EDIT: alright, I figured out a solution that works. Here's the full code

    type LogStreamer struct{
        b bytes.Buffer
    }
    
    func (l *LogStreamer) String() string {
        return l.b.String()
    }
    
    func (l *LogStreamer) Write(p []byte) (n int, err error) {
        a := strings.TrimSpace(string(p))
        l.b.WriteString(a)
        log.Info(a)
        return len(p), nil
    }
    
    func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
        req := k.clientset.CoreV1().RESTClient().Post().Resource("pods").Name(args.ContainerId).Namespace(k.namespace).SubResource("exec").Param("container", "worker")
        scheme := runtime.NewScheme()
        if err := v1.AddToScheme(scheme); err != nil {
            return "", 0, fmt.Errorf("could not add to scheme: %w", err)
        }
        parameterCodec := runtime.NewParameterCodec(scheme)
        req.VersionedParams(&v1.PodExecOptions{
            Stdin:     true,
            Stdout:    true,
            Stderr:    false,
            TTY:       true,
            Container: args.ContainerId,
            Command:   []string{"sh", "-c", args.Command},
        }, parameterCodec)
        exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
        if err != nil {
            return "", 0, fmt.Errorf("could not exec command: %w", err)
        }
        var streamErr error
        l := &LogStreamer{}
    
        streamErr = exec.Stream(remotecommand.StreamOptions{
            Stdin:  os.Stdin,
            Stdout: l,
            Stderr: nil,
            Tty:    true,
        })
    
        if streamErr != nil {
            if strings.Contains(streamErr.Error(), "command terminated with exit code") {
                return l.String(), 1, nil
            } else {
                return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
            }
        }
        return l.String(), 0, nil
    }
    

    I created a struct which implements the io.Writer interface and use that in the StreamOptions struct. Also note that I had to use os.Stdin in the StreamOptions struct or else only a single line would be streamed back for Stdout.

    Also note that I had to trim the buffer passed to LogStreamer.Write because it seems that carriage returns or newlines cause problems with the logrus package. There's still more polish to add to this solution but it's definitely headed in the right direction.