core: fix thread safety in NameResolver.Listener

This commit is contained in:
ZHANG Dapeng 2019-03-11 14:35:49 -07:00 committed by GitHub
parent 5ba6619ce5
commit 97ff7fe50a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 49 additions and 43 deletions

View File

@@ -234,8 +234,10 @@ final class ManagedChannelImpl extends ManagedChannel implements
private final ChannelTracer channelTracer; private final ChannelTracer channelTracer;
private final ChannelLogger channelLogger; private final ChannelLogger channelLogger;
private final InternalChannelz channelz; private final InternalChannelz channelz;
// Must be mutated and read from syncContext
@CheckForNull @CheckForNull
private Boolean haveBackends; // a flag for doing channel tracing when flipped private Boolean haveBackends; // a flag for doing channel tracing when flipped
// Must be mutated and read from syncContext
@Nullable @Nullable
private Map<String, ?> lastServiceConfig; // used for channel tracing when value changed private Map<String, ?> lastServiceConfig; // used for channel tracing when value changed
@@ -1277,22 +1279,22 @@ final class ManagedChannelImpl extends ManagedChannel implements
@Override @Override
public void onAddresses(final List<EquivalentAddressGroup> servers, final Attributes config) { public void onAddresses(final List<EquivalentAddressGroup> servers, final Attributes config) {
channelLogger.log(
ChannelLogLevel.DEBUG, "Resolved address: {0}, config={1}", servers, config);
if (haveBackends == null || !haveBackends) {
channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
haveBackends = true;
}
final Map<String, ?> serviceConfig = config.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
if (serviceConfig != null && !serviceConfig.equals(lastServiceConfig)) {
channelLogger.log(ChannelLogLevel.INFO, "Service config changed");
lastServiceConfig = serviceConfig;
}
final class NamesResolved implements Runnable { final class NamesResolved implements Runnable {
@Override @Override
public void run() { public void run() {
channelLogger.log(
ChannelLogLevel.DEBUG, "Resolved address: {0}, config={1}", servers, config);
if (haveBackends == null || !haveBackends) {
channelLogger.log(ChannelLogLevel.INFO, "Address resolved: {0}", servers);
haveBackends = true;
}
final Map<String, ?> serviceConfig =
config.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG);
if (serviceConfig != null && !serviceConfig.equals(lastServiceConfig)) {
channelLogger.log(ChannelLogLevel.INFO, "Service config changed");
lastServiceConfig = serviceConfig;
}
// Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) { if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
return; return;
@@ -1315,7 +1317,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
} }
if (servers.isEmpty() && !helper.lb.canHandleEmptyAddressListFromNameResolution()) { if (servers.isEmpty() && !helper.lb.canHandleEmptyAddressListFromNameResolution()) {
onError(Status.UNAVAILABLE.withDescription( handleErrorInSyncContext(Status.UNAVAILABLE.withDescription(
"Name resolver " + resolver + " returned an empty list")); "Name resolver " + resolver + " returned an empty list"));
} else { } else {
helper.lb.handleResolvedAddressGroups(servers, config); helper.lb.handleResolvedAddressGroups(servers, config);
@@ -1329,42 +1331,46 @@ final class ManagedChannelImpl extends ManagedChannel implements
@Override @Override
public void onError(final Status error) { public void onError(final Status error) {
checkArgument(!error.isOk(), "the error status must not be OK"); checkArgument(!error.isOk(), "the error status must not be OK");
final class NameResolverErrorHandler implements Runnable {
@Override
public void run() {
handleErrorInSyncContext(error);
}
}
syncContext.execute(new NameResolverErrorHandler());
}
private void handleErrorInSyncContext(Status error) {
logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}", logger.log(Level.WARNING, "[{0}] Failed to resolve name. status={1}",
new Object[] {getLogId(), error}); new Object[] {getLogId(), error});
if (haveBackends == null || haveBackends) { if (haveBackends == null || haveBackends) {
channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error); channelLogger.log(ChannelLogLevel.WARNING, "Failed to resolve name: {0}", error);
haveBackends = false; haveBackends = false;
} }
final class NameResolverErrorHandler implements Runnable { // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
@Override if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
public void run() { return;
// Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
return;
}
helper.lb.handleNameResolutionError(error);
if (scheduledNameResolverRefresh != null && scheduledNameResolverRefresh.isPending()) {
// The name resolver may invoke onError multiple times, but we only want to
// schedule one backoff attempt
// TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we
// want to reset the backoff interval upon repeated onError() calls
return;
}
if (nameResolverBackoffPolicy == null) {
nameResolverBackoffPolicy = backoffPolicyProvider.get();
}
long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos();
channelLogger.log(
ChannelLogLevel.DEBUG,
"Scheduling DNS resolution backoff for {0} ns", delayNanos);
scheduledNameResolverRefresh =
syncContext.schedule(
new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
transportFactory .getScheduledExecutorService());
}
} }
helper.lb.handleNameResolutionError(error);
syncContext.execute(new NameResolverErrorHandler()); if (scheduledNameResolverRefresh != null && scheduledNameResolverRefresh.isPending()) {
// The name resolver may invoke onError multiple times, but we only want to
// schedule one backoff attempt
// TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we
// want to reset the backoff interval upon repeated onError() calls
return;
}
if (nameResolverBackoffPolicy == null) {
nameResolverBackoffPolicy = backoffPolicyProvider.get();
}
long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos();
channelLogger.log(
ChannelLogLevel.DEBUG,
"Scheduling DNS resolution backoff for {0} ns", delayNanos);
scheduledNameResolverRefresh =
syncContext.schedule(
new DelayedNameResolverRefresh(), delayNanos, TimeUnit.NANOSECONDS,
transportFactory .getScheduledExecutorService());
} }
} }