rls: caching rls client (#6966)

This commit is contained in:
Jihun Cho 2020-05-01 12:02:05 -07:00 committed by GitHub
parent a423900491
commit 50a829ad9d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 1518 additions and 24 deletions

View File

@ -24,7 +24,8 @@ import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
final class PickSubchannelArgsImpl extends PickSubchannelArgs {
/** Implementation of {@link PickSubchannelArgs}. */
public final class PickSubchannelArgsImpl extends PickSubchannelArgs {
private final CallOptions callOptions;
private final Metadata headers;
private final MethodDescriptor<?, ?> method;
@ -32,7 +33,8 @@ final class PickSubchannelArgsImpl extends PickSubchannelArgs {
/**
* Creates call args object for given method with its call options, metadata.
*/
PickSubchannelArgsImpl(MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
public PickSubchannelArgsImpl(
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
this.method = checkNotNull(method, "method");
this.headers = checkNotNull(headers, "headers");
this.callOptions = checkNotNull(callOptions, "callOptions");

View File

@ -14,6 +14,8 @@ dependencies {
project(':grpc-stub')
compileOnly libraries.javax_annotation
testCompile libraries.truth,
project(':grpc-testing'),
project(':grpc-testing-proto'),
project(':grpc-core').sourceSets.test.output // for FakeClock
}

View File

@ -0,0 +1,963 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.rls.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.Converter;
import com.google.common.base.MoreObjects;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.ConnectivityState;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.NameResolver.ConfigOrError;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.PickSubchannelArgsImpl;
import io.grpc.internal.TimeProvider;
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub;
import io.grpc.rls.internal.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
import io.grpc.rls.internal.LbPolicyConfiguration.ChildLbStatusListener;
import io.grpc.rls.internal.LbPolicyConfiguration.ChildLoadBalancingPolicy;
import io.grpc.rls.internal.LbPolicyConfiguration.ChildPolicyWrapper;
import io.grpc.rls.internal.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
import io.grpc.rls.internal.LruCache.EvictionListener;
import io.grpc.rls.internal.LruCache.EvictionType;
import io.grpc.rls.internal.RlsProtoConverters.RouteLookupResponseConverter;
import io.grpc.rls.internal.RlsProtoData.RequestProcessingStrategy;
import io.grpc.rls.internal.RlsProtoData.RouteLookupConfig;
import io.grpc.rls.internal.RlsProtoData.RouteLookupRequest;
import io.grpc.rls.internal.RlsProtoData.RouteLookupResponse;
import io.grpc.rls.internal.Throttler.ThrottledException;
import io.grpc.stub.StreamObserver;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
/**
* A CachingRlsLbClient is a core implementation of RLS loadbalancer supports dynamic request
* routing by fetching the decision from route lookup server. Every single request is routed by
* the server's decision. To reduce the performance penalty, {@link LruCache} is used.
*/
@ThreadSafe
public final class CachingRlsLbClient {
private static final Converter<RouteLookupRequest, io.grpc.lookup.v1.RouteLookupRequest>
REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter().reverse();
private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
// All cache status changes (pending, backoff, success) must be under this lock
private final Object lock = new Object();
// LRU cache based on access order (BACKOFF and actual data will be here)
@GuardedBy("lock")
private final LinkedHashLruCache<RouteLookupRequest, CacheEntry> linkedHashLruCache;
// any RPC on the fly will cached in this map
@GuardedBy("lock")
private final Map<RouteLookupRequest, PendingCacheEntry> pendingCallCache = new HashMap<>();
private final SynchronizationContext synchronizationContext;
private final ScheduledExecutorService scheduledExecutorService;
private final TimeProvider timeProvider;
private final Throttler throttler;
private final LbPolicyConfiguration lbPolicyConfig;
private final BackoffPolicy.Provider backoffProvider;
private final long maxAgeNanos;
private final long staleAgeNanos;
private final long callTimeoutNanos;
private final Helper helper;
private final ManagedChannel rlsChannel;
private final RouteLookupServiceStub rlsStub;
private final RlsPicker rlsPicker;
private final ResolvedAddressFactory childLbResolvedAddressFactory;
private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
private CachingRlsLbClient(Builder builder) {
helper = checkNotNull(builder.helper, "helper");
scheduledExecutorService = helper.getScheduledExecutorService();
synchronizationContext = helper.getSynchronizationContext();
lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig");
RouteLookupConfig rlsConfig = lbPolicyConfig.getRouteLookupConfig();
maxAgeNanos = TimeUnit.MILLISECONDS.toNanos(rlsConfig.getMaxAgeInMillis());
staleAgeNanos = TimeUnit.MILLISECONDS.toNanos(rlsConfig.getStaleAgeInMillis());
callTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(rlsConfig.getLookupServiceTimeoutInMillis());
timeProvider = checkNotNull(builder.timeProvider, "timeProvider");
throttler = checkNotNull(builder.throttler, "throttler");
linkedHashLruCache =
new RlsAsyncLruCache(
rlsConfig.getCacheSizeBytes(),
builder.evictionListener,
scheduledExecutorService,
timeProvider);
RlsRequestFactory requestFactory = new RlsRequestFactory(lbPolicyConfig.getRouteLookupConfig());
rlsPicker = new RlsPicker(requestFactory);
rlsChannel = helper.createResolvingOobChannel(rlsConfig.getLookupService());
helper.updateBalancingState(ConnectivityState.CONNECTING, rlsPicker);
rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
childLbResolvedAddressFactory =
checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
backoffProvider = builder.backoffProvider;
ChildLoadBalancerHelperProvider childLbHelperProvider =
new ChildLoadBalancerHelperProvider(helper, new SubchannelStateManagerImpl(), rlsPicker);
if (rlsConfig.getRequestProcessingStrategy()
== RequestProcessingStrategy.SYNC_LOOKUP_CLIENT_SEES_ERROR) {
refCountedChildPolicyWrapperFactory =
new RefCountedChildPolicyWrapperFactory(
childLbHelperProvider, new BackoffRefreshListener());
} else {
refCountedChildPolicyWrapperFactory =
new RefCountedChildPolicyWrapperFactory(childLbHelperProvider, null);
}
}
@CheckReturnValue
private ListenableFuture<RouteLookupResponse> asyncRlsCall(RouteLookupRequest request) {
final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
if (throttler.shouldThrottle()) {
response.setException(new ThrottledException());
return response;
}
rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
.routeLookup(
REQUEST_CONVERTER.convert(request),
new StreamObserver<io.grpc.lookup.v1.RouteLookupResponse>() {
@Override
public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
response.set(RESPONSE_CONVERTER.reverse().convert(value));
}
@Override
public void onError(Throwable t) {
response.setException(t);
throttler.registerBackendResponse(false);
}
@Override
public void onCompleted() {
throttler.registerBackendResponse(true);
}
});
return response;
}
/**
* Returns async response of the {@code request}. The returned value can be in 3 different states;
* cached, pending and backed-off due to error. The result remains same even if the status is
* changed after the return.
*/
@CheckReturnValue
public final CachedRouteLookupResponse get(final RouteLookupRequest request) {
synchronizationContext.throwIfNotInThisSynchronizationContext();
synchronized (lock) {
final CacheEntry cacheEntry;
cacheEntry = linkedHashLruCache.read(request);
if (cacheEntry == null) {
return handleNewRequest(request);
}
if (cacheEntry instanceof DataCacheEntry) {
// cache hit, initiate async-refresh if entry is staled
DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
if (dataEntry.isStaled(timeProvider.currentTimeNanos())) {
dataEntry.maybeRefresh();
}
return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry);
}
return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry);
}
}
/** Performs any pending maintenance operations needed by the cache. */
public void close() {
synchronized (lock) {
// all childPolicyWrapper will be returned via AutoCleaningEvictionListener
linkedHashLruCache.close();
// TODO(creamsoup) maybe cancel all pending requests
pendingCallCache.clear();
rlsChannel.shutdown();
rlsPicker.close();
}
}
/**
* Populates async cache entry for new request. This is only methods directly modifies the cache,
* any status change is happening via event (async request finished, timed out, etc) in {@link
* PendingCacheEntry}, {@link DataCacheEntry} and {@link BackoffCacheEntry}.
*/
private CachedRouteLookupResponse handleNewRequest(RouteLookupRequest request) {
synchronized (lock) {
PendingCacheEntry pendingEntry = pendingCallCache.get(request);
if (pendingEntry != null) {
return CachedRouteLookupResponse.pendingResponse(pendingEntry);
}
ListenableFuture<RouteLookupResponse> asyncCall = asyncRlsCall(request);
if (!asyncCall.isDone()) {
pendingEntry = new PendingCacheEntry(request, asyncCall);
pendingCallCache.put(request, pendingEntry);
return CachedRouteLookupResponse.pendingResponse(pendingEntry);
} else {
// async call returned finished future is most likely throttled
try {
RouteLookupResponse response = asyncCall.get();
DataCacheEntry dataEntry = new DataCacheEntry(request, response);
linkedHashLruCache.cache(request, dataEntry);
return CachedRouteLookupResponse.dataEntry(dataEntry);
} catch (Exception e) {
BackoffCacheEntry backoffEntry =
new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get());
linkedHashLruCache.cache(request, backoffEntry);
return CachedRouteLookupResponse.backoffEntry(backoffEntry);
}
}
}
}
public void requestConnection() {
rlsChannel.getState(true);
}
/**
* Viewer class for cached {@link RouteLookupResponse} and associated {@link ChildPolicyWrapper}.
*/
static final class CachedRouteLookupResponse {
private final RouteLookupRequest request;
// Should only have 1 of following 3 cache entries
@Nullable
private final DataCacheEntry dataCacheEntry;
@Nullable
private final PendingCacheEntry pendingCacheEntry;
@Nullable
private final BackoffCacheEntry backoffCacheEntry;
CachedRouteLookupResponse(
RouteLookupRequest request,
DataCacheEntry dataCacheEntry,
PendingCacheEntry pendingCacheEntry,
BackoffCacheEntry backoffCacheEntry) {
this.request = checkNotNull(request, "request");
this.dataCacheEntry = dataCacheEntry;
this.pendingCacheEntry = pendingCacheEntry;
this.backoffCacheEntry = backoffCacheEntry;
checkState((dataCacheEntry != null ^ pendingCacheEntry != null ^ backoffCacheEntry != null)
&& !(dataCacheEntry != null && pendingCacheEntry != null && backoffCacheEntry != null),
"Expected only 1 cache entry value provided");
}
static CachedRouteLookupResponse pendingResponse(PendingCacheEntry pendingEntry) {
return new CachedRouteLookupResponse(pendingEntry.request, null, pendingEntry, null);
}
static CachedRouteLookupResponse backoffEntry(BackoffCacheEntry backoffEntry) {
return new CachedRouteLookupResponse(backoffEntry.request, null, null, backoffEntry);
}
static CachedRouteLookupResponse dataEntry(DataCacheEntry dataEntry) {
return new CachedRouteLookupResponse(dataEntry.request, dataEntry, null, null);
}
boolean hasData() {
return dataCacheEntry != null;
}
@Nullable
ChildPolicyWrapper getChildPolicyWrapper() {
if (!hasData()) {
return null;
}
return dataCacheEntry.getChildPolicyWrapper();
}
@Nullable
public String getHeaderData() {
if (!hasData()) {
return null;
}
return dataCacheEntry.getHeaderData();
}
boolean hasError() {
return backoffCacheEntry != null;
}
boolean isPending() {
return pendingCacheEntry != null;
}
@Nullable
Status getStatus() {
if (!hasError()) {
return null;
}
return backoffCacheEntry.getStatus();
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("request", request)
.add("dataCacheEntry", dataCacheEntry)
.add("pendingCacheEntry", pendingCacheEntry)
.add("backoffCacheEntry", backoffCacheEntry)
.toString();
}
}
/** A pending cache entry when the async RouteLookup RPC is still on the fly. */
final class PendingCacheEntry {
private final ListenableFuture<RouteLookupResponse> pendingCall;
private final RouteLookupRequest request;
private final BackoffPolicy backoffPolicy;
PendingCacheEntry(
RouteLookupRequest request, ListenableFuture<RouteLookupResponse> pendingCall) {
this(request, pendingCall, null);
}
PendingCacheEntry(
RouteLookupRequest request,
ListenableFuture<RouteLookupResponse> pendingCall,
@Nullable BackoffPolicy backoffPolicy) {
this.request = checkNotNull(request, "request");
this.pendingCall = pendingCall;
this.backoffPolicy = backoffPolicy == null ? backoffProvider.get() : backoffPolicy;
pendingCall.addListener(
new Runnable() {
@Override
public void run() {
handleDoneFuture();
}
},
synchronizationContext);
}
private void handleDoneFuture() {
synchronized (lock) {
pendingCallCache.remove(request);
if (pendingCall.isCancelled()) {
return;
}
try {
transitionToDataEntry(pendingCall.get());
} catch (Exception e) {
if (e instanceof ThrottledException) {
transitionToBackOff(Status.RESOURCE_EXHAUSTED.withCause(e));
} else {
transitionToBackOff(Status.fromThrowable(e));
}
}
}
}
private void transitionToDataEntry(RouteLookupResponse routeLookupResponse) {
synchronized (lock) {
linkedHashLruCache.cache(request, new DataCacheEntry(request, routeLookupResponse));
}
}
private void transitionToBackOff(Status status) {
synchronized (lock) {
linkedHashLruCache.cache(request, new BackoffCacheEntry(request, status, backoffPolicy));
}
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("request", request)
.add("pendingCall", pendingCall)
.add("backoffPolicy", backoffPolicy)
.toString();
}
}
/** Common cache entry data for {@link RlsAsyncLruCache}. */
abstract class CacheEntry {
protected final RouteLookupRequest request;
CacheEntry(RouteLookupRequest request) {
this.request = checkNotNull(request, "request");
}
abstract int getSizeBytes();
final boolean isExpired() {
return isExpired(timeProvider.currentTimeNanos());
}
abstract boolean isExpired(long now);
abstract void cleanup();
}
/** Implementation of {@link CacheEntry} contains valid data. */
final class DataCacheEntry extends CacheEntry {
private final RouteLookupResponse response;
private final long expireTime;
private final long staleTime;
private ChildPolicyWrapper childPolicyWrapper;
DataCacheEntry(RouteLookupRequest request, final RouteLookupResponse response) {
super(request);
this.response = checkNotNull(response, "response");
childPolicyWrapper =
refCountedChildPolicyWrapperFactory
.createOrGet(response.getTarget());
long now = timeProvider.currentTimeNanos();
expireTime = now + maxAgeNanos;
staleTime = now + staleAgeNanos;
if (childPolicyWrapper.getPicker() != null) {
// using cached childPolicyWrapper
updateLbState();
} else {
createChildLbPolicy();
}
}
private void updateLbState() {
childPolicyWrapper
.getHelper()
.updateBalancingState(
childPolicyWrapper.getConnectivityStateInfo().getState(),
childPolicyWrapper.getPicker());
}
private void createChildLbPolicy() {
ChildLoadBalancingPolicy childPolicy = lbPolicyConfig.getLoadBalancingPolicy();
LoadBalancerProvider lbProvider = childPolicy.getEffectiveLbProvider();
ConfigOrError lbConfig =
lbProvider
.parseLoadBalancingPolicyConfig(
childPolicy.getEffectiveChildPolicy(childPolicyWrapper.getTarget()));
LoadBalancer lb = lbProvider.newLoadBalancer(childPolicyWrapper.getHelper());
lb.handleResolvedAddresses(childLbResolvedAddressFactory.create(lbConfig.getConfig()));
lb.requestConnection();
}
/**
* Refreshes cache entry by creating {@link PendingCacheEntry}. When the {@code
* PendingCacheEntry} received data from RLS server, it will replace the data entry if valid
* data still exists. Flow looks like following.
*
* <pre>
* Timeline | async refresh
* V put new cache (entry2)
* entry1: Pending | hasValue | staled |
* entry2: | OV* | pending | hasValue | staled |
*
* OV: old value
* </pre>
*/
void maybeRefresh() {
synchronized (lock) {
if (pendingCallCache.containsKey(request)) {
// pending already requested
return;
}
final ListenableFuture<RouteLookupResponse> asyncCall = asyncRlsCall(request);
if (!asyncCall.isDone()) {
pendingCallCache.put(request, new PendingCacheEntry(request, asyncCall));
} else {
// async call returned finished future is most likely throttled
try {
RouteLookupResponse response = asyncCall.get();
linkedHashLruCache.cache(request, new DataCacheEntry(request, response));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
BackoffCacheEntry backoffEntry =
new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get());
linkedHashLruCache.cache(request, backoffEntry);
}
}
}
}
@Nullable
ChildPolicyWrapper getChildPolicyWrapper() {
return childPolicyWrapper;
}
String getHeaderData() {
return response.getHeaderData();
}
@Override
int getSizeBytes() {
// size of strings and java object overhead, actual memory usage is more than this.
return (response.getTarget().length() + response.getHeaderData().length()) * 2 + 38 * 2;
}
@Override
boolean isExpired(long now) {
return expireTime <= now;
}
boolean isStaled(long now) {
return staleTime <= now;
}
@Override
void cleanup() {
refCountedChildPolicyWrapperFactory.release(childPolicyWrapper);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("request", request)
.add("response", response)
.add("expireTime", expireTime)
.add("staleTime", staleTime)
.add("childPolicyWrapper", childPolicyWrapper)
.toString();
}
}
/**
* Implementation of {@link CacheEntry} contains error. This entry will transition to pending
* status when the backoff time is expired.
*/
private final class BackoffCacheEntry extends CacheEntry {
private final Status status;
private final ScheduledHandle scheduledHandle;
private final BackoffPolicy backoffPolicy;
private final long expireNanos;
private boolean shutdown = false;
BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy backoffPolicy) {
super(request);
this.status = checkNotNull(status, "status");
this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy");
long delayNanos = backoffPolicy.nextBackoffNanos();
this.expireNanos = timeProvider.currentTimeNanos() + delayNanos;
this.scheduledHandle =
synchronizationContext.schedule(
new Runnable() {
@Override
public void run() {
transitionToPending();
}
},
delayNanos,
TimeUnit.NANOSECONDS,
scheduledExecutorService);
}
/** Forcefully refreshes cache entry by ignoring the backoff timer. */
void forceRefresh() {
if (scheduledHandle.isPending()) {
scheduledHandle.cancel();
transitionToPending();
}
}
private void transitionToPending() {
synchronized (lock) {
if (shutdown) {
return;
}
ListenableFuture<RouteLookupResponse> call = asyncRlsCall(request);
if (!call.isDone()) {
PendingCacheEntry pendingEntry = new PendingCacheEntry(request, call, backoffPolicy);
pendingCallCache.put(request, pendingEntry);
linkedHashLruCache.invalidate(request);
} else {
try {
RouteLookupResponse response = call.get();
linkedHashLruCache.cache(request, new DataCacheEntry(request, response));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
linkedHashLruCache.cache(
request,
new BackoffCacheEntry(request, Status.fromThrowable(e), backoffPolicy));
}
}
}
}
Status getStatus() {
return status;
}
@Override
int getSizeBytes() {
return 0;
}
@Override
boolean isExpired(long now) {
return expireNanos <= now;
}
@Override
void cleanup() {
if (shutdown) {
return;
}
shutdown = true;
if (!scheduledHandle.isPending()) {
scheduledHandle.cancel();
}
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("request", request)
.add("status", status)
.add("backoffPolicy", backoffPolicy)
.add("scheduledFuture", scheduledHandle)
.toString();
}
}
/** Returns a Builder for {@link CachingRlsLbClient}. */
public static Builder newBuilder() {
return new Builder();
}
/** A Builder for {@link CachingRlsLbClient}. */
public static final class Builder {
private Helper helper;
private LbPolicyConfiguration lbPolicyConfig;
private Throttler throttler = new HappyThrottler();
private ResolvedAddressFactory resolvedAddressFactory;
private TimeProvider timeProvider = TimeProvider.SYSTEM_TIME_PROVIDER;
private EvictionListener<RouteLookupRequest, CacheEntry> evictionListener;
private BackoffPolicy.Provider backoffProvider = new ExponentialBackoffPolicy.Provider();
public Builder setHelper(Helper helper) {
this.helper = checkNotNull(helper, "helper");
return this;
}
public Builder setLbPolicyConfig(LbPolicyConfiguration lbPolicyConfig) {
this.lbPolicyConfig = checkNotNull(lbPolicyConfig, "lbPolicyConfig");
return this;
}
public Builder setThrottler(Throttler throttler) {
this.throttler = checkNotNull(throttler, "throttler");
return this;
}
/**
* Sets a factory to create {@link ResolvedAddresses} for child load balancer.
*/
public Builder setResolvedAddressesFactory(
ResolvedAddressFactory resolvedAddressFactory) {
this.resolvedAddressFactory =
checkNotNull(resolvedAddressFactory, "resolvedAddressFactory");
return this;
}
public Builder setTimeProvider(TimeProvider timeProvider) {
this.timeProvider = checkNotNull(timeProvider, "timeProvider");
return this;
}
public Builder setEvictionListener(
@Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener) {
this.evictionListener = evictionListener;
return this;
}
public Builder setBackoffProvider(BackoffPolicy.Provider provider) {
this.backoffProvider = checkNotNull(provider, "provider");
return this;
}
public CachingRlsLbClient build() {
return new CachingRlsLbClient(this);
}
}
/**
* When any {@link CacheEntry} is evicted from {@link LruCache}, it performs {@link
* CacheEntry#cleanup()} after original {@link EvictionListener} is finished.
*/
private static final class AutoCleaningEvictionListener
implements EvictionListener<RouteLookupRequest, CacheEntry> {
private final EvictionListener<RouteLookupRequest, CacheEntry> delegate;
AutoCleaningEvictionListener(
@Nullable EvictionListener<RouteLookupRequest, CacheEntry> delegate) {
this.delegate = delegate;
}
@Override
public void onEviction(RouteLookupRequest key, CacheEntry value, EvictionType cause) {
if (delegate != null) {
delegate.onEviction(key, value, cause);
}
// performs cleanup after delegation
value.cleanup();
}
}
/** A Throttler never throttles. */
private static final class HappyThrottler implements Throttler {
@Override
public boolean shouldThrottle() {
return false;
}
@Override
public void registerBackendResponse(boolean throttled) {
// no-op
}
}
/** Implementation of {@link LinkedHashLruCache} for RLS. */
private static final class RlsAsyncLruCache
extends LinkedHashLruCache<RouteLookupRequest, CacheEntry> {
RlsAsyncLruCache(long maxEstimatedSizeBytes,
@Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener,
ScheduledExecutorService ses, TimeProvider timeProvider) {
super(
maxEstimatedSizeBytes,
new AutoCleaningEvictionListener(evictionListener),
1,
TimeUnit.MINUTES,
ses,
timeProvider);
}
@Override
protected boolean isExpired(RouteLookupRequest key, CacheEntry value, long nowNanos) {
return value.isExpired();
}
@Override
protected int estimateSizeOf(RouteLookupRequest key, CacheEntry value) {
return value.getSizeBytes();
}
@Override
protected boolean shouldInvalidateEldestEntry(
RouteLookupRequest eldestKey, CacheEntry eldestValue) {
// eldest entry should be evicted if size limit exceeded
return true;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.toString();
}
}
/**
* LbStatusListener refreshes {@link BackoffCacheEntry} when lb state is changed to {@link
* ConnectivityState#READY} from {@link ConnectivityState#TRANSIENT_FAILURE}.
*/
private final class BackoffRefreshListener implements ChildLbStatusListener {
@Nullable
private ConnectivityState prevState = null;
@Override
public void onStatusChanged(ConnectivityState newState) {
if (prevState == ConnectivityState.TRANSIENT_FAILURE
&& newState == ConnectivityState.READY) {
synchronized (lock) {
for (CacheEntry value : linkedHashLruCache.values()) {
if (value instanceof BackoffCacheEntry) {
((BackoffCacheEntry) value).forceRefresh();
}
}
}
}
prevState = newState;
}
}
/** A header will be added when RLS server respond with additional header data. */
public static final Metadata.Key<String> RLS_DATA_KEY =
Metadata.Key.of("X-Google-RLS-Data", Metadata.ASCII_STRING_MARSHALLER);
final class RlsPicker extends SubchannelPicker {
private final RlsRequestFactory requestFactory;
RlsPicker(RlsRequestFactory requestFactory) {
this.requestFactory = checkNotNull(requestFactory, "requestFactory");
}
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
String[] methodName = args.getMethodDescriptor().getFullMethodName().split("/", 2);
RouteLookupRequest request =
requestFactory.create(methodName[0], methodName[1], args.getHeaders());
final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(request);
PickSubchannelArgs rlsAppliedArgs = getApplyRlsHeader(args, response);
if (response.hasData()) {
ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper();
ConnectivityState connectivityState =
childPolicyWrapper.getConnectivityStateInfo().getState();
switch (connectivityState) {
case CONNECTING:
return PickResult.withNoResult();
case IDLE:
// fall through
case READY:
if (childPolicyWrapper.getPicker() == null) {
return PickResult.withNoResult();
}
return childPolicyWrapper.getPicker().pickSubchannel(rlsAppliedArgs);
case TRANSIENT_FAILURE:
return handleError(rlsAppliedArgs, Status.INTERNAL);
case SHUTDOWN:
default:
return handleError(rlsAppliedArgs, Status.ABORTED);
}
} else if (response.hasError()) {
return handleError(rlsAppliedArgs, response.getStatus());
} else {
return PickResult.withNoResult();
}
}
private PickSubchannelArgs getApplyRlsHeader(
PickSubchannelArgs args, CachedRouteLookupResponse response) {
if (response.getHeaderData() == null || response.getHeaderData().isEmpty()) {
return args;
}
Metadata headers = new Metadata();
headers.merge(args.getHeaders());
headers.put(RLS_DATA_KEY, response.getHeaderData());
return new PickSubchannelArgsImpl(args.getMethodDescriptor(), headers, args.getCallOptions());
}
private PickResult handleError(PickSubchannelArgs args, Status cause) {
RequestProcessingStrategy strategy =
lbPolicyConfig.getRouteLookupConfig().getRequestProcessingStrategy();
switch (strategy) {
case SYNC_LOOKUP_CLIENT_SEES_ERROR:
return PickResult.withError(cause);
case SYNC_LOOKUP_DEFAULT_TARGET_ON_ERROR:
return useFallback(args);
default:
throw new AssertionError("Unknown RequestProcessingStrategy: " + strategy);
}
}
private ChildPolicyWrapper fallbackChildPolicyWrapper;
/** Uses Subchannel connected to default target. */
private PickResult useFallback(PickSubchannelArgs args) {
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().getDefaultTarget();
if (fallbackChildPolicyWrapper == null
|| !fallbackChildPolicyWrapper.getTarget().equals(defaultTarget)) {
// TODO(creamsoup) wait until lb is ready
startFallbackChildPolicy();
}
switch (fallbackChildPolicyWrapper.getConnectivityStateInfo().getState()) {
case CONNECTING:
return PickResult.withNoResult();
case TRANSIENT_FAILURE:
// fall through
case SHUTDOWN:
return
PickResult
.withError(fallbackChildPolicyWrapper.getConnectivityStateInfo().getStatus());
case IDLE:
// fall through
case READY:
SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker();
if (picker == null) {
return PickResult.withNoResult();
}
return picker.pickSubchannel(args);
default:
throw new AssertionError();
}
}
private void startFallbackChildPolicy() {
String defaultTarget = lbPolicyConfig.getRouteLookupConfig().getDefaultTarget();
fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget);
LoadBalancerProvider lbProvider =
lbPolicyConfig.getLoadBalancingPolicy().getEffectiveLbProvider();
final LoadBalancer lb =
lbProvider.newLoadBalancer(fallbackChildPolicyWrapper.getHelper());
final ConfigOrError lbConfig =
lbProvider
.parseLoadBalancingPolicyConfig(
lbPolicyConfig
.getLoadBalancingPolicy()
.getEffectiveChildPolicy(defaultTarget));
helper.getSynchronizationContext().execute(
new Runnable() {
@Override
public void run() {
lb.handleResolvedAddresses(
childLbResolvedAddressFactory.create(lbConfig.getConfig()));
lb.requestConnection();
}
});
}
void close() {
if (fallbackChildPolicyWrapper != null) {
refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
}
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("target", lbPolicyConfig.getRouteLookupConfig().getLookupService())
.toString();
}
}
}

