Refactor HandlerRegistry.

See #933

- Create InternalHandlerRegistry, an immutable look-up table. Handlers
  passed to ServerBuilder.addService() go to this registry. This covers
  the most common use cases. By keeping the registry internal we could
  freely change the registry's interface to accommodate optimizations,
  e.g., for hpack.

- The internal registry uses a flat fullMethodName -> handler look-up
  table instead of a hierarchical one used before. It faster because it
  saves one look-up and a substring.

- Introduces the fallback registry, settable by
  ServerBuilder.fallbackHandlerRegistry(), for advanced users who want a
  dynamic registry. Moved the current MutableHandlerRegistryImpl to
  io.grpc.util.MutableHandlerRegistry as a stock implementation of the
  fallback registry. The io.grpc.MutableHandlerRegistry interface is now
  removed.
This commit is contained in:
Kun Zhang 2016-05-03 14:12:05 -07:00
parent dc80b52da6
commit 95973827f5
11 changed files with 167 additions and 146 deletions

View File

@ -32,9 +32,9 @@
package io.grpc.benchmarks.netty; package io.grpc.benchmarks.netty;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.MutableHandlerRegistryImpl;
import io.grpc.ServerMethodDefinition; import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition; import io.grpc.ServerServiceDefinition;
import io.grpc.util.MutableHandlerRegistry;
import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Fork;
@ -50,7 +50,7 @@ import java.util.List;
import java.util.Random; import java.util.Random;
/** /**
* Benchmark for {@link MutableHandlerRegistryImpl}. * Benchmark for {@link MutableHandlerRegistry}.
*/ */
@State(Scope.Benchmark) @State(Scope.Benchmark)
@Fork(1) @Fork(1)
@ -68,7 +68,7 @@ public class HandlerRegistryBenchmark {
@Param({"100"}) @Param({"100"})
public int methodCountPerService; public int methodCountPerService;
private MutableHandlerRegistryImpl registry; private MutableHandlerRegistry registry;
private List<String> fullMethodNames; private List<String> fullMethodNames;
/** /**
@ -76,7 +76,7 @@ public class HandlerRegistryBenchmark {
*/ */
@Setup(Level.Trial) @Setup(Level.Trial)
public void setup() throws Exception { public void setup() throws Exception {
registry = new MutableHandlerRegistryImpl(); registry = new MutableHandlerRegistry();
fullMethodNames = new ArrayList<String>(serviceCount * methodCountPerService); fullMethodNames = new ArrayList<String>(serviceCount * methodCountPerService);
for (int serviceIndex = 0; serviceIndex < serviceCount; ++serviceIndex) { for (int serviceIndex = 0; serviceIndex < serviceCount; ++serviceIndex) {
String serviceName = randomString(); String serviceName = randomString();
@ -94,7 +94,7 @@ public class HandlerRegistryBenchmark {
} }
/** /**
* Benchmark the {@link MutableHandlerRegistryImpl#lookupMethod(String)} throughput. * Benchmark the {@link MutableHandlerRegistry#lookupMethod(String)} throughput.
*/ */
@Benchmark @Benchmark
public void lookupMethod(Blackhole bh) { public void lookupMethod(Blackhole bh) {

View File

@ -75,8 +75,6 @@ public abstract class ServerBuilder<T extends ServerBuilder<T>> {
* Adds a service implementation to the handler registry. * Adds a service implementation to the handler registry.
* *
* @param service ServerServiceDefinition object * @param service ServerServiceDefinition object
* @throws UnsupportedOperationException if this builder does not support dynamically adding
* services.
*/ */
public abstract T addService(ServerServiceDefinition service); public abstract T addService(ServerServiceDefinition service);
@ -84,12 +82,17 @@ public abstract class ServerBuilder<T extends ServerBuilder<T>> {
* Adds a service implementation to the handler registry. * Adds a service implementation to the handler registry.
* *
* @param bindableService BindableService object * @param bindableService BindableService object
* @throws UnsupportedOperationException if this builder does not support dynamically adding
* services.
*/ */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1701") @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1701")
public abstract T addService(BindableService bindableService); public abstract T addService(BindableService bindableService);
/**
* Sets a fallback handler registry that will be looked up in if a method is not found in the
* primary registry.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/933")
public abstract T fallbackHandlerRegistry(@Nullable HandlerRegistry fallbackRegistry);
/** /**
* Makes the server use TLS. * Makes the server use TLS.
* *
@ -104,7 +107,7 @@ public abstract class ServerBuilder<T extends ServerBuilder<T>> {
* decompressors are in {@code DecompressorRegistry.getDefaultInstance}. * decompressors are in {@code DecompressorRegistry.getDefaultInstance}.
*/ */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1704") @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1704")
public abstract T decompressorRegistry(DecompressorRegistry registry); public abstract T decompressorRegistry(@Nullable DecompressorRegistry registry);
/** /**
* Set the compression registry for use in the channel. This is an advanced API call and * Set the compression registry for use in the channel. This is an advanced API call and
@ -112,7 +115,7 @@ public abstract class ServerBuilder<T extends ServerBuilder<T>> {
* compressors are in {@code CompressorRegistry.getDefaultInstance}. * compressors are in {@code CompressorRegistry.getDefaultInstance}.
*/ */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1704") @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1704")
public abstract T compressorRegistry(CompressorRegistry registry); public abstract T compressorRegistry(@Nullable CompressorRegistry registry);
/** /**
* Builds a server using the given parameters. * Builds a server using the given parameters.

View File

@ -75,7 +75,8 @@ public final class ServerServiceDefinition {
* *
* @param name the fully qualified name without leading slash. E.g., "com.foo.Foo/Bar" * @param name the fully qualified name without leading slash. E.g., "com.foo.Foo/Bar"
*/ */
ServerMethodDefinition<?, ?> getMethod(String name) { @Internal
public ServerMethodDefinition<?, ?> getMethod(String name) {
return methods.get(name); return methods.get(name);
} }

View File

@ -34,7 +34,6 @@ package io.grpc.inprocess;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import io.grpc.ExperimentalApi; import io.grpc.ExperimentalApi;
import io.grpc.HandlerRegistry;
import io.grpc.internal.AbstractServerImplBuilder; import io.grpc.internal.AbstractServerImplBuilder;
import java.io.File; import java.io.File;
@ -48,17 +47,6 @@ import java.io.File;
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1783") @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1783")
public final class InProcessServerBuilder public final class InProcessServerBuilder
extends AbstractServerImplBuilder<InProcessServerBuilder> { extends AbstractServerImplBuilder<InProcessServerBuilder> {
/**
* Create a server builder that will bind with the given name.
*
* @param name the identity of the server for clients to connect to
* @param registry the registry of handlers used for dispatching incoming calls
* @return a new builder
*/
public static InProcessServerBuilder forName(String name, HandlerRegistry registry) {
return new InProcessServerBuilder(name, registry);
}
/** /**
* Create a server builder that will bind with the given name. * Create a server builder that will bind with the given name.
* *
@ -71,11 +59,6 @@ public final class InProcessServerBuilder
private final String name; private final String name;
private InProcessServerBuilder(String name, HandlerRegistry registry) {
super(registry);
this.name = Preconditions.checkNotNull(name, "name");
}
private InProcessServerBuilder(String name) { private InProcessServerBuilder(String name) {
this.name = Preconditions.checkNotNull(name, "name"); this.name = Preconditions.checkNotNull(name, "name");
} }

View File

@ -33,7 +33,6 @@ package io.grpc.internal;
import static com.google.common.base.MoreObjects.firstNonNull; import static com.google.common.base.MoreObjects.firstNonNull;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.BindableService; import io.grpc.BindableService;
@ -42,9 +41,8 @@ import io.grpc.Context;
import io.grpc.DecompressorRegistry; import io.grpc.DecompressorRegistry;
import io.grpc.HandlerRegistry; import io.grpc.HandlerRegistry;
import io.grpc.Internal; import io.grpc.Internal;
import io.grpc.MutableHandlerRegistry;
import io.grpc.MutableHandlerRegistryImpl;
import io.grpc.ServerBuilder; import io.grpc.ServerBuilder;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition; import io.grpc.ServerServiceDefinition;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -59,7 +57,18 @@ import javax.annotation.Nullable;
public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuilder<T>> public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuilder<T>>
extends ServerBuilder<T> { extends ServerBuilder<T> {
private final HandlerRegistry registry; private static final HandlerRegistry EMPTY_FALLBACK_REGISTRY = new HandlerRegistry() {
@Override public ServerMethodDefinition<?, ?> lookupMethod(String method, String authority) {
return null;
}
};
private final InternalHandlerRegistry.Builder registryBuilder =
new InternalHandlerRegistry.Builder();
@Nullable
private HandlerRegistry fallbackRegistry;
@Nullable @Nullable
private Executor executor; private Executor executor;
@ -69,20 +78,6 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
@Nullable @Nullable
private CompressorRegistry compressorRegistry; private CompressorRegistry compressorRegistry;
/**
* Constructs using a given handler registry.
*/
protected AbstractServerImplBuilder(HandlerRegistry registry) {
this.registry = Preconditions.checkNotNull(registry);
}
/**
* Constructs with a MutableHandlerRegistry created internally.
*/
protected AbstractServerImplBuilder() {
this.registry = new MutableHandlerRegistryImpl();
}
@Override @Override
public final T directExecutor() { public final T directExecutor() {
return executor(MoreExecutors.directExecutor()); return executor(MoreExecutors.directExecutor());
@ -94,36 +89,23 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
return thisT(); return thisT();
} }
/**
* Adds a service implementation to the handler registry.
*
* <p>This is supported only if the user didn't provide a handler registry, or the provided one is
* a {@link MutableHandlerRegistry}. Otherwise it throws an UnsupportedOperationException.
*
* @param service ServerServiceDefinition object.
*/
@Override @Override
public final T addService(ServerServiceDefinition service) { public final T addService(ServerServiceDefinition service) {
if (registry instanceof MutableHandlerRegistry) { registryBuilder.addService(service);
((MutableHandlerRegistry) registry).addService(service);
return thisT(); return thisT();
} }
throw new UnsupportedOperationException("Underlying HandlerRegistry is not mutable");
}
/**
* Adds a service implementation to the handler registry.
*
* <p>This is supported only if the user didn't provide a handler registry, or the provided one is
* a {@link MutableHandlerRegistry}. Otherwise it throws an UnsupportedOperationException.
*
* @param bindableService BindableService object.
*/
@Override @Override
public final T addService(BindableService bindableService) { public final T addService(BindableService bindableService) {
return addService(bindableService.bindService()); return addService(bindableService.bindService());
} }
@Override
public final T fallbackHandlerRegistry(HandlerRegistry registry) {
this.fallbackRegistry = registry;
return thisT();
}
@Override @Override
public final T decompressorRegistry(DecompressorRegistry registry) { public final T decompressorRegistry(DecompressorRegistry registry) {
decompressorRegistry = registry; decompressorRegistry = registry;
@ -139,8 +121,9 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
@Override @Override
public ServerImpl build() { public ServerImpl build() {
io.grpc.internal.InternalServer transportServer = buildTransportServer(); io.grpc.internal.InternalServer transportServer = buildTransportServer();
return new ServerImpl(executor, registry, transportServer, Context.ROOT, return new ServerImpl(executor, registryBuilder.build(),
firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()), firstNonNull(fallbackRegistry, EMPTY_FALLBACK_REGISTRY), transportServer,
Context.ROOT, firstNonNull(decompressorRegistry, DecompressorRegistry.getDefaultInstance()),
firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance())); firstNonNull(compressorRegistry, CompressorRegistry.getDefaultInstance()));
} }

View File

@ -1,5 +1,5 @@
/* /*
* Copyright 2014, Google Inc. All rights reserved. * Copyright 2016, Google Inc. All rights reserved.
* *
* Redistribution and use in source and binary forms, with or without * Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are * modification, are permitted provided that the following conditions are
@ -29,28 +29,47 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/ */
package io.grpc; package io.grpc.internal;
import com.google.common.collect.ImmutableMap;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import java.util.HashMap;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
/** final class InternalHandlerRegistry {
* Mutable base class of {@link HandlerRegistry}. Used by server implementations private final ImmutableMap<String, ServerMethodDefinition<?, ?>> methods;
* that need to bind and unbind services that are exposed to remote clients.
*
* @see MutableHandlerRegistryImpl
*/
@ThreadSafe
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/933")
public abstract class MutableHandlerRegistry extends HandlerRegistry {
/**
* Returns {@code null}, or previous service if {@code service} replaced an existing service.
*/
@Nullable
public abstract ServerServiceDefinition addService(ServerServiceDefinition service);
/** private InternalHandlerRegistry(ImmutableMap<String, ServerMethodDefinition<?, ?>> methods) {
* Returns {@code false} if {@code service} was not registered. this.methods = methods;
*/ }
public abstract boolean removeService(ServerServiceDefinition service);
@Nullable
ServerMethodDefinition<?, ?> lookupMethod(String methodName) {
return methods.get(methodName);
}
static class Builder {
// Store per-service first, to make sure services are added/replaced atomically.
private final HashMap<String, ServerServiceDefinition> services =
new HashMap<String, ServerServiceDefinition>();
Builder addService(ServerServiceDefinition service) {
services.put(service.getName(), service);
return this;
}
InternalHandlerRegistry build() {
ImmutableMap.Builder<String, ServerMethodDefinition<?, ?>> mapBuilder =
ImmutableMap.builder();
for (ServerServiceDefinition service : services.values()) {
for (ServerMethodDefinition<?, ?> method : service.getMethods()) {
mapBuilder.put(method.getMethodDescriptor().getFullMethodName(), method);
}
}
return new InternalHandlerRegistry(mapBuilder.build());
}
}
} }

View File

@ -80,7 +80,8 @@ public final class ServerImpl extends io.grpc.Server {
/** Executor for application processing. */ /** Executor for application processing. */
private Executor executor; private Executor executor;
private boolean usingSharedExecutor; private boolean usingSharedExecutor;
private final HandlerRegistry registry; private final InternalHandlerRegistry registry;
private final HandlerRegistry fallbackRegistry;
private boolean started; private boolean started;
private boolean shutdown; private boolean shutdown;
private boolean terminated; private boolean terminated;
@ -100,14 +101,17 @@ public final class ServerImpl extends io.grpc.Server {
/** /**
* Construct a server. * Construct a server.
* *
* @param registry the primary method registry
* @param fallbackRegistry the secondary method registry, used only if the primary registry
* doesn't have the method
* @param executor to call methods on behalf of remote clients * @param executor to call methods on behalf of remote clients
* @param registry of methods to expose to remote clients.
*/ */
ServerImpl(Executor executor, HandlerRegistry registry, InternalServer transportServer, ServerImpl(Executor executor, InternalHandlerRegistry registry, HandlerRegistry fallbackRegistry,
Context rootContext, DecompressorRegistry decompressorRegistry, InternalServer transportServer, Context rootContext,
CompressorRegistry compressorRegistry) { DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry) {
this.executor = executor; this.executor = executor;
this.registry = Preconditions.checkNotNull(registry, "registry"); this.registry = Preconditions.checkNotNull(registry, "registry");
this.fallbackRegistry = Preconditions.checkNotNull(fallbackRegistry, "fallbackRegistry");
this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer"); this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer");
// Fork from the passed in context so that it does not propagate cancellation, it only // Fork from the passed in context so that it does not propagate cancellation, it only
// inherits values. // inherits values.
@ -312,6 +316,9 @@ public final class ServerImpl extends io.grpc.Server {
ServerStreamListener listener = NOOP_LISTENER; ServerStreamListener listener = NOOP_LISTENER;
try { try {
ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName); ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
if (method == null) {
method = fallbackRegistry.lookupMethod(methodName);
}
if (method == null) { if (method == null) {
stream.close( stream.close(
Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName), Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName),

View File

@ -29,7 +29,13 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/ */
package io.grpc; package io.grpc.util;
import io.grpc.ExperimentalApi;
import io.grpc.HandlerRegistry;
import io.grpc.MethodDescriptor;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -45,17 +51,15 @@ import javax.annotation.concurrent.ThreadSafe;
*/ */
@ThreadSafe @ThreadSafe
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/933") @ExperimentalApi("https://github.com/grpc/grpc-java/issues/933")
public final class MutableHandlerRegistryImpl extends MutableHandlerRegistry { public final class MutableHandlerRegistry extends HandlerRegistry {
private final ConcurrentMap<String, ServerServiceDefinition> services private final ConcurrentMap<String, ServerServiceDefinition> services
= new ConcurrentHashMap<String, ServerServiceDefinition>(); = new ConcurrentHashMap<String, ServerServiceDefinition>();
@Override
@Nullable @Nullable
public ServerServiceDefinition addService(ServerServiceDefinition service) { public ServerServiceDefinition addService(ServerServiceDefinition service) {
return services.put(service.getName(), service); return services.put(service.getName(), service);
} }
@Override
public boolean removeService(ServerServiceDefinition service) { public boolean removeService(ServerServiceDefinition service) {
return services.remove(service.getName(), service); return services.remove(service.getName(), service);
} }

View File

@ -42,6 +42,7 @@ import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.isNotNull; import static org.mockito.Matchers.isNotNull;
import static org.mockito.Matchers.notNull; import static org.mockito.Matchers.notNull;
import static org.mockito.Matchers.same; import static org.mockito.Matchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -54,17 +55,17 @@ import io.grpc.Compressor;
import io.grpc.CompressorRegistry; import io.grpc.CompressorRegistry;
import io.grpc.Context; import io.grpc.Context;
import io.grpc.DecompressorRegistry; import io.grpc.DecompressorRegistry;
import io.grpc.HandlerRegistry;
import io.grpc.IntegerMarshaller; import io.grpc.IntegerMarshaller;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType; import io.grpc.MethodDescriptor.MethodType;
import io.grpc.MutableHandlerRegistry;
import io.grpc.MutableHandlerRegistryImpl;
import io.grpc.ServerCall; import io.grpc.ServerCall;
import io.grpc.ServerCallHandler; import io.grpc.ServerCallHandler;
import io.grpc.ServerServiceDefinition; import io.grpc.ServerServiceDefinition;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.StringMarshaller; import io.grpc.StringMarshaller;
import io.grpc.util.MutableHandlerRegistry;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -75,6 +76,7 @@ import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.JUnit4; import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.MockitoAnnotations; import org.mockito.MockitoAnnotations;
@ -112,10 +114,11 @@ public class ServerImplTest {
} }
private ExecutorService executor = Executors.newSingleThreadExecutor(); private ExecutorService executor = Executors.newSingleThreadExecutor();
private MutableHandlerRegistry registry = new MutableHandlerRegistryImpl(); private InternalHandlerRegistry registry = new InternalHandlerRegistry.Builder().build();
private MutableHandlerRegistry fallbackRegistry = new MutableHandlerRegistry();
private SimpleServer transportServer = new SimpleServer(); private SimpleServer transportServer = new SimpleServer();
private ServerImpl server = new ServerImpl(executor, registry, transportServer, SERVER_CONTEXT, private ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
decompressorRegistry, compressorRegistry); SERVER_CONTEXT, decompressorRegistry, compressorRegistry);
@Mock @Mock
private ServerStream stream; private ServerStream stream;
@ -123,6 +126,9 @@ public class ServerImplTest {
@Mock @Mock
private ServerCall.Listener<String> callListener; private ServerCall.Listener<String> callListener;
@Mock
private ServerCallHandler<String, Integer> callHandler;
/** Set up for test. */ /** Set up for test. */
@Before @Before
public void startUp() throws IOException { public void startUp() throws IOException {
@ -143,8 +149,8 @@ public class ServerImplTest {
@Override @Override
public void shutdown() {} public void shutdown() {}
}; };
ServerImpl server = new ServerImpl(executor, registry, transportServer, SERVER_CONTEXT, ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
decompressorRegistry, compressorRegistry); SERVER_CONTEXT, decompressorRegistry, compressorRegistry);
server.start(); server.start();
server.shutdown(); server.shutdown();
assertTrue(server.isShutdown()); assertTrue(server.isShutdown());
@ -161,8 +167,8 @@ public class ServerImplTest {
throw new AssertionError("Should not be called, because wasn't started"); throw new AssertionError("Should not be called, because wasn't started");
} }
}; };
ServerImpl server = new ServerImpl(executor, registry, transportServer, SERVER_CONTEXT, ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
decompressorRegistry, compressorRegistry); SERVER_CONTEXT, decompressorRegistry, compressorRegistry);
server.shutdown(); server.shutdown();
assertTrue(server.isShutdown()); assertTrue(server.isShutdown());
assertTrue(server.isTerminated()); assertTrue(server.isTerminated());
@ -170,8 +176,8 @@ public class ServerImplTest {
@Test @Test
public void startStopImmediateWithChildTransport() throws IOException { public void startStopImmediateWithChildTransport() throws IOException {
ServerImpl server = new ServerImpl(executor, registry, transportServer, SERVER_CONTEXT, ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
decompressorRegistry, compressorRegistry); SERVER_CONTEXT, decompressorRegistry, compressorRegistry);
server.start(); server.start();
class DelayedShutdownServerTransport extends SimpleServerTransport { class DelayedShutdownServerTransport extends SimpleServerTransport {
boolean shutdown; boolean shutdown;
@ -202,8 +208,8 @@ public class ServerImplTest {
} }
} }
ServerImpl server = new ServerImpl(executor, registry, new FailingStartupServer(), ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry,
SERVER_CONTEXT, decompressorRegistry, compressorRegistry); new FailingStartupServer(), SERVER_CONTEXT, decompressorRegistry, compressorRegistry);
try { try {
server.start(); server.start();
fail("expected exception"); fail("expected exception");
@ -218,7 +224,7 @@ public class ServerImplTest {
= Metadata.Key.of("inception", Metadata.ASCII_STRING_MARSHALLER); = Metadata.Key.of("inception", Metadata.ASCII_STRING_MARSHALLER);
final AtomicReference<ServerCall<Integer>> callReference final AtomicReference<ServerCall<Integer>> callReference
= new AtomicReference<ServerCall<Integer>>(); = new AtomicReference<ServerCall<Integer>>();
registry.addService(ServerServiceDefinition.builder("Waiter") fallbackRegistry.addService(ServerServiceDefinition.builder("Waiter")
.addMethod( .addMethod(
MethodDescriptor.create( MethodDescriptor.create(
MethodType.UNKNOWN, "Waiter/serve", STRING_MARSHALLER, INTEGER_MARSHALLER), MethodType.UNKNOWN, "Waiter/serve", STRING_MARSHALLER, INTEGER_MARSHALLER),
@ -292,7 +298,7 @@ public class ServerImplTest {
public void exceptionInStartCallPropagatesToStream() throws Exception { public void exceptionInStartCallPropagatesToStream() throws Exception {
CyclicBarrier barrier = executeBarrier(executor); CyclicBarrier barrier = executeBarrier(executor);
final Status status = Status.ABORTED.withDescription("Oh, no!"); final Status status = Status.ABORTED.withDescription("Oh, no!");
registry.addService(ServerServiceDefinition.builder("Waiter") fallbackRegistry.addService(ServerServiceDefinition.builder("Waiter")
.addMethod( .addMethod(
MethodDescriptor.create(MethodType.UNKNOWN, "Waiter/serve", MethodDescriptor.create(MethodType.UNKNOWN, "Waiter/serve",
STRING_MARSHALLER, INTEGER_MARSHALLER), STRING_MARSHALLER, INTEGER_MARSHALLER),
@ -340,8 +346,8 @@ public class ServerImplTest {
} }
transportServer = new MaybeDeadlockingServer(); transportServer = new MaybeDeadlockingServer();
ServerImpl server = new ServerImpl(executor, registry, transportServer, SERVER_CONTEXT, ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
decompressorRegistry, compressorRegistry); SERVER_CONTEXT, decompressorRegistry, compressorRegistry);
server.start(); server.start();
new Thread() { new Thread() {
@Override @Override
@ -402,7 +408,7 @@ public class ServerImplTest {
@Test @Test
public void testCallContextIsBoundInListenerCallbacks() throws Exception { public void testCallContextIsBoundInListenerCallbacks() throws Exception {
registry.addService(ServerServiceDefinition.builder("Waiter") fallbackRegistry.addService(ServerServiceDefinition.builder("Waiter")
.addMethod( .addMethod(
MethodDescriptor.create( MethodDescriptor.create(
MethodType.UNKNOWN, "Waiter/serve", STRING_MARSHALLER, INTEGER_MARSHALLER), MethodType.UNKNOWN, "Waiter/serve", STRING_MARSHALLER, INTEGER_MARSHALLER),
@ -489,7 +495,7 @@ public class ServerImplTest {
final AtomicReference<ServerCall<Integer>> callReference final AtomicReference<ServerCall<Integer>> callReference
= new AtomicReference<ServerCall<Integer>>(); = new AtomicReference<ServerCall<Integer>>();
registry.addService(ServerServiceDefinition.builder("Waiter") fallbackRegistry.addService(ServerServiceDefinition.builder("Waiter")
.addMethod( .addMethod(
MethodDescriptor.create( MethodDescriptor.create(
MethodType.UNKNOWN, "Waiter/serve", STRING_MARSHALLER, INTEGER_MARSHALLER), MethodType.UNKNOWN, "Waiter/serve", STRING_MARSHALLER, INTEGER_MARSHALLER),
@ -524,8 +530,8 @@ public class ServerImplTest {
return 65535; return 65535;
} }
}; };
ServerImpl server = new ServerImpl(executor, registry, transportServer, SERVER_CONTEXT, ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
decompressorRegistry, compressorRegistry); SERVER_CONTEXT, decompressorRegistry, compressorRegistry);
server.start(); server.start();
Truth.assertThat(server.getPort()).isEqualTo(65535); Truth.assertThat(server.getPort()).isEqualTo(65535);
@ -534,8 +540,8 @@ public class ServerImplTest {
@Test @Test
public void getPortBeforeStartedFails() { public void getPortBeforeStartedFails() {
transportServer = new SimpleServer(); transportServer = new SimpleServer();
ServerImpl server = new ServerImpl(executor, registry, transportServer, SERVER_CONTEXT, ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
decompressorRegistry, compressorRegistry); SERVER_CONTEXT, decompressorRegistry, compressorRegistry);
thrown.expect(IllegalStateException.class); thrown.expect(IllegalStateException.class);
thrown.expectMessage("started"); thrown.expectMessage("started");
server.getPort(); server.getPort();
@ -544,8 +550,8 @@ public class ServerImplTest {
@Test @Test
public void getPortAfterTerminationFails() throws Exception { public void getPortAfterTerminationFails() throws Exception {
transportServer = new SimpleServer(); transportServer = new SimpleServer();
ServerImpl server = new ServerImpl(executor, registry, transportServer, SERVER_CONTEXT, ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
decompressorRegistry, compressorRegistry); SERVER_CONTEXT, decompressorRegistry, compressorRegistry);
server.start(); server.start();
server.shutdown(); server.shutdown();
server.awaitTermination(); server.awaitTermination();
@ -554,6 +560,35 @@ public class ServerImplTest {
server.getPort(); server.getPort();
} }
@Test
public void handlerRegistryPriorities() throws Exception {
HandlerRegistry fallbackRegistry = mock(HandlerRegistry.class);
MethodDescriptor<String, Integer> method1 = MethodDescriptor.create(
MethodType.UNKNOWN, "Service1/Method1", STRING_MARSHALLER, INTEGER_MARSHALLER);
registry = new InternalHandlerRegistry.Builder()
.addService(ServerServiceDefinition.builder("Service1")
.addMethod(method1, callHandler).build())
.build();
transportServer = new SimpleServer();
ServerImpl server = new ServerImpl(executor, registry, fallbackRegistry, transportServer,
SERVER_CONTEXT, decompressorRegistry, compressorRegistry);
server.start();
ServerTransportListener transportListener
= transportServer.registerNewServerTransport(new SimpleServerTransport());
// This call will be handled by callHandler from the internal registry
transportListener.streamCreated(stream, "Service1/Method1", new Metadata());
// This call will be handled by the fallbackRegistry because it's not registred in the internal
// registry.
transportListener.streamCreated(stream, "Service1/Method2", new Metadata());
verify(callHandler, timeout(2000)).startCall(same(method1),
Matchers.<ServerCall<Integer>>anyObject(), Matchers.<Metadata>anyObject());
verify(fallbackRegistry, timeout(2000)).lookupMethod("Service1/Method2", null);
verifyNoMoreInteractions(callHandler);
verifyNoMoreInteractions(fallbackRegistry);
}
/** /**
* Useful for plugging a single-threaded executor from processing tasks, or for waiting until a * Useful for plugging a single-threaded executor from processing tasks, or for waiting until a
* single-threaded executor has processed queued tasks. * single-threaded executor has processed queued tasks.

View File

@ -29,7 +29,7 @@
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/ */
package io.grpc; package io.grpc.util;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.collect.Iterables.getOnlyElement;
@ -42,6 +42,10 @@ import static org.mockito.Mockito.mock;
import io.grpc.MethodDescriptor.Marshaller; import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.MethodDescriptor.MethodType; import io.grpc.MethodDescriptor.MethodType;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCallHandler;
import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import org.junit.After; import org.junit.After;
import org.junit.Test; import org.junit.Test;
@ -49,10 +53,10 @@ import org.junit.runner.RunWith;
import org.junit.runners.JUnit4; import org.junit.runners.JUnit4;
import org.mockito.Mockito; import org.mockito.Mockito;
/** Unit tests for {@link MutableHandlerRegistryImpl}. */ /** Unit tests for {@link MutableHandlerRegistry}. */
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public class MutableHandlerRegistryImplTest { public class MutableHandlerRegistryTest {
private MutableHandlerRegistry registry = new MutableHandlerRegistryImpl(); private MutableHandlerRegistry registry = new MutableHandlerRegistry();
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private Marshaller<String> requestMarshaller = mock(Marshaller.class); private Marshaller<String> requestMarshaller = mock(Marshaller.class);
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -37,7 +37,6 @@ import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import io.grpc.ExperimentalApi; import io.grpc.ExperimentalApi;
import io.grpc.HandlerRegistry;
import io.grpc.Internal; import io.grpc.Internal;
import io.grpc.internal.AbstractServerImplBuilder; import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
@ -83,18 +82,6 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
return new NettyServerBuilder(port); return new NettyServerBuilder(port);
} }
/**
* Creates a server builder that will bind to the given port and use the {@link HandlerRegistry}
* for call dispatching.
*
* @param registry the registry of handlers used for dispatching incoming calls.
* @param port the port on which to the server is to be bound.
* @return the server builder.
*/
public static NettyServerBuilder forRegistryAndPort(HandlerRegistry registry, int port) {
return new NettyServerBuilder(registry, port);
}
/** /**
* Creates a server builder configured with the given {@link SocketAddress}. * Creates a server builder configured with the given {@link SocketAddress}.
* *
@ -109,11 +96,6 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
this.address = new InetSocketAddress(port); this.address = new InetSocketAddress(port);
} }
private NettyServerBuilder(HandlerRegistry registry, int port) {
super(registry);
this.address = new InetSocketAddress(port);
}
private NettyServerBuilder(SocketAddress address) { private NettyServerBuilder(SocketAddress address) {
this.address = address; this.address = address;
} }