Reapply "Eliminate MethodDescriptor from startCall and interceptCall for servers"

This reverts commit ef178304cb, which
itself was a revert.
This commit is contained in:
Eric Anderson 2016-06-20 15:22:23 -07:00
parent 2eaa540e35
commit 66ab956f9e
47 changed files with 669 additions and 428 deletions

View File

@ -263,9 +263,15 @@ public class BenchmarkServiceGrpc {
}
}
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
return new io.grpc.ServiceDescriptor(SERVICE_NAME,
METHOD_UNARY_CALL,
METHOD_STREAMING_CALL);
}
public static io.grpc.ServerServiceDefinition bindService(
final BenchmarkService serviceImpl) {
return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
METHOD_UNARY_CALL,
asyncUnaryCall(

View File

@ -366,9 +366,17 @@ public class WorkerServiceGrpc {
}
}
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
return new io.grpc.ServiceDescriptor(SERVICE_NAME,
METHOD_RUN_SERVER,
METHOD_RUN_CLIENT,
METHOD_CORE_COUNT,
METHOD_QUIT_WORKER);
}
public static io.grpc.ServerServiceDefinition bindService(
final WorkerService serviceImpl) {
return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
METHOD_RUN_SERVER,
asyncBidiStreamingCall(

View File

@ -41,6 +41,7 @@ import io.grpc.Server;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServiceDescriptor;
import io.grpc.Status;
import io.grpc.benchmarks.ByteBufOutputMarshaller;
import io.grpc.netty.NegotiationType;
@ -258,13 +259,15 @@ public abstract class AbstractBenchmark {
// Server implementation of unary & streaming methods
serverBuilder.addService(
ServerServiceDefinition.builder("benchmark")
.addMethod(unaryMethod,
new ServerCallHandler<ByteBuf, ByteBuf>() {
ServerServiceDefinition.builder(
new ServiceDescriptor("benchmark",
unaryMethod,
pingPongMethod,
flowControlledStreaming))
.addMethod(unaryMethod, new ServerCallHandler<ByteBuf, ByteBuf>() {
@Override
public ServerCall.Listener<ByteBuf> startCall(
MethodDescriptor<ByteBuf, ByteBuf> method,
final ServerCall<ByteBuf> call,
final ServerCall<ByteBuf, ByteBuf> call,
Metadata headers) {
call.sendHeaders(new Metadata());
call.request(1);
@ -292,12 +295,10 @@ public abstract class AbstractBenchmark {
};
}
})
.addMethod(pingPongMethod,
new ServerCallHandler<ByteBuf, ByteBuf>() {
.addMethod(pingPongMethod, new ServerCallHandler<ByteBuf, ByteBuf>() {
@Override
public ServerCall.Listener<ByteBuf> startCall(
MethodDescriptor<ByteBuf, ByteBuf> method,
final ServerCall<ByteBuf> call,
final ServerCall<ByteBuf, ByteBuf> call,
Metadata headers) {
call.sendHeaders(new Metadata());
call.request(1);
@ -327,12 +328,10 @@ public abstract class AbstractBenchmark {
};
}
})
.addMethod(flowControlledStreaming,
new ServerCallHandler<ByteBuf, ByteBuf>() {
.addMethod(flowControlledStreaming, new ServerCallHandler<ByteBuf, ByteBuf>() {
@Override
public ServerCall.Listener<ByteBuf> startCall(
MethodDescriptor<ByteBuf, ByteBuf> method,
final ServerCall<ByteBuf> call,
final ServerCall<ByteBuf, ByteBuf> call,
Metadata headers) {
call.sendHeaders(new Metadata());
call.request(1);

View File

@ -31,9 +31,13 @@
package io.grpc.benchmarks.netty;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServiceDescriptor;
import io.grpc.util.MutableHandlerRegistry;
import org.openjdk.jmh.annotations.Benchmark;
@ -80,13 +84,21 @@ public class HandlerRegistryBenchmark {
fullMethodNames = new ArrayList<String>(serviceCount * methodCountPerService);
for (int serviceIndex = 0; serviceIndex < serviceCount; ++serviceIndex) {
String serviceName = randomString();
ServerServiceDefinition.Builder serviceBuilder = ServerServiceDefinition.builder(serviceName);
ServerServiceDefinition.Builder serviceBuilder = ServerServiceDefinition.builder(
new ServiceDescriptor(serviceName));
for (int methodIndex = 0; methodIndex < methodCountPerService; ++methodIndex) {
String methodName = randomString();
MethodDescriptor<?, ?> methodDescriptor = MethodDescriptor.create(
MethodDescriptor<Object, Object> methodDescriptor = MethodDescriptor.create(
MethodDescriptor.MethodType.UNKNOWN,
MethodDescriptor.generateFullMethodName(serviceName, methodName), null, null);
serviceBuilder.addMethod(ServerMethodDefinition.create(methodDescriptor, null));
serviceBuilder.addMethod(methodDescriptor,
new ServerCallHandler<Object, Object>() {
@Override
public Listener<Object> startCall(ServerCall<Object, Object> call,
Metadata headers) {
return null;
}
});
fullMethodNames.add(methodDescriptor.getFullMethodName());
}
registry.addService(serviceBuilder.build());

View File

@ -40,6 +40,7 @@ import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServiceDescriptor;
import io.grpc.Status;
import io.grpc.benchmarks.ByteBufOutputMarshaller;
import io.grpc.benchmarks.Utils;
@ -141,7 +142,8 @@ final class LoadServer {
if (config.getServerType() == Control.ServerType.ASYNC_GENERIC_SERVER) {
serverBuilder.addService(
ServerServiceDefinition
.builder(BenchmarkServiceGrpc.SERVICE_NAME)
.builder(new ServiceDescriptor(BenchmarkServiceGrpc.SERVICE_NAME,
GENERIC_STREAMING_PING_PONG_METHOD))
.addMethod(GENERIC_STREAMING_PING_PONG_METHOD, new GenericServiceCallHandler())
.build());
} else {
@ -230,9 +232,10 @@ final class LoadServer {
}
private class GenericServiceCallHandler implements ServerCallHandler<ByteBuf, ByteBuf> {
@Override
public ServerCall.Listener<ByteBuf> startCall(MethodDescriptor<ByteBuf, ByteBuf> method,
final ServerCall<ByteBuf> call, Metadata headers) {
public ServerCall.Listener<ByteBuf> startCall(
final ServerCall<ByteBuf, ByteBuf> call, Metadata headers) {
call.sendHeaders(new Metadata());
call.request(1);
return new ServerCall.Listener<ByteBuf>() {

View File

@ -851,6 +851,31 @@ static void PrintMethodHandlerClass(const ServiceDescriptor* service,
p->Print("}\n\n");
}
static void PrintGetServiceDescriptorMethod(const ServiceDescriptor* service,
map<string, string>* vars,
Printer* p,
bool generate_nano) {
(*vars)["service_name"] = service->name();
p->Print(
*vars,
"public static $ServiceDescriptor$ getServiceDescriptor() {\n");
p->Indent();
p->Print(*vars,
"return new $ServiceDescriptor$(SERVICE_NAME");
p->Indent();
p->Indent();
for (int i = 0; i < service->method_count(); ++i) {
const MethodDescriptor* method = service->method(i);
(*vars)["method_field_name"] = MethodPropertiesFieldName(method);
p->Print(*vars, ",\n$method_field_name$");
}
p->Print(");\n");
p->Outdent();
p->Outdent();
p->Outdent();
p->Print("}\n\n");
}
static void PrintBindServiceMethod(const ServiceDescriptor* service,
map<string, string>* vars,
Printer* p,
@ -863,7 +888,7 @@ static void PrintBindServiceMethod(const ServiceDescriptor* service,
p->Indent();
p->Print(*vars,
"return "
"$ServerServiceDefinition$.builder(SERVICE_NAME)\n");
"$ServerServiceDefinition$.builder(getServiceDescriptor())\n");
p->Indent();
p->Indent();
for (int i = 0; i < service->method_count(); ++i) {
@ -994,6 +1019,7 @@ static void PrintService(const ServiceDescriptor* service,
PrintStub(service, vars, p, BLOCKING_CLIENT_IMPL, generate_nano);
PrintStub(service, vars, p, FUTURE_CLIENT_IMPL, generate_nano);
PrintMethodHandlerClass(service, vars, p, generate_nano);
PrintGetServiceDescriptorMethod(service, vars, p, generate_nano);
PrintBindServiceMethod(service, vars, p, generate_nano);
p->Outdent();
p->Print("}\n");
@ -1050,6 +1076,8 @@ void GenerateService(const ServiceDescriptor* service,
vars["BindableService"] = "io.grpc.BindableService";
vars["ServerServiceDefinition"] =
"io.grpc.ServerServiceDefinition";
vars["ServiceDescriptor"] =
"io.grpc.ServiceDescriptor";
vars["AbstractStub"] = "io.grpc.stub.AbstractStub";
vars["ImmutableList"] = "com.google.common.collect.ImmutableList";
vars["Collection"] = "java.util.Collection";

View File

@ -400,9 +400,18 @@ public class TestServiceGrpc {
}
}
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
return new io.grpc.ServiceDescriptor(SERVICE_NAME,
METHOD_UNARY_CALL,
METHOD_STREAMING_OUTPUT_CALL,
METHOD_STREAMING_INPUT_CALL,
METHOD_FULL_BIDI_CALL,
METHOD_HALF_BIDI_CALL);
}
public static io.grpc.ServerServiceDefinition bindService(
final TestService serviceImpl) {
return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
METHOD_UNARY_CALL,
asyncUnaryCall(

View File

@ -400,9 +400,18 @@ public class TestServiceGrpc {
}
}
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
return new io.grpc.ServiceDescriptor(SERVICE_NAME,
METHOD_UNARY_CALL,
METHOD_STREAMING_OUTPUT_CALL,
METHOD_STREAMING_INPUT_CALL,
METHOD_FULL_BIDI_CALL,
METHOD_HALF_BIDI_CALL);
}
public static io.grpc.ServerServiceDefinition bindService(
final TestService serviceImpl) {
return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
METHOD_UNARY_CALL,
asyncUnaryCall(

View File

@ -478,9 +478,18 @@ public class TestServiceGrpc {
}
}
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
return new io.grpc.ServiceDescriptor(SERVICE_NAME,
METHOD_UNARY_CALL,
METHOD_STREAMING_OUTPUT_CALL,
METHOD_STREAMING_INPUT_CALL,
METHOD_FULL_BIDI_CALL,
METHOD_HALF_BIDI_CALL);
}
public static io.grpc.ServerServiceDefinition bindService(
final TestService serviceImpl) {
return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
METHOD_UNARY_CALL,
asyncUnaryCall(

View File

@ -53,7 +53,6 @@ public class Contexts {
* the client.
*
* @param context to make {@link Context#current()}.
* @param method being requested by the client.
* @param call used to send responses to client.
* @param headers received from client.
* @param next handler used to create the listener to be wrapped.
@ -61,14 +60,13 @@ public class Contexts {
*/
public static <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
Context context,
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
Context previous = context.attach();
try {
return new ContextualizedServerCallListener<ReqT>(
next.startCall(method, call, headers),
next.startCall(call, headers),
context);
} finally {
context.detach(previous);

View File

@ -34,13 +34,13 @@ package io.grpc;
/**
* A {@link ServerCall} which forwards all of it's methods to another {@link ServerCall}.
*/
public abstract class ForwardingServerCall<RespT>
extends PartialForwardingServerCall<RespT> {
public abstract class ForwardingServerCall<ReqT, RespT>
extends PartialForwardingServerCall<ReqT, RespT> {
/**
* Returns the delegated {@code ServerCall}.
*/
@Override
protected abstract ServerCall<RespT> delegate();
protected abstract ServerCall<ReqT, RespT> delegate();
@Override
public void sendMessage(RespT message) {
@ -51,18 +51,23 @@ public abstract class ForwardingServerCall<RespT>
* A simplified version of {@link ForwardingServerCall} where subclasses can pass in a {@link
* ServerCall} as the delegate.
*/
public abstract static class SimpleForwardingServerCall<RespT>
extends ForwardingServerCall<RespT> {
public abstract static class SimpleForwardingServerCall<ReqT, RespT>
extends ForwardingServerCall<ReqT, RespT> {
private final ServerCall<RespT> delegate;
private final ServerCall<ReqT, RespT> delegate;
protected SimpleForwardingServerCall(ServerCall<RespT> delegate) {
protected SimpleForwardingServerCall(ServerCall<ReqT, RespT> delegate) {
this.delegate = delegate;
}
@Override
protected ServerCall<RespT> delegate() {
protected ServerCall<ReqT, RespT> delegate() {
return delegate;
}
@Override
public MethodDescriptor<ReqT, RespT> getMethodDescriptor() {
return delegate.getMethodDescriptor();
}
}
}

View File

@ -31,6 +31,8 @@
package io.grpc;
import io.grpc.ServerServiceDefinition.ServerMethodDefinition;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;

View File

@ -35,11 +35,11 @@ package io.grpc;
* A {@link ServerCall} which forwards all of it's methods to another {@link ServerCall} which
* may have a different onMessage() message type.
*/
abstract class PartialForwardingServerCall<RespT> extends ServerCall<RespT> {
abstract class PartialForwardingServerCall<ReqT, RespT> extends ServerCall<ReqT, RespT> {
/**
* Returns the delegated {@code ServerCall}.
*/
protected abstract ServerCall<?> delegate();
protected abstract ServerCall<?, ?> delegate();
@Override
public void request(int numMessages) {

View File

@ -49,9 +49,10 @@ import javax.net.ssl.SSLSession;
*
* <p>Methods are guaranteed to be non-blocking. Implementations are not required to be thread-safe.
*
* @param <ReqT> parsed type of request message.
* @param <RespT> parsed type of response message.
*/
public abstract class ServerCall<RespT> {
public abstract class ServerCall<ReqT, RespT> {
/**
* {@link Attributes.Key} for the remote address of server call attributes
* {@link ServerCall#attributes()}
@ -226,4 +227,10 @@ public abstract class ServerCall<RespT> {
public Attributes attributes() {
return Attributes.EMPTY;
}
/**
* The {@link MethodDescriptor} for the call.
*/
public abstract MethodDescriptor<ReqT, RespT> getMethodDescriptor();
}

View File

@ -47,12 +47,10 @@ public interface ServerCallHandler<RequestT, ResponseT> {
* Implementations must not throw an exception if they started processing that may use {@code
* call} on another thread.
*
* @param method descriptor for the call
* @param call object for responding to the remote client.
* @return listener for processing incoming request messages for {@code call}
*/
ServerCall.Listener<RequestT> startCall(
MethodDescriptor<RequestT, ResponseT> method,
ServerCall<ResponseT> call,
ServerCall<RequestT, ResponseT> call,
Metadata headers);
}

View File

@ -56,14 +56,12 @@ public interface ServerInterceptor {
* Implementations must not throw an exception if they started processing that may use {@code
* call} on another thread.
*
* @param method descriptor for method
* @param call object to receive response messages
* @param next next processor in the interceptor chain
* @return listener for processing incoming messages for {@code call}, never {@code null}.
*/
<ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next);
}

View File

@ -33,6 +33,8 @@ package io.grpc;
import com.google.common.base.Preconditions;
import io.grpc.ServerServiceDefinition.ServerMethodDefinition;
import java.io.BufferedInputStream;
import java.io.InputStream;
import java.util.ArrayList;
@ -119,7 +121,7 @@ public class ServerInterceptors {
return serviceDef;
}
ServerServiceDefinition.Builder serviceDefBuilder
= ServerServiceDefinition.builder(serviceDef.getName());
= ServerServiceDefinition.builder(serviceDef.getServiceDescriptor());
for (ServerMethodDefinition<?, ?> method : serviceDef.getMethods()) {
wrapAndAddMethod(serviceDefBuilder, method, interceptors);
}
@ -178,8 +180,11 @@ public class ServerInterceptors {
public static <T> ServerServiceDefinition useMarshalledMessages(
final ServerServiceDefinition serviceDef,
final MethodDescriptor.Marshaller<T> marshaller) {
final ServerServiceDefinition.Builder serviceBuilder = ServerServiceDefinition
.builder(serviceDef.getName());
List<ServerMethodDefinition<?, ?>> wrappedMethods =
new ArrayList<ServerMethodDefinition<?, ?>>();
List<MethodDescriptor<?, ?>> wrappedDescriptors =
new ArrayList<MethodDescriptor<?, ?>>();
// Wrap the descriptors
for (final ServerMethodDefinition<?, ?> definition : serviceDef.getMethods()) {
final MethodDescriptor<?, ?> originalMethodDescriptor = definition.getMethodDescriptor();
final MethodDescriptor<T, T> wrappedMethodDescriptor = MethodDescriptor
@ -187,7 +192,16 @@ public class ServerInterceptors {
originalMethodDescriptor.getFullMethodName(),
marshaller,
marshaller);
serviceBuilder.addMethod(wrapMethod(definition, wrappedMethodDescriptor));
wrappedDescriptors.add(wrappedMethodDescriptor);
wrappedMethods.add(wrapMethod(definition, wrappedMethodDescriptor));
}
// Build the new service descriptor
final ServerServiceDefinition.Builder serviceBuilder = ServerServiceDefinition
.builder(new ServiceDescriptor(serviceDef.getServiceDescriptor().getName(),
wrappedDescriptors));
// Create the new service definiton.
for (ServerMethodDefinition<?, ?> definition : wrappedMethods) {
serviceBuilder.addMethod(definition);
}
return serviceBuilder.build();
}
@ -219,10 +233,9 @@ public class ServerInterceptors {
@Override
public ServerCall.Listener<ReqT> startCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
ServerCall<ReqT, RespT> call,
Metadata headers) {
return interceptor.interceptCall(method, call, headers, callHandler);
return interceptor.interceptCall(call, headers, callHandler);
}
}
@ -243,12 +256,12 @@ public class ServerInterceptors {
return new ServerCallHandler<WReqT, WRespT>() {
@Override
public ServerCall.Listener<WReqT> startCall(
final MethodDescriptor<WReqT, WRespT> method,
final ServerCall<WRespT> call,
final ServerCall<WReqT, WRespT> call,
final Metadata headers) {
final ServerCall<ORespT> unwrappedCall = new PartialForwardingServerCall<ORespT>() {
final ServerCall<OReqT, ORespT> unwrappedCall =
new PartialForwardingServerCall<OReqT, ORespT>() {
@Override
protected ServerCall<WRespT> delegate() {
protected ServerCall<WReqT, WRespT> delegate() {
return call;
}
@ -258,10 +271,15 @@ public class ServerInterceptors {
final WRespT wrappedMessage = wrappedMethod.parseResponse(is);
delegate().sendMessage(wrappedMessage);
}
@Override
public MethodDescriptor<OReqT, ORespT> getMethodDescriptor() {
return originalMethod;
}
};
final ServerCall.Listener<OReqT> originalListener = originalHandler
.startCall(originalMethod, unwrappedCall, headers);
.startCall(unwrappedCall, headers);
return new PartialForwardingServerCallListener<WReqT>() {
@Override

View File

@ -1,83 +0,0 @@
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
/**
* Definition of a method exposed by a {@link Server}.
*
* @see ServerServiceDefinition
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1774")
public final class ServerMethodDefinition<ReqT, RespT> {
private final MethodDescriptor<ReqT, RespT> method;
private final ServerCallHandler<ReqT, RespT> handler;
private ServerMethodDefinition(MethodDescriptor<ReqT, RespT> method,
ServerCallHandler<ReqT, RespT> handler) {
this.method = method;
this.handler = handler;
}
/**
* Create a new instance.
*
* @param method the {@link MethodDescriptor} for this method.
* @param handler to dispatch calls to.
* @return a new instance.
*/
public static <ReqT, RespT> ServerMethodDefinition<ReqT, RespT> create(
MethodDescriptor<ReqT, RespT> method,
ServerCallHandler<ReqT, RespT> handler) {
return new ServerMethodDefinition<ReqT, RespT>(method, handler);
}
/** The {@code MethodDescriptor} for this method. */
public MethodDescriptor<ReqT, RespT> getMethodDescriptor() {
return method;
}
/** Handler for incoming calls. */
public ServerCallHandler<ReqT, RespT> getServerCallHandler() {
return handler;
}
/**
* Create a new method definition with a different call handler.
*
* @param handler to bind to a cloned instance of this.
* @return a cloned instance of this with the new handler bound.
*/
public ServerMethodDefinition<ReqT, RespT> withServerCallHandler(
ServerCallHandler<ReqT, RespT> handler) {
return new ServerMethodDefinition<ReqT, RespT>(method, handler);
}
}

View File

@ -43,26 +43,27 @@ import java.util.Map;
/** Definition of a service to be exposed via a Server. */
public final class ServerServiceDefinition {
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1774")
public static Builder builder(String serviceName) {
return new Builder(serviceName);
public static Builder builder(ServiceDescriptor serviceDescriptor) {
return new Builder(serviceDescriptor);
}
private final String name;
private final ServiceDescriptor serviceDescriptor;
private final ImmutableMap<String, ServerMethodDefinition<?, ?>> methods;
private ServerServiceDefinition(
String name, Map<String, ServerMethodDefinition<?, ?>> methods) {
this.name = checkNotNull(name);
ServiceDescriptor serviceDescriptor, Map<String, ServerMethodDefinition<?, ?>> methods) {
this.serviceDescriptor = checkNotNull(serviceDescriptor);
this.methods = ImmutableMap.copyOf(methods);
}
/** Simple name of the service. It is not an absolute path. */
public String getName() {
return name;
/**
* The descriptor for the service.
*/
public ServiceDescriptor getServiceDescriptor() {
return serviceDescriptor;
}
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1774")
public Collection<ServerMethodDefinition<?, ?>> getMethods() {
return methods.values();
}
@ -70,21 +71,24 @@ public final class ServerServiceDefinition {
/**
* Look up a method by its fully qualified name.
*
* @param name the fully qualified name without leading slash. E.g., "com.foo.Foo/Bar"
* @param methodName the fully qualified name without leading slash. E.g., "com.foo.Foo/Bar"
*/
@Internal
public ServerMethodDefinition<?, ?> getMethod(String name) {
return methods.get(name);
public ServerMethodDefinition<?, ?> getMethod(String methodName) {
return methods.get(methodName);
}
/** Builder for constructing Service instances. */
/**
* Builder for constructing Service instances.
*/
public static final class Builder {
private final String serviceName;
private final ServiceDescriptor serviceDescriptor;
private final Map<String, ServerMethodDefinition<?, ?>> methods =
new HashMap<String, ServerMethodDefinition<?, ?>>();
private Builder(String serviceName) {
this.serviceName = serviceName;
private Builder(ServiceDescriptor serviceDescriptor) {
this.serviceDescriptor = serviceDescriptor;
}
/**
@ -93,7 +97,6 @@ public final class ServerServiceDefinition {
* @param method the {@link MethodDescriptor} of this method.
* @param handler handler for incoming calls
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1774")
public <ReqT, RespT> Builder addMethod(
MethodDescriptor<ReqT, RespT> method, ServerCallHandler<ReqT, RespT> handler) {
return addMethod(ServerMethodDefinition.create(
@ -102,22 +105,97 @@ public final class ServerServiceDefinition {
}
/** Add a method to be supported by the service. */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1774")
public <ReqT, RespT> Builder addMethod(ServerMethodDefinition<ReqT, RespT> def) {
MethodDescriptor<ReqT, RespT> method = def.getMethodDescriptor();
checkArgument(
serviceName.equals(MethodDescriptor.extractFullServiceName(method.getFullMethodName())),
serviceDescriptor.getName().equals(
MethodDescriptor.extractFullServiceName(method.getFullMethodName())),
"Service name mismatch. Expected service name: '%s'. Actual method name: '%s'.",
this.serviceName, method.getFullMethodName());
serviceDescriptor.getName(), method.getFullMethodName());
String name = method.getFullMethodName();
checkState(!methods.containsKey(name), "Method by same name already registered: %s", name);
methods.put(name, def);
return this;
}
/** Construct new ServerServiceDefinition. */
/**
* Construct new ServerServiceDefinition.
*/
public ServerServiceDefinition build() {
return new ServerServiceDefinition(serviceName, methods);
Map<String, ServerMethodDefinition<?, ?>> tmpMethods =
new HashMap<String, ServerMethodDefinition<?, ?>>(methods);
for (MethodDescriptor<?, ?> descriptorMethod : serviceDescriptor.getMethods()) {
ServerMethodDefinition<?, ?> removed = tmpMethods.remove(
descriptorMethod.getFullMethodName());
if (removed == null) {
throw new IllegalStateException(
"No method bound for descriptor entry " + descriptorMethod.getFullMethodName());
}
if (removed.getMethodDescriptor() != descriptorMethod) {
throw new IllegalStateException(
"Bound method for " + descriptorMethod.getFullMethodName()
+ " not same instance as method in service descriptor");
}
}
if (tmpMethods.size() > 0) {
throw new IllegalStateException(
"No entry in descriptor matching bound method "
+ tmpMethods.values().iterator().next().getMethodDescriptor().getFullMethodName());
}
return new ServerServiceDefinition(serviceDescriptor, methods);
}
}
/**
* Definition of a method exposed by a {@link Server}.
*/
public static final class ServerMethodDefinition<ReqT, RespT> {
private final MethodDescriptor<ReqT, RespT> method;
private final ServerCallHandler<ReqT, RespT> handler;
private ServerMethodDefinition(MethodDescriptor<ReqT, RespT> method,
ServerCallHandler<ReqT, RespT> handler) {
this.method = method;
this.handler = handler;
}
/**
* Create a new instance.
*
* @param method the {@link MethodDescriptor} for this method.
* @param handler to dispatch calls to.
* @return a new instance.
*/
public static <ReqT, RespT> ServerMethodDefinition<ReqT, RespT> create(
MethodDescriptor<ReqT, RespT> method,
ServerCallHandler<ReqT, RespT> handler) {
return new ServerMethodDefinition<ReqT, RespT>(method, handler);
}
/**
* The {@code MethodDescriptor} for this method.
*/
public MethodDescriptor<ReqT, RespT> getMethodDescriptor() {
return method;
}
/**
* Handler for incoming calls.
*/
public ServerCallHandler<ReqT, RespT> getServerCallHandler() {
return handler;
}
/**
* Create a new method definition with a different call handler.
*
* @param handler to bind to a cloned instance of this.
* @return a cloned instance of this with the new handler bound.
*/
public ServerMethodDefinition<ReqT, RespT> withServerCallHandler(
ServerCallHandler<ReqT, RespT> handler) {
return new ServerMethodDefinition<ReqT, RespT>(method, handler);
}
}
}

View File

@ -0,0 +1,69 @@
/*
* Copyright 2016, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* Descriptor for a service.
*/
public class ServiceDescriptor {
private final String name;
private final List<MethodDescriptor<?, ?>> methods;
public ServiceDescriptor(String name, MethodDescriptor<?, ?>... methods) {
this(name, Arrays.asList(methods));
}
public ServiceDescriptor(String name, List<MethodDescriptor<?, ?>> methods) {
this.name = Preconditions.checkNotNull(name);
this.methods = Collections.unmodifiableList(new ArrayList<MethodDescriptor<?, ?>>(methods));
}
/** Simple name of the service. It is not an absolute path. */
public String getName() {
return name;
}
/**
* A list of {@link MethodDescriptor} instances describing the methods exposed by the service.
*/
public List<MethodDescriptor<?, ?>> getMethods() {
return methods;
}
}

View File

@ -42,8 +42,8 @@ import io.grpc.DecompressorRegistry;
import io.grpc.HandlerRegistry;
import io.grpc.Internal;
import io.grpc.ServerBuilder;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerServiceDefinition.ServerMethodDefinition;
import java.util.concurrent.Executor;
@ -58,7 +58,10 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
extends ServerBuilder<T> {
private static final HandlerRegistry EMPTY_FALLBACK_REGISTRY = new HandlerRegistry() {
@Override public ServerMethodDefinition<?, ?> lookupMethod(String method, String authority) {
@Override
public ServerMethodDefinition<?, ?> lookupMethod(String methodName,
@Nullable String authority) {
return null;
}
};

View File

@ -33,8 +33,8 @@ package io.grpc.internal;
import com.google.common.collect.ImmutableMap;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerServiceDefinition.ServerMethodDefinition;
import java.util.HashMap;
import javax.annotation.Nullable;
@ -57,7 +57,7 @@ final class InternalHandlerRegistry {
new HashMap<String, ServerServiceDefinition>();
Builder addService(ServerServiceDefinition service) {
services.put(service.getName(), service);
services.put(service.getServiceDescriptor().getName(), service);
return this;
}

View File

@ -58,7 +58,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.List;
final class ServerCallImpl<ReqT, RespT> extends ServerCall<RespT> {
final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
private final ServerStream stream;
private final MethodDescriptor<ReqT, RespT> method;
private final Context.CancellableContext context;
@ -196,6 +196,11 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<RespT> {
return stream.attributes();
}
@Override
public MethodDescriptor<ReqT, RespT> getMethodDescriptor() {
return method;
}
/**
* All of these callbacks are assumed to called on an application thread, and the caller is
* responsible for handling thrown exceptions.

View File

@ -47,7 +47,7 @@ import io.grpc.DecompressorRegistry;
import io.grpc.HandlerRegistry;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition.ServerMethodDefinition;
import io.grpc.Status;
import java.io.IOException;
@ -404,8 +404,8 @@ public final class ServerImpl extends io.grpc.Server {
ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<ReqT, RespT>(
stream, methodDef.getMethodDescriptor(), headers, context, decompressorRegistry,
compressorRegistry);
ServerCall.Listener<ReqT> listener = methodDef.getServerCallHandler()
.startCall(methodDef.getMethodDescriptor(), call, headers);
ServerCall.Listener<ReqT> listener =
methodDef.getServerCallHandler().startCall(call, headers);
if (listener == null) {
throw new NullPointerException(
"startCall() returned a null listener for method " + fullMethodName);

View File

@ -34,8 +34,8 @@ package io.grpc.util;
import io.grpc.ExperimentalApi;
import io.grpc.HandlerRegistry;
import io.grpc.MethodDescriptor;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerServiceDefinition.ServerMethodDefinition;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -57,11 +57,11 @@ public final class MutableHandlerRegistry extends HandlerRegistry {
@Nullable
public ServerServiceDefinition addService(ServerServiceDefinition service) {
return services.put(service.getName(), service);
return services.put(service.getServiceDescriptor().getName(), service);
}
public boolean removeService(ServerServiceDefinition service) {
return services.remove(service.getName(), service);
return services.remove(service.getServiceDescriptor().getName(), service);
}
/**

View File

@ -66,7 +66,7 @@ public class ContextsTest {
@SuppressWarnings("unchecked")
private MethodDescriptor<Object, Object> method = mock(MethodDescriptor.class);
@SuppressWarnings("unchecked")
private ServerCall<Object> call = mock(ServerCall.class);
private ServerCall<Object, Object> call = mock(ServerCall.class);
private Metadata headers = new Metadata();
@Test
@ -101,11 +101,11 @@ public class ContextsTest {
methodCalls.add(5);
}
};
ServerCall.Listener<Object> wrapped = interceptCall(uniqueContext, method, call, headers,
ServerCall.Listener<Object> wrapped = interceptCall(uniqueContext, call, headers,
new ServerCallHandler<Object, Object>() {
@Override
public ServerCall.Listener<Object> startCall(MethodDescriptor<Object, Object> method,
ServerCall<Object> call, Metadata headers) {
public ServerCall.Listener<Object> startCall(
ServerCall<Object, Object> call, Metadata headers) {
assertSame(ContextsTest.this.method, method);
assertSame(ContextsTest.this.call, call);
assertSame(ContextsTest.this.headers, headers);
@ -128,10 +128,10 @@ public class ContextsTest {
public void interceptCall_restoresIfNextThrows() {
Context origContext = Context.current();
try {
interceptCall(uniqueContext, method, call, headers, new ServerCallHandler<Object, Object>() {
interceptCall(uniqueContext, call, headers, new ServerCallHandler<Object, Object>() {
@Override
public ServerCall.Listener<Object> startCall(MethodDescriptor<Object, Object> method,
ServerCall<Object> call, Metadata headers) {
public ServerCall.Listener<Object> startCall(
ServerCall<Object, Object> call, Metadata headers) {
throw new RuntimeException();
}
});
@ -165,11 +165,11 @@ public class ContextsTest {
throw new RuntimeException();
}
};
ServerCall.Listener<Object> wrapped = interceptCall(uniqueContext, method, call, headers,
ServerCall.Listener<Object> wrapped = interceptCall(uniqueContext, call, headers,
new ServerCallHandler<Object, Object>() {
@Override
public ServerCall.Listener<Object> startCall(MethodDescriptor<Object, Object> method,
ServerCall<Object> call, Metadata headers) {
public ServerCall.Listener<Object> startCall(
ServerCall<Object, Object> call, Metadata headers) {
return listener;
}
});

View File

@ -45,6 +45,7 @@ import static org.mockito.Mockito.verifyZeroInteractions;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerServiceDefinition.ServerMethodDefinition;
import org.junit.After;
import org.junit.Before;
@ -64,34 +65,40 @@ import java.util.List;
/** Unit tests for {@link ServerInterceptors}. */
@RunWith(JUnit4.class)
public class ServerInterceptorsTest {
@SuppressWarnings("unchecked")
private Marshaller<String> requestMarshaller = mock(Marshaller.class);
@SuppressWarnings("unchecked")
private Marshaller<Integer> responseMarshaller = mock(Marshaller.class);
@SuppressWarnings("unchecked")
private ServerCallHandler<String, Integer> handler = mock(ServerCallHandler.class);
@Mock private ServerCall.Listener<String> listener;
private MethodDescriptor<String, Integer> method = MethodDescriptor.create(
MethodType.UNKNOWN,
"someRandom/Name",
requestMarshaller,
responseMarshaller);
@Mock private ServerCall<Integer> call;
private ServerServiceDefinition serviceDefinition = ServerServiceDefinition.builder("basic")
.addMethod(
MethodDescriptor.create(
MethodType.UNKNOWN, "basic/flow", requestMarshaller, responseMarshaller),
handler).build();
private Metadata headers = new Metadata();
@Mock
private Marshaller<String> requestMarshaller;
@Mock
private Marshaller<Integer> responseMarshaller;
@Mock
private ServerCallHandler<String, Integer> handler;
@Mock
private ServerCall.Listener<String> listener;
private MethodDescriptor<String, Integer> flowMethod;
@Mock
private ServerCall<String, Integer> call;
private ServerServiceDefinition serviceDefinition;
private final Metadata headers = new Metadata();
/** Set up for test. */
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
flowMethod = MethodDescriptor.create(
MethodType.UNKNOWN, "basic/flow", requestMarshaller, responseMarshaller);
Mockito.when(handler.startCall(
Mockito.<MethodDescriptor<String, Integer>>any(),
Mockito.<ServerCall<Integer>>any(), Mockito.<Metadata>any()))
Mockito.<ServerCall<String, Integer>>any(), Mockito.<Metadata>any()))
.thenReturn(listener);
serviceDefinition = ServerServiceDefinition.builder(new ServiceDescriptor("basic", flowMethod))
.addMethod(flowMethod, handler).build();
}
/** Final checks for all tests. */
@ -129,17 +136,16 @@ public class ServerInterceptorsTest {
ServerServiceDefinition intercepted
= ServerInterceptors.intercept(serviceDefinition, Arrays.asList(interceptor));
assertSame(listener,
getSoleMethod(intercepted).getServerCallHandler().startCall(method, call, headers));
verify(interceptor).interceptCall(
same(method), same(call), same(headers), anyCallHandler());
verify(handler).startCall(method, call, headers);
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers));
verify(interceptor).interceptCall(same(call), same(headers), anyCallHandler());
verify(handler).startCall(call, headers);
verifyNoMoreInteractions(interceptor, handler);
assertSame(listener,
getSoleMethod(intercepted).getServerCallHandler().startCall(method, call, headers));
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers));
verify(interceptor, times(2))
.interceptCall(same(method), same(call), same(headers), anyCallHandler());
verify(handler, times(2)).startCall(method, call, headers);
.interceptCall(same(call), same(headers), anyCallHandler());
verify(handler, times(2)).startCall(call, headers);
verifyNoMoreInteractions(interceptor, handler);
}
@ -147,22 +153,22 @@ public class ServerInterceptorsTest {
public void correctHandlerCalled() {
@SuppressWarnings("unchecked")
ServerCallHandler<String, Integer> handler2 = mock(ServerCallHandler.class);
serviceDefinition = ServerServiceDefinition.builder("basic")
.addMethod(MethodDescriptor.create(MethodType.UNKNOWN, "basic/flow",
requestMarshaller, responseMarshaller), handler)
.addMethod(MethodDescriptor.create(MethodType.UNKNOWN, "basic/flow2",
requestMarshaller, responseMarshaller), handler2).build();
MethodDescriptor<String, Integer> flowMethod2 = MethodDescriptor
.create(MethodType.UNKNOWN, "basic/flow2",
requestMarshaller, responseMarshaller);
serviceDefinition = ServerServiceDefinition.builder(
new ServiceDescriptor("basic", flowMethod, flowMethod2))
.addMethod(flowMethod, handler)
.addMethod(flowMethod2, handler2).build();
ServerServiceDefinition intercepted = ServerInterceptors.intercept(
serviceDefinition, Arrays.<ServerInterceptor>asList(new NoopInterceptor()));
getMethod(intercepted, "basic/flow").getServerCallHandler().startCall(
method, call, headers);
verify(handler).startCall(method, call, headers);
getMethod(intercepted, "basic/flow").getServerCallHandler().startCall(call, headers);
verify(handler).startCall(call, headers);
verifyNoMoreInteractions(handler);
verifyZeroInteractions(handler2);
verifyNoMoreInteractions(handler2);
getMethod(intercepted, "basic/flow2").getServerCallHandler().startCall(
method, call, headers);
verify(handler2).startCall(method, call, headers);
getMethod(intercepted, "basic/flow2").getServerCallHandler().startCall(call, headers);
verify(handler2).startCall(call, headers);
verifyNoMoreInteractions(handler);
verifyNoMoreInteractions(handler2);
}
@ -172,20 +178,19 @@ public class ServerInterceptorsTest {
ServerInterceptor interceptor = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
// Calling next twice is permitted, although should only rarely be useful.
assertSame(listener, next.startCall(method, call, headers));
return next.startCall(method, call, headers);
assertSame(listener, next.startCall(call, headers));
return next.startCall(call, headers);
}
};
ServerServiceDefinition intercepted = ServerInterceptors.intercept(serviceDefinition,
interceptor);
assertSame(listener,
getSoleMethod(intercepted).getServerCallHandler().startCall(method, call, headers));
verify(handler, times(2)).startCall(same(method), same(call), same(headers));
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers));
verify(handler, times(2)).startCall(same(call), same(headers));
verifyNoMoreInteractions(handler);
}
@ -195,8 +200,7 @@ public class ServerInterceptorsTest {
handler = new ServerCallHandler<String, Integer>() {
@Override
public ServerCall.Listener<String> startCall(
MethodDescriptor<String, Integer> method,
ServerCall<Integer> call,
ServerCall<String, Integer> call,
Metadata headers) {
order.add("handler");
return listener;
@ -205,32 +209,30 @@ public class ServerInterceptorsTest {
ServerInterceptor interceptor1 = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
order.add("i1");
return next.startCall(method, call, headers);
return next.startCall(call, headers);
}
};
ServerInterceptor interceptor2 = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
order.add("i2");
return next.startCall(method, call, headers);
return next.startCall(call, headers);
}
};
ServerServiceDefinition serviceDefinition = ServerServiceDefinition.builder("basic")
.addMethod(MethodDescriptor.create(MethodType.UNKNOWN, "basic/flow",
requestMarshaller, responseMarshaller), handler).build();
ServerServiceDefinition serviceDefinition = ServerServiceDefinition.builder(
new ServiceDescriptor("basic", flowMethod))
.addMethod(flowMethod, handler).build();
ServerServiceDefinition intercepted = ServerInterceptors.intercept(
serviceDefinition, Arrays.asList(interceptor1, interceptor2));
assertSame(listener,
getSoleMethod(intercepted).getServerCallHandler().startCall(method, call, headers));
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers));
assertEquals(Arrays.asList("i2", "i1", "handler"), order);
}
@ -240,8 +242,7 @@ public class ServerInterceptorsTest {
handler = new ServerCallHandler<String, Integer>() {
@Override
public ServerCall.Listener<String> startCall(
MethodDescriptor<String, Integer> method,
ServerCall<Integer> call,
ServerCall<String, Integer> call,
Metadata headers) {
order.add("handler");
return listener;
@ -250,41 +251,37 @@ public class ServerInterceptorsTest {
ServerInterceptor interceptor1 = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
order.add("i1");
return next.startCall(method, call, headers);
return next.startCall(call, headers);
}
};
ServerInterceptor interceptor2 = new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
order.add("i2");
return next.startCall(method, call, headers);
return next.startCall(call, headers);
}
};
ServerServiceDefinition serviceDefinition = ServerServiceDefinition.builder("basic")
.addMethod(MethodDescriptor.create(MethodType.UNKNOWN, "basic/flow",
requestMarshaller, responseMarshaller), handler).build();
ServerServiceDefinition serviceDefinition = ServerServiceDefinition.builder(
new ServiceDescriptor("basic", flowMethod))
.addMethod(flowMethod, handler).build();
ServerServiceDefinition intercepted = ServerInterceptors.interceptForward(
serviceDefinition, interceptor1, interceptor2);
assertSame(listener,
getSoleMethod(intercepted).getServerCallHandler().startCall(method, call, headers));
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers));
assertEquals(Arrays.asList("i1", "i2", "handler"), order);
}
@Test
public void argumentsPassed() {
final MethodDescriptor<String, Integer> method2 = MethodDescriptor.create(
MethodType.UNKNOWN, "someOtherRandom/Method", requestMarshaller, responseMarshaller);
@SuppressWarnings("unchecked")
final ServerCall<Integer> call2 = mock(ServerCall.class);
final ServerCall<String, Integer> call2 = mock(ServerCall.class);
@SuppressWarnings("unchecked")
final ServerCall.Listener<String> listener2 = mock(ServerCall.Listener.class);
@ -292,22 +289,20 @@ public class ServerInterceptorsTest {
@SuppressWarnings("unchecked") // Lot's of casting for no benefit. Not intended use.
@Override
public <R1, R2> ServerCall.Listener<R1> interceptCall(
MethodDescriptor<R1, R2> methodDescriptor,
ServerCall<R2> call,
ServerCall<R1, R2> call,
Metadata headers,
ServerCallHandler<R1, R2> next) {
assertSame(method, methodDescriptor);
assertSame(call, ServerInterceptorsTest.this.call);
assertSame(listener,
next.startCall((MethodDescriptor<R1, R2>)method2, (ServerCall<R2>)call2, headers));
next.startCall((ServerCall<R1, R2>)call2, headers));
return (ServerCall.Listener<R1>) listener2;
}
};
ServerServiceDefinition intercepted = ServerInterceptors.intercept(
serviceDefinition, Arrays.asList(interceptor));
assertSame(listener2,
getSoleMethod(intercepted).getServerCallHandler().startCall(method, call, headers));
verify(handler).startCall(method2, call2, headers);
getSoleMethod(intercepted).getServerCallHandler().startCall(call, headers));
verify(handler).startCall(call2, headers);
}
@Test
@ -328,8 +323,7 @@ public class ServerInterceptorsTest {
ServerCallHandler<Holder, Holder> handler2 = new ServerCallHandler<Holder, Holder>() {
@Override
public Listener<Holder> startCall(final MethodDescriptor<Holder, Holder> method,
final ServerCall<Holder> call,
public Listener<Holder> startCall(final ServerCall<Holder, Holder> call,
final Metadata headers) {
return new Listener<Holder>() {
@Override
@ -341,20 +335,20 @@ public class ServerInterceptorsTest {
}
};
ServerServiceDefinition serviceDef = ServerServiceDefinition.builder("basic")
.addMethod(
MethodDescriptor.create(
MethodType.UNKNOWN, "basic/wrapped", marshaller, marshaller),
handler2).build();
MethodDescriptor<Holder, Holder> wrappedMethod = MethodDescriptor
.create(MethodType.UNKNOWN, "basic/wrapped",
marshaller, marshaller);
ServerServiceDefinition serviceDef = ServerServiceDefinition.builder(
new ServiceDescriptor("basic", wrappedMethod))
.addMethod(wrappedMethod, handler2).build();
ServerInterceptor interceptor1 = new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
ServerCall<RespT> interceptedCall = new ForwardingServerCall
.SimpleForwardingServerCall<RespT>(call) {
ServerCall<ReqT, RespT> interceptedCall = new ForwardingServerCall
.SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendMessage(RespT message) {
order.add("i1sendMessage");
@ -364,7 +358,7 @@ public class ServerInterceptorsTest {
};
ServerCall.Listener<ReqT> originalListener = next
.startCall(method, interceptedCall, headers);
.startCall(interceptedCall, headers);
return new ForwardingServerCallListener
.SimpleForwardingServerCallListener<ReqT>(originalListener) {
@Override
@ -379,12 +373,11 @@ public class ServerInterceptorsTest {
ServerInterceptor interceptor2 = new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
ServerCall<RespT> interceptedCall = new ForwardingServerCall
.SimpleForwardingServerCall<RespT>(call) {
ServerCall<ReqT, RespT> interceptedCall = new ForwardingServerCall
.SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendMessage(RespT message) {
order.add("i2sendMessage");
@ -394,7 +387,7 @@ public class ServerInterceptorsTest {
};
ServerCall.Listener<ReqT> originalListener = next
.startCall(method, interceptedCall, headers);
.startCall(interceptedCall, headers);
return new ForwardingServerCallListener
.SimpleForwardingServerCallListener<ReqT>(originalListener) {
@Override
@ -414,12 +407,11 @@ public class ServerInterceptorsTest {
.intercept(inputStreamMessageService, interceptor2);
ServerMethodDefinition<InputStream, InputStream> serverMethod =
(ServerMethodDefinition<InputStream, InputStream>) intercepted2.getMethod("basic/wrapped");
MethodDescriptor<InputStream, InputStream> method2 = serverMethod.getMethodDescriptor();
ServerCall<InputStream> call2 = mock(ServerCall.class);
ServerCall<InputStream, InputStream> call2 = mock(ServerCall.class);
byte[] bytes = {};
serverMethod
.getServerCallHandler()
.startCall(method2, call2, headers)
.startCall(call2, headers)
.onMessage(new ByteArrayInputStream(bytes));
assertEquals(
Arrays.asList("i2onMessage", "i1onMessage", "handler", "i1sendMessage", "i2sendMessage"),
@ -448,11 +440,10 @@ public class ServerInterceptorsTest {
private static class NoopInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
return next.startCall(method, call, headers);
return next.startCall(call, headers);
}
}

View File

@ -63,6 +63,7 @@ import io.grpc.MethodDescriptor.MethodType;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServiceDescriptor;
import io.grpc.Status;
import io.grpc.StringMarshaller;
import io.grpc.util.MutableHandlerRegistry;
@ -311,19 +312,20 @@ public class ServerImplTest {
public void basicExchangeSuccessful() throws Exception {
final Metadata.Key<String> metadataKey
= Metadata.Key.of("inception", Metadata.ASCII_STRING_MARSHALLER);
final AtomicReference<ServerCall<Integer>> callReference
= new AtomicReference<ServerCall<Integer>>();
fallbackRegistry.addService(ServerServiceDefinition.builder("Waiter")
final AtomicReference<ServerCall<String, Integer>> callReference
= new AtomicReference<ServerCall<String, Integer>>();
MethodDescriptor<String, Integer> method = MethodDescriptor.create(
MethodType.UNKNOWN, "Waiter/serve", STRING_MARSHALLER, INTEGER_MARSHALLER);
fallbackRegistry.addService(ServerServiceDefinition.builder(
new ServiceDescriptor("Waiter", method))
.addMethod(
MethodDescriptor.create(
MethodType.UNKNOWN, "Waiter/serve", STRING_MARSHALLER, INTEGER_MARSHALLER),
method,
new ServerCallHandler<String, Integer>() {
@Override
public ServerCall.Listener<String> startCall(
MethodDescriptor<String, Integer> method,
ServerCall<Integer> call,
ServerCall<String, Integer> call,
Metadata headers) {
assertEquals("Waiter/serve", method.getFullMethodName());
assertEquals("Waiter/serve", call.getMethodDescriptor().getFullMethodName());
assertNotNull(call);
assertNotNull(headers);
assertEquals("value", headers.get(metadataKey));
@ -341,7 +343,7 @@ public class ServerImplTest {
assertNotNull(streamListener);
executeBarrier(executor).await();
ServerCall<Integer> call = callReference.get();
ServerCall<String, Integer> call = callReference.get();
assertNotNull(call);
String order = "Lots of pizza, please";
@ -387,15 +389,16 @@ public class ServerImplTest {
public void exceptionInStartCallPropagatesToStream() throws Exception {
CyclicBarrier barrier = executeBarrier(executor);
final Status status = Status.ABORTED.withDescription("Oh, no!");
fallbackRegistry.addService(ServerServiceDefinition.builder("Waiter")
.addMethod(
MethodDescriptor.create(MethodType.UNKNOWN, "Waiter/serve",
STRING_MARSHALLER, INTEGER_MARSHALLER),
MethodDescriptor<String, Integer> method = MethodDescriptor
.create(MethodType.UNKNOWN, "Waiter/serve",
STRING_MARSHALLER, INTEGER_MARSHALLER);
fallbackRegistry.addService(ServerServiceDefinition.builder(
new ServiceDescriptor("Waiter", method))
.addMethod(method,
new ServerCallHandler<String, Integer>() {
@Override
public ServerCall.Listener<String> startCall(
MethodDescriptor<String, Integer> method,
ServerCall<Integer> call,
ServerCall<String, Integer> call,
Metadata headers) {
throw status.asRuntimeException();
}
@ -497,15 +500,16 @@ public class ServerImplTest {
@Test
public void testCallContextIsBoundInListenerCallbacks() throws Exception {
fallbackRegistry.addService(ServerServiceDefinition.builder("Waiter")
MethodDescriptor<String, Integer> method = MethodDescriptor.create(
MethodType.UNKNOWN, "Waiter/serve", STRING_MARSHALLER, INTEGER_MARSHALLER);
fallbackRegistry.addService(ServerServiceDefinition.builder(
new ServiceDescriptor("Waiter", method))
.addMethod(
MethodDescriptor.create(
MethodType.UNKNOWN, "Waiter/serve", STRING_MARSHALLER, INTEGER_MARSHALLER),
method,
new ServerCallHandler<String, Integer>() {
@Override
public ServerCall.Listener<String> startCall(
MethodDescriptor<String, Integer> method,
ServerCall<Integer> call,
ServerCall<String, Integer> call,
Metadata headers) {
// Check that the current context is a descendant of SERVER_CONTEXT
final Context initial = Context.current();
@ -582,17 +586,17 @@ public class ServerImplTest {
}
};
final AtomicReference<ServerCall<Integer>> callReference
= new AtomicReference<ServerCall<Integer>>();
fallbackRegistry.addService(ServerServiceDefinition.builder("Waiter")
.addMethod(
MethodDescriptor.create(
MethodType.UNKNOWN, "Waiter/serve", STRING_MARSHALLER, INTEGER_MARSHALLER),
final AtomicReference<ServerCall<String, Integer>> callReference
= new AtomicReference<ServerCall<String, Integer>>();
MethodDescriptor<String, Integer> method = MethodDescriptor.create(
MethodType.UNKNOWN, "Waiter/serve", STRING_MARSHALLER, INTEGER_MARSHALLER);
fallbackRegistry.addService(ServerServiceDefinition.builder(
new ServiceDescriptor("Waiter", method))
.addMethod(method,
new ServerCallHandler<String, Integer>() {
@Override
public ServerCall.Listener<String> startCall(
MethodDescriptor<String, Integer> method,
ServerCall<Integer> call,
ServerCall<String, Integer> call,
Metadata headers) {
callReference.set(call);
return callListener;
@ -655,7 +659,7 @@ public class ServerImplTest {
MethodDescriptor<String, Integer> method1 = MethodDescriptor.create(
MethodType.UNKNOWN, "Service1/Method1", STRING_MARSHALLER, INTEGER_MARSHALLER);
registry = new InternalHandlerRegistry.Builder()
.addService(ServerServiceDefinition.builder("Service1")
.addService(ServerServiceDefinition.builder(new ServiceDescriptor("Service1", method1))
.addMethod(method1, callHandler).build())
.build();
transportServer = new SimpleServer();
@ -671,8 +675,8 @@ public class ServerImplTest {
// registry.
transportListener.streamCreated(stream, "Service1/Method2", new Metadata());
verify(callHandler, timeout(2000)).startCall(same(method1),
Matchers.<ServerCall<Integer>>anyObject(), Matchers.<Metadata>anyObject());
verify(callHandler, timeout(2000)).startCall(Matchers.<ServerCall<String, Integer>>anyObject(),
Matchers.<Metadata>anyObject());
verify(fallbackRegistry, timeout(2000)).lookupMethod("Service1/Method2", null);
verifyNoMoreInteractions(callHandler);
verifyNoMoreInteractions(fallbackRegistry);

View File

@ -31,68 +31,90 @@
package io.grpc.util;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Iterables.getOnlyElement;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCallHandler;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerServiceDefinition.ServerMethodDefinition;
import io.grpc.ServiceDescriptor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
/** Unit tests for {@link MutableHandlerRegistry}. */
@RunWith(JUnit4.class)
public class MutableHandlerRegistryTest {
private MutableHandlerRegistry registry = new MutableHandlerRegistry();
@SuppressWarnings("unchecked")
private Marshaller<String> requestMarshaller = mock(Marshaller.class);
@SuppressWarnings("unchecked")
private Marshaller<Integer> responseMarshaller = mock(Marshaller.class);
@SuppressWarnings("unchecked")
private ServerCallHandler<String, Integer> handler = mock(ServerCallHandler.class);
private ServerServiceDefinition basicServiceDefinition = ServerServiceDefinition.builder("basic")
.addMethod(
MethodDescriptor.create(MethodType.UNKNOWN, "basic/flow",
requestMarshaller, responseMarshaller),
handler).build();
@Mock
private Marshaller<String> requestMarshaller;
@Mock
private Marshaller<Integer> responseMarshaller;
@Mock
private ServerCallHandler<String, Integer> flowHandler;
@Mock
private ServerCallHandler<String, Integer> coupleHandler;
@Mock
private ServerCallHandler<String, Integer> fewHandler;
@Mock
private ServerCallHandler<String, Integer> otherFlowHandler;
private ServerServiceDefinition basicServiceDefinition;
private ServerServiceDefinition multiServiceDefinition;
@SuppressWarnings("rawtypes")
private ServerMethodDefinition flowMethodDefinition =
getOnlyElement(basicServiceDefinition.getMethods());
private ServerServiceDefinition multiServiceDefinition = ServerServiceDefinition.builder("multi")
.addMethod(
MethodDescriptor.create(MethodType.UNKNOWN, "multi/couple",
requestMarshaller, responseMarshaller),
handler)
.addMethod(
MethodDescriptor.create(MethodType.UNKNOWN, "multi/few",
requestMarshaller, responseMarshaller),
handler).build();
@SuppressWarnings("rawtypes")
private ServerMethodDefinition coupleMethodDefinition =
checkNotNull(multiServiceDefinition.getMethod("multi/couple"));
@SuppressWarnings("rawtypes")
private ServerMethodDefinition fewMethodDefinition =
checkNotNull(multiServiceDefinition.getMethod("multi/few"));
private ServerMethodDefinition flowMethodDefinition;
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
MethodDescriptor<String, Integer> flowMethod = MethodDescriptor
.create(MethodType.UNKNOWN, "basic/flow", requestMarshaller, responseMarshaller);
basicServiceDefinition = ServerServiceDefinition.builder(
new ServiceDescriptor("basic", flowMethod))
.addMethod(flowMethod, flowHandler)
.build();
MethodDescriptor<String, Integer> coupleMethod = MethodDescriptor
.create(MethodType.UNKNOWN, "multi/couple", requestMarshaller, responseMarshaller);
MethodDescriptor<String, Integer> fewMethod = MethodDescriptor
.create(MethodType.UNKNOWN, "multi/few", requestMarshaller, responseMarshaller);
multiServiceDefinition = ServerServiceDefinition.builder(
new ServiceDescriptor("multi", coupleMethod, fewMethod))
.addMethod(coupleMethod, coupleHandler)
.addMethod(fewMethod, fewHandler)
.build();
flowMethodDefinition = getOnlyElement(basicServiceDefinition.getMethods());
}
/** Final checks for all tests. */
@After
public void makeSureMocksUnused() {
Mockito.verifyZeroInteractions(requestMarshaller);
Mockito.verifyZeroInteractions(responseMarshaller);
Mockito.verifyZeroInteractions(handler);
Mockito.verifyNoMoreInteractions(flowHandler);
Mockito.verifyNoMoreInteractions(coupleHandler);
Mockito.verifyNoMoreInteractions(fewHandler);
}
@Test
@ -112,12 +134,12 @@ public class MutableHandlerRegistryTest {
assertNull(registry.addService(basicServiceDefinition));
assertNull(registry.addService(multiServiceDefinition));
ServerMethodDefinition<?, ?> method = registry.lookupMethod("basic/flow");
assertSame(flowMethodDefinition, method);
method = registry.lookupMethod("multi/couple");
assertSame(coupleMethodDefinition, method);
method = registry.lookupMethod("multi/few");
assertSame(fewMethodDefinition, method);
ServerCallHandler<?, ?> handler = registry.lookupMethod("basic/flow").getServerCallHandler();
assertSame(flowHandler, handler);
handler = registry.lookupMethod("multi/couple").getServerCallHandler();
assertSame(coupleHandler, handler);
handler = registry.lookupMethod("multi/few").getServerCallHandler();
assertSame(fewHandler, handler);
}
@Test
@ -134,9 +156,11 @@ public class MutableHandlerRegistryTest {
public void replaceAndLookup() {
assertNull(registry.addService(basicServiceDefinition));
assertNotNull(registry.lookupMethod("basic/flow"));
ServerServiceDefinition replaceServiceDefinition = ServerServiceDefinition.builder("basic")
.addMethod(MethodDescriptor.create(MethodType.UNKNOWN, "basic/another",
requestMarshaller, responseMarshaller), handler).build();
MethodDescriptor<String, Integer> anotherMethod = MethodDescriptor
.create(MethodType.UNKNOWN, "basic/another", requestMarshaller, responseMarshaller);
ServerServiceDefinition replaceServiceDefinition = ServerServiceDefinition.builder(
new ServiceDescriptor("basic", anotherMethod))
.addMethod(anotherMethod, flowHandler).build();
ServerMethodDefinition<?, ?> anotherMethodDefinition =
replaceServiceDefinition.getMethod("basic/another");
assertSame(basicServiceDefinition, registry.addService(replaceServiceDefinition));
@ -167,7 +191,8 @@ public class MutableHandlerRegistryTest {
@Test
public void removeMissingNameConflictFails() {
assertNull(registry.addService(basicServiceDefinition));
assertFalse(registry.removeService(ServerServiceDefinition.builder("basic").build()));
assertFalse(registry.removeService(ServerServiceDefinition.builder(
new ServiceDescriptor("basic")).build()));
}
@Test
@ -192,6 +217,7 @@ public class MutableHandlerRegistryTest {
public void addReturnsPrevious() {
assertNull(registry.addService(basicServiceDefinition));
assertSame(basicServiceDefinition,
registry.addService(ServerServiceDefinition.builder("basic").build()));
registry.addService(ServerServiceDefinition.builder(
new ServiceDescriptor("basic")).build()));
}
}

View File

@ -237,9 +237,14 @@ public class GreeterGrpc {
}
}
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
return new io.grpc.ServiceDescriptor(SERVICE_NAME,
METHOD_SAY_HELLO);
}
public static io.grpc.ServerServiceDefinition bindService(
final Greeter serviceImpl) {
return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
METHOD_SAY_HELLO,
asyncUnaryCall(

View File

@ -376,9 +376,17 @@ public class RouteGuideGrpc {
}
}
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
return new io.grpc.ServiceDescriptor(SERVICE_NAME,
METHOD_GET_FEATURE,
METHOD_LIST_FEATURES,
METHOD_RECORD_ROUTE,
METHOD_ROUTE_CHAT);
}
public static io.grpc.ServerServiceDefinition bindService(
final RouteGuide serviceImpl) {
return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
METHOD_GET_FEATURE,
asyncUnaryCall(

View File

@ -33,7 +33,6 @@ package io.grpc.examples.header;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
@ -53,12 +52,11 @@ public class HeaderServerInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
ServerCall<ReqT, RespT> call,
final Metadata requestHeaders,
ServerCallHandler<ReqT, RespT> next) {
logger.info("header received from client:" + requestHeaders);
return next.startCall(method, new SimpleForwardingServerCall<RespT>(call) {
return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
responseHeaders.put(customHeadKey, "customRespondValue");

View File

@ -113,10 +113,10 @@ public class HelloJsonServer {
}
private ServerServiceDefinition bindService(final Greeter serviceImpl) {
return io.grpc.ServerServiceDefinition.builder(GreeterGrpc.SERVICE_NAME)
.addMethod(
HelloJsonClient.HelloJsonStub.METHOD_SAY_HELLO,
asyncUnaryCall(
return io.grpc.ServerServiceDefinition
.builder(GreeterGrpc.getServiceDescriptor())
.addMethod(HelloJsonClient.HelloJsonStub.METHOD_SAY_HELLO,
asyncUnaryCall(
new UnaryMethod<HelloRequest, HelloReply>() {
@Override
public void invoke(

View File

@ -196,9 +196,14 @@ public class LoadBalancerGrpc {
}
}
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
return new io.grpc.ServiceDescriptor(SERVICE_NAME,
METHOD_BALANCE_LOAD);
}
public static io.grpc.ServerServiceDefinition bindService(
final LoadBalancer serviceImpl) {
return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
METHOD_BALANCE_LOAD,
asyncBidiStreamingCall(

View File

@ -277,9 +277,15 @@ public class MetricsServiceGrpc {
}
}
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
return new io.grpc.ServiceDescriptor(SERVICE_NAME,
METHOD_GET_ALL_GAUGES,
METHOD_GET_GAUGE);
}
public static io.grpc.ServerServiceDefinition bindService(
final MetricsService serviceImpl) {
return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
METHOD_GET_ALL_GAUGES,
asyncServerStreamingCall(

View File

@ -282,9 +282,15 @@ public class ReconnectServiceGrpc {
}
}
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
return new io.grpc.ServiceDescriptor(SERVICE_NAME,
METHOD_START,
METHOD_STOP);
}
public static io.grpc.ServerServiceDefinition bindService(
final ReconnectService serviceImpl) {
return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
METHOD_START,
asyncUnaryCall(

View File

@ -464,9 +464,19 @@ public class TestServiceGrpc {
}
}
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
return new io.grpc.ServiceDescriptor(SERVICE_NAME,
METHOD_EMPTY_CALL,
METHOD_UNARY_CALL,
METHOD_STREAMING_OUTPUT_CALL,
METHOD_STREAMING_INPUT_CALL,
METHOD_FULL_DUPLEX_CALL,
METHOD_HALF_DUPLEX_CALL);
}
public static io.grpc.ServerServiceDefinition bindService(
final TestService serviceImpl) {
return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
METHOD_EMPTY_CALL,
asyncUnaryCall(

View File

@ -241,9 +241,14 @@ public class UnimplementedServiceGrpc {
}
}
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
return new io.grpc.ServiceDescriptor(SERVICE_NAME,
METHOD_UNIMPLEMENTED_CALL);
}
public static io.grpc.ServerServiceDefinition bindService(
final UnimplementedService serviceImpl) {
return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
METHOD_UNIMPLEMENTED_CALL,
asyncUnaryCall(

View File

@ -111,8 +111,8 @@ public abstract class AbstractInteropTest {
public static final Metadata.Key<Messages.SimpleContext> METADATA_KEY =
ProtoUtils.keyForProto(Messages.SimpleContext.getDefaultInstance());
private static final AtomicReference<ServerCall<?>> serverCallCapture =
new AtomicReference<ServerCall<?>>();
private static final AtomicReference<ServerCall<?, ?>> serverCallCapture =
new AtomicReference<ServerCall<?, ?>>();
private static final AtomicReference<Metadata> requestHeadersCapture =
new AtomicReference<Metadata>();
private static ScheduledExecutorService testServiceExecutor;

View File

@ -36,7 +36,6 @@ import static org.junit.Assert.fail;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
@ -207,8 +206,7 @@ public class CascadingTest {
new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
final ServerCall<RespT> call,
final ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
// Respond with the headers but nothing else.
@ -264,8 +262,7 @@ public class CascadingTest {
new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
final ServerCall<RespT> call,
final ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
// Respond with the headers but nothing else.

View File

@ -278,14 +278,13 @@ public class CompressionTest {
private class ServerCompressorInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> io.grpc.ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method, ServerCall<RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
if (serverEncoding) {
call.setCompression("fzip");
}
call.setMessageCompression(enableServerMessageCompression);
serverResponseHeaders = headers;
return next.startCall(method, call, headers);
return next.startCall(call, headers);
}
}

View File

@ -108,9 +108,9 @@ public class TransportCompressionTest extends AbstractInteropTest {
.decompressorRegistry(decompressors),
new ServerInterceptor() {
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
Listener<ReqT> listener = next.startCall(method, call, headers);
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
Listener<ReqT> listener = next.startCall(call, headers);
// TODO(carl-mastrangelo): check that encoding was set.
call.setMessageCompression(true);
return listener;

View File

@ -216,9 +216,14 @@ public class HealthGrpc {
}
}
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
return new io.grpc.ServiceDescriptor(SERVICE_NAME,
METHOD_CHECK);
}
public static io.grpc.ServerServiceDefinition bindService(
final Health serviceImpl) {
return io.grpc.ServerServiceDefinition.builder(SERVICE_NAME)
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
METHOD_CHECK,
asyncUnaryCall(

View File

@ -127,11 +127,10 @@ public class ServerCalls {
return new ServerCallHandler<ReqT, RespT>() {
@Override
public ServerCall.Listener<ReqT> startCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
final ServerCall<RespT> call,
final ServerCall<ReqT, RespT> call,
Metadata headers) {
final ServerCallStreamObserverImpl<RespT> responseObserver =
new ServerCallStreamObserverImpl<RespT>(call);
final ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
new ServerCallStreamObserverImpl<ReqT, RespT>(call);
// We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client
// sends more than 1 requests, ServerCall will catch it. Note that disabling auto
// inbound flow control has no effect on unary calls.
@ -190,11 +189,10 @@ public class ServerCalls {
return new ServerCallHandler<ReqT, RespT>() {
@Override
public ServerCall.Listener<ReqT> startCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
final ServerCall<RespT> call,
final ServerCall<ReqT, RespT> call,
Metadata headers) {
final ServerCallStreamObserverImpl<RespT> responseObserver =
new ServerCallStreamObserverImpl<RespT>(call);
final ServerCallStreamObserverImpl<ReqT, RespT> responseObserver =
new ServerCallStreamObserverImpl<ReqT, RespT>(call);
final StreamObserver<ReqT> requestObserver = method.invoke(responseObserver);
responseObserver.freeze();
if (responseObserver.autoFlowControlEnabled) {
@ -249,9 +247,9 @@ public class ServerCalls {
StreamObserver<ReqT> invoke(StreamObserver<RespT> responseObserver);
}
private static final class ServerCallStreamObserverImpl<RespT>
private static final class ServerCallStreamObserverImpl<ReqT, RespT>
extends ServerCallStreamObserver<RespT> {
final ServerCall<RespT> call;
final ServerCall<ReqT, RespT> call;
volatile boolean cancelled;
private boolean frozen;
private boolean autoFlowControlEnabled = true;
@ -259,7 +257,7 @@ public class ServerCalls {
private Runnable onReadyHandler;
private Runnable onCancelHandler;
ServerCallStreamObserverImpl(ServerCall<RespT> call) {
ServerCallStreamObserverImpl(ServerCall<ReqT, RespT> call) {
this.call = call;
}

View File

@ -46,6 +46,7 @@ import io.grpc.ClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServiceDescriptor;
import io.grpc.Status;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
@ -259,7 +260,8 @@ public class ClientCallsTest {
@Test
public void inprocessTransportInboundFlowControl() throws Exception {
final Semaphore semaphore = new Semaphore(1);
ServerServiceDefinition service = ServerServiceDefinition.builder("some")
ServerServiceDefinition service = ServerServiceDefinition.builder(
new ServiceDescriptor("some", STREAMING_METHOD))
.addMethod(STREAMING_METHOD, ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
int iteration;
@ -338,7 +340,8 @@ public class ClientCallsTest {
final CountDownLatch latch = new CountDownLatch(1);
final Semaphore semaphore = new Semaphore(1);
final List<Integer> receivedMessages = new ArrayList<Integer>(6);
ServerServiceDefinition service = ServerServiceDefinition.builder("some")
ServerServiceDefinition service = ServerServiceDefinition.builder(
new ServiceDescriptor("some", STREAMING_METHOD))
.addMethod(STREAMING_METHOD, ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
@Override

View File

@ -45,6 +45,7 @@ import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServiceDescriptor;
import io.grpc.Status;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
@ -88,7 +89,7 @@ public class ServerCallsTest {
new IntegerMarshaller(), new IntegerMarshaller());
@Mock
ServerCall<Integer> serverCall;
ServerCall<Integer, Integer> serverCall;
@Before
public void setUp() throws Exception {
@ -128,7 +129,7 @@ public class ServerCallsTest {
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(STREAMING_METHOD, serverCall, new Metadata());
callHandler.startCall(serverCall, new Metadata());
Mockito.when(serverCall.isReady()).thenReturn(true).thenReturn(false);
Mockito.when(serverCall.isCancelled()).thenReturn(false).thenReturn(true);
assertTrue(callObserver.get().isReady());
@ -160,7 +161,7 @@ public class ServerCallsTest {
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(STREAMING_METHOD, serverCall, new Metadata());
callHandler.startCall(serverCall, new Metadata());
callListener.onMessage(1);
try {
callObserver.get().setOnCancelHandler(new Runnable() {
@ -188,7 +189,7 @@ public class ServerCallsTest {
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(STREAMING_METHOD, serverCall, new Metadata());
callHandler.startCall(serverCall, new Metadata());
callListener.onMessage(1);
try {
callObserver.get().setOnReadyHandler(new Runnable() {
@ -216,7 +217,7 @@ public class ServerCallsTest {
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(STREAMING_METHOD, serverCall, new Metadata());
callHandler.startCall(serverCall, new Metadata());
callListener.onMessage(1);
try {
callObserver.get().disableAutoInboundFlowControl();
@ -240,7 +241,7 @@ public class ServerCallsTest {
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(STREAMING_METHOD, serverCall, new Metadata());
callHandler.startCall(serverCall, new Metadata());
callListener.onReady();
// Transport should not call this if nothing has been requested but forcing it here
// to verify that message delivery does not trigger a call to request(1).
@ -261,8 +262,7 @@ public class ServerCallsTest {
serverCallObserver.disableAutoInboundFlowControl();
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(UNARY_METHOD, serverCall, new Metadata());
callHandler.startCall(serverCall, new Metadata());
// Auto inbound flow-control always requests 2 messages for unary to detect a violation
// of the unary semantic.
Mockito.verify(serverCall, times(1)).request(2);
@ -271,8 +271,6 @@ public class ServerCallsTest {
@Test
public void onReadyHandlerCalledForUnaryRequest() throws Exception {
final AtomicInteger onReadyCalled = new AtomicInteger();
final AtomicReference<ServerCallStreamObserver<Integer>> callObserver =
new AtomicReference<ServerCallStreamObserver<Integer>>();
ServerCallHandler<Integer, Integer> callHandler =
ServerCalls.asyncServerStreamingCall(
new ServerCalls.ServerStreamingMethod<Integer, Integer>() {
@ -289,7 +287,7 @@ public class ServerCallsTest {
}
});
ServerCall.Listener<Integer> callListener =
callHandler.startCall(STREAMING_METHOD, serverCall, new Metadata());
callHandler.startCall(serverCall, new Metadata());
Mockito.when(serverCall.isReady()).thenReturn(true).thenReturn(false);
Mockito.when(serverCall.isCancelled()).thenReturn(false).thenReturn(true);
callListener.onReady();
@ -309,7 +307,8 @@ public class ServerCallsTest {
@Test
public void inprocessTransportManualFlow() throws Exception {
final Semaphore semaphore = new Semaphore(1);
ServerServiceDefinition service = ServerServiceDefinition.builder("some")
ServerServiceDefinition service = ServerServiceDefinition.builder(
new ServiceDescriptor("some", STREAMING_METHOD))
.addMethod(STREAMING_METHOD, ServerCalls.asyncBidiStreamingCall(
new ServerCalls.BidiStreamingMethod<Integer, Integer>() {
int iteration;

View File

@ -34,7 +34,6 @@ package io.grpc.testing;
import io.grpc.ExperimentalApi;
import io.grpc.ForwardingServerCall.SimpleForwardingServerCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
@ -85,12 +84,10 @@ public class TestUtils {
return new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
ServerCall<ReqT, RespT> call,
final Metadata requestHeaders,
ServerCallHandler<ReqT, RespT> next) {
return next.startCall(method,
new SimpleForwardingServerCall<RespT>(call) {
return next.startCall(new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void sendHeaders(Metadata responseHeaders) {
responseHeaders.merge(requestHeaders, keySet);
@ -117,12 +114,11 @@ public class TestUtils {
return new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
ServerCall<ReqT, RespT> call,
Metadata requestHeaders,
ServerCallHandler<ReqT, RespT> next) {
headersCapture.set(requestHeaders);
return next.startCall(method, call, requestHeaders);
return next.startCall(call, requestHeaders);
}
};
}
@ -132,16 +128,15 @@ public class TestUtils {
* {@link ServerCall#attributes()}
*/
public static ServerInterceptor recordServerCallInterceptor(
final AtomicReference<ServerCall<?>> serverCallCapture) {
final AtomicReference<ServerCall<?, ?>> serverCallCapture) {
return new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
ServerCall<RespT> call,
ServerCall<ReqT, RespT> call,
Metadata requestHeaders,
ServerCallHandler<ReqT, RespT> next) {
serverCallCapture.set(call);
return next.startCall(method, call, requestHeaders);
return next.startCall(call, requestHeaders);
}
};
}