View File

@ -38,7 +38,7 @@ public final class RlsProtoData {
/** A request object sent to route lookup service. */
@Immutable
public static final class RouteLookupRequest {
static final class RouteLookupRequest {
private final String server;
@ -119,7 +119,7 @@ public final class RlsProtoData {
/** A response from route lookup service. */
@Immutable
public static final class RouteLookupResponse {
static final class RouteLookupResponse {
private final String target;
@ -218,6 +218,8 @@ public final class RlsProtoData {
checkState(
lookupService != null && !lookupService.isEmpty(), "lookupService must not be empty");
this.lookupService = lookupService;
checkState(
lookupServiceTimeoutInMillis > 0, "lookupServiceTimeoutInMillis should be positive");
this.lookupServiceTimeoutInMillis = lookupServiceTimeoutInMillis;
if (maxAgeInMillis == null) {
checkState(
@ -238,9 +240,7 @@ public final class RlsProtoData {
this.requestProcessingStrategy = requestProcessingStrategy;
checkNotNull(requestProcessingStrategy, "requestProcessingStrategy");
checkState(
!((requestProcessingStrategy == RequestProcessingStrategy.SYNC_LOOKUP_CLIENT_SEES_ERROR
|| requestProcessingStrategy
== RequestProcessingStrategy.ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS)
!(requestProcessingStrategy == RequestProcessingStrategy.SYNC_LOOKUP_CLIENT_SEES_ERROR
&& defaultTarget.isEmpty()),
"defaultTarget cannot be empty if strategy is %s",
requestProcessingStrategy);
@ -303,13 +303,10 @@ public final class RlsProtoData {
}
/**
* Returns the default target to use. It will be used for request processing strategy
* {@link RequestProcessingStrategy#SYNC_LOOKUP_DEFAULT_TARGET_ON_ERROR} if RLS
* returns an error, or strategy {@link
* RequestProcessingStrategy#ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS} if RLS returns an error or
* there is a cache miss in the client. It will also be used if there are no healthy backends
* for an RLS target. Note that requests can be routed only to a subdomain of the original
* target, {@literal e.g.} "us_east_1.cloudbigtable.googleapis.com".
* Returns the default target to use if needed. If nonempty (implies request processing
* strategy SYNC_LOOKUP_DEFAULT_TARGET_ON_ERROR is set), it will be used if RLS returns an
* error. Note that requests can be routed only to a subdomain of the original target,
* {@literal e.g.} "us_east_1.cloudbigtable.googleapis.com".
*/
public String getDefaultTarget() {
return defaultTarget;
@ -379,7 +376,7 @@ public final class RlsProtoData {
}
/** RequestProcessingStrategy specifies how to process a request when not already in the cache. */
enum RequestProcessingStrategy {
public enum RequestProcessingStrategy {
/**
* Query the RLS and process the request using target returned by the lookup. The target will
* then be cached and used for processing subsequent requests for the same key. Any errors
@ -394,13 +391,6 @@ public final class RlsProtoData {
* strict regional routing requirements should use this strategy.
*/
SYNC_LOOKUP_CLIENT_SEES_ERROR,
/**
* Query the RLS asynchronously but respond with the default target. The target in the lookup
* response will then be cached and used for subsequent requests. Services with strict latency
* requirements (but not strict regional routing requirements) should use this strategy.
*/
ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS;
}
/**

View File

@ -22,7 +22,7 @@ import javax.annotation.concurrent.ThreadSafe;
* A strategy for deciding when to throttle requests at the client.
*/
@ThreadSafe
public interface Throttler {
interface Throttler {
/**
* Checks if a given request should be throttled by the client. This should be called for every

View File

@ -0,0 +1,537 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.rls.internal;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.common.base.Converter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
import io.grpc.ManagedChannel;
import io.grpc.NameResolver.Factory;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.BackoffPolicy;
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
import io.grpc.rls.internal.CachingRlsLbClient.CacheEntry;
import io.grpc.rls.internal.CachingRlsLbClient.CachedRouteLookupResponse;
import io.grpc.rls.internal.CachingRlsLbClient.RlsPicker;
import io.grpc.rls.internal.DoNotUseDirectScheduledExecutorService.FakeTimeProvider;
import io.grpc.rls.internal.LbPolicyConfiguration.ChildLoadBalancingPolicy;
import io.grpc.rls.internal.LbPolicyConfiguration.ChildPolicyWrapper;
import io.grpc.rls.internal.LruCache.EvictionListener;
import io.grpc.rls.internal.LruCache.EvictionType;
import io.grpc.rls.internal.RlsProtoConverters.RouteLookupResponseConverter;
import io.grpc.rls.internal.RlsProtoData.GrpcKeyBuilder;
import io.grpc.rls.internal.RlsProtoData.GrpcKeyBuilder.Name;
import io.grpc.rls.internal.RlsProtoData.NameMatcher;
import io.grpc.rls.internal.RlsProtoData.RequestProcessingStrategy;
import io.grpc.rls.internal.RlsProtoData.RouteLookupConfig;
import io.grpc.rls.internal.RlsProtoData.RouteLookupRequest;
import io.grpc.rls.internal.RlsProtoData.RouteLookupResponse;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
@RunWith(JUnit4.class)
public class CachingRlsLbClientTest {
private static final RouteLookupConfig ROUTE_LOOKUP_CONFIG = getRouteLookupConfig();
private static final int SERVER_LATENCY_MILLIS = 10;
@Rule
public final MockitoRule mocks = MockitoJUnit.rule();
@Rule
public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule();
@Mock
private EvictionListener<RouteLookupRequest, CacheEntry> evictionListener;
@Mock
private SocketAddress socketAddress;
private final SynchronizationContext syncContext =
new SynchronizationContext(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
throw new RuntimeException(e);
}
});
private final FakeBackoffProvider fakeBackoffProvider = new FakeBackoffProvider();
private final ResolvedAddressFactory resolvedAddressFactory =
new ChildLbResolvedAddressFactory(
ImmutableList.of(new EquivalentAddressGroup(socketAddress)), Attributes.EMPTY);
private final TestLoadBalancerProvider lbProvider = new TestLoadBalancerProvider();
private final DoNotUseDirectScheduledExecutorService fakeScheduledExecutorService =
mock(DoNotUseDirectScheduledExecutorService.class, CALLS_REAL_METHODS);
private final FakeTimeProvider fakeTimeProvider =
fakeScheduledExecutorService.getFakeTimeProvider();
private final StaticFixedDelayRlsServerImpl rlsServerImpl =
new StaticFixedDelayRlsServerImpl(
TimeUnit.MILLISECONDS.toNanos(SERVER_LATENCY_MILLIS), fakeScheduledExecutorService);
private final ChildLoadBalancingPolicy childLbPolicy =
new ChildLoadBalancingPolicy("target", Collections.<String, Object>emptyMap(), lbProvider);
private final Helper helper =
mock(Helper.class, AdditionalAnswers.delegatesTo(new FakeHelper()));
private final FakeThrottler fakeThrottler = new FakeThrottler();
private final LbPolicyConfiguration lbPolicyConfiguration =
new LbPolicyConfiguration(ROUTE_LOOKUP_CONFIG, childLbPolicy);
private CachingRlsLbClient rlsLbClient;
@Before
public void setUp() throws Exception {
rlsLbClient =
CachingRlsLbClient.newBuilder()
.setBackoffProvider(fakeBackoffProvider)
.setResolvedAddressesFactory(resolvedAddressFactory)
.setEvictionListener(evictionListener)
.setHelper(helper)
.setLbPolicyConfig(lbPolicyConfiguration)
.setThrottler(fakeThrottler)
.setTimeProvider(fakeTimeProvider)
.build();
}
@After
public void tearDown() throws Exception {
rlsLbClient.close();
}
private CachedRouteLookupResponse getInSyncContext(
final RouteLookupRequest request)
throws ExecutionException, InterruptedException, TimeoutException {
final SettableFuture<CachedRouteLookupResponse> responseSettableFuture =
SettableFuture.create();
syncContext.execute(new Runnable() {
@Override
public void run() {
responseSettableFuture.set(rlsLbClient.get(request));
}
});
return responseSettableFuture.get(5, TimeUnit.SECONDS);
}
@Test
public void get_noError_lifeCycle() throws Exception {
InOrder inOrder = inOrder(evictionListener);
RouteLookupRequest routeLookupRequest =
new RouteLookupRequest("server", "/foo/bar", "grpc", ImmutableMap.<String, String>of());
rlsServerImpl.setLookupTable(
ImmutableMap.of(
routeLookupRequest,
new RouteLookupResponse("target", "header")));
// initial request
CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest);
assertThat(resp.isPending()).isTrue();
// server response
fakeTimeProvider.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
resp = getInSyncContext(routeLookupRequest);
assertThat(resp.hasData()).isTrue();
// cache hit for staled entry
fakeTimeProvider.forwardTime(ROUTE_LOOKUP_CONFIG.getStaleAgeInMillis(), TimeUnit.MILLISECONDS);
resp = getInSyncContext(routeLookupRequest);
assertThat(resp.hasData()).isTrue();
// async refresh finishes
fakeTimeProvider.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
inOrder
.verify(evictionListener)
.onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.REPLACED));
resp = getInSyncContext(routeLookupRequest);
assertThat(resp.hasData()).isTrue();
// existing cache expired
fakeTimeProvider.forwardTime(ROUTE_LOOKUP_CONFIG.getMaxAgeInMillis(), TimeUnit.MILLISECONDS);
resp = getInSyncContext(routeLookupRequest);
assertThat(resp.isPending()).isTrue();
inOrder
.verify(evictionListener)
.onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.EXPIRED));
inOrder.verifyNoMoreInteractions();
}
@Test
public void get_throttledAndRecover() throws Exception {
RouteLookupRequest routeLookupRequest =
new RouteLookupRequest("server", "/foo/bar", "grpc", ImmutableMap.<String, String>of());
rlsServerImpl.setLookupTable(
ImmutableMap.of(
routeLookupRequest,
new RouteLookupResponse("target", "header")));
fakeThrottler.nextResult = true;
fakeBackoffProvider.nextPolicy = createBackoffPolicy(10, TimeUnit.MILLISECONDS);
CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest);
assertThat(resp.hasError()).isTrue();
fakeTimeProvider.forwardTime(10, TimeUnit.MILLISECONDS);
// initially backed off entry is backed off again
verify(evictionListener)
.onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.REPLACED));
resp = getInSyncContext(routeLookupRequest);
assertThat(resp.hasError()).isTrue();
// let it pass throttler
fakeThrottler.nextResult = false;
fakeTimeProvider.forwardTime(10, TimeUnit.MILLISECONDS);
resp = getInSyncContext(routeLookupRequest);
assertThat(resp.isPending()).isTrue();
// server responses
fakeTimeProvider.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
resp = getInSyncContext(routeLookupRequest);
assertThat(resp.hasData()).isTrue();
}
@Test
public void get_updatesLbState() throws Exception {
InOrder inOrder = inOrder(helper);
RouteLookupRequest routeLookupRequest =
new RouteLookupRequest("server", "/foo/bar", "grpc", ImmutableMap.<String, String>of());
rlsServerImpl.setLookupTable(
ImmutableMap.of(
routeLookupRequest,
new RouteLookupResponse("target", "header")));
// valid channel
CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest);
assertThat(resp.isPending()).isTrue();
fakeTimeProvider.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
resp = getInSyncContext(routeLookupRequest);
assertThat(resp.hasData()).isTrue();
ArgumentCaptor<SubchannelPicker> pickerCaptor = ArgumentCaptor.forClass(SubchannelPicker.class);
ArgumentCaptor<ConnectivityState> stateCaptor =
ArgumentCaptor.forClass(ConnectivityState.class);
inOrder.verify(helper, times(2))
.updateBalancingState(stateCaptor.capture(), pickerCaptor.capture());
assertThat(new HashSet<>(pickerCaptor.getAllValues())).hasSize(1);
assertThat(stateCaptor.getAllValues())
.containsExactly(ConnectivityState.CONNECTING, ConnectivityState.READY);
assertThat(pickerCaptor.getValue()).isInstanceOf(RlsPicker.class);
// move backoff further back to only test error behavior
fakeBackoffProvider.nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS);
// try to get invalid
RouteLookupRequest invalidRouteLookupRequest =
new RouteLookupRequest(
"unknown_server", "/doesn/exists", "grpc", ImmutableMap.<String, String>of());
CachedRouteLookupResponse errorResp = getInSyncContext(invalidRouteLookupRequest);
assertThat(errorResp.isPending()).isTrue();
fakeTimeProvider.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
errorResp = getInSyncContext(invalidRouteLookupRequest);
assertThat(errorResp.hasError()).isTrue();
inOrder.verify(helper, never())
.updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class));
}
@Test
public void get_childPolicyWrapper_reusedForSameTarget() throws Exception {
RouteLookupRequest routeLookupRequest =
new RouteLookupRequest("server", "/foo/bar", "grpc", ImmutableMap.<String, String>of());
RouteLookupRequest routeLookupRequest2 =
new RouteLookupRequest("server", "/foo/baz", "grpc", ImmutableMap.<String, String>of());
rlsServerImpl.setLookupTable(
ImmutableMap.of(
routeLookupRequest, new RouteLookupResponse("target", "header"),
routeLookupRequest2, new RouteLookupResponse("target", "header2")));
CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest);
assertThat(resp.isPending()).isTrue();
fakeTimeProvider.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
resp = getInSyncContext(routeLookupRequest);
assertThat(resp.hasData()).isTrue();
assertThat(resp.getHeaderData()).isEqualTo("header");
ChildPolicyWrapper childPolicyWrapper = resp.getChildPolicyWrapper();
assertThat(childPolicyWrapper.getTarget()).isEqualTo("target");
assertThat(childPolicyWrapper.getPicker()).isNotInstanceOf(RlsPicker.class);
// request2 has same target, it should reuse childPolicyWrapper
CachedRouteLookupResponse resp2 = getInSyncContext(routeLookupRequest2);
assertThat(resp2.isPending()).isTrue();
fakeTimeProvider.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS);
resp2 = getInSyncContext(routeLookupRequest2);
assertThat(resp2.hasData()).isTrue();
assertThat(resp2.getHeaderData()).isEqualTo("header2");
assertThat(resp2.getChildPolicyWrapper()).isEqualTo(resp.getChildPolicyWrapper());
}
private static RouteLookupConfig getRouteLookupConfig() {
return new RouteLookupConfig(
ImmutableList.of(
new GrpcKeyBuilder(
ImmutableList.of(new Name("service1", "create")),
ImmutableList.of(
new NameMatcher("user", ImmutableList.of("User", "Parent"), true),
new NameMatcher("id", ImmutableList.of("X-Google-Id"), true)))),
/* lookupService= */ "service1",
/* lookupServiceTimeoutInMillis= */ TimeUnit.SECONDS.toMillis(2),
/* maxAgeInMillis= */ TimeUnit.SECONDS.toMillis(300),
/* staleAgeInMillis= */ TimeUnit.SECONDS.toMillis(240),
/* cacheSize= */ 1000,
/* validTargets= */ ImmutableList.of("a valid target"),
/* defaultTarget= */ "us_east_1.cloudbigtable.googleapis.com",
RequestProcessingStrategy.SYNC_LOOKUP_CLIENT_SEES_ERROR);
}
private static BackoffPolicy createBackoffPolicy(final long delay, final TimeUnit unit) {
checkArgument(delay > 0, "delay should be positive");
checkNotNull(unit, "unit");
return
new BackoffPolicy() {
@Override
public long nextBackoffNanos() {
return TimeUnit.NANOSECONDS.convert(delay, unit);
}
};
}
private static final class FakeBackoffProvider implements BackoffPolicy.Provider {
private BackoffPolicy nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS);
@Override
public BackoffPolicy get() {
return nextPolicy;
}
}
private static final class TestLoadBalancerProvider extends LoadBalancerProvider {
@Override
public boolean isAvailable() {
return true;
}
@Override
public int getPriority() {
return 0;
}
@Override
public String getPolicyName() {
return null;
}
@Override
public LoadBalancer newLoadBalancer(final Helper helper) {
return new LoadBalancer() {
@Override
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
// TODO: make the picker accessible
helper.updateBalancingState(ConnectivityState.READY, mock(SubchannelPicker.class));
}
@Override
public void handleNameResolutionError(final Status error) {
class ErrorPicker extends SubchannelPicker {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return PickResult.withError(error);
}
}
helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker());
}
@Override
public void shutdown() {
}
};
}
}
private static final class StaticFixedDelayRlsServerImpl
extends RouteLookupServiceGrpc.RouteLookupServiceImplBase {
private static final Converter<io.grpc.lookup.v1.RouteLookupRequest, RouteLookupRequest>
REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter();
private static final Converter<RouteLookupResponse, io.grpc.lookup.v1.RouteLookupResponse>
RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse();
private final long responseDelayNano;
private final ScheduledExecutorService scheduledExecutorService;
private Map<RouteLookupRequest, RouteLookupResponse> lookupTable = ImmutableMap.of();
public StaticFixedDelayRlsServerImpl(
long responseDelayNano, ScheduledExecutorService scheduledExecutorService) {
checkArgument(responseDelayNano > 0, "delay must be positive");
this.responseDelayNano = responseDelayNano;
this.scheduledExecutorService =
checkNotNull(scheduledExecutorService, "scheduledExecutorService");
}
private void setLookupTable(Map<RouteLookupRequest, RouteLookupResponse> lookupTable) {
this.lookupTable = checkNotNull(lookupTable, "lookupTable");
}
@Override
public void routeLookup(final io.grpc.lookup.v1.RouteLookupRequest request,
final StreamObserver<io.grpc.lookup.v1.RouteLookupResponse> responseObserver) {
ScheduledFuture<?> unused =
scheduledExecutorService.schedule(
new Runnable() {
@Override
public void run() {
RouteLookupResponse response =
lookupTable.get(REQUEST_CONVERTER.convert(request));
if (response == null) {
responseObserver.onError(new RuntimeException("not found"));
} else {
responseObserver.onNext(RESPONSE_CONVERTER.convert(response));
responseObserver.onCompleted();
}
}
}, responseDelayNano, TimeUnit.NANOSECONDS);
}
}
private final class FakeHelper extends Helper {
@Override
public ManagedChannel createResolvingOobChannel(String target) {
try {
grpcCleanupRule.register(
InProcessServerBuilder.forName(target)
.addService(rlsServerImpl)
.directExecutor()
.build()
.start());
} catch (IOException e) {
throw new RuntimeException("cannot create server: " + target, e);
}
return InProcessChannelBuilder.forName(target).directExecutor().build();
}
@Override
public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) {
throw new UnsupportedOperationException();
}
@Override
public void updateBalancingState(
@Nonnull ConnectivityState newState, @Nonnull SubchannelPicker newPicker) {
// no-op
}
@Override
@Deprecated
public Factory getNameResolverFactory() {
throw new UnsupportedOperationException();
}
@Override
public String getAuthority() {
throw new UnsupportedOperationException();
}
@Override
public ScheduledExecutorService getScheduledExecutorService() {
return fakeScheduledExecutorService;
}
@Override
public SynchronizationContext getSynchronizationContext() {
return syncContext;
}
}
private static final class FakeThrottler implements Throttler {
private boolean nextResult = false;
@Override
public boolean shouldThrottle() {
return nextResult;
}
@Override
public void registerBackendResponse(boolean throttled) {
// no-op
}
}
}

View File

@ -66,7 +66,7 @@ public class RlsRequestFactoryTest {
/* cacheSize= */ 1000,
/* validTargets= */ ImmutableList.of("a valid target"),
/* defaultTarget= */ "us_east_1.cloudbigtable.googleapis.com",
RequestProcessingStrategy.ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS);
RequestProcessingStrategy.SYNC_LOOKUP_CLIENT_SEES_ERROR);
private final RlsRequestFactory factory = new RlsRequestFactory(RLS_CONFIG);