try to write or quit informer if it has been stopped.
parent
a25be06e4d
commit
8dcd72b0f7
|
|
@ -1,6 +1,7 @@
|
||||||
package watch
|
package watch
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -135,6 +136,15 @@ func (n *nodeMxWatcher) ResultChan() <-chan watch.Event {
|
||||||
return n.eventChan
|
return n.eventChan
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *nodeMxWatcher) notify(event watch.Event) error {
|
||||||
|
select {
|
||||||
|
case n.eventChan <- event:
|
||||||
|
return nil
|
||||||
|
case <-n.doneChan:
|
||||||
|
return errors.New("watcher has ben closed.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (n *nodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) {
|
func (n *nodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) {
|
||||||
fqns := map[string]runtime.Object{}
|
fqns := map[string]runtime.Object{}
|
||||||
for i := range list.Items {
|
for i := range list.Items {
|
||||||
|
|
@ -144,9 +154,8 @@ func (n *nodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) {
|
||||||
for k, v := range n.cache {
|
for k, v := range n.cache {
|
||||||
if _, ok := fqns[k]; !ok {
|
if _, ok := fqns[k]; !ok {
|
||||||
if notify {
|
if notify {
|
||||||
n.eventChan <- watch.Event{
|
if err := n.notify(watch.Event{Type: watch.Deleted, Object: v}); err != nil {
|
||||||
Type: watch.Deleted,
|
return
|
||||||
Object: v,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
delete(n.cache, k)
|
delete(n.cache, k)
|
||||||
|
|
@ -161,7 +170,9 @@ func (n *nodeMxWatcher) update(list *mv1beta1.NodeMetricsList, notify bool) {
|
||||||
kind = watch.Modified
|
kind = watch.Modified
|
||||||
}
|
}
|
||||||
if notify {
|
if notify {
|
||||||
n.eventChan <- watch.Event{Type: kind, Object: v}
|
if err := n.notify(watch.Event{Type: kind, Object: v}); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
n.cache[k] = v
|
n.cache[k] = v
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package watch
|
package watch
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -136,6 +137,15 @@ func (p *podMxWatcher) Run() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p *podMxWatcher) notify(event watch.Event) error {
|
||||||
|
select {
|
||||||
|
case p.eventChan <- event:
|
||||||
|
return nil
|
||||||
|
case <-p.doneChan:
|
||||||
|
return errors.New("watcher has ben closed.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (p *podMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) {
|
func (p *podMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) {
|
||||||
fqns := map[string]runtime.Object{}
|
fqns := map[string]runtime.Object{}
|
||||||
for i := range list.Items {
|
for i := range list.Items {
|
||||||
|
|
@ -146,9 +156,8 @@ func (p *podMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) {
|
||||||
for k, v := range p.cache {
|
for k, v := range p.cache {
|
||||||
if _, ok := fqns[k]; !ok {
|
if _, ok := fqns[k]; !ok {
|
||||||
if notify {
|
if notify {
|
||||||
p.eventChan <- watch.Event{
|
if err := p.notify(watch.Event{Type: watch.Deleted, Object: v}); err != nil {
|
||||||
Type: watch.Deleted,
|
return
|
||||||
Object: v,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
delete(p.cache, k)
|
delete(p.cache, k)
|
||||||
|
|
@ -164,7 +173,9 @@ func (p *podMxWatcher) update(list *mv1beta1.PodMetricsList, notify bool) {
|
||||||
kind = watch.Modified
|
kind = watch.Modified
|
||||||
}
|
}
|
||||||
if notify {
|
if notify {
|
||||||
p.eventChan <- watch.Event{Type: kind, Object: v}
|
if err := p.notify(watch.Event{Type: kind, Object: v}); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
p.cache[k] = v
|
p.cache[k] = v
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue