rls: Synchronization fixes in CachingRlsLbClient

This started with combining handleNewRequest with asyncRlsCall, but that
emphasized pre-existing synchronization issues and trying to fix those
exposed others. It was hard to split this into smaller commits because
they were interconnected.

handleNewRequest was combined with asyncRlsCall to use a single code
flow for handling the completed future while also failing the pick
immediately for thottled requests. That flow was then reused for
refreshing after backoff and data stale. It no longer optimizes the RPC
completing immediately because that would not happen in real life; it
only happens in tests because of inprocess+directExecutor() and we don't
want to test a different code flow in tests. This did require updating
some of the tests.

One small behavior change to share the combined asyncRlsCall with
backoff is we now always invalidate an entry after the backoff.
Previously the code could replace the entry with its new value in one
operation if the asyncRlsCall future completed immediately. That only
mattered to a single test which now sees an EXPLICIT eviction.

SynchronizationContext used to provide atomic scheduling in
BackoffCacheEntry, but it was not guaranteeing the scheduledRunnable was
only accessed from the sync context. The same was true for calling up
the LB tree with `updateBalancingState()`. In particular, adding entries
to the cache during a pick could evict entries without running the
cleanup methods within the context, as well as the RLS channel
transitioning from TRANSIENT_FAILURE to READY. This was replaced with
using a bare Future with a lock to provide atomicity.

BackoffCacheEntry no longer uses the current time and instead waits for
the backoff timer to actually run before considering itself expired.
Previously, it could race with periodic cleanup and get evicted before
the timer ran, which would cancel the timer and forget the
backoffPolicy. Since the backoff timer invalidates the entry, it is
likely useless to claim it ever expires, but that level of behavior was
preserved since I didn't look into the LRU cache deeply.

propagateRlsError() was moved out of asyncRlsCall because it was not
guaranteed to run after the cache was updated. If something was already
running on the sync context, then RPCs would hang until another update
caused updateBalancingState().

Some methods were moved out of the CacheEntry classes to avoid
shared-state mutation in constructors. But if we add something in a
factory method, we want to remove it in a sibling method to the factory
method, so additional code is moved for symmetry. Moving shared-state
mutation ouf of constructors is important because 1) it is surprising
and 2) ErrorProne doesn't validate locking within constructors. In
general, having shared-state methods in CacheEntries also has the
problem that ErrorProne can't validate CachingRlsLbClient calls to
CacheEntry. ErrorProne can't know that "lock" is already held because
CacheEntry could have been created from a _different instance_ of
CachingRlsLbClient and there's no way for us to let ErrorProne prove it
is the same instance of "lock".

DataCacheEntry still mutates global state that requires a lock in its
constructor, but it is less severe of a problem and it requires more
choices to address.
This commit is contained in:
Eric Anderson 2024-04-03 12:22:04 -07:00 committed by GitHub
parent 58de563fa4
commit 6e97b180b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 134 additions and 227 deletions

View File

