core: Fully delegate picks to DelayedClientTransport

DelayedClientTransport already had to handle all the cases, so
ManagedChannelImpl picking was acting only as an optimization.
Optimizing DelayedClientTransport to avoid the lock when not queuing
makes ManagedChannelImpl picking entirely redundant, and allows us to
remove the duplicate race-handling logic.

This avoids double-picking when queuing, where ManagedChannelImpl does a
pick, decides to queue, and then DelayedClientTransport re-performs the
pick because it doesn't know which pick version was used. This was
noticed with RLS, which mutates state within the picker.
This commit is contained in:
Eric Anderson 2024-05-11 10:23:20 -07:00
parent d9e09c285b
commit 8844cf7b87
2 changed files with 58 additions and 88 deletions

View File

@ -69,23 +69,8 @@ final class DelayedClientTransport implements ManagedClientTransport {
@GuardedBy("lock") @GuardedBy("lock")
private Collection<PendingStream> pendingStreams = new LinkedHashSet<>(); private Collection<PendingStream> pendingStreams = new LinkedHashSet<>();
/** /** Immutable state needed for picking. 'lock' must be held for writing. */
* When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered private volatile PickerState pickerState = new PickerState(null, null);
* terminated.
*/
@GuardedBy("lock")
private Status shutdownStatus;
/**
* The last picker that {@link #reprocess} has used. May be set to null when the channel has moved
* to idle.
*/
@GuardedBy("lock")
@Nullable
private SubchannelPicker lastPicker;
@GuardedBy("lock")
private long lastPickerVersion;
/** /**
* Creates a new delayed transport. * Creates a new delayed transport.
@ -139,24 +124,13 @@ final class DelayedClientTransport implements ManagedClientTransport {
try { try {
PickSubchannelArgs args = new PickSubchannelArgsImpl( PickSubchannelArgs args = new PickSubchannelArgsImpl(
method, headers, callOptions, new PickDetailsConsumerImpl(tracers)); method, headers, callOptions, new PickDetailsConsumerImpl(tracers));
SubchannelPicker picker = null; PickerState state = pickerState;
long pickerVersion = -1;
while (true) { while (true) {
synchronized (lock) { if (state.shutdownStatus != null) {
if (shutdownStatus != null) { return new FailingClientStream(state.shutdownStatus, tracers);
return new FailingClientStream(shutdownStatus, tracers);
} }
if (lastPicker == null) { if (state.lastPicker != null) {
return createPendingStream(args, tracers); PickResult pickResult = state.lastPicker.pickSubchannel(args);
}
// Check for second time through the loop, and whether anything changed
if (picker != null && pickerVersion == lastPickerVersion) {
return createPendingStream(args, tracers);
}
picker = lastPicker;
pickerVersion = lastPickerVersion;
}
PickResult pickResult = picker.pickSubchannel(args);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult,
callOptions.isWaitForReady()); callOptions.isWaitForReady());
if (transport != null) { if (transport != null) {
@ -164,8 +138,16 @@ final class DelayedClientTransport implements ManagedClientTransport {
args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(), args.getMethodDescriptor(), args.getHeaders(), args.getCallOptions(),
tracers); tracers);
} }
}
// This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible // This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible
// race with reprocess()), we will buffer it. Otherwise, will try with the new picker. // race with reprocess()), we will buffer it. Otherwise, will try with the new picker.
synchronized (lock) {
PickerState newerState = pickerState;
if (state == newerState) {
return createPendingStream(args, tracers);
}
state = newerState;
}
} }
} finally { } finally {
syncContext.drain(); syncContext.drain();
@ -210,10 +192,10 @@ final class DelayedClientTransport implements ManagedClientTransport {
@Override @Override
public final void shutdown(final Status status) { public final void shutdown(final Status status) {
synchronized (lock) { synchronized (lock) {
if (shutdownStatus != null) { if (pickerState.shutdownStatus != null) {
return; return;
} }
shutdownStatus = status; pickerState = pickerState.withShutdownStatus(status);
syncContext.executeLater(new Runnable() { syncContext.executeLater(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -288,8 +270,7 @@ final class DelayedClientTransport implements ManagedClientTransport {
final void reprocess(@Nullable SubchannelPicker picker) { final void reprocess(@Nullable SubchannelPicker picker) {
ArrayList<PendingStream> toProcess; ArrayList<PendingStream> toProcess;
synchronized (lock) { synchronized (lock) {
lastPicker = picker; pickerState = pickerState.withPicker(picker);
lastPickerVersion++;
if (picker == null || !hasPendingStreams()) { if (picker == null || !hasPendingStreams()) {
return; return;
} }
@ -338,7 +319,7 @@ final class DelayedClientTransport implements ManagedClientTransport {
// (which would shutdown the transports and LoadBalancer) because the gap should be shorter // (which would shutdown the transports and LoadBalancer) because the gap should be shorter
// than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second). // than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second).
syncContext.executeLater(reportTransportNotInUse); syncContext.executeLater(reportTransportNotInUse);
if (shutdownStatus != null && reportTransportTerminated != null) { if (pickerState.shutdownStatus != null && reportTransportTerminated != null) {
syncContext.executeLater(reportTransportTerminated); syncContext.executeLater(reportTransportTerminated);
reportTransportTerminated = null; reportTransportTerminated = null;
} }
@ -384,7 +365,7 @@ final class DelayedClientTransport implements ManagedClientTransport {
boolean justRemovedAnElement = pendingStreams.remove(this); boolean justRemovedAnElement = pendingStreams.remove(this);
if (!hasPendingStreams() && justRemovedAnElement) { if (!hasPendingStreams() && justRemovedAnElement) {
syncContext.executeLater(reportTransportNotInUse); syncContext.executeLater(reportTransportNotInUse);
if (shutdownStatus != null) { if (pickerState.shutdownStatus != null) {
syncContext.executeLater(reportTransportTerminated); syncContext.executeLater(reportTransportTerminated);
reportTransportTerminated = null; reportTransportTerminated = null;
} }
@ -409,4 +390,32 @@ final class DelayedClientTransport implements ManagedClientTransport {
super.appendTimeoutInsight(insight); super.appendTimeoutInsight(insight);
} }
} }
static final class PickerState {
/**
* The last picker that {@link #reprocess} has used. May be set to null when the channel has
* moved to idle.
*/
@Nullable
final SubchannelPicker lastPicker;
/**
* When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered
* terminated.
*/
@Nullable
final Status shutdownStatus;
private PickerState(SubchannelPicker lastPicker, Status shutdownStatus) {
this.lastPicker = lastPicker;
this.shutdownStatus = shutdownStatus;
}
public PickerState withPicker(SubchannelPicker newPicker) {
return new PickerState(newPicker, this.shutdownStatus);
}
public PickerState withShutdownStatus(Status newShutdownStatus) {
return new PickerState(this.lastPicker, newShutdownStatus);
}
}
} }

