From 2897c1c603ee320b5734d64b6b0d4de931d9418c Mon Sep 17 00:00:00 2001 From: derailed Date: Sun, 5 May 2019 00:07:22 -0600 Subject: [PATCH] added informers --- go.mod | 1 + go.sum | 4 + internal/k8s/generic.go | 19 + internal/k8s/metrics.go | 50 +-- internal/k8s/metrics_test.go | 62 ++-- internal/resource/base.go | 7 + internal/resource/cluster.go | 16 +- internal/resource/cluster_test.go | 27 +- internal/resource/cm_test.go | 2 +- internal/resource/container.go | 44 +-- internal/resource/cr_binding_test.go | 2 +- internal/resource/cr_test.go | 2 +- internal/resource/crd_test.go | 2 +- internal/resource/cronjob_test.go | 2 +- internal/resource/custom_test.go | 2 +- internal/resource/dp_test.go | 2 +- internal/resource/ds_test.go | 2 +- internal/resource/ep_test.go | 2 +- internal/resource/evt_test.go | 2 +- internal/resource/hpa_test.go | 2 +- internal/resource/ing_test.go | 2 +- internal/resource/job_test.go | 2 +- internal/resource/list.go | 82 ++++- internal/resource/mock_metricsserver_test.go | 17 +- internal/resource/no.go | 59 +-- internal/resource/no_int_test.go | 2 +- internal/resource/no_test.go | 7 +- internal/resource/ns_test.go | 2 +- internal/resource/pdb_test.go | 2 +- internal/resource/pod.go | 50 ++- internal/resource/pod_test.go | 7 +- internal/resource/pv_test.go | 2 +- internal/resource/pvc_test.go | 2 +- internal/resource/rc_test.go | 2 +- internal/resource/ro_binding_test.go | 2 +- internal/resource/ro_test.go | 2 +- internal/resource/rs_test.go | 2 +- internal/resource/sa_test.go | 2 +- internal/resource/secret_test.go | 2 +- internal/resource/sts_test.go | 2 +- internal/resource/svc_test.go | 2 +- internal/views/app.go | 27 ++ internal/views/cluster_info.go | 19 +- internal/views/command.go | 10 +- internal/views/container.go | 7 +- internal/views/context.go | 11 +- internal/views/mock_metricsserver.go | 17 +- internal/views/no.go | 5 +- internal/views/pod.go | 18 +- internal/views/registrar.go | 20 +- internal/views/resource.go | 21 +- internal/watch/container.go | 340 ++++++++++++++++++ internal/watch/helpers.go | 220 ++++++++++++ internal/watch/meta.go | 119 +++++++ internal/watch/no.go | 299 ++++++++++++++++ internal/watch/no_mx.go | 211 +++++++++++ internal/watch/no_test.go | 57 +++ internal/watch/pod.go | 356 +++++++++++++++++++ internal/watch/pod_mx.go | 260 ++++++++++++++ internal/watch/pod_mx_test.go | 90 +++++ internal/watch/pod_test.go | 87 +++++ 61 files changed, 2399 insertions(+), 299 deletions(-) create mode 100644 internal/k8s/generic.go create mode 100644 internal/watch/container.go create mode 100644 internal/watch/helpers.go create mode 100644 internal/watch/meta.go create mode 100644 internal/watch/no.go create mode 100644 internal/watch/no_mx.go create mode 100644 internal/watch/no_test.go create mode 100644 internal/watch/pod.go create mode 100644 internal/watch/pod_mx.go create mode 100644 internal/watch/pod_mx_test.go create mode 100644 internal/watch/pod_test.go diff --git a/go.mod b/go.mod index 59d67bf6..88abfacd 100644 --- a/go.mod +++ b/go.mod @@ -46,6 +46,7 @@ require ( google.golang.org/appengine v1.5.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.2.2 + gotest.tools v2.2.0+incompatible k8s.io/api v0.0.0-20190425012535-181e1f9c52c1 k8s.io/apiextensions-apiserver v0.0.0-20190426053235-842c4571cde0 // indirect k8s.io/apimachinery v0.0.0-20190425132440-17f84483f500 diff --git a/go.sum b/go.sum index c8c99a46..d491f48f 100644 --- a/go.sum +++ b/go.sum @@ -111,6 +111,7 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= @@ -184,6 +185,7 @@ github.com/petergtz/pegomock v0.0.0-20181206220228-b113d17a7e81/go.mod h1:nuBLWZ github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.3.0 h1:OQIvuDgm00gWVWGTf4m4mCt6W1/0YqU7Ntg0mySWgaI= github.com/pkg/profile v1.3.0/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= @@ -341,6 +343,7 @@ gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bl gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= @@ -370,6 +373,7 @@ k8s.io/kubernetes v1.13.5/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk= k8s.io/metrics v0.0.0-20190325194013-29123f6a4aa6 h1:JjAl5n2siv5gPLfvXgSoUkV6tf63/EEtvCuni1zIU8Q= k8s.io/metrics v0.0.0-20190325194013-29123f6a4aa6/go.mod h1:a25VAbm3QT3xiVl1jtoF1ueAKQM149UdZ+L93ePfV3M= k8s.io/utils v0.0.0-20190221042446-c2654d5206da/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0= +k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7 h1:8r+l4bNWjRlsFYlQJnKJ2p7s1YQPj4XyXiJVqDHRx7c= k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0= modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw= modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk= diff --git a/internal/k8s/generic.go b/internal/k8s/generic.go new file mode 100644 index 00000000..fb0c960b --- /dev/null +++ b/internal/k8s/generic.go @@ -0,0 +1,19 @@ +package k8s + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func Get(ns, n, string, kind schema.GroupVersionKind, opts metav1.GetOptions) (runtime.Object, error) { + return nil, nil +} + +func List(ns string, kind schema.GroupVersionKind, opts metav1.ListOptions) (runtime.Object, error) { + return nil, nil +} + +func registrar() map[string]func() { + return map[string]func(){} +} diff --git a/internal/k8s/metrics.go b/internal/k8s/metrics.go index b78b3209..5c1bc60a 100644 --- a/internal/k8s/metrics.go +++ b/internal/k8s/metrics.go @@ -1,9 +1,6 @@ package k8s import ( - "time" - - "github.com/rs/zerolog/log" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" mv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" @@ -52,12 +49,8 @@ func NewMetricsServer(c Connection) *MetricsServer { // NodesMetrics retrieves metrics for a given set of nodes. func (m *MetricsServer) NodesMetrics(nodes Collection, metrics *mv1beta1.NodeMetricsList, mmx NodesMetrics) { - defer func(t time.Time) { - log.Debug().Msgf("Node MX %v", time.Since(t)) - }(time.Now()) - for _, n := range nodes { - no := n.(v1.Node) + no := n.(*v1.Node) mmx[no.Name] = NodeMetrics{ AvailCPU: no.Status.Allocatable.Cpu().MilliValue(), AvailMEM: ToMB(no.Status.Allocatable.Memory().Value()), @@ -76,25 +69,22 @@ func (m *MetricsServer) NodesMetrics(nodes Collection, metrics *mv1beta1.NodeMet } // ClusterLoad retrieves all cluster nodes metrics. -func (m *MetricsServer) ClusterLoad(nodes *v1.NodeList, metrics *mv1beta1.NodeMetricsList, mx *ClusterMetrics) { - defer func(t time.Time) { - log.Debug().Msgf("Cluster Load %v", time.Since(t)) - }(time.Now()) - - nodeMetrics := make(NodesMetrics, len(nodes.Items)) - - for _, n := range nodes.Items { - nodeMetrics[n.Name] = NodeMetrics{ - AvailCPU: n.Status.Allocatable.Cpu().MilliValue(), - AvailMEM: ToMB(n.Status.Allocatable.Memory().Value()), +func (m *MetricsServer) ClusterLoad(nos Collection, nmx Collection, mx *ClusterMetrics) { + nodeMetrics := make(NodesMetrics, len(nos)) + for _, n := range nos { + no := n.(*v1.Node) + nodeMetrics[no.Name] = NodeMetrics{ + AvailCPU: no.Status.Allocatable.Cpu().MilliValue(), + AvailMEM: ToMB(no.Status.Allocatable.Memory().Value()), } } - for _, mx := range metrics.Items { - if m, ok := nodeMetrics[mx.Name]; ok { - m.CurrentCPU = mx.Usage.Cpu().MilliValue() - m.CurrentMEM = ToMB(mx.Usage.Memory().Value()) - nodeMetrics[mx.Name] = m + for _, mx := range nmx { + mxx := mx.(*mv1beta1.NodeMetrics) + if m, ok := nodeMetrics[mxx.Name]; ok { + m.CurrentCPU = mxx.Usage.Cpu().MilliValue() + m.CurrentMEM = ToMB(mxx.Usage.Memory().Value()) + nodeMetrics[mxx.Name] = m } } @@ -111,10 +101,6 @@ func (m *MetricsServer) ClusterLoad(nodes *v1.NodeList, metrics *mv1beta1.NodeMe // FetchNodesMetrics return all metrics for pods in a given namespace. func (m *MetricsServer) FetchNodesMetrics() (*mv1beta1.NodeMetricsList, error) { - defer func(t time.Time) { - log.Debug().Msgf("Node metrics %v", time.Since(t)) - }(time.Now()) - client, err := m.MXDial() if err != nil { return nil, err @@ -125,10 +111,6 @@ func (m *MetricsServer) FetchNodesMetrics() (*mv1beta1.NodeMetricsList, error) { // FetchPodsMetrics return all metrics for pods in a given namespace. func (m *MetricsServer) FetchPodsMetrics(ns string) (*mv1beta1.PodMetricsList, error) { - defer func(t time.Time) { - log.Debug().Msgf("Pod Metrics %v", time.Since(t)) - }(time.Now()) - client, err := m.MXDial() if err != nil { return nil, err @@ -139,10 +121,6 @@ func (m *MetricsServer) FetchPodsMetrics(ns string) (*mv1beta1.PodMetricsList, e // PodsMetrics retrieves metrics for all pods in a given namespace. func (m *MetricsServer) PodsMetrics(pods *mv1beta1.PodMetricsList, mmx PodsMetrics) { - defer func(t time.Time) { - log.Debug().Msgf("Pod MX %v", time.Since(t)) - }(time.Now()) - // Compute all pod's containers metrics. for _, p := range pods.Items { var mx PodMetrics diff --git a/internal/k8s/metrics_test.go b/internal/k8s/metrics_test.go index d04143dd..55cba4f8 100644 --- a/internal/k8s/metrics_test.go +++ b/internal/k8s/metrics_test.go @@ -10,13 +10,14 @@ import ( v1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" ) + func TestPodsMetrics(t *testing.T) { m := NewMetricsServer(nil) metrics := v1beta1.PodMetricsList{ Items: []v1beta1.PodMetrics{ - makeMxPod("p1", "1", "4Gi"), - makeMxPod("p2", "50m", "1Mi"), + *makeMxPod("p1", "1", "4Gi"), + *makeMxPod("p2", "50m", "1Mi"), }, } @@ -35,9 +36,9 @@ func BenchmarkPodsMetrics(b *testing.B) { metrics := v1beta1.PodMetricsList{ Items: []v1beta1.PodMetrics{ - makeMxPod("p1", "1", "4Gi"), - makeMxPod("p2", "50m", "1Mi"), - makeMxPod("p3", "50m", "1Mi"), + *makeMxPod("p1", "1", "4Gi"), + *makeMxPod("p2", "50m", "1Mi"), + *makeMxPod("p3", "50m", "1Mi"), }, } mmx := make(PodsMetrics, 3) @@ -59,8 +60,8 @@ func TestNodesMetrics(t *testing.T) { metrics := v1beta1.NodeMetricsList{ Items: []v1beta1.NodeMetrics{ - makeMxNode("n1", "10", "8Gi"), - makeMxNode("n2", "50m", "1Mi"), + *makeMxNode("n1", "10", "8Gi"), + *makeMxNode("n2", "50m", "1Mi"), }, } @@ -85,8 +86,8 @@ func BenchmarkNodesMetrics(b *testing.B) { metrics := v1beta1.NodeMetricsList{ Items: []v1beta1.NodeMetrics{ - makeMxNode("n1", "50m", "1Mi"), - makeMxNode("n2", "50m", "1Mi"), + *makeMxNode("n1", "50m", "1Mi"), + *makeMxNode("n2", "50m", "1Mi"), }, } @@ -103,39 +104,31 @@ func BenchmarkNodesMetrics(b *testing.B) { func TestClusterLoad(t *testing.T) { m := NewMetricsServer(nil) - nodes := v1.NodeList{ - Items: []v1.Node{ + nodes := Collection{ makeNode("n1", "100m", "4Mi", "50m", "2Mi"), makeNode("n2", "100m", "4Mi", "50m", "2Mi"), - }, } - metrics := v1beta1.NodeMetricsList{ - Items: []v1beta1.NodeMetrics{ + metrics := Collection{ makeMxNode("n1", "50m", "1Mi"), makeMxNode("n2", "50m", "1Mi"), - }, } var mx ClusterMetrics - m.ClusterLoad(&nodes, &metrics, &mx) + m.ClusterLoad(nodes, metrics, &mx) assert.Equal(t, 100.0, mx.PercCPU) assert.Equal(t, 50.0, mx.PercMEM) } func BenchmarkClusterLoad(b *testing.B) { - nodes := v1.NodeList{ - Items: []v1.Node{ - makeNode("n1", "100m", "4Mi", "50m", "2Mi"), - makeNode("n2", "100m", "4Mi", "50m", "2Mi"), - }, + nodes := Collection{ + makeNode("n1", "100m", "4Mi", "50m", "2Mi"), + makeNode("n2", "100m", "4Mi", "50m", "2Mi"), } - metrics := v1beta1.NodeMetricsList{ - Items: []v1beta1.NodeMetrics{ - makeMxNode("n1", "50m", "1Mi"), - makeMxNode("n2", "50m", "1Mi"), - }, + metrics := Collection{ + makeMxNode("n1", "50m", "1Mi"), + makeMxNode("n2", "50m", "1Mi"), } m := NewMetricsServer(nil) @@ -143,12 +136,15 @@ func BenchmarkClusterLoad(b *testing.B) { b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - m.ClusterLoad(&nodes, &metrics, &mx) + m.ClusterLoad(nodes, metrics, &mx) } } -func makeMxPod(name, cpu, mem string) v1beta1.PodMetrics { - return v1beta1.PodMetrics{ +// ---------------------------------------------------------------------------- +// Helpers... + +func makeMxPod(name, cpu, mem string) *v1beta1.PodMetrics { + return &v1beta1.PodMetrics{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: "default", @@ -161,8 +157,8 @@ func makeMxPod(name, cpu, mem string) v1beta1.PodMetrics { } } -func makeNode(name, tcpu, tmem, acpu, amem string) v1.Node { - return v1.Node{ +func makeNode(name, tcpu, tmem, acpu, amem string) *v1.Node { + return &v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: name, }, @@ -173,8 +169,8 @@ func makeNode(name, tcpu, tmem, acpu, amem string) v1.Node { } } -func makeMxNode(name, cpu, mem string) v1beta1.NodeMetrics { - return v1beta1.NodeMetrics{ +func makeMxNode(name, cpu, mem string) *v1beta1.NodeMetrics { + return &v1beta1.NodeMetrics{ ObjectMeta: metav1.ObjectMeta{ Name: name, }, diff --git a/internal/resource/base.go b/internal/resource/base.go index 980ba465..e9ef0056 100644 --- a/internal/resource/base.go +++ b/internal/resource/base.go @@ -12,6 +12,7 @@ import ( "k8s.io/cli-runtime/pkg/genericclioptions/printers" "k8s.io/kubernetes/pkg/kubectl/describe" versioned "k8s.io/kubernetes/pkg/kubectl/describe/versioned" + mv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" ) type ( @@ -53,6 +54,12 @@ func (b *Base) HasSelectors() bool { return b.Resource.HasSelectors() } +// SetPodMetrics attach pod metrics to resource. +func (b *Base) SetPodMetrics(*mv1beta1.PodMetrics) {} + +// SetNodeMetrics attach node metrics to resource. +func (b *Base) SetNodeMetrics(*mv1beta1.NodeMetrics) {} + // SetFieldSelector refines query results via selector. func (b *Base) SetFieldSelector(s string) { b.Resource.SetFieldSelector(s) diff --git a/internal/resource/cluster.go b/internal/resource/cluster.go index d2da6ff2..f9e69718 100644 --- a/internal/resource/cluster.go +++ b/internal/resource/cluster.go @@ -23,7 +23,7 @@ type ( MetricsServer interface { MetricsService - ClusterLoad(*v1.NodeList, *mv1beta1.NodeMetricsList, *k8s.ClusterMetrics) + ClusterLoad(nodes k8s.Collection, metrics k8s.Collection, cmx *k8s.ClusterMetrics) NodesMetrics(k8s.Collection, *mv1beta1.NodeMetricsList, k8s.NodesMetrics) PodsMetrics(*mv1beta1.PodMetricsList, k8s.PodsMetrics) } @@ -78,16 +78,6 @@ func (c *Cluster) UserName() string { } // Metrics gathers node level metrics and compute utilization percentages. -func (c *Cluster) Metrics(nodes *v1.NodeList, nmx *mv1beta1.NodeMetricsList, mx *k8s.ClusterMetrics) { - c.mx.ClusterLoad(nodes, nmx, mx) -} - -// FetchNodesMetrics fetch all nodes metrics. -func (c *Cluster) FetchNodesMetrics() (*mv1beta1.NodeMetricsList, error) { - return c.mx.FetchNodesMetrics() -} - -// GetNodes fetch all available nodes. -func (c *Cluster) GetNodes() (*v1.NodeList, error) { - return c.api.GetNodes() +func (c *Cluster) Metrics(nos k8s.Collection, nmx k8s.Collection, mx *k8s.ClusterMetrics) { + c.mx.ClusterLoad(nos, nmx, mx) } diff --git a/internal/resource/cluster_test.go b/internal/resource/cluster_test.go index 21e9246c..4aecd03f 100644 --- a/internal/resource/cluster_test.go +++ b/internal/resource/cluster_test.go @@ -8,8 +8,6 @@ import ( "github.com/derailed/k9s/internal/resource" m "github.com/petergtz/pegomock" "github.com/stretchr/testify/assert" - v1 "k8s.io/api/core/v1" - mv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" ) func TestClusterVersion(t *testing.T) { @@ -58,33 +56,10 @@ func TestClusterMetrics(t *testing.T) { mxx := clusterMetric() c := resource.NewClusterWithArgs(mm, mx) - c.Metrics(&v1.NodeList{}, &mv1beta1.NodeMetricsList{}, &mxx) + c.Metrics(k8s.Collection{}, k8s.Collection{}, &mxx) assert.Equal(t, clusterMetric(), mxx) } -func TestClusterGetNodes(t *testing.T) { - mm, mx := NewMockClusterMeta(), NewMockMetricsServer() - m.When(mm.GetNodes()).ThenReturn(&v1.NodeList{Items: []v1.Node{*k8sNode()}}, nil) - - c := resource.NewClusterWithArgs(mm, mx) - nodes, err := c.GetNodes() - - assert.Nil(t, err) - assert.Equal(t, 1, len(nodes.Items)) -} - -func TestClusterFetchNodesMetrics(t *testing.T) { - mm, mx := NewMockClusterMeta(), NewMockMetricsServer() - m.When(mm.GetNodes()).ThenReturn(&v1.NodeList{Items: []v1.Node{*k8sNode()}}, nil) - m.When(mx.FetchNodesMetrics()).ThenReturn(&mv1beta1.NodeMetricsList{Items: []mv1beta1.NodeMetrics{makeMxNode("fred", "100m", "10Mi")}}, nil) - - c := resource.NewClusterWithArgs(mm, mx) - metrics, err := c.FetchNodesMetrics() - - assert.Nil(t, err) - assert.Equal(t, 1, len(metrics.Items)) -} - // Helpers... func TestUsingMocks(t *testing.T) { diff --git a/internal/resource/cm_test.go b/internal/resource/cm_test.go index 5dcb0e41..29e08c6d 100644 --- a/internal/resource/cm_test.go +++ b/internal/resource/cm_test.go @@ -124,7 +124,7 @@ func TestCMListData(t *testing.T) { // Make sure we crn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/container.go b/internal/resource/container.go index 0f4557a5..81b14de3 100644 --- a/internal/resource/container.go +++ b/internal/resource/container.go @@ -10,6 +10,7 @@ import ( "github.com/derailed/k9s/internal/k8s" "github.com/rs/zerolog/log" v1 "k8s.io/api/core/v1" + mv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" ) type ( @@ -19,9 +20,9 @@ type ( pod *v1.Pod isInit bool - instance v1.Container + instance *v1.Container MetricsServer MetricsServer - metrics k8s.PodMetrics + metrics *mv1beta1.PodMetrics mx sync.RWMutex } ) @@ -42,7 +43,7 @@ func NewContainer(c Connection, mx MetricsServer, pod *v1.Pod) *Container { Base: &Base{Connection: c, Resource: k8s.NewPod(c)}, pod: pod, MetricsServer: mx, - metrics: k8s.PodMetrics{}, + metrics: &mv1beta1.PodMetrics{}, } co.Factory = &co @@ -52,19 +53,14 @@ func NewContainer(c Connection, mx MetricsServer, pod *v1.Pod) *Container { // New builds a new Container instance from a k8s resource. func (r *Container) New(i interface{}) Columnar { co := NewContainer(r.Connection, r.MetricsServer, r.pod) - co.instance = i.(v1.Container) + co.instance = i.(*v1.Container) co.path = r.namespacedName(r.pod.ObjectMeta) + ":" + co.instance.Name return co } -// Metrics retrieves cpu/mem resource consumption on associated pod. -func (r *Container) Metrics() k8s.PodMetrics { - return r.metrics -} - -// SetMetrics set the current k8s resource metrics on associated pod. -func (r *Container) SetMetrics(m k8s.PodMetrics) { +// SetPodMetrics set the current k8s resource metrics on associated pod. +func (r *Container) SetPodMetrics(m *mv1beta1.PodMetrics) { r.metrics = m } @@ -175,21 +171,17 @@ func (r *Container) Fields(ns string) Row { ff := make(Row, 0, len(r.Header(ns))) i := r.instance - mxs, _ := r.MetricsServer.FetchPodsMetrics(r.pod.Namespace) - - var cpu, mem string - for _, mx := range mxs.Items { - if mx.Name != r.pod.Name { - continue - } - for _, co := range mx.Containers { - if co.Name != i.Name { - continue + var cpu int64 + var mem float64 + if r.metrics != nil { + for _, co := range r.metrics.Containers { + if co.Name == i.Name { + cpu = co.Usage.Cpu().MilliValue() + mem = k8s.ToMB(co.Usage.Memory().Value()) + break } - cpu, mem = toRes(co.Usage) } } - rcpu, rmem := resources(i) var cs *v1.ContainerStatus @@ -222,8 +214,8 @@ func (r *Container) Fields(ns string) Row { restarts, probe(i.LivenessProbe), probe(i.ReadinessProbe), - cpu, - mem, + ToMillicore(cpu), + ToMi(mem), rcpu, rmem, toAge(r.pod.CreationTimestamp), @@ -259,7 +251,7 @@ func toRes(r v1.ResourceList) (string, string) { return ToMillicore(cpu.MilliValue()), ToMi(k8s.ToMB(mem.Value())) } -func resources(c v1.Container) (cpu, mem string) { +func resources(c *v1.Container) (cpu, mem string) { req, lim := c.Resources.Requests, c.Resources.Limits if len(req) == 0 { diff --git a/internal/resource/cr_binding_test.go b/internal/resource/cr_binding_test.go index 75eb1408..8123fd18 100644 --- a/internal/resource/cr_binding_test.go +++ b/internal/resource/cr_binding_test.go @@ -50,7 +50,7 @@ func TestCRBListData(t *testing.T) { l := NewClusterRoleBindingListWithArgs("-", NewClusterRoleBindingWithArgs(conn, ca)) // Make sure we can get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/cr_test.go b/internal/resource/cr_test.go index c13f274f..99d79219 100644 --- a/internal/resource/cr_test.go +++ b/internal/resource/cr_test.go @@ -70,7 +70,7 @@ func TestCRListData(t *testing.T) { l := NewClusterRoleListWithArgs("-", NewClusterRoleWithArgs(mc, mr)) // Make sure we mcn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/crd_test.go b/internal/resource/crd_test.go index bd4e5851..80df837b 100644 --- a/internal/resource/crd_test.go +++ b/internal/resource/crd_test.go @@ -76,7 +76,7 @@ func TestCRDListData(t *testing.T) { l := NewCRDListWithArgs("-", NewCRDWithArgs(mc, cr)) // Make sure we can get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/cronjob_test.go b/internal/resource/cronjob_test.go index 51f21d6a..91180b83 100644 --- a/internal/resource/cronjob_test.go +++ b/internal/resource/cronjob_test.go @@ -62,7 +62,7 @@ func TestCronJobListData(t *testing.T) { l := NewCronJobListWithArgs("-", NewCronJobWithArgs(mc, mr)) // Make sure we can get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/custom_test.go b/internal/resource/custom_test.go index a2472776..b78e3cfe 100644 --- a/internal/resource/custom_test.go +++ b/internal/resource/custom_test.go @@ -63,7 +63,7 @@ func TestCustomListData(t *testing.T) { l := NewCustomListWithArgs("blee", "fred", NewCustomWithArgs(mc, mr)) // Make sure we can get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/dp_test.go b/internal/resource/dp_test.go index 676d6e69..a78cc88b 100644 --- a/internal/resource/dp_test.go +++ b/internal/resource/dp_test.go @@ -62,7 +62,7 @@ func TestDeploymentListData(t *testing.T) { l := NewDeploymentListWithArgs("-", NewDeploymentWithArgs(mc, mr)) // Make sure we can get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/ds_test.go b/internal/resource/ds_test.go index accfc2d8..f2716564 100644 --- a/internal/resource/ds_test.go +++ b/internal/resource/ds_test.go @@ -61,7 +61,7 @@ func TestDSListData(t *testing.T) { l := NewDaemonSetListWithArgs("blee", NewDaemonSetWithArgs(mc, mr)) // Make sure we can get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/ep_test.go b/internal/resource/ep_test.go index 918b31ee..c1e9ddc7 100644 --- a/internal/resource/ep_test.go +++ b/internal/resource/ep_test.go @@ -59,7 +59,7 @@ func TestEndpointsListData(t *testing.T) { l := NewEndpointsListWithArgs("-", NewEndpointsWithArgs(mc, mr)) // Make sure we mrn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/evt_test.go b/internal/resource/evt_test.go index eb4056fc..b55a33ea 100644 --- a/internal/resource/evt_test.go +++ b/internal/resource/evt_test.go @@ -61,7 +61,7 @@ func TestEventData(t *testing.T) { l := NewEventListWithArgs("blee", NewEventWithArgs(mc, mr)) // Make sure we mrn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/hpa_test.go b/internal/resource/hpa_test.go index 62b3fc39..ba9c350f 100644 --- a/internal/resource/hpa_test.go +++ b/internal/resource/hpa_test.go @@ -63,7 +63,7 @@ func TestHPAListData(t *testing.T) { l := NewHPAListWithArgs("blee", NewHPAWithArgs(mc, mr)) // Make sure we mrn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/ing_test.go b/internal/resource/ing_test.go index 0617abf4..ffd96ed1 100644 --- a/internal/resource/ing_test.go +++ b/internal/resource/ing_test.go @@ -61,7 +61,7 @@ func TestIngressListData(t *testing.T) { l := NewIngressListWithArgs("blee", NewIngressWithArgs(mc, mr)) // Make sure we mrn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/job_test.go b/internal/resource/job_test.go index 1501bbc3..c2eed1fb 100644 --- a/internal/resource/job_test.go +++ b/internal/resource/job_test.go @@ -61,7 +61,7 @@ func TestJobListData(t *testing.T) { l := NewJobListWithArgs("blee", NewJobWithArgs(mc, mr)) // Make sure we mrn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/list.go b/internal/resource/list.go index 4106cac2..ee7a588d 100644 --- a/internal/resource/list.go +++ b/internal/resource/list.go @@ -4,9 +4,13 @@ import ( "reflect" "time" + wa "github.com/derailed/k9s/internal/watch" "github.com/rs/zerolog/log" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" "k8s.io/cli-runtime/pkg/genericclioptions" + mv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" ) const ( @@ -64,7 +68,7 @@ type ( AllNamespaces() bool GetNamespace() string SetNamespace(string) - Reconcile() error + Reconcile(informer *wa.Meta, path *string) error GetName() string Access(flag int) bool GetAccess() int @@ -80,17 +84,19 @@ type ( Fields(ns string) Row ExtFields() Properties Name() string + SetPodMetrics(*mv1beta1.PodMetrics) + SetNodeMetrics(*mv1beta1.NodeMetrics) } + // Columnars a collection of columnars. + Columnars []Columnar + // Row represents a collection of string fields. Row []string // Rows represents a collection of rows. Rows []Row - // Columnars a collection of columnars. - Columnars []Columnar - // Resource represents a tabular Kubernetes resource. Resource interface { New(interface{}) Columnar @@ -217,15 +223,75 @@ func (l *list) Data() TableData { } } +func metaFQN(m metav1.ObjectMeta) string { + if m.Namespace == "" { + return m.Name + } + + return m.Namespace + "/" + m.Name +} + // Reconcile previous vs current state and emits delta events. -func (l *list) Reconcile() error { +func (l *list) Reconcile(m *wa.Meta, path *string) error { defer func(t time.Time) { log.Debug().Msgf("Reconcile %v", time.Since(t)) }(time.Now()) - items, err := l.resource.List(l.namespace) - if err != nil { - return err + ns := l.namespace + if path != nil { + ns = *path + } + + var ( + items Columnars + err error + ) + rr, err := m.List(l.name, ns) + if err == nil { + for _, r := range rr { + var fqn string + var res Columnar + switch o := r.(type) { + case *v1.Node: + fqn = metaFQN(o.ObjectMeta) + res = l.resource.New(r) + nmx, err := m.Get(wa.NodeMXIndex, fqn) + if err != nil { + log.Warn().Err(err).Msg("No node metrics") + } + if mx, ok := nmx.(*mv1beta1.NodeMetrics); ok { + res.SetNodeMetrics(mx) + } + case *v1.Pod: + fqn = metaFQN(o.ObjectMeta) + res = l.resource.New(r) + pmx, err := m.Get(wa.PodMXIndex, fqn) + if err != nil { + log.Warn().Err(err).Msg("No pod metrics") + } + if mx, ok := pmx.(*mv1beta1.PodMetrics); ok { + res.SetPodMetrics(mx) + } + case *v1.Container: + log.Debug().Msgf("Got container %s", ns) + fqn = ns + res = l.resource.New(r) + pmx, err := m.Get(wa.PodMXIndex, fqn) + if err != nil { + log.Warn().Err(err).Msg("No pod metrics") + } + if mx, ok := pmx.(*mv1beta1.PodMetrics); ok { + res.SetPodMetrics(mx) + } + } + items = append(items, res) + } + } else { + log.Debug().Msg("Standard load") + items, err = l.resource.List(l.namespace) + if err != nil { + return err + } } if len(l.cache) == 0 { diff --git a/internal/resource/mock_metricsserver_test.go b/internal/resource/mock_metricsserver_test.go index b1a8e0c4..4c170394 100644 --- a/internal/resource/mock_metricsserver_test.go +++ b/internal/resource/mock_metricsserver_test.go @@ -6,7 +6,6 @@ package resource_test import ( k8s "github.com/derailed/k9s/internal/k8s" pegomock "github.com/petergtz/pegomock" - v1 "k8s.io/api/core/v1" v1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" "reflect" "time" @@ -20,7 +19,7 @@ func NewMockMetricsServer() *MockMetricsServer { return &MockMetricsServer{fail: pegomock.GlobalFailHandler} } -func (mock *MockMetricsServer) ClusterLoad(_param0 *v1.NodeList, _param1 *v1beta1.NodeMetricsList, _param2 *k8s.ClusterMetrics) { +func (mock *MockMetricsServer) ClusterLoad(_param0 k8s.Collection, _param1 k8s.Collection, _param2 *k8s.ClusterMetrics) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockMetricsServer().") } @@ -134,7 +133,7 @@ type VerifierMetricsServer struct { timeout time.Duration } -func (verifier *VerifierMetricsServer) ClusterLoad(_param0 *v1.NodeList, _param1 *v1beta1.NodeMetricsList, _param2 *k8s.ClusterMetrics) *MetricsServer_ClusterLoad_OngoingVerification { +func (verifier *VerifierMetricsServer) ClusterLoad(_param0 k8s.Collection, _param1 k8s.Collection, _param2 *k8s.ClusterMetrics) *MetricsServer_ClusterLoad_OngoingVerification { params := []pegomock.Param{_param0, _param1, _param2} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "ClusterLoad", params, verifier.timeout) return &MetricsServer_ClusterLoad_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} @@ -145,21 +144,21 @@ type MetricsServer_ClusterLoad_OngoingVerification struct { methodInvocations []pegomock.MethodInvocation } -func (c *MetricsServer_ClusterLoad_OngoingVerification) GetCapturedArguments() (*v1.NodeList, *v1beta1.NodeMetricsList, *k8s.ClusterMetrics) { +func (c *MetricsServer_ClusterLoad_OngoingVerification) GetCapturedArguments() (k8s.Collection, k8s.Collection, *k8s.ClusterMetrics) { _param0, _param1, _param2 := c.GetAllCapturedArguments() return _param0[len(_param0)-1], _param1[len(_param1)-1], _param2[len(_param2)-1] } -func (c *MetricsServer_ClusterLoad_OngoingVerification) GetAllCapturedArguments() (_param0 []*v1.NodeList, _param1 []*v1beta1.NodeMetricsList, _param2 []*k8s.ClusterMetrics) { +func (c *MetricsServer_ClusterLoad_OngoingVerification) GetAllCapturedArguments() (_param0 []k8s.Collection, _param1 []k8s.Collection, _param2 []*k8s.ClusterMetrics) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { - _param0 = make([]*v1.NodeList, len(params[0])) + _param0 = make([]k8s.Collection, len(params[0])) for u, param := range params[0] { - _param0[u] = param.(*v1.NodeList) + _param0[u] = param.(k8s.Collection) } - _param1 = make([]*v1beta1.NodeMetricsList, len(params[1])) + _param1 = make([]k8s.Collection, len(params[1])) for u, param := range params[1] { - _param1[u] = param.(*v1beta1.NodeMetricsList) + _param1[u] = param.(k8s.Collection) } _param2 = make([]*k8s.ClusterMetrics, len(params[2])) for u, param := range params[2] { diff --git a/internal/resource/no.go b/internal/resource/no.go index da9d5903..d5fdd6c2 100644 --- a/internal/resource/no.go +++ b/internal/resource/no.go @@ -7,6 +7,7 @@ import ( "github.com/rs/zerolog/log" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" + mv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" ) const ( @@ -17,24 +18,28 @@ const ( // Node tracks a kubernetes resource. type Node struct { *Base - instance *v1.Node - MetricsServer MetricsServer - metrics k8s.NodeMetrics + instance *v1.Node + metrics *mv1beta1.NodeMetrics } // NewNodeList returns a new resource list. -func NewNodeList(c Connection, mx MetricsServer, ns string) List { +func NewNodeList(c Connection, _ string) List { return NewList( NotNamespaced, "no", - NewNode(c, mx), + NewNode(c), ViewAccess|DescribeAccess, ) } // NewNode instantiates a new Node. -func NewNode(c Connection, mx MetricsServer) *Node { - n := &Node{&Base{Connection: c, Resource: k8s.NewNode(c)}, nil, mx, k8s.NodeMetrics{}} +func NewNode(c Connection) *Node { + n := &Node{ + Base: &Base{ + Connection: c, + Resource: k8s.NewNode(c), + }, + } n.Factory = n return n @@ -42,7 +47,7 @@ func NewNode(c Connection, mx MetricsServer) *Node { // New builds a new Node instance from a k8s resource. func (r *Node) New(i interface{}) Columnar { - c := NewNode(r.Connection, r.MetricsServer) + c := NewNode(r.Connection) switch instance := i.(type) { case *v1.Node: c.instance = instance @@ -56,6 +61,11 @@ func (r *Node) New(i interface{}) Columnar { return c } +// SetNodeMetrics set the current k8s resource metrics on a given node. +func (r *Node) SetNodeMetrics(m *mv1beta1.NodeMetrics) { + r.metrics = m +} + // List all resources for a given namespace. func (r *Node) List(ns string) (Columnars, error) { nn, err := r.Resource.List(ns) @@ -63,17 +73,10 @@ func (r *Node) List(ns string) (Columnars, error) { return nil, err } - mx := make(k8s.NodesMetrics, len(nn)) - if r.MetricsServer.HasMetrics() { - nmx, _ := r.MetricsServer.FetchNodesMetrics() - r.MetricsServer.NodesMetrics(nn, nmx, mx) - } - cc := make(Columnars, 0, len(nn)) for i := range nn { node := nn[i].(v1.Node) no := r.New(&node).(*Node) - no.metrics = mx[node.Name] cc = append(cc, no) } @@ -124,6 +127,18 @@ func (r *Node) Fields(ns string) Row { iIP, eIP := r.getIPs(i.Status.Addresses) iIP, eIP = missing(iIP), missing(eIP) + var ( + cpu int64 + mem float64 + ) + if r.metrics != nil { + cpu = r.metrics.Usage.Cpu().MilliValue() + mem = k8s.ToMB(r.metrics.Usage.Memory().Value()) + } + + acpu := i.Status.Allocatable.Cpu().MilliValue() + amem := k8s.ToMB(i.Status.Allocatable.Memory().Value()) + // reqs, _, err := r.podsResources(i.Name) // if err != nil { // if !errors.IsForbidden(err) { @@ -144,18 +159,12 @@ func (r *Node) Fields(ns string) Row { i.Status.NodeInfo.KernelVersion, iIP, eIP, - withPerc( - ToMillicore(r.metrics.CurrentCPU), - AsPerc(toPerc(float64(r.metrics.CurrentCPU), float64(r.metrics.AvailCPU))), - ), - withPerc( - ToMi(r.metrics.CurrentMEM), - AsPerc(toPerc(r.metrics.CurrentMEM, r.metrics.AvailMEM)), - ), + withPerc(ToMillicore(cpu), AsPerc(toPerc(float64(cpu), float64(acpu)))), + withPerc(ToMi(mem), AsPerc(toPerc(mem, amem))), // withPerc(rcpu.String(), AsPerc(pcpur)), // withPerc(rmem.String(), AsPerc(pmemr)), - ToMillicore(r.metrics.AvailCPU), - ToMi(r.metrics.AvailMEM), + ToMillicore(cpu), + ToMi(mem), toAge(i.ObjectMeta.CreationTimestamp), ) } diff --git a/internal/resource/no_int_test.go b/internal/resource/no_int_test.go index 7554fb08..7e6b40d3 100644 --- a/internal/resource/no_int_test.go +++ b/internal/resource/no_int_test.go @@ -25,7 +25,7 @@ func TestNodeStatus(t *testing.T) { }, } - no := NewNode(nil, nil) + no := NewNode(nil) for _, u := range uu { cond := no.status(u.s, false) assert.Equal(t, "Ready", cond) diff --git a/internal/resource/no_test.go b/internal/resource/no_test.go index 5637ec31..d2fc3527 100644 --- a/internal/resource/no_test.go +++ b/internal/resource/no_test.go @@ -19,7 +19,7 @@ func NewNodeListWithArgs(ns string, r *resource.Node) resource.List { } func NewNodeWithArgs(conn k8s.Connection, res resource.Cruder, mx resource.MetricsServer) *resource.Node { - r := &resource.Node{Base: resource.NewBase(conn, res), MetricsServer: mx} + r := &resource.Node{Base: resource.NewBase(conn, res)} r.Factory = r return r } @@ -71,7 +71,7 @@ func TestNodeListData(t *testing.T) { l := NewNodeListWithArgs("-", NewNodeWithArgs(mc, mr, mx)) // Make sure we mrn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } @@ -127,8 +127,7 @@ func makeRes(c, m string) v1.ResourceList { func newNode() resource.Columnar { mc := NewMockConnection() - mx := NewMockMetricsServer() - return resource.NewNode(mc, mx).New(k8sNode()) + return resource.NewNode(mc).New(k8sNode()) } func noYaml() string { diff --git a/internal/resource/ns_test.go b/internal/resource/ns_test.go index 9280e582..df711583 100644 --- a/internal/resource/ns_test.go +++ b/internal/resource/ns_test.go @@ -62,7 +62,7 @@ func TestNamespaceListData(t *testing.T) { l := NewNamespaceListWithArgs("-", NewNamespaceWithArgs(mc, mr)) // Make sure we mrn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/pdb_test.go b/internal/resource/pdb_test.go index 22b6c455..9554dda1 100644 --- a/internal/resource/pdb_test.go +++ b/internal/resource/pdb_test.go @@ -61,7 +61,7 @@ func TestPDBListData(t *testing.T) { l := NewPDBListWithArgs("blee", NewPDBWithArgs(mc, mr)) // Make sure we mrn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/pod.go b/internal/resource/pod.go index 0b766504..a0df5001 100644 --- a/internal/resource/pod.go +++ b/internal/resource/pod.go @@ -11,6 +11,7 @@ import ( "github.com/rs/zerolog/log" v1 "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/util/node" + mv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" ) const ( @@ -37,29 +38,26 @@ type ( // Pod that can be displayed in a table and interacted with. Pod struct { *Base - instance *v1.Pod - MetricsServer MetricsServer - metrics k8s.PodMetrics - mx sync.RWMutex + instance *v1.Pod + metrics *mv1beta1.PodMetrics + mx sync.RWMutex } ) // NewPodList returns a new resource list. -func NewPodList(c Connection, mx MetricsServer, ns string) List { +func NewPodList(c Connection, ns string) List { return NewList( ns, "po", - NewPod(c, mx), + NewPod(c), AllVerbsAccess|DescribeAccess, ) } // NewPod instantiates a new Pod. -func NewPod(c Connection, mx MetricsServer) *Pod { +func NewPod(c Connection) *Pod { p := &Pod{ - Base: &Base{Connection: c, Resource: k8s.NewPod(c)}, - MetricsServer: mx, - metrics: k8s.PodMetrics{}, + Base: &Base{Connection: c, Resource: k8s.NewPod(c)}, } p.Factory = p @@ -68,7 +66,7 @@ func NewPod(c Connection, mx MetricsServer) *Pod { // New builds a new Pod instance from a k8s resource. func (r *Pod) New(i interface{}) Columnar { - c := NewPod(r.Connection, r.MetricsServer) + c := NewPod(r.Connection) switch instance := i.(type) { case *v1.Pod: c.instance = instance @@ -86,13 +84,8 @@ func (r *Pod) New(i interface{}) Columnar { return c } -// Metrics retrieves cpu/mem resource consumption on a pod. -func (r *Pod) Metrics() k8s.PodMetrics { - return r.metrics -} - -// SetMetrics set the current k8s resource metrics on a given pod. -func (r *Pod) SetMetrics(m k8s.PodMetrics) { +// SetPodMetrics set the current k8s resource metrics on a given pod. +func (r *Pod) SetPodMetrics(m *mv1beta1.PodMetrics) { r.metrics = m } @@ -103,7 +96,6 @@ func (r *Pod) Marshal(path string) (string, error) { if err != nil { return "", err } - po := i.(*v1.Pod) po.TypeMeta.APIVersion = "v1" po.TypeMeta.Kind = "Pod" @@ -188,16 +180,9 @@ func (r *Pod) List(ns string) (Columnars, error) { return nil, err } - mx := make(k8s.PodsMetrics, len(pods)) - if r.MetricsServer.HasMetrics() { - pmx, _ := r.MetricsServer.FetchPodsMetrics(ns) - r.MetricsServer.PodsMetrics(pmx, mx) - } - cc := make(Columnars, 0, len(pods)) for i := range pods { po := r.New(&pods[i]).(*Pod) - po.metrics = mx[po.Name()] cc = append(cc, po) } @@ -236,13 +221,22 @@ func (r *Pod) Fields(ns string) Row { ss := i.Status.ContainerStatuses cr, _, rc := r.statuses(ss) + var cpu int64 + var mem float64 + if r.metrics != nil { + for _, c := range r.metrics.Containers { + cpu += c.Usage.Cpu().MilliValue() + mem += k8s.ToMB(c.Usage.Memory().Value()) + } + } + return append(ff, i.ObjectMeta.Name, strconv.Itoa(cr)+"/"+strconv.Itoa(len(ss)), r.phase(i), strconv.Itoa(rc), - ToMillicore(r.metrics.CurrentCPU), - ToMi(r.metrics.CurrentMEM), + ToMillicore(cpu), + ToMi(mem), i.Status.PodIP, i.Spec.NodeName, r.mapQOS(i.Status.QOSClass), diff --git a/internal/resource/pod_test.go b/internal/resource/pod_test.go index 81cfea6c..45b72ecd 100644 --- a/internal/resource/pod_test.go +++ b/internal/resource/pod_test.go @@ -18,7 +18,7 @@ func NewPodListWithArgs(ns string, r *resource.Pod) resource.List { } func NewPodWithArgs(conn k8s.Connection, res resource.Cruder, mx resource.MetricsServer) *resource.Pod { - r := &resource.Pod{Base: resource.NewBase(conn, res), MetricsServer: mx} + r := &resource.Pod{Base: resource.NewBase(conn, res)} r.Factory = r return r } @@ -70,7 +70,7 @@ func TestPodListData(t *testing.T) { l := NewPodListWithArgs("blee", NewPodWithArgs(mc, mr, mx)) // Make sure we mcn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } @@ -143,8 +143,7 @@ func k8sPod() *v1.Pod { func newPod() resource.Columnar { mc := NewMockConnection() - mx := NewMockMetricsServer() - return resource.NewPod(mc, mx).New(k8sPod()) + return resource.NewPod(mc).New(k8sPod()) } func poYaml() string { diff --git a/internal/resource/pv_test.go b/internal/resource/pv_test.go index 7bba6b9a..e794c8d4 100644 --- a/internal/resource/pv_test.go +++ b/internal/resource/pv_test.go @@ -61,7 +61,7 @@ func TestPVListData(t *testing.T) { l := NewPVListWithArgs("-", NewPVWithArgs(mc, mr)) // Make sure we mrn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/pvc_test.go b/internal/resource/pvc_test.go index cefd3210..b9aa35c4 100644 --- a/internal/resource/pvc_test.go +++ b/internal/resource/pvc_test.go @@ -62,7 +62,7 @@ func TestPVCListData(t *testing.T) { l := NewPVCListWithArgs("blee", NewPVCWithArgs(mc, mr)) // Make sure we mrn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/rc_test.go b/internal/resource/rc_test.go index 17a67bf7..80569e1a 100644 --- a/internal/resource/rc_test.go +++ b/internal/resource/rc_test.go @@ -62,7 +62,7 @@ func TestRCListData(t *testing.T) { l := NewRCListWithArgs("blee", NewRCWithArgs(mc, mr)) // Make sure we mrn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/ro_binding_test.go b/internal/resource/ro_binding_test.go index d3bb82a9..0774776b 100644 --- a/internal/resource/ro_binding_test.go +++ b/internal/resource/ro_binding_test.go @@ -42,7 +42,7 @@ func TestRBListData(t *testing.T) { l := NewRBListWithArgs("blee", NewRBWithArgs(mc, mr)) // Make sure we mrn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/ro_test.go b/internal/resource/ro_test.go index 82bef2bd..ae161553 100644 --- a/internal/resource/ro_test.go +++ b/internal/resource/ro_test.go @@ -41,7 +41,7 @@ func TestRoleListData(t *testing.T) { l := NewRoleListWithArgs("blee", NewRoleWithArgs(mc, mr)) // Make sure we mrn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/rs_test.go b/internal/resource/rs_test.go index 486c509e..140c52dc 100644 --- a/internal/resource/rs_test.go +++ b/internal/resource/rs_test.go @@ -41,7 +41,7 @@ func TestReplicaSetListData(t *testing.T) { l := NewReplicaSetListWithArgs("blee", NewReplicaSetWithArgs(mc, mr)) // Make sure we can get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/sa_test.go b/internal/resource/sa_test.go index 21396eeb..2b99c455 100644 --- a/internal/resource/sa_test.go +++ b/internal/resource/sa_test.go @@ -78,7 +78,7 @@ func TestSAListData(t *testing.T) { l := NewServiceAccountListWithArgs("blee", NewServiceAccountWithArgs(mc, mr)) // Make sure we mrn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/secret_test.go b/internal/resource/secret_test.go index ea90273b..3da743c5 100644 --- a/internal/resource/secret_test.go +++ b/internal/resource/secret_test.go @@ -130,7 +130,7 @@ func TestSecretListData(t *testing.T) { l := NewSecretListWithArgs("blee", NewSecretWithArgs(mc, mr)) // Make sure we mrn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/sts_test.go b/internal/resource/sts_test.go index 156053ab..a0c535b4 100644 --- a/internal/resource/sts_test.go +++ b/internal/resource/sts_test.go @@ -79,7 +79,7 @@ func TestSTSListData(t *testing.T) { l := NewStatefulSetListWithArgs("blee", NewStatefulSetWithArgs(mc, mr)) // Make sure we mrn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/resource/svc_test.go b/internal/resource/svc_test.go index 517abccc..4ec42e78 100644 --- a/internal/resource/svc_test.go +++ b/internal/resource/svc_test.go @@ -89,7 +89,7 @@ func TestSVCListData(t *testing.T) { l := NewServiceListWithArgs("blee", NewServiceWithArgs(mc, mr)) // Make sure we mrn get deltas! for i := 0; i < 2; i++ { - err := l.Reconcile() + err := l.Reconcile(nil, nil) assert.Nil(t, err) } diff --git a/internal/views/app.go b/internal/views/app.go index ba21a758..040522da 100644 --- a/internal/views/app.go +++ b/internal/views/app.go @@ -7,6 +7,7 @@ import ( "github.com/derailed/k9s/internal/config" "github.com/derailed/k9s/internal/k8s" + "github.com/derailed/k9s/internal/watch" "github.com/derailed/tview" "github.com/fsnotify/fsnotify" "github.com/gdamore/tcell" @@ -65,6 +66,8 @@ type ( cmdBuff *cmdBuff cmdView *cmdView actions keyActions + stopCh chan struct{} + informer *watch.Meta } ) @@ -104,6 +107,7 @@ func NewApp(cfg *config.Config) *appView { func (a *appView) Init(v string, rate int, flags *genericclioptions.ConfigFlags) { a.version = v a.flags = flags + a.startInformer() a.clusterInfoView.init() a.cmdBuff.addListener(a.cmdView) @@ -130,6 +134,29 @@ func (a *appView) Init(v string, rate int, flags *genericclioptions.ConfigFlags) a.SetRoot(a.pages, true) } +func (a *appView) startInformer() { + if a.stopCh != nil { + close(a.stopCh) + } + + a.stopCh = make(chan struct{}) + ns := "" + if a.flags.Namespace != nil { + ns = *a.flags.Namespace + } + a.informer = watch.NewMeta(a.conn(), ns) + log.Debug().Msgf(">> Starting Watcher") + a.informer.Run(a.stopCh) +} + +func (a *appView) bailOut() { + if a.stopCh != nil { + log.Debug().Msg("<<<< Stopping Watcher") + close(a.stopCh) + } + a.Stop() +} + func (a *appView) conn() k8s.Connection { return a.config.GetConnection() } diff --git a/internal/views/cluster_info.go b/internal/views/cluster_info.go index f961359a..2a13eb95 100644 --- a/internal/views/cluster_info.go +++ b/internal/views/cluster_info.go @@ -2,10 +2,12 @@ package views import ( "strings" + "time" "github.com/derailed/k9s/internal/config" "github.com/derailed/k9s/internal/k8s" "github.com/derailed/k9s/internal/resource" + "github.com/derailed/k9s/internal/watch" "github.com/derailed/tview" "github.com/gdamore/tcell" "github.com/rs/zerolog/log" @@ -90,6 +92,10 @@ func (v *clusterInfoView) infoCell(t string) *tview.TableCell { } func (v *clusterInfoView) refresh() { + defer func(t time.Time) { + log.Debug().Msgf("Cluster Refresh %v", time.Since(t)) + }(time.Now()) + cluster := resource.NewCluster(v.app.conn(), &log.Logger, v.mxs) var row int @@ -102,26 +108,25 @@ func (v *clusterInfoView) refresh() { v.GetCell(row, 1).SetText(cluster.Version()) row++ - nodes, err := cluster.GetNodes() + nos, err := v.app.informer.List(watch.NodeIndex, "") if err != nil { log.Warn().Err(err).Msg("List nodes") return } - mxNodes, err := cluster.FetchNodesMetrics() + nmx, err := v.app.informer.List(watch.NodeMXIndex, "") if err != nil { log.Warn().Err(err).Msg("List node metrics") return } - - var mx k8s.ClusterMetrics - cluster.Metrics(nodes, mxNodes, &mx) + var cmx k8s.ClusterMetrics + cluster.Metrics(nos, nmx, &cmx) c := v.GetCell(row, 1) - cpu := resource.AsPerc(mx.PercCPU) + cpu := resource.AsPerc(cmx.PercCPU) c.SetText(cpu + deltas(strip(c.Text), cpu)) row++ c = v.GetCell(row, 1) - mem := resource.AsPerc(mx.PercMEM) + mem := resource.AsPerc(cmx.PercMEM) c.SetText(mem + deltas(strip(c.Text), mem)) } diff --git a/internal/views/command.go b/internal/views/command.go index 4ee0cb43..94bf9d35 100644 --- a/internal/views/command.go +++ b/internal/views/command.go @@ -4,7 +4,6 @@ import ( "fmt" "regexp" - "github.com/derailed/k9s/internal/k8s" "github.com/derailed/k9s/internal/resource" "github.com/rs/zerolog/log" ) @@ -54,7 +53,7 @@ func (c *command) run(cmd string) bool { var v resourceViewer switch { case cmd == "q", cmd == "quit": - c.app.Stop() + c.app.bailOut() return true case cmd == "?", cmd == "help": c.app.inject(newHelpView(c.app)) @@ -71,12 +70,7 @@ func (c *command) run(cmd string) bool { default: if res, ok := resourceViews(c.app.conn())[cmd]; ok { var r resource.List - if res.listMxFn != nil { - r = res.listMxFn(c.app.conn(), - k8s.NewMetricsServer(c.app.conn()), - resource.DefaultNamespace, - ) - } else if res.listFn != nil { + if res.listFn != nil { r = res.listFn(c.app.conn(), resource.DefaultNamespace) } v = res.viewFn(res.title, c.app, r) diff --git a/internal/views/container.go b/internal/views/container.go index 3ba4f40d..c586424a 100644 --- a/internal/views/container.go +++ b/internal/views/container.go @@ -10,13 +10,12 @@ type containerView struct { *resourceView current igniter - path string } func newContainerView(t string, app *appView, list resource.List, path string) resourceViewer { v := containerView{resourceView: newResourceView(t, app, list).(*resourceView)} { - v.path = path + v.path = &path v.extraActionsFn = v.extraActions v.current = app.content.GetPrimitive("main").(igniter) } @@ -42,7 +41,7 @@ func (v *containerView) getList() resource.List { } func (v *containerView) getSelection() string { - return v.path + return *v.path } // Handlers... @@ -76,7 +75,7 @@ func (v *containerView) shellCmd(evt *tcell.EventKey) *tcell.EventKey { return evt } log.Debug().Msgf("Selected %s", v.selectedItem) - v.shellIn(v.path, v.selectedItem) + v.shellIn(*v.path, v.selectedItem) return nil } diff --git a/internal/views/context.go b/internal/views/context.go index 092e08bc..71d23888 100644 --- a/internal/views/context.go +++ b/internal/views/context.go @@ -34,11 +34,6 @@ func (v *contextView) useCmd(evt *tcell.EventKey) *tcell.EventKey { return evt } - // Update cluster info on context switch. - v.app.QueueUpdateDraw(func() { - v.app.clusterInfoView.refresh() - }) - v.app.gotoResource("po", true) return nil @@ -61,6 +56,12 @@ func (v *contextView) useContext(name string) error { return err } + v.app.startInformer() + // Update cluster info on context switch. + v.app.QueueUpdateDraw(func() { + v.app.clusterInfoView.refresh() + }) + v.app.config.Reset() v.app.config.Save() v.app.flash(flashInfo, "Switching context to", ctx) diff --git a/internal/views/mock_metricsserver.go b/internal/views/mock_metricsserver.go index db21c774..d5700512 100644 --- a/internal/views/mock_metricsserver.go +++ b/internal/views/mock_metricsserver.go @@ -6,7 +6,6 @@ package views import ( k8s "github.com/derailed/k9s/internal/k8s" pegomock "github.com/petergtz/pegomock" - v1 "k8s.io/api/core/v1" v1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" "reflect" "time" @@ -20,7 +19,7 @@ func NewMockMetricsServer() *MockMetricsServer { return &MockMetricsServer{fail: pegomock.GlobalFailHandler} } -func (mock *MockMetricsServer) ClusterLoad(_param0 *v1.NodeList, _param1 *v1beta1.NodeMetricsList, _param2 *k8s.ClusterMetrics) { +func (mock *MockMetricsServer) ClusterLoad(_param0 k8s.Collection, _param1 k8s.Collection, _param2 *k8s.ClusterMetrics) { if mock == nil { panic("mock must not be nil. Use myMock := NewMockMetricsServer().") } @@ -134,7 +133,7 @@ type VerifierMetricsServer struct { timeout time.Duration } -func (verifier *VerifierMetricsServer) ClusterLoad(_param0 *v1.NodeList, _param1 *v1beta1.NodeMetricsList, _param2 *k8s.ClusterMetrics) *MetricsServer_ClusterLoad_OngoingVerification { +func (verifier *VerifierMetricsServer) ClusterLoad(_param0 k8s.Collection, _param1 k8s.Collection, _param2 *k8s.ClusterMetrics) *MetricsServer_ClusterLoad_OngoingVerification { params := []pegomock.Param{_param0, _param1, _param2} methodInvocations := pegomock.GetGenericMockFrom(verifier.mock).Verify(verifier.inOrderContext, verifier.invocationCountMatcher, "ClusterLoad", params, verifier.timeout) return &MetricsServer_ClusterLoad_OngoingVerification{mock: verifier.mock, methodInvocations: methodInvocations} @@ -145,21 +144,21 @@ type MetricsServer_ClusterLoad_OngoingVerification struct { methodInvocations []pegomock.MethodInvocation } -func (c *MetricsServer_ClusterLoad_OngoingVerification) GetCapturedArguments() (*v1.NodeList, *v1beta1.NodeMetricsList, *k8s.ClusterMetrics) { +func (c *MetricsServer_ClusterLoad_OngoingVerification) GetCapturedArguments() (k8s.Collection, k8s.Collection, *k8s.ClusterMetrics) { _param0, _param1, _param2 := c.GetAllCapturedArguments() return _param0[len(_param0)-1], _param1[len(_param1)-1], _param2[len(_param2)-1] } -func (c *MetricsServer_ClusterLoad_OngoingVerification) GetAllCapturedArguments() (_param0 []*v1.NodeList, _param1 []*v1beta1.NodeMetricsList, _param2 []*k8s.ClusterMetrics) { +func (c *MetricsServer_ClusterLoad_OngoingVerification) GetAllCapturedArguments() (_param0 []k8s.Collection, _param1 []k8s.Collection, _param2 []*k8s.ClusterMetrics) { params := pegomock.GetGenericMockFrom(c.mock).GetInvocationParams(c.methodInvocations) if len(params) > 0 { - _param0 = make([]*v1.NodeList, len(params[0])) + _param0 = make([]k8s.Collection, len(params[0])) for u, param := range params[0] { - _param0[u] = param.(*v1.NodeList) + _param0[u] = param.(k8s.Collection) } - _param1 = make([]*v1beta1.NodeMetricsList, len(params[1])) + _param1 = make([]k8s.Collection, len(params[1])) for u, param := range params[1] { - _param1[u] = param.(*v1beta1.NodeMetricsList) + _param1[u] = param.(k8s.Collection) } _param2 = make([]*k8s.ClusterMetrics, len(params[2])) for u, param := range params[2] { diff --git a/internal/views/no.go b/internal/views/no.go index 180a6837..d852e2fb 100644 --- a/internal/views/no.go +++ b/internal/views/no.go @@ -1,7 +1,6 @@ package views import ( - "github.com/derailed/k9s/internal/k8s" "github.com/derailed/k9s/internal/resource" "github.com/gdamore/tcell" ) @@ -53,9 +52,7 @@ func (v *nodeView) backCmd(evt *tcell.EventKey) *tcell.EventKey { } func showPods(app *appView, ns, res, selected, labelSel, fieldSel string, b actionHandler) { - mx := k8s.NewMetricsServer(app.conn()) - list := resource.NewPodList(app.conn(), mx, ns) - + list := resource.NewPodList(app.conn(), ns) list.SetLabelSelector(labelSel) list.SetFieldSelector(fieldSel) diff --git a/internal/views/pod.go b/internal/views/pod.go index 59750622..1a0e13ea 100644 --- a/internal/views/pod.go +++ b/internal/views/pod.go @@ -6,9 +6,10 @@ import ( "github.com/derailed/k9s/internal/k8s" "github.com/derailed/k9s/internal/resource" + "github.com/derailed/k9s/internal/watch" "github.com/gdamore/tcell" "github.com/rs/zerolog/log" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/api/core/v1" ) const containerFmt = "[fg:bg:b]%s([hilite:bg:b]%s[fg:bg:-])" @@ -52,25 +53,28 @@ func (v *podView) listContainers(app *appView, _, res, sel string) { } log.Debug().Msgf("Selected %s", sel) - ns, n := namespaced(sel) - po, err := app.conn().DialOrDie().CoreV1().Pods(ns).Get(n, metav1.GetOptions{}) + // ns, n := namespaced(sel) + po, err := v.app.informer.Get(watch.PodIndex, sel) + // po, err := app.conn().DialOrDie().CoreV1().Pods(ns).Get(n, metav1.GetOptions{}) if err != nil { log.Error().Err(err).Msgf("Unable to retrieve pod %s", sel) app.flash(flashErr, err.Error()) return } + pod := po.(*v1.Pod) mx := k8s.NewMetricsServer(app.conn()) - list := resource.NewContainerList(app.conn(), mx, po) + list := resource.NewContainerList(app.conn(), mx, pod) + log.Debug().Msgf(">>>> Got pod %s", pod.Name) fmat := strings.Replace(containerFmt, "[fg:bg", "["+v.app.styles.Style.Title.FgColor+":"+v.app.styles.Style.Title.BgColor, -1) fmat = strings.Replace(fmat, "[hilite", "["+v.app.styles.Style.Title.CounterColor, 1) - + title := fmt.Sprintf(fmat, "Containers", sel) app.inject(newContainerView( - fmt.Sprintf(fmat, "Containers", sel), + title, app, list, - namespacedName(po.Namespace, po.Name), + namespacedName(pod.Namespace, pod.Name), )) } diff --git a/internal/views/registrar.go b/internal/views/registrar.go index 570def4c..d840c308 100644 --- a/internal/views/registrar.go +++ b/internal/views/registrar.go @@ -9,19 +9,19 @@ import ( ) type ( - viewFn func(ns string, app *appView, list resource.List) resourceViewer - listFn func(c resource.Connection, ns string) resource.List - listMxFn func(c resource.Connection, mx resource.MetricsServer, ns string) resource.List + viewFn func(ns string, app *appView, list resource.List) resourceViewer + listFn func(c resource.Connection, ns string) resource.List + // listMxFn func(c resource.Connection, mx resource.MetricsServer, ns string) resource.List colorerFn func(ns string, evt *resource.RowEvent) tcell.Color enterFn func(app *appView, ns, resource, selection string) decorateFn func(resource.TableData) resource.TableData resCmd struct { - title string - api string - viewFn viewFn - listFn listFn - listMxFn listMxFn + title string + api string + viewFn viewFn + listFn listFn + // listMxFn listMxFn enterFn enterFn colorerFn colorerFn decorateFn decorateFn @@ -193,7 +193,7 @@ func resourceViews(c k8s.Connection) map[string]resCmd { title: "Nodes", api: "", viewFn: newNodeView, - listMxFn: resource.NewNodeList, + listFn: resource.NewNodeList, colorerFn: nsColorer, }, "ns": { @@ -214,7 +214,7 @@ func resourceViews(c k8s.Connection) map[string]resCmd { title: "Pods", api: "", viewFn: newPodView, - listMxFn: resource.NewPodList, + listFn: resource.NewPodList, colorerFn: podColorer, }, "pv": { diff --git a/internal/views/resource.go b/internal/views/resource.go index dfcbb6fe..0a7c8a93 100644 --- a/internal/views/resource.go +++ b/internal/views/resource.go @@ -47,6 +47,7 @@ type ( mx sync.Mutex suspended bool nsListAccess bool + path *string } ) @@ -133,7 +134,7 @@ func (v *resourceView) updater(ctx context.Context) { if v.isSuspended() { continue } - v.app.QueueUpdate(func() { + v.app.QueueUpdateDraw(func() { v.refresh() }) } @@ -301,7 +302,6 @@ func (v *resourceView) describeCmd(evt *tcell.EventKey) *tcell.EventKey { } log.Debug().Msgf("Selected Item %v-%v-%v", v.list.GetNamespace(), v.list.GetName(), v.selectedItem) - v.defaultEnter(v.list.GetNamespace(), v.list.GetName(), v.selectedItem) return nil @@ -377,7 +377,7 @@ func (v *resourceView) doSwitchNamespace(ns string) { func (v *resourceView) refresh() { log.Debug().Msgf("%s", strings.Repeat("-", 80)) defer func(t time.Time) { - log.Debug().Msgf("Refresh %v", time.Since(t)) + log.Debug().Msgf("Refresh %s %v", v.list.GetName(), time.Since(t)) }(time.Now()) if _, ok := v.CurrentPage().Item.(*tableView); !ok { @@ -390,7 +390,7 @@ func (v *resourceView) refresh() { v.refreshActions() - if err := v.list.Reconcile(); err != nil { + if err := v.list.Reconcile(v.app.informer, v.path); err != nil { log.Error().Err(err).Msg("Reconciliation failed") v.app.flash(flashErr, err.Error()) } @@ -461,9 +461,16 @@ func namespaced(n string) (string, string) { func (v *resourceView) refreshActions() { if v.list.Access(resource.NamespaceAccess) { v.namespaces = make(map[int]string, config.MaxFavoritesNS) - for i, n := range v.app.config.FavNamespaces() { - v.actions[tcell.Key(numKeys[i])] = newKeyAction(n, v.switchNamespaceCmd, true) - v.namespaces[i] = n + v.actions[tcell.Key(numKeys[0])] = newKeyAction(resource.AllNamespace, v.switchNamespaceCmd, true) + v.namespaces[0] = resource.AllNamespace + index := 1 + for _, n := range v.app.config.FavNamespaces() { + if n == resource.AllNamespace { + continue + } + v.actions[tcell.Key(numKeys[index])] = newKeyAction(n, v.switchNamespaceCmd, true) + v.namespaces[index] = n + index++ } } diff --git a/internal/watch/container.go b/internal/watch/container.go new file mode 100644 index 00000000..b78a0c78 --- /dev/null +++ b/internal/watch/container.go @@ -0,0 +1,340 @@ +package watch + +import ( + "fmt" + "strconv" + + "github.com/derailed/k9s/internal/k8s" + "github.com/rs/zerolog/log" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/watch" +) + +const ( + // ContainerIndex marker for stored containers. + ContainerIndex string = "co" + containerCols = 12 +) + +// Container tracks container activities. +type Container struct { + CallbackInformer + + data RowEvents + mxData k8s.PodMetrics + listener TableListenerFn + activeFQN *string +} + +// NewContainer returns a new container. +func NewContainer(po CallbackInformer) *Container { + co := Container{ + CallbackInformer: po, + data: RowEvents{}, + } + po.AddEventHandler(&co) + + return &co +} + +// Run starts out the informer loop. +func (c *Container) Run(closeCh <-chan struct{}) {} + +// Get retrieves a given container from store. +func (c *Container) Get(fqn string) (interface{}, error) { + o, ok, err := c.GetStore().GetByKey(fqn) + if err != nil { + return nil, err + } + if !ok { + return nil, fmt.Errorf("Container %s not found", fqn) + } + + return o, nil +} + +// List retrieves a given containers from store. +func (c *Container) List(fqn string) (k8s.Collection, error) { + o, ok, err := c.GetStore().GetByKey(fqn) + if err != nil { + return nil, err + } + if !ok { + return nil, fmt.Errorf("Pod %s not found", fqn) + } + + po := o.(*v1.Pod) + var cc k8s.Collection + for i := 0; i < len(po.Spec.InitContainers); i++ { + cc = append(cc, &po.Spec.InitContainers[i]) + } + for i := 0; i < len(po.Spec.Containers); i++ { + cc = append(cc, &po.Spec.Containers[i]) + } + + return cc, nil +} + +// SetListener registers event recipient. +func (c *Container) SetListener(fqn string, cb TableListenerFn) { + c.listener, c.activeFQN = cb, &fqn + o, err := c.Get(fqn) + if err != nil { + log.Error().Err(err).Msgf("Pod `%q not found", fqn) + return + } + // Clear out all rows + for k := range c.data { + delete(c.data, k) + } + c.updateData(watch.Added, o.(*v1.Pod)) + c.fireChanged() +} + +// UnsetListener unregister event recipient. +func (c *Container) UnsetListener(_ string) { + c.listener, c.activeFQN = nil, nil +} + +// Data return current data. +func (c *Container) tableData(ns string) TableData { + return TableData{ + Header: c.header(), + Rows: c.data, + Namespace: ns, + } +} + +func (c *Container) fireChanged() { + if cb := c.listener; cb != nil { + cb(c.tableData(NotNamespaced)) + } +} + +// StartWatching registers container event listener. +func (c *Container) StartWatching(stopCh <-chan struct{}) {} + +// OnAdd notify container added. +func (c *Container) OnAdd(obj interface{}) { + if c.activeFQN == nil { + return + } + + po := obj.(*v1.Pod) + fqn := MetaFQN(po.ObjectMeta) + if fqn != *c.activeFQN { + return + } + + log.Debug().Msgf("Pod Added %s", fqn) + // ff := make(Row, containerCols) + c.updateData(watch.Added, po) + + if c.HasSynced() { + c.fireChanged() + } +} + +// OnUpdate notify container updated. +func (c *Container) OnUpdate(oldObj, newObj interface{}) { + if c.activeFQN == nil { + return + } + + opo, npo := oldObj.(*v1.Pod), newObj.(*v1.Pod) + k1 := MetaFQN(opo.ObjectMeta) + k2 := MetaFQN(npo.ObjectMeta) + + if k1 != *c.activeFQN && k2 != *c.activeFQN { + return + } + + log.Debug().Msgf("Pod Updated %#v - %#v", opo.Name, npo.Name) + + // Check if this is a rollout + if k1 != k2 { + c.updateData(watch.Modified, opo) + } else { + c.updateData(watch.Modified, npo) + } + c.fireChanged() +} + +// OnDelete notify container was deleted. +func (c *Container) OnDelete(obj interface{}) { + if c.activeFQN == nil { + return + } + + po := obj.(*v1.Pod) + fqn := MetaFQN(po.ObjectMeta) + if fqn != *c.activeFQN { + return + } + log.Debug().Msgf("Pod Deleted %s", fqn) + c.data = RowEvents{} + c.fireChanged() +} + +// header return resource header. +func (*Container) header() Row { + var hh Row + return append(hh, + "NAME", + "IMAGE", + "READY", + "STATE", + "RS", + "LPROB", + "RPROB", + "CPU", + "MEM", + "RCPU", + "RMEM", + "AGE", + ) +} + +// Fields retrieves displayable fields. +func (c *Container) fields(pod *v1.Pod, co v1.Container) Row { + ff := make(Row, 0, containerCols) + + // mxs, _ := c.MetricsServer.FetchPodsMetrics(r.pod.Namespace) + + // var cpu, mem string + // for _, mx := range mxs.Items { + // if mx.Name != r.pod.Name { + // continue + // } + // for _, co := range mx.Containers { + // if co.Name != i.Name { + // continue + // } + // cpu, mem = toRes(co.Usage) + // } + // } + + // rcpu, rmem := resources(i) + + var cs *v1.ContainerStatus + for _, cos := range pod.Status.ContainerStatuses { + if cos.Name != co.Name { + continue + } + cs = &cos + } + + if cs == nil { + for _, cos := range pod.Status.InitContainerStatuses { + if cos.Name != co.Name { + continue + } + cs = &cos + } + } + + ready, state, restarts := "false", MissingValue, "0" + if cs != nil { + ready, state, restarts = boolToStr(cs.Ready), toState(cs.State), strconv.Itoa(int(cs.RestartCount)) + } + + cpu, mem, rcpu, rmem := "Z", "Z", "Z", "Z" + + return append(ff, + co.Name, + co.Image, + ready, + state, + restarts, + probe(co.LivenessProbe), + probe(co.ReadinessProbe), + cpu, + mem, + rcpu, + rmem, + toAge(pod.CreationTimestamp), + ) +} + +func (c *Container) updateData(action watch.EventType, po *v1.Pod) { + for _, co := range po.Spec.InitContainers { + ff := c.fields(po, co) + + if re, ok := c.data[co.Name]; ok { + re.Action = action + re.Deltas = re.Fields + re.Fields = ff + } else { + c.data[co.Name] = &RowEvent{ + Action: action, + Deltas: make(Row, containerCols), + Fields: ff, + } + } + } + for _, co := range po.Spec.Containers { + ff := c.fields(po, co) + if re, ok := c.data[co.Name]; ok { + re.Action = action + re.Deltas = re.Fields + re.Fields = ff + } else { + c.data[co.Name] = &RowEvent{ + Action: action, + Deltas: make(Row, containerCols), + Fields: ff, + } + } + } +} + +// ---------------------------------------------------------------------------- +// Helpers... + +func toState(s v1.ContainerState) string { + switch { + case s.Waiting != nil: + if s.Waiting.Reason != "" { + return s.Waiting.Reason + } + return "Waiting" + + case s.Terminated != nil: + if s.Terminated.Reason != "" { + return s.Terminated.Reason + } + return "Terminated" + case s.Running != nil: + return "Running" + default: + return MissingValue + } +} + +func toRes(r v1.ResourceList) (string, string) { + cpu, mem := r[v1.ResourceCPU], r[v1.ResourceMemory] + + return ToMillicore(cpu.MilliValue()), ToMi(k8s.ToMB(mem.Value())) +} + +func resources(c v1.Container) (cpu, mem string) { + req, lim := c.Resources.Requests, c.Resources.Limits + + if len(req) == 0 { + if len(lim) != 0 { + return toRes(lim) + } + } else { + return toRes(req) + } + + return "0", "0" +} + +func probe(p *v1.Probe) string { + if p == nil { + return "no" + } + + return "yes" +} diff --git a/internal/watch/helpers.go b/internal/watch/helpers.go new file mode 100644 index 00000000..475e0e80 --- /dev/null +++ b/internal/watch/helpers.go @@ -0,0 +1,220 @@ +package watch + +import ( + "path" + "sort" + "strconv" + "strings" + "time" + + "github.com/derailed/tview" + runewidth "github.com/mattn/go-runewidth" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/duration" + "k8s.io/apimachinery/pkg/watch" +) + +const ( + // DefaultNamespace indicator to fetch default namespace. + DefaultNamespace = "default" + // AllNamespace namespace name to span all namespaces. + AllNamespace = "all" + // AllNamespaces indicator to retrieve K8s resource for all namespaces. + AllNamespaces = "" + // NotNamespaced indicator for non namespaced resource. + NotNamespaced = "-" + + // New track new resource events. + New watch.EventType = "NEW" + // Unchanged provides no change events. + Unchanged watch.EventType = "UNCHANGED" + + // MissingValue indicates an unset value. + MissingValue = "" + // NAValue indicates a value that does not pertain. + NAValue = "" + + megaByte = 1024 * 1024 +) + +func asMi(v int64) float64 { + return float64(v) / megaByte +} + +func empty(s []string) bool { + for _, v := range s { + if len(v) != 0 { + return false + } + } + return true +} + +// Join a slice of strings, skipping blanks. +func join(a []string, sep string) string { + switch len(a) { + case 0: + return "" + case 1: + return a[0] + } + + var b []string + for _, s := range a { + if s != "" { + b = append(b, s) + } + } + if len(b) == 0 { + return "" + } + + n := len(sep) * (len(b) - 1) + for i := 0; i < len(b); i++ { + n += len(a[i]) + } + + var buff strings.Builder + buff.Grow(n) + buff.WriteString(a[0]) + for _, s := range b[1:] { + buff.WriteString(sep) + buff.WriteString(s) + } + + return buff.String() +} + +func searchFQN(res, fqn string) string { + return res + ":" + fqn +} + +func indexFQN(res string, m metav1.ObjectMeta) string { + return res + ":" + MetaFQN(m) +} + +// MetaFQN computes unique resource id based on metadata. +func MetaFQN(m metav1.ObjectMeta) string { + if m.Namespace == "" { + return m.Name + } + + return m.Namespace + "/" + m.Name +} + +// ToMillicore shows cpu reading for human. +func ToMillicore(v int64) string { + return strconv.Itoa(int(v)) + "m" +} + +// ToMi shows mem reading for human. +func ToMi(v float64) string { + return strconv.Itoa(int(v)) + "Mi" +} + +// WithPerc shows a combined number and ratio. +func withPerc(v, p string) string { + var b strings.Builder + b.Grow(len(v) + len(p) + 2) + b.WriteString(v) + b.WriteString("(") + b.WriteString(p) + b.WriteString(")") + // return v + " (" + p + ")" + return b.String() +} + +// AsPerc prints a number as a percentage. +func AsPerc(f float64) string { + return strconv.Itoa(int(f)) + "%" +} + +// ToPerc computes the ratio of two numbers as a percentage. +func toPerc(v1, v2 float64) float64 { + if v2 == 0 { + return 0 + } + return (v1 / v2) * 100 +} + +func namespaced(n string) (string, string) { + ns, po := path.Split(n) + + return strings.Trim(ns, "/"), po +} + +func missing(s string) string { + return check(s, MissingValue) +} + +func na(s string) string { + return check(s, NAValue) +} + +func check(s, sub string) string { + if len(s) == 0 { + return sub + } + + return s +} + +func intToStr(i int64) string { + return strconv.Itoa(int(i)) +} + +func boolToStr(b bool) string { + switch b { + case true: + return "true" + default: + return "false" + } +} + +func toAge(timestamp metav1.Time) string { + return time.Since(timestamp.Time).String() +} + +func toAgeHuman(s string) string { + d, err := time.ParseDuration(s) + if err != nil { + return "" + } + + return duration.HumanDuration(d) +} + +// Truncate a string to the given l and suffix ellipsis if needed. +func Truncate(str string, width int) string { + return runewidth.Truncate(str, width, string(tview.SemigraphicsHorizontalEllipsis)) +} + +func mapToStr(m map[string]string) (s string) { + if len(m) == 0 { + return MissingValue + } + + kk := make([]string, 0, len(m)) + for k := range m { + kk = append(kk, k) + } + sort.Strings(kk) + + for i, k := range kk { + s += k + "=" + m[k] + if i < len(kk)-1 { + s += "," + } + } + + return +} + +func boolPtrToStr(b *bool) string { + if b == nil { + return "false" + } + + return boolToStr(*b) +} diff --git a/internal/watch/meta.go b/internal/watch/meta.go new file mode 100644 index 00000000..c13a14e0 --- /dev/null +++ b/internal/watch/meta.go @@ -0,0 +1,119 @@ +package watch + +import ( + "errors" + "fmt" + + "github.com/derailed/k9s/internal/k8s" + "github.com/rs/zerolog/log" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" +) + +type ( + // Row represents a collection of string fields. + Row []string + + // RowEvent represents a call for action after a resource reconciliation. + // Tracks whether a resource got added, deleted or updated. + RowEvent struct { + Action watch.EventType + Fields Row + Deltas Row + } + + // RowEvents tracks resource update events. + RowEvents map[string]*RowEvent + + // TableData tracks a K8s resource for tabular display. + TableData struct { + Header Row + Rows RowEvents + Namespace string + } +) + +// TableListenerFn represents a table data listener. +type TableListenerFn func(TableData) + +// CallbackInformer an informer that allows listeners registration. +type CallbackInformer interface { + cache.SharedIndexInformer + Get(fqn string) (interface{}, error) + List(ns string) (k8s.Collection, error) + SetListener(ns string, cb TableListenerFn) + UnsetListener(ns string) +} + +// Meta represents a collection of cluster wide watchers. +type Meta struct { + informers map[string]CallbackInformer + client k8s.Connection + podInformer *Pod + listenerFn TableListenerFn +} + +// NewMeta creates a new cluster resource informer +func NewMeta(client k8s.Connection, ns string) *Meta { + m := Meta{client: client, informers: map[string]CallbackInformer{}} + m.init(ns) + + return &m +} + +func (m *Meta) init(ns string) { + po := NewPod(m.client, ns) + m.informers = map[string]CallbackInformer{ + NodeIndex: NewNode(m.client), + NodeMXIndex: NewNodeMetrics(m.client), + PodIndex: po, + PodMXIndex: NewPodMetrics(m.client, ns), + ContainerIndex: NewContainer(po), + } +} + +// List items from store. +func (m *Meta) List(res, ns string) (k8s.Collection, error) { + if m == nil { + return nil, fmt.Errorf("No meta exists") + } + if i, ok := m.informers[res]; ok { + return i.List(ns) + } + + return nil, fmt.Errorf("No informer found for resource %s", res) +} + +// RegisterListener register table data listeners. +func (m *Meta) RegisterListener(res, ns string, cb TableListenerFn) { + if informer, ok := m.informers[res]; ok { + informer.SetListener(ns, cb) + } else { + log.Error().Msgf("No informer for %q:%s", ns, res) + } +} + +// UnregisterListener register table data listeners. +func (m *Meta) UnregisterListener(ns string) { + for _, i := range m.informers { + i.UnsetListener(ns) + } +} + +// Get a resource by name. +func (m Meta) Get(res, fqn string) (interface{}, error) { + if informer, ok := m.informers[res]; ok { + return informer.Get(fqn) + } + + return nil, errors.New("Unable to local resource") +} + +// Run starts watching cluster resources. +func (m *Meta) Run(closeCh <-chan struct{}) { + for i := range m.informers { + go func(informer CallbackInformer, c <-chan struct{}) { + informer.Run(c) + }(m.informers[i], closeCh) + } +} diff --git a/internal/watch/no.go b/internal/watch/no.go new file mode 100644 index 00000000..6c60ae06 --- /dev/null +++ b/internal/watch/no.go @@ -0,0 +1,299 @@ +package watch + +import ( + "fmt" + "strings" + + "github.com/derailed/k9s/internal/k8s" + "github.com/rs/zerolog/log" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/watch" + wv1 "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/tools/cache" +) + +const ( + // NodeIndex marker for stored nodes. + NodeIndex string = "no" + nodeCols = 12 +) + +// Node tracks node activities. +type Node struct { + cache.SharedIndexInformer + + client k8s.Connection + data RowEvents + mxData k8s.NodesMetrics + listener TableListenerFn +} + +// NewNode returns a new node. +func NewNode(client k8s.Connection) *Node { + no := Node{ + client: client, + data: RowEvents{}, + mxData: k8s.NodesMetrics{}, + } + + if client == nil { + return &no + } + + no.SharedIndexInformer = wv1.NewNodeInformer( + client.DialOrDie(), + 0, + cache.Indexers{}, + ) + no.SharedIndexInformer.AddEventHandler(&no) + + return &no +} + +// List all nodes. +func (n *Node) List(_ string) (k8s.Collection, error) { + var res k8s.Collection + for _, o := range n.GetStore().List() { + res = append(res, o) + } + return res, nil +} + +// Get retrieves a given node from store. +func (n *Node) Get(fqn string) (interface{}, error) { + o, ok, err := n.GetStore().GetByKey(fqn) + if err != nil { + return nil, err + } + if !ok { + return nil, fmt.Errorf("Node %s not found", fqn) + } + + return o, nil +} + +// SetListener registers event recipient. +func (n *Node) SetListener(_ string, cb TableListenerFn) { + n.listener = cb + n.fireChanged() +} + +// UnsetListener unregister event recipient. +func (n *Node) UnsetListener(_ string) { + n.listener = nil +} + +func (n *Node) fetchMetrics() { + if n.client == nil { + return + } + + client := k8s.NewMetricsServer(n.client) + mx, err := client.FetchNodesMetrics() + if err != nil { + log.Error().Err(err).Msg("Node metrics failed") + return + } + client.NodesMetrics(n.GetStore().List(), mx, n.mxData) +} + +// Data return current data. +func (n *Node) tableData() TableData { + return TableData{ + Header: n.header(), + Rows: n.data, + Namespace: NotNamespaced, + } +} + +func (n *Node) fireChanged() { + if cb := n.listener; cb != nil { + cb(n.tableData()) + } +} + +// OnAdd notify node added. +func (n *Node) OnAdd(obj interface{}) { + no := obj.(*v1.Node) + n.fetchMetrics() + ff := make(Row, nodeCols) + n.fields(no, ff) + fqn := MetaFQN(no.ObjectMeta) + log.Debug().Msgf("Node Added %s", fqn) + + n.data[fqn] = &RowEvent{ + Action: watch.Added, + Fields: ff, + Deltas: make(Row, len(ff)), + } + + if n.HasSynced() { + n.fireChanged() + } +} + +// OnUpdate notify node updated. +func (n *Node) OnUpdate(oldObj, newObj interface{}) { + ono, nno := oldObj.(*v1.Node), newObj.(*v1.Node) + k1 := MetaFQN(ono.ObjectMeta) + k2 := MetaFQN(nno.ObjectMeta) + + ff := make(Row, nodeCols) + n.fields(nno, ff) + if re, ok := n.data[k1]; ok { + re.Action = watch.Modified + re.Deltas = re.Fields + re.Fields = ff + } + n.data[k2] = &RowEvent{ + Action: watch.Added, + Fields: ff, + Deltas: make(Row, len(ff)), + } + n.fireChanged() +} + +// OnDelete notify node was deleted. +func (n *Node) OnDelete(obj interface{}) { + po := obj.(*v1.Node) + key := MetaFQN(po.ObjectMeta) + + log.Debug().Msgf("Node Delete %s", key) + + delete(n.data, key) + n.fireChanged() +} + +// Header returns resource header. +func (*Node) header() Row { + return Row{ + "NAME", + "STATUS", + "ROLE", + "VERSION", + "KERNEL", + "INTERNAL-IP", + "EXTERNAL-IP", + "CPU", + "MEM", + "ACPU", + "AMEM", + "AGE", + } +} + +// Fields returns displayable fields. +func (n *Node) fields(no *v1.Node, r Row) { + col := 0 + r[col] = no.Name + col++ + sta := make([]string, 10) + n.status(no.Status, no.Spec.Unschedulable, sta) + r[col] = join(sta, ",") + col++ + ro := make([]string, 10) + n.nodeRoles(no, ro) + r[col] = join(ro, ",") + col++ + r[col] = no.Status.NodeInfo.KubeletVersion + col++ + r[col] = no.Status.NodeInfo.KernelVersion + col++ + iIP, eIP := n.getIPs(no.Status.Addresses) + iIP, eIP = missing(iIP), missing(eIP) + r[col], r[col+1] = iIP, eIP + col += 2 + + fqn := MetaFQN(no.ObjectMeta) + n.fetchMetrics() + mx := n.mxData[fqn] + r[col] = withPerc( + ToMillicore(mx.CurrentCPU), + AsPerc(toPerc(float64(mx.CurrentCPU), float64(mx.AvailCPU))), + ) + col++ + r[col] = withPerc( + ToMi(mx.CurrentMEM), + AsPerc(toPerc(mx.CurrentMEM, mx.AvailMEM)), + ) + col++ + r[col] = ToMillicore(mx.AvailCPU) + col++ + r[col] = ToMi(mx.AvailMEM) + col++ + r[col] = toAge(no.ObjectMeta.CreationTimestamp) +} + +// ---------------------------------------------------------------------------- +// Helpers... + +const ( + labelNodeRolePrefix = "node-role.kubernetes.io/" + nodeLabelRole = "kubernetes.io/role" +) + +func (*Node) nodeRoles(node *v1.Node, res []string) { + index := 0 + for k, v := range node.Labels { + switch { + case strings.HasPrefix(k, labelNodeRolePrefix): + if role := strings.TrimPrefix(k, labelNodeRolePrefix); len(role) > 0 { + res[index] = role + index++ + } + case k == nodeLabelRole && v != "": + res[index] = v + index++ + } + } + + if empty(res) { + res[index] = MissingValue + index++ + } +} + +func (*Node) getIPs(addrs []v1.NodeAddress) (iIP, eIP string) { + for _, a := range addrs { + switch a.Type { + case v1.NodeExternalIP: + eIP = a.Address + case v1.NodeInternalIP: + iIP = a.Address + } + } + + return +} + +func (*Node) status(status v1.NodeStatus, exempt bool, res []string) { + var index int + conditions := make(map[v1.NodeConditionType]*v1.NodeCondition) + for n := range status.Conditions { + cond := status.Conditions[n] + conditions[cond.Type] = &cond + } + + validConditions := []v1.NodeConditionType{v1.NodeReady} + for _, validCondition := range validConditions { + condition, ok := conditions[validCondition] + if !ok { + continue + } + neg := "" + if condition.Status != v1.ConditionTrue { + neg = "Not" + } + res[index] = neg + string(condition.Type) + index++ + + } + if len(res) == 0 { + res[index] = "Unknown" + index++ + } + if exempt { + res[index] = "SchedulingDisabled" + index++ + } +} diff --git a/internal/watch/no_mx.go b/internal/watch/no_mx.go new file mode 100644 index 00000000..ff3cb7bf --- /dev/null +++ b/internal/watch/no_mx.go @@ -0,0 +1,211 @@ +package watch + +import ( + "fmt" + "time" + + "github.com/derailed/k9s/internal/k8s" + "github.com/rs/zerolog/log" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + mv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" + versioned "k8s.io/metrics/pkg/client/clientset/versioned" +) + +const ( + // NodeMXIndex track store indexer. + NodeMXIndex string = "nmx" +) + +// NodeMetrics tracks node metrics. +type NodeMetrics struct { + cache.SharedIndexInformer + + client k8s.Connection +} + +// NewNodeMetrics returns a node metrics informer. +func NewNodeMetrics(client k8s.Connection) *NodeMetrics { + mx := NodeMetrics{ + client: client, + } + + if client == nil { + return &mx + } + + c, err := client.MXDial() + if err != nil { + return &mx + } + mx.SharedIndexInformer = NewNodeMetricsInformer(c, 0, cache.Indexers{}) + mx.SharedIndexInformer.AddEventHandler(&mx) + + return &mx +} + +// List node metrics from store. +func (p *NodeMetrics) List(string) (k8s.Collection, error) { + return p.GetStore().List(), nil +} + +// Get node metrics from store. +func (p *NodeMetrics) Get(MetaFQN string) (interface{}, error) { + o, ok, err := p.GetStore().GetByKey(MetaFQN) + if err != nil { + return nil, err + } + if !ok { + return nil, fmt.Errorf("NodeMetric for %q not found", MetaFQN) + } + + return o, nil +} + +// SetListener register an event listiner. +func (p *NodeMetrics) SetListener(ns string, cb TableListenerFn) {} + +// UnsetListener unregister event listener. +func (p *NodeMetrics) UnsetListener(ns string) {} + +// OnAdd notify node added. +func (p *NodeMetrics) OnAdd(obj interface{}) { + po := obj.(*mv1beta1.NodeMetrics) + fqn := MetaFQN(po.ObjectMeta) + log.Debug().Msgf("NMX Added %s", fqn) +} + +// OnUpdate notify node updated. +func (p *NodeMetrics) OnUpdate(oldObj, newObj interface{}) { + opo, npo := oldObj.(*mv1beta1.NodeMetrics), newObj.(*mv1beta1.NodeMetrics) + + k1 := MetaFQN(opo.ObjectMeta) + k2 := MetaFQN(npo.ObjectMeta) + log.Debug().Msgf("NMX Updated %#v -- %#v", k1, k2) +} + +// OnDelete notify node was deleted. +func (p *NodeMetrics) OnDelete(obj interface{}) { + po := obj.(*mv1beta1.NodeMetrics) + key := MetaFQN(po.ObjectMeta) + log.Debug().Msgf("NMX Delete %s", key) +} + +// NewNodeMetricsInformer return an informer to return node metrix. +func NewNodeMetricsInformer(client *versioned.Clientset, sync time.Duration, idxs cache.Indexers) cache.SharedIndexInformer { + pw := NewNodeMxWatcher(client) + + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + l, err := client.MetricsV1beta1().NodeMetricses().List(opts) + if err == nil { + pw.update(l, false) + } + return l, err + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + pw.Run() + return pw, nil + }, + }, + &mv1beta1.NodeMetrics{}, + sync, + idxs, + ) +} + +const nodeMXRefresh = 30 * time.Second + +// NodeMxWatcher tracks node metrics. +type NodeMxWatcher struct { + client *versioned.Clientset + cache map[string]runtime.Object + eventChan chan watch.Event + doneChan chan struct{} +} + +// NewNodeMxWatcher returns a new metrics watcher. +func NewNodeMxWatcher(c *versioned.Clientset) *NodeMxWatcher { + return &NodeMxWatcher{ + client: c, + cache: map[string]runtime.Object{}, + eventChan: make(chan watch.Event), + doneChan: make(chan struct{}), + } +} + +// Run watcher to monitor node metrics. +func (n *NodeMxWatcher) Run() { + go func() { + defer log.Debug().Msg("Node metrics watcher canceled!") + for { + select { + case <-n.doneChan: + return + case <-time.After(nodeMXRefresh): + list, err := n.client.MetricsV1beta1().NodeMetricses().List(metav1.ListOptions{}) + if err != nil { + log.Error().Err(err).Msg("Fetch node metrics") + } + n.update(list, true) + } + } + }() +} + +func (n *NodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) { + fqns := map[string]runtime.Object{} + for i := range list.Items { + fqn := MetaFQN(list.Items[i].ObjectMeta) + fqns[fqn] = &list.Items[i] + } + + for k, v := range n.cache { + if _, ok := fqns[k]; !ok { + if notify { + n.eventChan <- watch.Event{ + Type: watch.Deleted, + Object: v, + } + } + delete(n.cache, k) + } + } + + for k, v := range fqns { + kind := watch.Added + if v1, ok := n.cache[k]; ok { + if !n.deltas(v1.(*mv1beta1.NodeMetrics), v.(*mv1beta1.NodeMetrics)) { + continue + } + kind = watch.Modified + } + + if notify { + n.eventChan <- watch.Event{ + Type: kind, + Object: v, + } + } + n.cache[k] = v + } +} + +// Stop the metrics informer. +func (n *NodeMxWatcher) Stop() { + log.Debug().Msg("Stopping node watcher!") + close(n.doneChan) + close(n.eventChan) +} + +// ResultChan retrieves event channel. +func (n *NodeMxWatcher) ResultChan() <-chan watch.Event { + return n.eventChan +} + +func (n *NodeMxWatcher) deltas(m1, m2 *mv1beta1.NodeMetrics) bool { + return resourceDiff(m1.Usage, m2.Usage) +} diff --git a/internal/watch/no_test.go b/internal/watch/no_test.go new file mode 100644 index 00000000..77e0d380 --- /dev/null +++ b/internal/watch/no_test.go @@ -0,0 +1,57 @@ +package watch + +import ( + "testing" + + "gotest.tools/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func BenchmarkNodeFields(b *testing.B) { + n := NewNode(nil) + no := makeNode() + ff := make(Row, 12) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + n.fields(no, ff) + } +} + +func TestJoin(t *testing.T) { + uu := map[string]struct { + i []string + e string + }{ + "zero": {[]string{}, ""}, + "std": {[]string{"a", "b", "c"}, "a,b,c"}, + "blank": {[]string{"", "", ""}, ""}, + "sparse": {[]string{"a", "", "c"}, "a,c"}, + } + + for k, v := range uu { + t.Run(k, func(t *testing.T) { + assert.Equal(t, v.e, join(v.i, ",")) + }) + } +} + +// ---------------------------------------------------------------------------- +// Helpers... + +func makeNode() *v1.Node { + return &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fred", + CreationTimestamp: metav1.Time{Time: testTime()}, + }, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{ + Addresses: []v1.NodeAddress{ + {Address: "1.1.1.1"}, + }, + }, + } +} diff --git a/internal/watch/pod.go b/internal/watch/pod.go new file mode 100644 index 00000000..cecb91f5 --- /dev/null +++ b/internal/watch/pod.go @@ -0,0 +1,356 @@ +package watch + +import ( + "fmt" + "strconv" + + "github.com/derailed/k9s/internal/k8s" + "github.com/rs/zerolog/log" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/watch" + wv1 "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/util/node" +) + +const ( + // PodIndex marker for stored pods. + PodIndex string = "po" + podCols = 11 +) + +// Pod tracks pod activities. +type Pod struct { + cache.SharedIndexInformer + + client k8s.Connection + data RowEvents + mxData k8s.PodsMetrics + ns string + listener TableListenerFn + activeNS *string +} + +// NewPod returns a new pod. +func NewPod(client k8s.Connection, ns string) *Pod { + po := Pod{ + ns: ns, + client: client, + data: RowEvents{}, + mxData: k8s.PodsMetrics{}, + } + + if client == nil { + return &po + } + + po.SharedIndexInformer = wv1.NewPodInformer( + client.DialOrDie(), + ns, + 0, + cache.Indexers{}, + ) + po.AddEventHandler(&po) + + return &po +} + +// SetListener registers event recipient. +func (p *Pod) SetListener(ns string, cb TableListenerFn) { + p.listener, p.activeNS = cb, &ns + p.fireChanged(*p.activeNS) +} + +// UnsetListener unregister event recipient. +func (p *Pod) UnsetListener(_ string) { + p.listener, p.activeNS = nil, nil +} + +// Data return current data. +func (p *Pod) tableData(ns string) TableData { + // Filter list based on active namespace. + data := RowEvents{} + for k := range p.data { + pns, _ := namespaced(k) + if ns == AllNamespaces || pns == ns { + data[k] = p.data[k] + } + } + + return TableData{ + Header: p.header(), + Rows: data, + Namespace: ns, + } +} + +// List all pods from store in the given namespace. +func (p *Pod) List(ns string) (k8s.Collection, error) { + var res k8s.Collection + for _, o := range p.GetStore().List() { + pod := o.(*v1.Pod) + if ns == "" || pod.Namespace == ns { + res = append(res, pod) + } + } + return res, nil +} + +// Get retrieves a given pod from store. +func (p *Pod) Get(fqn string) (interface{}, error) { + o, ok, err := p.GetStore().GetByKey(fqn) + if err != nil { + return nil, err + } + if !ok { + return nil, fmt.Errorf("Pod %s not found", fqn) + } + + return o, nil +} + +func (p *Pod) fireChanged(ns string) { + if cb := p.listener; cb != nil { + cb(p.tableData(*p.activeNS)) + } +} + +// OnAdd notify pod added. +func (p *Pod) OnAdd(obj interface{}) { + po := obj.(*v1.Pod) + ff := make(Row, podCols) + p.fields(p.ns, po, ff) + fqn := MetaFQN(po.ObjectMeta) + // log.Debug().Msgf("Pod Added %s", fqn) + + p.data[fqn] = &RowEvent{ + Action: watch.Added, + Fields: ff, + Deltas: make(Row, len(ff)), + } + + if p.HasSynced() { + p.fireChanged(po.Namespace) + } +} + +// OnUpdate notify pod updated. +func (p *Pod) OnUpdate(oldObj, newObj interface{}) { + opo, npo := oldObj.(*v1.Pod), newObj.(*v1.Pod) + + k1 := MetaFQN(opo.ObjectMeta) + k2 := MetaFQN(npo.ObjectMeta) + // log.Debug().Msgf("Pod Updated %#v -- %#v", opo.Name, npo.Name) + + p.deltas(opo, npo) + ff := make(Row, podCols) + p.fields(p.ns, npo, ff) + if re, ok := p.data[k1]; ok { + re.Action = watch.Modified + re.Deltas = re.Fields + re.Fields = ff + } + p.data[k2] = &RowEvent{ + Action: watch.Added, + Fields: ff, + Deltas: make(Row, len(ff)), + } + p.fireChanged(npo.Namespace) +} + +func (p *Pod) deltas(p1, p2 *v1.Pod) { + f1 := make(Row, podCols) + p.fields(p.ns, p1, f1) + f2 := make(Row, podCols) + p.fields(p.ns, p2, f2) + for i := 0; i < len(f1); i++ { + if f1[i] != f2[i] { + log.Debug().Msgf("Pod changed %s - %s", f1[i], f2[i]) + } + } +} + +// OnDelete notify pod was deleted. +func (p *Pod) OnDelete(obj interface{}) { + po := obj.(*v1.Pod) + key := MetaFQN(po.ObjectMeta) + // log.Debug().Msgf("Pod Delete %s", key) + delete(p.data, key) + p.fireChanged(po.Namespace) +} + +// header return resource header. +func (*Pod) header() Row { + var hh Row + return append(hh, + "NAMESPACE", + "NAME", + "READY", + "STATUS", + "RS", + "CPU", + "MEM", + "IP", + "NODE", + "QOS", + "AGE", + ) +} + +// fields retrieves displayable fields. +func (p *Pod) fields(ns string, pod *v1.Pod, ff Row) { + var col int + ff[col] = pod.ObjectMeta.Namespace + col++ + ff[col] = pod.ObjectMeta.Name + col++ + ss := pod.Status.ContainerStatuses + cr, _, rc := p.statuses(ss) + ff[col] = strconv.Itoa(cr) + "/" + strconv.Itoa(len(ss)) + col++ + ff[col] = p.phase(pod) + col++ + ff[col] = strconv.Itoa(rc) + col++ + fqn := MetaFQN(pod.ObjectMeta) + p.fetchMetrics() + mx := p.mxData[fqn] + ff[col] = ToMillicore(mx.CurrentCPU) + col++ + ff[col] = ToMi(mx.CurrentMEM) + col++ + ff[col] = pod.Status.PodIP + col++ + ff[col] = pod.Spec.NodeName + col++ + ff[col] = p.mapQOS(pod.Status.QOSClass) + col++ + ff[col] = toAge(pod.ObjectMeta.CreationTimestamp) + col++ +} + +func (p *Pod) fetchMetrics() { + if p.client == nil { + return + } + + client := k8s.NewMetricsServer(p.client) + mx, err := client.FetchPodsMetrics(p.ns) + if err != nil { + log.Error().Err(err).Msg("Pod metrics failed") + return + } + client.PodsMetrics(mx, p.mxData) +} + +// ---------------------------------------------------------------------------- +// Helpers... + +func (*Pod) mapQOS(class v1.PodQOSClass) string { + switch class { + case v1.PodQOSGuaranteed: + return "GA" + case v1.PodQOSBurstable: + return "BU" + default: + return "BE" + } +} + +func (*Pod) statuses(ss []v1.ContainerStatus) (cr, ct, rc int) { + for _, c := range ss { + if c.State.Terminated != nil { + ct++ + } + if c.Ready { + cr = cr + 1 + } + rc += int(c.RestartCount) + } + + return +} + +func isSet(s *string) bool { + return s != nil && *s != "" +} + +func (p *Pod) phase(po *v1.Pod) string { + status := string(po.Status.Phase) + if po.Status.Reason != "" { + if po.DeletionTimestamp != nil && po.Status.Reason == node.NodeUnreachablePodReason { + return "Unknown" + } + status = po.Status.Reason + } + + var init bool + init, status = p.initContainerPhase(po.Status, len(po.Spec.InitContainers), status) + if init { + return status + } + + var running bool + running, status = p.containerPhase(po.Status, status) + if status == "Completed" && running { + status = "Running" + } + + if po.DeletionTimestamp == nil { + return status + } + + return "Terminated" +} + +func (*Pod) containerPhase(st v1.PodStatus, status string) (bool, string) { + var running bool + for i := len(st.ContainerStatuses) - 1; i >= 0; i-- { + cs := st.ContainerStatuses[i] + switch { + case cs.State.Waiting != nil && cs.State.Waiting.Reason != "": + status = cs.State.Waiting.Reason + case cs.State.Terminated != nil && cs.State.Terminated.Reason != "": + status = cs.State.Terminated.Reason + case cs.State.Terminated != nil: + if cs.State.Terminated.Signal != 0 { + status = "Signal:" + strconv.Itoa(int(cs.State.Terminated.Signal)) + } else { + status = "ExitCode:" + strconv.Itoa(int(cs.State.Terminated.ExitCode)) + } + case cs.Ready && cs.State.Running != nil: + running = true + } + } + + return running, status +} + +func (*Pod) initContainerPhase(st v1.PodStatus, initCount int, status string) (bool, string) { + var init bool + for i, cs := range st.InitContainerStatuses { + switch { + case cs.State.Terminated != nil: + if cs.State.Terminated.ExitCode == 0 { + continue + } + if cs.State.Terminated.Reason != "" { + status = "Init:" + cs.State.Terminated.Reason + break + } + if cs.State.Terminated.Signal != 0 { + status = "Init:Signal:" + strconv.Itoa(int(cs.State.Terminated.Signal)) + } else { + status = "Init:ExitCode:" + strconv.Itoa(int(cs.State.Terminated.ExitCode)) + } + case cs.State.Waiting != nil && cs.State.Waiting.Reason != "" && cs.State.Waiting.Reason != "PodInitializing": + status = "Init:" + cs.State.Waiting.Reason + default: + status = "Init:" + strconv.Itoa(i) + "/" + strconv.Itoa(initCount) + } + init = true + break + } + + return init, status +} diff --git a/internal/watch/pod_mx.go b/internal/watch/pod_mx.go new file mode 100644 index 00000000..71ef811d --- /dev/null +++ b/internal/watch/pod_mx.go @@ -0,0 +1,260 @@ +package watch + +import ( + "fmt" + "time" + + "github.com/derailed/k9s/internal/k8s" + "github.com/rs/zerolog/log" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + mv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" + versioned "k8s.io/metrics/pkg/client/clientset/versioned" +) + +const ( + // PodMXIndex track store indexer. + PodMXIndex string = "pmx" +) + +// PodMetrics tracks pod metrics. +type PodMetrics struct { + cache.SharedIndexInformer + + client k8s.Connection + ns string +} + +// NewPodMetrics returns a pod metrics informer. +func NewPodMetrics(client k8s.Connection, ns string) *PodMetrics { + mx := PodMetrics{ + ns: ns, + client: client, + } + + if client == nil { + return &mx + } + + c, err := client.MXDial() + if err != nil { + return &mx + } + mx.SharedIndexInformer = NewPodMetricsInformer(c, ns, 0, cache.Indexers{}) + + return &mx +} + +// List pod metrics from store. +func (p *PodMetrics) List(ns string) (k8s.Collection, error) { + var res k8s.Collection + for _, o := range p.GetStore().List() { + mx := o.(*mv1beta1.PodMetrics) + if ns == "" || mx.Namespace == ns { + res = append(res, mx) + } + } + return res, nil +} + +// Get pod metrics from store. +func (p *PodMetrics) Get(fqn string) (interface{}, error) { + o, ok, err := p.GetStore().GetByKey(fqn) + if err != nil { + return nil, err + } + if !ok { + return nil, fmt.Errorf("PodMetric for %q not found", fqn) + } + + return o, nil +} + +// SetListener register an event listiner. +func (p *PodMetrics) SetListener(ns string, cb TableListenerFn) {} + +// UnsetListener unregister event listener. +func (p *PodMetrics) UnsetListener(ns string) {} + +// OnAdd notify pod added. +func (p *PodMetrics) OnAdd(obj interface{}) { + po := obj.(*mv1beta1.PodMetrics) + fqn := MetaFQN(po.ObjectMeta) + log.Debug().Msgf("MX Added %s", fqn) +} + +// OnUpdate notify pod updated. +func (p *PodMetrics) OnUpdate(oldObj, newObj interface{}) { + opo, npo := oldObj.(*mv1beta1.PodMetrics), newObj.(*mv1beta1.PodMetrics) + + k1 := MetaFQN(opo.ObjectMeta) + k2 := MetaFQN(npo.ObjectMeta) + log.Debug().Msgf("MX Updated %#v -- %#v", k1, k2) +} + +// OnDelete notify pod was deleted. +func (p *PodMetrics) OnDelete(obj interface{}) { + po := obj.(*mv1beta1.PodMetrics) + key := MetaFQN(po.ObjectMeta) + log.Debug().Msgf("MX Delete %s", key) +} + +// NewPodMetricsInformer return an informer to return pod metrix. +func NewPodMetricsInformer(client *versioned.Clientset, ns string, sync time.Duration, idxs cache.Indexers) cache.SharedIndexInformer { + pw := NewPodMxWatcher(client, ns) + + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + l, err := client.MetricsV1beta1().PodMetricses(ns).List(opts) + if err == nil { + pw.update(l, false) + } + return l, err + }, + WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + pw.Run() + return pw, nil + }, + }, + &mv1beta1.PodMetrics{}, + sync, + idxs, + ) +} + +const podMXRefresh = 15 * time.Second + +// PodMxWatcher tracks pod metrics. +type PodMxWatcher struct { + client *versioned.Clientset + ns string + cache map[string]runtime.Object + eventChan chan watch.Event + doneChan chan struct{} +} + +// NewPodMxWatcher returns a new metrics watcher. +func NewPodMxWatcher(c *versioned.Clientset, ns string) *PodMxWatcher { + return &PodMxWatcher{ + client: c, + ns: ns, + eventChan: make(chan watch.Event), + doneChan: make(chan struct{}), + cache: map[string]runtime.Object{}, + } +} + +// Run watcher to monitor pod metrics. +func (p *PodMxWatcher) Run() { + go func() { + defer log.Debug().Msg("Podmetrics watcher canceled!") + for { + select { + case <-p.doneChan: + return + case <-time.After(podMXRefresh): + list, err := p.client.MetricsV1beta1().PodMetricses(p.ns).List(metav1.ListOptions{}) + if err != nil { + log.Error().Err(err).Msg("Fetch pod metrics") + } + p.update(list, true) + } + } + }() +} + +func (p *PodMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) { + fqns := map[string]runtime.Object{} + for i := range list.Items { + fqn := MetaFQN(list.Items[i].ObjectMeta) + fqns[fqn] = &list.Items[i] + } + + for k, v := range p.cache { + if _, ok := fqns[k]; !ok { + if notify { + p.eventChan <- watch.Event{ + Type: watch.Deleted, + Object: v, + } + } + delete(p.cache, k) + } + } + + for k, v := range fqns { + kind := watch.Added + if v1, ok := p.cache[k]; ok { + if !p.deltas(v1.(*mv1beta1.PodMetrics), v.(*mv1beta1.PodMetrics)) { + continue + } + kind = watch.Modified + } + + if notify { + p.eventChan <- watch.Event{ + Type: kind, + Object: v, + } + } + p.cache[k] = v + } +} + +// Stop the metrics informer. +func (p *PodMxWatcher) Stop() { + log.Debug().Msg("Stopping pod watcher!") + close(p.doneChan) + close(p.eventChan) +} + +// ResultChan retrieves event channel. +func (p *PodMxWatcher) ResultChan() <-chan watch.Event { + return p.eventChan +} + +func (p *PodMxWatcher) deltas(m1, m2 *mv1beta1.PodMetrics) bool { + mm1 := map[string]v1.ResourceList{} + for _, co := range m1.Containers { + mm1[co.Name] = co.Usage + } + mm2 := map[string]v1.ResourceList{} + for _, co := range m2.Containers { + mm2[co.Name] = co.Usage + } + + for k2, v2 := range mm2 { + v1, ok := mm1[k2] + if !ok { + log.Debug().Msgf("Missing container %s", k2) + return true + } + if resourceDiff(v1, v2) { + log.Debug().Msgf("Resources mismatch on container %s", k2) + return true + } + } + + return false +} + +// ---------------------------------------------------------------------------- +// Helpers... + +func resourceDiff(l1, l2 v1.ResourceList) bool { + c1, c2 := l1[v1.ResourceCPU], l2[v1.ResourceCPU] + if c1.Cmp(c2) != 0 { + log.Debug().Msgf("CPU Delta %v vs %v", c1, c2) + return true + } + m1, m2 := l1[v1.ResourceMemory], l2[v1.ResourceMemory] + if m1.Cmp(m2) != 0 { + log.Debug().Msgf("MEM Delta %d vs %d", m1.Value(), m2.Value()) + return true + } + return false +} diff --git a/internal/watch/pod_mx_test.go b/internal/watch/pod_mx_test.go new file mode 100644 index 00000000..fcd30fd0 --- /dev/null +++ b/internal/watch/pod_mx_test.go @@ -0,0 +1,90 @@ +package watch + +import ( + "strconv" + "testing" + + "github.com/rs/zerolog" + "gotest.tools/assert" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + mv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" +) + +func init() { + zerolog.SetGlobalLevel(zerolog.Disabled) +} + +func TestMxResourceDiff(t *testing.T) { + uu := map[string]struct { + r1, r2 v1.ResourceList + e bool + }{ + "same": {makeRes("0m", "0Mi"), makeRes("0m", "0Mi"), false}, + "omem": {makeRes("0m", "10Mi"), makeRes("0m", "1Mi"), true}, + "nmem": {makeRes("0m", "0Mi"), makeRes("0m", "1Mi"), true}, + "ocpu": {makeRes("1m", "0Mi"), makeRes("0m", "0Mi"), true}, + "ncpu": {makeRes("1m", "0Mi"), makeRes("2m", "0Mi"), true}, + } + + for k, v := range uu { + t.Run(k, func(t *testing.T) { + assert.Equal(t, v.e, resourceDiff(v.r1, v.r2)) + }) + } +} + +func TestMxDeltas(t *testing.T) { + uu := map[string]struct { + m1, m2 *mv1beta1.PodMetrics + e bool + }{ + "same": {makePodMxCo("p1", "1m", "0Mi", 1), makePodMxCo("p1", "1m", "0Mi", 1), false}, + "dcpu": {makePodMxCo("p1", "10m", "0Mi", 1), makePodMxCo("p1", "0m", "0Mi", 1), true}, + "dmem": {makePodMxCo("p1", "0m", "10Mi", 1), makePodMxCo("p1", "0m", "0Mi", 1), true}, + "dco": {makePodMxCo("p1", "0m", "10Mi", 1), makePodMxCo("p1", "0m", "0Mi", 2), true}, + } + + var p PodMxWatcher + for k, v := range uu { + t.Run(k, func(t *testing.T) { + assert.Equal(t, v.e, p.deltas(v.m1, v.m2)) + }) + } +} + +// ---------------------------------------------------------------------------- +// Helpers... + +func makeRes(c, m string) v1.ResourceList { + cpu, _ := resource.ParseQuantity(c) + mem, _ := resource.ParseQuantity(m) + + return v1.ResourceList{ + v1.ResourceCPU: cpu, + v1.ResourceMemory: mem, + } +} + +func makePodMxCo(name, cpu, mem string, co int) *mv1beta1.PodMetrics { + mx := makePodMx(name) + for i := 0; i < co; i++ { + mx.Containers = append( + mx.Containers, + mv1beta1.ContainerMetrics{ + Name: "c" + strconv.Itoa(i), + Usage: makeRes(cpu, mem)}) + } + + return mx +} + +func makePodMx(name string) *mv1beta1.PodMetrics { + return &mv1beta1.PodMetrics{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + }, + } +} diff --git a/internal/watch/pod_test.go b/internal/watch/pod_test.go new file mode 100644 index 00000000..01682a24 --- /dev/null +++ b/internal/watch/pod_test.go @@ -0,0 +1,87 @@ +package watch + +import ( + "fmt" + "testing" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func BenchmarkPodFields(b *testing.B) { + p := NewPod(nil, "") + po := makePod() + ff := make(Row, podCols) + + b.ResetTimer() + b.ReportAllocs() + + for n := 0; n < b.N; n++ { + p.fields("", po, ff) + } +} + +// ---------------------------------------------------------------------------- +// Helpers... + +func testTime() time.Time { + t, err := time.Parse(time.RFC3339, "2018-12-14T10:36:43.326972-07:00") + if err != nil { + fmt.Println("TestTime Failed", err) + } + return t +} + +func makePod() *v1.Pod { + var i int32 = 1 + var t = v1.HostPathDirectory + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "blee", + Name: "fred", + Labels: map[string]string{"blee": "duh"}, + CreationTimestamp: metav1.Time{Time: testTime()}, + }, + Spec: v1.PodSpec{ + Priority: &i, + PriorityClassName: "bozo", + Containers: []v1.Container{ + { + Name: "fred", + Image: "blee", + Env: []v1.EnvVar{ + { + Name: "fred", + Value: "1", + ValueFrom: &v1.EnvVarSource{ + ConfigMapKeyRef: &v1.ConfigMapKeySelector{Key: "blee"}, + }, + }, + }, + }, + }, + Volumes: []v1.Volume{ + { + Name: "fred", + VolumeSource: v1.VolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: "/blee", + Type: &t, + }, + }, + }, + }, + }, + Status: v1.PodStatus{ + Phase: "Running", + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "fred", + State: v1.ContainerState{Running: &v1.ContainerStateRunning{}}, + RestartCount: 0, + }, + }, + }, + } +}