diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index 0460f0c5e3..2c0321b27a 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -41,7 +41,6 @@ import io.grpc.Context.CancellationListener; import io.grpc.Deadline; import io.grpc.DecompressorRegistry; import io.grpc.InternalDecompressorRegistry; -import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; @@ -84,12 +83,11 @@ final class ClientCallImpl extends ClientCall { private final Context context; private final boolean unaryRequest; private final CallOptions callOptions; - private final boolean retryEnabled; private ClientStream stream; private volatile boolean cancelListenersShouldBeRemoved; private boolean cancelCalled; private boolean halfCloseCalled; - private final ClientTransportProvider clientTransportProvider; + private final ClientStreamProvider clientStreamProvider; private ContextCancellationListener cancellationListener; private final ScheduledExecutorService deadlineCancellationExecutor; private boolean fullStreamDecompression; @@ -101,10 +99,9 @@ final class ClientCallImpl extends ClientCall { ClientCallImpl( MethodDescriptor method, Executor executor, CallOptions callOptions, - ClientTransportProvider clientTransportProvider, + ClientStreamProvider clientStreamProvider, ScheduledExecutorService deadlineCancellationExecutor, - CallTracer channelCallsTracer, - boolean retryEnabled) { + CallTracer channelCallsTracer) { this.method = method; // TODO(carl-mastrangelo): consider moving this construction to ManagedChannelImpl. this.tag = PerfMark.createTag(method.getFullMethodName(), System.identityHashCode(this)); @@ -124,9 +121,8 @@ final class ClientCallImpl extends ClientCall { this.unaryRequest = method.getType() == MethodType.UNARY || method.getType() == MethodType.SERVER_STREAMING; this.callOptions = callOptions; - this.clientTransportProvider = clientTransportProvider; + this.clientStreamProvider = clientStreamProvider; this.deadlineCancellationExecutor = deadlineCancellationExecutor; - this.retryEnabled = retryEnabled; PerfMark.event("ClientCall.", tag); } @@ -149,23 +145,14 @@ final class ClientCallImpl extends ClientCall { } /** - * Provider of {@link ClientTransport}s. + * Provider of {@link ClientStream}s. */ - // TODO(zdapeng): replace the two APIs with a single API: newStream() - interface ClientTransportProvider { - /** - * Returns a transport for a new call. - * - * @param args object containing call arguments. - */ - ClientTransport get(PickSubchannelArgs args); - - ClientStream newRetriableStream( - MethodDescriptor method, + interface ClientStreamProvider { + ClientStream newStream( + MethodDescriptor method, CallOptions callOptions, Metadata headers, Context context); - } ClientCallImpl setFullStreamDecompression(boolean fullStreamDecompression) { @@ -252,18 +239,7 @@ final class ClientCallImpl extends ClientCall { if (!deadlineExceeded) { logIfContextNarrowedTimeout( effectiveDeadline, context.getDeadline(), callOptions.getDeadline()); - if (retryEnabled) { - stream = clientTransportProvider.newRetriableStream(method, callOptions, headers, context); - } else { - ClientTransport transport = clientTransportProvider.get( - new PickSubchannelArgsImpl(method, headers, callOptions)); - Context origContext = context.attach(); - try { - stream = transport.newStream(method, headers, callOptions); - } finally { - context.detach(origContext); - } - } + stream = clientStreamProvider.newStream(method, callOptions, headers, context); } else { stream = new FailingClientStream( DEADLINE_EXCEEDED.withDescription( diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 66a1497711..015b82917d 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -72,7 +72,7 @@ import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.AutoConfiguredLoadBalancerFactory.AutoConfiguredLoadBalancer; -import io.grpc.internal.ClientCallImpl.ClientTransportProvider; +import io.grpc.internal.ClientCallImpl.ClientStreamProvider; import io.grpc.internal.RetriableStream.ChannelBufferMeter; import io.grpc.internal.RetriableStream.Throttle; import java.net.URI; @@ -454,9 +454,8 @@ final class ManagedChannelImpl extends ManagedChannel implements } } - private final class ChannelTransportProvider implements ClientTransportProvider { - @Override - public ClientTransport get(PickSubchannelArgs args) { + private final class ChannelStreamProvider implements ClientStreamProvider { + private ClientTransport getTransport(PickSubchannelArgs args) { SubchannelPicker pickerCopy = subchannelPicker; if (shutdown.get()) { // If channel is shut down, delayedTransport is also shut down which will fail the stream @@ -494,57 +493,68 @@ final class ManagedChannelImpl extends ManagedChannel implements } @Override - public ClientStream newRetriableStream( - final MethodDescriptor method, + public ClientStream newStream( + final MethodDescriptor method, final CallOptions callOptions, final Metadata headers, final Context context) { - checkState(retryEnabled, "retry should be enabled"); - final Throttle throttle = lastServiceConfig.getRetryThrottling(); - final class RetryStream extends RetriableStream { - RetryStream() { - super( - method, - headers, - channelBufferUsed, - perRpcBufferLimit, - channelBufferLimit, - getCallExecutor(callOptions), - transportFactory.getScheduledExecutorService(), - callOptions.getOption(RETRY_POLICY_KEY), - callOptions.getOption(HEDGING_POLICY_KEY), - throttle); + if (!retryEnabled) { + ClientTransport transport = + getTransport(new PickSubchannelArgsImpl(method, headers, callOptions)); + Context origContext = context.attach(); + try { + return transport.newStream(method, headers, callOptions); + } finally { + context.detach(origContext); } + } else { + final Throttle throttle = lastServiceConfig.getRetryThrottling(); + final class RetryStream extends RetriableStream { + @SuppressWarnings("unchecked") + RetryStream() { + super( + (MethodDescriptor) method, + headers, + channelBufferUsed, + perRpcBufferLimit, + channelBufferLimit, + getCallExecutor(callOptions), + transportFactory.getScheduledExecutorService(), + callOptions.getOption(RETRY_POLICY_KEY), + callOptions.getOption(HEDGING_POLICY_KEY), + throttle); + } - @Override - Status prestart() { - return uncommittedRetriableStreamsRegistry.add(this); - } + @Override + Status prestart() { + return uncommittedRetriableStreamsRegistry.add(this); + } - @Override - void postCommit() { - uncommittedRetriableStreamsRegistry.remove(this); - } + @Override + void postCommit() { + uncommittedRetriableStreamsRegistry.remove(this); + } - @Override - ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata newHeaders) { - CallOptions newOptions = callOptions.withStreamTracerFactory(tracerFactory); - ClientTransport transport = - get(new PickSubchannelArgsImpl(method, newHeaders, newOptions)); - Context origContext = context.attach(); - try { - return transport.newStream(method, newHeaders, newOptions); - } finally { - context.detach(origContext); + @Override + ClientStream newSubstream(ClientStreamTracer.Factory tracerFactory, Metadata newHeaders) { + CallOptions newOptions = callOptions.withStreamTracerFactory(tracerFactory); + ClientTransport transport = + getTransport(new PickSubchannelArgsImpl(method, newHeaders, newOptions)); + Context origContext = context.attach(); + try { + return transport.newStream(method, newHeaders, newOptions); + } finally { + context.detach(origContext); + } } } - } - return new RetryStream(); + return new RetryStream<>(); + } } } - private final ClientTransportProvider transportProvider = new ChannelTransportProvider(); + private final ClientStreamProvider transportProvider = new ChannelStreamProvider(); private final Rescheduler idleTimer; @@ -887,8 +897,7 @@ final class ManagedChannelImpl extends ManagedChannel implements callOptions, transportProvider, terminated ? null : transportFactory.getScheduledExecutorService(), - channelCallTracer, - retryEnabled) + channelCallTracer) .setFullStreamDecompression(fullStreamDecompression) .setDecompressorRegistry(decompressorRegistry) .setCompressorRegistry(compressorRegistry); diff --git a/core/src/main/java/io/grpc/internal/OobChannel.java b/core/src/main/java/io/grpc/internal/OobChannel.java index aae3314f3f..c746336d58 100644 --- a/core/src/main/java/io/grpc/internal/OobChannel.java +++ b/core/src/main/java/io/grpc/internal/OobChannel.java @@ -46,7 +46,7 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.SynchronizationContext; -import io.grpc.internal.ClientCallImpl.ClientTransportProvider; +import io.grpc.internal.ClientCallImpl.ClientStreamProvider; import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -82,19 +82,19 @@ final class OobChannel extends ManagedChannel implements InternalInstrumented method, + CallOptions callOptions, Metadata headers, Context context) { + Context origContext = context.attach(); // delayed transport's newStream() always acquires a lock, but concurrent performance doesn't // matter here because OOB communication should be sparse, and it's not on application RPC's // critical path. - return delayedTransport; - } - - @Override - public ClientStream newRetriableStream(MethodDescriptor method, - CallOptions callOptions, Metadata headers, Context context) { - throw new UnsupportedOperationException("OobChannel should not create retriable streams"); + try { + return delayedTransport.newStream(method, headers, callOptions); + } finally { + context.detach(origContext); + } } }; @@ -202,8 +202,7 @@ final class OobChannel extends ManagedChannel implements InternalInstrumented methodDescriptor, CallOptions callOptions) { return new ClientCallImpl<>(methodDescriptor, callOptions.getExecutor() == null ? executor : callOptions.getExecutor(), - callOptions, transportProvider, deadlineCancellationExecutor, channelCallsTracer, - false /* retryEnabled */); + callOptions, transportProvider, deadlineCancellationExecutor, channelCallsTracer); } @Override diff --git a/core/src/main/java/io/grpc/internal/SubchannelChannel.java b/core/src/main/java/io/grpc/internal/SubchannelChannel.java index 9d5bfcec7d..d55c403ef0 100644 --- a/core/src/main/java/io/grpc/internal/SubchannelChannel.java +++ b/core/src/main/java/io/grpc/internal/SubchannelChannel.java @@ -23,13 +23,11 @@ import io.grpc.CallOptions; import io.grpc.Channel; import io.grpc.ClientCall; import io.grpc.Context; -import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; -import io.grpc.internal.ClientCallImpl.ClientTransportProvider; +import io.grpc.internal.ClientCallImpl.ClientStreamProvider; import io.grpc.internal.ClientStreamListener.RpcProgress; -import io.grpc.internal.GrpcUtil; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -48,21 +46,20 @@ final class SubchannelChannel extends Channel { private final ScheduledExecutorService deadlineCancellationExecutor; private final CallTracer callsTracer; - private final ClientTransportProvider transportProvider = new ClientTransportProvider() { + private final ClientStreamProvider transportProvider = new ClientStreamProvider() { @Override - public ClientTransport get(PickSubchannelArgs args) { + public ClientStream newStream(MethodDescriptor method, + CallOptions callOptions, Metadata headers, Context context) { ClientTransport transport = subchannel.getTransport(); if (transport == null) { - return notReadyTransport; - } else { - return transport; + transport = notReadyTransport; + } + Context origContext = context.attach(); + try { + return transport.newStream(method, headers, callOptions); + } finally { + context.detach(origContext); } - } - - @Override - public ClientStream newRetriableStream(MethodDescriptor method, - CallOptions callOptions, Metadata headers, Context context) { - throw new UnsupportedOperationException("OobChannel should not create retriable streams"); } }; @@ -109,7 +106,7 @@ final class SubchannelChannel extends Channel { return new ClientCallImpl<>(methodDescriptor, effectiveExecutor, callOptions.withOption(GrpcUtil.CALL_OPTIONS_RPC_OWNED_BY_BALANCER, Boolean.TRUE), - transportProvider, deadlineCancellationExecutor, callsTracer, false /* retryEnabled */); + transportProvider, deadlineCancellationExecutor, callsTracer); } @Override diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index cf74c2be46..268264f848 100644 --- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -56,7 +56,7 @@ import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; import io.grpc.Status.Code; -import io.grpc.internal.ClientCallImpl.ClientTransportProvider; +import io.grpc.internal.ClientCallImpl.ClientStreamProvider; import io.grpc.internal.testing.SingleMessageProducer; import io.grpc.testing.TestMethodDescriptors; import java.io.ByteArrayInputStream; @@ -108,18 +108,13 @@ public class ClientCallImplTest { @Rule public final MockitoRule mocks = MockitoJUnit.rule(); - @Mock private ClientStreamListener streamListener; - @Mock private ClientTransport clientTransport; @Captor private ArgumentCaptor statusCaptor; @Mock private ClientStreamTracer.Factory streamTracerFactory; @Mock - private ClientTransport transport; - - @Mock - private ClientTransportProvider provider; + private ClientStreamProvider clientStreamProvider; @Mock private ClientStream stream; @@ -140,9 +135,11 @@ public class ClientCallImplTest { @Before public void setUp() { - when(provider.get(any(PickSubchannelArgsImpl.class))).thenReturn(transport); - when(transport.newStream( - any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class))) + when(clientStreamProvider.newStream( + (MethodDescriptor) any(MethodDescriptor.class), + any(CallOptions.class), + any(Metadata.class), + any(Context.class))) .thenReturn(stream); doAnswer(new Answer() { @Override @@ -167,10 +164,9 @@ public class ClientCallImplTest { method, executor, baseCallOptions, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false); + channelCallTracer); call.start(callListener, new Metadata()); verify(stream).start(listenerArgumentCaptor.capture()); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); @@ -189,10 +185,9 @@ public class ClientCallImplTest { method, executor, baseCallOptions, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false); + channelCallTracer); call.start(callListener, new Metadata()); verify(stream).start(listenerArgumentCaptor.capture()); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); @@ -227,10 +222,9 @@ public class ClientCallImplTest { method, executor, baseCallOptions, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false); + channelCallTracer); call.start(callListener, new Metadata()); verify(stream).start(listenerArgumentCaptor.capture()); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); @@ -272,10 +266,9 @@ public class ClientCallImplTest { method, executor, baseCallOptions, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false); + channelCallTracer); callListener = new NoopClientCall.NoopClientCallListener() { private final RuntimeException failure = new RuntimeException("bad"); @@ -308,10 +301,9 @@ public class ClientCallImplTest { method.toBuilder().setType(MethodType.UNKNOWN).build(), executor, baseCallOptions, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false); + channelCallTracer); call.start(callListener, new Metadata()); verify(stream).start(listenerArgumentCaptor.capture()); final ClientStreamListener streamListener = listenerArgumentCaptor.getValue(); @@ -343,16 +335,16 @@ public class ClientCallImplTest { method, MoreExecutors.directExecutor(), baseCallOptions, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false) + channelCallTracer) .setDecompressorRegistry(decompressorRegistry); call.start(callListener, new Metadata()); ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(Metadata.class); - verify(transport).newStream(eq(method), metadataCaptor.capture(), same(baseCallOptions)); + verify(clientStreamProvider) + .newStream(eq(method), same(baseCallOptions), metadataCaptor.capture(), any(Context.class)); Metadata actual = metadataCaptor.getValue(); // there should only be one. @@ -367,10 +359,9 @@ public class ClientCallImplTest { method, MoreExecutors.directExecutor(), baseCallOptions.withAuthority("overridden-authority"), - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false) + channelCallTracer) .setDecompressorRegistry(decompressorRegistry); call.start(callListener, new Metadata()); @@ -384,16 +375,16 @@ public class ClientCallImplTest { method, MoreExecutors.directExecutor(), callOptions, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false) + channelCallTracer) .setDecompressorRegistry(decompressorRegistry); final Metadata metadata = new Metadata(); call.start(callListener, metadata); - verify(transport).newStream(same(method), same(metadata), same(callOptions)); + verify(clientStreamProvider) + .newStream(same(method), same(callOptions), same(metadata), any(Context.class)); } @Test @@ -403,10 +394,9 @@ public class ClientCallImplTest { MoreExecutors.directExecutor(), // Don't provide an authority baseCallOptions, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false) + channelCallTracer) .setDecompressorRegistry(decompressorRegistry); call.start(callListener, new Metadata()); @@ -574,10 +564,9 @@ public class ClientCallImplTest { method.toBuilder().setType(MethodType.UNKNOWN).build(), new SerializingExecutor(Executors.newSingleThreadExecutor()), baseCallOptions, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false) + channelCallTracer) .setDecompressorRegistry(decompressorRegistry); context.detach(previous); @@ -652,10 +641,9 @@ public class ClientCallImplTest { method, new SerializingExecutor(Executors.newSingleThreadExecutor()), baseCallOptions, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false) + channelCallTracer) .setDecompressorRegistry(decompressorRegistry); cancellableContext.detach(previous); @@ -682,10 +670,9 @@ public class ClientCallImplTest { method, new SerializingExecutor(Executors.newSingleThreadExecutor()), baseCallOptions, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false) + channelCallTracer) .setDecompressorRegistry(decompressorRegistry); cancellableContext.detach(previous); @@ -709,7 +696,7 @@ public class ClientCallImplTest { call.halfClose(); // Stream should never be created. - verifyZeroInteractions(transport); + verifyZeroInteractions(clientStreamProvider); try { call.sendMessage(null); @@ -727,19 +714,22 @@ public class ClientCallImplTest { method, new SerializingExecutor(Executors.newSingleThreadExecutor()), callOptions, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false) + channelCallTracer) .setDecompressorRegistry(decompressorRegistry); call.start(callListener, new Metadata()); - verify(transport, times(0)) - .newStream(any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class)); + verify(clientStreamProvider, never()) + .newStream( + (MethodDescriptor) any(MethodDescriptor.class), + any(CallOptions.class), + any(Metadata.class), + any(Context.class)); verify(callListener, timeout(1000)).onClose(statusCaptor.capture(), any(Metadata.class)); assertEquals(Status.Code.DEADLINE_EXCEEDED, statusCaptor.getValue().getCode()); assertThat(statusCaptor.getValue().getDescription()) .startsWith("ClientCall started after deadline exceeded"); - verifyZeroInteractions(provider); + verifyZeroInteractions(clientStreamProvider); } @Test @@ -752,10 +742,9 @@ public class ClientCallImplTest { method, MoreExecutors.directExecutor(), baseCallOptions, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false); + channelCallTracer); call.start(callListener, new Metadata()); context.detach(origContext); @@ -777,10 +766,9 @@ public class ClientCallImplTest { method, MoreExecutors.directExecutor(), callOpts, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false); + channelCallTracer); call.start(callListener, new Metadata()); context.detach(origContext); @@ -802,10 +790,9 @@ public class ClientCallImplTest { method, MoreExecutors.directExecutor(), callOpts, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false); + channelCallTracer); call.start(callListener, new Metadata()); context.detach(origContext); @@ -823,10 +810,9 @@ public class ClientCallImplTest { method, MoreExecutors.directExecutor(), callOpts, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false); + channelCallTracer); call.start(callListener, new Metadata()); ArgumentCaptor deadlineCaptor = ArgumentCaptor.forClass(Deadline.class); @@ -841,10 +827,9 @@ public class ClientCallImplTest { method, MoreExecutors.directExecutor(), baseCallOptions, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false); + channelCallTracer); call.start(callListener, new Metadata()); verify(stream, never()).setDeadline(any(Deadline.class)); @@ -860,10 +845,9 @@ public class ClientCallImplTest { MoreExecutors.directExecutor(), baseCallOptions.withDeadline( Deadline.after(1, TimeUnit.SECONDS, fakeClock.getDeadlineTicker())), - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false); + channelCallTracer); call.start(callListener, new Metadata()); @@ -899,10 +883,9 @@ public class ClientCallImplTest { method, MoreExecutors.directExecutor(), baseCallOptions, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false); + channelCallTracer); context.detach(origContext); @@ -930,10 +913,9 @@ public class ClientCallImplTest { method, MoreExecutors.directExecutor(), baseCallOptions.withDeadline(Deadline.after(1, TimeUnit.SECONDS)), - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false); + channelCallTracer); call.start(callListener, new Metadata()); call.cancel("canceled", null); @@ -955,10 +937,9 @@ public class ClientCallImplTest { method, MoreExecutors.directExecutor(), baseCallOptions, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false); + channelCallTracer); Metadata headers = new Metadata(); @@ -973,10 +954,9 @@ public class ClientCallImplTest { method, MoreExecutors.directExecutor(), baseCallOptions, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false); + channelCallTracer); final Exception cause = new Exception(); ClientCall.Listener callListener = new ClientCall.Listener() { @@ -1011,10 +991,9 @@ public class ClientCallImplTest { method, new SerializingExecutor(Executors.newSingleThreadExecutor()), callOptions, - provider, + clientStreamProvider, deadlineCancellationExecutor, - channelCallTracer, - /* retryEnabled= */ false) + channelCallTracer) .setDecompressorRegistry(decompressorRegistry); call.start(callListener, new Metadata()); @@ -1026,8 +1005,8 @@ public class ClientCallImplTest { @Test public void getAttributes() { ClientCallImpl call = new ClientCallImpl<>( - method, MoreExecutors.directExecutor(), baseCallOptions, provider, - deadlineCancellationExecutor, channelCallTracer, /* retryEnabled= */ false); + method, MoreExecutors.directExecutor(), baseCallOptions, clientStreamProvider, + deadlineCancellationExecutor, channelCallTracer); Attributes attrs = Attributes.newBuilder().set(Key.create("fake key"), "fake value").build(); when(stream.getAttributes()).thenReturn(attrs);