Plumb optional labels from LB to ClientStreamTracer

As part of gRFC A78:

> To support the locality label in the per-call metrics, we will provide
> a mechanism for LB picker to add optional labels to the call attempt
> tracer.
This commit is contained in:
Eric Anderson 2024-04-29 16:30:51 -07:00 committed by GitHub
parent 06df25b65d
commit 4c78a9746c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 305 additions and 94 deletions

View File

@ -79,6 +79,13 @@ public abstract class ClientStreamTracer extends StreamTracer {
public void inboundTrailers(Metadata trailers) {
}
/**
* Information providing context to the call became available.
*/
@Internal
public void addOptionalLabel(String key, String value) {
}
/**
* Factory class for {@link ClientStreamTracer}.
*/

View File

@ -490,6 +490,29 @@ public abstract class LoadBalancer {
* @since 1.2.0
*/
public abstract MethodDescriptor<?, ?> getMethodDescriptor();
/**
* Gets an object that can be informed about what sort of pick was made.
*/
@Internal
public PickDetailsConsumer getPickDetailsConsumer() {
return new PickDetailsConsumer() {};
}
}
/** Receives information about the pick being chosen. */
@Internal
public interface PickDetailsConsumer {
/**
* Optional labels that provide context of how the pick was routed. Particularly helpful for
* per-RPC metrics.
*
* @throws NullPointerException if key or value is {@code null}
*/
default void addOptionalLabel(String key, String value) {
checkNotNull(key, "key");
checkNotNull(value, "value");
}
}
/**

View File

@ -0,0 +1,59 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc;
import com.google.common.base.Preconditions;
import io.grpc.CallOptions;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import org.mockito.ArgumentMatcher;
import org.mockito.ArgumentMatchers;
/**
* Mockito Matcher for {@link PickSubchannelArgs}.
*/
public final class PickSubchannelArgsMatcher implements ArgumentMatcher<PickSubchannelArgs> {
private final MethodDescriptor<?, ?> method;
private final Metadata headers;
private final CallOptions callOptions;
public PickSubchannelArgsMatcher(
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
this.method = Preconditions.checkNotNull(method, "method");
this.headers = Preconditions.checkNotNull(headers, "headers");
this.callOptions = Preconditions.checkNotNull(callOptions, "callOptions");
}
@Override
public boolean matches(PickSubchannelArgs args) {
return args != null
&& method.equals(args.getMethodDescriptor())
&& headers.equals(args.getHeaders())
&& callOptions.equals(args.getCallOptions());
}
@Override
public final String toString() {
return "[method=" + method + " headers=" + headers + " callOptions=" + callOptions + "]";
}
public static PickSubchannelArgs eqPickSubchannelArgs(
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
return ArgumentMatchers.argThat(new PickSubchannelArgsMatcher(method, headers, callOptions));
}
}

View File

@ -137,7 +137,8 @@ final class DelayedClientTransport implements ManagedClientTransport {
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
ClientStreamTracer[] tracers) {
try {
PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
PickSubchannelArgs args = new PickSubchannelArgsImpl(
method, headers, callOptions, new PickDetailsConsumerImpl(tracers));
SubchannelPicker picker = null;
long pickerVersion = -1;
while (true) {

View File

@ -54,6 +54,11 @@ public abstract class ForwardingClientStreamTracer extends ClientStreamTracer {
delegate().inboundTrailers(trailers);
}
@Override
public void addOptionalLabel(String key, String value) {
delegate().addOptionalLabel(key, value);
}
@Override
public void streamClosed(Status status) {
delegate().streamClosed(status);

View File

@ -158,6 +158,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
throw new IllegalStateException("Resolution is pending");
}
};
private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
new LoadBalancer.PickDetailsConsumer() {};
private final InternalLogId logId;
private final String target;
@ -519,11 +521,11 @@ final class ManagedChannelImpl extends ManagedChannel implements
final Metadata headers,
final Context context) {
if (!retryEnabled) {
ClientTransport transport =
getTransport(new PickSubchannelArgsImpl(method, headers, callOptions));
Context origContext = context.attach();
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
callOptions, headers, 0, /* isTransparentRetry= */ false);
ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
method, headers, callOptions, new PickDetailsConsumerImpl(tracers)));
Context origContext = context.attach();
try {
return transport.newStream(method, headers, callOptions, tracers);
} finally {
@ -566,8 +568,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
newOptions, newHeaders, previousAttempts, isTransparentRetry);
ClientTransport transport =
getTransport(new PickSubchannelArgsImpl(method, newHeaders, newOptions));
ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
method, newHeaders, newOptions, new PickDetailsConsumerImpl(tracers)));
Context origContext = context.attach();
try {
return transport.newStream(method, newHeaders, newOptions, tracers);
@ -1207,7 +1209,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
@SuppressWarnings("unchecked")
@Override
public void start(Listener<RespT> observer, Metadata headers) {
PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
PickSubchannelArgs args =
new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER);
InternalConfigSelector.Result result = configSelector.selectConfig(args);
Status status = result.getStatus();
if (!status.isOk()) {

View File

@ -0,0 +1,42 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.internal;
import com.google.common.base.Preconditions;
import io.grpc.ClientStreamTracer;
import io.grpc.LoadBalancer.PickDetailsConsumer;
/**
* Adapter for tracers into details consumers.
*/
final class PickDetailsConsumerImpl implements PickDetailsConsumer {
private final ClientStreamTracer[] tracers;
/** Construct a consumer with unchanging tracers array. */
public PickDetailsConsumerImpl(ClientStreamTracer[] tracers) {
this.tracers = Preconditions.checkNotNull(tracers, "tracers");
}
@Override
public void addOptionalLabel(String key, String value) {
Preconditions.checkNotNull(key, "key");
Preconditions.checkNotNull(value, "value");
for (ClientStreamTracer tracer : tracers) {
tracer.addOptionalLabel(key, value);
}
}
}

View File

@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.Objects;
import io.grpc.CallOptions;
import io.grpc.LoadBalancer.PickDetailsConsumer;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
@ -29,15 +30,18 @@ public final class PickSubchannelArgsImpl extends PickSubchannelArgs {
private final CallOptions callOptions;
private final Metadata headers;
private final MethodDescriptor<?, ?> method;
private final PickDetailsConsumer pickDetailsConsumer;
/**
* Creates call args object for given method with its call options, metadata.
*/
public PickSubchannelArgsImpl(
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
PickDetailsConsumer pickDetailsConsumer) {
this.method = checkNotNull(method, "method");
this.headers = checkNotNull(headers, "headers");
this.callOptions = checkNotNull(callOptions, "callOptions");
this.pickDetailsConsumer = checkNotNull(pickDetailsConsumer, "pickDetailsConsumer");
}
@Override
@ -55,6 +59,11 @@ public final class PickSubchannelArgsImpl extends PickSubchannelArgs {
return method;
}
@Override
public PickDetailsConsumer getPickDetailsConsumer() {
return pickDetailsConsumer;
}
@Override
public boolean equals(Object o) {
if (this == o) {
@ -66,12 +75,13 @@ public final class PickSubchannelArgsImpl extends PickSubchannelArgs {
PickSubchannelArgsImpl that = (PickSubchannelArgsImpl) o;
return Objects.equal(callOptions, that.callOptions)
&& Objects.equal(headers, that.headers)
&& Objects.equal(method, that.method);
&& Objects.equal(method, that.method)
&& Objects.equal(pickDetailsConsumer, that.pickDetailsConsumer);
}
@Override
public int hashCode() {
return Objects.hashCode(callOptions, headers, method);
return Objects.hashCode(callOptions, headers, method, pickDetailsConsumer);
}
@Override

View File

@ -17,12 +17,14 @@
package io.grpc.internal;
import static com.google.common.truth.Truth.assertThat;
import static io.grpc.PickSubchannelArgsMatcher.eqPickSubchannelArgs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doAnswer;
@ -44,6 +46,7 @@ import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.PickSubchannelArgsMatcher;
import io.grpc.Status;
import io.grpc.StringMarshaller;
import io.grpc.SynchronizationContext;
@ -344,31 +347,31 @@ public class DelayedClientTransportTest {
method, headers, failFastCallOptions, tracers);
ff1.start(mock(ClientStreamListener.class));
ff1.halfClose();
PickSubchannelArgsImpl ff1args = new PickSubchannelArgsImpl(method, headers,
PickSubchannelArgsMatcher ff1args = new PickSubchannelArgsMatcher(method, headers,
failFastCallOptions);
verify(transportListener).transportInUse(true);
DelayedStream ff2 = (DelayedStream) delayedTransport.newStream(
method2, headers2, failFastCallOptions, tracers);
PickSubchannelArgsImpl ff2args = new PickSubchannelArgsImpl(method2, headers2,
PickSubchannelArgsMatcher ff2args = new PickSubchannelArgsMatcher(method2, headers2,
failFastCallOptions);
DelayedStream ff3 = (DelayedStream) delayedTransport.newStream(
method, headers, failFastCallOptions, tracers);
PickSubchannelArgsImpl ff3args = new PickSubchannelArgsImpl(method, headers,
PickSubchannelArgsMatcher ff3args = new PickSubchannelArgsMatcher(method, headers,
failFastCallOptions);
DelayedStream ff4 = (DelayedStream) delayedTransport.newStream(
method2, headers2, failFastCallOptions, tracers);
PickSubchannelArgsImpl ff4args = new PickSubchannelArgsImpl(method2, headers2,
PickSubchannelArgsMatcher ff4args = new PickSubchannelArgsMatcher(method2, headers2,
failFastCallOptions);
// Wait-for-ready streams
FakeClock wfr3Executor = new FakeClock();
DelayedStream wfr1 = (DelayedStream) delayedTransport.newStream(
method, headers, waitForReadyCallOptions, tracers);
PickSubchannelArgsImpl wfr1args = new PickSubchannelArgsImpl(method, headers,
PickSubchannelArgsMatcher wfr1args = new PickSubchannelArgsMatcher(method, headers,
waitForReadyCallOptions);
DelayedStream wfr2 = (DelayedStream) delayedTransport.newStream(
method2, headers2, waitForReadyCallOptions, tracers);
PickSubchannelArgsImpl wfr2args = new PickSubchannelArgsImpl(method2, headers2,
PickSubchannelArgsMatcher wfr2args = new PickSubchannelArgsMatcher(method2, headers2,
waitForReadyCallOptions);
CallOptions wfr3callOptions = waitForReadyCallOptions.withExecutor(
wfr3Executor.getScheduledExecutorService());
@ -376,11 +379,11 @@ public class DelayedClientTransportTest {
method, headers, wfr3callOptions, tracers);
wfr3.start(mock(ClientStreamListener.class));
wfr3.halfClose();
PickSubchannelArgsImpl wfr3args = new PickSubchannelArgsImpl(method, headers,
PickSubchannelArgsMatcher wfr3args = new PickSubchannelArgsMatcher(method, headers,
wfr3callOptions);
DelayedStream wfr4 = (DelayedStream) delayedTransport.newStream(
method2, headers2, waitForReadyCallOptions, tracers);
PickSubchannelArgsImpl wfr4args = new PickSubchannelArgsImpl(method2, headers2,
PickSubchannelArgsMatcher wfr4args = new PickSubchannelArgsMatcher(method2, headers2,
waitForReadyCallOptions);
assertEquals(8, delayedTransport.getPendingStreamsCount());
@ -401,14 +404,14 @@ public class DelayedClientTransportTest {
delayedTransport.reprocess(picker);
assertEquals(5, delayedTransport.getPendingStreamsCount());
inOrder.verify(picker).pickSubchannel(ff1args);
inOrder.verify(picker).pickSubchannel(ff2args);
inOrder.verify(picker).pickSubchannel(ff3args);
inOrder.verify(picker).pickSubchannel(ff4args);
inOrder.verify(picker).pickSubchannel(wfr1args);
inOrder.verify(picker).pickSubchannel(wfr2args);
inOrder.verify(picker).pickSubchannel(wfr3args);
inOrder.verify(picker).pickSubchannel(wfr4args);
inOrder.verify(picker).pickSubchannel(argThat(ff1args));
inOrder.verify(picker).pickSubchannel(argThat(ff2args));
inOrder.verify(picker).pickSubchannel(argThat(ff3args));
inOrder.verify(picker).pickSubchannel(argThat(ff4args));
inOrder.verify(picker).pickSubchannel(argThat(wfr1args));
inOrder.verify(picker).pickSubchannel(argThat(wfr2args));
inOrder.verify(picker).pickSubchannel(argThat(wfr3args));
inOrder.verify(picker).pickSubchannel(argThat(wfr4args));
inOrder.verifyNoMoreInteractions();
// Make sure that streams are created and started immediately, not in any executor. This is
@ -454,11 +457,11 @@ public class DelayedClientTransportTest {
delayedTransport.reprocess(picker);
assertEquals(0, delayedTransport.getPendingStreamsCount());
verify(transportListener).transportInUse(false);
inOrder.verify(picker).pickSubchannel(ff3args); // ff3
inOrder.verify(picker).pickSubchannel(ff4args); // ff4
inOrder.verify(picker).pickSubchannel(wfr2args); // wfr2
inOrder.verify(picker).pickSubchannel(wfr3args); // wfr3
inOrder.verify(picker).pickSubchannel(wfr4args); // wfr4
inOrder.verify(picker).pickSubchannel(argThat(ff3args)); // ff3
inOrder.verify(picker).pickSubchannel(argThat(ff4args)); // ff4
inOrder.verify(picker).pickSubchannel(argThat(wfr2args)); // wfr2
inOrder.verify(picker).pickSubchannel(argThat(wfr3args)); // wfr3
inOrder.verify(picker).pickSubchannel(argThat(wfr4args)); // wfr4
inOrder.verifyNoMoreInteractions();
fakeExecutor.runDueTasks();
assertEquals(0, fakeExecutor.numPendingTasks());
@ -478,7 +481,7 @@ public class DelayedClientTransportTest {
method, headers, waitForReadyCallOptions, tracers);
assertNull(wfr5.getRealStream());
inOrder.verify(picker).pickSubchannel(
new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions));
eqPickSubchannelArgs(method, headers, waitForReadyCallOptions));
inOrder.verifyNoMoreInteractions();
assertEquals(1, delayedTransport.getPendingStreamsCount());
@ -492,7 +495,7 @@ public class DelayedClientTransportTest {
PickResult.withSubchannel(subchannel1));
delayedTransport.reprocess(picker);
verify(picker).pickSubchannel(
new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions));
eqPickSubchannelArgs(method, headers, waitForReadyCallOptions));
fakeExecutor.runDueTasks();
assertSame(mockRealStream, wfr5.getRealStream());
assertEquals(0, delayedTransport.getPendingStreamsCount());
@ -517,7 +520,7 @@ public class DelayedClientTransportTest {
// Though picker was not originally used, it will be saved and serve future streams.
ClientStream stream = delayedTransport.newStream(
method, headers, CallOptions.DEFAULT, tracers);
verify(picker).pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT));
verify(picker).pickSubchannel(eqPickSubchannelArgs(method, headers, CallOptions.DEFAULT));
verify(mockInternalSubchannel).obtainActiveTransport();
assertSame(mockRealStream, stream);
}
@ -559,16 +562,16 @@ public class DelayedClientTransportTest {
};
sideThread.start();
PickSubchannelArgsImpl args = new PickSubchannelArgsImpl(method, headers, callOptions);
PickSubchannelArgsImpl args2 = new PickSubchannelArgsImpl(method, headers2, callOptions);
PickSubchannelArgsMatcher args = new PickSubchannelArgsMatcher(method, headers, callOptions);
PickSubchannelArgsMatcher args2 = new PickSubchannelArgsMatcher(method, headers2, callOptions);
// Is called from sideThread
verify(picker, timeout(5000)).pickSubchannel(args);
verify(picker, timeout(5000)).pickSubchannel(argThat(args));
// Because stream has not been buffered (it's still stuck in newStream()), this will do nothing,
// but incrementing the picker version.
delayedTransport.reprocess(picker);
verify(picker).pickSubchannel(args);
verify(picker).pickSubchannel(argThat(args));
// Now let the stuck newStream() through
barrier.await(5, TimeUnit.SECONDS);
@ -576,7 +579,7 @@ public class DelayedClientTransportTest {
sideThread.join(5000);
assertFalse("sideThread should've exited", sideThread.isAlive());
// newStream() detects that there has been a new picker while it's stuck, thus will pick again.
verify(picker, times(2)).pickSubchannel(args);
verify(picker, times(2)).pickSubchannel(argThat(args));
barrier.reset();
nextPickShouldWait.set(true);
@ -592,9 +595,9 @@ public class DelayedClientTransportTest {
};
sideThread2.start();
// The second stream will see the first picker
verify(picker, timeout(5000)).pickSubchannel(args2);
verify(picker, timeout(5000)).pickSubchannel(argThat(args2));
// While the first stream won't use the first picker any more.
verify(picker, times(2)).pickSubchannel(args);
verify(picker, times(2)).pickSubchannel(argThat(args));
// Now use a different picker
SubchannelPicker picker2 = mock(SubchannelPicker.class);
@ -602,9 +605,9 @@ public class DelayedClientTransportTest {
.thenReturn(PickResult.withNoResult());
delayedTransport.reprocess(picker2);
// The pending first stream uses the new picker
verify(picker2).pickSubchannel(args);
verify(picker2).pickSubchannel(argThat(args));
// The second stream is still pending in creation, doesn't use the new picker.
verify(picker2, never()).pickSubchannel(args2);
verify(picker2, never()).pickSubchannel(argThat(args2));
// Now let the second stream finish creation
barrier.await(5, TimeUnit.SECONDS);
@ -612,13 +615,30 @@ public class DelayedClientTransportTest {
sideThread2.join(5000);
assertFalse("sideThread2 should've exited", sideThread2.isAlive());
// The second stream should see the new picker
verify(picker2, timeout(5000)).pickSubchannel(args2);
verify(picker2, timeout(5000)).pickSubchannel(argThat(args2));
// Wrapping up
verify(picker, times(2)).pickSubchannel(args);
verify(picker).pickSubchannel(args2);
verify(picker2).pickSubchannel(args);
verify(picker2).pickSubchannel(args);
verify(picker, times(2)).pickSubchannel(argThat(args));
verify(picker).pickSubchannel(argThat(args2));
verify(picker2).pickSubchannel(argThat(args));
verify(picker2).pickSubchannel(argThat(args));
}
@Test
public void reprocess_addOptionalLabelCallsTracer() throws Exception {
delayedTransport.reprocess(new SubchannelPicker() {
@Override public PickResult pickSubchannel(PickSubchannelArgs args) {
args.getPickDetailsConsumer().addOptionalLabel("routed", "perfectly");
return PickResult.withError(Status.UNAVAILABLE.withDescription("expected"));
}
});
ClientStreamTracer tracer = mock(ClientStreamTracer.class);
ClientStream stream = delayedTransport.newStream(
method, headers, callOptions, new ClientStreamTracer[] {tracer});
stream.start(streamListener);
verify(tracer).addOptionalLabel("routed", "perfectly");
}
@Test

View File

@ -26,6 +26,7 @@ import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN;
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
import static io.grpc.PickSubchannelArgsMatcher.eqPickSubchannelArgs;
import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
import static junit.framework.TestCase.assertNotSame;
import static org.junit.Assert.assertEquals;
@ -617,6 +618,32 @@ public class ManagedChannelImplTest {
TimeUnit.SECONDS.toNanos(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS));
}
@Test
public void pickSubchannelAddOptionalLabel_callsTracer() {
channelBuilder.directExecutor();
createChannel();
updateBalancingStateSafely(helper, TRANSIENT_FAILURE, new SubchannelPicker() {
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
args.getPickDetailsConsumer().addOptionalLabel("routed", "perfectly");
return PickResult.withError(Status.UNAVAILABLE.withDescription("expected"));
}
});
ClientStreamTracer tracer = mock(ClientStreamTracer.class);
ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() {
@Override
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
return tracer;
}
};
ClientCall<String, Integer> call = channel.newCall(
method, CallOptions.DEFAULT.withStreamTracerFactory(tracerFactory));
call.start(mockCallListener, new Metadata());
verify(tracer).addOptionalLabel("routed", "perfectly");
}
@Test
public void shutdownWithNoTransportsEverCreated() {
channelBuilder.nameResolverFactory(
@ -808,10 +835,10 @@ public class ManagedChannelImplTest {
.thenReturn(mockStream2);
transportListener.transportReady();
when(mockPicker.pickSubchannel(
new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT))).thenReturn(
eqPickSubchannelArgs(method, headers, CallOptions.DEFAULT))).thenReturn(
PickResult.withNoResult());
when(mockPicker.pickSubchannel(
new PickSubchannelArgsImpl(method, headers2, CallOptions.DEFAULT))).thenReturn(
eqPickSubchannelArgs(method, headers2, CallOptions.DEFAULT))).thenReturn(
PickResult.withSubchannel(subchannel));
updateBalancingStateSafely(helper, READY, mockPicker);
@ -875,7 +902,7 @@ public class ManagedChannelImplTest {
assertFalse(nameResolverFactory.resolvers.get(0).shutdown);
// call and call2 are still alive, and can still be assigned to a real transport
SubchannelPicker picker2 = mock(SubchannelPicker.class);
when(picker2.pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT)))
when(picker2.pickSubchannel(eqPickSubchannelArgs(method, headers, CallOptions.DEFAULT)))
.thenReturn(PickResult.withSubchannel(subchannel));
updateBalancingStateSafely(helper, READY, picker2);
executor.runDueTasks();
@ -4531,4 +4558,4 @@ public class ManagedChannelImplTest {
return ManagedChannelServiceConfig
.fromServiceConfig(rawServiceConfig, true, 3, 4, policySelection);
}
}
}

View File

@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap;
import io.grpc.CallOptions;
import io.grpc.InternalConfigSelector;
import io.grpc.InternalConfigSelector.Result;
import io.grpc.LoadBalancer.PickDetailsConsumer;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
@ -209,7 +210,8 @@ public class ManagedChannelServiceConfigTest {
InternalConfigSelector configSelector = serviceConfig.getDefaultConfigSelector();
MethodDescriptor<?, ?> method = methodForName("service1", "method1");
Result result = configSelector.selectConfig(
new PickSubchannelArgsImpl(method, new Metadata(), CallOptions.DEFAULT));
new PickSubchannelArgsImpl(
method, new Metadata(), CallOptions.DEFAULT, new PickDetailsConsumer() {}));
MethodInfo methodInfoFromDefaultConfigSelector =
((ManagedChannelServiceConfig) result.getConfig()).getMethodConfig(method);
assertThat(methodInfoFromDefaultConfigSelector)

View File

@ -100,7 +100,7 @@ public class RpcBehaviorLoadBalancerProviderTest {
@Test
public void pickerAddsRpcBehaviorMetadata() {
PickSubchannelArgsImpl args = new PickSubchannelArgsImpl(TestMethodDescriptors.voidMethod(),
new Metadata(), CallOptions.DEFAULT);
new Metadata(), CallOptions.DEFAULT, new LoadBalancer.PickDetailsConsumer() {});
new RpcBehaviorPicker(mockPicker, "error-code-15").pickSubchannel(args);
assertThat(args.getHeaders()

View File

@ -43,6 +43,7 @@ import io.grpc.EquivalentAddressGroup;
import io.grpc.ForwardingChannelBuilder2;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickDetailsConsumer;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancerProvider;
@ -383,7 +384,8 @@ public class CachingRlsLbClientTest {
.setFullMethodName("doesn/exists")
.build(),
headers,
CallOptions.DEFAULT));
CallOptions.DEFAULT,
new PickDetailsConsumer() {}));
assertThat(pickResult.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
assertThat(pickResult.getStatus().getDescription()).contains("fallback not available");
assertThat(fakeThrottler.getNumThrottled()).isEqualTo(1);
@ -438,7 +440,8 @@ public class CachingRlsLbClientTest {
TestMethodDescriptors.voidMethod().toBuilder().setFullMethodName("service1/create")
.build(),
headers,
CallOptions.DEFAULT));
CallOptions.DEFAULT,
new PickDetailsConsumer() {}));
}
@Test
@ -521,7 +524,8 @@ public class CachingRlsLbClientTest {
.setFullMethodName("doesn/exists")
.build(),
headers,
CallOptions.DEFAULT);
CallOptions.DEFAULT,
new PickDetailsConsumer() {});
return invalidArgs;
}

