Rename Call to ClientCalls.

Other classes are already following the convention that ClientFoo for
client-side, and ServerFoo for server-side. Call has been the black
sheep of the family.

- Call -> ClientCall
- Calls -> ClientCalls
- ForwardingCall* -> ForwardingClientCall*
This commit is contained in:
Kun Zhang 2015-06-04 16:39:25 -07:00
parent 4ee2a6584a
commit 2ee4d0228d
28 changed files with 274 additions and 259 deletions

View File

@ -236,7 +236,7 @@ The 'channel' layer is an abstraction over transport handling that is suitable f
#### Client
* [Channel - client side binding](https://github.com/google/grpc-java/blob/master/core/src/main/java/io/grpc/Channel.java)
* [Call](https://github.com/google/grpc-java/blob/master/core/src/main/java/io/grpc/Call.java)
* [Client Call](https://github.com/google/grpc-java/blob/master/core/src/main/java/io/grpc/ClientCall.java)
* [Client Interceptor](https://github.com/google/grpc-java/blob/master/core/src/main/java/io/grpc/ClientInterceptor.java)
#### Server

View File

@ -34,10 +34,10 @@ package io.grpc.auth;
import com.google.auth.Credentials;
import com.google.common.base.Preconditions;
import io.grpc.Call;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors.CheckedForwardingCall;
import io.grpc.ClientInterceptors.CheckedForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
@ -64,11 +64,11 @@ public class ClientAuthInterceptor implements ClientInterceptor {
}
@Override
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
Channel next) {
// TODO(ejona86): If the call fails for Auth reasons, this does not properly propagate info that
// would be in WWW-Authenticate, because it does not yet have access to the header.
return new CheckedForwardingCall<ReqT, RespT>(next.newCall(method)) {
return new CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method)) {
@Override
protected void checkedStart(Listener<RespT> responseListener, Metadata.Headers headers)
throws Exception {

View File

@ -44,8 +44,8 @@ import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
import io.grpc.Call;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
@ -82,13 +82,13 @@ public class ClientAuthInterceptorTests {
MethodDescriptor<String, Integer> descriptor;
@Mock
Call.Listener<Integer> listener;
ClientCall.Listener<Integer> listener;
@Mock
Channel channel;
@Mock
Call<String, Integer> call;
ClientCall<String, Integer> call;
ClientAuthInterceptor interceptor;
@ -109,7 +109,7 @@ public class ClientAuthInterceptorTests {
values.put("Extra-Authorization", "token3");
values.put("Extra-Authorization", "token4");
when(credentials.getRequestMetadata()).thenReturn(Multimaps.asMap(values));
Call<String, Integer> interceptedCall = interceptor.interceptCall(descriptor, channel);
ClientCall<String, Integer> interceptedCall = interceptor.interceptCall(descriptor, channel);
Metadata.Headers headers = new Metadata.Headers();
interceptedCall.start(listener, headers);
verify(call).start(listener, headers);
@ -125,7 +125,7 @@ public class ClientAuthInterceptorTests {
@Test
public void testCredentialsThrows() throws IOException {
when(credentials.getRequestMetadata()).thenThrow(new IOException("Broken"));
Call<String, Integer> interceptedCall = interceptor.interceptCall(descriptor, channel);
ClientCall<String, Integer> interceptedCall = interceptor.interceptCall(descriptor, channel);
Metadata.Headers headers = new Metadata.Headers();
interceptedCall.start(listener, headers);
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
@ -146,7 +146,7 @@ public class ClientAuthInterceptorTests {
}
};
interceptor = new ClientAuthInterceptor(oAuth2Credentials, Executors.newSingleThreadExecutor());
Call<String, Integer> interceptedCall = interceptor.interceptCall(descriptor, channel);
ClientCall<String, Integer> interceptedCall = interceptor.interceptCall(descriptor, channel);
Metadata.Headers headers = new Metadata.Headers();
interceptedCall.start(listener, headers);
verify(call).start(listener, headers);

View File

@ -1,13 +1,13 @@
package io.grpc.testing;
import static io.grpc.stub.Calls.createMethodDescriptor;
import static io.grpc.stub.Calls.asyncUnaryCall;
import static io.grpc.stub.Calls.asyncServerStreamingCall;
import static io.grpc.stub.Calls.asyncClientStreamingCall;
import static io.grpc.stub.Calls.duplexStreamingCall;
import static io.grpc.stub.Calls.blockingUnaryCall;
import static io.grpc.stub.Calls.blockingServerStreamingCall;
import static io.grpc.stub.Calls.unaryFutureCall;
import static io.grpc.stub.ClientCalls.createMethodDescriptor;
import static io.grpc.stub.ClientCalls.asyncUnaryCall;
import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
import static io.grpc.stub.ClientCalls.asyncClientStreamingCall;
import static io.grpc.stub.ClientCalls.duplexStreamingCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ClientCalls.unaryFutureCall;
import static io.grpc.stub.ServerCalls.createMethodDefinition;
import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall;
import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall;

View File

@ -1,13 +1,13 @@
package io.grpc.testing;
import static io.grpc.stub.Calls.createMethodDescriptor;
import static io.grpc.stub.Calls.asyncUnaryCall;
import static io.grpc.stub.Calls.asyncServerStreamingCall;
import static io.grpc.stub.Calls.asyncClientStreamingCall;
import static io.grpc.stub.Calls.duplexStreamingCall;
import static io.grpc.stub.Calls.blockingUnaryCall;
import static io.grpc.stub.Calls.blockingServerStreamingCall;
import static io.grpc.stub.Calls.unaryFutureCall;
import static io.grpc.stub.ClientCalls.createMethodDescriptor;
import static io.grpc.stub.ClientCalls.asyncUnaryCall;
import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
import static io.grpc.stub.ClientCalls.asyncClientStreamingCall;
import static io.grpc.stub.ClientCalls.duplexStreamingCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ClientCalls.unaryFutureCall;
import static io.grpc.stub.ServerCalls.createMethodDefinition;
import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall;
import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall;

View File

@ -2,8 +2,8 @@ package io.grpc.benchmarks.netty;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Call;
import io.grpc.ChannelImpl;
import io.grpc.ClientCall;
import io.grpc.DeferredInputStream;
import io.grpc.KnownLength;
import io.grpc.Marshaller;
@ -15,7 +15,7 @@ import io.grpc.ServerCallHandler;
import io.grpc.ServerImpl;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.stub.Calls;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
import io.grpc.transport.netty.NegotiationType;
import io.grpc.transport.netty.NettyChannelBuilder;
@ -326,7 +326,7 @@ public abstract class AbstractBenchmark {
public void onCompleted() {
if (!done.get()) {
ByteBuf slice = request.slice();
Calls.asyncUnaryCall(channel.newCall(unaryMethod), slice, this);
ClientCalls.asyncUnaryCall(channel.newCall(unaryMethod), slice, this);
}
}
};
@ -346,10 +346,10 @@ public abstract class AbstractBenchmark {
final long counterDelta) {
for (final ChannelImpl channel : channels) {
for (int i = 0; i < callsPerChannel; i++) {
final Call<ByteBuf, ByteBuf> streamingCall = channel.newCall(pingPongMethod);
final ClientCall<ByteBuf, ByteBuf> streamingCall = channel.newCall(pingPongMethod);
final AtomicReference<StreamObserver<ByteBuf>> requestObserverRef =
new AtomicReference<StreamObserver<ByteBuf>>();
StreamObserver<ByteBuf> requestObserver = Calls.duplexStreamingCall(streamingCall,
StreamObserver<ByteBuf> requestObserver = ClientCalls.duplexStreamingCall(streamingCall,
new StreamObserver<ByteBuf>() {
@Override
public void onValue(ByteBuf value) {
@ -389,10 +389,10 @@ public abstract class AbstractBenchmark {
final long counterDelta) {
for (final ChannelImpl channel : channels) {
for (int i = 0; i < callsPerChannel; i++) {
final Call<ByteBuf, ByteBuf> streamingCall = channel.newCall(flowControlledStreaming);
final ClientCall<ByteBuf, ByteBuf> streamingCall = channel.newCall(flowControlledStreaming);
final AtomicReference<StreamObserver<ByteBuf>> requestObserverRef =
new AtomicReference<StreamObserver<ByteBuf>>();
StreamObserver<ByteBuf> requestObserver = Calls.duplexStreamingCall(streamingCall,
StreamObserver<ByteBuf> requestObserver = ClientCalls.duplexStreamingCall(streamingCall,
new StreamObserver<ByteBuf>() {
@Override
public void onValue(ByteBuf value) {

View File

@ -1,6 +1,6 @@
package io.grpc.benchmarks.netty;
import io.grpc.stub.Calls;
import io.grpc.stub.ClientCalls;
import io.netty.buffer.Unpooled;
import org.openjdk.jmh.annotations.Benchmark;
@ -50,7 +50,7 @@ public class SingleThreadBlockingQpsBenchmark extends AbstractBenchmark {
*/
@Benchmark
public void blockingUnary() throws Exception {
Calls.blockingUnaryCall(channels[0].newCall(unaryMethod), Unpooled.EMPTY_BUFFER);
ClientCalls.blockingUnaryCall(channels[0].newCall(unaryMethod), Unpooled.EMPTY_BUFFER);
}
/**

View File

@ -618,21 +618,21 @@ static void PrintService(const ServiceDescriptor* service,
void PrintImports(Printer* p, bool generate_nano) {
p->Print(
"import static "
"io.grpc.stub.Calls.createMethodDescriptor;\n"
"io.grpc.stub.ClientCalls.createMethodDescriptor;\n"
"import static "
"io.grpc.stub.Calls.asyncUnaryCall;\n"
"io.grpc.stub.ClientCalls.asyncUnaryCall;\n"
"import static "
"io.grpc.stub.Calls.asyncServerStreamingCall;\n"
"io.grpc.stub.ClientCalls.asyncServerStreamingCall;\n"
"import static "
"io.grpc.stub.Calls.asyncClientStreamingCall;\n"
"io.grpc.stub.ClientCalls.asyncClientStreamingCall;\n"
"import static "
"io.grpc.stub.Calls.duplexStreamingCall;\n"
"io.grpc.stub.ClientCalls.duplexStreamingCall;\n"
"import static "
"io.grpc.stub.Calls.blockingUnaryCall;\n"
"io.grpc.stub.ClientCalls.blockingUnaryCall;\n"
"import static "
"io.grpc.stub.Calls.blockingServerStreamingCall;\n"
"io.grpc.stub.ClientCalls.blockingServerStreamingCall;\n"
"import static "
"io.grpc.stub.Calls.unaryFutureCall;\n"
"io.grpc.stub.ClientCalls.unaryFutureCall;\n"
"import static "
"io.grpc.stub.ServerCalls.createMethodDefinition;\n"
"import static "

View File

@ -1,13 +1,13 @@
package io.grpc.testing.integration;
import static io.grpc.stub.Calls.createMethodDescriptor;
import static io.grpc.stub.Calls.asyncUnaryCall;
import static io.grpc.stub.Calls.asyncServerStreamingCall;
import static io.grpc.stub.Calls.asyncClientStreamingCall;
import static io.grpc.stub.Calls.duplexStreamingCall;
import static io.grpc.stub.Calls.blockingUnaryCall;
import static io.grpc.stub.Calls.blockingServerStreamingCall;
import static io.grpc.stub.Calls.unaryFutureCall;
import static io.grpc.stub.ClientCalls.createMethodDescriptor;
import static io.grpc.stub.ClientCalls.asyncUnaryCall;
import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
import static io.grpc.stub.ClientCalls.asyncClientStreamingCall;
import static io.grpc.stub.ClientCalls.duplexStreamingCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ClientCalls.unaryFutureCall;
import static io.grpc.stub.ServerCalls.createMethodDefinition;
import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall;
import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall;

View File

@ -1,13 +1,13 @@
package io.grpc.testing.integration;
import static io.grpc.stub.Calls.createMethodDescriptor;
import static io.grpc.stub.Calls.asyncUnaryCall;
import static io.grpc.stub.Calls.asyncServerStreamingCall;
import static io.grpc.stub.Calls.asyncClientStreamingCall;
import static io.grpc.stub.Calls.duplexStreamingCall;
import static io.grpc.stub.Calls.blockingUnaryCall;
import static io.grpc.stub.Calls.blockingServerStreamingCall;
import static io.grpc.stub.Calls.unaryFutureCall;
import static io.grpc.stub.ClientCalls.createMethodDescriptor;
import static io.grpc.stub.ClientCalls.asyncUnaryCall;
import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
import static io.grpc.stub.ClientCalls.asyncClientStreamingCall;
import static io.grpc.stub.ClientCalls.duplexStreamingCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ClientCalls.unaryFutureCall;
import static io.grpc.stub.ServerCalls.createMethodDefinition;
import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall;
import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall;

View File

@ -35,8 +35,8 @@ import javax.annotation.concurrent.ThreadSafe;
/**
* A Channel provides an abstraction over the transport layer that is designed to be consumed
* by stub implementations. Channel and its associated types {@link Call} and
* {@link Call.Listener} exchange parsed request and response objects whereas the
* by stub implementations. Channel and its associated types {@link ClientCall} and
* {@link ClientCall.Listener} exchange parsed request and response objects whereas the
* transport layer only works with serialized data.
*
* <p>Applications can add common cross-cutting behaviors to stubs by decorating Channel
@ -48,15 +48,15 @@ import javax.annotation.concurrent.ThreadSafe;
public abstract class Channel {
/**
* Create a {@link Call} to the remote operation specified by the given
* {@link MethodDescriptor}. The returned {@link Call} does not trigger any remote
* behavior until {@link Call#start(Call.Listener, Metadata.Headers)} is
* Create a {@link ClientCall} to the remote operation specified by the given
* {@link MethodDescriptor}. The returned {@link ClientCall} does not trigger any remote
* behavior until {@link ClientCall#start(ClientCall.Listener, Metadata.Headers)} is
* invoked.
*
* @param methodDescriptor describes the name and parameter types of the operation to call.
* @return a {@link Call} bound to the specified method.
* @return a {@link ClientCall} bound to the specified method.
*
*/
public abstract <RequestT, ResponseT> Call<RequestT, ResponseT> newCall(
public abstract <RequestT, ResponseT> ClientCall<RequestT, ResponseT> newCall(
MethodDescriptor<RequestT, ResponseT> methodDescriptor);
}

View File

@ -67,8 +67,8 @@ public final class ChannelImpl extends Channel {
@Override public void request(int numMessages) {}
/**
* Always returns {@code false}, since this is only used when the startup of the {@link Call}
* fails (i.e. the {@link Call} is closed).
* Always returns {@code false}, since this is only used when the startup of the {@link
* ClientCall} fails (i.e. the {@link ClientCall} is closed).
*/
@Override public boolean isReady() {
return false;
@ -179,7 +179,7 @@ public final class ChannelImpl extends Channel {
* Creates a new outgoing call on the channel.
*/
@Override
public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
return new CallImpl<ReqT, RespT>(method, new SerializingExecutor(executor));
}
@ -243,7 +243,7 @@ public final class ChannelImpl extends Channel {
}
}
private class CallImpl<ReqT, RespT> extends Call<ReqT, RespT> {
private class CallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private final MethodDescriptor<ReqT, RespT> method;
private final SerializingExecutor callExecutor;
private final boolean unaryRequest;

View File

@ -53,7 +53,7 @@ package io.grpc;
* @param <RequestT> type of message sent one or more times to the server.
* @param <ResponseT> type of message received one or more times from the server.
*/
public abstract class Call<RequestT, ResponseT> {
public abstract class ClientCall<RequestT, ResponseT> {
/**
* Callbacks for receiving metadata, response messages and completion status from the server.
*
@ -80,10 +80,10 @@ public abstract class Call<RequestT, ResponseT> {
public abstract void onPayload(T payload);
/**
* The Call has been closed. No further sending or receiving can occur. If {@code status} is
* not equal to {@link Status#OK}, then the call failed. An additional block of trailer metadata
* may be received at the end of the call from the server. An empty {@link Metadata} object is
* passed if no trailers are received.
* The ClientCall has been closed. No further sending or receiving can occur. If {@code status}
* is not equal to {@link Status#OK}, then the call failed. An additional block of trailer
* metadata may be received at the end of the call from the server. An empty {@link Metadata}
* object is passed if no trailers are received.
*
* @param status the result of the remote call.
* @param trailers metadata provided at call completion.
@ -91,10 +91,10 @@ public abstract class Call<RequestT, ResponseT> {
public abstract void onClose(Status status, Metadata.Trailers trailers);
/**
* This indicates that the Call is now capable of sending additional messages (via
* This indicates that the ClientCall is now capable of sending additional messages (via
* {@link #sendPayload}) without requiring excessive buffering internally. This event is
* just a suggestion and the application is free to ignore it, however doing so may
* result in excessive buffering within the Call.
* result in excessive buffering within the ClientCall.
*/
public void onReady() {}
}
@ -125,7 +125,7 @@ public abstract class Call<RequestT, ResponseT> {
public abstract void request(int numMessages);
/**
* Prevent any further processing for this Call. No further messages may be sent or will be
* Prevent any further processing for this ClientCall. No further messages may be sent or will be
* received. The server is informed of cancellations, but may not stop processing the call.
* Cancellation is permitted even if previously {@code cancel()}ed or {@link #halfClose}d.
*/

View File

@ -47,7 +47,7 @@ import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe
public interface ClientInterceptor {
/**
* Intercept {@link Call} creation by the {@code next} {@link Channel}.
* Intercept {@link ClientCall} creation by the {@code next} {@link Channel}.
*
* <p>Many variations of interception are possible. Complex implementations may return a wrapper
* around the result of {@code next.newCall()}, whereas a simpler implementation may just modify
@ -57,7 +57,7 @@ public interface ClientInterceptor {
* @param next the channel which is being intercepted.
* @return the call object for the remote operation, never {@code null}.
*/
<RequestT, ResponseT> Call<RequestT, ResponseT> interceptCall(
<RequestT, ResponseT> ClientCall<RequestT, ResponseT> interceptCall(
MethodDescriptor<RequestT, ResponseT> method,
Channel next);
}

View File

@ -34,8 +34,8 @@ package io.grpc;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import io.grpc.ForwardingCall.SimpleForwardingCall;
import io.grpc.ForwardingCallListener.SimpleForwardingCallListener;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import java.util.Arrays;
import java.util.Iterator;
@ -87,7 +87,7 @@ public class ClientInterceptors {
}
@Override
public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
return new ProcessInterceptorChannel(channel, interceptors).newCall(method);
}
}
@ -102,7 +102,7 @@ public class ClientInterceptors {
}
@Override
public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
if (interceptors != null && interceptors.hasNext()) {
return interceptors.next().interceptCall(method, this);
} else {
@ -116,18 +116,19 @@ public class ClientInterceptors {
}
/**
* A {@link Call} which forwards all of it's methods to another {@link Call}.
* A {@link ClientCall} which forwards all of it's methods to another {@link ClientCall}.
*
* @deprecated Use {@link SimpleForwardingCall}.
* @deprecated Use {@link SimpleForwardingClientCall}.
*/
@Deprecated
public static class ForwardingCall<ReqT, RespT> extends SimpleForwardingCall<ReqT, RespT> {
public ForwardingCall(Call<ReqT, RespT> delegate) {
public static class ForwardingClientCall<ReqT, RespT>
extends SimpleForwardingClientCall<ReqT, RespT> {
public ForwardingClientCall(ClientCall<ReqT, RespT> delegate) {
super(delegate);
}
}
private static final Call<Object, Object> NOOP_CALL = new Call<Object, Object>() {
private static final ClientCall<Object, Object> NOOP_CALL = new ClientCall<Object, Object>() {
@Override
public void start(Listener<Object> responseListener, Metadata.Headers headers) {}
@ -144,8 +145,8 @@ public class ClientInterceptors {
public void sendPayload(Object payload) {}
/**
* Always returns {@code false}, since this is only used when the startup of the {@link Call}
* fails (i.e. the {@link Call} is closed).
* Always returns {@code false}, since this is only used when the startup of the {@link
* ClientCall} fails (i.e. the {@link ClientCall} is closed).
*/
@Override
public boolean isReady() {
@ -154,36 +155,38 @@ public class ClientInterceptors {
};
/**
* A {@link io.grpc.ForwardingCall} that delivers exceptions from its start logic to the call
* listener.
* A {@link io.grpc.ForwardingClientCall} that delivers exceptions from its start logic to the
* call listener.
*
* <p>{@link Call#start(Call.Listener, Metadata.Headers)} should not throw any exception other
* than those caused by misuse, e.g., {@link IllegalStateException}. {@code
* CheckedForwardingCall} provides {@code checkedStart()} in which throwing exceptions is allowed.
* <p>{@link ClientCall#start(ClientCall.Listener, Metadata.Headers)} should not throw any
* exception other than those caused by misuse, e.g., {@link IllegalStateException}. {@code
* CheckedForwardingClientCall} provides {@code checkedStart()} in which throwing exceptions is
* allowed.
*/
public abstract static class CheckedForwardingCall<ReqT, RespT>
extends io.grpc.ForwardingCall<ReqT, RespT> {
public abstract static class CheckedForwardingClientCall<ReqT, RespT>
extends io.grpc.ForwardingClientCall<ReqT, RespT> {
private Call<ReqT, RespT> delegate;
private ClientCall<ReqT, RespT> delegate;
/**
* Subclasses implement the start logic here that would normally belong to {@code start()}.
*
* <p>Implementation should call {@code this.delegate().start()} in the normal path. Exceptions
* may safely be thrown prior to calling {@code this.delegate().start()}. Such exceptions will
* be handled by {@code CheckedForwardingCall} and be delivered to {@code responseListener}.
* Exceptions <em>must not</em> be thrown after calling {@code this.delegate().start()}, as this
* can result in {@link Call.Listener#onClose} being called multiple times.
* be handled by {@code CheckedForwardingClientCall} and be delivered to {@code
* responseListener}. Exceptions <em>must not</em> be thrown after calling {@code
* this.delegate().start()}, as this can result in {@link ClientCall.Listener#onClose} being
* called multiple times.
*/
protected abstract void checkedStart(Listener<RespT> responseListener, Metadata.Headers headers)
throws Exception;
protected CheckedForwardingCall(Call<ReqT, RespT> delegate) {
protected CheckedForwardingClientCall(ClientCall<ReqT, RespT> delegate) {
this.delegate = delegate;
}
@Override
protected final Call<ReqT, RespT> delegate() {
protected final ClientCall<ReqT, RespT> delegate() {
return delegate;
}
@ -198,22 +201,22 @@ public class ClientInterceptors {
// IllegalStateException because delegate().start() was not called. We switch the delegate
// to a NO-OP one to prevent the IllegalStateException. The user will finally get notified
// about the error through the listener.
delegate = (Call<ReqT, RespT>) NOOP_CALL;
delegate = (ClientCall<ReqT, RespT>) NOOP_CALL;
responseListener.onClose(Status.fromThrowable(e), new Metadata.Trailers());
}
}
}
/**
* A {@link Call.Listener} which forwards all of its methods to another
* {@link Call.Listener}.
* A {@link ClientCall.Listener} which forwards all of its methods to another
* {@link ClientCall.Listener}.
*
* @deprecated Use {@link SimpleForwardingCallListener}.
* @deprecated Use {@link SimpleForwardingClientCallListener}.
*/
@Deprecated
public static class ForwardingListener<T> extends SimpleForwardingCallListener<T> {
public static class ForwardingListener<T> extends SimpleForwardingClientCallListener<T> {
public ForwardingListener(Call.Listener<T> delegate) {
public ForwardingListener(ClientCall.Listener<T> delegate) {
super(delegate);
}
}

View File

@ -32,13 +32,13 @@
package io.grpc;
/**
* A {@link Call} which forwards all of it's methods to another {@link Call}.
* A {@link ClientCall} which forwards all of it's methods to another {@link ClientCall}.
*/
public abstract class ForwardingCall<ReqT, RespT> extends Call<ReqT, RespT> {
public abstract class ForwardingClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
/**
* Returns the delegated {@code Call}.
* Returns the delegated {@code ClientCall}.
*/
protected abstract Call<ReqT, RespT> delegate();
protected abstract ClientCall<ReqT, RespT> delegate();
@Override
public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
@ -71,19 +71,19 @@ public abstract class ForwardingCall<ReqT, RespT> extends Call<ReqT, RespT> {
}
/**
* A simplified version of {@link ForwardingCall} where subclasses can pass in a {@link Call} as
* the delegate.
* A simplified version of {@link ForwardingClientCall} where subclasses can pass in a {@link
* ClientCall} as the delegate.
*/
public abstract static class SimpleForwardingCall<ReqT, RespT>
extends ForwardingCall<ReqT, RespT> {
private final Call<ReqT, RespT> delegate;
public abstract static class SimpleForwardingClientCall<ReqT, RespT>
extends ForwardingClientCall<ReqT, RespT> {
private final ClientCall<ReqT, RespT> delegate;
protected SimpleForwardingCall(Call<ReqT, RespT> delegate) {
protected SimpleForwardingClientCall(ClientCall<ReqT, RespT> delegate) {
this.delegate = delegate;
}
@Override
protected Call<ReqT, RespT> delegate() {
protected ClientCall<ReqT, RespT> delegate() {
return delegate;
}
}

View File

@ -32,13 +32,14 @@
package io.grpc;
/**
* A {@link Call.Listener} which forwards all of its methods to another {@link Call.Listener}.
* A {@link ClientCall.Listener} which forwards all of its methods to another {@link
* ClientCall.Listener}.
*/
public abstract class ForwardingCallListener<RespT> extends Call.Listener<RespT> {
public abstract class ForwardingClientCallListener<RespT> extends ClientCall.Listener<RespT> {
/**
* Returns the delegated {@code Call.Listener}.
* Returns the delegated {@code ClientCall.Listener}.
*/
protected abstract Call.Listener<RespT> delegate();
protected abstract ClientCall.Listener<RespT> delegate();
@Override
public void onHeaders(Metadata.Headers headers) {
@ -61,20 +62,20 @@ public abstract class ForwardingCallListener<RespT> extends Call.Listener<RespT>
}
/**
* A simplified version of {@link ForwardingCallListener} where subclasses can pass in a {@link
* Call.Listener} as the delegate.
* A simplified version of {@link ForwardingClientCallListener} where subclasses can pass in a
* {@link ClientCall.Listener} as the delegate.
*/
public abstract static class SimpleForwardingCallListener<RespT>
extends ForwardingCallListener<RespT> {
public abstract static class SimpleForwardingClientCallListener<RespT>
extends ForwardingClientCallListener<RespT> {
private final Call.Listener<RespT> delegate;
private final ClientCall.Listener<RespT> delegate;
protected SimpleForwardingCallListener(Call.Listener<RespT> delegate) {
protected SimpleForwardingClientCallListener(ClientCall.Listener<RespT> delegate) {
this.delegate = delegate;
}
@Override
protected Call.Listener<RespT> delegate() {
protected ClientCall.Listener<RespT> delegate() {
return delegate;
}
}

View File

@ -63,8 +63,8 @@ public enum MethodType {
/**
* Returns {@code true} if the client will immediately send one request message to the server
* after calling {@link Call#start(io.grpc.Call.Listener, io.grpc.Metadata.Headers)} and then
* immediately half-close the stream by calling {@link io.grpc.Call#halfClose()}.
* after calling {@link ClientCall#start(io.grpc.ClientCall.Listener, io.grpc.Metadata.Headers)}
* and then immediately half-close the stream by calling {@link io.grpc.ClientCall#halfClose()}.
*/
public final boolean clientSendsOneMessage() {
return this == UNARY || this == SERVER_STREAMING;

View File

@ -46,9 +46,9 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import io.grpc.ClientInterceptors.CheckedForwardingCall;
import io.grpc.ForwardingCall.SimpleForwardingCall;
import io.grpc.ForwardingCallListener.SimpleForwardingCallListener;
import io.grpc.ClientInterceptors.CheckedForwardingClientCall;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import org.junit.Before;
import org.junit.Test;
@ -73,7 +73,7 @@ public class ClientInterceptorsTest {
private Channel channel;
@Mock
private Call<String, Integer> call;
private ClientCall<String, Integer> call;
@Mock
private MethodDescriptor<String, Integer> method;
@ -89,7 +89,8 @@ public class ClientInterceptorsTest {
Answer<Void> checkStartCalled = new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) {
verify(call).start(Mockito.<Call.Listener<Integer>>any(), Mockito.<Metadata.Headers>any());
verify(call).start(Mockito.<ClientCall.Listener<Integer>>any(),
Mockito.<Metadata.Headers>any());
return null;
}
};
@ -138,7 +139,8 @@ public class ClientInterceptorsTest {
public void callNextTwice() {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
Channel next) {
next.newCall(method);
return next.newCall(method);
@ -154,14 +156,15 @@ public class ClientInterceptorsTest {
channel = new Channel() {
@SuppressWarnings("unchecked")
@Override
public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
order.add("channel");
return (Call<ReqT, RespT>) call;
return (ClientCall<ReqT, RespT>) call;
}
};
ClientInterceptor interceptor1 = new ClientInterceptor() {
@Override
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
Channel next) {
order.add("i1");
return next.newCall(method);
@ -169,7 +172,8 @@ public class ClientInterceptorsTest {
};
ClientInterceptor interceptor2 = new ClientInterceptor() {
@Override
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
Channel next) {
order.add("i2");
return next.newCall(method);
@ -185,12 +189,13 @@ public class ClientInterceptorsTest {
final Metadata.Key<String> credKey = Metadata.Key.of("Cred", Metadata.ASCII_STRING_MARSHALLER);
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
Channel next) {
Call<ReqT, RespT> call = next.newCall(method);
return new SimpleForwardingCall<ReqT, RespT>(call) {
ClientCall<ReqT, RespT> call = next.newCall(method);
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Call.Listener<RespT> responseListener, Metadata.Headers headers) {
public void start(ClientCall.Listener<RespT> responseListener, Metadata.Headers headers) {
headers.put(credKey, "abcd");
super.start(responseListener, headers);
}
@ -199,8 +204,8 @@ public class ClientInterceptorsTest {
};
Channel intercepted = ClientInterceptors.intercept(channel, interceptor);
@SuppressWarnings("unchecked")
Call.Listener<Integer> listener = mock(Call.Listener.class);
Call<String, Integer> interceptedCall = intercepted.newCall(method);
ClientCall.Listener<Integer> listener = mock(ClientCall.Listener.class);
ClientCall<String, Integer> interceptedCall = intercepted.newCall(method);
// start() on the intercepted call will eventually reach the call created by the real channel
interceptedCall.start(listener, new Metadata.Headers());
ArgumentCaptor<Metadata.Headers> captor = ArgumentCaptor.forClass(Metadata.Headers.class);
@ -215,13 +220,14 @@ public class ClientInterceptorsTest {
final List<Metadata.Headers> examinedHeaders = new ArrayList<Metadata.Headers>();
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
Channel next) {
Call<ReqT, RespT> call = next.newCall(method);
return new SimpleForwardingCall<ReqT, RespT>(call) {
ClientCall<ReqT, RespT> call = next.newCall(method);
return new SimpleForwardingClientCall<ReqT, RespT>(call) {
@Override
public void start(Call.Listener<RespT> responseListener, Metadata.Headers headers) {
super.start(new SimpleForwardingCallListener<RespT>(responseListener) {
public void start(ClientCall.Listener<RespT> responseListener, Metadata.Headers headers) {
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata.Headers headers) {
examinedHeaders.add(headers);
@ -234,11 +240,11 @@ public class ClientInterceptorsTest {
};
Channel intercepted = ClientInterceptors.intercept(channel, interceptor);
@SuppressWarnings("unchecked")
Call.Listener<Integer> listener = mock(Call.Listener.class);
Call<String, Integer> interceptedCall = intercepted.newCall(method);
ClientCall.Listener<Integer> listener = mock(ClientCall.Listener.class);
ClientCall<String, Integer> interceptedCall = intercepted.newCall(method);
interceptedCall.start(listener, new Metadata.Headers());
// Capture the underlying call listener that will receive headers from the transport.
ArgumentCaptor<Call.Listener<Integer>> captor = ArgumentCaptor.forClass(null);
ArgumentCaptor<ClientCall.Listener<Integer>> captor = ArgumentCaptor.forClass(null);
verify(call).start(captor.capture(), Mockito.<Metadata.Headers>any());
Metadata.Headers inboundHeaders = new Metadata.Headers();
// Simulate that a headers arrives on the underlying call listener.
@ -250,17 +256,18 @@ public class ClientInterceptorsTest {
public void normalCall() {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
Channel next) {
Call<ReqT, RespT> call = next.newCall(method);
return new SimpleForwardingCall<ReqT, RespT>(call) { };
ClientCall<ReqT, RespT> call = next.newCall(method);
return new SimpleForwardingClientCall<ReqT, RespT>(call) { };
}
};
Channel intercepted = ClientInterceptors.intercept(channel, interceptor);
Call<String, Integer> interceptedCall = intercepted.newCall(method);
ClientCall<String, Integer> interceptedCall = intercepted.newCall(method);
assertNotSame(call, interceptedCall);
@SuppressWarnings("unchecked")
Call.Listener<Integer> listener = mock(Call.Listener.class);
ClientCall.Listener<Integer> listener = mock(ClientCall.Listener.class);
Metadata.Headers headers = new Metadata.Headers();
interceptedCall.start(listener, headers);
verify(call).start(same(listener), same(headers));
@ -277,12 +284,13 @@ public class ClientInterceptorsTest {
final Exception error = new Exception("emulated error");
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
Channel next) {
Call<ReqT, RespT> call = next.newCall(method);
return new CheckedForwardingCall<ReqT, RespT>(call) {
ClientCall<ReqT, RespT> call = next.newCall(method);
return new CheckedForwardingClientCall<ReqT, RespT>(call) {
@Override
protected void checkedStart(Call.Listener<RespT> responseListener,
protected void checkedStart(ClientCall.Listener<RespT> responseListener,
Metadata.Headers headers) throws Exception {
if (this instanceof Object) {
throw error;
@ -294,8 +302,8 @@ public class ClientInterceptorsTest {
};
Channel intercepted = ClientInterceptors.intercept(channel, interceptor);
@SuppressWarnings("unchecked")
Call.Listener<Integer> listener = mock(Call.Listener.class);
Call<String, Integer> interceptedCall = intercepted.newCall(method);
ClientCall.Listener<Integer> listener = mock(ClientCall.Listener.class);
ClientCall<String, Integer> interceptedCall = intercepted.newCall(method);
assertNotSame(call, interceptedCall);
interceptedCall.start(listener, new Metadata.Headers());
interceptedCall.sendPayload("request");
@ -309,7 +317,7 @@ public class ClientInterceptorsTest {
private static class NoopInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
Channel next) {
return next.newCall(method);
}

View File

@ -1,13 +1,13 @@
package io.grpc.examples.helloworld;
import static io.grpc.stub.Calls.createMethodDescriptor;
import static io.grpc.stub.Calls.asyncUnaryCall;
import static io.grpc.stub.Calls.asyncServerStreamingCall;
import static io.grpc.stub.Calls.asyncClientStreamingCall;
import static io.grpc.stub.Calls.duplexStreamingCall;
import static io.grpc.stub.Calls.blockingUnaryCall;
import static io.grpc.stub.Calls.blockingServerStreamingCall;
import static io.grpc.stub.Calls.unaryFutureCall;
import static io.grpc.stub.ClientCalls.createMethodDescriptor;
import static io.grpc.stub.ClientCalls.asyncUnaryCall;
import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
import static io.grpc.stub.ClientCalls.asyncClientStreamingCall;
import static io.grpc.stub.ClientCalls.duplexStreamingCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ClientCalls.unaryFutureCall;
import static io.grpc.stub.ServerCalls.createMethodDefinition;
import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall;
import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall;

View File

@ -1,13 +1,13 @@
package io.grpc.examples.routeguide;
import static io.grpc.stub.Calls.createMethodDescriptor;
import static io.grpc.stub.Calls.asyncUnaryCall;
import static io.grpc.stub.Calls.asyncServerStreamingCall;
import static io.grpc.stub.Calls.asyncClientStreamingCall;
import static io.grpc.stub.Calls.duplexStreamingCall;
import static io.grpc.stub.Calls.blockingUnaryCall;
import static io.grpc.stub.Calls.blockingServerStreamingCall;
import static io.grpc.stub.Calls.unaryFutureCall;
import static io.grpc.stub.ClientCalls.createMethodDescriptor;
import static io.grpc.stub.ClientCalls.asyncUnaryCall;
import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
import static io.grpc.stub.ClientCalls.asyncClientStreamingCall;
import static io.grpc.stub.ClientCalls.duplexStreamingCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ClientCalls.unaryFutureCall;
import static io.grpc.stub.ServerCalls.createMethodDefinition;
import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall;
import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall;

View File

@ -31,11 +31,11 @@
package io.grpc.examples.header;
import io.grpc.Call;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingCall.SimpleForwardingCall;
import io.grpc.ForwardingCallListener.SimpleForwardingCallListener;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
@ -52,15 +52,15 @@ public class HeaderClientInterceptor implements ClientInterceptor {
Metadata.Key.of("custom_client_header_key", Metadata.ASCII_STRING_MARSHALLER);
@Override
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
Channel next) {
return new SimpleForwardingCall<ReqT, RespT>(next.newCall(method)) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method)) {
@Override
public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
/* put custom header */
headers.put(customHeadKey, "customRequestValue");
super.start(new SimpleForwardingCallListener<RespT>(responseListener) {
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata.Headers headers) {
/**

View File

@ -1,13 +1,13 @@
package io.grpc.testing.integration;
import static io.grpc.stub.Calls.createMethodDescriptor;
import static io.grpc.stub.Calls.asyncUnaryCall;
import static io.grpc.stub.Calls.asyncServerStreamingCall;
import static io.grpc.stub.Calls.asyncClientStreamingCall;
import static io.grpc.stub.Calls.duplexStreamingCall;
import static io.grpc.stub.Calls.blockingUnaryCall;
import static io.grpc.stub.Calls.blockingServerStreamingCall;
import static io.grpc.stub.Calls.unaryFutureCall;
import static io.grpc.stub.ClientCalls.createMethodDescriptor;
import static io.grpc.stub.ClientCalls.asyncUnaryCall;
import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;
import static io.grpc.stub.ClientCalls.asyncClientStreamingCall;
import static io.grpc.stub.ClientCalls.duplexStreamingCall;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ClientCalls.blockingServerStreamingCall;
import static io.grpc.stub.ClientCalls.unaryFutureCall;
import static io.grpc.stub.ServerCalls.createMethodDefinition;
import static io.grpc.stub.ServerCalls.asyncUnaryRequestCall;
import static io.grpc.stub.ServerCalls.asyncStreamingRequestCall;

View File

@ -49,8 +49,8 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.EmptyProtos.Empty;
import io.grpc.AbstractServerBuilder;
import io.grpc.Call;
import io.grpc.ChannelImpl;
import io.grpc.ClientCall;
import io.grpc.Metadata;
import io.grpc.ServerImpl;
import io.grpc.ServerInterceptors;
@ -435,9 +435,9 @@ public abstract class AbstractTransportTest {
long start = System.nanoTime();
final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(10);
Call<StreamingOutputCallRequest, StreamingOutputCallResponse> call =
ClientCall<StreamingOutputCallRequest, StreamingOutputCallResponse> call =
channel.newCall(TestServiceGrpc.CONFIG.streamingOutputCall);
call.start(new Call.Listener<StreamingOutputCallResponse>() {
call.start(new ClientCall.Listener<StreamingOutputCallResponse>() {
@Override
public void onHeaders(Metadata.Headers headers) {}

View File

@ -33,8 +33,8 @@ package io.grpc.stub;
import static org.junit.Assert.assertEquals;
import io.grpc.Call;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.MethodDescriptor;
import io.grpc.testing.integration.TestServiceGrpc;
@ -72,7 +72,7 @@ public class StubConfigTest {
private static class FakeChannel extends Channel {
@Override
public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
return null;
}
}

View File

@ -38,7 +38,7 @@ import com.google.common.util.concurrent.ExecutionError;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.UncheckedExecutionException;
import io.grpc.Call;
import io.grpc.ClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
@ -58,7 +58,7 @@ import javax.annotation.Nullable;
* between utilities in this class and the potential signatures in a generated stub class so
* that the runtime can vary behavior without requiring regeneration of the stub.
*/
public class Calls {
public class ClientCalls {
/**
* Creates a {@link MethodDescriptor} for a given method.
@ -80,7 +80,7 @@ public class Calls {
* @return a future for the single response message.
*/
public static <ReqT, RespT> ListenableFuture<RespT> unaryFutureCall(
Call<ReqT, RespT> call,
ClientCall<ReqT, RespT> call,
ReqT param) {
GrpcFuture<RespT> responseFuture = new GrpcFuture<RespT>(call);
asyncServerStreamingCall(call, param, new UnaryStreamToFuture<RespT>(responseFuture));
@ -124,7 +124,7 @@ public class Calls {
* Executes a unary call and blocks on the response.
* @return the single response message.
*/
public static <ReqT, RespT> RespT blockingUnaryCall(Call<ReqT, RespT> call, ReqT param) {
public static <ReqT, RespT> RespT blockingUnaryCall(ClientCall<ReqT, RespT> call, ReqT param) {
try {
return getUnchecked(unaryFutureCall(call, param));
} catch (Throwable t) {
@ -137,7 +137,7 @@ public class Calls {
* Executes a unary call with a response {@link StreamObserver}.
*/
public static <ReqT, RespT> void asyncUnaryCall(
Call<ReqT, RespT> call,
ClientCall<ReqT, RespT> call,
ReqT param,
StreamObserver<RespT> observer) {
asyncServerStreamingCall(call, param, observer);
@ -150,7 +150,7 @@ public class Calls {
*/
// TODO(louiscryan): Not clear if we want to use this idiom for 'simple' stubs.
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
Call<ReqT, RespT> call, ReqT param) {
ClientCall<ReqT, RespT> call, ReqT param) {
BlockingResponseStream<RespT> result = new BlockingResponseStream<RespT>(call);
asyncServerStreamingCall(call, param, result.listener());
return result;
@ -160,7 +160,7 @@ public class Calls {
* Executes a server-streaming call with a response {@link StreamObserver}.
*/
public static <ReqT, RespT> void asyncServerStreamingCall(
Call<ReqT, RespT> call,
ClientCall<ReqT, RespT> call,
ReqT param,
StreamObserver<RespT> responseObserver) {
asyncServerStreamingCall(call, param,
@ -168,9 +168,9 @@ public class Calls {
}
private static <ReqT, RespT> void asyncServerStreamingCall(
Call<ReqT, RespT> call,
ClientCall<ReqT, RespT> call,
ReqT param,
Call.Listener<RespT> responseListener) {
ClientCall.Listener<RespT> responseListener) {
call.start(responseListener, new Metadata.Headers());
call.request(1);
try {
@ -187,7 +187,7 @@ public class Calls {
* @return the single response value.
*/
public static <ReqT, RespT> RespT blockingClientStreamingCall(
Call<ReqT, RespT> call,
ClientCall<ReqT, RespT> call,
Iterator<ReqT> clientStream) {
GrpcFuture<RespT> responseFuture = new GrpcFuture<RespT>(call);
call.start(new UnaryStreamToFuture<RespT>(responseFuture), new Metadata.Headers());
@ -213,7 +213,7 @@ public class Calls {
* @return request stream observer.
*/
public static <ReqT, RespT> StreamObserver<ReqT> asyncClientStreamingCall(
Call<ReqT, RespT> call,
ClientCall<ReqT, RespT> call,
StreamObserver<RespT> responseObserver) {
return duplexStreamingCall(call, responseObserver);
}
@ -222,7 +222,7 @@ public class Calls {
* Executes a duplex-streaming call.
* @return request stream observer.
*/
public static <ReqT, RespT> StreamObserver<ReqT> duplexStreamingCall(Call<ReqT, RespT> call,
public static <ReqT, RespT> StreamObserver<ReqT> duplexStreamingCall(ClientCall<ReqT, RespT> call,
StreamObserver<RespT> responseObserver) {
call.start(new StreamObserverToCallListenerAdapter<RespT>(call, responseObserver),
new Metadata.Headers());
@ -231,9 +231,9 @@ public class Calls {
}
private static class CallToStreamObserverAdapter<T> implements StreamObserver<T> {
private final Call<T, ?> call;
private final ClientCall<T, ?> call;
public CallToStreamObserverAdapter(Call<T, ?> call) {
public CallToStreamObserverAdapter(ClientCall<T, ?> call) {
this.call = call;
}
@ -254,12 +254,13 @@ public class Calls {
}
}
private static class StreamObserverToCallListenerAdapter<RespT> extends Call.Listener<RespT> {
private final Call<?, RespT> call;
private static class StreamObserverToCallListenerAdapter<RespT>
extends ClientCall.Listener<RespT> {
private final ClientCall<?, RespT> call;
private final StreamObserver<RespT> observer;
public StreamObserverToCallListenerAdapter(
Call<?, RespT> call, StreamObserver<RespT> observer) {
ClientCall<?, RespT> call, StreamObserver<RespT> observer) {
this.call = call;
this.observer = observer;
}
@ -289,7 +290,7 @@ public class Calls {
/**
* Complete a GrpcFuture using {@link StreamObserver} events.
*/
private static class UnaryStreamToFuture<RespT> extends Call.Listener<RespT> {
private static class UnaryStreamToFuture<RespT> extends ClientCall.Listener<RespT> {
private final GrpcFuture<RespT> responseFuture;
private RespT value;
@ -327,9 +328,9 @@ public class Calls {
}
private static class GrpcFuture<RespT> extends AbstractFuture<RespT> {
private final Call<?, RespT> call;
private final ClientCall<?, RespT> call;
GrpcFuture(Call<?, RespT> call) {
GrpcFuture(ClientCall<?, RespT> call) {
this.call = call;
}
@ -350,26 +351,26 @@ public class Calls {
}
/**
* Convert events on a {@link io.grpc.Call.Listener} into a blocking
* Convert events on a {@link io.grpc.ClientCall.Listener} into a blocking
* {@link Iterator}.
*
* <p>The class is not thread-safe, but it does permit {@link Call.Listener} calls in a separate
* thread from {@code Iterator} calls.
* <p>The class is not thread-safe, but it does permit {@link ClientCall.Listener} calls in a
* separate thread from {@code Iterator} calls.
*/
// TODO(ejona86): determine how to allow Call.cancel() in case of application error.
// TODO(ejona86): determine how to allow ClientCall.cancel() in case of application error.
private static class BlockingResponseStream<T> implements Iterator<T> {
// Due to flow control, only needs to hold up to 2 items: 1 for value, 1 for close.
private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<Object>(2);
private final Call.Listener<T> listener = new QueuingListener();
private final Call<?, T> call;
private final ClientCall.Listener<T> listener = new QueuingListener();
private final ClientCall<?, T> call;
// Only accessed when iterating.
private Object last;
private BlockingResponseStream(Call<?, T> call) {
private BlockingResponseStream(ClientCall<?, T> call) {
this.call = call;
}
Call.Listener<T> listener() {
ClientCall.Listener<T> listener() {
return listener;
}
@ -409,7 +410,7 @@ public class Calls {
throw new UnsupportedOperationException();
}
private class QueuingListener extends Call.Listener<T> {
private class QueuingListener extends ClientCall.Listener<T> {
private boolean done = false;
@Override
@ -418,13 +419,13 @@ public class Calls {
@Override
public void onPayload(T value) {
Preconditions.checkState(!done, "Call already closed");
Preconditions.checkState(!done, "ClientCall already closed");
buffer.add(value);
}
@Override
public void onClose(Status status, Metadata.Trailers trailers) {
Preconditions.checkState(!done, "Call already closed");
Preconditions.checkState(!done, "ClientCall already closed");
if (status.isOk()) {
buffer.add(BlockingResponseStream.this);
} else {

View File

@ -31,11 +31,11 @@
package io.grpc.stub;
import io.grpc.Call;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingCall.SimpleForwardingCall;
import io.grpc.ForwardingCallListener.SimpleForwardingCallListener;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
@ -71,9 +71,10 @@ public class MetadataUtils {
public static ClientInterceptor newAttachHeadersInterceptor(final Metadata.Headers extraHeaders) {
return new ClientInterceptor() {
@Override
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
Channel next) {
return new SimpleForwardingCall<ReqT, RespT>(next.newCall(method)) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method)) {
@Override
public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
headers.merge(extraHeaders);
@ -113,14 +114,15 @@ public class MetadataUtils {
final AtomicReference<Metadata.Trailers> trailersCapture) {
return new ClientInterceptor() {
@Override
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
Channel next) {
return new SimpleForwardingCall<ReqT, RespT>(next.newCall(method)) {
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method)) {
@Override
public void start(Listener<RespT> responseListener, Metadata.Headers headers) {
headersCapture.set(null);
trailersCapture.set(null);
super.start(new SimpleForwardingCallListener<RespT>(responseListener) {
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
@Override
public void onHeaders(Metadata.Headers headers) {
headersCapture.set(headers);

View File

@ -38,7 +38,7 @@ import static org.mockito.Mockito.verify;
import com.google.common.util.concurrent.ListenableFuture;
import io.grpc.Call;
import io.grpc.ClientCall;
import io.grpc.Metadata;
import io.grpc.Status;
@ -54,12 +54,12 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
/**
* Unit tests for {@link Calls}.
* Unit tests for {@link ClientCalls}.
*/
@RunWith(JUnit4.class)
public class CallsTest {
public class ClientCallsTest {
@Mock private Call<Integer, String> call;
@Mock private ClientCall<Integer, String> call;
@Before public void setUp() {
MockitoAnnotations.initMocks(this);
@ -67,10 +67,10 @@ public class CallsTest {
@Test public void unaryFutureCallSuccess() throws Exception {
Integer req = 2;
ListenableFuture<String> future = Calls.unaryFutureCall(call, req);
ArgumentCaptor<Call.Listener<String>> listenerCaptor = ArgumentCaptor.forClass(null);
ListenableFuture<String> future = ClientCalls.unaryFutureCall(call, req);
ArgumentCaptor<ClientCall.Listener<String>> listenerCaptor = ArgumentCaptor.forClass(null);
verify(call).start(listenerCaptor.capture(), any(Metadata.Headers.class));
Call.Listener<String> listener = listenerCaptor.getValue();
ClientCall.Listener<String> listener = listenerCaptor.getValue();
verify(call).sendPayload(req);
verify(call).halfClose();
listener.onPayload("bar");
@ -80,10 +80,10 @@ public class CallsTest {
@Test public void unaryFutureCallFailed() throws Exception {
Integer req = 2;
ListenableFuture<String> future = Calls.unaryFutureCall(call, req);
ArgumentCaptor<Call.Listener<String>> listenerCaptor = ArgumentCaptor.forClass(null);
ListenableFuture<String> future = ClientCalls.unaryFutureCall(call, req);
ArgumentCaptor<ClientCall.Listener<String>> listenerCaptor = ArgumentCaptor.forClass(null);
verify(call).start(listenerCaptor.capture(), any(Metadata.Headers.class));
Call.Listener<String> listener = listenerCaptor.getValue();
ClientCall.Listener<String> listener = listenerCaptor.getValue();
listener.onClose(Status.INVALID_ARGUMENT, new Metadata.Trailers());
try {
future.get();
@ -96,10 +96,10 @@ public class CallsTest {
@Test public void unaryFutureCallCancelled() throws Exception {
Integer req = 2;
ListenableFuture<String> future = Calls.unaryFutureCall(call, req);
ArgumentCaptor<Call.Listener<String>> listenerCaptor = ArgumentCaptor.forClass(null);
ListenableFuture<String> future = ClientCalls.unaryFutureCall(call, req);
ArgumentCaptor<ClientCall.Listener<String>> listenerCaptor = ArgumentCaptor.forClass(null);
verify(call).start(listenerCaptor.capture(), any(Metadata.Headers.class));
Call.Listener<String> listener = listenerCaptor.getValue();
ClientCall.Listener<String> listener = listenerCaptor.getValue();
future.cancel(true);
verify(call).cancel();
listener.onPayload("bar");