refactor benchmarks
parent
0b87a4e26c
commit
84eaec85d2
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
|
|
@ -173,14 +174,7 @@ func tailLogs(ctx context.Context, res k8s.Loggable, c chan<- string, opts LogOp
|
|||
req.Context(ctx)
|
||||
|
||||
var blocked int32 = 1
|
||||
go func(c chan<- string) {
|
||||
select {
|
||||
case <-time.After(defaultTimeout):
|
||||
if atomic.LoadInt32(&blocked) == 1 {
|
||||
log.Debug().Msgf("Closing channel %s:%s", opts.Name, opts.Container)
|
||||
}
|
||||
}
|
||||
}(c)
|
||||
go logsTimeout(blocked, c, opts)
|
||||
|
||||
// This call will block if nothing is in the stream!!
|
||||
stream, err := req.Stream()
|
||||
|
|
@ -190,27 +184,38 @@ func tailLogs(ctx context.Context, res k8s.Loggable, c chan<- string, opts LogOp
|
|||
}
|
||||
|
||||
atomic.StoreInt32(&blocked, 0)
|
||||
go func(c chan<- string) {
|
||||
defer func() {
|
||||
log.Debug().Msgf("Closing stream `%s", opts.Path())
|
||||
stream.Close()
|
||||
}()
|
||||
|
||||
head := opts.NormalizeName()
|
||||
scanner := bufio.NewScanner(stream)
|
||||
for scanner.Scan() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case c <- head + strings.TrimSpace(scanner.Text()):
|
||||
default:
|
||||
}
|
||||
}
|
||||
}(c)
|
||||
go readLogs(ctx, stream, c, opts)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func logsTimeout(blocked int32, c chan<- string, opts LogOptions) {
|
||||
select {
|
||||
case <-time.After(defaultTimeout):
|
||||
if atomic.LoadInt32(&blocked) == 1 {
|
||||
log.Debug().Msgf("Closing channel %s:%s", opts.Name, opts.Container)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func readLogs(ctx context.Context, stream io.ReadCloser, c chan<- string, opts LogOptions) {
|
||||
defer func() {
|
||||
log.Debug().Msgf("Closing stream `%s", opts.Path())
|
||||
stream.Close()
|
||||
}()
|
||||
|
||||
head := opts.NormalizeName()
|
||||
scanner := bufio.NewScanner(stream)
|
||||
for scanner.Scan() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case c <- head + strings.TrimSpace(scanner.Text()):
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// List resources for a given namespace.
|
||||
func (r *Pod) List(ns string) (Columnars, error) {
|
||||
pods, err := r.Resource.List(ns)
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/derailed/k9s/internal/config"
|
||||
"github.com/derailed/k9s/internal/resource"
|
||||
"github.com/derailed/tview"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
|
|
@ -136,7 +137,7 @@ func (v *benchView) enterCmd(evt *tcell.EventKey) *tcell.EventKey {
|
|||
return nil
|
||||
}
|
||||
|
||||
data, err := v.loadBenchFile(v.selectedItem)
|
||||
data, err := readBenchFile(v.app.config, v.selectedItem)
|
||||
if err != nil {
|
||||
v.app.flash().errf("Unable to load bench file %s", err)
|
||||
return nil
|
||||
|
|
@ -181,24 +182,8 @@ func (v *benchView) hints() hints {
|
|||
return v.CurrentPage().Item.(hinter).hints()
|
||||
}
|
||||
|
||||
func (v *benchView) benchDir() string {
|
||||
return filepath.Join(K9sBenchDir, v.app.config.K9s.CurrentCluster)
|
||||
}
|
||||
|
||||
func (v *benchView) loadBenchDir() ([]os.FileInfo, error) {
|
||||
return ioutil.ReadDir(v.benchDir())
|
||||
}
|
||||
|
||||
func (v *benchView) loadBenchFile(n string) (string, error) {
|
||||
data, err := ioutil.ReadFile(filepath.Join(v.benchDir(), n))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return string(data), nil
|
||||
}
|
||||
|
||||
func (v *benchView) hydrate() resource.TableData {
|
||||
ff, err := v.loadBenchDir()
|
||||
ff, err := loadBenchDir(v.app.config)
|
||||
if err != nil {
|
||||
v.app.flash().errf("Unable to read bench directory %s", err)
|
||||
}
|
||||
|
|
@ -206,7 +191,7 @@ func (v *benchView) hydrate() resource.TableData {
|
|||
data := initTable()
|
||||
blank := make(resource.Row, len(benchHeader))
|
||||
for _, f := range ff {
|
||||
bench, err := v.loadBenchFile(f.Name())
|
||||
bench, err := readBenchFile(v.app.config, f.Name())
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("Unable to load bench file %s", f.Name())
|
||||
continue
|
||||
|
|
@ -246,7 +231,7 @@ func (v *benchView) getDetails() *detailsView {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (v *benchView) resetTitle() {
|
||||
func (v *benchView) resetTitle1() {
|
||||
v.SetTitle(fmt.Sprintf(benchTitleFmt, benchTitle, v.getTV().GetRowCount()-1))
|
||||
}
|
||||
|
||||
|
|
@ -275,7 +260,7 @@ func (v *benchView) watchBenchDir(ctx context.Context) error {
|
|||
}
|
||||
}()
|
||||
|
||||
return w.Add(v.benchDir())
|
||||
return w.Add(benchDir(v.app.config))
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
|
@ -345,3 +330,19 @@ func augmentRow(fields resource.Row, data string) {
|
|||
fields[col] = asNum(sum)
|
||||
}
|
||||
}
|
||||
|
||||
func benchDir(cfg *config.Config) string {
|
||||
return filepath.Join(K9sBenchDir, cfg.K9s.CurrentCluster)
|
||||
}
|
||||
|
||||
func loadBenchDir(cfg *config.Config) ([]os.FileInfo, error) {
|
||||
return ioutil.ReadDir(benchDir(cfg))
|
||||
}
|
||||
|
||||
func readBenchFile(cfg *config.Config, n string) (string, error) {
|
||||
data, err := ioutil.ReadFile(filepath.Join(benchDir(cfg), n))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return string(data), nil
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue