diff --git a/pkg/supervisor/logwriter.go b/pkg/supervisor/logwriter.go new file mode 100644 index 000000000000..0e91780a9baa --- /dev/null +++ b/pkg/supervisor/logwriter.go @@ -0,0 +1,98 @@ +/* +Copyright 2022 k0s authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package supervisor + +import ( + "bytes" + "unicode/utf8" + + "github.com/sirupsen/logrus" +) + +// logWriter implements [io.Writer] by forwarding whole lines to log. In case +// the lines get too long, it logs them in multiple chunks. +// +// This is in contrast to logrus's implementation of io.Writer, which simply +// errors out if the log line gets longer than 64k. +type logWriter struct { + log logrus.FieldLogger // receives (possibly chunked) log lines + buf []byte // buffer in which to accumulate chunks; len(buf) determines the chunk length + len int // current buffer length + chunkNo uint // current chunk number; 0 means "no chunk" +} + +// Write implements [io.Writer]. +func (w *logWriter) Write(in []byte) (int, error) { + w.writeBytes(in) + return len(in), nil +} + +func (w *logWriter) writeBytes(in []byte) { + // Fill and drain buffer with available data until everything has been consumed. + for rest := in; len(rest) > 0; { + + n := copy(w.buf[w.len:], rest) // fill buffer with new input data + rest = rest[n:] // strip copied input data + w.len += n // increase buffer length accordingly + + // Loop over buffer as long as there are newlines in it + for off := 0; ; { + idx := bytes.IndexRune(w.buf[off:w.len], '\n') + + // Discard already logged chunks and break if no newline left + if idx < 0 { + if off > 0 { + w.len = copy(w.buf, w.buf[off:w.len]) + } + break + } + + // Strip trailing carriage returns + line := bytes.TrimRight(w.buf[off:off+idx], "\r") + + if w.chunkNo == 0 { + w.log.Infof("%s", line) + } else { + if len(line) > 0 { + w.log.WithField("chunk", w.chunkNo+1).Infof("%s", line) + } + w.chunkNo = 0 + } + + off += idx + 1 // advance read offset behind the newline + } + + // Issue a chunked log entry in case the buffer is full + if w.len == len(w.buf) { + // Try to chunk at UTF-8 rune boundaries + len := w.len + for i := 0; i < utf8.MaxRune && i < w.len; i++ { + if r, _ := utf8.DecodeLastRune(w.buf[:w.len-i]); r != utf8.RuneError { + len = len - i + break + } + } + + // Strip trailing carriage returns + line := bytes.TrimRight(w.buf[:len], "\r") + + w.log.WithField("chunk", w.chunkNo+1).Infof("%s", line) + w.chunkNo++ // increase chunk number + w.len = copy(w.buf, w.buf[len:]) // discard logged bytes + } + } +} diff --git a/pkg/supervisor/logwriter_test.go b/pkg/supervisor/logwriter_test.go new file mode 100644 index 000000000000..4e96f0470bf7 --- /dev/null +++ b/pkg/supervisor/logwriter_test.go @@ -0,0 +1,108 @@ +/* +Copyright 2022 k0s authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package supervisor + +import ( + "testing" + + logtest "github.com/sirupsen/logrus/hooks/test" + "github.com/stretchr/testify/assert" +) + +func TestLogWriter(t *testing.T) { + type entry struct { + chunk uint + msg string + } + + for _, test := range []struct { + name string + bufSize int + in []string + out []entry + }{ + {"empty_write", 3, + []string{""}, + []entry{}}, + {"single_line", 3, + []string{"ab\n"}, + []entry{{0, "ab"}}}, + {"exact_lines", 3, + []string{"abc\n", "def\n"}, + []entry{{1, "abc"}, {1, "def"}}}, + {"multi_line", 3, + []string{"ab\ncd\n"}, + []entry{{0, "ab"}, {0, "cd"}}}, + {"overlong_lines", 3, + []string{"abcd\nef\n"}, + []entry{{1, "abc"}, {2, "d"}, {0, "ef"}}}, + {"overlong_lines_2", 3, + []string{"abcd\ne", "f", "\n"}, + []entry{{1, "abc"}, {2, "d"}, {0, "ef"}}}, + {"unterminated_consecutive_writes_4", 3, + []string{"ab", "cd"}, + []entry{{1, "abc"}}}, + {"unterminated_consecutive_writes_6", 3, + []string{"ab", "cd", "ef"}, + []entry{{1, "abc"}, {2, "def"}}}, + {"unterminated_consecutive_writes_8", 3, + []string{"ab", "cd", "ef", "gh"}, + []entry{{1, "abc"}, {2, "def"}}}, + {"unterminated_consecutive_writes_10", 3, + []string{"ab", "cd", "ef", "gh", "ij"}, + []entry{{1, "abc"}, {2, "def"}, {3, "ghi"}}}, + {"long_buffer_short_lines", 16, + []string{"a\nb\nc\n"}, + []entry{{0, "a"}, {0, "b"}, {0, "c"}}}, + {"utf8", 26, // would split after the third byte of 🫣 + []string{"this is four bytes: >>>🫣\n<<<\n"}, + []entry{{1, "this is four bytes: >>>"}, {2, "🫣"}, {0, "<<<"}}}, + {"strips_carriage_returns", 5, + []string{"abc\r\ndef\r\n"}, + []entry{{0, "abc"}, {0, "def"}}}, + } { + t.Run(test.name, func(t *testing.T) { + log, logs := logtest.NewNullLogger() + underTest := logWriter{log: log, buf: make([]byte, test.bufSize)} + + for _, line := range test.in { + underTest.writeBytes([]byte(line)) + } + + remaining := logs.AllEntries() + + for i, line := range test.out { + if !assert.NotEmpty(t, remaining, "Expected additional log entry: %s", line) { + continue + } + + chunk, isChunk := remaining[0].Data["chunk"] + assert.Equal(t, line.chunk != 0, isChunk, "Log entry %d chunk mismatch", i) + if isChunk { + assert.Equal(t, line.chunk, chunk, "Log entry %d differs in chunk", i) + } + + assert.Equal(t, line.msg, remaining[0].Message, "Log entry %d differs in message", i) + remaining = remaining[1:] + } + + for _, entry := range remaining { + assert.Fail(t, "Unexpected log entry", "%s", entry.Message) + } + }) + } +} diff --git a/pkg/supervisor/supervisor.go b/pkg/supervisor/supervisor.go index 281201720963..aff432045560 100644 --- a/pkg/supervisor/supervisor.go +++ b/pkg/supervisor/supervisor.go @@ -59,7 +59,7 @@ type Supervisor struct { cmd *exec.Cmd done chan bool - log *logrus.Entry + log logrus.FieldLogger mutex sync.Mutex startStopMutex sync.Mutex cancel context.CancelFunc @@ -182,8 +182,15 @@ func (s *Supervisor) Supervise() error { // get signals sent directly to parent. s.cmd.SysProcAttr = DetachAttr(s.UID, s.GID) - s.cmd.Stdout = s.log.Writer() - s.cmd.Stderr = s.log.Writer() + const maxLogChunkLen = 16 * 1024 + s.cmd.Stdout = &logWriter{ + log: s.log.WithField("stream", "stdout"), + buf: make([]byte, maxLogChunkLen), + } + s.cmd.Stderr = &logWriter{ + log: s.log.WithField("stream", "stderr"), + buf: make([]byte, maxLogChunkLen), + } err = s.cmd.Start() }