From 8dcd72b0f7d31b3a1247d472cd516624cb7107d4 Mon Sep 17 00:00:00 2001 From: Gustavo Paiva Date: Sat, 19 Oct 2019 01:12:50 -0300 Subject: [PATCH] try to write or quit informer if it has been stopped. --- internal/watch/no_mx.go | 19 +++++++++++++++---- internal/watch/pod_mx.go | 19 +++++++++++++++---- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/internal/watch/no_mx.go b/internal/watch/no_mx.go index 4f4cd708..1e4972d7 100644 --- a/internal/watch/no_mx.go +++ b/internal/watch/no_mx.go @@ -1,6 +1,7 @@ package watch import ( + "errors" "fmt" "time" @@ -135,6 +136,15 @@ func (n *nodeMxWatcher) ResultChan() <-chan watch.Event { return n.eventChan } +func (n *nodeMxWatcher) notify(event watch.Event) error { + select { + case n.eventChan <- event: + return nil + case <-n.doneChan: + return errors.New("watcher has ben closed.") + } +} + func (n *nodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) { fqns := map[string]runtime.Object{} for i := range list.Items { @@ -144,9 +154,8 @@ func (n *nodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) { for k, v := range n.cache { if _, ok := fqns[k]; !ok { if notify { - n.eventChan <- watch.Event{ - Type: watch.Deleted, - Object: v, + if err := n.notify(watch.Event{Type: watch.Deleted, Object: v}); err != nil { + return } } delete(n.cache, k) @@ -161,7 +170,9 @@ func (n *nodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) { kind = watch.Modified } if notify { - n.eventChan <- watch.Event{Type: kind, Object: v} + if err := n.notify(watch.Event{Type: kind, Object: v}); err != nil { + return + } } n.cache[k] = v } diff --git a/internal/watch/pod_mx.go b/internal/watch/pod_mx.go index 03a30262..01c06202 100644 --- a/internal/watch/pod_mx.go +++ b/internal/watch/pod_mx.go @@ -1,6 +1,7 @@ package watch import ( + "errors" "fmt" "time" @@ -136,6 +137,15 @@ func (p *podMxWatcher) Run() { } } +func (p *podMxWatcher) notify(event watch.Event) error { + select { + case p.eventChan <- event: + return nil + case <-p.doneChan: + return errors.New("watcher has ben closed.") + } +} + func (p *podMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) { fqns := map[string]runtime.Object{} for i := range list.Items { @@ -146,9 +156,8 @@ func (p *podMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) { for k, v := range p.cache { if _, ok := fqns[k]; !ok { if notify { - p.eventChan <- watch.Event{ - Type: watch.Deleted, - Object: v, + if err := p.notify(watch.Event{Type: watch.Deleted, Object: v}); err != nil { + return } } delete(p.cache, k) @@ -164,7 +173,9 @@ func (p *podMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) { kind = watch.Modified } if notify { - p.eventChan <- watch.Event{Type: kind, Object: v} + if err := p.notify(watch.Event{Type: kind, Object: v}); err != nil { + return + } } p.cache[k] = v }