Merge pull request #372 from paivagustavo/fix-race-condition-when-switching-ns

fix race condition when switching ns.
mine
Fernand Galiana 2019-10-19 06:39:46 -06:00 committed by GitHub
commit 4a5908446d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 32 additions and 10 deletions

View File

@ -1,6 +1,7 @@
package watch package watch
import ( import (
"errors"
"fmt" "fmt"
"time" "time"
@ -112,6 +113,7 @@ func (n *nodeMxWatcher) Run() {
for { for {
select { select {
case <-n.doneChan: case <-n.doneChan:
close(n.eventChan)
return return
case <-time.After(nodeMXRefresh): case <-time.After(nodeMXRefresh):
list, err := c.MetricsV1beta1().NodeMetricses().List(metav1.ListOptions{}) list, err := c.MetricsV1beta1().NodeMetricses().List(metav1.ListOptions{})
@ -127,7 +129,6 @@ func (n *nodeMxWatcher) Run() {
func (n *nodeMxWatcher) Stop() { func (n *nodeMxWatcher) Stop() {
log.Debug().Msg("Stopping NodeMetrix informer!") log.Debug().Msg("Stopping NodeMetrix informer!")
close(n.doneChan) close(n.doneChan)
close(n.eventChan)
} }
// ResultChan retrieves event channel. // ResultChan retrieves event channel.
@ -135,6 +136,15 @@ func (n *nodeMxWatcher) ResultChan() <-chan watch.Event {
return n.eventChan return n.eventChan
} }
func (n *nodeMxWatcher) notify(event watch.Event) error {
select {
case n.eventChan <- event:
return nil
case <-n.doneChan:
return errors.New("watcher has ben closed.")
}
}
func (n *nodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) { func (n *nodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) {
fqns := map[string]runtime.Object{} fqns := map[string]runtime.Object{}
for i := range list.Items { for i := range list.Items {
@ -144,9 +154,8 @@ func (n *nodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) {
for k, v := range n.cache { for k, v := range n.cache {
if _, ok := fqns[k]; !ok { if _, ok := fqns[k]; !ok {
if notify { if notify {
n.eventChan <- watch.Event{ if err := n.notify(watch.Event{Type: watch.Deleted, Object: v}); err != nil {
Type: watch.Deleted, return
Object: v,
} }
} }
delete(n.cache, k) delete(n.cache, k)
@ -161,7 +170,9 @@ func (n *nodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) {
kind = watch.Modified kind = watch.Modified
} }
if notify { if notify {
n.eventChan <- watch.Event{Type: kind, Object: v} if err := n.notify(watch.Event{Type: kind, Object: v}); err != nil {
return
}
} }
n.cache[k] = v n.cache[k] = v
} }

View File

@ -1,6 +1,7 @@
package watch package watch
import ( import (
"errors"
"fmt" "fmt"
"time" "time"
@ -124,6 +125,7 @@ func (p *podMxWatcher) Run() {
for { for {
select { select {
case <-p.doneChan: case <-p.doneChan:
close(p.eventChan)
return return
case <-time.After(podMXRefresh): case <-time.After(podMXRefresh):
list, err := c.MetricsV1beta1().PodMetricses(p.ns).List(metav1.ListOptions{}) list, err := c.MetricsV1beta1().PodMetricses(p.ns).List(metav1.ListOptions{})
@ -135,6 +137,15 @@ func (p *podMxWatcher) Run() {
} }
} }
func (p *podMxWatcher) notify(event watch.Event) error {
select {
case p.eventChan <- event:
return nil
case <-p.doneChan:
return errors.New("watcher has ben closed.")
}
}
func (p *podMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) { func (p *podMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) {
fqns := map[string]runtime.Object{} fqns := map[string]runtime.Object{}
for i := range list.Items { for i := range list.Items {
@ -145,9 +156,8 @@ func (p *podMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) {
for k, v := range p.cache { for k, v := range p.cache {
if _, ok := fqns[k]; !ok { if _, ok := fqns[k]; !ok {
if notify { if notify {
p.eventChan <- watch.Event{ if err := p.notify(watch.Event{Type: watch.Deleted, Object: v}); err != nil {
Type: watch.Deleted, return
Object: v,
} }
} }
delete(p.cache, k) delete(p.cache, k)
@ -163,7 +173,9 @@ func (p *podMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) {
kind = watch.Modified kind = watch.Modified
} }
if notify { if notify {
p.eventChan <- watch.Event{Type: kind, Object: v} if err := p.notify(watch.Event{Type: kind, Object: v}); err != nil {
return
}
} }
p.cache[k] = v p.cache[k] = v
} }
@ -173,7 +185,6 @@ func (p *podMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) {
func (p *podMxWatcher) Stop() { func (p *podMxWatcher) Stop() {
log.Debug().Msg("Stopping PodMetrix informer!!") log.Debug().Msg("Stopping PodMetrix informer!!")
close(p.doneChan) close(p.doneChan)
close(p.eventChan)
} }
// ResultChan retrieves event channel. // ResultChan retrieves event channel.