From f97eb733f70fb2f55a8eba55275e6ceca90ead71 Mon Sep 17 00:00:00 2001 From: ejona Date: Thu, 24 Jul 2014 10:01:02 -0700 Subject: [PATCH] Add Server API, a.k.a., Channel for incoming requests. Everything is nested under a single Server class. This may not seem best, but splitting things up basically requires a lot of "Server" prefixes all over the place, even if we use a separate package. I'm expecting to defer the decision, but once we decide the organization we need to update the Channel API to match. ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=71838718 --- .../main/java/com/google/net/stubby/Call.java | 15 +- .../java/com/google/net/stubby/Server.java | 248 ++++++++++++++++++ 2 files changed, 259 insertions(+), 4 deletions(-) create mode 100644 core/src/main/java/com/google/net/stubby/Server.java diff --git a/core/src/main/java/com/google/net/stubby/Call.java b/core/src/main/java/com/google/net/stubby/Call.java index e2c7cd5770..554049bab3 100644 --- a/core/src/main/java/com/google/net/stubby/Call.java +++ b/core/src/main/java/com/google/net/stubby/Call.java @@ -15,6 +15,8 @@ import javax.annotation.Nullable; * *

{@link #start} is required to be the first of any methods called. * + *

Any contexts must be sent before any payloads, which must be sent before half closing. + * *

No generic method for determining message receipt or providing acknowlegement is provided. * Applications are expected to utilize normal payload messages for such signals, as a response * natually acknowledges its request. @@ -25,6 +27,9 @@ public abstract class Call { /** * Callbacks for consuming incoming RPC messages. * + *

Any contexts are guaranteed to arrive before any payloads, which are guaranteed before + * close. + * *

Implementations are free to block for extended periods of time. Implementations are not * required to be thread-safe. */ @@ -33,7 +38,7 @@ public abstract class Call { * A response context has been received. Any context messages will precede payload messages. * *

The {@code value} {@link InputStream} will be closed when the returned future completes. - * If no future is returned, the stream will be closed immediately after returning from this + * If no future is returned, the value will be closed immediately after returning from this * method. */ @Nullable @@ -81,7 +86,8 @@ public abstract class Call { * * @param name key identifier of context * @param value context value bytes - * @throws IllegalStateException if call is {@link #halfClose}d or {@link #cancel}ed + * @throws IllegalStateException if call is {@link #halfClose}d or explicitly {@link #cancel}ed, + * or after {@link #sendPayload} */ public void sendContext(String name, InputStream value) { sendContext(name, value, null); @@ -99,7 +105,8 @@ public abstract class Call { * @param name key identifier of context * @param value context value bytes * @param accepted notification for adhering to flow control, or {@code null} - * @throws IllegalStateException if call is {@link #halfClose}d or {@link #cancel}ed + * @throws IllegalStateException if call is {@link #halfClose}d or explicitly {@link #cancel}ed, + * or after {@link #sendPayload} */ public abstract void sendContext(String name, InputStream value, @Nullable SettableFuture accepted); @@ -109,7 +116,7 @@ public abstract class Call { * RPCs. Multiple payload messages may exist for streaming calls. * * @param payload message - * @throws IllegalStateException if call is {@link #halfClose}d or {@link #cancel}ed + * @throws IllegalStateException if call is {@link #halfClose}d or explicitly {@link #cancel}ed */ public void sendPayload(RequestT payload) { sendPayload(payload, null); diff --git a/core/src/main/java/com/google/net/stubby/Server.java b/core/src/main/java/com/google/net/stubby/Server.java new file mode 100644 index 0000000000..5b6a85c133 --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/Server.java @@ -0,0 +1,248 @@ +package com.google.net.stubby; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.Service; + +import java.io.InputStream; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +/** + * Server for listening for and dispatching incoming calls. Although Server is an interface, it is + * not expected to be implemented by application code or interceptors. + */ +@ThreadSafe +public interface Server extends Service { + /** Builder that Servers are expected to provide for constructing new instances. */ + abstract class Builder { + public abstract Builder addService(ServiceDef service); + public abstract Server build(); + } + + /** Definition of a service to be exposed via a Server. */ + final class ServiceDef { + public static ServiceDef.Builder builder(String serviceName) { + return new ServiceDef.Builder(serviceName); + } + + private final String name; + private final ImmutableList methods; + + private ServiceDef(String name, ImmutableList methods) { + this.name = name; + this.methods = methods; + } + + /** Simple name of the service. It is not an absolute path. */ + public String getName() { + return name; + } + + public ImmutableList getMethods() { + return methods; + } + + /** Builder for constructing Service instances. */ + public static final class Builder { + private final String serviceName; + private final ImmutableList.Builder methods = ImmutableList.builder(); + + private Builder(String serviceName) { + this.serviceName = serviceName; + } + + /** + * Add a method to be supported by the service. + * + * @param name simple name of the method, without the service prefix + * @param requestMarshaller marshaller for deserializing incoming requests + * @param responseMarshaller marshaller for serializing outgoing responses + * @param handler handler for incoming calls + */ + public Builder addMethod(String name, Marshaller requestMarshaller, + Marshaller responseMarshaller, CallHandler handler) { + methods.add( + new MethodDef(name, requestMarshaller, responseMarshaller, handler)); + return this; + } + + /** Construct new ServiceDef. */ + public ServiceDef build() { + return new ServiceDef(serviceName, methods.build()); + } + } + } + + /** Definition of a method supported by a service. */ + final class MethodDef { + private final String name; + private final Marshaller requestMarshaller; + private final Marshaller responseMarshaller; + private final CallHandler handler; + + // MethodDef has no way of public creation, because all parameters are required. A builder + // wouldn't have any methods other than build(). addMethod() can be overriden if we ever need to + // extend what MethodDef contains or if we add a Builder or similar. + private MethodDef(String name, Marshaller requestMarshaller, + Marshaller responseMarshaller, CallHandler handler) { + this.name = name; + this.requestMarshaller = requestMarshaller; + this.responseMarshaller = responseMarshaller; + this.handler = handler; + } + + /** The simple name of the method. It is not an absolute path. */ + public String getName() { + return name; + } + + /** Marshaller for deserializing incoming requests. */ + public Marshaller getRequestMarshaller() { + return requestMarshaller; + } + + /** Marshaller for serializing outgoing responses. */ + public Marshaller getResponseMarshaller() { + return responseMarshaller; + } + + /** Handler for incoming calls. */ + public CallHandler getCallHandler() { + return handler; + } + } + + /** + * Class to begin processing incoming RPCs. Advanced applications and generated code implement + * this interface to implement service methods. + */ + @ThreadSafe + interface CallHandler { + /** + * Produce a listener for the incoming call. Implementations are free to call methods on {@code + * call} before this method has returned. + * + *

If the implementation throws an exception or returns {@code null}, {@code call} will be + * closed with an error. + * + * @param call object for responding + * @param method metadata concerning the call + * @return listener for processing incoming messages for {@code call} + */ + Server.Call.Listener startCall(Server.Call call, + MethodDescriptor method); + } + + /** + * Low-level method for communicating with a remote client during a single RPC. Unlike normal + * RPCs, calls may stream any number of requests and responses, although a single request and + * single response is most common. This API is generally intended for use generated handlers, but + * advanced applications may have need for it. + * + *

Any contexts must be sent before any payloads, which must be sent before closing. + * + *

No generic method for determining message receipt or providing acknowlegement is provided. + * Applications are expected to utilize normal payload messages for such signals, as a response + * natually acknowledges its request. + * + *

Methods are guaranteed to be non-blocking. Implementations are not required to be + * thread-safe. + */ + abstract class Call { + /** + * Callbacks for consuming incoming RPC messages. + * + *

Any contexts are guaranteed to arrive before any payloads, which are guaranteed before + * half close, which is guaranteed before completion. + * + *

Implementations are free to block for extended periods of time. Implementations are not + * required to be thread-safe. + */ + // TODO(user): We need to decide what to do in the case of server closing with non-cancellation + // before client half closes. It may be that we treat such a case as an error. If we permit such + // a case then we either get to generate a half close or purposefully omit it. + public abstract static class Listener { + /** + * A request context has been received. Any context messages will precede payload messages. + * + *

The {@code value} {@link InputStream} will be closed when the returned future completes. + * If no future is returned, the value will be closed immediately after returning from this + * method. + */ + @Nullable + public abstract ListenableFuture onContext(String name, InputStream value); + + /** + * A request payload has been receiveed. For streaming calls, there may be zero payload + * messages. + */ + @Nullable + public abstract ListenableFuture onPayload(T payload); + + /** + * The client completed all message sending. However, the call may still be cancelled. + */ + public abstract void onHalfClose(); + + /** + * The call was cancelled and the server is encouraged to abort processing to save resources, + * since the client will not process any further messages. Cancellations can be caused by + * timeouts, explicit cancel by client, network errors, and similar. + * + *

There will be no further callbacks for the call. + */ + public abstract void onCancel(); + + /** + * The call is considered complete and {@link #onCancel} is guaranteed not to be called. + * However, the client is not guaranteed to have received all messages. + * + *

There will be no further callbacks for the call. + */ + public abstract void onCompleted(); + } + + /** + * Close the call with the provided status. No further sending or receiving will occur. If + * {@code status} is not equal to {@link Status#OK}, then the call is said to have failed. + * + *

If {@code status} is not {@link Status#CANCELLED} and no errors or cancellations are known + * to have occured, then a {@link Listener#onCompleted} notification should be expected. + * Otherwise {@link Listener#onCancel} has been or will be called. + * + * @throws IllegalStateException if call is already {@code close}d + */ + public abstract void close(Status status); + + /** + * Send a context message. Context messages are intended for side-channel information like + * statistics and authentication. + * + * @param name key identifier of context + * @param value context value bytes + * @throws IllegalStateException if call is {@link #close}d, or after {@link #sendPayload} + */ + public abstract void sendContext(String name, InputStream value); + + /** + * Send a payload message. Payload messages are the primary form of communication associated + * with RPCs. Multiple payload messages may exist for streaming calls. + * + * @param payload message + * @throws IllegalStateException if call is {@link #close}d + */ + public abstract void sendPayload(ResponseT payload); + + /** + * Returns {@code true} when the call is cancelled and the server is encouraged to abort + * processing to save resources, since the client will not be processing any further methods. + * Cancellations can be caused by timeouts, explicit cancel by client, network errors, and + * similar. + * + *

This method may safely be called concurrently from multiple threads. + */ + public abstract boolean isCancelled(); + } +}