mirror of https://github.com/grpc/grpc-java.git
rls: Fix a local and remote race
The local race passes `rlsPicker` to the channel before CachingRlsLbClient has finished constructing. `RlsPicker` can use several fields that are not yet initialized. This does not seem to be happening in practice, because it looks like it would break things very loudly (e.g., NPE). The remote race seems incredibly hard to hit, because it requires an RPC to complete before the pending data tracking that RPC is added to a map. But if a system is at 100% CPU utilization, maybe it can be hit. If it is hit, all RPCs needing the impacted cache entry will be buffered forever.
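For illustration only, here is a minimal, self-contained sketch (not grpc-java source) of the local race and of the publish-after-construction ordering the fix adopts. The names LocalRaceSketch, Client, Picker, Helper, and lookupService are hypothetical stand-ins for the real classes.

final class LocalRaceSketch {

  interface Picker {
    String pick();
  }

  /** Stands in for the LoadBalancer helper that hands the picker to the channel. */
  interface Helper {
    void updateBalancingState(Picker picker);
  }

  static final class Client {
    private final Picker picker;
    private String lookupService; // assigned late in the constructor; read by the picker

    private Client(String lookupService) {
      // The picker captures `this`, so it reads fields of the enclosing Client when invoked.
      this.picker = () -> "route via " + this.lookupService;
      // Buggy ordering (analogous to calling helper.updateBalancingState(...) right here):
      // the channel could use the picker before lookupService is assigned and see null.
      this.lookupService = lookupService;
    }

    // Fixed ordering, analogous to the commit's build(): finish construction, then publish.
    static Client create(Helper helper, String lookupService) {
      Client client = new Client(lookupService);
      helper.updateBalancingState(client.picker);
      return client;
    }
  }

  public static void main(String[] args) {
    Client.create(picker -> System.out.println("published: " + picker.pick()),
        "lookup.example.com");
  }
}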
This commit is contained in:
parent
8f45a97be6
commit
aa90768129
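The remote race fix boils down to inserting the pending entry into the map before registering the completion listener that removes it. Below is a minimal, self-contained sketch of that ordering, assuming Guava's ListenableFuture; PendingCallTracker and pendingCalls are hypothetical names, not the grpc-java identifiers.

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class PendingCallTracker {
  private final ConcurrentMap<String, ListenableFuture<String>> pendingCalls =
      new ConcurrentHashMap<>();

  void track(String key, ListenableFuture<String> call) {
    // Put first. If the listener were registered before the put, a call that completes
    // early would run the removal first, and the subsequent put would leave a stale
    // entry in the map forever -- the "remote race" described in the commit message.
    pendingCalls.put(key, call);
    // The listener may run synchronously on this thread if the future is already done.
    call.addListener(() -> pendingCalls.remove(key), MoreExecutors.directExecutor());
  }

  boolean isPending(String key) {
    return pendingCalls.containsKey(key);
  }

  public static void main(String[] args) {
    PendingCallTracker tracker = new PendingCallTracker();
    SettableFuture<String> call = SettableFuture.create();
    tracker.track("cache-key", call);
    System.out.println(tracker.isPending("cache-key")); // true while the call is outstanding
    call.set("response");
    System.out.println(tracker.isPending("cache-key")); // false once the listener has run
  }
}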
@@ -166,7 +166,6 @@ final class CachingRlsLbClient {
       rlsChannelBuilder.disableServiceConfigLookUp();
     }
     rlsChannel = rlsChannelBuilder.build();
-    helper.updateBalancingState(ConnectivityState.CONNECTING, rlsPicker);
     rlsStub = RouteLookupServiceGrpc.newStub(rlsChannel);
     childLbResolvedAddressFactory =
         checkNotNull(builder.resolvedAddressFactory, "resolvedAddressFactory");
@@ -292,7 +291,11 @@ final class CachingRlsLbClient {
       ListenableFuture<RouteLookupResponse> asyncCall = asyncRlsCall(request);
       if (!asyncCall.isDone()) {
         pendingEntry = new PendingCacheEntry(request, asyncCall);
+        // Add the entry to the map before adding the Listener, because the listener removes the
+        // entry from the map
         pendingCallCache.put(request, pendingEntry);
+        // Beware that the listener can run immediately on the current thread
+        asyncCall.addListener(pendingEntry::handleDoneFuture, synchronizationContext);
         return CachedRouteLookupResponse.pendingResponse(pendingEntry);
       } else {
         // async call returned finished future is most likely throttled
@@ -469,17 +472,9 @@ final class CachingRlsLbClient {
       this.request = checkNotNull(request, "request");
       this.pendingCall = pendingCall;
       this.backoffPolicy = backoffPolicy == null ? backoffProvider.get() : backoffPolicy;
-      pendingCall.addListener(
-          new Runnable() {
-            @Override
-            public void run() {
-              handleDoneFuture();
-            }
-          },
-          synchronizationContext);
     }

-    private void handleDoneFuture() {
+    void handleDoneFuture() {
       synchronized (lock) {
         pendingCallCache.remove(request);
         if (pendingCall.isCancelled()) {
@@ -602,7 +597,9 @@ final class CachingRlsLbClient {
       if (!asyncCall.isDone()) {
         logger.log(ChannelLogLevel.DEBUG,
             "Async call to rls not yet complete, adding a pending cache entry");
-        pendingCallCache.put(request, new PendingCacheEntry(request, asyncCall));
+        PendingCacheEntry pendingEntry = new PendingCacheEntry(request, asyncCall);
+        pendingCallCache.put(request, pendingEntry);
+        asyncCall.addListener(pendingEntry::handleDoneFuture, synchronizationContext);
       } else {
         // async call returned finished future is most likely throttled
         try {
@@ -752,9 +749,10 @@ final class CachingRlsLbClient {
       if (!call.isDone()) {
         logger.log(ChannelLogLevel.DEBUG,
             "Transition to pending RLS call not done, adding a pending cache entry");
-        linkedHashLruCache.invalidate(request);
         PendingCacheEntry pendingEntry = new PendingCacheEntry(request, call, backoffPolicy);
         pendingCallCache.put(request, pendingEntry);
+        linkedHashLruCache.invalidate(request);
+        call.addListener(pendingEntry::handleDoneFuture, synchronizationContext);
       } else {
         try {
           logger.log(ChannelLogLevel.DEBUG,
@@ -866,7 +864,9 @@ final class CachingRlsLbClient {
     }

     CachingRlsLbClient build() {
-      return new CachingRlsLbClient(this);
+      CachingRlsLbClient client = new CachingRlsLbClient(this);
+      helper.updateBalancingState(ConnectivityState.CONNECTING, client.rlsPicker);
+      return client;
     }
   }
