diff --git a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
index 792257758b..ba66611511 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessChannelBuilder.java
@@ -70,6 +70,7 @@ public final class InProcessChannelBuilder extends
private final String name;
private ScheduledExecutorService scheduledExecutorService;
private int maxInboundMetadataSize = Integer.MAX_VALUE;
+ private boolean transportIncludeStatusCause = false;
private InProcessChannelBuilder(String name) {
super(new InProcessSocketAddress(name), "localhost");
@@ -157,11 +158,30 @@ public final class InProcessChannelBuilder extends
return this;
}
+ /**
+ * Sets whether to include the cause with the status that is propagated
+ * forward from the InProcessTransport. This was added to make debugging failing
+ * tests easier by showing the cause of the status.
+ *
+ *
By default, this is set to false.
+ * A default value of false maintains consistency with other transports which strip causal
+ * information from the status to avoid leaking information to untrusted clients, and
+ * to avoid sharing language-specific information with the client.
+ * For the in-process implementation, this is not a concern.
+ *
+ * @param enable whether to include cause in status
+ * @return this
+ */
+ public InProcessChannelBuilder propagateCauseWithStatus(boolean enable) {
+ this.transportIncludeStatusCause = enable;
+ return this;
+ }
+
@Override
@Internal
protected ClientTransportFactory buildTransportFactory() {
return new InProcessClientTransportFactory(
- name, scheduledExecutorService, maxInboundMetadataSize);
+ name, scheduledExecutorService, maxInboundMetadataSize, transportIncludeStatusCause);
}
/**
@@ -173,16 +193,18 @@ public final class InProcessChannelBuilder extends
private final boolean useSharedTimer;
private final int maxInboundMetadataSize;
private boolean closed;
+ private boolean includeCauseWithStatus;
private InProcessClientTransportFactory(
String name,
@Nullable ScheduledExecutorService scheduledExecutorService,
- int maxInboundMetadataSize) {
+ int maxInboundMetadataSize, boolean includeCauseWithStatus) {
this.name = name;
useSharedTimer = scheduledExecutorService == null;
timerService = useSharedTimer
? SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE) : scheduledExecutorService;
this.maxInboundMetadataSize = maxInboundMetadataSize;
+ this.includeCauseWithStatus = includeCauseWithStatus;
}
@Override
@@ -194,7 +216,7 @@ public final class InProcessChannelBuilder extends
// TODO(carl-mastrangelo): Pass channelLogger in.
return new InProcessTransport(
name, maxInboundMetadataSize, options.getAuthority(), options.getUserAgent(),
- options.getEagAttributes());
+ options.getEagAttributes(), includeCauseWithStatus);
}
@Override
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java
index 448d691306..3461eeebc0 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java
@@ -83,6 +83,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
private final String userAgent;
private final Optional optionalServerListener;
private int serverMaxInboundMetadataSize;
+ private final boolean includeCauseWithStatus;
private ObjectPool serverSchedulerPool;
private ScheduledExecutorService serverScheduler;
private ServerTransportListener serverTransportListener;
@@ -115,7 +116,8 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
};
private InProcessTransport(String name, int maxInboundMetadataSize, String authority,
- String userAgent, Attributes eagAttrs, Optional optionalServerListener) {
+ String userAgent, Attributes eagAttrs,
+ Optional optionalServerListener, boolean includeCauseWithStatus) {
this.name = name;
this.clientMaxInboundMetadataSize = maxInboundMetadataSize;
this.authority = authority;
@@ -129,13 +131,14 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
.build();
this.optionalServerListener = optionalServerListener;
logId = InternalLogId.allocate(getClass(), name);
+ this.includeCauseWithStatus = includeCauseWithStatus;
}
public InProcessTransport(
String name, int maxInboundMetadataSize, String authority, String userAgent,
- Attributes eagAttrs) {
+ Attributes eagAttrs, boolean includeCauseWithStatus) {
this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs,
- Optional.absent());
+ Optional.absent(), includeCauseWithStatus);
}
InProcessTransport(
@@ -143,7 +146,8 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
Attributes eagAttrs, ObjectPool serverSchedulerPool,
List serverStreamTracerFactories,
ServerListener serverListener) {
- this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs, Optional.of(serverListener));
+ this(name, maxInboundMetadataSize, authority, userAgent, eagAttrs,
+ Optional.of(serverListener), false);
this.serverMaxInboundMetadataSize = maxInboundMetadataSize;
this.serverSchedulerPool = serverSchedulerPool;
this.serverStreamTracerFactories = serverStreamTracerFactories;
@@ -564,7 +568,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
/** clientStream.serverClosed() must be called before this method */
private void notifyClientClose(Status status, Metadata trailers) {
- Status clientStatus = stripCause(status);
+ Status clientStatus = cleanStatus(status, includeCauseWithStatus);
synchronized (this) {
if (closed) {
return;
@@ -744,7 +748,7 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
// Must be thread-safe for shutdownNow()
@Override
public void cancel(Status reason) {
- Status serverStatus = stripCause(reason);
+ Status serverStatus = cleanStatus(reason, includeCauseWithStatus);
if (!internalCancel(serverStatus, serverStatus)) {
return;
}
@@ -843,19 +847,25 @@ final class InProcessTransport implements ServerTransport, ConnectionClientTrans
}
/**
- * Returns a new status with the same code and description, but stripped of any other information
- * (i.e. cause).
+ * Returns a new status with the same code and description.
+ * If includeCauseWithStatus is true, cause is also included.
*
- * This is, so that the InProcess transport behaves in the same way as the other transports,
- * when exchanging statuses between client and server and vice versa.
+ *
For InProcess transport to behave in the same way as the other transports,
+ * when exchanging statuses between client and server and vice versa,
+ * the cause should be excluded from the status.
+ * For easier debugging, the status may be optionally included.
*/
- private static Status stripCause(Status status) {
+ private static Status cleanStatus(Status status, boolean includeCauseWithStatus) {
if (status == null) {
return null;
}
- return Status
+ Status clientStatus = Status
.fromCodeValue(status.getCode().value())
.withDescription(status.getDescription());
+ if (includeCauseWithStatus) {
+ clientStatus = clientStatus.withCause(status.getCause());
+ }
+ return clientStatus;
}
private static class SingleMessageProducer implements StreamListener.MessageProducer {
diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java
index 5db33a226b..05c16e45bf 100644
--- a/core/src/main/java/io/grpc/internal/ServerImpl.java
+++ b/core/src/main/java/io/grpc/internal/ServerImpl.java
@@ -161,7 +161,6 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
this.channelz = builder.channelz;
this.serverCallTracer = builder.callTracerFactory.create();
this.ticker = checkNotNull(builder.ticker, "ticker");
-
channelz.addServer(this);
}
@@ -762,9 +761,9 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
/**
* Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use.
*/
- private void internalClose() {
+ private void internalClose(Throwable t) {
// TODO(ejona86): this is not thread-safe :)
- stream.close(Status.UNKNOWN, new Metadata());
+ stream.close(Status.UNKNOWN.withCause(t), new Metadata());
}
@Override
@@ -785,10 +784,10 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
try {
getListener().messagesAvailable(producer);
} catch (RuntimeException e) {
- internalClose();
+ internalClose(e);
throw e;
} catch (Error e) {
- internalClose();
+ internalClose(e);
throw e;
} finally {
PerfMark.stopTask("ServerCallListener(app).messagesAvailable", tag);
@@ -820,10 +819,10 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
try {
getListener().halfClosed();
} catch (RuntimeException e) {
- internalClose();
+ internalClose(e);
throw e;
} catch (Error e) {
- internalClose();
+ internalClose(e);
throw e;
} finally {
PerfMark.stopTask("ServerCallListener(app).halfClosed", tag);
@@ -894,10 +893,10 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
try {
getListener().onReady();
} catch (RuntimeException e) {
- internalClose();
+ internalClose(e);
throw e;
} catch (Error e) {
- internalClose();
+ internalClose(e);
throw e;
} finally {
PerfMark.stopTask("ServerCallListener(app).onReady", tag);
diff --git a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java
index 3bd46cd384..f7e325ad5a 100644
--- a/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java
+++ b/core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java
@@ -16,14 +16,30 @@
package io.grpc.inprocess;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
import com.google.common.collect.ImmutableList;
+import io.grpc.CallOptions;
+import io.grpc.ManagedChannel;
+import io.grpc.Metadata;
+import io.grpc.Server;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerServiceDefinition;
import io.grpc.ServerStreamTracer;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
import io.grpc.internal.AbstractTransportTest;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ManagedClientTransport;
+import io.grpc.stub.ClientCalls;
+import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.testing.TestMethodDescriptors;
import java.util.List;
import org.junit.Ignore;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -35,6 +51,9 @@ public class InProcessTransportTest extends AbstractTransportTest {
private static final String AUTHORITY = "a-testing-authority";
private static final String USER_AGENT = "a-testing-user-agent";
+ @Rule
+ public final GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule();
+
@Override
protected List extends InternalServer> newServer(
List streamTracerFactories) {
@@ -59,7 +78,7 @@ public class InProcessTransportTest extends AbstractTransportTest {
protected ManagedClientTransport newClientTransport(InternalServer server) {
return new InProcessTransport(
TRANSPORT_NAME, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, testAuthority(server), USER_AGENT,
- eagAttrs());
+ eagAttrs(), false);
}
@Override
@@ -75,4 +94,42 @@ public class InProcessTransportTest extends AbstractTransportTest {
public void socketStats() throws Exception {
// test does not apply to in-process
}
+
+ @Test
+ public void causeShouldBePropagatedWithStatus() throws Exception {
+ server = null;
+ String failingServerName = "server_foo";
+ String serviceFoo = "service_foo";
+ final Status s = Status.INTERNAL.withCause(new Throwable("failing server exception"));
+ ServerServiceDefinition definition = ServerServiceDefinition.builder(serviceFoo)
+ .addMethod(TestMethodDescriptors.voidMethod(), new ServerCallHandler() {
+ @Override
+ public ServerCall.Listener startCall(
+ ServerCall call, Metadata headers) {
+ call.close(s, new Metadata());
+ return new ServerCall.Listener() {};
+ }
+ })
+ .build();
+ Server failingServer = InProcessServerBuilder
+ .forName(failingServerName)
+ .addService(definition)
+ .directExecutor()
+ .build()
+ .start();
+ grpcCleanupRule.register(failingServer);
+ ManagedChannel channel = InProcessChannelBuilder
+ .forName(failingServerName)
+ .propagateCauseWithStatus(true)
+ .build();
+ grpcCleanupRule.register(channel);
+ try {
+ ClientCalls.blockingUnaryCall(channel, TestMethodDescriptors.voidMethod(),
+ CallOptions.DEFAULT, null);
+ fail("exception should have been thrown");
+ } catch (StatusRuntimeException e) {
+ // When propagateCauseWithStatus is true, the cause should be sent forward
+ assertEquals(s.getCause(), e.getCause());
+ }
+ }
}
diff --git a/core/src/test/java/io/grpc/internal/AbstractTransportTest.java b/core/src/test/java/io/grpc/internal/AbstractTransportTest.java
index adbefbedef..43f6273cde 100644
--- a/core/src/test/java/io/grpc/internal/AbstractTransportTest.java
+++ b/core/src/test/java/io/grpc/internal/AbstractTransportTest.java
@@ -157,7 +157,7 @@ public abstract class AbstractTransportTest {
* {@code serverListener}, otherwise tearDown() can't wait for shutdown which can put following
* tests in an indeterminate state.
*/
- private InternalServer server;
+ protected InternalServer server;
private ServerTransport serverTransport;
private ManagedClientTransport client;
private MethodDescriptor methodDescriptor =
@@ -1058,9 +1058,7 @@ public abstract class AbstractTransportTest {
Metadata clientStreamTrailers =
clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotNull(clientStreamTrailers);
- assertEquals(status.getCode(), clientStreamStatus.getCode());
- assertEquals("Hello. Goodbye.", clientStreamStatus.getDescription());
- assertNull(clientStreamStatus.getCause());
+ checkClientStatus(status, clientStreamStatus);
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertTrue(clientStreamTracer1.getInboundHeaders());
assertSame(clientStreamTrailers, clientStreamTracer1.getInboundTrailers());
@@ -1097,10 +1095,7 @@ public abstract class AbstractTransportTest {
Status clientStreamStatus = clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
Metadata clientStreamTrailers =
clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
- assertEquals(status.getCode(), clientStreamStatus.getCode());
- assertEquals("Hellogoodbye", clientStreamStatus.getDescription());
- // Cause should not be transmitted to the client.
- assertNull(clientStreamStatus.getCause());
+ checkClientStatus(status, clientStreamStatus);
assertEquals(
Lists.newArrayList(trailers.getAll(asciiKey)),
Lists.newArrayList(clientStreamTrailers.getAll(asciiKey)));
@@ -1138,9 +1133,7 @@ public abstract class AbstractTransportTest {
Metadata clientStreamTrailers =
clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotNull(clientStreamTrailers);
- assertEquals(status.getCode(), clientStreamStatus.getCode());
- assertEquals(status.getDescription(), clientStreamStatus.getDescription());
- assertNull(clientStreamStatus.getCause());
+ checkClientStatus(status, clientStreamStatus);
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertSame(clientStreamTrailers, clientStreamTracer1.getInboundTrailers());
assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
@@ -1188,9 +1181,7 @@ public abstract class AbstractTransportTest {
Metadata clientStreamTrailers =
clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotNull(clientStreamTrailers);
- assertEquals(status.getCode(), clientStreamStatus.getCode());
- assertEquals(status.getDescription(), clientStreamStatus.getDescription());
- assertNull(clientStreamStatus.getCause());
+ checkClientStatus(status, clientStreamStatus);
assertTrue(clientStreamTracer1.getOutboundHeaders());
assertSame(clientStreamTrailers, clientStreamTracer1.getInboundTrailers());
assertSame(clientStreamStatus, clientStreamTracer1.getStatus());
@@ -1219,7 +1210,7 @@ public abstract class AbstractTransportTest {
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
Status serverStatus = serverStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
assertNotEquals(Status.Code.OK, serverStatus.getCode());
- // Cause should not be transmitted between client and server
+ // Cause should not be transmitted between client and server by default
assertNull(serverStatus.getCause());
clientStream.cancel(status);
@@ -2072,6 +2063,16 @@ public abstract class AbstractTransportTest {
}
}
+ /**
+ * Verifies that the client status is as expected. By default, the code and description should
+ * be present, and the cause should be stripped away.
+ */
+ private void checkClientStatus(Status expectedStatus, Status clientStreamStatus) {
+ assertEquals(expectedStatus.getCode(), clientStreamStatus.getCode());
+ assertEquals(expectedStatus.getDescription(), clientStreamStatus.getDescription());
+ assertNull(clientStreamStatus.getCause());
+ }
+
private static boolean waitForFuture(Future> future, long timeout, TimeUnit unit)
throws InterruptedException {
try {
diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java
index d76e714523..b32833f343 100644
--- a/core/src/test/java/io/grpc/internal/ServerImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java
@@ -1441,8 +1441,9 @@ public class ServerImplTest {
private void ensureServerStateNotLeaked() {
verify(stream).close(statusCaptor.capture(), metadataCaptor.capture());
- assertEquals(Status.UNKNOWN, statusCaptor.getValue());
- assertNull(statusCaptor.getValue().getCause());
+ assertEquals(Status.UNKNOWN.getCode(), statusCaptor.getValue().getCode());
+ // Used in InProcessTransport when set to include the cause with the status
+ assertNotNull(statusCaptor.getValue().getCause());
assertTrue(metadataCaptor.getValue().keys().isEmpty());
}