migrate token cache to cache.Expiring
Kubernetes-commit: 3f194d5b413daeba93063f4610b9951069eaf13c
This commit is contained in:
parent
dadb023ccc
commit
0326ed9810
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package authenticatorfactory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
|
@ -83,7 +84,7 @@ func (c DelegatingAuthenticatorConfig) New() (authenticator.Request, *spec.Secur
|
|||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
cachingTokenAuth := cache.New(tokenAuth, false, c.CacheTTL, c.CacheTTL)
|
||||
cachingTokenAuth := cache.New(context.TODO(), tokenAuth, false, c.CacheTTL, c.CacheTTL)
|
||||
authenticators = append(authenticators, bearertoken.New(cachingTokenAuth), websocket.NewProtocolAuthenticator(cachingTokenAuth))
|
||||
|
||||
securityDefinitions["BearerToken"] = &spec.SecurityScheme{
|
||||
|
|
|
@ -17,22 +17,25 @@ limitations under the License.
|
|||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
lrucache "k8s.io/apimachinery/pkg/util/cache"
|
||||
utilcache "k8s.io/apimachinery/pkg/util/cache"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
)
|
||||
|
||||
type simpleCache struct {
|
||||
lru *lrucache.LRUExpireCache
|
||||
cache *utilcache.Expiring
|
||||
}
|
||||
|
||||
func newSimpleCache(size int, clock clock.Clock) cache {
|
||||
return &simpleCache{lru: lrucache.NewLRUExpireCacheWithClock(size, clock)}
|
||||
func newSimpleCache(ctx context.Context, clock clock.Clock) cache {
|
||||
c := &simpleCache{cache: utilcache.NewExpiringWithClock(clock)}
|
||||
go c.cache.Run(ctx)
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *simpleCache) get(key string) (*cacheRecord, bool) {
|
||||
record, ok := c.lru.Get(key)
|
||||
record, ok := c.cache.Get(key)
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
|
@ -41,9 +44,9 @@ func (c *simpleCache) get(key string) (*cacheRecord, bool) {
|
|||
}
|
||||
|
||||
func (c *simpleCache) set(key string, value *cacheRecord, ttl time.Duration) {
|
||||
c.lru.Add(key, value, ttl)
|
||||
c.cache.Set(key, value, ttl)
|
||||
}
|
||||
|
||||
func (c *simpleCache) remove(key string) {
|
||||
c.lru.Remove(key)
|
||||
c.cache.Delete(key)
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
@ -30,7 +31,9 @@ import (
|
|||
)
|
||||
|
||||
func TestSimpleCache(t *testing.T) {
|
||||
testCache(newSimpleCache(4096, clock.RealClock{}), t)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
testCache(newSimpleCache(ctx, clock.RealClock{}), t)
|
||||
}
|
||||
|
||||
// Note: the performance profile of this benchmark may not match that in the production.
|
||||
|
@ -39,16 +42,22 @@ func TestSimpleCache(t *testing.T) {
|
|||
func BenchmarkCacheContentions(b *testing.B) {
|
||||
for _, numKeys := range []int{1 << 8, 1 << 12, 1 << 16} {
|
||||
b.Run(fmt.Sprintf("Simple/keys=%d", numKeys), func(b *testing.B) {
|
||||
benchmarkCache(newSimpleCache(4096, clock.RealClock{}), b, numKeys)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
benchmarkCache(newSimpleCache(ctx, clock.RealClock{}), b, numKeys)
|
||||
})
|
||||
b.Run(fmt.Sprintf("Striped/keys=%d", numKeys), func(b *testing.B) {
|
||||
benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), b, numKeys)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
benchmarkCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(ctx, clock.RealClock{}) }), b, numKeys)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestStripedCache(t *testing.T) {
|
||||
testCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(128, clock.RealClock{}) }), t)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
testCache(newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(ctx, clock.RealClock{}) }), t)
|
||||
}
|
||||
|
||||
func benchmarkCache(cache cache, b *testing.B, numKeys int) {
|
||||
|
|
|
@ -64,11 +64,11 @@ type cache interface {
|
|||
}
|
||||
|
||||
// New returns a token authenticator that caches the results of the specified authenticator. A ttl of 0 bypasses the cache.
|
||||
func New(authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration) authenticator.Token {
|
||||
return newWithClock(authenticator, cacheErrs, successTTL, failureTTL, utilclock.RealClock{})
|
||||
func New(ctx context.Context, authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration) authenticator.Token {
|
||||
return newWithClock(ctx, authenticator, cacheErrs, successTTL, failureTTL, utilclock.RealClock{})
|
||||
}
|
||||
|
||||
func newWithClock(authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration, clock utilclock.Clock) authenticator.Token {
|
||||
func newWithClock(ctx context.Context, authenticator authenticator.Token, cacheErrs bool, successTTL, failureTTL time.Duration, clock utilclock.Clock) authenticator.Token {
|
||||
randomCacheKey := make([]byte, 32)
|
||||
if _, err := rand.Read(randomCacheKey); err != nil {
|
||||
panic(err) // rand should never fail
|
||||
|
@ -86,7 +86,7 @@ func newWithClock(authenticator authenticator.Token, cacheErrs bool, successTTL,
|
|||
// used. Currently we advertise support 5k nodes and 10k
|
||||
// namespaces; a 32k entry cache is therefore a 2x safety
|
||||
// margin.
|
||||
cache: newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(1024, clock) }),
|
||||
cache: newStripedCache(32, fnvHashFunc, func() cache { return newSimpleCache(ctx, clock) }),
|
||||
|
||||
hashPool: &sync.Pool{
|
||||
New: func() interface{} {
|
||||
|
|
|
@ -50,7 +50,10 @@ func TestCachedTokenAuthenticator(t *testing.T) {
|
|||
})
|
||||
fakeClock := utilclock.NewFakeClock(time.Now())
|
||||
|
||||
a := newWithClock(fakeAuth, true, time.Minute, 0, fakeClock)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
a := newWithClock(ctx, fakeAuth, true, time.Minute, 0, fakeClock)
|
||||
|
||||
calledWithToken, resultUsers, resultOk, resultErr = []string{}, nil, false, nil
|
||||
a.AuthenticateToken(context.Background(), "bad1")
|
||||
|
@ -124,7 +127,10 @@ func TestCachedTokenAuthenticatorWithAudiences(t *testing.T) {
|
|||
})
|
||||
fakeClock := utilclock.NewFakeClock(time.Now())
|
||||
|
||||
a := newWithClock(fakeAuth, true, time.Minute, 0, fakeClock)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
a := newWithClock(ctx, fakeAuth, true, time.Minute, 0, fakeClock)
|
||||
|
||||
resultUsers["audAusertoken1"] = &user.DefaultInfo{Name: "user1"}
|
||||
resultUsers["audBusertoken1"] = &user.DefaultInfo{Name: "user1-different"}
|
||||
|
@ -270,6 +276,8 @@ func (s *singleBenchmark) run(b *testing.B) {
|
|||
}
|
||||
|
||||
func (s *singleBenchmark) bench(b *testing.B) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
// Simulate slowness, qps limit, external service limitation, etc
|
||||
const maxInFlight = 40
|
||||
chokepoint := make(chan struct{}, maxInFlight)
|
||||
|
@ -277,6 +285,7 @@ func (s *singleBenchmark) bench(b *testing.B) {
|
|||
var lookups uint64
|
||||
|
||||
a := newWithClock(
|
||||
ctx,
|
||||
authenticator.TokenFunc(func(ctx context.Context, token string) (*authenticator.Response, bool, error) {
|
||||
atomic.AddUint64(&lookups, 1)
|
||||
|
||||
|
|
|
@ -170,7 +170,7 @@ func (m *mockV1Service) HTTPStatusCode() int { return m.statusCode }
|
|||
|
||||
// newV1TokenAuthenticator creates a temporary kubeconfig file from the provided
|
||||
// arguments and attempts to load a new WebhookTokenAuthenticator from it.
|
||||
func newV1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) {
|
||||
func newV1TokenAuthenticator(ctx context.Context, serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) {
|
||||
tempfile, err := ioutil.TempFile("", "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -203,7 +203,7 @@ func newV1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte,
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return cache.New(authn, false, cacheTime, cacheTime), nil
|
||||
return cache.New(ctx, authn, false, cacheTime, cacheTime), nil
|
||||
}
|
||||
|
||||
func TestV1TLSConfig(t *testing.T) {
|
||||
|
@ -259,7 +259,10 @@ func TestV1TLSConfig(t *testing.T) {
|
|||
}
|
||||
defer server.Close()
|
||||
|
||||
wh, err := newV1TokenAuthenticator(server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
wh, err := newV1TokenAuthenticator(ctx, server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil)
|
||||
if err != nil {
|
||||
t.Errorf("%s: failed to create client: %v", tt.test, err)
|
||||
return
|
||||
|
@ -482,12 +485,14 @@ func TestV1WebhookTokenAuthenticator(t *testing.T) {
|
|||
token := "my-s3cr3t-t0ken"
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.description, func(t *testing.T) {
|
||||
wh, err := newV1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
wh, err := newV1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
if tt.reqAuds != nil {
|
||||
ctx = authenticator.WithAudiences(ctx, tt.reqAuds)
|
||||
}
|
||||
|
@ -554,8 +559,11 @@ func TestV1WebhookCacheAndRetry(t *testing.T) {
|
|||
}
|
||||
defer s.Close()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Create an authenticator that caches successful responses "forever" (100 days).
|
||||
wh, err := newV1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil)
|
||||
wh, err := newV1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -172,7 +172,7 @@ func (m *mockV1beta1Service) HTTPStatusCode() int { return m.statusCode }
|
|||
|
||||
// newV1beta1TokenAuthenticator creates a temporary kubeconfig file from the provided
|
||||
// arguments and attempts to load a new WebhookTokenAuthenticator from it.
|
||||
func newV1beta1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) {
|
||||
func newV1beta1TokenAuthenticator(ctx context.Context, serverURL string, clientCert, clientKey, ca []byte, cacheTime time.Duration, implicitAuds authenticator.Audiences) (authenticator.Token, error) {
|
||||
tempfile, err := ioutil.TempFile("", "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -205,7 +205,7 @@ func newV1beta1TokenAuthenticator(serverURL string, clientCert, clientKey, ca []
|
|||
return nil, err
|
||||
}
|
||||
|
||||
return cache.New(authn, false, cacheTime, cacheTime), nil
|
||||
return cache.New(ctx, authn, false, cacheTime, cacheTime), nil
|
||||
}
|
||||
|
||||
func TestV1beta1TLSConfig(t *testing.T) {
|
||||
|
@ -261,7 +261,10 @@ func TestV1beta1TLSConfig(t *testing.T) {
|
|||
}
|
||||
defer server.Close()
|
||||
|
||||
wh, err := newV1beta1TokenAuthenticator(server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
wh, err := newV1beta1TokenAuthenticator(ctx, server.URL, tt.clientCert, tt.clientKey, tt.clientCA, 0, nil)
|
||||
if err != nil {
|
||||
t.Errorf("%s: failed to create client: %v", tt.test, err)
|
||||
return
|
||||
|
@ -484,12 +487,14 @@ func TestV1beta1WebhookTokenAuthenticator(t *testing.T) {
|
|||
token := "my-s3cr3t-t0ken"
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.description, func(t *testing.T) {
|
||||
wh, err := newV1beta1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
wh, err := newV1beta1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 0, tt.implicitAuds)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
if tt.reqAuds != nil {
|
||||
ctx = authenticator.WithAudiences(ctx, tt.reqAuds)
|
||||
}
|
||||
|
@ -556,8 +561,11 @@ func TestV1beta1WebhookCacheAndRetry(t *testing.T) {
|
|||
}
|
||||
defer s.Close()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Create an authenticator that caches successful responses "forever" (100 days).
|
||||
wh, err := newV1beta1TokenAuthenticator(s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil)
|
||||
wh, err := newV1beta1TokenAuthenticator(ctx, s.URL, clientCert, clientKey, caCert, 2400*time.Hour, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue