[KMSv2] Add tracing

Signed-off-by: Anish Ramasekar <anish.ramasekar@gmail.com>

Kubernetes-commit: 8d3a25c7c98d77419111a02917f459aab8970087
This commit is contained in:
Anish Ramasekar 2023-10-09 23:43:46 +00:00 committed by Kubernetes Publisher
parent 0e3cbbf0fb
commit bfdac7f8f4
2 changed files with 215 additions and 1 deletions

View File

@ -28,6 +28,7 @@ import (
"unsafe" "unsafe"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/crypto/cryptobyte" "golang.org/x/crypto/cryptobyte"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
@ -39,6 +40,7 @@ import (
aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes" aestransformer "k8s.io/apiserver/pkg/storage/value/encrypt/aes"
kmstypes "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/v2" kmstypes "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/kmsv2/v2"
"k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics" "k8s.io/apiserver/pkg/storage/value/encrypt/envelope/metrics"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2" "k8s.io/klog/v2"
kmsservice "k8s.io/kms/pkg/service" kmsservice "k8s.io/kms/pkg/service"
"k8s.io/utils/clock" "k8s.io/utils/clock"
@ -133,11 +135,28 @@ func newEnvelopeTransformerWithClock(envelopeService kmsservice.Service, provide
// TransformFromStorage decrypts data encrypted by this transformer using envelope encryption. // TransformFromStorage decrypts data encrypted by this transformer using envelope encryption.
func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) { func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) {
ctx, span := tracing.Start(ctx, "TransformFromStorage with envelopeTransformer",
attribute.String("transformer.provider.name", t.providerName),
// The service.instance_id of the apiserver is already available in the trace
/*
{
"key": "service.instance.id",
"type": "string",
"value": "apiserver-zsteyir5lyrtdcmqqmd5kzze6m"
}
*/
)
defer span.End(500 * time.Millisecond)
span.AddEvent("About to decode encrypted object")
// Deserialize the EncryptedObject from the data. // Deserialize the EncryptedObject from the data.
encryptedObject, err := t.doDecode(data) encryptedObject, err := t.doDecode(data)
if err != nil { if err != nil {
span.AddEvent("Decoding encrypted object failed")
span.RecordError(err)
return nil, false, err return nil, false, err
} }
span.AddEvent("Decoded encrypted object")
useSeed := encryptedObject.EncryptedDEKSourceType == kmstypes.EncryptedDEKSourceType_HKDF_SHA256_XNONCE_AES_GCM_SEED useSeed := encryptedObject.EncryptedDEKSourceType == kmstypes.EncryptedDEKSourceType_HKDF_SHA256_XNONCE_AES_GCM_SEED
@ -158,6 +177,7 @@ func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []b
// fallback to the envelope service if we do not have the transformer locally // fallback to the envelope service if we do not have the transformer locally
if transformer == nil { if transformer == nil {
span.AddEvent("About to decrypt DEK using remote service")
value.RecordCacheMiss() value.RecordCacheMiss()
requestInfo := getRequestInfoFromContext(ctx) requestInfo := getRequestInfoFromContext(ctx)
@ -172,8 +192,11 @@ func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []b
Annotations: encryptedObject.Annotations, Annotations: encryptedObject.Annotations,
}) })
if err != nil { if err != nil {
span.AddEvent("DEK decryption failed")
span.RecordError(err)
return nil, false, fmt.Errorf("failed to decrypt DEK, error: %w", err) return nil, false, fmt.Errorf("failed to decrypt DEK, error: %w", err)
} }
span.AddEvent("DEK decryption succeeded")
transformer, err = t.addTransformerForDecryption(encryptedObjectCacheKey, key, useSeed) transformer, err = t.addTransformerForDecryption(encryptedObjectCacheKey, key, useSeed)
if err != nil { if err != nil {
@ -182,11 +205,15 @@ func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []b
} }
metrics.RecordKeyID(metrics.FromStorageLabel, t.providerName, encryptedObject.KeyID, t.apiServerID) metrics.RecordKeyID(metrics.FromStorageLabel, t.providerName, encryptedObject.KeyID, t.apiServerID)
span.AddEvent("About to decrypt data using DEK")
out, stale, err := transformer.TransformFromStorage(ctx, encryptedObject.EncryptedData, dataCtx) out, stale, err := transformer.TransformFromStorage(ctx, encryptedObject.EncryptedData, dataCtx)
if err != nil { if err != nil {
span.AddEvent("Data decryption failed")
span.RecordError(err)
return nil, false, err return nil, false, err
} }
span.AddEvent("Data decryption succeeded")
// data is considered stale if the key ID does not match our current write transformer // data is considered stale if the key ID does not match our current write transformer
return out, return out,
stale || stale ||
@ -197,6 +224,19 @@ func (t *envelopeTransformer) TransformFromStorage(ctx context.Context, data []b
// TransformToStorage encrypts data to be written to disk using envelope encryption. // TransformToStorage encrypts data to be written to disk using envelope encryption.
func (t *envelopeTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) { func (t *envelopeTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
ctx, span := tracing.Start(ctx, "TransformToStorage with envelopeTransformer",
attribute.String("transformer.provider.name", t.providerName),
// The service.instance_id of the apiserver is already available in the trace
/*
{
"key": "service.instance.id",
"type": "string",
"value": "apiserver-zsteyir5lyrtdcmqqmd5kzze6m"
}
*/
)
defer span.End(500 * time.Millisecond)
state, err := t.stateFunc() state, err := t.stateFunc()
if err != nil { if err != nil {
return nil, err return nil, err
@ -215,18 +255,31 @@ func (t *envelopeTransformer) TransformToStorage(ctx context.Context, data []byt
"group", requestInfo.APIGroup, "version", requestInfo.APIVersion, "resource", requestInfo.Resource, "subresource", requestInfo.Subresource, "group", requestInfo.APIGroup, "version", requestInfo.APIVersion, "resource", requestInfo.Resource, "subresource", requestInfo.Subresource,
"verb", requestInfo.Verb, "namespace", requestInfo.Namespace, "name", requestInfo.Name) "verb", requestInfo.Verb, "namespace", requestInfo.Namespace, "name", requestInfo.Name)
span.AddEvent("About to encrypt data using DEK")
result, err := state.Transformer.TransformToStorage(ctx, data, dataCtx) result, err := state.Transformer.TransformToStorage(ctx, data, dataCtx)
if err != nil { if err != nil {
span.AddEvent("Data encryption failed")
span.RecordError(err)
return nil, err return nil, err
} }
span.AddEvent("Data encryption succeeded")
metrics.RecordKeyID(metrics.ToStorageLabel, t.providerName, state.EncryptedObject.KeyID, t.apiServerID) metrics.RecordKeyID(metrics.ToStorageLabel, t.providerName, state.EncryptedObject.KeyID, t.apiServerID)
encObjectCopy := state.EncryptedObject encObjectCopy := state.EncryptedObject
encObjectCopy.EncryptedData = result encObjectCopy.EncryptedData = result
span.AddEvent("About to encode encrypted object")
// Serialize the EncryptedObject to a byte array. // Serialize the EncryptedObject to a byte array.
return t.doEncode(&encObjectCopy) out, err := t.doEncode(&encObjectCopy)
if err != nil {
span.AddEvent("Encoding encrypted object failed")
span.RecordError(err)
return nil, err
}
span.AddEvent("Encoded encrypted object")
return out, nil
} }
// addTransformerForDecryption inserts a new transformer to the Envelope cache of DEKs for future reads. // addTransformerForDecryption inserts a new transformer to the Envelope cache of DEKs for future reads.

View File

@ -33,6 +33,8 @@ import (
"time" "time"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
utilrand "k8s.io/apimachinery/pkg/util/rand" utilrand "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
@ -1262,6 +1264,165 @@ func TestGenerateTransformer(t *testing.T) {
} }
} }
func TestEnvelopeTracing_TransformToStorage(t *testing.T) {
testCases := []struct {
desc string
expected []string
}{
{
desc: "encrypt",
expected: []string{
"About to encrypt data using DEK",
"Data encryption succeeded",
"About to encode encrypted object",
"Encoded encrypted object",
},
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
fakeRecorder := tracetest.NewSpanRecorder()
otelTracer := trace.NewTracerProvider(trace.WithSpanProcessor(fakeRecorder)).Tracer("test")
ctx := testContext(t)
ctx, span := otelTracer.Start(ctx, "parent")
defer span.End()
envelopeService := newTestEnvelopeService()
fakeClock := testingclock.NewFakeClock(time.Now())
state, err := testStateFunc(ctx, envelopeService, clock.RealClock{}, randomBool())()
if err != nil {
t.Fatal(err)
}
transformer := newEnvelopeTransformerWithClock(envelopeService, testProviderName,
func() (State, error) { return state, nil }, testAPIServerID, 1*time.Second, fakeClock)
dataCtx := value.DefaultContext([]byte(testContextText))
originalText := []byte(testText)
if _, err := transformer.TransformToStorage(ctx, originalText, dataCtx); err != nil {
t.Fatalf("envelopeTransformer: error while transforming data to storage: %v", err)
}
output := fakeRecorder.Ended()
if len(output) != 1 {
t.Fatalf("expected 1 span, got %d", len(output))
}
out := output[0]
validateTraceSpan(t, out, "TransformToStorage with envelopeTransformer", testProviderName, testAPIServerID, tc.expected)
})
}
}
func TestEnvelopeTracing_TransformFromStorage(t *testing.T) {
testCases := []struct {
desc string
cacheTTL time.Duration
simulateKMSPluginFailure bool
expected []string
}{
{
desc: "decrypt",
cacheTTL: 5 * time.Second,
expected: []string{
"About to decode encrypted object",
"Decoded encrypted object",
"About to decrypt data using DEK",
"Data decryption succeeded",
},
},
{
desc: "decrypt with cache miss",
cacheTTL: 1 * time.Second,
expected: []string{
"About to decode encrypted object",
"Decoded encrypted object",
"About to decrypt DEK using remote service",
"DEK decryption succeeded",
"About to decrypt data using DEK",
"Data decryption succeeded",
},
},
{
desc: "decrypt with cache miss, simulate KMS plugin failure",
cacheTTL: 1 * time.Second,
simulateKMSPluginFailure: true,
expected: []string{
"About to decode encrypted object",
"Decoded encrypted object",
"About to decrypt DEK using remote service",
"DEK decryption failed",
"exception",
},
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
fakeRecorder := tracetest.NewSpanRecorder()
otelTracer := trace.NewTracerProvider(trace.WithSpanProcessor(fakeRecorder)).Tracer("test")
ctx := testContext(t)
envelopeService := newTestEnvelopeService()
fakeClock := testingclock.NewFakeClock(time.Now())
state, err := testStateFunc(ctx, envelopeService, clock.RealClock{}, randomBool())()
if err != nil {
t.Fatal(err)
}
transformer := newEnvelopeTransformerWithClock(envelopeService, testProviderName,
func() (State, error) { return state, nil }, testAPIServerID, tc.cacheTTL, fakeClock)
dataCtx := value.DefaultContext([]byte(testContextText))
originalText := []byte(testText)
transformedData, _ := transformer.TransformToStorage(ctx, originalText, dataCtx)
// advance the clock to allow cache entries to expire depending on TTL
fakeClock.Step(2 * time.Second)
// force GC to run by performing a write
transformer.(*envelopeTransformer).cache.set([]byte("some-other-unrelated-key"), &envelopeTransformer{})
envelopeService.SetDisabledStatus(tc.simulateKMSPluginFailure)
// start recording only for the decrypt call
ctx, span := otelTracer.Start(ctx, "parent")
defer span.End()
_, _, _ = transformer.TransformFromStorage(ctx, transformedData, dataCtx)
output := fakeRecorder.Ended()
validateTraceSpan(t, output[0], "TransformFromStorage with envelopeTransformer", testProviderName, testAPIServerID, tc.expected)
})
}
}
func validateTraceSpan(t *testing.T, span trace.ReadOnlySpan, spanName, providerName, apiserverID string, expected []string) {
t.Helper()
if span.Name() != spanName {
t.Fatalf("expected span name %q, got %q", spanName, span.Name())
}
attrs := span.Attributes()
if len(attrs) != 1 {
t.Fatalf("expected 1 attributes, got %d", len(attrs))
}
if attrs[0].Key != "transformer.provider.name" && attrs[0].Value.AsString() != providerName {
t.Errorf("expected providerName %q, got %q", providerName, attrs[0].Value.AsString())
}
if len(span.Events()) != len(expected) {
t.Fatalf("expected %d events, got %d", len(expected), len(span.Events()))
}
for i, event := range span.Events() {
if event.Name != expected[i] {
t.Errorf("expected event %q, got %q", expected[i], event.Name)
}
}
}
func errString(err error) string { func errString(err error) string {
if err == nil { if err == nil {
return "" return ""