diff --git a/auth/src/main/java/io/grpc/auth/ClientAuthInterceptor.java b/auth/src/main/java/io/grpc/auth/ClientAuthInterceptor.java index 3351aa7155..a9699ea77e 100644 --- a/auth/src/main/java/io/grpc/auth/ClientAuthInterceptor.java +++ b/auth/src/main/java/io/grpc/auth/ClientAuthInterceptor.java @@ -37,12 +37,10 @@ import com.google.common.base.Preconditions; import io.grpc.Call; import io.grpc.Channel; import io.grpc.ClientInterceptor; -import io.grpc.ClientInterceptors.ForwardingCall; +import io.grpc.ClientInterceptors.CheckedForwardingCall; import io.grpc.Metadata; import io.grpc.MethodDescriptor; -import io.grpc.Status; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -70,27 +68,24 @@ public class ClientAuthInterceptor implements ClientInterceptor { Channel next) { // TODO(ejona86): If the call fails for Auth reasons, this does not properly propagate info that // would be in WWW-Authenticate, because it does not yet have access to the header. - return new ForwardingCall(next.newCall(method)) { + return new CheckedForwardingCall(next.newCall(method)) { @Override - public void start(Listener responseListener, Metadata.Headers headers) { - try { - Metadata.Headers cachedSaved; - synchronized (ClientAuthInterceptor.this) { - // TODO(lryan): This is icky but the current auth library stores the same - // metadata map until the next refresh cycle. This will be fixed once - // https://github.com/google/google-auth-library-java/issues/3 - // is resolved. - if (lastMetadata == null || lastMetadata != credentials.getRequestMetadata()) { - lastMetadata = credentials.getRequestMetadata(); - cached = toHeaders(lastMetadata); - } - cachedSaved = cached; + protected void checkedStart(Listener responseListener, Metadata.Headers headers) + throws Exception { + Metadata.Headers cachedSaved; + synchronized (ClientAuthInterceptor.this) { + // TODO(lryan): This is icky but the current auth library stores the same + // metadata map until the next refresh cycle. This will be fixed once + // https://github.com/google/google-auth-library-java/issues/3 + // is resolved. + if (lastMetadata == null || lastMetadata != credentials.getRequestMetadata()) { + lastMetadata = credentials.getRequestMetadata(); + cached = toHeaders(lastMetadata); } - headers.merge(cachedSaved); - super.start(responseListener, headers); - } catch (IOException ioe) { - responseListener.onClose(Status.fromThrowable(ioe), new Metadata.Trailers()); + cachedSaved = cached; } + headers.merge(cachedSaved); + delegate().start(responseListener, headers); } }; } diff --git a/core/src/main/java/io/grpc/ClientInterceptors.java b/core/src/main/java/io/grpc/ClientInterceptors.java index 60631804e3..def7f06cf2 100644 --- a/core/src/main/java/io/grpc/ClientInterceptors.java +++ b/core/src/main/java/io/grpc/ClientInterceptors.java @@ -34,6 +34,9 @@ package io.grpc; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import io.grpc.ForwardingCall.SimpleForwardingCall; +import io.grpc.ForwardingCallListener.SimpleForwardingCallListener; + import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -114,71 +117,95 @@ public class ClientInterceptors { /** * A {@link Call} which forwards all of it's methods to another {@link Call}. + * + * @deprecated Use {@link SimpleForwardingCall}. */ - public static class ForwardingCall extends Call { - - private final Call delegate; - + @Deprecated + public static class ForwardingCall extends SimpleForwardingCall { public ForwardingCall(Call delegate) { + super(delegate); + } + } + + private static final Call NOOP_CALL = new Call() { + @Override + public void start(Listener responseListener, Metadata.Headers headers) { } + + @Override + public void request(int numMessages) { } + + @Override + public void cancel() { } + + @Override + public void halfClose() { } + + @Override + public void sendPayload(Object payload) { } + }; + + /** + * A {@link io.grpc.ForwardingCall} that delivers exceptions from its start logic to the call + * listener. + * + *

{@link io.grpc.ForwardingCall#start()} should not throw any exception other than those + * caused by misuse, e.g., {@link IllegalStateException}. {@code CheckedForwardingCall} provides + * {@code checkedStart()} in which throwing exceptions is allowed. + */ + public abstract static class CheckedForwardingCall + extends io.grpc.ForwardingCall { + + private Call delegate; + + /** + * Subclasses implement the start logic here that would normally belong to {@code start()}. + * + *

Implementation should call {@code this.delegate().start()} in the normal path. Exceptions + * may safely be thrown prior to calling {@code this.delegate().start()}. Such exceptions will + * be handled by {@code CheckedForwardingCall} and be delivered to {@code responseListener}. + * Exceptions must not be thrown after calling {@code this.delegate().start()}, as this + * can result in {@link Call.Listener#onClose} being called multiple times. + */ + protected abstract void checkedStart(Listener responseListener, Metadata.Headers headers) + throws Exception; + + protected CheckedForwardingCall(Call delegate) { this.delegate = delegate; } @Override - public void start(Listener responseListener, Metadata.Headers headers) { - this.delegate.start(responseListener, headers); + protected final Call delegate() { + return delegate; } @Override - public void request(int numMessages) { - this.delegate.request(numMessages); - } - - @Override - public void cancel() { - this.delegate.cancel(); - } - - @Override - public void halfClose() { - this.delegate.halfClose(); - } - - @Override - public void sendPayload(ReqT payload) { - this.delegate.sendPayload(payload); + @SuppressWarnings("unchecked") + public final void start(Listener responseListener, Metadata.Headers headers) { + try { + checkedStart(responseListener, headers); + } catch (Exception e) { + // Because start() doesn't throw, the caller may still try to call other methods on this + // call object. Passing these invocations to the original delegate will cause + // IllegalStateException because delegate().start() was not called. We switch the delegate + // to a NO-OP one to prevent the IllegalStateException. The user will finally get notified + // about the error through the listener. + delegate = (Call) NOOP_CALL; + responseListener.onClose(Status.fromThrowable(e), new Metadata.Trailers()); + } } } /** * A {@link Call.Listener} which forwards all of its methods to another * {@link Call.Listener}. + * + * @deprecated Use {@link SimpleFowardingCallListener}. */ - public static class ForwardingListener extends Call.Listener { - - private final Call.Listener delegate; + @Deprecated + public static class ForwardingListener extends SimpleForwardingCallListener { public ForwardingListener(Call.Listener delegate) { - this.delegate = delegate; - } - - @Override - public void onHeaders(Metadata.Headers headers) { - delegate.onHeaders(headers); - } - - @Override - public void onPayload(T payload) { - delegate.onPayload(payload); - } - - @Override - public void onClose(Status status, Metadata.Trailers trailers) { - delegate.onClose(status, trailers); - } - - @Override - public void onReady(int numMessages) { - delegate.onReady(numMessages); + super(delegate); } } } diff --git a/core/src/main/java/io/grpc/ForwardingCall.java b/core/src/main/java/io/grpc/ForwardingCall.java new file mode 100644 index 0000000000..6df55ab5ec --- /dev/null +++ b/core/src/main/java/io/grpc/ForwardingCall.java @@ -0,0 +1,85 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +/** + * A {@link Call} which forwards all of it's methods to another {@link Call}. + */ +public abstract class ForwardingCall extends Call { + /** + * Returns the delegated {@code Call}. + */ + protected abstract Call delegate(); + + @Override + public void start(Listener responseListener, Metadata.Headers headers) { + this.delegate().start(responseListener, headers); + } + + @Override + public void request(int numMessages) { + this.delegate().request(numMessages); + } + + @Override + public void cancel() { + this.delegate().cancel(); + } + + @Override + public void halfClose() { + this.delegate().halfClose(); + } + + @Override + public void sendPayload(ReqT payload) { + this.delegate().sendPayload(payload); + } + + /** + * A simplified version of {@link ForwardingCall} where subclasses can pass in a {@link Call} as + * the delegate. + */ + public abstract static class SimpleForwardingCall + extends ForwardingCall { + private final Call delegate; + + protected SimpleForwardingCall(Call delegate) { + this.delegate = delegate; + } + + @Override + protected Call delegate() { + return delegate; + } + } +} diff --git a/core/src/main/java/io/grpc/ForwardingCallListener.java b/core/src/main/java/io/grpc/ForwardingCallListener.java new file mode 100644 index 0000000000..ec0855ebcf --- /dev/null +++ b/core/src/main/java/io/grpc/ForwardingCallListener.java @@ -0,0 +1,81 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +/** + * A {@link Call.Listener} which forwards all of its methods to another {@link Call.Listener}. + */ +public abstract class ForwardingCallListener extends Call.Listener { + /** + * Returns the delegated {@code Call.Listener}. + */ + protected abstract Call.Listener delegate(); + + @Override + public void onHeaders(Metadata.Headers headers) { + delegate().onHeaders(headers); + } + + @Override + public void onPayload(RespT payload) { + delegate().onPayload(payload); + } + + @Override + public void onClose(Status status, Metadata.Trailers trailers) { + delegate().onClose(status, trailers); + } + + @Override + public void onReady(int numMessages) { + delegate().onReady(numMessages); + } + + /** + * A simplified version of {@link ForwardingCallListener} where subclasses can pass in a {@link + * Call.Listener} as the delegate. + */ + public abstract static class SimpleForwardingCallListener + extends ForwardingCallListener { + + private final Call.Listener delegate; + + protected SimpleForwardingCallListener(Call.Listener delegate) { + this.delegate = delegate; + } + + @Override + protected Call.Listener delegate() { + return delegate; + } + } +} diff --git a/core/src/main/java/io/grpc/ForwardingServerCall.java b/core/src/main/java/io/grpc/ForwardingServerCall.java new file mode 100644 index 0000000000..5d7bccfca0 --- /dev/null +++ b/core/src/main/java/io/grpc/ForwardingServerCall.java @@ -0,0 +1,86 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +/** + * A {@link ServerCall} which forwards all of it's methods to another {@link ServerCall}. + */ +public abstract class ForwardingServerCall extends ServerCall { + /** + * Returns the delegated {@code ServerCall}. + */ + protected abstract ServerCall delegate(); + + @Override + public void request(int numMessages) { + delegate().request(numMessages); + } + + @Override + public void sendHeaders(Metadata.Headers headers) { + delegate().sendHeaders(headers); + } + + @Override + public void sendPayload(RespT payload) { + delegate().sendPayload(payload); + } + + @Override + public void close(Status status, Metadata.Trailers trailers) { + delegate().close(status, trailers); + } + + @Override + public boolean isCancelled() { + return delegate().isCancelled(); + } + + /** + * A simplified version of {@link ForwardingServerCall} where subclasses can pass in a {@link + * ServerCall} as the delegate. + */ + public abstract static class SimpleForwardingServerCall + extends ForwardingServerCall { + + private final ServerCall delegate; + + protected SimpleForwardingServerCall(ServerCall delegate) { + this.delegate = delegate; + } + + @Override + protected ServerCall delegate() { + return delegate; + } + } +} diff --git a/core/src/main/java/io/grpc/ForwardingServerCallListener.java b/core/src/main/java/io/grpc/ForwardingServerCallListener.java new file mode 100644 index 0000000000..e4a6842841 --- /dev/null +++ b/core/src/main/java/io/grpc/ForwardingServerCallListener.java @@ -0,0 +1,87 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +/** + * A {@link ServerCall.Listener} which forwards all of its methods to another {@link + * ServerCall.Listener}. + */ +public abstract class ForwardingServerCallListener extends ServerCall.Listener { + /** + * Returns the delegated {@code ServerCall.Listener}. + */ + protected abstract ServerCall.Listener delegate(); + + @Override + public void onPayload(ReqT payload) { + delegate().onPayload(payload); + } + + @Override + public void onHalfClose() { + delegate().onHalfClose(); + } + + @Override + public void onCancel() { + delegate().onCancel(); + } + + @Override + public void onComplete() { + delegate().onComplete(); + } + + @Override + public void onReady(int numMessages) { + delegate().onReady(numMessages); + } + + /** + * A simplified version of {@link ForwardingServerCallListener} where subclasses can pass in a + * {@link ServerCall.Listener} as the delegate. + */ + public abstract static class SimpleForwardingServerCallListener + extends ForwardingServerCallListener { + + private final ServerCall.Listener delegate; + + protected SimpleForwardingServerCallListener(ServerCall.Listener delegate) { + this.delegate = delegate; + } + + @Override + protected ServerCall.Listener delegate() { + return delegate; + } + } +} diff --git a/core/src/main/java/io/grpc/ServerInterceptors.java b/core/src/main/java/io/grpc/ServerInterceptors.java index 57012897eb..a8588fc99a 100644 --- a/core/src/main/java/io/grpc/ServerInterceptors.java +++ b/core/src/main/java/io/grpc/ServerInterceptors.java @@ -34,6 +34,9 @@ package io.grpc; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; +import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener; + import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -145,75 +148,25 @@ public class ServerInterceptors { /** * Utility base class for decorating {@link ServerCall} instances. + * + * @deprecated Use {@link SimpleForwardingServerCall} */ - public static class ForwardingServerCall extends ServerCall { - - private final ServerCall delegate; - + @Deprecated + public static class ForwardingServerCall extends SimpleForwardingServerCall { public ForwardingServerCall(ServerCall delegate) { - this.delegate = delegate; - } - - @Override - public void request(int numMessages) { - delegate.request(numMessages); - } - - @Override - public void sendHeaders(Metadata.Headers headers) { - delegate.sendHeaders(headers); - } - - @Override - public void sendPayload(RespT payload) { - delegate.sendPayload(payload); - } - - @Override - public void close(Status status, Metadata.Trailers trailers) { - delegate.close(status, trailers); - } - - @Override - public boolean isCancelled() { - return delegate.isCancelled(); + super(delegate); } } /** * Utility base class for decorating {@link ServerCall.Listener} instances. + * + * @deprecated Use {@link SimpleForwardingServerCallListener} */ - public static class ForwardingListener extends ServerCall.Listener { - - private final ServerCall.Listener delegate; - - public ForwardingListener(ServerCall.Listener delegate) { - this.delegate = delegate; - } - - @Override - public void onPayload(RespT payload) { - delegate.onPayload(payload); - } - - @Override - public void onHalfClose() { - delegate.onHalfClose(); - } - - @Override - public void onCancel() { - delegate.onCancel(); - } - - @Override - public void onComplete() { - delegate.onComplete(); - } - - @Override - public void onReady(int numMessages) { - delegate.onReady(numMessages); + @Deprecated + public static class ForwardingListener extends SimpleForwardingServerCallListener { + public ForwardingListener(ServerCall.Listener delegate) { + super(delegate); } } } diff --git a/core/src/test/java/io/grpc/ClientInterceptorsTest.java b/core/src/test/java/io/grpc/ClientInterceptorsTest.java index cfcee5d9fa..4daecb1d6e 100644 --- a/core/src/test/java/io/grpc/ClientInterceptorsTest.java +++ b/core/src/test/java/io/grpc/ClientInterceptorsTest.java @@ -32,8 +32,13 @@ package io.grpc; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; import static org.mockito.Matchers.same; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -41,8 +46,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import io.grpc.ClientInterceptors.ForwardingCall; -import io.grpc.ClientInterceptors.ForwardingListener; +import io.grpc.ClientInterceptors.CheckedForwardingCall; +import io.grpc.ForwardingCall.SimpleForwardingCall; +import io.grpc.ForwardingCallListener.SimpleForwardingCallListener; import org.junit.Before; import org.junit.Test; @@ -52,6 +58,8 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.ArrayList; import java.util.Arrays; @@ -70,9 +78,24 @@ public class ClientInterceptorsTest { @Mock private MethodDescriptor method; + /** + * Sets up mocks. + */ @Before public void setUp() { MockitoAnnotations.initMocks(this); when(channel.newCall(Mockito.>any())).thenReturn(call); + + // Emulate the precondition checks in ChannelImpl.CallImpl + Answer checkStartCalled = new Answer() { + @Override + public Void answer(InvocationOnMock invocation) { + verify(call).start(Mockito.>any(), Mockito.any()); + return null; + } + }; + doAnswer(checkStartCalled).when(call).request(anyInt()); + doAnswer(checkStartCalled).when(call).halfClose(); + doAnswer(checkStartCalled).when(call).sendPayload(Mockito.any()); } @Test(expected = NullPointerException.class) @@ -165,7 +188,7 @@ public class ClientInterceptorsTest { public Call interceptCall(MethodDescriptor method, Channel next) { Call call = next.newCall(method); - return new ForwardingCall(call) { + return new SimpleForwardingCall(call) { @Override public void start(Call.Listener responseListener, Metadata.Headers headers) { headers.put(credKey, "abcd"); @@ -195,10 +218,10 @@ public class ClientInterceptorsTest { public Call interceptCall(MethodDescriptor method, Channel next) { Call call = next.newCall(method); - return new ForwardingCall(call) { + return new SimpleForwardingCall(call) { @Override public void start(Call.Listener responseListener, Metadata.Headers headers) { - super.start(new ForwardingListener(responseListener) { + super.start(new SimpleForwardingCallListener(responseListener) { @Override public void onHeaders(Metadata.Headers headers) { examinedHeaders.add(headers); @@ -223,6 +246,67 @@ public class ClientInterceptorsTest { assertEquals(Arrays.asList(inboundHeaders), examinedHeaders); } + @Test + public void normalCall() { + ClientInterceptor interceptor = new ClientInterceptor() { + @Override + public Call interceptCall(MethodDescriptor method, + Channel next) { + Call call = next.newCall(method); + return new SimpleForwardingCall(call) { }; + } + }; + Channel intercepted = ClientInterceptors.intercept(channel, interceptor); + Call interceptedCall = intercepted.newCall(method); + assertNotSame(call, interceptedCall); + @SuppressWarnings("unchecked") + Call.Listener listener = mock(Call.Listener.class); + Metadata.Headers headers = new Metadata.Headers(); + interceptedCall.start(listener, headers); + verify(call).start(same(listener), same(headers)); + interceptedCall.sendPayload("request"); + verify(call).sendPayload(eq("request")); + interceptedCall.halfClose(); + verify(call).halfClose(); + interceptedCall.request(1); + verify(call).request(1); + } + + @Test + public void exceptionInStart() { + final Exception error = new Exception("emulated error"); + ClientInterceptor interceptor = new ClientInterceptor() { + @Override + public Call interceptCall(MethodDescriptor method, + Channel next) { + Call call = next.newCall(method); + return new CheckedForwardingCall(call) { + @Override + protected void checkedStart(Call.Listener responseListener, + Metadata.Headers headers) throws Exception { + if (this instanceof Object) { + throw error; + } + delegate().start(responseListener, headers); // will not be called + } + }; + } + }; + Channel intercepted = ClientInterceptors.intercept(channel, interceptor); + @SuppressWarnings("unchecked") + Call.Listener listener = mock(Call.Listener.class); + Call interceptedCall = intercepted.newCall(method); + assertNotSame(call, interceptedCall); + interceptedCall.start(listener, new Metadata.Headers()); + interceptedCall.sendPayload("request"); + interceptedCall.halfClose(); + interceptedCall.request(1); + verifyNoMoreInteractions(call); + ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); + verify(listener).onClose(captor.capture(), any(Metadata.Trailers.class)); + assertSame(error, captor.getValue().getCause()); + } + private static class NoopInterceptor implements ClientInterceptor { @Override public Call interceptCall(MethodDescriptor method, diff --git a/stub/src/main/java/io/grpc/stub/MetadataUtils.java b/stub/src/main/java/io/grpc/stub/MetadataUtils.java index ce30025f08..0230132830 100644 --- a/stub/src/main/java/io/grpc/stub/MetadataUtils.java +++ b/stub/src/main/java/io/grpc/stub/MetadataUtils.java @@ -34,8 +34,8 @@ package io.grpc.stub; import io.grpc.Call; import io.grpc.Channel; import io.grpc.ClientInterceptor; -import io.grpc.ClientInterceptors.ForwardingCall; -import io.grpc.ClientInterceptors.ForwardingListener; +import io.grpc.ForwardingCall.SimpleForwardingCall; +import io.grpc.ForwardingCallListener.SimpleForwardingCallListener; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; @@ -73,7 +73,7 @@ public class MetadataUtils { @Override public Call interceptCall(MethodDescriptor method, Channel next) { - return new ForwardingCall(next.newCall(method)) { + return new SimpleForwardingCall(next.newCall(method)) { @Override public void start(Listener responseListener, Metadata.Headers headers) { headers.merge(extraHeaders); @@ -116,12 +116,12 @@ public class MetadataUtils { @Override public Call interceptCall(MethodDescriptor method, Channel next) { - return new ForwardingCall(next.newCall(method)) { + return new SimpleForwardingCall(next.newCall(method)) { @Override public void start(Listener responseListener, Metadata.Headers headers) { headersCapture.set(null); trailersCapture.set(null); - super.start(new ForwardingListener(responseListener) { + super.start(new SimpleForwardingCallListener(responseListener) { @Override public void onHeaders(Metadata.Headers headers) { headersCapture.set(headers); diff --git a/testing/src/main/java/io/grpc/testing/TestUtils.java b/testing/src/main/java/io/grpc/testing/TestUtils.java index 45fa92c113..80e494da93 100644 --- a/testing/src/main/java/io/grpc/testing/TestUtils.java +++ b/testing/src/main/java/io/grpc/testing/TestUtils.java @@ -31,11 +31,11 @@ package io.grpc.testing; +import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; import io.grpc.Metadata; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; -import io.grpc.ServerInterceptors; import io.grpc.Status; import java.util.Arrays; @@ -60,7 +60,7 @@ public class TestUtils { final Metadata.Headers requestHeaders, ServerCallHandler next) { ServerCall.Listener listener = next.startCall(method, - new ServerInterceptors.ForwardingServerCall(call) { + new SimpleForwardingServerCall(call) { boolean sentHeaders; @Override