From 0003e44886dc6f7bb19b59bc50daeb3a63e69961 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Fri, 10 Jul 2015 09:40:21 -0700 Subject: [PATCH] Add simple server timeout support Reintroduce throws Add timeoutExecutor shutdown Use a default future Move timeout cancellation Cancel the timeout in error cases --- core/src/main/java/io/grpc/ChannelImpl.java | 5 +- core/src/main/java/io/grpc/Metadata.java | 7 +- core/src/main/java/io/grpc/ServerImpl.java | 89 +++++++++++++------ .../io/grpc/transport/ClientTransport.java | 4 +- .../java/io/grpc/transport/ServerStream.java | 10 ++- .../transport/netty/NettyServerStream.java | 6 ++ 6 files changed, 85 insertions(+), 36 deletions(-) diff --git a/core/src/main/java/io/grpc/ChannelImpl.java b/core/src/main/java/io/grpc/ChannelImpl.java index 5a19266eeb..03387a4758 100644 --- a/core/src/main/java/io/grpc/ChannelImpl.java +++ b/core/src/main/java/io/grpc/ChannelImpl.java @@ -306,7 +306,7 @@ public final class ChannelImpl extends Channel { } } - private class CallImpl extends ClientCall { + private final class CallImpl extends ClientCall { private final MethodDescriptor method; private final SerializingExecutor callExecutor; private final boolean unaryRequest; @@ -314,7 +314,7 @@ public final class ChannelImpl extends Channel { private ClientStream stream; private volatile ScheduledFuture deadlineCancellationFuture; - public CallImpl(MethodDescriptor method, SerializingExecutor executor, + private CallImpl(MethodDescriptor method, SerializingExecutor executor, CallOptions callOptions) { this.method = method; this.callExecutor = executor; @@ -403,6 +403,7 @@ public final class ChannelImpl extends Channel { stream.writeMessage(payloadIs); failed = false; } finally { + // TODO(notcarl): Find out if payloadIs needs to be closed. if (failed) { cancel(); } diff --git a/core/src/main/java/io/grpc/Metadata.java b/core/src/main/java/io/grpc/Metadata.java index 342c0e4fe5..cba3d1a72d 100644 --- a/core/src/main/java/io/grpc/Metadata.java +++ b/core/src/main/java/io/grpc/Metadata.java @@ -42,7 +42,6 @@ import com.google.common.collect.Lists; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.Set; import javax.annotation.concurrent.NotThreadSafe; @@ -204,9 +203,9 @@ public abstract class Metadata { Preconditions.checkState(serializable, "Can't serialize raw metadata"); byte[][] serialized = new byte[store.size() * 2][]; int i = 0; - for (Map.Entry entry : store.entries()) { - serialized[i++] = entry.getValue().key.asciiName(); - serialized[i++] = entry.getValue().getSerialized(); + for (MetadataEntry entry : store.values()) { + serialized[i++] = entry.key.asciiName(); + serialized[i++] = entry.getSerialized(); } return serialized; } diff --git a/core/src/main/java/io/grpc/ServerImpl.java b/core/src/main/java/io/grpc/ServerImpl.java index cccdbb0ba5..32b5e14c9e 100644 --- a/core/src/main/java/io/grpc/ServerImpl.java +++ b/core/src/main/java/io/grpc/ServerImpl.java @@ -33,6 +33,7 @@ package io.grpc; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; +import com.google.common.util.concurrent.Futures; import io.grpc.transport.ServerListener; import io.grpc.transport.ServerStream; @@ -42,9 +43,13 @@ import io.grpc.transport.ServerTransportListener; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** @@ -64,6 +69,8 @@ import java.util.concurrent.TimeUnit; public final class ServerImpl extends Server { private static final ServerStreamListener NOOP_LISTENER = new NoopListener(); + private static final Future DEFAULT_TIMEOUT_FUTURE = Futures.immediateCancelledFuture(); + /** Executor for application processing. */ private final Executor executor; private final HandlerRegistry registry; @@ -77,6 +84,8 @@ public final class ServerImpl extends Server { /** {@code transportServer} and services encapsulating something similar to a TCP connection. */ private final Collection transports = new HashSet(); + private final ScheduledExecutorService timeoutService; + /** * Construct a server. * @@ -88,6 +97,8 @@ public final class ServerImpl extends Server { this.executor = Preconditions.checkNotNull(executor, "executor"); this.registry = Preconditions.checkNotNull(registry, "registry"); this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer"); + // TODO(carl-mastrangelo): replace this with the shared scheduler once PR #576 is merged. + this.timeoutService = Executors.newScheduledThreadPool(1); } /** Hack to allow executors to auto-shutdown. Not for general use. */ @@ -122,6 +133,7 @@ public final class ServerImpl extends Server { } transportServer.shutdown(); shutdown = true; + timeoutService.shutdown(); return this; } @@ -224,8 +236,7 @@ public final class ServerImpl extends Server { synchronized (ServerImpl.this) { // transports collection can be modified during shutdown(), even if we hold the lock, due // to reentrancy. - for (ServerTransport transport - : transports.toArray(new ServerTransport[transports.size()])) { + for (ServerTransport transport : new ArrayList(transports)) { transport.shutdown(); } transportServerTerminated = true; @@ -249,6 +260,7 @@ public final class ServerImpl extends Server { @Override public ServerStreamListener streamCreated(final ServerStream stream, final String methodName, final Metadata.Headers headers) { + final Future timeout = scheduleTimeout(stream, headers); SerializingExecutor serializingExecutor = new SerializingExecutor(executor); final JumpToApplicationThreadServerStreamListener jumpListener = new JumpToApplicationThreadServerStreamListener(serializingExecutor, stream); @@ -256,32 +268,53 @@ public final class ServerImpl extends Server { // are delivered, including any errors. Callbacks can still be triggered, but they will be // queued. serializingExecutor.execute(new Runnable() { - @Override - public void run() { - ServerStreamListener listener = NOOP_LISTENER; - try { - HandlerRegistry.Method method = registry.lookupMethod(methodName); - if (method == null) { - stream.close( - Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName), - new Metadata.Trailers()); - return; - } - listener = startCall(stream, methodName, method.getMethodDefinition(), headers); - } catch (Throwable t) { - stream.close(Status.fromThrowable(t), new Metadata.Trailers()); - throw Throwables.propagate(t); - } finally { - jumpListener.setListener(listener); + @Override + public void run() { + ServerStreamListener listener = NOOP_LISTENER; + try { + HandlerRegistry.Method method = registry.lookupMethod(methodName); + if (method == null) { + stream.close( + Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName), + new Metadata.Trailers()); + timeout.cancel(true); + return; } + listener = startCall(stream, methodName, method.getMethodDefinition(), timeout, + headers); + } catch (Throwable t) { + stream.close(Status.fromThrowable(t), new Metadata.Trailers()); + timeout.cancel(true); + throw Throwables.propagate(t); + } finally { + jumpListener.setListener(listener); } - }); + } + }); return jumpListener; } + private Future scheduleTimeout(final ServerStream stream, Metadata.Headers headers) { + Long timeoutMicros = headers.get(ChannelImpl.TIMEOUT_KEY); + if (timeoutMicros == null) { + return DEFAULT_TIMEOUT_FUTURE; + } + return timeoutService.schedule(new Runnable() { + @Override + public void run() { + // This should rarely get run, since the client will likely cancel the stream before + // the timeout is reached. + stream.cancel(Status.DEADLINE_EXCEEDED); + } + }, + timeoutMicros, + TimeUnit.MICROSECONDS); + } + /** Never returns {@code null}. */ private ServerStreamListener startCall(ServerStream stream, String fullMethodName, - ServerMethodDefinition methodDef, Metadata.Headers headers) { + ServerMethodDefinition methodDef, Future timeout, + Metadata.Headers headers) { // TODO(ejona86): should we update fullMethodName to have the canonical path of the method? final ServerCallImpl call = new ServerCallImpl( stream, methodDef.getMethodDescriptor()); @@ -291,7 +324,7 @@ public final class ServerImpl extends Server { throw new NullPointerException( "startCall() returned a null listener for method " + fullMethodName); } - return call.newServerStreamListener(listener); + return call.newServerStreamListener(listener, timeout); } } @@ -403,7 +436,7 @@ public final class ServerImpl extends Server { } } - private class ServerCallImpl extends ServerCall { + private static class ServerCallImpl extends ServerCall { private final ServerStream stream; private final MethodDescriptor method; private volatile boolean cancelled; @@ -450,8 +483,9 @@ public final class ServerImpl extends Server { return cancelled; } - private ServerStreamListenerImpl newServerStreamListener(ServerCall.Listener listener) { - return new ServerStreamListenerImpl(listener); + private ServerStreamListenerImpl newServerStreamListener(ServerCall.Listener listener, + Future timeout) { + return new ServerStreamListenerImpl(listener, timeout); } /** @@ -460,9 +494,11 @@ public final class ServerImpl extends Server { */ private class ServerStreamListenerImpl implements ServerStreamListener { private final ServerCall.Listener listener; + private final Future timeout; - public ServerStreamListenerImpl(ServerCall.Listener listener) { + public ServerStreamListenerImpl(ServerCall.Listener listener, Future timeout) { this.listener = Preconditions.checkNotNull(listener, "listener must not be null"); + this.timeout = timeout; } @Override @@ -493,6 +529,7 @@ public final class ServerImpl extends Server { @Override public void closed(Status status) { + timeout.cancel(true); if (status.isOk()) { listener.onComplete(); } else { diff --git a/core/src/main/java/io/grpc/transport/ClientTransport.java b/core/src/main/java/io/grpc/transport/ClientTransport.java index e9edc6978d..ed23ea5893 100644 --- a/core/src/main/java/io/grpc/transport/ClientTransport.java +++ b/core/src/main/java/io/grpc/transport/ClientTransport.java @@ -63,7 +63,7 @@ public interface ClientTransport { ClientStreamListener listener); /** - * Starts transport. Implementations must not call {@code listener} until after {@code start()} + * Starts transport. Implementations must not call {@code listener} until after {@link #start} * returns. * * @param listener non-{@code null} listener of transport events @@ -81,7 +81,7 @@ public interface ClientTransport { /** * Initiates an orderly shutdown of the transport. Existing streams continue, but new streams will - * fail (once {@link Listener#transportShutdown()} callback called). + * fail (once {@link Listener#transportShutdown} callback called). */ void shutdown(); diff --git a/core/src/main/java/io/grpc/transport/ServerStream.java b/core/src/main/java/io/grpc/transport/ServerStream.java index 454db1df78..5fc62f2f69 100644 --- a/core/src/main/java/io/grpc/transport/ServerStream.java +++ b/core/src/main/java/io/grpc/transport/ServerStream.java @@ -41,8 +41,8 @@ public interface ServerStream extends Stream { /** * Writes custom metadata as headers on the response stream sent to the client. This method may - * only be called once and cannot be called after calls to {@code Stream#writePayload} - * or {@code #close}. + * only be called once and cannot be called after calls to {@link Stream#writeMessage} + * or {@link #close}. * * @param headers to send to client. */ @@ -57,4 +57,10 @@ public interface ServerStream extends Stream { * @param trailers an additional block of metadata to pass to the client on stream closure. */ void close(Status status, Metadata.Trailers trailers); + + + /** + * Tears down the stream, typically in the event of a timeout. + */ + public void cancel(Status status); } diff --git a/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java index 099b5725d7..e46c1985ce 100644 --- a/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java +++ b/netty/src/main/java/io/grpc/transport/netty/NettyServerStream.java @@ -34,6 +34,7 @@ package io.grpc.transport.netty; import static com.google.common.base.Preconditions.checkNotNull; import io.grpc.Metadata; +import io.grpc.Status; import io.grpc.transport.AbstractServerStream; import io.grpc.transport.WritableBuffer; import io.netty.buffer.ByteBuf; @@ -121,4 +122,9 @@ class NettyServerStream extends AbstractServerStream { handler.returnProcessedBytes(http2Stream, processedBytes); writeQueue.scheduleFlush(); } + + @Override + public void cancel(Status status) { + // TODO(carl-mastrangelo): implement this + } }