mirror of https://github.com/grpc/grpc-go.git
balancer/rls: Add cache metrics (#7495)
This commit is contained in:
parent
c8951abc16
commit
9706bf8035
|
@ -79,6 +79,20 @@ var (
|
|||
dataCachePurgeHook = func() {}
|
||||
resetBackoffHook = func() {}
|
||||
|
||||
cacheEntriesMetric = estats.RegisterInt64Gauge(estats.MetricDescriptor{
|
||||
Name: "grpc.lb.rls.cache_entries",
|
||||
Description: "EXPERIMENTAL. Number of entries in the RLS cache.",
|
||||
Unit: "entry",
|
||||
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"},
|
||||
Default: false,
|
||||
})
|
||||
cacheSizeMetric = estats.RegisterInt64Gauge(estats.MetricDescriptor{
|
||||
Name: "grpc.lb.rls.cache_size",
|
||||
Description: "EXPERIMENTAL. The current size of the RLS cache.",
|
||||
Unit: "By",
|
||||
Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.instance_uuid"},
|
||||
Default: false,
|
||||
})
|
||||
defaultTargetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{
|
||||
Name: "grpc.lb.rls.default_target_picks",
|
||||
Description: "EXPERIMENTAL. Number of LB picks sent to the default target.",
|
||||
|
@ -126,7 +140,7 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
|
|||
updateCh: buffer.NewUnbounded(),
|
||||
}
|
||||
lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
|
||||
lb.dataCache = newDataCache(maxCacheSize, lb.logger)
|
||||
lb.dataCache = newDataCache(maxCacheSize, lb.logger, opts.MetricsRecorder, opts.Target.String())
|
||||
lb.bg = balancergroup.New(balancergroup.Options{
|
||||
CC: cc,
|
||||
BuildOpts: opts,
|
||||
|
@ -317,18 +331,17 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
|
|||
b.stateMu.Unlock()
|
||||
<-done
|
||||
|
||||
// We cannot do cache operations above because `cacheMu` needs to be grabbed
|
||||
// before `stateMu` if we are to hold both locks at the same time.
|
||||
b.cacheMu.Lock()
|
||||
b.dataCache.updateRLSServerTarget(newCfg.lookupService)
|
||||
if resizeCache {
|
||||
// If the new config reduces the size of the data cache, we
|
||||
// might have to evict entries to get the cache size down to the newly
|
||||
// specified size.
|
||||
//
|
||||
// And we cannot do this operation above (where we compute the
|
||||
// `resizeCache` boolean) because `cacheMu` needs to be grabbed before
|
||||
// `stateMu` if we are to hold both locks at the same time.
|
||||
b.cacheMu.Lock()
|
||||
b.dataCache.resize(newCfg.cacheSizeBytes)
|
||||
b.cacheMu.Unlock()
|
||||
}
|
||||
b.cacheMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,8 @@ import (
|
|||
"container/list"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
estats "google.golang.org/grpc/experimental/stats"
|
||||
"google.golang.org/grpc/internal/backoff"
|
||||
internalgrpclog "google.golang.org/grpc/internal/grpclog"
|
||||
"google.golang.org/grpc/internal/grpcsync"
|
||||
|
@ -163,22 +165,40 @@ func (l *lru) getLeastRecentlyUsed() cacheKey {
|
|||
//
|
||||
// It is not safe for concurrent access.
|
||||
type dataCache struct {
|
||||
maxSize int64 // Maximum allowed size.
|
||||
currentSize int64 // Current size.
|
||||
keys *lru // Cache keys maintained in lru order.
|
||||
entries map[cacheKey]*cacheEntry
|
||||
logger *internalgrpclog.PrefixLogger
|
||||
shutdown *grpcsync.Event
|
||||
maxSize int64 // Maximum allowed size.
|
||||
currentSize int64 // Current size.
|
||||
keys *lru // Cache keys maintained in lru order.
|
||||
entries map[cacheKey]*cacheEntry
|
||||
logger *internalgrpclog.PrefixLogger
|
||||
shutdown *grpcsync.Event
|
||||
rlsServerTarget string
|
||||
|
||||
// Read only after initialization.
|
||||
grpcTarget string
|
||||
uuid string
|
||||
metricsRecorder estats.MetricsRecorder
|
||||
}
|
||||
|
||||
func newDataCache(size int64, logger *internalgrpclog.PrefixLogger) *dataCache {
|
||||
return &dataCache{
|
||||
maxSize: size,
|
||||
keys: newLRU(),
|
||||
entries: make(map[cacheKey]*cacheEntry),
|
||||
logger: logger,
|
||||
shutdown: grpcsync.NewEvent(),
|
||||
func newDataCache(size int64, logger *internalgrpclog.PrefixLogger, metricsRecorder estats.MetricsRecorder, grpcTarget string) *dataCache {
|
||||
dc := &dataCache{
|
||||
maxSize: size,
|
||||
keys: newLRU(),
|
||||
entries: make(map[cacheKey]*cacheEntry),
|
||||
logger: logger,
|
||||
shutdown: grpcsync.NewEvent(),
|
||||
grpcTarget: grpcTarget,
|
||||
uuid: uuid.New().String(),
|
||||
metricsRecorder: metricsRecorder,
|
||||
}
|
||||
cacheSizeMetric.Record(dc.metricsRecorder, 0, grpcTarget, "", dc.uuid)
|
||||
cacheEntriesMetric.Record(dc.metricsRecorder, 0, grpcTarget, "", dc.uuid)
|
||||
return dc
|
||||
}
|
||||
|
||||
// updateRLSServerTarget updates the RLS Server Target the RLS Balancer is
|
||||
// configured with.
|
||||
func (dc *dataCache) updateRLSServerTarget(rlsServerTarget string) {
|
||||
dc.rlsServerTarget = rlsServerTarget
|
||||
}
|
||||
|
||||
// resize changes the maximum allowed size of the data cache.
|
||||
|
@ -319,6 +339,7 @@ func (dc *dataCache) updateEntrySize(entry *cacheEntry, newSize int64) {
|
|||
dc.currentSize -= entry.size
|
||||
entry.size = newSize
|
||||
dc.currentSize += entry.size
|
||||
cacheSizeMetric.Record(dc.metricsRecorder, dc.currentSize, dc.grpcTarget, dc.rlsServerTarget, dc.uuid)
|
||||
}
|
||||
|
||||
func (dc *dataCache) getEntry(key cacheKey) *cacheEntry {
|
||||
|
@ -351,6 +372,8 @@ func (dc *dataCache) deleteAndCleanup(key cacheKey, entry *cacheEntry) {
|
|||
delete(dc.entries, key)
|
||||
dc.currentSize -= entry.size
|
||||
dc.keys.removeEntry(key)
|
||||
cacheSizeMetric.Record(dc.metricsRecorder, dc.currentSize, dc.grpcTarget, dc.rlsServerTarget, dc.uuid)
|
||||
cacheEntriesMetric.Record(dc.metricsRecorder, int64(len(dc.entries)), dc.grpcTarget, dc.rlsServerTarget, dc.uuid)
|
||||
}
|
||||
|
||||
func (dc *dataCache) stop() {
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"google.golang.org/grpc/internal/backoff"
|
||||
"google.golang.org/grpc/internal/testutils/stats"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -119,7 +120,7 @@ func (s) TestLRU_BasicOperations(t *testing.T) {
|
|||
|
||||
func (s) TestDataCache_BasicOperations(t *testing.T) {
|
||||
initCacheEntries()
|
||||
dc := newDataCache(5, nil)
|
||||
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
|
||||
for i, k := range cacheKeys {
|
||||
dc.addEntry(k, cacheEntries[i])
|
||||
}
|
||||
|
@ -133,7 +134,7 @@ func (s) TestDataCache_BasicOperations(t *testing.T) {
|
|||
|
||||
func (s) TestDataCache_AddForcesResize(t *testing.T) {
|
||||
initCacheEntries()
|
||||
dc := newDataCache(1, nil)
|
||||
dc := newDataCache(1, nil, &stats.NoopMetricsRecorder{}, "")
|
||||
|
||||
// The first entry in cacheEntries has a minimum expiry time in the future.
|
||||
// This entry would stop the resize operation since we do not evict entries
|
||||
|
@ -162,7 +163,7 @@ func (s) TestDataCache_AddForcesResize(t *testing.T) {
|
|||
|
||||
func (s) TestDataCache_Resize(t *testing.T) {
|
||||
initCacheEntries()
|
||||
dc := newDataCache(5, nil)
|
||||
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
|
||||
for i, k := range cacheKeys {
|
||||
dc.addEntry(k, cacheEntries[i])
|
||||
}
|
||||
|
@ -193,7 +194,7 @@ func (s) TestDataCache_Resize(t *testing.T) {
|
|||
|
||||
func (s) TestDataCache_EvictExpiredEntries(t *testing.T) {
|
||||
initCacheEntries()
|
||||
dc := newDataCache(5, nil)
|
||||
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
|
||||
for i, k := range cacheKeys {
|
||||
dc.addEntry(k, cacheEntries[i])
|
||||
}
|
||||
|
@ -220,7 +221,7 @@ func (s) TestDataCache_ResetBackoffState(t *testing.T) {
|
|||
}
|
||||
|
||||
initCacheEntries()
|
||||
dc := newDataCache(5, nil)
|
||||
dc := newDataCache(5, nil, &stats.NoopMetricsRecorder{}, "")
|
||||
for i, k := range cacheKeys {
|
||||
dc.addEntry(k, cacheEntries[i])
|
||||
}
|
||||
|
|
|
@ -242,3 +242,17 @@ func (r *TestMetricsRecorder) TagConn(ctx context.Context, _ *stats.ConnTagInfo)
|
|||
}
|
||||
|
||||
func (r *TestMetricsRecorder) HandleConn(context.Context, stats.ConnStats) {}
|
||||
|
||||
// NoopMetricsRecorder is a noop MetricsRecorder to be used in tests to prevent
|
||||
// nil panics.
|
||||
type NoopMetricsRecorder struct{}
|
||||
|
||||
// RecordInt64Count is a no-op: the handle, the value and the labels are all ignored.
func (r *NoopMetricsRecorder) RecordInt64Count(*estats.Int64CountHandle, int64, ...string) {}
|
||||
|
||||
// RecordFloat64Count is a no-op: the handle, the value and the labels are all ignored.
func (r *NoopMetricsRecorder) RecordFloat64Count(*estats.Float64CountHandle, float64, ...string) {}
|
||||
|
||||
// RecordInt64Histo is a no-op: the handle, the value and the labels are all ignored.
func (r *NoopMetricsRecorder) RecordInt64Histo(*estats.Int64HistoHandle, int64, ...string) {}
|
||||
|
||||
// RecordFloat64Histo is a no-op: the handle, the value and the labels are all ignored.
func (r *NoopMetricsRecorder) RecordFloat64Histo(*estats.Float64HistoHandle, float64, ...string) {}
|
||||
|
||||
// RecordInt64Gauge is a no-op: the handle, the value and the labels are all ignored.
func (r *NoopMetricsRecorder) RecordInt64Gauge(*estats.Int64GaugeHandle, int64, ...string) {}
|
||||
|
|
Loading…
Reference in New Issue