Merge pull request #112050 from nilekhc/kms-hot-reload
Implements hot reload of the KMS `EncryptionConfiguration` Kubernetes-commit: e62cfabf9326cdec65e926b697fa1911b4e85da6
This commit is contained in:
commit
767537116c
4
go.mod
4
go.mod
|
|
@ -45,7 +45,7 @@ require (
|
||||||
k8s.io/api v0.0.0-20221108053748-98c1aa6b3d0a
|
k8s.io/api v0.0.0-20221108053748-98c1aa6b3d0a
|
||||||
k8s.io/apimachinery v0.0.0-20221108055230-fd8a60496be5
|
k8s.io/apimachinery v0.0.0-20221108055230-fd8a60496be5
|
||||||
k8s.io/client-go v0.0.0-20221108173010-769443557e04
|
k8s.io/client-go v0.0.0-20221108173010-769443557e04
|
||||||
k8s.io/component-base v0.0.0-20221108213136-021afb59bb71
|
k8s.io/component-base v0.0.0-20221109013135-4e8a9589a311
|
||||||
k8s.io/klog/v2 v2.80.1
|
k8s.io/klog/v2 v2.80.1
|
||||||
k8s.io/kms v0.0.0-20221028080743-a9ba1c11c0c6
|
k8s.io/kms v0.0.0-20221028080743-a9ba1c11c0c6
|
||||||
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280
|
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280
|
||||||
|
|
@ -125,6 +125,6 @@ replace (
|
||||||
k8s.io/api => k8s.io/api v0.0.0-20221108053748-98c1aa6b3d0a
|
k8s.io/api => k8s.io/api v0.0.0-20221108053748-98c1aa6b3d0a
|
||||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20221108055230-fd8a60496be5
|
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20221108055230-fd8a60496be5
|
||||||
k8s.io/client-go => k8s.io/client-go v0.0.0-20221108173010-769443557e04
|
k8s.io/client-go => k8s.io/client-go v0.0.0-20221108173010-769443557e04
|
||||||
k8s.io/component-base => k8s.io/component-base v0.0.0-20221108213136-021afb59bb71
|
k8s.io/component-base => k8s.io/component-base v0.0.0-20221109013135-4e8a9589a311
|
||||||
k8s.io/kms => k8s.io/kms v0.0.0-20221028080743-a9ba1c11c0c6
|
k8s.io/kms => k8s.io/kms v0.0.0-20221028080743-a9ba1c11c0c6
|
||||||
)
|
)
|
||||||
|
|
|
||||||
4
go.sum
4
go.sum
|
|
@ -992,8 +992,8 @@ k8s.io/apimachinery v0.0.0-20221108055230-fd8a60496be5 h1:iFAMJ1evvrO6X7dS7EKujS
|
||||||
k8s.io/apimachinery v0.0.0-20221108055230-fd8a60496be5/go.mod h1:VXMmlsE7YRJ5vyAyWpkKIfFkEbDNpVs0ObpkuQf1WfM=
|
k8s.io/apimachinery v0.0.0-20221108055230-fd8a60496be5/go.mod h1:VXMmlsE7YRJ5vyAyWpkKIfFkEbDNpVs0ObpkuQf1WfM=
|
||||||
k8s.io/client-go v0.0.0-20221108173010-769443557e04 h1:ad7JkOkiLiyMKWHRkmbJgjCzySdkXxRxWeNosATW0mo=
|
k8s.io/client-go v0.0.0-20221108173010-769443557e04 h1:ad7JkOkiLiyMKWHRkmbJgjCzySdkXxRxWeNosATW0mo=
|
||||||
k8s.io/client-go v0.0.0-20221108173010-769443557e04/go.mod h1:O6sEWJ2BPd8Dag831LA1lzC3WnE29nuwUJZZ4H2vlyo=
|
k8s.io/client-go v0.0.0-20221108173010-769443557e04/go.mod h1:O6sEWJ2BPd8Dag831LA1lzC3WnE29nuwUJZZ4H2vlyo=
|
||||||
k8s.io/component-base v0.0.0-20221108213136-021afb59bb71 h1:Qr7dcMdpWjUZUEkZcbvGGQbtTlVR9b9VqQci/G0jzfY=
|
k8s.io/component-base v0.0.0-20221109013135-4e8a9589a311 h1:rUbuNcL4yd5fSmGBMgRYsaodvXqHiu9TwjcmqH2toRY=
|
||||||
k8s.io/component-base v0.0.0-20221108213136-021afb59bb71/go.mod h1:5bp64lK0p+wJD2BFMin7Akfxiwvt58T4iDz2Q+6woBE=
|
k8s.io/component-base v0.0.0-20221109013135-4e8a9589a311/go.mod h1:5bp64lK0p+wJD2BFMin7Akfxiwvt58T4iDz2Q+6woBE=
|
||||||
k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4=
|
k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4=
|
||||||
k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
|
k8s.io/klog/v2 v2.80.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
|
||||||
k8s.io/kms v0.0.0-20221028080743-a9ba1c11c0c6 h1:d/x+J+EPT4UkD2pH39Ms5xKo1IVDfYlzoxowFd99tFg=
|
k8s.io/kms v0.0.0-20221028080743-a9ba1c11c0c6 h1:d/x+J+EPT4UkD2pH39Ms5xKo1IVDfYlzoxowFd99tFg=
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/aes"
|
"crypto/aes"
|
||||||
"crypto/cipher"
|
"crypto/cipher"
|
||||||
|
"crypto/sha256"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
@ -27,6 +28,7 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
|
@ -59,6 +61,7 @@ const (
|
||||||
kmsPluginHealthzPositiveTTL = 20 * time.Second
|
kmsPluginHealthzPositiveTTL = 20 * time.Second
|
||||||
kmsAPIVersionV1 = "v1"
|
kmsAPIVersionV1 = "v1"
|
||||||
kmsAPIVersionV2 = "v2"
|
kmsAPIVersionV2 = "v2"
|
||||||
|
kmsReloadHealthCheckName = "kms-providers"
|
||||||
)
|
)
|
||||||
|
|
||||||
type kmsPluginHealthzResponse struct {
|
type kmsPluginHealthzResponse struct {
|
||||||
|
|
@ -85,7 +88,7 @@ type kmsv2PluginProbe struct {
|
||||||
type kmsHealthChecker []healthz.HealthChecker
|
type kmsHealthChecker []healthz.HealthChecker
|
||||||
|
|
||||||
func (k kmsHealthChecker) Name() string {
|
func (k kmsHealthChecker) Name() string {
|
||||||
return "kms-providers"
|
return kmsReloadHealthCheckName
|
||||||
}
|
}
|
||||||
|
|
||||||
func (k kmsHealthChecker) Check(req *http.Request) error {
|
func (k kmsHealthChecker) Check(req *http.Request) error {
|
||||||
|
|
@ -113,25 +116,51 @@ func (h *kmsv2PluginProbe) toHealthzCheck(idx int) healthz.HealthChecker {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EncryptionConfiguration represents the parsed and normalized encryption configuration for the apiserver.
|
||||||
|
type EncryptionConfiguration struct {
|
||||||
|
// Transformers is a list of value.Transformer that will be used to encrypt and decrypt data.
|
||||||
|
Transformers map[schema.GroupResource]value.Transformer
|
||||||
|
|
||||||
|
// HealthChecks is a list of healthz.HealthChecker that will be used to check the health of the encryption providers.
|
||||||
|
HealthChecks []healthz.HealthChecker
|
||||||
|
|
||||||
|
// EncryptionFileContentHash is the hash of the encryption config file.
|
||||||
|
EncryptionFileContentHash string
|
||||||
|
|
||||||
|
// KMSCloseGracePeriod is the duration we will wait before closing old transformers.
|
||||||
|
// We wait for any in-flight requests to finish by using the duration which is longer than their timeout.
|
||||||
|
KMSCloseGracePeriod time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
// LoadEncryptionConfig parses and validates the encryption config specified by filepath.
|
// LoadEncryptionConfig parses and validates the encryption config specified by filepath.
|
||||||
// It may launch multiple go routines whose lifecycle is controlled by stopCh.
|
// It may launch multiple go routines whose lifecycle is controlled by stopCh.
|
||||||
// If reload is true, or KMS v2 plugins are used with no KMS v1 plugins, the returned slice of health checkers will always be of length 1.
|
// If reload is true, or KMS v2 plugins are used with no KMS v1 plugins, the returned slice of health checkers will always be of length 1.
|
||||||
func LoadEncryptionConfig(filepath string, reload bool, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, []healthz.HealthChecker, error) {
|
func LoadEncryptionConfig(filepath string, reload bool, stopCh <-chan struct{}) (*EncryptionConfiguration, error) {
|
||||||
config, err := loadConfig(filepath, reload)
|
config, contentHash, err := loadConfig(filepath, reload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("error while parsing file: %w", err)
|
return nil, fmt.Errorf("error while parsing file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
transformers, kmsHealthChecks, kmsUsed, err := getTransformerOverridesAndKMSPluginHealthzCheckers(config, stopCh)
|
transformers, kmsHealthChecks, kmsUsed, err := getTransformerOverridesAndKMSPluginHealthzCheckers(config, stopCh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("error while building transformers: %w", err)
|
return nil, fmt.Errorf("error while building transformers: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if reload || (kmsUsed.v2Used && !kmsUsed.v1Used) {
|
if reload || (kmsUsed.v2Used && !kmsUsed.v1Used) {
|
||||||
kmsHealthChecks = []healthz.HealthChecker{kmsHealthChecker(kmsHealthChecks)}
|
kmsHealthChecks = []healthz.HealthChecker{kmsHealthChecker(kmsHealthChecks)}
|
||||||
}
|
}
|
||||||
|
|
||||||
return transformers, kmsHealthChecks, nil
|
// KMSTimeout is the duration we will wait before closing old transformers.
|
||||||
|
// The way we calculate is as follows:
|
||||||
|
// 1. Sum all timeouts across all KMS plugins. (check kmsPrefixTransformer for differences between v1 and v2)
|
||||||
|
// 2. Multiply that by 2 (to allow for some buffer)
|
||||||
|
// The reason we sum all timeout is because kmsHealthChecker() will run all health checks serially
|
||||||
|
return &EncryptionConfiguration{
|
||||||
|
Transformers: transformers,
|
||||||
|
HealthChecks: kmsHealthChecks,
|
||||||
|
EncryptionFileContentHash: contentHash,
|
||||||
|
KMSCloseGracePeriod: 2 * kmsUsed.kmsTimeoutSum,
|
||||||
|
}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func getTransformerOverridesAndKMSPluginHealthzCheckers(config *apiserverconfig.EncryptionConfiguration, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, []healthz.HealthChecker, *kmsState, error) {
|
func getTransformerOverridesAndKMSPluginHealthzCheckers(config *apiserverconfig.EncryptionConfiguration, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, []healthz.HealthChecker, *kmsState, error) {
|
||||||
|
|
@ -168,6 +197,8 @@ func getTransformerOverridesAndKMSPluginProbes(config *apiserverconfig.Encryptio
|
||||||
kmsUsed.v1Used = kmsUsed.v1Used || used.v1Used
|
kmsUsed.v1Used = kmsUsed.v1Used || used.v1Used
|
||||||
kmsUsed.v2Used = kmsUsed.v2Used || used.v2Used
|
kmsUsed.v2Used = kmsUsed.v2Used || used.v2Used
|
||||||
|
|
||||||
|
kmsUsed.kmsTimeoutSum += used.kmsTimeoutSum
|
||||||
|
|
||||||
// For each resource, create a list of providers to use
|
// For each resource, create a list of providers to use
|
||||||
for _, resource := range resourceConfig.Resources {
|
for _, resource := range resourceConfig.Resources {
|
||||||
resource := resource
|
resource := resource
|
||||||
|
|
@ -262,19 +293,20 @@ func isKMSv2ProviderHealthy(name string, response *envelopekmsv2.StatusResponse)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func loadConfig(filepath string, reload bool) (*apiserverconfig.EncryptionConfiguration, error) {
|
// loadConfig parses the encryption configuration file at filepath and returns the parsed config and hash of the file.
|
||||||
|
func loadConfig(filepath string, reload bool) (*apiserverconfig.EncryptionConfiguration, string, error) {
|
||||||
f, err := os.Open(filepath)
|
f, err := os.Open(filepath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error opening encryption provider configuration file %q: %w", filepath, err)
|
return nil, "", fmt.Errorf("error opening encryption provider configuration file %q: %w", filepath, err)
|
||||||
}
|
}
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
|
||||||
data, err := io.ReadAll(f)
|
data, err := io.ReadAll(f)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not read contents: %w", err)
|
return nil, "", fmt.Errorf("could not read contents: %w", err)
|
||||||
}
|
}
|
||||||
if len(data) == 0 {
|
if len(data) == 0 {
|
||||||
return nil, fmt.Errorf("encryption provider configuration file %q is empty", filepath)
|
return nil, "", fmt.Errorf("encryption provider configuration file %q is empty", filepath)
|
||||||
}
|
}
|
||||||
|
|
||||||
scheme := runtime.NewScheme()
|
scheme := runtime.NewScheme()
|
||||||
|
|
@ -284,14 +316,14 @@ func loadConfig(filepath string, reload bool) (*apiserverconfig.EncryptionConfig
|
||||||
|
|
||||||
configObj, gvk, err := codecs.UniversalDecoder().Decode(data, nil, nil)
|
configObj, gvk, err := codecs.UniversalDecoder().Decode(data, nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, "", err
|
||||||
}
|
}
|
||||||
config, ok := configObj.(*apiserverconfig.EncryptionConfiguration)
|
config, ok := configObj.(*apiserverconfig.EncryptionConfiguration)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("got unexpected config type: %v", gvk)
|
return nil, "", fmt.Errorf("got unexpected config type: %v", gvk)
|
||||||
}
|
}
|
||||||
|
|
||||||
return config, validation.ValidateEncryptionConfiguration(config, reload).ToAggregate()
|
return config, computeEncryptionConfigHash(data), validation.ValidateEncryptionConfiguration(config, reload).ToAggregate()
|
||||||
}
|
}
|
||||||
|
|
||||||
func prefixTransformersAndProbes(config apiserverconfig.ResourceConfiguration, stopCh <-chan struct{}) ([]value.PrefixTransformer, []healthChecker, *kmsState, error) {
|
func prefixTransformersAndProbes(config apiserverconfig.ResourceConfiguration, stopCh <-chan struct{}) ([]value.PrefixTransformer, []healthChecker, *kmsState, error) {
|
||||||
|
|
@ -324,6 +356,9 @@ func prefixTransformersAndProbes(config apiserverconfig.ResourceConfiguration, s
|
||||||
probes = append(probes, probe)
|
probes = append(probes, probe)
|
||||||
kmsUsed.v1Used = kmsUsed.v1Used || used.v1Used
|
kmsUsed.v1Used = kmsUsed.v1Used || used.v1Used
|
||||||
kmsUsed.v2Used = kmsUsed.v2Used || used.v2Used
|
kmsUsed.v2Used = kmsUsed.v2Used || used.v2Used
|
||||||
|
|
||||||
|
// calculate the maximum timeout for all KMS providers
|
||||||
|
kmsUsed.kmsTimeoutSum += used.kmsTimeoutSum
|
||||||
}
|
}
|
||||||
|
|
||||||
case provider.Identity != nil:
|
case provider.Identity != nil:
|
||||||
|
|
@ -459,6 +494,7 @@ var (
|
||||||
|
|
||||||
type kmsState struct {
|
type kmsState struct {
|
||||||
v1Used, v2Used bool
|
v1Used, v2Used bool
|
||||||
|
kmsTimeoutSum time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
func kmsPrefixTransformer(config *apiserverconfig.KMSConfiguration, stopCh <-chan struct{}) (value.PrefixTransformer, healthChecker, *kmsState, error) {
|
func kmsPrefixTransformer(config *apiserverconfig.KMSConfiguration, stopCh <-chan struct{}) (value.PrefixTransformer, healthChecker, *kmsState, error) {
|
||||||
|
|
@ -483,7 +519,11 @@ func kmsPrefixTransformer(config *apiserverconfig.KMSConfiguration, stopCh <-cha
|
||||||
|
|
||||||
transformer := envelopePrefixTransformer(config, envelopeService, kmsTransformerPrefixV1)
|
transformer := envelopePrefixTransformer(config, envelopeService, kmsTransformerPrefixV1)
|
||||||
|
|
||||||
return transformer, probe, &kmsState{v1Used: true}, nil
|
return transformer, probe, &kmsState{
|
||||||
|
v1Used: true,
|
||||||
|
// for v1 we will do encrypt and decrypt for health check. Since these are serial operations, we will double the timeout.
|
||||||
|
kmsTimeoutSum: 2 * config.Timeout.Duration,
|
||||||
|
}, nil
|
||||||
|
|
||||||
case kmsAPIVersionV2:
|
case kmsAPIVersionV2:
|
||||||
if !utilfeature.DefaultFeatureGate.Enabled(features.KMSv2) {
|
if !utilfeature.DefaultFeatureGate.Enabled(features.KMSv2) {
|
||||||
|
|
@ -509,7 +549,10 @@ func kmsPrefixTransformer(config *apiserverconfig.KMSConfiguration, stopCh <-cha
|
||||||
Prefix: []byte(kmsTransformerPrefixV2 + kmsName + ":"),
|
Prefix: []byte(kmsTransformerPrefixV2 + kmsName + ":"),
|
||||||
}
|
}
|
||||||
|
|
||||||
return transformer, probe, &kmsState{v2Used: true}, nil
|
return transformer, probe, &kmsState{
|
||||||
|
v2Used: true,
|
||||||
|
kmsTimeoutSum: config.Timeout.Duration,
|
||||||
|
}, nil
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return value.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMS plugin %q, unsupported KMS API version %q", kmsName, config.APIVersion)
|
return value.PrefixTransformer{}, nil, nil, fmt.Errorf("could not configure KMS plugin %q, unsupported KMS API version %q", kmsName, config.APIVersion)
|
||||||
|
|
@ -555,3 +598,133 @@ func (u unionTransformers) TransformFromStorage(ctx context.Context, data []byte
|
||||||
func (u unionTransformers) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) (out []byte, err error) {
|
func (u unionTransformers) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) (out []byte, err error) {
|
||||||
return u[0].TransformToStorage(ctx, data, dataCtx)
|
return u[0].TransformToStorage(ctx, data, dataCtx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// computeEncryptionConfigHash returns the expected hash for an encryption config file that has been loaded as bytes.
|
||||||
|
// We use a hash instead of the raw file contents when tracking changes to avoid holding any encryption keys in memory outside of their associated transformers.
|
||||||
|
// This hash must be used in-memory and not externalized to the process because it has no cross-release stability guarantees.
|
||||||
|
func computeEncryptionConfigHash(data []byte) string {
|
||||||
|
return fmt.Sprintf("%x", sha256.Sum256(data))
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ healthz.HealthChecker = &DynamicTransformers{}
|
||||||
|
|
||||||
|
// DynamicTransformers holds transformers that may be dynamically updated via a single external actor, likely a controller.
|
||||||
|
// This struct must avoid locks (even read write locks) as it is inline to all calls to storage.
|
||||||
|
type DynamicTransformers struct {
|
||||||
|
transformTracker *atomic.Value
|
||||||
|
}
|
||||||
|
|
||||||
|
type transformTracker struct {
|
||||||
|
transformerOverrides map[schema.GroupResource]value.Transformer
|
||||||
|
kmsPluginHealthzCheck healthz.HealthChecker
|
||||||
|
closeTransformers context.CancelFunc
|
||||||
|
kmsCloseGracePeriod time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDynamicTransformers returns transformers, health checks for kms providers and an ability to close transformers.
|
||||||
|
func NewDynamicTransformers(
|
||||||
|
transformerOverrides map[schema.GroupResource]value.Transformer,
|
||||||
|
kmsPluginHealthzCheck healthz.HealthChecker,
|
||||||
|
closeTransformers context.CancelFunc,
|
||||||
|
kmsCloseGracePeriod time.Duration,
|
||||||
|
) *DynamicTransformers {
|
||||||
|
dynamicTransformers := &DynamicTransformers{
|
||||||
|
transformTracker: &atomic.Value{},
|
||||||
|
}
|
||||||
|
|
||||||
|
tracker := &transformTracker{
|
||||||
|
transformerOverrides: transformerOverrides,
|
||||||
|
kmsPluginHealthzCheck: kmsPluginHealthzCheck,
|
||||||
|
closeTransformers: closeTransformers,
|
||||||
|
kmsCloseGracePeriod: kmsCloseGracePeriod,
|
||||||
|
}
|
||||||
|
dynamicTransformers.transformTracker.Store(tracker)
|
||||||
|
|
||||||
|
return dynamicTransformers
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check implements healthz.HealthChecker
|
||||||
|
func (d *DynamicTransformers) Check(req *http.Request) error {
|
||||||
|
return d.transformTracker.Load().(*transformTracker).kmsPluginHealthzCheck.Check(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Name implements healthz.HealthChecker
|
||||||
|
func (d *DynamicTransformers) Name() string {
|
||||||
|
return kmsReloadHealthCheckName
|
||||||
|
}
|
||||||
|
|
||||||
|
// TransformerForResource returns the transformer for the given resource.
|
||||||
|
func (d *DynamicTransformers) TransformerForResource(resource schema.GroupResource) value.Transformer {
|
||||||
|
return &resourceTransformer{
|
||||||
|
resource: resource,
|
||||||
|
transformTracker: d.transformTracker,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set sets the transformer overrides. This method is not go routine safe and must only be called by the same, single caller throughout the lifetime of this object.
|
||||||
|
func (d *DynamicTransformers) Set(
|
||||||
|
transformerOverrides map[schema.GroupResource]value.Transformer,
|
||||||
|
closeTransformers context.CancelFunc,
|
||||||
|
kmsPluginHealthzCheck healthz.HealthChecker,
|
||||||
|
kmsCloseGracePeriod time.Duration,
|
||||||
|
) {
|
||||||
|
// store new values
|
||||||
|
newTransformTracker := &transformTracker{
|
||||||
|
transformerOverrides: transformerOverrides,
|
||||||
|
closeTransformers: closeTransformers,
|
||||||
|
kmsPluginHealthzCheck: kmsPluginHealthzCheck,
|
||||||
|
kmsCloseGracePeriod: kmsCloseGracePeriod,
|
||||||
|
}
|
||||||
|
|
||||||
|
// update new transformer overrides
|
||||||
|
oldTransformTracker := d.transformTracker.Swap(newTransformTracker).(*transformTracker)
|
||||||
|
|
||||||
|
// close old transformers once we wait for grpc request to finish any in-flight requests.
|
||||||
|
// by the time we spawn this go routine, the new transformers have already been set and will be used for new requests.
|
||||||
|
// if the server starts shutting down during sleep duration then the transformers will correctly closed early because their lifetime is tied to the api-server drain notifier.
|
||||||
|
go func() {
|
||||||
|
time.Sleep(oldTransformTracker.kmsCloseGracePeriod)
|
||||||
|
oldTransformTracker.closeTransformers()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ value.Transformer = &resourceTransformer{}
|
||||||
|
|
||||||
|
type resourceTransformer struct {
|
||||||
|
resource schema.GroupResource
|
||||||
|
transformTracker *atomic.Value
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *resourceTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
|
||||||
|
return r.transformer().TransformFromStorage(ctx, data, dataCtx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *resourceTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
|
||||||
|
return r.transformer().TransformToStorage(ctx, data, dataCtx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *resourceTransformer) transformer() value.Transformer {
|
||||||
|
transformer := r.transformTracker.Load().(*transformTracker).transformerOverrides[r.resource]
|
||||||
|
if transformer == nil {
|
||||||
|
return identity.NewEncryptCheckTransformer()
|
||||||
|
}
|
||||||
|
return transformer
|
||||||
|
}
|
||||||
|
|
||||||
|
type ResourceTransformers interface {
|
||||||
|
TransformerForResource(resource schema.GroupResource) value.Transformer
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ ResourceTransformers = &DynamicTransformers{}
|
||||||
|
var _ ResourceTransformers = &StaticTransformers{}
|
||||||
|
|
||||||
|
type StaticTransformers map[schema.GroupResource]value.Transformer
|
||||||
|
|
||||||
|
// StaticTransformers
|
||||||
|
func (s StaticTransformers) TransformerForResource(resource schema.GroupResource) value.Transformer {
|
||||||
|
transformer := s[resource]
|
||||||
|
if transformer == nil {
|
||||||
|
return identity.NewEncryptCheckTransformer()
|
||||||
|
}
|
||||||
|
return transformer
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -114,7 +114,7 @@ func newMockErrorEnvelopeKMSv2Service(endpoint string, timeout time.Duration) (e
|
||||||
|
|
||||||
func TestLegacyConfig(t *testing.T) {
|
func TestLegacyConfig(t *testing.T) {
|
||||||
legacyV1Config := "testdata/valid-configs/legacy.yaml"
|
legacyV1Config := "testdata/valid-configs/legacy.yaml"
|
||||||
legacyConfigObject, err := loadConfig(legacyV1Config, false)
|
legacyConfigObject, _, err := loadConfig(legacyV1Config, false)
|
||||||
cacheSize := int32(10)
|
cacheSize := int32(10)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, legacyV1Config)
|
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, legacyV1Config)
|
||||||
|
|
@ -177,48 +177,48 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) {
|
||||||
// Transforms data using one of them, and tries to untransform using the others.
|
// Transforms data using one of them, and tries to untransform using the others.
|
||||||
// Repeats this for all possible combinations.
|
// Repeats this for all possible combinations.
|
||||||
correctConfigWithIdentityFirst := "testdata/valid-configs/identity-first.yaml"
|
correctConfigWithIdentityFirst := "testdata/valid-configs/identity-first.yaml"
|
||||||
identityFirstTransformerOverrides, _, err := LoadEncryptionConfig(correctConfigWithIdentityFirst, false, ctx.Done())
|
identityFirstEncryptionConfiguration, err := LoadEncryptionConfig(correctConfigWithIdentityFirst, false, ctx.Done())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithIdentityFirst)
|
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithIdentityFirst)
|
||||||
}
|
}
|
||||||
|
|
||||||
correctConfigWithAesGcmFirst := "testdata/valid-configs/aes-gcm-first.yaml"
|
correctConfigWithAesGcmFirst := "testdata/valid-configs/aes-gcm-first.yaml"
|
||||||
aesGcmFirstTransformerOverrides, _, err := LoadEncryptionConfig(correctConfigWithAesGcmFirst, false, ctx.Done())
|
aesGcmFirstEncryptionConfiguration, err := LoadEncryptionConfig(correctConfigWithAesGcmFirst, false, ctx.Done())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithAesGcmFirst)
|
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithAesGcmFirst)
|
||||||
}
|
}
|
||||||
|
|
||||||
correctConfigWithAesCbcFirst := "testdata/valid-configs/aes-cbc-first.yaml"
|
correctConfigWithAesCbcFirst := "testdata/valid-configs/aes-cbc-first.yaml"
|
||||||
aesCbcFirstTransformerOverrides, _, err := LoadEncryptionConfig(correctConfigWithAesCbcFirst, false, ctx.Done())
|
aesCbcFirstEncryptionConfiguration, err := LoadEncryptionConfig(correctConfigWithAesCbcFirst, false, ctx.Done())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithAesCbcFirst)
|
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithAesCbcFirst)
|
||||||
}
|
}
|
||||||
|
|
||||||
correctConfigWithSecretboxFirst := "testdata/valid-configs/secret-box-first.yaml"
|
correctConfigWithSecretboxFirst := "testdata/valid-configs/secret-box-first.yaml"
|
||||||
secretboxFirstTransformerOverrides, _, err := LoadEncryptionConfig(correctConfigWithSecretboxFirst, false, ctx.Done())
|
secretboxFirstEncryptionConfiguration, err := LoadEncryptionConfig(correctConfigWithSecretboxFirst, false, ctx.Done())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithSecretboxFirst)
|
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithSecretboxFirst)
|
||||||
}
|
}
|
||||||
|
|
||||||
correctConfigWithKMSFirst := "testdata/valid-configs/kms-first.yaml"
|
correctConfigWithKMSFirst := "testdata/valid-configs/kms-first.yaml"
|
||||||
kmsFirstTransformerOverrides, _, err := LoadEncryptionConfig(correctConfigWithKMSFirst, false, ctx.Done())
|
kmsFirstEncryptionConfiguration, err := LoadEncryptionConfig(correctConfigWithKMSFirst, false, ctx.Done())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithKMSFirst)
|
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithKMSFirst)
|
||||||
}
|
}
|
||||||
|
|
||||||
correctConfigWithKMSv2First := "testdata/valid-configs/kmsv2-first.yaml"
|
correctConfigWithKMSv2First := "testdata/valid-configs/kmsv2-first.yaml"
|
||||||
kmsv2FirstTransformerOverrides, _, err := LoadEncryptionConfig(correctConfigWithKMSv2First, false, ctx.Done())
|
kmsv2FirstEncryptionConfiguration, err := LoadEncryptionConfig(correctConfigWithKMSv2First, false, ctx.Done())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithKMSv2First)
|
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithKMSv2First)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pick the transformer for any of the returned resources.
|
// Pick the transformer for any of the returned resources.
|
||||||
identityFirstTransformer := identityFirstTransformerOverrides[schema.ParseGroupResource("secrets")]
|
identityFirstTransformer := identityFirstEncryptionConfiguration.Transformers[schema.ParseGroupResource("secrets")]
|
||||||
aesGcmFirstTransformer := aesGcmFirstTransformerOverrides[schema.ParseGroupResource("secrets")]
|
aesGcmFirstTransformer := aesGcmFirstEncryptionConfiguration.Transformers[schema.ParseGroupResource("secrets")]
|
||||||
aesCbcFirstTransformer := aesCbcFirstTransformerOverrides[schema.ParseGroupResource("secrets")]
|
aesCbcFirstTransformer := aesCbcFirstEncryptionConfiguration.Transformers[schema.ParseGroupResource("secrets")]
|
||||||
secretboxFirstTransformer := secretboxFirstTransformerOverrides[schema.ParseGroupResource("secrets")]
|
secretboxFirstTransformer := secretboxFirstEncryptionConfiguration.Transformers[schema.ParseGroupResource("secrets")]
|
||||||
kmsFirstTransformer := kmsFirstTransformerOverrides[schema.ParseGroupResource("secrets")]
|
kmsFirstTransformer := kmsFirstEncryptionConfiguration.Transformers[schema.ParseGroupResource("secrets")]
|
||||||
kmsv2FirstTransformer := kmsv2FirstTransformerOverrides[schema.ParseGroupResource("secrets")]
|
kmsv2FirstTransformer := kmsv2FirstEncryptionConfiguration.Transformers[schema.ParseGroupResource("secrets")]
|
||||||
|
|
||||||
dataCtx := value.DefaultContext([]byte(sampleContextText))
|
dataCtx := value.DefaultContext([]byte(sampleContextText))
|
||||||
originalText := []byte(sampleText)
|
originalText := []byte(sampleText)
|
||||||
|
|
@ -256,6 +256,222 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestKMSMaxTimeout(t *testing.T) {
|
||||||
|
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)()
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
expectedTimeout time.Duration
|
||||||
|
config apiserverconfig.EncryptionConfiguration
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "default timeout",
|
||||||
|
config: apiserverconfig.EncryptionConfiguration{
|
||||||
|
Resources: []apiserverconfig.ResourceConfiguration{
|
||||||
|
{
|
||||||
|
Resources: []string{"secrets"},
|
||||||
|
Providers: []apiserverconfig.ProviderConfiguration{
|
||||||
|
{
|
||||||
|
KMS: &apiserverconfig.KMSConfiguration{
|
||||||
|
Name: "kms",
|
||||||
|
APIVersion: "v1",
|
||||||
|
Timeout: &metav1.Duration{
|
||||||
|
// default timeout is 3s
|
||||||
|
// this will be set automatically if not provided in config file
|
||||||
|
Duration: 3 * time.Second,
|
||||||
|
},
|
||||||
|
Endpoint: "unix:///tmp/testprovider.sock",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedTimeout: 6 * time.Second,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "with v1 provider",
|
||||||
|
config: apiserverconfig.EncryptionConfiguration{
|
||||||
|
Resources: []apiserverconfig.ResourceConfiguration{
|
||||||
|
{
|
||||||
|
Resources: []string{"secrets"},
|
||||||
|
Providers: []apiserverconfig.ProviderConfiguration{
|
||||||
|
{
|
||||||
|
KMS: &apiserverconfig.KMSConfiguration{
|
||||||
|
Name: "kms",
|
||||||
|
APIVersion: "v1",
|
||||||
|
Timeout: &metav1.Duration{
|
||||||
|
// default timeout is 3s
|
||||||
|
// this will be set automatically if not provided in config file
|
||||||
|
Duration: 3 * time.Second,
|
||||||
|
},
|
||||||
|
Endpoint: "unix:///tmp/testprovider.sock",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Resources: []string{"configmaps"},
|
||||||
|
Providers: []apiserverconfig.ProviderConfiguration{
|
||||||
|
{
|
||||||
|
KMS: &apiserverconfig.KMSConfiguration{
|
||||||
|
Name: "kms",
|
||||||
|
APIVersion: "v1",
|
||||||
|
Timeout: &metav1.Duration{
|
||||||
|
// default timeout is 3s
|
||||||
|
// this will be set automatically if not provided in config file
|
||||||
|
Duration: 3 * time.Second,
|
||||||
|
},
|
||||||
|
Endpoint: "unix:///tmp/testprovider.sock",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedTimeout: 12 * time.Second,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "with v2 provider",
|
||||||
|
config: apiserverconfig.EncryptionConfiguration{
|
||||||
|
Resources: []apiserverconfig.ResourceConfiguration{
|
||||||
|
{
|
||||||
|
Resources: []string{"secrets"},
|
||||||
|
Providers: []apiserverconfig.ProviderConfiguration{
|
||||||
|
{
|
||||||
|
KMS: &apiserverconfig.KMSConfiguration{
|
||||||
|
Name: "kms",
|
||||||
|
APIVersion: "v2",
|
||||||
|
Timeout: &metav1.Duration{
|
||||||
|
Duration: 15 * time.Second,
|
||||||
|
},
|
||||||
|
Endpoint: "unix:///tmp/testprovider.sock",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
KMS: &apiserverconfig.KMSConfiguration{
|
||||||
|
Name: "new-kms",
|
||||||
|
APIVersion: "v2",
|
||||||
|
Timeout: &metav1.Duration{
|
||||||
|
Duration: 5 * time.Second,
|
||||||
|
},
|
||||||
|
Endpoint: "unix:///tmp/anothertestprovider.sock",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Resources: []string{"configmaps"},
|
||||||
|
Providers: []apiserverconfig.ProviderConfiguration{
|
||||||
|
{
|
||||||
|
KMS: &apiserverconfig.KMSConfiguration{
|
||||||
|
Name: "another-kms",
|
||||||
|
APIVersion: "v2",
|
||||||
|
Timeout: &metav1.Duration{
|
||||||
|
Duration: 10 * time.Second,
|
||||||
|
},
|
||||||
|
Endpoint: "unix:///tmp/testprovider.sock",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
KMS: &apiserverconfig.KMSConfiguration{
|
||||||
|
Name: "yet-another-kms",
|
||||||
|
APIVersion: "v2",
|
||||||
|
Timeout: &metav1.Duration{
|
||||||
|
Duration: 2 * time.Second,
|
||||||
|
},
|
||||||
|
Endpoint: "unix:///tmp/anothertestprovider.sock",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedTimeout: 32 * time.Second,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "with v1 and v2 provider",
|
||||||
|
config: apiserverconfig.EncryptionConfiguration{
|
||||||
|
Resources: []apiserverconfig.ResourceConfiguration{
|
||||||
|
{
|
||||||
|
Resources: []string{"secrets"},
|
||||||
|
Providers: []apiserverconfig.ProviderConfiguration{
|
||||||
|
{
|
||||||
|
KMS: &apiserverconfig.KMSConfiguration{
|
||||||
|
Name: "kms",
|
||||||
|
APIVersion: "v1",
|
||||||
|
Timeout: &metav1.Duration{
|
||||||
|
Duration: 1 * time.Second,
|
||||||
|
},
|
||||||
|
Endpoint: "unix:///tmp/testprovider.sock",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
KMS: &apiserverconfig.KMSConfiguration{
|
||||||
|
Name: "another-kms",
|
||||||
|
APIVersion: "v2",
|
||||||
|
Timeout: &metav1.Duration{
|
||||||
|
Duration: 1 * time.Second,
|
||||||
|
},
|
||||||
|
Endpoint: "unix:///tmp/anothertestprovider.sock",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Resources: []string{"configmaps"},
|
||||||
|
Providers: []apiserverconfig.ProviderConfiguration{
|
||||||
|
{
|
||||||
|
KMS: &apiserverconfig.KMSConfiguration{
|
||||||
|
Name: "kms",
|
||||||
|
APIVersion: "v1",
|
||||||
|
Timeout: &metav1.Duration{
|
||||||
|
Duration: 4 * time.Second,
|
||||||
|
},
|
||||||
|
Endpoint: "unix:///tmp/testprovider.sock",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
KMS: &apiserverconfig.KMSConfiguration{
|
||||||
|
Name: "yet-another-kms",
|
||||||
|
APIVersion: "v1",
|
||||||
|
Timeout: &metav1.Duration{
|
||||||
|
Duration: 2 * time.Second,
|
||||||
|
},
|
||||||
|
Endpoint: "unix:///tmp/anothertestprovider.sock",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
expectedTimeout: 15 * time.Second,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, testCase := range testCases {
|
||||||
|
t.Run(testCase.name, func(t *testing.T) {
|
||||||
|
cacheSize := int32(1000)
|
||||||
|
for _, resource := range testCase.config.Resources {
|
||||||
|
for _, provider := range resource.Providers {
|
||||||
|
if provider.KMS != nil {
|
||||||
|
provider.KMS.CacheSize = &cacheSize
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, kmsUsed, _ := getTransformerOverridesAndKMSPluginHealthzCheckers(&testCase.config, testContext(t).Done())
|
||||||
|
if kmsUsed == nil {
|
||||||
|
t.Fatal("kmsUsed should not be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if kmsUsed.kmsTimeoutSum != testCase.expectedTimeout {
|
||||||
|
t.Fatalf("expected timeout %v, got %v", testCase.expectedTimeout, kmsUsed.kmsTimeoutSum)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestKMSPluginHealthz(t *testing.T) {
|
func TestKMSPluginHealthz(t *testing.T) {
|
||||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)()
|
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)()
|
||||||
|
|
||||||
|
|
@ -323,7 +539,7 @@ func TestKMSPluginHealthz(t *testing.T) {
|
||||||
|
|
||||||
for _, tt := range testCases {
|
for _, tt := range testCases {
|
||||||
t.Run(tt.desc, func(t *testing.T) {
|
t.Run(tt.desc, func(t *testing.T) {
|
||||||
config, err := loadConfig(tt.config, false)
|
config, _, err := loadConfig(tt.config, false)
|
||||||
if errStr := errString(err); errStr != tt.wantErr {
|
if errStr := errString(err); errStr != tt.wantErr {
|
||||||
t.Fatalf("unexpected error state got=%s want=%s", errStr, tt.wantErr)
|
t.Fatalf("unexpected error state got=%s want=%s", errStr, tt.wantErr)
|
||||||
}
|
}
|
||||||
|
|
@ -541,14 +757,14 @@ func getTransformerFromEncryptionConfig(t *testing.T, encryptionConfigPath strin
|
||||||
ctx := testContext(t)
|
ctx := testContext(t)
|
||||||
|
|
||||||
t.Helper()
|
t.Helper()
|
||||||
transformers, _, err := LoadEncryptionConfig(encryptionConfigPath, false, ctx.Done())
|
encryptionConfiguration, err := LoadEncryptionConfig(encryptionConfigPath, false, ctx.Done())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if len(transformers) != 1 {
|
if len(encryptionConfiguration.Transformers) != 1 {
|
||||||
t.Fatalf("input config does not have exactly one resource: %s", encryptionConfigPath)
|
t.Fatalf("input config does not have exactly one resource: %s", encryptionConfigPath)
|
||||||
}
|
}
|
||||||
for _, transformer := range transformers {
|
for _, transformer := range encryptionConfiguration.Transformers {
|
||||||
return transformer
|
return transformer
|
||||||
}
|
}
|
||||||
panic("unreachable")
|
panic("unreachable")
|
||||||
|
|
@ -602,3 +818,12 @@ func errString(err error) string {
|
||||||
|
|
||||||
return err.Error()
|
return err.Error()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestComputeEncryptionConfigHash(t *testing.T) {
|
||||||
|
// hash the empty string to be sure that sha256 is being used
|
||||||
|
expect := "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
|
||||||
|
sum := computeEncryptionConfigHash([]byte(""))
|
||||||
|
if expect != sum {
|
||||||
|
t.Errorf("expected hash %q but got %q", expect, sum)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,265 @@
|
||||||
|
/*
|
||||||
|
Copyright 2022 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package controller
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/fsnotify/fsnotify"
|
||||||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/apiserver/pkg/server/healthz"
|
||||||
|
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
|
||||||
|
"k8s.io/client-go/util/workqueue"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
|
)
|
||||||
|
|
||||||
|
// workqueueKey is the dummy key used to process change in encryption config file.
|
||||||
|
const workqueueKey = "key"
|
||||||
|
|
||||||
|
// DynamicKMSEncryptionConfigContent which can dynamically handle changes in encryption config file.
|
||||||
|
type DynamicKMSEncryptionConfigContent struct {
|
||||||
|
name string
|
||||||
|
|
||||||
|
// filePath is the path of the file to read.
|
||||||
|
filePath string
|
||||||
|
|
||||||
|
// lastLoadedEncryptionConfigHash stores last successfully read encryption config file content.
|
||||||
|
lastLoadedEncryptionConfigHash string
|
||||||
|
|
||||||
|
// queue for processing changes in encryption config file.
|
||||||
|
queue workqueue.RateLimitingInterface
|
||||||
|
|
||||||
|
// dynamicTransformers updates the transformers when encryption config file changes.
|
||||||
|
dynamicTransformers *encryptionconfig.DynamicTransformers
|
||||||
|
|
||||||
|
// stopCh used here is a lifecycle signal of genericapiserver already drained while shutting down.
|
||||||
|
stopCh <-chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewDynamicKMSEncryptionConfiguration returns controller that dynamically reacts to changes in encryption config file.
|
||||||
|
func NewDynamicKMSEncryptionConfiguration(
|
||||||
|
name, filePath string,
|
||||||
|
dynamicTransformers *encryptionconfig.DynamicTransformers,
|
||||||
|
configContentHash string,
|
||||||
|
stopCh <-chan struct{},
|
||||||
|
) *DynamicKMSEncryptionConfigContent {
|
||||||
|
encryptionConfig := &DynamicKMSEncryptionConfigContent{
|
||||||
|
name: name,
|
||||||
|
filePath: filePath,
|
||||||
|
lastLoadedEncryptionConfigHash: configContentHash,
|
||||||
|
dynamicTransformers: dynamicTransformers,
|
||||||
|
stopCh: stopCh,
|
||||||
|
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("%s-hot-reload", name)),
|
||||||
|
}
|
||||||
|
encryptionConfig.queue.Add(workqueueKey)
|
||||||
|
|
||||||
|
return encryptionConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run starts the controller and blocks until stopCh is closed.
|
||||||
|
func (d *DynamicKMSEncryptionConfigContent) Run(ctx context.Context) {
|
||||||
|
defer utilruntime.HandleCrash()
|
||||||
|
defer d.queue.ShutDown()
|
||||||
|
|
||||||
|
klog.InfoS("Starting controller", "name", d.name)
|
||||||
|
defer klog.InfoS("Shutting down controller", "name", d.name)
|
||||||
|
|
||||||
|
// start worker for processing content
|
||||||
|
go wait.Until(d.runWorker, time.Second, ctx.Done())
|
||||||
|
|
||||||
|
// start the loop that watches the encryption config file until stopCh is closed.
|
||||||
|
go wait.Until(func() {
|
||||||
|
if err := d.watchEncryptionConfigFile(ctx.Done()); err != nil {
|
||||||
|
// if there is an error while setting up or handling the watches, this will ensure that we will process the config file.
|
||||||
|
defer d.queue.Add(workqueueKey)
|
||||||
|
klog.ErrorS(err, "Failed to watch encryption config file, will retry later")
|
||||||
|
}
|
||||||
|
}, time.Second, ctx.Done())
|
||||||
|
|
||||||
|
<-ctx.Done()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DynamicKMSEncryptionConfigContent) watchEncryptionConfigFile(stopCh <-chan struct{}) error {
|
||||||
|
watcher, err := fsnotify.NewWatcher()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error creating fsnotify watcher: %w", err)
|
||||||
|
}
|
||||||
|
defer watcher.Close()
|
||||||
|
|
||||||
|
if err = watcher.Add(d.filePath); err != nil {
|
||||||
|
return fmt.Errorf("error adding watch for file %s: %w", d.filePath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case event := <-watcher.Events:
|
||||||
|
if err := d.handleWatchEvent(event, watcher); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
case err := <-watcher.Errors:
|
||||||
|
return fmt.Errorf("received fsnotify error: %w", err)
|
||||||
|
case <-stopCh:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DynamicKMSEncryptionConfigContent) handleWatchEvent(event fsnotify.Event, watcher *fsnotify.Watcher) error {
|
||||||
|
// This should be executed after restarting the watch (if applicable) to ensure no file event will be missing.
|
||||||
|
defer d.queue.Add(workqueueKey)
|
||||||
|
|
||||||
|
// return if file has not been removed or renamed.
|
||||||
|
if event.Op&(fsnotify.Remove|fsnotify.Rename) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := watcher.Remove(d.filePath); err != nil {
|
||||||
|
klog.V(2).InfoS("Failed to remove file watch, it may have been deleted", "file", d.filePath, "err", err)
|
||||||
|
}
|
||||||
|
if err := watcher.Add(d.filePath); err != nil {
|
||||||
|
return fmt.Errorf("error adding watch for file %s: %w", d.filePath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// runWorker to process file content
|
||||||
|
func (d *DynamicKMSEncryptionConfigContent) runWorker() {
|
||||||
|
for d.processNextWorkItem() {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// processNextWorkItem processes file content when there is a message in the queue.
|
||||||
|
func (d *DynamicKMSEncryptionConfigContent) processNextWorkItem() bool {
|
||||||
|
// key here is dummy item in the queue to trigger file content processing.
|
||||||
|
key, quit := d.queue.Get()
|
||||||
|
if quit {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
defer d.queue.Done(key)
|
||||||
|
|
||||||
|
var (
|
||||||
|
updatedEffectiveConfig bool
|
||||||
|
err error
|
||||||
|
encryptionConfiguration *encryptionconfig.EncryptionConfiguration
|
||||||
|
configChanged bool
|
||||||
|
)
|
||||||
|
|
||||||
|
// get context to close the new transformers.
|
||||||
|
ctx, closeTransformers := wait.ContextForChannel(d.stopCh)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
// TODO: increment success metric when updatedEffectiveConfig=true
|
||||||
|
|
||||||
|
if !updatedEffectiveConfig {
|
||||||
|
// avoid leaking if we're not using the newly constructed transformers (due to an error or them not being changed)
|
||||||
|
closeTransformers()
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
// TODO: increment failure metric
|
||||||
|
utilruntime.HandleError(fmt.Errorf("error processing encryption config file %s: %v", d.filePath, err))
|
||||||
|
// add dummy item back to the queue to trigger file content processing.
|
||||||
|
d.queue.AddRateLimited(key)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
encryptionConfiguration, configChanged, err = d.processEncryptionConfig(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if !configChanged {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(encryptionConfiguration.HealthChecks) != 1 {
|
||||||
|
err = fmt.Errorf("unexpected number of healthz checks: %d. Should have only one", len(encryptionConfiguration.HealthChecks))
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
// get healthz checks for all new KMS plugins.
|
||||||
|
if err = d.validateNewTransformersHealth(ctx, encryptionConfiguration.HealthChecks[0], encryptionConfiguration.KMSCloseGracePeriod); err != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// update transformers.
|
||||||
|
// when reload=true there must always be one healthz check.
|
||||||
|
d.dynamicTransformers.Set(
|
||||||
|
encryptionConfiguration.Transformers,
|
||||||
|
closeTransformers,
|
||||||
|
encryptionConfiguration.HealthChecks[0],
|
||||||
|
encryptionConfiguration.KMSCloseGracePeriod,
|
||||||
|
)
|
||||||
|
|
||||||
|
// update local copy of recent config content once update is successful.
|
||||||
|
d.lastLoadedEncryptionConfigHash = encryptionConfiguration.EncryptionFileContentHash
|
||||||
|
klog.V(2).InfoS("Loaded new kms encryption config content", "name", d.name)
|
||||||
|
|
||||||
|
updatedEffectiveConfig = true
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadEncryptionConfig processes the next set of content from the file.
|
||||||
|
func (d *DynamicKMSEncryptionConfigContent) processEncryptionConfig(ctx context.Context) (
|
||||||
|
encryptionConfiguration *encryptionconfig.EncryptionConfiguration,
|
||||||
|
configChanged bool,
|
||||||
|
err error,
|
||||||
|
) {
|
||||||
|
// this code path will only execute if reload=true. So passing true explicitly.
|
||||||
|
encryptionConfiguration, err = encryptionconfig.LoadEncryptionConfig(d.filePath, true, ctx.Done())
|
||||||
|
if err != nil {
|
||||||
|
return nil, false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if encryptionConfig is different from the current. Do nothing if they are the same.
|
||||||
|
if encryptionConfiguration.EncryptionFileContentHash == d.lastLoadedEncryptionConfigHash {
|
||||||
|
klog.V(4).InfoS("Encryption config has not changed", "name", d.name)
|
||||||
|
return nil, false, nil
|
||||||
|
}
|
||||||
|
return encryptionConfiguration, true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DynamicKMSEncryptionConfigContent) validateNewTransformersHealth(
|
||||||
|
ctx context.Context,
|
||||||
|
kmsPluginHealthzCheck healthz.HealthChecker,
|
||||||
|
kmsPluginCloseGracePeriod time.Duration,
|
||||||
|
) error {
|
||||||
|
// test if new transformers are healthy
|
||||||
|
var healthCheckError error
|
||||||
|
|
||||||
|
if kmsPluginCloseGracePeriod < 10*time.Second {
|
||||||
|
kmsPluginCloseGracePeriod = 10 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
pollErr := wait.PollImmediate(100*time.Millisecond, kmsPluginCloseGracePeriod, func() (bool, error) {
|
||||||
|
// create a fake http get request to health check endpoint
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("/healthz/%s", kmsPluginHealthzCheck.Name()), nil)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
healthCheckError = kmsPluginHealthzCheck.Check(req)
|
||||||
|
return healthCheckError == nil, nil
|
||||||
|
})
|
||||||
|
if pollErr != nil {
|
||||||
|
return fmt.Errorf("health check for new transformers failed, polling error %v: %w", pollErr, healthCheckError)
|
||||||
|
}
|
||||||
|
klog.V(2).InfoS("Health check succeeded")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,172 @@
|
||||||
|
/*
|
||||||
|
Copyright 2022 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package controller
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestProcessEncryptionConfig(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
filePath string
|
||||||
|
expectError bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "empty config file",
|
||||||
|
filePath: "testdata/empty_config.yaml",
|
||||||
|
expectError: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, testCase := range testCases {
|
||||||
|
t.Run(testCase.name, func(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
d := NewDynamicKMSEncryptionConfiguration(
|
||||||
|
testCase.name,
|
||||||
|
testCase.filePath,
|
||||||
|
nil,
|
||||||
|
"",
|
||||||
|
ctx.Done(),
|
||||||
|
)
|
||||||
|
|
||||||
|
_, _, err := d.processEncryptionConfig(ctx)
|
||||||
|
if testCase.expectError && err == nil {
|
||||||
|
t.Fatalf("expected error but got none")
|
||||||
|
}
|
||||||
|
if !testCase.expectError && err != nil {
|
||||||
|
t.Fatalf("expected no error but got %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWatchEncryptionConfigFile(t *testing.T) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
generateEvent func(filePath string, cancel context.CancelFunc)
|
||||||
|
expectError bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "file not renamed or removed",
|
||||||
|
expectError: false,
|
||||||
|
generateEvent: func(filePath string, cancel context.CancelFunc) {
|
||||||
|
os.Chtimes(filePath, time.Now(), time.Now())
|
||||||
|
|
||||||
|
// wait for the event to be handled
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
cancel()
|
||||||
|
os.Remove(filePath)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "file renamed",
|
||||||
|
expectError: true,
|
||||||
|
generateEvent: func(filePath string, cancel context.CancelFunc) {
|
||||||
|
os.Rename(filePath, filePath+"1")
|
||||||
|
|
||||||
|
// wait for the event to be handled
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
os.Remove(filePath + "1")
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "file removed",
|
||||||
|
expectError: true,
|
||||||
|
generateEvent: func(filePath string, cancel context.CancelFunc) {
|
||||||
|
// allow watcher handle to start
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
os.Remove(filePath)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, testCase := range testCases {
|
||||||
|
t.Run(testCase.name, func(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
testFilePath := copyFileForTest(t, "testdata/ec_config.yaml")
|
||||||
|
|
||||||
|
d := NewDynamicKMSEncryptionConfiguration(
|
||||||
|
testCase.name,
|
||||||
|
testFilePath,
|
||||||
|
nil,
|
||||||
|
"",
|
||||||
|
ctx.Done(),
|
||||||
|
)
|
||||||
|
|
||||||
|
errs := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
err := d.watchEncryptionConfigFile(d.stopCh)
|
||||||
|
errs <- err
|
||||||
|
}()
|
||||||
|
|
||||||
|
testCase.generateEvent(d.filePath, cancel)
|
||||||
|
|
||||||
|
err := <-errs
|
||||||
|
if testCase.expectError && err == nil {
|
||||||
|
t.Fatalf("expected error but got none")
|
||||||
|
}
|
||||||
|
if !testCase.expectError && err != nil {
|
||||||
|
t.Fatalf("expected no error but got %v", err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func copyFileForTest(t *testing.T, srcFilePath string) string {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
// get directory from source file path
|
||||||
|
srcDir := filepath.Dir(srcFilePath)
|
||||||
|
|
||||||
|
// get file name from source file path
|
||||||
|
srcFileName := filepath.Base(srcFilePath)
|
||||||
|
|
||||||
|
// set new file path
|
||||||
|
dstFilePath := filepath.Join(srcDir, "test_"+srcFileName)
|
||||||
|
|
||||||
|
// copy src file to dst file
|
||||||
|
r, err := os.Open(srcFilePath)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to open source file: %v", err)
|
||||||
|
}
|
||||||
|
defer r.Close()
|
||||||
|
|
||||||
|
w, err := os.Create(dstFilePath)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create destination file: %v", err)
|
||||||
|
}
|
||||||
|
defer w.Close()
|
||||||
|
|
||||||
|
// copy the file
|
||||||
|
_, err = io.Copy(w, r)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to copy file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = w.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to close destination file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return dstFilePath
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,9 @@
|
||||||
|
kind: EncryptionConfiguration
|
||||||
|
apiVersion: apiserver.config.k8s.io/v1
|
||||||
|
resources:
|
||||||
|
- resources:
|
||||||
|
- secrets
|
||||||
|
providers:
|
||||||
|
- kms:
|
||||||
|
name: foo
|
||||||
|
endpoint: unix:///tmp/testprovider.sock
|
||||||
|
|
@ -27,15 +27,16 @@ import (
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apiserver/pkg/registry/generic"
|
"k8s.io/apiserver/pkg/registry/generic"
|
||||||
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
|
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
|
||||||
"k8s.io/apiserver/pkg/server"
|
"k8s.io/apiserver/pkg/server"
|
||||||
"k8s.io/apiserver/pkg/server/healthz"
|
"k8s.io/apiserver/pkg/server/healthz"
|
||||||
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
|
"k8s.io/apiserver/pkg/server/options/encryptionconfig"
|
||||||
|
kmsconfigcontroller "k8s.io/apiserver/pkg/server/options/encryptionconfig/controller"
|
||||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||||
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||||
"k8s.io/apiserver/pkg/storage/value"
|
|
||||||
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
)
|
)
|
||||||
|
|
@ -64,7 +65,7 @@ type EtcdOptions struct {
|
||||||
|
|
||||||
// complete guards fields that must be initialized via Complete before the Apply methods can be used.
|
// complete guards fields that must be initialized via Complete before the Apply methods can be used.
|
||||||
complete bool
|
complete bool
|
||||||
transformerOverrides map[schema.GroupResource]value.Transformer
|
resourceTransformers encryptionconfig.ResourceTransformers
|
||||||
kmsPluginHealthzChecks []healthz.HealthChecker
|
kmsPluginHealthzChecks []healthz.HealthChecker
|
||||||
|
|
||||||
// SkipHealthEndpoints, when true, causes the Apply methods to not set up health endpoints.
|
// SkipHealthEndpoints, when true, causes the Apply methods to not set up health endpoints.
|
||||||
|
|
@ -125,7 +126,7 @@ func (s *EtcdOptions) Validate() []error {
|
||||||
return allErrors
|
return allErrors
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddEtcdFlags adds flags related to etcd storage for a specific APIServer to the specified FlagSet
|
// AddFlags adds flags related to etcd storage for a specific APIServer to the specified FlagSet
|
||||||
func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
|
func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
|
||||||
if s == nil {
|
if s == nil {
|
||||||
return
|
return
|
||||||
|
|
@ -213,7 +214,11 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
|
||||||
// Complete must be called exactly once before using any of the Apply methods. It is responsible for setting
|
// Complete must be called exactly once before using any of the Apply methods. It is responsible for setting
|
||||||
// up objects that must be created once and reused across multiple invocations such as storage transformers.
|
// up objects that must be created once and reused across multiple invocations such as storage transformers.
|
||||||
// This method mutates the receiver (EtcdOptions). It must never mutate the inputs.
|
// This method mutates the receiver (EtcdOptions). It must never mutate the inputs.
|
||||||
func (s *EtcdOptions) Complete(storageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker, stopCh <-chan struct{}) error {
|
func (s *EtcdOptions) Complete(
|
||||||
|
storageObjectCountTracker flowcontrolrequest.StorageObjectCountTracker,
|
||||||
|
stopCh <-chan struct{},
|
||||||
|
addPostStartHook func(name string, hook server.PostStartHookFunc) error,
|
||||||
|
) error {
|
||||||
if s == nil {
|
if s == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -223,12 +228,56 @@ func (s *EtcdOptions) Complete(storageObjectCountTracker flowcontrolrequest.Stor
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(s.EncryptionProviderConfigFilepath) != 0 {
|
if len(s.EncryptionProviderConfigFilepath) != 0 {
|
||||||
transformerOverrides, kmsPluginHealthzChecks, err := encryptionconfig.LoadEncryptionConfig(s.EncryptionProviderConfigFilepath, s.EncryptionProviderConfigAutomaticReload, stopCh)
|
ctx, closeTransformers := wait.ContextForChannel(stopCh)
|
||||||
|
|
||||||
|
encryptionConfiguration, err := encryptionconfig.LoadEncryptionConfig(s.EncryptionProviderConfigFilepath, s.EncryptionProviderConfigAutomaticReload, ctx.Done())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// in case of error, we want to close partially initialized (if any) transformers
|
||||||
|
closeTransformers()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
s.transformerOverrides = transformerOverrides
|
|
||||||
s.kmsPluginHealthzChecks = kmsPluginHealthzChecks
|
// enable kms hot reload controller only if the config file is set to be automatically reloaded
|
||||||
|
if s.EncryptionProviderConfigAutomaticReload {
|
||||||
|
// with reload=true we will always have 1 health check
|
||||||
|
if len(encryptionConfiguration.HealthChecks) != 1 {
|
||||||
|
// in case of error, we want to close partially initialized (if any) transformers
|
||||||
|
closeTransformers()
|
||||||
|
return fmt.Errorf("failed to start kms encryption config hot reload controller. only 1 health check should be available when reload is enabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
dynamicTransformers := encryptionconfig.NewDynamicTransformers(encryptionConfiguration.Transformers, encryptionConfiguration.HealthChecks[0], closeTransformers, encryptionConfiguration.KMSCloseGracePeriod)
|
||||||
|
|
||||||
|
s.resourceTransformers = dynamicTransformers
|
||||||
|
s.kmsPluginHealthzChecks = []healthz.HealthChecker{dynamicTransformers}
|
||||||
|
|
||||||
|
// add post start hook to start hot reload controller
|
||||||
|
// adding this hook here will ensure that it gets configured exactly once
|
||||||
|
err = addPostStartHook(
|
||||||
|
"start-encryption-provider-config-automatic-reload",
|
||||||
|
func(hookContext server.PostStartHookContext) error {
|
||||||
|
kmsConfigController := kmsconfigcontroller.NewDynamicKMSEncryptionConfiguration(
|
||||||
|
"kms-encryption-config",
|
||||||
|
s.EncryptionProviderConfigFilepath,
|
||||||
|
dynamicTransformers,
|
||||||
|
encryptionConfiguration.EncryptionFileContentHash,
|
||||||
|
ctx.Done(),
|
||||||
|
)
|
||||||
|
|
||||||
|
go kmsConfigController.Run(ctx)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
// in case of error, we want to close partially initialized (if any) transformers
|
||||||
|
closeTransformers()
|
||||||
|
return fmt.Errorf("failed to add post start hook for kms encryption config hot reload controller: %w", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
s.resourceTransformers = encryptionconfig.StaticTransformers(encryptionConfiguration.Transformers)
|
||||||
|
s.kmsPluginHealthzChecks = encryptionConfiguration.HealthChecks
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s.StorageConfig.StorageObjectCountTracker = storageObjectCountTracker
|
s.StorageConfig.StorageObjectCountTracker = storageObjectCountTracker
|
||||||
|
|
@ -263,10 +312,10 @@ func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFac
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(s.transformerOverrides) > 0 {
|
if s.resourceTransformers != nil {
|
||||||
factory = &transformerStorageFactory{
|
factory = &transformerStorageFactory{
|
||||||
delegate: factory,
|
delegate: factory,
|
||||||
transformerOverrides: s.transformerOverrides,
|
resourceTransformers: s.resourceTransformers,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -400,7 +449,7 @@ var _ serverstorage.StorageFactory = &transformerStorageFactory{}
|
||||||
|
|
||||||
type transformerStorageFactory struct {
|
type transformerStorageFactory struct {
|
||||||
delegate serverstorage.StorageFactory
|
delegate serverstorage.StorageFactory
|
||||||
transformerOverrides map[schema.GroupResource]value.Transformer
|
resourceTransformers encryptionconfig.ResourceTransformers
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *transformerStorageFactory) NewConfig(resource schema.GroupResource) (*storagebackend.ConfigForResource, error) {
|
func (t *transformerStorageFactory) NewConfig(resource schema.GroupResource) (*storagebackend.ConfigForResource, error) {
|
||||||
|
|
@ -409,14 +458,9 @@ func (t *transformerStorageFactory) NewConfig(resource schema.GroupResource) (*s
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
transformer, ok := t.transformerOverrides[resource]
|
|
||||||
if !ok {
|
|
||||||
return config, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
configCopy := *config
|
configCopy := *config
|
||||||
resourceConfig := configCopy.Config
|
resourceConfig := configCopy.Config
|
||||||
resourceConfig.Transformer = transformer
|
resourceConfig.Transformer = t.resourceTransformers.TransformerForResource(resource)
|
||||||
configCopy.Config = resourceConfig
|
configCopy.Config = resourceConfig
|
||||||
|
|
||||||
return &configCopy, nil
|
return &configCopy, nil
|
||||||
|
|
|
||||||
|
|
@ -306,7 +306,7 @@ func TestKMSHealthzEndpoint(t *testing.T) {
|
||||||
EncryptionProviderConfigAutomaticReload: tc.reload,
|
EncryptionProviderConfigAutomaticReload: tc.reload,
|
||||||
SkipHealthEndpoints: tc.skipHealth,
|
SkipHealthEndpoints: tc.skipHealth,
|
||||||
}
|
}
|
||||||
if err := etcdOptions.Complete(serverConfig.StorageObjectCountTracker, serverConfig.DrainedNotify()); err != nil {
|
if err := etcdOptions.Complete(serverConfig.StorageObjectCountTracker, serverConfig.DrainedNotify(), serverConfig.AddPostStartHook); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if err := etcdOptions.ApplyTo(serverConfig); err != nil {
|
if err := etcdOptions.ApplyTo(serverConfig); err != nil {
|
||||||
|
|
@ -345,7 +345,7 @@ func TestReadinessCheck(t *testing.T) {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
serverConfig := server.NewConfig(codecs)
|
serverConfig := server.NewConfig(codecs)
|
||||||
etcdOptions := &EtcdOptions{SkipHealthEndpoints: tc.skipHealth}
|
etcdOptions := &EtcdOptions{SkipHealthEndpoints: tc.skipHealth}
|
||||||
if err := etcdOptions.Complete(serverConfig.StorageObjectCountTracker, serverConfig.DrainedNotify()); err != nil {
|
if err := etcdOptions.Complete(serverConfig.StorageObjectCountTracker, serverConfig.DrainedNotify(), serverConfig.AddPostStartHook); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if err := etcdOptions.ApplyTo(serverConfig); err != nil {
|
if err := etcdOptions.ApplyTo(serverConfig); err != nil {
|
||||||
|
|
|
||||||
|
|
@ -101,7 +101,7 @@ func (o *RecommendedOptions) AddFlags(fs *pflag.FlagSet) {
|
||||||
// ApplyTo adds RecommendedOptions to the server configuration.
|
// ApplyTo adds RecommendedOptions to the server configuration.
|
||||||
// pluginInitializers can be empty, it is only need for additional initializers.
|
// pluginInitializers can be empty, it is only need for additional initializers.
|
||||||
func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
|
func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
|
||||||
if err := o.Etcd.Complete(config.Config.StorageObjectCountTracker, config.Config.DrainedNotify()); err != nil {
|
if err := o.Etcd.Complete(config.Config.StorageObjectCountTracker, config.Config.DrainedNotify(), config.Config.AddPostStartHook); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := o.Etcd.ApplyTo(&config.Config); err != nil {
|
if err := o.Etcd.ApplyTo(&config.Config); err != nil {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue