From cacfd1ef10cc1349dcace3f0e798c56b64a104e5 Mon Sep 17 00:00:00 2001 From: derailed Date: Sun, 5 May 2019 12:15:45 -0600 Subject: [PATCH] Perf pass and tuning --- internal/resource/cluster_test.go | 5 + internal/resource/container.go | 42 +- internal/resource/helpers.go | 46 +- internal/resource/helpers_test.go | 18 + internal/resource/list.go | 9 +- internal/resource/no.go | 113 ++--- internal/resource/no_int_test.go | 35 +- internal/resource/pod.go | 16 +- internal/resource/pod_test.go | 21 +- internal/resource/ro_binding_int_test.go | 2 +- internal/views/cluster_info.go | 4 +- internal/views/context.go | 8 +- internal/views/helpers.go | 3 +- internal/views/helpers_test.go | 7 +- internal/views/logs.go | 2 +- internal/views/rbac_test.go | 4 +- internal/views/resource.go | 8 +- internal/views/utils.go | 1 - internal/watch/container.go | 318 ++---------- internal/watch/container_test.go | 30 ++ internal/watch/helper_test.go | 81 +++ internal/watch/helpers.go | 208 +------- internal/watch/meta.go | 45 +- internal/watch/meta_test.go | 52 ++ internal/watch/mock_connection_test.go | 613 +++++++++++++++++++++++ internal/watch/no.go | 261 +--------- internal/watch/no_mx.go | 124 ++--- internal/watch/no_mx_test.go | 32 ++ internal/watch/no_test.go | 55 +- internal/watch/pod.go | 316 +----------- internal/watch/pod_mx.go | 125 ++--- internal/watch/pod_mx_test.go | 73 +-- internal/watch/pod_test.go | 139 +++-- 33 files changed, 1281 insertions(+), 1535 deletions(-) create mode 100644 internal/watch/container_test.go create mode 100644 internal/watch/helper_test.go create mode 100644 internal/watch/meta_test.go create mode 100644 internal/watch/mock_connection_test.go create mode 100644 internal/watch/no_mx_test.go diff --git a/internal/resource/cluster_test.go b/internal/resource/cluster_test.go index 4aecd03f..c4b89ce6 100644 --- a/internal/resource/cluster_test.go +++ b/internal/resource/cluster_test.go @@ -7,9 +7,14 @@ import ( "github.com/derailed/k9s/internal/k8s" "github.com/derailed/k9s/internal/resource" m "github.com/petergtz/pegomock" + "github.com/rs/zerolog" "github.com/stretchr/testify/assert" ) +func init() { + zerolog.SetGlobalLevel(zerolog.Disabled) +} + func TestClusterVersion(t *testing.T) { mm, mx := NewMockClusterMeta(), NewMockMetricsServer() m.When(mm.Version()).ThenReturn("1.2.3", nil) diff --git a/internal/resource/container.go b/internal/resource/container.go index 81b14de3..00b1b22b 100644 --- a/internal/resource/container.go +++ b/internal/resource/container.go @@ -20,7 +20,7 @@ type ( pod *v1.Pod isInit bool - instance *v1.Container + instance v1.Container MetricsServer MetricsServer metrics *mv1beta1.PodMetrics mx sync.RWMutex @@ -43,7 +43,6 @@ func NewContainer(c Connection, mx MetricsServer, pod *v1.Pod) *Container { Base: &Base{Connection: c, Resource: k8s.NewPod(c)}, pod: pod, MetricsServer: mx, - metrics: &mv1beta1.PodMetrics{}, } co.Factory = &co @@ -53,7 +52,7 @@ func NewContainer(c Connection, mx MetricsServer, pod *v1.Pod) *Container { // New builds a new Container instance from a k8s resource. func (r *Container) New(i interface{}) Columnar { co := NewContainer(r.Connection, r.MetricsServer, r.pod) - co.instance = i.(*v1.Container) + co.instance = i.(v1.Container) co.path = r.namespacedName(r.pod.ObjectMeta) + ":" + co.instance.Name return co @@ -108,7 +107,7 @@ func (r *Container) Logs(c chan<- string, ns, n, co string, lines int64, prev bo go func() { defer func() { - log.Debug().Msg("!!!Closing Stream!!!") + log.Debug().Msgf("Closing stream %s:%s", n, co) close(c) stream.Close() cancel() @@ -171,9 +170,12 @@ func (r *Container) Fields(ns string) Row { ff := make(Row, 0, len(r.Header(ns))) i := r.instance - var cpu int64 - var mem float64 + scpu, smem := NAValue, NAValue if r.metrics != nil { + var ( + cpu int64 + mem float64 + ) for _, co := range r.metrics.Containers { if co.Name == i.Name { cpu = co.Usage.Cpu().MilliValue() @@ -181,6 +183,7 @@ func (r *Container) Fields(ns string) Row { break } } + scpu, smem = ToMillicore(cpu), ToMi(mem) } rcpu, rmem := resources(i) @@ -214,8 +217,8 @@ func (r *Container) Fields(ns string) Row { restarts, probe(i.LivenessProbe), probe(i.ReadinessProbe), - ToMillicore(cpu), - ToMi(mem), + scpu, + smem, rcpu, rmem, toAge(r.pod.CreationTimestamp), @@ -251,30 +254,25 @@ func toRes(r v1.ResourceList) (string, string) { return ToMillicore(cpu.MilliValue()), ToMi(k8s.ToMB(mem.Value())) } -func resources(c *v1.Container) (cpu, mem string) { +func resources(c v1.Container) (cpu, mem string) { req, lim := c.Resources.Requests, c.Resources.Limits - - if len(req) == 0 { - if len(lim) != 0 { - return toRes(lim) - } - } else { + if len(req) != 0 { return toRes(req) } + if len(lim) != 0 { + return toRes(lim) + } - return "0", "0" + return NAValue, NAValue } func probe(p *v1.Probe) string { if p == nil { - return "no" + return "on" } - - return "yes" + return "off" } func asMi(v int64) float64 { - const megaByte = 1024 * 1024 - - return float64(v) / megaByte + return float64(v) / 1024 * 1024 } diff --git a/internal/resource/helpers.go b/internal/resource/helpers.go index 057270fe..5a3327ed 100644 --- a/internal/resource/helpers.go +++ b/internal/resource/helpers.go @@ -32,9 +32,53 @@ const ( // MissingValue indicates an unset value. MissingValue = "" // NAValue indicates a value that does not pertain. - NAValue = "" + NAValue = "n/a" ) +func empty(s []string) bool { + for _, v := range s { + if len(v) != 0 { + return false + } + } + return true +} + +// Join a slice of strings, skipping blanks. +func join(a []string, sep string) string { + switch len(a) { + case 0: + return "" + case 1: + return a[0] + } + + var b []string + for _, s := range a { + if s != "" { + b = append(b, s) + } + } + if len(b) == 0 { + return "" + } + + n := len(sep) * (len(b) - 1) + for i := 0; i < len(b); i++ { + n += len(a[i]) + } + + var buff strings.Builder + buff.Grow(n) + buff.WriteString(a[0]) + for _, s := range b[1:] { + buff.WriteString(sep) + buff.WriteString(s) + } + + return buff.String() +} + // AsPerc prints a number as a percentage. func AsPerc(f float64) string { return strconv.Itoa(int(f)) + "%" diff --git a/internal/resource/helpers_test.go b/internal/resource/helpers_test.go index bd0a9e49..204f7b08 100644 --- a/internal/resource/helpers_test.go +++ b/internal/resource/helpers_test.go @@ -6,6 +6,24 @@ import ( "github.com/stretchr/testify/assert" ) +func TestJoin(t *testing.T) { + uu := map[string]struct { + i []string + e string + }{ + "zero": {[]string{}, ""}, + "std": {[]string{"a", "b", "c"}, "a,b,c"}, + "blank": {[]string{"", "", ""}, ""}, + "sparse": {[]string{"a", "", "c"}, "a,c"}, + } + + for k, v := range uu { + t.Run(k, func(t *testing.T) { + assert.Equal(t, v.e, join(v.i, ",")) + }) + } +} + func TestBoolPtrToStr(t *testing.T) { tv, fv := true, false diff --git a/internal/resource/list.go b/internal/resource/list.go index ee7a588d..8f0276da 100644 --- a/internal/resource/list.go +++ b/internal/resource/list.go @@ -257,7 +257,7 @@ func (l *list) Reconcile(m *wa.Meta, path *string) error { res = l.resource.New(r) nmx, err := m.Get(wa.NodeMXIndex, fqn) if err != nil { - log.Warn().Err(err).Msg("No node metrics") + log.Warn().Err(err).Msg("NodeMetrics") } if mx, ok := nmx.(*mv1beta1.NodeMetrics); ok { res.SetNodeMetrics(mx) @@ -267,18 +267,17 @@ func (l *list) Reconcile(m *wa.Meta, path *string) error { res = l.resource.New(r) pmx, err := m.Get(wa.PodMXIndex, fqn) if err != nil { - log.Warn().Err(err).Msg("No pod metrics") + log.Warn().Err(err).Msg("PodMetrics") } if mx, ok := pmx.(*mv1beta1.PodMetrics); ok { res.SetPodMetrics(mx) } - case *v1.Container: - log.Debug().Msgf("Got container %s", ns) + case v1.Container: fqn = ns res = l.resource.New(r) pmx, err := m.Get(wa.PodMXIndex, fqn) if err != nil { - log.Warn().Err(err).Msg("No pod metrics") + log.Warn().Err(err).Msg("PodMetrics") } if mx, ok := pmx.(*mv1beta1.PodMetrics); ok { res.SetPodMetrics(mx) diff --git a/internal/resource/no.go b/internal/resource/no.go index d5fdd6c2..cd6a8e1b 100644 --- a/internal/resource/no.go +++ b/internal/resource/no.go @@ -111,8 +111,6 @@ func (*Node) Header(ns string) Row { "EXTERNAL-IP", "CPU", "MEM", - // "RCPU", - // "RMEM", "ACPU", "AMEM", "AGE", @@ -123,83 +121,75 @@ func (*Node) Header(ns string) Row { func (r *Node) Fields(ns string) Row { ff := make(Row, 0, len(r.Header(ns))) - i := r.instance - iIP, eIP := r.getIPs(i.Status.Addresses) + no := r.instance + iIP, eIP := r.getIPs(no.Status.Addresses) iIP, eIP = missing(iIP), missing(eIP) - var ( - cpu int64 - mem float64 - ) + ccpu, cmem, scpu, smem := NAValue, NAValue, NAValue, NAValue if r.metrics != nil { + var ( + cpu int64 + mem float64 + ) + cpu = r.metrics.Usage.Cpu().MilliValue() mem = k8s.ToMB(r.metrics.Usage.Memory().Value()) + + acpu := no.Status.Allocatable.Cpu().MilliValue() + amem := k8s.ToMB(no.Status.Allocatable.Memory().Value()) + ccpu = withPerc(ToMillicore(cpu), AsPerc(toPerc(float64(cpu), float64(acpu)))) + cmem = withPerc(ToMi(mem), AsPerc(toPerc(mem, amem))) + scpu = ToMillicore(cpu) + smem = ToMi(mem) } - acpu := i.Status.Allocatable.Cpu().MilliValue() - amem := k8s.ToMB(i.Status.Allocatable.Memory().Value()) - - // reqs, _, err := r.podsResources(i.Name) - // if err != nil { - // if !errors.IsForbidden(err) { - // log.Warn().Msgf("User is not authorized to list pods on nodes: %v", err) - // } - // log.Error().Msgf("%#v", err) - // } - - // rcpu, rmem := reqs["cpu"], reqs["memory"] - // pcpur := toPerc(float64(rcpu.MilliValue()), float64(r.metrics.AvailCPU)) - // pmemr := toPerc(k8s.ToMB(rmem.Value()), float64(r.metrics.AvailMEM)) + sta := make([]string, 10) + r.status(no.Status, no.Spec.Unschedulable, sta) + ro := make([]string, 10) + r.nodeRoles(no, ro) return append(ff, - i.Name, - r.status(i.Status, i.Spec.Unschedulable), - r.nodeRoles(i), - i.Status.NodeInfo.KubeletVersion, - i.Status.NodeInfo.KernelVersion, + no.Name, + join(sta, ","), + join(ro, ","), + no.Status.NodeInfo.KubeletVersion, + no.Status.NodeInfo.KernelVersion, iIP, eIP, - withPerc(ToMillicore(cpu), AsPerc(toPerc(float64(cpu), float64(acpu)))), - withPerc(ToMi(mem), AsPerc(toPerc(mem, amem))), - // withPerc(rcpu.String(), AsPerc(pcpur)), - // withPerc(rmem.String(), AsPerc(pmemr)), - ToMillicore(cpu), - ToMi(mem), - toAge(i.ObjectMeta.CreationTimestamp), + ccpu, + cmem, + scpu, + smem, + toAge(no.ObjectMeta.CreationTimestamp), ) } -func withPerc(v, p string) string { - return v + " (" + p + ")" -} - // ---------------------------------------------------------------------------- // Helpers... -func (*Node) nodeRoles(node *v1.Node) string { - const ( - labelNodeRolePrefix = "node-role.kubernetes.io/" - nodeLabelRole = "kubernetes.io/role" - ) +func withPerc(v, p string) string { + return v + " (" + p + ")" +} - roles := sets.NewString() +func (*Node) nodeRoles(node *v1.Node, res []string) { + index := 0 for k, v := range node.Labels { switch { case strings.HasPrefix(k, labelNodeRolePrefix): if role := strings.TrimPrefix(k, labelNodeRolePrefix); len(role) > 0 { - roles.Insert(role) + res[index] = role + index++ } - case k == nodeLabelRole && v != "": - roles.Insert(v) + res[index] = v + index++ } } - if len(roles) == 0 { - return MissingValue + if empty(res) { + res[index] = MissingValue + index++ } - - return strings.Join(roles.List(), ",") } func (*Node) getIPs(addrs []v1.NodeAddress) (iIP, eIP string) { @@ -215,14 +205,14 @@ func (*Node) getIPs(addrs []v1.NodeAddress) (iIP, eIP string) { return } -func (*Node) status(status v1.NodeStatus, exempt bool) string { +func (*Node) status(status v1.NodeStatus, exempt bool, res []string) { + var index int conditions := make(map[v1.NodeConditionType]*v1.NodeCondition) for n := range status.Conditions { cond := status.Conditions[n] conditions[cond.Type] = &cond } - var conds []string validConditions := []v1.NodeConditionType{v1.NodeReady} for _, validCondition := range validConditions { condition, ok := conditions[validCondition] @@ -233,22 +223,23 @@ func (*Node) status(status v1.NodeStatus, exempt bool) string { if condition.Status != v1.ConditionTrue { neg = "Not" } - conds = append(conds, neg+string(condition.Type)) + res[index] = neg + string(condition.Type) + index++ } - if len(conds) == 0 { - conds = append(conds, "Unknown") + if len(res) == 0 { + res[index] = "Unknown" + index++ } if exempt { - conds = append(conds, "SchedulingDisabled") + res[index] = "SchedulingDisabled" + index++ } - - return strings.Join(conds, ",") } -func findNodeRoles(i *v1.Node) []string { +func findNodeRoles(no *v1.Node) []string { roles := sets.NewString() - for k, v := range i.Labels { + for k, v := range no.Labels { switch { case strings.HasPrefix(k, labelNodeRolePrefix): if role := strings.TrimPrefix(k, labelNodeRolePrefix); len(role) > 0 { diff --git a/internal/resource/no_int_test.go b/internal/resource/no_int_test.go index 7e6b40d3..477998b6 100644 --- a/internal/resource/no_int_test.go +++ b/internal/resource/no_int_test.go @@ -5,6 +5,7 @@ import ( "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func TestNodeStatus(t *testing.T) { @@ -27,7 +28,37 @@ func TestNodeStatus(t *testing.T) { no := NewNode(nil) for _, u := range uu { - cond := no.status(u.s, false) - assert.Equal(t, "Ready", cond) + res := make([]string, 5) + no.status(u.s, false, res) + assert.Equal(t, "Ready", join(res, ",")) + } +} + +func BenchmarkNodeFields(b *testing.B) { + n := NewNode(nil) + no := makeNode() + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + _ = n.New(no).Fields("") + } +} + +// ---------------------------------------------------------------------------- +// Helpers... + +func makeNode() *v1.Node { + return &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fred", + CreationTimestamp: metav1.Time{Time: testTime()}, + }, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Address: "1.1.1.1"}, + }, + }, } } diff --git a/internal/resource/pod.go b/internal/resource/pod.go index a0df5001..34fb4f1d 100644 --- a/internal/resource/pod.go +++ b/internal/resource/pod.go @@ -127,7 +127,7 @@ func (r *Pod) Logs(c chan<- string, ns, n, co string, lines int64, prev bool) (c } r.mx.RUnlock() if closes { - log.Debug().Msg(">>Closing Channel<<") + log.Debug().Msgf("Closing channel %s:%s", n, co) close(c) cancel() } @@ -149,7 +149,7 @@ func (r *Pod) Logs(c chan<- string, ns, n, co string, lines int64, prev bool) (c go func() { defer func() { - log.Debug().Msg("!!!Closing Stream!!!") + log.Debug().Msgf("Closing stream %s:%s", n, co) close(c) stream.Close() cancel() @@ -221,13 +221,17 @@ func (r *Pod) Fields(ns string) Row { ss := i.Status.ContainerStatuses cr, _, rc := r.statuses(ss) - var cpu int64 - var mem float64 + scpu, smem := NAValue, NAValue if r.metrics != nil { + var cpu int64 + var mem float64 + for _, c := range r.metrics.Containers { cpu += c.Usage.Cpu().MilliValue() mem += k8s.ToMB(c.Usage.Memory().Value()) } + scpu = ToMillicore(cpu) + smem = ToMi(mem) } return append(ff, @@ -235,8 +239,8 @@ func (r *Pod) Fields(ns string) Row { strconv.Itoa(cr)+"/"+strconv.Itoa(len(ss)), r.phase(i), strconv.Itoa(rc), - ToMillicore(cpu), - ToMi(mem), + scpu, + smem, i.Status.PodIP, i.Spec.NodeName, r.mapQOS(i.Status.QOSClass), diff --git a/internal/resource/pod_test.go b/internal/resource/pod_test.go index 45b72ecd..25c90fb0 100644 --- a/internal/resource/pod_test.go +++ b/internal/resource/pod_test.go @@ -47,7 +47,7 @@ func TestPodFields(t *testing.T) { func TestPodMarshal(t *testing.T) { mc := NewMockConnection() mr := NewMockCruder() - m.When(mr.Get("blee", "fred")).ThenReturn(k8sPod(), nil) + m.When(mr.Get("blee", "fred")).ThenReturn(makePod(), nil) mx := NewMockMetricsServer() cm := NewPodWithArgs(mc, mr, mx) @@ -61,7 +61,7 @@ func TestPodMarshal(t *testing.T) { func TestPodListData(t *testing.T) { mc := NewMockConnection() mr := NewMockCruder() - m.When(mr.List("blee")).ThenReturn(k8s.Collection{*k8sPod()}, nil) + m.When(mr.List("blee")).ThenReturn(k8s.Collection{*makePod()}, nil) mx := NewMockMetricsServer() m.When(mx.HasMetrics()).ThenReturn(true) m.When(mx.FetchPodsMetrics("blee")). @@ -86,9 +86,22 @@ func TestPodListData(t *testing.T) { assert.Equal(t, "fred", strings.TrimSpace(row.Fields[:1][0])) } +func BenchmarkPodFields(b *testing.B) { + p := resource.NewPod(nil) + po := makePod() + + b.ResetTimer() + b.ReportAllocs() + + for n := 0; n < b.N; n++ { + _ = p.New(po).Fields("") + } +} + +// ---------------------------------------------------------------------------- // Helpers... -func k8sPod() *v1.Pod { +func makePod() *v1.Pod { var i int32 = 1 var t = v1.HostPathDirectory return &v1.Pod{ @@ -143,7 +156,7 @@ func k8sPod() *v1.Pod { func newPod() resource.Columnar { mc := NewMockConnection() - return resource.NewPod(mc).New(k8sPod()) + return resource.NewPod(mc).New(makePod()) } func poYaml() string { diff --git a/internal/resource/ro_binding_int_test.go b/internal/resource/ro_binding_int_test.go index d3ef208d..e1a47e74 100644 --- a/internal/resource/ro_binding_int_test.go +++ b/internal/resource/ro_binding_int_test.go @@ -37,7 +37,7 @@ func TestRenderSubjects(t *testing.T) { }, { []rbacv1.Subject{}, - "", + NAValue, "", }, } diff --git a/internal/views/cluster_info.go b/internal/views/cluster_info.go index 2a13eb95..1ae3c9e3 100644 --- a/internal/views/cluster_info.go +++ b/internal/views/cluster_info.go @@ -65,9 +65,9 @@ func (v *clusterInfoView) init() { row++ v.SetCell(row, 0, v.sectionCell("CPU")) - v.SetCell(row, 1, v.infoCell("n/a")) + v.SetCell(row, 1, v.infoCell(resource.NAValue)) v.SetCell(row+1, 0, v.sectionCell("MEM")) - v.SetCell(row+1, 1, v.infoCell("n/a")) + v.SetCell(row+1, 1, v.infoCell(resource.NAValue)) v.refresh() } diff --git a/internal/views/context.go b/internal/views/context.go index 71d23888..feed63e7 100644 --- a/internal/views/context.go +++ b/internal/views/context.go @@ -57,10 +57,10 @@ func (v *contextView) useContext(name string) error { } v.app.startInformer() - // Update cluster info on context switch. - v.app.QueueUpdateDraw(func() { - v.app.clusterInfoView.refresh() - }) + // // Update cluster info on context switch. + // v.app.QueueUpdateDraw(func() { + // v.app.clusterInfoView.refresh() + // }) v.app.config.Reset() v.app.config.Save() diff --git a/internal/views/helpers.go b/internal/views/helpers.go index 85bc8661..5cc914fd 100644 --- a/internal/views/helpers.go +++ b/internal/views/helpers.go @@ -6,12 +6,13 @@ import ( "strings" "time" + res "github.com/derailed/k9s/internal/resource" "k8s.io/apimachinery/pkg/api/resource" ) func deltas(o, n string) string { o, n = strings.TrimSpace(o), strings.TrimSpace(n) - if o == "" || o == "n/a" { + if o == "" || o == res.NAValue { return "" } diff --git a/internal/views/helpers_test.go b/internal/views/helpers_test.go index c92cc737..d9c924eb 100644 --- a/internal/views/helpers_test.go +++ b/internal/views/helpers_test.go @@ -4,16 +4,21 @@ import ( "testing" "github.com/derailed/k9s/internal/resource" + "github.com/rs/zerolog" "github.com/stretchr/testify/assert" ) +func init() { + zerolog.SetGlobalLevel(zerolog.Disabled) +} + func TestDeltas(t *testing.T) { uu := []struct { s1, s2, e string }{ {"", "", ""}, {resource.MissingValue, "", delta()}, - {resource.NAValue, "", delta()}, + {resource.NAValue, "", ""}, {"fred", "fred", ""}, {"fred", "blee", delta()}, {"1", "1", ""}, diff --git a/internal/views/logs.go b/internal/views/logs.go index fca5c061..7070ebbc 100644 --- a/internal/views/logs.go +++ b/internal/views/logs.go @@ -17,7 +17,7 @@ const ( refreshRate = 200 * time.Millisecond maxCleanse = 100 logBuffSize = 100 - flushTimeout = 500 * time.Millisecond + flushTimeout = 200 * time.Millisecond ) type logsView struct { diff --git a/internal/views/rbac_test.go b/internal/views/rbac_test.go index d6550204..bc9728ed 100644 --- a/internal/views/rbac_test.go +++ b/internal/views/rbac_test.go @@ -92,7 +92,7 @@ func TestParseRules(t *testing.T) { {APIGroups: []string{}, Resources: []string{}, Verbs: []string{"get"}, NonResourceURLs: []string{"/fred"}}, }, map[string]resource.Row{ - "/fred": {"/fred", "", ok, nok, nok, nok, nok, nok, nok, nok, ""}, + "/fred": {"/fred", resource.NAValue, ok, nok, nok, nok, nok, nok, nok, nok, ""}, }, }, { @@ -100,7 +100,7 @@ func TestParseRules(t *testing.T) { {APIGroups: []string{}, Resources: []string{}, Verbs: []string{"get"}, NonResourceURLs: []string{"fred"}}, }, map[string]resource.Row{ - "/fred": {"/fred", "", ok, nok, nok, nok, nok, nok, nok, nok, ""}, + "/fred": {"/fred", resource.NAValue, ok, nok, nok, nok, nok, nok, nok, nok, ""}, }, }, } diff --git a/internal/views/resource.go b/internal/views/resource.go index 0a7c8a93..756cd6aa 100644 --- a/internal/views/resource.go +++ b/internal/views/resource.go @@ -17,7 +17,10 @@ import ( "github.com/rs/zerolog/log" ) -const noSelection = "" +const ( + noSelection = "" + clusterRefresh = time.Duration(15 * time.Second) +) type ( details interface { @@ -99,6 +102,7 @@ func (v *resourceView) init(ctx context.Context, ns string) { } go v.updater(ctx) + v.app.clusterInfoView.refresh() v.refresh() if tv, ok := v.CurrentPage().Item.(*tableView); ok { tv.Select(1, 0) @@ -113,7 +117,7 @@ func (v *resourceView) updater(ctx context.Context) { case <-ctx.Done(): log.Debug().Msgf("%s cluster updater canceled!", v.list.GetName()) return - case <-time.After(time.Duration(15 * time.Second)): + case <-time.After(clusterRefresh): if v.isSuspended() { continue } diff --git a/internal/views/utils.go b/internal/views/utils.go index 57f0db06..a10a9225 100644 --- a/internal/views/utils.go +++ b/internal/views/utils.go @@ -66,7 +66,6 @@ func (b *logBuffer) length() int { func (*logBuffer) decorateLine(l string) string { return l - // return "[" + newLogColor + "::]" + l + "[::]" } func (b *logBuffer) trimLine(l string) string { diff --git a/internal/watch/container.go b/internal/watch/container.go index b78a0c78..21dc1e9c 100644 --- a/internal/watch/container.go +++ b/internal/watch/container.go @@ -2,12 +2,10 @@ package watch import ( "fmt" - "strconv" "github.com/derailed/k9s/internal/k8s" "github.com/rs/zerolog/log" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/watch" ) const ( @@ -18,43 +16,22 @@ const ( // Container tracks container activities. type Container struct { - CallbackInformer - - data RowEvents - mxData k8s.PodMetrics - listener TableListenerFn - activeFQN *string + StoreInformer } // NewContainer returns a new container. -func NewContainer(po CallbackInformer) *Container { - co := Container{ - CallbackInformer: po, - data: RowEvents{}, - } - po.AddEventHandler(&co) - - return &co +func NewContainer(po StoreInformer) *Container { + return &Container{StoreInformer: po} } +// StartWatching registers container event listener. +func (c *Container) StartWatching(stopCh <-chan struct{}) {} + // Run starts out the informer loop. func (c *Container) Run(closeCh <-chan struct{}) {} // Get retrieves a given container from store. func (c *Container) Get(fqn string) (interface{}, error) { - o, ok, err := c.GetStore().GetByKey(fqn) - if err != nil { - return nil, err - } - if !ok { - return nil, fmt.Errorf("Container %s not found", fqn) - } - - return o, nil -} - -// List retrieves a given containers from store. -func (c *Container) List(fqn string) (k8s.Collection, error) { o, ok, err := c.GetStore().GetByKey(fqn) if err != nil { return nil, err @@ -62,279 +39,42 @@ func (c *Container) List(fqn string) (k8s.Collection, error) { if !ok { return nil, fmt.Errorf("Pod %s not found", fqn) } - po := o.(*v1.Pod) - var cc k8s.Collection - for i := 0; i < len(po.Spec.InitContainers); i++ { - cc = append(cc, &po.Spec.InitContainers[i]) - } - for i := 0; i < len(po.Spec.Containers); i++ { - cc = append(cc, &po.Spec.Containers[i]) - } + cc := make(k8s.Collection, len(po.Spec.InitContainers)+len(po.Spec.Containers)) + toContainers(po, cc) return cc, nil } -// SetListener registers event recipient. -func (c *Container) SetListener(fqn string, cb TableListenerFn) { - c.listener, c.activeFQN = cb, &fqn - o, err := c.Get(fqn) +// List retrieves a given containers from store. +func (c *Container) List(fqn string) k8s.Collection { + o, ok, err := c.GetStore().GetByKey(fqn) if err != nil { - log.Error().Err(err).Msgf("Pod `%q not found", fqn) - return + log.Error().Err(err).Msg("Pod") + return nil } - // Clear out all rows - for k := range c.data { - delete(c.data, k) + if !ok { + log.Error().Err(fmt.Errorf("Pod %s not found", fqn)).Msg("Pod") + return nil } - c.updateData(watch.Added, o.(*v1.Pod)) - c.fireChanged() -} + po := o.(*v1.Pod) + cc := make(k8s.Collection, len(po.Spec.InitContainers)+len(po.Spec.Containers)) + toContainers(po, cc) -// UnsetListener unregister event recipient. -func (c *Container) UnsetListener(_ string) { - c.listener, c.activeFQN = nil, nil -} - -// Data return current data. -func (c *Container) tableData(ns string) TableData { - return TableData{ - Header: c.header(), - Rows: c.data, - Namespace: ns, - } -} - -func (c *Container) fireChanged() { - if cb := c.listener; cb != nil { - cb(c.tableData(NotNamespaced)) - } -} - -// StartWatching registers container event listener. -func (c *Container) StartWatching(stopCh <-chan struct{}) {} - -// OnAdd notify container added. -func (c *Container) OnAdd(obj interface{}) { - if c.activeFQN == nil { - return - } - - po := obj.(*v1.Pod) - fqn := MetaFQN(po.ObjectMeta) - if fqn != *c.activeFQN { - return - } - - log.Debug().Msgf("Pod Added %s", fqn) - // ff := make(Row, containerCols) - c.updateData(watch.Added, po) - - if c.HasSynced() { - c.fireChanged() - } -} - -// OnUpdate notify container updated. -func (c *Container) OnUpdate(oldObj, newObj interface{}) { - if c.activeFQN == nil { - return - } - - opo, npo := oldObj.(*v1.Pod), newObj.(*v1.Pod) - k1 := MetaFQN(opo.ObjectMeta) - k2 := MetaFQN(npo.ObjectMeta) - - if k1 != *c.activeFQN && k2 != *c.activeFQN { - return - } - - log.Debug().Msgf("Pod Updated %#v - %#v", opo.Name, npo.Name) - - // Check if this is a rollout - if k1 != k2 { - c.updateData(watch.Modified, opo) - } else { - c.updateData(watch.Modified, npo) - } - c.fireChanged() -} - -// OnDelete notify container was deleted. -func (c *Container) OnDelete(obj interface{}) { - if c.activeFQN == nil { - return - } - - po := obj.(*v1.Pod) - fqn := MetaFQN(po.ObjectMeta) - if fqn != *c.activeFQN { - return - } - log.Debug().Msgf("Pod Deleted %s", fqn) - c.data = RowEvents{} - c.fireChanged() -} - -// header return resource header. -func (*Container) header() Row { - var hh Row - return append(hh, - "NAME", - "IMAGE", - "READY", - "STATE", - "RS", - "LPROB", - "RPROB", - "CPU", - "MEM", - "RCPU", - "RMEM", - "AGE", - ) -} - -// Fields retrieves displayable fields. -func (c *Container) fields(pod *v1.Pod, co v1.Container) Row { - ff := make(Row, 0, containerCols) - - // mxs, _ := c.MetricsServer.FetchPodsMetrics(r.pod.Namespace) - - // var cpu, mem string - // for _, mx := range mxs.Items { - // if mx.Name != r.pod.Name { - // continue - // } - // for _, co := range mx.Containers { - // if co.Name != i.Name { - // continue - // } - // cpu, mem = toRes(co.Usage) - // } - // } - - // rcpu, rmem := resources(i) - - var cs *v1.ContainerStatus - for _, cos := range pod.Status.ContainerStatuses { - if cos.Name != co.Name { - continue - } - cs = &cos - } - - if cs == nil { - for _, cos := range pod.Status.InitContainerStatuses { - if cos.Name != co.Name { - continue - } - cs = &cos - } - } - - ready, state, restarts := "false", MissingValue, "0" - if cs != nil { - ready, state, restarts = boolToStr(cs.Ready), toState(cs.State), strconv.Itoa(int(cs.RestartCount)) - } - - cpu, mem, rcpu, rmem := "Z", "Z", "Z", "Z" - - return append(ff, - co.Name, - co.Image, - ready, - state, - restarts, - probe(co.LivenessProbe), - probe(co.ReadinessProbe), - cpu, - mem, - rcpu, - rmem, - toAge(pod.CreationTimestamp), - ) -} - -func (c *Container) updateData(action watch.EventType, po *v1.Pod) { - for _, co := range po.Spec.InitContainers { - ff := c.fields(po, co) - - if re, ok := c.data[co.Name]; ok { - re.Action = action - re.Deltas = re.Fields - re.Fields = ff - } else { - c.data[co.Name] = &RowEvent{ - Action: action, - Deltas: make(Row, containerCols), - Fields: ff, - } - } - } - for _, co := range po.Spec.Containers { - ff := c.fields(po, co) - if re, ok := c.data[co.Name]; ok { - re.Action = action - re.Deltas = re.Fields - re.Fields = ff - } else { - c.data[co.Name] = &RowEvent{ - Action: action, - Deltas: make(Row, containerCols), - Fields: ff, - } - } - } + return cc } // ---------------------------------------------------------------------------- // Helpers... -func toState(s v1.ContainerState) string { - switch { - case s.Waiting != nil: - if s.Waiting.Reason != "" { - return s.Waiting.Reason - } - return "Waiting" - - case s.Terminated != nil: - if s.Terminated.Reason != "" { - return s.Terminated.Reason - } - return "Terminated" - case s.Running != nil: - return "Running" - default: - return MissingValue +func toContainers(po *v1.Pod, c k8s.Collection) { + var index int + for _, co := range po.Spec.InitContainers { + c[index] = co + index++ + } + for _, co := range po.Spec.Containers { + c[index] = co + index++ } } - -func toRes(r v1.ResourceList) (string, string) { - cpu, mem := r[v1.ResourceCPU], r[v1.ResourceMemory] - - return ToMillicore(cpu.MilliValue()), ToMi(k8s.ToMB(mem.Value())) -} - -func resources(c v1.Container) (cpu, mem string) { - req, lim := c.Resources.Requests, c.Resources.Limits - - if len(req) == 0 { - if len(lim) != 0 { - return toRes(lim) - } - } else { - return toRes(req) - } - - return "0", "0" -} - -func probe(p *v1.Probe) string { - if p == nil { - return "no" - } - - return "yes" -} diff --git a/internal/watch/container_test.go b/internal/watch/container_test.go new file mode 100644 index 00000000..64e64250 --- /dev/null +++ b/internal/watch/container_test.go @@ -0,0 +1,30 @@ +package watch + +import ( + "testing" + + "gotest.tools/assert" + // "github.com/stretchr/testify/assert" +) + +func TestContainerGet(t *testing.T) { + cmo := NewMockConnection() + + c := NewContainer(NewPod(cmo, "")) + + o, err := c.Get("fred") + assert.ErrorContains(t, err, "not found") + assert.Assert(t, o == nil) +} + +func TestContainerList(t *testing.T) { + cmo := NewMockConnection() + + c := NewContainer(NewPod(cmo, "")) + + o := c.List("fred") + assert.Assert(t, o == nil) +} + +// ---------------------------------------------------------------------------- +// Helpers... diff --git a/internal/watch/helper_test.go b/internal/watch/helper_test.go new file mode 100644 index 00000000..0d7c56bd --- /dev/null +++ b/internal/watch/helper_test.go @@ -0,0 +1,81 @@ +package watch + +import ( + "strconv" + "testing" + + "gotest.tools/assert" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + mv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" +) + +func TestMetaFQN(t *testing.T) { + uu := map[string]struct { + m metav1.ObjectMeta + e string + }{ + "full": {metav1.ObjectMeta{Namespace: "fred", Name: "blee"}, "fred/blee"}, + } + + for k, v := range uu { + t.Run(k, func(t *testing.T) { + assert.Equal(t, v.e, MetaFQN(v.m)) + }) + } +} + +func TestMxResourceDiff(t *testing.T) { + uu := map[string]struct { + r1, r2 v1.ResourceList + e bool + }{ + "same": {makeRes("0m", "0Mi"), makeRes("0m", "0Mi"), false}, + "omem": {makeRes("0m", "10Mi"), makeRes("0m", "1Mi"), true}, + "nmem": {makeRes("0m", "0Mi"), makeRes("0m", "1Mi"), true}, + "ocpu": {makeRes("1m", "0Mi"), makeRes("0m", "0Mi"), true}, + "ncpu": {makeRes("1m", "0Mi"), makeRes("2m", "0Mi"), true}, + } + + for k, v := range uu { + t.Run(k, func(t *testing.T) { + assert.Equal(t, v.e, resourceDiff(v.r1, v.r2)) + }) + } +} + +// ---------------------------------------------------------------------------- +// Helpers... + +func makeRes(c, m string) v1.ResourceList { + cpu, _ := resource.ParseQuantity(c) + mem, _ := resource.ParseQuantity(m) + + return v1.ResourceList{ + v1.ResourceCPU: cpu, + v1.ResourceMemory: mem, + } +} + +func makePodMxCo(name, cpu, mem string, co int) *mv1beta1.PodMetrics { + mx := makePodMx(name) + for i := 0; i < co; i++ { + mx.Containers = append( + mx.Containers, + mv1beta1.ContainerMetrics{ + Name: "c" + strconv.Itoa(i), + Usage: makeRes(cpu, mem)}) + } + + return mx +} + +func makePodMx(name string) *mv1beta1.PodMetrics { + return &mv1beta1.PodMetrics{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + }, + } +} diff --git a/internal/watch/helpers.go b/internal/watch/helpers.go index 475e0e80..6b567534 100644 --- a/internal/watch/helpers.go +++ b/internal/watch/helpers.go @@ -1,96 +1,19 @@ package watch import ( - "path" - "sort" - "strconv" - "strings" - "time" - - "github.com/derailed/tview" - runewidth "github.com/mattn/go-runewidth" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/duration" - "k8s.io/apimachinery/pkg/watch" ) -const ( - // DefaultNamespace indicator to fetch default namespace. - DefaultNamespace = "default" - // AllNamespace namespace name to span all namespaces. - AllNamespace = "all" - // AllNamespaces indicator to retrieve K8s resource for all namespaces. - AllNamespaces = "" - // NotNamespaced indicator for non namespaced resource. - NotNamespaced = "-" - - // New track new resource events. - New watch.EventType = "NEW" - // Unchanged provides no change events. - Unchanged watch.EventType = "UNCHANGED" - - // MissingValue indicates an unset value. - MissingValue = "" - // NAValue indicates a value that does not pertain. - NAValue = "" - - megaByte = 1024 * 1024 -) - -func asMi(v int64) float64 { - return float64(v) / megaByte -} - -func empty(s []string) bool { - for _, v := range s { - if len(v) != 0 { - return false - } +func resourceDiff(l1, l2 v1.ResourceList) bool { + if l1.Cpu().Cmp(*l2.Cpu()) != 0 { + return true } - return true -} - -// Join a slice of strings, skipping blanks. -func join(a []string, sep string) string { - switch len(a) { - case 0: - return "" - case 1: - return a[0] + if l1.Memory().Cmp(*l2.Memory()) != 0 { + return true } - var b []string - for _, s := range a { - if s != "" { - b = append(b, s) - } - } - if len(b) == 0 { - return "" - } - - n := len(sep) * (len(b) - 1) - for i := 0; i < len(b); i++ { - n += len(a[i]) - } - - var buff strings.Builder - buff.Grow(n) - buff.WriteString(a[0]) - for _, s := range b[1:] { - buff.WriteString(sep) - buff.WriteString(s) - } - - return buff.String() -} - -func searchFQN(res, fqn string) string { - return res + ":" + fqn -} - -func indexFQN(res string, m metav1.ObjectMeta) string { - return res + ":" + MetaFQN(m) + return false } // MetaFQN computes unique resource id based on metadata. @@ -101,120 +24,3 @@ func MetaFQN(m metav1.ObjectMeta) string { return m.Namespace + "/" + m.Name } - -// ToMillicore shows cpu reading for human. -func ToMillicore(v int64) string { - return strconv.Itoa(int(v)) + "m" -} - -// ToMi shows mem reading for human. -func ToMi(v float64) string { - return strconv.Itoa(int(v)) + "Mi" -} - -// WithPerc shows a combined number and ratio. -func withPerc(v, p string) string { - var b strings.Builder - b.Grow(len(v) + len(p) + 2) - b.WriteString(v) - b.WriteString("(") - b.WriteString(p) - b.WriteString(")") - // return v + " (" + p + ")" - return b.String() -} - -// AsPerc prints a number as a percentage. -func AsPerc(f float64) string { - return strconv.Itoa(int(f)) + "%" -} - -// ToPerc computes the ratio of two numbers as a percentage. -func toPerc(v1, v2 float64) float64 { - if v2 == 0 { - return 0 - } - return (v1 / v2) * 100 -} - -func namespaced(n string) (string, string) { - ns, po := path.Split(n) - - return strings.Trim(ns, "/"), po -} - -func missing(s string) string { - return check(s, MissingValue) -} - -func na(s string) string { - return check(s, NAValue) -} - -func check(s, sub string) string { - if len(s) == 0 { - return sub - } - - return s -} - -func intToStr(i int64) string { - return strconv.Itoa(int(i)) -} - -func boolToStr(b bool) string { - switch b { - case true: - return "true" - default: - return "false" - } -} - -func toAge(timestamp metav1.Time) string { - return time.Since(timestamp.Time).String() -} - -func toAgeHuman(s string) string { - d, err := time.ParseDuration(s) - if err != nil { - return "" - } - - return duration.HumanDuration(d) -} - -// Truncate a string to the given l and suffix ellipsis if needed. -func Truncate(str string, width int) string { - return runewidth.Truncate(str, width, string(tview.SemigraphicsHorizontalEllipsis)) -} - -func mapToStr(m map[string]string) (s string) { - if len(m) == 0 { - return MissingValue - } - - kk := make([]string, 0, len(m)) - for k := range m { - kk = append(kk, k) - } - sort.Strings(kk) - - for i, k := range kk { - s += k + "=" + m[k] - if i < len(kk)-1 { - s += "," - } - } - - return -} - -func boolPtrToStr(b *bool) string { - if b == nil { - return "false" - } - - return boolToStr(*b) -} diff --git a/internal/watch/meta.go b/internal/watch/meta.go index c13a14e0..1611a15b 100644 --- a/internal/watch/meta.go +++ b/internal/watch/meta.go @@ -1,11 +1,9 @@ package watch import ( - "errors" "fmt" "github.com/derailed/k9s/internal/k8s" - "github.com/rs/zerolog/log" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" ) @@ -36,18 +34,16 @@ type ( // TableListenerFn represents a table data listener. type TableListenerFn func(TableData) -// CallbackInformer an informer that allows listeners registration. -type CallbackInformer interface { +// StoreInformer an informer that allows listeners registration. +type StoreInformer interface { cache.SharedIndexInformer Get(fqn string) (interface{}, error) - List(ns string) (k8s.Collection, error) - SetListener(ns string, cb TableListenerFn) - UnsetListener(ns string) + List(ns string) k8s.Collection } // Meta represents a collection of cluster wide watchers. type Meta struct { - informers map[string]CallbackInformer + informers map[string]StoreInformer client k8s.Connection podInformer *Pod listenerFn TableListenerFn @@ -55,7 +51,7 @@ type Meta struct { // NewMeta creates a new cluster resource informer func NewMeta(client k8s.Connection, ns string) *Meta { - m := Meta{client: client, informers: map[string]CallbackInformer{}} + m := Meta{client: client, informers: map[string]StoreInformer{}} m.init(ns) return &m @@ -63,13 +59,16 @@ func NewMeta(client k8s.Connection, ns string) *Meta { func (m *Meta) init(ns string) { po := NewPod(m.client, ns) - m.informers = map[string]CallbackInformer{ + m.informers = map[string]StoreInformer{ NodeIndex: NewNode(m.client), - NodeMXIndex: NewNodeMetrics(m.client), PodIndex: po, - PodMXIndex: NewPodMetrics(m.client, ns), ContainerIndex: NewContainer(po), } + + if m.client.HasMetrics() { + m.informers[NodeMXIndex] = NewNodeMetrics(m.client) + m.informers[PodMXIndex] = NewPodMetrics(m.client, ns) + } } // List items from store. @@ -78,41 +77,25 @@ func (m *Meta) List(res, ns string) (k8s.Collection, error) { return nil, fmt.Errorf("No meta exists") } if i, ok := m.informers[res]; ok { - return i.List(ns) + return i.List(ns), nil } return nil, fmt.Errorf("No informer found for resource %s", res) } -// RegisterListener register table data listeners. -func (m *Meta) RegisterListener(res, ns string, cb TableListenerFn) { - if informer, ok := m.informers[res]; ok { - informer.SetListener(ns, cb) - } else { - log.Error().Msgf("No informer for %q:%s", ns, res) - } -} - -// UnregisterListener register table data listeners. -func (m *Meta) UnregisterListener(ns string) { - for _, i := range m.informers { - i.UnsetListener(ns) - } -} - // Get a resource by name. func (m Meta) Get(res, fqn string) (interface{}, error) { if informer, ok := m.informers[res]; ok { return informer.Get(fqn) } - return nil, errors.New("Unable to local resource") + return nil, fmt.Errorf("No informer found for resource %s", res) } // Run starts watching cluster resources. func (m *Meta) Run(closeCh <-chan struct{}) { for i := range m.informers { - go func(informer CallbackInformer, c <-chan struct{}) { + go func(informer StoreInformer, c <-chan struct{}) { informer.Run(c) }(m.informers[i], closeCh) } diff --git a/internal/watch/meta_test.go b/internal/watch/meta_test.go new file mode 100644 index 00000000..766747eb --- /dev/null +++ b/internal/watch/meta_test.go @@ -0,0 +1,52 @@ +package watch + +import ( + "testing" + + "gotest.tools/assert" +) + +func TestMetaList(t *testing.T) { + cmo := NewMockConnection() + m := NewMeta(cmo, "") + + o, err := m.List(PodIndex, "fred") + assert.NilError(t, err) + assert.Assert(t, len(o) == 0) +} + +func TestMetaListNoRes(t *testing.T) { + cmo := NewMockConnection() + m := NewMeta(cmo, "") + + o, err := m.List("dp", "fred") + assert.ErrorContains(t, err, "No informer found") + assert.Assert(t, len(o) == 0) +} + +func TestMetaGet(t *testing.T) { + cmo := NewMockConnection() + m := NewMeta(cmo, "") + + o, err := m.Get(PodIndex, "fred") + assert.ErrorContains(t, err, "Pod fred not found") + assert.Assert(t, o == nil) +} + +func TestMetaGetNoRes(t *testing.T) { + cmo := NewMockConnection() + m := NewMeta(cmo, "") + + o, err := m.Get("rs", "fred") + assert.ErrorContains(t, err, "No informer found") + assert.Assert(t, o == nil) +} + +func TestMetaRun(t *testing.T) { + cmo := NewMockConnection() + m := NewMeta(cmo, "") + + c := make(chan struct{}) + m.Run(c) + close(c) +} diff --git a/internal/watch/mock_connection_test.go b/internal/watch/mock_connection_test.go new file mode 100644 index 00000000..cb566272 --- /dev/null +++ b/internal/watch/mock_connection_test.go @@ -0,0 +1,613 @@ +// Code generated by pegomock. DO NOT EDIT. +// Source: github.com/derailed/k9s/internal/watch (interfaces: Connection) + +package watch + +import ( + k8s "github.com/derailed/k9s/internal/k8s" + pegomock "github.com/petergtz/pegomock" + v1 "k8s.io/api/core/v1" + version "k8s.io/apimachinery/pkg/version" + dynamic "k8s.io/client-go/dynamic" + kubernetes "k8s.io/client-go/kubernetes" + rest "k8s.io/client-go/rest" + versioned "k8s.io/metrics/pkg/client/clientset/versioned" + "reflect" + "time" +) + +type MockConnection struct { + fail func(message string, callerSkip ...int) +} + +func NewMockConnection() *MockConnection { + return &MockConnection{fail: pegomock.GlobalFailHandler} +} + +func (mock *MockConnection) Config() *k8s.Config { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockConnection().") + } + params := []pegomock.Param{} + result := pegomock.GetGenericMockFrom(mock).Invoke("Config", params, []reflect.Type{reflect.TypeOf((**k8s.Config)(nil)).Elem()}) + var ret0 *k8s.Config + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(*k8s.Config) + } + } + return ret0 +} + +func (mock *MockConnection) DialOrDie() kubernetes.Interface { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockConnection().") + } + params := []pegomock.Param{} + result := pegomock.GetGenericMockFrom(mock).Invoke("DialOrDie", params, []reflect.Type{reflect.TypeOf((*kubernetes.Interface)(nil)).Elem()}) + var ret0 kubernetes.Interface + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(kubernetes.Interface) + } + } + return ret0 +} + +func (mock *MockConnection) DynDialOrDie() dynamic.Interface { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockConnection().") + } + params := []pegomock.Param{} + result := pegomock.GetGenericMockFrom(mock).Invoke("DynDialOrDie", params, []reflect.Type{reflect.TypeOf((*dynamic.Interface)(nil)).Elem()}) + var ret0 dynamic.Interface + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(dynamic.Interface) + } + } + return ret0 +} + +func (mock *MockConnection) FetchNodes() (*v1.NodeList, error) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockConnection().") + } + params := []pegomock.Param{} + result := pegomock.GetGenericMockFrom(mock).Invoke("FetchNodes", params, []reflect.Type{reflect.TypeOf((**v1.NodeList)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) + var ret0 *v1.NodeList + var ret1 error + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(*v1.NodeList) + } + if result[1] != nil { + ret1 = result[1].(error) + } + } + return ret0, ret1 +} + +func (mock *MockConnection) HasMetrics() bool { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockConnection().") + } + params := []pegomock.Param{} + result := pegomock.GetGenericMockFrom(mock).Invoke("HasMetrics", params, []reflect.Type{reflect.TypeOf((*bool)(nil)).Elem()}) + var ret0 bool + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(bool) + } + } + return ret0 +} + +func (mock *MockConnection) IsNamespaced(_param0 string) bool { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockConnection().") + } + params := []pegomock.Param{_param0} + result := pegomock.GetGenericMockFrom(mock).Invoke("IsNamespaced", params, []reflect.Type{reflect.TypeOf((*bool)(nil)).Elem()}) + var ret0 bool + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(bool) + } + } + return ret0 +} + +func (mock *MockConnection) MXDial() (*versioned.Clientset, error) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockConnection().") + } + params := []pegomock.Param{} + result := pegomock.GetGenericMockFrom(mock).Invoke("MXDial", params, []reflect.Type{reflect.TypeOf((**versioned.Clientset)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) + var ret0 *versioned.Clientset + var ret1 error + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(*versioned.Clientset) + } + if result[1] != nil { + ret1 = result[1].(error) + } + } + return ret0, ret1 +} + +func (mock *MockConnection) NSDialOrDie() dynamic.NamespaceableResourceInterface { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockConnection().") + } + params := []pegomock.Param{} + result := pegomock.GetGenericMockFrom(mock).Invoke("NSDialOrDie", params, []reflect.Type{reflect.TypeOf((*dynamic.NamespaceableResourceInterface)(nil)).Elem()}) + var ret0 dynamic.NamespaceableResourceInterface + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(dynamic.NamespaceableResourceInterface) + } + } + return ret0 +} + +func (mock *MockConnection) NodePods(_param0 string) (*v1.PodList, error) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockConnection().") + } + params := []pegomock.Param{_param0} + result := pegomock.GetGenericMockFrom(mock).Invoke("NodePods", params, []reflect.Type{reflect.TypeOf((**v1.PodList)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) + var ret0 *v1.PodList + var ret1 error + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(*v1.PodList) + } + if result[1] != nil { + ret1 = result[1].(error) + } + } + return ret0, ret1 +} + +func (mock *MockConnection) RestConfigOrDie() *rest.Config { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockConnection().") + } + params := []pegomock.Param{} + result := pegomock.GetGenericMockFrom(mock).Invoke("RestConfigOrDie", params, []reflect.Type{reflect.TypeOf((**rest.Config)(nil)).Elem()}) + var ret0 *rest.Config + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(*rest.Config) + } + } + return ret0 +} + +func (mock *MockConnection) ServerVersion() (*version.Info, error) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockConnection().") + } + params := []pegomock.Param{} + result := pegomock.GetGenericMockFrom(mock).Invoke("ServerVersion", params, []reflect.Type{reflect.TypeOf((**version.Info)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) + var ret0 *version.Info + var ret1 error + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(*version.Info) + } + if result[1] != nil { + ret1 = result[1].(error) + } + } + return ret0, ret1 +} + +func (mock *MockConnection) SupportsRes(_param0 string, _param1 []string) (string, bool) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockConnection().") + } + params := []pegomock.Param{_param0, _param1} + result := pegomock.GetGenericMockFrom(mock).Invoke("SupportsRes", params, []reflect.Type{reflect.TypeOf((*string)(nil)).Elem(), reflect.TypeOf((*bool)(nil)).Elem()}) + var ret0 string + var ret1 bool + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(string) + } + if result[1] != nil { + ret1 = result[1].(bool) + } + } + return ret0, ret1 +} + +func (mock *MockConnection) SupportsResource(_param0 string) bool { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockConnection().") + } + params := []pegomock.Param{_param0} + result := pegomock.GetGenericMockFrom(mock).Invoke("SupportsResource", params, []reflect.Type{reflect.TypeOf((*bool)(nil)).Elem()}) + var ret0 bool + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].(bool) + } + } + return ret0 +} + +func (mock *MockConnection) SwitchContextOrDie(_param0 string) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockConnection().") + } + params := []pegomock.Param{_param0} + pegomock.GetGenericMockFrom(mock).Invoke("SwitchContextOrDie", params, []reflect.Type{}) +} + +func (mock *MockConnection) ValidNamespaces() ([]v1.Namespace, error) { + if mock == nil { + panic("mock must not be nil. Use myMock := NewMockConnection().") + } + params := []pegomock.Param{} + result := pegomock.GetGenericMockFrom(mock).Invoke("ValidNamespaces", params, []reflect.Type{reflect.TypeOf((*[]v1.Namespace)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}) + var ret0 []v1.Namespace + var ret1 error + if len(result) != 0 { + if result[0] != nil { + ret0 = result[0].([]v1.Namespace) + } + if result[1] != nil { + ret1 = result[1].(error) + } + } + return ret0, ret1 +} + +func (mock *MockConnection) VerifyWasCalledOnce() *VerifierConnection { + return &VerifierConnection{ + mock: mock, + invocationCountMatcher: pegomock.Times(1), + } +} + +func (mock *MockConnection) VerifyWasCalled(invocationCountMatcher pegomock.Matcher) *VerifierConnection { + return &VerifierConnection{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + } +} + +func (mock *MockConnection) VerifyWasCalledInOrder(invocationCountMatcher pegomock.Matcher, inOrderContext *pegomock.InOrderContext) *VerifierConnection { + return &VerifierConnection{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + inOrderContext: inOrderContext, + } +} + +func (mock *MockConnection) VerifyWasCalledEventually(invocationCountMatcher pegomock.Matcher, timeout time.Duration) *VerifierConnection { + return &VerifierConnection{ + mock: mock, + invocationCountMatcher: invocationCountMatcher, + timeout: timeout, + } +} + +type VerifierConnection struct { + mock *MockConnection + invocationCountMatcher pegomock.Matcher + inOrderContext *pegomock.InOrderContext + timeout time.Duration +} + +func (verifier *VerifierConnection) Config() *Connection_Config_OngoingVerification { + params := []pegomock.Param{} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "Config", params, verifier.timeout) + return &Connection_Config_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type Connection_Config_OngoingVerification struct { + mock *MockConnection + methodInvocations []pegomock.MethodInvocation +} + +func (c *Connection_Config_OngoingVerification) GetCapturedArguments() { +} + +func (c *Connection_Config_OngoingVerification) GetAllCapturedArguments() { +} + +func (verifier *VerifierConnection) DialOrDie() *Connection_DialOrDie_OngoingVerification { + params := []pegomock.Param{} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "DialOrDie", params, verifier.timeout) + return &Connection_DialOrDie_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type Connection_DialOrDie_OngoingVerification struct { + mock *MockConnection + methodInvocations []pegomock.MethodInvocation +} + +func (c *Connection_DialOrDie_OngoingVerification) GetCapturedArguments() { +} + +func (c *Connection_DialOrDie_OngoingVerification) GetAllCapturedArguments() { +} + +func (verifier *VerifierConnection) DynDialOrDie() *Connection_DynDialOrDie_OngoingVerification { + params := []pegomock.Param{} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "DynDialOrDie", params, verifier.timeout) + return &Connection_DynDialOrDie_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type Connection_DynDialOrDie_OngoingVerification struct { + mock *MockConnection + methodInvocations []pegomock.MethodInvocation +} + +func (c *Connection_DynDialOrDie_OngoingVerification) GetCapturedArguments() { +} + +func (c *Connection_DynDialOrDie_OngoingVerification) GetAllCapturedArguments() { +} + +func (verifier *VerifierConnection) FetchNodes() *Connection_FetchNodes_OngoingVerification { + params := []pegomock.Param{} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "FetchNodes", params, verifier.timeout) + return &Connection_FetchNodes_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type Connection_FetchNodes_OngoingVerification struct { + mock *MockConnection + methodInvocations []pegomock.MethodInvocation +} + +func (c *Connection_FetchNodes_OngoingVerification) GetCapturedArguments() { +} + +func (c *Connection_FetchNodes_OngoingVerification) GetAllCapturedArguments() { +} + +func (verifier *VerifierConnection) HasMetrics() *Connection_HasMetrics_OngoingVerification { + params := []pegomock.Param{} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "HasMetrics", params, verifier.timeout) + return &Connection_HasMetrics_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type Connection_HasMetrics_OngoingVerification struct { + mock *MockConnection + methodInvocations []pegomock.MethodInvocation +} + +func (c *Connection_HasMetrics_OngoingVerification) GetCapturedArguments() { +} + +func (c *Connection_HasMetrics_OngoingVerification) GetAllCapturedArguments() { +} + +func (verifier *VerifierConnection) IsNamespaced(_param0 string) *Connection_IsNamespaced_OngoingVerification { + params := []pegomock.Param{_param0} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "IsNamespaced", params, verifier.timeout) + return &Connection_IsNamespaced_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type Connection_IsNamespaced_OngoingVerification struct { + mock *MockConnection + methodInvocations []pegomock.MethodInvocation +} + +func (c *Connection_IsNamespaced_OngoingVerification) GetCapturedArguments() string { + _param0 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1] +} + +func (c *Connection_IsNamespaced_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(params[0])) + for u, param := range params[0] { + _param0[u] = param.(string) + } + } + return +} + +func (verifier *VerifierConnection) MXDial() *Connection_MXDial_OngoingVerification { + params := []pegomock.Param{} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "MXDial", params, verifier.timeout) + return &Connection_MXDial_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type Connection_MXDial_OngoingVerification struct { + mock *MockConnection + methodInvocations []pegomock.MethodInvocation +} + +func (c *Connection_MXDial_OngoingVerification) GetCapturedArguments() { +} + +func (c *Connection_MXDial_OngoingVerification) GetAllCapturedArguments() { +} + +func (verifier *VerifierConnection) NSDialOrDie() *Connection_NSDialOrDie_OngoingVerification { + params := []pegomock.Param{} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "NSDialOrDie", params, verifier.timeout) + return &Connection_NSDialOrDie_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type Connection_NSDialOrDie_OngoingVerification struct { + mock *MockConnection + methodInvocations []pegomock.MethodInvocation +} + +func (c *Connection_NSDialOrDie_OngoingVerification) GetCapturedArguments() { +} + +func (c *Connection_NSDialOrDie_OngoingVerification) GetAllCapturedArguments() { +} + +func (verifier *VerifierConnection) NodePods(_param0 string) *Connection_NodePods_OngoingVerification { + params := []pegomock.Param{_param0} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "NodePods", params, verifier.timeout) + return &Connection_NodePods_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type Connection_NodePods_OngoingVerification struct { + mock *MockConnection + methodInvocations []pegomock.MethodInvocation +} + +func (c *Connection_NodePods_OngoingVerification) GetCapturedArguments() string { + _param0 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1] +} + +func (c *Connection_NodePods_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(params[0])) + for u, param := range params[0] { + _param0[u] = param.(string) + } + } + return +} + +func (verifier *VerifierConnection) RestConfigOrDie() *Connection_RestConfigOrDie_OngoingVerification { + params := []pegomock.Param{} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "RestConfigOrDie", params, verifier.timeout) + return &Connection_RestConfigOrDie_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type Connection_RestConfigOrDie_OngoingVerification struct { + mock *MockConnection + methodInvocations []pegomock.MethodInvocation +} + +func (c *Connection_RestConfigOrDie_OngoingVerification) GetCapturedArguments() { +} + +func (c *Connection_RestConfigOrDie_OngoingVerification) GetAllCapturedArguments() { +} + +func (verifier *VerifierConnection) ServerVersion() *Connection_ServerVersion_OngoingVerification { + params := []pegomock.Param{} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "ServerVersion", params, verifier.timeout) + return &Connection_ServerVersion_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type Connection_ServerVersion_OngoingVerification struct { + mock *MockConnection + methodInvocations []pegomock.MethodInvocation +} + +func (c *Connection_ServerVersion_OngoingVerification) GetCapturedArguments() { +} + +func (c *Connection_ServerVersion_OngoingVerification) GetAllCapturedArguments() { +} + +func (verifier *VerifierConnection) SupportsRes(_param0 string, _param1 []string) *Connection_SupportsRes_OngoingVerification { + params := []pegomock.Param{_param0, _param1} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "SupportsRes", params, verifier.timeout) + return &Connection_SupportsRes_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type Connection_SupportsRes_OngoingVerification struct { + mock *MockConnection + methodInvocations []pegomock.MethodInvocation +} + +func (c *Connection_SupportsRes_OngoingVerification) GetCapturedArguments() (string, []string) { + _param0, _param1 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1], _param1[len(_param1)-1] +} + +func (c *Connection_SupportsRes_OngoingVerification) GetAllCapturedArguments() (_param0 []string, _param1 [][]string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(params[0])) + for u, param := range params[0] { + _param0[u] = param.(string) + } + _param1 = make([][]string, len(params[1])) + for u, param := range params[1] { + _param1[u] = param.([]string) + } + } + return +} + +func (verifier *VerifierConnection) SupportsResource(_param0 string) *Connection_SupportsResource_OngoingVerification { + params := []pegomock.Param{_param0} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "SupportsResource", params, verifier.timeout) + return &Connection_SupportsResource_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type Connection_SupportsResource_OngoingVerification struct { + mock *MockConnection + methodInvocations []pegomock.MethodInvocation +} + +func (c *Connection_SupportsResource_OngoingVerification) GetCapturedArguments() string { + _param0 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1] +} + +func (c *Connection_SupportsResource_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(params[0])) + for u, param := range params[0] { + _param0[u] = param.(string) + } + } + return +} + +func (verifier *VerifierConnection) SwitchContextOrDie(_param0 string) *Connection_SwitchContextOrDie_OngoingVerification { + params := []pegomock.Param{_param0} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "SwitchContextOrDie", params, verifier.timeout) + return &Connection_SwitchContextOrDie_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type Connection_SwitchContextOrDie_OngoingVerification struct { + mock *MockConnection + methodInvocations []pegomock.MethodInvocation +} + +func (c *Connection_SwitchContextOrDie_OngoingVerification) GetCapturedArguments() string { + _param0 := c.GetAllCapturedArguments() + return _param0[len(_param0)-1] +} + +func (c *Connection_SwitchContextOrDie_OngoingVerification) GetAllCapturedArguments() (_param0 []string) { + params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) + if len(params) > 0 { + _param0 = make([]string, len(params[0])) + for u, param := range params[0] { + _param0[u] = param.(string) + } + } + return +} + +func (verifier *VerifierConnection) ValidNamespaces() *Connection_ValidNamespaces_OngoingVerification { + params := []pegomock.Param{} + methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "ValidNamespaces", params, verifier.timeout) + return &Connection_ValidNamespaces_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} +} + +type Connection_ValidNamespaces_OngoingVerification struct { + mock *MockConnection + methodInvocations []pegomock.MethodInvocation +} + +func (c *Connection_ValidNamespaces_OngoingVerification) GetCapturedArguments() { +} + +func (c *Connection_ValidNamespaces_OngoingVerification) GetAllCapturedArguments() { +} diff --git a/internal/watch/no.go b/internal/watch/no.go index 6c60ae06..0a6673ba 100644 --- a/internal/watch/no.go +++ b/internal/watch/no.go @@ -2,12 +2,8 @@ package watch import ( "fmt" - "strings" "github.com/derailed/k9s/internal/k8s" - "github.com/rs/zerolog/log" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/watch" wv1 "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/tools/cache" ) @@ -21,42 +17,23 @@ const ( // Node tracks node activities. type Node struct { cache.SharedIndexInformer - - client k8s.Connection - data RowEvents - mxData k8s.NodesMetrics - listener TableListenerFn } // NewNode returns a new node. -func NewNode(client k8s.Connection) *Node { - no := Node{ - client: client, - data: RowEvents{}, - mxData: k8s.NodesMetrics{}, +func NewNode(c k8s.Connection) *Node { + return &Node{ + SharedIndexInformer: wv1.NewNodeInformer(c.DialOrDie(), 0, cache.Indexers{}), } - - if client == nil { - return &no - } - - no.SharedIndexInformer = wv1.NewNodeInformer( - client.DialOrDie(), - 0, - cache.Indexers{}, - ) - no.SharedIndexInformer.AddEventHandler(&no) - - return &no } // List all nodes. -func (n *Node) List(_ string) (k8s.Collection, error) { +func (n *Node) List(_ string) k8s.Collection { var res k8s.Collection for _, o := range n.GetStore().List() { res = append(res, o) } - return res, nil + + return res } // Get retrieves a given node from store. @@ -71,229 +48,3 @@ func (n *Node) Get(fqn string) (interface{}, error) { return o, nil } - -// SetListener registers event recipient. -func (n *Node) SetListener(_ string, cb TableListenerFn) { - n.listener = cb - n.fireChanged() -} - -// UnsetListener unregister event recipient. -func (n *Node) UnsetListener(_ string) { - n.listener = nil -} - -func (n *Node) fetchMetrics() { - if n.client == nil { - return - } - - client := k8s.NewMetricsServer(n.client) - mx, err := client.FetchNodesMetrics() - if err != nil { - log.Error().Err(err).Msg("Node metrics failed") - return - } - client.NodesMetrics(n.GetStore().List(), mx, n.mxData) -} - -// Data return current data. -func (n *Node) tableData() TableData { - return TableData{ - Header: n.header(), - Rows: n.data, - Namespace: NotNamespaced, - } -} - -func (n *Node) fireChanged() { - if cb := n.listener; cb != nil { - cb(n.tableData()) - } -} - -// OnAdd notify node added. -func (n *Node) OnAdd(obj interface{}) { - no := obj.(*v1.Node) - n.fetchMetrics() - ff := make(Row, nodeCols) - n.fields(no, ff) - fqn := MetaFQN(no.ObjectMeta) - log.Debug().Msgf("Node Added %s", fqn) - - n.data[fqn] = &RowEvent{ - Action: watch.Added, - Fields: ff, - Deltas: make(Row, len(ff)), - } - - if n.HasSynced() { - n.fireChanged() - } -} - -// OnUpdate notify node updated. -func (n *Node) OnUpdate(oldObj, newObj interface{}) { - ono, nno := oldObj.(*v1.Node), newObj.(*v1.Node) - k1 := MetaFQN(ono.ObjectMeta) - k2 := MetaFQN(nno.ObjectMeta) - - ff := make(Row, nodeCols) - n.fields(nno, ff) - if re, ok := n.data[k1]; ok { - re.Action = watch.Modified - re.Deltas = re.Fields - re.Fields = ff - } - n.data[k2] = &RowEvent{ - Action: watch.Added, - Fields: ff, - Deltas: make(Row, len(ff)), - } - n.fireChanged() -} - -// OnDelete notify node was deleted. -func (n *Node) OnDelete(obj interface{}) { - po := obj.(*v1.Node) - key := MetaFQN(po.ObjectMeta) - - log.Debug().Msgf("Node Delete %s", key) - - delete(n.data, key) - n.fireChanged() -} - -// Header returns resource header. -func (*Node) header() Row { - return Row{ - "NAME", - "STATUS", - "ROLE", - "VERSION", - "KERNEL", - "INTERNAL-IP", - "EXTERNAL-IP", - "CPU", - "MEM", - "ACPU", - "AMEM", - "AGE", - } -} - -// Fields returns displayable fields. -func (n *Node) fields(no *v1.Node, r Row) { - col := 0 - r[col] = no.Name - col++ - sta := make([]string, 10) - n.status(no.Status, no.Spec.Unschedulable, sta) - r[col] = join(sta, ",") - col++ - ro := make([]string, 10) - n.nodeRoles(no, ro) - r[col] = join(ro, ",") - col++ - r[col] = no.Status.NodeInfo.KubeletVersion - col++ - r[col] = no.Status.NodeInfo.KernelVersion - col++ - iIP, eIP := n.getIPs(no.Status.Addresses) - iIP, eIP = missing(iIP), missing(eIP) - r[col], r[col+1] = iIP, eIP - col += 2 - - fqn := MetaFQN(no.ObjectMeta) - n.fetchMetrics() - mx := n.mxData[fqn] - r[col] = withPerc( - ToMillicore(mx.CurrentCPU), - AsPerc(toPerc(float64(mx.CurrentCPU), float64(mx.AvailCPU))), - ) - col++ - r[col] = withPerc( - ToMi(mx.CurrentMEM), - AsPerc(toPerc(mx.CurrentMEM, mx.AvailMEM)), - ) - col++ - r[col] = ToMillicore(mx.AvailCPU) - col++ - r[col] = ToMi(mx.AvailMEM) - col++ - r[col] = toAge(no.ObjectMeta.CreationTimestamp) -} - -// ---------------------------------------------------------------------------- -// Helpers... - -const ( - labelNodeRolePrefix = "node-role.kubernetes.io/" - nodeLabelRole = "kubernetes.io/role" -) - -func (*Node) nodeRoles(node *v1.Node, res []string) { - index := 0 - for k, v := range node.Labels { - switch { - case strings.HasPrefix(k, labelNodeRolePrefix): - if role := strings.TrimPrefix(k, labelNodeRolePrefix); len(role) > 0 { - res[index] = role - index++ - } - case k == nodeLabelRole && v != "": - res[index] = v - index++ - } - } - - if empty(res) { - res[index] = MissingValue - index++ - } -} - -func (*Node) getIPs(addrs []v1.NodeAddress) (iIP, eIP string) { - for _, a := range addrs { - switch a.Type { - case v1.NodeExternalIP: - eIP = a.Address - case v1.NodeInternalIP: - iIP = a.Address - } - } - - return -} - -func (*Node) status(status v1.NodeStatus, exempt bool, res []string) { - var index int - conditions := make(map[v1.NodeConditionType]*v1.NodeCondition) - for n := range status.Conditions { - cond := status.Conditions[n] - conditions[cond.Type] = &cond - } - - validConditions := []v1.NodeConditionType{v1.NodeReady} - for _, validCondition := range validConditions { - condition, ok := conditions[validCondition] - if !ok { - continue - } - neg := "" - if condition.Status != v1.ConditionTrue { - neg = "Not" - } - res[index] = neg + string(condition.Type) - index++ - - } - if len(res) == 0 { - res[index] = "Unknown" - index++ - } - if exempt { - res[index] = "SchedulingDisabled" - index++ - } -} diff --git a/internal/watch/no_mx.go b/internal/watch/no_mx.go index ff3cb7bf..1d267b8e 100644 --- a/internal/watch/no_mx.go +++ b/internal/watch/no_mx.go @@ -11,12 +11,13 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" mv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" - versioned "k8s.io/metrics/pkg/client/clientset/versioned" ) const ( // NodeMXIndex track store indexer. NodeMXIndex string = "nmx" + // NodeMXRefresh node metrics sync rate. + nodeMXRefresh = 30 * time.Second ) // NodeMetrics tracks node metrics. @@ -27,28 +28,16 @@ type NodeMetrics struct { } // NewNodeMetrics returns a node metrics informer. -func NewNodeMetrics(client k8s.Connection) *NodeMetrics { - mx := NodeMetrics{ - client: client, +func NewNodeMetrics(c k8s.Connection) *NodeMetrics { + return &NodeMetrics{ + SharedIndexInformer: newNodeMetricsInformer(c, 0, cache.Indexers{}), + client: c, } - - if client == nil { - return &mx - } - - c, err := client.MXDial() - if err != nil { - return &mx - } - mx.SharedIndexInformer = NewNodeMetricsInformer(c, 0, cache.Indexers{}) - mx.SharedIndexInformer.AddEventHandler(&mx) - - return &mx } // List node metrics from store. -func (p *NodeMetrics) List(string) (k8s.Collection, error) { - return p.GetStore().List(), nil +func (p *NodeMetrics) List(string) k8s.Collection { + return p.GetStore().List() } // Get node metrics from store. @@ -58,49 +47,25 @@ func (p *NodeMetrics) Get(MetaFQN string) (interface{}, error) { return nil, err } if !ok { - return nil, fmt.Errorf("NodeMetric for %q not found", MetaFQN) + return nil, fmt.Errorf("No node metrics for %q", MetaFQN) } return o, nil } -// SetListener register an event listiner. -func (p *NodeMetrics) SetListener(ns string, cb TableListenerFn) {} - -// UnsetListener unregister event listener. -func (p *NodeMetrics) UnsetListener(ns string) {} - -// OnAdd notify node added. -func (p *NodeMetrics) OnAdd(obj interface{}) { - po := obj.(*mv1beta1.NodeMetrics) - fqn := MetaFQN(po.ObjectMeta) - log.Debug().Msgf("NMX Added %s", fqn) -} - -// OnUpdate notify node updated. -func (p *NodeMetrics) OnUpdate(oldObj, newObj interface{}) { - opo, npo := oldObj.(*mv1beta1.NodeMetrics), newObj.(*mv1beta1.NodeMetrics) - - k1 := MetaFQN(opo.ObjectMeta) - k2 := MetaFQN(npo.ObjectMeta) - log.Debug().Msgf("NMX Updated %#v -- %#v", k1, k2) -} - -// OnDelete notify node was deleted. -func (p *NodeMetrics) OnDelete(obj interface{}) { - po := obj.(*mv1beta1.NodeMetrics) - key := MetaFQN(po.ObjectMeta) - log.Debug().Msgf("NMX Delete %s", key) -} - // NewNodeMetricsInformer return an informer to return node metrix. -func NewNodeMetricsInformer(client *versioned.Clientset, sync time.Duration, idxs cache.Indexers) cache.SharedIndexInformer { - pw := NewNodeMxWatcher(client) +func newNodeMetricsInformer(client k8s.Connection, sync time.Duration, idxs cache.Indexers) cache.SharedIndexInformer { + pw := newNodeMxWatcher(client) return cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - l, err := client.MetricsV1beta1().NodeMetricses().List(opts) + c, err := client.MXDial() + if err != nil { + return nil, err + } + + l, err := c.MetricsV1beta1().NodeMetricses().List(opts) if err == nil { pw.update(l, false) } @@ -117,19 +82,17 @@ func NewNodeMetricsInformer(client *versioned.Clientset, sync time.Duration, idx ) } -const nodeMXRefresh = 30 * time.Second - -// NodeMxWatcher tracks node metrics. -type NodeMxWatcher struct { - client *versioned.Clientset +// nodeMxWatcher tracks node metrics. +type nodeMxWatcher struct { + client k8s.Connection cache map[string]runtime.Object eventChan chan watch.Event doneChan chan struct{} } -// NewNodeMxWatcher returns a new metrics watcher. -func NewNodeMxWatcher(c *versioned.Clientset) *NodeMxWatcher { - return &NodeMxWatcher{ +// NewnodeMxWatcher returns a new metrics watcher. +func newNodeMxWatcher(c k8s.Connection) *nodeMxWatcher { + return &nodeMxWatcher{ client: c, cache: map[string]runtime.Object{}, eventChan: make(chan watch.Event), @@ -138,7 +101,7 @@ func NewNodeMxWatcher(c *versioned.Clientset) *NodeMxWatcher { } // Run watcher to monitor node metrics. -func (n *NodeMxWatcher) Run() { +func (n *nodeMxWatcher) Run() { go func() { defer log.Debug().Msg("Node metrics watcher canceled!") for { @@ -146,7 +109,12 @@ func (n *NodeMxWatcher) Run() { case <-n.doneChan: return case <-time.After(nodeMXRefresh): - list, err := n.client.MetricsV1beta1().NodeMetricses().List(metav1.ListOptions{}) + c, err := n.client.MXDial() + if err != nil { + return + } + + list, err := c.MetricsV1beta1().NodeMetricses().List(metav1.ListOptions{}) if err != nil { log.Error().Err(err).Msg("Fetch node metrics") } @@ -156,7 +124,19 @@ func (n *NodeMxWatcher) Run() { }() } -func (n *NodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) { +// Stop the metrics informer. +func (n *nodeMxWatcher) Stop() { + log.Debug().Msg("Stopping node watcher!") + close(n.doneChan) + close(n.eventChan) +} + +// ResultChan retrieves event channel. +func (n *nodeMxWatcher) ResultChan() <-chan watch.Event { + return n.eventChan +} + +func (n *nodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) { fqns := map[string]runtime.Object{} for i := range list.Items { fqn := MetaFQN(list.Items[i].ObjectMeta) @@ -178,7 +158,7 @@ func (n *NodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) { for k, v := range fqns { kind := watch.Added if v1, ok := n.cache[k]; ok { - if !n.deltas(v1.(*mv1beta1.NodeMetrics), v.(*mv1beta1.NodeMetrics)) { + if !resourceDiff(v1.(*mv1beta1.NodeMetrics).Usage, v.(*mv1beta1.NodeMetrics).Usage) { continue } kind = watch.Modified @@ -193,19 +173,3 @@ func (n *NodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) { n.cache[k] = v } } - -// Stop the metrics informer. -func (n *NodeMxWatcher) Stop() { - log.Debug().Msg("Stopping node watcher!") - close(n.doneChan) - close(n.eventChan) -} - -// ResultChan retrieves event channel. -func (n *NodeMxWatcher) ResultChan() <-chan watch.Event { - return n.eventChan -} - -func (n *NodeMxWatcher) deltas(m1, m2 *mv1beta1.NodeMetrics) bool { - return resourceDiff(m1.Usage, m2.Usage) -} diff --git a/internal/watch/no_mx_test.go b/internal/watch/no_mx_test.go new file mode 100644 index 00000000..8365a5f0 --- /dev/null +++ b/internal/watch/no_mx_test.go @@ -0,0 +1,32 @@ +package watch + +import ( + "testing" + + "gotest.tools/assert" +) + +func TestNodeMXList(t *testing.T) { + cmo := NewMockConnection() + no := NewNodeMetrics(cmo) + + o := no.List("") + assert.Assert(t, len(o) == 0) +} + +func TestNodeMXGet(t *testing.T) { + cmo := NewMockConnection() + no := NewNodeMetrics(cmo) + + o, err := no.Get("") + assert.ErrorContains(t, err, "No node metrics") + assert.Assert(t, o == nil) +} + +func TestNodeMXRun(t *testing.T) { + cmo := NewMockConnection() + w := newNodeMxWatcher(cmo) + + w.Run() + w.Stop() +} diff --git a/internal/watch/no_test.go b/internal/watch/no_test.go index 77e0d380..2397d51b 100644 --- a/internal/watch/no_test.go +++ b/internal/watch/no_test.go @@ -4,54 +4,21 @@ import ( "testing" "gotest.tools/assert" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func BenchmarkNodeFields(b *testing.B) { - n := NewNode(nil) - no := makeNode() - ff := make(Row, 12) +func TestNodeList(t *testing.T) { + cmo := NewMockConnection() + no := NewNode(cmo) - b.ResetTimer() - b.ReportAllocs() - for i := 0; i < b.N; i++ { - n.fields(no, ff) - } + o := no.List("") + assert.Assert(t, o == nil) } -func TestJoin(t *testing.T) { - uu := map[string]struct { - i []string - e string - }{ - "zero": {[]string{}, ""}, - "std": {[]string{"a", "b", "c"}, "a,b,c"}, - "blank": {[]string{"", "", ""}, ""}, - "sparse": {[]string{"a", "", "c"}, "a,c"}, - } +func TestNodeGet(t *testing.T) { + cmo := NewMockConnection() + no := NewNode(cmo) - for k, v := range uu { - t.Run(k, func(t *testing.T) { - assert.Equal(t, v.e, join(v.i, ",")) - }) - } -} - -// ---------------------------------------------------------------------------- -// Helpers... - -func makeNode() *v1.Node { - return &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "fred", - CreationTimestamp: metav1.Time{Time: testTime()}, - }, - Spec: v1.NodeSpec{}, - Status: v1.NodeStatus{ - Addresses: []v1.NodeAddress{ - {Address: "1.1.1.1"}, - }, - }, - } + o, err := no.Get("") + assert.ErrorContains(t, err, "not found") + assert.Assert(t, o == nil) } diff --git a/internal/watch/pod.go b/internal/watch/pod.go index cecb91f5..bfacecd9 100644 --- a/internal/watch/pod.go +++ b/internal/watch/pod.go @@ -2,15 +2,11 @@ package watch import ( "fmt" - "strconv" "github.com/derailed/k9s/internal/k8s" - "github.com/rs/zerolog/log" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/watch" wv1 "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/tools/cache" - "k8s.io/kubernetes/pkg/util/node" ) const ( @@ -19,73 +15,23 @@ const ( podCols = 11 ) +// Connection represents an client api server connection. +type Connection k8s.Connection + // Pod tracks pod activities. type Pod struct { cache.SharedIndexInformer - - client k8s.Connection - data RowEvents - mxData k8s.PodsMetrics - ns string - listener TableListenerFn - activeNS *string } // NewPod returns a new pod. -func NewPod(client k8s.Connection, ns string) *Pod { - po := Pod{ - ns: ns, - client: client, - data: RowEvents{}, - mxData: k8s.PodsMetrics{}, - } - - if client == nil { - return &po - } - - po.SharedIndexInformer = wv1.NewPodInformer( - client.DialOrDie(), - ns, - 0, - cache.Indexers{}, - ) - po.AddEventHandler(&po) - - return &po -} - -// SetListener registers event recipient. -func (p *Pod) SetListener(ns string, cb TableListenerFn) { - p.listener, p.activeNS = cb, &ns - p.fireChanged(*p.activeNS) -} - -// UnsetListener unregister event recipient. -func (p *Pod) UnsetListener(_ string) { - p.listener, p.activeNS = nil, nil -} - -// Data return current data. -func (p *Pod) tableData(ns string) TableData { - // Filter list based on active namespace. - data := RowEvents{} - for k := range p.data { - pns, _ := namespaced(k) - if ns == AllNamespaces || pns == ns { - data[k] = p.data[k] - } - } - - return TableData{ - Header: p.header(), - Rows: data, - Namespace: ns, +func NewPod(c Connection, ns string) *Pod { + return &Pod{ + SharedIndexInformer: wv1.NewPodInformer(c.DialOrDie(), ns, 0, cache.Indexers{}), } } // List all pods from store in the given namespace. -func (p *Pod) List(ns string) (k8s.Collection, error) { +func (p *Pod) List(ns string) k8s.Collection { var res k8s.Collection for _, o := range p.GetStore().List() { pod := o.(*v1.Pod) @@ -93,7 +39,7 @@ func (p *Pod) List(ns string) (k8s.Collection, error) { res = append(res, pod) } } - return res, nil + return res } // Get retrieves a given pod from store. @@ -108,249 +54,3 @@ func (p *Pod) Get(fqn string) (interface{}, error) { return o, nil } - -func (p *Pod) fireChanged(ns string) { - if cb := p.listener; cb != nil { - cb(p.tableData(*p.activeNS)) - } -} - -// OnAdd notify pod added. -func (p *Pod) OnAdd(obj interface{}) { - po := obj.(*v1.Pod) - ff := make(Row, podCols) - p.fields(p.ns, po, ff) - fqn := MetaFQN(po.ObjectMeta) - // log.Debug().Msgf("Pod Added %s", fqn) - - p.data[fqn] = &RowEvent{ - Action: watch.Added, - Fields: ff, - Deltas: make(Row, len(ff)), - } - - if p.HasSynced() { - p.fireChanged(po.Namespace) - } -} - -// OnUpdate notify pod updated. -func (p *Pod) OnUpdate(oldObj, newObj interface{}) { - opo, npo := oldObj.(*v1.Pod), newObj.(*v1.Pod) - - k1 := MetaFQN(opo.ObjectMeta) - k2 := MetaFQN(npo.ObjectMeta) - // log.Debug().Msgf("Pod Updated %#v -- %#v", opo.Name, npo.Name) - - p.deltas(opo, npo) - ff := make(Row, podCols) - p.fields(p.ns, npo, ff) - if re, ok := p.data[k1]; ok { - re.Action = watch.Modified - re.Deltas = re.Fields - re.Fields = ff - } - p.data[k2] = &RowEvent{ - Action: watch.Added, - Fields: ff, - Deltas: make(Row, len(ff)), - } - p.fireChanged(npo.Namespace) -} - -func (p *Pod) deltas(p1, p2 *v1.Pod) { - f1 := make(Row, podCols) - p.fields(p.ns, p1, f1) - f2 := make(Row, podCols) - p.fields(p.ns, p2, f2) - for i := 0; i < len(f1); i++ { - if f1[i] != f2[i] { - log.Debug().Msgf("Pod changed %s - %s", f1[i], f2[i]) - } - } -} - -// OnDelete notify pod was deleted. -func (p *Pod) OnDelete(obj interface{}) { - po := obj.(*v1.Pod) - key := MetaFQN(po.ObjectMeta) - // log.Debug().Msgf("Pod Delete %s", key) - delete(p.data, key) - p.fireChanged(po.Namespace) -} - -// header return resource header. -func (*Pod) header() Row { - var hh Row - return append(hh, - "NAMESPACE", - "NAME", - "READY", - "STATUS", - "RS", - "CPU", - "MEM", - "IP", - "NODE", - "QOS", - "AGE", - ) -} - -// fields retrieves displayable fields. -func (p *Pod) fields(ns string, pod *v1.Pod, ff Row) { - var col int - ff[col] = pod.ObjectMeta.Namespace - col++ - ff[col] = pod.ObjectMeta.Name - col++ - ss := pod.Status.ContainerStatuses - cr, _, rc := p.statuses(ss) - ff[col] = strconv.Itoa(cr) + "/" + strconv.Itoa(len(ss)) - col++ - ff[col] = p.phase(pod) - col++ - ff[col] = strconv.Itoa(rc) - col++ - fqn := MetaFQN(pod.ObjectMeta) - p.fetchMetrics() - mx := p.mxData[fqn] - ff[col] = ToMillicore(mx.CurrentCPU) - col++ - ff[col] = ToMi(mx.CurrentMEM) - col++ - ff[col] = pod.Status.PodIP - col++ - ff[col] = pod.Spec.NodeName - col++ - ff[col] = p.mapQOS(pod.Status.QOSClass) - col++ - ff[col] = toAge(pod.ObjectMeta.CreationTimestamp) - col++ -} - -func (p *Pod) fetchMetrics() { - if p.client == nil { - return - } - - client := k8s.NewMetricsServer(p.client) - mx, err := client.FetchPodsMetrics(p.ns) - if err != nil { - log.Error().Err(err).Msg("Pod metrics failed") - return - } - client.PodsMetrics(mx, p.mxData) -} - -// ---------------------------------------------------------------------------- -// Helpers... - -func (*Pod) mapQOS(class v1.PodQOSClass) string { - switch class { - case v1.PodQOSGuaranteed: - return "GA" - case v1.PodQOSBurstable: - return "BU" - default: - return "BE" - } -} - -func (*Pod) statuses(ss []v1.ContainerStatus) (cr, ct, rc int) { - for _, c := range ss { - if c.State.Terminated != nil { - ct++ - } - if c.Ready { - cr = cr + 1 - } - rc += int(c.RestartCount) - } - - return -} - -func isSet(s *string) bool { - return s != nil && *s != "" -} - -func (p *Pod) phase(po *v1.Pod) string { - status := string(po.Status.Phase) - if po.Status.Reason != "" { - if po.DeletionTimestamp != nil && po.Status.Reason == node.NodeUnreachablePodReason { - return "Unknown" - } - status = po.Status.Reason - } - - var init bool - init, status = p.initContainerPhase(po.Status, len(po.Spec.InitContainers), status) - if init { - return status - } - - var running bool - running, status = p.containerPhase(po.Status, status) - if status == "Completed" && running { - status = "Running" - } - - if po.DeletionTimestamp == nil { - return status - } - - return "Terminated" -} - -func (*Pod) containerPhase(st v1.PodStatus, status string) (bool, string) { - var running bool - for i := len(st.ContainerStatuses) - 1; i >= 0; i-- { - cs := st.ContainerStatuses[i] - switch { - case cs.State.Waiting != nil && cs.State.Waiting.Reason != "": - status = cs.State.Waiting.Reason - case cs.State.Terminated != nil && cs.State.Terminated.Reason != "": - status = cs.State.Terminated.Reason - case cs.State.Terminated != nil: - if cs.State.Terminated.Signal != 0 { - status = "Signal:" + strconv.Itoa(int(cs.State.Terminated.Signal)) - } else { - status = "ExitCode:" + strconv.Itoa(int(cs.State.Terminated.ExitCode)) - } - case cs.Ready && cs.State.Running != nil: - running = true - } - } - - return running, status -} - -func (*Pod) initContainerPhase(st v1.PodStatus, initCount int, status string) (bool, string) { - var init bool - for i, cs := range st.InitContainerStatuses { - switch { - case cs.State.Terminated != nil: - if cs.State.Terminated.ExitCode == 0 { - continue - } - if cs.State.Terminated.Reason != "" { - status = "Init:" + cs.State.Terminated.Reason - break - } - if cs.State.Terminated.Signal != 0 { - status = "Init:Signal:" + strconv.Itoa(int(cs.State.Terminated.Signal)) - } else { - status = "Init:ExitCode:" + strconv.Itoa(int(cs.State.Terminated.ExitCode)) - } - case cs.State.Waiting != nil && cs.State.Waiting.Reason != "" && cs.State.Waiting.Reason != "PodInitializing": - status = "Init:" + cs.State.Waiting.Reason - default: - status = "Init:" + strconv.Itoa(i) + "/" + strconv.Itoa(initCount) - } - init = true - break - } - - return init, status -} diff --git a/internal/watch/pod_mx.go b/internal/watch/pod_mx.go index 71ef811d..72bcb438 100644 --- a/internal/watch/pod_mx.go +++ b/internal/watch/pod_mx.go @@ -1,6 +1,7 @@ package watch import ( + "errors" "fmt" "time" @@ -12,44 +13,33 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" mv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" - versioned "k8s.io/metrics/pkg/client/clientset/versioned" ) const ( // PodMXIndex track store indexer. PodMXIndex string = "pmx" + // PodMXRefresh pod metrics sync rate. + podMXRefresh = 15 * time.Second ) // PodMetrics tracks pod metrics. type PodMetrics struct { cache.SharedIndexInformer - client k8s.Connection ns string } // NewPodMetrics returns a pod metrics informer. -func NewPodMetrics(client k8s.Connection, ns string) *PodMetrics { - mx := PodMetrics{ - ns: ns, - client: client, +func NewPodMetrics(c k8s.Connection, ns string) *PodMetrics { + return &PodMetrics{ + SharedIndexInformer: newPodMetricsInformer(c, ns, 0, cache.Indexers{}), + ns: ns, + client: c, } - - if client == nil { - return &mx - } - - c, err := client.MXDial() - if err != nil { - return &mx - } - mx.SharedIndexInformer = NewPodMetricsInformer(c, ns, 0, cache.Indexers{}) - - return &mx } // List pod metrics from store. -func (p *PodMetrics) List(ns string) (k8s.Collection, error) { +func (p *PodMetrics) List(ns string) k8s.Collection { var res k8s.Collection for _, o := range p.GetStore().List() { mx := o.(*mv1beta1.PodMetrics) @@ -57,7 +47,8 @@ func (p *PodMetrics) List(ns string) (k8s.Collection, error) { res = append(res, mx) } } - return res, nil + + return res } // Get pod metrics from store. @@ -67,49 +58,29 @@ func (p *PodMetrics) Get(fqn string) (interface{}, error) { return nil, err } if !ok { - return nil, fmt.Errorf("PodMetric for %q not found", fqn) + return nil, fmt.Errorf("No pod metrics for %q found", fqn) } return o, nil } -// SetListener register an event listiner. -func (p *PodMetrics) SetListener(ns string, cb TableListenerFn) {} - -// UnsetListener unregister event listener. -func (p *PodMetrics) UnsetListener(ns string) {} - -// OnAdd notify pod added. -func (p *PodMetrics) OnAdd(obj interface{}) { - po := obj.(*mv1beta1.PodMetrics) - fqn := MetaFQN(po.ObjectMeta) - log.Debug().Msgf("MX Added %s", fqn) -} - -// OnUpdate notify pod updated. -func (p *PodMetrics) OnUpdate(oldObj, newObj interface{}) { - opo, npo := oldObj.(*mv1beta1.PodMetrics), newObj.(*mv1beta1.PodMetrics) - - k1 := MetaFQN(opo.ObjectMeta) - k2 := MetaFQN(npo.ObjectMeta) - log.Debug().Msgf("MX Updated %#v -- %#v", k1, k2) -} - -// OnDelete notify pod was deleted. -func (p *PodMetrics) OnDelete(obj interface{}) { - po := obj.(*mv1beta1.PodMetrics) - key := MetaFQN(po.ObjectMeta) - log.Debug().Msgf("MX Delete %s", key) -} - // NewPodMetricsInformer return an informer to return pod metrix. -func NewPodMetricsInformer(client *versioned.Clientset, ns string, sync time.Duration, idxs cache.Indexers) cache.SharedIndexInformer { - pw := NewPodMxWatcher(client, ns) +func newPodMetricsInformer(client k8s.Connection, ns string, sync time.Duration, idxs cache.Indexers) cache.SharedIndexInformer { + pw := newPodMxWatcher(client, ns) return cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { - l, err := client.MetricsV1beta1().PodMetricses(ns).List(opts) + c, err := client.MXDial() + if err != nil { + return nil, err + } + + if !client.HasMetrics() { + return nil, errors.New("metrics-server not supported") + } + + l, err := c.MetricsV1beta1().PodMetricses(ns).List(opts) if err == nil { pw.update(l, false) } @@ -126,20 +97,18 @@ func NewPodMetricsInformer(client *versioned.Clientset, ns string, sync time.Dur ) } -const podMXRefresh = 15 * time.Second - // PodMxWatcher tracks pod metrics. -type PodMxWatcher struct { - client *versioned.Clientset +type podMxWatcher struct { + client k8s.Connection ns string cache map[string]runtime.Object eventChan chan watch.Event doneChan chan struct{} } -// NewPodMxWatcher returns a new metrics watcher. -func NewPodMxWatcher(c *versioned.Clientset, ns string) *PodMxWatcher { - return &PodMxWatcher{ +// NewpodMxWatcher returns a new metrics watcher. +func newPodMxWatcher(c k8s.Connection, ns string) *podMxWatcher { + return &podMxWatcher{ client: c, ns: ns, eventChan: make(chan watch.Event), @@ -149,7 +118,7 @@ func NewPodMxWatcher(c *versioned.Clientset, ns string) *PodMxWatcher { } // Run watcher to monitor pod metrics. -func (p *PodMxWatcher) Run() { +func (p *podMxWatcher) Run() { go func() { defer log.Debug().Msg("Podmetrics watcher canceled!") for { @@ -157,7 +126,12 @@ func (p *PodMxWatcher) Run() { case <-p.doneChan: return case <-time.After(podMXRefresh): - list, err := p.client.MetricsV1beta1().PodMetricses(p.ns).List(metav1.ListOptions{}) + c, err := p.client.MXDial() + if err != nil || !p.client.HasMetrics() { + return + } + + list, err := c.MetricsV1beta1().PodMetricses(p.ns).List(metav1.ListOptions{}) if err != nil { log.Error().Err(err).Msg("Fetch pod metrics") } @@ -167,7 +141,7 @@ func (p *PodMxWatcher) Run() { }() } -func (p *PodMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) { +func (p *podMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) { fqns := map[string]runtime.Object{} for i := range list.Items { fqn := MetaFQN(list.Items[i].ObjectMeta) @@ -206,18 +180,18 @@ func (p *PodMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) { } // Stop the metrics informer. -func (p *PodMxWatcher) Stop() { +func (p *podMxWatcher) Stop() { log.Debug().Msg("Stopping pod watcher!") close(p.doneChan) close(p.eventChan) } // ResultChan retrieves event channel. -func (p *PodMxWatcher) ResultChan() <-chan watch.Event { +func (p *podMxWatcher) ResultChan() <-chan watch.Event { return p.eventChan } -func (p *PodMxWatcher) deltas(m1, m2 *mv1beta1.PodMetrics) bool { +func (p *podMxWatcher) deltas(m1, m2 *mv1beta1.PodMetrics) bool { mm1 := map[string]v1.ResourceList{} for _, co := range m1.Containers { mm1[co.Name] = co.Usage @@ -230,31 +204,12 @@ func (p *PodMxWatcher) deltas(m1, m2 *mv1beta1.PodMetrics) bool { for k2, v2 := range mm2 { v1, ok := mm1[k2] if !ok { - log.Debug().Msgf("Missing container %s", k2) return true } if resourceDiff(v1, v2) { - log.Debug().Msgf("Resources mismatch on container %s", k2) return true } } return false } - -// ---------------------------------------------------------------------------- -// Helpers... - -func resourceDiff(l1, l2 v1.ResourceList) bool { - c1, c2 := l1[v1.ResourceCPU], l2[v1.ResourceCPU] - if c1.Cmp(c2) != 0 { - log.Debug().Msgf("CPU Delta %v vs %v", c1, c2) - return true - } - m1, m2 := l1[v1.ResourceMemory], l2[v1.ResourceMemory] - if m1.Cmp(m2) != 0 { - log.Debug().Msgf("MEM Delta %d vs %d", m1.Value(), m2.Value()) - return true - } - return false -} diff --git a/internal/watch/pod_mx_test.go b/internal/watch/pod_mx_test.go index fcd30fd0..02effdff 100644 --- a/internal/watch/pod_mx_test.go +++ b/internal/watch/pod_mx_test.go @@ -1,14 +1,10 @@ package watch import ( - "strconv" "testing" "github.com/rs/zerolog" "gotest.tools/assert" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" mv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" ) @@ -16,23 +12,21 @@ func init() { zerolog.SetGlobalLevel(zerolog.Disabled) } -func TestMxResourceDiff(t *testing.T) { - uu := map[string]struct { - r1, r2 v1.ResourceList - e bool - }{ - "same": {makeRes("0m", "0Mi"), makeRes("0m", "0Mi"), false}, - "omem": {makeRes("0m", "10Mi"), makeRes("0m", "1Mi"), true}, - "nmem": {makeRes("0m", "0Mi"), makeRes("0m", "1Mi"), true}, - "ocpu": {makeRes("1m", "0Mi"), makeRes("0m", "0Mi"), true}, - "ncpu": {makeRes("1m", "0Mi"), makeRes("2m", "0Mi"), true}, - } +func TestPodMXList(t *testing.T) { + cmo := NewMockConnection() + no := NewPodMetrics(cmo, "") - for k, v := range uu { - t.Run(k, func(t *testing.T) { - assert.Equal(t, v.e, resourceDiff(v.r1, v.r2)) - }) - } + o := no.List("") + assert.Assert(t, len(o) == 0) +} + +func TestPodMXGet(t *testing.T) { + cmo := NewMockConnection() + no := NewPodMetrics(cmo, "") + + o, err := no.Get("") + assert.ErrorContains(t, err, "No pod metrics") + assert.Assert(t, o == nil) } func TestMxDeltas(t *testing.T) { @@ -46,7 +40,7 @@ func TestMxDeltas(t *testing.T) { "dco": {makePodMxCo("p1", "0m", "10Mi", 1), makePodMxCo("p1", "0m", "0Mi", 2), true}, } - var p PodMxWatcher + var p podMxWatcher for k, v := range uu { t.Run(k, func(t *testing.T) { assert.Equal(t, v.e, p.deltas(v.m1, v.m2)) @@ -54,37 +48,10 @@ func TestMxDeltas(t *testing.T) { } } -// ---------------------------------------------------------------------------- -// Helpers... +func TestPodMXRun(t *testing.T) { + cmo := NewMockConnection() + w := newPodMxWatcher(cmo, "") -func makeRes(c, m string) v1.ResourceList { - cpu, _ := resource.ParseQuantity(c) - mem, _ := resource.ParseQuantity(m) - - return v1.ResourceList{ - v1.ResourceCPU: cpu, - v1.ResourceMemory: mem, - } -} - -func makePodMxCo(name, cpu, mem string, co int) *mv1beta1.PodMetrics { - mx := makePodMx(name) - for i := 0; i < co; i++ { - mx.Containers = append( - mx.Containers, - mv1beta1.ContainerMetrics{ - Name: "c" + strconv.Itoa(i), - Usage: makeRes(cpu, mem)}) - } - - return mx -} - -func makePodMx(name string) *mv1beta1.PodMetrics { - return &mv1beta1.PodMetrics{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: "default", - }, - } + w.Run() + w.Stop() } diff --git a/internal/watch/pod_test.go b/internal/watch/pod_test.go index 01682a24..e9698bc7 100644 --- a/internal/watch/pod_test.go +++ b/internal/watch/pod_test.go @@ -1,87 +1,80 @@ package watch import ( - "fmt" "testing" - "time" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "gotest.tools/assert" ) -func BenchmarkPodFields(b *testing.B) { - p := NewPod(nil, "") - po := makePod() - ff := make(Row, podCols) +func TestPodList(t *testing.T) { + cmo := NewMockConnection() + no := NewPod(cmo, "") - b.ResetTimer() - b.ReportAllocs() + o := no.List("") + assert.Assert(t, o == nil) +} - for n := 0; n < b.N; n++ { - p.fields("", po, ff) - } +func TestPodGet(t *testing.T) { + cmo := NewMockConnection() + no := NewPod(cmo, "") + + o, err := no.Get("") + assert.ErrorContains(t, err, "not found") + assert.Assert(t, o == nil) } // ---------------------------------------------------------------------------- // Helpers... -func testTime() time.Time { - t, err := time.Parse(time.RFC3339, "2018-12-14T10:36:43.326972-07:00") - if err != nil { - fmt.Println("TestTime Failed", err) - } - return t -} - -func makePod() *v1.Pod { - var i int32 = 1 - var t = v1.HostPathDirectory - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "blee", - Name: "fred", - Labels: map[string]string{"blee": "duh"}, - CreationTimestamp: metav1.Time{Time: testTime()}, - }, - Spec: v1.PodSpec{ - Priority: &i, - PriorityClassName: "bozo", - Containers: []v1.Container{ - { - Name: "fred", - Image: "blee", - Env: []v1.EnvVar{ - { - Name: "fred", - Value: "1", - ValueFrom: &v1.EnvVarSource{ - ConfigMapKeyRef: &v1.ConfigMapKeySelector{Key: "blee"}, - }, - }, - }, - }, - }, - Volumes: []v1.Volume{ - { - Name: "fred", - VolumeSource: v1.VolumeSource{ - HostPath: &v1.HostPathVolumeSource{ - Path: "/blee", - Type: &t, - }, - }, - }, - }, - }, - Status: v1.PodStatus{ - Phase: "Running", - ContainerStatuses: []v1.ContainerStatus{ - { - Name: "fred", - State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}, - RestartCount: 0, - }, - }, - }, - } -} +// func makePod() *v1.Pod { +// var i int32 = 1 +// var t = v1.HostPathDirectory +// return &v1.Pod{ +// ObjectMeta: metav1.ObjectMeta{ +// Namespace: "blee", +// Name: "fred", +// Labels: map[string]string{"blee": "duh"}, +// CreationTimestamp: metav1.Time{Time: testTime()}, +// }, +// Spec: v1.PodSpec{ +// Priority: &i, +// PriorityClassName: "bozo", +// Containers: []v1.Container{ +// { +// Name: "fred", +// Image: "blee", +// Env: []v1.EnvVar{ +// { +// Name: "fred", +// Value: "1", +// ValueFrom: &v1.EnvVarSource{ +// ConfigMapKeyRef: &v1.ConfigMapKeySelector{Key: "blee"}, +// }, +// }, +// }, +// }, +// }, +// Volumes: []v1.Volume{ +// { +// Name: "fred", +// VolumeSource: v1.VolumeSource{ +// HostPath: &v1.HostPathVolumeSource{ +// Path: "/blee", +// Type: &t, +// }, +// }, +// }, +// }, +// }, +// Status: v1.PodStatus{ +// Phase: "Running", +// ContainerStatuses: []v1.ContainerStatus{ +// { +// Name: "fred", +// State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}, +// RestartCount: 0, +// }, +// }, +// }, +// } +// }