Merge pull request #111986 from enj/enj/i/transformer_leak
kms: fix go routine leak in gRPC connection Kubernetes-commit: cc4b7dc3c52d2a4c7f48b678d2d4817064ca2dda
This commit is contained in:
commit
8ae614979e
4
go.mod
4
go.mod
|
|
@ -38,7 +38,7 @@ require (
|
|||
google.golang.org/grpc v1.47.0
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0
|
||||
gopkg.in/square/go-jose.v2 v2.2.2
|
||||
k8s.io/api v0.0.0-20220902183307-6dd661f59220
|
||||
k8s.io/api v0.0.0-20220908151745-929c3f077990
|
||||
k8s.io/apimachinery v0.0.0-20220902183049-31bc292891b6
|
||||
k8s.io/client-go v0.0.0-20220907225019-2698e8276e72
|
||||
k8s.io/component-base v0.0.0-20220902184152-5e2346794287
|
||||
|
|
@ -118,7 +118,7 @@ require (
|
|||
)
|
||||
|
||||
replace (
|
||||
k8s.io/api => k8s.io/api v0.0.0-20220902183307-6dd661f59220
|
||||
k8s.io/api => k8s.io/api v0.0.0-20220908151745-929c3f077990
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20220902183049-31bc292891b6
|
||||
k8s.io/client-go => k8s.io/client-go v0.0.0-20220907225019-2698e8276e72
|
||||
k8s.io/component-base => k8s.io/component-base v0.0.0-20220902184152-5e2346794287
|
||||
|
|
|
|||
4
go.sum
4
go.sum
|
|
@ -843,8 +843,8 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
|
|||
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
|
||||
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
||||
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
||||
k8s.io/api v0.0.0-20220902183307-6dd661f59220 h1:b+NdHWOhLTKviv3QHZvUrs1+6n9W03D7u6Sjd71RkEA=
|
||||
k8s.io/api v0.0.0-20220902183307-6dd661f59220/go.mod h1:79dXRhEG/Q3WjCgZ7l8YtD1tVlH/shHuG6msPmRw/DQ=
|
||||
k8s.io/api v0.0.0-20220908151745-929c3f077990 h1:i9qPcRqRAx+FjGDG24rliKydPgBQR2PlvOWOm8XhT0E=
|
||||
k8s.io/api v0.0.0-20220908151745-929c3f077990/go.mod h1:79dXRhEG/Q3WjCgZ7l8YtD1tVlH/shHuG6msPmRw/DQ=
|
||||
k8s.io/apimachinery v0.0.0-20220902183049-31bc292891b6 h1:BH5hWQ9ueHKgA42ApyJ9RXSibEUHvd9TDr1RBZ6e7lE=
|
||||
k8s.io/apimachinery v0.0.0-20220902183049-31bc292891b6/go.mod h1:L+XGzqdcOht8VoBmzPAoKcYLqUOWkB5nxGF/6z4/ubo=
|
||||
k8s.io/client-go v0.0.0-20220907225019-2698e8276e72 h1:damnU0wJxjV0BTpBhCjIytb5EGbKHo0TbyreV8nf1zI=
|
||||
|
|
|
|||
|
|
@ -33,6 +33,8 @@ import (
|
|||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
apiserverconfig "k8s.io/apiserver/pkg/apis/config"
|
||||
apiserverconfigv1 "k8s.io/apiserver/pkg/apis/config/v1"
|
||||
"k8s.io/apiserver/pkg/apis/config/validation"
|
||||
|
|
@ -93,7 +95,7 @@ func (p *kmsv2PluginProbe) toHealthzCheck(idx int) healthz.HealthChecker {
|
|||
}
|
||||
|
||||
// GetKMSPluginHealthzCheckers extracts KMSPluginProbes from the EncryptionConfig.
|
||||
func GetKMSPluginHealthzCheckers(filepath string) ([]healthz.HealthChecker, error) {
|
||||
func GetKMSPluginHealthzCheckers(filepath string, stopCh <-chan struct{}) ([]healthz.HealthChecker, error) {
|
||||
f, err := os.Open(filepath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error opening encryption provider configuration file %q: %v", filepath, err)
|
||||
|
|
@ -101,7 +103,7 @@ func GetKMSPluginHealthzCheckers(filepath string) ([]healthz.HealthChecker, erro
|
|||
defer f.Close()
|
||||
|
||||
var result []healthz.HealthChecker
|
||||
probes, err := getKMSPluginProbes(f)
|
||||
probes, err := getKMSPluginProbes(f, stopCh)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -120,7 +122,10 @@ func GetKMSPluginHealthzCheckers(filepath string) ([]healthz.HealthChecker, erro
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func getKMSPluginProbes(reader io.Reader) ([]interface{}, error) {
|
||||
func getKMSPluginProbes(reader io.Reader, stopCh <-chan struct{}) ([]interface{}, error) {
|
||||
// we ignore the cancel func because this context should only be canceled when stopCh is closed
|
||||
ctx, _ := wait.ContextForChannel(stopCh)
|
||||
|
||||
var result []interface{}
|
||||
|
||||
configFileContents, err := io.ReadAll(reader)
|
||||
|
|
@ -138,7 +143,7 @@ func getKMSPluginProbes(reader io.Reader) ([]interface{}, error) {
|
|||
if p.KMS != nil {
|
||||
switch p.KMS.APIVersion {
|
||||
case kmsAPIVersionV1:
|
||||
s, err := envelope.NewGRPCService(p.KMS.Endpoint, p.KMS.Timeout.Duration)
|
||||
s, err := envelope.NewGRPCService(ctx, p.KMS.Endpoint, p.KMS.Timeout.Duration)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not configure KMSv1-Plugin's probe %q, error: %v", p.KMS.Name, err)
|
||||
}
|
||||
|
|
@ -156,7 +161,7 @@ func getKMSPluginProbes(reader io.Reader) ([]interface{}, error) {
|
|||
return nil, fmt.Errorf("could not configure KMSv2-Plugin's probe %q, KMSv2 feature is not enabled", p.KMS.Name)
|
||||
}
|
||||
|
||||
s, err := envelopekmsv2.NewGRPCService(p.KMS.Endpoint, p.KMS.Timeout.Duration)
|
||||
s, err := envelopekmsv2.NewGRPCService(ctx, p.KMS.Endpoint, p.KMS.Timeout.Duration)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not configure KMSv2-Plugin's probe %q, error: %v", p.KMS.Name, err)
|
||||
}
|
||||
|
|
@ -254,21 +259,21 @@ func isKMSv2ProviderHealthy(name string, response *envelopekmsv2.StatusResponse)
|
|||
}
|
||||
|
||||
// GetTransformerOverrides returns the transformer overrides by reading and parsing the encryption provider configuration file
|
||||
func GetTransformerOverrides(filepath string) (map[schema.GroupResource]value.Transformer, error) {
|
||||
func GetTransformerOverrides(filepath string, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, error) {
|
||||
f, err := os.Open(filepath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error opening encryption provider configuration file %q: %v", filepath, err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
result, err := parseEncryptionConfiguration(f)
|
||||
result, err := parseEncryptionConfiguration(f, stopCh)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error while parsing encryption provider configuration file %q: %v", filepath, err)
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func parseEncryptionConfiguration(f io.Reader) (map[schema.GroupResource]value.Transformer, error) {
|
||||
func parseEncryptionConfiguration(f io.Reader, stopCh <-chan struct{}) (map[schema.GroupResource]value.Transformer, error) {
|
||||
configFileContents, err := io.ReadAll(f)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not read contents: %v", err)
|
||||
|
|
@ -283,7 +288,7 @@ func parseEncryptionConfiguration(f io.Reader) (map[schema.GroupResource]value.T
|
|||
|
||||
// For each entry in the configuration
|
||||
for _, resourceConfig := range config.Resources {
|
||||
transformers, err := prefixTransformers(&resourceConfig)
|
||||
transformers, err := prefixTransformers(&resourceConfig, stopCh)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -307,8 +312,8 @@ func parseEncryptionConfiguration(f io.Reader) (map[schema.GroupResource]value.T
|
|||
func loadConfig(data []byte) (*apiserverconfig.EncryptionConfiguration, error) {
|
||||
scheme := runtime.NewScheme()
|
||||
codecs := serializer.NewCodecFactory(scheme)
|
||||
apiserverconfig.AddToScheme(scheme)
|
||||
apiserverconfigv1.AddToScheme(scheme)
|
||||
utilruntime.Must(apiserverconfig.AddToScheme(scheme))
|
||||
utilruntime.Must(apiserverconfigv1.AddToScheme(scheme))
|
||||
|
||||
configObj, gvk, err := codecs.UniversalDecoder().Decode(data, nil, nil)
|
||||
if err != nil {
|
||||
|
|
@ -330,7 +335,10 @@ var (
|
|||
envelopeKMSv2ServiceFactory = envelopekmsv2.NewGRPCService
|
||||
)
|
||||
|
||||
func prefixTransformers(config *apiserverconfig.ResourceConfiguration) ([]value.PrefixTransformer, error) {
|
||||
func prefixTransformers(config *apiserverconfig.ResourceConfiguration, stopCh <-chan struct{}) ([]value.PrefixTransformer, error) {
|
||||
// we ignore the cancel func because this context should only be canceled when stopCh is closed
|
||||
ctx, _ := wait.ContextForChannel(stopCh)
|
||||
|
||||
var result []value.PrefixTransformer
|
||||
for _, provider := range config.Providers {
|
||||
var (
|
||||
|
|
@ -349,7 +357,7 @@ func prefixTransformers(config *apiserverconfig.ResourceConfiguration) ([]value.
|
|||
switch provider.KMS.APIVersion {
|
||||
case kmsAPIVersionV1:
|
||||
var envelopeService envelope.Service
|
||||
if envelopeService, err = envelopeServiceFactory(provider.KMS.Endpoint, provider.KMS.Timeout.Duration); err != nil {
|
||||
if envelopeService, err = envelopeServiceFactory(ctx, provider.KMS.Endpoint, provider.KMS.Timeout.Duration); err != nil {
|
||||
return nil, fmt.Errorf("could not configure KMS plugin %q, error: %v", provider.KMS.Name, err)
|
||||
}
|
||||
transformer, err = envelopePrefixTransformer(provider.KMS, envelopeService, kmsTransformerPrefixV1)
|
||||
|
|
@ -359,7 +367,7 @@ func prefixTransformers(config *apiserverconfig.ResourceConfiguration) ([]value.
|
|||
}
|
||||
|
||||
var envelopeService envelopekmsv2.Service
|
||||
if envelopeService, err = envelopeKMSv2ServiceFactory(provider.KMS.Endpoint, provider.KMS.Timeout.Duration); err != nil {
|
||||
if envelopeService, err = envelopeKMSv2ServiceFactory(ctx, provider.KMS.Endpoint, provider.KMS.Timeout.Duration); err != nil {
|
||||
return nil, fmt.Errorf("could not configure KMSv2 plugin %q, error: %v", provider.KMS.Name, err)
|
||||
}
|
||||
transformer, err = envelopekmsv2PrefixTransformer(provider.KMS, envelopeService, kmsTransformerPrefixV2)
|
||||
|
|
|
|||
|
|
@ -115,7 +115,7 @@ func (t *testKMSv2EnvelopeService) Status(ctx context.Context) (*envelopekmsv2.S
|
|||
}
|
||||
|
||||
// The factory method to create mock envelope service.
|
||||
func newMockEnvelopeService(endpoint string, timeout time.Duration) (envelope.Service, error) {
|
||||
func newMockEnvelopeService(ctx context.Context, endpoint string, timeout time.Duration) (envelope.Service, error) {
|
||||
return &testEnvelopeService{nil}, nil
|
||||
}
|
||||
|
||||
|
|
@ -125,7 +125,7 @@ func newMockErrorEnvelopeService(endpoint string, timeout time.Duration) (envelo
|
|||
}
|
||||
|
||||
// The factory method to create mock envelope kmsv2 service.
|
||||
func newMockEnvelopeKMSv2Service(endpoint string, timeout time.Duration) (envelopekmsv2.Service, error) {
|
||||
func newMockEnvelopeKMSv2Service(ctx context.Context, endpoint string, timeout time.Duration) (envelopekmsv2.Service, error) {
|
||||
return &testKMSv2EnvelopeService{nil}, nil
|
||||
}
|
||||
|
||||
|
|
@ -193,41 +193,43 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) {
|
|||
envelopeKMSv2ServiceFactory = factoryKMSv2
|
||||
}()
|
||||
|
||||
ctx := testContext(t)
|
||||
|
||||
// Creates compound/prefix transformers with different ordering of available transformers.
|
||||
// Transforms data using one of them, and tries to untransform using the others.
|
||||
// Repeats this for all possible combinations.
|
||||
correctConfigWithIdentityFirst := "testdata/valid-configs/identity-first.yaml"
|
||||
identityFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithIdentityFirst))
|
||||
identityFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithIdentityFirst), ctx.Done())
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithIdentityFirst)
|
||||
}
|
||||
|
||||
correctConfigWithAesGcmFirst := "testdata/valid-configs/aes-gcm-first.yaml"
|
||||
aesGcmFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithAesGcmFirst))
|
||||
aesGcmFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithAesGcmFirst), ctx.Done())
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithAesGcmFirst)
|
||||
}
|
||||
|
||||
correctConfigWithAesCbcFirst := "testdata/valid-configs/aes-cbc-first.yaml"
|
||||
aesCbcFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithAesCbcFirst))
|
||||
aesCbcFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithAesCbcFirst), ctx.Done())
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithAesCbcFirst)
|
||||
}
|
||||
|
||||
correctConfigWithSecretboxFirst := "testdata/valid-configs/secret-box-first.yaml"
|
||||
secretboxFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithSecretboxFirst))
|
||||
secretboxFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithSecretboxFirst), ctx.Done())
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithSecretboxFirst)
|
||||
}
|
||||
|
||||
correctConfigWithKMSFirst := "testdata/valid-configs/kms-first.yaml"
|
||||
kmsFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithKMSFirst))
|
||||
kmsFirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithKMSFirst), ctx.Done())
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithKMSFirst)
|
||||
}
|
||||
|
||||
correctConfigWithKMSv2First := "testdata/valid-configs/kmsv2-first.yaml"
|
||||
kmsv2FirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithKMSv2First))
|
||||
kmsv2FirstTransformerOverrides, err := parseEncryptionConfiguration(mustConfigReader(t, correctConfigWithKMSv2First), ctx.Done())
|
||||
if err != nil {
|
||||
t.Fatalf("error while parsing configuration file: %s.\nThe file was:\n%s", err, correctConfigWithKMSv2First)
|
||||
}
|
||||
|
|
@ -240,7 +242,6 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) {
|
|||
kmsFirstTransformer := kmsFirstTransformerOverrides[schema.ParseGroupResource("secrets")]
|
||||
kmsv2FirstTransformer := kmsv2FirstTransformerOverrides[schema.ParseGroupResource("secrets")]
|
||||
|
||||
ctx := context.Background()
|
||||
dataCtx := value.DefaultContext([]byte(sampleContextText))
|
||||
originalText := []byte(sampleText)
|
||||
|
||||
|
|
@ -280,11 +281,13 @@ func TestEncryptionProviderConfigCorrect(t *testing.T) {
|
|||
func TestKMSPluginHealthz(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.KMSv2, true)()
|
||||
|
||||
service, err := envelope.NewGRPCService("unix:///tmp/testprovider.sock", 3*time.Second)
|
||||
ctx := testContext(t)
|
||||
|
||||
service, err := envelope.NewGRPCService(ctx, "unix:///tmp/testprovider.sock", 3*time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not initialize envelopeService, error: %v", err)
|
||||
}
|
||||
serviceKMSv2, err := envelopekmsv2.NewGRPCService("unix:///tmp/testprovider.sock", 3*time.Second)
|
||||
serviceKMSv2, err := envelopekmsv2.NewGRPCService(ctx, "unix:///tmp/testprovider.sock", 3*time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("Could not initialize kmsv2 envelopeService, error: %v", err)
|
||||
}
|
||||
|
|
@ -347,7 +350,7 @@ func TestKMSPluginHealthz(t *testing.T) {
|
|||
|
||||
for _, tt := range testCases {
|
||||
t.Run(tt.desc, func(t *testing.T) {
|
||||
got, err := getKMSPluginProbes(mustConfigReader(t, tt.config))
|
||||
got, err := getKMSPluginProbes(mustConfigReader(t, tt.config), ctx.Done())
|
||||
if err != nil && !tt.wantErr {
|
||||
t.Fatalf("got %v, want nil for error", err)
|
||||
}
|
||||
|
|
@ -360,7 +363,9 @@ func TestKMSPluginHealthz(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestKMSPluginHealthzTTL(t *testing.T) {
|
||||
service, _ := newMockEnvelopeService("unix:///tmp/testprovider.sock", 3*time.Second)
|
||||
ctx := testContext(t)
|
||||
|
||||
service, _ := newMockEnvelopeService(ctx, "unix:///tmp/testprovider.sock", 3*time.Second)
|
||||
errService, _ := newMockErrorEnvelopeService("unix:///tmp/testprovider.sock", 3*time.Second)
|
||||
|
||||
testCases := []struct {
|
||||
|
|
@ -403,7 +408,9 @@ func TestKMSPluginHealthzTTL(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestKMSv2PluginHealthzTTL(t *testing.T) {
|
||||
service, _ := newMockEnvelopeKMSv2Service("unix:///tmp/testprovider.sock", 3*time.Second)
|
||||
ctx := testContext(t)
|
||||
|
||||
service, _ := newMockEnvelopeKMSv2Service(ctx, "unix:///tmp/testprovider.sock", 3*time.Second)
|
||||
errService, _ := newMockErrorEnvelopeKMSv2Service("unix:///tmp/testprovider.sock", 3*time.Second)
|
||||
|
||||
testCases := []struct {
|
||||
|
|
@ -529,8 +536,10 @@ func testCBCKeyRotationWithProviders(t *testing.T, firstEncryptionConfig, firstP
|
|||
}
|
||||
|
||||
func getTransformerFromEncryptionConfig(t *testing.T, encryptionConfigPath string) value.Transformer {
|
||||
ctx := testContext(t)
|
||||
|
||||
t.Helper()
|
||||
transformers, err := parseEncryptionConfiguration(mustConfigReader(t, encryptionConfigPath))
|
||||
transformers, err := parseEncryptionConfiguration(mustConfigReader(t, encryptionConfigPath), ctx.Done())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
@ -577,3 +586,9 @@ func TestIsKMSv2ProviderHealthyError(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testContext(t *testing.T) context.Context {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
return ctx
|
||||
}
|
||||
|
|
|
|||
|
|
@ -200,7 +200,7 @@ func (s *EtcdOptions) ApplyTo(c *server.Config) error {
|
|||
transformerOverrides := make(map[schema.GroupResource]value.Transformer)
|
||||
if len(s.EncryptionProviderConfigFilepath) > 0 {
|
||||
var err error
|
||||
transformerOverrides, err = encryptionconfig.GetTransformerOverrides(s.EncryptionProviderConfigFilepath)
|
||||
transformerOverrides, err = encryptionconfig.GetTransformerOverrides(s.EncryptionProviderConfigFilepath, c.DrainedNotify())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
@ -246,7 +246,7 @@ func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error {
|
|||
}))
|
||||
|
||||
if s.EncryptionProviderConfigFilepath != "" {
|
||||
kmsPluginHealthzChecks, err := encryptionconfig.GetKMSPluginHealthzCheckers(s.EncryptionProviderConfigFilepath)
|
||||
kmsPluginHealthzChecks, err := encryptionconfig.GetKMSPluginHealthzCheckers(s.EncryptionProviderConfigFilepath, c.DrainedNotify())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,13 +24,13 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/util"
|
||||
kmsapi "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -52,7 +52,7 @@ type gRPCService struct {
|
|||
}
|
||||
|
||||
// NewGRPCService returns an envelope.Service which use gRPC to communicate the remote KMS provider.
|
||||
func NewGRPCService(endpoint string, callTimeout time.Duration) (Service, error) {
|
||||
func NewGRPCService(ctx context.Context, endpoint string, callTimeout time.Duration) (Service, error) {
|
||||
klog.V(4).Infof("Configure KMS provider with endpoint: %s", endpoint)
|
||||
|
||||
addr, err := util.ParseEndpoint(endpoint)
|
||||
|
|
@ -84,6 +84,14 @@ func NewGRPCService(endpoint string, callTimeout time.Duration) (Service, error)
|
|||
}
|
||||
|
||||
s.kmsClient = kmsapi.NewKeyManagementServiceClient(s.connection)
|
||||
|
||||
go func() {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
<-ctx.Done()
|
||||
_ = s.connection.Close()
|
||||
}()
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ limitations under the License.
|
|||
package envelope
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
|
|
@ -56,7 +57,9 @@ func TestKMSPluginLateStart(t *testing.T) {
|
|||
callTimeout := 3 * time.Second
|
||||
s := newEndpoint()
|
||||
|
||||
service, err := NewGRPCService(s.endpoint, callTimeout)
|
||||
ctx := testContext(t)
|
||||
|
||||
service, err := NewGRPCService(ctx, s.endpoint, callTimeout)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create envelope service, error: %v", err)
|
||||
}
|
||||
|
|
@ -132,12 +135,14 @@ func TestTimeouts(t *testing.T) {
|
|||
testCompletedWG.Add(1)
|
||||
defer testCompletedWG.Done()
|
||||
|
||||
ctx := testContext(t)
|
||||
|
||||
kubeAPIServerWG.Add(1)
|
||||
go func() {
|
||||
// Simulating late start of kube-apiserver - plugin is up before kube-apiserver, if requested by the testcase.
|
||||
time.Sleep(tt.kubeAPIServerDelay)
|
||||
|
||||
service, err = NewGRPCService(socketName.endpoint, tt.callTimeout)
|
||||
service, err = NewGRPCService(ctx, socketName.endpoint, tt.callTimeout)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create envelope service, error: %v", err)
|
||||
}
|
||||
|
|
@ -203,8 +208,10 @@ func TestIntermittentConnectionLoss(t *testing.T) {
|
|||
t.Fatalf("Failed to start kms-plugin, err: %v", err)
|
||||
}
|
||||
|
||||
ctx := testContext(t)
|
||||
|
||||
// connect to kms plugin
|
||||
service, err := NewGRPCService(endpoint.endpoint, timeout)
|
||||
service, err := NewGRPCService(ctx, endpoint.endpoint, timeout)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create envelope service, error: %v", err)
|
||||
}
|
||||
|
|
@ -271,7 +278,9 @@ func TestUnsupportedVersion(t *testing.T) {
|
|||
}
|
||||
defer f.CleanUp()
|
||||
|
||||
s, err := NewGRPCService(endpoint.endpoint, 1*time.Second)
|
||||
ctx := testContext(t)
|
||||
|
||||
s, err := NewGRPCService(ctx, endpoint.endpoint, 1*time.Second)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
@ -285,7 +294,7 @@ func TestUnsupportedVersion(t *testing.T) {
|
|||
|
||||
destroyService(s)
|
||||
|
||||
s, err = NewGRPCService(endpoint.endpoint, 1*time.Second)
|
||||
s, err = NewGRPCService(ctx, endpoint.endpoint, 1*time.Second)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
@ -312,8 +321,10 @@ func TestGRPCService(t *testing.T) {
|
|||
}
|
||||
defer f.CleanUp()
|
||||
|
||||
ctx := testContext(t)
|
||||
|
||||
// Create the gRPC client service.
|
||||
service, err := NewGRPCService(endpoint.endpoint, 1*time.Second)
|
||||
service, err := NewGRPCService(ctx, endpoint.endpoint, 1*time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create envelope service, error: %v", err)
|
||||
}
|
||||
|
|
@ -351,8 +362,10 @@ func TestGRPCServiceConcurrentAccess(t *testing.T) {
|
|||
}
|
||||
defer f.CleanUp()
|
||||
|
||||
ctx := testContext(t)
|
||||
|
||||
// Create the gRPC client service.
|
||||
service, err := NewGRPCService(endpoint.endpoint, 15*time.Second)
|
||||
service, err := NewGRPCService(ctx, endpoint.endpoint, 15*time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create envelope service, error: %v", err)
|
||||
}
|
||||
|
|
@ -406,6 +419,8 @@ func TestInvalidConfiguration(t *testing.T) {
|
|||
}
|
||||
defer f.CleanUp()
|
||||
|
||||
ctx := testContext(t)
|
||||
|
||||
invalidConfigs := []struct {
|
||||
name string
|
||||
endpoint string
|
||||
|
|
@ -416,10 +431,16 @@ func TestInvalidConfiguration(t *testing.T) {
|
|||
|
||||
for _, testCase := range invalidConfigs {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
_, err := NewGRPCService(testCase.endpoint, 1*time.Second)
|
||||
_, err := NewGRPCService(ctx, testCase.endpoint, 1*time.Second)
|
||||
if err == nil {
|
||||
t.Fatalf("should fail to create envelope service for %s.", testCase.name)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testContext(t *testing.T) context.Context {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
return ctx
|
||||
}
|
||||
|
|
|
|||
|
|
@ -23,12 +23,12 @@ import (
|
|||
"net"
|
||||
"time"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/util"
|
||||
kmsapi "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v2alpha1"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -44,7 +44,7 @@ type gRPCService struct {
|
|||
}
|
||||
|
||||
// NewGRPCService returns an envelope.Service which use gRPC to communicate the remote KMS provider.
|
||||
func NewGRPCService(endpoint string, callTimeout time.Duration) (Service, error) {
|
||||
func NewGRPCService(ctx context.Context, endpoint string, callTimeout time.Duration) (Service, error) {
|
||||
klog.V(4).Infof("Configure KMS provider with endpoint: %s", endpoint)
|
||||
|
||||
addr, err := util.ParseEndpoint(endpoint)
|
||||
|
|
@ -75,6 +75,14 @@ func NewGRPCService(endpoint string, callTimeout time.Duration) (Service, error)
|
|||
}
|
||||
|
||||
s.kmsClient = kmsapi.NewKeyManagementServiceClient(s.connection)
|
||||
|
||||
go func() {
|
||||
defer utilruntime.HandleCrash()
|
||||
|
||||
<-ctx.Done()
|
||||
_ = s.connection.Close()
|
||||
}()
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -54,7 +54,9 @@ func TestKMSPluginLateStart(t *testing.T) {
|
|||
callTimeout := 3 * time.Second
|
||||
s := newEndpoint()
|
||||
|
||||
service, err := NewGRPCService(s.endpoint, callTimeout)
|
||||
ctx := testContext(t)
|
||||
|
||||
service, err := NewGRPCService(ctx, s.endpoint, callTimeout)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create envelope service, error: %v", err)
|
||||
}
|
||||
|
|
@ -72,7 +74,7 @@ func TestKMSPluginLateStart(t *testing.T) {
|
|||
|
||||
data := []byte("test data")
|
||||
uid := string(uuid.NewUUID())
|
||||
_, err = service.Encrypt(context.Background(), uid, data)
|
||||
_, err = service.Encrypt(ctx, uid, data)
|
||||
if err != nil {
|
||||
t.Fatalf("failed when execute encrypt, error: %v", err)
|
||||
}
|
||||
|
|
@ -131,12 +133,14 @@ func TestTimeouts(t *testing.T) {
|
|||
testCompletedWG.Add(1)
|
||||
defer testCompletedWG.Done()
|
||||
|
||||
ctx := testContext(t)
|
||||
|
||||
kubeAPIServerWG.Add(1)
|
||||
go func() {
|
||||
// Simulating late start of kube-apiserver - plugin is up before kube-apiserver, if requested by the testcase.
|
||||
time.Sleep(tt.kubeAPIServerDelay)
|
||||
|
||||
service, err = NewGRPCService(socketName.endpoint, tt.callTimeout)
|
||||
service, err = NewGRPCService(ctx, socketName.endpoint, tt.callTimeout)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create envelope service, error: %v", err)
|
||||
}
|
||||
|
|
@ -165,7 +169,7 @@ func TestTimeouts(t *testing.T) {
|
|||
}()
|
||||
|
||||
kubeAPIServerWG.Wait()
|
||||
_, err = service.Encrypt(context.Background(), uid, data)
|
||||
_, err = service.Encrypt(ctx, uid, data)
|
||||
|
||||
if err == nil && tt.wantErr != "" {
|
||||
t.Fatalf("got nil, want %s", tt.wantErr)
|
||||
|
|
@ -203,14 +207,15 @@ func TestIntermittentConnectionLoss(t *testing.T) {
|
|||
t.Fatalf("Failed to start kms-plugin, err: %v", err)
|
||||
}
|
||||
|
||||
ctx := testContext(t)
|
||||
|
||||
// connect to kms plugin
|
||||
service, err := NewGRPCService(endpoint.endpoint, timeout)
|
||||
service, err := NewGRPCService(ctx, endpoint.endpoint, timeout)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create envelope service, error: %v", err)
|
||||
}
|
||||
defer destroyService(service)
|
||||
|
||||
ctx := context.Background()
|
||||
_, err = service.Encrypt(ctx, uid, data)
|
||||
if err != nil {
|
||||
t.Fatalf("failed when execute encrypt, error: %v", err)
|
||||
|
|
@ -269,14 +274,15 @@ func TestGRPCService(t *testing.T) {
|
|||
}
|
||||
defer f.CleanUp()
|
||||
|
||||
ctx := testContext(t)
|
||||
|
||||
// Create the gRPC client service.
|
||||
service, err := NewGRPCService(endpoint.endpoint, 1*time.Second)
|
||||
service, err := NewGRPCService(ctx, endpoint.endpoint, 1*time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create envelope service, error: %v", err)
|
||||
}
|
||||
defer destroyService(service)
|
||||
|
||||
ctx := context.Background()
|
||||
// Call service to encrypt data.
|
||||
data := []byte("test data")
|
||||
uid := string(uuid.NewUUID())
|
||||
|
|
@ -311,14 +317,15 @@ func TestGRPCServiceConcurrentAccess(t *testing.T) {
|
|||
}
|
||||
defer f.CleanUp()
|
||||
|
||||
ctx := testContext(t)
|
||||
|
||||
// Create the gRPC client service.
|
||||
service, err := NewGRPCService(endpoint.endpoint, 15*time.Second)
|
||||
service, err := NewGRPCService(ctx, endpoint.endpoint, 15*time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create envelope service, error: %v", err)
|
||||
}
|
||||
defer destroyService(service)
|
||||
|
||||
ctx := context.Background()
|
||||
var wg sync.WaitGroup
|
||||
n := 100
|
||||
wg.Add(n)
|
||||
|
|
@ -369,6 +376,8 @@ func TestInvalidConfiguration(t *testing.T) {
|
|||
}
|
||||
defer f.CleanUp()
|
||||
|
||||
ctx := testContext(t)
|
||||
|
||||
invalidConfigs := []struct {
|
||||
name string
|
||||
endpoint string
|
||||
|
|
@ -379,10 +388,16 @@ func TestInvalidConfiguration(t *testing.T) {
|
|||
|
||||
for _, testCase := range invalidConfigs {
|
||||
t.Run(testCase.name, func(t *testing.T) {
|
||||
_, err := NewGRPCService(testCase.endpoint, 1*time.Second)
|
||||
_, err := NewGRPCService(ctx, testCase.endpoint, 1*time.Second)
|
||||
if err == nil {
|
||||
t.Fatalf("should fail to create envelope service for %s.", testCase.name)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func testContext(t *testing.T) context.Context {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
return ctx
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue