stub: update docs about Call lifetime + minor cleanups

* Reflowed some method parameters to be on the same line, else one
  parameter per line
* Used `@link` where appropriate
* Made some parameters non-final where it had no effect
* Renamed some parameters to be consistent
This commit is contained in:
Carl Mastrangelo 2018-08-13 14:36:32 -07:00 committed by GitHub
parent 3cfc5af4f1
commit 6d4841a8c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 51 deletions

View File

@ -55,27 +55,29 @@ public final class ClientCalls {
private ClientCalls() {} private ClientCalls() {}
/** /**
* Executes a unary call with a response {@link StreamObserver}. * Executes a unary call with a response {@link StreamObserver}. The {@code call} should not be
* already started. After calling this method, {@code call} should no longer be used.
*/ */
public static <ReqT, RespT> void asyncUnaryCall( public static <ReqT, RespT> void asyncUnaryCall(
ClientCall<ReqT, RespT> call, ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver) {
ReqT param, asyncUnaryRequestCall(call, req, responseObserver, false);
StreamObserver<RespT> observer) {
asyncUnaryRequestCall(call, param, observer, false);
} }
/** /**
* Executes a server-streaming call with a response {@link StreamObserver}. * Executes a server-streaming call with a response {@link StreamObserver}. The {@code call}
* should not be already started. After calling this method, {@code call} should no longer be
* used.
*/ */
public static <ReqT, RespT> void asyncServerStreamingCall( public static <ReqT, RespT> void asyncServerStreamingCall(
ClientCall<ReqT, RespT> call, ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver) {
ReqT param, asyncUnaryRequestCall(call, req, responseObserver, true);
StreamObserver<RespT> responseObserver) {
asyncUnaryRequestCall(call, param, responseObserver, true);
} }
/** /**
* Executes a client-streaming call returning a {@link StreamObserver} for the request messages. * Executes a client-streaming call returning a {@link StreamObserver} for the request messages.
* The {@code call} should not be already started. After calling this method, {@code call}
* should no longer be used.
*
* @return request stream observer. * @return request stream observer.
*/ */
public static <ReqT, RespT> StreamObserver<ReqT> asyncClientStreamingCall( public static <ReqT, RespT> StreamObserver<ReqT> asyncClientStreamingCall(
@ -85,7 +87,9 @@ public final class ClientCalls {
} }
/** /**
* Executes a bidi-streaming call. * Executes a bidirectional-streaming call. The {@code call} should not be already started.
* After calling this method, {@code call} should no longer be used.
*
* @return request stream observer. * @return request stream observer.
*/ */
public static <ReqT, RespT> StreamObserver<ReqT> asyncBidiStreamingCall( public static <ReqT, RespT> StreamObserver<ReqT> asyncBidiStreamingCall(
@ -94,12 +98,14 @@ public final class ClientCalls {
} }
/** /**
* Executes a unary call and blocks on the response. * Executes a unary call and blocks on the response. The {@code call} should not be already
* started. After calling this method, {@code call} should no longer be used.
*
* @return the single response message. * @return the single response message.
*/ */
public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT param) { public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT req) {
try { try {
return getUnchecked(futureUnaryCall(call, param)); return getUnchecked(futureUnaryCall(call, req));
} catch (RuntimeException e) { } catch (RuntimeException e) {
throw cancelThrow(call, e); throw cancelThrow(call, e);
} catch (Error e) { } catch (Error e) {
@ -108,16 +114,17 @@ public final class ClientCalls {
} }
/** /**
* Executes a unary call and blocks on the response. * Executes a unary call and blocks on the response. The {@code call} should not be already
* started. After calling this method, {@code call} should no longer be used.
* *
* @return the single response message. * @return the single response message.
*/ */
public static <ReqT, RespT> RespT blockingUnaryCall( public static <ReqT, RespT> RespT blockingUnaryCall(
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT param) { Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
ThreadlessExecutor executor = new ThreadlessExecutor(); ThreadlessExecutor executor = new ThreadlessExecutor();
ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor)); ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
try { try {
ListenableFuture<RespT> responseFuture = futureUnaryCall(call, param); ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
while (!responseFuture.isDone()) { while (!responseFuture.isDone()) {
try { try {
executor.waitAndDrain(); executor.waitAndDrain();
@ -139,43 +146,47 @@ public final class ClientCalls {
/** /**
* Executes a server-streaming call returning a blocking {@link Iterator} over the * Executes a server-streaming call returning a blocking {@link Iterator} over the
* response stream. * response stream. The {@code call} should not be already started. After calling this method,
* {@code call} should no longer be used.
*
* @return an iterator over the response stream. * @return an iterator over the response stream.
*/ */
// TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs. // TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs.
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall( public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
ClientCall<ReqT, RespT> call, ReqT param) { ClientCall<ReqT, RespT> call, ReqT req) {
BlockingResponseStream<RespT> result = new BlockingResponseStream<RespT>(call); BlockingResponseStream<RespT> result = new BlockingResponseStream<RespT>(call);
asyncUnaryRequestCall(call, param, result.listener(), true); asyncUnaryRequestCall(call, req, result.listener(), true);
return result; return result;
} }
/** /**
* Executes a server-streaming call returning a blocking {@link Iterator} over the * Executes a server-streaming call returning a blocking {@link Iterator} over the
* response stream. * response stream. The {@code call} should not be already started. After calling this method,
* {@code call} should no longer be used.
* *
* @return an iterator over the response stream. * @return an iterator over the response stream.
*/ */
// TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs. // TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs.
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall( public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT param) { Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
ThreadlessExecutor executor = new ThreadlessExecutor(); ThreadlessExecutor executor = new ThreadlessExecutor();
ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor)); ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
BlockingResponseStream<RespT> result = new BlockingResponseStream<RespT>(call, executor); BlockingResponseStream<RespT> result = new BlockingResponseStream<RespT>(call, executor);
asyncUnaryRequestCall(call, param, result.listener(), true); asyncUnaryRequestCall(call, req, result.listener(), true);
return result; return result;
} }
/** /**
* Executes a unary call and returns a {@link ListenableFuture} to the response. * Executes a unary call and returns a {@link ListenableFuture} to the response. The
* {@code call} should not be already started. After calling this method, {@code call} should no
* longer be used.
* *
* @return a future for the single response message. * @return a future for the single response message.
*/ */
public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall( public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(
ClientCall<ReqT, RespT> call, ClientCall<ReqT, RespT> call, ReqT req) {
ReqT param) {
GrpcFuture<RespT> responseFuture = new GrpcFuture<RespT>(call); GrpcFuture<RespT> responseFuture = new GrpcFuture<RespT>(call);
asyncUnaryRequestCall(call, param, new UnaryStreamToFuture<RespT>(responseFuture), false); asyncUnaryRequestCall(call, req, new UnaryStreamToFuture<RespT>(responseFuture), false);
return responseFuture; return responseFuture;
} }
@ -249,11 +260,11 @@ public final class ClientCalls {
} }
private static <ReqT, RespT> void asyncUnaryRequestCall( private static <ReqT, RespT> void asyncUnaryRequestCall(
ClientCall<ReqT, RespT> call, ReqT param, StreamObserver<RespT> responseObserver, ClientCall<ReqT, RespT> call, ReqT req, StreamObserver<RespT> responseObserver,
boolean streamingResponse) { boolean streamingResponse) {
asyncUnaryRequestCall( asyncUnaryRequestCall(
call, call,
param, req,
new StreamObserverToCallListenerAdapter<ReqT, RespT>( new StreamObserverToCallListenerAdapter<ReqT, RespT>(
responseObserver, responseObserver,
new CallToStreamObserverAdapter<ReqT>(call), new CallToStreamObserverAdapter<ReqT>(call),
@ -263,12 +274,12 @@ public final class ClientCalls {
private static <ReqT, RespT> void asyncUnaryRequestCall( private static <ReqT, RespT> void asyncUnaryRequestCall(
ClientCall<ReqT, RespT> call, ClientCall<ReqT, RespT> call,
ReqT param, ReqT req,
ClientCall.Listener<RespT> responseListener, ClientCall.Listener<RespT> responseListener,
boolean streamingResponse) { boolean streamingResponse) {
startCall(call, responseListener, streamingResponse); startCall(call, responseListener, streamingResponse);
try { try {
call.sendMessage(param); call.sendMessage(req);
call.halfClose(); call.halfClose();
} catch (RuntimeException e) { } catch (RuntimeException e) {
throw cancelThrow(call, e); throw cancelThrow(call, e);
@ -278,7 +289,8 @@ public final class ClientCalls {
} }
private static <ReqT, RespT> StreamObserver<ReqT> asyncStreamingRequestCall( private static <ReqT, RespT> StreamObserver<ReqT> asyncStreamingRequestCall(
ClientCall<ReqT, RespT> call, StreamObserver<RespT> responseObserver, ClientCall<ReqT, RespT> call,
StreamObserver<RespT> responseObserver,
boolean streamingResponse) { boolean streamingResponse) {
CallToStreamObserverAdapter<ReqT> adapter = new CallToStreamObserverAdapter<ReqT>(call); CallToStreamObserverAdapter<ReqT> adapter = new CallToStreamObserverAdapter<ReqT>(call);
startCall( startCall(
@ -289,8 +301,10 @@ public final class ClientCalls {
return adapter; return adapter;
} }
private static <ReqT, RespT> void startCall(ClientCall<ReqT, RespT> call, private static <ReqT, RespT> void startCall(
ClientCall.Listener<RespT> responseListener, boolean streamingResponse) { ClientCall<ReqT, RespT> call,
ClientCall.Listener<RespT> responseListener,
boolean streamingResponse) {
call.start(responseListener, new Metadata()); call.start(responseListener, new Metadata());
if (streamingResponse) { if (streamingResponse) {
call.request(1); call.request(1);
@ -430,7 +444,7 @@ public final class ClientCalls {
} }
/** /**
* Complete a GrpcFuture using {@link StreamObserver} events. * Completes a {@link GrpcFuture} using {@link StreamObserver} events.
*/ */
private static final class UnaryStreamToFuture<RespT> extends ClientCall.Listener<RespT> { private static final class UnaryStreamToFuture<RespT> extends ClientCall.Listener<RespT> {
private final GrpcFuture<RespT> responseFuture; private final GrpcFuture<RespT> responseFuture;
@ -500,11 +514,10 @@ public final class ClientCalls {
} }
/** /**
* Convert events on a {@link io.grpc.ClientCall.Listener} into a blocking * Convert events on a {@link io.grpc.ClientCall.Listener} into a blocking {@link Iterator}.
* {@link Iterator}.
* *
* <p>The class is not thread-safe, but it does permit {@link ClientCall.Listener} calls in a * <p>The class is not thread-safe, but it does permit {@link ClientCall.Listener} calls in a
* separate thread from {@code Iterator} calls. * separate thread from {@link Iterator} calls.
*/ */
// TODO(ejona86): determine how to allow ClientCall.cancel() in case of application error. // TODO(ejona86): determine how to allow ClientCall.cancel() in case of application error.
private static final class BlockingResponseStream<T> implements Iterator<T> { private static final class BlockingResponseStream<T> implements Iterator<T> {

View File

@ -42,42 +42,42 @@ public final class ServerCalls {
} }
/** /**
* Creates a {@code ServerCallHandler} for a unary call method of the service. * Creates a {@link ServerCallHandler} for a unary call method of the service.
* *
* @param method an adaptor to the actual method on the service implementation. * @param method an adaptor to the actual method on the service implementation.
*/ */
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryCall( public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncUnaryCall(
final UnaryMethod<ReqT, RespT> method) { UnaryMethod<ReqT, RespT> method) {
return asyncUnaryRequestCall(method); return asyncUnaryRequestCall(method);
} }
/** /**
* Creates a {@code ServerCallHandler} for a server streaming method of the service. * Creates a {@link ServerCallHandler} for a server streaming method of the service.
* *
* @param method an adaptor to the actual method on the service implementation. * @param method an adaptor to the actual method on the service implementation.
*/ */
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncServerStreamingCall( public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncServerStreamingCall(
final ServerStreamingMethod<ReqT, RespT> method) { ServerStreamingMethod<ReqT, RespT> method) {
return asyncUnaryRequestCall(method); return asyncUnaryRequestCall(method);
} }
/** /**
* Creates a {@code ServerCallHandler} for a client streaming method of the service. * Creates a {@link ServerCallHandler} for a client streaming method of the service.
* *
* @param method an adaptor to the actual method on the service implementation. * @param method an adaptor to the actual method on the service implementation.
*/ */
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncClientStreamingCall( public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncClientStreamingCall(
final ClientStreamingMethod<ReqT, RespT> method) { ClientStreamingMethod<ReqT, RespT> method) {
return asyncStreamingRequestCall(method); return asyncStreamingRequestCall(method);
} }
/** /**
* Creates a {@code ServerCallHandler} for a bidi streaming method of the service. * Creates a {@link ServerCallHandler} for a bidi streaming method of the service.
* *
* @param method an adaptor to the actual method on the service implementation. * @param method an adaptor to the actual method on the service implementation.
*/ */
public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncBidiStreamingCall( public static <ReqT, RespT> ServerCallHandler<ReqT, RespT> asyncBidiStreamingCall(
final BidiStreamingMethod<ReqT, RespT> method) { BidiStreamingMethod<ReqT, RespT> method) {
return asyncStreamingRequestCall(method); return asyncStreamingRequestCall(method);
} }
@ -97,7 +97,7 @@ public final class ServerCalls {
public interface ClientStreamingMethod<ReqT, RespT> extends StreamingRequestMethod<ReqT, RespT> {} public interface ClientStreamingMethod<ReqT, RespT> extends StreamingRequestMethod<ReqT, RespT> {}
/** /**
* Adaptor to a bi-directional streaming method. * Adaptor to a bidirectional streaming method.
*/ */
public interface BidiStreamingMethod<ReqT, RespT> extends StreamingRequestMethod<ReqT, RespT> {} public interface BidiStreamingMethod<ReqT, RespT> extends StreamingRequestMethod<ReqT, RespT> {}
@ -195,7 +195,7 @@ public final class ServerCalls {
} }
/** /**
* Creates a {@code ServerCallHandler} for a unary request call method of the service. * Creates a {@link ServerCallHandler} for a unary request call method of the service.
* *
* @param method an adaptor to the actual method on the service implementation. * @param method an adaptor to the actual method on the service implementation.
*/ */
@ -283,7 +283,7 @@ public final class ServerCalls {
} }
/** /**
* Creates a {@code ServerCallHandler} for a streaming request call method of the service. * Creates a {@link ServerCallHandler} for a streaming request call method of the service.
* *
* @param method an adaptor to the actual method on the service implementation. * @param method an adaptor to the actual method on the service implementation.
*/ */
@ -399,8 +399,8 @@ public final class ServerCalls {
* @param methodDescriptor of method for which error will be thrown. * @param methodDescriptor of method for which error will be thrown.
* @param responseObserver on which error will be set. * @param responseObserver on which error will be set.
*/ */
public static void asyncUnimplementedUnaryCall(MethodDescriptor<?, ?> methodDescriptor, public static void asyncUnimplementedUnaryCall(
StreamObserver<?> responseObserver) { MethodDescriptor<?, ?> methodDescriptor, StreamObserver<?> responseObserver) {
checkNotNull(methodDescriptor, "methodDescriptor"); checkNotNull(methodDescriptor, "methodDescriptor");
checkNotNull(responseObserver, "responseObserver"); checkNotNull(responseObserver, "responseObserver");
responseObserver.onError(Status.UNIMPLEMENTED responseObserver.onError(Status.UNIMPLEMENTED