mirror of https://github.com/grpc/grpc-java.git
Plumb optional labels from LB to ClientStreamTracer
As part of gRFC A78: > To support the locality label in the per-call metrics, we will provide > a mechanism for LB picker to add optional labels to the call attempt > tracer.
This commit is contained in:
parent
06df25b65d
commit
4c78a9746c
|
|
@ -79,6 +79,13 @@ public abstract class ClientStreamTracer extends StreamTracer {
|
|||
public void inboundTrailers(Metadata trailers) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Information providing context to the call became available.
|
||||
*/
|
||||
@Internal
|
||||
public void addOptionalLabel(String key, String value) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory class for {@link ClientStreamTracer}.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -490,6 +490,29 @@ public abstract class LoadBalancer {
|
|||
* @since 1.2.0
|
||||
*/
|
||||
public abstract MethodDescriptor<?, ?> getMethodDescriptor();
|
||||
|
||||
/**
|
||||
* Gets an object that can be informed about what sort of pick was made.
|
||||
*/
|
||||
@Internal
|
||||
public PickDetailsConsumer getPickDetailsConsumer() {
|
||||
return new PickDetailsConsumer() {};
|
||||
}
|
||||
}
|
||||
|
||||
/** Receives information about the pick being chosen. */
|
||||
@Internal
|
||||
public interface PickDetailsConsumer {
|
||||
/**
|
||||
* Optional labels that provide context of how the pick was routed. Particularly helpful for
|
||||
* per-RPC metrics.
|
||||
*
|
||||
* @throws NullPointerException if key or value is {@code null}
|
||||
*/
|
||||
default void addOptionalLabel(String key, String value) {
|
||||
checkNotNull(key, "key");
|
||||
checkNotNull(value, "value");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Copyright 2024 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.LoadBalancer.PickSubchannelArgs;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import org.mockito.ArgumentMatcher;
|
||||
import org.mockito.ArgumentMatchers;
|
||||
|
||||
/**
|
||||
* Mockito Matcher for {@link PickSubchannelArgs}.
|
||||
*/
|
||||
public final class PickSubchannelArgsMatcher implements ArgumentMatcher<PickSubchannelArgs> {
|
||||
private final MethodDescriptor<?, ?> method;
|
||||
private final Metadata headers;
|
||||
private final CallOptions callOptions;
|
||||
|
||||
public PickSubchannelArgsMatcher(
|
||||
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
|
||||
this.method = Preconditions.checkNotNull(method, "method");
|
||||
this.headers = Preconditions.checkNotNull(headers, "headers");
|
||||
this.callOptions = Preconditions.checkNotNull(callOptions, "callOptions");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean matches(PickSubchannelArgs args) {
|
||||
return args != null
|
||||
&& method.equals(args.getMethodDescriptor())
|
||||
&& headers.equals(args.getHeaders())
|
||||
&& callOptions.equals(args.getCallOptions());
|
||||
}
|
||||
|
||||
@Override
|
||||
public final String toString() {
|
||||
return "[method=" + method + " headers=" + headers + " callOptions=" + callOptions + "]";
|
||||
}
|
||||
|
||||
public static PickSubchannelArgs eqPickSubchannelArgs(
|
||||
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
|
||||
return ArgumentMatchers.argThat(new PickSubchannelArgsMatcher(method, headers, callOptions));
|
||||
}
|
||||
}
|
||||
|
|
@ -137,7 +137,8 @@ final class DelayedClientTransport implements ManagedClientTransport {
|
|||
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
|
||||
ClientStreamTracer[] tracers) {
|
||||
try {
|
||||
PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
|
||||
PickSubchannelArgs args = new PickSubchannelArgsImpl(
|
||||
method, headers, callOptions, new PickDetailsConsumerImpl(tracers));
|
||||
SubchannelPicker picker = null;
|
||||
long pickerVersion = -1;
|
||||
while (true) {
|
||||
|
|
|
|||
|
|
@ -54,6 +54,11 @@ public abstract class ForwardingClientStreamTracer extends ClientStreamTracer {
|
|||
delegate().inboundTrailers(trailers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addOptionalLabel(String key, String value) {
|
||||
delegate().addOptionalLabel(key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void streamClosed(Status status) {
|
||||
delegate().streamClosed(status);
|
||||
|
|
|
|||
|
|
@ -158,6 +158,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
throw new IllegalStateException("Resolution is pending");
|
||||
}
|
||||
};
|
||||
private static final LoadBalancer.PickDetailsConsumer NOOP_PICK_DETAILS_CONSUMER =
|
||||
new LoadBalancer.PickDetailsConsumer() {};
|
||||
|
||||
private final InternalLogId logId;
|
||||
private final String target;
|
||||
|
|
@ -519,11 +521,11 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
final Metadata headers,
|
||||
final Context context) {
|
||||
if (!retryEnabled) {
|
||||
ClientTransport transport =
|
||||
getTransport(new PickSubchannelArgsImpl(method, headers, callOptions));
|
||||
Context origContext = context.attach();
|
||||
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
|
||||
callOptions, headers, 0, /* isTransparentRetry= */ false);
|
||||
ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
|
||||
method, headers, callOptions, new PickDetailsConsumerImpl(tracers)));
|
||||
Context origContext = context.attach();
|
||||
try {
|
||||
return transport.newStream(method, headers, callOptions, tracers);
|
||||
} finally {
|
||||
|
|
@ -566,8 +568,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
CallOptions newOptions = callOptions.withStreamTracerFactory(factory);
|
||||
ClientStreamTracer[] tracers = GrpcUtil.getClientStreamTracers(
|
||||
newOptions, newHeaders, previousAttempts, isTransparentRetry);
|
||||
ClientTransport transport =
|
||||
getTransport(new PickSubchannelArgsImpl(method, newHeaders, newOptions));
|
||||
ClientTransport transport = getTransport(new PickSubchannelArgsImpl(
|
||||
method, newHeaders, newOptions, new PickDetailsConsumerImpl(tracers)));
|
||||
Context origContext = context.attach();
|
||||
try {
|
||||
return transport.newStream(method, newHeaders, newOptions, tracers);
|
||||
|
|
@ -1207,7 +1209,8 @@ final class ManagedChannelImpl extends ManagedChannel implements
|
|||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void start(Listener<RespT> observer, Metadata headers) {
|
||||
PickSubchannelArgs args = new PickSubchannelArgsImpl(method, headers, callOptions);
|
||||
PickSubchannelArgs args =
|
||||
new PickSubchannelArgsImpl(method, headers, callOptions, NOOP_PICK_DETAILS_CONSUMER);
|
||||
InternalConfigSelector.Result result = configSelector.selectConfig(args);
|
||||
Status status = result.getStatus();
|
||||
if (!status.isOk()) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Copyright 2024 The gRPC Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.internal;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import io.grpc.ClientStreamTracer;
|
||||
import io.grpc.LoadBalancer.PickDetailsConsumer;
|
||||
|
||||
/**
|
||||
* Adapter for tracers into details consumers.
|
||||
*/
|
||||
final class PickDetailsConsumerImpl implements PickDetailsConsumer {
|
||||
private final ClientStreamTracer[] tracers;
|
||||
|
||||
/** Construct a consumer with unchanging tracers array. */
|
||||
public PickDetailsConsumerImpl(ClientStreamTracer[] tracers) {
|
||||
this.tracers = Preconditions.checkNotNull(tracers, "tracers");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addOptionalLabel(String key, String value) {
|
||||
Preconditions.checkNotNull(key, "key");
|
||||
Preconditions.checkNotNull(value, "value");
|
||||
for (ClientStreamTracer tracer : tracers) {
|
||||
tracer.addOptionalLabel(key, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
|||
|
||||
import com.google.common.base.Objects;
|
||||
import io.grpc.CallOptions;
|
||||
import io.grpc.LoadBalancer.PickDetailsConsumer;
|
||||
import io.grpc.LoadBalancer.PickSubchannelArgs;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
|
|
@ -29,15 +30,18 @@ public final class PickSubchannelArgsImpl extends PickSubchannelArgs {
|
|||
private final CallOptions callOptions;
|
||||
private final Metadata headers;
|
||||
private final MethodDescriptor<?, ?> method;
|
||||
private final PickDetailsConsumer pickDetailsConsumer;
|
||||
|
||||
/**
|
||||
* Creates call args object for given method with its call options, metadata.
|
||||
*/
|
||||
public PickSubchannelArgsImpl(
|
||||
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
|
||||
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions,
|
||||
PickDetailsConsumer pickDetailsConsumer) {
|
||||
this.method = checkNotNull(method, "method");
|
||||
this.headers = checkNotNull(headers, "headers");
|
||||
this.callOptions = checkNotNull(callOptions, "callOptions");
|
||||
this.pickDetailsConsumer = checkNotNull(pickDetailsConsumer, "pickDetailsConsumer");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -55,6 +59,11 @@ public final class PickSubchannelArgsImpl extends PickSubchannelArgs {
|
|||
return method;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PickDetailsConsumer getPickDetailsConsumer() {
|
||||
return pickDetailsConsumer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) {
|
||||
|
|
@ -66,12 +75,13 @@ public final class PickSubchannelArgsImpl extends PickSubchannelArgs {
|
|||
PickSubchannelArgsImpl that = (PickSubchannelArgsImpl) o;
|
||||
return Objects.equal(callOptions, that.callOptions)
|
||||
&& Objects.equal(headers, that.headers)
|
||||
&& Objects.equal(method, that.method);
|
||||
&& Objects.equal(method, that.method)
|
||||
&& Objects.equal(pickDetailsConsumer, that.pickDetailsConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hashCode(callOptions, headers, method);
|
||||
return Objects.hashCode(callOptions, headers, method, pickDetailsConsumer);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -17,12 +17,14 @@
|
|||
package io.grpc.internal;
|
||||
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static io.grpc.PickSubchannelArgsMatcher.eqPickSubchannelArgs;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.argThat;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.ArgumentMatchers.same;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
|
|
@ -44,6 +46,7 @@ import io.grpc.LoadBalancer.SubchannelPicker;
|
|||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.MethodDescriptor.MethodType;
|
||||
import io.grpc.PickSubchannelArgsMatcher;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StringMarshaller;
|
||||
import io.grpc.SynchronizationContext;
|
||||
|
|
@ -344,31 +347,31 @@ public class DelayedClientTransportTest {
|
|||
method, headers, failFastCallOptions, tracers);
|
||||
ff1.start(mock(ClientStreamListener.class));
|
||||
ff1.halfClose();
|
||||
PickSubchannelArgsImpl ff1args = new PickSubchannelArgsImpl(method, headers,
|
||||
PickSubchannelArgsMatcher ff1args = new PickSubchannelArgsMatcher(method, headers,
|
||||
failFastCallOptions);
|
||||
verify(transportListener).transportInUse(true);
|
||||
DelayedStream ff2 = (DelayedStream) delayedTransport.newStream(
|
||||
method2, headers2, failFastCallOptions, tracers);
|
||||
PickSubchannelArgsImpl ff2args = new PickSubchannelArgsImpl(method2, headers2,
|
||||
PickSubchannelArgsMatcher ff2args = new PickSubchannelArgsMatcher(method2, headers2,
|
||||
failFastCallOptions);
|
||||
DelayedStream ff3 = (DelayedStream) delayedTransport.newStream(
|
||||
method, headers, failFastCallOptions, tracers);
|
||||
PickSubchannelArgsImpl ff3args = new PickSubchannelArgsImpl(method, headers,
|
||||
PickSubchannelArgsMatcher ff3args = new PickSubchannelArgsMatcher(method, headers,
|
||||
failFastCallOptions);
|
||||
DelayedStream ff4 = (DelayedStream) delayedTransport.newStream(
|
||||
method2, headers2, failFastCallOptions, tracers);
|
||||
PickSubchannelArgsImpl ff4args = new PickSubchannelArgsImpl(method2, headers2,
|
||||
PickSubchannelArgsMatcher ff4args = new PickSubchannelArgsMatcher(method2, headers2,
|
||||
failFastCallOptions);
|
||||
|
||||
// Wait-for-ready streams
|
||||
FakeClock wfr3Executor = new FakeClock();
|
||||
DelayedStream wfr1 = (DelayedStream) delayedTransport.newStream(
|
||||
method, headers, waitForReadyCallOptions, tracers);
|
||||
PickSubchannelArgsImpl wfr1args = new PickSubchannelArgsImpl(method, headers,
|
||||
PickSubchannelArgsMatcher wfr1args = new PickSubchannelArgsMatcher(method, headers,
|
||||
waitForReadyCallOptions);
|
||||
DelayedStream wfr2 = (DelayedStream) delayedTransport.newStream(
|
||||
method2, headers2, waitForReadyCallOptions, tracers);
|
||||
PickSubchannelArgsImpl wfr2args = new PickSubchannelArgsImpl(method2, headers2,
|
||||
PickSubchannelArgsMatcher wfr2args = new PickSubchannelArgsMatcher(method2, headers2,
|
||||
waitForReadyCallOptions);
|
||||
CallOptions wfr3callOptions = waitForReadyCallOptions.withExecutor(
|
||||
wfr3Executor.getScheduledExecutorService());
|
||||
|
|
@ -376,11 +379,11 @@ public class DelayedClientTransportTest {
|
|||
method, headers, wfr3callOptions, tracers);
|
||||
wfr3.start(mock(ClientStreamListener.class));
|
||||
wfr3.halfClose();
|
||||
PickSubchannelArgsImpl wfr3args = new PickSubchannelArgsImpl(method, headers,
|
||||
PickSubchannelArgsMatcher wfr3args = new PickSubchannelArgsMatcher(method, headers,
|
||||
wfr3callOptions);
|
||||
DelayedStream wfr4 = (DelayedStream) delayedTransport.newStream(
|
||||
method2, headers2, waitForReadyCallOptions, tracers);
|
||||
PickSubchannelArgsImpl wfr4args = new PickSubchannelArgsImpl(method2, headers2,
|
||||
PickSubchannelArgsMatcher wfr4args = new PickSubchannelArgsMatcher(method2, headers2,
|
||||
waitForReadyCallOptions);
|
||||
|
||||
assertEquals(8, delayedTransport.getPendingStreamsCount());
|
||||
|
|
@ -401,14 +404,14 @@ public class DelayedClientTransportTest {
|
|||
delayedTransport.reprocess(picker);
|
||||
|
||||
assertEquals(5, delayedTransport.getPendingStreamsCount());
|
||||
inOrder.verify(picker).pickSubchannel(ff1args);
|
||||
inOrder.verify(picker).pickSubchannel(ff2args);
|
||||
inOrder.verify(picker).pickSubchannel(ff3args);
|
||||
inOrder.verify(picker).pickSubchannel(ff4args);
|
||||
inOrder.verify(picker).pickSubchannel(wfr1args);
|
||||
inOrder.verify(picker).pickSubchannel(wfr2args);
|
||||
inOrder.verify(picker).pickSubchannel(wfr3args);
|
||||
inOrder.verify(picker).pickSubchannel(wfr4args);
|
||||
inOrder.verify(picker).pickSubchannel(argThat(ff1args));
|
||||
inOrder.verify(picker).pickSubchannel(argThat(ff2args));
|
||||
inOrder.verify(picker).pickSubchannel(argThat(ff3args));
|
||||
inOrder.verify(picker).pickSubchannel(argThat(ff4args));
|
||||
inOrder.verify(picker).pickSubchannel(argThat(wfr1args));
|
||||
inOrder.verify(picker).pickSubchannel(argThat(wfr2args));
|
||||
inOrder.verify(picker).pickSubchannel(argThat(wfr3args));
|
||||
inOrder.verify(picker).pickSubchannel(argThat(wfr4args));
|
||||
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
// Make sure that streams are created and started immediately, not in any executor. This is
|
||||
|
|
@ -454,11 +457,11 @@ public class DelayedClientTransportTest {
|
|||
delayedTransport.reprocess(picker);
|
||||
assertEquals(0, delayedTransport.getPendingStreamsCount());
|
||||
verify(transportListener).transportInUse(false);
|
||||
inOrder.verify(picker).pickSubchannel(ff3args); // ff3
|
||||
inOrder.verify(picker).pickSubchannel(ff4args); // ff4
|
||||
inOrder.verify(picker).pickSubchannel(wfr2args); // wfr2
|
||||
inOrder.verify(picker).pickSubchannel(wfr3args); // wfr3
|
||||
inOrder.verify(picker).pickSubchannel(wfr4args); // wfr4
|
||||
inOrder.verify(picker).pickSubchannel(argThat(ff3args)); // ff3
|
||||
inOrder.verify(picker).pickSubchannel(argThat(ff4args)); // ff4
|
||||
inOrder.verify(picker).pickSubchannel(argThat(wfr2args)); // wfr2
|
||||
inOrder.verify(picker).pickSubchannel(argThat(wfr3args)); // wfr3
|
||||
inOrder.verify(picker).pickSubchannel(argThat(wfr4args)); // wfr4
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
fakeExecutor.runDueTasks();
|
||||
assertEquals(0, fakeExecutor.numPendingTasks());
|
||||
|
|
@ -478,7 +481,7 @@ public class DelayedClientTransportTest {
|
|||
method, headers, waitForReadyCallOptions, tracers);
|
||||
assertNull(wfr5.getRealStream());
|
||||
inOrder.verify(picker).pickSubchannel(
|
||||
new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions));
|
||||
eqPickSubchannelArgs(method, headers, waitForReadyCallOptions));
|
||||
inOrder.verifyNoMoreInteractions();
|
||||
assertEquals(1, delayedTransport.getPendingStreamsCount());
|
||||
|
||||
|
|
@ -492,7 +495,7 @@ public class DelayedClientTransportTest {
|
|||
PickResult.withSubchannel(subchannel1));
|
||||
delayedTransport.reprocess(picker);
|
||||
verify(picker).pickSubchannel(
|
||||
new PickSubchannelArgsImpl(method, headers, waitForReadyCallOptions));
|
||||
eqPickSubchannelArgs(method, headers, waitForReadyCallOptions));
|
||||
fakeExecutor.runDueTasks();
|
||||
assertSame(mockRealStream, wfr5.getRealStream());
|
||||
assertEquals(0, delayedTransport.getPendingStreamsCount());
|
||||
|
|
@ -517,7 +520,7 @@ public class DelayedClientTransportTest {
|
|||
// Though picker was not originally used, it will be saved and serve future streams.
|
||||
ClientStream stream = delayedTransport.newStream(
|
||||
method, headers, CallOptions.DEFAULT, tracers);
|
||||
verify(picker).pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT));
|
||||
verify(picker).pickSubchannel(eqPickSubchannelArgs(method, headers, CallOptions.DEFAULT));
|
||||
verify(mockInternalSubchannel).obtainActiveTransport();
|
||||
assertSame(mockRealStream, stream);
|
||||
}
|
||||
|
|
@ -559,16 +562,16 @@ public class DelayedClientTransportTest {
|
|||
};
|
||||
sideThread.start();
|
||||
|
||||
PickSubchannelArgsImpl args = new PickSubchannelArgsImpl(method, headers, callOptions);
|
||||
PickSubchannelArgsImpl args2 = new PickSubchannelArgsImpl(method, headers2, callOptions);
|
||||
PickSubchannelArgsMatcher args = new PickSubchannelArgsMatcher(method, headers, callOptions);
|
||||
PickSubchannelArgsMatcher args2 = new PickSubchannelArgsMatcher(method, headers2, callOptions);
|
||||
|
||||
// Is called from sideThread
|
||||
verify(picker, timeout(5000)).pickSubchannel(args);
|
||||
verify(picker, timeout(5000)).pickSubchannel(argThat(args));
|
||||
|
||||
// Because stream has not been buffered (it's still stuck in newStream()), this will do nothing,
|
||||
// but incrementing the picker version.
|
||||
delayedTransport.reprocess(picker);
|
||||
verify(picker).pickSubchannel(args);
|
||||
verify(picker).pickSubchannel(argThat(args));
|
||||
|
||||
// Now let the stuck newStream() through
|
||||
barrier.await(5, TimeUnit.SECONDS);
|
||||
|
|
@ -576,7 +579,7 @@ public class DelayedClientTransportTest {
|
|||
sideThread.join(5000);
|
||||
assertFalse("sideThread should've exited", sideThread.isAlive());
|
||||
// newStream() detects that there has been a new picker while it's stuck, thus will pick again.
|
||||
verify(picker, times(2)).pickSubchannel(args);
|
||||
verify(picker, times(2)).pickSubchannel(argThat(args));
|
||||
|
||||
barrier.reset();
|
||||
nextPickShouldWait.set(true);
|
||||
|
|
@ -592,9 +595,9 @@ public class DelayedClientTransportTest {
|
|||
};
|
||||
sideThread2.start();
|
||||
// The second stream will see the first picker
|
||||
verify(picker, timeout(5000)).pickSubchannel(args2);
|
||||
verify(picker, timeout(5000)).pickSubchannel(argThat(args2));
|
||||
// While the first stream won't use the first picker any more.
|
||||
verify(picker, times(2)).pickSubchannel(args);
|
||||
verify(picker, times(2)).pickSubchannel(argThat(args));
|
||||
|
||||
// Now use a different picker
|
||||
SubchannelPicker picker2 = mock(SubchannelPicker.class);
|
||||
|
|
@ -602,9 +605,9 @@ public class DelayedClientTransportTest {
|
|||
.thenReturn(PickResult.withNoResult());
|
||||
delayedTransport.reprocess(picker2);
|
||||
// The pending first stream uses the new picker
|
||||
verify(picker2).pickSubchannel(args);
|
||||
verify(picker2).pickSubchannel(argThat(args));
|
||||
// The second stream is still pending in creation, doesn't use the new picker.
|
||||
verify(picker2, never()).pickSubchannel(args2);
|
||||
verify(picker2, never()).pickSubchannel(argThat(args2));
|
||||
|
||||
// Now let the second stream finish creation
|
||||
barrier.await(5, TimeUnit.SECONDS);
|
||||
|
|
@ -612,13 +615,30 @@ public class DelayedClientTransportTest {
|
|||
sideThread2.join(5000);
|
||||
assertFalse("sideThread2 should've exited", sideThread2.isAlive());
|
||||
// The second stream should see the new picker
|
||||
verify(picker2, timeout(5000)).pickSubchannel(args2);
|
||||
verify(picker2, timeout(5000)).pickSubchannel(argThat(args2));
|
||||
|
||||
// Wrapping up
|
||||
verify(picker, times(2)).pickSubchannel(args);
|
||||
verify(picker).pickSubchannel(args2);
|
||||
verify(picker2).pickSubchannel(args);
|
||||
verify(picker2).pickSubchannel(args);
|
||||
verify(picker, times(2)).pickSubchannel(argThat(args));
|
||||
verify(picker).pickSubchannel(argThat(args2));
|
||||
verify(picker2).pickSubchannel(argThat(args));
|
||||
verify(picker2).pickSubchannel(argThat(args));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void reprocess_addOptionalLabelCallsTracer() throws Exception {
|
||||
delayedTransport.reprocess(new SubchannelPicker() {
|
||||
@Override public PickResult pickSubchannel(PickSubchannelArgs args) {
|
||||
args.getPickDetailsConsumer().addOptionalLabel("routed", "perfectly");
|
||||
return PickResult.withError(Status.UNAVAILABLE.withDescription("expected"));
|
||||
}
|
||||
});
|
||||
|
||||
ClientStreamTracer tracer = mock(ClientStreamTracer.class);
|
||||
ClientStream stream = delayedTransport.newStream(
|
||||
method, headers, callOptions, new ClientStreamTracer[] {tracer});
|
||||
stream.start(streamListener);
|
||||
|
||||
verify(tracer).addOptionalLabel("routed", "perfectly");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ import static io.grpc.ConnectivityState.READY;
|
|||
import static io.grpc.ConnectivityState.SHUTDOWN;
|
||||
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
|
||||
import static io.grpc.EquivalentAddressGroup.ATTR_AUTHORITY_OVERRIDE;
|
||||
import static io.grpc.PickSubchannelArgsMatcher.eqPickSubchannelArgs;
|
||||
import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
|
||||
import static junit.framework.TestCase.assertNotSame;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
|
@ -617,6 +618,32 @@ public class ManagedChannelImplTest {
|
|||
TimeUnit.SECONDS.toNanos(ManagedChannelImpl.SUBCHANNEL_SHUTDOWN_DELAY_SECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void pickSubchannelAddOptionalLabel_callsTracer() {
|
||||
channelBuilder.directExecutor();
|
||||
createChannel();
|
||||
|
||||
updateBalancingStateSafely(helper, TRANSIENT_FAILURE, new SubchannelPicker() {
|
||||
@Override
|
||||
public PickResult pickSubchannel(PickSubchannelArgs args) {
|
||||
args.getPickDetailsConsumer().addOptionalLabel("routed", "perfectly");
|
||||
return PickResult.withError(Status.UNAVAILABLE.withDescription("expected"));
|
||||
}
|
||||
});
|
||||
ClientStreamTracer tracer = mock(ClientStreamTracer.class);
|
||||
ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() {
|
||||
@Override
|
||||
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
|
||||
return tracer;
|
||||
}
|
||||
};
|
||||
ClientCall<String, Integer> call = channel.newCall(
|
||||
method, CallOptions.DEFAULT.withStreamTracerFactory(tracerFactory));
|
||||
call.start(mockCallListener, new Metadata());
|
||||
|
||||
verify(tracer).addOptionalLabel("routed", "perfectly");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shutdownWithNoTransportsEverCreated() {
|
||||
channelBuilder.nameResolverFactory(
|
||||
|
|
@ -808,10 +835,10 @@ public class ManagedChannelImplTest {
|
|||
.thenReturn(mockStream2);
|
||||
transportListener.transportReady();
|
||||
when(mockPicker.pickSubchannel(
|
||||
new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT))).thenReturn(
|
||||
eqPickSubchannelArgs(method, headers, CallOptions.DEFAULT))).thenReturn(
|
||||
PickResult.withNoResult());
|
||||
when(mockPicker.pickSubchannel(
|
||||
new PickSubchannelArgsImpl(method, headers2, CallOptions.DEFAULT))).thenReturn(
|
||||
eqPickSubchannelArgs(method, headers2, CallOptions.DEFAULT))).thenReturn(
|
||||
PickResult.withSubchannel(subchannel));
|
||||
updateBalancingStateSafely(helper, READY, mockPicker);
|
||||
|
||||
|
|
@ -875,7 +902,7 @@ public class ManagedChannelImplTest {
|
|||
assertFalse(nameResolverFactory.resolvers.get(0).shutdown);
|
||||
// call and call2 are still alive, and can still be assigned to a real transport
|
||||
SubchannelPicker picker2 = mock(SubchannelPicker.class);
|
||||
when(picker2.pickSubchannel(new PickSubchannelArgsImpl(method, headers, CallOptions.DEFAULT)))
|
||||
when(picker2.pickSubchannel(eqPickSubchannelArgs(method, headers, CallOptions.DEFAULT)))
|
||||
.thenReturn(PickResult.withSubchannel(subchannel));
|
||||
updateBalancingStateSafely(helper, READY, picker2);
|
||||
executor.runDueTasks();
|
||||
|
|
@ -4531,4 +4558,4 @@ public class ManagedChannelImplTest {
|
|||
return ManagedChannelServiceConfig
|
||||
.fromServiceConfig(rawServiceConfig, true, 3, 4, policySelection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableMap;
|
|||
import io.grpc.CallOptions;
|
||||
import io.grpc.InternalConfigSelector;
|
||||
import io.grpc.InternalConfigSelector.Result;
|
||||
import io.grpc.LoadBalancer.PickDetailsConsumer;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.internal.ManagedChannelServiceConfig.MethodInfo;
|
||||
|
|
@ -209,7 +210,8 @@ public class ManagedChannelServiceConfigTest {
|
|||
InternalConfigSelector configSelector = serviceConfig.getDefaultConfigSelector();
|
||||
MethodDescriptor<?, ?> method = methodForName("service1", "method1");
|
||||
Result result = configSelector.selectConfig(
|
||||
new PickSubchannelArgsImpl(method, new Metadata(), CallOptions.DEFAULT));
|
||||
new PickSubchannelArgsImpl(
|
||||
method, new Metadata(), CallOptions.DEFAULT, new PickDetailsConsumer() {}));
|
||||
MethodInfo methodInfoFromDefaultConfigSelector =
|
||||
((ManagedChannelServiceConfig) result.getConfig()).getMethodConfig(method);
|
||||
assertThat(methodInfoFromDefaultConfigSelector)
|
||||
|
|
|
|||
|
|
@ -100,7 +100,7 @@ public class RpcBehaviorLoadBalancerProviderTest {
|
|||
@Test
|
||||
public void pickerAddsRpcBehaviorMetadata() {
|
||||
PickSubchannelArgsImpl args = new PickSubchannelArgsImpl(TestMethodDescriptors.voidMethod(),
|
||||
new Metadata(), CallOptions.DEFAULT);
|
||||
new Metadata(), CallOptions.DEFAULT, new LoadBalancer.PickDetailsConsumer() {});
|
||||
new RpcBehaviorPicker(mockPicker, "error-code-15").pickSubchannel(args);
|
||||
|
||||
assertThat(args.getHeaders()
|
||||
|
|
|
|||
|
|
@ -43,6 +43,7 @@ import io.grpc.EquivalentAddressGroup;
|
|||
import io.grpc.ForwardingChannelBuilder2;
|
||||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.LoadBalancer.Helper;
|
||||
import io.grpc.LoadBalancer.PickDetailsConsumer;
|
||||
import io.grpc.LoadBalancer.PickResult;
|
||||
import io.grpc.LoadBalancer.SubchannelPicker;
|
||||
import io.grpc.LoadBalancerProvider;
|
||||
|
|
@ -383,7 +384,8 @@ public class CachingRlsLbClientTest {
|
|||
.setFullMethodName("doesn/exists")
|
||||
.build(),
|
||||
headers,
|
||||
CallOptions.DEFAULT));
|
||||
CallOptions.DEFAULT,
|
||||
new PickDetailsConsumer() {}));
|
||||
assertThat(pickResult.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE);
|
||||
assertThat(pickResult.getStatus().getDescription()).contains("fallback not available");
|
||||
assertThat(fakeThrottler.getNumThrottled()).isEqualTo(1);
|
||||
|
|
@ -438,7 +440,8 @@ public class CachingRlsLbClientTest {
|
|||
TestMethodDescriptors.voidMethod().toBuilder().setFullMethodName("service1/create")
|
||||
.build(),
|
||||
headers,
|
||||
CallOptions.DEFAULT));
|
||||
CallOptions.DEFAULT,
|
||||
new PickDetailsConsumer() {}));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
@ -521,7 +524,8 @@ public class CachingRlsLbClientTest {
|
|||
.setFullMethodName("doesn/exists")
|
||||
.build(),
|
||||
headers,
|
||||
CallOptions.DEFAULT);
|
||||
CallOptions.DEFAULT,
|
||||
new PickDetailsConsumer() {});
|
||||
return invalidArgs;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -39,7 +39,9 @@ import io.grpc.EquivalentAddressGroup;
|
|||
import io.grpc.ForwardingChannelBuilder2;
|
||||
import io.grpc.LoadBalancer.CreateSubchannelArgs;
|
||||
import io.grpc.LoadBalancer.Helper;
|
||||
import io.grpc.LoadBalancer.PickDetailsConsumer;
|
||||
import io.grpc.LoadBalancer.PickResult;
|
||||
import io.grpc.LoadBalancer.PickSubchannelArgs;
|
||||
import io.grpc.LoadBalancer.ResolvedAddresses;
|
||||
import io.grpc.LoadBalancer.Subchannel;
|
||||
import io.grpc.LoadBalancer.SubchannelPicker;
|
||||
|
|
@ -120,8 +122,8 @@ public class RlsLoadBalancerTest {
|
|||
private MethodDescriptor<Object, Object> fakeRescueMethod;
|
||||
private RlsLoadBalancer rlsLb;
|
||||
private String defaultTarget = "defaultTarget";
|
||||
private PickSubchannelArgsImpl searchSubchannelArgs;
|
||||
private PickSubchannelArgsImpl rescueSubchannelArgs;
|
||||
private PickSubchannelArgs searchSubchannelArgs;
|
||||
private PickSubchannelArgs rescueSubchannelArgs;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
|
|
@ -163,12 +165,8 @@ public class RlsLoadBalancerTest {
|
|||
}
|
||||
};
|
||||
|
||||
Metadata headers = new Metadata();
|
||||
searchSubchannelArgs =
|
||||
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT);
|
||||
rescueSubchannelArgs =
|
||||
new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT);
|
||||
|
||||
searchSubchannelArgs = newPickSubchannelArgs(fakeSearchMethod);
|
||||
rescueSubchannelArgs = newPickSubchannelArgs(fakeRescueMethod);
|
||||
}
|
||||
|
||||
@After
|
||||
|
|
@ -183,9 +181,7 @@ public class RlsLoadBalancerTest {
|
|||
inOrder.verify(helper)
|
||||
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
|
||||
SubchannelPicker picker = pickerCaptor.getValue();
|
||||
Metadata headers = new Metadata();
|
||||
PickSubchannelArgsImpl fakeSearchMethodArgs =
|
||||
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT);
|
||||
PickSubchannelArgs fakeSearchMethodArgs = newPickSubchannelArgs(fakeSearchMethod);
|
||||
// Warm-up pick; will be queued
|
||||
PickResult res = picker.pickSubchannel(fakeSearchMethodArgs);
|
||||
assertThat(res.getStatus().isOk()).isTrue();
|
||||
|
|
@ -332,7 +328,6 @@ public class RlsLoadBalancerTest {
|
|||
inOrder.verify(helper)
|
||||
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
|
||||
SubchannelPicker picker = pickerCaptor.getValue();
|
||||
Metadata headers = new Metadata();
|
||||
// Warm-up pick; will be queued
|
||||
PickResult res = picker.pickSubchannel(searchSubchannelArgs);
|
||||
assertThat(res.getStatus().isOk()).isTrue();
|
||||
|
|
@ -375,13 +370,11 @@ public class RlsLoadBalancerTest {
|
|||
|
||||
// search method will fail because there is no fallback target.
|
||||
picker = pickerCaptor.getValue();
|
||||
res = picker.pickSubchannel(
|
||||
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
|
||||
res = picker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod));
|
||||
assertThat(res.getStatus().isOk()).isFalse();
|
||||
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
|
||||
|
||||
res = picker.pickSubchannel(
|
||||
new PickSubchannelArgsImpl(fakeRescueMethod, headers, CallOptions.DEFAULT));
|
||||
res = picker.pickSubchannel(newPickSubchannelArgs(fakeRescueMethod));
|
||||
assertThat(subchannelIsReady(res.getSubchannel())).isTrue();
|
||||
assertThat(res.getSubchannel().getAddresses()).isEqualTo(rescueSubchannel.getAddresses());
|
||||
assertThat(res.getSubchannel().getAttributes()).isEqualTo(rescueSubchannel.getAttributes());
|
||||
|
|
@ -401,10 +394,7 @@ public class RlsLoadBalancerTest {
|
|||
inOrder.verify(helper)
|
||||
.updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture());
|
||||
SubchannelPicker picker = pickerCaptor.getValue();
|
||||
Metadata headers = new Metadata();
|
||||
PickResult res =
|
||||
picker.pickSubchannel(
|
||||
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
|
||||
PickResult res = picker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod));
|
||||
assertThat(res.getStatus().isOk()).isTrue();
|
||||
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
|
||||
|
||||
|
|
@ -418,8 +408,7 @@ public class RlsLoadBalancerTest {
|
|||
|
||||
SubchannelPicker picker2 = pickerCaptor.getValue();
|
||||
assertThat(picker2).isEqualTo(picker);
|
||||
res = picker2.pickSubchannel(
|
||||
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
|
||||
res = picker2.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod));
|
||||
// verify success. Subchannel is wrapped, so checking attributes.
|
||||
assertThat(subchannelIsReady(res.getSubchannel())).isTrue();
|
||||
assertThat(res.getSubchannel().getAddresses()).isEqualTo(searchSubchannel.getAddresses());
|
||||
|
|
@ -432,14 +421,13 @@ public class RlsLoadBalancerTest {
|
|||
verify(helper)
|
||||
.updateBalancingState(eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
|
||||
SubchannelPicker failedPicker = pickerCaptor.getValue();
|
||||
res = failedPicker.pickSubchannel(
|
||||
new PickSubchannelArgsImpl(fakeSearchMethod, headers, CallOptions.DEFAULT));
|
||||
res = failedPicker.pickSubchannel(newPickSubchannelArgs(fakeSearchMethod));
|
||||
assertThat(res.getStatus().isOk()).isFalse();
|
||||
assertThat(subchannelIsReady(res.getSubchannel())).isFalse();
|
||||
}
|
||||
|
||||
private PickResult markReadyAndGetPickResult(InOrder inOrder,
|
||||
PickSubchannelArgsImpl pickSubchannelArgs) {
|
||||
PickSubchannelArgs pickSubchannelArgs) {
|
||||
subchannels.getLast().updateState(ConnectivityStateInfo.forNonError(ConnectivityState.READY));
|
||||
inOrder.verify(helper, atLeast(1))
|
||||
.updateBalancingState(eq(ConnectivityState.READY), pickerCaptor.capture());
|
||||
|
|
@ -504,6 +492,11 @@ public class RlsLoadBalancerTest {
|
|||
+ "}";
|
||||
}
|
||||
|
||||
private PickSubchannelArgs newPickSubchannelArgs(MethodDescriptor<?, ?> method) {
|
||||
return new PickSubchannelArgsImpl(
|
||||
method, new Metadata(), CallOptions.DEFAULT, new PickDetailsConsumer() {});
|
||||
}
|
||||
|
||||
private final class FakeHelper extends Helper {
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -53,6 +53,11 @@ public abstract class ForwardingClientStreamTracer extends ClientStreamTracer {
|
|||
delegate().inboundTrailers(trailers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addOptionalLabel(String key, String value) {
|
||||
delegate().addOptionalLabel(key, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void streamClosed(Status status) {
|
||||
delegate().streamClosed(status);
|
||||
|
|
|
|||
|
|
@ -37,6 +37,7 @@ import io.grpc.ConnectivityState;
|
|||
import io.grpc.EquivalentAddressGroup;
|
||||
import io.grpc.LoadBalancer;
|
||||
import io.grpc.LoadBalancer.Helper;
|
||||
import io.grpc.LoadBalancer.PickDetailsConsumer;
|
||||
import io.grpc.LoadBalancer.PickResult;
|
||||
import io.grpc.LoadBalancer.PickSubchannelArgs;
|
||||
import io.grpc.LoadBalancer.ResolvedAddresses;
|
||||
|
|
@ -310,7 +311,8 @@ public class ClusterManagerLoadBalancerTest {
|
|||
.build(),
|
||||
new Metadata(),
|
||||
CallOptions.DEFAULT.withOption(
|
||||
XdsNameResolver.CLUSTER_SELECTION_KEY, clusterName));
|
||||
XdsNameResolver.CLUSTER_SELECTION_KEY, clusterName),
|
||||
new PickDetailsConsumer() {});
|
||||
return picker.pickSubchannel(args);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -50,6 +50,7 @@ import io.grpc.ConnectivityStateInfo;
|
|||
import io.grpc.EquivalentAddressGroup;
|
||||
import io.grpc.LoadBalancer.CreateSubchannelArgs;
|
||||
import io.grpc.LoadBalancer.Helper;
|
||||
import io.grpc.LoadBalancer.PickDetailsConsumer;
|
||||
import io.grpc.LoadBalancer.PickResult;
|
||||
import io.grpc.LoadBalancer.PickSubchannelArgs;
|
||||
import io.grpc.LoadBalancer.ResolvedAddresses;
|
||||
|
|
@ -461,13 +462,14 @@ public class RingHashLoadBalancerTest {
|
|||
assertThat(result.getSubchannel().getAddresses()).isEqualTo(servers.get(1));
|
||||
}
|
||||
|
||||
private PickSubchannelArgsImpl getDefaultPickSubchannelArgs(long rpcHash) {
|
||||
private PickSubchannelArgs getDefaultPickSubchannelArgs(long rpcHash) {
|
||||
return new PickSubchannelArgsImpl(
|
||||
TestMethodDescriptors.voidMethod(), new Metadata(),
|
||||
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, rpcHash));
|
||||
CallOptions.DEFAULT.withOption(XdsNameResolver.RPC_HASH_KEY, rpcHash),
|
||||
new PickDetailsConsumer() {});
|
||||
}
|
||||
|
||||
private PickSubchannelArgsImpl getDefaultPickSubchannelArgsForServer(int serverid) {
|
||||
private PickSubchannelArgs getDefaultPickSubchannelArgsForServer(int serverid) {
|
||||
long rpcHash = hashFunc.hashAsciiString("FakeSocketAddress-server" + serverid + "_0");
|
||||
return getDefaultPickSubchannelArgs(rpcHash);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,6 +50,8 @@ import io.grpc.Deadline;
|
|||
import io.grpc.InsecureChannelCredentials;
|
||||
import io.grpc.InternalConfigSelector;
|
||||
import io.grpc.InternalConfigSelector.Result;
|
||||
import io.grpc.LoadBalancer.PickDetailsConsumer;
|
||||
import io.grpc.LoadBalancer.PickSubchannelArgs;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.MethodDescriptor.MethodType;
|
||||
|
|
@ -662,7 +664,7 @@ public class XdsNameResolverTest {
|
|||
ResolutionResult result = resolutionResultCaptor.getValue();
|
||||
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
Result selectResult = configSelector.selectConfig(
|
||||
new PickSubchannelArgsImpl(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
|
||||
newPickSubchannelArgs(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
|
||||
Object config = selectResult.getConfig();
|
||||
|
||||
// Purely validating the data (io.grpc.internal.RetryPolicy).
|
||||
|
|
@ -693,7 +695,7 @@ public class XdsNameResolverTest {
|
|||
InternalConfigSelector configSelector = resolveToClusters();
|
||||
CallInfo call = new CallInfo("FooService", "barMethod");
|
||||
Result selectResult = configSelector.selectConfig(
|
||||
new PickSubchannelArgsImpl(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
|
||||
newPickSubchannelArgs(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
|
||||
Status status = selectResult.getStatus();
|
||||
assertThat(status.isOk()).isFalse();
|
||||
assertThat(status.getCode()).isEqualTo(Code.UNAVAILABLE);
|
||||
|
|
@ -727,7 +729,7 @@ public class XdsNameResolverTest {
|
|||
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
// Simulates making a call1 RPC.
|
||||
Result selectResult = configSelector.selectConfig(
|
||||
new PickSubchannelArgsImpl(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
|
||||
newPickSubchannelArgs(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
|
||||
Status status = selectResult.getStatus();
|
||||
assertThat(status.isOk()).isFalse();
|
||||
assertThat(status.getCode()).isEqualTo(Code.UNAVAILABLE);
|
||||
|
|
@ -1166,7 +1168,7 @@ public class XdsNameResolverTest {
|
|||
assertThat((Map<String, ?>) result.getServiceConfig().getConfig()).isEmpty();
|
||||
InternalConfigSelector configSelector = result.getAttributes().get(InternalConfigSelector.KEY);
|
||||
Result configResult = configSelector.selectConfig(
|
||||
new PickSubchannelArgsImpl(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
|
||||
newPickSubchannelArgs(call1.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
|
||||
assertThat(configResult.getStatus().getCode()).isEqualTo(Status.Code.UNAVAILABLE);
|
||||
assertThat(configResult.getStatus().getDescription()).contains(resource);
|
||||
}
|
||||
|
|
@ -1175,7 +1177,7 @@ public class XdsNameResolverTest {
|
|||
CallInfo call, InternalConfigSelector configSelector, String expectedCluster,
|
||||
@Nullable Double expectedTimeoutSec) {
|
||||
Result result = configSelector.selectConfig(
|
||||
new PickSubchannelArgsImpl(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
|
||||
newPickSubchannelArgs(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
|
||||
assertThat(result.getStatus().isOk()).isTrue();
|
||||
ClientInterceptor interceptor = result.getInterceptor();
|
||||
ClientCall<Void, Void> clientCall = interceptor.interceptCall(
|
||||
|
|
@ -1203,7 +1205,7 @@ public class XdsNameResolverTest {
|
|||
CallInfo call, InternalConfigSelector configSelector, String expectedPluginName,
|
||||
Double expectedTimeoutSec) {
|
||||
Result result = configSelector.selectConfig(
|
||||
new PickSubchannelArgsImpl(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
|
||||
newPickSubchannelArgs(call.methodDescriptor, new Metadata(), CallOptions.DEFAULT));
|
||||
assertThat(result.getStatus().isOk()).isTrue();
|
||||
ClientInterceptor interceptor = result.getInterceptor();
|
||||
ClientCall<Void, Void> clientCall = interceptor.interceptCall(
|
||||
|
|
@ -1850,8 +1852,7 @@ public class XdsNameResolverTest {
|
|||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
ClientCall.Listener<RespT> listener = mock(ClientCall.Listener.class);
|
||||
Result result = selector.selectConfig(new PickSubchannelArgsImpl(
|
||||
method, metadata, callOptions));
|
||||
Result result = selector.selectConfig(newPickSubchannelArgs(method, metadata, callOptions));
|
||||
ClientCall<ReqT, RespT> call = ClientInterceptors.intercept(channel,
|
||||
result.getInterceptor()).newCall(method, callOptions);
|
||||
call.start(listener, metadata);
|
||||
|
|
@ -1889,6 +1890,11 @@ public class XdsNameResolverTest {
|
|||
verifyRpcFailed(listener, expectedStatus);
|
||||
}
|
||||
|
||||
private PickSubchannelArgs newPickSubchannelArgs(
|
||||
MethodDescriptor<?, ?> method, Metadata headers, CallOptions callOptions) {
|
||||
return new PickSubchannelArgsImpl(method, headers, callOptions, new PickDetailsConsumer() {});
|
||||
}
|
||||
|
||||
private final class FakeXdsClientPoolFactory implements XdsClientPoolFactory {
|
||||
@Override
|
||||
public void setBootstrapOverride(Map<String, ?> bootstrap) {}
|
||||
|
|
|
|||
Loading…
Reference in New Issue