View File

@ -471,57 +471,20 @@ final class ManagedChannelImpl extends ManagedChannel implements
private final class ChannelStreamProvider implements ClientStreamProvider { private final class ChannelStreamProvider implements ClientStreamProvider {
volatile Throttle throttle; volatile Throttle throttle;
private ClientTransport getTransport(PickSubchannelArgs args) {
SubchannelPicker pickerCopy = subchannelPicker;
if (shutdown.get()) {
// If channel is shut down, delayedTransport is also shut down which will fail the stream
// properly.
return delayedTransport;
}
if (pickerCopy == null) {
final class ExitIdleModeForTransport implements Runnable {
@Override
public void run() {
exitIdleMode();
}
}
syncContext.execute(new ExitIdleModeForTransport());
return delayedTransport;
}
// There is no need to reschedule the idle timer here.
//
// pickerCopy != null, which means idle timer has not expired when this method starts.
// Even if idle timer expires right after we grab pickerCopy, and it shuts down LoadBalancer
// which calls Subchannel.shutdown(), the InternalSubchannel will be actually shutdown after
// SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, which gives the caller time to start RPC on it.
//
// In most cases the idle timer is scheduled to fire after the transport has created the
// stream, which would have reported in-use state to the channel that would have cancelled
// the idle timer.
PickResult pickResult = pickerCopy.pickSubchannel(args);
ClientTransport transport = GrpcUtil.getTransportFromPickResult(
pickResult, args.getCallOptions().isWaitForReady());
if (transport != null) {
return transport;
}
return delayedTransport;
}
@Override @Override
public ClientStream newStream( public ClientStream newStream(
final MethodDescriptor<?, ?> method, final MethodDescriptor<?, ?> method,
final CallOptions callOptions, final CallOptions callOptions,
final Metadata headers, final Metadata headers,
final Context context) { final Context context) {
// There is no need to reschedule the idle timer here. If the channel isn't shut down, either
// the delayed transport or a real transport will go in-use and cancel the idle timer.
if (!retryEnabled) { if (!retryEnabled) {
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
callOptions, headers, 0, /* isTransparentRetry= */ false); callOptions, headers, 0, /* isTransparentRetry= */ false);
ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
method, headers, callOptions, new PickDetailsConsumerImpl(tracers)));
Context origContext = context.attach(); Context origContext = context.attach();
try { try {
return transport.newStream(method, headers, callOptions, tracers); return delayedTransport.newStream(method, headers, callOptions, tracers);
} finally { } finally {
context.detach(origContext); context.detach(origContext);
} }
@ -562,11 +525,9 @@ final class ManagedChannelImpl extends ManagedChannel implements
CallOptions newOptions = callOptions.withStreamTracerFactory(factory); CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers( ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
newOptions, newHeaders, previousAttempts, isTransparentRetry); newOptions, newHeaders, previousAttempts, isTransparentRetry);
ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
method, newHeaders, newOptions, new PickDetailsConsumerImpl(tracers)));
Context origContext = context.attach(); Context origContext = context.attach();
try { try {
return transport.newStream(method, newHeaders, newOptions, tracers); return delayedTransport.newStream(method, newHeaders, newOptions, tracers);
} finally { } finally {
context.detach(origContext); context.detach(origContext);
} }