Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cmd/pipelines/root/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@ type logFlags struct {
debug bool
}

func (f *logFlags) makeLogHandler(opts slog.HandlerOptions) (slog.Handler, error) {
func (f *logFlags) makeLogHandler(ctx context.Context, opts slog.HandlerOptions) (slog.Handler, error) {
switch f.output {
case flags.OutputJSON:
return slog.NewJSONHandler(f.file.Writer(), &opts), nil
case flags.OutputText:
w := f.file.Writer()
// If logging to stderr, use the coordinated writer to avoid interference with spinner
if f.file.String() == flags.LogFileStderr {
w = cmdio.CoordinatedWriter(ctx)
}
return handler.NewFriendlyHandler(w, &handler.Options{
Color: cmdio.IsTTY(w),
Level: opts.Level,
Expand Down Expand Up @@ -66,7 +70,7 @@ func (f *logFlags) initializeContext(ctx context.Context) (context.Context, erro
return nil, err
}

handler, err := f.makeLogHandler(opts)
handler, err := f.makeLogHandler(ctx, opts)
if err != nil {
return nil, err
}
Expand Down
8 changes: 6 additions & 2 deletions cmd/root/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,16 @@ type logFlags struct {
debug bool
}

func (f *logFlags) makeLogHandler(opts slog.HandlerOptions) (slog.Handler, error) {
func (f *logFlags) makeLogHandler(ctx context.Context, opts slog.HandlerOptions) (slog.Handler, error) {
switch f.output {
case flags.OutputJSON:
return slog.NewJSONHandler(f.file.Writer(), &opts), nil
case flags.OutputText:
w := f.file.Writer()
// If logging to stderr, use the coordinated writer to avoid interference with spinner
if f.file.String() == flags.LogFileStderr {
w = cmdio.CoordinatedWriter(ctx)
}
return handler.NewFriendlyHandler(w, &handler.Options{
Color: cmdio.IsTTY(w),
Level: opts.Level,
Expand Down Expand Up @@ -65,7 +69,7 @@ func (f *logFlags) initializeContext(ctx context.Context) (context.Context, erro
return nil, err
}

handler, err := f.makeLogHandler(opts)
handler, err := f.makeLogHandler(ctx, opts)
if err != nil {
return nil, err
}
Expand Down
101 changes: 101 additions & 0 deletions libs/cmdio/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,89 @@ import (
"github.com/mattn/go-isatty"
)

// writeRequest represents a write operation to be performed by the coordinator.
type writeRequest struct {
data []byte
result chan writeResult
}

type writeResult struct {
n int
err error
}

type spinnerOp struct {
spinner *spinner.Spinner // nil means clear spinner
}

// coordinatedWriter coordinates writes to stderr, ensuring that spinner
// updates and log messages don't interfere with each other.
// All operations are lock-free and performed via channels.
type coordinatedWriter struct {
underlying io.Writer
writeChan chan writeRequest
spinnerOp chan spinnerOp
stopChan chan struct{}
spinner *spinner.Spinner // only accessed by coordinator goroutine
}

func newCoordinatedWriter(underlying io.Writer) *coordinatedWriter {
w := &coordinatedWriter{
underlying: underlying,
writeChan: make(chan writeRequest, 10),
spinnerOp: make(chan spinnerOp, 1),
stopChan: make(chan struct{}),
}
go w.coordinator()
return w
}

// coordinator runs in a goroutine and handles all writes sequentially.
func (w *coordinatedWriter) coordinator() {
for {
select {
case <-w.stopChan:
return
case op := <-w.spinnerOp:
w.spinner = op.spinner
case req := <-w.writeChan:
// If spinner is active, pause it before writing
if w.spinner != nil && w.spinner.Active() {
w.spinner.Stop()
n, err := w.underlying.Write(req.data)
w.spinner.Start()
req.result <- writeResult{n: n, err: err}
} else {
n, err := w.underlying.Write(req.data)
req.result <- writeResult{n: n, err: err}
}
}
}
}

func (w *coordinatedWriter) Write(p []byte) (n int, err error) {
// Make a copy since p might be reused by the caller
data := make([]byte, len(p))
copy(data, p)

result := make(chan writeResult, 1)
w.writeChan <- writeRequest{data: data, result: result}
res := <-result
return res.n, res.err
}

func (w *coordinatedWriter) setSpinner(sp *spinner.Spinner) {
w.spinnerOp <- spinnerOp{spinner: sp}
}

func (w *coordinatedWriter) clearSpinner() {
w.spinnerOp <- spinnerOp{spinner: nil}
}

func (w *coordinatedWriter) close() {
close(w.stopChan)
}

// cmdIO is the private instance, that is not supposed to be accessed
// outside of `cmdio` package. Use the public package-level functions
// to access the inner state.
Expand All @@ -30,6 +113,7 @@ type cmdIO struct {
in io.Reader
out io.Writer
err io.Writer
coordErr *coordinatedWriter
}

func NewIO(ctx context.Context, outputFormat flags.Output, in io.Reader, out, err io.Writer, headerTemplate, template string) *cmdIO {
Expand All @@ -50,6 +134,8 @@ func NewIO(ctx context.Context, outputFormat flags.Output, in io.Reader, out, er
// - not to be running in Git Bash on Windows
prompt := interactive && IsTTY(in) && IsTTY(out) && !isGitBash(ctx)

coordErr := newCoordinatedWriter(err)

return &cmdIO{
interactive: interactive,
prompt: prompt,
Expand All @@ -59,6 +145,7 @@ func NewIO(ctx context.Context, outputFormat flags.Output, in io.Reader, out, er
in: in,
out: out,
err: err,
coordErr: coordErr,
}
}

Expand Down Expand Up @@ -196,11 +283,14 @@ func (c *cmdIO) Spinner(ctx context.Context) chan string {
spinner.WithWriter(c.err),
spinner.WithColor("green"))
sp.Start()
// Register spinner with coordinated writer so it can pause during log writes
c.coordErr.setSpinner(sp)
}
updates := make(chan string)
go func() {
if c.interactive {
defer sp.Stop()
defer c.coordErr.clearSpinner()
}
for {
select {
Expand All @@ -226,6 +316,17 @@ func Spinner(ctx context.Context) chan string {
return c.Spinner(ctx)
}

// CoordinatedWriter returns a writer that coordinates with the spinner
// to avoid interference between spinner updates and log messages.
func CoordinatedWriter(ctx context.Context) io.Writer {
io, ok := ctx.Value(cmdIOKey).(*cmdIO)
if !ok {
// cmdIO not yet initialized (e.g., during early logger setup)
return os.Stderr
}
return io.coordErr
}

type cmdIOType int

var cmdIOKey cmdIOType
Expand Down
54 changes: 54 additions & 0 deletions libs/cmdio/io_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package cmdio

import (
"bytes"
"context"
"sync"
"testing"
"time"

"github.com/databricks/cli/libs/env"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestIsGitBash(t *testing.T) {
Expand All @@ -17,3 +21,53 @@ func TestIsGitBash(t *testing.T) {
ctx = env.Set(ctx, "PS1", "\\[\033]0;$TITLEPREFIX:$PWD\007\\]\n\\[\033[32m\\]\\u@\\h \\[\033[35m\\]$MSYSTEM \\[\033[33m\\]\\w\\[\033[36m\\]`__git_ps1`\\[\033[0m\\]\n$")
assert.True(t, isGitBash(ctx))
}

func TestCoordinatedWriter(t *testing.T) {
var buf bytes.Buffer
w := newCoordinatedWriter(&buf)
defer w.close()

// Test simple write
n, err := w.Write([]byte("test message\n"))
require.NoError(t, err)
assert.Equal(t, 13, n)

// Allow coordinator goroutine to process
time.Sleep(10 * time.Millisecond)
assert.Equal(t, "test message\n", buf.String())
}

func TestCoordinatedWriterConcurrent(t *testing.T) {
var buf bytes.Buffer
w := newCoordinatedWriter(&buf)
defer w.close()

// Write concurrently from multiple goroutines
var wg sync.WaitGroup
for i := range 10 {
wg.Add(1)
go func(i int) {
defer wg.Done()
msg := []byte("message\n")
_, err := w.Write(msg)
require.NoError(t, err)
}(i)
}

wg.Wait()

// Allow coordinator goroutine to process all writes
time.Sleep(50 * time.Millisecond)

// All messages should be written
assert.Equal(t, 10, bytes.Count(buf.Bytes(), []byte("message\n")))
}

func TestCoordinatedWriterBeforeCmdIOInitialized(t *testing.T) {
// Test that CoordinatedWriter returns a fallback writer when cmdIO
// is not yet in the context (e.g., during early logger initialization)
ctx := context.Background()
w := CoordinatedWriter(ctx)
require.NotNil(t, w)
// Should return os.Stderr as fallback, not panic
}
15 changes: 10 additions & 5 deletions libs/flags/log_file_flag.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ import (
"github.com/spf13/cobra"
)

const (
LogFileStderr = "stderr"
LogFileStdout = "stdout"
)

// Abstract over files that are already open (e.g. stderr) and
// files that need to be opened before use.
type logFile interface {
Expand Down Expand Up @@ -71,7 +76,7 @@ type LogFileFlag struct {

func NewLogFileFlag() LogFileFlag {
return LogFileFlag{
name: "stderr",
name: LogFileStderr,
logFile: &nopLogFile{os.Stderr},
}
}
Expand All @@ -83,10 +88,10 @@ func (f *LogFileFlag) String() string {
func (f *LogFileFlag) Set(s string) error {
lower := strings.ToLower(s)
switch lower {
case "stderr":
case LogFileStderr:
f.name = lower
f.logFile = &nopLogFile{os.Stderr}
case "stdout":
case LogFileStdout:
f.name = lower
f.logFile = &nopLogFile{os.Stdout}
default:
Expand All @@ -104,7 +109,7 @@ func (f *LogFileFlag) Type() string {
// Complete is the Cobra compatible completion function for this flag.
func (f *LogFileFlag) Complete(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
return []string{
"stdout",
"stderr",
LogFileStdout,
LogFileStderr,
}, cobra.ShellCompDirectiveDefault
}
14 changes: 7 additions & 7 deletions libs/flags/log_file_flag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,35 @@ import (
func TestLogFileFlagDefault(t *testing.T) {
f := NewLogFileFlag()
assert.Equal(t, os.Stderr, f.Writer())
assert.Equal(t, "stderr", f.String())
assert.Equal(t, LogFileStderr, f.String())
}

func TestLogFileFlagSetStdout(t *testing.T) {
var err error

f := NewLogFileFlag()
err = f.Set("stdout")
err = f.Set(LogFileStdout)
require.NoError(t, err)
assert.Equal(t, os.Stdout, f.Writer())
assert.Equal(t, "stdout", f.String())
assert.Equal(t, LogFileStdout, f.String())
err = f.Set("STDOUT")
require.NoError(t, err)
assert.Equal(t, os.Stdout, f.Writer())
assert.Equal(t, "stdout", f.String())
assert.Equal(t, LogFileStdout, f.String())
}

func TestLogFileFlagSetStderr(t *testing.T) {
var err error

f := NewLogFileFlag()
err = f.Set("stderr")
err = f.Set(LogFileStderr)
require.NoError(t, err)
assert.Equal(t, os.Stderr, f.Writer())
assert.Equal(t, "stderr", f.String())
assert.Equal(t, LogFileStderr, f.String())
err = f.Set("STDERR")
require.NoError(t, err)
assert.Equal(t, os.Stderr, f.Writer())
assert.Equal(t, "stderr", f.String())
assert.Equal(t, LogFileStderr, f.String())
}

func TestLogFileFlagSetNewFile(t *testing.T) {
Expand Down