diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index f9a84085ce..4d0f8c14a8 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -104,85 +104,182 @@ public final class ServerCalls { extends StreamingRequestMethod { } + private static final class UnaryServerCallHandler + implements ServerCallHandler { + + private final UnaryRequestMethod method; + + // Non private to avoid synthetic class + UnaryServerCallHandler(UnaryRequestMethod method) { + this.method = method; + } + + @Override + public ServerCall.Listener startCall(ServerCall call, Metadata headers) { + Preconditions.checkArgument( + call.getMethodDescriptor().getType().clientSendsOneMessage(), + "asyncUnaryRequestCall is only for clientSendsOneMessage methods"); + ServerCallStreamObserverImpl responseObserver = + new ServerCallStreamObserverImpl(call); + // We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client + // sends more than 1 requests, ServerCall will catch it. Note that disabling auto + // inbound flow control has no effect on unary calls. + call.request(2); + return new UnaryServerCallListener(responseObserver, call); + } + + private final class UnaryServerCallListener extends ServerCall.Listener { + private final ServerCall call; + private final ServerCallStreamObserverImpl responseObserver; + private boolean canInvoke = true; + private ReqT request; + + // Non private to avoid synthetic class + UnaryServerCallListener( + ServerCallStreamObserverImpl responseObserver, + ServerCall call) { + this.call = call; + this.responseObserver = responseObserver; + } + + @Override + public void onMessage(ReqT request) { + if (this.request != null) { + // Safe to close the call, because the application has not yet been invoked + call.close( + Status.INTERNAL.withDescription(TOO_MANY_REQUESTS), + new Metadata()); + canInvoke = false; + return; + } + + // We delay calling method.invoke() until onHalfClose() to make sure the client + // half-closes. + this.request = request; + } + + @Override + public void onHalfClose() { + if (!canInvoke) { + return; + } + if (request == null) { + // Safe to close the call, because the application has not yet been invoked + call.close( + Status.INTERNAL.withDescription(MISSING_REQUEST), + new Metadata()); + return; + } + + method.invoke(request, responseObserver); + responseObserver.freeze(); + if (call.isReady()) { + // Since we are calling invoke in halfClose we have missed the onReady + // event from the transport so recover it here. + onReady(); + } + } + + @Override + public void onCancel() { + responseObserver.cancelled = true; + if (responseObserver.onCancelHandler != null) { + responseObserver.onCancelHandler.run(); + } + } + + @Override + public void onReady() { + if (responseObserver.onReadyHandler != null) { + responseObserver.onReadyHandler.run(); + } + } + } + } + /** * Creates a {@code ServerCallHandler} for a unary request call method of the service. * * @param method an adaptor to the actual method on the service implementation. */ private static ServerCallHandler asyncUnaryRequestCall( - final UnaryRequestMethod method) { - return new ServerCallHandler() { - @Override - public ServerCall.Listener startCall( - final ServerCall call, - Metadata headers) { - Preconditions.checkArgument( - call.getMethodDescriptor().getType().clientSendsOneMessage(), - "asyncUnaryRequestCall is only for clientSendsOneMessage methods"); - final ServerCallStreamObserverImpl responseObserver = - new ServerCallStreamObserverImpl(call); - // We expect only 1 request, but we ask for 2 requests here so that if a misbehaving client - // sends more than 1 requests, ServerCall will catch it. Note that disabling auto - // inbound flow control has no effect on unary calls. - call.request(2); - return new EmptyServerCallListener() { - boolean canInvoke = true; - ReqT request; - @Override - public void onMessage(ReqT request) { - if (this.request != null) { - // Safe to close the call, because the application has not yet been invoked - call.close( - Status.INTERNAL.withDescription(TOO_MANY_REQUESTS), - new Metadata()); - canInvoke = false; - return; - } + UnaryRequestMethod method) { + return new UnaryServerCallHandler(method); + } - // We delay calling method.invoke() until onHalfClose() to make sure the client - // half-closes. - this.request = request; - } + private static final class StreamingServerCallHandler + implements ServerCallHandler { - @Override - public void onHalfClose() { - if (!canInvoke) { - return; - } - if (request == null) { - // Safe to close the call, because the application has not yet been invoked - call.close( - Status.INTERNAL.withDescription(MISSING_REQUEST), - new Metadata()); - return; - } + private final StreamingRequestMethod method; - method.invoke(request, responseObserver); - responseObserver.freeze(); - if (call.isReady()) { - // Since we are calling invoke in halfClose we have missed the onReady - // event from the transport so recover it here. - onReady(); - } - } + // Non private to avoid synthetic class + StreamingServerCallHandler(StreamingRequestMethod method) { + this.method = method; + } - @Override - public void onCancel() { - responseObserver.cancelled = true; - if (responseObserver.onCancelHandler != null) { - responseObserver.onCancelHandler.run(); - } - } - - @Override - public void onReady() { - if (responseObserver.onReadyHandler != null) { - responseObserver.onReadyHandler.run(); - } - } - }; + @Override + public ServerCall.Listener startCall(ServerCall call, Metadata headers) { + ServerCallStreamObserverImpl responseObserver = + new ServerCallStreamObserverImpl(call); + StreamObserver requestObserver = method.invoke(responseObserver); + responseObserver.freeze(); + if (responseObserver.autoFlowControlEnabled) { + call.request(1); } - }; + return new StreamingServerCallListener(requestObserver, responseObserver, call); + } + + private final class StreamingServerCallListener extends ServerCall.Listener { + + private final StreamObserver requestObserver; + private final ServerCallStreamObserverImpl responseObserver; + private final ServerCall call; + private boolean halfClosed = false; + + // Non private to avoid synthetic class + StreamingServerCallListener( + StreamObserver requestObserver, + ServerCallStreamObserverImpl responseObserver, + ServerCall call) { + this.requestObserver = requestObserver; + this.responseObserver = responseObserver; + this.call = call; + } + + @Override + public void onMessage(ReqT request) { + requestObserver.onNext(request); + + // Request delivery of the next inbound message. + if (responseObserver.autoFlowControlEnabled) { + call.request(1); + } + } + + @Override + public void onHalfClose() { + halfClosed = true; + requestObserver.onCompleted(); + } + + @Override + public void onCancel() { + responseObserver.cancelled = true; + if (responseObserver.onCancelHandler != null) { + responseObserver.onCancelHandler.run(); + } + if (!halfClosed) { + requestObserver.onError(Status.CANCELLED.asException()); + } + } + + @Override + public void onReady() { + if (responseObserver.onReadyHandler != null) { + responseObserver.onReadyHandler.run(); + } + } + } } /** @@ -191,58 +288,8 @@ public final class ServerCalls { * @param method an adaptor to the actual method on the service implementation. */ private static ServerCallHandler asyncStreamingRequestCall( - final StreamingRequestMethod method) { - return new ServerCallHandler() { - @Override - public ServerCall.Listener startCall( - final ServerCall call, - Metadata headers) { - final ServerCallStreamObserverImpl responseObserver = - new ServerCallStreamObserverImpl(call); - final StreamObserver requestObserver = method.invoke(responseObserver); - responseObserver.freeze(); - if (responseObserver.autoFlowControlEnabled) { - call.request(1); - } - return new EmptyServerCallListener() { - boolean halfClosed = false; - - @Override - public void onMessage(ReqT request) { - requestObserver.onNext(request); - - // Request delivery of the next inbound message. - if (responseObserver.autoFlowControlEnabled) { - call.request(1); - } - } - - @Override - public void onHalfClose() { - halfClosed = true; - requestObserver.onCompleted(); - } - - @Override - public void onCancel() { - responseObserver.cancelled = true; - if (responseObserver.onCancelHandler != null) { - responseObserver.onCancelHandler.run(); - } - if (!halfClosed) { - requestObserver.onError(Status.CANCELLED.asException()); - } - } - - @Override - public void onReady() { - if (responseObserver.onReadyHandler != null) { - responseObserver.onReadyHandler.run(); - } - } - }; - } - }; + StreamingRequestMethod method) { + return new StreamingServerCallHandler(method); } private static interface UnaryRequestMethod { @@ -263,6 +310,7 @@ public final class ServerCalls { private Runnable onReadyHandler; private Runnable onCancelHandler; + // Non private to avoid synthetic class ServerCallStreamObserverImpl(ServerCall call) { this.call = call; } @@ -352,24 +400,6 @@ public final class ServerCalls { } } - private static class EmptyServerCallListener extends ServerCall.Listener { - @Override - public void onMessage(ReqT request) { - } - - @Override - public void onHalfClose() { - } - - @Override - public void onCancel() { - } - - @Override - public void onComplete() { - } - } - /** * Sets unimplemented status for method on given response stream for unary call. *