rls: lb policy config object (#6883)

This commit is contained in:
Jihun Cho 2020-04-20 23:04:17 -07:00 committed by GitHub
parent eb8e31409e
commit 23bcdb1a09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 649 additions and 0 deletions

View File

@ -0,0 +1,449 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.rls.internal;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.LoadBalancer.CreateSubchannelArgs;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.internal.ObjectPool;
import io.grpc.rls.internal.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
import io.grpc.rls.internal.RlsProtoData.RouteLookupConfig;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.ForwardingSubchannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
/** Configuration for RLS load balancing policy. */
public final class LbPolicyConfiguration {
private final RouteLookupConfig routeLookupConfig;
private final ChildLoadBalancingPolicy policy;
public LbPolicyConfiguration(
RouteLookupConfig routeLookupConfig, ChildLoadBalancingPolicy policy) {
this.routeLookupConfig = checkNotNull(routeLookupConfig, "routeLookupConfig");
this.policy = checkNotNull(policy, "policy");
}
public RouteLookupConfig getRouteLookupConfig() {
return routeLookupConfig;
}
public ChildLoadBalancingPolicy getLoadBalancingPolicy() {
return policy;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
LbPolicyConfiguration that = (LbPolicyConfiguration) o;
return Objects.equals(routeLookupConfig, that.routeLookupConfig)
&& Objects.equals(policy, that.policy);
}
@Override
public int hashCode() {
return Objects.hash(routeLookupConfig, policy);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("routeLookupConfig", routeLookupConfig)
.add("policy", policy)
.toString();
}
/** ChildLoadBalancingPolicy is an elected child policy to delegate requests. */
public static final class ChildLoadBalancingPolicy {
private final Map<String, Object> effectiveRawChildPolicy;
private final LoadBalancerProvider effectiveLbProvider;
private final String targetFieldName;
@VisibleForTesting
ChildLoadBalancingPolicy(
String targetFieldName,
Map<String, Object> effectiveRawChildPolicy,
LoadBalancerProvider effectiveLbProvider) {
checkArgument(
targetFieldName != null && !targetFieldName.isEmpty(),
"targetFieldName cannot be empty or null");
this.targetFieldName = targetFieldName;
this.effectiveRawChildPolicy =
checkNotNull(effectiveRawChildPolicy, "effectiveRawChildPolicy");
this.effectiveLbProvider = checkNotNull(effectiveLbProvider, "effectiveLbProvider");
}
/** Creates ChildLoadBalancingPolicy. */
@SuppressWarnings("unchecked")
public static ChildLoadBalancingPolicy create(
String childPolicyConfigTargetFieldName, List<Map<String, ?>> childPolicies)
throws InvalidChildPolicyConfigException {
Map<String, Object> effectiveChildPolicy = null;
LoadBalancerProvider effectiveLbProvider = null;
List<String> policyTried = new ArrayList<>();
LoadBalancerRegistry lbRegistry = LoadBalancerRegistry.getDefaultRegistry();
for (Map<String, ?> childPolicy : childPolicies) {
if (childPolicy.isEmpty()) {
continue;
}
if (childPolicy.size() != 1) {
throw
new InvalidChildPolicyConfigException(
"childPolicy should have exactly one loadbalancing policy");
}
String policyName = childPolicy.keySet().iterator().next();
LoadBalancerProvider provider = lbRegistry.getProvider(policyName);
if (provider != null) {
effectiveLbProvider = provider;
effectiveChildPolicy = Collections.unmodifiableMap(childPolicy);
break;
}
policyTried.add(policyName);
}
if (effectiveChildPolicy == null) {
throw
new InvalidChildPolicyConfigException(
String.format("no valid childPolicy found, policy tried: %s", policyTried));
}
return
new ChildLoadBalancingPolicy(
childPolicyConfigTargetFieldName,
(Map<String, Object>) effectiveChildPolicy.values().iterator().next(),
effectiveLbProvider);
}
/** Creates a child load balancer config for given target from elected raw child policy. */
public Map<String, ?> getEffectiveChildPolicy(String target) {
Map<String, Object> childPolicy = new HashMap<>(effectiveRawChildPolicy);
childPolicy.put(targetFieldName, target);
return childPolicy;
}
/** Returns the elected child {@link LoadBalancerProvider}. */
public LoadBalancerProvider getEffectiveLbProvider() {
return effectiveLbProvider;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ChildLoadBalancingPolicy that = (ChildLoadBalancingPolicy) o;
return Objects.equals(effectiveRawChildPolicy, that.effectiveRawChildPolicy)
&& Objects.equals(effectiveLbProvider, that.effectiveLbProvider)
&& Objects.equals(targetFieldName, that.targetFieldName);
}
@Override
public int hashCode() {
return Objects.hash(effectiveRawChildPolicy, effectiveLbProvider, targetFieldName);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("effectiveRawChildPolicy", effectiveRawChildPolicy)
.add("effectiveLbProvider", effectiveLbProvider)
.add("childPolicyConfigTargetFieldName", targetFieldName)
.toString();
}
}
/** Factory for {@link ChildPolicyWrapper}. */
static final class RefCountedChildPolicyWrapperFactory {
@VisibleForTesting
final Map<String /* target */, RefCountedChildPolicyWrapper> childPolicyMap =
new HashMap<>();
private final ChildLoadBalancerHelperProvider childLbHelperProvider;
@Nullable
private final ChildLbStatusListener childLbStatusListener;
public RefCountedChildPolicyWrapperFactory(
ChildLoadBalancerHelperProvider childLbHelperProvider,
@Nullable ChildLbStatusListener childLbStatusListener) {
this.childLbHelperProvider = checkNotNull(childLbHelperProvider, "childLbHelperProvider");
this.childLbStatusListener = childLbStatusListener;
}
ChildPolicyWrapper createOrGet(String target) {
// TODO(creamsoup) check if the target is valid or not
RefCountedChildPolicyWrapper pooledChildPolicyWrapper = childPolicyMap.get(target);
if (pooledChildPolicyWrapper == null) {
ChildPolicyWrapper childPolicyWrapper =
new ChildPolicyWrapper(target, childLbHelperProvider, childLbStatusListener);
pooledChildPolicyWrapper = RefCountedChildPolicyWrapper.of(childPolicyWrapper);
childPolicyMap.put(target, pooledChildPolicyWrapper);
}
return pooledChildPolicyWrapper.getObject();
}
void release(ChildPolicyWrapper childPolicyWrapper) {
checkNotNull(childPolicyWrapper, "childPolicyWrapper");
String target = childPolicyWrapper.getTarget();
RefCountedChildPolicyWrapper existing = childPolicyMap.get(target);
checkState(existing != null, "Cannot access already released object");
existing.returnObject(childPolicyWrapper);
if (existing.isReleased()) {
childPolicyMap.remove(target);
}
}
}
/**
* ChildPolicyWrapper is a wrapper class for child load balancing policy with associated helper /
* utility classes to manage the child policy.
*/
static final class ChildPolicyWrapper {
private final String target;
private final ChildPolicyReportingHelper helper;
private ConnectivityStateInfo connectivityStateInfo =
ConnectivityStateInfo.forNonError(ConnectivityState.IDLE);
private SubchannelPicker picker;
public ChildPolicyWrapper(
String target,
ChildLoadBalancerHelperProvider childLbHelperProvider,
@Nullable ChildLbStatusListener childLbStatusListener) {
this.target = target;
this.helper =
new ChildPolicyReportingHelper(childLbHelperProvider, childLbStatusListener);
}
String getTarget() {
return target;
}
void setPicker(SubchannelPicker picker) {
this.picker = checkNotNull(picker, "picker");
}
SubchannelPicker getPicker() {
return picker;
}
ChildPolicyReportingHelper getHelper() {
return helper;
}
void setConnectivityStateInfo(ConnectivityStateInfo connectivityStateInfo) {
this.connectivityStateInfo = connectivityStateInfo;
}
ConnectivityStateInfo getConnectivityStateInfo() {
return connectivityStateInfo;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ChildPolicyWrapper that = (ChildPolicyWrapper) o;
return Objects.equals(target, that.target)
&& Objects.equals(helper, that.helper)
&& Objects.equals(connectivityStateInfo, that.connectivityStateInfo)
&& Objects.equals(picker, that.picker);
}
@Override
public int hashCode() {
return Objects.hash(target, helper, connectivityStateInfo, picker);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("target", target)
.add("helper", helper)
.add("connectivityStateInfo", connectivityStateInfo)
.add("picker", picker)
.toString();
}
/**
* A delegating {@link io.grpc.LoadBalancer.Helper} maintains status of {@link
* ChildPolicyWrapper} when {@link Subchannel} status changed. This helper is used between child
* policy and parent load-balancer where each picker in child policy is governed by a governing
* picker (RlsPicker). The governing picker will be reported back to the parent load-balancer.
*/
final class ChildPolicyReportingHelper extends ForwardingLoadBalancerHelper {
private final ChildLoadBalancerHelper delegate;
@Nullable
private final ChildLbStatusListener listener;
ChildPolicyReportingHelper(ChildLoadBalancerHelperProvider childHelperProvider) {
this(childHelperProvider, null);
}
ChildPolicyReportingHelper(
ChildLoadBalancerHelperProvider childHelperProvider,
@Nullable ChildLbStatusListener listener) {
checkNotNull(childHelperProvider, "childHelperProvider");
this.delegate = childHelperProvider.forTarget(getTarget());
this.listener = listener;
}
@Override
protected Helper delegate() {
return delegate;
}
@Override
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
setPicker(newPicker);
super.updateBalancingState(newState, newPicker);
if (listener != null) {
listener.onStatusChanged(newState);
}
}
@Override
public Subchannel createSubchannel(CreateSubchannelArgs args) {
final Subchannel subchannel = super.createSubchannel(args);
return new ForwardingSubchannel() {
@Override
protected Subchannel delegate() {
return subchannel;
}
@Override
public void start(final SubchannelStateListener listener) {
super.start(new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo newState) {
setConnectivityStateInfo(newState);
listener.onSubchannelState(newState);
}
});
}
};
}
}
}
/** Listener for child lb status change events. */
interface ChildLbStatusListener {
/** Notifies when child lb status changes. */
void onStatusChanged(ConnectivityState newState);
}
private static final class RefCountedChildPolicyWrapper
implements ObjectPool<ChildPolicyWrapper> {
private final AtomicLong refCnt = new AtomicLong();
@Nullable
private ChildPolicyWrapper childPolicyWrapper;
private RefCountedChildPolicyWrapper(ChildPolicyWrapper childPolicyWrapper) {
this.childPolicyWrapper = checkNotNull(childPolicyWrapper, "childPolicyWrapper");
}
@Override
public ChildPolicyWrapper getObject() {
checkState(!isReleased(), "ChildPolicyWrapper is already released");
refCnt.getAndIncrement();
return childPolicyWrapper;
}
@Override
@Nullable
public ChildPolicyWrapper returnObject(Object object) {
checkState(
!isReleased(),
"cannot return already released ChildPolicyWrapper, this is possibly a bug.");
checkState(
childPolicyWrapper == object,
"returned object doesn't match the pooled childPolicyWrapper");
long newCnt = refCnt.decrementAndGet();
checkState(newCnt != -1, "Cannot return never pooled childPolicyWrapper");
if (newCnt == 0) {
childPolicyWrapper = null;
}
return null;
}
boolean isReleased() {
return childPolicyWrapper == null;
}
static RefCountedChildPolicyWrapper of(ChildPolicyWrapper childPolicyWrapper) {
return new RefCountedChildPolicyWrapper(childPolicyWrapper);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("object", childPolicyWrapper)
.add("refCnt", refCnt.get())
.toString();
}
}
/** Exception thrown when attempting to parse child policy encountered parsing issue. */
public static final class InvalidChildPolicyConfigException extends Exception {
private static final long serialVersionUID = 0L;
public InvalidChildPolicyConfigException(String message) {
super(message);
}
@Override
public synchronized Throwable fillInStackTrace() {
// no stack trace above this point
return this;
}
}
}

