// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of K9s

package dao

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"sync"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/derailed/k9s/internal"
	"github.com/derailed/k9s/internal/client"
	"github.com/derailed/k9s/internal/render"
	"github.com/derailed/k9s/internal/slogs"
	"github.com/derailed/k9s/internal/watch"
	"github.com/derailed/tview"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	restclient "k8s.io/client-go/rest"
	mv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
)

var (
	_ Accessor        = (*Pod)(nil)
	_ Nuker           = (*Pod)(nil)
	_ Loggable        = (*Pod)(nil)
	_ Controller      = (*Pod)(nil)
	_ ContainsPodSpec = (*Pod)(nil)
	_ ImageLister     = (*Pod)(nil)
)
type streamResult int

const (
	logRetryCount     = 20
	logBackoffInitial = 500 * time.Millisecond
	logBackoffMax     = 30 * time.Second
	logChannelBuffer  = 50 // Buffer size for log channel to reduce drops.
)

// Kept in a separate const block so the iota sequence starts at zero rather
// than continuing from the tuning constants above.
const (
	streamEOF      streamResult = iota // legit container log close (no retry)
	streamError                        // retryable error (network, auth, etc.)
	streamCanceled                     // context canceled
)
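
// Back-of-the-envelope retry budget (illustrative, not part of the original
// source): with an initial delay of 500ms, the backoff library's default
// multiplier, and the interval capped at logBackoffMax/2 (15s), exhausting
// all 20 retries takes at most roughly 20 * 15s = 5 minutes, and typically
// much less while the delay is still ramping up.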

// Pod represents a pod resource.
type Pod struct {
	Resource
}

// shouldStopRetrying checks if we should stop retrying log streaming based on pod status.
func (p *Pod) shouldStopRetrying(path string) bool {
	pod, err := p.GetInstance(path)
	if err != nil {
		return true
	}

	if pod.DeletionTimestamp != nil {
		return true
	}

	switch pod.Status.Phase {
	case v1.PodSucceeded, v1.PodFailed:
		return true
	default:
		return false
	}
}

// Get returns a resource instance if found, else an error.
func (p *Pod) Get(ctx context.Context, path string) (runtime.Object, error) {
	o, err := p.Resource.Get(ctx, path)
	if err != nil {
		return o, err
	}

	u, ok := o.(*unstructured.Unstructured)
	if !ok {
		return nil, fmt.Errorf("expecting *unstructured.Unstructured but got %T", o)
	}

	var pmx *mv1beta1.PodMetrics
	if withMx, ok := ctx.Value(internal.KeyWithMetrics).(bool); ok && withMx {
		pmx, _ = client.DialMetrics(p.Client()).FetchPodMetrics(ctx, path)
	}

	return &render.PodWithMetrics{Raw: u, MX: pmx}, nil
}

// ListImages lists container images.
func (p *Pod) ListImages(_ context.Context, path string) ([]string, error) {
	pod, err := p.GetInstance(path)
	if err != nil {
		return nil, err
	}

	return render.ExtractImages(&pod.Spec), nil
}

// List returns a collection of pods.
func (p *Pod) List(ctx context.Context, ns string) ([]runtime.Object, error) {
	oo, err := p.Resource.List(ctx, ns)
	if err != nil {
		return oo, err
	}

	var pmx client.PodsMetricsMap
	if withMx, ok := ctx.Value(internal.KeyWithMetrics).(bool); ok && withMx {
		pmx, _ = client.DialMetrics(p.Client()).FetchPodsMetricsMap(ctx, ns)
	}
	sel, _ := ctx.Value(internal.KeyFields).(string)
	fsel, err := labels.ConvertSelectorToLabelsMap(sel)
	if err != nil {
		return nil, err
	}
	nodeName := fsel["spec.nodeName"]

	res := make([]runtime.Object, 0, len(oo))
	for _, o := range oo {
		u, ok := o.(*unstructured.Unstructured)
		if !ok {
			return res, fmt.Errorf("expecting *unstructured.Unstructured but got %T", o)
		}
		fqn := extractFQN(o)
		if nodeName == "" {
			res = append(res, &render.PodWithMetrics{Raw: u, MX: pmx[fqn]})
			continue
		}

		spec, ok := u.Object["spec"].(map[string]any)
		if !ok {
			return res, fmt.Errorf("expecting spec map but got %T", u.Object["spec"])
		}
		if spec["nodeName"] == nodeName {
			res = append(res, &render.PodWithMetrics{Raw: u, MX: pmx[fqn]})
		}
	}

	return res, nil
}

// Logs fetches container logs for a given pod and container.
func (p *Pod) Logs(path string, opts *v1.PodLogOptions) (*restclient.Request, error) {
	ns, n := client.Namespaced(path)
	auth, err := p.Client().CanI(ns, client.NewGVR(client.PodGVR.String()+":log"), n, client.GetAccess)
	if err != nil {
		return nil, err
	}
	if !auth {
		return nil, fmt.Errorf("user is not authorized to view pod logs")
	}

	dial, err := p.Client().DialLogs()
	if err != nil {
		return nil, err
	}

	return dial.CoreV1().Pods(ns).GetLogs(n, opts), nil
}
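
// Illustrative only (not part of the original source): a caller would
// typically build the request via Logs and then stream it, e.g.
//
//	req, err := p.Logs("default/my-pod", &v1.PodLogOptions{Container: "app", Follow: true})
//	if err != nil { /* handle */ }
//	rc, err := req.Stream(ctx) // rc is an io.ReadCloser of raw log bytes
//
// The pod FQN ("default/my-pod") and container name here are hypothetical.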

// Containers returns all container names on a pod.
func (p *Pod) Containers(path string, includeInit bool) ([]string, error) {
	pod, err := p.GetInstance(path)
	if err != nil {
		return nil, err
	}

	cc := make([]string, 0, len(pod.Spec.Containers)+len(pod.Spec.InitContainers))
	for i := range pod.Spec.Containers {
		cc = append(cc, pod.Spec.Containers[i].Name)
	}

	if includeInit {
		for i := range pod.Spec.InitContainers {
			cc = append(cc, pod.Spec.InitContainers[i].Name)
		}
	}

	return cc, nil
}

// Pod returns a pod victim by name.
func (*Pod) Pod(fqn string) (string, error) {
	return fqn, nil
}

// GetInstance returns a pod instance.
func (p *Pod) GetInstance(fqn string) (*v1.Pod, error) {
	o, err := p.getFactory().Get(p.gvr, fqn, true, labels.Everything())
	if err != nil {
		return nil, err
	}

	var pod v1.Pod
	err = runtime.DefaultUnstructuredConverter.FromUnstructured(o.(*unstructured.Unstructured).Object, &pod)
	if err != nil {
		return nil, err
	}

	return &pod, nil
}
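
// Illustrative only (not part of the original source): GetInstance is the
// typed counterpart to the unstructured cache lookup, e.g.
//
//	pod, err := p.GetInstance("kube-system/coredns-xyz") // hypothetical FQN
//	if err == nil {
//		fmt.Println(pod.Status.Phase, len(pod.Spec.Containers))
//	}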

// TailLogs tails the logs of a given pod's containers.
func (p *Pod) TailLogs(ctx context.Context, opts *LogOptions) ([]LogChan, error) {
	fac, ok := ctx.Value(internal.KeyFactory).(*watch.Factory)
	if !ok {
		return nil, errors.New("no factory in context")
	}
	o, err := fac.Get(p.gvr, opts.Path, true, labels.Everything())
	if err != nil {
		return nil, err
	}
	var po v1.Pod
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(o.(*unstructured.Unstructured).Object, &po); err != nil {
		return nil, err
	}
	coCounts := len(po.Spec.InitContainers) + len(po.Spec.Containers) + len(po.Spec.EphemeralContainers)
	if coCounts == 1 {
		opts.SingleContainer = true
	}

	outs := make([]LogChan, 0, coCounts)
	if co, ok := GetDefaultContainer(&po.ObjectMeta, &po.Spec); ok && !opts.AllContainers {
		opts.DefaultContainer = co
		return append(outs, tailLogs(ctx, p, opts)), nil
	}
	if opts.HasContainer() && !opts.AllContainers {
		return append(outs, tailLogs(ctx, p, opts)), nil
	}
	for i := range po.Spec.InitContainers {
		cfg := opts.Clone()
		cfg.Container = po.Spec.InitContainers[i].Name
		outs = append(outs, tailLogs(ctx, p, cfg))
	}
	for i := range po.Spec.Containers {
		cfg := opts.Clone()
		cfg.Container = po.Spec.Containers[i].Name
		outs = append(outs, tailLogs(ctx, p, cfg))
	}
	for i := range po.Spec.EphemeralContainers {
		cfg := opts.Clone()
		cfg.Container = po.Spec.EphemeralContainers[i].Name
		outs = append(outs, tailLogs(ctx, p, cfg))
	}

	return outs, nil
}
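
// Illustrative only (not part of the original source): each returned LogChan
// streams one container's log lines and is closed when its tail goroutine
// exits, so a consumer can fan them in, e.g.
//
//	chans, err := p.TailLogs(ctx, opts)
//	if err != nil { /* handle */ }
//	for _, ch := range chans {
//		go func(ch LogChan) {
//			for item := range ch {
//				render(item) // render is a hypothetical consumer
//			}
//		}(ch)
//	}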

// ScanSA scans for ServiceAccount refs.
func (p *Pod) ScanSA(_ context.Context, fqn string, wait bool) (Refs, error) {
	ns, n := client.Namespaced(fqn)
	oo, err := p.getFactory().List(p.gvr, ns, wait, labels.Everything())
	if err != nil {
		return nil, err
	}

	refs := make(Refs, 0, len(oo))
	for _, o := range oo {
		var pod v1.Pod
		err = runtime.DefaultUnstructuredConverter.FromUnstructured(o.(*unstructured.Unstructured).Object, &pod)
		if err != nil {
			return nil, errors.New("expecting Pod resource")
		}
		// Only pick controller-less pods...
		if len(pod.OwnerReferences) > 0 {
			continue
		}
		if serviceAccountMatches(pod.Spec.ServiceAccountName, n) {
			refs = append(refs, Ref{
				GVR: p.GVR(),
				FQN: client.FQN(pod.Namespace, pod.Name),
			})
		}
	}

	return refs, nil
}

// Scan scans for cluster resource refs.
func (p *Pod) Scan(_ context.Context, gvr *client.GVR, fqn string, wait bool) (Refs, error) {
	ns, n := client.Namespaced(fqn)
	oo, err := p.getFactory().List(p.gvr, ns, wait, labels.Everything())
	if err != nil {
		return nil, err
	}

	refs := make(Refs, 0, len(oo))
	for _, o := range oo {
		var pod v1.Pod
		err = runtime.DefaultUnstructuredConverter.FromUnstructured(o.(*unstructured.Unstructured).Object, &pod)
		if err != nil {
			return nil, errors.New("expecting Pod resource")
		}
		// Only pick controller-less pods...
		if len(pod.OwnerReferences) > 0 {
			continue
		}
		switch gvr {
		case client.CmGVR:
			if !hasConfigMap(&pod.Spec, n) {
				continue
			}
			refs = append(refs, Ref{
				GVR: p.GVR(),
				FQN: client.FQN(pod.Namespace, pod.Name),
			})
		case client.SecGVR:
			found, err := hasSecret(p.Factory, &pod.Spec, pod.Namespace, n, wait)
			if err != nil {
				slog.Warn("Locate secret failed",
					slogs.FQN, fqn,
					slogs.Error, err,
				)
				continue
			}
			if !found {
				continue
			}
			refs = append(refs, Ref{
				GVR: p.GVR(),
				FQN: client.FQN(pod.Namespace, pod.Name),
			})
		case client.PvcGVR:
			if !hasPVC(&pod.Spec, n) {
				continue
			}
			refs = append(refs, Ref{
				GVR: p.GVR(),
				FQN: client.FQN(pod.Namespace, pod.Name),
			})
		case client.PcGVR:
			if !hasPC(&pod.Spec, n) {
				continue
			}
			refs = append(refs, Ref{
				GVR: p.GVR(),
				FQN: client.FQN(pod.Namespace, pod.Name),
			})
		}
	}

	return refs, nil
}
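
// Illustrative only (not part of the original source): a typical scan asks
// which standalone pods reference a given resource, e.g. a ConfigMap:
//
//	refs, err := p.Scan(ctx, client.CmGVR, "default/my-config", true) // FQN is hypothetical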

// ----------------------------------------------------------------------------
// Helpers...

func tailLogs(ctx context.Context, logger Logger, opts *LogOptions) LogChan {
	out := make(LogChan, logChannelBuffer)
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		podOpts := opts.ToPodLogOptions()

		// Setup exponential backoff following the project pattern.
		bf := backoff.NewExponentialBackOff()
		bf.InitialInterval = logBackoffInitial
		bf.MaxElapsedTime = 0
		bf.MaxInterval = logBackoffMax / 2
		backoffCtx := backoff.WithContext(bf, ctx)
		delay := logBackoffInitial

		for range logRetryCount {
			req, err := logger.Logs(opts.Path, podOpts)
			if err != nil {
				slog.Error("Log request failed",
					slogs.Container, opts.Info(),
					slogs.Error, err,
				)
				// Check if we should stop retrying based on pod status.
				if pod, ok := logger.(*Pod); ok && pod.shouldStopRetrying(opts.Path) {
					slog.Debug("Stopping log retry - pod is terminating or deleted",
						slogs.Container, opts.Info(),
					)
					return
				}
				select {
				case <-ctx.Done():
					return
				case <-time.After(delay):
					if delay = backoffCtx.NextBackOff(); delay == backoff.Stop {
						return
					}
				}
				continue
			}

			stream, e := req.Stream(ctx)
			if e != nil {
				slog.Error("Stream logs failed",
					slogs.Error, e,
					slogs.Container, opts.Info(),
				)
				// Check if we should stop retrying based on pod status.
				if pod, ok := logger.(*Pod); ok && pod.shouldStopRetrying(opts.Path) {
					slog.Debug("Stopping log retry - pod is terminating or deleted",
						slogs.Container, opts.Info(),
					)
					return
				}
				select {
				case <-ctx.Done():
					return
				case <-time.After(delay):
					if delay = backoffCtx.NextBackOff(); delay == backoff.Stop {
						return
					}
				}
				continue
			}

			// Connected: reset backoff and delay so a later stream failure
			// starts retrying from the initial delay again. (Resetting after
			// readLogs would be dead code, since every branch below either
			// returns or continues.)
			bf.Reset()
			delay = logBackoffInitial

			// Process logs until completion.
			result := readLogs(ctx, stream, out, opts)
			switch result {
			case streamEOF:
				slog.Debug("Log stream ended cleanly",
					slogs.Container, opts.Info(),
				)
				return
			case streamError:
				// Check if we should stop retrying based on pod status.
				if pod, ok := logger.(*Pod); ok && pod.shouldStopRetrying(opts.Path) {
					slog.Debug("Stopping log retry after stream error - pod is terminating or deleted",
						slogs.Container, opts.Info(),
					)
					return
				}
				slog.Debug("Log stream error, retrying",
					slogs.Container, opts.Info(),
				)
				select {
				case <-ctx.Done():
					return
				case <-time.After(delay):
					if delay = backoffCtx.NextBackOff(); delay == backoff.Stop {
						return
					}
				}
			case streamCanceled:
				return
			}
		}

		// Out of retries.
		out <- opts.ToErrLogItem(fmt.Errorf("failed to maintain log stream after %d retries", logRetryCount))
	}()

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func readLogs(ctx context.Context, stream io.ReadCloser, out chan<- *LogItem, opts *LogOptions) streamResult {
	defer func() {
		if err := stream.Close(); err != nil && !errors.Is(err, io.ErrClosedPipe) {
			slog.Error("Failed to close stream",
				slogs.Container, opts.Info(),
				slogs.Error, err,
			)
		}
	}()

	r := bufio.NewReader(stream)

	for {
		bytes, err := r.ReadBytes('\n')
		if err == nil {
			item := opts.ToLogItem(tview.EscapeBytes(bytes))
			select {
			case <-ctx.Done():
				return streamCanceled
			case out <- item:
			default:
				// Avoid deadlock if the consumer is too slow.
				slog.Warn("Dropping log line due to slow consumer",
					slogs.Container, opts.Info(),
				)
			}
			continue
		}

		if errors.Is(err, io.EOF) {
			if len(bytes) > 0 {
				// Emit trailing partial line before EOF.
				out <- opts.ToLogItem(tview.EscapeBytes(bytes))
			}
			slog.Debug("Log reader reached EOF", slogs.Container, opts.Info())
			out <- opts.ToErrLogItem(fmt.Errorf("stream closed: %w for %s", err, opts.Info()))
			return streamEOF
		}

		// Non-EOF error.
		slog.Debug("Log stream error, will retry connection",
			slogs.Container, opts.Info(),
			slogs.Error, fmt.Errorf("stream error: %w for %s", err, opts.Info()),
		)
		// Don't send stream errors to the user - they will be retried.
		// Only the final retry-exhaustion message is shown.
		return streamError
	}
}
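
// Design note (illustrative, not part of the original source): the
// non-blocking send above is the standard Go drop-on-full-buffer pattern:
//
//	select {
//	case out <- item: // delivered
//	default: // buffer full: drop rather than block the reader
//	}
//
// Dropping keeps the log reader from stalling the HTTP stream when the UI
// consumer falls behind; logChannelBuffer bounds how much slack it gets.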

// MetaFQN returns a fully qualified resource name.
func MetaFQN(m *metav1.ObjectMeta) string {
	if m.Namespace == "" {
		return m.Name
	}

	return FQN(m.Namespace, m.Name)
}

// GetPodSpec returns a pod spec given a resource.
func (p *Pod) GetPodSpec(path string) (*v1.PodSpec, error) {
	pod, err := p.GetInstance(path)
	if err != nil {
		return nil, err
	}
	podSpec := pod.Spec

	return &podSpec, nil
}

// SetImages sets container images.
func (p *Pod) SetImages(ctx context.Context, path string, imageSpecs ImageSpecs) error {
	ns, n := client.Namespaced(path)
	auth, err := p.Client().CanI(ns, p.gvr, n, client.PatchAccess)
	if err != nil {
		return err
	}
	if !auth {
		return fmt.Errorf("user is not authorized to patch a pod")
	}
	manager, isManaged, err := p.isControlled(path)
	if err != nil {
		return err
	}
	if isManaged {
		return fmt.Errorf("unable to set image. This pod is managed by %s. Please set the image on the controller", manager)
	}
	jsonPatch, err := GetJsonPatch(imageSpecs)
	if err != nil {
		return err
	}
	dial, err := p.Client().Dial()
	if err != nil {
		return err
	}
	_, err = dial.CoreV1().Pods(ns).Patch(
		ctx,
		n,
		types.StrategicMergePatchType,
		jsonPatch,
		metav1.PatchOptions{},
	)

	return err
}

func (p *Pod) isControlled(path string) (fqn string, ok bool, err error) {
	pod, err := p.GetInstance(path)
	if err != nil {
		return "", false, err
	}
	references := pod.GetObjectMeta().GetOwnerReferences()
	if len(references) > 0 {
		return fmt.Sprintf("%s/%s", references[0].Kind, references[0].Name), true, nil
	}

	return "", false, nil
}

var toastPhases = sets.New(
	render.PhaseCompleted,
	render.PhasePending,
	render.PhaseCrashLoop,
	render.PhaseError,
	render.PhaseImagePullBackOff,
	render.PhaseContainerStatusUnknown,
	render.PhaseEvicted,
	render.PhaseOOMKilled,
)

// Sanitize deletes pods stuck in a toast phase (completed, failed, evicted,
// etc.) and returns the number of pods deleted.
func (p *Pod) Sanitize(ctx context.Context, ns string) (int, error) {
	oo, err := p.Resource.List(ctx, ns)
	if err != nil {
		return 0, err
	}

	var count int
	for _, o := range oo {
		u, ok := o.(*unstructured.Unstructured)
		if !ok {
			continue
		}
		var pod v1.Pod
		err = runtime.DefaultUnstructuredConverter.FromUnstructured(u.Object, &pod)
		if err != nil {
			continue
		}

		if toastPhases.Has(render.PodStatus(&pod)) {
			// !!BOZO!! Might need to bump timeout otherwise rev limit if too many??
			fqn := client.FQN(pod.Namespace, pod.Name)
			slog.Debug("Sanitizing resource", slogs.FQN, fqn)
			if err := p.Delete(ctx, fqn, nil, 0); err != nil {
				slog.Debug("Aborted! Sanitizer delete failed",
					slogs.FQN, fqn,
					slogs.Count, count,
					slogs.Error, err,
				)
				return count, err
			}
			count++
		}
	}
	slog.Debug("Sanitizer deleted pods", slogs.Count, count)

	return count, nil
}
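
// Illustrative only (not part of the original source): Sanitize is a bulk
// cleanup over a namespace, e.g.
//
//	n, err := p.Sanitize(ctx, "default") // deletes toast pods in "default"
//	if err != nil { /* handle partial cleanup */ }
//	slog.Info("Sanitized pods", slogs.Count, n)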