rls: move the data cache implementation into the rls package (#5060)

Easwar Swaminathan 2021-12-22 13:50:56 -08:00 committed by GitHub
parent ec7cf6c977
commit b3d19efee6
5 changed files with 792 additions and 506 deletions

@@ -0,0 +1,404 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package rls
import (
"container/list"
"time"
"google.golang.org/grpc/internal/backoff"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
)
// TODO(easwars): Remove this once all RLS code is merged.
//lint:file-ignore U1000 Ignore all unused code, not all code is merged yet.
// cacheKey represents the key used to uniquely identify an entry in the data
// cache and in the pending requests map.
type cacheKey struct {
// path is the full path of the incoming RPC request.
path string
// keys is a stringified version of the RLS request key map built using the
// RLS keyBuilder. Since maps are not comparable in Go, a map cannot be part
// of the key for another map (entries in the data cache and the pending
// requests map are stored in maps).
keys string
}
// cacheEntry wraps all the data to be stored in a data cache entry.
type cacheEntry struct {
// childPolicyWrappers contains the list of child policy wrappers
// corresponding to the targets returned by the RLS server for this entry.
childPolicyWrappers []*childPolicyWrapper
// headerData is received in the RLS response and is to be sent in the
// X-Google-RLS-Data header for matching RPCs.
headerData string
// expiryTime is the absolute time at which this cache entry stops
// being valid. When an RLS request succeeds, this is set to the current
// time plus the max_age field from the LB policy config.
expiryTime time.Time
// staleTime is the absolute time after which this cache entry will be
// proactively refreshed if an incoming RPC matches this entry. When an RLS
// request succeeds, this is set to the current time plus the stale_age from
// the LB policy config.
staleTime time.Time
// earliestEvictTime is the absolute time before which this entry should not
// be evicted from the cache. When a cache entry is created, this is set to
// the current time plus a default value of 5 seconds. This is required to
// make sure that a new entry added to the cache is not evicted before the
// RLS response arrives (usually when the cache is too small).
earliestEvictTime time.Time
// status stores the RPC status of the previous RLS request for this
// entry. Picks for entries with a non-nil value for this field are failed
// with the error stored here.
status error
// backoffState contains all backoff related state. When an RLS request
// succeeds, backoffState is reset. This state moves between the data cache
// and the pending requests map.
backoffState *backoffState
// backoffTime is the absolute time at which the backoff period for this
// entry ends. When an RLS request fails, this is set to the current time
// plus the backoff value returned by the backoffState. The backoff timer is
// also set up with this value. No new RLS requests are sent out for this
// entry until the backoff period ends.
//
// Set to zero time instant upon a successful RLS response.
backoffTime time.Time
// backoffExpiryTime is the absolute time at which an entry which has gone
// through backoff stops being valid. When an RLS request fails, this is
// set to the current time plus twice the backoff time. The cache expiry
// timer will only delete entries for which both expiryTime and
// backoffExpiryTime are in the past.
//
// Set to zero time instant upon a successful RLS response.
backoffExpiryTime time.Time
// size stores the size of this cache entry. Used to enforce the cache size
// specified in the LB policy configuration.
size int64
// onEvict is the callback to be invoked when this cache entry is evicted.
onEvict func()
}
// backoffState wraps all backoff related state associated with a cache entry.
type backoffState struct {
// retries keeps track of the number of RLS failures, to be able to
// determine the amount of time to backoff before the next attempt.
retries int
// bs is the exponential backoff implementation which returns the amount of
// time to backoff, given the number of retries.
bs backoff.Strategy
// timer fires when the backoff period ends and incoming requests after this
// will trigger a new RLS request.
timer *time.Timer
}
// lru is a cache implementation with a least recently used eviction policy.
// Internally it uses a doubly linked list, with the least recently used element
// at the front of the list and the most recently used element at the back of
// the list. The value stored in this cache will be of type `cacheKey`.
//
// It is not safe for concurrent access.
type lru struct {
ll *list.List
// A map from the value stored in the lru to its underlying list element is
// maintained to have a clean API. Without this, a subset of the lru's API
// would accept/return cacheKey while another subset would accept/return
// list elements.
m map[cacheKey]*list.Element
}
// newLRU creates a new cache with a least recently used eviction policy.
func newLRU() *lru {
return &lru{
ll: list.New(),
m: make(map[cacheKey]*list.Element),
}
}
func (l *lru) addEntry(key cacheKey) {
e := l.ll.PushBack(key)
l.m[key] = e
}
func (l *lru) makeRecent(key cacheKey) {
e := l.m[key]
l.ll.MoveToBack(e)
}
func (l *lru) removeEntry(key cacheKey) {
e := l.m[key]
l.ll.Remove(e)
delete(l.m, key)
}
func (l *lru) getLeastRecentlyUsed() cacheKey {
e := l.ll.Front()
if e == nil {
return cacheKey{}
}
return e.Value.(cacheKey)
}
// iterateAndRun traverses the lru in least-recently-used order and calls the
// provided function for every element.
//
// Callers may delete the cache entry associated with the cacheKey passed into
// f, but they may not perform any other operation which reorders the elements
// in the lru.
func (l *lru) iterateAndRun(f func(cacheKey)) {
var next *list.Element
for e := l.ll.Front(); e != nil; e = next {
next = e.Next()
f(e.Value.(cacheKey))
}
}
// dataCache contains a cache of RLS data used by the LB policy to make routing
// decisions.
//
// The dataCache will be keyed by the request's path and keys, represented by
// the `cacheKey` type. It will maintain the cache keys in an `lru` and the
// cache data, represented by the `cacheEntry` type, in a native map.
//
// It is not safe for concurrent access.
type dataCache struct {
maxSize int64 // Maximum allowed size.
currentSize int64 // Current size.
keys *lru // Cache keys maintained in lru order.
entries map[cacheKey]*cacheEntry
logger *internalgrpclog.PrefixLogger
shutdown *grpcsync.Event
}
func newDataCache(size int64, logger *internalgrpclog.PrefixLogger) *dataCache {
return &dataCache{
maxSize: size,
keys: newLRU(),
entries: make(map[cacheKey]*cacheEntry),
logger: logger,
shutdown: grpcsync.NewEvent(),
}
}
// resize changes the maximum allowed size of the data cache.
//
// The return value indicates if an entry with a valid backoff timer was
// evicted. This is important to the RLS LB policy which would send a new picker
// on the channel to re-process any RPCs queued as a result of this backoff
// timer.
func (dc *dataCache) resize(size int64) (backoffCancelled bool) {
if dc.shutdown.HasFired() {
return false
}
backoffCancelled = false
for dc.currentSize > size {
key := dc.keys.getLeastRecentlyUsed()
entry, ok := dc.entries[key]
if !ok {
// This should never happen.
dc.logger.Errorf("cacheKey %+v not found in the cache while attempting to resize it", key)
break
}
// When we encounter a cache entry whose minimum expiration time is in
// the future, we abort the LRU pass, which may temporarily leave the
// cache being too large. This is necessary to ensure that in cases
// where the cache is too small, when we receive an RLS Response, we
// keep the resulting cache entry around long enough for the pending
// incoming requests to be re-processed through the new Picker. If we
// didn't do this, then we'd risk throwing away each RLS response as we
// receive it, in which case we would fail to actually route any of our
// incoming requests.
if entry.earliestEvictTime.After(time.Now()) {
dc.logger.Warningf("cachekey %+v is too recent to be evicted. Stopping cache resizing for now", key)
break
}
// Stop the backoff timer before evicting the entry.
if entry.backoffState != nil && entry.backoffState.timer != nil {
if entry.backoffState.timer.Stop() {
entry.backoffState.timer = nil
backoffCancelled = true
}
}
dc.deleteAndcleanup(key, entry)
}
dc.maxSize = size
return backoffCancelled
}
// evictExpiredEntries sweeps through the cache and deletes expired entries. An
// expired entry is one for which both the `expiryTime` and `backoffExpiryTime`
// fields are in the past.
//
// The return value indicates if any expired entries were evicted.
//
// The LB policy invokes this method periodically to purge expired entries.
func (dc *dataCache) evictExpiredEntries() (evicted bool) {
if dc.shutdown.HasFired() {
return false
}
evicted = false
dc.keys.iterateAndRun(func(key cacheKey) {
entry, ok := dc.entries[key]
if !ok {
// This should never happen.
dc.logger.Errorf("cacheKey %+v not found in the cache while attempting to perform periodic cleanup of expired entries", key)
return
}
// Only evict entries for which both the data expiration time and
// backoff expiration time fields are in the past.
now := time.Now()
if entry.expiryTime.After(now) || entry.backoffExpiryTime.After(now) {
return
}
evicted = true
dc.deleteAndcleanup(key, entry)
})
return evicted
}
// resetBackoffState sweeps through the cache and, for every entry with
// backoff state, cancels the backoff timer and resets the backoff state. The
// return value indicates if any entries were mutated in this fashion.
//
// The LB policy invokes this method when the control channel moves from READY
// to TRANSIENT_FAILURE back to READY. See `monitorConnectivityState` method on
// the `controlChannel` type for more details.
func (dc *dataCache) resetBackoffState(newBackoffState *backoffState) (backoffReset bool) {
if dc.shutdown.HasFired() {
return false
}
backoffReset = false
dc.keys.iterateAndRun(func(key cacheKey) {
entry, ok := dc.entries[key]
if !ok {
// This should never happen.
dc.logger.Errorf("cacheKey %+v not found in the cache while attempting to perform periodic cleanup of expired entries", key)
return
}
if entry.backoffState == nil {
return
}
if entry.backoffState.timer != nil {
entry.backoffState.timer.Stop()
entry.backoffState.timer = nil
}
entry.backoffState = &backoffState{bs: newBackoffState.bs}
entry.backoffTime = time.Time{}
entry.backoffExpiryTime = time.Time{}
backoffReset = true
})
return backoffReset
}
// addEntry adds a cache entry for the given key.
//
// Return value backoffCancelled indicates if a cache entry with a valid backoff
// timer was evicted to make space for the current entry. This is important to
// the RLS LB policy which would send a new picker on the channel to re-process
// any RPCs queued as a result of this backoff timer.
//
// Return value ok indicates if the entry was successfully added to the cache.
func (dc *dataCache) addEntry(key cacheKey, entry *cacheEntry) (backoffCancelled bool, ok bool) {
if dc.shutdown.HasFired() {
return false, false
}
// Handle the extremely unlikely case that a single entry is bigger than the
// size of the cache.
if entry.size > dc.maxSize {
return false, false
}
dc.entries[key] = entry
dc.currentSize += entry.size
dc.keys.addEntry(key)
// If the new entry makes the cache go over its configured size, remove some
// old entries.
if dc.currentSize > dc.maxSize {
backoffCancelled = dc.resize(dc.maxSize)
}
return backoffCancelled, true
}
// updateEntrySize updates the size of a cache entry and the current size of the
// data cache. An entry's size can change upon receipt of an RLS response.
func (dc *dataCache) updateEntrySize(entry *cacheEntry, newSize int64) {
dc.currentSize -= entry.size
entry.size = newSize
dc.currentSize += entry.size
}
func (dc *dataCache) getEntry(key cacheKey) *cacheEntry {
if dc.shutdown.HasFired() {
return nil
}
entry, ok := dc.entries[key]
if !ok {
return nil
}
dc.keys.makeRecent(key)
return entry
}
func (dc *dataCache) removeEntryForTesting(key cacheKey) {
entry, ok := dc.entries[key]
if !ok {
return
}
dc.deleteAndcleanup(key, entry)
}
// deleteAndcleanup performs the actions required when deleting an entry from
// the data cache:
// - the entry is removed from the map of entries
// - the current size of the data cache is updated
// - the key is removed from the LRU
// - onEvict is invoked in a separate goroutine
func (dc *dataCache) deleteAndcleanup(key cacheKey, entry *cacheEntry) {
delete(dc.entries, key)
dc.currentSize -= entry.size
dc.keys.removeEntry(key)
if entry.onEvict != nil {
go entry.onEvict()
}
}
func (dc *dataCache) stop() {
dc.keys.iterateAndRun(func(key cacheKey) {
entry, ok := dc.entries[key]
if !ok {
// This should never happen.
dc.logger.Errorf("cacheKey %+v not found in the cache while shutting down", key)
return
}
dc.deleteAndcleanup(key, entry)
})
dc.shutdown.Fire()
}
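
To make the intended call pattern concrete, here is a minimal sketch, not part of this commit, of how the LB policy might drive the data cache above. It is written as another file in this rls package (the types are unexported), and the helper name, key, header data, sizes, and durations are made-up values.

package rls

import (
	"time"

	internalgrpclog "google.golang.org/grpc/internal/grpclog"
)

// exampleDataCacheUsage is a hypothetical helper (not part of this commit)
// illustrating the call pattern the LB policy is expected to follow.
func exampleDataCacheUsage(logger *internalgrpclog.PrefixLogger) {
	dc := newDataCache(1000, logger) // Maximum size, from the LB policy config.

	// On receipt of an RLS response, cache the result under the request's key.
	key := cacheKey{path: "/pkg.Service/Method", keys: "k1=v1"}
	entry := &cacheEntry{
		headerData: "h1=v1",
		expiryTime: time.Now().Add(time.Minute),
		size:       int64(len(key.path) + len(key.keys) + len("h1=v1")),
	}
	backoffCancelled, ok := dc.addEntry(key, entry)
	if !ok {
		return // Entry is larger than the whole cache, or the cache is shut down.
	}

	// At pick time, a lookup also marks the entry as most recently used.
	if e := dc.getEntry(key); e != nil && e.expiryTime.After(time.Now()) {
		_ = e.childPolicyWrappers // Route the RPC using these wrappers.
	}

	// Periodically purge entries whose data and backoff validity have expired,
	// and shrink the cache when a config update lowers the size limit.
	evicted := dc.evictExpiredEntries()
	backoffCancelled = dc.resize(500) || backoffCancelled

	// Whenever backoffCancelled or evicted is true, the LB policy would send a
	// new picker to the ClientConn so that queued RPCs get re-processed.
	_, _ = evicted, backoffCancelled

	dc.stop()
}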

@@ -1,244 +0,0 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package cache provides an LRU cache implementation to be used by the RLS LB
// policy to cache RLS response data.
package cache
import (
"container/list"
"sync"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
)
var logger = grpclog.Component("rls")
// Key represents the cache key used to uniquely identify a cache entry.
type Key struct {
// Path is the full path of the incoming RPC request.
Path string
// KeyMap is a stringified version of the RLS request keys built using the
// RLS keyBuilder. Since map is not a Type which is comparable in Go, it
// cannot be part of the key for another map (the LRU cache is implemented
// using a native map type).
KeyMap string
}
// Entry wraps all the data to be stored in a cache entry.
type Entry struct {
// Mu synchronizes access to this particular cache entry. The LB policy
// will also hold another mutex to synchronize access to the cache as a
// whole. To avoid holding the top-level mutex for the whole duration for
// which one particular cache entry is acted upon, we use this entry mutex.
Mu sync.Mutex
// ExpiryTime is the absolute time at which the data cached as part of this
// entry stops being valid. When an RLS request succeeds, this is set to
// the current time plus the max_age field from the LB policy config. An
// entry with this field in the past is not used to process picks.
ExpiryTime time.Time
// BackoffExpiryTime is the absolute time at which an entry which has gone
// through backoff stops being valid. When an RLS request fails, this is
// set to the current time plus twice the backoff time. The cache expiry
// timer will only delete entries for which both ExpiryTime and
// BackoffExpiryTime are in the past.
BackoffExpiryTime time.Time
// StaleTime is the absolute time after which this entry will be
// proactively refreshed if we receive a request for it. When an RLS
// request succeeds, this is set to the current time plus the stale_age
// from the LB policy config.
StaleTime time.Time
// BackoffTime is the absolute time at which the backoff period for this
// entry ends. The backoff timer is setup with this value. No new RLS
// requests are sent out for this entry until the backoff period ends.
BackoffTime time.Time
// EarliestEvictTime is the absolute time before which this entry should
// not be evicted from the cache. This is set to a default value of 5
// seconds when the entry is created. This is required to make sure that a
// new entry added to the cache is not evicted before the RLS response
// arrives (usually when the cache is too small).
EarliestEvictTime time.Time
// CallStatus stores the RPC status of the previous RLS request for this
// entry. Picks for entries with a non-nil value for this field are failed
// with the error stored here.
CallStatus error
// Backoff contains all backoff related state. When an RLS request
// succeeds, backoff state is reset.
Backoff BackoffState
// HeaderData is received in an RLS response and is to be sent in the
// X-Google-RLS-Data header for matching RPCs.
HeaderData string
// ChildPicker is a very thin wrapper around the child policy wrapper.
// The type is declared as a Picker interface since the users of
// the cache only care about the picker provided by the child policy, and
// this makes it easy for testing.
ChildPicker balancer.Picker
// size stores the size of this cache entry. Uses only a subset of the
// fields. See `entrySize` for how this is computed.
size int64
// key contains the cache key corresponding to this entry. This is required
// from methods like `removeElement` which only have a pointer to the
// list.Element which contains a reference to the cache.Entry. But these
// methods need the cache.Key to be able to remove the entry from the
// underlying map.
key Key
}
// BackoffState wraps all backoff related state associated with a cache entry.
type BackoffState struct {
// Retries keeps track of the number of RLS failures, to be able to
// determine the amount of time to backoff before the next attempt.
Retries int
// Backoff is an exponential backoff implementation which returns the
// amount of time to backoff, given the number of retries.
Backoff backoff.Strategy
// Timer fires when the backoff period ends and incoming requests after
// this will trigger a new RLS request.
Timer *time.Timer
// Callback provided by the LB policy to be notified when the backoff timer
// expires. This will trigger a new picker to be returned to the
// ClientConn, to force queued up RPCs to be retried.
Callback func()
}
// LRU is a cache with a least recently used eviction policy. It is not safe
// for concurrent access.
type LRU struct {
maxSize int64
usedSize int64
onEvicted func(Key, *Entry)
ll *list.List
cache map[Key]*list.Element
}
// NewLRU creates a cache.LRU with a size limit of maxSize and the provided
// eviction callback.
//
// Currently, only the cache.Key and the HeaderData field from cache.Entry
// count towards the size of the cache (other overhead per cache entry is not
// counted). The cache could temporarily exceed the configured maxSize because
// we want the entries to spend a configured minimum amount of time in the
// cache before they are LRU evicted (so that all the work performed in sending
// an RLS request and caching the response is not a total waste).
//
// The provided onEvicted callback must not attempt to re-add the entry
// inline; the RLS LB policy does not have a need to do that.
//
// The cache package trusts the RLS policy (its only user) to supply a default
// minimum non-zero maxSize, in the event that the ServiceConfig does not
// provide a value for it.
func NewLRU(maxSize int64, onEvicted func(Key, *Entry)) *LRU {
return &LRU{
maxSize: maxSize,
onEvicted: onEvicted,
ll: list.New(),
cache: make(map[Key]*list.Element),
}
}
// Resize sets the size limit of the LRU to newMaxSize and removes older
// entries, if required, to comply with the new limit.
func (lru *LRU) Resize(newMaxSize int64) {
lru.maxSize = newMaxSize
lru.removeToFit(0)
}
// TODO(easwars): If required, make this function more sophisticated.
func entrySize(key Key, value *Entry) int64 {
return int64(len(key.Path) + len(key.KeyMap) + len(value.HeaderData))
}
// removeToFit removes older entries from the cache to make room for a new
// entry of size newSize.
func (lru *LRU) removeToFit(newSize int64) {
now := time.Now()
for lru.usedSize+newSize > lru.maxSize {
elem := lru.ll.Back()
if elem == nil {
// This is a corner case where the cache is empty, but the new entry
// to be added is bigger than maxSize.
logger.Info("rls: newly added cache entry exceeds cache maxSize")
return
}
entry := elem.Value.(*Entry)
if t := entry.EarliestEvictTime; !t.IsZero() && t.Before(now) {
// When the oldest entry is too new (it hasn't even spent a default
// minimum amount of time in the cache), we abort and allow the
// cache to grow bigger than the configured maxSize.
logger.Info("rls: LRU eviction finds oldest entry to be too new. Allowing cache to exceed maxSize momentarily")
return
}
lru.removeElement(elem)
}
}
// Add adds a new entry to the cache.
func (lru *LRU) Add(key Key, value *Entry) {
size := entrySize(key, value)
elem, ok := lru.cache[key]
if !ok {
lru.removeToFit(size)
lru.usedSize += size
value.size = size
value.key = key
elem := lru.ll.PushFront(value)
lru.cache[key] = elem
return
}
existing := elem.Value.(*Entry)
sizeDiff := size - existing.size
lru.removeToFit(sizeDiff)
value.size = size
elem.Value = value
lru.ll.MoveToFront(elem)
lru.usedSize += sizeDiff
}
// Remove removes the cache entry with the given key, if one exists.
func (lru *LRU) Remove(key Key) {
if elem, ok := lru.cache[key]; ok {
lru.removeElement(elem)
}
}
func (lru *LRU) removeElement(e *list.Element) {
entry := e.Value.(*Entry)
lru.ll.Remove(e)
delete(lru.cache, entry.key)
lru.usedSize -= entry.size
if lru.onEvicted != nil {
lru.onEvicted(entry.key, entry)
}
}
// Get returns a cache entry with key key.
func (lru *LRU) Get(key Key) *Entry {
elem, ok := lru.cache[key]
if !ok {
return nil
}
lru.ll.MoveToFront(elem)
return elem.Value.(*Entry)
}
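
For comparison with the new implementation above, here is a minimal sketch of how the exported API of this now-removed cache package was driven. It is written as if it sat alongside the package (the diff does not show the import path), the helper name and values are made up, and it is illustrative only, not part of this commit.

package cache

import "fmt"

// exampleLRUUsage is an illustrative helper, not part of the original package.
func exampleLRUUsage() {
	// The eviction callback lets the LB policy react when entries fall out.
	lru := NewLRU(1000, func(k Key, _ *Entry) {
		fmt.Printf("evicted %+v\n", k)
	})

	key := Key{Path: "/pkg.Service/Method", KeyMap: "k1=v1"}
	lru.Add(key, &Entry{HeaderData: "h1=v1"})

	// Get also marks the entry as most recently used.
	if e := lru.Get(key); e != nil {
		fmt.Println(e.HeaderData)
	}

	// Shrinking the limit evicts least recently used entries to fit.
	lru.Resize(10)

	lru.Remove(key)
}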

@@ -1,262 +0,0 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package cache
import (
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
)
const (
defaultTestCacheSize = 5
defaultTestCacheMaxSize = 1000000
defaultTestTimeout = 1 * time.Second
)
// TestGet verifies the Add and Get methods of cache.LRU.
func TestGet(t *testing.T) {
key1 := Key{Path: "/service1/method1", KeyMap: "k1=v1,k2=v2"}
key2 := Key{Path: "/service2/method2", KeyMap: "k1=v1,k2=v2"}
val1 := Entry{HeaderData: "h1=v1"}
val2 := Entry{HeaderData: "h2=v2"}
tests := []struct {
desc string
keysToAdd []Key
valsToAdd []*Entry
keyToGet Key
wantEntry *Entry
}{
{
desc: "Empty cache",
keyToGet: Key{},
},
{
desc: "Single entry miss",
keysToAdd: []Key{key1},
valsToAdd: []*Entry{&val1},
keyToGet: Key{},
},
{
desc: "Single entry hit",
keysToAdd: []Key{key1},
valsToAdd: []*Entry{&val1},
keyToGet: key1,
wantEntry: &val1,
},
{
desc: "Multi entry miss",
keysToAdd: []Key{key1, key2},
valsToAdd: []*Entry{&val1, &val2},
keyToGet: Key{},
},
{
desc: "Multi entry hit",
keysToAdd: []Key{key1, key2},
valsToAdd: []*Entry{&val1, &val2},
keyToGet: key1,
wantEntry: &val1,
},
}
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
lru := NewLRU(defaultTestCacheMaxSize, nil)
for i, key := range test.keysToAdd {
lru.Add(key, test.valsToAdd[i])
}
opts := []cmp.Option{
cmpopts.IgnoreInterfaces(struct{ sync.Locker }{}),
cmpopts.IgnoreUnexported(Entry{}),
}
if gotEntry := lru.Get(test.keyToGet); !cmp.Equal(gotEntry, test.wantEntry, opts...) {
t.Errorf("lru.Get(%+v) = %+v, want %+v", test.keyToGet, gotEntry, test.wantEntry)
}
})
}
}
// TestRemove verifies the Add and Remove methods of cache.LRU.
func TestRemove(t *testing.T) {
keys := []Key{
{Path: "/service1/method1", KeyMap: "k1=v1,k2=v2"},
{Path: "/service2/method2", KeyMap: "k1=v1,k2=v2"},
{Path: "/service3/method3", KeyMap: "k1=v1,k2=v2"},
}
lru := NewLRU(defaultTestCacheMaxSize, nil)
for _, k := range keys {
lru.Add(k, &Entry{})
}
for _, k := range keys {
lru.Remove(k)
if entry := lru.Get(k); entry != nil {
t.Fatalf("lru.Get(%+v) after a call to lru.Remove succeeds, should have failed", k)
}
}
}
// TestExceedingSizeCausesEviction verifies the case where adding a new entry
// to the cache leads to eviction of old entries to make space for the new one.
func TestExceedingSizeCausesEviction(t *testing.T) {
evictCh := make(chan Key, defaultTestCacheSize)
onEvicted := func(k Key, _ *Entry) {
t.Logf("evicted key {%+v} from cache", k)
evictCh <- k
}
keysToFill := []Key{{Path: "a"}, {Path: "b"}, {Path: "c"}, {Path: "d"}, {Path: "e"}}
keysCausingEviction := []Key{{Path: "f"}, {Path: "g"}, {Path: "h"}, {Path: "i"}, {Path: "j"}}
lru := NewLRU(defaultTestCacheSize, onEvicted)
for _, key := range keysToFill {
lru.Add(key, &Entry{})
}
for i, key := range keysCausingEviction {
lru.Add(key, &Entry{})
timer := time.NewTimer(defaultTestTimeout)
select {
case <-timer.C:
t.Fatal("Test timeout waiting for eviction")
case k := <-evictCh:
timer.Stop()
if !cmp.Equal(k, keysToFill[i]) {
t.Fatalf("Evicted key %+v, wanted %+v", k, keysToFill[i])
}
}
}
}
// TestAddCausesMultipleEvictions verifies the case where adding one new entry
// causes the eviction of multiple old entries to make space for the new one.
func TestAddCausesMultipleEvictions(t *testing.T) {
evictCh := make(chan Key, defaultTestCacheSize)
onEvicted := func(k Key, _ *Entry) {
evictCh <- k
}
keysToFill := []Key{{Path: "a"}, {Path: "b"}, {Path: "c"}, {Path: "d"}, {Path: "e"}}
keyCausingEviction := Key{Path: "abcde"}
lru := NewLRU(defaultTestCacheSize, onEvicted)
for _, key := range keysToFill {
lru.Add(key, &Entry{})
}
lru.Add(keyCausingEviction, &Entry{})
for i := range keysToFill {
timer := time.NewTimer(defaultTestTimeout)
select {
case <-timer.C:
t.Fatal("Test timeout waiting for eviction")
case k := <-evictCh:
timer.Stop()
if !cmp.Equal(k, keysToFill[i]) {
t.Fatalf("Evicted key %+v, wanted %+v", k, keysToFill[i])
}
}
}
}
// TestModifyCausesMultipleEvictions verifies the case where modifying an
// existing entry to increase its size leads to the eviction of older entries
// to make space for the new one.
func TestModifyCausesMultipleEvictions(t *testing.T) {
evictCh := make(chan Key, defaultTestCacheSize)
onEvicted := func(k Key, _ *Entry) {
evictCh <- k
}
keysToFill := []Key{{Path: "a"}, {Path: "b"}, {Path: "c"}, {Path: "d"}, {Path: "e"}}
lru := NewLRU(defaultTestCacheSize, onEvicted)
for _, key := range keysToFill {
lru.Add(key, &Entry{})
}
lru.Add(keysToFill[len(keysToFill)-1], &Entry{HeaderData: "xxxx"})
for i := range keysToFill[:len(keysToFill)-1] {
timer := time.NewTimer(defaultTestTimeout)
select {
case <-timer.C:
t.Fatal("Test timeout waiting for eviction")
case k := <-evictCh:
timer.Stop()
if !cmp.Equal(k, keysToFill[i]) {
t.Fatalf("Evicted key %+v, wanted %+v", k, keysToFill[i])
}
}
}
}
func TestLRUResize(t *testing.T) {
tests := []struct {
desc string
maxSize int64
keysToFill []Key
newMaxSize int64
wantEvictedKeys []Key
}{
{
desc: "resize causes multiple evictions",
maxSize: 5,
keysToFill: []Key{{Path: "a"}, {Path: "b"}, {Path: "c"}, {Path: "d"}, {Path: "e"}},
newMaxSize: 3,
wantEvictedKeys: []Key{{Path: "a"}, {Path: "b"}},
},
{
desc: "resize causes no evictions",
maxSize: 50,
keysToFill: []Key{{Path: "a"}, {Path: "b"}, {Path: "c"}, {Path: "d"}, {Path: "e"}},
newMaxSize: 10,
wantEvictedKeys: []Key{},
},
{
desc: "resize to higher value",
maxSize: 5,
keysToFill: []Key{{Path: "a"}, {Path: "b"}, {Path: "c"}, {Path: "d"}, {Path: "e"}},
newMaxSize: 10,
wantEvictedKeys: []Key{},
},
}
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
var evictedKeys []Key
onEvicted := func(k Key, _ *Entry) {
evictedKeys = append(evictedKeys, k)
}
lru := NewLRU(test.maxSize, onEvicted)
for _, key := range test.keysToFill {
lru.Add(key, &Entry{})
}
lru.Resize(test.newMaxSize)
if !cmp.Equal(evictedKeys, test.wantEvictedKeys, cmpopts.EquateEmpty()) {
t.Fatalf("lru.Resize evicted keys {%v}, should have evicted {%v}", evictedKeys, test.wantEvictedKeys)
}
})
}
}

@@ -0,0 +1,276 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package rls
import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/internal/backoff"
)
var (
cacheKeys = []cacheKey{
{path: "0", keys: "a"},
{path: "1", keys: "b"},
{path: "2", keys: "c"},
{path: "3", keys: "d"},
{path: "4", keys: "e"},
}
longDuration = 10 * time.Minute
shortDuration = 1 * time.Millisecond
cacheEntries []*cacheEntry
)
func initCacheEntries() {
// All entries have a dummy size of 1 to simplify resize operations.
cacheEntries = []*cacheEntry{
{
// Entry is valid and minimum expiry time has not expired.
expiryTime: time.Now().Add(longDuration),
earliestEvictTime: time.Now().Add(longDuration),
size: 1,
},
{
// Entry is valid and is in backoff.
expiryTime: time.Now().Add(longDuration),
backoffTime: time.Now().Add(longDuration),
backoffState: &backoffState{timer: time.NewTimer(longDuration)},
size: 1,
},
{
// Entry is valid, and not in backoff.
expiryTime: time.Now().Add(longDuration),
size: 1,
},
{
// Entry is invalid.
expiryTime: time.Time{}.Add(shortDuration),
size: 1,
},
{
// Entry is invalid and backoff has expired.
expiryTime: time.Time{}.Add(shortDuration),
backoffExpiryTime: time.Time{}.Add(shortDuration),
size: 1,
},
}
}
func (s) TestLRU_BasicOperations(t *testing.T) {
initCacheEntries()
// Create an LRU and add some entries to it.
lru := newLRU()
for _, k := range cacheKeys {
lru.addEntry(k)
}
// Get the least recent entry. This should be the first entry we added.
if got, want := lru.getLeastRecentlyUsed(), cacheKeys[0]; got != want {
t.Fatalf("lru.getLeastRecentlyUsed() = %v, want %v", got, want)
}
// Iterate through the slice of keys we added earlier, making them the most
// recent entry, one at a time. The least recent entry at that point should
// be the next entry from our slice of keys.
for i, k := range cacheKeys {
lru.makeRecent(k)
lruIndex := (i + 1) % len(cacheKeys)
if got, want := lru.getLeastRecentlyUsed(), cacheKeys[lruIndex]; got != want {
t.Fatalf("lru.getLeastRecentlyUsed() = %v, want %v", got, want)
}
}
// Iterate through the slice of keys we added earlier, removing them one at
// a time. The least recent entry at that point should be the next entry from
// our slice of keys, except for the last one because the lru will be empty.
for i, k := range cacheKeys {
lru.removeEntry(k)
var want cacheKey
if i < len(cacheKeys)-1 {
want = cacheKeys[i+1]
}
if got := lru.getLeastRecentlyUsed(); got != want {
t.Fatalf("lru.getLeastRecentlyUsed() = %v, want %v", got, want)
}
}
}
func (s) TestLRU_IterateAndRun(t *testing.T) {
initCacheEntries()
// Create an LRU and add some entries to it.
lru := newLRU()
for _, k := range cacheKeys {
lru.addEntry(k)
}
// Iterate through the lru to make sure that entries are returned in the
// least recently used order.
var gotKeys []cacheKey
lru.iterateAndRun(func(key cacheKey) {
gotKeys = append(gotKeys, key)
})
if !cmp.Equal(gotKeys, cacheKeys, cmp.AllowUnexported(cacheKey{})) {
t.Fatalf("lru.iterateAndRun returned %v, want %v", gotKeys, cacheKeys)
}
// Make sure that removing entries from the lru while iterating through it
// is a safe operation.
lru.iterateAndRun(func(key cacheKey) {
lru.removeEntry(key)
})
// Check the lru internals to make sure we freed up all the memory.
if len := lru.ll.Len(); len != 0 {
t.Fatalf("Number of entries in the lru's underlying list is %d, want 0", len)
}
if len := len(lru.m); len != 0 {
t.Fatalf("Number of entries in the lru's underlying map is %d, want 0", len)
}
}
func (s) TestDataCache_BasicOperations(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil)
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
for i, k := range cacheKeys {
entry := dc.getEntry(k)
if !cmp.Equal(entry, cacheEntries[i], cmp.AllowUnexported(cacheEntry{}, backoffState{}), cmpopts.IgnoreUnexported(time.Timer{})) {
t.Fatalf("Data cache lookup for key %v returned entry %v, want %v", k, entry, cacheEntries[i])
}
}
}
func (s) TestDataCache_AddForcesResize(t *testing.T) {
initCacheEntries()
dc := newDataCache(1, nil)
// The first entry in cacheEntries has a minimum expiry time in the future.
// This entry would stop the resize operation since we do not evict entries
// whose minimum expiration time is in the future. So, we do not use that
// entry in this test. The entry being added has a running backoff timer.
evicted, ok := dc.addEntry(cacheKeys[1], cacheEntries[1])
if evicted || !ok {
t.Fatalf("dataCache.addEntry() returned (%v, %v) want (false, true)", evicted, ok)
}
// Add another entry leading to the eviction of the above entry which has a
// running backoff timer. The first return value is expected to be true.
backoffCancelled, ok := dc.addEntry(cacheKeys[2], cacheEntries[2])
if !backoffCancelled || !ok {
t.Fatalf("dataCache.addEntry() returned (%v, %v) want (true, true)", backoffCancelled, ok)
}
// Add another entry, evicting the above entry which does not have a running
// backoff timer. The first return value is expected to be false this time.
backoffCancelled, ok = dc.addEntry(cacheKeys[3], cacheEntries[3])
if backoffCancelled || !ok {
t.Fatalf("dataCache.addEntry() returned (%v, %v) want (false, true)", backoffCancelled, ok)
}
}
func (s) TestDataCache_Resize(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil)
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
// The first cache entry (with a key of cacheKeys[0]) that we added has an
// earliestEvictTime in the future. As part of the resize operation, we
// traverse the cache in least recently used order, and this will be the
// first entry that we encounter. Since the earliestEvictTime is in the
// future, the resize operation will stop, leaving the cache bigger than
// what was asked for.
if dc.resize(1) {
t.Fatalf("dataCache.resize() returned true, want false")
}
if dc.currentSize != 5 {
t.Fatalf("dataCache.size is %d, want 5", dc.currentSize)
}
// Remove the entry with earliestEvictTime in the future and retry the
// resize operation.
dc.removeEntryForTesting(cacheKeys[0])
if !dc.resize(1) {
t.Fatalf("dataCache.resize() returned false, want true")
}
if dc.currentSize != 1 {
t.Fatalf("dataCache.size is %d, want 1", dc.currentSize)
}
}
func (s) TestDataCache_EvictExpiredEntries(t *testing.T) {
initCacheEntries()
dc := newDataCache(5, nil)
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
// The last two entries in the cacheEntries list have expired, and will be
// evicted. The first three should still remain in the cache.
if !dc.evictExpiredEntries() {
t.Fatal("dataCache.evictExpiredEntries() returned false, want true")
}
if dc.currentSize != 3 {
t.Fatalf("dataCache.size is %d, want 3", dc.currentSize)
}
for i := 0; i < 3; i++ {
entry := dc.getEntry(cacheKeys[i])
if !cmp.Equal(entry, cacheEntries[i], cmp.AllowUnexported(cacheEntry{}, backoffState{}), cmpopts.IgnoreUnexported(time.Timer{})) {
t.Fatalf("Data cache lookup for key %v returned entry %v, want %v", cacheKeys[i], entry, cacheEntries[i])
}
}
}
func (s) TestDataCache_ResetBackoffState(t *testing.T) {
type fakeBackoff struct {
backoff.Strategy
}
initCacheEntries()
dc := newDataCache(5, nil)
for i, k := range cacheKeys {
dc.addEntry(k, cacheEntries[i])
}
newBackoffState := &backoffState{bs: &fakeBackoff{}}
if updatePicker := dc.resetBackoffState(newBackoffState); !updatePicker {
t.Fatal("dataCache.resetBackoffState() returned updatePicker is false, want true")
}
// Make sure that the entry with no backoff state was not touched.
if entry := dc.getEntry(cacheKeys[0]); cmp.Equal(entry.backoffState, newBackoffState, cmp.AllowUnexported(backoffState{})) {
t.Fatal("dataCache.resetBackoffState() touched entries without a valid backoffState")
}
// Make sure that the entry with a valid backoff state was reset.
entry := dc.getEntry(cacheKeys[1])
if diff := cmp.Diff(entry.backoffState, newBackoffState, cmp.AllowUnexported(backoffState{})); diff != "" {
t.Fatalf("unexpected diff in backoffState for cache entry after dataCache.resetBackoffState(): %s", diff)
}
}

@@ -0,0 +1,112 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package rls
import (
"fmt"
"sync/atomic"
"unsafe"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
)
// TODO(easwars): Remove this once all RLS code is merged.
//lint:file-ignore U1000 Ignore all unused code, not all code is merged yet.
// childPolicyWrapper is a reference counted wrapper around a child policy.
//
// The LB policy maintains a map of these wrappers keyed by the target returned
// by RLS. When a target is seen for the first time, a child policy wrapper is
// created for it and the wrapper is added to the child policy map. Each entry
// in the data cache holds references to the corresponding child policy
// wrappers. The LB policy also holds a reference to the child policy wrapper
// for the default target specified in the LB policy configuration.
//
// When a cache entry is evicted, it releases references to the child policy
// wrappers that it contains. When all references have been released, the
// wrapper is removed from the child policy map and is destroyed.
//
// The child policy wrapper also caches the connectivity state and most recent
// picker from the child policy. Once the child policy wrapper reports
// TRANSIENT_FAILURE, it will continue reporting that state until it goes READY;
// transitions from TRANSIENT_FAILURE to CONNECTING are ignored.
//
// Whenever a child policy wrapper changes its connectivity state, the LB policy
// returns a new picker to the channel, since the channel may need to re-process
// the picks for queued RPCs.
//
// It is not safe for concurrent access.
type childPolicyWrapper struct {
logger *internalgrpclog.PrefixLogger
target string // RLS target corresponding to this child policy.
refCnt int // Reference count.
// Balancer state reported by the child policy. The RLS LB policy maintains
// these child policies in a BalancerGroup. The state reported by the child
// policy is pushed to the state aggregator (which is also implemented by the
// RLS LB policy) and cached here. See handleChildPolicyStateUpdate() for
// details on how the state aggregation is performed.
//
// While this field is written to by the LB policy, it is read by the picker
// at Pick time. It is stored as an atomic pointer so that the picker can read
// it without a mutex.
state unsafe.Pointer // *balancer.State
}
// newChildPolicyWrapper creates a child policy wrapper for the given target.
// The returned wrapper is initialized with one reference and starts off in
// CONNECTING state.
func newChildPolicyWrapper(target string) *childPolicyWrapper {
c := &childPolicyWrapper{
target: target,
refCnt: 1,
state: unsafe.Pointer(&balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
}),
}
c.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-child-policy-wrapper %s %p] ", c.target, c))
c.logger.Infof("Created")
return c
}
// acquireRef increments the reference count on the child policy wrapper.
func (c *childPolicyWrapper) acquireRef() {
c.refCnt++
}
// releaseRef decrements the reference count on the child policy wrapper. The
// return value indicates whether the released reference was the last one.
func (c *childPolicyWrapper) releaseRef() bool {
c.refCnt--
return c.refCnt == 0
}
// lamify causes the child policy wrapper to return a picker which will always
// fail requests. This is used when the wrapper runs into errors when trying to
// build and parse the child policy configuration.
func (c *childPolicyWrapper) lamify(err error) {
c.logger.Warningf("Entering lame mode: %v", err)
atomic.StorePointer(&c.state, unsafe.Pointer(&balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: base.NewErrPicker(err),
}))
}
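
The state field is stored as an atomic pointer so the picker can read it lock-free. As a minimal sketch (assuming another file in this rls package, with a made-up helper name, not part of this commit), a reader loads the whole *balancer.State in a single atomic operation:

package rls

import (
	"sync/atomic"

	"google.golang.org/grpc/balancer"
)

// currentState is a hypothetical helper showing how a picker would read the
// wrapper's cached state without a mutex: the whole *balancer.State is swapped
// via the atomic pointer, so readers always see a consistent
// ConnectivityState/Picker pair.
func currentState(c *childPolicyWrapper) balancer.State {
	return *(*balancer.State)(atomic.LoadPointer(&c.state))
}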