@ -24,7 +24,9 @@ import com.google.common.base.Converter;
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Ticker;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.ChannelLogger;
import io.grpc.ChannelLogger.ChannelLogLevel;
@ -38,8 +40,6 @@ import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.lookup.v1.RouteLookupServiceGrpc;
@ -54,7 +54,6 @@ import io.grpc.rls.RlsProtoConverters.RouteLookupResponseConverter;
import io.grpc.rls.RlsProtoData.RouteLookupConfig;
import io.grpc.rls.RlsProtoData.RouteLookupRequest;
import io.grpc.rls.RlsProtoData.RouteLookupResponse;
import io.grpc.rls.Throttler.ThrottledException;
import io.grpc.stub.StreamObserver;
import io.grpc.util.ForwardingLoadBalancerHelper;
import java.net.URI;
@ -62,6 +61,7 @@ import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckReturnValue;
@ -96,7 +96,6 @@ final class CachingRlsLbClient {
@GuardedBy("lock")
private final Map<RouteLookupRequest, PendingCacheEntry> pendingCallCache = new HashMap<>();
private final SynchronizationContext synchronizationContext;
private final ScheduledExecutorService scheduledExecutorService;
private final Ticker ticker;
private final Throttler throttler;
@ -118,7 +117,6 @@ final class CachingRlsLbClient {
private CachingRlsLbClient(Builder builder) {
helper = new RlsLbHelper(checkNotNull(builder.helper, "helper"));
scheduledExecutorService = helper.getScheduledExecutorService();
synchronizationContext = helper.getSynchronizationContext();
lbPolicyConfig = checkNotNull(builder.lbPolicyConfig, "lbPolicyConfig");
RouteLookupConfig rlsConfig = lbPolicyConfig.getRouteLookupConfig();
maxAgeNanos = rlsConfig.maxAgeInNanos();
@ -129,10 +127,11 @@ final class CachingRlsLbClient {
linkedHashLruCache =
new RlsAsyncLruCache(
rlsConfig.cacheSizeBytes(),
builder.evictionListener,
new AutoCleaningEvictionListener(builder.evictionListener),
scheduledExecutorService,
ticker,
lock);
lock,
helper);
logger = helper.getChannelLogger();
String serverHost = null;
try {
@ -193,15 +192,19 @@ final class CachingRlsLbClient {
serverName, status.getCode(), status.getDescription()));
}
@CheckReturnValue
private ListenableFuture<RouteLookupResponse> asyncRlsCall(RouteLookupRequest request) {
/** Populates async cache entry for new request. */
@GuardedBy("lock")
private CachedRouteLookupResponse asyncRlsCall(
RouteLookupRequest request, @Nullable BackoffPolicy backoffPolicy) {
logger.log(ChannelLogLevel.DEBUG, "Making an async call to RLS");
final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
if (throttler.shouldThrottle()) {
logger.log(ChannelLogLevel.DEBUG, "Request is throttled");
response.setException(new ThrottledException());
return response;
// Cache updated, but no need to call updateBalancingState because no RPCs were queued waiting
// on this result
return CachedRouteLookupResponse.backoffEntry(createBackOffEntry(
request, Status.RESOURCE_EXHAUSTED.withDescription("RLS throttled"), backoffPolicy));
}
final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(request);
logger.log(ChannelLogLevel.DEBUG, "Sending RouteLookupRequest: {0}", routeLookupRequest);
rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
@ -219,7 +222,6 @@ final class CachingRlsLbClient {
logger.log(ChannelLogLevel.DEBUG, "Error looking up route:", t);
response.setException(t);
throttler.registerBackendResponse(true);
helper.propagateRlsError();
}
@Override
@ -228,7 +230,8 @@ final class CachingRlsLbClient {
throttler.registerBackendResponse(false);
}
});
return response;
return CachedRouteLookupResponse.pendingResponse(
createPendingEntry(request, response, backoffPolicy));
}
/**
@ -245,7 +248,11 @@ final class CachingRlsLbClient {
cacheEntry = linkedHashLruCache.read(request);
if (cacheEntry == null) {
logger.log(ChannelLogLevel.DEBUG, "No cache entry found, making a new lrs request");
return handleNewRequest(request);
PendingCacheEntry pendingEntry = pendingCallCache.get(request);
if (pendingEntry != null) {
return CachedRouteLookupResponse.pendingResponse(pendingEntry);
}
return asyncRlsCall(request, /* backoffPolicy= */ null);
}
if (cacheEntry instanceof DataCacheEntry) {
@ -276,46 +283,86 @@ final class CachingRlsLbClient {
}
}
/**
* Populates async cache entry for new request. This is only methods directly modifies the cache,
* any status change is happening via event (async request finished, timed out, etc) in {@link
* PendingCacheEntry}, {@link DataCacheEntry} and {@link BackoffCacheEntry}.
*/
private CachedRouteLookupResponse handleNewRequest(RouteLookupRequest request) {
void requestConnection() {
rlsChannel.getState(true);
}
@GuardedBy("lock")
private PendingCacheEntry createPendingEntry(
RouteLookupRequest request,
ListenableFuture<RouteLookupResponse> pendingCall,
@Nullable BackoffPolicy backoffPolicy) {
PendingCacheEntry entry = new PendingCacheEntry(request, pendingCall, backoffPolicy);
// Add the entry to the map before adding the Listener, because the listener removes the
// entry from the map
pendingCallCache.put(request, entry);
// Beware that the listener can run immediately on the current thread
pendingCall.addListener(() -> pendingRpcComplete(entry), MoreExecutors.directExecutor());
return entry;
}
private void pendingRpcComplete(PendingCacheEntry entry) {
synchronized (lock) {
PendingCacheEntry pendingEntry = pendingCallCache.get(request);
if (pendingEntry != null) {
return CachedRouteLookupResponse.pendingResponse(pendingEntry);
boolean clientClosed = pendingCallCache.remove(entry.request) == null;
if (clientClosed) {
return;
}
ListenableFuture<RouteLookupResponse> asyncCall = asyncRlsCall(request);
if (!asyncCall.isDone()) {
pendingEntry = new PendingCacheEntry(request, asyncCall);
// Add the entry to the map before adding the Listener, because the listener removes the
// entry from the map
pendingCallCache.put(request, pendingEntry);
// Beware that the listener can run immediately on the current thread
asyncCall.addListener(pendingEntry::handleDoneFuture, synchronizationContext);
return CachedRouteLookupResponse.pendingResponse(pendingEntry);
} else {
// async call returned finished future is most likely throttled
try {
RouteLookupResponse response = asyncCall.get();
DataCacheEntry dataEntry = new DataCacheEntry(request, response);
linkedHashLruCache.cacheAndClean(request, dataEntry);
return CachedRouteLookupResponse.dataEntry(dataEntry);
} catch (Exception e) {
BackoffCacheEntry backoffEntry =
new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get());
linkedHashLruCache.cacheAndClean(request, backoffEntry);
return CachedRouteLookupResponse.backoffEntry(backoffEntry);
}
try {
createDataEntry(entry.request, Futures.getDone(entry.pendingCall));
// Cache updated. DataCacheEntry constructor indirectly calls updateBalancingState() to
// reattempt picks when the child LB is done connecting
} catch (Exception e) {
createBackOffEntry(entry.request, Status.fromThrowable(e), entry.backoffPolicy);
// Cache updated. updateBalancingState() to reattempt picks
helper.propagateRlsError();
}
}
}
void requestConnection() {
rlsChannel.getState(true);
@GuardedBy("lock")
private DataCacheEntry createDataEntry(
RouteLookupRequest request, RouteLookupResponse routeLookupResponse) {
logger.log(
ChannelLogLevel.DEBUG,
"Transition to data cache: routeLookupResponse={0}",
routeLookupResponse);
DataCacheEntry entry = new DataCacheEntry(request, routeLookupResponse);
// Constructor for DataCacheEntry causes updateBalancingState, but the picks can't happen until
// this cache update because the lock is held
linkedHashLruCache.cacheAndClean(request, entry);
return entry;
}
@GuardedBy("lock")
private BackoffCacheEntry createBackOffEntry(
RouteLookupRequest request, Status status, @Nullable BackoffPolicy backoffPolicy) {
logger.log(ChannelLogLevel.DEBUG, "Transition to back off: status={0}", status);
if (backoffPolicy == null) {
backoffPolicy = backoffProvider.get();
}
long delayNanos = backoffPolicy.nextBackoffNanos();
BackoffCacheEntry entry = new BackoffCacheEntry(request, status, backoffPolicy);
// Lock is held, so the task can't execute before the assignment
entry.scheduledFuture = scheduledExecutorService.schedule(
() -> refreshBackoffEntry(entry), delayNanos, TimeUnit.NANOSECONDS);
linkedHashLruCache.cacheAndClean(request, entry);
logger.log(ChannelLogLevel.DEBUG, "BackoffCacheEntry created with a delay of {0} nanos",
delayNanos);
return entry;
}
private void refreshBackoffEntry(BackoffCacheEntry entry) {
synchronized (lock) {
// This checks whether the task has been cancelled and prevents a second execution.
if (!entry.scheduledFuture.cancel(false)) {
// Future was previously cancelled
return;
}
logger.log(ChannelLogLevel.DEBUG, "Calling RLS for transition to pending");
linkedHashLruCache.invalidate(entry.request);
asyncRlsCall(entry.request, entry.backoffPolicy);
}
}
private static final class RlsLbHelper extends ForwardingLoadBalancerHelper {
@ -353,7 +400,8 @@ final class CachingRlsLbClient {
}
void triggerPendingRpcProcessing() {
super.updateBalancingState(state, picker);
helper.getSynchronizationContext().execute(
() -> super.updateBalancingState(state, picker));
}
}
@ -455,60 +503,19 @@ final class CachingRlsLbClient {
}
/** A pending cache entry when the async RouteLookup RPC is still on the fly. */
final class PendingCacheEntry {
static final class PendingCacheEntry {
private final ListenableFuture<RouteLookupResponse> pendingCall;
private final RouteLookupRequest request;
@Nullable
private final BackoffPolicy backoffPolicy;
PendingCacheEntry(
RouteLookupRequest request, ListenableFuture<RouteLookupResponse> pendingCall) {
this(request, pendingCall, null);
}
PendingCacheEntry(
RouteLookupRequest request,
ListenableFuture<RouteLookupResponse> pendingCall,
@Nullable BackoffPolicy backoffPolicy) {
this.request = checkNotNull(request, "request");
this.pendingCall = pendingCall;
this.backoffPolicy = backoffPolicy == null ? backoffProvider.get() : backoffPolicy;
}
void handleDoneFuture() {
synchronized (lock) {
pendingCallCache.remove(request);
if (pendingCall.isCancelled()) {
return;
}
try {
transitionToDataEntry(pendingCall.get());
} catch (Exception e) {
if (e instanceof ThrottledException) {
transitionToBackOff(Status.RESOURCE_EXHAUSTED.withCause(e));
} else {
transitionToBackOff(Status.fromThrowable(e));
}
}
}
}
private void transitionToDataEntry(RouteLookupResponse routeLookupResponse) {
synchronized (lock) {
logger.log(
ChannelLogLevel.DEBUG,
"Transition to data cache: routeLookupResponse={0}",
routeLookupResponse);
linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, routeLookupResponse));
}
}
private void transitionToBackOff(Status status) {
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Transition to back off: status={0}", status);
linkedHashLruCache.cacheAndClean(request,
new BackoffCacheEntry(request, status, backoffPolicy));
}
this.pendingCall = checkNotNull(pendingCall, "pendingCall");
this.backoffPolicy = backoffPolicy;
}
@Override
@ -541,10 +548,6 @@ final class CachingRlsLbClient {
protected long getMinEvictionTime() {
return 0L;
}
protected void triggerPendingRpcProcessing() {
helper.triggerPendingRpcProcessing();
}
}
/** Implementation of {@link CacheEntry} contains valid data. */
@ -584,38 +587,14 @@ final class CachingRlsLbClient {
* </pre>
*/
void maybeRefresh() {
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to maybe refresh cache entry");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Lock to maybe refresh cache entry acquired");
synchronized (lock) { // Lock is already held, but ErrorProne can't tell
if (pendingCallCache.containsKey(request)) {
// pending already requested
logger.log(ChannelLogLevel.DEBUG,
"A pending refresh request already created, no need to proceed with refresh");
return;
}
final ListenableFuture<RouteLookupResponse> asyncCall = asyncRlsCall(request);
if (!asyncCall.isDone()) {
logger.log(ChannelLogLevel.DEBUG,
"Async call to rls not yet complete, adding a pending cache entry");
PendingCacheEntry pendingEntry = new PendingCacheEntry(request, asyncCall);
pendingCallCache.put(request, pendingEntry);
asyncCall.addListener(pendingEntry::handleDoneFuture, synchronizationContext);
} else {
// async call returned finished future is most likely throttled
try {
logger.log(ChannelLogLevel.DEBUG, "Waiting for RLS call to return");
RouteLookupResponse response = asyncCall.get();
logger.log(ChannelLogLevel.DEBUG, "RLS call to returned");
linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.log(ChannelLogLevel.DEBUG, "RLS call failed, adding a backoff entry", e);
BackoffCacheEntry backoffEntry =
new BackoffCacheEntry(request, Status.fromThrowable(e), backoffProvider.get());
linkedHashLruCache.cacheAndClean(request, backoffEntry);
}
}
asyncRlsCall(request, /* backoffPolicy= */ null);
}
}
@ -701,75 +680,13 @@ final class CachingRlsLbClient {
private final class BackoffCacheEntry extends CacheEntry {
private final Status status;
private final ScheduledHandle scheduledHandle;
private final BackoffPolicy backoffPolicy;
private final long expireNanos;
private boolean shutdown = false;
private Future<?> scheduledFuture;
BackoffCacheEntry(RouteLookupRequest request, Status status, BackoffPolicy backoffPolicy) {
super(request);
this.status = checkNotNull(status, "status");
this.backoffPolicy = checkNotNull(backoffPolicy, "backoffPolicy");
long delayNanos = backoffPolicy.nextBackoffNanos();
this.expireNanos = ticker.read() + delayNanos;
this.scheduledHandle =
synchronizationContext.schedule(
new Runnable() {
@Override
public void run() {
transitionToPending();
}
},
delayNanos,
TimeUnit.NANOSECONDS,
scheduledExecutorService);
logger.log(ChannelLogLevel.DEBUG, "BackoffCacheEntry created with a delay of {0} nanos",
delayNanos);
}
/** Forcefully refreshes cache entry by ignoring the backoff timer. */
void forceRefresh() {
logger.log(ChannelLogLevel.DEBUG, "Forcefully refreshing cache entry");
if (scheduledHandle.isPending()) {
scheduledHandle.cancel();
transitionToPending();
}
}
private void transitionToPending() {
logger.log(ChannelLogLevel.DEBUG, "Acquiring lock to transition to pending");
synchronized (lock) {
logger.log(ChannelLogLevel.DEBUG, "Acquired lock to transition to pending");
if (shutdown) {
logger.log(ChannelLogLevel.DEBUG, "Already shut down, not transitioning to pending");
return;
}
logger.log(ChannelLogLevel.DEBUG, "Calling RLS for transition to pending");
ListenableFuture<RouteLookupResponse> call = asyncRlsCall(request);
if (!call.isDone()) {
logger.log(ChannelLogLevel.DEBUG,
"Transition to pending RLS call not done, adding a pending cache entry");
linkedHashLruCache.invalidate(request);
PendingCacheEntry pendingEntry = new PendingCacheEntry(request, call, backoffPolicy);
pendingCallCache.put(request, pendingEntry);
call.addListener(pendingEntry::handleDoneFuture, synchronizationContext);
} else {
try {
logger.log(ChannelLogLevel.DEBUG,
"Waiting for transition to pending RLS call response");
RouteLookupResponse response = call.get();
linkedHashLruCache.cacheAndClean(request, new DataCacheEntry(request, response));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.log(ChannelLogLevel.DEBUG,
"Transition to pending RLS call failed, creating a backoff entry", e);
linkedHashLruCache.cacheAndClean(
request,
new BackoffCacheEntry(request, Status.fromThrowable(e), backoffPolicy));
}
}
}
}
Status getStatus() {
@ -783,18 +700,12 @@ final class CachingRlsLbClient {
@Override
boolean isExpired(long now) {
return expireNanos - now <= 0;
return scheduledFuture.isDone();
}
@Override
void cleanup() {
if (shutdown) {
return;
}
shutdown = true;
if (!scheduledHandle.isPending()) {
scheduledHandle.cancel();
}
scheduledFuture.cancel(false);
}
@Override
@ -911,18 +822,20 @@ final class CachingRlsLbClient {
/** Implementation of {@link LinkedHashLruCache} for RLS. */
private static final class RlsAsyncLruCache
extends LinkedHashLruCache<RouteLookupRequest, CacheEntry> {
private final RlsLbHelper helper;
RlsAsyncLruCache(long maxEstimatedSizeBytes,
@Nullable EvictionListener<RouteLookupRequest, CacheEntry> evictionListener,
ScheduledExecutorService ses, Ticker ticker, Object lock) {
ScheduledExecutorService ses, Ticker ticker, Object lock, RlsLbHelper helper) {
super(
maxEstimatedSizeBytes,
new AutoCleaningEvictionListener(evictionListener),
evictionListener,
1,
TimeUnit.MINUTES,
ses,
ticker,
lock);
this.helper = checkNotNull(helper, "helper");
}
@Override
@ -951,7 +864,7 @@ final class CachingRlsLbClient {
// force cleanup if new entry pushed cache over max size (in bytes)
if (fitToLimit()) {
value.triggerPendingRpcProcessing();
helper.triggerPendingRpcProcessing();
}
return newEntry;
}
@ -977,7 +890,7 @@ final class CachingRlsLbClient {
logger.log(ChannelLogLevel.DEBUG, "Lock acquired for refreshing backoff cache entries");
for (CacheEntry value : linkedHashLruCache.values()) {
if (value instanceof BackoffCacheEntry) {
((BackoffCacheEntry) value).forceRefresh();
refreshBackoffEntry((BackoffCacheEntry) value);
}
}
}
@ -1077,9 +990,11 @@ final class CachingRlsLbClient {
// GuardedBy CachingRlsLbClient.lock
void close() {
logger.log(ChannelLogLevel.DEBUG, "Closing RLS picker");
if (fallbackChildPolicyWrapper != null) {
refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
synchronized (lock) { // Lock is already held, but ErrorProne can't tell
logger.log(ChannelLogLevel.DEBUG, "Closing RLS picker");
if (fallbackChildPolicyWrapper != null) {
refCountedChildPolicyWrapperFactory.release(fallbackChildPolicyWrapper);
}
}
}

View File

@ -42,27 +42,4 @@ interface Throttler {
* @param throttled specifies whether the request was throttled by the backend.
*/
void registerBackendResponse(boolean throttled);
/**
* A ThrottledException indicates the call is throttled. This exception is meant to be used by
* caller of {@link Throttler}, the implementation of Throttler should <strong>not</strong> throw
* this exception when {@link #shouldThrottle()} is called.
*/
final class ThrottledException extends RuntimeException {
static final long serialVersionUID = 1L;
public ThrottledException() {
super();
}
public ThrottledException(String s) {
super(s);
}
@Override
public synchronized Throwable fillInStackTrace() {
return this;
}
}
}

View File

@ -301,7 +301,7 @@ public class CachingRlsLbClientTest {
fakeClock.forwardTime(10, TimeUnit.MILLISECONDS);
// initially backed off entry is backed off again
verify(evictionListener)
.onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.REPLACED));
.onEviction(eq(routeLookupRequest), any(CacheEntry.class), eq(EvictionType.EXPLICIT));
resp = getInSyncContext(routeLookupRequest);

View File

@ -186,13 +186,18 @@ public class RlsLoadBalancerTest {
Metadata headers = new Metadata();
PickSubchannelArgsImpl fakeSearchMethodArgs =
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT);
// Warm-up pick; will be queued
PickResult res = picker.pickSubchannel(fakeSearchMethodArgs);
assertThat(res.getStatus().isOk()).isTrue();
assertThat(res.getSubchannel()).isNull();
// Cache is warm, but still unconnected
picker.pickSubchannel(fakeSearchMethodArgs); // Will create the subchannel
FakeSubchannel subchannel = subchannels.peek();
assertThat(subchannel).isNotNull();
// Ensure happy path is unaffected
subchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
PickResult res = picker.pickSubchannel(fakeSearchMethodArgs);
res = picker.pickSubchannel(fakeSearchMethodArgs);
assertThat(res.getStatus().getCode()).isEqualTo(Status.Code.OK);
// Check on conversion
@ -213,7 +218,12 @@ public class RlsLoadBalancerTest {
inOrder.verify(helper)
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue();
// Warm-up pick; will be queued
PickResult res = picker.pickSubchannel(searchSubchannelArgs);
assertThat(res.getStatus().isOk()).isTrue();
assertThat(res.getSubchannel()).isNull();
// Cache is warm, but still unconnected
res = picker.pickSubchannel(searchSubchannelArgs);
inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class));
inOrder.verify(helper, atLeast(0))
.updateBalancingState(eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class));
@ -323,7 +333,12 @@ public class RlsLoadBalancerTest {
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue();
Metadata headers = new Metadata();
// Warm-up pick; will be queued
PickResult res = picker.pickSubchannel(searchSubchannelArgs);
assertThat(res.getStatus().isOk()).isTrue();
assertThat(res.getSubchannel()).isNull();
// Cache is warm, but still unconnected
res = picker.pickSubchannel(searchSubchannelArgs);
inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class));
inOrder.verify(helper, atLeast(0))
.updateBalancingState(eq(ConnectivityState.CONNECTING), any(SubchannelPicker.class));