core: Use SyncContext for InProcessTransport listener callbacks to avoid deadlocks

Fixes deadlocks caused by client and server listeners being called in a synchronized block

Also support unary calls returning null values

Fixes #3084
This commit is contained in:
Larry Safran 2022-06-30 13:41:36 -07:00 committed by GitHub
parent c0790283ec
commit 74137b0978
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 169 additions and 120 deletions

View File

@ -40,6 +40,7 @@ import io.grpc.MethodDescriptor;
import io.grpc.SecurityLevel;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.ClientStream;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientStreamListener.RpcProgress;
@ -106,6 +107,18 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private List<ServerStreamTracer.Factory> serverStreamTracerFactories;
private final Attributes attributes;
private Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
if (e instanceof Error) {
throw new Error(e);
}
throw new RuntimeException(e);
}
};
@GuardedBy("this")
private final InUseStateAggregator<InProcessStream> inUseState =
new InUseStateAggregator<InProcessStream>() {
@ -407,8 +420,10 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private class InProcessServerStream implements ServerStream {
final StatsTraceContext statsTraceCtx;
@GuardedBy("this")
// All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
private ClientStreamListener clientStreamListener;
private final SynchronizationContext syncContext =
new SynchronizationContext(uncaughtExceptionHandler);
@GuardedBy("this")
private int clientRequested;
@GuardedBy("this")
@ -444,10 +459,11 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
if (onReady) {
synchronized (this) {
if (!closed) {
clientStreamListener.onReady();
syncContext.executeLater(() -> clientStreamListener.onReady());
}
}
}
syncContext.drain();
}
// This method is the only reason we have to synchronize field accesses.
@ -456,28 +472,36 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
*
* @return whether onReady should be called on the server
*/
private synchronized boolean clientRequested(int numMessages) {
if (closed) {
return false;
private boolean clientRequested(int numMessages) {
boolean previouslyReady;
boolean nowReady;
synchronized (this) {
if (closed) {
return false;
}
previouslyReady = clientRequested > 0;
clientRequested += numMessages;
while (clientRequested > 0 && !clientReceiveQueue.isEmpty()) {
clientRequested--;
StreamListener.MessageProducer producer = clientReceiveQueue.poll();
syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
}
if (clientReceiveQueue.isEmpty() && clientNotifyStatus != null) {
closed = true;
clientStream.statsTraceCtx.clientInboundTrailers(clientNotifyTrailers);
clientStream.statsTraceCtx.streamClosed(clientNotifyStatus);
Status notifyStatus = this.clientNotifyStatus;
Metadata notifyTrailers = this.clientNotifyTrailers;
syncContext.executeLater(() ->
clientStreamListener.closed(notifyStatus, RpcProgress.PROCESSED, notifyTrailers));
}
nowReady = clientRequested > 0;
}
boolean previouslyReady = clientRequested > 0;
clientRequested += numMessages;
while (clientRequested > 0 && !clientReceiveQueue.isEmpty()) {
clientRequested--;
clientStreamListener.messagesAvailable(clientReceiveQueue.poll());
}
// Attempt being reentrant-safe
if (closed) {
return false;
}
if (clientReceiveQueue.isEmpty() && clientNotifyStatus != null) {
closed = true;
clientStream.statsTraceCtx.clientInboundTrailers(clientNotifyTrailers);
clientStream.statsTraceCtx.streamClosed(clientNotifyStatus);
clientStreamListener.closed(
clientNotifyStatus, RpcProgress.PROCESSED, clientNotifyTrailers);
}
boolean nowReady = clientRequested > 0;
syncContext.drain();
return !previouslyReady && nowReady;
}
@ -486,22 +510,26 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
}
@Override
public synchronized void writeMessage(InputStream message) {
if (closed) {
return;
}
statsTraceCtx.outboundMessage(outboundSeqNo);
statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
clientStream.statsTraceCtx.inboundMessage(outboundSeqNo);
clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
outboundSeqNo++;
StreamListener.MessageProducer producer = new SingleMessageProducer(message);
if (clientRequested > 0) {
clientRequested--;
clientStreamListener.messagesAvailable(producer);
} else {
clientReceiveQueue.add(producer);
public void writeMessage(InputStream message) {
synchronized (this) {
if (closed) {
return;
}
statsTraceCtx.outboundMessage(outboundSeqNo);
statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
clientStream.statsTraceCtx.inboundMessage(outboundSeqNo);
clientStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
outboundSeqNo++;
StreamListener.MessageProducer producer = new SingleMessageProducer(message);
if (clientRequested > 0) {
clientRequested--;
syncContext.executeLater(() -> clientStreamListener.messagesAvailable(producer));
} else {
clientReceiveQueue.add(producer);
}
}
syncContext.drain();
}
@Override
@ -540,8 +568,9 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
}
clientStream.statsTraceCtx.clientInboundHeaders();
clientStreamListener.headersRead(headers);
syncContext.executeLater(() -> clientStreamListener.headersRead(headers));
}
syncContext.drain();
}
@Override
@ -585,13 +614,14 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
closed = true;
clientStream.statsTraceCtx.clientInboundTrailers(trailers);
clientStream.statsTraceCtx.streamClosed(clientStatus);
clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, trailers);
syncContext.executeLater(
() -> clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, trailers));
} else {
clientNotifyStatus = clientStatus;
clientNotifyTrailers = trailers;
}
}
syncContext.drain();
streamClosed();
}
@ -604,24 +634,29 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
streamClosed();
}
private synchronized boolean internalCancel(Status clientStatus) {
if (closed) {
return false;
}
closed = true;
StreamListener.MessageProducer producer;
while ((producer = clientReceiveQueue.poll()) != null) {
InputStream message;
while ((message = producer.next()) != null) {
try {
message.close();
} catch (Throwable t) {
log.log(Level.WARNING, "Exception closing stream", t);
private boolean internalCancel(Status clientStatus) {
synchronized (this) {
if (closed) {
return false;
}
closed = true;
StreamListener.MessageProducer producer;
while ((producer = clientReceiveQueue.poll()) != null) {
InputStream message;
while ((message = producer.next()) != null) {
try {
message.close();
} catch (Throwable t) {
log.log(Level.WARNING, "Exception closing stream", t);
}
}
}
clientStream.statsTraceCtx.streamClosed(clientStatus);
syncContext.executeLater(
() ->
clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, new Metadata()));
}
clientStream.statsTraceCtx.streamClosed(clientStatus);
clientStreamListener.closed(clientStatus, RpcProgress.PROCESSED, new Metadata());
syncContext.drain();
return true;
}
@ -662,8 +697,10 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private class InProcessClientStream implements ClientStream {
final StatsTraceContext statsTraceCtx;
final CallOptions callOptions;
@GuardedBy("this")
// All callbacks must run in syncContext to avoid possibility of deadlock in direct executors
private ServerStreamListener serverStreamListener;
private final SynchronizationContext syncContext =
new SynchronizationContext(uncaughtExceptionHandler);
@GuardedBy("this")
private int serverRequested;
@GuardedBy("this")
@ -693,9 +730,10 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
if (onReady) {
synchronized (this) {
if (!closed) {
serverStreamListener.onReady();
syncContext.executeLater(() -> serverStreamListener.onReady());
}
}
syncContext.drain();
}
}
@ -705,21 +743,29 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
*
* @return whether onReady should be called on the server
*/
private synchronized boolean serverRequested(int numMessages) {
if (closed) {
return false;
private boolean serverRequested(int numMessages) {
boolean previouslyReady;
boolean nowReady;
synchronized (this) {
if (closed) {
return false;
}
previouslyReady = serverRequested > 0;
serverRequested += numMessages;
while (serverRequested > 0 && !serverReceiveQueue.isEmpty()) {
serverRequested--;
StreamListener.MessageProducer producer = serverReceiveQueue.poll();
syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
}
if (serverReceiveQueue.isEmpty() && serverNotifyHalfClose) {
serverNotifyHalfClose = false;
syncContext.executeLater(() -> serverStreamListener.halfClosed());
}
nowReady = serverRequested > 0;
}
boolean previouslyReady = serverRequested > 0;
serverRequested += numMessages;
while (serverRequested > 0 && !serverReceiveQueue.isEmpty()) {
serverRequested--;
serverStreamListener.messagesAvailable(serverReceiveQueue.poll());
}
if (serverReceiveQueue.isEmpty() && serverNotifyHalfClose) {
serverNotifyHalfClose = false;
serverStreamListener.halfClosed();
}
boolean nowReady = serverRequested > 0;
syncContext.drain();
return !previouslyReady && nowReady;
}
@ -728,22 +774,25 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
}
@Override
public synchronized void writeMessage(InputStream message) {
if (closed) {
return;
}
statsTraceCtx.outboundMessage(outboundSeqNo);
statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
serverStream.statsTraceCtx.inboundMessage(outboundSeqNo);
serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
outboundSeqNo++;
StreamListener.MessageProducer producer = new SingleMessageProducer(message);
if (serverRequested > 0) {
serverRequested--;
serverStreamListener.messagesAvailable(producer);
} else {
serverReceiveQueue.add(producer);
public void writeMessage(InputStream message) {
synchronized (this) {
if (closed) {
return;
}
statsTraceCtx.outboundMessage(outboundSeqNo);
statsTraceCtx.outboundMessageSent(outboundSeqNo, -1, -1);
serverStream.statsTraceCtx.inboundMessage(outboundSeqNo);
serverStream.statsTraceCtx.inboundMessageRead(outboundSeqNo, -1, -1);
outboundSeqNo++;
StreamListener.MessageProducer producer = new SingleMessageProducer(message);
if (serverRequested > 0) {
serverRequested--;
syncContext.executeLater(() -> serverStreamListener.messagesAvailable(producer));
} else {
serverReceiveQueue.add(producer);
}
}
syncContext.drain();
}
@Override
@ -768,39 +817,45 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
streamClosed();
}
private synchronized boolean internalCancel(
private boolean internalCancel(
Status serverListenerStatus, Status serverTracerStatus) {
if (closed) {
return false;
}
closed = true;
synchronized (this) {
if (closed) {
return false;
}
closed = true;
StreamListener.MessageProducer producer;
while ((producer = serverReceiveQueue.poll()) != null) {
InputStream message;
while ((message = producer.next()) != null) {
try {
message.close();
} catch (Throwable t) {
log.log(Level.WARNING, "Exception closing stream", t);
StreamListener.MessageProducer producer;
while ((producer = serverReceiveQueue.poll()) != null) {
InputStream message;
while ((message = producer.next()) != null) {
try {
message.close();
} catch (Throwable t) {
log.log(Level.WARNING, "Exception closing stream", t);
}
}
}
serverStream.statsTraceCtx.streamClosed(serverTracerStatus);
syncContext.executeLater(() -> serverStreamListener.closed(serverListenerStatus));
}
serverStream.statsTraceCtx.streamClosed(serverTracerStatus);
serverStreamListener.closed(serverListenerStatus);
syncContext.drain();
return true;
}
@Override
public synchronized void halfClose() {
if (closed) {
return;
}
if (serverReceiveQueue.isEmpty()) {
serverStreamListener.halfClosed();
} else {
serverNotifyHalfClose = true;
public void halfClose() {
synchronized (this) {
if (closed) {
return;
}
if (serverReceiveQueue.isEmpty()) {
syncContext.executeLater(() -> serverStreamListener.halfClosed());
} else {
serverNotifyHalfClose = true;
}
}
syncContext.drain();
}
@Override

View File

@ -40,8 +40,6 @@ import org.mockito.ArgumentMatchers;
* Not intended to provide a high code coverage or to test every major usecase.
*
* directExecutor() makes it easier to have deterministic tests.
* However, if your implementation uses another thread and uses streaming it is better to use
* the default executor, to avoid hitting bug #3084.
*
* <p>For more unit test examples see {@link io.grpc.examples.routeguide.RouteGuideClientTest} and
* {@link io.grpc.examples.routeguide.RouteGuideServerTest}.

View File

@ -33,8 +33,6 @@ import org.junit.runners.JUnit4;
* Not intended to provide a high code coverage or to test every major usecase.
*
* directExecutor() makes it easier to have deterministic tests.
* However, if your implementation uses another thread and uses streaming it is better to use
* the default executor, to avoid hitting bug #3084.
*
* <p>For more unit test examples see {@link io.grpc.examples.routeguide.RouteGuideClientTest} and
* {@link io.grpc.examples.routeguide.RouteGuideServerTest}.

View File

@ -53,8 +53,6 @@ import org.mockito.ArgumentCaptor;
* Not intended to provide a high code coverage or to test every major usecase.
*
* directExecutor() makes it easier to have deterministic tests.
* However, if your implementation uses another thread and uses streaming it is better to use
* the default executor, to avoid hitting bug #3084.
*
* <p>For basic unit test examples see {@link io.grpc.examples.helloworld.HelloWorldClientTest} and
* {@link io.grpc.examples.helloworld.HelloWorldServerTest}.

View File

@ -50,8 +50,6 @@ import org.mockito.ArgumentCaptor;
* Not intended to provide a high code coverage or to test every major usecase.
*
* directExecutor() makes it easier to have deterministic tests.
* However, if your implementation uses another thread and uses streaming it is better to use
* the default executor, to avoid hitting bug #3084.
*
* <p>For basic unit test examples see {@link io.grpc.examples.helloworld.HelloWorldClientTest} and
* {@link io.grpc.examples.helloworld.HelloWorldServerTest}.

View File

@ -509,6 +509,7 @@ public final class ClientCalls {
private static final class UnaryStreamToFuture<RespT> extends StartableListener<RespT> {
private final GrpcFuture<RespT> responseFuture;
private RespT value;
private boolean isValueReceived = false;
// Non private to avoid synthetic class
UnaryStreamToFuture(GrpcFuture<RespT> responseFuture) {
@ -521,17 +522,18 @@ public final class ClientCalls {
@Override
public void onMessage(RespT value) {
if (this.value != null) {
if (this.isValueReceived) {
throw Status.INTERNAL.withDescription("More than one value received for unary call")
.asRuntimeException();
}
this.value = value;
this.isValueReceived = true;
}
@Override
public void onClose(Status status, Metadata trailers) {
if (status.isOk()) {
if (value == null) {
if (!isValueReceived) {
// No value received so mark the future as an error
responseFuture.setException(
Status.INTERNAL.withDescription("No value received for unary call")