From 0d89d3d1bcb73c45a2dd750042b1e89a86451c38 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Thu, 26 Mar 2015 16:02:24 -0700 Subject: [PATCH] Fix the issue where the intecepting call fails in start(), does not call super.start(), and makes the subsequent use of other methods on the call throw IllegalStateException. Create ClientInterceptors.CheckedForwardingCall that handles exception in start logic. Create Forwarding[Server]Call[Listener] for generic decoration use cases, with an abstract delegate() for flexibility. Create SimpleForwarding[Server]Call[Listener] to replace now deprecated forwarding classes. --- .../io/grpc/auth/ClientAuthInterceptor.java | 37 +++--- .../main/java/io/grpc/ClientInterceptors.java | 121 +++++++++++------- .../src/main/java/io/grpc/ForwardingCall.java | 85 ++++++++++++ .../java/io/grpc/ForwardingCallListener.java | 81 ++++++++++++ .../java/io/grpc/ForwardingServerCall.java | 86 +++++++++++++ .../io/grpc/ForwardingServerCallListener.java | 87 +++++++++++++ .../main/java/io/grpc/ServerInterceptors.java | 75 ++--------- .../java/io/grpc/ClientInterceptorsTest.java | 94 +++++++++++++- .../main/java/io/grpc/stub/MetadataUtils.java | 10 +- .../main/java/io/grpc/testing/TestUtils.java | 4 +- 10 files changed, 539 insertions(+), 141 deletions(-) create mode 100644 core/src/main/java/io/grpc/ForwardingCall.java create mode 100644 core/src/main/java/io/grpc/ForwardingCallListener.java create mode 100644 core/src/main/java/io/grpc/ForwardingServerCall.java create mode 100644 core/src/main/java/io/grpc/ForwardingServerCallListener.java diff --git a/auth/src/main/java/io/grpc/auth/ClientAuthInterceptor.java b/auth/src/main/java/io/grpc/auth/ClientAuthInterceptor.java index 3351aa7155..a9699ea77e 100644 --- a/auth/src/main/java/io/grpc/auth/ClientAuthInterceptor.java +++ b/auth/src/main/java/io/grpc/auth/ClientAuthInterceptor.java @@ -37,12 +37,10 @@ import com.google.common.base.Preconditions; import io.grpc.Call; import io.grpc.Channel; import io.grpc.ClientInterceptor; -import io.grpc.ClientInterceptors.ForwardingCall; +import io.grpc.ClientInterceptors.CheckedForwardingCall; import io.grpc.Metadata; import io.grpc.MethodDescriptor; -import io.grpc.Status; -import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -70,27 +68,24 @@ public class ClientAuthInterceptor implements ClientInterceptor { Channel next) { // TODO(ejona86): If the call fails for Auth reasons, this does not properly propagate info that // would be in WWW-Authenticate, because it does not yet have access to the header. - return new ForwardingCall(next.newCall(method)) { + return new CheckedForwardingCall(next.newCall(method)) { @Override - public void start(Listener responseListener, Metadata.Headers headers) { - try { - Metadata.Headers cachedSaved; - synchronized (ClientAuthInterceptor.this) { - // TODO(lryan): This is icky but the current auth library stores the same - // metadata map until the next refresh cycle. This will be fixed once - // https://github.com/google/google-auth-library-java/issues/3 - // is resolved. - if (lastMetadata == null || lastMetadata != credentials.getRequestMetadata()) { - lastMetadata = credentials.getRequestMetadata(); - cached = toHeaders(lastMetadata); - } - cachedSaved = cached; + protected void checkedStart(Listener responseListener, Metadata.Headers headers) + throws Exception { + Metadata.Headers cachedSaved; + synchronized (ClientAuthInterceptor.this) { + // TODO(lryan): This is icky but the current auth library stores the same + // metadata map until the next refresh cycle. This will be fixed once + // https://github.com/google/google-auth-library-java/issues/3 + // is resolved. + if (lastMetadata == null || lastMetadata != credentials.getRequestMetadata()) { + lastMetadata = credentials.getRequestMetadata(); + cached = toHeaders(lastMetadata); } - headers.merge(cachedSaved); - super.start(responseListener, headers); - } catch (IOException ioe) { - responseListener.onClose(Status.fromThrowable(ioe), new Metadata.Trailers()); + cachedSaved = cached; } + headers.merge(cachedSaved); + delegate().start(responseListener, headers); } }; } diff --git a/core/src/main/java/io/grpc/ClientInterceptors.java b/core/src/main/java/io/grpc/ClientInterceptors.java index 60631804e3..def7f06cf2 100644 --- a/core/src/main/java/io/grpc/ClientInterceptors.java +++ b/core/src/main/java/io/grpc/ClientInterceptors.java @@ -34,6 +34,9 @@ package io.grpc; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import io.grpc.ForwardingCall.SimpleForwardingCall; +import io.grpc.ForwardingCallListener.SimpleForwardingCallListener; + import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -114,71 +117,95 @@ public class ClientInterceptors { /** * A {@link Call} which forwards all of it's methods to another {@link Call}. + * + * @deprecated Use {@link SimpleForwardingCall}. */ - public static class ForwardingCall extends Call { - - private final Call delegate; - + @Deprecated + public static class ForwardingCall extends SimpleForwardingCall { public ForwardingCall(Call delegate) { + super(delegate); + } + } + + private static final Call NOOP_CALL = new Call() { + @Override + public void start(Listener responseListener, Metadata.Headers headers) { } + + @Override + public void request(int numMessages) { } + + @Override + public void cancel() { } + + @Override + public void halfClose() { } + + @Override + public void sendPayload(Object payload) { } + }; + + /** + * A {@link io.grpc.ForwardingCall} that delivers exceptions from its start logic to the call + * listener. + * + *

{@link io.grpc.ForwardingCall#start()} should not throw any exception other than those + * caused by misuse, e.g., {@link IllegalStateException}. {@code CheckedForwardingCall} provides + * {@code checkedStart()} in which throwing exceptions is allowed. + */ + public abstract static class CheckedForwardingCall + extends io.grpc.ForwardingCall { + + private Call delegate; + + /** + * Subclasses implement the start logic here that would normally belong to {@code start()}. + * + *

Implementation should call {@code this.delegate().start()} in the normal path. Exceptions + * may safely be thrown prior to calling {@code this.delegate().start()}. Such exceptions will + * be handled by {@code CheckedForwardingCall} and be delivered to {@code responseListener}. + * Exceptions must not be thrown after calling {@code this.delegate().start()}, as this + * can result in {@link Call.Listener#onClose} being called multiple times. + */ + protected abstract void checkedStart(Listener responseListener, Metadata.Headers headers) + throws Exception; + + protected CheckedForwardingCall(Call delegate) { this.delegate = delegate; } @Override - public void start(Listener responseListener, Metadata.Headers headers) { - this.delegate.start(responseListener, headers); + protected final Call delegate() { + return delegate; } @Override - public void request(int numMessages) { - this.delegate.request(numMessages); - } - - @Override - public void cancel() { - this.delegate.cancel(); - } - - @Override - public void halfClose() { - this.delegate.halfClose(); - } - - @Override - public void sendPayload(ReqT payload) { - this.delegate.sendPayload(payload); + @SuppressWarnings("unchecked") + public final void start(Listener responseListener, Metadata.Headers headers) { + try { + checkedStart(responseListener, headers); + } catch (Exception e) { + // Because start() doesn't throw, the caller may still try to call other methods on this + // call object. Passing these invocations to the original delegate will cause + // IllegalStateException because delegate().start() was not called. We switch the delegate + // to a NO-OP one to prevent the IllegalStateException. The user will finally get notified + // about the error through the listener. + delegate = (Call) NOOP_CALL; + responseListener.onClose(Status.fromThrowable(e), new Metadata.Trailers()); + } } } /** * A {@link Call.Listener} which forwards all of its methods to another * {@link Call.Listener}. + * + * @deprecated Use {@link SimpleFowardingCallListener}. */ - public static class ForwardingListener extends Call.Listener { - - private final Call.Listener delegate; + @Deprecated + public static class ForwardingListener extends SimpleForwardingCallListener { public ForwardingListener(Call.Listener delegate) { - this.delegate = delegate; - } - - @Override - public void onHeaders(Metadata.Headers headers) { - delegate.onHeaders(headers); - } - - @Override - public void onPayload(T payload) { - delegate.onPayload(payload); - } - - @Override - public void onClose(Status status, Metadata.Trailers trailers) { - delegate.onClose(status, trailers); - } - - @Override - public void onReady(int numMessages) { - delegate.onReady(numMessages); + super(delegate); } } } diff --git a/core/src/main/java/io/grpc/ForwardingCall.java b/core/src/main/java/io/grpc/ForwardingCall.java new file mode 100644 index 0000000000..6df55ab5ec --- /dev/null +++ b/core/src/main/java/io/grpc/ForwardingCall.java @@ -0,0 +1,85 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +/** + * A {@link Call} which forwards all of it's methods to another {@link Call}. + */ +public abstract class ForwardingCall extends Call { + /** + * Returns the delegated {@code Call}. + */ + protected abstract Call delegate(); + + @Override + public void start(Listener responseListener, Metadata.Headers headers) { + this.delegate().start(responseListener, headers); + } + + @Override + public void request(int numMessages) { + this.delegate().request(numMessages); + } + + @Override + public void cancel() { + this.delegate().cancel(); + } + + @Override + public void halfClose() { + this.delegate().halfClose(); + } + + @Override + public void sendPayload(ReqT payload) { + this.delegate().sendPayload(payload); + } + + /** + * A simplified version of {@link ForwardingCall} where subclasses can pass in a {@link Call} as + * the delegate. + */ + public abstract static class SimpleForwardingCall + extends ForwardingCall { + private final Call delegate; + + protected SimpleForwardingCall(Call delegate) { + this.delegate = delegate; + } + + @Override + protected Call delegate() { + return delegate; + } + } +} diff --git a/core/src/main/java/io/grpc/ForwardingCallListener.java b/core/src/main/java/io/grpc/ForwardingCallListener.java new file mode 100644 index 0000000000..ec0855ebcf --- /dev/null +++ b/core/src/main/java/io/grpc/ForwardingCallListener.java @@ -0,0 +1,81 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +/** + * A {@link Call.Listener} which forwards all of its methods to another {@link Call.Listener}. + */ +public abstract class ForwardingCallListener extends Call.Listener { + /** + * Returns the delegated {@code Call.Listener}. + */ + protected abstract Call.Listener delegate(); + + @Override + public void onHeaders(Metadata.Headers headers) { + delegate().onHeaders(headers); + } + + @Override + public void onPayload(RespT payload) { + delegate().onPayload(payload); + } + + @Override + public void onClose(Status status, Metadata.Trailers trailers) { + delegate().onClose(status, trailers); + } + + @Override + public void onReady(int numMessages) { + delegate().onReady(numMessages); + } + + /** + * A simplified version of {@link ForwardingCallListener} where subclasses can pass in a {@link + * Call.Listener} as the delegate. + */ + public abstract static class SimpleForwardingCallListener + extends ForwardingCallListener { + + private final Call.Listener delegate; + + protected SimpleForwardingCallListener(Call.Listener delegate) { + this.delegate = delegate; + } + + @Override + protected Call.Listener delegate() { + return delegate; + } + } +} diff --git a/core/src/main/java/io/grpc/ForwardingServerCall.java b/core/src/main/java/io/grpc/ForwardingServerCall.java new file mode 100644 index 0000000000..5d7bccfca0 --- /dev/null +++ b/core/src/main/java/io/grpc/ForwardingServerCall.java @@ -0,0 +1,86 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +/** + * A {@link ServerCall} which forwards all of it's methods to another {@link ServerCall}. + */ +public abstract class ForwardingServerCall extends ServerCall { + /** + * Returns the delegated {@code ServerCall}. + */ + protected abstract ServerCall delegate(); + + @Override + public void request(int numMessages) { + delegate().request(numMessages); + } + + @Override + public void sendHeaders(Metadata.Headers headers) { + delegate().sendHeaders(headers); + } + + @Override + public void sendPayload(RespT payload) { + delegate().sendPayload(payload); + } + + @Override + public void close(Status status, Metadata.Trailers trailers) { + delegate().close(status, trailers); + } + + @Override + public boolean isCancelled() { + return delegate().isCancelled(); + } + + /** + * A simplified version of {@link ForwardingServerCall} where subclasses can pass in a {@link + * ServerCall} as the delegate. + */ + public abstract static class SimpleForwardingServerCall + extends ForwardingServerCall { + + private final ServerCall delegate; + + protected SimpleForwardingServerCall(ServerCall delegate) { + this.delegate = delegate; + } + + @Override + protected ServerCall delegate() { + return delegate; + } + } +} diff --git a/core/src/main/java/io/grpc/ForwardingServerCallListener.java b/core/src/main/java/io/grpc/ForwardingServerCallListener.java new file mode 100644 index 0000000000..e4a6842841 --- /dev/null +++ b/core/src/main/java/io/grpc/ForwardingServerCallListener.java @@ -0,0 +1,87 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +/** + * A {@link ServerCall.Listener} which forwards all of its methods to another {@link + * ServerCall.Listener}. + */ +public abstract class ForwardingServerCallListener extends ServerCall.Listener { + /** + * Returns the delegated {@code ServerCall.Listener}. + */ + protected abstract ServerCall.Listener delegate(); + + @Override + public void onPayload(ReqT payload) { + delegate().onPayload(payload); + } + + @Override + public void onHalfClose() { + delegate().onHalfClose(); + } + + @Override + public void onCancel() { + delegate().onCancel(); + } + + @Override + public void onComplete() { + delegate().onComplete(); + } + + @Override + public void onReady(int numMessages) { + delegate().onReady(numMessages); + } + + /** + * A simplified version of {@link ForwardingServerCallListener} where subclasses can pass in a + * {@link ServerCall.Listener} as the delegate. + */ + public abstract static class SimpleForwardingServerCallListener + extends ForwardingServerCallListener { + + private final ServerCall.Listener delegate; + + protected SimpleForwardingServerCallListener(ServerCall.Listener delegate) { + this.delegate = delegate; + } + + @Override + protected ServerCall.Listener delegate() { + return delegate; + } + } +} diff --git a/core/src/main/java/io/grpc/ServerInterceptors.java b/core/src/main/java/io/grpc/ServerInterceptors.java index 57012897eb..a8588fc99a 100644 --- a/core/src/main/java/io/grpc/ServerInterceptors.java +++ b/core/src/main/java/io/grpc/ServerInterceptors.java @@ -34,6 +34,9 @@ package io.grpc; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; +import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener; + import java.util.Arrays; import java.util.Iterator; import java.util.List; @@ -145,75 +148,25 @@ public class ServerInterceptors { /** * Utility base class for decorating {@link ServerCall} instances. + * + * @deprecated Use {@link SimpleForwardingServerCall} */ - public static class ForwardingServerCall extends ServerCall { - - private final ServerCall delegate; - + @Deprecated + public static class ForwardingServerCall extends SimpleForwardingServerCall { public ForwardingServerCall(ServerCall delegate) { - this.delegate = delegate; - } - - @Override - public void request(int numMessages) { - delegate.request(numMessages); - } - - @Override - public void sendHeaders(Metadata.Headers headers) { - delegate.sendHeaders(headers); - } - - @Override - public void sendPayload(RespT payload) { - delegate.sendPayload(payload); - } - - @Override - public void close(Status status, Metadata.Trailers trailers) { - delegate.close(status, trailers); - } - - @Override - public boolean isCancelled() { - return delegate.isCancelled(); + super(delegate); } } /** * Utility base class for decorating {@link ServerCall.Listener} instances. + * + * @deprecated Use {@link SimpleForwardingServerCallListener} */ - public static class ForwardingListener extends ServerCall.Listener { - - private final ServerCall.Listener delegate; - - public ForwardingListener(ServerCall.Listener delegate) { - this.delegate = delegate; - } - - @Override - public void onPayload(RespT payload) { - delegate.onPayload(payload); - } - - @Override - public void onHalfClose() { - delegate.onHalfClose(); - } - - @Override - public void onCancel() { - delegate.onCancel(); - } - - @Override - public void onComplete() { - delegate.onComplete(); - } - - @Override - public void onReady(int numMessages) { - delegate.onReady(numMessages); + @Deprecated + public static class ForwardingListener extends SimpleForwardingServerCallListener { + public ForwardingListener(ServerCall.Listener delegate) { + super(delegate); } } } diff --git a/core/src/test/java/io/grpc/ClientInterceptorsTest.java b/core/src/test/java/io/grpc/ClientInterceptorsTest.java index cfcee5d9fa..4daecb1d6e 100644 --- a/core/src/test/java/io/grpc/ClientInterceptorsTest.java +++ b/core/src/test/java/io/grpc/ClientInterceptorsTest.java @@ -32,8 +32,13 @@ package io.grpc; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertSame; import static org.mockito.Matchers.same; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; @@ -41,8 +46,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import io.grpc.ClientInterceptors.ForwardingCall; -import io.grpc.ClientInterceptors.ForwardingListener; +import io.grpc.ClientInterceptors.CheckedForwardingCall; +import io.grpc.ForwardingCall.SimpleForwardingCall; +import io.grpc.ForwardingCallListener.SimpleForwardingCallListener; import org.junit.Before; import org.junit.Test; @@ -52,6 +58,8 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.util.ArrayList; import java.util.Arrays; @@ -70,9 +78,24 @@ public class ClientInterceptorsTest { @Mock private MethodDescriptor method; + /** + * Sets up mocks. + */ @Before public void setUp() { MockitoAnnotations.initMocks(this); when(channel.newCall(Mockito.>any())).thenReturn(call); + + // Emulate the precondition checks in ChannelImpl.CallImpl + Answer checkStartCalled = new Answer() { + @Override + public Void answer(InvocationOnMock invocation) { + verify(call).start(Mockito.>any(), Mockito.any()); + return null; + } + }; + doAnswer(checkStartCalled).when(call).request(anyInt()); + doAnswer(checkStartCalled).when(call).halfClose(); + doAnswer(checkStartCalled).when(call).sendPayload(Mockito.any()); } @Test(expected = NullPointerException.class) @@ -165,7 +188,7 @@ public class ClientInterceptorsTest { public Call interceptCall(MethodDescriptor method, Channel next) { Call call = next.newCall(method); - return new ForwardingCall(call) { + return new SimpleForwardingCall(call) { @Override public void start(Call.Listener responseListener, Metadata.Headers headers) { headers.put(credKey, "abcd"); @@ -195,10 +218,10 @@ public class ClientInterceptorsTest { public Call interceptCall(MethodDescriptor method, Channel next) { Call call = next.newCall(method); - return new ForwardingCall(call) { + return new SimpleForwardingCall(call) { @Override public void start(Call.Listener responseListener, Metadata.Headers headers) { - super.start(new ForwardingListener(responseListener) { + super.start(new SimpleForwardingCallListener(responseListener) { @Override public void onHeaders(Metadata.Headers headers) { examinedHeaders.add(headers); @@ -223,6 +246,67 @@ public class ClientInterceptorsTest { assertEquals(Arrays.asList(inboundHeaders), examinedHeaders); } + @Test + public void normalCall() { + ClientInterceptor interceptor = new ClientInterceptor() { + @Override + public Call interceptCall(MethodDescriptor method, + Channel next) { + Call call = next.newCall(method); + return new SimpleForwardingCall(call) { }; + } + }; + Channel intercepted = ClientInterceptors.intercept(channel, interceptor); + Call interceptedCall = intercepted.newCall(method); + assertNotSame(call, interceptedCall); + @SuppressWarnings("unchecked") + Call.Listener listener = mock(Call.Listener.class); + Metadata.Headers headers = new Metadata.Headers(); + interceptedCall.start(listener, headers); + verify(call).start(same(listener), same(headers)); + interceptedCall.sendPayload("request"); + verify(call).sendPayload(eq("request")); + interceptedCall.halfClose(); + verify(call).halfClose(); + interceptedCall.request(1); + verify(call).request(1); + } + + @Test + public void exceptionInStart() { + final Exception error = new Exception("emulated error"); + ClientInterceptor interceptor = new ClientInterceptor() { + @Override + public Call interceptCall(MethodDescriptor method, + Channel next) { + Call call = next.newCall(method); + return new CheckedForwardingCall(call) { + @Override + protected void checkedStart(Call.Listener responseListener, + Metadata.Headers headers) throws Exception { + if (this instanceof Object) { + throw error; + } + delegate().start(responseListener, headers); // will not be called + } + }; + } + }; + Channel intercepted = ClientInterceptors.intercept(channel, interceptor); + @SuppressWarnings("unchecked") + Call.Listener listener = mock(Call.Listener.class); + Call interceptedCall = intercepted.newCall(method); + assertNotSame(call, interceptedCall); + interceptedCall.start(listener, new Metadata.Headers()); + interceptedCall.sendPayload("request"); + interceptedCall.halfClose(); + interceptedCall.request(1); + verifyNoMoreInteractions(call); + ArgumentCaptor captor = ArgumentCaptor.forClass(Status.class); + verify(listener).onClose(captor.capture(), any(Metadata.Trailers.class)); + assertSame(error, captor.getValue().getCause()); + } + private static class NoopInterceptor implements ClientInterceptor { @Override public Call interceptCall(MethodDescriptor method, diff --git a/stub/src/main/java/io/grpc/stub/MetadataUtils.java b/stub/src/main/java/io/grpc/stub/MetadataUtils.java index ce30025f08..0230132830 100644 --- a/stub/src/main/java/io/grpc/stub/MetadataUtils.java +++ b/stub/src/main/java/io/grpc/stub/MetadataUtils.java @@ -34,8 +34,8 @@ package io.grpc.stub; import io.grpc.Call; import io.grpc.Channel; import io.grpc.ClientInterceptor; -import io.grpc.ClientInterceptors.ForwardingCall; -import io.grpc.ClientInterceptors.ForwardingListener; +import io.grpc.ForwardingCall.SimpleForwardingCall; +import io.grpc.ForwardingCallListener.SimpleForwardingCallListener; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; @@ -73,7 +73,7 @@ public class MetadataUtils { @Override public Call interceptCall(MethodDescriptor method, Channel next) { - return new ForwardingCall(next.newCall(method)) { + return new SimpleForwardingCall(next.newCall(method)) { @Override public void start(Listener responseListener, Metadata.Headers headers) { headers.merge(extraHeaders); @@ -116,12 +116,12 @@ public class MetadataUtils { @Override public Call interceptCall(MethodDescriptor method, Channel next) { - return new ForwardingCall(next.newCall(method)) { + return new SimpleForwardingCall(next.newCall(method)) { @Override public void start(Listener responseListener, Metadata.Headers headers) { headersCapture.set(null); trailersCapture.set(null); - super.start(new ForwardingListener(responseListener) { + super.start(new SimpleForwardingCallListener(responseListener) { @Override public void onHeaders(Metadata.Headers headers) { headersCapture.set(headers); diff --git a/testing/src/main/java/io/grpc/testing/TestUtils.java b/testing/src/main/java/io/grpc/testing/TestUtils.java index 45fa92c113..80e494da93 100644 --- a/testing/src/main/java/io/grpc/testing/TestUtils.java +++ b/testing/src/main/java/io/grpc/testing/TestUtils.java @@ -31,11 +31,11 @@ package io.grpc.testing; +import io.grpc.ForwardingServerCall.SimpleForwardingServerCall; import io.grpc.Metadata; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; -import io.grpc.ServerInterceptors; import io.grpc.Status; import java.util.Arrays; @@ -60,7 +60,7 @@ public class TestUtils { final Metadata.Headers requestHeaders, ServerCallHandler next) { ServerCall.Listener listener = next.startCall(method, - new ServerInterceptors.ForwardingServerCall(call) { + new SimpleForwardingServerCall(call) { boolean sentHeaders; @Override