stub: add Blocking StubType to blocking ClientCalls methods. (#6900)

This commit is contained in:
Ran 2020-04-06 10:55:24 -07:00 committed by GitHub
parent 24e3d9587e
commit 37913fd3b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 2 deletions

View File

@ -125,7 +125,9 @@ public final class ClientCalls {
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
ThreadlessExecutor executor = new ThreadlessExecutor();
boolean interrupt = false;
ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
ClientCall<ReqT, RespT> call = channel.newCall(method,
callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
.withExecutor(executor));
try {
ListenableFuture<RespT> responseFuture = futureUnaryCall(call, req);
while (!responseFuture.isDone()) {
@ -177,7 +179,9 @@ public final class ClientCalls {
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
Channel channel, MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, ReqT req) {
ThreadlessExecutor executor = new ThreadlessExecutor();
ClientCall<ReqT, RespT> call = channel.newCall(method, callOptions.withExecutor(executor));
ClientCall<ReqT, RespT> call = channel.newCall(method,
callOptions.withOption(ClientCalls.STUB_TYPE_OPTION, StubType.BLOCKING)
.withExecutor(executor));
BlockingResponseStream<RespT> result = new BlockingResponseStream<>(call, executor);
asyncUnaryRequestCall(call, req, result.listener(), true);
return result;

View File

@ -23,6 +23,9 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@ -43,6 +46,7 @@ import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.NoopClientCall;
import io.grpc.stub.ClientCalls.StubType;
import io.grpc.stub.ServerCalls.NoopStreamObserver;
import io.grpc.stub.ServerCalls.ServerStreamingMethod;
import io.grpc.stub.ServerCalls.UnaryMethod;
@ -62,6 +66,10 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
@ -83,6 +91,12 @@ public class ClientCallsTest {
private Server server;
private ManagedChannel channel;
@Mock
private ManagedChannel mockChannel;
@Captor
private ArgumentCaptor<MethodDescriptor<?, ?>> methodDescriptorCaptor;
@Captor
private ArgumentCaptor<CallOptions> callOptionsCaptor;
@Before
public void setUp() {
@ -203,6 +217,50 @@ public class ClientCallsTest {
assertTrue("context not cancelled", methodImpl.observer.isCancelled());
}
@Test
public void blockingUnaryCall_HasBlockingStubType() {
NoopClientCall<Integer, Integer> call = new NoopClientCall<Integer, Integer>() {
@Override
public void start(io.grpc.ClientCall.Listener<Integer> listener, Metadata headers) {
listener.onMessage(1);
listener.onClose(Status.OK, new Metadata());
}
};
when(mockChannel.newCall(
ArgumentMatchers.<MethodDescriptor<Integer, Integer>>any(), any(CallOptions.class)))
.thenReturn(call);
Integer unused =
ClientCalls.blockingUnaryCall(mockChannel, UNARY_METHOD, CallOptions.DEFAULT, 1);
verify(mockChannel).newCall(methodDescriptorCaptor.capture(), callOptionsCaptor.capture());
CallOptions capturedCallOption = callOptionsCaptor.getValue();
assertThat(capturedCallOption.getOption(ClientCalls.STUB_TYPE_OPTION))
.isEquivalentAccordingToCompareTo(StubType.BLOCKING);
}
@Test
public void blockingServerStreamingCall_HasBlockingStubType() {
NoopClientCall<Integer, Integer> call = new NoopClientCall<Integer, Integer>() {
@Override
public void start(io.grpc.ClientCall.Listener<Integer> listener, Metadata headers) {
listener.onMessage(1);
listener.onClose(Status.OK, new Metadata());
}
};
when(mockChannel.newCall(
ArgumentMatchers.<MethodDescriptor<Integer, Integer>>any(), any(CallOptions.class)))
.thenReturn(call);
Iterator<Integer> unused =
ClientCalls.blockingServerStreamingCall(mockChannel, UNARY_METHOD, CallOptions.DEFAULT, 1);
verify(mockChannel).newCall(methodDescriptorCaptor.capture(), callOptionsCaptor.capture());
CallOptions capturedCallOption = callOptionsCaptor.getValue();
assertThat(capturedCallOption.getOption(ClientCalls.STUB_TYPE_OPTION))
.isEquivalentAccordingToCompareTo(StubType.BLOCKING);
}
@Test
public void unaryFutureCallSuccess() throws Exception {
final AtomicReference<ClientCall.Listener<String>> listener =