From 0326ed9810b51bea65717b33d3dd4aa0c848da29 Mon Sep 17 00:00:00 2001 From: Mike Danese Date: Wed, 6 Nov 2019 16:23:21 -0800 Subject: [PATCH] migrate token cache to cache.Expiring Kubernetes-commit: 3f194d5b413daeba93063f4610b9951069eaf13c --- .../authenticatorfactory/delegating.go | 3 ++- .../token/cache/cache_simple.go | 17 +++++++++------- pkg/authentication/token/cache/cache_test.go | 17 ++++++++++++---- .../token/cache/cached_token_authenticator.go | 8 ++++---- .../cache/cached_token_authenticator_test.go | 13 ++++++++++-- .../token/webhook/webhook_v1_test.go | 20 +++++++++++++------ .../token/webhook/webhook_v1beta1_test.go | 20 +++++++++++++------ 7 files changed, 68 insertions(+), 30 deletions(-) diff --git a/pkg/authentication/authenticatorfactory/delegating.go b/pkg/authentication/authenticatorfactory/delegating.go index b9c7e2e6e..7b384b6bd 100644 --- a/pkg/authentication/authenticatorfactory/delegating.go +++ b/pkg/authentication/authenticatorfactory/delegating.go @@ -17,6 +17,7 @@ limitations under the License. package authenticatorfactory import ( + "context" "errors" "time" @@ -83,7 +84,7 @@ func (c DelegatingAuthenticatorConfig) New() (authenticator.Request, *spec.Secur if err != nil { return nil, nil, err } - cachingTokenAuth := cache.New(tokenAuth, false, c.CacheTTL, c.CacheTTL) + cachingTokenAuth := cache.New(context.TODO(), tokenAuth, false, c.CacheTTL, c.CacheTTL) authenticators = append(authenticators, bearertoken.New(cachingTokenAuth), websocket.NewProtocolAuthenticator(cachingTokenAuth)) securityDefinitions["BearerToken"] = &spec.SecurityScheme{ diff --git a/pkg/authentication/token/cache/cache_simple.go b/pkg/authentication/token/cache/cache_simple.go index 18d5692d7..501209734 100644 --- a/pkg/authentication/token/cache/cache_simple.go +++ b/pkg/authentication/token/cache/cache_simple.go @@ -17,22 +17,25 @@ limitations under the License. package cache import ( + "context" "time" - lrucache "k8s.io/apimachinery/pkg/util/cache" + utilcache "k8s.io/apimachinery/pkg/util/cache" "k8s.io/apimachinery/pkg/util/clock" ) type simpleCache struct { - lru *lrucache.LRUExpireCache + cache *utilcache.Expiring } -func newSimpleCache(size int, clock clock.Clock) cache { - return &simpleCache{lru: lrucache.NewLRUExpireCacheWithClock(size, clock)} +func newSimpleCache(ctx context.Context, clock clock.Clock) cache { + c := &simpleCache{cache: utilcache.NewExpiringWithClock(clock)} + go c.cache.Run(ctx) + return c } func (c *simpleCache) get(key string) (*cacheRecord, bool) { - record, ok := c.lru.Get(key) + record, ok := c.cache.Get(key) if !ok { return nil, false } @@ -41,9 +44,9 @@ func (c *simpleCache) get(key string) (*cacheRecord, bool) { } func (c *simpleCache) set(key string, value *cacheRecord, ttl time.Duration) { - c.lru.Add(key, value, ttl) + c.cache.Set(key, value, ttl) } func (c *simpleCache) remove(key string) { - c.lru.Remove(key) + c.cache.Delete(key) } diff --git a/pkg/authentication/token/cache/cache_test.go b/pkg/authentication/token/cache/cache_test.go index b7fe4cb73..1984c4c6b 100644 --- a/pkg/authentication/token/cache/cache_test.go +++ b/pkg/authentication/token/cache/cache_test.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "fmt" "math/rand" "testing" @@ -30,7 +31,9 @@ import ( ) func TestSimpleCache(t *testing.T) { - testCache(newSimpleCache(4096, clock.RealClock{}), t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + testCache(newSimpleCache(ctx, clock.RealClock{}), t) } // Note: the performance profile of this benchmark may not match that in the production. @@ -39,16 +42,22 @@ func TestSimpleCache(t *testing.T) { func BenchmarkCacheContentions(b *testing.B) { for _, numKeys := range []int{1 << 8, 1 << 12, 1 << 16} { b.Run(fmt.Sprintf("Simple/keys=%d", numKeys), func(b *testing.B) { - benchmarkCache(newSimpleCache(4096, clock.RealClock{}), b, numKeys) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + benchmarkCache(newSimpleCache(ctx, clock.RealClock{}), b, numKeys) }) b.Run(fmt.Sprintf("Striped/keys=%d", numKeys), func(b *testing.B) { - benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), b, numKeys) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(ctx, clock.RealClock{}) }), b, numKeys) }) } } func TestStripedCache(t *testing.T) { - testCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + testCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(ctx, clock.RealClock{}) }), t) } func benchmarkCache(cache cache, b *testing.B, numKeys int) { diff --git a/pkg/authentication/token/cache/cached_token_authenticator.go b/pkg/authentication/token/cache/cached_token_authenticator.go index d94866d5b..19d8e27e1 100644 --- a/pkg/authentication/token/cache/cached_token_authenticator.go +++ b/pkg/authentication/token/cache/cached_token_authenticator.go @@ -64,11 +64,11 @@ type cache interface { } // New returns a token authenticator that caches the results of the specified authenticator. A ttl of 0 bypasses the cache. -func New(authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration) authenticator.Token { - return newWithClock(authenticator, cacheErrs, successTTL, failureTTL, utilclock.RealClock{}) +func New(ctx context.Context, authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration) authenticator.Token { + return newWithClock(ctx, authenticator, cacheErrs, successTTL, failureTTL, utilclock.RealClock{}) } -func newWithClock(authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration, clock utilclock.Clock) authenticator.Token { +func newWithClock(ctx context.Context, authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration, clock utilclock.Clock) authenticator.Token { randomCacheKey := make([]byte, 32) if _, err := rand.Read(randomCacheKey); err != nil { panic(err) // rand should never fail @@ -86,7 +86,7 @@ func newWithClock(authenticator authenticator.Token, cacheErrs bool, successTTL, // used. Currently we advertise support 5k nodes and 10k // namespaces; a 32k entry cache is therefore a 2x safety // margin. - cache: newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(1024, clock) }), + cache: newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(ctx, clock) }), hashPool: &sync.Pool{ New: func() interface{} { diff --git a/pkg/authentication/token/cache/cached_token_authenticator_test.go b/pkg/authentication/token/cache/cached_token_authenticator_test.go index c6fb207e3..921f079e9 100644 --- a/pkg/authentication/token/cache/cached_token_authenticator_test.go +++ b/pkg/authentication/token/cache/cached_token_authenticator_test.go @@ -50,7 +50,10 @@ func TestCachedTokenAuthenticator(t *testing.T) { }) fakeClock := utilclock.NewFakeClock(time.Now()) - a := newWithClock(fakeAuth, true, time.Minute, 0, fakeClock) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + a := newWithClock(ctx, fakeAuth, true, time.Minute, 0, fakeClock) calledWithToken, resultUsers, resultOk, resultErr = []string{}, nil, false, nil a.AuthenticateToken(context.Background(), "bad1") @@ -124,7 +127,10 @@ func TestCachedTokenAuthenticatorWithAudiences(t *testing.T) { }) fakeClock := utilclock.NewFakeClock(time.Now()) - a := newWithClock(fakeAuth, true, time.Minute, 0, fakeClock) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + a := newWithClock(ctx, fakeAuth, true, time.Minute, 0, fakeClock) resultUsers["audAusertoken1"] = &user.DefaultInfo{Name: "user1"} resultUsers["audBusertoken1"] = &user.DefaultInfo{Name: "user1-different"} @@ -270,6 +276,8 @@ func (s *singleBenchmark) run(b *testing.B) { } func (s *singleBenchmark) bench(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() // Simulate slowness, qps limit, external service limitation, etc const maxInFlight = 40 chokepoint := make(chan struct{}, maxInFlight) @@ -277,6 +285,7 @@ func (s *singleBenchmark) bench(b *testing.B) { var lookups uint64 a := newWithClock( + ctx, authenticator.TokenFunc(func(ctx context.Context, token string) (*authenticator.Response, bool, error) { atomic.AddUint64(&lookups, 1) diff --git a/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go b/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go index 518cf7063..fa02b605b 100644 --- a/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go +++ b/plugin/pkg/authenticator/token/webhook/webhook_v1_test.go @@ -170,7 +170,7 @@ func (m *mockV1Service) HTTPStatusCode() int { return m.statusCode } // newV1TokenAuthenticator creates a temporary kubeconfig file from the provided // arguments and attempts to load a new WebhookTokenAuthenticator from it. -func newV1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) { +func newV1TokenAuthenticator(ctx context.Context, serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) { tempfile, err := ioutil.TempFile("", "") if err != nil { return nil, err @@ -203,7 +203,7 @@ func newV1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, return nil, err } - return cache.New(authn, false, cacheTime, cacheTime), nil + return cache.New(ctx, authn, false, cacheTime, cacheTime), nil } func TestV1TLSConfig(t *testing.T) { @@ -259,7 +259,10 @@ func TestV1TLSConfig(t *testing.T) { } defer server.Close() - wh, err := newV1TokenAuthenticator(server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wh, err := newV1TokenAuthenticator(ctx, server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil) if err != nil { t.Errorf("%s: failed to create client: %v", tt.test, err) return @@ -482,12 +485,14 @@ func TestV1WebhookTokenAuthenticator(t *testing.T) { token := "my-s3cr3t-t0ken" for _, tt := range tests { t.Run(tt.description, func(t *testing.T) { - wh, err := newV1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wh, err := newV1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds) if err != nil { t.Fatal(err) } - ctx := context.Background() if tt.reqAuds != nil { ctx = authenticator.WithAudiences(ctx, tt.reqAuds) } @@ -554,8 +559,11 @@ func TestV1WebhookCacheAndRetry(t *testing.T) { } defer s.Close() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Create an authenticator that caches successful responses "forever" (100 days). - wh, err := newV1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil) + wh, err := newV1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil) if err != nil { t.Fatal(err) } diff --git a/plugin/pkg/authenticator/token/webhook/webhook_v1beta1_test.go b/plugin/pkg/authenticator/token/webhook/webhook_v1beta1_test.go index 5c228b690..998d6a782 100644 --- a/plugin/pkg/authenticator/token/webhook/webhook_v1beta1_test.go +++ b/plugin/pkg/authenticator/token/webhook/webhook_v1beta1_test.go @@ -172,7 +172,7 @@ func (m *mockV1beta1Service) HTTPStatusCode() int { return m.statusCode } // newV1beta1TokenAuthenticator creates a temporary kubeconfig file from the provided // arguments and attempts to load a new WebhookTokenAuthenticator from it. -func newV1beta1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) { +func newV1beta1TokenAuthenticator(ctx context.Context, serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) { tempfile, err := ioutil.TempFile("", "") if err != nil { return nil, err @@ -205,7 +205,7 @@ func newV1beta1TokenAuthenticator(serverURL string, clientCert, clientKey, ca [] return nil, err } - return cache.New(authn, false, cacheTime, cacheTime), nil + return cache.New(ctx, authn, false, cacheTime, cacheTime), nil } func TestV1beta1TLSConfig(t *testing.T) { @@ -261,7 +261,10 @@ func TestV1beta1TLSConfig(t *testing.T) { } defer server.Close() - wh, err := newV1beta1TokenAuthenticator(server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wh, err := newV1beta1TokenAuthenticator(ctx, server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil) if err != nil { t.Errorf("%s: failed to create client: %v", tt.test, err) return @@ -484,12 +487,14 @@ func TestV1beta1WebhookTokenAuthenticator(t *testing.T) { token := "my-s3cr3t-t0ken" for _, tt := range tests { t.Run(tt.description, func(t *testing.T) { - wh, err := newV1beta1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wh, err := newV1beta1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds) if err != nil { t.Fatal(err) } - ctx := context.Background() if tt.reqAuds != nil { ctx = authenticator.WithAudiences(ctx, tt.reqAuds) } @@ -556,8 +561,11 @@ func TestV1beta1WebhookCacheAndRetry(t *testing.T) { } defer s.Close() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Create an authenticator that caches successful responses "forever" (100 days). - wh, err := newV1beta1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil) + wh, err := newV1beta1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil) if err != nil { t.Fatal(err) }