From 04e0450304b18c194c5643a8b52421abc6d523a5 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Thu, 27 Jul 2017 12:07:25 -0700 Subject: [PATCH] core: pass CallOptions to newClientStreamTracer(). (#3276) Resolves #3256 --- .../main/java/io/grpc/ClientStreamTracer.java | 16 +++++++++++++++- .../io/grpc/internal/CensusStatsModule.java | 2 +- .../io/grpc/internal/CensusTracingModule.java | 2 +- .../io/grpc/internal/StatsTraceContext.java | 2 +- .../src/test/java/io/grpc/CallOptionsTest.java | 2 +- .../internal/AbstractClientStreamTest.java | 4 +++- .../io/grpc/internal/CensusModulesTest.java | 11 ++++++----- .../grpc/grpclb/GrpclbClientLoadRecorder.java | 3 ++- .../io/grpc/grpclb/GrpclbLoadBalancerTest.java | 9 +++++---- .../integration/AbstractInteropTest.java | 2 +- .../testing/AbstractTransportTest.java | 18 ++++++++++++------ 11 files changed, 48 insertions(+), 23 deletions(-) diff --git a/core/src/main/java/io/grpc/ClientStreamTracer.java b/core/src/main/java/io/grpc/ClientStreamTracer.java index 3d01e12b6f..a4716ea812 100644 --- a/core/src/main/java/io/grpc/ClientStreamTracer.java +++ b/core/src/main/java/io/grpc/ClientStreamTracer.java @@ -43,10 +43,24 @@ public abstract class ClientStreamTracer extends StreamTracer { /** * Creates a {@link ClientStreamTracer} for a new client stream. * + * @deprecated Override/call {@link #newClientStreamTracer(CallOptions, Metadata)} instead. + */ + @Deprecated + public ClientStreamTracer newClientStreamTracer(Metadata headers) { + throw new UnsupportedOperationException("This method will be deleted. Do not call."); + } + + /** + * Creates a {@link ClientStreamTracer} for a new client stream. + * + * @param callOptions the effective CallOptions of the call * @param headers the mutable headers of the stream. It can be safely mutated within this * method. It should not be saved because it is not safe for read or write after the * method returns. */ - public abstract ClientStreamTracer newClientStreamTracer(Metadata headers); + @SuppressWarnings("deprecation") + public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) { + return newClientStreamTracer(headers); + } } } diff --git a/core/src/main/java/io/grpc/internal/CensusStatsModule.java b/core/src/main/java/io/grpc/internal/CensusStatsModule.java index 06d40fe56f..843f539e07 100644 --- a/core/src/main/java/io/grpc/internal/CensusStatsModule.java +++ b/core/src/main/java/io/grpc/internal/CensusStatsModule.java @@ -175,7 +175,7 @@ final class CensusStatsModule { } @Override - public ClientStreamTracer newClientStreamTracer(Metadata headers) { + public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) { ClientTracer tracer = new ClientTracer(); // TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than // one streams. We will need to update this file to support them. diff --git a/core/src/main/java/io/grpc/internal/CensusTracingModule.java b/core/src/main/java/io/grpc/internal/CensusTracingModule.java index a6c160af8d..a38bd29ac5 100644 --- a/core/src/main/java/io/grpc/internal/CensusTracingModule.java +++ b/core/src/main/java/io/grpc/internal/CensusTracingModule.java @@ -201,7 +201,7 @@ final class CensusTracingModule { } @Override - public ClientStreamTracer newClientStreamTracer(Metadata headers) { + public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) { headers.discardAll(tracingHeader); headers.put(tracingHeader, span.getContext()); return noopClientTracer; diff --git a/core/src/main/java/io/grpc/internal/StatsTraceContext.java b/core/src/main/java/io/grpc/internal/StatsTraceContext.java index bf0c14b962..1326a1b856 100644 --- a/core/src/main/java/io/grpc/internal/StatsTraceContext.java +++ b/core/src/main/java/io/grpc/internal/StatsTraceContext.java @@ -55,7 +55,7 @@ public final class StatsTraceContext { // so that for-each doesn't create an Iterator every time. StreamTracer[] tracers = new StreamTracer[factories.size()]; for (int i = 0; i < tracers.length; i++) { - tracers[i] = factories.get(i).newClientStreamTracer(headers); + tracers[i] = factories.get(i).newClientStreamTracer(callOptions, headers); } return new StatsTraceContext(tracers); } diff --git a/core/src/test/java/io/grpc/CallOptionsTest.java b/core/src/test/java/io/grpc/CallOptionsTest.java index 0b1b06a5e2..3fef17a5c4 100644 --- a/core/src/test/java/io/grpc/CallOptionsTest.java +++ b/core/src/test/java/io/grpc/CallOptionsTest.java @@ -257,7 +257,7 @@ public class CallOptionsTest { } @Override - public ClientStreamTracer newClientStreamTracer(Metadata headers) { + public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) { return new ClientStreamTracer() {}; } diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java index 4a27f79d53..96b75aa0f0 100644 --- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java +++ b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java @@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import io.grpc.Attributes; +import io.grpc.CallOptions; import io.grpc.ClientStreamTracer; import io.grpc.Codec; import io.grpc.Metadata; @@ -216,7 +217,8 @@ public class AbstractClientStreamTest { ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() { @Override - public ClientStreamTracer newClientStreamTracer(Metadata headers) { + public ClientStreamTracer newClientStreamTracer( + CallOptions callOptions, Metadata headers) { return tracer; } }; diff --git a/core/src/test/java/io/grpc/internal/CensusModulesTest.java b/core/src/test/java/io/grpc/internal/CensusModulesTest.java index 68fde92382..498294d454 100644 --- a/core/src/test/java/io/grpc/internal/CensusModulesTest.java +++ b/core/src/test/java/io/grpc/internal/CensusModulesTest.java @@ -316,7 +316,7 @@ public class CensusModulesTest { CensusStatsModule.ClientCallTracer callTracer = censusStats.newClientCallTracer(statsCtxFactory.getDefault(), method.getFullMethodName()); Metadata headers = new Metadata(); - ClientStreamTracer tracer = callTracer.newClientStreamTracer(headers); + ClientStreamTracer tracer = callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); fakeClock.forwardTime(30, MILLISECONDS); tracer.outboundHeaders(); @@ -360,7 +360,8 @@ public class CensusModulesTest { CensusTracingModule.ClientCallTracer callTracer = censusTracing.newClientCallTracer(null, method.getFullMethodName()); Metadata headers = new Metadata(); - ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(headers); + ClientStreamTracer clientStreamTracer = + callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); verify(tracer).spanBuilderWithExplicitParent( eq("Sent.package1.service2.method3"), isNull(Span.class)); verify(spyClientSpan, never()).end(any(EndSpanOptions.class)); @@ -440,7 +441,7 @@ public class CensusModulesTest { CensusStatsModule.ClientCallTracer callTracer = census.newClientCallTracer(clientCtx, method.getFullMethodName()); // This propagates clientCtx to headers if propagates==true - callTracer.newClientStreamTracer(headers); + callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); if (propagate) { assertTrue(headers.containsKey(census.statsHeader)); } else { @@ -492,7 +493,7 @@ public class CensusModulesTest { CensusStatsModule.ClientCallTracer callTracer = censusStats.newClientCallTracer(statsCtxFactory.getDefault(), method.getFullMethodName()); Metadata headers = new Metadata(); - callTracer.newClientStreamTracer(headers); + callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); assertFalse(headers.containsKey(censusStats.statsHeader)); } @@ -521,7 +522,7 @@ public class CensusModulesTest { CensusTracingModule.ClientCallTracer callTracer = censusTracing.newClientCallTracer(fakeClientParentSpan, method.getFullMethodName()); Metadata headers = new Metadata(); - callTracer.newClientStreamTracer(headers); + callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers); verify(mockTracingPropagationHandler).toBinaryValue(same(fakeClientSpanContext)); verifyNoMoreInteractions(mockTracingPropagationHandler); diff --git a/grpclb/src/main/java/io/grpc/grpclb/GrpclbClientLoadRecorder.java b/grpclb/src/main/java/io/grpc/grpclb/GrpclbClientLoadRecorder.java index 41acde471b..7f6fbf4e20 100644 --- a/grpclb/src/main/java/io/grpc/grpclb/GrpclbClientLoadRecorder.java +++ b/grpclb/src/main/java/io/grpc/grpclb/GrpclbClientLoadRecorder.java @@ -19,6 +19,7 @@ package io.grpc.grpclb; import static com.google.common.base.Preconditions.checkNotNull; import com.google.protobuf.util.Timestamps; +import io.grpc.CallOptions; import io.grpc.ClientStreamTracer; import io.grpc.Metadata; import io.grpc.Status; @@ -47,7 +48,7 @@ final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory { } @Override - public ClientStreamTracer newClientStreamTracer(Metadata headers) { + public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) { callsStarted.incrementAndGet(); return new StreamTracer(); } diff --git a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java index cd6dacf22b..7fee0eaba3 100644 --- a/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java +++ b/grpclb/src/test/java/io/grpc/grpclb/GrpclbLoadBalancerTest.java @@ -46,6 +46,7 @@ import com.google.protobuf.ByteString; import com.google.protobuf.util.Durations; import com.google.protobuf.util.Timestamps; import io.grpc.Attributes; +import io.grpc.CallOptions; import io.grpc.ClientStreamTracer; import io.grpc.ConnectivityStateInfo; import io.grpc.EquivalentAddressGroup; @@ -366,7 +367,7 @@ public class GrpclbLoadBalancerTest { ClientStats.newBuilder().build()); ClientStreamTracer tracer1 = - pick1.getStreamTracerFactory().newClientStreamTracer(new Metadata()); + pick1.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata()); PickResult pick2 = picker.pickSubchannel(args); assertNull(pick2.getSubchannel()); @@ -385,7 +386,7 @@ public class GrpclbLoadBalancerTest { assertSame(subchannel2, pick3.getSubchannel()); assertSame(balancer.getLoadRecorder(), pick3.getStreamTracerFactory()); ClientStreamTracer tracer3 = - pick3.getStreamTracerFactory().newClientStreamTracer(new Metadata()); + pick3.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata()); // pick3 has sent out headers tracer3.outboundHeaders(); @@ -418,7 +419,7 @@ public class GrpclbLoadBalancerTest { assertSame(subchannel1, pick1.getSubchannel()); assertSame(balancer.getLoadRecorder(), pick5.getStreamTracerFactory()); ClientStreamTracer tracer5 = - pick5.getStreamTracerFactory().newClientStreamTracer(new Metadata()); + pick5.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata()); // pick3 ended without receiving response headers tracer3.streamClosed(Status.DEADLINE_EXCEEDED); @@ -492,7 +493,7 @@ public class GrpclbLoadBalancerTest { PickResult pick1p = picker.pickSubchannel(args); assertSame(subchannel1, pick1p.getSubchannel()); assertSame(balancer.getLoadRecorder(), pick1p.getStreamTracerFactory()); - pick1p.getStreamTracerFactory().newClientStreamTracer(new Metadata()); + pick1p.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata()); // The pick from the new stream will be included in the report assertNextReport( diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index c3d2391295..e2061a56ea 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -204,7 +204,7 @@ public abstract class AbstractInteropTest { private final ClientStreamTracer.Factory clientStreamTracerFactory = new ClientStreamTracer.Factory() { @Override - public ClientStreamTracer newClientStreamTracer(Metadata headers) { + public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) { ClientStreamTracer tracer = spy(new ClientStreamTracer() {}); clientStreamTracers.add(tracer); return tracer; diff --git a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java index 2f232fd949..7fb0bf9c6a 100644 --- a/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java +++ b/testing/src/main/java/io/grpc/internal/testing/AbstractTransportTest.java @@ -172,7 +172,8 @@ public abstract class AbstractTransportTest { @Before public void setUp() { server = newServer(Arrays.asList(serverStreamTracerFactory)); - when(clientStreamTracerFactory.newClientStreamTracer(any(Metadata.class))) + when(clientStreamTracerFactory + .newClientStreamTracer(any(CallOptions.class), any(Metadata.class))) .thenReturn(clientStreamTracer); when(serverStreamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class))) .thenReturn(serverStreamTracer); @@ -512,7 +513,8 @@ public abstract class AbstractTransportTest { // Stream prevents termination ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions); if (metricsExpected()) { - inOrder.verify(clientStreamTracerFactory).newClientStreamTracer(any(Metadata.class)); + inOrder.verify(clientStreamTracerFactory).newClientStreamTracer( + any(CallOptions.class), any(Metadata.class)); } stream.start(mockClientStreamListener); client.shutdown(); @@ -520,7 +522,8 @@ public abstract class AbstractTransportTest { ClientStream stream2 = client.newStream(methodDescriptor, new Metadata(), callOptions); if (metricsExpected()) { - inOrder.verify(clientStreamTracerFactory).newClientStreamTracer(any(Metadata.class)); + inOrder.verify(clientStreamTracerFactory).newClientStreamTracer( + any(CallOptions.class), any(Metadata.class)); } ClientStreamListener mockClientStreamListener2 = mock(ClientStreamListener.class); stream2.start(mockClientStreamListener2); @@ -563,7 +566,8 @@ public abstract class AbstractTransportTest { verify(mockClientTransportListener, never()).transportInUse(anyBoolean()); assertCodeEquals(Status.UNAVAILABLE, statusCaptor.getValue()); if (metricsExpected()) { - verify(clientStreamTracerFactory).newClientStreamTracer(any(Metadata.class)); + verify(clientStreamTracerFactory).newClientStreamTracer( + any(CallOptions.class), any(Metadata.class)); verify(clientStreamTracer).streamClosed(same(statusCaptor.getValue())); verifyZeroInteractions(serverStreamTracerFactory); } @@ -639,7 +643,8 @@ public abstract class AbstractTransportTest { clientHeadersCopy.merge(clientHeaders); ClientStream clientStream = client.newStream(methodDescriptor, clientHeaders, callOptions); if (metricsExpected()) { - clientInOrder.verify(clientStreamTracerFactory).newClientStreamTracer(same(clientHeaders)); + clientInOrder.verify(clientStreamTracerFactory).newClientStreamTracer( + same(callOptions), same(clientHeaders)); } clientStream.start(mockClientStreamListener); @@ -1068,7 +1073,8 @@ public abstract class AbstractTransportTest { assertNull(statusCaptor.getValue().getCause()); if (metricsExpected()) { - verify(clientStreamTracerFactory).newClientStreamTracer(any(Metadata.class)); + verify(clientStreamTracerFactory).newClientStreamTracer( + any(CallOptions.class), any(Metadata.class)); verify(clientStreamTracer).outboundHeaders(); verify(clientStreamTracer).streamClosed(same(statusCaptor.getValue())); verify(serverStreamTracerFactory).newServerStreamTracer(anyString(), any(Metadata.class));