From b88ea27b5380223a5ba8372500648222b7347f6c Mon Sep 17 00:00:00 2001 From: ZHANG Dapeng Date: Mon, 6 Jun 2016 19:43:15 -0700 Subject: [PATCH] core/internal: add 3-arg newStream method to ClientTransport interface (#1898) adding ClientStream newStream(MethodDescriptor method, Metadata headers, CallOptions callOptions); to ClientTransport interface Created this PR first because both fail fast implementation and another change will be using this interface change --- .../io/grpc/inprocess/InProcessTransport.java | 9 ++++- .../java/io/grpc/internal/ClientCallImpl.java | 2 +- .../io/grpc/internal/ClientTransport.java | 5 +++ .../grpc/internal/DelayedClientTransport.java | 22 ++++++++--- .../grpc/internal/FailingClientTransport.java | 9 ++++- .../io/grpc/internal/ClientCallImplTest.java | 23 ++++++++++- .../internal/DelayedClientTransportTest.java | 35 ++++++++++------- .../grpc/internal/ManagedChannelImplTest.java | 39 ++++++++++++------- ...anagedChannelImplTransportManagerTest.java | 13 +++++-- .../test/java/io/grpc/internal/TestUtils.java | 5 ++- .../io/grpc/internal/TransportSetTest.java | 4 +- .../io/grpc/netty/NettyClientTransport.java | 9 ++++- .../io/grpc/okhttp/OkHttpClientTransport.java | 10 ++++- 13 files changed, 136 insertions(+), 49 deletions(-) diff --git a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java index caf9327b73..e861024596 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessTransport.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessTransport.java @@ -34,6 +34,7 @@ package io.grpc.inprocess; import static com.google.common.base.Preconditions.checkNotNull; import io.grpc.Attributes; +import io.grpc.CallOptions; import io.grpc.Compressor; import io.grpc.Decompressor; import io.grpc.Metadata; @@ -126,7 +127,7 @@ class InProcessTransport implements ServerTransport, ManagedClientTransport { @Override public synchronized ClientStream newStream( - final MethodDescriptor method, final Metadata headers) { + final MethodDescriptor method, final Metadata headers, final CallOptions callOptions) { if (shutdownStatus != null) { final Status capturedStatus = shutdownStatus; return new NoopClientStream() { @@ -140,6 +141,12 @@ class InProcessTransport implements ServerTransport, ManagedClientTransport { return new InProcessStream(method, headers).clientStream; } + @Override + public synchronized ClientStream newStream( + final MethodDescriptor method, final Metadata headers) { + return newStream(method, headers, CallOptions.DEFAULT); + } + @Override public synchronized void ping(final PingCallback callback, Executor executor) { if (terminated) { diff --git a/core/src/main/java/io/grpc/internal/ClientCallImpl.java b/core/src/main/java/io/grpc/internal/ClientCallImpl.java index d376a5e96c..f766b34a16 100644 --- a/core/src/main/java/io/grpc/internal/ClientCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ClientCallImpl.java @@ -205,7 +205,7 @@ final class ClientCallImpl extends ClientCall updateTimeoutHeaders(effectiveDeadline, callOptions.getDeadline(), parentContext.getDeadline(), headers); ClientTransport transport = clientTransportProvider.get(callOptions); - stream = transport.newStream(method, headers); + stream = transport.newStream(method, headers, callOptions); } else { stream = new FailingClientStream(DEADLINE_EXCEEDED); } diff --git a/core/src/main/java/io/grpc/internal/ClientTransport.java b/core/src/main/java/io/grpc/internal/ClientTransport.java index 353fc4e9f3..cd92b220d2 100644 --- a/core/src/main/java/io/grpc/internal/ClientTransport.java +++ b/core/src/main/java/io/grpc/internal/ClientTransport.java @@ -31,6 +31,7 @@ package io.grpc.internal; +import io.grpc.CallOptions; import io.grpc.Metadata; import io.grpc.MethodDescriptor; @@ -56,9 +57,13 @@ public interface ClientTransport { * * @param method the descriptor of the remote method to be called for this stream. * @param headers to send at the beginning of the call + * @param callOptions runtime options of the call * @return the newly created stream. */ // TODO(nmittler): Consider also throwing for stopping. + ClientStream newStream(MethodDescriptor method, Metadata headers, CallOptions callOptions); + + // TODO(zdapeng): Remove tow-argument version in favor of three-argument overload. ClientStream newStream(MethodDescriptor method, Metadata headers); /** diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 41c15ec169..cc2150276f 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -36,6 +36,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; +import io.grpc.CallOptions; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; @@ -85,25 +86,31 @@ class DelayedClientTransport implements ManagedClientTransport { } @Override - public ClientStream newStream(MethodDescriptor method, Metadata headers) { + public ClientStream newStream(MethodDescriptor method, Metadata headers, CallOptions + callOptions) { Supplier supplier = transportSupplier; if (supplier == null) { synchronized (lock) { // Check again, since it may have changed while waiting for lock supplier = transportSupplier; if (supplier == null && !shutdown) { - PendingStream pendingStream = new PendingStream(method, headers); + PendingStream pendingStream = new PendingStream(method, headers, callOptions); pendingStreams.add(pendingStream); return pendingStream; } } } if (supplier != null) { - return supplier.get().newStream(method, headers); + return supplier.get().newStream(method, headers, callOptions); } return new FailingClientStream(Status.UNAVAILABLE.withDescription("transport shutdown")); } + @Override + public ClientStream newStream(MethodDescriptor method, Metadata headers) { + return newStream(method, headers, CallOptions.DEFAULT); + } + @Override public void ping(final PingCallback callback, Executor executor) { Supplier supplier = transportSupplier; @@ -133,7 +140,7 @@ class DelayedClientTransport implements ManagedClientTransport { /** * Prevents creating any new streams until {@link #setTransport} is called. Buffered streams are * not failed, so if {@link #shutdown} is called when {@link #setTransport} has not been called, - * you still need to call {@link setTransport} to make this transport terminated. + * you still need to call {@link #setTransport} to make this transport terminated. */ @Override public void shutdown() { @@ -257,14 +264,17 @@ class DelayedClientTransport implements ManagedClientTransport { private class PendingStream extends DelayedStream { private final MethodDescriptor method; private final Metadata headers; + private final CallOptions callOptions; - private PendingStream(MethodDescriptor method, Metadata headers) { + private PendingStream(MethodDescriptor method, Metadata headers, CallOptions + callOptions) { this.method = method; this.headers = headers; + this.callOptions = callOptions; } private void createRealStream(ClientTransport transport) { - setStream(transport.newStream(method, headers)); + setStream(transport.newStream(method, headers, callOptions)); } @Override diff --git a/core/src/main/java/io/grpc/internal/FailingClientTransport.java b/core/src/main/java/io/grpc/internal/FailingClientTransport.java index e4389caf4c..874a97880b 100644 --- a/core/src/main/java/io/grpc/internal/FailingClientTransport.java +++ b/core/src/main/java/io/grpc/internal/FailingClientTransport.java @@ -34,6 +34,7 @@ package io.grpc.internal; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import io.grpc.CallOptions; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; @@ -54,10 +55,16 @@ class FailingClientTransport implements ClientTransport { } @Override - public ClientStream newStream(MethodDescriptor method, Metadata headers) { + public ClientStream newStream(MethodDescriptor method, Metadata headers, CallOptions + callOptions) { return new FailingClientStream(error); } + @Override + public ClientStream newStream(MethodDescriptor method, Metadata headers) { + return newStream(method, headers, CallOptions.DEFAULT); + } + @Override public void ping(final PingCallback callback, Executor executor) { executor.execute(new Runnable() { diff --git a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java index 5bac1ffa81..656cdfe3d0 100644 --- a/core/src/test/java/io/grpc/internal/ClientCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ClientCallImplTest.java @@ -42,6 +42,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; import static org.mockito.Mockito.never; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.times; @@ -135,7 +136,8 @@ public class ClientCallImplTest { public void setUp() { MockitoAnnotations.initMocks(this); when(provider.get(any(CallOptions.class))).thenReturn(transport); - when(transport.newStream(any(MethodDescriptor.class), any(Metadata.class))).thenReturn(stream); + when(transport.newStream(any(MethodDescriptor.class), any(Metadata.class), + any(CallOptions.class))).thenReturn(stream); } @After @@ -156,7 +158,7 @@ public class ClientCallImplTest { call.start(callListener, new Metadata()); ArgumentCaptor metadataCaptor = ArgumentCaptor.forClass(Metadata.class); - verify(transport).newStream(eq(method), metadataCaptor.capture()); + verify(transport).newStream(eq(method), metadataCaptor.capture(), same(CallOptions.DEFAULT)); Metadata actual = metadataCaptor.getValue(); Set acceptedEncodings = @@ -178,6 +180,23 @@ public class ClientCallImplTest { verify(stream).setAuthority("overridden-authority"); } + @Test + public void callOptionsPropagatedToTransport() { + final CallOptions callOptions = CallOptions.DEFAULT.withAuthority("dummy_value"); + final ClientCallImpl call = new ClientCallImpl( + method, + MoreExecutors.directExecutor(), + callOptions, + provider, + deadlineCancellationExecutor) + .setDecompressorRegistry(decompressorRegistry); + final Metadata metadata = new Metadata(); + + call.start(callListener, metadata); + + verify(transport).newStream(same(method), same(metadata), same(callOptions)); + } + @Test public void authorityNotPropagatedToStream() { ClientCallImpl call = new ClientCallImpl( diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index 8a5577f2fa..5a43bdc268 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -43,12 +43,12 @@ import static org.mockito.Mockito.when; import com.google.common.base.Supplier; +import io.grpc.CallOptions; import io.grpc.IntegerMarshaller; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.StringMarshaller; -import io.grpc.internal.ClientTransport; import org.junit.After; import org.junit.Before; @@ -90,14 +90,19 @@ public class DelayedClientTransportTest { private final Metadata headers = new Metadata(); private final Metadata headers2 = new Metadata(); + private final CallOptions callOptions = CallOptions.DEFAULT.withAuthority("dummy_value"); + private final CallOptions callOptions2 = CallOptions.DEFAULT.withAuthority("dummy_value2"); + private final FakeClock fakeExecutor = new FakeClock(); private final DelayedClientTransport delayedTransport = new DelayedClientTransport( fakeExecutor.scheduledExecutorService); @Before public void setUp() { MockitoAnnotations.initMocks(this); - when(mockRealTransport.newStream(same(method), same(headers))).thenReturn(mockRealStream); - when(mockRealTransport2.newStream(same(method2), same(headers2))).thenReturn(mockRealStream2); + when(mockRealTransport.newStream(same(method), same(headers), same(callOptions))) + .thenReturn(mockRealStream); + when(mockRealTransport2.newStream(same(method2), same(headers2), same(callOptions2))) + .thenReturn(mockRealStream2); delayedTransport.start(transportListener); } @@ -106,8 +111,8 @@ public class DelayedClientTransportTest { } @Test public void transportsAreUsedInOrder() { - delayedTransport.newStream(method, headers); - delayedTransport.newStream(method2, headers2); + delayedTransport.newStream(method, headers, callOptions); + delayedTransport.newStream(method2, headers2, callOptions2); assertEquals(0, fakeExecutor.numPendingTasks()); delayedTransport.setTransportSupplier(new Supplier() { final Iterator it = @@ -118,13 +123,13 @@ public class DelayedClientTransportTest { } }); assertEquals(1, fakeExecutor.runDueTasks()); - verify(mockRealTransport).newStream(same(method), same(headers)); - verify(mockRealTransport2).newStream(same(method2), same(headers2)); + verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions)); + verify(mockRealTransport2).newStream(same(method2), same(headers2), same(callOptions2)); } @Test public void streamStartThenSetTransport() { assertFalse(delayedTransport.hasPendingStreams()); - ClientStream stream = delayedTransport.newStream(method, headers); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions); stream.start(streamListener); assertEquals(1, delayedTransport.getPendingStreamsCount()); assertTrue(delayedTransport.hasPendingStreams()); @@ -134,12 +139,12 @@ public class DelayedClientTransportTest { assertEquals(0, delayedTransport.getPendingStreamsCount()); assertFalse(delayedTransport.hasPendingStreams()); assertEquals(1, fakeExecutor.runDueTasks()); - verify(mockRealTransport).newStream(same(method), same(headers)); + verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions)); verify(mockRealStream).start(same(streamListener)); } @Test public void newStreamThenSetTransportThenShutdown() { - ClientStream stream = delayedTransport.newStream(method, headers); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions); assertEquals(1, delayedTransport.getPendingStreamsCount()); assertTrue(stream instanceof DelayedStream); delayedTransport.setTransport(mockRealTransport); @@ -148,7 +153,7 @@ public class DelayedClientTransportTest { verify(transportListener).transportShutdown(any(Status.class)); verify(transportListener).transportTerminated(); assertEquals(1, fakeExecutor.runDueTasks()); - verify(mockRealTransport).newStream(same(method), same(headers)); + verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions)); stream.start(streamListener); verify(mockRealStream).start(same(streamListener)); } @@ -166,11 +171,11 @@ public class DelayedClientTransportTest { delayedTransport.shutdown(); verify(transportListener).transportShutdown(any(Status.class)); verify(transportListener).transportTerminated(); - ClientStream stream = delayedTransport.newStream(method, headers); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions); assertEquals(0, delayedTransport.getPendingStreamsCount()); stream.start(streamListener); assertFalse(stream instanceof DelayedStream); - verify(mockRealTransport).newStream(same(method), same(headers)); + verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions)); verify(mockRealStream).start(same(streamListener)); } @@ -179,11 +184,11 @@ public class DelayedClientTransportTest { delayedTransport.shutdownNow(Status.UNAVAILABLE); verify(transportListener).transportShutdown(any(Status.class)); verify(transportListener).transportTerminated(); - ClientStream stream = delayedTransport.newStream(method, headers); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions); assertEquals(0, delayedTransport.getPendingStreamsCount()); stream.start(streamListener); assertFalse(stream instanceof DelayedStream); - verify(mockRealTransport).newStream(same(method), same(headers)); + verify(mockRealTransport).newStream(same(method), same(headers), same(callOptions)); verify(mockRealStream).start(same(streamListener)); } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index a7f96a4ed7..2073e8c038 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -200,14 +200,16 @@ public class ManagedChannelImplTest { when(mockTransportFactory.newClientTransport( any(SocketAddress.class), any(String.class), any(String.class))) .thenReturn(mockTransport); - when(mockTransport.newStream(same(method), same(headers))).thenReturn(mockStream); + when(mockTransport.newStream(same(method), same(headers), same(CallOptions.DEFAULT))) + .thenReturn(mockStream); call.start(mockCallListener, headers); verify(mockTransportFactory, timeout(1000)) .newClientTransport(same(socketAddress), eq(authority), eq(userAgent)); verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture()); ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue(); transportListener.transportReady(); - verify(mockTransport, timeout(1000)).newStream(same(method), same(headers)); + verify(mockTransport, timeout(1000)).newStream(same(method), same(headers), + same(CallOptions.DEFAULT)); verify(mockStream, timeout(1000)).start(streamListenerCaptor.capture()); verify(mockStream).setCompressor(isA(Compressor.class)); // Depends on how quick the real transport is created, ClientCallImpl may start on mockStream @@ -221,9 +223,11 @@ public class ManagedChannelImplTest { ClientCall call2 = channel.newCall(method, CallOptions.DEFAULT); ClientStream mockStream2 = mock(ClientStream.class); Metadata headers2 = new Metadata(); - when(mockTransport.newStream(same(method), same(headers2))).thenReturn(mockStream2); + when(mockTransport.newStream(same(method), same(headers2), same(CallOptions.DEFAULT))) + .thenReturn(mockStream2); call2.start(mockCallListener2, headers2); - verify(mockTransport, timeout(1000)).newStream(same(method), same(headers2)); + verify(mockTransport, timeout(1000)).newStream(same(method), same(headers2), + same(CallOptions.DEFAULT)); verify(mockStream2, timeout(1000)).start(streamListenerCaptor.capture()); ClientStreamListener streamListener2 = streamListenerCaptor.getValue(); Metadata trailers = new Metadata(); @@ -278,14 +282,16 @@ public class ManagedChannelImplTest { when(mockTransportFactory.newClientTransport( any(SocketAddress.class), any(String.class), any(String.class))) .thenReturn(mockTransport); - when(mockTransport.newStream(same(method), same(headers))).thenReturn(mockStream); + when(mockTransport.newStream(same(method), same(headers), same(CallOptions.DEFAULT))) + .thenReturn(mockStream); call.start(mockCallListener, headers); verify(mockTransportFactory, timeout(1000)) .newClientTransport(same(socketAddress), eq(authority), any(String.class)); verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture()); ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue(); transportListener.transportReady(); - verify(mockTransport, timeout(1000)).newStream(same(method), same(headers)); + verify(mockTransport, timeout(1000)).newStream(same(method), same(headers), + same(CallOptions.DEFAULT)); verify(mockStream, timeout(1000)).start(streamListenerCaptor.capture()); verify(mockStream).setCompressor(isA(Compressor.class)); // Depends on how quick the real transport is created, ClientCallImpl may start on mockStream @@ -342,7 +348,8 @@ public class ManagedChannelImplTest { // Create transport and call ClientStream mockStream = mock(ClientStream.class); Metadata headers = new Metadata(); - when(mockTransport.newStream(same(method), same(headers))).thenReturn(mockStream); + when(mockTransport.newStream(same(method), same(headers), same(CallOptions.DEFAULT))) + .thenReturn(mockStream); call.start(mockCallListener, headers); verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture()); ManagedClientTransport.Listener transportListener = transportListenerCaptor.getValue(); @@ -444,18 +451,19 @@ public class ManagedChannelImplTest { public void callOptionsExecutor() { Metadata headers = new Metadata(); ClientStream mockStream = mock(ClientStream.class); - when(mockTransport.newStream(same(method), same(headers))).thenReturn(mockStream); + when(mockTransport.newStream(same(method), same(headers), any(CallOptions.class))) + .thenReturn(mockStream); FakeClock fakeExecutor = new FakeClock(); ManagedChannel channel = createChannel( new FakeNameResolverFactory(true), NO_INTERCEPTOR); + CallOptions options = CallOptions.DEFAULT.withExecutor(fakeExecutor.scheduledExecutorService); - ClientCall call = channel.newCall(method, CallOptions.DEFAULT.withExecutor( - fakeExecutor.scheduledExecutorService)); + ClientCall call = channel.newCall(method, options); call.start(mockCallListener, headers); verify(mockTransport, timeout(1000)).start(transportListenerCaptor.capture()); transportListenerCaptor.getValue().transportReady(); - verify(mockTransport, timeout(1000)).newStream(same(method), same(headers)); + verify(mockTransport, timeout(1000)).newStream(same(method), same(headers), same(options)); verify(mockStream, timeout(1000)).start(streamListenerCaptor.capture()); ClientStreamListener streamListener = streamListenerCaptor.getValue(); Metadata trailers = new Metadata(); @@ -608,7 +616,8 @@ public class ManagedChannelImplTest { .newClientTransport(same(goodAddress), any(String.class), any(String.class)); verify(goodTransport, timeout(1000)).start(goodTransportListenerCaptor.capture()); goodTransportListenerCaptor.getValue().transportReady(); - verify(goodTransport, timeout(1000)).newStream(same(method), same(headers)); + verify(goodTransport, timeout(1000)).newStream(same(method), same(headers), + same(CallOptions.DEFAULT)); // The bad transport was never used. verify(badTransport, times(0)).newStream(any(MethodDescriptor.class), any(Metadata.class)); } @@ -707,7 +716,8 @@ public class ManagedChannelImplTest { .newClientTransport(same(addr1), any(String.class), any(String.class)); verify(transport1, timeout(1000)).start(transportListenerCaptor.capture()); transportListenerCaptor.getValue().transportReady(); - verify(transport1, timeout(1000)).newStream(same(method), same(headers)); + verify(transport1, timeout(1000)).newStream(same(method), same(headers), + same(CallOptions.DEFAULT)); transportListenerCaptor.getValue().transportShutdown(Status.UNAVAILABLE); // Second call still use the first address, since it was successfully connected. @@ -717,7 +727,8 @@ public class ManagedChannelImplTest { verify(mockTransportFactory, times(2)) .newClientTransport(same(addr1), any(String.class), any(String.class)); transportListenerCaptor.getValue().transportReady(); - verify(transport2, timeout(1000)).newStream(same(method), same(headers)); + verify(transport2, timeout(1000)).newStream(same(method), same(headers), + same(CallOptions.DEFAULT)); } @Test diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java index d8b1f1a083..19ea9fdb7c 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTransportManagerTest.java @@ -47,6 +47,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import io.grpc.Attributes; +import io.grpc.CallOptions; import io.grpc.ClientInterceptor; import io.grpc.CompressorRegistry; import io.grpc.DecompressorRegistry; @@ -98,6 +99,8 @@ public class ManagedChannelImplTransportManagerTest { private final MethodDescriptor method2 = MethodDescriptor.create( MethodDescriptor.MethodType.UNKNOWN, "/service/method2", new StringMarshaller(), new StringMarshaller()); + private final CallOptions callOptions = CallOptions.DEFAULT.withAuthority("dummy_value"); + private final CallOptions callOptions2 = CallOptions.DEFAULT.withAuthority("dummy_value2"); private ManagedChannelImpl channel; @@ -188,7 +191,7 @@ public class ManagedChannelImplTransportManagerTest { // Subsequent getTransport() will use the next address ClientTransport t2 = tm.getTransport(addressGroup); assertNotNull(t2); - t2.newStream(method, new Metadata()); + t2.newStream(method, new Metadata(), callOptions); // Will keep the previous back-off policy, and not consult back-off policy verify(mockTransportFactory, timeout(1000)).newClientTransport(addr2, authority, userAgent); verify(mockBackoffPolicyProvider, times(backoffReset)).get(); @@ -196,7 +199,8 @@ public class ManagedChannelImplTransportManagerTest { ClientTransport rt2 = transportInfo.transport; // Make the second transport ready transportInfo.listener.transportReady(); - verify(rt2, timeout(1000)).newStream(same(method), any(Metadata.class)); + verify(rt2, timeout(1000)).newStream(same(method), any(Metadata.class), + same(callOptions)); verify(mockNameResolver, times(0)).refresh(); // Disconnect the second transport transportInfo.listener.transportShutdown(Status.UNAVAILABLE); @@ -205,7 +209,7 @@ public class ManagedChannelImplTransportManagerTest { // Subsequent getTransport() will use the first address, since last attempt was successful. ClientTransport t3 = tm.getTransport(addressGroup); - t3.newStream(method2, new Metadata()); + t3.newStream(method2, new Metadata(), callOptions2); verify(mockTransportFactory, timeout(1000).times(2)) .newClientTransport(addr1, authority, userAgent); // Still no back-off policy creation, because an address succeeded. @@ -213,7 +217,8 @@ public class ManagedChannelImplTransportManagerTest { transportInfo = transports.poll(1, TimeUnit.SECONDS); ClientTransport rt3 = transportInfo.transport; transportInfo.listener.transportReady(); - verify(rt3, timeout(1000)).newStream(same(method2), any(Metadata.class)); + verify(rt3, timeout(1000)).newStream(same(method2), any(Metadata.class), + same(callOptions2)); verify(rt1, times(0)).newStream(any(MethodDescriptor.class), any(Metadata.class)); // Back-off policy was never consulted. diff --git a/core/src/test/java/io/grpc/internal/TestUtils.java b/core/src/test/java/io/grpc/internal/TestUtils.java index a23474f75b..5022924146 100644 --- a/core/src/test/java/io/grpc/internal/TestUtils.java +++ b/core/src/test/java/io/grpc/internal/TestUtils.java @@ -36,6 +36,7 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import io.grpc.CallOptions; import io.grpc.Metadata; import io.grpc.MethodDescriptor; @@ -84,8 +85,8 @@ final class TestUtils { @Override public ManagedClientTransport answer(InvocationOnMock invocation) throws Throwable { final ManagedClientTransport mockTransport = mock(ManagedClientTransport.class); - when(mockTransport.newStream(any(MethodDescriptor.class), any(Metadata.class))) - .thenReturn(mock(ClientStream.class)); + when(mockTransport.newStream(any(MethodDescriptor.class), any(Metadata.class), + any(CallOptions.class))).thenReturn(mock(ClientStream.class)); // Save the listener doAnswer(new Answer() { @Override diff --git a/core/src/test/java/io/grpc/internal/TransportSetTest.java b/core/src/test/java/io/grpc/internal/TransportSetTest.java index 2b9756e75a..f7423780bf 100644 --- a/core/src/test/java/io/grpc/internal/TransportSetTest.java +++ b/core/src/test/java/io/grpc/internal/TransportSetTest.java @@ -48,6 +48,7 @@ import static org.mockito.Mockito.when; import com.google.common.base.Stopwatch; +import io.grpc.CallOptions; import io.grpc.EquivalentAddressGroup; import io.grpc.IntegerMarshaller; import io.grpc.LoadBalancer; @@ -450,7 +451,8 @@ public class TransportSetTest { verify(transportInfo.transport, times(0)).newStream( any(MethodDescriptor.class), any(Metadata.class)); assertEquals(1, fakeExecutor.runDueTasks()); - verify(transportInfo.transport).newStream(same(method), same(headers)); + verify(transportInfo.transport).newStream(same(method), same(headers), + same(CallOptions.DEFAULT)); verify(transportInfo.transport).shutdown(); transportInfo.listener.transportShutdown(Status.UNAVAILABLE); verify(mockTransportSetCallback, never()).onTerminated(); diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index dd48a33398..d4c775e8d6 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -36,6 +36,7 @@ import static io.netty.channel.ChannelOption.SO_KEEPALIVE; import com.google.common.base.Preconditions; import com.google.common.base.Ticker; +import io.grpc.CallOptions; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; @@ -112,7 +113,8 @@ class NettyClientTransport implements ManagedClientTransport { } @Override - public ClientStream newStream(MethodDescriptor method, Metadata headers) { + public ClientStream newStream(MethodDescriptor method, Metadata headers, CallOptions + callOptions) { Preconditions.checkNotNull(method, "method"); Preconditions.checkNotNull(headers, "headers"); return new NettyClientStream(method, headers, channel, handler, maxMessageSize, authority, @@ -124,6 +126,11 @@ class NettyClientTransport implements ManagedClientTransport { }; } + @Override + public ClientStream newStream(MethodDescriptor method, Metadata headers) { + return newStream(method, headers, CallOptions.DEFAULT); + } + @Override public void start(Listener transportListener) { lifecycleManager = new ClientTransportLifecycleManager( diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 74e33da3d8..b77c380d34 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -39,6 +39,7 @@ import com.google.common.base.Stopwatch; import com.google.common.base.Ticker; import com.google.common.util.concurrent.SettableFuture; +import io.grpc.CallOptions; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; @@ -246,13 +247,20 @@ class OkHttpClientTransport implements ManagedClientTransport { } @Override - public OkHttpClientStream newStream(final MethodDescriptor method, final Metadata headers) { + public OkHttpClientStream newStream(final MethodDescriptor method, final Metadata + headers, CallOptions callOptions) { Preconditions.checkNotNull(method, "method"); Preconditions.checkNotNull(headers, "headers"); return new OkHttpClientStream(method, headers, frameWriter, OkHttpClientTransport.this, outboundFlow, lock, maxMessageSize, defaultAuthority, userAgent); } + @Override + public OkHttpClientStream newStream(final MethodDescriptor method, final Metadata + headers) { + return newStream(method, headers, CallOptions.DEFAULT); + } + @GuardedBy("lock") void streamReadyToStart(OkHttpClientStream clientStream) { synchronized (lock) {