Kubernetes components: add "kubeconfigPath" metadata (#3060)
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Co-authored-by: Bernd Verst <github@bernd.dev>
This commit is contained in:
parent
9fdad8f3f7
commit
ecf14bc713
|
|
@ -17,13 +17,14 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
|
@ -32,28 +33,27 @@ import (
|
|||
kubeclient "github.com/dapr/components-contrib/internal/authentication/kubernetes"
|
||||
"github.com/dapr/components-contrib/metadata"
|
||||
"github.com/dapr/kit/logger"
|
||||
"github.com/dapr/kit/ptr"
|
||||
)
|
||||
|
||||
type kubernetesInput struct {
|
||||
kubeClient kubernetes.Interface
|
||||
namespace string
|
||||
resyncPeriod time.Duration
|
||||
logger logger.Logger
|
||||
closed atomic.Bool
|
||||
closeCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
metadata kubernetesMetadata
|
||||
kubeClient kubernetes.Interface
|
||||
logger logger.Logger
|
||||
closed atomic.Bool
|
||||
closeCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
type EventResponse struct {
|
||||
Event string `json:"event"`
|
||||
OldVal v1.Event `json:"oldVal"`
|
||||
NewVal v1.Event `json:"newVal"`
|
||||
Event string `json:"event"`
|
||||
OldVal corev1.Event `json:"oldVal"`
|
||||
NewVal corev1.Event `json:"newVal"`
|
||||
}
|
||||
|
||||
type kubernetesMetadata struct {
|
||||
Namespace string `mapstructure:"namespace"`
|
||||
ResyncPeriod *time.Duration `mapstructure:"resyncPeriodInSec"`
|
||||
Namespace string `mapstructure:"namespace"`
|
||||
KubeconfigPath string `mapstructure:"kubeconfigPath"`
|
||||
ResyncPeriod time.Duration `mapstructure:"resyncPeriod" mapstructurealiases:"resyncPeriodInSec"`
|
||||
}
|
||||
|
||||
// NewKubernetes returns a new Kubernetes event input binding.
|
||||
|
|
@ -65,32 +65,42 @@ func NewKubernetes(logger logger.Logger) bindings.InputBinding {
|
|||
}
|
||||
|
||||
func (k *kubernetesInput) Init(ctx context.Context, metadata bindings.Metadata) error {
|
||||
client, err := kubeclient.GetKubeClient(k.logger)
|
||||
err := k.parseMetadata(metadata)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("failed to parse metadata: %w", err)
|
||||
}
|
||||
|
||||
kubeconfigPath := k.metadata.KubeconfigPath
|
||||
if kubeconfigPath == "" {
|
||||
kubeconfigPath = kubeclient.GetKubeconfigPath(k.logger, os.Args)
|
||||
}
|
||||
|
||||
client, err := kubeclient.GetKubeClient(kubeconfigPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize Kubernetes client: %w", err)
|
||||
}
|
||||
k.kubeClient = client
|
||||
|
||||
return k.parseMetadata(metadata)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *kubernetesInput) parseMetadata(meta bindings.Metadata) error {
|
||||
m := kubernetesMetadata{}
|
||||
err := metadata.DecodeMetadata(meta.Properties, &m)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "resyncPeriodInSec") {
|
||||
k.logger.Warnf("invalid resyncPeriodInSec; %v; defaulting to 10s", err)
|
||||
m.ResyncPeriod = ptr.Of(time.Second * 10)
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
// Set default values
|
||||
k.metadata = kubernetesMetadata{
|
||||
ResyncPeriod: 10 * time.Second,
|
||||
}
|
||||
k.resyncPeriod = *m.ResyncPeriod
|
||||
|
||||
if m.Namespace == "" {
|
||||
// Decode
|
||||
err := metadata.DecodeMetadata(meta.Properties, &k.metadata)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Validate
|
||||
if k.metadata.Namespace == "" {
|
||||
return errors.New("namespace is missing in metadata")
|
||||
}
|
||||
k.namespace = m.Namespace
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
@ -101,21 +111,21 @@ func (k *kubernetesInput) Read(ctx context.Context, handler bindings.Handler) er
|
|||
watchlist := cache.NewListWatchFromClient(
|
||||
k.kubeClient.CoreV1().RESTClient(),
|
||||
"events",
|
||||
k.namespace,
|
||||
k.metadata.Namespace,
|
||||
fields.Everything(),
|
||||
)
|
||||
resultChan := make(chan EventResponse)
|
||||
_, controller := cache.NewInformer(
|
||||
watchlist,
|
||||
&v1.Event{},
|
||||
k.resyncPeriod,
|
||||
&corev1.Event{},
|
||||
k.metadata.ResyncPeriod,
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
if obj != nil {
|
||||
resultChan <- EventResponse{
|
||||
Event: "add",
|
||||
NewVal: *(obj.(*v1.Event)),
|
||||
OldVal: v1.Event{},
|
||||
NewVal: *(obj.(*corev1.Event)),
|
||||
OldVal: corev1.Event{},
|
||||
}
|
||||
} else {
|
||||
k.logger.Warnf("Nil Object in Add handle %v", obj)
|
||||
|
|
@ -125,8 +135,8 @@ func (k *kubernetesInput) Read(ctx context.Context, handler bindings.Handler) er
|
|||
if obj != nil {
|
||||
resultChan <- EventResponse{
|
||||
Event: "delete",
|
||||
OldVal: *(obj.(*v1.Event)),
|
||||
NewVal: v1.Event{},
|
||||
OldVal: *(obj.(*corev1.Event)),
|
||||
NewVal: corev1.Event{},
|
||||
}
|
||||
} else {
|
||||
k.logger.Warnf("Nil Object in Delete handle %v", obj)
|
||||
|
|
@ -136,8 +146,8 @@ func (k *kubernetesInput) Read(ctx context.Context, handler bindings.Handler) er
|
|||
if oldObj != nil && newObj != nil {
|
||||
resultChan <- EventResponse{
|
||||
Event: "update",
|
||||
OldVal: *(oldObj.(*v1.Event)),
|
||||
NewVal: *(newObj.(*v1.Event)),
|
||||
OldVal: *(oldObj.(*corev1.Event)),
|
||||
NewVal: *(newObj.(*corev1.Event)),
|
||||
}
|
||||
} else {
|
||||
k.logger.Warnf("Nil Objects in Update handle %v %v", oldObj, newObj)
|
||||
|
|
@ -159,7 +169,7 @@ func (k *kubernetesInput) Read(ctx context.Context, handler bindings.Handler) er
|
|||
}
|
||||
}()
|
||||
|
||||
// Start the controller in backgound
|
||||
// Start the controller in background
|
||||
go func() {
|
||||
defer k.wg.Done()
|
||||
controller.Run(readCtx.Done())
|
||||
|
|
|
|||
|
|
@ -33,8 +33,8 @@ func TestParseMetadata(t *testing.T) {
|
|||
i := kubernetesInput{logger: logger.NewLogger("test")}
|
||||
i.parseMetadata(m)
|
||||
|
||||
assert.Equal(t, nsName, i.namespace, "The namespaces should be the same.")
|
||||
assert.Equal(t, resyncPeriod, i.resyncPeriod, "The resyncPeriod should be the same.")
|
||||
assert.Equal(t, nsName, i.metadata.Namespace, "The namespaces should be the same.")
|
||||
assert.Equal(t, resyncPeriod, i.metadata.ResyncPeriod, "The resyncPeriod should be the same.")
|
||||
})
|
||||
t.Run("parse metadata no namespace", func(t *testing.T) {
|
||||
m := bindings.Metadata{}
|
||||
|
|
@ -43,18 +43,7 @@ func TestParseMetadata(t *testing.T) {
|
|||
i := kubernetesInput{logger: logger.NewLogger("test")}
|
||||
err := i.parseMetadata(m)
|
||||
|
||||
assert.NotNil(t, err, "Expected err to be returned.")
|
||||
assert.Equal(t, "namespace is missing in metadata", err.Error(), "Error message not same.")
|
||||
})
|
||||
t.Run("parse metadata invalid resync period", func(t *testing.T) {
|
||||
m := bindings.Metadata{}
|
||||
m.Properties = map[string]string{"namespace": nsName, "resyncPeriodInSec": "invalid"}
|
||||
|
||||
i := kubernetesInput{logger: logger.NewLogger("test")}
|
||||
err := i.parseMetadata(m)
|
||||
|
||||
assert.Nil(t, err, "Expected err to be nil.")
|
||||
assert.Equal(t, nsName, i.namespace, "The namespaces should be the same.")
|
||||
assert.Equal(t, time.Second*10, i.resyncPeriod, "The resyncPeriod should be the same.")
|
||||
assert.Error(t, err, "Expected err to be returned.")
|
||||
assert.ErrorContains(t, err, "namespace is missing in metadata", "Error message not same.")
|
||||
})
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
|
@ -65,7 +66,11 @@ func (k *kubeSecretsCrypto) Init(_ context.Context, metadata contribCrypto.Metad
|
|||
}
|
||||
|
||||
// Init Kubernetes client
|
||||
k.kubeClient, err = kubeclient.GetKubeClient(k.logger)
|
||||
kubeconfigPath := k.md.KubeconfigPath
|
||||
if kubeconfigPath == "" {
|
||||
kubeconfigPath = kubeclient.GetKubeconfigPath(k.logger, os.Args)
|
||||
}
|
||||
k.kubeClient, err = kubeclient.GetKubeClient(kubeconfigPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to init Kubernetes client: %w", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,8 +20,12 @@ import (
|
|||
|
||||
type secretsMetadata struct {
|
||||
// Default namespace to retrieve secrets from.
|
||||
// If unset, the namespace must be specified for each key, as `namespace/secretName/key`
|
||||
// If unset, the namespace must be specified for each key, as `namespace/secretName/key`.
|
||||
DefaultNamespace string `json:"defaultNamespace" mapstructure:"defaultNamespace"`
|
||||
|
||||
// Path to a kubeconfig file.
|
||||
// If empty, uses the default values.
|
||||
KubeconfigPath string `json:"kubeconfigPath" mapstructure:"kubeconfigPath"`
|
||||
}
|
||||
|
||||
func (m *secretsMetadata) InitWithMetadata(meta contribCrypto.Metadata) error {
|
||||
|
|
|
|||
|
|
@ -39,7 +39,7 @@ func init() {
|
|||
}
|
||||
}
|
||||
|
||||
func getKubeconfigPath(log logger.Logger, args []string) string {
|
||||
func GetKubeconfigPath(log logger.Logger, args []string) string {
|
||||
// Check if the path is set via the CLI flag `--kubeconfig`
|
||||
// This is deprecated but kept for backwards compatibility
|
||||
var cliVal string
|
||||
|
|
@ -73,10 +73,10 @@ func getKubeconfigPath(log logger.Logger, args []string) string {
|
|||
}
|
||||
|
||||
// GetKubeClient returns a kubernetes client.
|
||||
func GetKubeClient(log logger.Logger) (*kubernetes.Clientset, error) {
|
||||
func GetKubeClient(kubeconfig string) (*kubernetes.Clientset, error) {
|
||||
conf, err := rest.InClusterConfig()
|
||||
if err != nil {
|
||||
conf, err = clientcmd.BuildConfigFromFlags("", getKubeconfigPath(log, os.Args))
|
||||
conf, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -53,7 +53,7 @@ func TestGetKubeconfigPath(t *testing.T) {
|
|||
if args == nil {
|
||||
args = []string{}
|
||||
}
|
||||
if got := getKubeconfigPath(log, args); got != tt.want {
|
||||
if got := GetKubeconfigPath(log, args); got != tt.want {
|
||||
t.Errorf("getKubeconfigPath() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
|
|
|
|||
|
|
@ -13,13 +13,13 @@ limitations under the License.
|
|||
|
||||
package kubernetes
|
||||
|
||||
//nolint:nosnakecase
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
|
||||
kubeclient "github.com/dapr/components-contrib/internal/authentication/kubernetes"
|
||||
|
|
@ -32,6 +32,7 @@ var _ secretstores.SecretStore = (*kubernetesSecretStore)(nil)
|
|||
|
||||
type kubernetesSecretStore struct {
|
||||
kubeClient kubernetes.Interface
|
||||
md kubernetesMetadata
|
||||
logger logger.Logger
|
||||
}
|
||||
|
||||
|
|
@ -42,11 +43,21 @@ func NewKubernetesSecretStore(logger logger.Logger) secretstores.SecretStore {
|
|||
|
||||
// Init creates a Kubernetes client.
|
||||
func (k *kubernetesSecretStore) Init(_ context.Context, metadata secretstores.Metadata) error {
|
||||
client, err := kubeclient.GetKubeClient(k.logger)
|
||||
// Init metadata
|
||||
err := k.md.InitWithMetadata(metadata)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to load metadata: %w", err)
|
||||
}
|
||||
|
||||
// Init Kubernetes client
|
||||
kubeconfigPath := k.md.KubeconfigPath
|
||||
if kubeconfigPath == "" {
|
||||
kubeconfigPath = kubeclient.GetKubeconfigPath(k.logger, os.Args)
|
||||
}
|
||||
k.kubeClient, err = kubeclient.GetKubeClient(kubeconfigPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
k.kubeClient = client
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
@ -61,7 +72,7 @@ func (k *kubernetesSecretStore) GetSecret(ctx context.Context, req secretstores.
|
|||
return resp, err
|
||||
}
|
||||
|
||||
secret, err := k.kubeClient.CoreV1().Secrets(namespace).Get(ctx, req.Name, meta_v1.GetOptions{}) //nolint:nosnakecase
|
||||
secret, err := k.kubeClient.CoreV1().Secrets(namespace).Get(ctx, req.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
|
@ -83,7 +94,7 @@ func (k *kubernetesSecretStore) BulkGetSecret(ctx context.Context, req secretsto
|
|||
return resp, err
|
||||
}
|
||||
|
||||
secrets, err := k.kubeClient.CoreV1().Secrets(namespace).List(ctx, meta_v1.ListOptions{}) //nolint:nosnakecase
|
||||
secrets, err := k.kubeClient.CoreV1().Secrets(namespace).List(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return resp, err
|
||||
}
|
||||
|
|
@ -103,12 +114,15 @@ func (k *kubernetesSecretStore) getNamespaceFromMetadata(metadata map[string]str
|
|||
return val, nil
|
||||
}
|
||||
|
||||
val := os.Getenv("NAMESPACE")
|
||||
if val != "" {
|
||||
if val := os.Getenv("NAMESPACE"); val != "" {
|
||||
return val, nil
|
||||
}
|
||||
|
||||
return "", errors.New("namespace is missing on metadata and NAMESPACE env variable")
|
||||
if k.md.DefaultNamespace != "" {
|
||||
return k.md.DefaultNamespace, nil
|
||||
}
|
||||
|
||||
return "", errors.New("namespace is missing on metadata and NAMESPACE env variable, and no default namespace is set")
|
||||
}
|
||||
|
||||
// Features returns the features available in this secret store.
|
||||
|
|
|
|||
|
|
@ -28,16 +28,16 @@ func TestGetNamespace(t *testing.T) {
|
|||
namespace := "a"
|
||||
|
||||
ns, err := store.getNamespaceFromMetadata(map[string]string{"namespace": namespace})
|
||||
assert.Nil(t, err)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, namespace, ns)
|
||||
})
|
||||
|
||||
t.Run("has namespace env", func(t *testing.T) {
|
||||
store := kubernetesSecretStore{logger: logger.NewLogger("test")}
|
||||
os.Setenv("NAMESPACE", "b")
|
||||
t.Setenv("NAMESPACE", "b")
|
||||
|
||||
ns, err := store.getNamespaceFromMetadata(map[string]string{})
|
||||
assert.Nil(t, err)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "b", ns)
|
||||
})
|
||||
|
||||
|
|
@ -46,8 +46,21 @@ func TestGetNamespace(t *testing.T) {
|
|||
os.Setenv("NAMESPACE", "")
|
||||
_, err := store.getNamespaceFromMetadata(map[string]string{})
|
||||
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, "namespace is missing on metadata and NAMESPACE env variable", err.Error())
|
||||
assert.Error(t, err)
|
||||
assert.ErrorContains(t, err, "namespace is missing")
|
||||
})
|
||||
|
||||
t.Run("has default namespace", func(t *testing.T) {
|
||||
store := kubernetesSecretStore{
|
||||
logger: logger.NewLogger("test"),
|
||||
md: kubernetesMetadata{
|
||||
DefaultNamespace: "c",
|
||||
},
|
||||
}
|
||||
|
||||
ns, err := store.getNamespaceFromMetadata(map[string]string{})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, "c", ns)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
Copyright 2023 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kubernetes
|
||||
|
||||
import (
|
||||
"github.com/dapr/components-contrib/metadata"
|
||||
"github.com/dapr/components-contrib/secretstores"
|
||||
)
|
||||
|
||||
type kubernetesMetadata struct {
|
||||
// Default namespace to retrieve secrets from.
|
||||
// If unset, the namespace must be specified for each key, as `namespace/secretName/key`.
|
||||
DefaultNamespace string `json:"defaultNamespace" mapstructure:"defaultNamespace"`
|
||||
|
||||
// Path to a kubeconfig file.
|
||||
// If empty, uses the default values.
|
||||
KubeconfigPath string `json:"kubeconfigPath" mapstructure:"kubeconfigPath"`
|
||||
}
|
||||
|
||||
func (m *kubernetesMetadata) InitWithMetadata(meta secretstores.Metadata) error {
|
||||
m.reset()
|
||||
|
||||
// Decode the metadata
|
||||
err := metadata.DecodeMetadata(meta.Properties, &m)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reset the object
|
||||
func (m *kubernetesMetadata) reset() {
|
||||
m.DefaultNamespace = ""
|
||||
}
|
||||
Loading…
Reference in New Issue