Add simple server timeout support

Reintroduce throws

Add timeoutExecutor shutdown

Use a default future

Move timeout cancellation

Cancel the timeout in error cases
This commit is contained in:
Carl Mastrangelo 2015-07-10 09:40:21 -07:00
parent ad7820ca9b
commit 0003e44886
6 changed files with 85 additions and 36 deletions

View File

@ -306,7 +306,7 @@ public final class ChannelImpl extends Channel {
} }
} }
private class CallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> { private final class CallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private final MethodDescriptor<ReqT, RespT> method; private final MethodDescriptor<ReqT, RespT> method;
private final SerializingExecutor callExecutor; private final SerializingExecutor callExecutor;
private final boolean unaryRequest; private final boolean unaryRequest;
@ -314,7 +314,7 @@ public final class ChannelImpl extends Channel {
private ClientStream stream; private ClientStream stream;
private volatile ScheduledFuture<?> deadlineCancellationFuture; private volatile ScheduledFuture<?> deadlineCancellationFuture;
public CallImpl(MethodDescriptor<ReqT, RespT> method, SerializingExecutor executor, private CallImpl(MethodDescriptor<ReqT, RespT> method, SerializingExecutor executor,
CallOptions callOptions) { CallOptions callOptions) {
this.method = method; this.method = method;
this.callExecutor = executor; this.callExecutor = executor;
@ -403,6 +403,7 @@ public final class ChannelImpl extends Channel {
stream.writeMessage(payloadIs); stream.writeMessage(payloadIs);
failed = false; failed = false;
} finally { } finally {
// TODO(notcarl): Find out if payloadIs needs to be closed.
if (failed) { if (failed) {
cancel(); cancel();
} }

View File

@ -42,7 +42,6 @@ import com.google.common.collect.Lists;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import javax.annotation.concurrent.NotThreadSafe; import javax.annotation.concurrent.NotThreadSafe;
@ -204,9 +203,9 @@ public abstract class Metadata {
Preconditions.checkState(serializable, "Can't serialize raw metadata"); Preconditions.checkState(serializable, "Can't serialize raw metadata");
byte[][] serialized = new byte[store.size() * 2][]; byte[][] serialized = new byte[store.size() * 2][];
int i = 0; int i = 0;
for (Map.Entry<String, MetadataEntry> entry : store.entries()) { for (MetadataEntry entry : store.values()) {
serialized[i++] = entry.getValue().key.asciiName(); serialized[i++] = entry.key.asciiName();
serialized[i++] = entry.getValue().getSerialized(); serialized[i++] = entry.getSerialized();
} }
return serialized; return serialized;
} }

View File

@ -33,6 +33,7 @@ package io.grpc;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Futures;
import io.grpc.transport.ServerListener; import io.grpc.transport.ServerListener;
import io.grpc.transport.ServerStream; import io.grpc.transport.ServerStream;
@ -42,9 +43,13 @@ import io.grpc.transport.ServerTransportListener;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -64,6 +69,8 @@ import java.util.concurrent.TimeUnit;
public final class ServerImpl extends Server { public final class ServerImpl extends Server {
private static final ServerStreamListener NOOP_LISTENER = new NoopListener(); private static final ServerStreamListener NOOP_LISTENER = new NoopListener();
private static final Future<?> DEFAULT_TIMEOUT_FUTURE = Futures.immediateCancelledFuture();
/** Executor for application processing. */ /** Executor for application processing. */
private final Executor executor; private final Executor executor;
private final HandlerRegistry registry; private final HandlerRegistry registry;
@ -77,6 +84,8 @@ public final class ServerImpl extends Server {
/** {@code transportServer} and services encapsulating something similar to a TCP connection. */ /** {@code transportServer} and services encapsulating something similar to a TCP connection. */
private final Collection<ServerTransport> transports = new HashSet<ServerTransport>(); private final Collection<ServerTransport> transports = new HashSet<ServerTransport>();
private final ScheduledExecutorService timeoutService;
/** /**
* Construct a server. * Construct a server.
* *
@ -88,6 +97,8 @@ public final class ServerImpl extends Server {
this.executor = Preconditions.checkNotNull(executor, "executor"); this.executor = Preconditions.checkNotNull(executor, "executor");
this.registry = Preconditions.checkNotNull(registry, "registry"); this.registry = Preconditions.checkNotNull(registry, "registry");
this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer"); this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer");
// TODO(carl-mastrangelo): replace this with the shared scheduler once PR #576 is merged.
this.timeoutService = Executors.newScheduledThreadPool(1);
} }
/** Hack to allow executors to auto-shutdown. Not for general use. */ /** Hack to allow executors to auto-shutdown. Not for general use. */
@ -122,6 +133,7 @@ public final class ServerImpl extends Server {
} }
transportServer.shutdown(); transportServer.shutdown();
shutdown = true; shutdown = true;
timeoutService.shutdown();
return this; return this;
} }
@ -224,8 +236,7 @@ public final class ServerImpl extends Server {
synchronized (ServerImpl.this) { synchronized (ServerImpl.this) {
// transports collection can be modified during shutdown(), even if we hold the lock, due // transports collection can be modified during shutdown(), even if we hold the lock, due
// to reentrancy. // to reentrancy.
for (ServerTransport transport for (ServerTransport transport : new ArrayList<ServerTransport>(transports)) {
: transports.toArray(new ServerTransport[transports.size()])) {
transport.shutdown(); transport.shutdown();
} }
transportServerTerminated = true; transportServerTerminated = true;
@ -249,6 +260,7 @@ public final class ServerImpl extends Server {
@Override @Override
public ServerStreamListener streamCreated(final ServerStream stream, final String methodName, public ServerStreamListener streamCreated(final ServerStream stream, final String methodName,
final Metadata.Headers headers) { final Metadata.Headers headers) {
final Future<?> timeout = scheduleTimeout(stream, headers);
SerializingExecutor serializingExecutor = new SerializingExecutor(executor); SerializingExecutor serializingExecutor = new SerializingExecutor(executor);
final JumpToApplicationThreadServerStreamListener jumpListener final JumpToApplicationThreadServerStreamListener jumpListener
= new JumpToApplicationThreadServerStreamListener(serializingExecutor, stream); = new JumpToApplicationThreadServerStreamListener(serializingExecutor, stream);
@ -265,11 +277,14 @@ public final class ServerImpl extends Server {
stream.close( stream.close(
Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName), Status.UNIMPLEMENTED.withDescription("Method not found: " + methodName),
new Metadata.Trailers()); new Metadata.Trailers());
timeout.cancel(true);
return; return;
} }
listener = startCall(stream, methodName, method.getMethodDefinition(), headers); listener = startCall(stream, methodName, method.getMethodDefinition(), timeout,
headers);
} catch (Throwable t) { } catch (Throwable t) {
stream.close(Status.fromThrowable(t), new Metadata.Trailers()); stream.close(Status.fromThrowable(t), new Metadata.Trailers());
timeout.cancel(true);
throw Throwables.propagate(t); throw Throwables.propagate(t);
} finally { } finally {
jumpListener.setListener(listener); jumpListener.setListener(listener);
@ -279,9 +294,27 @@ public final class ServerImpl extends Server {
return jumpListener; return jumpListener;
} }
private Future<?> scheduleTimeout(final ServerStream stream, Metadata.Headers headers) {
Long timeoutMicros = headers.get(ChannelImpl.TIMEOUT_KEY);
if (timeoutMicros == null) {
return DEFAULT_TIMEOUT_FUTURE;
}
return timeoutService.schedule(new Runnable() {
@Override
public void run() {
// This should rarely get run, since the client will likely cancel the stream before
// the timeout is reached.
stream.cancel(Status.DEADLINE_EXCEEDED);
}
},
timeoutMicros,
TimeUnit.MICROSECONDS);
}
/** Never returns {@code null}. */ /** Never returns {@code null}. */
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName, private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
ServerMethodDefinition<ReqT, RespT> methodDef, Metadata.Headers headers) { ServerMethodDefinition<ReqT, RespT> methodDef, Future<?> timeout,
Metadata.Headers headers) {
// TODO(ejona86): should we update fullMethodName to have the canonical path of the method? // TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<ReqT, RespT>( final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<ReqT, RespT>(
stream, methodDef.getMethodDescriptor()); stream, methodDef.getMethodDescriptor());
@ -291,7 +324,7 @@ public final class ServerImpl extends Server {
throw new NullPointerException( throw new NullPointerException(
"startCall() returned a null listener for method " + fullMethodName); "startCall() returned a null listener for method " + fullMethodName);
} }
return call.newServerStreamListener(listener); return call.newServerStreamListener(listener, timeout);
} }
} }
@ -403,7 +436,7 @@ public final class ServerImpl extends Server {
} }
} }
private class ServerCallImpl<ReqT, RespT> extends ServerCall<RespT> { private static class ServerCallImpl<ReqT, RespT> extends ServerCall<RespT> {
private final ServerStream stream; private final ServerStream stream;
private final MethodDescriptor<ReqT, RespT> method; private final MethodDescriptor<ReqT, RespT> method;
private volatile boolean cancelled; private volatile boolean cancelled;
@ -450,8 +483,9 @@ public final class ServerImpl extends Server {
return cancelled; return cancelled;
} }
private ServerStreamListenerImpl newServerStreamListener(ServerCall.Listener<ReqT> listener) { private ServerStreamListenerImpl newServerStreamListener(ServerCall.Listener<ReqT> listener,
return new ServerStreamListenerImpl(listener); Future<?> timeout) {
return new ServerStreamListenerImpl(listener, timeout);
} }
/** /**
@ -460,9 +494,11 @@ public final class ServerImpl extends Server {
*/ */
private class ServerStreamListenerImpl implements ServerStreamListener { private class ServerStreamListenerImpl implements ServerStreamListener {
private final ServerCall.Listener<ReqT> listener; private final ServerCall.Listener<ReqT> listener;
private final Future<?> timeout;
public ServerStreamListenerImpl(ServerCall.Listener<ReqT> listener) { public ServerStreamListenerImpl(ServerCall.Listener<ReqT> listener, Future<?> timeout) {
this.listener = Preconditions.checkNotNull(listener, "listener must not be null"); this.listener = Preconditions.checkNotNull(listener, "listener must not be null");
this.timeout = timeout;
} }
@Override @Override
@ -493,6 +529,7 @@ public final class ServerImpl extends Server {
@Override @Override
public void closed(Status status) { public void closed(Status status) {
timeout.cancel(true);
if (status.isOk()) { if (status.isOk()) {
listener.onComplete(); listener.onComplete();
} else { } else {

View File

@ -63,7 +63,7 @@ public interface ClientTransport {
ClientStreamListener listener); ClientStreamListener listener);
/** /**
* Starts transport. Implementations must not call {@code listener} until after {@code start()} * Starts transport. Implementations must not call {@code listener} until after {@link #start}
* returns. * returns.
* *
* @param listener non-{@code null} listener of transport events * @param listener non-{@code null} listener of transport events
@ -81,7 +81,7 @@ public interface ClientTransport {
/** /**
* Initiates an orderly shutdown of the transport. Existing streams continue, but new streams will * Initiates an orderly shutdown of the transport. Existing streams continue, but new streams will
* fail (once {@link Listener#transportShutdown()} callback called). * fail (once {@link Listener#transportShutdown} callback called).
*/ */
void shutdown(); void shutdown();

View File

@ -41,8 +41,8 @@ public interface ServerStream extends Stream {
/** /**
* Writes custom metadata as headers on the response stream sent to the client. This method may * Writes custom metadata as headers on the response stream sent to the client. This method may
* only be called once and cannot be called after calls to {@code Stream#writePayload} * only be called once and cannot be called after calls to {@link Stream#writeMessage}
* or {@code #close}. * or {@link #close}.
* *
* @param headers to send to client. * @param headers to send to client.
*/ */
@ -57,4 +57,10 @@ public interface ServerStream extends Stream {
* @param trailers an additional block of metadata to pass to the client on stream closure. * @param trailers an additional block of metadata to pass to the client on stream closure.
*/ */
void close(Status status, Metadata.Trailers trailers); void close(Status status, Metadata.Trailers trailers);
/**
* Tears down the stream, typically in the event of a timeout.
*/
public void cancel(Status status);
} }

View File

@ -34,6 +34,7 @@ package io.grpc.transport.netty;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.transport.AbstractServerStream; import io.grpc.transport.AbstractServerStream;
import io.grpc.transport.WritableBuffer; import io.grpc.transport.WritableBuffer;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
@ -121,4 +122,9 @@ class NettyServerStream extends AbstractServerStream<Integer> {
handler.returnProcessedBytes(http2Stream, processedBytes); handler.returnProcessedBytes(http2Stream, processedBytes);
writeQueue.scheduleFlush(); writeQueue.scheduleFlush();
} }
@Override
public void cancel(Status status) {
// TODO(carl-mastrangelo): implement this
}
} }