I'm using the golang kubernetes client to create kubernetes pods and execute remote commands in them. However, I'm finding that I can't get feedback on the status of the remote execution until it's finished because I can't figure out how to stream the logs of the remote command. Here's my current implementation for executing a remote command:
func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
req := k.clientset.CoreV1().RESTClient().Post().Resource("pods").Name(args.ContainerId).Namespace(k.namespace).SubResource("exec").Param("container", CONTAINER_NAME)
scheme := runtime.NewScheme()
if err := v1.AddToScheme(scheme); err != nil {
return "", 0, fmt.Errorf("could not add to scheme: %w", err)
}
parameterCodec := runtime.NewParameterCodec(scheme)
req.VersionedParams(&v1.PodExecOptions{
Stdin: false,
Stdout: true,
Stderr: true,
TTY: false,
Container: args.ContainerId,
Command: []string{"sh", "-c", args.Command},
}, parameterCodec)
exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
if err != nil {
return "", 0, fmt.Errorf("could not exec command: %w", err)
}
var stdout, stderr bytes.Buffer
var streamErr error
streamErr = exec.Stream(remotecommand.StreamOptions{
Stdin: nil,
Stdout: &stdout,
Stderr: &stderr,
Tty: false,
})
if streamErr != nil {
if strings.Contains(streamErr.Error(), "command terminated with exit code") {
return stderr.String(), 1, nil
} else {
return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
}
}
return stdout.String(), 0, nil
}
In this implementation I don't get to know the state of the remote command until it's finished executing at which point I get all of the output logs at once.
Is there a way to read stdout
/stderr
while they're being written by the call to exec.Stream
? In an ideal world I'd like to be able to print the output of the remote command line by line. I noticed that bytes.Buffer
has a ReadString
method which accepts a delimiter. That looks like a useful method but I haven't been able to figure out how to use it.
This is only a partial answer but if I set use the following PodExecOptions
and StreamOptions
then I see each log line get printed in real time (note that Tty
is true
and I'm using stdin and stdout, not custom buffers):
v1.PodExecOptions{
Stdin: true,
Stdout: true,
Stderr: false,
TTY: true,
Container: args.ContainerId,
Command: []string{"sh", "-c", args.Command},
}
and
remotecommand.StreamOptions{
Stdin: os.Stdin,
Stdout: os.Stdout,
Stderr: nil,
Tty: true,
}
However, if I try to use something other than os.Stdin
and os.Stdout
then I never get any log lines. For example, the following usage doesn't print anything:
var stdout, stdin bytes.Buffer
var streamErr error
go func() {
streamErr = exec.Stream(remotecommand.StreamOptions{
Stdin: &stdin,
Stdout: &stdout,
Stderr: nil,
Tty: true,
})
}()
time.Sleep(5*time.Second)
log.Info("doing raw string calls on both buffers")
log.Info(stdin.String())
log.Info(stdout.String())
log.Info("starting scan of stdin")
scanner := bufio.NewScanner(&stdin)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
m := scanner.Text()
fmt.Println(m)
}
log.Info("starting scan of stdout")
scanner = bufio.NewScanner(&stdout)
scanner.Split(bufio.ScanLines)
for scanner.Scan() {
m := scanner.Text()
fmt.Println(m)
}
log.Info("finished scanning of stdout")
I'm still trying to figure out how to use custom buffers so I can manage what's written to my logs instead of piping directly to stdout (I want to attach some custom fields to each line that gets logged).
EDIT: alright, I figured out a solution that works. Here's the full code
type LogStreamer struct{
b bytes.Buffer
}
func (l *LogStreamer) String() string {
return l.b.String()
}
func (l *LogStreamer) Write(p []byte) (n int, err error) {
a := strings.TrimSpace(string(p))
l.b.WriteString(a)
log.Info(a)
return len(p), nil
}
func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
req := k.clientset.CoreV1().RESTClient().Post().Resource("pods").Name(args.ContainerId).Namespace(k.namespace).SubResource("exec").Param("container", "worker")
scheme := runtime.NewScheme()
if err := v1.AddToScheme(scheme); err != nil {
return "", 0, fmt.Errorf("could not add to scheme: %w", err)
}
parameterCodec := runtime.NewParameterCodec(scheme)
req.VersionedParams(&v1.PodExecOptions{
Stdin: true,
Stdout: true,
Stderr: false,
TTY: true,
Container: args.ContainerId,
Command: []string{"sh", "-c", args.Command},
}, parameterCodec)
exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
if err != nil {
return "", 0, fmt.Errorf("could not exec command: %w", err)
}
var streamErr error
l := &LogStreamer{}
streamErr = exec.Stream(remotecommand.StreamOptions{
Stdin: os.Stdin,
Stdout: l,
Stderr: nil,
Tty: true,
})
if streamErr != nil {
if strings.Contains(streamErr.Error(), "command terminated with exit code") {
return l.String(), 1, nil
} else {
return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
}
}
return l.String(), 0, nil
}
I created a struct which implements the io.Writer
interface and use that in the StreamOptions
struct. Also note that I had to use os.Stdin
in the StreamOptions
struct or else only a single line would be streamed back for Stdout
.
Also note that I had to trim the buffer passed to LogStreamer.Write
because it seems that carriage returns or newlines cause problems with the logrus package. There's still more polish to add to this solution but it's definitely headed in the right direction.