fix(logs): enhance log streaming with retry mechanism and error handling (#3503)

* fix(logs): enhance log streaming with retry mechanism and error handling

* fix(logs): improve log tailing with enhanced retry logic and error handling

* renamed function

* Enhance log streaming with retry logic based on pod status

* Refactor Pod struct definition by moving it above the shouldStopRetrying method

* Implement exponential backoff for log streaming retries

* fix: reduce log channel buffer size to prevent drops
mine
Ümüt Özalp 2025-09-17 03:31:59 +02:00 committed by GitHub
parent a6fbf984cb
commit c1d07ea699
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 142 additions and 48 deletions

View File

@ -13,6 +13,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/cenkalti/backoff/v4"
"github.com/derailed/k9s/internal" "github.com/derailed/k9s/internal"
"github.com/derailed/k9s/internal/client" "github.com/derailed/k9s/internal/client"
"github.com/derailed/k9s/internal/render" "github.com/derailed/k9s/internal/render"
@ -39,9 +40,16 @@ var (
_ ImageLister = (*Pod)(nil) _ ImageLister = (*Pod)(nil)
) )
type streamResult int
const ( const (
logRetryCount = 20 logRetryCount = 20
logRetryWait = 1 * time.Second logBackoffInitial = 500 * time.Millisecond
logBackoffMax = 30 * time.Second
logChannelBuffer = 50 // Buffer size for log channel to reduce drops
streamEOF streamResult = iota // legit container log close (no retry)
streamError // retryable error (network, auth, etc.)
streamCanceled // context canceled
) )
// Pod represents a pod resource. // Pod represents a pod resource.
@ -49,6 +57,25 @@ type Pod struct {
Resource Resource
} }
// shouldStopRetrying checks if we should stop retrying log streaming based on pod status.
func (p *Pod) shouldStopRetrying(path string) bool {
pod, err := p.GetInstance(path)
if err != nil {
return true
}
if pod.DeletionTimestamp != nil {
return true
}
switch pod.Status.Phase {
case v1.PodSucceeded, v1.PodFailed:
return true
}
return false
}
// Get returns a resource instance if found, else an error. // Get returns a resource instance if found, else an error.
func (p *Pod) Get(ctx context.Context, path string) (runtime.Object, error) { func (p *Pod) Get(ctx context.Context, path string) (runtime.Object, error) {
o, err := p.Resource.Get(ctx, path) o, err := p.Resource.Get(ctx, path)
@ -328,47 +355,106 @@ func (p *Pod) Scan(_ context.Context, gvr *client.GVR, fqn string, wait bool) (R
// Helpers... // Helpers...
func tailLogs(ctx context.Context, logger Logger, opts *LogOptions) LogChan { func tailLogs(ctx context.Context, logger Logger, opts *LogOptions) LogChan {
var ( out := make(LogChan, logChannelBuffer)
out = make(LogChan, 2) var wg sync.WaitGroup
wg sync.WaitGroup
)
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
podOpts := opts.ToPodLogOptions() podOpts := opts.ToPodLogOptions()
// Setup exponential backoff following project pattern
bf := backoff.NewExponentialBackOff()
bf.InitialInterval = logBackoffInitial
bf.MaxElapsedTime = 0
bf.MaxInterval = logBackoffMax / 2
backoffCtx := backoff.WithContext(bf, ctx)
delay := logBackoffInitial
for range logRetryCount { for range logRetryCount {
req, err := logger.Logs(opts.Path, podOpts) // Check if we should stop retrying based on pod status
if err == nil { if pod, ok := logger.(*Pod); ok && pod.shouldStopRetrying(opts.Path) {
// This call will block if nothing is in the stream!! slog.Debug("Stopping log retry - pod is terminating or deleted",
stream, e := req.Stream(ctx)
if e == nil {
wg.Add(1)
go readLogs(ctx, &wg, stream, out, opts)
return
}
slog.Error("Stream logs failed",
slogs.Error, e,
slogs.Container, opts.Info(), slogs.Container, opts.Info(),
) )
} else { return
}
req, err := logger.Logs(opts.Path, podOpts)
if err != nil {
slog.Error("Log request failed", slog.Error("Log request failed",
slogs.Container, opts.Info(), slogs.Container, opts.Info(),
slogs.Error, err, slogs.Error, err,
) )
select {
case <-ctx.Done():
return
case <-time.After(delay):
if delay = backoffCtx.NextBackOff(); delay == backoff.Stop {
return
}
}
continue
} }
select { stream, e := req.Stream(ctx)
case <-ctx.Done(): if e != nil {
return slog.Error("Stream logs failed",
default: slogs.Error, e,
if err != nil { slogs.Container, opts.Info(),
out <- opts.ToErrLogItem(err) )
select {
case <-ctx.Done():
return
case <-time.After(delay):
if delay = backoffCtx.NextBackOff(); delay == backoff.Stop {
return
}
} }
time.Sleep(logRetryWait) continue
} }
// Process logs until completion
result := readLogs(ctx, stream, out, opts)
switch result {
case streamEOF:
slog.Debug("Log stream ended cleanly",
slogs.Container, opts.Info(),
)
return
case streamError:
// Check if we should stop retrying based on pod status
if pod, ok := logger.(*Pod); ok && pod.shouldStopRetrying(opts.Path) {
slog.Debug("Stopping log retry after stream error - pod is terminating or deleted",
slogs.Container, opts.Info(),
)
return
}
slog.Debug("Log stream error, retrying",
slogs.Container, opts.Info(),
)
select {
case <-ctx.Done():
return
case <-time.After(delay):
if delay = backoffCtx.NextBackOff(); delay == backoff.Stop {
return
}
}
continue
case streamCanceled:
return
}
// Reset backoff and delay on successful connection
bf.Reset()
delay = logBackoffInitial
} }
// Out of retries
out <- opts.ToErrLogItem(fmt.Errorf("failed to maintain log stream after %d retries", logRetryCount))
}() }()
go func() { go func() {
wg.Wait() wg.Wait()
close(out) close(out)
@ -377,45 +463,53 @@ func tailLogs(ctx context.Context, logger Logger, opts *LogOptions) LogChan {
return out return out
} }
func readLogs(ctx context.Context, wg *sync.WaitGroup, stream io.ReadCloser, out chan<- *LogItem, opts *LogOptions) { func readLogs(ctx context.Context, stream io.ReadCloser, out chan<- *LogItem, opts *LogOptions) streamResult {
defer func() { defer func() {
if err := stream.Close(); err != nil { if err := stream.Close(); err != nil && !errors.Is(err, io.ErrClosedPipe) {
slog.Error("Fail to close stream", slog.Error("Fail to close stream",
slogs.Container, opts.Info(), slogs.Container, opts.Info(),
slogs.Error, err, slogs.Error, err,
) )
} }
wg.Done()
}() }()
slog.Debug("Processing logs", slogs.Options, opts.Info())
r := bufio.NewReader(stream) r := bufio.NewReader(stream)
for { for {
var item *LogItem bytes, err := r.ReadBytes('\n')
if bytes, err := r.ReadBytes('\n'); err == nil { if err == nil {
item = opts.ToLogItem(tview.EscapeBytes(bytes)) item := opts.ToLogItem(tview.EscapeBytes(bytes))
} else { select {
if errors.Is(err, io.EOF) { case <-ctx.Done():
e := fmt.Errorf("stream closed %w for %s", err, opts.Info()) return streamCanceled
item = opts.ToErrLogItem(e) case out <- item:
slog.Warn("Log reader EOF", default:
// Avoid deadlock if consumer is too slow
slog.Warn("Dropping log line due to slow consumer",
slogs.Container, opts.Info(), slogs.Container, opts.Info(),
slogs.Error, e,
) )
} else {
e := fmt.Errorf("stream canceled %w for %s", err, opts.Info())
item = opts.ToErrLogItem(e)
slog.Warn("Log stream canceled")
} }
continue
} }
select {
case <-ctx.Done(): if errors.Is(err, io.EOF) {
return if len(bytes) > 0 {
case out <- item: // Emit trailing partial line before EOF
if item.IsError { out <- opts.ToLogItem(tview.EscapeBytes(bytes))
return
} }
slog.Debug("Log reader reached EOF", slogs.Container, opts.Info())
out <- opts.ToErrLogItem(fmt.Errorf("stream closed: %w for %s", err, opts.Info()))
return streamEOF
} }
// Non-EOF error
slog.Debug("Log stream error, will retry connection",
slogs.Container, opts.Info(),
slogs.Error, fmt.Errorf("stream error: %w for %s", err, opts.Info()),
)
// Don't send stream errors to user - they will be retried
// Only final retry exhaustion message is shown
return streamError
} }
} }