core: pass CallOptions to newClientStreamTracer(). (#3276)

Resolves #3256
This commit is contained in:
Kun Zhang 2017-07-27 12:07:25 -07:00 committed by GitHub
parent da47085dbb
commit 04e0450304
11 changed files with 48 additions and 23 deletions

View File

@ -43,10 +43,24 @@ public abstract class ClientStreamTracer extends StreamTracer {
/**
* Creates a {@link ClientStreamTracer} for a new client stream.
*
* @deprecated Override/call {@link #newClientStreamTracer(CallOptions, Metadata)} instead.
*/
@Deprecated
public ClientStreamTracer newClientStreamTracer(Metadata headers) {
throw new UnsupportedOperationException("This method will be deleted. Do not call.");
}
/**
* Creates a {@link ClientStreamTracer} for a new client stream.
*
* @param callOptions the effective CallOptions of the call
* @param headers the mutable headers of the stream. It can be safely mutated within this
* method. It should not be saved because it is not safe for read or write after the
* method returns.
*/
public abstract ClientStreamTracer newClientStreamTracer(Metadata headers);
@SuppressWarnings("deprecation")
public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
return newClientStreamTracer(headers);
}
}
}

View File

@ -175,7 +175,7 @@ final class CensusStatsModule {
}
@Override
public ClientStreamTracer newClientStreamTracer(Metadata headers) {
public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
ClientTracer tracer = new ClientTracer();
// TODO(zhangkun83): Once retry or hedging is implemented, a ClientCall may start more than
// one streams. We will need to update this file to support them.

View File

@ -201,7 +201,7 @@ final class CensusTracingModule {
}
@Override
public ClientStreamTracer newClientStreamTracer(Metadata headers) {
public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
headers.discardAll(tracingHeader);
headers.put(tracingHeader, span.getContext());
return noopClientTracer;

View File

@ -55,7 +55,7 @@ public final class StatsTraceContext {
// so that for-each doesn't create an Iterator every time.
StreamTracer[] tracers = new StreamTracer[factories.size()];
for (int i = 0; i < tracers.length; i++) {
tracers[i] = factories.get(i).newClientStreamTracer(headers);
tracers[i] = factories.get(i).newClientStreamTracer(callOptions, headers);
}
return new StatsTraceContext(tracers);
}

View File

@ -257,7 +257,7 @@ public class CallOptionsTest {
}
@Override
public ClientStreamTracer newClientStreamTracer(Metadata headers) {
public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
return new ClientStreamTracer() {};
}

View File

@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.Codec;
import io.grpc.Metadata;
@ -216,7 +217,8 @@ public class AbstractClientStreamTest {
ClientStreamTracer.Factory tracerFactory =
new ClientStreamTracer.Factory() {
@Override
public ClientStreamTracer newClientStreamTracer(Metadata headers) {
public ClientStreamTracer newClientStreamTracer(
CallOptions callOptions, Metadata headers) {
return tracer;
}
};

View File

@ -316,7 +316,7 @@ public class CensusModulesTest {
CensusStatsModule.ClientCallTracer callTracer =
censusStats.newClientCallTracer(statsCtxFactory.getDefault(), method.getFullMethodName());
Metadata headers = new Metadata();
ClientStreamTracer tracer = callTracer.newClientStreamTracer(headers);
ClientStreamTracer tracer = callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
fakeClock.forwardTime(30, MILLISECONDS);
tracer.outboundHeaders();
@ -360,7 +360,8 @@ public class CensusModulesTest {
CensusTracingModule.ClientCallTracer callTracer =
censusTracing.newClientCallTracer(null, method.getFullMethodName());
Metadata headers = new Metadata();
ClientStreamTracer clientStreamTracer = callTracer.newClientStreamTracer(headers);
ClientStreamTracer clientStreamTracer =
callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
verify(tracer).spanBuilderWithExplicitParent(
eq("Sent.package1.service2.method3"), isNull(Span.class));
verify(spyClientSpan, never()).end(any(EndSpanOptions.class));
@ -440,7 +441,7 @@ public class CensusModulesTest {
CensusStatsModule.ClientCallTracer callTracer =
census.newClientCallTracer(clientCtx, method.getFullMethodName());
// This propagates clientCtx to headers if propagates==true
callTracer.newClientStreamTracer(headers);
callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
if (propagate) {
assertTrue(headers.containsKey(census.statsHeader));
} else {
@ -492,7 +493,7 @@ public class CensusModulesTest {
CensusStatsModule.ClientCallTracer callTracer =
censusStats.newClientCallTracer(statsCtxFactory.getDefault(), method.getFullMethodName());
Metadata headers = new Metadata();
callTracer.newClientStreamTracer(headers);
callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
assertFalse(headers.containsKey(censusStats.statsHeader));
}
@ -521,7 +522,7 @@ public class CensusModulesTest {
CensusTracingModule.ClientCallTracer callTracer =
censusTracing.newClientCallTracer(fakeClientParentSpan, method.getFullMethodName());
Metadata headers = new Metadata();
callTracer.newClientStreamTracer(headers);
callTracer.newClientStreamTracer(CallOptions.DEFAULT, headers);
verify(mockTracingPropagationHandler).toBinaryValue(same(fakeClientSpanContext));
verifyNoMoreInteractions(mockTracingPropagationHandler);

View File

@ -19,6 +19,7 @@ package io.grpc.grpclb;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.protobuf.util.Timestamps;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.Metadata;
import io.grpc.Status;
@ -47,7 +48,7 @@ final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory {
}
@Override
public ClientStreamTracer newClientStreamTracer(Metadata headers) {
public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
callsStarted.incrementAndGet();
return new StreamTracer();
}

View File

@ -46,6 +46,7 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.util.Durations;
import com.google.protobuf.util.Timestamps;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
@ -366,7 +367,7 @@ public class GrpclbLoadBalancerTest {
ClientStats.newBuilder().build());
ClientStreamTracer tracer1 =
pick1.getStreamTracerFactory().newClientStreamTracer(new Metadata());
pick1.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata());
PickResult pick2 = picker.pickSubchannel(args);
assertNull(pick2.getSubchannel());
@ -385,7 +386,7 @@ public class GrpclbLoadBalancerTest {
assertSame(subchannel2, pick3.getSubchannel());
assertSame(balancer.getLoadRecorder(), pick3.getStreamTracerFactory());
ClientStreamTracer tracer3 =
pick3.getStreamTracerFactory().newClientStreamTracer(new Metadata());
pick3.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata());
// pick3 has sent out headers
tracer3.outboundHeaders();
@ -418,7 +419,7 @@ public class GrpclbLoadBalancerTest {
assertSame(subchannel1, pick1.getSubchannel());
assertSame(balancer.getLoadRecorder(), pick5.getStreamTracerFactory());
ClientStreamTracer tracer5 =
pick5.getStreamTracerFactory().newClientStreamTracer(new Metadata());
pick5.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata());
// pick3 ended without receiving response headers
tracer3.streamClosed(Status.DEADLINE_EXCEEDED);
@ -492,7 +493,7 @@ public class GrpclbLoadBalancerTest {
PickResult pick1p = picker.pickSubchannel(args);
assertSame(subchannel1, pick1p.getSubchannel());
assertSame(balancer.getLoadRecorder(), pick1p.getStreamTracerFactory());
pick1p.getStreamTracerFactory().newClientStreamTracer(new Metadata());
pick1p.getStreamTracerFactory().newClientStreamTracer(CallOptions.DEFAULT, new Metadata());
// The pick from the new stream will be included in the report
assertNextReport(

View File

@ -204,7 +204,7 @@ public abstract class AbstractInteropTest {
private final ClientStreamTracer.Factory clientStreamTracerFactory =
new ClientStreamTracer.Factory() {
@Override
public ClientStreamTracer newClientStreamTracer(Metadata headers) {
public ClientStreamTracer newClientStreamTracer(CallOptions callOptions, Metadata headers) {
ClientStreamTracer tracer = spy(new ClientStreamTracer() {});
clientStreamTracers.add(tracer);
return tracer;

View File

@ -172,7 +172,8 @@ public abstract class AbstractTransportTest {
@Before
public void setUp() {
server = newServer(Arrays.asList(serverStreamTracerFactory));
when(clientStreamTracerFactory.newClientStreamTracer(any(Metadata.class)))
when(clientStreamTracerFactory
.newClientStreamTracer(any(CallOptions.class), any(Metadata.class)))
.thenReturn(clientStreamTracer);
when(serverStreamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class)))
.thenReturn(serverStreamTracer);
@ -512,7 +513,8 @@ public abstract class AbstractTransportTest {
// Stream prevents termination
ClientStream stream = client.newStream(methodDescriptor, new Metadata(), callOptions);
if (metricsExpected()) {
inOrder.verify(clientStreamTracerFactory).newClientStreamTracer(any(Metadata.class));
inOrder.verify(clientStreamTracerFactory).newClientStreamTracer(
any(CallOptions.class), any(Metadata.class));
}
stream.start(mockClientStreamListener);
client.shutdown();
@ -520,7 +522,8 @@ public abstract class AbstractTransportTest {
ClientStream stream2 = client.newStream(methodDescriptor, new Metadata(), callOptions);
if (metricsExpected()) {
inOrder.verify(clientStreamTracerFactory).newClientStreamTracer(any(Metadata.class));
inOrder.verify(clientStreamTracerFactory).newClientStreamTracer(
any(CallOptions.class), any(Metadata.class));
}
ClientStreamListener mockClientStreamListener2 = mock(ClientStreamListener.class);
stream2.start(mockClientStreamListener2);
@ -563,7 +566,8 @@ public abstract class AbstractTransportTest {
verify(mockClientTransportListener, never()).transportInUse(anyBoolean());
assertCodeEquals(Status.UNAVAILABLE, statusCaptor.getValue());
if (metricsExpected()) {
verify(clientStreamTracerFactory).newClientStreamTracer(any(Metadata.class));
verify(clientStreamTracerFactory).newClientStreamTracer(
any(CallOptions.class), any(Metadata.class));
verify(clientStreamTracer).streamClosed(same(statusCaptor.getValue()));
verifyZeroInteractions(serverStreamTracerFactory);
}
@ -639,7 +643,8 @@ public abstract class AbstractTransportTest {
clientHeadersCopy.merge(clientHeaders);
ClientStream clientStream = client.newStream(methodDescriptor, clientHeaders, callOptions);
if (metricsExpected()) {
clientInOrder.verify(clientStreamTracerFactory).newClientStreamTracer(same(clientHeaders));
clientInOrder.verify(clientStreamTracerFactory).newClientStreamTracer(
same(callOptions), same(clientHeaders));
}
clientStream.start(mockClientStreamListener);
@ -1068,7 +1073,8 @@ public abstract class AbstractTransportTest {
assertNull(statusCaptor.getValue().getCause());
if (metricsExpected()) {
verify(clientStreamTracerFactory).newClientStreamTracer(any(Metadata.class));
verify(clientStreamTracerFactory).newClientStreamTracer(
any(CallOptions.class), any(Metadata.class));
verify(clientStreamTracer).outboundHeaders();
verify(clientStreamTracer).streamClosed(same(statusCaptor.getValue()));
verify(serverStreamTracerFactory).newServerStreamTracer(anyString(), any(Metadata.class));