services: implement Health.Watch (#4930)

This commit is contained in:
Kun Zhang 2018-10-12 16:02:25 -07:00 committed by GitHub
parent 0e8cf58d1a
commit 11f9e8d5b4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 277 additions and 72 deletions

View File

@ -16,6 +16,10 @@
package io.grpc.services;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Context.CancellationListener;
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.health.v1.HealthCheckRequest;
@ -23,23 +27,35 @@ import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
final class HealthServiceImpl extends HealthGrpc.HealthImplBase {
/* Due to the latency of rpc calls, synchronization of the map does not help with consistency.
* However, need use ConcurrentHashMap to prevent the possible race condition of concurrently
* putting two keys with a colliding hashCode into the map.*/
private final Map<String, ServingStatus> statusMap
= new ConcurrentHashMap<String, ServingStatus>();
// Due to the latency of rpc calls, synchronization of the map does not help with consistency.
// However, need use ConcurrentHashMap to allow concurrent reading by check().
private final ConcurrentHashMap<String, ServingStatus> statusMap =
new ConcurrentHashMap<String, ServingStatus>();
private final Object watchLock = new Object();
// Technically a Multimap<String, StreamObserver<HealthCheckResponse>>. The Boolean value is not
// used. The StreamObservers need to be kept in a identity-equality set, to make sure
// user-defined equals() doesn't confuse our book-keeping of the StreamObservers. Constructing
// such Multimap would require extra lines and the end result is not significally simpler, thus I
// would rather not have the Guava collections dependency.
@GuardedBy("watchLock")
private final HashMap<String, IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean>>
watchers =
new HashMap<String, IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean>>();
@Override
public void check(HealthCheckRequest request,
StreamObserver<HealthCheckResponse> responseObserver) {
ServingStatus status = getStatus(request.getService());
ServingStatus status = statusMap.get(request.getService());
if (status == null) {
responseObserver.onError(new StatusException(
Status.NOT_FOUND.withDescription("unknown service " + request.getService())));
@ -50,16 +66,85 @@ final class HealthServiceImpl extends HealthGrpc.HealthImplBase {
}
}
void setStatus(String service, ServingStatus status) {
statusMap.put(service, status);
@Override
public void watch(HealthCheckRequest request,
final StreamObserver<HealthCheckResponse> responseObserver) {
final String service = request.getService();
synchronized (watchLock) {
ServingStatus status = statusMap.get(service);
responseObserver.onNext(getResponseForWatch(status));
IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
watchers.get(service);
if (serviceWatchers == null) {
serviceWatchers = new IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean>();
watchers.put(service, serviceWatchers);
}
serviceWatchers.put(responseObserver, Boolean.TRUE);
}
Context.current().addListener(
new CancellationListener() {
@Override
// Called when the client has closed the stream
public void cancelled(Context context) {
synchronized (watchLock) {
IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
watchers.get(service);
if (serviceWatchers != null) {
serviceWatchers.remove(responseObserver);
if (serviceWatchers.isEmpty()) {
watchers.remove(service);
}
}
}
}
},
MoreExecutors.directExecutor());
}
@Nullable
ServingStatus getStatus(String service) {
return statusMap.get(service);
void setStatus(String service, ServingStatus status) {
synchronized (watchLock) {
ServingStatus prevStatus = statusMap.put(service, status);
if (prevStatus != status) {
notifyWatchers(service, status);
}
}
}
void clearStatus(String service) {
statusMap.remove(service);
synchronized (watchLock) {
ServingStatus prevStatus = statusMap.remove(service);
if (prevStatus != null) {
notifyWatchers(service, null);
}
}
}
@VisibleForTesting
int numWatchersForTest(String service) {
synchronized (watchLock) {
IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
watchers.get(service);
if (serviceWatchers == null) {
return 0;
}
return serviceWatchers.size();
}
}
@GuardedBy("watchLock")
private void notifyWatchers(String service, @Nullable ServingStatus status) {
HealthCheckResponse response = getResponseForWatch(status);
IdentityHashMap<StreamObserver<HealthCheckResponse>, Boolean> serviceWatchers =
watchers.get(service);
if (serviceWatchers != null) {
for (StreamObserver<HealthCheckResponse> responseObserver : serviceWatchers.keySet()) {
responseObserver.onNext(response);
}
}
}
private static HealthCheckResponse getResponseForWatch(@Nullable ServingStatus recordedStatus) {
return HealthCheckResponse.newBuilder().setStatus(
recordedStatus == null ? ServingStatus.SERVICE_UNKNOWN : recordedStatus).build();
}
}

View File

@ -16,99 +16,219 @@
package io.grpc.services;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;
import io.grpc.BindableService;
import io.grpc.Context.CancellableContext;
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
import io.grpc.health.v1.HealthCheckRequest;
import io.grpc.health.v1.HealthCheckResponse;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
import io.grpc.health.v1.HealthGrpc;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcServerRule;
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
/** Unit tests for {@link HealthStatusManager}. */
/** Tests for {@link HealthStatusManager}. */
@RunWith(JUnit4.class)
public class HealthStatusManagerTest {
private static final String SERVICE1 = "service1";
private static final String SERVICE2 = "service2";
@Rule public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
private final HealthStatusManager manager = new HealthStatusManager();
private final HealthGrpc.HealthImplBase health =
(HealthGrpc.HealthImplBase) manager.getHealthService();
private final HealthCheckResponse.ServingStatus status
= HealthCheckResponse.ServingStatus.UNKNOWN;
private final HealthServiceImpl service = (HealthServiceImpl) manager.getHealthService();
private HealthGrpc.HealthStub stub;
private HealthGrpc.HealthBlockingStub blockingStub;
@Before
public void setup() {
grpcServerRule.getServiceRegistry().addService(service);
stub = HealthGrpc.newStub(grpcServerRule.getChannel());
blockingStub = HealthGrpc.newBlockingStub(grpcServerRule.getChannel());
}
@After
public void teardown() {
// Health-check streams are usually not closed in the tests. Force closing for clean up.
grpcServerRule.getServer().shutdownNow();
}
@Test
public void getHealthService_getterReturnsTheSameHealthRefAfterUpdate() throws Exception {
manager.setStatus("", status);
assertEquals(health, manager.getHealthService());
BindableService health = manager.getHealthService();
manager.setStatus(SERVICE1, ServingStatus.UNKNOWN);
assertThat(health).isSameAs(manager.getHealthService());
}
@Test
public void checkValidStatus() throws Exception {
//setup
manager.setStatus("", status);
HealthCheckRequest request = HealthCheckRequest.newBuilder().setService("").build();
@SuppressWarnings("unchecked")
StreamObserver<HealthCheckResponse> observer = mock(StreamObserver.class);
manager.setStatus(SERVICE1, ServingStatus.NOT_SERVING);
manager.setStatus(SERVICE2, ServingStatus.SERVING);
HealthCheckRequest request = HealthCheckRequest.newBuilder().setService(SERVICE1).build();
HealthCheckResponse response =
blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request);
assertThat(response).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.NOT_SERVING).build());
//test
health.check(request, observer);
//verify
InOrder inOrder = inOrder(observer);
inOrder.verify(observer, times(1)).onNext(any(HealthCheckResponse.class));
inOrder.verify(observer, times(1)).onCompleted();
verify(observer, never()).onError(any(Throwable.class));
request = HealthCheckRequest.newBuilder().setService(SERVICE2).build();
response = blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request);
assertThat(response).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build());
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0);
assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(0);
}
@Test
public void checkStatusNotFound() throws Exception {
//setup
manager.setStatus("", status);
manager.setStatus(SERVICE1, ServingStatus.SERVING);
// SERVICE2's status is not set
HealthCheckRequest request
= HealthCheckRequest.newBuilder().setService("invalid").build();
@SuppressWarnings("unchecked")
StreamObserver<HealthCheckResponse> observer = mock(StreamObserver.class);
//test
health.check(request, observer);
//verify
ArgumentCaptor<StatusException> exception = ArgumentCaptor.forClass(StatusException.class);
verify(observer, times(1)).onError(exception.capture());
assertEquals(Status.Code.NOT_FOUND, exception.getValue().getStatus().getCode());
verify(observer, never()).onCompleted();
= HealthCheckRequest.newBuilder().setService(SERVICE2).build();
try {
blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request);
fail("Should've failed");
} catch (StatusRuntimeException e) {
assertThat(e.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND);
}
assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(0);
}
@Test
public void notFoundForClearedStatus() throws Exception {
//setup
manager.setStatus("", status);
manager.clearStatus("");
manager.setStatus(SERVICE1, ServingStatus.SERVING);
manager.clearStatus(SERVICE1);
HealthCheckRequest request
= HealthCheckRequest.newBuilder().setService("").build();
@SuppressWarnings("unchecked")
StreamObserver<HealthCheckResponse> observer = mock(StreamObserver.class);
= HealthCheckRequest.newBuilder().setService(SERVICE1).build();
try {
blockingStub.withDeadlineAfter(5, TimeUnit.SECONDS).check(request);
fail("Should've failed");
} catch (StatusRuntimeException e) {
assertThat(e.getStatus().getCode()).isEqualTo(Status.Code.NOT_FOUND);
}
}
//test
health.check(request, observer);
@Test
public void watch() throws Exception {
manager.setStatus(SERVICE1, ServingStatus.UNKNOWN);
//verify
ArgumentCaptor<StatusException> exception = ArgumentCaptor.forClass(StatusException.class);
verify(observer, times(1)).onError(exception.capture());
assertEquals(Status.Code.NOT_FOUND, exception.getValue().getStatus().getCode());
// Start a watch on SERVICE1
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0);
RespObserver respObs1 = new RespObserver();
stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1);
// Will get the current status
assertThat(respObs1.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.UNKNOWN).build());
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(1);
verify(observer, never()).onCompleted();
// Status change is notified of to the RPC
manager.setStatus(SERVICE1, ServingStatus.SERVING);
assertThat(respObs1.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build());
// Start another watch on SERVICE1
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(1);
RespObserver respObs1b = new RespObserver();
stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1b);
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(2);
// Will get the current status
assertThat(respObs1b.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVING).build());
// Start a watch on SERVICE2, which is not known yet
assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(0);
RespObserver respObs2 = new RespObserver();
stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE2).build(), respObs2);
assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1);
// Current status is SERVICE_UNKNOWN
assertThat(respObs2.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
// Set status for SERVICE2, which will be notified of
manager.setStatus(SERVICE2, ServingStatus.NOT_SERVING);
assertThat(respObs2.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.NOT_SERVING).build());
// Clear the status for SERVICE1, which will be notified of
manager.clearStatus(SERVICE1);
assertThat(respObs1.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
assertThat(respObs1b.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
// All responses have been accounted for
assertThat(respObs1.responses).isEmpty();
assertThat(respObs1b.responses).isEmpty();
assertThat(respObs2.responses).isEmpty();
}
@Test
public void watchRemovedWhenClientCloses() throws Exception {
CancellableContext withCancellation = Context.current().withCancellation();
Context prevCtx = withCancellation.attach();
RespObserver respObs1 = new RespObserver();
try {
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(0);
stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1);
} finally {
withCancellation.detach(prevCtx);
}
RespObserver respObs1b = new RespObserver();
stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE1).build(), respObs1b);
RespObserver respObs2 = new RespObserver();
stub.watch(HealthCheckRequest.newBuilder().setService(SERVICE2).build(), respObs2);
assertThat(respObs1.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
assertThat(respObs1b.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
assertThat(respObs2.responses.poll()).isEqualTo(
HealthCheckResponse.newBuilder().setStatus(ServingStatus.SERVICE_UNKNOWN).build());
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(2);
assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1);
assertThat(respObs1.responses).isEmpty();
assertThat(respObs1b.responses).isEmpty();
assertThat(respObs2.responses).isEmpty();
// This will cancel the RPC with respObs1
withCancellation.close();
assertThat(respObs1.responses.poll()).isInstanceOf(Throwable.class);
assertThat(service.numWatchersForTest(SERVICE1)).isEqualTo(1);
assertThat(service.numWatchersForTest(SERVICE2)).isEqualTo(1);
assertThat(respObs1.responses).isEmpty();
assertThat(respObs1b.responses).isEmpty();
assertThat(respObs2.responses).isEmpty();
}
private static class RespObserver implements StreamObserver<HealthCheckResponse> {
final ArrayDeque<Object> responses = new ArrayDeque<Object>();
@Override
public void onNext(HealthCheckResponse value) {
responses.add(value);
}
@Override
public void onError(Throwable t) {
responses.add(t);
}
@Override
public void onCompleted() {
responses.add("onCompleted");
}
}
}