Rls spec sync (#9437)

rls: Update implementation to match spec.

* Cleanup cache if exceeds max size when add an entry. Make cache entry size calculations more accurate
* Trigger pending RPC processing if unexpired backoff entries were removed from the cache by triggering helper to call it's parent updateBalancingState with the same state and picker
* Introduce minimum time before eviction (5 seconds)
* Change default accept ratio for AdaptiveThrottler from 1.2 -> 2.0
* Configuration validation
* When checking key names for duplicates also look at headers
* Check extra keys for duplicates

See analysis of implementation versus spec at https://docs.google.com/spreadsheets/d/18w5s1TEebRumWzk1pvWnjiHFGKc6MW-vt8tRLY4eNs0/
This commit is contained in:
Larry Safran 2022-08-19 13:31:05 -07:00 committed by GitHub
parent 618a4de705
commit b66250e9e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 122 additions and 25 deletions

View File

@ -38,7 +38,7 @@ import javax.annotation.Nonnull;
* looks for an "rpc_behavior" field in its configuration and includes the value in the
* "rpc-behavior" metadata entry that is sent to the server. This will cause the test server to
* behave in a predefined way. Endpoint picking logic is delegated to the
* {@link RoundRobinLoadBalancer}.
* io.grpc.util.RoundRobinLoadBalancer.
*
* <p>Initial use case is to prove that a custom load balancer can be configured by the control
* plane via xDS. An interop test will configure this LB and then verify it has been correctly

View File

@ -44,7 +44,7 @@ final class AdaptiveThrottler implements Throttler {
private static final int DEFAULT_HISTORY_SECONDS = 30;
private static final int DEFAULT_REQUEST_PADDING = 8;
private static final float DEFAULT_RATIO_FOR_ACCEPT = 1.2f;
private static final float DEFAULT_RATIO_FOR_ACCEPT = 2.0f;
/**
* The duration of history of calls used by Adaptive Throttler.

View File

@ -81,12 +81,17 @@ final class CachingRlsLbClient {
REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter().reverse();
private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
public static final long MIN_EVICTION_TIME_DELTA_NANOS = TimeUnit.SECONDS.toNanos(5);
public static final int BYTES_PER_CHAR = 2;
public static final int STRING_OVERHEAD_BYTES = 38;
/** Minimum bytes for a Java Object. */
public static final int OBJ_OVERHEAD_B = 16;
// All cache status changes (pending, backoff, success) must be under this lock
private final Object lock = new Object();
// LRU cache based on access order (BACKOFF and actual data will be here)
@GuardedBy("lock")
private final LinkedHashLruCache<RouteLookupRequest, CacheEntry> linkedHashLruCache;
private final RlsAsyncLruCache linkedHashLruCache;
// any RPC on the fly will cached in this map
@GuardedBy("lock")
private final Map<RouteLookupRequest, PendingCacheEntry> pendingCallCache = new HashMap<>();
@ -287,12 +292,12 @@ final class CachingRlsLbClient {
try {
RouteLookupResponse response = asyncCall.get();
DataCacheEntry dataEntry = new DataCacheEntry(request, response);
linkedHashLruCache.cache(request, dataEntry);
linkedHashLruCache.cacheAndClean(request, dataEntry);
return CachedRouteLookupResponse.dataEntry(dataEntry);
} catch (Exception e) {
BackoffCacheEntry backoffEntry =
new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get());
linkedHashLruCache.cache(request, backoffEntry);
linkedHashLruCache.cacheAndClean(request, backoffEntry);
return CachedRouteLookupResponse.backoffEntry(backoffEntry);
}
}
@ -336,6 +341,10 @@ final class CachingRlsLbClient {
}
});
}
void triggerPendingRpcProcessing() {
super.updateBalancingState(state, picker);
}
}
/**
@ -488,14 +497,15 @@ final class CachingRlsLbClient {
ChannelLogLevel.DEBUG,
"Transition to data cache: routeLookupResponse={0}",
routeLookupResponse);
linkedHashLruCache.cache(request, new DataCacheEntry(request, routeLookupResponse));
linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, routeLookupResponse));
}
}
private void transitionToBackOff(Status status) {
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Transition to back off: status={0}", status);
linkedHashLruCache.cache(request, new BackoffCacheEntry(request, status, backoffPolicy));
linkedHashLruCache.cacheAndClean(request,
new BackoffCacheEntry(request, status, backoffPolicy));
}
}
@ -525,11 +535,20 @@ final class CachingRlsLbClient {
abstract boolean isExpired(long now);
abstract void cleanup();
protected long getMinEvictionTime() {
return 0L;
}
protected void triggerPendingRpcProcessing() {
helper.triggerPendingRpcProcessing();
}
}
/** Implementation of {@link CacheEntry} contains valid data. */
final class DataCacheEntry extends CacheEntry {
private final RouteLookupResponse response;
private final long minEvictionTime;
private final long expireTime;
private final long staleTime;
private final List<ChildPolicyWrapper> childPolicyWrappers;
@ -543,6 +562,7 @@ final class CachingRlsLbClient {
refCountedChildPolicyWrapperFactory
.createOrGet(response.targets());
long now = ticker.read();
minEvictionTime = now + MIN_EVICTION_TIME_DELTA_NANOS;
expireTime = now + maxAgeNanos;
staleTime = now + staleAgeNanos;
}
@ -574,13 +594,13 @@ final class CachingRlsLbClient {
// async call returned finished future is most likely throttled
try {
RouteLookupResponse response = asyncCall.get();
linkedHashLruCache.cache(request, new DataCacheEntry(request, response));
linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
BackoffCacheEntry backoffEntry =
new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get());
linkedHashLruCache.cache(request, backoffEntry);
linkedHashLruCache.cacheAndClean(request, backoffEntry);
}
}
}
@ -611,11 +631,19 @@ final class CachingRlsLbClient {
return response.getHeaderData();
}
// Assume UTF-16 (2 bytes) and overhead of a String object is 38 bytes
int calcStringSize(String target) {
return target.length() * BYTES_PER_CHAR + STRING_OVERHEAD_BYTES;
}
@Override
int getSizeBytes() {
// size of strings and java object overhead, actual memory usage is more than this.
return
(response.targets().get(0).length() + response.getHeaderData().length()) * 2 + 38 * 2;
int targetSize = 0;
for (String target : response.targets()) {
targetSize += calcStringSize(target);
}
return targetSize + calcStringSize(response.getHeaderData()) + OBJ_OVERHEAD_B // response size
+ Long.SIZE * 2 + OBJ_OVERHEAD_B; // Other fields
}
@Override
@ -627,6 +655,11 @@ final class CachingRlsLbClient {
return staleTime - now <= 0;
}
@Override
protected long getMinEvictionTime() {
return minEvictionTime;
}
@Override
void cleanup() {
synchronized (lock) {
@ -700,11 +733,11 @@ final class CachingRlsLbClient {
} else {
try {
RouteLookupResponse response = call.get();
linkedHashLruCache.cache(request, new DataCacheEntry(request, response));
linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
linkedHashLruCache.cache(
linkedHashLruCache.cacheAndClean(
request,
new BackoffCacheEntry(request, Status.fromThrowable(e), backoffPolicy));
}
@ -718,7 +751,7 @@ final class CachingRlsLbClient {
@Override
int getSizeBytes() {
return 0;
return OBJ_OVERHEAD_B * 3 + Long.SIZE + 8; // 3 java objects, 1 long and a boolean
}
@Override
@ -876,8 +909,22 @@ final class CachingRlsLbClient {
@Override
protected boolean shouldInvalidateEldestEntry(
RouteLookupRequest eldestKey, CacheEntry eldestValue) {
if (eldestValue.getMinEvictionTime() > now()) {
return false;
}
// eldest entry should be evicted if size limit exceeded
return true;
return this.estimatedSizeBytes() > this.estimatedMaxSizeBytes();
}
public CacheEntry cacheAndClean(RouteLookupRequest key, CacheEntry value) {
CacheEntry newEntry = cache(key, value);
// force cleanup if new entry pushed cache over max size (in bytes)
if (fitToLimit()) {
value.triggerPendingRpcProcessing();
}
return newEntry;
}
}

View File

@ -118,6 +118,10 @@ abstract class LinkedHashLruCache<K, V> implements LruCache<K, V> {
return 1;
}
protected long estimatedMaxSizeBytes() {
return estimatedMaxSizeBytes;
}
/** Updates size for given key if entry exists. It is useful if the cache value is mutated. */
public void updateEntrySize(K key) {
synchronized (lock) {
@ -233,30 +237,50 @@ abstract class LinkedHashLruCache<K, V> implements LruCache<K, V> {
}
}
protected long now() {
return ticker.read();
}
/**
* Resizes cache. If new size is smaller than current estimated size, it will free up space by
* Cleans up cache if needed to fit into max size bytes by
* removing expired entries and removing oldest entries by LRU order.
* Returns TRUE if any unexpired entries were removed
*/
public final void resize(int newSizeBytes) {
long now = ticker.read();
protected final boolean fitToLimit() {
boolean removedAnyUnexpired = false;
synchronized (lock) {
this.estimatedMaxSizeBytes = newSizeBytes;
if (estimatedSizeBytes.get() <= newSizeBytes) {
if (estimatedSizeBytes.get() <= estimatedMaxSizeBytes) {
// new size is larger no need to do cleanup
return;
return false;
}
// cleanup expired entries
cleanupExpiredEntries(now);
cleanupExpiredEntries(now());
// cleanup eldest entry until new size limit
Iterator<Map.Entry<K, SizedValue>> lruIter = delegate.entrySet().iterator();
while (lruIter.hasNext() && estimatedMaxSizeBytes < this.estimatedSizeBytes.get()) {
Map.Entry<K, SizedValue> entry = lruIter.next();
if (!shouldInvalidateEldestEntry(entry.getKey(), entry.getValue().value)) {
break; // Violates some constraint like minimum age so stop our cleanup
}
lruIter.remove();
// eviction listener will update the estimatedSizeBytes
evictionListener.onEviction(entry.getKey(), entry.getValue(), EvictionType.SIZE);
removedAnyUnexpired = true;
}
}
return removedAnyUnexpired;
}
/**
* Resizes cache. If new size is smaller than current estimated size, it will free up space by
* removing expired entries and removing oldest entries by LRU order.
*/
public final void resize(long newSizeBytes) {
synchronized (lock) {
this.estimatedMaxSizeBytes = newSizeBytes;
fitToLimit();
}
}
@Override

View File

@ -112,13 +112,28 @@ final class RlsProtoConverters {
ImmutableList<GrpcKeyBuilder> grpcKeybuilders =
GrpcKeyBuilderConverter.covertAll(
checkNotNull(JsonUtil.getListOfObjects(json, "grpcKeybuilders"), "grpcKeybuilders"));
// Validate grpc_keybuilders
checkArgument(!grpcKeybuilders.isEmpty(), "must have at least one GrpcKeyBuilder");
Set<Name> names = new HashSet<>();
for (GrpcKeyBuilder keyBuilder : grpcKeybuilders) {
for (Name name : keyBuilder.names()) {
checkArgument(names.add(name), "duplicate names in grpc_keybuilders: " + name);
}
Set<String> keys = new HashSet<>();
for (NameMatcher header : keyBuilder.headers()) {
checkKeys(keys, header.key(), "header");
}
for (String key : keyBuilder.constantKeys().keySet()) {
checkKeys(keys, key, "constant");
}
String extraKeyStr = keyToString(keyBuilder.extraKeys());
checkArgument(keys.add(extraKeyStr),
"duplicate extra key in grpc_keybuilders: " + extraKeyStr);
}
// Validate lookup_service
String lookupService = JsonUtil.getString(json, "lookupService");
checkArgument(!Strings.isNullOrEmpty(lookupService), "lookupService must not be empty");
try {
@ -157,6 +172,11 @@ final class RlsProtoConverters {
.build();
}
private static String keyToString(ExtraKeys extraKeys) {
return String.format("host: %s, service: %s, method: %s",
extraKeys.host(), extraKeys.service(), extraKeys.method());
}
private static <T> T orDefault(@Nullable T value, T defaultValue) {
if (value == null) {
return checkNotNull(defaultValue, "defaultValue");
@ -170,6 +190,12 @@ final class RlsProtoConverters {
}
}
private static void checkKeys(Set<String> keys, String key, String keyType) {
checkArgument(key != null, "unset " + keyType + " key");
checkArgument(!key.isEmpty(), "Empty string for " + keyType + " key");
checkArgument(keys.add(key), "duplicate " + keyType + " key in grpc_keybuilders: " + key);
}
private static final class GrpcKeyBuilderConverter {
public static ImmutableList<GrpcKeyBuilder> covertAll(List<Map<String, ?>> keyBuilders) {
ImmutableList.Builder<GrpcKeyBuilder> keyBuilderList = ImmutableList.builder();

View File

@ -52,8 +52,8 @@ final class RlsRequestFactory {
Map<String, GrpcKeyBuilder> table = new HashMap<>();
for (GrpcKeyBuilder grpcKeyBuilder : config.grpcKeybuilders()) {
for (Name name : grpcKeyBuilder.names()) {
boolean hasMethod = name.method() == null || name.method().isEmpty();
String method = hasMethod ? "*" : name.method();
boolean noMethod = name.method() == null || name.method().isEmpty();
String method = noMethod ? "*" : name.method();
String path = "/" + name.service() + "/" + method;
table.put(path, grpcKeyBuilder);
}