// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of K9s

package client

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/derailed/k9s/internal/slogs"
	authorizationv1 "k8s.io/api/authorization/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/cache"
	"k8s.io/apimachinery/pkg/version"
	"k8s.io/client-go/discovery/cached/disk"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
	restclient "k8s.io/client-go/rest"
	metricsapi "k8s.io/metrics/pkg/apis/metrics"
	"k8s.io/metrics/pkg/client/clientset/versioned"
)

// Client cache settings and keys.
//
// cacheSize is the capacity given to the in-memory LRU cache and cacheExpiry
// is the TTL used for every entry added to it (it is also the TTL handed to
// the on-disk discovery client). The remaining constants are the cache keys
// for the metrics API support flag, the server version info and the set of
// valid namespace names respectively.
const (
	cacheSize     = 100
	cacheExpiry   = 5 * time.Minute
	cacheMXAPIKey = "metricsAPI"
	serverVersion = "serverVersion"
	cacheNSKey    = "validNamespaces"
)

// supportedMetricsAPIVersions lists the metrics API group versions that
// checkMetricsVersion accepts.
var supportedMetricsAPIVersions = []string{"v1beta1"}

// NamespaceNames tracks a collection of namespace names.
type NamespaceNames map[string]struct{}

// APIClient represents a Kubernetes api client.
//
// The typed, log, dynamic, metrics and cached discovery clients are created
// lazily by the corresponding Dial helpers and stored on the struct. Access to
// client, logClient, dClient, mxsClient, cachedClient and connOK goes through
// getter/setter helpers that take mx. The cache field holds short-lived
// results (authorization checks, server version, namespace names, metrics
// support) keyed by the constants above.
type APIClient struct {
	client, logClient kubernetes.Interface
	dClient           dynamic.Interface
	nsClient          dynamic.NamespaceableResourceInterface
	mxsClient         *versioned.Clientset
	cachedClient      *disk.CachedDiscoveryClient
	config            *Config
	mx                sync.RWMutex
	cache             *cache.LRUExpireCache
	connOK            bool
	log               *slog.Logger
}

// NewTestAPIClient returns an APIClient holding only a NewConfig(nil)
// configuration and an empty cache. For testing ONLY!!
//
// No logger or clients are set and connOK keeps its zero value (false).
func NewTestAPIClient() *APIClient {
	return &APIClient{
		config: NewConfig(nil),
		cache:  cache.NewLRUExpireCache(cacheSize),
	}
}