View File

@ -39,7 +39,9 @@ import io.grpc.EquivalentAddressGroup;
import io.grpc.ForwardingChannelBuilder2;
import io.grpc.LoadBalancer.CreateSubchannelArgs;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickDetailsConsumer;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
@ -120,8 +122,8 @@ public class RlsLoadBalancerTest {
private MethodDescriptor<Object, Object> fakeRescueMethod;
private RlsLoadBalancer rlsLb;
private String defaultTarget = "defaultTarget";
private PickSubchannelArgsImpl searchSubchannelArgs;
private PickSubchannelArgsImpl rescueSubchannelArgs;
private PickSubchannelArgs searchSubchannelArgs;
private PickSubchannelArgs rescueSubchannelArgs;
@Before
public void setUp() {
@ -163,12 +165,8 @@ public class RlsLoadBalancerTest {
}
};
Metadata headers = new Metadata();
searchSubchannelArgs =
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT);
rescueSubchannelArgs =
new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT);
searchSubchannelArgs = newPickSubchannelArgs(fakeSearchMethod);
rescueSubchannelArgs = newPickSubchannelArgs(fakeRescueMethod);
}
@After
@ -183,9 +181,7 @@ public class RlsLoadBalancerTest {
inOrder.verify(helper)
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue();
Metadata headers = new Metadata();
PickSubchannelArgsImpl fakeSearchMethodArgs =
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT);
PickSubchannelArgs fakeSearchMethodArgs = newPickSubchannelArgs(fakeSearchMethod);
// Warm-up pick; will be queued
PickResult res = picker.pickSubchannel(fakeSearchMethodArgs);
assertThat(res.getStatus().isOk()).isTrue();
@ -332,7 +328,6 @@ public class RlsLoadBalancerTest {
inOrder.verify(helper)
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue();
Metadata headers = new Metadata();
// Warm-up pick; will be queued
PickResult res = picker.pickSubchannel(searchSubchannelArgs);
assertThat(res.getStatus().isOk()).isTrue();
@ -375,13 +370,11 @@ public class RlsLoadBalancerTest {
// search method will fail because there is no fallback target.
picker = pickerCaptor.getValue();
res = picker.pickSubchannel(
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
res = picker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod));
assertThat(res.getStatus().isOk()).isFalse();
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
res = picker.pickSubchannel(
new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT));
res = picker.pickSubchannel(newPickSubchannelArgs(fakeRescueMethod));
assertThat(subchannelIsReady(res.getSubchannel())).isTrue();
assertThat(res.getSubchannel().getAddresses()).isEqualTo(rescueSubchannel.getAddresses());
assertThat(res.getSubchannel().getAttributes()).isEqualTo(rescueSubchannel.getAttributes());
@ -401,10 +394,7 @@ public class RlsLoadBalancerTest {
inOrder.verify(helper)
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
SubchannelPicker picker = pickerCaptor.getValue();
Metadata headers = new Metadata();
PickResult res =
picker.pickSubchannel(
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
PickResult res = picker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod));
assertThat(res.getStatus().isOk()).isTrue();
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
@ -418,8 +408,7 @@ public class RlsLoadBalancerTest {
SubchannelPicker picker2 = pickerCaptor.getValue();
assertThat(picker2).isEqualTo(picker);
res = picker2.pickSubchannel(
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
res = picker2.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod));
// verify success. Subchannel is wrapped, so checking attributes.
assertThat(subchannelIsReady(res.getSubchannel())).isTrue();
assertThat(res.getSubchannel().getAddresses()).isEqualTo(searchSubchannel.getAddresses());
@ -432,14 +421,13 @@ public class RlsLoadBalancerTest {
verify(helper)
.updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
SubchannelPicker failedPicker = pickerCaptor.getValue();
res = failedPicker.pickSubchannel(
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
res = failedPicker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod));
assertThat(res.getStatus().isOk()).isFalse();
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
}
private PickResult markReadyAndGetPickResult(InOrder inOrder,
PickSubchannelArgsImpl pickSubchannelArgs) {
PickSubchannelArgs pickSubchannelArgs) {
subchannels.getLast().updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
inOrder.verify(helper, atLeast(1))
.updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
@ -504,6 +492,11 @@ public class RlsLoadBalancerTest {
+ "}";
}
private PickSubchannelArgs newPickSubchannelArgs(MethodDescriptor<?, ?> method) {
return new PickSubchannelArgsImpl(
method, new Metadata(), CallOptions.DEFAULT, new PickDetailsConsumer() {});
}
private final class FakeHelper extends Helper {
@Override

View File

@ -53,6 +53,11 @@ public abstract class ForwardingClientStreamTracer extends ClientStreamTracer {
delegate().inboundTrailers(trailers);
}
@Override
public void addOptionalLabel(String key, String value) {
delegate().addOptionalLabel(key, value);
}
@Override
public void streamClosed(Status status) {
delegate().streamClosed(status);

View File

@ -37,6 +37,7 @@ import io.grpc.ConnectivityState;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickDetailsConsumer;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
@ -310,7 +311,8 @@ public class ClusterManagerLoadBalancerTest {
.build(),
new Metadata(),
CallOptions.DEFAULT.withOption(
XdsNameResolver.CLUSTER_SELECTION_KEY, clusterName));
XdsNameResolver.CLUSTER_SELECTION_KEY, clusterName),
new PickDetailsConsumer() {});
return picker.pickSubchannel(args);
}

