From c1d07ea69957fcb07a1be757d8f34b18afa3e441 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C3=9Cm=C3=BCt=20=C3=96zalp?= <54961032+uozalp@users.noreply.github.com>
Date: Wed, 17 Sep 2025 03:31:59 +0200
Subject: [PATCH] fix(logs): enhance log streaming with retry mechanism and
 error handling (#3503)

* fix(logs): enhance log streaming with retry mechanism and error handling

* fix(logs): improve log tailing with enhanced retry logic and error handling

* renamed function

* Enhance log streaming with retry logic based on pod status

* Refactor Pod struct definition by moving it above the shouldStopRetrying method

* Implement exponential backoff for log streaming retries

* fix: reduce log channel buffer size to prevent drops
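
Roughly, the new retry loop has the following shape (a simplified sketch:
connectAndStream is a hypothetical stand-in for the Logs/Stream/readLogs
sequence in the diff below, and the pod-status checks are elided):

    bf := backoff.NewExponentialBackOff()
    bf.InitialInterval = logBackoffInitial
    bf.MaxInterval = logBackoffMax / 2
    bf.MaxElapsedTime = 0 // retries are bounded by logRetryCount, not by elapsed time
    delay := logBackoffInitial
    for range logRetryCount {
        if done := connectAndStream(ctx); done {
            return // clean EOF or cancellation: no retry
        }
        select {
        case <-ctx.Done():
            return
        case <-time.After(delay):
            if delay = bf.NextBackOff(); delay == backoff.Stop {
                return
            }
        }
    }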
---
 internal/dao/pod.go | 196 ++++++++++++++++++++++++++++++++++-----------
 1 file changed, 148 insertions(+), 48 deletions(-)

diff --git a/internal/dao/pod.go b/internal/dao/pod.go
index ddd53a9c..20ece728 100644
--- a/internal/dao/pod.go
+++ b/internal/dao/pod.go
@@ -13,6 +13,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/cenkalti/backoff/v4"
 	"github.com/derailed/k9s/internal"
 	"github.com/derailed/k9s/internal/client"
 	"github.com/derailed/k9s/internal/render"
@@ -39,9 +40,19 @@ var (
 	_ ImageLister = (*Pod)(nil)
 )
 
+type streamResult int
+
+const (
+	streamEOF      streamResult = iota // legit container log close (no retry)
+	streamError                        // retryable error (network, auth, etc.)
+	streamCanceled                     // context canceled
+)
+
 const (
-	logRetryCount = 20
-	logRetryWait  = 1 * time.Second
+	logRetryCount     = 20
+	logBackoffInitial = 500 * time.Millisecond
+	logBackoffMax     = 30 * time.Second
+	logChannelBuffer  = 50 // Buffer size for log channel to reduce drops
 )
 
 // Pod represents a pod resource.
@@ -49,6 +60,25 @@ type Pod struct {
 	Resource
 }
 
+// shouldStopRetrying checks if we should stop retrying log streaming based on pod status.
+func (p *Pod) shouldStopRetrying(path string) bool {
+	pod, err := p.GetInstance(path)
+	if err != nil {
+		return true
+	}
+
+	if pod.DeletionTimestamp != nil {
+		return true
+	}
+
+	switch pod.Status.Phase {
+	case v1.PodSucceeded, v1.PodFailed:
+		return true
+	}
+
+	return false
+}
+
 // Get returns a resource instance if found, else an error.
 func (p *Pod) Get(ctx context.Context, path string) (runtime.Object, error) {
 	o, err := p.Resource.Get(ctx, path)
@@ -328,47 +358,109 @@ func (p *Pod) Scan(_ context.Context, gvr *client.GVR, fqn string, wait bool) (R
 // Helpers...
 
 func tailLogs(ctx context.Context, logger Logger, opts *LogOptions) LogChan {
-	var (
-		out = make(LogChan, 2)
-		wg  sync.WaitGroup
-	)
+	out := make(LogChan, logChannelBuffer)
+	var wg sync.WaitGroup
 
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		podOpts := opts.ToPodLogOptions()
+
+		// Set up exponential backoff following the project pattern
+		bf := backoff.NewExponentialBackOff()
+		bf.InitialInterval = logBackoffInitial
+		bf.MaxElapsedTime = 0
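+		// NOTE: the cap is presumably logBackoffMax/2 to leave headroom for the
+		// default 0.5 randomization factor, so the jittered interval (up to
+		// 1.5x MaxInterval) stays at or below logBackoffMax.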
+		bf.MaxInterval = logBackoffMax / 2
+		backoffCtx := backoff.WithContext(bf, ctx)
+		delay := logBackoffInitial
+
 		for range logRetryCount {
-			req, err := logger.Logs(opts.Path, podOpts)
-			if err == nil {
-				// This call will block if nothing is in the stream!!
-				stream, e := req.Stream(ctx)
-				if e == nil {
-					wg.Add(1)
-					go readLogs(ctx, &wg, stream, out, opts)
-					return
-				}
-				slog.Error("Stream logs failed",
-					slogs.Error, e,
+			// Check if we should stop retrying based on pod status
+			if pod, ok := logger.(*Pod); ok && pod.shouldStopRetrying(opts.Path) {
+				slog.Debug("Stopping log retry - pod is terminating or deleted",
 					slogs.Container, opts.Info(),
 				)
-			} else {
+				return
+			}
+
+			req, err := logger.Logs(opts.Path, podOpts)
+			if err != nil {
 				slog.Error("Log request failed",
 					slogs.Container, opts.Info(),
 					slogs.Error, err,
 				)
+				select {
+				case <-ctx.Done():
+					return
+				case <-time.After(delay):
+					if delay = backoffCtx.NextBackOff(); delay == backoff.Stop {
+						return
+					}
+				}
+				continue
 			}
-			select {
-			case <-ctx.Done():
-				return
-			default:
-				if err != nil {
-					out <- opts.ToErrLogItem(err)
+
+			stream, e := req.Stream(ctx)
+			if e != nil {
+				slog.Error("Stream logs failed",
+					slogs.Error, e,
+					slogs.Container, opts.Info(),
+				)
+				select {
+				case <-ctx.Done():
+					return
+				case <-time.After(delay):
+					if delay = backoffCtx.NextBackOff(); delay == backoff.Stop {
+						return
+					}
 				}
-				time.Sleep(logRetryWait)
+				continue
 			}
+
+			// Connection established: reset backoff so later failures start from the initial delay
+			bf.Reset()
+			delay = logBackoffInitial
+
+			// Process logs until completion
+			result := readLogs(ctx, stream, out, opts)
+			switch result {
+			case streamEOF:
+				slog.Debug("Log stream ended cleanly",
+					slogs.Container, opts.Info(),
+				)
+				return
+			case streamError:
+				// Check again whether the pod went away while streaming
+				if pod, ok := logger.(*Pod); ok && pod.shouldStopRetrying(opts.Path) {
+					slog.Debug("Stopping log retry after stream error - pod is terminating or deleted",
+						slogs.Container, opts.Info(),
+					)
+					return
+				}
+				slog.Debug("Log stream error, retrying",
+					slogs.Container, opts.Info(),
+				)
+				select {
+				case <-ctx.Done():
+					return
+				case <-time.After(delay):
+					if delay = backoffCtx.NextBackOff(); delay == backoff.Stop {
+						return
+					}
+				}
+				continue
+			case streamCanceled:
+				return
+			}
 		}
+
+		// Out of retries
+		out <- opts.ToErrLogItem(fmt.Errorf("failed to maintain log stream after %d retries", logRetryCount))
 	}()
 
 	go func() {
 		wg.Wait()
 		close(out)
@@ -377,45 +469,53 @@ func tailLogs(ctx context.Context, logger Logger, opts *LogOptions) LogChan {
 	return out
 }
 
-func readLogs(ctx context.Context, wg *sync.WaitGroup, stream io.ReadCloser, out chan<- *LogItem, opts *LogOptions) {
+func readLogs(ctx context.Context, stream io.ReadCloser, out chan<- *LogItem, opts *LogOptions) streamResult {
 	defer func() {
-		if err := stream.Close(); err != nil {
+		if err := stream.Close(); err != nil && !errors.Is(err, io.ErrClosedPipe) {
 			slog.Error("Fail to close stream",
 				slogs.Container, opts.Info(),
 				slogs.Error, err,
 			)
 		}
-		wg.Done()
 	}()
-	slog.Debug("Processing logs", slogs.Options, opts.Info())
 
 	r := bufio.NewReader(stream)
+
 	for {
-		var item *LogItem
-		if bytes, err := r.ReadBytes('\n'); err == nil {
-			item = opts.ToLogItem(tview.EscapeBytes(bytes))
-		} else {
-			if errors.Is(err, io.EOF) {
-				e := fmt.Errorf("stream closed %w for %s", err, opts.Info())
-				item = opts.ToErrLogItem(e)
-				slog.Warn("Log reader EOF",
+		bytes, err := r.ReadBytes('\n')
+		if err == nil {
+			item := opts.ToLogItem(tview.EscapeBytes(bytes))
+			select {
+			case <-ctx.Done():
+				return streamCanceled
+			case out <- item:
+			default:
+				// Avoid deadlock if consumer is too slow
+				slog.Warn("Dropping log line due to slow consumer",
 					slogs.Container, opts.Info(),
-					slogs.Error, e,
 				)
-			} else {
-				e := fmt.Errorf("stream canceled %w for %s", err, opts.Info())
-				item = opts.ToErrLogItem(e)
-				slog.Warn("Log stream canceled")
 			}
+			continue
 		}
-		select {
-		case <-ctx.Done():
-			return
-		case out <- item:
-			if item.IsError {
-				return
+
+		if errors.Is(err, io.EOF) {
+			if len(bytes) > 0 {
+				// Emit trailing partial line before EOF
+				out <- opts.ToLogItem(tview.EscapeBytes(bytes))
 			}
+			slog.Debug("Log reader reached EOF", slogs.Container, opts.Info())
+			out <- opts.ToErrLogItem(fmt.Errorf("stream closed: %w for %s", err, opts.Info()))
+			return streamEOF
 		}
+
+		// Non-EOF error
+		slog.Debug("Log stream error, will retry connection",
+			slogs.Container, opts.Info(),
+			slogs.Error, fmt.Errorf("stream error: %w for %s", err, opts.Info()),
+		)
+		// Don't send stream errors to the user; they will be retried.
+		// Only the final retry-exhaustion message is shown.
+		return streamError
 	}
 }
 