Merge pull request #83811 from immutableT/single-kms-mock

Use single kms-plugin mock in unit and integration tests.

Kubernetes-commit: 1f8b3bfd98c8099c5830b2c329867fa29c2b2575
This commit is contained in:
Kubernetes Publisher 2019-10-17 18:17:57 -07:00
commit ced80a6097
3 changed files with 255 additions and 104 deletions

View File

@ -68,6 +68,8 @@ func NewGRPCService(endpoint string, callTimeout time.Duration) (Service, error)
c, err := net.DialUnix(unixProtocol, nil, &net.UnixAddr{Name: addr})
if err != nil {
klog.Errorf("failed to create connection to unix socket: %s, error: %v", addr, err)
} else {
klog.V(4).Infof("Successfully dialed Unix socket %v", addr)
}
return c, err
}))

View File

@ -20,44 +20,56 @@ limitations under the License.
package envelope
import (
"context"
"encoding/base64"
"fmt"
"net"
"os"
"reflect"
"runtime"
"strings"
"sync"
"testing"
"time"
"google.golang.org/grpc"
mock "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing"
"k8s.io/apimachinery/pkg/util/uuid"
kmsapi "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1"
)
type testSocket struct {
path string
endpoint string
}
// newEndpoint constructs a unique name for a Linux Abstract Socket to be used in a test.
// This package uses Linux Domain Sockets to remove the need for clean-up of socket files.
func newEndpoint() *testSocket {
p := fmt.Sprintf("@%s.sock", uuid.NewUUID())
return &testSocket{
path: p,
endpoint: fmt.Sprintf("unix:///%s", p),
}
}
// TestKMSPluginLateStart tests the scenario where kms-plugin pod/container starts after kube-apiserver pod/container.
// Since the Dial to kms-plugin is non-blocking we expect the construction of gRPC service to succeed even when
// kms-plugin is not yet up - dialing happens in the background.
func TestKMSPluginLateStart(t *testing.T) {
t.Parallel()
callTimeout := 3 * time.Second
endpoint := getSocketName()
s := newEndpoint()
service, err := NewGRPCService(endpoint, callTimeout)
service, err := NewGRPCService(s.endpoint, callTimeout)
if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err)
}
defer destroyService(service)
time.Sleep(callTimeout / 2)
f, err := startFakeKMSProvider(kmsapiVersion, endpoint)
f, err := mock.NewBase64Plugin(s.path)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
}
defer f.Stop()
if err := f.Start(); err != nil {
t.Fatalf("Failed to start kms-plugin, err: %v", err)
}
defer f.CleanUp()
data := []byte("test data")
_, err = service.Encrypt(data)
@ -113,7 +125,7 @@ func TestTimeouts(t *testing.T) {
kubeAPIServerWG sync.WaitGroup
kmsPluginWG sync.WaitGroup
testCompletedWG sync.WaitGroup
socketName = getSocketName()
socketName = newEndpoint()
)
testCompletedWG.Add(1)
@ -124,7 +136,7 @@ func TestTimeouts(t *testing.T) {
// Simulating late start of kube-apiserver - plugin is up before kube-apiserver, if requested by the testcase.
time.Sleep(tt.kubeAPIServerDelay)
service, err = NewGRPCService(socketName, tt.callTimeout)
service, err = NewGRPCService(socketName.endpoint, tt.callTimeout)
if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err)
}
@ -139,11 +151,14 @@ func TestTimeouts(t *testing.T) {
// Simulating delayed start of kms-plugin, kube-apiserver is up before the plugin, if requested by the testcase.
time.Sleep(tt.pluginDelay)
f, err := startFakeKMSProvider(kmsapiVersion, socketName)
f, err := mock.NewBase64Plugin(socketName.path)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
t.Fatalf("failed to construct test KMS provider server, error: %v", err)
}
defer f.Stop()
if err := f.Start(); err != nil {
t.Fatalf("Failed to start test KMS provider server, error: %v", err)
}
defer f.CleanUp()
kmsPluginWG.Done()
// Keeping plugin up to process requests.
testCompletedWG.Wait()
@ -175,16 +190,19 @@ func TestIntermittentConnectionLoss(t *testing.T) {
timeout = 30 * time.Second
blackOut = 1 * time.Second
data = []byte("test data")
endpoint = getSocketName()
endpoint = newEndpoint()
)
// Start KMS Plugin
f, err := startFakeKMSProvider(kmsapiVersion, endpoint)
f, err := mock.NewBase64Plugin(endpoint.path)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
}
if err := f.Start(); err != nil {
t.Fatalf("Failed to start kms-plugin, err: %v", err)
}
// connect to kms plugin
service, err := NewGRPCService(endpoint, timeout)
service, err := NewGRPCService(endpoint.endpoint, timeout)
if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err)
}
@ -198,7 +216,7 @@ func TestIntermittentConnectionLoss(t *testing.T) {
// Stop KMS Plugin - simulating connection loss
t.Log("KMS Plugin is stopping")
f.Stop()
f.CleanUp()
time.Sleep(2 * time.Second)
wg1.Add(1)
@ -217,11 +235,14 @@ func TestIntermittentConnectionLoss(t *testing.T) {
wg1.Wait()
time.Sleep(blackOut)
// Start KMS Plugin
f, err = startFakeKMSProvider(kmsapiVersion, endpoint)
f, err = mock.NewBase64Plugin(endpoint.path)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
}
defer f.Stop()
if err := f.Start(); err != nil {
t.Fatalf("Failed to start kms-plugin, err: %v", err)
}
defer f.CleanUp()
t.Log("Restarted KMS Plugin")
wg2.Wait()
@ -232,15 +253,19 @@ func TestUnsupportedVersion(t *testing.T) {
ver := "invalid"
data := []byte("test data")
wantErr := fmt.Errorf(versionErrorf, ver, kmsapiVersion)
endpoint := getSocketName()
endpoint := newEndpoint()
f, err := startFakeKMSProvider(ver, endpoint)
f, err := mock.NewBase64Plugin(endpoint.path)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %ver", err)
}
defer f.Stop()
f.SetVersion(ver)
if err := f.Start(); err != nil {
t.Fatalf("Failed to start kms-plugin, err: %v", err)
}
defer f.CleanUp()
s, err := NewGRPCService(endpoint, 1*time.Second)
s, err := NewGRPCService(endpoint.endpoint, 1*time.Second)
if err != nil {
t.Fatal(err)
}
@ -254,7 +279,7 @@ func TestUnsupportedVersion(t *testing.T) {
destroyService(s)
s, err = NewGRPCService(endpoint, 1*time.Second)
s, err = NewGRPCService(endpoint.endpoint, 1*time.Second)
if err != nil {
t.Fatal(err)
}
@ -271,15 +296,18 @@ func TestUnsupportedVersion(t *testing.T) {
func TestGRPCService(t *testing.T) {
t.Parallel()
// Start a test gRPC server.
endpoint := getSocketName()
f, err := startFakeKMSProvider(kmsapiVersion, endpoint)
endpoint := newEndpoint()
f, err := mock.NewBase64Plugin(endpoint.path)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
t.Fatalf("failed to construct test KMS provider server, error: %v", err)
}
defer f.Stop()
if err := f.Start(); err != nil {
t.Fatalf("Failed to start kms-plugin, err: %v", err)
}
defer f.CleanUp()
// Create the gRPC client service.
service, err := NewGRPCService(endpoint, 1*time.Second)
service, err := NewGRPCService(endpoint.endpoint, 1*time.Second)
if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err)
}
@ -307,15 +335,18 @@ func TestGRPCService(t *testing.T) {
func TestGRPCServiceConcurrentAccess(t *testing.T) {
t.Parallel()
// Start a test gRPC server.
endpoint := getSocketName()
f, err := startFakeKMSProvider(kmsapiVersion, endpoint)
endpoint := newEndpoint()
f, err := mock.NewBase64Plugin(endpoint.path)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
}
defer f.Stop()
if err := f.Start(); err != nil {
t.Fatalf("Failed to start kms-plugin, err: %v", err)
}
defer f.CleanUp()
// Create the gRPC client service.
service, err := NewGRPCService(endpoint, 15*time.Second)
service, err := NewGRPCService(endpoint.endpoint, 15*time.Second)
if err != nil {
t.Fatalf("failed to create envelope service, error: %v", err)
}
@ -356,32 +387,29 @@ func destroyService(service Service) {
}
}
func getSocketName() string {
return fmt.Sprintf("unix:///@%s.sock", uuid.NewUUID())
}
// Test all those invalid configuration for KMS provider.
func TestInvalidConfiguration(t *testing.T) {
t.Parallel()
// Start a test gRPC server.
f, err := startFakeKMSProvider(kmsapiVersion, getSocketName())
f, err := mock.NewBase64Plugin(newEndpoint().path)
if err != nil {
t.Fatalf("failed to start test KMS provider server, error: %v", err)
}
defer f.Stop()
if err := f.Start(); err != nil {
t.Fatalf("Failed to start kms-plugin, err: %v", err)
}
defer f.CleanUp()
invalidConfigs := []struct {
name string
apiVersion string
endpoint string
name string
endpoint string
}{
{"emptyConfiguration", kmsapiVersion, ""},
{"invalidScheme", kmsapiVersion, "tcp://localhost:6060"},
{"emptyConfiguration", ""},
{"invalidScheme", "tcp://localhost:6060"},
}
for _, testCase := range invalidConfigs {
t.Run(testCase.name, func(t *testing.T) {
f.apiVersion = testCase.apiVersion
_, err := NewGRPCService(testCase.endpoint, 1*time.Second)
if err == nil {
t.Fatalf("should fail to create envelope service for %s.", testCase.name)
@ -389,58 +417,3 @@ func TestInvalidConfiguration(t *testing.T) {
})
}
}
// Start the gRPC server that listens on unix socket.
func startFakeKMSProvider(version, endpoint string) (*fakeKMSPlugin, error) {
sockFile, err := parseEndpoint(endpoint)
if err != nil {
return nil, fmt.Errorf("failed to parse endpoint:%q, error %v", endpoint, err)
}
listener, err := net.Listen(unixProtocol, sockFile)
if err != nil {
return nil, fmt.Errorf("failed to listen on the unix socket, error: %v", err)
}
s := grpc.NewServer()
f := &fakeKMSPlugin{apiVersion: version, server: s, sockFile: sockFile}
kmsapi.RegisterKeyManagementServiceServer(s, f)
go s.Serve(listener)
return f, nil
}
// Fake gRPC sever for remote KMS provider.
// Use base64 to simulate encrypt and decrypt.
type fakeKMSPlugin struct {
apiVersion string
server *grpc.Server
sockFile string
}
func (s *fakeKMSPlugin) Stop() {
// Stop the server
s.server.Stop()
// If this isn't a Linux abstract namespace socket, or if we're on a non-linux platform, clean up the socket file
if !strings.HasPrefix(s.sockFile, "@") || runtime.GOOS != "linux" {
os.Remove(s.sockFile)
}
}
func (s *fakeKMSPlugin) Version(ctx context.Context, request *kmsapi.VersionRequest) (*kmsapi.VersionResponse, error) {
return &kmsapi.VersionResponse{Version: s.apiVersion, RuntimeName: "testKMS", RuntimeVersion: "0.0.1"}, nil
}
func (s *fakeKMSPlugin) Decrypt(ctx context.Context, request *kmsapi.DecryptRequest) (*kmsapi.DecryptResponse, error) {
buf := make([]byte, base64.StdEncoding.DecodedLen(len(request.Cipher)))
n, err := base64.StdEncoding.Decode(buf, request.Cipher)
if err != nil {
return nil, err
}
return &kmsapi.DecryptResponse{Plain: buf[:n]}, nil
}
func (s *fakeKMSPlugin) Encrypt(ctx context.Context, request *kmsapi.EncryptRequest) (*kmsapi.EncryptResponse, error) {
buf := make([]byte, base64.StdEncoding.EncodedLen(len(request.Plain)))
base64.StdEncoding.Encode(buf, request.Plain)
return &kmsapi.EncryptResponse{Cipher: buf}, nil
}

View File

@ -0,0 +1,176 @@
// +build !windows
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"context"
"encoding/base64"
"fmt"
"net"
"os"
"runtime"
"strings"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/apimachinery/pkg/util/wait"
kmsapi "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1"
"k8s.io/klog"
)
const (
// Now only supported unix domain socket.
unixProtocol = "unix"
// Current version for the protocol interface definition.
kmsapiVersion = "v1beta1"
)
// Base64Plugin gRPC sever for a mock KMS provider.
// Uses base64 to simulate encrypt and decrypt.
type Base64Plugin struct {
grpcServer *grpc.Server
listener net.Listener
mu *sync.Mutex
lastEncryptRequest *kmsapi.EncryptRequest
inFailedState bool
ver string
socketPath string
}
// NewBase64Plugin is a constructor for Base64Plugin.
func NewBase64Plugin(socketPath string) (*Base64Plugin, error) {
server := grpc.NewServer()
result := &Base64Plugin{
grpcServer: server,
mu: &sync.Mutex{},
ver: kmsapiVersion,
socketPath: socketPath,
}
kmsapi.RegisterKeyManagementServiceServer(server, result)
return result, nil
}
// WaitForBase64PluginToBeUp waits until the plugin is ready to serve requests.
func WaitForBase64PluginToBeUp(plugin *Base64Plugin) error {
var gRPCErr error
pollErr := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
_, gRPCErr = plugin.Encrypt(context.Background(), &kmsapi.EncryptRequest{Plain: []byte("foo")})
return gRPCErr == nil, nil
})
if pollErr == wait.ErrWaitTimeout {
return fmt.Errorf("failed to start kms-plugin, error: %v", gRPCErr)
}
return nil
}
// LastEncryptRequest returns the last EncryptRequest.Plain sent to the plugin.
func (s *Base64Plugin) LastEncryptRequest() []byte {
return s.lastEncryptRequest.Plain
}
// SetVersion sets the version of kms-plugin.
func (s *Base64Plugin) SetVersion(ver string) {
s.ver = ver
}
// Start starts plugin's gRPC service.
func (s *Base64Plugin) Start() error {
var err error
s.listener, err = net.Listen(unixProtocol, s.socketPath)
if err != nil {
return fmt.Errorf("failed to listen on the unix socket, error: %v", err)
}
klog.Infof("Listening on %s", s.socketPath)
go s.grpcServer.Serve(s.listener)
return nil
}
// CleanUp stops gRPC server and the underlying listener.
func (s *Base64Plugin) CleanUp() {
s.grpcServer.Stop()
s.listener.Close()
if !strings.HasPrefix(s.socketPath, "@") || runtime.GOOS != "linux" {
os.Remove(s.socketPath)
}
}
// EnterFailedState places the plugin into failed state.
func (s *Base64Plugin) EnterFailedState() {
s.mu.Lock()
defer s.mu.Unlock()
s.inFailedState = true
}
// ExitFailedState removes the plugin from the failed state.
func (s *Base64Plugin) ExitFailedState() {
s.mu.Lock()
defer s.mu.Unlock()
s.inFailedState = false
}
// Version returns the version of the kms-plugin.
func (s *Base64Plugin) Version(ctx context.Context, request *kmsapi.VersionRequest) (*kmsapi.VersionResponse, error) {
klog.Infof("Received request for Version: %v", request)
return &kmsapi.VersionResponse{Version: s.ver, RuntimeName: "testKMS", RuntimeVersion: "0.0.1"}, nil
}
// Decrypt performs base64 decoding of the payload of kms.DecryptRequest.
func (s *Base64Plugin) Decrypt(ctx context.Context, request *kmsapi.DecryptRequest) (*kmsapi.DecryptResponse, error) {
klog.V(3).Infof("Received Decrypt Request for DEK: %s", string(request.Cipher))
s.mu.Lock()
defer s.mu.Unlock()
if s.inFailedState {
return nil, status.Error(codes.FailedPrecondition, "failed precondition - key disabled")
}
buf := make([]byte, base64.StdEncoding.DecodedLen(len(request.Cipher)))
n, err := base64.StdEncoding.Decode(buf, request.Cipher)
if err != nil {
return nil, err
}
return &kmsapi.DecryptResponse{Plain: buf[:n]}, nil
}
// Encrypt performs base64 encoding of the payload of kms.EncryptRequest.
func (s *Base64Plugin) Encrypt(ctx context.Context, request *kmsapi.EncryptRequest) (*kmsapi.EncryptResponse, error) {
klog.V(3).Infof("Received Encrypt Request for DEK: %x", request.Plain)
s.mu.Lock()
defer s.mu.Unlock()
s.lastEncryptRequest = request
if s.inFailedState {
return nil, status.Error(codes.FailedPrecondition, "failed precondition - key disabled")
}
buf := make([]byte, base64.StdEncoding.EncodedLen(len(request.Plain)))
base64.StdEncoding.Encode(buf, request.Plain)
return &kmsapi.EncryptResponse{Cipher: buf}, nil
}