View File

@ -50,6 +50,7 @@ import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer.CreateSubchannelArgs;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.PickDetailsConsumer;
import io.grpc.LoadBalancer.PickResult;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.LoadBalancer.ResolvedAddresses;
@ -461,13 +462,14 @@ public class RingHashLoadBalancerTest {
assertThat(result.getSubchannel().getAddresses()).isEqualTo(servers.get(1));
}
private PickSubchannelArgsImpl getDefaultPickSubchannelArgs(long rpcHash) {
private PickSubchannelArgs getDefaultPickSubchannelArgs(long rpcHash) {
return new PickSubchannelArgsImpl(
TestMethodDescriptors.voidMethod(), new Metadata(),
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, rpcHash));
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, rpcHash),
new PickDetailsConsumer() {});
}
private PickSubchannelArgsImpl getDefaultPickSubchannelArgsForServer(int serverid) {
private PickSubchannelArgs getDefaultPickSubchannelArgsForServer(int serverid) {
long rpcHash = hashFunc.hashAsciiString("FakeSocketAddress-server" + serverid + "_0");
return getDefaultPickSubchannelArgs(rpcHash);
}

View File

@ -50,6 +50,8 @@ import io.grpc.Deadline;
import io.grpc.InsecureChannelCredentials;
import io.grpc.InternalConfigSelector;
import io.grpc.InternalConfigSelector.Result;
import io.grpc.LoadBalancer.PickDetailsConsumer;
import io.grpc.LoadBalancer.PickSubchannelArgs;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
@ -662,7 +664,7 @@ public class XdsNameResolverTest {
ResolutionResult result = resolutionResultCaptor.getValue();
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
Result selectResult = configSelector.selectConfig(
new PickSubchannelArgsImpl(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
newPickSubchannelArgs(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
Object config = selectResult.getConfig();
// Purely validating the data (io.grpc.internal.RetryPolicy).
@ -693,7 +695,7 @@ public class XdsNameResolverTest {
InternalConfigSelector configSelector = resolveToClusters();
CallInfo call = new CallInfo("FooService", "barMethod");
Result selectResult = configSelector.selectConfig(
new PickSubchannelArgsImpl(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
newPickSubchannelArgs(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
Status status = selectResult.getStatus();
assertThat(status.isOk()).isFalse();
assertThat(status.getCode()).isEqualTo(Code.UNAVAILABLE);
@ -727,7 +729,7 @@ public class XdsNameResolverTest {
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
// Simulates making a call1 RPC.
Result selectResult = configSelector.selectConfig(
new PickSubchannelArgsImpl(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
newPickSubchannelArgs(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
Status status = selectResult.getStatus();
assertThat(status.isOk()).isFalse();
assertThat(status.getCode()).isEqualTo(Code.UNAVAILABLE);
@ -1166,7 +1168,7 @@ public class XdsNameResolverTest {
assertThat((Map<String, ?>) result.getServiceConfig().getConfig()).isEmpty();
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
Result configResult = configSelector.selectConfig(
new PickSubchannelArgsImpl(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
newPickSubchannelArgs(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
assertThat(configResult.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
assertThat(configResult.getStatus().getDescription()).contains(resource);
}
@ -1175,7 +1177,7 @@ public class XdsNameResolverTest {
CallInfo call, InternalConfigSelector configSelector, String expectedCluster,
@Nullable Double expectedTimeoutSec) {
Result result = configSelector.selectConfig(
new PickSubchannelArgsImpl(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
newPickSubchannelArgs(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
assertThat(result.getStatus().isOk()).isTrue();
ClientInterceptor interceptor = result.getInterceptor();
ClientCall<Void, Void> clientCall = interceptor.interceptCall(
@ -1203,7 +1205,7 @@ public class XdsNameResolverTest {
CallInfo call, InternalConfigSelector configSelector, String expectedPluginName,
Double expectedTimeoutSec) {
Result result = configSelector.selectConfig(
new PickSubchannelArgsImpl(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
newPickSubchannelArgs(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
assertThat(result.getStatus().isOk()).isTrue();
ClientInterceptor interceptor = result.getInterceptor();
ClientCall<Void, Void> clientCall = interceptor.interceptCall(
@ -1850,8 +1852,7 @@ public class XdsNameResolverTest {
}
@SuppressWarnings("unchecked")
ClientCall.Listener<RespT> listener = mock(ClientCall.Listener.class);
Result result = selector.selectConfig(new PickSubchannelArgsImpl(
method, metadata, callOptions));
Result result = selector.selectConfig(newPickSubchannelArgs(method, metadata, callOptions));
ClientCall<ReqT, RespT> call = ClientInterceptors.intercept(channel,
result.getInterceptor()).newCall(method, callOptions);
call.start(listener, metadata);
@ -1889,6 +1890,11 @@ public class XdsNameResolverTest {
verifyRpcFailed(listener, expectedStatus);
}
private PickSubchannelArgs newPickSubchannelArgs(
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
return new PickSubchannelArgsImpl(method, headers, callOptions, new PickDetailsConsumer() {});
}
private final class FakeXdsClientPoolFactory implements XdsClientPoolFactory {
@Override
public void setBootstrapOverride(Map<String, ?> bootstrap) {}