Fix the issue where the intecepting call fails in start(), does not

call super.start(), and makes the subsequent use of other methods on the
call throw IllegalStateException.

Create ClientInterceptors.CheckedForwardingCall that handles exception
in start logic.

Create Forwarding[Server]Call[Listener] for generic decoration use
cases, with an abstract delegate() for flexibility.

Create SimpleForwarding[Server]Call[Listener] to replace now deprecated
forwarding classes.
This commit is contained in:
Kun Zhang 2015-03-26 16:02:24 -07:00
parent 90706dc48f
commit 0d89d3d1bc
10 changed files with 539 additions and 141 deletions

View File

@ -37,12 +37,10 @@ import com.google.common.base.Preconditions;
import io.grpc.Call;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors.ForwardingCall;
import io.grpc.ClientInterceptors.CheckedForwardingCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@ -70,27 +68,24 @@ public class ClientAuthInterceptor implements ClientInterceptor {
Channel next) {
// TODO(ejona86): If the call fails for Auth reasons, this does not properly propagate info that
// would be in WWW-Authenticate, because it does not yet have access to the header.
return new ForwardingCall<ReqT, RespT>(next.newCall(method)) {
return new CheckedForwardingCall<ReqT, RespT>(next.newCall(method)) {
@Override
public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
try {
Metadata.Headers cachedSaved;
synchronized (ClientAuthInterceptor.this) {
// TODO(lryan): This is icky but the current auth library stores the same
// metadata map until the next refresh cycle. This will be fixed once
// https://github.com/google/google-auth-library-java/issues/3
// is resolved.
if (lastMetadata == null || lastMetadata != credentials.getRequestMetadata()) {
lastMetadata = credentials.getRequestMetadata();
cached = toHeaders(lastMetadata);
}
cachedSaved = cached;
protected void checkedStart(Listener<RespT> responseListener, Metadata.Headers headers)
throws Exception {
Metadata.Headers cachedSaved;
synchronized (ClientAuthInterceptor.this) {
// TODO(lryan): This is icky but the current auth library stores the same
// metadata map until the next refresh cycle. This will be fixed once
// https://github.com/google/google-auth-library-java/issues/3
// is resolved.
if (lastMetadata == null || lastMetadata != credentials.getRequestMetadata()) {
lastMetadata = credentials.getRequestMetadata();
cached = toHeaders(lastMetadata);
}
headers.merge(cachedSaved);
super.start(responseListener, headers);
} catch (IOException ioe) {
responseListener.onClose(Status.fromThrowable(ioe), new Metadata.Trailers());
cachedSaved = cached;
}
headers.merge(cachedSaved);
delegate().start(responseListener, headers);
}
};
}

View File

@ -34,6 +34,9 @@ package io.grpc;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.grpc.ForwardingCall.SimpleForwardingCall;
import io.grpc.ForwardingCallListener.SimpleForwardingCallListener;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@ -114,71 +117,95 @@ public class ClientInterceptors {
/**
* A {@link Call} which forwards all of it's methods to another {@link Call}.
*
* @deprecated Use {@link SimpleForwardingCall}.
*/
public static class ForwardingCall<ReqT, RespT> extends Call<ReqT, RespT> {
private final Call<ReqT, RespT> delegate;
@Deprecated
public static class ForwardingCall<ReqT, RespT> extends SimpleForwardingCall<ReqT, RespT> {
public ForwardingCall(Call<ReqT, RespT> delegate) {
super(delegate);
}
}
private static final Call<Object, Object> NOOP_CALL = new Call<Object, Object>() {
@Override
public void start(Listener<Object> responseListener, Metadata.Headers headers) { }
@Override
public void request(int numMessages) { }
@Override
public void cancel() { }
@Override
public void halfClose() { }
@Override
public void sendPayload(Object payload) { }
};
/**
* A {@link io.grpc.ForwardingCall} that delivers exceptions from its start logic to the call
* listener.
*
* <p>{@link io.grpc.ForwardingCall#start()} should not throw any exception other than those
* caused by misuse, e.g., {@link IllegalStateException}. {@code CheckedForwardingCall} provides
* {@code checkedStart()} in which throwing exceptions is allowed.
*/
public abstract static class CheckedForwardingCall<ReqT, RespT>
extends io.grpc.ForwardingCall<ReqT, RespT> {
private Call<ReqT, RespT> delegate;
/**
* Subclasses implement the start logic here that would normally belong to {@code start()}.
*
* <p>Implementation should call {@code this.delegate().start()} in the normal path. Exceptions
* may safely be thrown prior to calling {@code this.delegate().start()}. Such exceptions will
* be handled by {@code CheckedForwardingCall} and be delivered to {@code responseListener}.
* Exceptions <em>must not</em> be thrown after calling {@code this.delegate().start()}, as this
* can result in {@link Call.Listener#onClose} being called multiple times.
*/
protected abstract void checkedStart(Listener<RespT> responseListener, Metadata.Headers headers)
throws Exception;
protected CheckedForwardingCall(Call<ReqT, RespT> delegate) {
this.delegate = delegate;
}
@Override
public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
this.delegate.start(responseListener, headers);
protected final Call<ReqT, RespT> delegate() {
return delegate;
}
@Override
public void request(int numMessages) {
this.delegate.request(numMessages);
}
@Override
public void cancel() {
this.delegate.cancel();
}
@Override
public void halfClose() {
this.delegate.halfClose();
}
@Override
public void sendPayload(ReqT payload) {
this.delegate.sendPayload(payload);
@SuppressWarnings("unchecked")
public final void start(Listener<RespT> responseListener, Metadata.Headers headers) {
try {
checkedStart(responseListener, headers);
} catch (Exception e) {
// Because start() doesn't throw, the caller may still try to call other methods on this
// call object. Passing these invocations to the original delegate will cause
// IllegalStateException because delegate().start() was not called. We switch the delegate
// to a NO-OP one to prevent the IllegalStateException. The user will finally get notified
// about the error through the listener.
delegate = (Call<ReqT, RespT>) NOOP_CALL;
responseListener.onClose(Status.fromThrowable(e), new Metadata.Trailers());
}
}
}
/**
* A {@link Call.Listener} which forwards all of its methods to another
* {@link Call.Listener}.
*
* @deprecated Use {@link SimpleFowardingCallListener}.
*/
public static class ForwardingListener<T> extends Call.Listener<T> {
private final Call.Listener<T> delegate;
@Deprecated
public static class ForwardingListener<T> extends SimpleForwardingCallListener<T> {
public ForwardingListener(Call.Listener<T> delegate) {
this.delegate = delegate;
}
@Override
public void onHeaders(Metadata.Headers headers) {
delegate.onHeaders(headers);
}
@Override
public void onPayload(T payload) {
delegate.onPayload(payload);
}
@Override
public void onClose(Status status, Metadata.Trailers trailers) {
delegate.onClose(status, trailers);
}
@Override
public void onReady(int numMessages) {
delegate.onReady(numMessages);
super(delegate);
}
}
}

View File

@ -0,0 +1,85 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
/**
* A {@link Call} which forwards all of it's methods to another {@link Call}.
*/
public abstract class ForwardingCall<ReqT, RespT> extends Call<ReqT, RespT> {
/**
* Returns the delegated {@code Call}.
*/
protected abstract Call<ReqT, RespT> delegate();
@Override
public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
this.delegate().start(responseListener, headers);
}
@Override
public void request(int numMessages) {
this.delegate().request(numMessages);
}
@Override
public void cancel() {
this.delegate().cancel();
}
@Override
public void halfClose() {
this.delegate().halfClose();
}
@Override
public void sendPayload(ReqT payload) {
this.delegate().sendPayload(payload);
}
/**
* A simplified version of {@link ForwardingCall} where subclasses can pass in a {@link Call} as
* the delegate.
*/
public abstract static class SimpleForwardingCall<ReqT, RespT>
extends ForwardingCall<ReqT, RespT> {
private final Call<ReqT, RespT> delegate;
protected SimpleForwardingCall(Call<ReqT, RespT> delegate) {
this.delegate = delegate;
}
@Override
protected Call<ReqT, RespT> delegate() {
return delegate;
}
}
}

View File

@ -0,0 +1,81 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
/**
* A {@link Call.Listener} which forwards all of its methods to another {@link Call.Listener}.
*/
public abstract class ForwardingCallListener<RespT> extends Call.Listener<RespT> {
/**
* Returns the delegated {@code Call.Listener}.
*/
protected abstract Call.Listener<RespT> delegate();
@Override
public void onHeaders(Metadata.Headers headers) {
delegate().onHeaders(headers);
}
@Override
public void onPayload(RespT payload) {
delegate().onPayload(payload);
}
@Override
public void onClose(Status status, Metadata.Trailers trailers) {
delegate().onClose(status, trailers);
}
@Override
public void onReady(int numMessages) {
delegate().onReady(numMessages);
}
/**
* A simplified version of {@link ForwardingCallListener} where subclasses can pass in a {@link
* Call.Listener} as the delegate.
*/
public abstract static class SimpleForwardingCallListener<RespT>
extends ForwardingCallListener<RespT> {
private final Call.Listener<RespT> delegate;
protected SimpleForwardingCallListener(Call.Listener<RespT> delegate) {
this.delegate = delegate;
}
@Override
protected Call.Listener<RespT> delegate() {
return delegate;
}
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
/**
* A {@link ServerCall} which forwards all of it's methods to another {@link ServerCall}.
*/
public abstract class ForwardingServerCall<RespT> extends ServerCall<RespT> {
/**
* Returns the delegated {@code ServerCall}.
*/
protected abstract ServerCall<RespT> delegate();
@Override
public void request(int numMessages) {
delegate().request(numMessages);
}
@Override
public void sendHeaders(Metadata.Headers headers) {
delegate().sendHeaders(headers);
}
@Override
public void sendPayload(RespT payload) {
delegate().sendPayload(payload);
}
@Override
public void close(Status status, Metadata.Trailers trailers) {
delegate().close(status, trailers);
}
@Override
public boolean isCancelled() {
return delegate().isCancelled();
}
/**
* A simplified version of {@link ForwardingServerCall} where subclasses can pass in a {@link
* ServerCall} as the delegate.
*/
public abstract static class SimpleForwardingServerCall<RespT>
extends ForwardingServerCall<RespT> {
private final ServerCall<RespT> delegate;
protected SimpleForwardingServerCall(ServerCall<RespT> delegate) {
this.delegate = delegate;
}
@Override
protected ServerCall<RespT> delegate() {
return delegate;
}
}
}

View File

@ -0,0 +1,87 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
/**
* A {@link ServerCall.Listener} which forwards all of its methods to another {@link
* ServerCall.Listener}.
*/
public abstract class ForwardingServerCallListener<ReqT> extends ServerCall.Listener<ReqT> {
/**
* Returns the delegated {@code ServerCall.Listener}.
*/
protected abstract ServerCall.Listener<ReqT> delegate();
@Override
public void onPayload(ReqT payload) {
delegate().onPayload(payload);
}
@Override
public void onHalfClose() {
delegate().onHalfClose();
}
@Override
public void onCancel() {
delegate().onCancel();
}
@Override
public void onComplete() {
delegate().onComplete();
}
@Override
public void onReady(int numMessages) {
delegate().onReady(numMessages);
}
/**
* A simplified version of {@link ForwardingServerCallListener} where subclasses can pass in a
* {@link ServerCall.Listener} as the delegate.
*/
public abstract static class SimpleForwardingServerCallListener<ReqT>
extends ForwardingServerCallListener<ReqT> {
private final ServerCall.Listener<ReqT> delegate;
protected SimpleForwardingServerCallListener(ServerCall.Listener<ReqT> delegate) {
this.delegate = delegate;
}
@Override
protected ServerCall.Listener<ReqT> delegate() {
return delegate;
}
}
}

View File

@ -34,6 +34,9 @@ package io.grpc;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@ -145,75 +148,25 @@ public class ServerInterceptors {
/**
* Utility base class for decorating {@link ServerCall} instances.
*
* @deprecated Use {@link SimpleForwardingServerCall}
*/
public static class ForwardingServerCall<RespT> extends ServerCall<RespT> {
private final ServerCall<RespT> delegate;
@Deprecated
public static class ForwardingServerCall<RespT> extends SimpleForwardingServerCall<RespT> {
public ForwardingServerCall(ServerCall<RespT> delegate) {
this.delegate = delegate;
}
@Override
public void request(int numMessages) {
delegate.request(numMessages);
}
@Override
public void sendHeaders(Metadata.Headers headers) {
delegate.sendHeaders(headers);
}
@Override
public void sendPayload(RespT payload) {
delegate.sendPayload(payload);
}
@Override
public void close(Status status, Metadata.Trailers trailers) {
delegate.close(status, trailers);
}
@Override
public boolean isCancelled() {
return delegate.isCancelled();
super(delegate);
}
}
/**
* Utility base class for decorating {@link ServerCall.Listener} instances.
*
* @deprecated Use {@link SimpleForwardingServerCallListener}
*/
public static class ForwardingListener<RespT> extends ServerCall.Listener<RespT> {
private final ServerCall.Listener<RespT> delegate;
public ForwardingListener(ServerCall.Listener<RespT> delegate) {
this.delegate = delegate;
}
@Override
public void onPayload(RespT payload) {
delegate.onPayload(payload);
}
@Override
public void onHalfClose() {
delegate.onHalfClose();
}
@Override
public void onCancel() {
delegate.onCancel();
}
@Override
public void onComplete() {
delegate.onComplete();
}
@Override
public void onReady(int numMessages) {
delegate.onReady(numMessages);
@Deprecated
public static class ForwardingListener<ReqT> extends SimpleForwardingServerCallListener<ReqT> {
public ForwardingListener(ServerCall.Listener<ReqT> delegate) {
super(delegate);
}
}
}

View File

@ -32,8 +32,13 @@
package io.grpc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertSame;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@ -41,8 +46,9 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import io.grpc.ClientInterceptors.ForwardingCall;
import io.grpc.ClientInterceptors.ForwardingListener;
import io.grpc.ClientInterceptors.CheckedForwardingCall;
import io.grpc.ForwardingCall.SimpleForwardingCall;
import io.grpc.ForwardingCallListener.SimpleForwardingCallListener;
import org.junit.Before;
import org.junit.Test;
@ -52,6 +58,8 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.Arrays;
@ -70,9 +78,24 @@ public class ClientInterceptorsTest {
@Mock
private MethodDescriptor<String, Integer> method;
/**
* Sets up mocks.
*/
@Before public void setUp() {
MockitoAnnotations.initMocks(this);
when(channel.newCall(Mockito.<MethodDescriptor<String, Integer>>any())).thenReturn(call);
// Emulate the precondition checks in ChannelImpl.CallImpl
Answer<Void> checkStartCalled = new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) {
verify(call).start(Mockito.<Call.Listener<Integer>>any(), Mockito.<Metadata.Headers>any());
return null;
}
};
doAnswer(checkStartCalled).when(call).request(anyInt());
doAnswer(checkStartCalled).when(call).halfClose();
doAnswer(checkStartCalled).when(call).sendPayload(Mockito.<String>any());
}
@Test(expected = NullPointerException.class)
@ -165,7 +188,7 @@ public class ClientInterceptorsTest {
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
Channel next) {
Call<ReqT, RespT> call = next.newCall(method);
return new ForwardingCall<ReqT, RespT>(call) {
return new SimpleForwardingCall<ReqT, RespT>(call) {
@Override
public void start(Call.Listener<RespT> responseListener, Metadata.Headers headers) {
headers.put(credKey, "abcd");
@ -195,10 +218,10 @@ public class ClientInterceptorsTest {
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
Channel next) {
Call<ReqT, RespT> call = next.newCall(method);
return new ForwardingCall<ReqT, RespT>(call) {
return new SimpleForwardingCall<ReqT, RespT>(call) {
@Override
public void start(Call.Listener<RespT> responseListener, Metadata.Headers headers) {
super.start(new ForwardingListener<RespT>(responseListener) {
super.start(new SimpleForwardingCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata.Headers headers) {
examinedHeaders.add(headers);
@ -223,6 +246,67 @@ public class ClientInterceptorsTest {
assertEquals(Arrays.asList(inboundHeaders), examinedHeaders);
}
@Test
public void normalCall() {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
Channel next) {
Call<ReqT, RespT> call = next.newCall(method);
return new SimpleForwardingCall<ReqT, RespT>(call) { };
}
};
Channel intercepted = ClientInterceptors.intercept(channel, interceptor);
Call<String, Integer> interceptedCall = intercepted.newCall(method);
assertNotSame(call, interceptedCall);
@SuppressWarnings("unchecked")
Call.Listener<Integer> listener = mock(Call.Listener.class);
Metadata.Headers headers = new Metadata.Headers();
interceptedCall.start(listener, headers);
verify(call).start(same(listener), same(headers));
interceptedCall.sendPayload("request");
verify(call).sendPayload(eq("request"));
interceptedCall.halfClose();
verify(call).halfClose();
interceptedCall.request(1);
verify(call).request(1);
}
@Test
public void exceptionInStart() {
final Exception error = new Exception("emulated error");
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
Channel next) {
Call<ReqT, RespT> call = next.newCall(method);
return new CheckedForwardingCall<ReqT, RespT>(call) {
@Override
protected void checkedStart(Call.Listener<RespT> responseListener,
Metadata.Headers headers) throws Exception {
if (this instanceof Object) {
throw error;
}
delegate().start(responseListener, headers); // will not be called
}
};
}
};
Channel intercepted = ClientInterceptors.intercept(channel, interceptor);
@SuppressWarnings("unchecked")
Call.Listener<Integer> listener = mock(Call.Listener.class);
Call<String, Integer> interceptedCall = intercepted.newCall(method);
assertNotSame(call, interceptedCall);
interceptedCall.start(listener, new Metadata.Headers());
interceptedCall.sendPayload("request");
interceptedCall.halfClose();
interceptedCall.request(1);
verifyNoMoreInteractions(call);
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(listener).onClose(captor.capture(), any(Metadata.Trailers.class));
assertSame(error, captor.getValue().getCause());
}
private static class NoopInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,

View File

@ -34,8 +34,8 @@ package io.grpc.stub;
import io.grpc.Call;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors.ForwardingCall;
import io.grpc.ClientInterceptors.ForwardingListener;
import io.grpc.ForwardingCall.SimpleForwardingCall;
import io.grpc.ForwardingCallListener.SimpleForwardingCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
@ -73,7 +73,7 @@ public class MetadataUtils {
@Override
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
Channel next) {
return new ForwardingCall<ReqT, RespT>(next.newCall(method)) {
return new SimpleForwardingCall<ReqT, RespT>(next.newCall(method)) {
@Override
public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
headers.merge(extraHeaders);
@ -116,12 +116,12 @@ public class MetadataUtils {
@Override
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
Channel next) {
return new ForwardingCall<ReqT, RespT>(next.newCall(method)) {
return new SimpleForwardingCall<ReqT, RespT>(next.newCall(method)) {
@Override
public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
headersCapture.set(null);
trailersCapture.set(null);
super.start(new ForwardingListener<RespT>(responseListener) {
super.start(new SimpleForwardingCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata.Headers headers) {
headersCapture.set(headers);

View File

@ -31,11 +31,11 @@
package io.grpc.testing;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.Status;
import java.util.Arrays;
@ -60,7 +60,7 @@ public class TestUtils {
final Metadata.Headers requestHeaders,
ServerCallHandler<ReqT, RespT> next) {
ServerCall.Listener<ReqT> listener = next.startCall(method,
new ServerInterceptors.ForwardingServerCall<RespT>(call) {
new SimpleForwardingServerCall<RespT>(call) {
boolean sentHeaders;
@Override