core: refactor ClientTransportProvider to ClientStreamProvider

Replace the two APIs of ClientCallImpl.ClientTransportProvider with a single API: newStream().
This commit is contained in:
ZHANG Dapeng 2020-07-24 09:40:53 -07:00 committed by GitHub
parent 8a792c3367
commit 6ec9387cbc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 154 additions and 194 deletions

View File

@ -41,7 +41,6 @@ import io.grpc.Context.CancellationListener;
import io.grpc.Deadline; import io.grpc.Deadline;
import io.grpc.DecompressorRegistry; import io.grpc.DecompressorRegistry;
import io.grpc.InternalDecompressorRegistry; import io.grpc.InternalDecompressorRegistry;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType; import io.grpc.MethodDescriptor.MethodType;
@ -84,12 +83,11 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
private final Context context; private final Context context;
private final boolean unaryRequest; private final boolean unaryRequest;
private final CallOptions callOptions; private final CallOptions callOptions;
private final boolean retryEnabled;
private ClientStream stream; private ClientStream stream;
private volatile boolean cancelListenersShouldBeRemoved; private volatile boolean cancelListenersShouldBeRemoved;
private boolean cancelCalled; private boolean cancelCalled;
private boolean halfCloseCalled; private boolean halfCloseCalled;
private final ClientTransportProvider clientTransportProvider; private final ClientStreamProvider clientStreamProvider;
private ContextCancellationListener cancellationListener; private ContextCancellationListener cancellationListener;
private final ScheduledExecutorService deadlineCancellationExecutor; private final ScheduledExecutorService deadlineCancellationExecutor;
private boolean fullStreamDecompression; private boolean fullStreamDecompression;
@ -101,10 +99,9 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
ClientCallImpl( ClientCallImpl(
MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions, MethodDescriptor<ReqT, RespT> method, Executor executor, CallOptions callOptions,
ClientTransportProvider clientTransportProvider, ClientStreamProvider clientStreamProvider,
ScheduledExecutorService deadlineCancellationExecutor, ScheduledExecutorService deadlineCancellationExecutor,
CallTracer channelCallsTracer, CallTracer channelCallsTracer) {
boolean retryEnabled) {
this.method = method; this.method = method;
// TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl. // TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl.
this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this)); this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this));
@ -124,9 +121,8 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
this.unaryRequest = method.getType() == MethodType.UNARY this.unaryRequest = method.getType() == MethodType.UNARY
|| method.getType() == MethodType.SERVER_STREAMING; || method.getType() == MethodType.SERVER_STREAMING;
this.callOptions = callOptions; this.callOptions = callOptions;
this.clientTransportProvider = clientTransportProvider; this.clientStreamProvider = clientStreamProvider;
this.deadlineCancellationExecutor = deadlineCancellationExecutor; this.deadlineCancellationExecutor = deadlineCancellationExecutor;
this.retryEnabled = retryEnabled;
PerfMark.event("ClientCall.<init>", tag); PerfMark.event("ClientCall.<init>", tag);
} }
@ -149,23 +145,14 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
} }
/** /**
* Provider of {@link ClientTransport}s. * Provider of {@link ClientStream}s.
*/ */
// TODO(zdapeng): replace the two APIs with a single API: newStream() interface ClientStreamProvider {
interface ClientTransportProvider { ClientStream newStream(
/** MethodDescriptor<?, ?> method,
* Returns a transport for a new call.
*
* @param args object containing call arguments.
*/
ClientTransport get(PickSubchannelArgs args);
<ReqT> ClientStream newRetriableStream(
MethodDescriptor<ReqT, ?> method,
CallOptions callOptions, CallOptions callOptions,
Metadata headers, Metadata headers,
Context context); Context context);
} }
ClientCallImpl<ReqT, RespT> setFullStreamDecompression(boolean fullStreamDecompression) { ClientCallImpl<ReqT, RespT> setFullStreamDecompression(boolean fullStreamDecompression) {
@ -252,18 +239,7 @@ final class ClientCallImpl<ReqT, RespT> extends ClientCall<ReqT, RespT> {
if (!deadlineExceeded) { if (!deadlineExceeded) {
logIfContextNarrowedTimeout( logIfContextNarrowedTimeout(
effectiveDeadline, context.getDeadline(), callOptions.getDeadline()); effectiveDeadline, context.getDeadline(), callOptions.getDeadline());
if (retryEnabled) { stream = clientStreamProvider.newStream(method, callOptions, headers, context);
stream = clientTransportProvider.newRetriableStream(method, callOptions, headers, context);
} else {
ClientTransport transport = clientTransportProvider.get(
new PickSubchannelArgsImpl(method, headers, callOptions));
Context origContext = context.attach();
try {
stream = transport.newStream(method, headers, callOptions);
} finally {
context.detach(origContext);
}
}
} else { } else {
stream = new FailingClientStream( stream = new FailingClientStream(
DEADLINE_EXCEEDED.withDescription( DEADLINE_EXCEEDED.withDescription(

View File

@ -72,7 +72,7 @@ import io.grpc.Status;
import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer; import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer;
import io.grpc.internal.ClientCallImpl.ClientTransportProvider; import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
import io.grpc.internal.RetriableStream.ChannelBufferMeter; import io.grpc.internal.RetriableStream.ChannelBufferMeter;
import io.grpc.internal.RetriableStream.Throttle; import io.grpc.internal.RetriableStream.Throttle;
import java.net.URI; import java.net.URI;
@ -454,9 +454,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
} }
} }
private final class ChannelTransportProvider implements ClientTransportProvider { private final class ChannelStreamProvider implements ClientStreamProvider {
@Override private ClientTransport getTransport(PickSubchannelArgs args) {
public ClientTransport get(PickSubchannelArgs args) {
SubchannelPicker pickerCopy = subchannelPicker; SubchannelPicker pickerCopy = subchannelPicker;
if (shutdown.get()) { if (shutdown.get()) {
// If channel is shut down, delayedTransport is also shut down which will fail the stream // If channel is shut down, delayedTransport is also shut down which will fail the stream
@ -494,57 +493,68 @@ final class ManagedChannelImpl extends ManagedChannel implements
} }
@Override @Override
public <ReqT> ClientStream newRetriableStream( public ClientStream newStream(
final MethodDescriptor<ReqT, ?> method, final MethodDescriptor<?, ?> method,
final CallOptions callOptions, final CallOptions callOptions,
final Metadata headers, final Metadata headers,
final Context context) { final Context context) {
checkState(retryEnabled, "retry should be enabled"); if (!retryEnabled) {
final Throttle throttle = lastServiceConfig.getRetryThrottling(); ClientTransport transport =
final class RetryStream extends RetriableStream<ReqT> { getTransport(new PickSubchannelArgsImpl(method, headers, callOptions));
RetryStream() { Context origContext = context.attach();
super( try {
method, return transport.newStream(method, headers, callOptions);
headers, } finally {
channelBufferUsed, context.detach(origContext);
perRpcBufferLimit,
channelBufferLimit,
getCallExecutor(callOptions),
transportFactory.getScheduledExecutorService(),
callOptions.getOption(RETRY_POLICY_KEY),
callOptions.getOption(HEDGING_POLICY_KEY),
throttle);
} }
} else {
final Throttle throttle = lastServiceConfig.getRetryThrottling();
final class RetryStream<ReqT> extends RetriableStream<ReqT> {
@SuppressWarnings("unchecked")
RetryStream() {
super(
(MethodDescriptor<ReqT, ?>) method,
headers,
channelBufferUsed,
perRpcBufferLimit,
channelBufferLimit,
getCallExecutor(callOptions),
transportFactory.getScheduledExecutorService(),
callOptions.getOption(RETRY_POLICY_KEY),
callOptions.getOption(HEDGING_POLICY_KEY),
throttle);
}
@Override @Override
Status prestart() { Status prestart() {
return uncommittedRetriableStreamsRegistry.add(this); return uncommittedRetriableStreamsRegistry.add(this);
} }
@Override @Override
void postCommit() { void postCommit() {
uncommittedRetriableStreamsRegistry.remove(this); uncommittedRetriableStreamsRegistry.remove(this);
} }
@Override @Override
ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata newHeaders) { ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata newHeaders) {
CallOptions newOptions = callOptions.withStreamTracerFactory(tracerFactory); CallOptions newOptions = callOptions.withStreamTracerFactory(tracerFactory);
ClientTransport transport = ClientTransport transport =
get(new PickSubchannelArgsImpl(method, newHeaders, newOptions)); getTransport(new PickSubchannelArgsImpl(method, newHeaders, newOptions));
Context origContext = context.attach(); Context origContext = context.attach();
try { try {
return transport.newStream(method, newHeaders, newOptions); return transport.newStream(method, newHeaders, newOptions);
} finally { } finally {
context.detach(origContext); context.detach(origContext);
}
} }
} }
}
return new RetryStream(); return new RetryStream<>();
}
} }
} }
private final ClientTransportProvider transportProvider = new ChannelTransportProvider(); private final ClientStreamProvider transportProvider = new ChannelStreamProvider();
private final Rescheduler idleTimer; private final Rescheduler idleTimer;
@ -887,8 +897,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
callOptions, callOptions,
transportProvider, transportProvider,
terminated ? null : transportFactory.getScheduledExecutorService(), terminated ? null : transportFactory.getScheduledExecutorService(),
channelCallTracer, channelCallTracer)
retryEnabled)
.setFullStreamDecompression(fullStreamDecompression) .setFullStreamDecompression(fullStreamDecompression)
.setDecompressorRegistry(decompressorRegistry) .setDecompressorRegistry(decompressorRegistry)
.setCompressorRegistry(compressorRegistry); .setCompressorRegistry(compressorRegistry);

View File

@ -46,7 +46,7 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext;
import io.grpc.internal.ClientCallImpl.ClientTransportProvider; import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -82,19 +82,19 @@ final class OobChannel extends ManagedChannel implements InternalInstrumented<Ch
private final ChannelTracer channelTracer; private final ChannelTracer channelTracer;
private final TimeProvider timeProvider; private final TimeProvider timeProvider;
private final ClientTransportProvider transportProvider = new ClientTransportProvider() { private final ClientStreamProvider transportProvider = new ClientStreamProvider() {
@Override @Override
public ClientTransport get(PickSubchannelArgs args) { public ClientStream newStream(MethodDescriptor<?, ?> method,
CallOptions callOptions, Metadata headers, Context context) {
Context origContext = context.attach();
// delayed transport's newStream() always acquires a lock, but concurrent performance doesn't // delayed transport's newStream() always acquires a lock, but concurrent performance doesn't
// matter here because OOB communication should be sparse, and it's not on application RPC's // matter here because OOB communication should be sparse, and it's not on application RPC's
// critical path. // critical path.
return delayedTransport; try {
} return delayedTransport.newStream(method, headers, callOptions);
} finally {
@Override context.detach(origContext);
public <ReqT> ClientStream newRetriableStream(MethodDescriptor<ReqT, ?> method, }
CallOptions callOptions, Metadata headers, Context context) {
throw new UnsupportedOperationException("OobChannel should not create retriable streams");
} }
}; };
@ -202,8 +202,7 @@ final class OobChannel extends ManagedChannel implements InternalInstrumented<Ch
MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) { MethodDescriptor<RequestT, ResponseT> methodDescriptor, CallOptions callOptions) {
return new ClientCallImpl<>(methodDescriptor, return new ClientCallImpl<>(methodDescriptor,
callOptions.getExecutor() == null ? executor : callOptions.getExecutor(), callOptions.getExecutor() == null ? executor : callOptions.getExecutor(),
callOptions, transportProvider, deadlineCancellationExecutor, channelCallsTracer, callOptions, transportProvider, deadlineCancellationExecutor, channelCallsTracer);
false /* retryEnabled */);
} }
@Override @Override

View File

@ -23,13 +23,11 @@ import io.grpc.CallOptions;
import io.grpc.Channel; import io.grpc.Channel;
import io.grpc.ClientCall; import io.grpc.ClientCall;
import io.grpc.Context; import io.grpc.Context;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.Metadata; import io.grpc.Metadata;
import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.internal.ClientCallImpl.ClientTransportProvider; import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
import io.grpc.internal.ClientStreamListener.RpcProgress; import io.grpc.internal.ClientStreamListener.RpcProgress;
import io.grpc.internal.GrpcUtil;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -48,21 +46,20 @@ final class SubchannelChannel extends Channel {
private final ScheduledExecutorService deadlineCancellationExecutor; private final ScheduledExecutorService deadlineCancellationExecutor;
private final CallTracer callsTracer; private final CallTracer callsTracer;
private final ClientTransportProvider transportProvider = new ClientTransportProvider() { private final ClientStreamProvider transportProvider = new ClientStreamProvider() {
@Override @Override
public ClientTransport get(PickSubchannelArgs args) { public ClientStream newStream(MethodDescriptor<?, ?> method,
CallOptions callOptions, Metadata headers, Context context) {
ClientTransport transport = subchannel.getTransport(); ClientTransport transport = subchannel.getTransport();
if (transport == null) { if (transport == null) {
return notReadyTransport; transport = notReadyTransport;
} else { }
return transport; Context origContext = context.attach();
try {
return transport.newStream(method, headers, callOptions);
} finally {
context.detach(origContext);
} }
}
@Override
public <ReqT> ClientStream newRetriableStream(MethodDescriptor<ReqT, ?> method,
CallOptions callOptions, Metadata headers, Context context) {
throw new UnsupportedOperationException("OobChannel should not create retriable streams");
} }
}; };
@ -109,7 +106,7 @@ final class SubchannelChannel extends Channel {
return new ClientCallImpl<>(methodDescriptor, return new ClientCallImpl<>(methodDescriptor,
effectiveExecutor, effectiveExecutor,
callOptions.withOption(GrpcUtil.CALL_OPTIONS_RPC_OWNED_BY_BALANCER, Boolean.TRUE), callOptions.withOption(GrpcUtil.CALL_OPTIONS_RPC_OWNED_BY_BALANCER, Boolean.TRUE),
transportProvider, deadlineCancellationExecutor, callsTracer, false /* retryEnabled */); transportProvider, deadlineCancellationExecutor, callsTracer);
} }
@Override @Override

View File

@ -56,7 +56,7 @@ import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType; import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.Status.Code; import io.grpc.Status.Code;
import io.grpc.internal.ClientCallImpl.ClientTransportProvider; import io.grpc.internal.ClientCallImpl.ClientStreamProvider;
import io.grpc.internal.testing.SingleMessageProducer; import io.grpc.internal.testing.SingleMessageProducer;
import io.grpc.testing.TestMethodDescriptors; import io.grpc.testing.TestMethodDescriptors;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
@ -108,18 +108,13 @@ public class ClientCallImplTest {
@Rule @Rule
public final MockitoRule mocks = MockitoJUnit.rule(); public final MockitoRule mocks = MockitoJUnit.rule();
@Mock private ClientStreamListener streamListener;
@Mock private ClientTransport clientTransport;
@Captor private ArgumentCaptor<Status> statusCaptor; @Captor private ArgumentCaptor<Status> statusCaptor;
@Mock @Mock
private ClientStreamTracer.Factory streamTracerFactory; private ClientStreamTracer.Factory streamTracerFactory;
@Mock @Mock
private ClientTransport transport; private ClientStreamProvider clientStreamProvider;
@Mock
private ClientTransportProvider provider;
@Mock @Mock
private ClientStream stream; private ClientStream stream;
@ -140,9 +135,11 @@ public class ClientCallImplTest {
@Before @Before
public void setUp() { public void setUp() {
when(provider.get(any(PickSubchannelArgsImpl.class))).thenReturn(transport); when(clientStreamProvider.newStream(
when(transport.newStream( (MethodDescriptor<?, ?>) any(MethodDescriptor.class),
any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))) any(CallOptions.class),
any(Metadata.class),
any(Context.class)))
.thenReturn(stream); .thenReturn(stream);
doAnswer(new Answer<Void>() { doAnswer(new Answer<Void>() {
@Override @Override
@ -167,10 +164,9 @@ public class ClientCallImplTest {
method, method,
executor, executor,
baseCallOptions, baseCallOptions,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer);
/* retryEnabled= */ false);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture()); verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@ -189,10 +185,9 @@ public class ClientCallImplTest {
method, method,
executor, executor,
baseCallOptions, baseCallOptions,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer);
/* retryEnabled= */ false);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture()); verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@ -227,10 +222,9 @@ public class ClientCallImplTest {
method, method,
executor, executor,
baseCallOptions, baseCallOptions,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer);
/* retryEnabled= */ false);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture()); verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@ -272,10 +266,9 @@ public class ClientCallImplTest {
method, method,
executor, executor,
baseCallOptions, baseCallOptions,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer);
/* retryEnabled= */ false);
callListener = new NoopClientCall.NoopClientCallListener<Void>() { callListener = new NoopClientCall.NoopClientCallListener<Void>() {
private final RuntimeException failure = new RuntimeException("bad"); private final RuntimeException failure = new RuntimeException("bad");
@ -308,10 +301,9 @@ public class ClientCallImplTest {
method.toBuilder().setType(MethodType.UNKNOWN).build(), method.toBuilder().setType(MethodType.UNKNOWN).build(),
executor, executor,
baseCallOptions, baseCallOptions,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer);
/* retryEnabled= */ false);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
verify(stream).start(listenerArgumentCaptor.capture()); verify(stream).start(listenerArgumentCaptor.capture());
final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue();
@ -343,16 +335,16 @@ public class ClientCallImplTest {
method, method,
MoreExecutors.directExecutor(), MoreExecutors.directExecutor(),
baseCallOptions, baseCallOptions,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer)
/* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry); .setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class); ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(transport).newStream(eq(method), metadataCaptor.capture(), same(baseCallOptions)); verify(clientStreamProvider)
.newStream(eq(method), same(baseCallOptions), metadataCaptor.capture(), any(Context.class));
Metadata actual = metadataCaptor.getValue(); Metadata actual = metadataCaptor.getValue();
// there should only be one. // there should only be one.
@ -367,10 +359,9 @@ public class ClientCallImplTest {
method, method,
MoreExecutors.directExecutor(), MoreExecutors.directExecutor(),
baseCallOptions.withAuthority("overridden-authority"), baseCallOptions.withAuthority("overridden-authority"),
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer)
/* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry); .setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
@ -384,16 +375,16 @@ public class ClientCallImplTest {
method, method,
MoreExecutors.directExecutor(), MoreExecutors.directExecutor(),
callOptions, callOptions,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer)
/* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry); .setDecompressorRegistry(decompressorRegistry);
final Metadata metadata = new Metadata(); final Metadata metadata = new Metadata();
call.start(callListener, metadata); call.start(callListener, metadata);
verify(transport).newStream(same(method), same(metadata), same(callOptions)); verify(clientStreamProvider)
.newStream(same(method), same(callOptions), same(metadata), any(Context.class));
} }
@Test @Test
@ -403,10 +394,9 @@ public class ClientCallImplTest {
MoreExecutors.directExecutor(), MoreExecutors.directExecutor(),
// Don't provide an authority // Don't provide an authority
baseCallOptions, baseCallOptions,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer)
/* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry); .setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
@ -574,10 +564,9 @@ public class ClientCallImplTest {
method.toBuilder().setType(MethodType.UNKNOWN).build(), method.toBuilder().setType(MethodType.UNKNOWN).build(),
new SerializingExecutor(Executors.newSingleThreadExecutor()), new SerializingExecutor(Executors.newSingleThreadExecutor()),
baseCallOptions, baseCallOptions,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer)
/* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry); .setDecompressorRegistry(decompressorRegistry);
context.detach(previous); context.detach(previous);
@ -652,10 +641,9 @@ public class ClientCallImplTest {
method, method,
new SerializingExecutor(Executors.newSingleThreadExecutor()), new SerializingExecutor(Executors.newSingleThreadExecutor()),
baseCallOptions, baseCallOptions,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer)
/* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry); .setDecompressorRegistry(decompressorRegistry);
cancellableContext.detach(previous); cancellableContext.detach(previous);
@ -682,10 +670,9 @@ public class ClientCallImplTest {
method, method,
new SerializingExecutor(Executors.newSingleThreadExecutor()), new SerializingExecutor(Executors.newSingleThreadExecutor()),
baseCallOptions, baseCallOptions,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer)
/* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry); .setDecompressorRegistry(decompressorRegistry);
cancellableContext.detach(previous); cancellableContext.detach(previous);
@ -709,7 +696,7 @@ public class ClientCallImplTest {
call.halfClose(); call.halfClose();
// Stream should never be created. // Stream should never be created.
verifyZeroInteractions(transport); verifyZeroInteractions(clientStreamProvider);
try { try {
call.sendMessage(null); call.sendMessage(null);
@ -727,19 +714,22 @@ public class ClientCallImplTest {
method, method,
new SerializingExecutor(Executors.newSingleThreadExecutor()), new SerializingExecutor(Executors.newSingleThreadExecutor()),
callOptions, callOptions,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer)
/* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry); .setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
verify(transport, times(0)) verify(clientStreamProvider, never())
.newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); .newStream(
(MethodDescriptor<?, ?>) any(MethodDescriptor.class),
any(CallOptions.class),
any(Metadata.class),
any(Context.class));
verify(callListener, timeout(1000)).onClose(statusCaptor.capture(), any(Metadata.class)); verify(callListener, timeout(1000)).onClose(statusCaptor.capture(), any(Metadata.class));
assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode()); assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode());
assertThat(statusCaptor.getValue().getDescription()) assertThat(statusCaptor.getValue().getDescription())
.startsWith("ClientCall started after deadline exceeded"); .startsWith("ClientCall started after deadline exceeded");
verifyZeroInteractions(provider); verifyZeroInteractions(clientStreamProvider);
} }
@Test @Test
@ -752,10 +742,9 @@ public class ClientCallImplTest {
method, method,
MoreExecutors.directExecutor(), MoreExecutors.directExecutor(),
baseCallOptions, baseCallOptions,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer);
/* retryEnabled= */ false);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
context.detach(origContext); context.detach(origContext);
@ -777,10 +766,9 @@ public class ClientCallImplTest {
method, method,
MoreExecutors.directExecutor(), MoreExecutors.directExecutor(),
callOpts, callOpts,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer);
/* retryEnabled= */ false);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
context.detach(origContext); context.detach(origContext);
@ -802,10 +790,9 @@ public class ClientCallImplTest {
method, method,
MoreExecutors.directExecutor(), MoreExecutors.directExecutor(),
callOpts, callOpts,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer);
/* retryEnabled= */ false);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
context.detach(origContext); context.detach(origContext);
@ -823,10 +810,9 @@ public class ClientCallImplTest {
method, method,
MoreExecutors.directExecutor(), MoreExecutors.directExecutor(),
callOpts, callOpts,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer);
/* retryEnabled= */ false);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class); ArgumentCaptor<Deadline> deadlineCaptor = ArgumentCaptor.forClass(Deadline.class);
@ -841,10 +827,9 @@ public class ClientCallImplTest {
method, method,
MoreExecutors.directExecutor(), MoreExecutors.directExecutor(),
baseCallOptions, baseCallOptions,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer);
/* retryEnabled= */ false);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
verify(stream, never()).setDeadline(any(Deadline.class)); verify(stream, never()).setDeadline(any(Deadline.class));
@ -860,10 +845,9 @@ public class ClientCallImplTest {
MoreExecutors.directExecutor(), MoreExecutors.directExecutor(),
baseCallOptions.withDeadline( baseCallOptions.withDeadline(
Deadline.after(1, TimeUnit.SECONDS, fakeClock.getDeadlineTicker())), Deadline.after(1, TimeUnit.SECONDS, fakeClock.getDeadlineTicker())),
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer);
/* retryEnabled= */ false);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
@ -899,10 +883,9 @@ public class ClientCallImplTest {
method, method,
MoreExecutors.directExecutor(), MoreExecutors.directExecutor(),
baseCallOptions, baseCallOptions,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer);
/* retryEnabled= */ false);
context.detach(origContext); context.detach(origContext);
@ -930,10 +913,9 @@ public class ClientCallImplTest {
method, method,
MoreExecutors.directExecutor(), MoreExecutors.directExecutor(),
baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)), baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)),
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer);
/* retryEnabled= */ false);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
call.cancel("canceled", null); call.cancel("canceled", null);
@ -955,10 +937,9 @@ public class ClientCallImplTest {
method, method,
MoreExecutors.directExecutor(), MoreExecutors.directExecutor(),
baseCallOptions, baseCallOptions,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer);
/* retryEnabled= */ false);
Metadata headers = new Metadata(); Metadata headers = new Metadata();
@ -973,10 +954,9 @@ public class ClientCallImplTest {
method, method,
MoreExecutors.directExecutor(), MoreExecutors.directExecutor(),
baseCallOptions, baseCallOptions,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer);
/* retryEnabled= */ false);
final Exception cause = new Exception(); final Exception cause = new Exception();
ClientCall.Listener<Void> callListener = ClientCall.Listener<Void> callListener =
new ClientCall.Listener<Void>() { new ClientCall.Listener<Void>() {
@ -1011,10 +991,9 @@ public class ClientCallImplTest {
method, method,
new SerializingExecutor(Executors.newSingleThreadExecutor()), new SerializingExecutor(Executors.newSingleThreadExecutor()),
callOptions, callOptions,
provider, clientStreamProvider,
deadlineCancellationExecutor, deadlineCancellationExecutor,
channelCallTracer, channelCallTracer)
/* retryEnabled= */ false)
.setDecompressorRegistry(decompressorRegistry); .setDecompressorRegistry(decompressorRegistry);
call.start(callListener, new Metadata()); call.start(callListener, new Metadata());
@ -1026,8 +1005,8 @@ public class ClientCallImplTest {
@Test @Test
public void getAttributes() { public void getAttributes() {
ClientCallImpl<Void, Void> call = new ClientCallImpl<>( ClientCallImpl<Void, Void> call = new ClientCallImpl<>(
method, MoreExecutors.directExecutor(), baseCallOptions, provider, method, MoreExecutors.directExecutor(), baseCallOptions, clientStreamProvider,
deadlineCancellationExecutor, channelCallTracer, /* retryEnabled= */ false); deadlineCancellationExecutor, channelCallTracer);
Attributes attrs = Attributes attrs =
Attributes.newBuilder().set(Key.<String>create("fake key"), "fake value").build(); Attributes.newBuilder().set(Key.<String>create("fake key"), "fake value").build();
when(stream.getAttributes()).thenReturn(attrs); when(stream.getAttributes()).thenReturn(attrs);