diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/RpcBehaviorLoadBalancerProvider.java b/interop-testing/src/main/java/io/grpc/testing/integration/RpcBehaviorLoadBalancerProvider.java
index 212ea7bed1..83c416765e 100644
--- a/interop-testing/src/main/java/io/grpc/testing/integration/RpcBehaviorLoadBalancerProvider.java
+++ b/interop-testing/src/main/java/io/grpc/testing/integration/RpcBehaviorLoadBalancerProvider.java
@@ -38,7 +38,7 @@ import javax.annotation.Nonnull;
* looks for an "rpc_behavior" field in its configuration and includes the value in the
* "rpc-behavior" metadata entry that is sent to the server. This will cause the test server to
* behave in a predefined way. Endpoint picking logic is delegated to the
- * {@link RoundRobinLoadBalancer}.
+ * io.grpc.util.RoundRobinLoadBalancer.
*
*
Initial use case is to prove that a custom load balancer can be configured by the control
* plane via xDS. An interop test will configure this LB and then verify it has been correctly
diff --git a/rls/src/main/java/io/grpc/rls/AdaptiveThrottler.java b/rls/src/main/java/io/grpc/rls/AdaptiveThrottler.java
index ff5789c517..576b234b0c 100644
--- a/rls/src/main/java/io/grpc/rls/AdaptiveThrottler.java
+++ b/rls/src/main/java/io/grpc/rls/AdaptiveThrottler.java
@@ -44,7 +44,7 @@ final class AdaptiveThrottler implements Throttler {
private static final int DEFAULT_HISTORY_SECONDS = 30;
private static final int DEFAULT_REQUEST_PADDING = 8;
- private static final float DEFAULT_RATIO_FOR_ACCEPT = 1.2f;
+ private static final float DEFAULT_RATIO_FOR_ACCEPT = 2.0f;
/**
* The duration of history of calls used by Adaptive Throttler.
diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
index fcac2a37ce..80d36b34b0 100644
--- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
+++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java
@@ -81,12 +81,17 @@ final class CachingRlsLbClient {
REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter().reverse();
private static final Converter
RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
+ public static final long MIN_EVICTION_TIME_DELTA_NANOS = TimeUnit.SECONDS.toNanos(5);
+ public static final int BYTES_PER_CHAR = 2;
+ public static final int STRING_OVERHEAD_BYTES = 38;
+ /** Minimum bytes for a Java Object. */
+ public static final int OBJ_OVERHEAD_B = 16;
// All cache status changes (pending, backoff, success) must be under this lock
private final Object lock = new Object();
// LRU cache based on access order (BACKOFF and actual data will be here)
@GuardedBy("lock")
- private final LinkedHashLruCache linkedHashLruCache;
+ private final RlsAsyncLruCache linkedHashLruCache;
// any RPC on the fly will cached in this map
@GuardedBy("lock")
private final Map pendingCallCache = new HashMap<>();
@@ -287,12 +292,12 @@ final class CachingRlsLbClient {
try {
RouteLookupResponse response = asyncCall.get();
DataCacheEntry dataEntry = new DataCacheEntry(request, response);
- linkedHashLruCache.cache(request, dataEntry);
+ linkedHashLruCache.cacheAndClean(request, dataEntry);
return CachedRouteLookupResponse.dataEntry(dataEntry);
} catch (Exception e) {
BackoffCacheEntry backoffEntry =
new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get());
- linkedHashLruCache.cache(request, backoffEntry);
+ linkedHashLruCache.cacheAndClean(request, backoffEntry);
return CachedRouteLookupResponse.backoffEntry(backoffEntry);
}
}
@@ -336,6 +341,10 @@ final class CachingRlsLbClient {
}
});
}
+
+ void triggerPendingRpcProcessing() {
+ super.updateBalancingState(state, picker);
+ }
}
/**
@@ -488,14 +497,15 @@ final class CachingRlsLbClient {
ChannelLogLevel.DEBUG,
"Transition to data cache: routeLookupResponse={0}",
routeLookupResponse);
- linkedHashLruCache.cache(request, new DataCacheEntry(request, routeLookupResponse));
+ linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, routeLookupResponse));
}
}
private void transitionToBackOff(Status status) {
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Transition to back off: status={0}", status);
- linkedHashLruCache.cache(request, new BackoffCacheEntry(request, status, backoffPolicy));
+ linkedHashLruCache.cacheAndClean(request,
+ new BackoffCacheEntry(request, status, backoffPolicy));
}
}
@@ -525,11 +535,20 @@ final class CachingRlsLbClient {
abstract boolean isExpired(long now);
abstract void cleanup();
+
+ protected long getMinEvictionTime() {
+ return 0L;
+ }
+
+ protected void triggerPendingRpcProcessing() {
+ helper.triggerPendingRpcProcessing();
+ }
}
/** Implementation of {@link CacheEntry} contains valid data. */
final class DataCacheEntry extends CacheEntry {
private final RouteLookupResponse response;
+ private final long minEvictionTime;
private final long expireTime;
private final long staleTime;
private final List childPolicyWrappers;
@@ -543,6 +562,7 @@ final class CachingRlsLbClient {
refCountedChildPolicyWrapperFactory
.createOrGet(response.targets());
long now = ticker.read();
+ minEvictionTime = now + MIN_EVICTION_TIME_DELTA_NANOS;
expireTime = now + maxAgeNanos;
staleTime = now + staleAgeNanos;
}
@@ -574,13 +594,13 @@ final class CachingRlsLbClient {
// async call returned finished future is most likely throttled
try {
RouteLookupResponse response = asyncCall.get();
- linkedHashLruCache.cache(request, new DataCacheEntry(request, response));
+ linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
BackoffCacheEntry backoffEntry =
new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get());
- linkedHashLruCache.cache(request, backoffEntry);
+ linkedHashLruCache.cacheAndClean(request, backoffEntry);
}
}
}
@@ -611,11 +631,19 @@ final class CachingRlsLbClient {
return response.getHeaderData();
}
+ // Assume UTF-16 (2 bytes) and overhead of a String object is 38 bytes
+ int calcStringSize(String target) {
+ return target.length() * BYTES_PER_CHAR + STRING_OVERHEAD_BYTES;
+ }
+
@Override
int getSizeBytes() {
- // size of strings and java object overhead, actual memory usage is more than this.
- return
- (response.targets().get(0).length() + response.getHeaderData().length()) * 2 + 38 * 2;
+ int targetSize = 0;
+ for (String target : response.targets()) {
+ targetSize += calcStringSize(target);
+ }
+ return targetSize + calcStringSize(response.getHeaderData()) + OBJ_OVERHEAD_B // response size
+ + Long.SIZE * 2 + OBJ_OVERHEAD_B; // Other fields
}
@Override
@@ -627,6 +655,11 @@ final class CachingRlsLbClient {
return staleTime - now <= 0;
}
+ @Override
+ protected long getMinEvictionTime() {
+ return minEvictionTime;
+ }
+
@Override
void cleanup() {
synchronized (lock) {
@@ -700,11 +733,11 @@ final class CachingRlsLbClient {
} else {
try {
RouteLookupResponse response = call.get();
- linkedHashLruCache.cache(request, new DataCacheEntry(request, response));
+ linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
- linkedHashLruCache.cache(
+ linkedHashLruCache.cacheAndClean(
request,
new BackoffCacheEntry(request, Status.fromThrowable(e), backoffPolicy));
}
@@ -718,7 +751,7 @@ final class CachingRlsLbClient {
@Override
int getSizeBytes() {
- return 0;
+ return OBJ_OVERHEAD_B * 3 + Long.SIZE + 8; // 3 java objects, 1 long and a boolean
}
@Override
@@ -876,8 +909,22 @@ final class CachingRlsLbClient {
@Override
protected boolean shouldInvalidateEldestEntry(
RouteLookupRequest eldestKey, CacheEntry eldestValue) {
+ if (eldestValue.getMinEvictionTime() > now()) {
+ return false;
+ }
+
// eldest entry should be evicted if size limit exceeded
- return true;
+ return this.estimatedSizeBytes() > this.estimatedMaxSizeBytes();
+ }
+
+ public CacheEntry cacheAndClean(RouteLookupRequest key, CacheEntry value) {
+ CacheEntry newEntry = cache(key, value);
+
+ // force cleanup if new entry pushed cache over max size (in bytes)
+ if (fitToLimit()) {
+ value.triggerPendingRpcProcessing();
+ }
+ return newEntry;
}
}
diff --git a/rls/src/main/java/io/grpc/rls/LinkedHashLruCache.java b/rls/src/main/java/io/grpc/rls/LinkedHashLruCache.java
index dde3381a1f..77376a374f 100644
--- a/rls/src/main/java/io/grpc/rls/LinkedHashLruCache.java
+++ b/rls/src/main/java/io/grpc/rls/LinkedHashLruCache.java
@@ -118,6 +118,10 @@ abstract class LinkedHashLruCache implements LruCache {
return 1;
}
+ protected long estimatedMaxSizeBytes() {
+ return estimatedMaxSizeBytes;
+ }
+
/** Updates size for given key if entry exists. It is useful if the cache value is mutated. */
public void updateEntrySize(K key) {
synchronized (lock) {
@@ -233,30 +237,50 @@ abstract class LinkedHashLruCache implements LruCache {
}
}
+ protected long now() {
+ return ticker.read();
+ }
+
/**
- * Resizes cache. If new size is smaller than current estimated size, it will free up space by
+ * Cleans up cache if needed to fit into max size bytes by
* removing expired entries and removing oldest entries by LRU order.
+ * Returns TRUE if any unexpired entries were removed
*/
- public final void resize(int newSizeBytes) {
- long now = ticker.read();
+ protected final boolean fitToLimit() {
+ boolean removedAnyUnexpired = false;
synchronized (lock) {
- this.estimatedMaxSizeBytes = newSizeBytes;
- if (estimatedSizeBytes.get() <= newSizeBytes) {
+ if (estimatedSizeBytes.get() <= estimatedMaxSizeBytes) {
// new size is larger no need to do cleanup
- return;
+ return false;
}
// cleanup expired entries
- cleanupExpiredEntries(now);
+ cleanupExpiredEntries(now());
// cleanup eldest entry until new size limit
Iterator> lruIter = delegate.entrySet().iterator();
while (lruIter.hasNext() && estimatedMaxSizeBytes < this.estimatedSizeBytes.get()) {
Map.Entry entry = lruIter.next();
+ if (!shouldInvalidateEldestEntry(entry.getKey(), entry.getValue().value)) {
+ break; // Violates some constraint like minimum age so stop our cleanup
+ }
lruIter.remove();
// eviction listener will update the estimatedSizeBytes
evictionListener.onEviction(entry.getKey(), entry.getValue(), EvictionType.SIZE);
+ removedAnyUnexpired = true;
}
}
+ return removedAnyUnexpired;
+ }
+
+ /**
+ * Resizes cache. If new size is smaller than current estimated size, it will free up space by
+ * removing expired entries and removing oldest entries by LRU order.
+ */
+ public final void resize(long newSizeBytes) {
+ synchronized (lock) {
+ this.estimatedMaxSizeBytes = newSizeBytes;
+ fitToLimit();
+ }
}
@Override
diff --git a/rls/src/main/java/io/grpc/rls/RlsProtoConverters.java b/rls/src/main/java/io/grpc/rls/RlsProtoConverters.java
index efc516919d..cd164f5e2a 100644
--- a/rls/src/main/java/io/grpc/rls/RlsProtoConverters.java
+++ b/rls/src/main/java/io/grpc/rls/RlsProtoConverters.java
@@ -112,13 +112,28 @@ final class RlsProtoConverters {
ImmutableList grpcKeybuilders =
GrpcKeyBuilderConverter.covertAll(
checkNotNull(JsonUtil.getListOfObjects(json, "grpcKeybuilders"), "grpcKeybuilders"));
+
+ // Validate grpc_keybuilders
checkArgument(!grpcKeybuilders.isEmpty(), "must have at least one GrpcKeyBuilder");
Set names = new HashSet<>();
for (GrpcKeyBuilder keyBuilder : grpcKeybuilders) {
for (Name name : keyBuilder.names()) {
checkArgument(names.add(name), "duplicate names in grpc_keybuilders: " + name);
}
+
+ Set keys = new HashSet<>();
+ for (NameMatcher header : keyBuilder.headers()) {
+ checkKeys(keys, header.key(), "header");
+ }
+ for (String key : keyBuilder.constantKeys().keySet()) {
+ checkKeys(keys, key, "constant");
+ }
+ String extraKeyStr = keyToString(keyBuilder.extraKeys());
+ checkArgument(keys.add(extraKeyStr),
+ "duplicate extra key in grpc_keybuilders: " + extraKeyStr);
}
+
+ // Validate lookup_service
String lookupService = JsonUtil.getString(json, "lookupService");
checkArgument(!Strings.isNullOrEmpty(lookupService), "lookupService must not be empty");
try {
@@ -157,6 +172,11 @@ final class RlsProtoConverters {
.build();
}
+ private static String keyToString(ExtraKeys extraKeys) {
+ return String.format("host: %s, service: %s, method: %s",
+ extraKeys.host(), extraKeys.service(), extraKeys.method());
+ }
+
private static T orDefault(@Nullable T value, T defaultValue) {
if (value == null) {
return checkNotNull(defaultValue, "defaultValue");
@@ -170,6 +190,12 @@ final class RlsProtoConverters {
}
}
+ private static void checkKeys(Set keys, String key, String keyType) {
+ checkArgument(key != null, "unset " + keyType + " key");
+ checkArgument(!key.isEmpty(), "Empty string for " + keyType + " key");
+ checkArgument(keys.add(key), "duplicate " + keyType + " key in grpc_keybuilders: " + key);
+ }
+
private static final class GrpcKeyBuilderConverter {
public static ImmutableList covertAll(List