diff --git a/core/src/main/java/io/grpc/internal/PickSubchannelArgsImpl.java b/core/src/main/java/io/grpc/internal/PickSubchannelArgsImpl.java index b6ec5bfac5..dd938303de 100644 --- a/core/src/main/java/io/grpc/internal/PickSubchannelArgsImpl.java +++ b/core/src/main/java/io/grpc/internal/PickSubchannelArgsImpl.java @@ -24,7 +24,8 @@ import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.Metadata; import io.grpc.MethodDescriptor; -final class PickSubchannelArgsImpl extends PickSubchannelArgs { +/** Implementation of {@link PickSubchannelArgs}. */ +public final class PickSubchannelArgsImpl extends PickSubchannelArgs { private final CallOptions callOptions; private final Metadata headers; private final MethodDescriptor method; @@ -32,7 +33,8 @@ final class PickSubchannelArgsImpl extends PickSubchannelArgs { /** * Creates call args object for given method with its call options, metadata. */ - PickSubchannelArgsImpl(MethodDescriptor method, Metadata headers, CallOptions callOptions) { + public PickSubchannelArgsImpl( + MethodDescriptor method, Metadata headers, CallOptions callOptions) { this.method = checkNotNull(method, "method"); this.headers = checkNotNull(headers, "headers"); this.callOptions = checkNotNull(callOptions, "callOptions"); diff --git a/rls/build.gradle b/rls/build.gradle index 14dad4e508..549d43c032 100644 --- a/rls/build.gradle +++ b/rls/build.gradle @@ -14,6 +14,8 @@ dependencies { project(':grpc-stub') compileOnly libraries.javax_annotation testCompile libraries.truth, + project(':grpc-testing'), + project(':grpc-testing-proto'), project(':grpc-core').sourceSets.test.output // for FakeClock } diff --git a/rls/src/main/java/io/grpc/rls/internal/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/internal/CachingRlsLbClient.java new file mode 100644 index 0000000000..febe1e2d47 --- /dev/null +++ b/rls/src/main/java/io/grpc/rls/internal/CachingRlsLbClient.java @@ -0,0 +1,963 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.rls.internal; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.Converter; +import com.google.common.base.MoreObjects; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import io.grpc.ConnectivityState; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickResult; +import io.grpc.LoadBalancer.PickSubchannelArgs; +import io.grpc.LoadBalancer.ResolvedAddresses; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.LoadBalancerProvider; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.NameResolver.ConfigOrError; +import io.grpc.Status; +import io.grpc.SynchronizationContext; +import io.grpc.SynchronizationContext.ScheduledHandle; +import io.grpc.internal.BackoffPolicy; +import io.grpc.internal.ExponentialBackoffPolicy; +import io.grpc.internal.PickSubchannelArgsImpl; +import io.grpc.internal.TimeProvider; +import io.grpc.lookup.v1.RouteLookupServiceGrpc; +import io.grpc.lookup.v1.RouteLookupServiceGrpc.RouteLookupServiceStub; +import io.grpc.rls.internal.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider; +import io.grpc.rls.internal.LbPolicyConfiguration.ChildLbStatusListener; +import io.grpc.rls.internal.LbPolicyConfiguration.ChildLoadBalancingPolicy; +import io.grpc.rls.internal.LbPolicyConfiguration.ChildPolicyWrapper; +import io.grpc.rls.internal.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory; +import io.grpc.rls.internal.LruCache.EvictionListener; +import io.grpc.rls.internal.LruCache.EvictionType; +import io.grpc.rls.internal.RlsProtoConverters.RouteLookupResponseConverter; +import io.grpc.rls.internal.RlsProtoData.RequestProcessingStrategy; +import io.grpc.rls.internal.RlsProtoData.RouteLookupConfig; +import io.grpc.rls.internal.RlsProtoData.RouteLookupRequest; +import io.grpc.rls.internal.RlsProtoData.RouteLookupResponse; +import io.grpc.rls.internal.Throttler.ThrottledException; +import io.grpc.stub.StreamObserver; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.annotation.CheckReturnValue; +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; +import javax.annotation.concurrent.ThreadSafe; + +/** + * A CachingRlsLbClient is a core implementation of RLS loadbalancer supports dynamic request + * routing by fetching the decision from route lookup server. Every single request is routed by + * the server's decision. To reduce the performance penalty, {@link LruCache} is used. + */ +@ThreadSafe +public final class CachingRlsLbClient { + + private static final Converter + REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter().reverse(); + private static final Converter + RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse(); + + // All cache status changes (pending, backoff, success) must be under this lock + private final Object lock = new Object(); + // LRU cache based on access order (BACKOFF and actual data will be here) + @GuardedBy("lock") + private final LinkedHashLruCache linkedHashLruCache; + // any RPC on the fly will cached in this map + @GuardedBy("lock") + private final Map pendingCallCache = new HashMap<>(); + + private final SynchronizationContext synchronizationContext; + private final ScheduledExecutorService scheduledExecutorService; + private final TimeProvider timeProvider; + private final Throttler throttler; + + private final LbPolicyConfiguration lbPolicyConfig; + private final BackoffPolicy.Provider backoffProvider; + private final long maxAgeNanos; + private final long staleAgeNanos; + private final long callTimeoutNanos; + + private final Helper helper; + private final ManagedChannel rlsChannel; + private final RouteLookupServiceStub rlsStub; + private final RlsPicker rlsPicker; + private final ResolvedAddressFactory childLbResolvedAddressFactory; + private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory; + + private CachingRlsLbClient(Builder builder) { + helper = checkNotNull(builder.helper, "helper"); + scheduledExecutorService = helper.getScheduledExecutorService(); + synchronizationContext = helper.getSynchronizationContext(); + lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig"); + RouteLookupConfig rlsConfig = lbPolicyConfig.getRouteLookupConfig(); + maxAgeNanos = TimeUnit.MILLISECONDS.toNanos(rlsConfig.getMaxAgeInMillis()); + staleAgeNanos = TimeUnit.MILLISECONDS.toNanos(rlsConfig.getStaleAgeInMillis()); + callTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(rlsConfig.getLookupServiceTimeoutInMillis()); + timeProvider = checkNotNull(builder.timeProvider, "timeProvider"); + throttler = checkNotNull(builder.throttler, "throttler"); + linkedHashLruCache = + new RlsAsyncLruCache( + rlsConfig.getCacheSizeBytes(), + builder.evictionListener, + scheduledExecutorService, + timeProvider); + RlsRequestFactory requestFactory = new RlsRequestFactory(lbPolicyConfig.getRouteLookupConfig()); + rlsPicker = new RlsPicker(requestFactory); + rlsChannel = helper.createResolvingOobChannel(rlsConfig.getLookupService()); + helper.updateBalancingState(ConnectivityState.CONNECTING, rlsPicker); + rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel); + childLbResolvedAddressFactory = + checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory"); + backoffProvider = builder.backoffProvider; + ChildLoadBalancerHelperProvider childLbHelperProvider = + new ChildLoadBalancerHelperProvider(helper, new SubchannelStateManagerImpl(), rlsPicker); + if (rlsConfig.getRequestProcessingStrategy() + == RequestProcessingStrategy.SYNC_LOOKUP_CLIENT_SEES_ERROR) { + refCountedChildPolicyWrapperFactory = + new RefCountedChildPolicyWrapperFactory( + childLbHelperProvider, new BackoffRefreshListener()); + } else { + refCountedChildPolicyWrapperFactory = + new RefCountedChildPolicyWrapperFactory(childLbHelperProvider, null); + } + } + + @CheckReturnValue + private ListenableFuture asyncRlsCall(RouteLookupRequest request) { + final SettableFuture response = SettableFuture.create(); + if (throttler.shouldThrottle()) { + response.setException(new ThrottledException()); + return response; + } + rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS) + .routeLookup( + REQUEST_CONVERTER.convert(request), + new StreamObserver() { + @Override + public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) { + response.set(RESPONSE_CONVERTER.reverse().convert(value)); + } + + @Override + public void onError(Throwable t) { + response.setException(t); + throttler.registerBackendResponse(false); + } + + @Override + public void onCompleted() { + throttler.registerBackendResponse(true); + } + }); + return response; + } + + /** + * Returns async response of the {@code request}. The returned value can be in 3 different states; + * cached, pending and backed-off due to error. The result remains same even if the status is + * changed after the return. + */ + @CheckReturnValue + public final CachedRouteLookupResponse get(final RouteLookupRequest request) { + synchronizationContext.throwIfNotInThisSynchronizationContext(); + synchronized (lock) { + final CacheEntry cacheEntry; + cacheEntry = linkedHashLruCache.read(request); + if (cacheEntry == null) { + return handleNewRequest(request); + } + + if (cacheEntry instanceof DataCacheEntry) { + // cache hit, initiate async-refresh if entry is staled + DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry); + if (dataEntry.isStaled(timeProvider.currentTimeNanos())) { + dataEntry.maybeRefresh(); + } + return CachedRouteLookupResponse.dataEntry((DataCacheEntry) cacheEntry); + } + return CachedRouteLookupResponse.backoffEntry((BackoffCacheEntry) cacheEntry); + } + } + + /** Performs any pending maintenance operations needed by the cache. */ + public void close() { + synchronized (lock) { + // all childPolicyWrapper will be returned via AutoCleaningEvictionListener + linkedHashLruCache.close(); + // TODO(creamsoup) maybe cancel all pending requests + pendingCallCache.clear(); + rlsChannel.shutdown(); + rlsPicker.close(); + } + } + + /** + * Populates async cache entry for new request. This is only methods directly modifies the cache, + * any status change is happening via event (async request finished, timed out, etc) in {@link + * PendingCacheEntry}, {@link DataCacheEntry} and {@link BackoffCacheEntry}. + */ + private CachedRouteLookupResponse handleNewRequest(RouteLookupRequest request) { + synchronized (lock) { + PendingCacheEntry pendingEntry = pendingCallCache.get(request); + if (pendingEntry != null) { + return CachedRouteLookupResponse.pendingResponse(pendingEntry); + } + + ListenableFuture asyncCall = asyncRlsCall(request); + if (!asyncCall.isDone()) { + pendingEntry = new PendingCacheEntry(request, asyncCall); + pendingCallCache.put(request, pendingEntry); + return CachedRouteLookupResponse.pendingResponse(pendingEntry); + } else { + // async call returned finished future is most likely throttled + try { + RouteLookupResponse response = asyncCall.get(); + DataCacheEntry dataEntry = new DataCacheEntry(request, response); + linkedHashLruCache.cache(request, dataEntry); + return CachedRouteLookupResponse.dataEntry(dataEntry); + } catch (Exception e) { + BackoffCacheEntry backoffEntry = + new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get()); + linkedHashLruCache.cache(request, backoffEntry); + return CachedRouteLookupResponse.backoffEntry(backoffEntry); + } + } + } + } + + public void requestConnection() { + rlsChannel.getState(true); + } + + /** + * Viewer class for cached {@link RouteLookupResponse} and associated {@link ChildPolicyWrapper}. + */ + static final class CachedRouteLookupResponse { + private final RouteLookupRequest request; + + // Should only have 1 of following 3 cache entries + @Nullable + private final DataCacheEntry dataCacheEntry; + @Nullable + private final PendingCacheEntry pendingCacheEntry; + @Nullable + private final BackoffCacheEntry backoffCacheEntry; + + CachedRouteLookupResponse( + RouteLookupRequest request, + DataCacheEntry dataCacheEntry, + PendingCacheEntry pendingCacheEntry, + BackoffCacheEntry backoffCacheEntry) { + this.request = checkNotNull(request, "request"); + this.dataCacheEntry = dataCacheEntry; + this.pendingCacheEntry = pendingCacheEntry; + this.backoffCacheEntry = backoffCacheEntry; + checkState((dataCacheEntry != null ^ pendingCacheEntry != null ^ backoffCacheEntry != null) + && !(dataCacheEntry != null && pendingCacheEntry != null && backoffCacheEntry != null), + "Expected only 1 cache entry value provided"); + } + + static CachedRouteLookupResponse pendingResponse(PendingCacheEntry pendingEntry) { + return new CachedRouteLookupResponse(pendingEntry.request, null, pendingEntry, null); + } + + static CachedRouteLookupResponse backoffEntry(BackoffCacheEntry backoffEntry) { + return new CachedRouteLookupResponse(backoffEntry.request, null, null, backoffEntry); + } + + static CachedRouteLookupResponse dataEntry(DataCacheEntry dataEntry) { + return new CachedRouteLookupResponse(dataEntry.request, dataEntry, null, null); + } + + boolean hasData() { + return dataCacheEntry != null; + } + + @Nullable + ChildPolicyWrapper getChildPolicyWrapper() { + if (!hasData()) { + return null; + } + return dataCacheEntry.getChildPolicyWrapper(); + } + + @Nullable + public String getHeaderData() { + if (!hasData()) { + return null; + } + return dataCacheEntry.getHeaderData(); + } + + boolean hasError() { + return backoffCacheEntry != null; + } + + boolean isPending() { + return pendingCacheEntry != null; + } + + @Nullable + Status getStatus() { + if (!hasError()) { + return null; + } + return backoffCacheEntry.getStatus(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("request", request) + .add("dataCacheEntry", dataCacheEntry) + .add("pendingCacheEntry", pendingCacheEntry) + .add("backoffCacheEntry", backoffCacheEntry) + .toString(); + } + } + + /** A pending cache entry when the async RouteLookup RPC is still on the fly. */ + final class PendingCacheEntry { + private final ListenableFuture pendingCall; + private final RouteLookupRequest request; + private final BackoffPolicy backoffPolicy; + + PendingCacheEntry( + RouteLookupRequest request, ListenableFuture pendingCall) { + this(request, pendingCall, null); + } + + PendingCacheEntry( + RouteLookupRequest request, + ListenableFuture pendingCall, + @Nullable BackoffPolicy backoffPolicy) { + this.request = checkNotNull(request, "request"); + this.pendingCall = pendingCall; + this.backoffPolicy = backoffPolicy == null ? backoffProvider.get() : backoffPolicy; + pendingCall.addListener( + new Runnable() { + @Override + public void run() { + handleDoneFuture(); + } + }, + synchronizationContext); + } + + private void handleDoneFuture() { + synchronized (lock) { + pendingCallCache.remove(request); + if (pendingCall.isCancelled()) { + return; + } + + try { + transitionToDataEntry(pendingCall.get()); + } catch (Exception e) { + if (e instanceof ThrottledException) { + transitionToBackOff(Status.RESOURCE_EXHAUSTED.withCause(e)); + } else { + transitionToBackOff(Status.fromThrowable(e)); + } + } + } + } + + private void transitionToDataEntry(RouteLookupResponse routeLookupResponse) { + synchronized (lock) { + linkedHashLruCache.cache(request, new DataCacheEntry(request, routeLookupResponse)); + } + } + + private void transitionToBackOff(Status status) { + synchronized (lock) { + linkedHashLruCache.cache(request, new BackoffCacheEntry(request, status, backoffPolicy)); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("request", request) + .add("pendingCall", pendingCall) + .add("backoffPolicy", backoffPolicy) + .toString(); + } + } + + /** Common cache entry data for {@link RlsAsyncLruCache}. */ + abstract class CacheEntry { + + protected final RouteLookupRequest request; + + CacheEntry(RouteLookupRequest request) { + this.request = checkNotNull(request, "request"); + } + + abstract int getSizeBytes(); + + final boolean isExpired() { + return isExpired(timeProvider.currentTimeNanos()); + } + + abstract boolean isExpired(long now); + + abstract void cleanup(); + } + + /** Implementation of {@link CacheEntry} contains valid data. */ + final class DataCacheEntry extends CacheEntry { + private final RouteLookupResponse response; + private final long expireTime; + private final long staleTime; + private ChildPolicyWrapper childPolicyWrapper; + + DataCacheEntry(RouteLookupRequest request, final RouteLookupResponse response) { + super(request); + this.response = checkNotNull(response, "response"); + childPolicyWrapper = + refCountedChildPolicyWrapperFactory + .createOrGet(response.getTarget()); + long now = timeProvider.currentTimeNanos(); + expireTime = now + maxAgeNanos; + staleTime = now + staleAgeNanos; + + if (childPolicyWrapper.getPicker() != null) { + // using cached childPolicyWrapper + updateLbState(); + } else { + createChildLbPolicy(); + } + } + + private void updateLbState() { + childPolicyWrapper + .getHelper() + .updateBalancingState( + childPolicyWrapper.getConnectivityStateInfo().getState(), + childPolicyWrapper.getPicker()); + } + + private void createChildLbPolicy() { + ChildLoadBalancingPolicy childPolicy = lbPolicyConfig.getLoadBalancingPolicy(); + LoadBalancerProvider lbProvider = childPolicy.getEffectiveLbProvider(); + ConfigOrError lbConfig = + lbProvider + .parseLoadBalancingPolicyConfig( + childPolicy.getEffectiveChildPolicy(childPolicyWrapper.getTarget())); + + LoadBalancer lb = lbProvider.newLoadBalancer(childPolicyWrapper.getHelper()); + lb.handleResolvedAddresses(childLbResolvedAddressFactory.create(lbConfig.getConfig())); + lb.requestConnection(); + } + + /** + * Refreshes cache entry by creating {@link PendingCacheEntry}. When the {@code + * PendingCacheEntry} received data from RLS server, it will replace the data entry if valid + * data still exists. Flow looks like following. + * + *
+     * Timeline                       | async refresh
+     *                                V put new cache (entry2)
+     * entry1: Pending | hasValue | staled  |
+     * entry2:                        | OV* | pending | hasValue | staled |
+     *
+     * OV: old value
+     * 
+ */ + void maybeRefresh() { + synchronized (lock) { + if (pendingCallCache.containsKey(request)) { + // pending already requested + return; + } + final ListenableFuture asyncCall = asyncRlsCall(request); + if (!asyncCall.isDone()) { + pendingCallCache.put(request, new PendingCacheEntry(request, asyncCall)); + } else { + // async call returned finished future is most likely throttled + try { + RouteLookupResponse response = asyncCall.get(); + linkedHashLruCache.cache(request, new DataCacheEntry(request, response)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + BackoffCacheEntry backoffEntry = + new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get()); + linkedHashLruCache.cache(request, backoffEntry); + } + } + } + } + + @Nullable + ChildPolicyWrapper getChildPolicyWrapper() { + return childPolicyWrapper; + } + + String getHeaderData() { + return response.getHeaderData(); + } + + @Override + int getSizeBytes() { + // size of strings and java object overhead, actual memory usage is more than this. + return (response.getTarget().length() + response.getHeaderData().length()) * 2 + 38 * 2; + } + + @Override + boolean isExpired(long now) { + return expireTime <= now; + } + + boolean isStaled(long now) { + return staleTime <= now; + } + + @Override + void cleanup() { + refCountedChildPolicyWrapperFactory.release(childPolicyWrapper); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("request", request) + .add("response", response) + .add("expireTime", expireTime) + .add("staleTime", staleTime) + .add("childPolicyWrapper", childPolicyWrapper) + .toString(); + } + } + + /** + * Implementation of {@link CacheEntry} contains error. This entry will transition to pending + * status when the backoff time is expired. + */ + private final class BackoffCacheEntry extends CacheEntry { + + private final Status status; + private final ScheduledHandle scheduledHandle; + private final BackoffPolicy backoffPolicy; + private final long expireNanos; + private boolean shutdown = false; + + BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy backoffPolicy) { + super(request); + this.status = checkNotNull(status, "status"); + this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy"); + long delayNanos = backoffPolicy.nextBackoffNanos(); + this.expireNanos = timeProvider.currentTimeNanos() + delayNanos; + this.scheduledHandle = + synchronizationContext.schedule( + new Runnable() { + @Override + public void run() { + transitionToPending(); + } + }, + delayNanos, + TimeUnit.NANOSECONDS, + scheduledExecutorService); + } + + /** Forcefully refreshes cache entry by ignoring the backoff timer. */ + void forceRefresh() { + if (scheduledHandle.isPending()) { + scheduledHandle.cancel(); + transitionToPending(); + } + } + + private void transitionToPending() { + synchronized (lock) { + if (shutdown) { + return; + } + ListenableFuture call = asyncRlsCall(request); + if (!call.isDone()) { + PendingCacheEntry pendingEntry = new PendingCacheEntry(request, call, backoffPolicy); + pendingCallCache.put(request, pendingEntry); + linkedHashLruCache.invalidate(request); + } else { + try { + RouteLookupResponse response = call.get(); + linkedHashLruCache.cache(request, new DataCacheEntry(request, response)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + linkedHashLruCache.cache( + request, + new BackoffCacheEntry(request, Status.fromThrowable(e), backoffPolicy)); + } + } + } + } + + Status getStatus() { + return status; + } + + @Override + int getSizeBytes() { + return 0; + } + + @Override + boolean isExpired(long now) { + return expireNanos <= now; + } + + @Override + void cleanup() { + if (shutdown) { + return; + } + shutdown = true; + if (!scheduledHandle.isPending()) { + scheduledHandle.cancel(); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("request", request) + .add("status", status) + .add("backoffPolicy", backoffPolicy) + .add("scheduledFuture", scheduledHandle) + .toString(); + } + } + + /** Returns a Builder for {@link CachingRlsLbClient}. */ + public static Builder newBuilder() { + return new Builder(); + } + + /** A Builder for {@link CachingRlsLbClient}. */ + public static final class Builder { + + private Helper helper; + private LbPolicyConfiguration lbPolicyConfig; + private Throttler throttler = new HappyThrottler(); + private ResolvedAddressFactory resolvedAddressFactory; + private TimeProvider timeProvider = TimeProvider.SYSTEM_TIME_PROVIDER; + private EvictionListener evictionListener; + private BackoffPolicy.Provider backoffProvider = new ExponentialBackoffPolicy.Provider(); + + public Builder setHelper(Helper helper) { + this.helper = checkNotNull(helper, "helper"); + return this; + } + + public Builder setLbPolicyConfig(LbPolicyConfiguration lbPolicyConfig) { + this.lbPolicyConfig = checkNotNull(lbPolicyConfig, "lbPolicyConfig"); + return this; + } + + public Builder setThrottler(Throttler throttler) { + this.throttler = checkNotNull(throttler, "throttler"); + return this; + } + + /** + * Sets a factory to create {@link ResolvedAddresses} for child load balancer. + */ + public Builder setResolvedAddressesFactory( + ResolvedAddressFactory resolvedAddressFactory) { + this.resolvedAddressFactory = + checkNotNull(resolvedAddressFactory, "resolvedAddressFactory"); + return this; + } + + public Builder setTimeProvider(TimeProvider timeProvider) { + this.timeProvider = checkNotNull(timeProvider, "timeProvider"); + return this; + } + + public Builder setEvictionListener( + @Nullable EvictionListener evictionListener) { + this.evictionListener = evictionListener; + return this; + } + + public Builder setBackoffProvider(BackoffPolicy.Provider provider) { + this.backoffProvider = checkNotNull(provider, "provider"); + return this; + } + + public CachingRlsLbClient build() { + return new CachingRlsLbClient(this); + } + } + + /** + * When any {@link CacheEntry} is evicted from {@link LruCache}, it performs {@link + * CacheEntry#cleanup()} after original {@link EvictionListener} is finished. + */ + private static final class AutoCleaningEvictionListener + implements EvictionListener { + + private final EvictionListener delegate; + + AutoCleaningEvictionListener( + @Nullable EvictionListener delegate) { + this.delegate = delegate; + } + + @Override + public void onEviction(RouteLookupRequest key, CacheEntry value, EvictionType cause) { + if (delegate != null) { + delegate.onEviction(key, value, cause); + } + // performs cleanup after delegation + value.cleanup(); + } + } + + /** A Throttler never throttles. */ + private static final class HappyThrottler implements Throttler { + + @Override + public boolean shouldThrottle() { + return false; + } + + @Override + public void registerBackendResponse(boolean throttled) { + // no-op + } + } + + /** Implementation of {@link LinkedHashLruCache} for RLS. */ + private static final class RlsAsyncLruCache + extends LinkedHashLruCache { + + RlsAsyncLruCache(long maxEstimatedSizeBytes, + @Nullable EvictionListener evictionListener, + ScheduledExecutorService ses, TimeProvider timeProvider) { + super( + maxEstimatedSizeBytes, + new AutoCleaningEvictionListener(evictionListener), + 1, + TimeUnit.MINUTES, + ses, + timeProvider); + } + + @Override + protected boolean isExpired(RouteLookupRequest key, CacheEntry value, long nowNanos) { + return value.isExpired(); + } + + @Override + protected int estimateSizeOf(RouteLookupRequest key, CacheEntry value) { + return value.getSizeBytes(); + } + + @Override + protected boolean shouldInvalidateEldestEntry( + RouteLookupRequest eldestKey, CacheEntry eldestValue) { + // eldest entry should be evicted if size limit exceeded + return true; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .toString(); + } + } + + /** + * LbStatusListener refreshes {@link BackoffCacheEntry} when lb state is changed to {@link + * ConnectivityState#READY} from {@link ConnectivityState#TRANSIENT_FAILURE}. + */ + private final class BackoffRefreshListener implements ChildLbStatusListener { + + @Nullable + private ConnectivityState prevState = null; + + @Override + public void onStatusChanged(ConnectivityState newState) { + if (prevState == ConnectivityState.TRANSIENT_FAILURE + && newState == ConnectivityState.READY) { + synchronized (lock) { + for (CacheEntry value : linkedHashLruCache.values()) { + if (value instanceof BackoffCacheEntry) { + ((BackoffCacheEntry) value).forceRefresh(); + } + } + } + } + prevState = newState; + } + } + + /** A header will be added when RLS server respond with additional header data. */ + public static final Metadata.Key RLS_DATA_KEY = + Metadata.Key.of("X-Google-RLS-Data", Metadata.ASCII_STRING_MARSHALLER); + + final class RlsPicker extends SubchannelPicker { + + private final RlsRequestFactory requestFactory; + + RlsPicker(RlsRequestFactory requestFactory) { + this.requestFactory = checkNotNull(requestFactory, "requestFactory"); + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + String[] methodName = args.getMethodDescriptor().getFullMethodName().split("/", 2); + RouteLookupRequest request = + requestFactory.create(methodName[0], methodName[1], args.getHeaders()); + final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(request); + + PickSubchannelArgs rlsAppliedArgs = getApplyRlsHeader(args, response); + if (response.hasData()) { + ChildPolicyWrapper childPolicyWrapper = response.getChildPolicyWrapper(); + ConnectivityState connectivityState = + childPolicyWrapper.getConnectivityStateInfo().getState(); + switch (connectivityState) { + case CONNECTING: + return PickResult.withNoResult(); + case IDLE: + // fall through + case READY: + if (childPolicyWrapper.getPicker() == null) { + return PickResult.withNoResult(); + } + return childPolicyWrapper.getPicker().pickSubchannel(rlsAppliedArgs); + case TRANSIENT_FAILURE: + return handleError(rlsAppliedArgs, Status.INTERNAL); + case SHUTDOWN: + default: + return handleError(rlsAppliedArgs, Status.ABORTED); + } + } else if (response.hasError()) { + return handleError(rlsAppliedArgs, response.getStatus()); + } else { + return PickResult.withNoResult(); + } + } + + private PickSubchannelArgs getApplyRlsHeader( + PickSubchannelArgs args, CachedRouteLookupResponse response) { + if (response.getHeaderData() == null || response.getHeaderData().isEmpty()) { + return args; + } + + Metadata headers = new Metadata(); + headers.merge(args.getHeaders()); + headers.put(RLS_DATA_KEY, response.getHeaderData()); + return new PickSubchannelArgsImpl(args.getMethodDescriptor(), headers, args.getCallOptions()); + } + + private PickResult handleError(PickSubchannelArgs args, Status cause) { + RequestProcessingStrategy strategy = + lbPolicyConfig.getRouteLookupConfig().getRequestProcessingStrategy(); + switch (strategy) { + case SYNC_LOOKUP_CLIENT_SEES_ERROR: + return PickResult.withError(cause); + case SYNC_LOOKUP_DEFAULT_TARGET_ON_ERROR: + return useFallback(args); + default: + throw new AssertionError("Unknown RequestProcessingStrategy: " + strategy); + } + } + + private ChildPolicyWrapper fallbackChildPolicyWrapper; + + /** Uses Subchannel connected to default target. */ + private PickResult useFallback(PickSubchannelArgs args) { + String defaultTarget = lbPolicyConfig.getRouteLookupConfig().getDefaultTarget(); + if (fallbackChildPolicyWrapper == null + || !fallbackChildPolicyWrapper.getTarget().equals(defaultTarget)) { + // TODO(creamsoup) wait until lb is ready + startFallbackChildPolicy(); + } + switch (fallbackChildPolicyWrapper.getConnectivityStateInfo().getState()) { + case CONNECTING: + return PickResult.withNoResult(); + case TRANSIENT_FAILURE: + // fall through + case SHUTDOWN: + return + PickResult + .withError(fallbackChildPolicyWrapper.getConnectivityStateInfo().getStatus()); + case IDLE: + // fall through + case READY: + SubchannelPicker picker = fallbackChildPolicyWrapper.getPicker(); + if (picker == null) { + return PickResult.withNoResult(); + } + return picker.pickSubchannel(args); + default: + throw new AssertionError(); + } + } + + private void startFallbackChildPolicy() { + String defaultTarget = lbPolicyConfig.getRouteLookupConfig().getDefaultTarget(); + fallbackChildPolicyWrapper = refCountedChildPolicyWrapperFactory.createOrGet(defaultTarget); + + LoadBalancerProvider lbProvider = + lbPolicyConfig.getLoadBalancingPolicy().getEffectiveLbProvider(); + final LoadBalancer lb = + lbProvider.newLoadBalancer(fallbackChildPolicyWrapper.getHelper()); + final ConfigOrError lbConfig = + lbProvider + .parseLoadBalancingPolicyConfig( + lbPolicyConfig + .getLoadBalancingPolicy() + .getEffectiveChildPolicy(defaultTarget)); + helper.getSynchronizationContext().execute( + new Runnable() { + @Override + public void run() { + lb.handleResolvedAddresses( + childLbResolvedAddressFactory.create(lbConfig.getConfig())); + lb.requestConnection(); + } + }); + } + + void close() { + if (fallbackChildPolicyWrapper != null) { + refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("target", lbPolicyConfig.getRouteLookupConfig().getLookupService()) + .toString(); + } + } +} diff --git a/rls/src/main/java/io/grpc/rls/internal/RlsProtoData.java b/rls/src/main/java/io/grpc/rls/internal/RlsProtoData.java index 175f726df5..a851ca42c7 100644 --- a/rls/src/main/java/io/grpc/rls/internal/RlsProtoData.java +++ b/rls/src/main/java/io/grpc/rls/internal/RlsProtoData.java @@ -38,7 +38,7 @@ public final class RlsProtoData { /** A request object sent to route lookup service. */ @Immutable - public static final class RouteLookupRequest { + static final class RouteLookupRequest { private final String server; @@ -119,7 +119,7 @@ public final class RlsProtoData { /** A response from route lookup service. */ @Immutable - public static final class RouteLookupResponse { + static final class RouteLookupResponse { private final String target; @@ -218,6 +218,8 @@ public final class RlsProtoData { checkState( lookupService != null && !lookupService.isEmpty(), "lookupService must not be empty"); this.lookupService = lookupService; + checkState( + lookupServiceTimeoutInMillis > 0, "lookupServiceTimeoutInMillis should be positive"); this.lookupServiceTimeoutInMillis = lookupServiceTimeoutInMillis; if (maxAgeInMillis == null) { checkState( @@ -238,9 +240,7 @@ public final class RlsProtoData { this.requestProcessingStrategy = requestProcessingStrategy; checkNotNull(requestProcessingStrategy, "requestProcessingStrategy"); checkState( - !((requestProcessingStrategy == RequestProcessingStrategy.SYNC_LOOKUP_CLIENT_SEES_ERROR - || requestProcessingStrategy - == RequestProcessingStrategy.ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS) + !(requestProcessingStrategy == RequestProcessingStrategy.SYNC_LOOKUP_CLIENT_SEES_ERROR && defaultTarget.isEmpty()), "defaultTarget cannot be empty if strategy is %s", requestProcessingStrategy); @@ -303,13 +303,10 @@ public final class RlsProtoData { } /** - * Returns the default target to use. It will be used for request processing strategy - * {@link RequestProcessingStrategy#SYNC_LOOKUP_DEFAULT_TARGET_ON_ERROR} if RLS - * returns an error, or strategy {@link - * RequestProcessingStrategy#ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS} if RLS returns an error or - * there is a cache miss in the client. It will also be used if there are no healthy backends - * for an RLS target. Note that requests can be routed only to a subdomain of the original - * target, {@literal e.g.} "us_east_1.cloudbigtable.googleapis.com". + * Returns the default target to use if needed. If nonempty (implies request processing + * strategy SYNC_LOOKUP_DEFAULT_TARGET_ON_ERROR is set), it will be used if RLS returns an + * error. Note that requests can be routed only to a subdomain of the original target, + * {@literal e.g.} "us_east_1.cloudbigtable.googleapis.com". */ public String getDefaultTarget() { return defaultTarget; @@ -379,7 +376,7 @@ public final class RlsProtoData { } /** RequestProcessingStrategy specifies how to process a request when not already in the cache. */ - enum RequestProcessingStrategy { + public enum RequestProcessingStrategy { /** * Query the RLS and process the request using target returned by the lookup. The target will * then be cached and used for processing subsequent requests for the same key. Any errors @@ -394,13 +391,6 @@ public final class RlsProtoData { * strict regional routing requirements should use this strategy. */ SYNC_LOOKUP_CLIENT_SEES_ERROR, - - /** - * Query the RLS asynchronously but respond with the default target. The target in the lookup - * response will then be cached and used for subsequent requests. Services with strict latency - * requirements (but not strict regional routing requirements) should use this strategy. - */ - ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS; } /** diff --git a/rls/src/main/java/io/grpc/rls/internal/Throttler.java b/rls/src/main/java/io/grpc/rls/internal/Throttler.java index 0ba7898eee..e60780ce61 100644 --- a/rls/src/main/java/io/grpc/rls/internal/Throttler.java +++ b/rls/src/main/java/io/grpc/rls/internal/Throttler.java @@ -22,7 +22,7 @@ import javax.annotation.concurrent.ThreadSafe; * A strategy for deciding when to throttle requests at the client. */ @ThreadSafe -public interface Throttler { +interface Throttler { /** * Checks if a given request should be throttled by the client. This should be called for every diff --git a/rls/src/test/java/io/grpc/rls/internal/CachingRlsLbClientTest.java b/rls/src/test/java/io/grpc/rls/internal/CachingRlsLbClientTest.java new file mode 100644 index 0000000000..2a868d1c2b --- /dev/null +++ b/rls/src/test/java/io/grpc/rls/internal/CachingRlsLbClientTest.java @@ -0,0 +1,537 @@ +/* + * Copyright 2020 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.rls.internal; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.common.base.Converter; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.SettableFuture; +import io.grpc.Attributes; +import io.grpc.ConnectivityState; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.LoadBalancerProvider; +import io.grpc.ManagedChannel; +import io.grpc.NameResolver.Factory; +import io.grpc.Status; +import io.grpc.SynchronizationContext; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.BackoffPolicy; +import io.grpc.lookup.v1.RouteLookupServiceGrpc; +import io.grpc.rls.internal.CachingRlsLbClient.CacheEntry; +import io.grpc.rls.internal.CachingRlsLbClient.CachedRouteLookupResponse; +import io.grpc.rls.internal.CachingRlsLbClient.RlsPicker; +import io.grpc.rls.internal.DoNotUseDirectScheduledExecutorService.FakeTimeProvider; +import io.grpc.rls.internal.LbPolicyConfiguration.ChildLoadBalancingPolicy; +import io.grpc.rls.internal.LbPolicyConfiguration.ChildPolicyWrapper; +import io.grpc.rls.internal.LruCache.EvictionListener; +import io.grpc.rls.internal.LruCache.EvictionType; +import io.grpc.rls.internal.RlsProtoConverters.RouteLookupResponseConverter; +import io.grpc.rls.internal.RlsProtoData.GrpcKeyBuilder; +import io.grpc.rls.internal.RlsProtoData.GrpcKeyBuilder.Name; +import io.grpc.rls.internal.RlsProtoData.NameMatcher; +import io.grpc.rls.internal.RlsProtoData.RequestProcessingStrategy; +import io.grpc.rls.internal.RlsProtoData.RouteLookupConfig; +import io.grpc.rls.internal.RlsProtoData.RouteLookupRequest; +import io.grpc.rls.internal.RlsProtoData.RouteLookupResponse; +import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcCleanupRule; +import java.io.IOException; +import java.lang.Thread.UncaughtExceptionHandler; +import java.net.SocketAddress; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nonnull; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.AdditionalAnswers; +import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +@RunWith(JUnit4.class) +public class CachingRlsLbClientTest { + + private static final RouteLookupConfig ROUTE_LOOKUP_CONFIG = getRouteLookupConfig(); + private static final int SERVER_LATENCY_MILLIS = 10; + + @Rule + public final MockitoRule mocks = MockitoJUnit.rule(); + @Rule + public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule(); + + @Mock + private EvictionListener evictionListener; + @Mock + private SocketAddress socketAddress; + + private final SynchronizationContext syncContext = + new SynchronizationContext(new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new RuntimeException(e); + } + }); + private final FakeBackoffProvider fakeBackoffProvider = new FakeBackoffProvider(); + private final ResolvedAddressFactory resolvedAddressFactory = + new ChildLbResolvedAddressFactory( + ImmutableList.of(new EquivalentAddressGroup(socketAddress)), Attributes.EMPTY); + private final TestLoadBalancerProvider lbProvider = new TestLoadBalancerProvider(); + private final DoNotUseDirectScheduledExecutorService fakeScheduledExecutorService = + mock(DoNotUseDirectScheduledExecutorService.class, CALLS_REAL_METHODS); + private final FakeTimeProvider fakeTimeProvider = + fakeScheduledExecutorService.getFakeTimeProvider(); + private final StaticFixedDelayRlsServerImpl rlsServerImpl = + new StaticFixedDelayRlsServerImpl( + TimeUnit.MILLISECONDS.toNanos(SERVER_LATENCY_MILLIS), fakeScheduledExecutorService); + private final ChildLoadBalancingPolicy childLbPolicy = + new ChildLoadBalancingPolicy("target", Collections.emptyMap(), lbProvider); + private final Helper helper = + mock(Helper.class, AdditionalAnswers.delegatesTo(new FakeHelper())); + private final FakeThrottler fakeThrottler = new FakeThrottler(); + private final LbPolicyConfiguration lbPolicyConfiguration = + new LbPolicyConfiguration(ROUTE_LOOKUP_CONFIG, childLbPolicy); + + private CachingRlsLbClient rlsLbClient; + + @Before + public void setUp() throws Exception { + rlsLbClient = + CachingRlsLbClient.newBuilder() + .setBackoffProvider(fakeBackoffProvider) + .setResolvedAddressesFactory(resolvedAddressFactory) + .setEvictionListener(evictionListener) + .setHelper(helper) + .setLbPolicyConfig(lbPolicyConfiguration) + .setThrottler(fakeThrottler) + .setTimeProvider(fakeTimeProvider) + .build(); + } + + @After + public void tearDown() throws Exception { + rlsLbClient.close(); + } + + private CachedRouteLookupResponse getInSyncContext( + final RouteLookupRequest request) + throws ExecutionException, InterruptedException, TimeoutException { + final SettableFuture responseSettableFuture = + SettableFuture.create(); + syncContext.execute(new Runnable() { + @Override + public void run() { + responseSettableFuture.set(rlsLbClient.get(request)); + } + }); + return responseSettableFuture.get(5, TimeUnit.SECONDS); + } + + @Test + public void get_noError_lifeCycle() throws Exception { + InOrder inOrder = inOrder(evictionListener); + RouteLookupRequest routeLookupRequest = + new RouteLookupRequest("server", "/foo/bar", "grpc", ImmutableMap.of()); + rlsServerImpl.setLookupTable( + ImmutableMap.of( + routeLookupRequest, + new RouteLookupResponse("target", "header"))); + + // initial request + CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest); + assertThat(resp.isPending()).isTrue(); + + // server response + fakeTimeProvider.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); + + resp = getInSyncContext(routeLookupRequest); + assertThat(resp.hasData()).isTrue(); + + // cache hit for staled entry + fakeTimeProvider.forwardTime(ROUTE_LOOKUP_CONFIG.getStaleAgeInMillis(), TimeUnit.MILLISECONDS); + + resp = getInSyncContext(routeLookupRequest); + assertThat(resp.hasData()).isTrue(); + + // async refresh finishes + fakeTimeProvider.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); + inOrder + .verify(evictionListener) + .onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.REPLACED)); + + resp = getInSyncContext(routeLookupRequest); + + assertThat(resp.hasData()).isTrue(); + + // existing cache expired + fakeTimeProvider.forwardTime(ROUTE_LOOKUP_CONFIG.getMaxAgeInMillis(), TimeUnit.MILLISECONDS); + + resp = getInSyncContext(routeLookupRequest); + + assertThat(resp.isPending()).isTrue(); + inOrder + .verify(evictionListener) + .onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.EXPIRED)); + + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void get_throttledAndRecover() throws Exception { + RouteLookupRequest routeLookupRequest = + new RouteLookupRequest("server", "/foo/bar", "grpc", ImmutableMap.of()); + rlsServerImpl.setLookupTable( + ImmutableMap.of( + routeLookupRequest, + new RouteLookupResponse("target", "header"))); + + fakeThrottler.nextResult = true; + fakeBackoffProvider.nextPolicy = createBackoffPolicy(10, TimeUnit.MILLISECONDS); + + CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest); + + assertThat(resp.hasError()).isTrue(); + + fakeTimeProvider.forwardTime(10, TimeUnit.MILLISECONDS); + // initially backed off entry is backed off again + verify(evictionListener) + .onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.REPLACED)); + + resp = getInSyncContext(routeLookupRequest); + + assertThat(resp.hasError()).isTrue(); + + // let it pass throttler + fakeThrottler.nextResult = false; + fakeTimeProvider.forwardTime(10, TimeUnit.MILLISECONDS); + + resp = getInSyncContext(routeLookupRequest); + + assertThat(resp.isPending()).isTrue(); + + // server responses + fakeTimeProvider.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); + + resp = getInSyncContext(routeLookupRequest); + + assertThat(resp.hasData()).isTrue(); + } + + @Test + public void get_updatesLbState() throws Exception { + InOrder inOrder = inOrder(helper); + RouteLookupRequest routeLookupRequest = + new RouteLookupRequest("server", "/foo/bar", "grpc", ImmutableMap.of()); + rlsServerImpl.setLookupTable( + ImmutableMap.of( + routeLookupRequest, + new RouteLookupResponse("target", "header"))); + + // valid channel + CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest); + assertThat(resp.isPending()).isTrue(); + fakeTimeProvider.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); + + resp = getInSyncContext(routeLookupRequest); + assertThat(resp.hasData()).isTrue(); + + ArgumentCaptor pickerCaptor = ArgumentCaptor.forClass(SubchannelPicker.class); + ArgumentCaptor stateCaptor = + ArgumentCaptor.forClass(ConnectivityState.class); + inOrder.verify(helper, times(2)) + .updateBalancingState(stateCaptor.capture(), pickerCaptor.capture()); + + assertThat(new HashSet<>(pickerCaptor.getAllValues())).hasSize(1); + assertThat(stateCaptor.getAllValues()) + .containsExactly(ConnectivityState.CONNECTING, ConnectivityState.READY); + assertThat(pickerCaptor.getValue()).isInstanceOf(RlsPicker.class); + + // move backoff further back to only test error behavior + fakeBackoffProvider.nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS); + // try to get invalid + RouteLookupRequest invalidRouteLookupRequest = + new RouteLookupRequest( + "unknown_server", "/doesn/exists", "grpc", ImmutableMap.of()); + CachedRouteLookupResponse errorResp = getInSyncContext(invalidRouteLookupRequest); + assertThat(errorResp.isPending()).isTrue(); + fakeTimeProvider.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); + + errorResp = getInSyncContext(invalidRouteLookupRequest); + assertThat(errorResp.hasError()).isTrue(); + + inOrder.verify(helper, never()) + .updateBalancingState(any(ConnectivityState.class), any(SubchannelPicker.class)); + } + + @Test + public void get_childPolicyWrapper_reusedForSameTarget() throws Exception { + RouteLookupRequest routeLookupRequest = + new RouteLookupRequest("server", "/foo/bar", "grpc", ImmutableMap.of()); + RouteLookupRequest routeLookupRequest2 = + new RouteLookupRequest("server", "/foo/baz", "grpc", ImmutableMap.of()); + rlsServerImpl.setLookupTable( + ImmutableMap.of( + routeLookupRequest, new RouteLookupResponse("target", "header"), + routeLookupRequest2, new RouteLookupResponse("target", "header2"))); + + CachedRouteLookupResponse resp = getInSyncContext(routeLookupRequest); + assertThat(resp.isPending()).isTrue(); + fakeTimeProvider.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); + + resp = getInSyncContext(routeLookupRequest); + assertThat(resp.hasData()).isTrue(); + assertThat(resp.getHeaderData()).isEqualTo("header"); + + ChildPolicyWrapper childPolicyWrapper = resp.getChildPolicyWrapper(); + assertThat(childPolicyWrapper.getTarget()).isEqualTo("target"); + assertThat(childPolicyWrapper.getPicker()).isNotInstanceOf(RlsPicker.class); + + // request2 has same target, it should reuse childPolicyWrapper + CachedRouteLookupResponse resp2 = getInSyncContext(routeLookupRequest2); + assertThat(resp2.isPending()).isTrue(); + fakeTimeProvider.forwardTime(SERVER_LATENCY_MILLIS, TimeUnit.MILLISECONDS); + + resp2 = getInSyncContext(routeLookupRequest2); + assertThat(resp2.hasData()).isTrue(); + assertThat(resp2.getHeaderData()).isEqualTo("header2"); + assertThat(resp2.getChildPolicyWrapper()).isEqualTo(resp.getChildPolicyWrapper()); + } + + private static RouteLookupConfig getRouteLookupConfig() { + return new RouteLookupConfig( + ImmutableList.of( + new GrpcKeyBuilder( + ImmutableList.of(new Name("service1", "create")), + ImmutableList.of( + new NameMatcher("user", ImmutableList.of("User", "Parent"), true), + new NameMatcher("id", ImmutableList.of("X-Google-Id"), true)))), + /* lookupService= */ "service1", + /* lookupServiceTimeoutInMillis= */ TimeUnit.SECONDS.toMillis(2), + /* maxAgeInMillis= */ TimeUnit.SECONDS.toMillis(300), + /* staleAgeInMillis= */ TimeUnit.SECONDS.toMillis(240), + /* cacheSize= */ 1000, + /* validTargets= */ ImmutableList.of("a valid target"), + /* defaultTarget= */ "us_east_1.cloudbigtable.googleapis.com", + RequestProcessingStrategy.SYNC_LOOKUP_CLIENT_SEES_ERROR); + } + + private static BackoffPolicy createBackoffPolicy(final long delay, final TimeUnit unit) { + checkArgument(delay > 0, "delay should be positive"); + checkNotNull(unit, "unit"); + return + new BackoffPolicy() { + @Override + public long nextBackoffNanos() { + return TimeUnit.NANOSECONDS.convert(delay, unit); + } + }; + } + + private static final class FakeBackoffProvider implements BackoffPolicy.Provider { + + private BackoffPolicy nextPolicy = createBackoffPolicy(100, TimeUnit.MILLISECONDS); + + @Override + public BackoffPolicy get() { + return nextPolicy; + } + } + + private static final class TestLoadBalancerProvider extends LoadBalancerProvider { + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 0; + } + + @Override + public String getPolicyName() { + return null; + } + + @Override + public LoadBalancer newLoadBalancer(final Helper helper) { + return new LoadBalancer() { + + @Override + public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { + // TODO: make the picker accessible + helper.updateBalancingState(ConnectivityState.READY, mock(SubchannelPicker.class)); + } + + @Override + public void handleNameResolutionError(final Status error) { + class ErrorPicker extends SubchannelPicker { + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + return PickResult.withError(error); + } + } + + helper.updateBalancingState(ConnectivityState.TRANSIENT_FAILURE, new ErrorPicker()); + } + + @Override + public void shutdown() { + } + }; + } + } + + private static final class StaticFixedDelayRlsServerImpl + extends RouteLookupServiceGrpc.RouteLookupServiceImplBase { + + private static final Converter + REQUEST_CONVERTER = new RlsProtoConverters.RouteLookupRequestConverter(); + private static final Converter + RESPONSE_CONVERTER = new RouteLookupResponseConverter().reverse(); + + private final long responseDelayNano; + private final ScheduledExecutorService scheduledExecutorService; + + private Map lookupTable = ImmutableMap.of(); + + public StaticFixedDelayRlsServerImpl( + long responseDelayNano, ScheduledExecutorService scheduledExecutorService) { + checkArgument(responseDelayNano > 0, "delay must be positive"); + this.responseDelayNano = responseDelayNano; + this.scheduledExecutorService = + checkNotNull(scheduledExecutorService, "scheduledExecutorService"); + } + + private void setLookupTable(Map lookupTable) { + this.lookupTable = checkNotNull(lookupTable, "lookupTable"); + } + + @Override + public void routeLookup(final io.grpc.lookup.v1.RouteLookupRequest request, + final StreamObserver responseObserver) { + ScheduledFuture unused = + scheduledExecutorService.schedule( + new Runnable() { + @Override + public void run() { + RouteLookupResponse response = + lookupTable.get(REQUEST_CONVERTER.convert(request)); + if (response == null) { + responseObserver.onError(new RuntimeException("not found")); + } else { + responseObserver.onNext(RESPONSE_CONVERTER.convert(response)); + responseObserver.onCompleted(); + } + } + }, responseDelayNano, TimeUnit.NANOSECONDS); + } + } + + private final class FakeHelper extends Helper { + + @Override + public ManagedChannel createResolvingOobChannel(String target) { + try { + grpcCleanupRule.register( + InProcessServerBuilder.forName(target) + .addService(rlsServerImpl) + .directExecutor() + .build() + .start()); + } catch (IOException e) { + throw new RuntimeException("cannot create server: " + target, e); + } + return InProcessChannelBuilder.forName(target).directExecutor().build(); + } + + @Override + public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) { + throw new UnsupportedOperationException(); + } + + @Override + public void updateBalancingState( + @Nonnull ConnectivityState newState, @Nonnull SubchannelPicker newPicker) { + // no-op + } + + @Override + @Deprecated + public Factory getNameResolverFactory() { + throw new UnsupportedOperationException(); + } + + @Override + public String getAuthority() { + throw new UnsupportedOperationException(); + } + + @Override + public ScheduledExecutorService getScheduledExecutorService() { + return fakeScheduledExecutorService; + } + + @Override + public SynchronizationContext getSynchronizationContext() { + return syncContext; + } + } + + private static final class FakeThrottler implements Throttler { + + private boolean nextResult = false; + + @Override + public boolean shouldThrottle() { + return nextResult; + } + + @Override + public void registerBackendResponse(boolean throttled) { + // no-op + } + } +} diff --git a/rls/src/test/java/io/grpc/rls/internal/RlsRequestFactoryTest.java b/rls/src/test/java/io/grpc/rls/internal/RlsRequestFactoryTest.java index cac258d098..238bbebae4 100644 --- a/rls/src/test/java/io/grpc/rls/internal/RlsRequestFactoryTest.java +++ b/rls/src/test/java/io/grpc/rls/internal/RlsRequestFactoryTest.java @@ -66,7 +66,7 @@ public class RlsRequestFactoryTest { /* cacheSize= */ 1000, /* validTargets= */ ImmutableList.of("a valid target"), /* defaultTarget= */ "us_east_1.cloudbigtable.googleapis.com", - RequestProcessingStrategy.ASYNC_LOOKUP_DEFAULT_TARGET_ON_MISS); + RequestProcessingStrategy.SYNC_LOOKUP_CLIENT_SEES_ERROR); private final RlsRequestFactory factory = new RlsRequestFactory(RLS_CONFIG);