View File

@ -0,0 +1,200 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.rls.internal;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.grpc.Attributes;
import io.grpc.ConnectivityState;
import io.grpc.ConnectivityStateInfo;
import io.grpc.EquivalentAddressGroup;
import io.grpc.LoadBalancer.CreateSubchannelArgs;
import io.grpc.LoadBalancer.Helper;
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.LoadBalancerProvider;
import io.grpc.LoadBalancerRegistry;
import io.grpc.rls.internal.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider;
import io.grpc.rls.internal.LbPolicyConfiguration.ChildLbStatusListener;
import io.grpc.rls.internal.LbPolicyConfiguration.ChildLoadBalancingPolicy;
import io.grpc.rls.internal.LbPolicyConfiguration.ChildPolicyWrapper;
import io.grpc.rls.internal.LbPolicyConfiguration.ChildPolicyWrapper.ChildPolicyReportingHelper;
import io.grpc.rls.internal.LbPolicyConfiguration.InvalidChildPolicyConfigException;
import io.grpc.rls.internal.LbPolicyConfiguration.RefCountedChildPolicyWrapperFactory;
import java.net.SocketAddress;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class LbPolicyConfigurationTest {
private final Helper helper = mock(Helper.class);
private final SubchannelStateManager subchannelStateManager = new SubchannelStateManagerImpl();
private final SubchannelPicker picker = mock(SubchannelPicker.class);
private final ChildLbStatusListener childLbStatusListener = mock(ChildLbStatusListener.class);
private final RefCountedChildPolicyWrapperFactory factory =
new RefCountedChildPolicyWrapperFactory(
new ChildLoadBalancerHelperProvider(helper, subchannelStateManager, picker),
childLbStatusListener);
@Test
public void childPolicyWrapper_refCounted() {
String target = "target";
ChildPolicyWrapper childPolicy = factory.createOrGet(target);
assertThat(factory.childPolicyMap.keySet()).containsExactly(target);
ChildPolicyWrapper childPolicy2 = factory.createOrGet(target);
assertThat(factory.childPolicyMap.keySet()).containsExactly(target);
assertThat(childPolicy2).isEqualTo(childPolicy);
factory.release(childPolicy2);
assertThat(factory.childPolicyMap.keySet()).containsExactly(target);
factory.release(childPolicy);
assertThat(factory.childPolicyMap).isEmpty();
try {
factory.release(childPolicy);
fail("should not be able to access already released policy");
} catch (IllegalStateException e) {
assertThat(e).hasMessageThat().contains("already released");
}
}
@Test
public void childLoadBalancingPolicy_effectiveChildPolicy() {
LoadBalancerProvider mockProvider = mock(LoadBalancerProvider.class);
ChildLoadBalancingPolicy childLbPolicy =
new ChildLoadBalancingPolicy(
"targetFieldName",
ImmutableMap.<String, Object>of("foo", "bar"),
mockProvider);
assertThat(childLbPolicy.getEffectiveChildPolicy("target"))
.containsExactly("foo", "bar", "targetFieldName", "target");
assertThat(childLbPolicy.getEffectiveLbProvider()).isEqualTo(mockProvider);
}
@Test
public void childLoadBalancingPolicy_noPolicyProvided() {
LoadBalancerProvider mockProvider = mock(LoadBalancerProvider.class);
when(mockProvider.getPolicyName()).thenReturn("rls");
when(mockProvider.isAvailable()).thenReturn(true);
LoadBalancerRegistry.getDefaultRegistry().register(mockProvider);
try {
ChildLoadBalancingPolicy.create(
"targetFieldName",
ImmutableList.<Map<String, ?>>of(
ImmutableMap.<String, Object>of(
"rls", ImmutableMap.of(), "rls2", ImmutableMap.of())));
fail("parsing exception expected");
} catch (InvalidChildPolicyConfigException e) {
assertThat(e).hasMessageThat()
.contains("childPolicy should have exactly one loadbalancing policy");
} finally {
LoadBalancerRegistry.getDefaultRegistry().deregister(mockProvider);
}
}
@Test
public void childLoadBalancingPolicy_tooManyChildPolicies() {
try {
ChildLoadBalancingPolicy
.create("targetFieldName", ImmutableList.<Map<String, ?>>of());
fail("parsing exception expected");
} catch (InvalidChildPolicyConfigException e) {
assertThat(e).hasMessageThat().contains("no valid childPolicy found");
}
}
@Test
public void subchannelStateChange_updateChildPolicyWrapper() {
ChildPolicyWrapper childPolicyWrapper = factory.createOrGet("foo.google.com");
ChildPolicyReportingHelper childPolicyReportingHelper = childPolicyWrapper.getHelper();
FakeSubchannel fakeSubchannel = new FakeSubchannel();
when(helper.createSubchannel(any(CreateSubchannelArgs.class))).thenReturn(fakeSubchannel);
Subchannel subchannel =
childPolicyReportingHelper
.createSubchannel(
CreateSubchannelArgs.newBuilder()
.setAddresses(new EquivalentAddressGroup(mock(SocketAddress.class)))
.build());
subchannel.start(new SubchannelStateListener() {
@Override
public void onSubchannelState(ConnectivityStateInfo newState) {
// no-op
}
});
fakeSubchannel.updateState(ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING));
assertThat(childPolicyWrapper.getConnectivityStateInfo())
.isEqualTo(ConnectivityStateInfo.forNonError(ConnectivityState.CONNECTING));
}
@Test
public void updateBalancingState_triggersListener() {
ChildPolicyWrapper childPolicyWrapper = factory.createOrGet("foo.google.com");
ChildPolicyReportingHelper childPolicyReportingHelper = childPolicyWrapper.getHelper();
SubchannelPicker childPicker = mock(SubchannelPicker.class);
childPolicyReportingHelper.updateBalancingState(ConnectivityState.READY, childPicker);
verify(childLbStatusListener).onStatusChanged(ConnectivityState.READY);
assertThat(childPolicyWrapper.getPicker()).isEqualTo(childPicker);
// picker governs childPickers will be reported to parent LB
verify(helper).updateBalancingState(ConnectivityState.READY, picker);
}
private static class FakeSubchannel extends Subchannel {
private SubchannelStateListener listener;
@Override
public void start(SubchannelStateListener listener) {
this.listener = listener;
}
void updateState(ConnectivityStateInfo newState) {
checkState(listener != null, "channel is not started yet");
listener.onSubchannelState(newState);
}
@Override
public void shutdown() {}
@Override
public void requestConnection() {}
@Override
public Attributes getAttributes() {
return null;
}
}
}