// InitConnection initializes a connection from command line args.
// Checks for connectivity with the api server.
func InitConnection(config *Config, log *slog.Logger) (*APIClient, error) { a := APIClient{ config: config, cache: cache.NewLRUExpireCache(cacheSize), connOK: true, log: log.With(slogs.Subsys, "client"), } err := a.supportsMetricsResources() if err != nil { slog.Warn("Fail to locate metrics-server", slogs.Error, err) } if err == nil || errors.Is(err, noMetricServerErr) || errors.Is(err, metricsUnsupportedErr) { return &a, nil } a.connOK = false return &a, err } // ConnectionOK returns connection status. func (a *APIClient) ConnectionOK() bool { return a.connOK } func makeSAR(ns string, gvr *GVR, name string) *authorizationv1.SelfSubjectAccessReview { if ns == ClusterScope { ns = BlankNamespace } res := gvr.GVR() return &authorizationv1.SelfSubjectAccessReview{ Spec: authorizationv1.SelfSubjectAccessReviewSpec{ ResourceAttributes: &authorizationv1.ResourceAttributes{ Namespace: ns, Group: res.Group, Version: res.Version, Resource: res.Resource, Subresource: gvr.SubResource(), Name: name, }, }, } } func makeCacheKey(ns string, gvr *GVR, n string, vv []string) string { return ns + ":" + gvr.String() + ":" + n + "::" + strings.Join(vv, ",") } // ActiveContext returns the current context name. func (a *APIClient) ActiveContext() string { c, err := a.config.CurrentContextName() if err != nil { slog.Error("unable to located active cluster", slogs.Error, err) return "" } return c } // IsActiveNamespace returns true if namespaces matches. func (a *APIClient) IsActiveNamespace(ns string) bool { if a.ActiveNamespace() == BlankNamespace { return true } return a.ActiveNamespace() == ns } // ActiveNamespace returns the current namespace. func (a *APIClient) ActiveNamespace() string { if ns, err := a.CurrentNamespaceName(); err == nil { return ns } return BlankNamespace } func (a *APIClient) clearCache() { for _, k := range a.cache.Keys() { a.cache.Remove(k) } } // CanI checks if user has access to a certain resource. 
func (a *APIClient) CanI(ns string, gvr *GVR, name string, verbs []string) (auth bool, err error) { if !a.getConnOK() { return false, errors.New("ACCESS -- No API server connection") } if gvr == NsGVR { // The name of the namespace is required to check permissions in some cases ns = name } if IsClusterWide(ns) { ns = BlankNamespace } if gvr == HmGVR { // helm stores release data in secrets gvr = SecGVR } key := makeCacheKey(ns, gvr, name, verbs) if v, ok := a.cache.Get(key); ok { if auth, ok = v.(bool); ok { return auth, nil } } clog := a.log.With(slogs.Subsys, "can") dial, err := a.Dial() if err != nil { return false, err } client, sar := dial.AuthorizationV1().SelfSubjectAccessReviews(), makeSAR(ns, gvr, name) ctx, cancel := context.WithTimeout(context.Background(), a.config.CallTimeout()) defer cancel() for _, v := range verbs { sar.Spec.ResourceAttributes.Verb = v resp, err := client.Create(ctx, sar, metav1.CreateOptions{}) clog.Debug("[CAN] access", slogs.GVR, gvr, slogs.Namespace, ns, slogs.ResName, name, slogs.Verb, verbs, ) if resp != nil { clog.Debug("[CAN] response", slogs.AuthStatus, resp.Status.Allowed, slogs.AuthReason, resp.Status.Reason, ) } if err != nil { clog.Warn("Auth request failed", slogs.Error, err) a.cache.Add(key, false, cacheExpiry) return auth, err } if !resp.Status.Allowed { a.cache.Add(key, false, cacheExpiry) return auth, fmt.Errorf("(%s) access denied for user on resource %q:%s in namespace %q", v, name, gvr, ns) } } auth = true a.cache.Add(key, true, cacheExpiry) return } // CurrentNamespaceName return namespace name set via either cli arg or cluster config. func (a *APIClient) CurrentNamespaceName() (string, error) { return a.config.CurrentNamespaceName() } // ServerVersion returns the current server version info. 
func (a *APIClient) ServerVersion() (*version.Info, error) { if v, ok := a.cache.Get(serverVersion); ok { if vi, ok := v.(*version.Info); ok { return vi, nil } } dial, err := a.CachedDiscovery() if err != nil { return nil, err } info, err := dial.ServerVersion() if err != nil { return nil, err } a.cache.Add(serverVersion, info, cacheExpiry) return info, nil } func (a *APIClient) IsValidNamespace(ns string) bool { ok, err := a.isValidNamespace(ns) if err != nil { slog.Warn("Namespace validation failed", slogs.Namespace, ns, slogs.Error, err, ) } return ok } func (a *APIClient) isValidNamespace(n string) (bool, error) { if IsClusterWide(n) || n == NotNamespaced { return true, nil } nn, err := a.ValidNamespaceNames() if err != nil { return false, err } _, ok := nn[n] return ok, nil } // ValidNamespaceNames returns all available namespaces. func (a *APIClient) ValidNamespaceNames() (NamespaceNames, error) { if a == nil { return nil, fmt.Errorf("validNamespaces: no available client found") } if nn, ok := a.cache.Get(cacheNSKey); ok { if nss, ok := nn.(NamespaceNames); ok { return nss, nil } } ok, err := a.CanI(ClusterScope, NsGVR, "", ListAccess) if !ok || err != nil { a.cache.Add(cacheNSKey, NamespaceNames{}, cacheExpiry) return nil, fmt.Errorf("user not authorized to list all namespaces") } dial, err := a.Dial() if err != nil { return nil, err } ctx, cancel := context.WithTimeout(context.Background(), a.config.CallTimeout()) defer cancel() nn, err := dial.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) if err != nil { return nil, err } nns := make(NamespaceNames, len(nn.Items)) for i := range nn.Items { nns[nn.Items[i].Name] = struct{}{} } a.cache.Add(cacheNSKey, nns, cacheExpiry) return nns, nil } // CheckConnectivity return true if api server is cool or false otherwise. 
func (a *APIClient) CheckConnectivity() bool { defer func() { if err := recover(); err != nil { a.setConnOK(false) } if !a.getConnOK() { a.clearCache() } }() cfg, err := a.config.RESTConfig() if err != nil { slog.Error("RestConfig load failed", slogs.Error, err) a.connOK = false return a.connOK } cfg.Timeout = a.config.CallTimeout() client, err := kubernetes.NewForConfig(cfg) if err != nil { slog.Error("Unable to connect to api server", slogs.Error, err) a.setConnOK(false) return a.getConnOK() } // Check connection if _, err := client.ServerVersion(); err == nil { if !a.getConnOK() { a.reset() } } else { slog.Error("Unable to fetch server version", slogs.Error, err) a.setConnOK(false) } return a.getConnOK() } // Config return a kubernetes configuration. func (a *APIClient) Config() *Config { return a.config } // HasMetrics checks if the cluster supports metrics. func (a *APIClient) HasMetrics() bool { return a.supportsMetricsResources() == nil } func (a *APIClient) getMxsClient() *versioned.Clientset { a.mx.RLock() defer a.mx.RUnlock() return a.mxsClient } func (a *APIClient) setMxsClient(c *versioned.Clientset) { a.mx.Lock() defer a.mx.Unlock() a.mxsClient = c } func (a *APIClient) getCachedClient() *disk.CachedDiscoveryClient { a.mx.RLock() defer a.mx.RUnlock() return a.cachedClient } func (a *APIClient) setCachedClient(c *disk.CachedDiscoveryClient) { a.mx.Lock() defer a.mx.Unlock() a.cachedClient = c } func (a *APIClient) getDClient() dynamic.Interface { a.mx.RLock() defer a.mx.RUnlock() return a.dClient } func (a *APIClient) setDClient(c dynamic.Interface) { a.mx.Lock() defer a.mx.Unlock() a.dClient = c } func (a *APIClient) getConnOK() bool { a.mx.RLock() defer a.mx.RUnlock() return a.connOK } func (a *APIClient) setConnOK(b bool) { a.mx.Lock() defer a.mx.Unlock() a.connOK = b } func (a *APIClient) setLogClient(k kubernetes.Interface) { a.mx.Lock() defer a.mx.Unlock() a.logClient = k } func (a *APIClient) getLogClient() kubernetes.Interface { a.mx.RLock() 
defer a.mx.RUnlock() return a.logClient } func (a *APIClient) setClient(k kubernetes.Interface) { a.mx.Lock() defer a.mx.Unlock() a.client = k } func (a *APIClient) getClient() kubernetes.Interface { a.mx.RLock() defer a.mx.RUnlock() return a.client } // DialLogs returns a handle to api server for logs. func (a *APIClient) DialLogs() (kubernetes.Interface, error) { if !a.getConnOK() { return nil, errors.New("dialLogs - no connection to dial") } if clt := a.getLogClient(); clt != nil { return clt, nil } cfg, err := a.RestConfig() if err != nil { return nil, err } cfg.Timeout = 0 c, err := kubernetes.NewForConfig(cfg) if err != nil { return nil, err } a.setLogClient(c) return a.getLogClient(), nil } // Dial returns a handle to api server or die. func (a *APIClient) Dial() (kubernetes.Interface, error) { if !a.getConnOK() { return nil, errors.New("no connection to dial") } if c := a.getClient(); c != nil { return c, nil } cfg, err := a.RestConfig() if err != nil { return nil, err } c, err := kubernetes.NewForConfig(cfg) if err != nil { return nil, err } a.setClient(c) return a.getClient(), nil } // RestConfig returns a rest api client. func (a *APIClient) RestConfig() (*restclient.Config, error) { return a.config.RESTConfig() } // CachedDiscovery returns a cached discovery client. 
func (a *APIClient) CachedDiscovery() (*disk.CachedDiscoveryClient, error) { if !a.getConnOK() { return nil, errors.New("no connection to cached dial") } if c := a.getCachedClient(); c != nil { return c, nil } cfg, err := a.RestConfig() if err != nil { return nil, err } baseCacheDir := os.Getenv("KUBECACHEDIR") if baseCacheDir == "" { baseCacheDir = filepath.Join(mustHomeDir(), ".kube", "cache") } httpCacheDir := filepath.Join(baseCacheDir, "http") discCacheDir := filepath.Join(baseCacheDir, "discovery", toHostDir(cfg.Host)) c, err := disk.NewCachedDiscoveryClientForConfig(cfg, discCacheDir, httpCacheDir, cacheExpiry) if err != nil { return nil, err } a.setCachedClient(c) return a.getCachedClient(), nil } // DynDial returns a handle to a dynamic interface. func (a *APIClient) DynDial() (dynamic.Interface, error) { if c := a.getDClient(); c != nil { return c, nil } cfg, err := a.RestConfig() if err != nil { return nil, err } c, err := dynamic.NewForConfig(cfg) if err != nil { return nil, err } a.setDClient(c) return a.getDClient(), nil } // MXDial returns a handle to the metrics server. func (a *APIClient) MXDial() (*versioned.Clientset, error) { if c := a.getMxsClient(); c != nil { return c, nil } cfg, err := a.RestConfig() if err != nil { return nil, err } c, err := versioned.NewForConfig(cfg) if err != nil { return nil, err } a.setMxsClient(c) return a.getMxsClient(), err } func (a *APIClient) invalidateCache() error { dial, err := a.CachedDiscovery() if err != nil { return err } dial.Invalidate() return nil } // SwitchContext handles kubeconfig context switches. func (a *APIClient) SwitchContext(name string) error { slog.Debug("Switching context", slogs.Context, name) if err := a.config.SwitchContext(name); err != nil { return err } if !a.CheckConnectivity() { slog.Debug("No connectivity, skipping cache invalidation") } else if err := a.invalidateCache(); err != nil { return err } a.reset() ResetMetrics() // Need reload to pick up any kubeconfig changes. 
a.config = NewConfig(a.config.flags) return a.invalidateCache() } func (a *APIClient) reset() { a.config.reset() a.cache = cache.NewLRUExpireCache(cacheSize) a.nsClient = nil a.setDClient(nil) a.setMxsClient(nil) a.setCachedClient(nil) a.setClient(nil) a.setLogClient(nil) a.setConnOK(true) } func (a *APIClient) checkCacheBool(key string) (state, ok bool) { v, found := a.cache.Get(key) if !found { return } state, ok = v.(bool) return } func (a *APIClient) supportsMetricsResources() error { supported, ok := a.checkCacheBool(cacheMXAPIKey) if ok { if supported { return nil } return noMetricServerErr } defer func() { a.cache.Add(cacheMXAPIKey, supported, cacheExpiry) }() dial, err := a.Dial() if err != nil { slog.Warn("Unable to dial API client for metrics", slogs.Error, err) return err } apiGroups, err := dial.Discovery().ServerGroups() if err != nil { return err } for i := range apiGroups.Groups { if apiGroups.Groups[i].Name != metricsapi.GroupName { continue } if checkMetricsVersion(&(apiGroups.Groups[i])) { supported = true return nil } } return metricsUnsupportedErr } func checkMetricsVersion(grp *metav1.APIGroup) bool { for _, v := range grp.Versions { for _, supportedVersion := range supportedMetricsAPIVersions { if v.Version == supportedVersion { return true } } } return false }