From 11f9e8d5b45f13d37344b74bbd056c00e43ae984 Mon Sep 17 00:00:00 2001 From: Kun Zhang Date: Fri, 12 Oct 2018 16:02:25 -0700 Subject: [PATCH] services: implement Health.Watch (#4930) --- .../io/grpc/services/HealthServiceImpl.java | 113 +++++++-- .../services/HealthStatusManagerTest.java | 236 +++++++++++++----- 2 files changed, 277 insertions(+), 72 deletions(-) diff --git a/services/src/main/java/io/grpc/services/HealthServiceImpl.java b/services/src/main/java/io/grpc/services/HealthServiceImpl.java index d27299b7ec..724488ee70 100644 --- a/services/src/main/java/io/grpc/services/HealthServiceImpl.java +++ b/services/src/main/java/io/grpc/services/HealthServiceImpl.java @@ -16,6 +16,10 @@ package io.grpc.services; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.MoreExecutors; +import io.grpc.Context.CancellationListener; +import io.grpc.Context; import io.grpc.Status; import io.grpc.StatusException; import io.grpc.health.v1.HealthCheckRequest; @@ -23,23 +27,35 @@ import io.grpc.health.v1.HealthCheckResponse; import io.grpc.health.v1.HealthCheckResponse.ServingStatus; import io.grpc.health.v1.HealthGrpc; import io.grpc.stub.StreamObserver; -import java.util.Map; +import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; - +import javax.annotation.concurrent.GuardedBy; final class HealthServiceImpl extends HealthGrpc.HealthImplBase { - /* Due to the latency of rpc calls, synchronization of the map does not help with consistency. - * However, need use ConcurrentHashMap to prevent the possible race condition of concurrently - * putting two keys with a colliding hashCode into the map.*/ - private final Map statusMap - = new ConcurrentHashMap(); + // Due to the latency of rpc calls, synchronization of the map does not help with consistency. + // However, need use ConcurrentHashMap to allow concurrent reading by check(). + private final ConcurrentHashMap statusMap = + new ConcurrentHashMap(); + + private final Object watchLock = new Object(); + + // Technically a Multimap>. The Boolean value is not + // used. The StreamObservers need to be kept in a identity-equality set, to make sure + // user-defined equals() doesn't confuse our book-keeping of the StreamObservers. Constructing + // such Multimap would require extra lines and the end result is not significally simpler, thus I + // would rather not have the Guava collections dependency. + @GuardedBy("watchLock") + private final HashMap, Boolean>> + watchers = + new HashMap, Boolean>>(); @Override public void check(HealthCheckRequest request, StreamObserver responseObserver) { - ServingStatus status = getStatus(request.getService()); + ServingStatus status = statusMap.get(request.getService()); if (status == null) { responseObserver.onError(new StatusException( Status.NOT_FOUND.withDescription("unknown service " + request.getService()))); @@ -50,16 +66,85 @@ final class HealthServiceImpl extends HealthGrpc.HealthImplBase { } } - void setStatus(String service, ServingStatus status) { - statusMap.put(service, status); + @Override + public void watch(HealthCheckRequest request, + final StreamObserver responseObserver) { + final String service = request.getService(); + synchronized (watchLock) { + ServingStatus status = statusMap.get(service); + responseObserver.onNext(getResponseForWatch(status)); + IdentityHashMap, Boolean> serviceWatchers = + watchers.get(service); + if (serviceWatchers == null) { + serviceWatchers = new IdentityHashMap, Boolean>(); + watchers.put(service, serviceWatchers); + } + serviceWatchers.put(responseObserver, Boolean.TRUE); + } + Context.current().addListener( + new CancellationListener() { + @Override + // Called when the client has closed the stream + public void cancelled(Context context) { + synchronized (watchLock) { + IdentityHashMap, Boolean> serviceWatchers = + watchers.get(service); + if (serviceWatchers != null) { + serviceWatchers.remove(responseObserver); + if (serviceWatchers.isEmpty()) { + watchers.remove(service); + } + } + } + } + }, + MoreExecutors.directExecutor()); } - @Nullable - ServingStatus getStatus(String service) { - return statusMap.get(service); + void setStatus(String service, ServingStatus status) { + synchronized (watchLock) { + ServingStatus prevStatus = statusMap.put(service, status); + if (prevStatus != status) { + notifyWatchers(service, status); + } + } } void clearStatus(String service) { - statusMap.remove(service); + synchronized (watchLock) { + ServingStatus prevStatus = statusMap.remove(service); + if (prevStatus != null) { + notifyWatchers(service, null); + } + } + } + + @VisibleForTesting + int numWatchersForTest(String service) { + synchronized (watchLock) { + IdentityHashMap, Boolean> serviceWatchers = + watchers.get(service); + if (serviceWatchers == null) { + return 0; + } + return serviceWatchers.size(); + } + } + + @GuardedBy("watchLock") + private void notifyWatchers(String service, @Nullable ServingStatus status) { + HealthCheckResponse response = getResponseForWatch(status); + IdentityHashMap, Boolean> serviceWatchers = + watchers.get(service); + if (serviceWatchers != null) { + for (StreamObserver responseObserver : serviceWatchers.keySet()) { + responseObserver.onNext(response); + } + } + } + + private static HealthCheckResponse getResponseForWatch(@Nullable ServingStatus recordedStatus) { + return HealthCheckResponse.newBuilder().setStatus( + recordedStatus == null ? ServingStatus.SERVICE_UNKNOWN : recordedStatus).build(); } } diff --git a/services/src/test/java/io/grpc/services/HealthStatusManagerTest.java b/services/src/test/java/io/grpc/services/HealthStatusManagerTest.java index ef71ff8ba3..ec3e4822e8 100644 --- a/services/src/test/java/io/grpc/services/HealthStatusManagerTest.java +++ b/services/src/test/java/io/grpc/services/HealthStatusManagerTest.java @@ -16,99 +16,219 @@ package io.grpc.services; -import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; +import io.grpc.BindableService; +import io.grpc.Context.CancellableContext; +import io.grpc.Context; import io.grpc.Status; -import io.grpc.StatusException; +import io.grpc.StatusRuntimeException; import io.grpc.health.v1.HealthCheckRequest; import io.grpc.health.v1.HealthCheckResponse; +import io.grpc.health.v1.HealthCheckResponse.ServingStatus; import io.grpc.health.v1.HealthGrpc; import io.grpc.stub.StreamObserver; +import io.grpc.testing.GrpcServerRule; +import java.util.ArrayDeque; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; -import org.mockito.InOrder; -/** Unit tests for {@link HealthStatusManager}. */ +/** Tests for {@link HealthStatusManager}. */ @RunWith(JUnit4.class) public class HealthStatusManagerTest { + private static final String SERVICE1 = "service1"; + private static final String SERVICE2 = "service2"; + + @Rule public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor(); private final HealthStatusManager manager = new HealthStatusManager(); - private final HealthGrpc.HealthImplBase health = - (HealthGrpc.HealthImplBase) manager.getHealthService(); - private final HealthCheckResponse.ServingStatus status - = HealthCheckResponse.ServingStatus.UNKNOWN; + private final HealthServiceImpl service = (HealthServiceImpl) manager.getHealthService(); + private HealthGrpc.HealthStub stub; + private HealthGrpc.HealthBlockingStub blockingStub; + + @Before + public void setup() { + grpcServerRule.getServiceRegistry().addService(service); + stub = HealthGrpc.newStub(grpcServerRule.getChannel()); + blockingStub = HealthGrpc.newBlockingStub(grpcServerRule.getChannel()); + } + + @After + public void teardown() { + // Health-check streams are usually not closed in the tests. Force closing for clean up. + grpcServerRule.getServer().shutdownNow(); + } @Test public void getHealthService_getterReturnsTheSameHealthRefAfterUpdate() throws Exception { - manager.setStatus("", status); - assertEquals(health, manager.getHealthService()); + BindableService health = manager.getHealthService(); + manager.setStatus(SERVICE1, ServingStatus.UNKNOWN); + assertThat(health).isSameAs(manager.getHealthService()); } @Test public void checkValidStatus() throws Exception { - //setup - manager.setStatus("", status); - HealthCheckRequest request = HealthCheckRequest.newBuilder().setService("").build(); - @SuppressWarnings("unchecked") - StreamObserver observer = mock(StreamObserver.class); + manager.setStatus(SERVICE1, ServingStatus.NOT_SERVING); + manager.setStatus(SERVICE2, ServingStatus.SERVING); + HealthCheckRequest request = HealthCheckRequest.newBuilder().setService(SERVICE1).build(); + HealthCheckResponse response = + blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request); + assertThat(response).isEqualTo( + HealthCheckResponse.newBuilder().setStatus(ServingStatus.NOT_SERVING).build()); - //test - health.check(request, observer); - - //verify - InOrder inOrder = inOrder(observer); - inOrder.verify(observer, times(1)).onNext(any(HealthCheckResponse.class)); - inOrder.verify(observer, times(1)).onCompleted(); - verify(observer, never()).onError(any(Throwable.class)); + request = HealthCheckRequest.newBuilder().setService(SERVICE2).build(); + response = blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request); + assertThat(response).isEqualTo( + HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build()); + assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0); + assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(0); } @Test public void checkStatusNotFound() throws Exception { - //setup - manager.setStatus("", status); + manager.setStatus(SERVICE1, ServingStatus.SERVING); + // SERVICE2's status is not set HealthCheckRequest request - = HealthCheckRequest.newBuilder().setService("invalid").build(); - @SuppressWarnings("unchecked") - StreamObserver observer = mock(StreamObserver.class); - - //test - health.check(request, observer); - - //verify - ArgumentCaptor exception = ArgumentCaptor.forClass(StatusException.class); - verify(observer, times(1)).onError(exception.capture()); - assertEquals(Status.Code.NOT_FOUND, exception.getValue().getStatus().getCode()); - - verify(observer, never()).onCompleted(); + = HealthCheckRequest.newBuilder().setService(SERVICE2).build(); + try { + blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request); + fail("Should've failed"); + } catch (StatusRuntimeException e) { + assertThat(e.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND); + } + assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(0); } @Test public void notFoundForClearedStatus() throws Exception { - //setup - manager.setStatus("", status); - manager.clearStatus(""); + manager.setStatus(SERVICE1, ServingStatus.SERVING); + manager.clearStatus(SERVICE1); HealthCheckRequest request - = HealthCheckRequest.newBuilder().setService("").build(); - @SuppressWarnings("unchecked") - StreamObserver observer = mock(StreamObserver.class); + = HealthCheckRequest.newBuilder().setService(SERVICE1).build(); + try { + blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request); + fail("Should've failed"); + } catch (StatusRuntimeException e) { + assertThat(e.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND); + } + } - //test - health.check(request, observer); + @Test + public void watch() throws Exception { + manager.setStatus(SERVICE1, ServingStatus.UNKNOWN); - //verify - ArgumentCaptor exception = ArgumentCaptor.forClass(StatusException.class); - verify(observer, times(1)).onError(exception.capture()); - assertEquals(Status.Code.NOT_FOUND, exception.getValue().getStatus().getCode()); + // Start a watch on SERVICE1 + assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0); + RespObserver respObs1 = new RespObserver(); + stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1); + // Will get the current status + assertThat(respObs1.responses.poll()).isEqualTo( + HealthCheckResponse.newBuilder().setStatus(ServingStatus.UNKNOWN).build()); + assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(1); - verify(observer, never()).onCompleted(); + // Status change is notified of to the RPC + manager.setStatus(SERVICE1, ServingStatus.SERVING); + assertThat(respObs1.responses.poll()).isEqualTo( + HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build()); + + // Start another watch on SERVICE1 + assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(1); + RespObserver respObs1b = new RespObserver(); + stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1b); + assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(2); + // Will get the current status + assertThat(respObs1b.responses.poll()).isEqualTo( + HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build()); + + // Start a watch on SERVICE2, which is not known yet + assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(0); + RespObserver respObs2 = new RespObserver(); + stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE2).build(), respObs2); + assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1); + // Current status is SERVICE_UNKNOWN + assertThat(respObs2.responses.poll()).isEqualTo( + HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build()); + + // Set status for SERVICE2, which will be notified of + manager.setStatus(SERVICE2, ServingStatus.NOT_SERVING); + assertThat(respObs2.responses.poll()).isEqualTo( + HealthCheckResponse.newBuilder().setStatus(ServingStatus.NOT_SERVING).build()); + + // Clear the status for SERVICE1, which will be notified of + manager.clearStatus(SERVICE1); + assertThat(respObs1.responses.poll()).isEqualTo( + HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build()); + assertThat(respObs1b.responses.poll()).isEqualTo( + HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build()); + + // All responses have been accounted for + assertThat(respObs1.responses).isEmpty(); + assertThat(respObs1b.responses).isEmpty(); + assertThat(respObs2.responses).isEmpty(); + } + + @Test + public void watchRemovedWhenClientCloses() throws Exception { + CancellableContext withCancellation = Context.current().withCancellation(); + Context prevCtx = withCancellation.attach(); + RespObserver respObs1 = new RespObserver(); + try { + assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0); + stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1); + } finally { + withCancellation.detach(prevCtx); + } + RespObserver respObs1b = new RespObserver(); + stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1b); + RespObserver respObs2 = new RespObserver(); + stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE2).build(), respObs2); + + assertThat(respObs1.responses.poll()).isEqualTo( + HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build()); + assertThat(respObs1b.responses.poll()).isEqualTo( + HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build()); + assertThat(respObs2.responses.poll()).isEqualTo( + HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build()); + assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(2); + assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1); + assertThat(respObs1.responses).isEmpty(); + assertThat(respObs1b.responses).isEmpty(); + assertThat(respObs2.responses).isEmpty(); + + // This will cancel the RPC with respObs1 + withCancellation.close(); + + assertThat(respObs1.responses.poll()).isInstanceOf(Throwable.class); + assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(1); + assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1); + assertThat(respObs1.responses).isEmpty(); + assertThat(respObs1b.responses).isEmpty(); + assertThat(respObs2.responses).isEmpty(); + } + + private static class RespObserver implements StreamObserver { + final ArrayDeque responses = new ArrayDeque(); + + @Override + public void onNext(HealthCheckResponse value) { + responses.add(value); + } + + @Override + public void onError(Throwable t) { + responses.add(t); + } + + @Override + public void onCompleted() { + responses.add("onCompleted"); + } } }