From 8b02a0c69d840609d3917836a16eaf722c6c0348 Mon Sep 17 00:00:00 2001 From: Varun Deep Saini Date: Tue, 25 Nov 2025 00:39:20 +0530 Subject: [PATCH 1/2] Fix spinner interference with debug logs --- cmd/pipelines/root/logger.go | 8 ++- cmd/root/logger.go | 8 ++- libs/cmdio/io.go | 97 ++++++++++++++++++++++++++++++++ libs/cmdio/io_test.go | 45 +++++++++++++++ libs/flags/log_file_flag.go | 15 +++-- libs/flags/log_file_flag_test.go | 14 ++--- 6 files changed, 171 insertions(+), 16 deletions(-) diff --git a/cmd/pipelines/root/logger.go b/cmd/pipelines/root/logger.go index 54957e8650..01324f861e 100644 --- a/cmd/pipelines/root/logger.go +++ b/cmd/pipelines/root/logger.go @@ -28,12 +28,16 @@ type logFlags struct { debug bool } -func (f *logFlags) makeLogHandler(opts slog.HandlerOptions) (slog.Handler, error) { +func (f *logFlags) makeLogHandler(ctx context.Context, opts slog.HandlerOptions) (slog.Handler, error) { switch f.output { case flags.OutputJSON: return slog.NewJSONHandler(f.file.Writer(), &opts), nil case flags.OutputText: w := f.file.Writer() + // If logging to stderr, use the coordinated writer to avoid interference with spinner + if f.file.String() == flags.LogFileStderr { + w = cmdio.CoordinatedWriter(ctx) + } return handler.NewFriendlyHandler(w, &handler.Options{ Color: cmdio.IsTTY(w), Level: opts.Level, @@ -66,7 +70,7 @@ func (f *logFlags) initializeContext(ctx context.Context) (context.Context, erro return nil, err } - handler, err := f.makeLogHandler(opts) + handler, err := f.makeLogHandler(ctx, opts) if err != nil { return nil, err } diff --git a/cmd/root/logger.go b/cmd/root/logger.go index 38e09b9c9f..121140a88b 100644 --- a/cmd/root/logger.go +++ b/cmd/root/logger.go @@ -27,12 +27,16 @@ type logFlags struct { debug bool } -func (f *logFlags) makeLogHandler(opts slog.HandlerOptions) (slog.Handler, error) { +func (f *logFlags) makeLogHandler(ctx context.Context, opts slog.HandlerOptions) (slog.Handler, error) { switch f.output { case flags.OutputJSON: return slog.NewJSONHandler(f.file.Writer(), &opts), nil case flags.OutputText: w := f.file.Writer() + // If logging to stderr, use the coordinated writer to avoid interference with spinner + if f.file.String() == flags.LogFileStderr { + w = cmdio.CoordinatedWriter(ctx) + } return handler.NewFriendlyHandler(w, &handler.Options{ Color: cmdio.IsTTY(w), Level: opts.Level, @@ -65,7 +69,7 @@ func (f *logFlags) initializeContext(ctx context.Context) (context.Context, erro return nil, err } - handler, err := f.makeLogHandler(opts) + handler, err := f.makeLogHandler(ctx, opts) if err != nil { return nil, err } diff --git a/libs/cmdio/io.go b/libs/cmdio/io.go index 356c6f9ac8..8317ab34b1 100644 --- a/libs/cmdio/io.go +++ b/libs/cmdio/io.go @@ -16,6 +16,89 @@ import ( "github.com/mattn/go-isatty" ) +// writeRequest represents a write operation to be performed by the coordinator. +type writeRequest struct { + data []byte + result chan writeResult +} + +type writeResult struct { + n int + err error +} + +type spinnerOp struct { + spinner *spinner.Spinner // nil means clear spinner +} + +// coordinatedWriter coordinates writes to stderr, ensuring that spinner +// updates and log messages don't interfere with each other. +// All operations are lock-free and performed via channels. +type coordinatedWriter struct { + underlying io.Writer + writeChan chan writeRequest + spinnerOp chan spinnerOp + stopChan chan struct{} + spinner *spinner.Spinner // only accessed by coordinator goroutine +} + +func newCoordinatedWriter(underlying io.Writer) *coordinatedWriter { + w := &coordinatedWriter{ + underlying: underlying, + writeChan: make(chan writeRequest, 10), + spinnerOp: make(chan spinnerOp, 1), + stopChan: make(chan struct{}), + } + go w.coordinator() + return w +} + +// coordinator runs in a goroutine and handles all writes sequentially. +func (w *coordinatedWriter) coordinator() { + for { + select { + case <-w.stopChan: + return + case op := <-w.spinnerOp: + w.spinner = op.spinner + case req := <-w.writeChan: + // If spinner is active, pause it before writing + if w.spinner != nil && w.spinner.Active() { + w.spinner.Stop() + n, err := w.underlying.Write(req.data) + w.spinner.Start() + req.result <- writeResult{n: n, err: err} + } else { + n, err := w.underlying.Write(req.data) + req.result <- writeResult{n: n, err: err} + } + } + } +} + +func (w *coordinatedWriter) Write(p []byte) (n int, err error) { + // Make a copy since p might be reused by the caller + data := make([]byte, len(p)) + copy(data, p) + + result := make(chan writeResult, 1) + w.writeChan <- writeRequest{data: data, result: result} + res := <-result + return res.n, res.err +} + +func (w *coordinatedWriter) setSpinner(sp *spinner.Spinner) { + w.spinnerOp <- spinnerOp{spinner: sp} +} + +func (w *coordinatedWriter) clearSpinner() { + w.spinnerOp <- spinnerOp{spinner: nil} +} + +func (w *coordinatedWriter) close() { + close(w.stopChan) +} + // cmdIO is the private instance, that is not supposed to be accessed // outside of `cmdio` package. Use the public package-level functions // to access the inner state. @@ -30,6 +113,7 @@ type cmdIO struct { in io.Reader out io.Writer err io.Writer + coordErr *coordinatedWriter } func NewIO(ctx context.Context, outputFormat flags.Output, in io.Reader, out, err io.Writer, headerTemplate, template string) *cmdIO { @@ -50,6 +134,8 @@ func NewIO(ctx context.Context, outputFormat flags.Output, in io.Reader, out, er // - not to be running in Git Bash on Windows prompt := interactive && IsTTY(in) && IsTTY(out) && !isGitBash(ctx) + coordErr := newCoordinatedWriter(err) + return &cmdIO{ interactive: interactive, prompt: prompt, @@ -59,6 +145,7 @@ func NewIO(ctx context.Context, outputFormat flags.Output, in io.Reader, out, er in: in, out: out, err: err, + coordErr: coordErr, } } @@ -196,11 +283,14 @@ func (c *cmdIO) Spinner(ctx context.Context) chan string { spinner.WithWriter(c.err), spinner.WithColor("green")) sp.Start() + // Register spinner with coordinated writer so it can pause during log writes + c.coordErr.setSpinner(sp) } updates := make(chan string) go func() { if c.interactive { defer sp.Stop() + defer c.coordErr.clearSpinner() } for { select { @@ -226,6 +316,13 @@ func Spinner(ctx context.Context) chan string { return c.Spinner(ctx) } +// CoordinatedWriter returns a writer that coordinates with the spinner +// to avoid interference between spinner updates and log messages. +func CoordinatedWriter(ctx context.Context) io.Writer { + c := fromContext(ctx) + return c.coordErr +} + type cmdIOType int var cmdIOKey cmdIOType diff --git a/libs/cmdio/io_test.go b/libs/cmdio/io_test.go index 9b2b073b28..afa8091b6a 100644 --- a/libs/cmdio/io_test.go +++ b/libs/cmdio/io_test.go @@ -1,11 +1,15 @@ package cmdio import ( + "bytes" "context" + "sync" "testing" + "time" "github.com/databricks/cli/libs/env" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestIsGitBash(t *testing.T) { @@ -17,3 +21,44 @@ func TestIsGitBash(t *testing.T) { ctx = env.Set(ctx, "PS1", "\\[\033]0;$TITLEPREFIX:$PWD\007\\]\n\\[\033[32m\\]\\u@\\h \\[\033[35m\\]$MSYSTEM \\[\033[33m\\]\\w\\[\033[36m\\]`__git_ps1`\\[\033[0m\\]\n$") assert.True(t, isGitBash(ctx)) } + +func TestCoordinatedWriter(t *testing.T) { + var buf bytes.Buffer + w := newCoordinatedWriter(&buf) + defer w.close() + + // Test simple write + n, err := w.Write([]byte("test message\n")) + require.NoError(t, err) + assert.Equal(t, 13, n) + + // Allow coordinator goroutine to process + time.Sleep(10 * time.Millisecond) + assert.Equal(t, "test message\n", buf.String()) +} + +func TestCoordinatedWriterConcurrent(t *testing.T) { + var buf bytes.Buffer + w := newCoordinatedWriter(&buf) + defer w.close() + + // Write concurrently from multiple goroutines + var wg sync.WaitGroup + for i := range 10 { + wg.Add(1) + go func(i int) { + defer wg.Done() + msg := []byte("message\n") + _, err := w.Write(msg) + require.NoError(t, err) + }(i) + } + + wg.Wait() + + // Allow coordinator goroutine to process all writes + time.Sleep(50 * time.Millisecond) + + // All messages should be written + assert.Equal(t, 10, bytes.Count(buf.Bytes(), []byte("message\n"))) +} diff --git a/libs/flags/log_file_flag.go b/libs/flags/log_file_flag.go index d2fe51d917..83fe3c0824 100644 --- a/libs/flags/log_file_flag.go +++ b/libs/flags/log_file_flag.go @@ -8,6 +8,11 @@ import ( "github.com/spf13/cobra" ) +const ( + LogFileStderr = "stderr" + LogFileStdout = "stdout" +) + // Abstract over files that are already open (e.g. stderr) and // files that need to be opened before use. type logFile interface { @@ -71,7 +76,7 @@ type LogFileFlag struct { func NewLogFileFlag() LogFileFlag { return LogFileFlag{ - name: "stderr", + name: LogFileStderr, logFile: &nopLogFile{os.Stderr}, } } @@ -83,10 +88,10 @@ func (f *LogFileFlag) String() string { func (f *LogFileFlag) Set(s string) error { lower := strings.ToLower(s) switch lower { - case "stderr": + case LogFileStderr: f.name = lower f.logFile = &nopLogFile{os.Stderr} - case "stdout": + case LogFileStdout: f.name = lower f.logFile = &nopLogFile{os.Stdout} default: @@ -104,7 +109,7 @@ func (f *LogFileFlag) Type() string { // Complete is the Cobra compatible completion function for this flag. func (f *LogFileFlag) Complete(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) { return []string{ - "stdout", - "stderr", + LogFileStdout, + LogFileStderr, }, cobra.ShellCompDirectiveDefault } diff --git a/libs/flags/log_file_flag_test.go b/libs/flags/log_file_flag_test.go index 3a9df3b465..bd05de2ee6 100644 --- a/libs/flags/log_file_flag_test.go +++ b/libs/flags/log_file_flag_test.go @@ -12,35 +12,35 @@ import ( func TestLogFileFlagDefault(t *testing.T) { f := NewLogFileFlag() assert.Equal(t, os.Stderr, f.Writer()) - assert.Equal(t, "stderr", f.String()) + assert.Equal(t, LogFileStderr, f.String()) } func TestLogFileFlagSetStdout(t *testing.T) { var err error f := NewLogFileFlag() - err = f.Set("stdout") + err = f.Set(LogFileStdout) require.NoError(t, err) assert.Equal(t, os.Stdout, f.Writer()) - assert.Equal(t, "stdout", f.String()) + assert.Equal(t, LogFileStdout, f.String()) err = f.Set("STDOUT") require.NoError(t, err) assert.Equal(t, os.Stdout, f.Writer()) - assert.Equal(t, "stdout", f.String()) + assert.Equal(t, LogFileStdout, f.String()) } func TestLogFileFlagSetStderr(t *testing.T) { var err error f := NewLogFileFlag() - err = f.Set("stderr") + err = f.Set(LogFileStderr) require.NoError(t, err) assert.Equal(t, os.Stderr, f.Writer()) - assert.Equal(t, "stderr", f.String()) + assert.Equal(t, LogFileStderr, f.String()) err = f.Set("STDERR") require.NoError(t, err) assert.Equal(t, os.Stderr, f.Writer()) - assert.Equal(t, "stderr", f.String()) + assert.Equal(t, LogFileStderr, f.String()) } func TestLogFileFlagSetNewFile(t *testing.T) { From aab98d997811895616986823ade286f9bd3e1902 Mon Sep 17 00:00:00 2001 From: Varun Deep Saini Date: Tue, 25 Nov 2025 00:47:36 +0530 Subject: [PATCH 2/2] fixed nil in context --- libs/cmdio/io.go | 8 ++++++-- libs/cmdio/io_test.go | 9 +++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/libs/cmdio/io.go b/libs/cmdio/io.go index 8317ab34b1..c5e3257ef2 100644 --- a/libs/cmdio/io.go +++ b/libs/cmdio/io.go @@ -319,8 +319,12 @@ func Spinner(ctx context.Context) chan string { // CoordinatedWriter returns a writer that coordinates with the spinner // to avoid interference between spinner updates and log messages. func CoordinatedWriter(ctx context.Context) io.Writer { - c := fromContext(ctx) - return c.coordErr + io, ok := ctx.Value(cmdIOKey).(*cmdIO) + if !ok { + // cmdIO not yet initialized (e.g., during early logger setup) + return os.Stderr + } + return io.coordErr } type cmdIOType int diff --git a/libs/cmdio/io_test.go b/libs/cmdio/io_test.go index afa8091b6a..e3277e3245 100644 --- a/libs/cmdio/io_test.go +++ b/libs/cmdio/io_test.go @@ -62,3 +62,12 @@ func TestCoordinatedWriterConcurrent(t *testing.T) { // All messages should be written assert.Equal(t, 10, bytes.Count(buf.Bytes(), []byte("message\n"))) } + +func TestCoordinatedWriterBeforeCmdIOInitialized(t *testing.T) { + // Test that CoordinatedWriter returns a fallback writer when cmdIO + // is not yet in the context (e.g., during early logger initialization) + ctx := context.Background() + w := CoordinatedWriter(ctx) + require.NotNil(t, w) + // Should return os.Stderr as fallback, not panic +}