Make serverInterceptor use MethodDescriptor

This commit is contained in:
Carl Mastrangelo 2015-08-06 16:17:21 -07:00
parent 6ff7b220b6
commit b141093b3b
10 changed files with 116 additions and 73 deletions

View File

@ -240,9 +240,10 @@ public abstract class AbstractBenchmark {
.addMethod(unaryMethod,
new ServerCallHandler<ByteBuf, ByteBuf>() {
@Override
public ServerCall.Listener<ByteBuf> startCall(String fullMethodName,
final ServerCall<ByteBuf> call,
Metadata.Headers headers) {
public ServerCall.Listener<ByteBuf> startCall(
MethodDescriptor<ByteBuf, ByteBuf> method,
final ServerCall<ByteBuf> call,
Metadata.Headers headers) {
call.request(1);
return new ServerCall.Listener<ByteBuf>() {
@Override
@ -271,9 +272,10 @@ public abstract class AbstractBenchmark {
.addMethod(pingPongMethod,
new ServerCallHandler<ByteBuf, ByteBuf>() {
@Override
public ServerCall.Listener<ByteBuf> startCall(String fullMethodName,
final ServerCall<ByteBuf> call,
Metadata.Headers headers) {
public ServerCall.Listener<ByteBuf> startCall(
MethodDescriptor<ByteBuf, ByteBuf> method,
final ServerCall<ByteBuf> call,
Metadata.Headers headers) {
call.request(1);
return new ServerCall.Listener<ByteBuf>() {
@Override
@ -304,9 +306,10 @@ public abstract class AbstractBenchmark {
.addMethod(flowControlledStreaming,
new ServerCallHandler<ByteBuf, ByteBuf>() {
@Override
public ServerCall.Listener<ByteBuf> startCall(String fullMethodName,
final ServerCall<ByteBuf> call,
Metadata.Headers headers) {
public ServerCall.Listener<ByteBuf> startCall(
MethodDescriptor<ByteBuf, ByteBuf> method,
final ServerCall<ByteBuf> call,
Metadata.Headers headers) {
call.request(1);
return new ServerCall.Listener<ByteBuf>() {
@Override

View File

@ -47,10 +47,12 @@ public interface ServerCallHandler<RequestT, ResponseT> {
* Implementations must not throw an exception if they started processing that may use {@code
* call} on another thread.
*
* @param fullMethodName full qualified method name of call.
* @param method descriptor for the call
* @param call object for responding to the remote client.
* @return listener for processing incoming request messages for {@code call}
*/
ServerCall.Listener<RequestT> startCall(String fullMethodName, ServerCall<ResponseT> call,
ServerCall.Listener<RequestT> startCall(
MethodDescriptor<RequestT, ResponseT> method,
ServerCall<ResponseT> call,
Metadata.Headers headers);
}

View File

@ -348,8 +348,8 @@ public final class ServerImpl extends Server {
// TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<ReqT, RespT>(
stream, methodDef.getMethodDescriptor());
ServerCall.Listener<ReqT> listener
= methodDef.getServerCallHandler().startCall(fullMethodName, call, headers);
ServerCall.Listener<ReqT> listener = methodDef.getServerCallHandler()
.startCall(methodDef.getMethodDescriptor(), call, headers);
if (listener == null) {
throw new NullPointerException(
"startCall() returned a null listener for method " + fullMethodName);

View File

@ -56,13 +56,13 @@ public interface ServerInterceptor {
* Implementations must not throw an exception if they started processing that may use {@code
* call} on another thread.
*
* @param method fully qualified method name of the call
* @param method descriptor for method
* @param call object to receive response messages
* @param next next processor in the interceptor chain
* @return listener for processing incoming messages for {@code call}, never {@code null}.
*/
<RequestT, ResponseT> ServerCall.Listener<RequestT> interceptCall(
String method,
MethodDescriptor<RequestT, ResponseT> method,
ServerCall<ResponseT> call,
Metadata.Headers headers,
ServerCallHandler<RequestT, ResponseT> next);

View File

@ -109,7 +109,9 @@ public class ServerInterceptors {
}
@Override
public ServerCall.Listener<ReqT> startCall(String method, ServerCall<RespT> call,
public ServerCall.Listener<ReqT> startCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
Metadata.Headers headers) {
return interceptor.interceptCall(method, call, headers, callHandler);
}

View File

@ -183,9 +183,11 @@ public class ServerImplTest {
MethodType.UNKNOWN, "Waiter", "serve", STRING_MARSHALLER, INTEGER_MARSHALLER),
new ServerCallHandler<String, Integer>() {
@Override
public ServerCall.Listener<String> startCall(String fullMethodName,
ServerCall<Integer> call, Metadata.Headers headers) {
assertEquals("Waiter/serve", fullMethodName);
public ServerCall.Listener<String> startCall(
MethodDescriptor<String, Integer> method,
ServerCall<Integer> call,
Metadata.Headers headers) {
assertEquals("Waiter/serve", method.getFullMethodName());
assertNotNull(call);
assertNotNull(headers);
assertEquals(0, headers.get(metadataKey).intValue());
@ -249,8 +251,10 @@ public class ServerImplTest {
STRING_MARSHALLER, INTEGER_MARSHALLER),
new ServerCallHandler<String, Integer>() {
@Override
public ServerCall.Listener<String> startCall(String fullMethodName,
ServerCall<Integer> call, Metadata.Headers headers) {
public ServerCall.Listener<String> startCall(
MethodDescriptor<String, Integer> method,
ServerCall<Integer> call,
Metadata.Headers headers) {
throw status.asRuntimeException();
}
}).build());

View File

@ -43,6 +43,7 @@ import static org.mockito.Mockito.verifyZeroInteractions;
import io.grpc.Metadata.Headers;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.ServerCall.Listener;
import org.junit.After;
import org.junit.Before;
@ -67,7 +68,11 @@ public class ServerInterceptorsTest {
@SuppressWarnings("unchecked")
private ServerCallHandler<String, Integer> handler = mock(ServerCallHandler.class);
@Mock private ServerCall.Listener<String> listener;
private String methodName = "/someRandom.Name";
private MethodDescriptor<String, Integer> method = MethodDescriptor.create(
MethodType.UNKNOWN,
"/someRandom.Name",
requestMarshaller,
responseMarshaller);
@Mock private ServerCall<Integer> call;
private ServerServiceDefinition serviceDefinition = ServerServiceDefinition.builder("basic")
.addMethod(
@ -81,8 +86,9 @@ public class ServerInterceptorsTest {
public void setUp() {
MockitoAnnotations.initMocks(this);
Mockito.when(handler.startCall(
Mockito.<String>any(), Mockito.<ServerCall<Integer>>any(), Mockito.<Headers>any()))
.thenReturn(listener);
Mockito.<MethodDescriptor<String, Integer>>any(),
Mockito.<ServerCall<Integer>>any(), Mockito.<Headers>any()))
.thenReturn(listener);
}
/** Final checks for all tests. */
@ -120,17 +126,17 @@ public class ServerInterceptorsTest {
ServerServiceDefinition intercepted
= ServerInterceptors.intercept(serviceDefinition, Arrays.asList(interceptor));
assertSame(listener,
getSoleMethod(intercepted).getServerCallHandler().startCall(methodName, call, headers));
getSoleMethod(intercepted).getServerCallHandler().startCall(method, call, headers));
verify(interceptor).interceptCall(
same(methodName), same(call), same(headers), anyCallHandler());
verify(handler).startCall(methodName, call, headers);
same(method), same(call), same(headers), anyCallHandler());
verify(handler).startCall(method, call, headers);
verifyNoMoreInteractions(interceptor, handler);
assertSame(listener,
getSoleMethod(intercepted).getServerCallHandler().startCall(methodName, call, headers));
getSoleMethod(intercepted).getServerCallHandler().startCall(method, call, headers));
verify(interceptor, times(2))
.interceptCall(same(methodName), same(call), same(headers), anyCallHandler());
verify(handler, times(2)).startCall(methodName, call, headers);
.interceptCall(same(method), same(call), same(headers), anyCallHandler());
verify(handler, times(2)).startCall(method, call, headers);
verifyNoMoreInteractions(interceptor, handler);
}
@ -146,14 +152,14 @@ public class ServerInterceptorsTest {
ServerServiceDefinition intercepted = ServerInterceptors.intercept(
serviceDefinition, Arrays.<ServerInterceptor>asList(new NoopInterceptor()));
getMethod(intercepted, "basic/flow").getServerCallHandler().startCall(
methodName, call, headers);
verify(handler).startCall(methodName, call, headers);
method, call, headers);
verify(handler).startCall(method, call, headers);
verifyNoMoreInteractions(handler);
verifyZeroInteractions(handler2);
getMethod(intercepted, "basic/flow2").getServerCallHandler().startCall(
methodName, call, headers);
verify(handler2).startCall(methodName, call, headers);
method, call, headers);
verify(handler2).startCall(method, call, headers);
verifyNoMoreInteractions(handler);
verifyNoMoreInteractions(handler2);
}
@ -162,8 +168,11 @@ public class ServerInterceptorsTest {
public void callNextTwice() {
ServerInterceptor interceptor = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(String method,
ServerCall<RespT> call, Headers headers, ServerCallHandler<ReqT, RespT> next) {
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
Headers headers,
ServerCallHandler<ReqT, RespT> next) {
// Calling next twice is permitted, although should only rarely be useful.
assertSame(listener, next.startCall(method, call, headers));
return next.startCall(method, call, headers);
@ -172,8 +181,8 @@ public class ServerInterceptorsTest {
ServerServiceDefinition intercepted = ServerInterceptors.intercept(serviceDefinition,
interceptor);
assertSame(listener,
getSoleMethod(intercepted).getServerCallHandler().startCall(methodName, call, headers));
verify(handler, times(2)).startCall(same(methodName), same(call), same(headers));
getSoleMethod(intercepted).getServerCallHandler().startCall(method, call, headers));
verify(handler, times(2)).startCall(same(method), same(call), same(headers));
verifyNoMoreInteractions(handler);
}
@ -182,7 +191,9 @@ public class ServerInterceptorsTest {
final List<String> order = new ArrayList<String>();
handler = new ServerCallHandler<String, Integer>() {
@Override
public ServerCall.Listener<String> startCall(String method, ServerCall<Integer> call,
public ServerCall.Listener<String> startCall(
MethodDescriptor<String, Integer> method,
ServerCall<Integer> call,
Headers headers) {
order.add("handler");
return listener;
@ -190,16 +201,22 @@ public class ServerInterceptorsTest {
};
ServerInterceptor interceptor1 = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(String method,
ServerCall<RespT> call, Headers headers, ServerCallHandler<ReqT, RespT> next) {
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
Headers headers,
ServerCallHandler<ReqT, RespT> next) {
order.add("i1");
return next.startCall(method, call, headers);
}
};
ServerInterceptor interceptor2 = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(String method,
ServerCall<RespT> call, Headers headers, ServerCallHandler<ReqT, RespT> next) {
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
Headers headers,
ServerCallHandler<ReqT, RespT> next) {
order.add("i2");
return next.startCall(method, call, headers);
}
@ -210,35 +227,38 @@ public class ServerInterceptorsTest {
ServerServiceDefinition intercepted = ServerInterceptors.intercept(
serviceDefinition, Arrays.asList(interceptor1, interceptor2));
assertSame(listener,
getSoleMethod(intercepted).getServerCallHandler().startCall(methodName, call, headers));
getSoleMethod(intercepted).getServerCallHandler().startCall(method, call, headers));
assertEquals(Arrays.asList("i2", "i1", "handler"), order);
}
@Test
public void argumentsPassed() {
final String method2 = "/someOtherRandom.Method";
final MethodDescriptor<String, Integer> method2 =
MethodDescriptor.create(MethodType.UNKNOWN, "/someOtherRandom.Method", null, null);
@SuppressWarnings("unchecked")
final ServerCall<Integer> call2 = mock(ServerCall.class);
@SuppressWarnings("unchecked")
final ServerCall.Listener<String> listener2 = mock(ServerCall.Listener.class);
ServerInterceptor interceptor = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(String method,
ServerCall<RespT> call, Headers headers, ServerCallHandler<ReqT, RespT> next) {
assertSame(method, methodName);
assertSame(call, ServerInterceptorsTest.this.call);
@SuppressWarnings("unchecked")
ServerCall<RespT> call2Typed = (ServerCall<RespT>) call2;
assertSame(listener, next.startCall(method2, call2Typed, headers));
@SuppressWarnings("unchecked")
ServerCall.Listener<ReqT> listener2Typed = (ServerCall.Listener<ReqT>) listener2;
return listener2Typed;
}
};
@SuppressWarnings("unchecked") // Lot's of casting for no benefit. Not intended use.
@Override
public <R1, R2> ServerCall.Listener<R1> interceptCall(
MethodDescriptor<R1, R2> methodDescriptor,
ServerCall<R2> call,
Headers headers,
ServerCallHandler<R1, R2> next) {
assertSame(method, methodDescriptor);
assertSame(call, ServerInterceptorsTest.this.call);
assertSame(listener,
next.startCall((MethodDescriptor<R1, R2>)method2, (ServerCall<R2>)call2, headers));
return (ServerCall.Listener<R1>) listener2;
}
};
ServerServiceDefinition intercepted = ServerInterceptors.intercept(
serviceDefinition, Arrays.asList(interceptor));
assertSame(listener2,
getSoleMethod(intercepted).getServerCallHandler().startCall(methodName, call, headers));
getSoleMethod(intercepted).getServerCallHandler().startCall(method, call, headers));
verify(handler).startCall(method2, call2, headers);
}
@ -263,8 +283,11 @@ public class ServerInterceptorsTest {
private static class NoopInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(String method,
ServerCall<RespT> call, Headers headers, ServerCallHandler<ReqT, RespT> next) {
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
Headers headers,
ServerCallHandler<ReqT, RespT> next) {
return next.startCall(method, call, headers);
}
}

View File

@ -33,6 +33,7 @@ package io.grpc.examples.header;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
@ -53,7 +54,7 @@ public class HeaderServerInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
String method,
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
final Metadata.Headers requestHeaders,
ServerCallHandler<ReqT, RespT> next) {

View File

@ -32,6 +32,7 @@
package io.grpc.stub;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.Status;
@ -122,7 +123,9 @@ public class ServerCalls {
return new ServerCallHandler<ReqT, RespT>() {
@Override
public ServerCall.Listener<ReqT> startCall(
String fullMethodName, final ServerCall<RespT> call, Metadata.Headers headers) {
MethodDescriptor<ReqT, RespT> methodDescriptor,
final ServerCall<RespT> call,
Metadata.Headers headers) {
final ResponseObserver<RespT> responseObserver = new ResponseObserver<RespT>(call);
// We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client
// sends more than 1 requests, we will catch it in onMessage() and emit INVALID_ARGUMENT.
@ -171,8 +174,10 @@ public class ServerCalls {
final StreamingRequestMethod<ReqT, RespT> method) {
return new ServerCallHandler<ReqT, RespT>() {
@Override
public ServerCall.Listener<ReqT> startCall(String fullMethodName,
final ServerCall<RespT> call, Metadata.Headers headers) {
public ServerCall.Listener<ReqT> startCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
final ServerCall<RespT> call,
Metadata.Headers headers) {
call.request(1);
final ResponseObserver<RespT> responseObserver = new ResponseObserver<RespT>(call);
final StreamObserver<ReqT> requestObserver = method.invoke(responseObserver);

View File

@ -33,6 +33,7 @@ package io.grpc.testing;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
@ -80,10 +81,11 @@ public class TestUtils {
final Set<Metadata.Key<?>> keySet = new HashSet<Metadata.Key<?>>(Arrays.asList(keys));
return new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(String method,
ServerCall<RespT> call,
final Metadata.Headers requestHeaders,
ServerCallHandler<ReqT, RespT> next) {
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
final Metadata.Headers requestHeaders,
ServerCallHandler<ReqT, RespT> next) {
return next.startCall(method,
new SimpleForwardingServerCall<RespT>(call) {
boolean sentHeaders;
@ -122,10 +124,11 @@ public class TestUtils {
final AtomicReference<Metadata.Headers> headersCapture) {
return new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(String method,
ServerCall<RespT> call,
Metadata.Headers requestHeaders,
ServerCallHandler<ReqT, RespT> next) {
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
Metadata.Headers requestHeaders,
ServerCallHandler<ReqT, RespT> next) {
headersCapture.set(requestHeaders);
return next.startCall(method, call, requestHeaders);
}