xds: add OrcaOobHelperWrapper.setListener(), allow receiving reports per subchannel (#9143)

remove OrcaOobHelperWrapper layer. Use OrcaOobUtil.updateListener() to set OrcaOobReportListener per each subchannel, not per helper. OrcaOobReportListener is per helper+subchannel unique.

Orca stats are created when creating helper.createSubchannels(), overriding subchannel attributes to store orcaState in the orcaHelper created subchannels
This commit is contained in:
yifeizhuang 2022-05-12 10:50:58 -07:00 committed by GitHub
parent 1ee93758cd
commit 0fd8a6fcea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 197 additions and 204 deletions

View File

@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN;
import com.github.xds.data.orca.v3.OrcaLoadReport; import com.github.xds.data.orca.v3.OrcaLoadReport;
import com.github.xds.service.orca.v3.OpenRcaServiceGrpc; import com.github.xds.service.orca.v3.OpenRcaServiceGrpc;
@ -31,6 +30,7 @@ import com.google.common.base.Objects;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.protobuf.util.Durations; import com.google.protobuf.util.Durations;
import io.grpc.Attributes;
import io.grpc.CallOptions; import io.grpc.CallOptions;
import io.grpc.Channel; import io.grpc.Channel;
import io.grpc.ChannelLogger; import io.grpc.ChannelLogger;
@ -53,12 +53,8 @@ import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil; import io.grpc.internal.GrpcUtil;
import io.grpc.util.ForwardingLoadBalancerHelper; import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.ForwardingSubchannel; import io.grpc.util.ForwardingSubchannel;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.logging.Level; import java.util.logging.Level;
@ -67,34 +63,13 @@ import javax.annotation.Nullable;
/** /**
* Utility class that provides method for {@link LoadBalancer} to install listeners to receive * Utility class that provides method for {@link LoadBalancer} to install listeners to receive
* out-of-band backend cost metrics in the format of Open Request Cost Aggregation (ORCA). * out-of-band backend metrics in the format of Open Request Cost Aggregation (ORCA).
*/ */
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/9129") @ExperimentalApi("https://github.com/grpc/grpc-java/issues/9129")
public abstract class OrcaOobUtil { public final class OrcaOobUtil {
private static final Logger logger = Logger.getLogger(OrcaPerRequestUtil.class.getName()); private static final Logger logger = Logger.getLogger(OrcaPerRequestUtil.class.getName());
private static final OrcaOobUtil DEFAULT_INSTANCE =
new OrcaOobUtil() {
@Override private OrcaOobUtil() {}
public OrcaReportingHelperWrapper newOrcaReportingHelperWrapper(
LoadBalancer.Helper delegate,
OrcaOobReportListener listener) {
return newOrcaReportingHelperWrapper(
delegate,
listener,
new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER);
}
};
/**
* Gets an {@code OrcaOobUtil} instance that provides actual implementation of
* {@link #newOrcaReportingHelperWrapper}.
*/
public static OrcaOobUtil getInstance() {
return DEFAULT_INSTANCE;
}
/** /**
* Creates a new {@link io.grpc.LoadBalancer.Helper} with provided * Creates a new {@link io.grpc.LoadBalancer.Helper} with provided
@ -112,12 +87,14 @@ public abstract class OrcaOobUtil {
* *
* public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { * public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
* // listener implements the logic for WRR's usage of backend metrics. * // listener implements the logic for WRR's usage of backend metrics.
* OrcaReportingHelperWrapper orcaWrapper = * OrcaReportingHelper orcaHelper =
* OrcaOobUtil.getInstance().newOrcaReportingHelperWrapper(originHelper, listener); * OrcaOobUtil.newOrcaReportingHelper(originHelper);
* orcaWrapper.setReportingConfig(
* OrcaRerportingConfig.newBuilder().setReportInterval(30, SECOND).build());
* Subchannel subchannel = * Subchannel subchannel =
* orcaWrapper.asHelper().createSubchannel(CreateSubchannelArgs.newBuilder()...); * orcaHelper.createSubchannel(CreateSubchannelArgs.newBuilder()...);
* OrcaOobUtil.setListener(
* subchannel,
* listener,
* OrcaRerportingConfig.newBuilder().setReportInterval(30, SECOND).build());
* ... * ...
* } * }
* } * }
@ -128,8 +105,11 @@ public abstract class OrcaOobUtil {
* <pre> * <pre>
* {@code * {@code
* class XdsLoadBalancer extends LoadBalancer { * class XdsLoadBalancer extends LoadBalancer {
* private final Helper originHelper; // the original Helper * private final Helper orcaHelper; // the original Helper
* *
* public XdsLoadBalancer(LoadBalancer.Helper helper) {
* this.orcaHelper = OrcaUtil.newOrcaReportingHelper(helper);
* }
* private void createChildPolicy( * private void createChildPolicy(
* Locality locality, LoadBalancerProvider childPolicyProvider) { * Locality locality, LoadBalancerProvider childPolicyProvider) {
* // Each Locality has a child policy, and the parent does per-locality aggregation by * // Each Locality has a child policy, and the parent does per-locality aggregation by
@ -137,11 +117,18 @@ public abstract class OrcaOobUtil {
* *
* // Create an OrcaReportingHelperWrapper for each Locality. * // Create an OrcaReportingHelperWrapper for each Locality.
* // listener implements the logic for locality-level backend metric aggregation. * // listener implements the logic for locality-level backend metric aggregation.
* OrcaReportingHelperWrapper orcaWrapper = * LoadBalancer childLb = childPolicyProvider.newLoadBalancer(
* OrcaOobUtil.getInstance().newOrcaReportingHelperWrapper(originHelper, listener); * new ForwardingLoadBalancerHelper() {
* orcaWrapper.setReportingConfig( * public Subchannel createSubchannel(CreateSubchannelArgs args) {
* OrcaRerportingConfig.newBuilder().setReportInterval(30, SECOND).build()); * Subchannel subchannel = super.createSubchannel(args);
* LoadBalancer childLb = childPolicyProvider.newLoadBalancer(orcaWrapper.asHelper()); * OrcaOobUtil.setListener(subchannel, listener,
* OrcaReportingConfig.newBuilder().setReportInterval(30, SECOND).build());
* return subchannel;
* }
* public LoadBalancer.Helper delegate() {
* return orcaHelper;
* }
* });
* } * }
* } * }
* } * }
@ -151,33 +138,20 @@ public abstract class OrcaOobUtil {
* *
* @param delegate the delegate helper that provides essentials for establishing subchannels to * @param delegate the delegate helper that provides essentials for establishing subchannels to
* backends. * backends.
* @param listener contains the callback to be invoked when an out-of-band ORCA report is
* received.
*/ */
public abstract OrcaReportingHelperWrapper newOrcaReportingHelperWrapper( public static LoadBalancer.Helper newOrcaReportingHelper(LoadBalancer.Helper delegate) {
LoadBalancer.Helper delegate, return newOrcaReportingHelper(
OrcaOobReportListener listener); delegate,
new ExponentialBackoffPolicy.Provider(),
GrpcUtil.STOPWATCH_SUPPLIER);
}
@VisibleForTesting @VisibleForTesting
static OrcaReportingHelperWrapper newOrcaReportingHelperWrapper( static LoadBalancer.Helper newOrcaReportingHelper(
LoadBalancer.Helper delegate, LoadBalancer.Helper delegate,
OrcaOobReportListener listener,
BackoffPolicy.Provider backoffPolicyProvider, BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) { Supplier<Stopwatch> stopwatchSupplier) {
final OrcaReportingHelper orcaHelper = return new OrcaReportingHelper(delegate, backoffPolicyProvider, stopwatchSupplier);
new OrcaReportingHelper(delegate, listener, backoffPolicyProvider, stopwatchSupplier);
return new OrcaReportingHelperWrapper() {
@Override
public void setReportingConfig(OrcaReportingConfig config) {
orcaHelper.setReportingConfig(config);
}
@Override
public Helper asHelper() {
return orcaHelper;
}
};
} }
/** /**
@ -199,15 +173,13 @@ public abstract class OrcaOobUtil {
void onLoadReport(OrcaLoadReport report); void onLoadReport(OrcaLoadReport report);
} }
/** static final Attributes.Key<SubchannelImpl> ORCA_REPORTING_STATE_KEY =
* Blueprint for the wrapper that wraps a {@link io.grpc.LoadBalancer.Helper} with the capability Attributes.Key.create("internal-orca-reporting-state");
* of allowing {@link LoadBalancer}s interested in receiving out-of-band ORCA reports to update
* the reporting configuration such as reporting interval.
*/
public abstract static class OrcaReportingHelperWrapper {
/** /**
* Sets the configuration of receiving ORCA reports, such as the interval of receiving reports. * Update {@link OrcaOobReportListener} to receive Out-of-Band metrics report for the
* particular subchannel connection, and set the configuration of receiving ORCA reports,
* such as the interval of receiving reports.
* *
* <p>This method needs to be called from the SynchronizationContext returned by the wrapped * <p>This method needs to be called from the SynchronizationContext returned by the wrapped
* helper's {@link Helper#getSynchronizationContext()}. * helper's {@link Helper#getSynchronizationContext()}.
@ -218,42 +190,40 @@ public abstract class OrcaOobUtil {
* <p>If multiple load balancing policies configure reporting with different intervals, reports * <p>If multiple load balancing policies configure reporting with different intervals, reports
* come with the minimum of those intervals. * come with the minimum of those intervals.
* *
* @param subchannel the server connected by this subchannel to receive the metrics.
*
* @param listener the callback upon receiving backend metrics from the Out-Of-Band stream.
*
* @param config the configuration to be set. * @param config the configuration to be set.
*
*/ */
public abstract void setReportingConfig(OrcaReportingConfig config); public static void setListener(Subchannel subchannel, OrcaOobReportListener listener,
OrcaReportingConfig config) {
/** SubchannelImpl orcaSubchannel = subchannel.getAttributes().get(ORCA_REPORTING_STATE_KEY);
* Returns a wrapped {@link io.grpc.LoadBalancer.Helper}. Subchannels created through it will if (orcaSubchannel == null) {
* retrieve ORCA load reports if the server supports it. throw new IllegalArgumentException("Subchannel does not have orca Out-Of-Band stream enabled."
*/ + " Try to use a subchannel created by OrcaOobUtil.OrcaHelper.");
public abstract LoadBalancer.Helper asHelper(); }
orcaSubchannel.orcaState.setListener(orcaSubchannel, listener, config);
} }
/** /**
* An {@link OrcaReportingHelper} wraps a delegated {@link LoadBalancer.Helper} with additional * An {@link OrcaReportingHelper} wraps a delegated {@link LoadBalancer.Helper} with additional
* functionality to manage RPCs for out-of-band ORCA reporting for each backend it establishes * functionality to manage RPCs for out-of-band ORCA reporting for each backend it establishes
* connection to. * connection to. Subchannels created through it will retrieve ORCA load reports if the server
* supports it.
*/ */
private static final class OrcaReportingHelper extends ForwardingLoadBalancerHelper static final class OrcaReportingHelper extends ForwardingLoadBalancerHelper {
implements OrcaOobReportListener {
private static final CreateSubchannelArgs.Key<OrcaReportingState> ORCA_REPORTING_STATE_KEY =
CreateSubchannelArgs.Key.create("internal-orca-reporting-state");
private final LoadBalancer.Helper delegate; private final LoadBalancer.Helper delegate;
private final OrcaOobReportListener listener;
private final SynchronizationContext syncContext; private final SynchronizationContext syncContext;
private final BackoffPolicy.Provider backoffPolicyProvider; private final BackoffPolicy.Provider backoffPolicyProvider;
private final Supplier<Stopwatch> stopwatchSupplier; private final Supplier<Stopwatch> stopwatchSupplier;
private final Set<OrcaReportingState> orcaStates = new HashSet<>();
@Nullable private OrcaReportingConfig orcaConfig;
OrcaReportingHelper( OrcaReportingHelper(
LoadBalancer.Helper delegate, LoadBalancer.Helper delegate,
OrcaOobReportListener listener,
BackoffPolicy.Provider backoffPolicyProvider, BackoffPolicy.Provider backoffPolicyProvider,
Supplier<Stopwatch> stopwatchSupplier) { Supplier<Stopwatch> stopwatchSupplier) {
this.delegate = checkNotNull(delegate, "delegate"); this.delegate = checkNotNull(delegate, "delegate");
this.listener = checkNotNull(listener, "listener");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
syncContext = checkNotNull(delegate.getSynchronizationContext(), "syncContext"); syncContext = checkNotNull(delegate.getSynchronizationContext(), "syncContext");
@ -267,42 +237,17 @@ public abstract class OrcaOobUtil {
@Override @Override
public Subchannel createSubchannel(CreateSubchannelArgs args) { public Subchannel createSubchannel(CreateSubchannelArgs args) {
syncContext.throwIfNotInThisSynchronizationContext(); syncContext.throwIfNotInThisSynchronizationContext();
OrcaReportingState orcaState = args.getOption(ORCA_REPORTING_STATE_KEY); Subchannel subchannel = super.createSubchannel(args);
boolean augmented = false; SubchannelImpl orcaSubchannel = subchannel.getAttributes().get(ORCA_REPORTING_STATE_KEY);
if (orcaState == null) { OrcaReportingState orcaState;
if (orcaSubchannel == null) {
// Only the first load balancing policy requesting ORCA reports instantiates an // Only the first load balancing policy requesting ORCA reports instantiates an
// OrcaReportingState. // OrcaReportingState.
orcaState = new OrcaReportingState(this, syncContext, orcaState = new OrcaReportingState(syncContext, delegate().getScheduledExecutorService());
delegate().getScheduledExecutorService()); } else {
args = args.toBuilder().addOption(ORCA_REPORTING_STATE_KEY, orcaState).build(); orcaState = orcaSubchannel.orcaState;
augmented = true;
}
orcaStates.add(orcaState);
orcaState.listeners.add(this);
Subchannel subchannel = super.createSubchannel(args);
if (augmented) {
subchannel = new SubchannelImpl(subchannel, orcaState);
}
if (orcaConfig != null) {
orcaState.setReportingConfig(this, orcaConfig);
}
return subchannel;
}
void setReportingConfig(final OrcaReportingConfig config) {
syncContext.throwIfNotInThisSynchronizationContext();
orcaConfig = config;
for (OrcaReportingState state : orcaStates) {
state.setReportingConfig(OrcaReportingHelper.this, config);
}
}
@Override
public void onLoadReport(OrcaLoadReport report) {
syncContext.throwIfNotInThisSynchronizationContext();
if (orcaConfig != null) {
listener.onLoadReport(report);
} }
return new SubchannelImpl(subchannel, orcaState);
} }
/** /**
@ -312,11 +257,9 @@ public abstract class OrcaOobUtil {
*/ */
private final class OrcaReportingState implements SubchannelStateListener { private final class OrcaReportingState implements SubchannelStateListener {
private final OrcaReportingHelper orcaHelper;
private final SynchronizationContext syncContext; private final SynchronizationContext syncContext;
private final ScheduledExecutorService timeService; private final ScheduledExecutorService timeService;
private final List<OrcaOobReportListener> listeners = new ArrayList<>(); private final Map<OrcaOobReportListener, OrcaReportingConfig> configs = new HashMap<>();
private final Map<OrcaReportingHelper, OrcaReportingConfig> configs = new HashMap<>();
@Nullable private Subchannel subchannel; @Nullable private Subchannel subchannel;
@Nullable private ChannelLogger subchannelLogger; @Nullable private ChannelLogger subchannelLogger;
@Nullable @Nullable
@ -335,12 +278,11 @@ public abstract class OrcaOobUtil {
private ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE); private ConnectivityStateInfo state = ConnectivityStateInfo.forNonError(IDLE);
// True if server returned UNIMPLEMENTED. // True if server returned UNIMPLEMENTED.
private boolean disabled; private boolean disabled;
private boolean started;
OrcaReportingState( OrcaReportingState(
OrcaReportingHelper orcaHelper,
SynchronizationContext syncContext, SynchronizationContext syncContext,
ScheduledExecutorService timeService) { ScheduledExecutorService timeService) {
this.orcaHelper = checkNotNull(orcaHelper, "orcaHelper");
this.syncContext = checkNotNull(syncContext, "syncContext"); this.syncContext = checkNotNull(syncContext, "syncContext");
this.timeService = checkNotNull(timeService, "timeService"); this.timeService = checkNotNull(timeService, "timeService");
} }
@ -350,11 +292,27 @@ public abstract class OrcaOobUtil {
this.subchannel = checkNotNull(subchannel, "subchannel"); this.subchannel = checkNotNull(subchannel, "subchannel");
this.subchannelLogger = checkNotNull(subchannel.getChannelLogger(), "subchannelLogger"); this.subchannelLogger = checkNotNull(subchannel.getChannelLogger(), "subchannelLogger");
this.stateListener = checkNotNull(stateListener, "stateListener"); this.stateListener = checkNotNull(stateListener, "stateListener");
started = true;
} }
void setReportingConfig(OrcaReportingHelper helper, OrcaReportingConfig config) { void setListener(SubchannelImpl orcaSubchannel, OrcaOobReportListener listener,
OrcaReportingConfig config) {
syncContext.execute(new Runnable() {
@Override
public void run() {
OrcaOobReportListener oldListener = orcaSubchannel.reportListener;
if (oldListener != null) {
configs.remove(oldListener);
}
orcaSubchannel.reportListener = listener;
setReportingConfig(listener, config);
}
});
}
private void setReportingConfig(OrcaOobReportListener listener, OrcaReportingConfig config) {
boolean reconfigured = false; boolean reconfigured = false;
configs.put(helper, config); configs.put(listener, config);
// Real reporting interval is the minimum of intervals requested by all participating // Real reporting interval is the minimum of intervals requested by all participating
// helpers. // helpers.
if (overallConfig == null) { if (overallConfig == null) {
@ -386,9 +344,6 @@ public abstract class OrcaOobUtil {
// may be available on the new connection. // may be available on the new connection.
disabled = false; disabled = false;
} }
if (Objects.equal(newState.getState(), SHUTDOWN)) {
orcaHelper.orcaStates.remove(this);
}
state = newState; state = newState;
adjustOrcaReporting(); adjustOrcaReporting();
// Propagate subchannel state update to downstream listeners. // Propagate subchannel state update to downstream listeners.
@ -495,7 +450,7 @@ public abstract class OrcaOobUtil {
callHasResponded = true; callHasResponded = true;
backoffPolicy = null; backoffPolicy = null;
subchannelLogger.log(ChannelLogLevel.DEBUG, "Received an ORCA report: {0}", response); subchannelLogger.log(ChannelLogLevel.DEBUG, "Received an ORCA report: {0}", response);
for (OrcaOobReportListener listener : listeners) { for (OrcaOobReportListener listener : configs.keySet()) {
listener.onLoadReport(response); listener.onLoadReport(response);
} }
call.request(1); call.request(1);
@ -550,9 +505,9 @@ public abstract class OrcaOobUtil {
@VisibleForTesting @VisibleForTesting
static final class SubchannelImpl extends ForwardingSubchannel { static final class SubchannelImpl extends ForwardingSubchannel {
private final Subchannel delegate; private final Subchannel delegate;
private final OrcaReportingHelper.OrcaReportingState orcaState; private final OrcaReportingHelper.OrcaReportingState orcaState;
@Nullable private OrcaOobReportListener reportListener;
SubchannelImpl(Subchannel delegate, OrcaReportingHelper.OrcaReportingState orcaState) { SubchannelImpl(Subchannel delegate, OrcaReportingHelper.OrcaReportingState orcaState) {
this.delegate = checkNotNull(delegate, "delegate"); this.delegate = checkNotNull(delegate, "delegate");
@ -566,8 +521,17 @@ public abstract class OrcaOobUtil {
@Override @Override
public void start(SubchannelStateListener listener) { public void start(SubchannelStateListener listener) {
if (!orcaState.started) {
orcaState.init(this, listener); orcaState.init(this, listener);
super.start(orcaState); super.start(orcaState);
} else {
super.start(listener);
}
}
@Override
public Attributes getAttributes() {
return super.getAttributes().toBuilder().set(ORCA_REPORTING_STATE_KEY, this).build();
} }
} }

View File

@ -23,6 +23,7 @@ import static io.grpc.ConnectivityState.CONNECTING;
import static io.grpc.ConnectivityState.IDLE; import static io.grpc.ConnectivityState.IDLE;
import static io.grpc.ConnectivityState.READY; import static io.grpc.ConnectivityState.READY;
import static io.grpc.ConnectivityState.SHUTDOWN; import static io.grpc.ConnectivityState.SHUTDOWN;
import static org.junit.Assert.fail;
import static org.mockito.AdditionalAnswers.delegatesTo; import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.atLeast;
@ -47,6 +48,7 @@ import io.grpc.ConnectivityStateInfo;
import io.grpc.Context; import io.grpc.Context;
import io.grpc.Context.CancellationListener; import io.grpc.Context.CancellationListener;
import io.grpc.EquivalentAddressGroup; import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer;
import io.grpc.LoadBalancer.CreateSubchannelArgs; import io.grpc.LoadBalancer.CreateSubchannelArgs;
import io.grpc.LoadBalancer.Helper; import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.Subchannel; import io.grpc.LoadBalancer.Subchannel;
@ -61,9 +63,9 @@ import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.FakeClock; import io.grpc.internal.FakeClock;
import io.grpc.stub.StreamObserver; import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule; import io.grpc.testing.GrpcCleanupRule;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener; import io.grpc.xds.orca.OrcaOobUtil.OrcaOobReportListener;
import io.grpc.xds.orca.OrcaOobUtil.OrcaReportingConfig; import io.grpc.xds.orca.OrcaOobUtil.OrcaReportingConfig;
import io.grpc.xds.orca.OrcaOobUtil.OrcaReportingHelperWrapper;
import io.grpc.xds.orca.OrcaOobUtil.SubchannelImpl; import io.grpc.xds.orca.OrcaOobUtil.SubchannelImpl;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.text.MessageFormat; import java.text.MessageFormat;
@ -130,9 +132,10 @@ public class OrcaOobUtilTest {
@Mock private BackoffPolicy backoffPolicy1; @Mock private BackoffPolicy backoffPolicy1;
@Mock private BackoffPolicy backoffPolicy2; @Mock private BackoffPolicy backoffPolicy2;
private FakeSubchannel[] subchannels = new FakeSubchannel[NUM_SUBCHANNELS]; private FakeSubchannel[] subchannels = new FakeSubchannel[NUM_SUBCHANNELS];
private OrcaReportingHelperWrapper orcaHelperWrapper; private LoadBalancer.Helper orcaHelper;
private OrcaReportingHelperWrapper parentHelperWrapper; private LoadBalancer.Helper parentHelper;
private OrcaReportingHelperWrapper childHelperWrapper; private LoadBalancer.Helper childHelper;
private Subchannel savedParentSubchannel;
private static FakeSubchannel unwrap(Subchannel s) { private static FakeSubchannel unwrap(Subchannel s) {
return (FakeSubchannel) ((SubchannelImpl) s).delegate(); return (FakeSubchannel) ((SubchannelImpl) s).delegate();
@ -201,29 +204,34 @@ public class OrcaOobUtilTest {
when(backoffPolicy1.nextBackoffNanos()).thenReturn(11L, 21L); when(backoffPolicy1.nextBackoffNanos()).thenReturn(11L, 21L);
when(backoffPolicy2.nextBackoffNanos()).thenReturn(12L, 22L); when(backoffPolicy2.nextBackoffNanos()).thenReturn(12L, 22L);
orcaHelperWrapper = orcaHelper =
OrcaOobUtil.newOrcaReportingHelperWrapper( OrcaOobUtil.newOrcaReportingHelper(
origHelper, origHelper,
mockOrcaListener0,
backoffPolicyProvider, backoffPolicyProvider,
fakeClock.getStopwatchSupplier()); fakeClock.getStopwatchSupplier());
parentHelperWrapper = parentHelper =
OrcaOobUtil.newOrcaReportingHelperWrapper( new ForwardingLoadBalancerHelper() {
origHelper, @Override
mockOrcaListener1, protected Helper delegate() {
backoffPolicyProvider, return orcaHelper;
fakeClock.getStopwatchSupplier()); }
childHelperWrapper =
OrcaOobUtil.newOrcaReportingHelperWrapper( @Override
parentHelperWrapper.asHelper(), public Subchannel createSubchannel(CreateSubchannelArgs args) {
mockOrcaListener2, Subchannel subchannel = super.createSubchannel(args);
savedParentSubchannel = subchannel;
return subchannel;
}
};
childHelper =
OrcaOobUtil.newOrcaReportingHelper(
parentHelper,
backoffPolicyProvider, backoffPolicyProvider,
fakeClock.getStopwatchSupplier()); fakeClock.getStopwatchSupplier());
} }
@Test @Test
public void singlePolicyTypicalWorkflow() { public void singlePolicyTypicalWorkflow() {
setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG);
verify(origHelper, atLeast(0)).getSynchronizationContext(); verify(origHelper, atLeast(0)).getSynchronizationContext();
verifyNoMoreInteractions(origHelper); verifyNoMoreInteractions(origHelper);
@ -234,8 +242,9 @@ public class OrcaOobUtilTest {
String subchannelAttrValue = "eag attr " + i; String subchannelAttrValue = "eag attr " + i;
Attributes attrs = Attributes attrs =
Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, subchannelAttrValue).build(); Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, subchannelAttrValue).build();
assertThat(unwrap(createSubchannel(orcaHelperWrapper.asHelper(), i, attrs))) Subchannel created = createSubchannel(orcaHelper, i, attrs);
.isSameInstanceAs(subchannels[i]); assertThat(unwrap(created)).isSameInstanceAs(subchannels[i]);
setOrcaReportConfig(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
verify(origHelper, times(i + 1)).createSubchannel(createArgsCaptor.capture()); verify(origHelper, times(i + 1)).createSubchannel(createArgsCaptor.capture());
assertThat(createArgsCaptor.getValue().getAddresses()).isEqualTo(eagLists[i]); assertThat(createArgsCaptor.getValue().getAddresses()).isEqualTo(eagLists[i]);
assertThat(createArgsCaptor.getValue().getAttributes().get(SUBCHANNEL_ATTR_KEY)) assertThat(createArgsCaptor.getValue().getAttributes().get(SUBCHANNEL_ATTR_KEY))
@ -304,8 +313,6 @@ public class OrcaOobUtilTest {
@Test @Test
public void twoLevelPoliciesTypicalWorkflow() { public void twoLevelPoliciesTypicalWorkflow() {
setOrcaReportConfig(childHelperWrapper, SHORT_INTERVAL_CONFIG);
setOrcaReportConfig(parentHelperWrapper, SHORT_INTERVAL_CONFIG);
verify(origHelper, atLeast(0)).getSynchronizationContext(); verify(origHelper, atLeast(0)).getSynchronizationContext();
verifyNoMoreInteractions(origHelper); verifyNoMoreInteractions(origHelper);
@ -316,8 +323,9 @@ public class OrcaOobUtilTest {
String subchannelAttrValue = "eag attr " + i; String subchannelAttrValue = "eag attr " + i;
Attributes attrs = Attributes attrs =
Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, subchannelAttrValue).build(); Attributes.newBuilder().set(SUBCHANNEL_ATTR_KEY, subchannelAttrValue).build();
assertThat(unwrap(createSubchannel(childHelperWrapper.asHelper(), i, attrs))) Subchannel created = createSubchannel(childHelper, i, attrs);
.isSameInstanceAs(subchannels[i]); assertThat(unwrap(((SubchannelImpl) created).delegate())).isSameInstanceAs(subchannels[i]);
OrcaOobUtil.setListener(created, mockOrcaListener1, SHORT_INTERVAL_CONFIG);
verify(origHelper, times(i + 1)).createSubchannel(createArgsCaptor.capture()); verify(origHelper, times(i + 1)).createSubchannel(createArgsCaptor.capture());
assertThat(createArgsCaptor.getValue().getAddresses()).isEqualTo(eagLists[i]); assertThat(createArgsCaptor.getValue().getAddresses()).isEqualTo(eagLists[i]);
assertThat(createArgsCaptor.getValue().getAttributes().get(SUBCHANNEL_ATTR_KEY)) assertThat(createArgsCaptor.getValue().getAttributes().get(SUBCHANNEL_ATTR_KEY))
@ -362,7 +370,6 @@ public class OrcaOobUtilTest {
serverCall.responseObserver.onNext(report); serverCall.responseObserver.onNext(report);
assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report); assertLog(subchannel.logs, "DEBUG: Received an ORCA report: " + report);
verify(mockOrcaListener1, times(i + 1)).onLoadReport(eq(report)); verify(mockOrcaListener1, times(i + 1)).onLoadReport(eq(report));
verify(mockOrcaListener2, times(i + 1)).onLoadReport(eq(report));
} }
for (int i = 0; i < NUM_SUBCHANNELS; i++) { for (int i = 0; i < NUM_SUBCHANNELS; i++) {
@ -390,8 +397,8 @@ public class OrcaOobUtilTest {
@Test @Test
public void orcReportingDisabledWhenServiceNotImplemented() { public void orcReportingDisabledWhenServiceNotImplemented() {
setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG); final Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY); OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
FakeSubchannel subchannel = subchannels[0]; FakeSubchannel subchannel = subchannels[0];
OpenRcaServiceImp orcaServiceImp = orcaServiceImps[0]; OpenRcaServiceImp orcaServiceImp = orcaServiceImps[0];
SubchannelStateListener mockStateListener = mockStateListeners[0]; SubchannelStateListener mockStateListener = mockStateListeners[0];
@ -425,8 +432,8 @@ public class OrcaOobUtilTest {
@Test @Test
public void orcaReportingStreamClosedAndRetried() { public void orcaReportingStreamClosedAndRetried() {
setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG); final Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY); OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
FakeSubchannel subchannel = subchannels[0]; FakeSubchannel subchannel = subchannels[0];
OpenRcaServiceImp orcaServiceImp = orcaServiceImps[0]; OpenRcaServiceImp orcaServiceImp = orcaServiceImps[0];
SubchannelStateListener mockStateListener = mockStateListeners[0]; SubchannelStateListener mockStateListener = mockStateListeners[0];
@ -491,14 +498,14 @@ public class OrcaOobUtilTest {
@Test @Test
public void reportingNotStartedUntilConfigured() { public void reportingNotStartedUntilConfigured() {
createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY); Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
verify(mockStateListeners[0]) verify(mockStateListeners[0])
.onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); .onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
assertThat(orcaServiceImps[0].calls).isEmpty(); assertThat(orcaServiceImps[0].calls).isEmpty();
assertThat(subchannels[0].logs).isEmpty(); assertThat(subchannels[0].logs).isEmpty();
setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG); OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
assertThat(orcaServiceImps[0].calls).hasSize(1); assertThat(orcaServiceImps[0].calls).hasSize(1);
assertLog(subchannels[0].logs, assertLog(subchannels[0].logs,
"DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses()); "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
@ -506,10 +513,33 @@ public class OrcaOobUtilTest {
.isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG)); .isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG));
} }
@Test
public void updateListenerThrows() {
Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
verify(mockStateListeners[0])
.onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
assertThat(orcaServiceImps[0].calls).hasSize(1);
assertLog(subchannels[0].logs,
"DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
assertThat(orcaServiceImps[0].calls.peek().request)
.isEqualTo(buildOrcaRequestFromConfig(SHORT_INTERVAL_CONFIG));
assertThat(unwrap(created)).isSameInstanceAs(subchannels[0]);
try {
OrcaOobUtil.setListener(subchannels[0], mockOrcaListener1, MEDIUM_INTERVAL_CONFIG);
fail("Update orca listener on non-orca subchannel should fail");
} catch (IllegalArgumentException ex) {
assertThat(ex.getMessage()).isEqualTo("Subchannel does not have orca Out-Of-Band "
+ "stream enabled. Try to use a subchannel created by OrcaOobUtil.OrcaHelper.");
}
}
@Test @Test
public void updateReportingIntervalBeforeCreatingSubchannel() { public void updateReportingIntervalBeforeCreatingSubchannel() {
setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG); Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY); OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
@ -522,8 +552,8 @@ public class OrcaOobUtilTest {
@Test @Test
public void updateReportingIntervalBeforeSubchannelReady() { public void updateReportingIntervalBeforeSubchannelReady() {
createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY); Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG); OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
@ -538,8 +568,9 @@ public class OrcaOobUtilTest {
public void updateReportingIntervalWhenRpcActive() { public void updateReportingIntervalWhenRpcActive() {
// Sets report interval before creating a Subchannel, reporting starts right after suchannel // Sets report interval before creating a Subchannel, reporting starts right after suchannel
// state becomes READY. // state becomes READY.
setOrcaReportConfig(orcaHelperWrapper, MEDIUM_INTERVAL_CONFIG); Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY); OrcaOobUtil.setListener(created, mockOrcaListener0,
MEDIUM_INTERVAL_CONFIG);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
@ -550,7 +581,7 @@ public class OrcaOobUtilTest {
.isEqualTo(buildOrcaRequestFromConfig(MEDIUM_INTERVAL_CONFIG)); .isEqualTo(buildOrcaRequestFromConfig(MEDIUM_INTERVAL_CONFIG));
// Make reporting less frequent. // Make reporting less frequent.
setOrcaReportConfig(orcaHelperWrapper, LONG_INTERVAL_CONFIG); OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG);
assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue(); assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue();
assertThat(orcaServiceImps[0].calls).hasSize(1); assertThat(orcaServiceImps[0].calls).hasSize(1);
assertLog(subchannels[0].logs, assertLog(subchannels[0].logs,
@ -559,12 +590,13 @@ public class OrcaOobUtilTest {
.isEqualTo(buildOrcaRequestFromConfig(LONG_INTERVAL_CONFIG)); .isEqualTo(buildOrcaRequestFromConfig(LONG_INTERVAL_CONFIG));
// Configuring with the same report interval again does not restart ORCA RPC. // Configuring with the same report interval again does not restart ORCA RPC.
setOrcaReportConfig(orcaHelperWrapper, LONG_INTERVAL_CONFIG); OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG);
assertThat(orcaServiceImps[0].calls.peek().cancelled).isFalse(); assertThat(orcaServiceImps[0].calls.peek().cancelled).isFalse();
assertThat(subchannels[0].logs).isEmpty(); assertThat(subchannels[0].logs).isEmpty();
// Make reporting more frequent. // Make reporting more frequent.
setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG); OrcaOobUtil.setListener(created, mockOrcaListener0,
SHORT_INTERVAL_CONFIG);
assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue(); assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue();
assertThat(orcaServiceImps[0].calls).hasSize(1); assertThat(orcaServiceImps[0].calls).hasSize(1);
assertLog(subchannels[0].logs, assertLog(subchannels[0].logs,
@ -575,8 +607,8 @@ public class OrcaOobUtilTest {
@Test @Test
public void updateReportingIntervalWhenRpcPendingRetry() { public void updateReportingIntervalWhenRpcPendingRetry() {
createSubchannel(orcaHelperWrapper.asHelper(), 0, Attributes.EMPTY); Subchannel created = createSubchannel(orcaHelper, 0, Attributes.EMPTY);
setOrcaReportConfig(orcaHelperWrapper, SHORT_INTERVAL_CONFIG); OrcaOobUtil.setListener(created, mockOrcaListener0, SHORT_INTERVAL_CONFIG);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
@ -596,7 +628,7 @@ public class OrcaOobUtilTest {
assertThat(orcaServiceImps[0].calls).isEmpty(); assertThat(orcaServiceImps[0].calls).isEmpty();
// Make reporting less frequent. // Make reporting less frequent.
setOrcaReportConfig(orcaHelperWrapper, LONG_INTERVAL_CONFIG); OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG);
// Retry task will be canceled and restarts new RPC immediately. // Retry task will be canceled and restarts new RPC immediately.
assertThat(fakeClock.getPendingTasks()).isEmpty(); assertThat(fakeClock.getPendingTasks()).isEmpty();
assertThat(orcaServiceImps[0].calls).hasSize(1); assertThat(orcaServiceImps[0].calls).hasSize(1);
@ -608,7 +640,7 @@ public class OrcaOobUtilTest {
@Test @Test
public void policiesReceiveSameReportIndependently() { public void policiesReceiveSameReportIndependently() {
createSubchannel(childHelperWrapper.asHelper(), 0, Attributes.EMPTY); Subchannel childSubchannel = createSubchannel(childHelper, 0, Attributes.EMPTY);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
// No helper sets ORCA reporting interval, so load reporting is not started. // No helper sets ORCA reporting interval, so load reporting is not started.
@ -617,7 +649,7 @@ public class OrcaOobUtilTest {
assertThat(subchannels[0].logs).isEmpty(); assertThat(subchannels[0].logs).isEmpty();
// Parent helper requests ORCA reports with a certain interval, load reporting starts. // Parent helper requests ORCA reports with a certain interval, load reporting starts.
setOrcaReportConfig(parentHelperWrapper, SHORT_INTERVAL_CONFIG); OrcaOobUtil.setListener(savedParentSubchannel, mockOrcaListener1, SHORT_INTERVAL_CONFIG);
assertThat(orcaServiceImps[0].calls).hasSize(1); assertThat(orcaServiceImps[0].calls).hasSize(1);
assertLog(subchannels[0].logs, assertLog(subchannels[0].logs,
"DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses()); "DEBUG: Starting ORCA reporting for " + subchannels[0].getAllAddresses());
@ -633,7 +665,7 @@ public class OrcaOobUtilTest {
verifyNoMoreInteractions(mockOrcaListener2); verifyNoMoreInteractions(mockOrcaListener2);
// Now child helper also wants to receive reports. // Now child helper also wants to receive reports.
setOrcaReportConfig(childHelperWrapper, SHORT_INTERVAL_CONFIG); OrcaOobUtil.setListener(childSubchannel, mockOrcaListener2, SHORT_INTERVAL_CONFIG);
orcaServiceImps[0].calls.peek().responseObserver.onNext(report); orcaServiceImps[0].calls.peek().responseObserver.onNext(report);
assertLog(subchannels[0].logs, "DEBUG: Received an ORCA report: " + report); assertLog(subchannels[0].logs, "DEBUG: Received an ORCA report: " + report);
// Both helper receives the same report instance. // Both helper receives the same report instance.
@ -647,9 +679,9 @@ public class OrcaOobUtilTest {
@Test @Test
public void reportWithMostFrequentIntervalRequested() { public void reportWithMostFrequentIntervalRequested() {
setOrcaReportConfig(parentHelperWrapper, SHORT_INTERVAL_CONFIG); Subchannel created = createSubchannel(childHelper, 0, Attributes.EMPTY);
setOrcaReportConfig(childHelperWrapper, LONG_INTERVAL_CONFIG); OrcaOobUtil.setListener(created, mockOrcaListener0, LONG_INTERVAL_CONFIG);
createSubchannel(childHelperWrapper.asHelper(), 0, Attributes.EMPTY); OrcaOobUtil.setListener(created, mockOrcaListener1, SHORT_INTERVAL_CONFIG);
deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY)); deliverSubchannelState(0, ConnectivityStateInfo.forNonError(READY));
verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY))); verify(mockStateListeners[0]).onSubchannelState(eq(ConnectivityStateInfo.forNonError(READY)));
assertThat(orcaServiceImps[0].calls).hasSize(1); assertThat(orcaServiceImps[0].calls).hasSize(1);
@ -660,14 +692,14 @@ public class OrcaOobUtilTest {
assertThat(Durations.toNanos(orcaServiceImps[0].calls.peek().request.getReportInterval())) assertThat(Durations.toNanos(orcaServiceImps[0].calls.peek().request.getReportInterval()))
.isEqualTo(SHORT_INTERVAL_CONFIG.getReportIntervalNanos()); .isEqualTo(SHORT_INTERVAL_CONFIG.getReportIntervalNanos());
// Child helper wants reporting to be more frequent than its current setting while it is still // Parent helper wants reporting to be more frequent than its current setting while it is still
// less frequent than parent helper. Nothing should happen on existing RPC. // less frequent than parent helper. Nothing should happen on existing RPC.
setOrcaReportConfig(childHelperWrapper, MEDIUM_INTERVAL_CONFIG); OrcaOobUtil.setListener(savedParentSubchannel, mockOrcaListener0, MEDIUM_INTERVAL_CONFIG);
assertThat(orcaServiceImps[0].calls.peek().cancelled).isFalse(); assertThat(orcaServiceImps[0].calls.peek().cancelled).isFalse();
assertThat(subchannels[0].logs).isEmpty(); assertThat(subchannels[0].logs).isEmpty();
// Parent helper wants reporting to be less frequent. // Parent helper wants reporting to be less frequent.
setOrcaReportConfig(parentHelperWrapper, MEDIUM_INTERVAL_CONFIG); OrcaOobUtil.setListener(created, mockOrcaListener1, MEDIUM_INTERVAL_CONFIG);
assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue(); assertThat(orcaServiceImps[0].calls.poll().cancelled).isTrue();
assertThat(orcaServiceImps[0].calls).hasSize(1); assertThat(orcaServiceImps[0].calls).hasSize(1);
assertLog(subchannels[0].logs, assertLog(subchannels[0].logs,
@ -720,13 +752,10 @@ public class OrcaOobUtilTest {
} }
private void setOrcaReportConfig( private void setOrcaReportConfig(
final OrcaReportingHelperWrapper helperWrapper, final OrcaReportingConfig config) { final Subchannel subchannel,
syncContext.execute(new Runnable() { final OrcaOobReportListener listener,
@Override final OrcaReportingConfig config) {
public void run() { OrcaOobUtil.setListener(subchannel, listener, config);
helperWrapper.setReportingConfig(config);
}
});
} }
private static final class OpenRcaServiceImp extends OpenRcaServiceGrpc.OpenRcaServiceImplBase { private static final class OpenRcaServiceImp extends OpenRcaServiceGrpc.OpenRcaServiceImplBase {