diff --git a/internal/watch/no_mx.go b/internal/watch/no_mx.go index 6c9539a6..1e4972d7 100644 --- a/internal/watch/no_mx.go +++ b/internal/watch/no_mx.go @@ -1,6 +1,7 @@ package watch import ( + "errors" "fmt" "time" @@ -112,6 +113,7 @@ func (n *nodeMxWatcher) Run() { for { select { case <-n.doneChan: + close(n.eventChan) return case <-time.After(nodeMXRefresh): list, err := c.MetricsV1beta1().NodeMetricses().List(metav1.ListOptions{}) @@ -127,7 +129,6 @@ func (n *nodeMxWatcher) Run() { func (n *nodeMxWatcher) Stop() { log.Debug().Msg("Stopping NodeMetrix informer!") close(n.doneChan) - close(n.eventChan) } // ResultChan retrieves event channel. @@ -135,6 +136,15 @@ func (n *nodeMxWatcher) ResultChan() <-chan watch.Event { return n.eventChan } +func (n *nodeMxWatcher) notify(event watch.Event) error { + select { + case n.eventChan <- event: + return nil + case <-n.doneChan: + return errors.New("watcher has ben closed.") + } +} + func (n *nodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) { fqns := map[string]runtime.Object{} for i := range list.Items { @@ -144,9 +154,8 @@ func (n *nodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) { for k, v := range n.cache { if _, ok := fqns[k]; !ok { if notify { - n.eventChan <- watch.Event{ - Type: watch.Deleted, - Object: v, + if err := n.notify(watch.Event{Type: watch.Deleted, Object: v}); err != nil { + return } } delete(n.cache, k) @@ -161,7 +170,9 @@ func (n *nodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) { kind = watch.Modified } if notify { - n.eventChan <- watch.Event{Type: kind, Object: v} + if err := n.notify(watch.Event{Type: kind, Object: v}); err != nil { + return + } } n.cache[k] = v } diff --git a/internal/watch/pod_mx.go b/internal/watch/pod_mx.go index 2d9d7b51..01c06202 100644 --- a/internal/watch/pod_mx.go +++ b/internal/watch/pod_mx.go @@ -1,6 +1,7 @@ package watch import ( + "errors" "fmt" "time" @@ -124,6 +125,7 @@ func (p *podMxWatcher) Run() { for { select { case <-p.doneChan: + close(p.eventChan) return case <-time.After(podMXRefresh): list, err := c.MetricsV1beta1().PodMetricses(p.ns).List(metav1.ListOptions{}) @@ -135,6 +137,15 @@ func (p *podMxWatcher) Run() { } } +func (p *podMxWatcher) notify(event watch.Event) error { + select { + case p.eventChan <- event: + return nil + case <-p.doneChan: + return errors.New("watcher has ben closed.") + } +} + func (p *podMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) { fqns := map[string]runtime.Object{} for i := range list.Items { @@ -145,9 +156,8 @@ func (p *podMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) { for k, v := range p.cache { if _, ok := fqns[k]; !ok { if notify { - p.eventChan <- watch.Event{ - Type: watch.Deleted, - Object: v, + if err := p.notify(watch.Event{Type: watch.Deleted, Object: v}); err != nil { + return } } delete(p.cache, k) @@ -163,7 +173,9 @@ func (p *podMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) { kind = watch.Modified } if notify { - p.eventChan <- watch.Event{Type: kind, Object: v} + if err := p.notify(watch.Event{Type: kind, Object: v}); err != nil { + return + } } p.cache[k] = v } @@ -173,7 +185,6 @@ func (p *podMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) { func (p *podMxWatcher) Stop() { log.Debug().Msg("Stopping PodMetrix informer!!") close(p.doneChan) - close(p.eventChan) } // ResultChan retrieves event channel.