From e345273268494b812524fef4e8fc79e15ab98ec9 Mon Sep 17 00:00:00 2001 From: Eric Gribkoff Date: Tue, 28 Feb 2017 20:05:26 -0800 Subject: [PATCH] services: don't update reflection index mid-stream This fix addresses https://github.com/grpc/grpc-java/issues/2689 --- .../service/ProtoReflectionService.java | 223 ++++++++-------- .../service/ProtoReflectionServiceTest.java | 242 +++++++++++------- 2 files changed, 261 insertions(+), 204 deletions(-) diff --git a/services/src/main/java/io/grpc/protobuf/service/ProtoReflectionService.java b/services/src/main/java/io/grpc/protobuf/service/ProtoReflectionService.java index 66113228ca..8806628ab3 100644 --- a/services/src/main/java/io/grpc/protobuf/service/ProtoReflectionService.java +++ b/services/src/main/java/io/grpc/protobuf/service/ProtoReflectionService.java @@ -79,49 +79,104 @@ import javax.annotation.concurrent.GuardedBy; public final class ProtoReflectionService extends ServerReflectionGrpc.ServerReflectionImplBase implements InternalNotifyOnServerBuild { - private volatile ServerReflectionIndex serverReflectionIndex; + private final Object lock = new Object(); + + @GuardedBy("lock") + private ServerReflectionIndex serverReflectionIndex; + + private Server server; private ProtoReflectionService() {} - public static BindableService getInstance() { + public static BindableService newInstance() { return new ProtoReflectionService(); } /** - * Receives a reference to the server at build time. + * Do not use this method. + * + * @deprecated use {@link ProtoReflectionService#newInstance()} instead. */ + @Deprecated + public static BindableService getInstance() { + return newInstance(); + } + + /** Receives a reference to the server at build time. */ @Override public void notifyOnBuild(Server server) { - checkState(serverReflectionIndex == null); - serverReflectionIndex = new ServerReflectionIndex(checkNotNull(server, "server")); + this.server = checkNotNull(server); + } + + /** + * Checks for updates to the server's mutable services and updates the index if any changes are + * detected. A change is any addition or removal in the set of file descriptors attached to the + * mutable services or a change in the service names. + * + * @return The (potentially updated) index. + */ + private ServerReflectionIndex updateIndexIfNecessary() { + synchronized (lock) { + if (serverReflectionIndex == null) { + serverReflectionIndex = + new ServerReflectionIndex(server.getImmutableServices(), server.getMutableServices()); + return serverReflectionIndex; + } + + Set serverFileDescriptors = new HashSet(); + Set serverServiceNames = new HashSet(); + List serverMutableServices = server.getMutableServices(); + for (ServerServiceDefinition mutableService : serverMutableServices) { + io.grpc.ServiceDescriptor serviceDescriptor = mutableService.getServiceDescriptor(); + if (serviceDescriptor.getSchemaDescriptor() instanceof ProtoFileDescriptorSupplier) { + String serviceName = serviceDescriptor.getName(); + FileDescriptor fileDescriptor = + ((ProtoFileDescriptorSupplier) serviceDescriptor.getSchemaDescriptor()) + .getFileDescriptor(); + serverFileDescriptors.add(fileDescriptor); + serverServiceNames.add(serviceName); + } + } + + // Replace the index if the underlying mutable services have changed. Check both the file + // descriptors and the service names, because one file descriptor can define multiple + // services. + FileDescriptorIndex mutableServicesIndex = serverReflectionIndex.getMutableServicesIndex(); + if (!mutableServicesIndex.getServiceFileDescriptors().equals(serverFileDescriptors) + || !mutableServicesIndex.getServiceNames().equals(serverServiceNames)) { + serverReflectionIndex = + new ServerReflectionIndex(server.getImmutableServices(), serverMutableServices); + } + + return serverReflectionIndex; + } } @Override public StreamObserver serverReflectionInfo( final StreamObserver responseObserver) { - - checkState(serverReflectionIndex != null); - serverReflectionIndex.initializeImmutableServicesIndex(); - final ServerCallStreamObserver serverCallStreamObserver = (ServerCallStreamObserver) responseObserver; ProtoReflectionStreamObserver requestObserver = - new ProtoReflectionStreamObserver(serverCallStreamObserver); + new ProtoReflectionStreamObserver(updateIndexIfNecessary(), serverCallStreamObserver); serverCallStreamObserver.setOnReadyHandler(requestObserver); serverCallStreamObserver.disableAutoInboundFlowControl(); serverCallStreamObserver.request(1); return requestObserver; } - private class ProtoReflectionStreamObserver implements Runnable, - StreamObserver { + private static class ProtoReflectionStreamObserver + implements Runnable, StreamObserver { + private final ServerReflectionIndex serverReflectionIndex; private final ServerCallStreamObserver serverCallStreamObserver; private boolean closeAfterSend = false; private ServerReflectionRequest request; ProtoReflectionStreamObserver( + ServerReflectionIndex serverReflectionIndex, ServerCallStreamObserver serverCallStreamObserver) { + this.serverReflectionIndex = serverReflectionIndex; this.serverCallStreamObserver = checkNotNull(serverCallStreamObserver, "observer"); } @@ -141,8 +196,6 @@ public final class ProtoReflectionService extends ServerReflectionGrpc.ServerRef private void handleReflectionRequest() { if (serverCallStreamObserver.isReady()) { - serverReflectionIndex.updateMutableIndexIfNecessary(); - switch (request.getMessageRequestCase()) { case FILE_BY_FILENAME: getFileByName(request); @@ -250,8 +303,7 @@ public final class ProtoReflectionService extends ServerReflectionGrpc.ServerRef .build()); } - private void sendErrorResponse( - ServerReflectionRequest request, Status status, String message) { + private void sendErrorResponse(ServerReflectionRequest request, Status status, String message) { ServerReflectionResponse response = ServerReflectionResponse.newBuilder() .setValidHost(request.getHost()) @@ -299,71 +351,27 @@ public final class ProtoReflectionService extends ServerReflectionGrpc.ServerRef * in the immutable service index are the mutable services checked. */ private static final class ServerReflectionIndex { - private FileDescriptorIndex immutableServicesIndex; - private final Object lock = new Object(); - /** - * Tracks mutable services. Accesses must be synchronized. - */ - @GuardedBy("lock") private FileDescriptorIndex mutableServicesIndex - = new FileDescriptorIndex(Collections.emptyList()); + private final FileDescriptorIndex immutableServicesIndex; + private final FileDescriptorIndex mutableServicesIndex; - private final Server server; - - public ServerReflectionIndex(Server server) { - this.server = server; + public ServerReflectionIndex( + List immutableServices, + List mutableServices) { + immutableServicesIndex = new FileDescriptorIndex(immutableServices); + mutableServicesIndex = new FileDescriptorIndex(mutableServices); } - /** - * When first called, initializes the immutable services index. Subsequent calls have no effect. - * - *

This must be called by the reflection service before returning a new - * {@link ProtoReflectionStreamObserver}. - */ - private synchronized void initializeImmutableServicesIndex() { - if (immutableServicesIndex == null) { - immutableServicesIndex = new FileDescriptorIndex(server.getImmutableServices()); - } - } - - /** - * Checks for updates to the server's mutable services and updates the index if any changes - * are detected. A change is any addition or removal in the set of file descriptors attached to - * the mutable services or a change in the service names. - */ - private void updateMutableIndexIfNecessary() { - Set currentFileDescriptors = new HashSet(); - Set currentServiceNames = new HashSet(); - synchronized (lock) { - List currentMutableServices = server.getMutableServices(); - for (ServerServiceDefinition mutableService : currentMutableServices) { - io.grpc.ServiceDescriptor serviceDescriptor = mutableService.getServiceDescriptor(); - if (serviceDescriptor.getSchemaDescriptor() instanceof ProtoFileDescriptorSupplier) { - String serviceName = serviceDescriptor.getName(); - FileDescriptor fileDescriptor = - ((ProtoFileDescriptorSupplier) serviceDescriptor.getSchemaDescriptor()) - .getFileDescriptor(); - currentFileDescriptors.add(fileDescriptor); - checkState(!currentServiceNames.contains(serviceName), - "Service already defined: %s", serviceName); - currentServiceNames.add(serviceName); - } - } - - // Replace the mutable index if the underlying services have changed. Check both the file - // descriptors and the service names, because one file descriptor can define multiple - // services. - if (!mutableServicesIndex.getServiceFileDescriptors().equals(currentFileDescriptors) - || !mutableServicesIndex.getServiceNames().equals(currentServiceNames)) { - mutableServicesIndex = new FileDescriptorIndex(currentMutableServices); - } - } + private FileDescriptorIndex getMutableServicesIndex() { + return mutableServicesIndex; } private Set getServiceNames() { - Set serviceNames = new HashSet(immutableServicesIndex.getServiceNames()); - synchronized (lock) { - serviceNames.addAll(mutableServicesIndex.getServiceNames()); - } + Set immutableServiceNames = immutableServicesIndex.getServiceNames(); + Set mutableServiceNames = mutableServicesIndex.getServiceNames(); + Set serviceNames = + new HashSet(immutableServiceNames.size() + mutableServiceNames.size()); + serviceNames.addAll(immutableServiceNames); + serviceNames.addAll(mutableServiceNames); return serviceNames; } @@ -371,9 +379,7 @@ public final class ProtoReflectionService extends ServerReflectionGrpc.ServerRef private FileDescriptor getFileDescriptorByName(String name) { FileDescriptor fd = immutableServicesIndex.getFileDescriptorByName(name); if (fd == null) { - synchronized (lock) { - fd = mutableServicesIndex.getFileDescriptorByName(name); - } + fd = mutableServicesIndex.getFileDescriptorByName(name); } return fd; } @@ -382,21 +388,17 @@ public final class ProtoReflectionService extends ServerReflectionGrpc.ServerRef private FileDescriptor getFileDescriptorBySymbol(String symbol) { FileDescriptor fd = immutableServicesIndex.getFileDescriptorBySymbol(symbol); if (fd == null) { - synchronized (lock) { - fd = mutableServicesIndex.getFileDescriptorBySymbol(symbol); - } + fd = mutableServicesIndex.getFileDescriptorBySymbol(symbol); } return fd; } @Nullable private FileDescriptor getFileDescriptorByExtensionAndNumber(String type, int extension) { - FileDescriptor fd - = immutableServicesIndex.getFileDescriptorByExtensionAndNumber(type, extension); + FileDescriptor fd = + immutableServicesIndex.getFileDescriptorByExtensionAndNumber(type, extension); if (fd == null) { - synchronized (lock) { - fd = mutableServicesIndex.getFileDescriptorByExtensionAndNumber(type, extension); - } + fd = mutableServicesIndex.getFileDescriptorByExtensionAndNumber(type, extension); } return fd; } @@ -405,28 +407,26 @@ public final class ProtoReflectionService extends ServerReflectionGrpc.ServerRef private Set getExtensionNumbersOfType(String type) { Set extensionNumbers = immutableServicesIndex.getExtensionNumbersOfType(type); if (extensionNumbers == null) { - synchronized (lock) { - extensionNumbers = mutableServicesIndex.getExtensionNumbersOfType(type); - } + extensionNumbers = mutableServicesIndex.getExtensionNumbersOfType(type); } return extensionNumbers; } } /** - * Provides a set of methods for answering reflection queries for the file descriptors - * underlying a set of services. Used by {@link ServerReflectionIndex} to separately index - * immutable and mutable services. + * Provides a set of methods for answering reflection queries for the file descriptors underlying + * a set of services. Used by {@link ServerReflectionIndex} to separately index immutable and + * mutable services. */ private static final class FileDescriptorIndex { private final Set serviceNames = new HashSet(); private final Set serviceFileDescriptors = new HashSet(); - private final Map fileDescriptorsByName - = new HashMap(); - private final Map fileDescriptorsBySymbol - = new HashMap(); - private final Map> fileDescriptorsByExtensionAndNumber - = new HashMap>(); + private final Map fileDescriptorsByName = + new HashMap(); + private final Map fileDescriptorsBySymbol = + new HashMap(); + private final Map> fileDescriptorsByExtensionAndNumber = + new HashMap>(); FileDescriptorIndex(List services) { Queue fileDescriptorsToProcess = new LinkedList(); @@ -438,8 +438,8 @@ public final class ProtoReflectionService extends ServerReflectionGrpc.ServerRef ((ProtoFileDescriptorSupplier) serviceDescriptor.getSchemaDescriptor()) .getFileDescriptor(); String serviceName = serviceDescriptor.getName(); - checkState(!serviceNames.contains(serviceName), - "Service already defined: %s", serviceName); + checkState( + !serviceNames.contains(serviceName), "Service already defined: %s", serviceName); serviceFileDescriptors.add(fileDescriptor); serviceNames.add(serviceName); if (!seenFiles.contains(fileDescriptor.getName())) { @@ -501,8 +501,7 @@ public final class ProtoReflectionService extends ServerReflectionGrpc.ServerRef private void processFileDescriptor(FileDescriptor fd) { String fdName = fd.getName(); - checkState(!fileDescriptorsByName.containsKey(fdName), - "File name already used: %s", fdName); + checkState(!fileDescriptorsByName.containsKey(fdName), "File name already used: %s", fdName); fileDescriptorsByName.put(fdName, fd); for (ServiceDescriptor service : fd.getServices()) { processService(service, fd); @@ -517,21 +516,25 @@ public final class ProtoReflectionService extends ServerReflectionGrpc.ServerRef private void processService(ServiceDescriptor service, FileDescriptor fd) { String serviceName = service.getFullName(); - checkState(!fileDescriptorsBySymbol.containsKey(serviceName), - "Service already defined: %s", serviceName); + checkState( + !fileDescriptorsBySymbol.containsKey(serviceName), + "Service already defined: %s", + serviceName); fileDescriptorsBySymbol.put(serviceName, fd); for (MethodDescriptor method : service.getMethods()) { String methodName = method.getFullName(); - checkState(!fileDescriptorsBySymbol.containsKey(methodName), - "Method already defined: %s", methodName); + checkState( + !fileDescriptorsBySymbol.containsKey(methodName), + "Method already defined: %s", + methodName); fileDescriptorsBySymbol.put(methodName, fd); } } private void processType(Descriptor type, FileDescriptor fd) { String typeName = type.getFullName(); - checkState(!fileDescriptorsBySymbol.containsKey(typeName), - "Type already defined: %s", typeName); + checkState( + !fileDescriptorsBySymbol.containsKey(typeName), "Type already defined: %s", typeName); fileDescriptorsBySymbol.put(typeName, fd); for (FieldDescriptor extension : type.getExtensions()) { processExtension(extension, fd); @@ -550,7 +553,9 @@ public final class ProtoReflectionService extends ServerReflectionGrpc.ServerRef } checkState( !fileDescriptorsByExtensionAndNumber.get(extensionName).containsKey(extensionNumber), - "Extension name and number already defined: %s, %s", extensionName, extensionNumber); + "Extension name and number already defined: %s, %s", + extensionName, + extensionNumber); fileDescriptorsByExtensionAndNumber.get(extensionName).put(extensionNumber, fd); } } diff --git a/services/src/test/java/io/grpc/protobuf/service/ProtoReflectionServiceTest.java b/services/src/test/java/io/grpc/protobuf/service/ProtoReflectionServiceTest.java index 5c4e8141bd..e9c702ae2c 100644 --- a/services/src/test/java/io/grpc/protobuf/service/ProtoReflectionServiceTest.java +++ b/services/src/test/java/io/grpc/protobuf/service/ProtoReflectionServiceTest.java @@ -74,9 +74,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Tests for {@link ProtoReflectionService}. - */ +/** Tests for {@link ProtoReflectionService}. */ @RunWith(JUnit4.class) public class ProtoReflectionServiceTest { private static final String TEST_HOST = "localhost"; @@ -92,17 +90,16 @@ public class ProtoReflectionServiceTest { @Before public void setUp() throws Exception { - reflectionService = ProtoReflectionService.getInstance(); - server = InProcessServerBuilder.forName("proto-reflection-test") - .directExecutor() - .addService(reflectionService) - .addService(new ReflectableServiceGrpc.ReflectableServiceImplBase() {}) - .fallbackHandlerRegistry(handlerRegistry) - .build() - .start(); - channel = InProcessChannelBuilder.forName("proto-reflection-test") - .directExecutor() - .build(); + reflectionService = ProtoReflectionService.newInstance(); + server = + InProcessServerBuilder.forName("proto-reflection-test") + .directExecutor() + .addService(reflectionService) + .addService(new ReflectableServiceGrpc.ReflectableServiceImplBase() {}) + .fallbackHandlerRegistry(handlerRegistry) + .build() + .start(); + channel = InProcessChannelBuilder.forName("proto-reflection-test").directExecutor().build(); stub = ServerReflectionGrpc.newStub(channel); } @@ -118,61 +115,61 @@ public class ProtoReflectionServiceTest { @Test public void listServices() throws Exception { - Set originalServices = new HashSet( - Arrays.asList( - ServiceResponse.newBuilder() - .setName("grpc.reflection.v1alpha.ServerReflection") - .build(), - ServiceResponse.newBuilder() - .setName("grpc.reflection.testing.ReflectableService") - .build()) - ); + Set originalServices = + new HashSet( + Arrays.asList( + ServiceResponse.newBuilder() + .setName("grpc.reflection.v1alpha.ServerReflection") + .build(), + ServiceResponse.newBuilder() + .setName("grpc.reflection.testing.ReflectableService") + .build())); assertServiceResponseEquals(originalServices); handlerRegistry.addService(dynamicService); - assertServiceResponseEquals(new HashSet( - Arrays.asList( - ServiceResponse.newBuilder() - .setName("grpc.reflection.v1alpha.ServerReflection") - .build(), - ServiceResponse.newBuilder() - .setName("grpc.reflection.testing.ReflectableService") - .build(), - ServiceResponse.newBuilder() - .setName("grpc.reflection.testing.DynamicService") - .build()) - )); + assertServiceResponseEquals( + new HashSet( + Arrays.asList( + ServiceResponse.newBuilder() + .setName("grpc.reflection.v1alpha.ServerReflection") + .build(), + ServiceResponse.newBuilder() + .setName("grpc.reflection.testing.ReflectableService") + .build(), + ServiceResponse.newBuilder() + .setName("grpc.reflection.testing.DynamicService") + .build()))); handlerRegistry.addService(anotherDynamicService); - assertServiceResponseEquals(new HashSet( - Arrays.asList( - ServiceResponse.newBuilder() - .setName("grpc.reflection.v1alpha.ServerReflection") - .build(), - ServiceResponse.newBuilder() - .setName("grpc.reflection.testing.ReflectableService") - .build(), - ServiceResponse.newBuilder() - .setName("grpc.reflection.testing.DynamicService") - .build(), - ServiceResponse.newBuilder() - .setName("grpc.reflection.testing.AnotherDynamicService") - .build()) - )); + assertServiceResponseEquals( + new HashSet( + Arrays.asList( + ServiceResponse.newBuilder() + .setName("grpc.reflection.v1alpha.ServerReflection") + .build(), + ServiceResponse.newBuilder() + .setName("grpc.reflection.testing.ReflectableService") + .build(), + ServiceResponse.newBuilder() + .setName("grpc.reflection.testing.DynamicService") + .build(), + ServiceResponse.newBuilder() + .setName("grpc.reflection.testing.AnotherDynamicService") + .build()))); handlerRegistry.removeService(dynamicService); - assertServiceResponseEquals(new HashSet( - Arrays.asList( - ServiceResponse.newBuilder() - .setName("grpc.reflection.v1alpha.ServerReflection") - .build(), - ServiceResponse.newBuilder() - .setName("grpc.reflection.testing.ReflectableService") - .build(), - ServiceResponse.newBuilder() - .setName("grpc.reflection.testing.AnotherDynamicService") - .build()) - )); + assertServiceResponseEquals( + new HashSet( + Arrays.asList( + ServiceResponse.newBuilder() + .setName("grpc.reflection.v1alpha.ServerReflection") + .build(), + ServiceResponse.newBuilder() + .setName("grpc.reflection.testing.ReflectableService") + .build(), + ServiceResponse.newBuilder() + .setName("grpc.reflection.testing.AnotherDynamicService") + .build()))); handlerRegistry.removeService(anotherDynamicService); assertServiceResponseEquals(originalServices); @@ -207,7 +204,7 @@ public class ProtoReflectionServiceTest { } @Test - public void fileByFilenameForMutableServices() throws Exception { + public void fileByFilenameConsistentForMutableServices() throws Exception { ServerReflectionRequest request = ServerReflectionRequest.newBuilder() .setHost(TEST_HOST) @@ -229,13 +226,26 @@ public class ProtoReflectionServiceTest { stub.serverReflectionInfo(responseObserver); handlerRegistry.addService(dynamicService); requestObserver.onNext(request); - handlerRegistry.removeService(dynamicService); - requestObserver.onNext(request); requestObserver.onCompleted(); + StreamRecorder responseObserver2 = StreamRecorder.create(); + StreamObserver requestObserver2 = + stub.serverReflectionInfo(responseObserver2); + handlerRegistry.removeService(dynamicService); + requestObserver2.onNext(request); + requestObserver2.onCompleted(); + StreamRecorder responseObserver3 = StreamRecorder.create(); + StreamObserver requestObserver3 = + stub.serverReflectionInfo(responseObserver3); + requestObserver3.onNext(request); + requestObserver3.onCompleted(); - assertEquals(goldenResponse, responseObserver.getValues().get(0)); - assertEquals(ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE, - responseObserver.getValues().get(1).getMessageResponseCase()); + assertEquals( + ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE, + responseObserver.firstValue().get().getMessageResponseCase()); + assertEquals(goldenResponse, responseObserver2.firstValue().get()); + assertEquals( + ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE, + responseObserver3.firstValue().get().getMessageResponseCase()); } @Test @@ -251,8 +261,7 @@ public class ProtoReflectionServiceTest { ReflectionTestProto.getDescriptor().toProto().toByteString(), ReflectionTestDepthTwoProto.getDescriptor().toProto().toByteString(), ReflectionTestDepthTwoAlternateProto.getDescriptor().toProto().toByteString(), - ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString() - ); + ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString()); StreamRecorder responseObserver = StreamRecorder.create(); StreamObserver requestObserver = @@ -261,8 +270,11 @@ public class ProtoReflectionServiceTest { requestObserver.onCompleted(); List response = - responseObserver.firstValue().get() - .getFileDescriptorResponse().getFileDescriptorProtoList(); + responseObserver + .firstValue() + .get() + .getFileDescriptorResponse() + .getFileDescriptorProtoList(); assertEquals(goldenResponse.size(), response.size()); assertEquals(new HashSet(goldenResponse), new HashSet(response)); } @@ -317,13 +329,26 @@ public class ProtoReflectionServiceTest { stub.serverReflectionInfo(responseObserver); handlerRegistry.addService(dynamicService); requestObserver.onNext(request); - handlerRegistry.removeService(dynamicService); - requestObserver.onNext(request); requestObserver.onCompleted(); + StreamRecorder responseObserver2 = StreamRecorder.create(); + StreamObserver requestObserver2 = + stub.serverReflectionInfo(responseObserver2); + handlerRegistry.removeService(dynamicService); + requestObserver2.onNext(request); + requestObserver2.onCompleted(); + StreamRecorder responseObserver3 = StreamRecorder.create(); + StreamObserver requestObserver3 = + stub.serverReflectionInfo(responseObserver3); + requestObserver3.onNext(request); + requestObserver3.onCompleted(); - assertEquals(goldenResponse, responseObserver.getValues().get(0)); - assertEquals(ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE, - responseObserver.getValues().get(1).getMessageResponseCase()); + assertEquals( + ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE, + responseObserver.firstValue().get().getMessageResponseCase()); + assertEquals(goldenResponse, responseObserver2.firstValue().get()); + assertEquals( + ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE, + responseObserver3.firstValue().get().getMessageResponseCase()); } @Test @@ -343,8 +368,7 @@ public class ProtoReflectionServiceTest { ReflectionTestProto.getDescriptor().toProto().toByteString(), ReflectionTestDepthTwoProto.getDescriptor().toProto().toByteString(), ReflectionTestDepthTwoAlternateProto.getDescriptor().toProto().toByteString(), - ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString() - ); + ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString()); StreamRecorder responseObserver = StreamRecorder.create(); StreamObserver requestObserver = @@ -353,8 +377,11 @@ public class ProtoReflectionServiceTest { requestObserver.onCompleted(); List response = - responseObserver.firstValue().get() - .getFileDescriptorResponse().getFileDescriptorProtoList(); + responseObserver + .firstValue() + .get() + .getFileDescriptorResponse() + .getFileDescriptorProtoList(); assertEquals(goldenResponse.size(), response.size()); assertEquals(new HashSet(goldenResponse), new HashSet(response)); } @@ -419,13 +446,26 @@ public class ProtoReflectionServiceTest { stub.serverReflectionInfo(responseObserver); handlerRegistry.addService(dynamicService); requestObserver.onNext(request); - handlerRegistry.removeService(dynamicService); - requestObserver.onNext(request); requestObserver.onCompleted(); + StreamRecorder responseObserver2 = StreamRecorder.create(); + StreamObserver requestObserver2 = + stub.serverReflectionInfo(responseObserver2); + handlerRegistry.removeService(dynamicService); + requestObserver2.onNext(request); + requestObserver2.onCompleted(); + StreamRecorder responseObserver3 = StreamRecorder.create(); + StreamObserver requestObserver3 = + stub.serverReflectionInfo(responseObserver3); + requestObserver3.onNext(request); + requestObserver3.onCompleted(); - assertEquals(goldenResponse, responseObserver.getValues().get(0)); - assertEquals(ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE, - responseObserver.getValues().get(1).getMessageResponseCase()); + assertEquals( + ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE, + responseObserver.firstValue().get().getMessageResponseCase()); + assertEquals(goldenResponse, responseObserver2.firstValue().get()); + assertEquals( + ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE, + responseObserver3.firstValue().get().getMessageResponseCase()); } @Test @@ -477,13 +517,26 @@ public class ProtoReflectionServiceTest { stub.serverReflectionInfo(responseObserver); handlerRegistry.addService(dynamicService); requestObserver.onNext(request); - handlerRegistry.removeService(dynamicService); - requestObserver.onNext(request); requestObserver.onCompleted(); + StreamRecorder responseObserver2 = StreamRecorder.create(); + StreamObserver requestObserver2 = + stub.serverReflectionInfo(responseObserver2); + handlerRegistry.removeService(dynamicService); + requestObserver2.onNext(request); + requestObserver2.onCompleted(); + StreamRecorder responseObserver3 = StreamRecorder.create(); + StreamObserver requestObserver3 = + stub.serverReflectionInfo(responseObserver3); + requestObserver3.onNext(request); + requestObserver3.onCompleted(); - assertEquals(goldenResponse, responseObserver.getValues().get(0)); - assertEquals(ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE, - responseObserver.getValues().get(1).getMessageResponseCase()); + assertEquals( + ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE, + responseObserver.firstValue().get().getMessageResponseCase()); + assertEquals(goldenResponse, responseObserver2.firstValue().get()); + assertEquals( + ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE, + responseObserver3.firstValue().get().getMessageResponseCase()); } @Test @@ -548,15 +601,14 @@ public class ProtoReflectionServiceTest { .build()) .build(); - private static class FlowControlClientResponseObserver implements - ClientResponseObserver { + private static class FlowControlClientResponseObserver + implements ClientResponseObserver { private final List responses = new ArrayList(); private boolean onCompleteCalled = false; @Override - public void beforeStart( - final ClientCallStreamObserver requestStream) { + public void beforeStart(final ClientCallStreamObserver requestStream) { requestStream.disableAutoInboundFlowControl(); }