diff --git a/core/src/main/java/io/grpc/LoadBalancer.java b/core/src/main/java/io/grpc/LoadBalancer.java index 989077eeff..b6ab74db85 100644 --- a/core/src/main/java/io/grpc/LoadBalancer.java +++ b/core/src/main/java/io/grpc/LoadBalancer.java @@ -158,11 +158,6 @@ public abstract class LoadBalancer { */ public abstract void shutdown(); - @Override - public String toString() { - return getClass().getSimpleName(); - } - /** * The main balancing logic. It must be thread-safe. Typically it should only * synchronize on its own state, and avoid synchronizing with the LoadBalancer's state. diff --git a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java index 811ef867c8..fa89dc1cfd 100644 --- a/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java +++ b/core/src/main/java/io/grpc/internal/AutoConfiguredLoadBalancerFactory.java @@ -122,7 +122,8 @@ final class AutoConfiguredLoadBalancerFactory extends LoadBalancer.Factory { delegate = delegateProvider.newLoadBalancer(helper); if (channelTracer != null) { channelTracer.reportEvent(new ChannelTrace.Event.Builder() - .setDescription("Load balancer changed from " + old + " to " + delegate) + .setDescription("Load balancer changed from " + old.getClass().getSimpleName() + + " to " + delegate.getClass().getSimpleName()) .setSeverity(ChannelTrace.Event.Severity.CT_INFO) .setTimestampNanos(timeProvider.currentTimeNanos()) .build()); diff --git a/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java b/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java index 5295327b4d..f112e7144c 100644 --- a/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java +++ b/core/src/main/java/io/grpc/internal/ServiceConfigUtil.java @@ -62,6 +62,32 @@ public final class ServiceConfigUtil { private ServiceConfigUtil() {} + /** + * Fetch the health-checked service name from service config. {@code null} if can't find one. 
+ */ + @Nullable + public static String getHealthCheckedServiceName(@Nullable Map serviceConfig) { + String healthCheckKey = "healthCheckConfig"; + String serviceNameKey = "serviceName"; + if (serviceConfig == null || !serviceConfig.containsKey(healthCheckKey)) { + return null; + } + + /* schema as follows + { + "healthCheckConfig": { + // Service name to use in the health-checking request. + "serviceName": string + } + } + */ + Map healthCheck = getObject(serviceConfig, healthCheckKey); + if (!healthCheck.containsKey(serviceNameKey)) { + return null; + } + return getString(healthCheck, "serviceName"); + } + @Nullable static Throttle getThrottlePolicy(@Nullable Map serviceConfig) { String retryThrottlingKey = "retryThrottling"; diff --git a/core/src/main/java/io/grpc/util/ForwardingLoadBalancer.java b/core/src/main/java/io/grpc/util/ForwardingLoadBalancer.java new file mode 100644 index 0000000000..5be992643c --- /dev/null +++ b/core/src/main/java/io/grpc/util/ForwardingLoadBalancer.java @@ -0,0 +1,64 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.grpc.util; + +import com.google.common.base.MoreObjects; +import io.grpc.Attributes; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.ExperimentalApi; +import io.grpc.LoadBalancer.Subchannel; +import io.grpc.LoadBalancer; +import io.grpc.NameResolver; +import io.grpc.Status; +import java.util.List; + +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771") +public abstract class ForwardingLoadBalancer extends LoadBalancer { + /** + * Returns the underlying balancer. + */ + protected abstract LoadBalancer delegate(); + + @Override + public void handleResolvedAddressGroups( + List servers, + @NameResolver.ResolutionResultAttr Attributes attributes) { + delegate().handleResolvedAddressGroups(servers, attributes); + } + + @Override + public void handleNameResolutionError(Status error) { + delegate().handleNameResolutionError(error); + } + + @Override + public void handleSubchannelState( + Subchannel subchannel, ConnectivityStateInfo stateInfo) { + delegate().handleSubchannelState(subchannel, stateInfo); + } + + @Override + public void shutdown() { + delegate().shutdown(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString(); + } +} diff --git a/core/src/test/java/io/grpc/util/ForwardingLoadBalancerTest.java b/core/src/test/java/io/grpc/util/ForwardingLoadBalancerTest.java new file mode 100644 index 0000000000..be6c7f7d2e --- /dev/null +++ b/core/src/test/java/io/grpc/util/ForwardingLoadBalancerTest.java @@ -0,0 +1,51 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.util; + +import static org.mockito.Mockito.mock; + +import io.grpc.ForwardingTestUtil; +import io.grpc.LoadBalancer; +import java.lang.reflect.Method; +import java.net.SocketAddress; +import java.util.Collections; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Unit tests for {@link ForwardingLoadBalancer}. */ +@RunWith(JUnit4.class) +public class ForwardingLoadBalancerTest { + private final LoadBalancer mockDelegate = mock(LoadBalancer.class); + + private final class TestBalancer extends ForwardingLoadBalancer { + @Override + protected LoadBalancer delegate() { + return mockDelegate; + } + } + + @Test + public void allMethodsForwarded() throws Exception { + final SocketAddress mockAddr = mock(SocketAddress.class); + ForwardingTestUtil.testMethodsForwarded( + LoadBalancer.class, + mockDelegate, + new TestBalancer(), + Collections.emptyList()); + } +} diff --git a/services/build.gradle b/services/build.gradle index 1679de9c41..a22b9fdc1a 100644 --- a/services/build.gradle +++ b/services/build.gradle @@ -28,7 +28,8 @@ dependencies { compileOnly libraries.javax_annotation testCompile project(':grpc-testing'), - libraries.netty_epoll // for DomainSocketAddress + libraries.netty_epoll, // for DomainSocketAddress + project(':grpc-core').sourceSets.test.output // for FakeClock testCompileOnly libraries.javax_annotation signature "org.codehaus.mojo.signature:java17:1.0@signature" } diff --git a/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java 
b/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java new file mode 100644 index 0000000000..e5f15d0e40 --- /dev/null +++ b/services/src/main/java/io/grpc/services/HealthCheckingLoadBalancerFactory.java @@ -0,0 +1,434 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.IDLE; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.SHUTDOWN; + +import com.google.common.base.MoreObjects; +import com.google.common.base.Objects; +import io.grpc.Attributes; +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.ConnectivityStateInfo; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Factory; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.Subchannel; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.SynchronizationContext; +import io.grpc.SynchronizationContext.ScheduledHandle; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthCheckResponse.ServingStatus; +import io.grpc.health.v1.HealthGrpc; +import 
io.grpc.internal.BackoffPolicy; +import io.grpc.internal.GrpcAttributes; +import io.grpc.internal.ServiceConfigUtil; +import io.grpc.internal.TimeProvider; +import io.grpc.util.ForwardingLoadBalancer; +import io.grpc.util.ForwardingLoadBalancerHelper; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + +/** + * Wraps a {@link LoadBalancer} and implements the client-side health-checking + * (https://github.com/grpc/proposal/blob/master/A17-client-side-health-checking.md). The + * Subchannel states received by the wrapped LoadBalancer will be determined by health-checking. + * + *

Note the original LoadBalancer must call {@code Helper.createSubchannel()} from the + * SynchronizationContext, or it will throw. + */ +final class HealthCheckingLoadBalancerFactory extends Factory { + private static final Attributes.Key KEY_HEALTH_CHECK_STATE = + Attributes.Key.create("io.grpc.services.HealthCheckingLoadBalancerFactory.healthCheckState"); + + private final Factory delegateFactory; + private final BackoffPolicy.Provider backoffPolicyProvider; + private final TimeProvider time; + + public HealthCheckingLoadBalancerFactory( + Factory delegateFactory, BackoffPolicy.Provider backoffPolicyProvider, TimeProvider time) { + this.delegateFactory = checkNotNull(delegateFactory, "delegateFactory"); + this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider"); + this.time = checkNotNull(time, "time"); + } + + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + HelperImpl wrappedHelper = new HelperImpl(helper); + LoadBalancer delegateBalancer = delegateFactory.newLoadBalancer(wrappedHelper); + wrappedHelper.init(delegateBalancer); + return new LoadBalancerImpl(wrappedHelper, delegateBalancer); + } + + private final class HelperImpl extends ForwardingLoadBalancerHelper { + private final Helper delegate; + private final SynchronizationContext syncContext; + + private LoadBalancer delegateBalancer; + @Nullable String healthCheckedService; + + final HashSet hcStates = new HashSet(); + + HelperImpl(Helper delegate) { + this.delegate = checkNotNull(delegate, "delegate"); + this.syncContext = checkNotNull(delegate.getSynchronizationContext(), "syncContext"); + } + + void init(LoadBalancer delegateBalancer) { + checkState(this.delegateBalancer == null, "init() already called"); + this.delegateBalancer = checkNotNull(delegateBalancer, "delegateBalancer"); + } + + @Override + protected Helper delegate() { + return delegate; + } + + @Override + public Subchannel createSubchannel(List addrs, Attributes attrs) { + // 
HealthCheckState is not thread-safe, we are requiring the original LoadBalancer calls + // createSubchannel() from the SynchronizationContext. + syncContext.throwIfNotInThisSynchronizationContext(); + HealthCheckState hcState = new HealthCheckState( + delegateBalancer, syncContext, delegate.getScheduledExecutorService()); + hcStates.add(hcState); + Subchannel subchannel = super.createSubchannel( + addrs, attrs.toBuilder().set(KEY_HEALTH_CHECK_STATE, hcState).build()); + hcState.init(subchannel); + if (healthCheckedService != null) { + hcState.setServiceName(healthCheckedService); + } + return subchannel; + } + + void setHealthCheckedService(@Nullable String service) { + healthCheckedService = service; + for (HealthCheckState hcState : hcStates) { + hcState.setServiceName(service); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString(); + } + } + + private static final class LoadBalancerImpl extends ForwardingLoadBalancer { + final LoadBalancer delegate; + final HelperImpl helper; + final SynchronizationContext syncContext; + final ScheduledExecutorService timerService; + + LoadBalancerImpl(HelperImpl helper, LoadBalancer delegate) { + this.helper = checkNotNull(helper, "helper"); + this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext"); + this.timerService = checkNotNull(helper.getScheduledExecutorService(), "timerService"); + this.delegate = checkNotNull(delegate, "delegate"); + } + + @Override + protected LoadBalancer delegate() { + return delegate; + } + + @Override + public void handleResolvedAddressGroups( + List servers, Attributes attributes) { + Map serviceConfig = + attributes.get(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG); + String serviceName = ServiceConfigUtil.getHealthCheckedServiceName(serviceConfig); + helper.setHealthCheckedService(serviceName); + super.handleResolvedAddressGroups(servers, attributes); + } + + @Override + public void 
handleSubchannelState( + Subchannel subchannel, ConnectivityStateInfo stateInfo) { + HealthCheckState hcState = + checkNotNull(subchannel.getAttributes().get(KEY_HEALTH_CHECK_STATE), "hcState"); + hcState.updateRawState(stateInfo); + + if (Objects.equal(stateInfo.getState(), SHUTDOWN)) { + helper.hcStates.remove(hcState); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString(); + } + } + + + // All methods are run from syncContext + private final class HealthCheckState { + private final Runnable retryTask = new Runnable() { + @Override + public void run() { + startRpc(); + } + }; + + private final LoadBalancer delegate; + private final SynchronizationContext syncContext; + private final ScheduledExecutorService timerService; + + private Subchannel subchannel; + + // Set when RPC started. Cleared when the RPC has closed or been abandoned. + @Nullable + private HcStream activeRpc; + + // The service name that should be used for health checking + private String serviceName; + private BackoffPolicy backoffPolicy; + // The state from the underlying Subchannel + private ConnectivityStateInfo rawState = ConnectivityStateInfo.forNonError(IDLE); + // The state concluded from health checking + private ConnectivityStateInfo concludedState = ConnectivityStateInfo.forNonError(IDLE); + // true if a health check stream should be kept. When true, either there is an active RPC, or a + // retry is pending. 
+ private boolean running; + // true if server returned UNIMPLEMENTED + private boolean disabled; + private ScheduledHandle retryTimer; + + HealthCheckState( + LoadBalancer delegate, SynchronizationContext syncContext, + ScheduledExecutorService timerService) { + this.delegate = checkNotNull(delegate, "delegate"); + this.syncContext = checkNotNull(syncContext, "syncContext"); + this.timerService = checkNotNull(timerService, "timerService"); + } + + void init(Subchannel subchannel) { + checkState(this.subchannel == null, "init() already called"); + this.subchannel = checkNotNull(subchannel, "subchannel"); + } + + void setServiceName(@Nullable String newServiceName) { + if (Objects.equal(newServiceName, serviceName)) { + return; + } + serviceName = newServiceName; + // If service name has changed while there is active RPC, cancel it so that + // a new call will be made with the new name. + String cancelMsg = + serviceName == null ? "Health check disabled by service config" + : "Switching to new service name: " + newServiceName; + stopRpc(cancelMsg); + adjustHealthCheck(); + } + + void updateRawState(ConnectivityStateInfo rawState) { + if (Objects.equal(this.rawState.getState(), READY) + && !Objects.equal(rawState.getState(), READY)) { + // A connection was lost. We will reset disabled flag because health check + // may be available on the new connection. + disabled = false; + // TODO(zhangkun83): record this to channel tracer + } + this.rawState = rawState; + adjustHealthCheck(); + } + + private boolean isRetryTimerPending() { + return retryTimer != null && retryTimer.isPending(); + } + + // Start or stop health check according to the current states. + private void adjustHealthCheck() { + if (!disabled && serviceName != null && Objects.equal(rawState.getState(), READY)) { + running = true; + if (activeRpc == null && !isRetryTimerPending()) { + startRpc(); + } + } else { + running = false; + // Prerequisites for health checking not met. + // Make sure it's stopped. 
+ stopRpc("Client stops health check"); + backoffPolicy = null; + gotoState(rawState); + } + } + + private void startRpc() { + checkState(activeRpc == null, "previous health-checking RPC has not been cleaned up"); + checkState(subchannel != null, "init() not called"); + // Optimization suggested by @markroth: if we are already READY and starting the health + // checking RPC, either because health check is just enabled or has switched to a new service + // name, we don't go to CONNECTING, otherwise there will be artificial delays on RPCs + // waiting for the health check to respond. + if (!Objects.equal(concludedState.getState(), READY)) { + gotoState(ConnectivityStateInfo.forNonError(CONNECTING)); + } + activeRpc = new HcStream(); + activeRpc.start(); + } + + private void stopRpc(String msg) { + if (activeRpc != null) { + activeRpc.cancel(msg); + // Abandon this RPC. We are not interested in anything from this RPC any more. + activeRpc = null; + } + if (retryTimer != null) { + retryTimer.cancel(); + retryTimer = null; + } + } + + private void gotoState(ConnectivityStateInfo newState) { + checkState(subchannel != null, "init() not called"); + if (!Objects.equal(concludedState, newState)) { + concludedState = newState; + delegate.handleSubchannelState(subchannel, concludedState); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("running", running) + .add("disabled", disabled) + .add("activeRpc", activeRpc) + .add("serviceName", serviceName) + .add("rawState", rawState) + .add("concludedState", concludedState) + .toString(); + } + + private class HcStream extends ClientCall.Listener { + private final ClientCall call; + private final String callServiceName; + private final long callCreationNanos; + private boolean callHasResponded; + + HcStream() { + callCreationNanos = time.currentTimeNanos(); + callServiceName = serviceName; + call = subchannel.asChannel().newCall(HealthGrpc.getWatchMethod(), CallOptions.DEFAULT); + 
} + + void start() { + call.start(this, new Metadata()); + call.sendMessage(HealthCheckRequest.newBuilder().setService(serviceName).build()); + call.halfClose(); + call.request(1); + } + + void cancel(String msg) { + call.cancel(msg, null); + } + + @Override + public void onMessage(final HealthCheckResponse response) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (activeRpc == HcStream.this) { + handleResponse(response); + } + } + }); + } + + @Override + public void onClose(final Status status, Metadata trailers) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (activeRpc == HcStream.this) { + activeRpc = null; + handleStreamClosed(status); + } + } + }); + } + + void handleResponse(HealthCheckResponse response) { + callHasResponded = true; + backoffPolicy = null; + // running == true means the Subchannel's state (rawState) is READY + if (Objects.equal(response.getStatus(), ServingStatus.SERVING)) { + gotoState(ConnectivityStateInfo.forNonError(READY)); + } else { + gotoState( + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription( + "Health-check service responded " + + response.getStatus() + " for '" + callServiceName + "'"))); + } + call.request(1); + } + + void handleStreamClosed(Status status) { + if (Objects.equal(status.getCode(), Code.UNIMPLEMENTED)) { + // TODO(zhangkun83): record this to channel tracer + disabled = true; + gotoState(rawState); + return; + } + long delayNanos = 0; + gotoState( + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription( + "Health-check stream unexpectedly closed with " + + status + " for '" + callServiceName + "'"))); + // Use backoff only when server has not responded for the previous call + if (!callHasResponded) { + if (backoffPolicy == null) { + backoffPolicy = backoffPolicyProvider.get(); + } + delayNanos = + callCreationNanos + backoffPolicy.nextBackoffNanos() - time.currentTimeNanos(); + } + if (delayNanos 
<= 0) { + startRpc(); + } else { + checkState(!isRetryTimerPending(), "Retry double scheduled"); + retryTimer = syncContext.schedule( + retryTask, delayNanos, TimeUnit.NANOSECONDS, timerService); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("callStarted", call != null) + .add("serviceName", callServiceName) + .add("hasResponded", callHasResponded) + .toString(); + } + } + } +} diff --git a/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java b/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java new file mode 100644 index 0000000000..68cf840ad4 --- /dev/null +++ b/services/src/test/java/io/grpc/services/HealthCheckingLoadBalancerFactoryTest.java @@ -0,0 +1,1110 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.grpc.services; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.truth.Truth.assertThat; +import static io.grpc.ConnectivityState.CONNECTING; +import static io.grpc.ConnectivityState.IDLE; +import static io.grpc.ConnectivityState.READY; +import static io.grpc.ConnectivityState.SHUTDOWN; +import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; +import static org.junit.Assert.fail; +import static org.mockito.AdditionalAnswers.delegatesTo; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Attributes; +import io.grpc.Channel; +import io.grpc.ConnectivityState; +import io.grpc.ConnectivityStateInfo; +import io.grpc.Context; +import io.grpc.Context.CancellationListener; +import io.grpc.EquivalentAddressGroup; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Factory; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.Subchannel; +import io.grpc.LoadBalancer.SubchannelPicker; +import io.grpc.ManagedChannel; +import io.grpc.NameResolver; +import io.grpc.Server; +import io.grpc.Status; +import io.grpc.Status.Code; +import io.grpc.SynchronizationContext; +import io.grpc.health.v1.HealthCheckRequest; +import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthCheckResponse.ServingStatus; +import io.grpc.health.v1.HealthGrpc; +import io.grpc.inprocess.InProcessChannelBuilder; +import 
io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.internal.BackoffPolicy; +import io.grpc.internal.FakeClock; +import io.grpc.internal.GrpcAttributes; +import io.grpc.internal.ServiceConfigUtil; +import io.grpc.stub.StreamObserver; +import java.net.SocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link HealthCheckingLoadBalancerFactory}. */ +@RunWith(JUnit4.class) +public class HealthCheckingLoadBalancerFactoryTest { + private static final Attributes.Key SUBCHANNEL_ATTR_KEY = + Attributes.Key.create("subchannel-attr-for-test"); + + // We use in-process channels for Subchannel.asChannel(), so that we make sure we are making RPCs + // correctly. Mocking Channel and ClientCall is a bad idea because it can easily be done wrong. + // Each Channel goes to a different server, so that we can verify the health check activity on + // each Subchannel. 
+ private static final int NUM_SUBCHANNELS = 2; + private final EquivalentAddressGroup[] eags = new EquivalentAddressGroup[NUM_SUBCHANNELS]; + @SuppressWarnings({"rawtypes", "unchecked"}) + private final List[] eagLists = new List[NUM_SUBCHANNELS]; + private List resolvedAddressList; + private final Subchannel[] subchannels = new Subchannel[NUM_SUBCHANNELS]; + private final ManagedChannel[] channels = new ManagedChannel[NUM_SUBCHANNELS]; + private final Server[] servers = new Server[NUM_SUBCHANNELS]; + private final HealthImpl[] healthImpls = new HealthImpl[NUM_SUBCHANNELS]; + + private final SynchronizationContext syncContext = new SynchronizationContext( + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new AssertionError(e); + } + }); + private final FakeClock clock = new FakeClock(); + private final Helper origHelper = mock(Helper.class, delegatesTo(new FakeHelper())); + // The helper seen by the origLb + private Helper wrappedHelper; + private final Factory origLbFactory = + mock(Factory.class, delegatesTo(new Factory() { + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + checkState(wrappedHelper == null, "LoadBalancer already created"); + wrappedHelper = helper; + return origLb; + } + })); + + @Mock + private LoadBalancer origLb; + @Captor + ArgumentCaptor attrsCaptor; + @Mock + private BackoffPolicy.Provider backoffPolicyProvider; + @Mock + private BackoffPolicy backoffPolicy1; + @Mock + private BackoffPolicy backoffPolicy2; + + private HealthCheckingLoadBalancerFactory hcLbFactory; + private LoadBalancer hcLbEventDelivery; + + @Before + @SuppressWarnings("unchecked") + public void setup() throws Exception { + MockitoAnnotations.initMocks(this); + + for (int i = 0; i < NUM_SUBCHANNELS; i++) { + HealthImpl healthImpl = new HealthImpl(); + healthImpls[i] = healthImpl; + Server server = + InProcessServerBuilder.forName("health-check-test-" + i) + 
.addService(healthImpl).directExecutor().build().start(); + servers[i] = server; + ManagedChannel channel = + InProcessChannelBuilder.forName("health-check-test-" + i).directExecutor().build(); + channels[i] = channel; + + EquivalentAddressGroup eag = + new EquivalentAddressGroup(new FakeSocketAddress("address-" + i)); + eags[i] = eag; + List eagList = Arrays.asList(eag); + eagLists[i] = eagList; + } + resolvedAddressList = Arrays.asList(eags); + + when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2); + when(backoffPolicy1.nextBackoffNanos()).thenReturn(11L, 21L, 31L); + when(backoffPolicy2.nextBackoffNanos()).thenReturn(12L, 22L, 32L); + + hcLbFactory = new HealthCheckingLoadBalancerFactory( + origLbFactory, backoffPolicyProvider, clock.getTimeProvider()); + final LoadBalancer hcLb = hcLbFactory.newLoadBalancer(origHelper); + // Make sure all calls into the hcLb is from the syncContext + hcLbEventDelivery = new LoadBalancer() { + @Override + public void handleResolvedAddressGroups( + final List servers, final Attributes attributes) { + syncContext.execute(new Runnable() { + @Override + public void run() { + hcLb.handleResolvedAddressGroups(servers, attributes); + } + }); + } + + @Override + public void handleSubchannelState( + final Subchannel subchannel, final ConnectivityStateInfo stateInfo) { + syncContext.execute(new Runnable() { + @Override + public void run() { + hcLb.handleSubchannelState(subchannel, stateInfo); + } + }); + } + + @Override + public void handleNameResolutionError(Status error) { + throw new AssertionError("Not supposed to be called"); + } + + @Override + public void shutdown() { + throw new AssertionError("Not supposed to be called"); + } + }; + verify(origLbFactory).newLoadBalancer(any(Helper.class)); + } + + @After + public void teardown() throws Exception { + // All scheduled tasks have been accounted for + assertThat(clock.getPendingTasks()).isEmpty(); + // Health-check streams are usually not closed in the tests 
because handleSubchannelState() is + // faked. Force closing for clean up. + for (Server server : servers) { + server.shutdownNow(); + assertThat(server.awaitTermination(1, TimeUnit.SECONDS)).isTrue(); + } + for (ManagedChannel channel : channels) { + channel.shutdownNow(); + assertThat(channel.awaitTermination(1, TimeUnit.SECONDS)).isTrue(); + } + for (HealthImpl impl : healthImpls) { + assertThat(impl.checkCalled).isFalse(); + } + } + + @Test + public void createSubchannelThrowsIfCalledOutsideSynchronizationContext() { + try { + wrappedHelper.createSubchannel(eagLists[0], Attributes.EMPTY); + fail("Should throw"); + } catch (IllegalStateException e) { + assertThat(e.getMessage()).isEqualTo("Not called from the SynchronizationContext"); + } + } + + @Test + public void typicalWorkflow() { + Attributes resolutionAttrs = attrsWithHealthCheckService("FooService"); + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); + + verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); + verify(origHelper, atLeast(0)).getSynchronizationContext(); + verify(origHelper, atLeast(0)).getScheduledExecutorService(); + verifyNoMoreInteractions(origHelper); + verifyNoMoreInteractions(origLb); + + // Simulate that the original LB creates Subchannels + for (int i = 0; i < NUM_SUBCHANNELS; i++) { + // Subchannel attributes set by origLb are correctly plumbed in + String subchannelAttrValue = "eag attr " + i; + Attributes attrs = Attributes.newBuilder() + .set(SUBCHANNEL_ATTR_KEY, subchannelAttrValue).build(); + // We don't wrap Subchannels, thus origLb gets the original Subchannels. 
+ assertThat(createSubchannel(i, attrs)).isSameAs(subchannels[i]); + verify(origHelper).createSubchannel(same(eagLists[i]), attrsCaptor.capture()); + assertThat(attrsCaptor.getValue().get(SUBCHANNEL_ATTR_KEY)).isEqualTo(subchannelAttrValue); + } + + for (int i = NUM_SUBCHANNELS - 1; i >= 0; i--) { + // Not starting health check until underlying Subchannel is READY + Subchannel subchannel = subchannels[i]; + HealthImpl healthImpl = healthImpls[i]; + InOrder inOrder = inOrder(origLb); + hcLbEventDelivery.handleSubchannelState( + subchannel, ConnectivityStateInfo.forNonError(CONNECTING)); + hcLbEventDelivery.handleSubchannelState( + subchannel, ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE)); + hcLbEventDelivery.handleSubchannelState( + subchannel, ConnectivityStateInfo.forNonError(IDLE)); + + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING))); + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forTransientFailure(Status.UNAVAILABLE))); + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(IDLE))); + verifyNoMoreInteractions(origLb); + + assertThat(healthImpl.calls).isEmpty(); + hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + assertThat(healthImpl.calls).hasSize(1); + ServerSideCall serverCall = healthImpl.calls.peek(); + assertThat(serverCall.request).isEqualTo(makeRequest("FooService")); + + // Starting the health check will make the Subchannel appear CONNECTING to the origLb. + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING))); + verifyNoMoreInteractions(origLb); + + // Simulate a series of responses. 
+ for (ServingStatus servingStatus : + new ServingStatus[] { + ServingStatus.UNKNOWN, ServingStatus.NOT_SERVING, ServingStatus.SERVICE_UNKNOWN, + ServingStatus.SERVING, ServingStatus.NOT_SERVING, ServingStatus.SERVING}) { + serverCall.responseObserver.onNext(makeResponse(servingStatus)); + // SERVING is mapped to READY, while other statuses are mapped to TRANSIENT_FAILURE + if (servingStatus == ServingStatus.SERVING) { + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(READY))); + } else { + inOrder.verify(origLb).handleSubchannelState( + same(subchannel),unavailableStateWithMsg( + "Health-check service responded " + servingStatus + " for 'FooService'")); + } + verifyNoMoreInteractions(origLb); + } + } + + // origLb shuts down Subchannels + for (int i = 0; i < NUM_SUBCHANNELS; i++) { + Subchannel subchannel = subchannels[i]; + + ServerSideCall serverCall = healthImpls[i].calls.peek(); + assertThat(serverCall.cancelled).isFalse(); + verifyNoMoreInteractions(origLb); + + // Subchannel enters SHUTDOWN state as a response to shutdown(), and that will cancel the + // health check RPC + subchannel.shutdown(); + assertThat(serverCall.cancelled).isTrue(); + verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(SHUTDOWN))); + } + + for (int i = 0; i < NUM_SUBCHANNELS; i++) { + assertThat(healthImpls[i].calls).hasSize(1); + } + + verifyZeroInteractions(backoffPolicyProvider); + } + + @Test + public void healthCheckDisabledWhenServiceNotImplemented() { + Attributes resolutionAttrs = attrsWithHealthCheckService("BarService"); + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); + + verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); + verifyNoMoreInteractions(origLb); + + // We create 2 Subchannels. 
One of them connects to a server that doesn't implement health check + for (int i = 0; i < 2; i++) { + createSubchannel(i, Attributes.EMPTY); + } + + InOrder inOrder = inOrder(origLb); + + for (int i = 0; i < 2; i++) { + hcLbEventDelivery.handleSubchannelState( + subchannels[i], ConnectivityStateInfo.forNonError(READY)); + assertThat(healthImpls[i].calls).hasSize(1); + inOrder.verify(origLb).handleSubchannelState( + same(subchannels[i]), eq(ConnectivityStateInfo.forNonError(CONNECTING))); + } + + ServerSideCall serverCall0 = healthImpls[0].calls.poll(); + ServerSideCall serverCall1 = healthImpls[1].calls.poll(); + + // subchannels[0] gets UNIMPLEMENTED for health checking, which will disable health + // checking and it'll use the original state, which is currently READY. + // In reality UNIMPLEMENTED is generated by GRPC server library, but the client can't tell + // whether it's the server library or the service implementation that returned this status. + serverCall0.responseObserver.onError(Status.UNIMPLEMENTED.asException()); + inOrder.verify(origLb).handleSubchannelState( + same(subchannels[0]), eq(ConnectivityStateInfo.forNonError(READY))); + + // subchannels[1] has normal health checking + serverCall1.responseObserver.onNext(makeResponse(ServingStatus.NOT_SERVING)); + inOrder.verify(origLb).handleSubchannelState( + same(subchannels[1]), + unavailableStateWithMsg("Health-check service responded NOT_SERVING for 'BarService'")); + + // Without health checking, states from underlying Subchannel are delivered directly to origLb + hcLbEventDelivery.handleSubchannelState( + subchannels[0], ConnectivityStateInfo.forNonError(IDLE)); + inOrder.verify(origLb).handleSubchannelState( + same(subchannels[0]), eq(ConnectivityStateInfo.forNonError(IDLE))); + + // Re-connecting on a Subchannel will reset the "disabled" flag. 
+ assertThat(healthImpls[0].calls).hasSize(0); + hcLbEventDelivery.handleSubchannelState( + subchannels[0], ConnectivityStateInfo.forNonError(READY)); + assertThat(healthImpls[0].calls).hasSize(1); + serverCall0 = healthImpls[0].calls.poll(); + inOrder.verify(origLb).handleSubchannelState( + same(subchannels[0]), eq(ConnectivityStateInfo.forNonError(CONNECTING))); + + // Health check now works as normal + serverCall0.responseObserver.onNext(makeResponse(ServingStatus.SERVICE_UNKNOWN)); + inOrder.verify(origLb).handleSubchannelState( + same(subchannels[0]), + unavailableStateWithMsg("Health-check service responded SERVICE_UNKNOWN for 'BarService'")); + + verifyNoMoreInteractions(origLb); + verifyZeroInteractions(backoffPolicyProvider); + } + + @Test + public void backoffRetriesWhenServerErroneouslyClosesRpcBeforeAnyResponse() { + Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); + + verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); + verifyNoMoreInteractions(origLb); + + Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); + assertThat(subchannel).isSameAs(subchannels[0]); + InOrder inOrder = inOrder(origLb, backoffPolicyProvider, backoffPolicy1, backoffPolicy2); + + hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING))); + HealthImpl healthImpl = healthImpls[0]; + assertThat(healthImpl.calls).hasSize(1); + assertThat(clock.getPendingTasks()).isEmpty(); + + // Server closes the health checking RPC without any response + healthImpl.calls.poll().responseObserver.onCompleted(); + + // which results in TRANSIENT_FAILURE + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), + unavailableStateWithMsg( + "Health-check stream unexpectedly 
closed with " + Status.OK + " for 'TeeService'")); + + // Retry with backoff is scheduled + inOrder.verify(backoffPolicyProvider).get(); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(clock.getPendingTasks()).hasSize(1); + + verifyRetryAfterNanos(inOrder, subchannel, healthImpl, 11); + assertThat(clock.getPendingTasks()).isEmpty(); + + // Server closes the health checking RPC without any response + healthImpl.calls.poll().responseObserver.onError(Status.CANCELLED.asException()); + + // which also results in TRANSIENT_FAILURE, with a different description + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), + unavailableStateWithMsg( + "Health-check stream unexpectedly closed with " + + Status.CANCELLED + " for 'TeeService'")); + + // Retry with backoff + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + + verifyRetryAfterNanos(inOrder, subchannel, healthImpl, 21); + + // Server responds this time + healthImpl.calls.poll().responseObserver.onNext(makeResponse(ServingStatus.SERVING)); + + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(READY))); + + verifyNoMoreInteractions(origLb, backoffPolicyProvider, backoffPolicy1); + } + + @Test + public void serverRespondResetsBackoff() { + Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); + + verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); + verifyNoMoreInteractions(origLb); + + Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); + assertThat(subchannel).isSameAs(subchannels[0]); + InOrder inOrder = inOrder(origLb, backoffPolicyProvider, backoffPolicy1, backoffPolicy2); + + hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), 
eq(ConnectivityStateInfo.forNonError(CONNECTING))); + HealthImpl healthImpl = healthImpls[0]; + assertThat(healthImpl.calls).hasSize(1); + assertThat(clock.getPendingTasks()).isEmpty(); + + // Server closes the health checking RPC without any response + healthImpl.calls.poll().responseObserver.onError(Status.CANCELLED.asException()); + + // which results in TRANSIENT_FAILURE + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), + unavailableStateWithMsg( + "Health-check stream unexpectedly closed with " + + Status.CANCELLED + " for 'TeeService'")); + + // Retry with backoff is scheduled + inOrder.verify(backoffPolicyProvider).get(); + inOrder.verify(backoffPolicy1).nextBackoffNanos(); + assertThat(clock.getPendingTasks()).hasSize(1); + + verifyRetryAfterNanos(inOrder, subchannel, healthImpl, 11); + assertThat(clock.getPendingTasks()).isEmpty(); + + // Server responds + healthImpl.calls.peek().responseObserver.onNext(makeResponse(ServingStatus.SERVING)); + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(READY))); + + verifyNoMoreInteractions(origLb); + + // then closes the stream + healthImpl.calls.poll().responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), + unavailableStateWithMsg( + "Health-check stream unexpectedly closed with " + + Status.UNAVAILABLE + " for 'TeeService'")); + + // Because server has responded, the first retry is not subject to backoff. + // But the backoff policy has been reset. A new backoff policy will be used for + // the next backed-off retry. 
+ inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING))); + assertThat(healthImpl.calls).hasSize(1); + assertThat(clock.getPendingTasks()).isEmpty(); + inOrder.verifyNoMoreInteractions(); + + // then closes the stream for this retry + healthImpl.calls.poll().responseObserver.onError(Status.UNAVAILABLE.asException()); + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), + unavailableStateWithMsg( + "Health-check stream unexpectedly closed with " + + Status.UNAVAILABLE + " for 'TeeService'")); + + // New backoff policy is used + inOrder.verify(backoffPolicyProvider).get(); + // Retry with a new backoff policy + inOrder.verify(backoffPolicy2).nextBackoffNanos(); + + verifyRetryAfterNanos(inOrder, subchannel, healthImpl, 12); + } + + private void verifyRetryAfterNanos( + InOrder inOrder, Subchannel subchannel, HealthImpl impl, long nanos) { + assertThat(impl.calls).isEmpty(); + clock.forwardNanos(nanos - 1); + assertThat(impl.calls).isEmpty(); + inOrder.verifyNoMoreInteractions(); + verifyNoMoreInteractions(origLb); + clock.forwardNanos(1); + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING))); + assertThat(impl.calls).hasSize(1); + } + + @Test + public void serviceConfigHasNoHealthCheckingInitiallyButDoesLater() { + // No service config, thus no health check. + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY); + + verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(Attributes.EMPTY)); + verifyNoMoreInteractions(origLb); + + // First, create Subchannel 0 + createSubchannel(0, Attributes.EMPTY); + + // No health check activity. 
Underlying Subchannel states are directly propagated + hcLbEventDelivery.handleSubchannelState( + subchannels[0], ConnectivityStateInfo.forNonError(READY)); + assertThat(healthImpls[0].calls).isEmpty(); + verify(origLb).handleSubchannelState( + same(subchannels[0]), eq(ConnectivityStateInfo.forNonError(READY))); + + verifyNoMoreInteractions(origLb); + + // Service config enables health check + Attributes resolutionAttrs = attrsWithHealthCheckService("FooService"); + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); + verify(origLb).handleResolvedAddressGroups( + same(resolvedAddressList), same(resolutionAttrs)); + + // Health check started on existing Subchannel + assertThat(healthImpls[0].calls).hasSize(1); + + // State stays in READY, instead of switching to CONNECTING. + verifyNoMoreInteractions(origLb); + + // Start Subchannel 1, which will have health check + createSubchannel(1, Attributes.EMPTY); + assertThat(healthImpls[1].calls).isEmpty(); + hcLbEventDelivery.handleSubchannelState( + subchannels[1], ConnectivityStateInfo.forNonError(READY)); + assertThat(healthImpls[1].calls).hasSize(1); + } + + @Test + public void serviceConfigDisablesHealthCheckWhenRpcActive() { + Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); + + verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); + verifyNoMoreInteractions(origLb); + + Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); + assertThat(subchannel).isSameAs(subchannels[0]); + InOrder inOrder = inOrder(origLb); + + hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING))); + inOrder.verifyNoMoreInteractions(); + HealthImpl healthImpl = healthImpls[0]; + 
assertThat(healthImpl.calls).hasSize(1); + ServerSideCall serverCall = healthImpl.calls.poll(); + assertThat(serverCall.cancelled).isFalse(); + + // NameResolver gives an update without service config, thus health check will be disabled + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY); + + // Health check RPC cancelled. + assertThat(serverCall.cancelled).isTrue(); + // Subchannel uses original state + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(READY))); + + inOrder.verify(origLb).handleResolvedAddressGroups( + same(resolvedAddressList), same(Attributes.EMPTY)); + + verifyNoMoreInteractions(origLb); + assertThat(healthImpl.calls).isEmpty(); + } + + @Test + public void serviceConfigDisablesHealthCheckWhenRetryPending() { + Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); + + verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); + verifyNoMoreInteractions(origLb); + + Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); + assertThat(subchannel).isSameAs(subchannels[0]); + InOrder inOrder = inOrder(origLb); + + hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING))); + inOrder.verifyNoMoreInteractions(); + HealthImpl healthImpl = healthImpls[0]; + assertThat(healthImpl.calls).hasSize(1); + + // Server closes the stream without responding. 
Client in retry backoff + assertThat(clock.getPendingTasks()).isEmpty(); + healthImpl.calls.poll().responseObserver.onCompleted(); + assertThat(clock.getPendingTasks()).hasSize(1); + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), + unavailableStateWithMsg( + "Health-check stream unexpectedly closed with " + Status.OK + " for 'TeeService'")); + + // NameResolver gives an update without service config, thus health check will be disabled + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY); + + // Retry timer is cancelled + assertThat(clock.getPendingTasks()).isEmpty(); + + // No retry was attempted + assertThat(healthImpl.calls).isEmpty(); + + // Subchannel uses original state + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(READY))); + + inOrder.verify(origLb).handleResolvedAddressGroups( + same(resolvedAddressList), same(Attributes.EMPTY)); + + verifyNoMoreInteractions(origLb); + } + + @Test + public void serviceConfigDisablesHealthCheckWhenRpcInactive() { + Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); + + verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); + verifyNoMoreInteractions(origLb); + + Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); + assertThat(subchannel).isSameAs(subchannels[0]); + InOrder inOrder = inOrder(origLb); + + // Underlying subchannel is not READY initially + ConnectivityStateInfo underlyingErrorState = + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("connection refused")); + hcLbEventDelivery.handleSubchannelState(subchannel, underlyingErrorState); + inOrder.verify(origLb).handleSubchannelState(same(subchannel), same(underlyingErrorState)); + inOrder.verifyNoMoreInteractions(); + + // NameResolver gives an update without service 
config, thus health check will be disabled + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, Attributes.EMPTY); + + inOrder.verify(origLb).handleResolvedAddressGroups( + same(resolvedAddressList), same(Attributes.EMPTY)); + + // Underlying subchannel is now ready + hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + + // Since health check is disabled, READY state is propagated directly. + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(READY))); + + // and there is no health check activity. + assertThat(healthImpls[0].calls).isEmpty(); + + verifyNoMoreInteractions(origLb); + } + + @Test + public void serviceConfigChangesServiceNameWhenRpcActive() { + Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); + + verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); + verifyNoMoreInteractions(origLb); + + Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); + assertThat(subchannel).isSameAs(subchannels[0]); + InOrder inOrder = inOrder(origLb); + + hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING))); + + HealthImpl healthImpl = healthImpls[0]; + assertThat(healthImpl.calls).hasSize(1); + ServerSideCall serverCall = healthImpl.calls.poll(); + assertThat(serverCall.cancelled).isFalse(); + assertThat(serverCall.request).isEqualTo(makeRequest("TeeService")); + + // Health check responded + serverCall.responseObserver.onNext(makeResponse(ServingStatus.SERVING)); + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(READY))); + + // Service config returns with the same health check name. 
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); + // It's delivered to origLb, but nothing else happens + inOrder.verify(origLb).handleResolvedAddressGroups( + same(resolvedAddressList), same(resolutionAttrs)); + verifyNoMoreInteractions(origLb); + + // Service config returns a different health check name. + resolutionAttrs = attrsWithHealthCheckService("FooService"); + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); + + inOrder.verify(origLb).handleResolvedAddressGroups( + same(resolvedAddressList), same(resolutionAttrs)); + + // Current health check RPC cancelled. + assertThat(serverCall.cancelled).isTrue(); + + // A second RPC is started immediately + assertThat(healthImpl.calls).hasSize(1); + serverCall = healthImpl.calls.poll(); + // with the new service name + assertThat(serverCall.request).isEqualTo(makeRequest("FooService")); + + // State stays in READY, instead of switching to CONNECTING. + verifyNoMoreInteractions(origLb); + } + + @Test + public void serviceConfigChangesServiceNameWhenRetryPending() { + Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); + + verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); + verifyNoMoreInteractions(origLb); + + Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); + assertThat(subchannel).isSameAs(subchannels[0]); + InOrder inOrder = inOrder(origLb); + + hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING))); + + HealthImpl healthImpl = healthImpls[0]; + assertThat(healthImpl.calls).hasSize(1); + ServerSideCall serverCall = healthImpl.calls.poll(); + assertThat(serverCall.cancelled).isFalse(); + 
assertThat(serverCall.request).isEqualTo(makeRequest("TeeService")); + + // Health check stream closed without responding. Client in retry backoff. + assertThat(clock.getPendingTasks()).isEmpty(); + serverCall.responseObserver.onCompleted(); + assertThat(clock.getPendingTasks()).hasSize(1); + assertThat(healthImpl.calls).isEmpty(); + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), + unavailableStateWithMsg( + "Health-check stream unexpectedly closed with " + Status.OK + " for 'TeeService'")); + + // Service config returns with the same health check name. + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); + // It's delivered to origLb, but nothing else happens + inOrder.verify(origLb).handleResolvedAddressGroups( + same(resolvedAddressList), same(resolutionAttrs)); + verifyNoMoreInteractions(origLb); + assertThat(clock.getPendingTasks()).hasSize(1); + assertThat(healthImpl.calls).isEmpty(); + + // Service config returns a different health check name. 
+ resolutionAttrs = attrsWithHealthCheckService("FooService"); + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); + + // Concluded CONNECTING state + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING))); + + inOrder.verify(origLb).handleResolvedAddressGroups( + same(resolvedAddressList), same(resolutionAttrs)); + + // Current retry timer cancelled + assertThat(clock.getPendingTasks()).isEmpty(); + + // A second RPC is started immediately + assertThat(healthImpl.calls).hasSize(1); + serverCall = healthImpl.calls.poll(); + // with the new service name + assertThat(serverCall.request).isEqualTo(makeRequest("FooService")); + + verifyNoMoreInteractions(origLb); + } + + @Test + public void serviceConfigChangesServiceNameWhenRpcInactive() { + Attributes resolutionAttrs = attrsWithHealthCheckService("TeeService"); + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); + + verify(origLb).handleResolvedAddressGroups(same(resolvedAddressList), same(resolutionAttrs)); + verifyNoMoreInteractions(origLb); + + Subchannel subchannel = createSubchannel(0, Attributes.EMPTY); + assertThat(subchannel).isSameAs(subchannels[0]); + InOrder inOrder = inOrder(origLb); + HealthImpl healthImpl = healthImpls[0]; + + // Underlying subchannel is not READY initially + ConnectivityStateInfo underlyingErrorState = + ConnectivityStateInfo.forTransientFailure( + Status.UNAVAILABLE.withDescription("connection refused")); + hcLbEventDelivery.handleSubchannelState(subchannel, underlyingErrorState); + inOrder.verify(origLb).handleSubchannelState(same(subchannel), same(underlyingErrorState)); + inOrder.verifyNoMoreInteractions(); + + // Service config returns with the same health check name. 
+ hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); + // It's delivered to origLb, but nothing else happens + inOrder.verify(origLb).handleResolvedAddressGroups( + same(resolvedAddressList), same(resolutionAttrs)); + assertThat(healthImpl.calls).isEmpty(); + verifyNoMoreInteractions(origLb); + + // Service config returns a different health check name. + resolutionAttrs = attrsWithHealthCheckService("FooService"); + hcLbEventDelivery.handleResolvedAddressGroups(resolvedAddressList, resolutionAttrs); + + inOrder.verify(origLb).handleResolvedAddressGroups( + same(resolvedAddressList), same(resolutionAttrs)); + + // Underlying subchannel is now ready + hcLbEventDelivery.handleSubchannelState(subchannel, ConnectivityStateInfo.forNonError(READY)); + + // Concluded CONNECTING state + inOrder.verify(origLb).handleSubchannelState( + same(subchannel), eq(ConnectivityStateInfo.forNonError(CONNECTING))); + + // Health check RPC is started + assertThat(healthImpl.calls).hasSize(1); + // with the new service name + assertThat(healthImpl.calls.poll().request).isEqualTo(makeRequest("FooService")); + + verifyNoMoreInteractions(origLb); + } + + @Test + public void getHealthCheckedServiceName_nullServiceConfig() { + assertThat(ServiceConfigUtil.getHealthCheckedServiceName(null)).isNull(); + } + + @Test + public void getHealthCheckedServiceName_noHealthCheckConfig() { + assertThat(ServiceConfigUtil.getHealthCheckedServiceName(new HashMap())) + .isNull(); + } + + @Test + public void getHealthCheckedServiceName_healthCheckConfigMissingServiceName() { + HashMap serviceConfig = new HashMap(); + HashMap hcConfig = new HashMap(); + serviceConfig.put("healthCheckConfig", hcConfig); + assertThat(ServiceConfigUtil.getHealthCheckedServiceName(serviceConfig)).isNull(); + } + + @Test + public void getHealthCheckedServiceName_healthCheckConfigHasServiceName() { + HashMap serviceConfig = new HashMap(); + HashMap hcConfig = new HashMap(); + 
hcConfig.put("serviceName", "FooService"); + serviceConfig.put("healthCheckConfig", hcConfig); + assertThat(ServiceConfigUtil.getHealthCheckedServiceName(serviceConfig)) + .isEqualTo("FooService"); + } + + private Attributes attrsWithHealthCheckService(@Nullable String serviceName) { + HashMap serviceConfig = new HashMap(); + HashMap hcConfig = new HashMap(); + hcConfig.put("serviceName", serviceName); + serviceConfig.put("healthCheckConfig", hcConfig); + return Attributes.newBuilder() + .set(GrpcAttributes.NAME_RESOLVER_SERVICE_CONFIG, serviceConfig).build(); + } + + private HealthCheckRequest makeRequest(String service) { + return HealthCheckRequest.newBuilder().setService(service).build(); + } + + private HealthCheckResponse makeResponse(ServingStatus status) { + return HealthCheckResponse.newBuilder().setStatus(status).build(); + } + + private ConnectivityStateInfo unavailableStateWithMsg(final String expectedMsg) { + return argThat(new org.hamcrest.BaseMatcher() { + @Override + public boolean matches(Object item) { + if (!(item instanceof ConnectivityStateInfo)) { + return false; + } + ConnectivityStateInfo info = (ConnectivityStateInfo) item; + if (!info.getState().equals(TRANSIENT_FAILURE)) { + return false; + } + Status error = info.getStatus(); + if (!error.getCode().equals(Code.UNAVAILABLE)) { + return false; + } + if (!error.getDescription().equals(expectedMsg)) { + return false; + } + return true; + } + + @Override + public void describeTo(org.hamcrest.Description desc) { + desc.appendText("Matches unavailable state with msg='" + expectedMsg + "'"); + } + }); + } + + private static class HealthImpl extends HealthGrpc.HealthImplBase { + boolean isImplemented = true; + boolean checkCalled; + final LinkedList calls = new LinkedList(); + + @Override + public void check(HealthCheckRequest request, + StreamObserver responseObserver) { + responseObserver.onError(new UnsupportedOperationException("Should never be called")); + checkCalled = true; + } + + 
@Override + public void watch(HealthCheckRequest request, + StreamObserver responseObserver) { + final ServerSideCall call = new ServerSideCall(request, responseObserver); + Context.current().addListener( + new CancellationListener() { + @Override + public void cancelled(Context ctx) { + call.cancelled = true; + } + }, MoreExecutors.directExecutor()); + calls.add(call); + } + } + + private static class ServerSideCall { + final HealthCheckRequest request; + final StreamObserver responseObserver; + boolean cancelled; + + ServerSideCall( + HealthCheckRequest request, StreamObserver responseObserver) { + this.request = request; + this.responseObserver = responseObserver; + } + } + + private class FakeSubchannel extends Subchannel { + final List eagList; + final Attributes attrs; + final Channel channel; + + FakeSubchannel(List eagList, Attributes attrs, Channel channel) { + this.eagList = Collections.unmodifiableList(eagList); + this.attrs = checkNotNull(attrs); + this.channel = checkNotNull(channel); + } + + @Override + public void shutdown() { + hcLbEventDelivery.handleSubchannelState(this, ConnectivityStateInfo.forNonError(SHUTDOWN)); + } + + @Override + public void requestConnection() { + throw new AssertionError("Should not be called"); + } + + @Override + public List getAllAddresses() { + return eagList; + } + + @Override + public Attributes getAttributes() { + return attrs; + } + + @Override + public Channel asChannel() { + return channel; + } + } + + private class FakeHelper extends Helper { + @Override + public Subchannel createSubchannel(List addrs, Attributes attrs) { + int index = -1; + for (int i = 0; i < NUM_SUBCHANNELS; i++) { + if (eagLists[i] == addrs) { + index = i; + break; + } + } + checkState(index >= 0, "addrs " + addrs + " not found"); + Subchannel subchannel = new FakeSubchannel(addrs, attrs, channels[index]); + checkState(subchannels[index] == null, "subchannels[" + index + "] already created"); + subchannels[index] = subchannel; + return 
subchannel; + } + + @Override + public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { + throw new AssertionError("Should not be called"); + } + + @Override + public SynchronizationContext getSynchronizationContext() { + return syncContext; + } + + @Override + public ScheduledExecutorService getScheduledExecutorService() { + return clock.getScheduledExecutorService(); + } + + @Override + public NameResolver.Factory getNameResolverFactory() { + throw new AssertionError("Should not be called"); + } + + @Override + public String getAuthority() { + throw new AssertionError("Should not be called"); + } + + @Override + public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) { + throw new AssertionError("Should not be called"); + } + } + + private static class FakeSocketAddress extends SocketAddress { + final String name; + + FakeSocketAddress(String name) { + this.name = name; + } + + @Override + public String toString() { + return name; + } + } + + // In reality wrappedHelper.createSubchannel() is always called from syncContext. + // Make sure it's the case in the test too. + private Subchannel createSubchannel(final int index, final Attributes attrs) { + final AtomicReference returnedSubchannel = new AtomicReference(); + syncContext.execute(new Runnable() { + @Override + public void run() { + returnedSubchannel.set(wrappedHelper.createSubchannel(eagLists[index], attrs)); + } + }); + return returnedSubchannel.get(); + } +}