V1reflectionservice (#11237)

V1 version of the proto reflection service, as the v1.alpha service has been deprecated.

* Create V1 alpha service wrapping underlying V1 service, by modifying the ServerServiceDefinition.

* Create ProtoReflectionService for the v1alpha proto by producing a ServerServiceDefinition constructed from that of the v1 service but with the service and method names and proto descriptors modified.

Issue #6724.
This commit is contained in:
Kannan J 2024-07-19 19:27:13 +05:30 committed by GitHub
parent 704123ed65
commit 0aa976c4eb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 1700 additions and 498 deletions

View File

@ -44,6 +44,7 @@ import io.grpc.Server;
import io.grpc.Status;
import io.grpc.gcp.csm.observability.CsmObservability;
import io.grpc.protobuf.services.ProtoReflectionService;
import io.grpc.protobuf.services.ProtoReflectionServiceV1;
import io.grpc.services.AdminInterface;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.integration.Messages.ClientConfigureRequest;
@ -277,6 +278,7 @@ public final class XdsTestClient {
.addService(new XdsStatsImpl())
.addService(new ConfigureUpdateServiceImpl())
.addService(ProtoReflectionService.newInstance())
.addService(ProtoReflectionServiceV1.newInstance())
.addServices(AdminInterface.getStandardServices())
.build();
try {

View File

@ -37,6 +37,7 @@ import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.protobuf.services.HealthStatusManager;
import io.grpc.protobuf.services.ProtoReflectionService;
import io.grpc.protobuf.services.ProtoReflectionServiceV1;
import io.grpc.services.AdminInterface;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.integration.Messages.Payload;
@ -220,6 +221,7 @@ public final class XdsTestServer {
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(ProtoReflectionService.newInstance())
.addService(ProtoReflectionServiceV1.newInstance())
.addServices(AdminInterface.getStandardServices())
.build();
maintenanceServer.start();
@ -268,6 +270,7 @@ public final class XdsTestServer {
.addService(new XdsUpdateHealthServiceImpl(health))
.addService(health.getHealthService())
.addService(ProtoReflectionService.newInstance())
.addService(ProtoReflectionServiceV1.newInstance())
.addServices(AdminInterface.getStandardServices())
.build();
server.start();

View File

@ -108,14 +108,17 @@ java_library(
name = "reflection",
srcs = [
"src/main/java/io/grpc/protobuf/services/ProtoReflectionService.java",
"src/main/java/io/grpc/protobuf/services/ProtoReflectionServiceV1.java",
],
deps = [
":_reflection_java_grpc",
":_reflection_v1_java_grpc",
"//api",
"//protobuf",
"//stub",
"@com_google_protobuf//:protobuf_java",
"@com_google_protobuf//:protobuf_java_util",
"@io_grpc_grpc_proto//:reflection_java_proto",
"@io_grpc_grpc_proto//:reflection_java_proto_deprecated",
artifact("com.google.code.findbugs:jsr305"),
artifact("com.google.guava:guava"),
@ -171,6 +174,13 @@ java_grpc_library(
deps = ["@io_grpc_grpc_proto//:reflection_java_proto_deprecated"],
)
java_grpc_library(
name = "_reflection_v1_java_grpc",
srcs = ["@io_grpc_grpc_proto//:reflection_proto"],
visibility = ["//visibility:private"],
deps = ["@io_grpc_grpc_proto//:reflection_java_proto"],
)
java_grpc_library(
name = "_channelz_java_grpc",
srcs = ["@io_grpc_grpc_proto//:channelz_proto"],

View File

@ -0,0 +1,285 @@
package io.grpc.reflection.v1;
import static io.grpc.MethodDescriptor.generateFullMethodName;
/**
*/
@javax.annotation.Generated(
value = "by gRPC proto compiler",
comments = "Source: grpc/reflection/v1/reflection.proto")
@io.grpc.stub.annotations.GrpcGenerated
public final class ServerReflectionGrpc {
private ServerReflectionGrpc() {}
public static final java.lang.String SERVICE_NAME = "grpc.reflection.v1.ServerReflection";
// Static method descriptors that strictly reflect the proto.
private static volatile io.grpc.MethodDescriptor<io.grpc.reflection.v1.ServerReflectionRequest,
io.grpc.reflection.v1.ServerReflectionResponse> getServerReflectionInfoMethod;
@io.grpc.stub.annotations.RpcMethod(
fullMethodName = SERVICE_NAME + '/' + "ServerReflectionInfo",
requestType = io.grpc.reflection.v1.ServerReflectionRequest.class,
responseType = io.grpc.reflection.v1.ServerReflectionResponse.class,
methodType = io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING)
public static io.grpc.MethodDescriptor<io.grpc.reflection.v1.ServerReflectionRequest,
io.grpc.reflection.v1.ServerReflectionResponse> getServerReflectionInfoMethod() {
io.grpc.MethodDescriptor<io.grpc.reflection.v1.ServerReflectionRequest, io.grpc.reflection.v1.ServerReflectionResponse> getServerReflectionInfoMethod;
if ((getServerReflectionInfoMethod = ServerReflectionGrpc.getServerReflectionInfoMethod) == null) {
synchronized (ServerReflectionGrpc.class) {
if ((getServerReflectionInfoMethod = ServerReflectionGrpc.getServerReflectionInfoMethod) == null) {
ServerReflectionGrpc.getServerReflectionInfoMethod = getServerReflectionInfoMethod =
io.grpc.MethodDescriptor.<io.grpc.reflection.v1.ServerReflectionRequest, io.grpc.reflection.v1.ServerReflectionResponse>newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.BIDI_STREAMING)
.setFullMethodName(generateFullMethodName(SERVICE_NAME, "ServerReflectionInfo"))
.setSampledToLocalTracing(true)
.setRequestMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
io.grpc.reflection.v1.ServerReflectionRequest.getDefaultInstance()))
.setResponseMarshaller(io.grpc.protobuf.ProtoUtils.marshaller(
io.grpc.reflection.v1.ServerReflectionResponse.getDefaultInstance()))
.setSchemaDescriptor(new ServerReflectionMethodDescriptorSupplier("ServerReflectionInfo"))
.build();
}
}
}
return getServerReflectionInfoMethod;
}
/**
* Creates a new async stub that supports all call types for the service
*/
public static ServerReflectionStub newStub(io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<ServerReflectionStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<ServerReflectionStub>() {
@java.lang.Override
public ServerReflectionStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new ServerReflectionStub(channel, callOptions);
}
};
return ServerReflectionStub.newStub(factory, channel);
}
/**
* Creates a new blocking-style stub that supports unary and streaming output calls on the service
*/
public static ServerReflectionBlockingStub newBlockingStub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<ServerReflectionBlockingStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<ServerReflectionBlockingStub>() {
@java.lang.Override
public ServerReflectionBlockingStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new ServerReflectionBlockingStub(channel, callOptions);
}
};
return ServerReflectionBlockingStub.newStub(factory, channel);
}
/**
* Creates a new ListenableFuture-style stub that supports unary calls on the service
*/
public static ServerReflectionFutureStub newFutureStub(
io.grpc.Channel channel) {
io.grpc.stub.AbstractStub.StubFactory<ServerReflectionFutureStub> factory =
new io.grpc.stub.AbstractStub.StubFactory<ServerReflectionFutureStub>() {
@java.lang.Override
public ServerReflectionFutureStub newStub(io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new ServerReflectionFutureStub(channel, callOptions);
}
};
return ServerReflectionFutureStub.newStub(factory, channel);
}
/**
*/
public interface AsyncService {
/**
* <pre>
* The reflection service is structured as a bidirectional stream, ensuring
* all related requests go to a single server.
* </pre>
*/
default io.grpc.stub.StreamObserver<io.grpc.reflection.v1.ServerReflectionRequest> serverReflectionInfo(
io.grpc.stub.StreamObserver<io.grpc.reflection.v1.ServerReflectionResponse> responseObserver) {
return io.grpc.stub.ServerCalls.asyncUnimplementedStreamingCall(getServerReflectionInfoMethod(), responseObserver);
}
}
/**
* Base class for the server implementation of the service ServerReflection.
*/
public static abstract class ServerReflectionImplBase
implements io.grpc.BindableService, AsyncService {
@java.lang.Override public final io.grpc.ServerServiceDefinition bindService() {
return ServerReflectionGrpc.bindService(this);
}
}
/**
* A stub to allow clients to do asynchronous rpc calls to service ServerReflection.
*/
public static final class ServerReflectionStub
extends io.grpc.stub.AbstractAsyncStub<ServerReflectionStub> {
private ServerReflectionStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected ServerReflectionStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new ServerReflectionStub(channel, callOptions);
}
/**
* <pre>
* The reflection service is structured as a bidirectional stream, ensuring
* all related requests go to a single server.
* </pre>
*/
public io.grpc.stub.StreamObserver<io.grpc.reflection.v1.ServerReflectionRequest> serverReflectionInfo(
io.grpc.stub.StreamObserver<io.grpc.reflection.v1.ServerReflectionResponse> responseObserver) {
return io.grpc.stub.ClientCalls.asyncBidiStreamingCall(
getChannel().newCall(getServerReflectionInfoMethod(), getCallOptions()), responseObserver);
}
}
/**
* A stub to allow clients to do synchronous rpc calls to service ServerReflection.
*/
public static final class ServerReflectionBlockingStub
extends io.grpc.stub.AbstractBlockingStub<ServerReflectionBlockingStub> {
private ServerReflectionBlockingStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected ServerReflectionBlockingStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new ServerReflectionBlockingStub(channel, callOptions);
}
}
/**
* A stub to allow clients to do ListenableFuture-style rpc calls to service ServerReflection.
*/
public static final class ServerReflectionFutureStub
extends io.grpc.stub.AbstractFutureStub<ServerReflectionFutureStub> {
private ServerReflectionFutureStub(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
super(channel, callOptions);
}
@java.lang.Override
protected ServerReflectionFutureStub build(
io.grpc.Channel channel, io.grpc.CallOptions callOptions) {
return new ServerReflectionFutureStub(channel, callOptions);
}
}
private static final int METHODID_SERVER_REFLECTION_INFO = 0;
private static final class MethodHandlers<Req, Resp> implements
io.grpc.stub.ServerCalls.UnaryMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ServerStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.ClientStreamingMethod<Req, Resp>,
io.grpc.stub.ServerCalls.BidiStreamingMethod<Req, Resp> {
private final AsyncService serviceImpl;
private final int methodId;
MethodHandlers(AsyncService serviceImpl, int methodId) {
this.serviceImpl = serviceImpl;
this.methodId = methodId;
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public void invoke(Req request, io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
default:
throw new AssertionError();
}
}
@java.lang.Override
@java.lang.SuppressWarnings("unchecked")
public io.grpc.stub.StreamObserver<Req> invoke(
io.grpc.stub.StreamObserver<Resp> responseObserver) {
switch (methodId) {
case METHODID_SERVER_REFLECTION_INFO:
return (io.grpc.stub.StreamObserver<Req>) serviceImpl.serverReflectionInfo(
(io.grpc.stub.StreamObserver<io.grpc.reflection.v1.ServerReflectionResponse>) responseObserver);
default:
throw new AssertionError();
}
}
}
public static final io.grpc.ServerServiceDefinition bindService(AsyncService service) {
return io.grpc.ServerServiceDefinition.builder(getServiceDescriptor())
.addMethod(
getServerReflectionInfoMethod(),
io.grpc.stub.ServerCalls.asyncBidiStreamingCall(
new MethodHandlers<
io.grpc.reflection.v1.ServerReflectionRequest,
io.grpc.reflection.v1.ServerReflectionResponse>(
service, METHODID_SERVER_REFLECTION_INFO)))
.build();
}
private static abstract class ServerReflectionBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoFileDescriptorSupplier, io.grpc.protobuf.ProtoServiceDescriptorSupplier {
ServerReflectionBaseDescriptorSupplier() {}
@java.lang.Override
public com.google.protobuf.Descriptors.FileDescriptor getFileDescriptor() {
return io.grpc.reflection.v1.ServerReflectionProto.getDescriptor();
}
@java.lang.Override
public com.google.protobuf.Descriptors.ServiceDescriptor getServiceDescriptor() {
return getFileDescriptor().findServiceByName("ServerReflection");
}
}
private static final class ServerReflectionFileDescriptorSupplier
extends ServerReflectionBaseDescriptorSupplier {
ServerReflectionFileDescriptorSupplier() {}
}
private static final class ServerReflectionMethodDescriptorSupplier
extends ServerReflectionBaseDescriptorSupplier
implements io.grpc.protobuf.ProtoMethodDescriptorSupplier {
private final java.lang.String methodName;
ServerReflectionMethodDescriptorSupplier(java.lang.String methodName) {
this.methodName = methodName;
}
@java.lang.Override
public com.google.protobuf.Descriptors.MethodDescriptor getMethodDescriptor() {
return getServiceDescriptor().findMethodByName(methodName);
}
}
private static volatile io.grpc.ServiceDescriptor serviceDescriptor;
public static io.grpc.ServiceDescriptor getServiceDescriptor() {
io.grpc.ServiceDescriptor result = serviceDescriptor;
if (result == null) {
synchronized (ServerReflectionGrpc.class) {
result = serviceDescriptor;
if (result == null) {
serviceDescriptor = result = io.grpc.ServiceDescriptor.newBuilder(SERVICE_NAME)
.setSchemaDescriptor(new ServerReflectionFileDescriptorSupplier())
.addMethod(getServerReflectionInfoMethod())
.build();
}
}
}
return result;
}
}

View File

@ -16,524 +16,70 @@
package io.grpc.protobuf.services;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Descriptors.ServiceDescriptor;
import io.grpc.BindableService;
import io.grpc.ExperimentalApi;
import io.grpc.InternalServer;
import io.grpc.Server;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCallHandler;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.protobuf.ProtoFileDescriptorSupplier;
import io.grpc.reflection.v1alpha.ErrorResponse;
import io.grpc.reflection.v1alpha.ExtensionNumberResponse;
import io.grpc.reflection.v1alpha.ExtensionRequest;
import io.grpc.reflection.v1alpha.FileDescriptorResponse;
import io.grpc.reflection.v1alpha.ListServiceResponse;
import io.grpc.reflection.v1alpha.ServerReflectionGrpc;
import io.grpc.reflection.v1alpha.ServerReflectionRequest;
import io.grpc.reflection.v1alpha.ServerReflectionResponse;
import io.grpc.reflection.v1alpha.ServiceResponse;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.WeakHashMap;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import io.grpc.ServiceDescriptor;
import io.grpc.reflection.v1.ServerReflectionGrpc;
import io.grpc.reflection.v1.ServerReflectionRequest;
import io.grpc.reflection.v1.ServerReflectionResponse;
/**
* Provides a reflection service for Protobuf services (including the reflection service itself).
* Uses the deprecated v1alpha proto. New users should use ProtoReflectionServiceV1 instead.
*
* <p>Separately tracks mutable and immutable services. Throws an exception if either group of
* services contains multiple Protobuf files with declarations of the same service, method, type, or
* extension.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2222")
public final class ProtoReflectionService extends ServerReflectionGrpc.ServerReflectionImplBase {
public final class ProtoReflectionService implements BindableService {
private final Object lock = new Object();
private ProtoReflectionService() {
}
@GuardedBy("lock")
private final Map<Server, ServerReflectionIndex> serverReflectionIndexes = new WeakHashMap<>();
private ProtoReflectionService() {}
/**
* Creates a instance of {@link ProtoReflectionService}.
*/
public static BindableService newInstance() {
return new ProtoReflectionService();
}
/**
* Retrieves the index for services of the server that dispatches the current call. Computes
* one if not exist. The index is updated if any changes to the server's mutable services are
* detected. A change is any addition or removal in the set of file descriptors attached to the
* mutable services or a change in the service names.
*/
private ServerReflectionIndex getRefreshedIndex() {
synchronized (lock) {
Server server = InternalServer.SERVER_CONTEXT_KEY.get();
ServerReflectionIndex index = serverReflectionIndexes.get(server);
if (index == null) {
index =
new ServerReflectionIndex(server.getImmutableServices(), server.getMutableServices());
serverReflectionIndexes.put(server, index);
return index;
}
Set<FileDescriptor> serverFileDescriptors = new HashSet<>();
Set<String> serverServiceNames = new HashSet<>();
List<ServerServiceDefinition> serverMutableServices = server.getMutableServices();
for (ServerServiceDefinition mutableService : serverMutableServices) {
io.grpc.ServiceDescriptor serviceDescriptor = mutableService.getServiceDescriptor();
if (serviceDescriptor.getSchemaDescriptor() instanceof ProtoFileDescriptorSupplier) {
String serviceName = serviceDescriptor.getName();
FileDescriptor fileDescriptor =
((ProtoFileDescriptorSupplier) serviceDescriptor.getSchemaDescriptor())
.getFileDescriptor();
serverFileDescriptors.add(fileDescriptor);
serverServiceNames.add(serviceName);
}
}
// Replace the index if the underlying mutable services have changed. Check both the file
// descriptors and the service names, because one file descriptor can define multiple
// services.
FileDescriptorIndex mutableServicesIndex = index.getMutableServicesIndex();
if (!mutableServicesIndex.getServiceFileDescriptors().equals(serverFileDescriptors)
|| !mutableServicesIndex.getServiceNames().equals(serverServiceNames)) {
index =
new ServerReflectionIndex(server.getImmutableServices(), serverMutableServices);
serverReflectionIndexes.put(server, index);
}
return index;
}
}
@Override
public StreamObserver<ServerReflectionRequest> serverReflectionInfo(
final StreamObserver<ServerReflectionResponse> responseObserver) {
final ServerCallStreamObserver<ServerReflectionResponse> serverCallStreamObserver =
(ServerCallStreamObserver<ServerReflectionResponse>) responseObserver;
ProtoReflectionStreamObserver requestObserver =
new ProtoReflectionStreamObserver(getRefreshedIndex(), serverCallStreamObserver);
serverCallStreamObserver.setOnReadyHandler(requestObserver);
serverCallStreamObserver.disableAutoRequest();
serverCallStreamObserver.request(1);
return requestObserver;
public ServerServiceDefinition bindService() {
ServerServiceDefinition serverServiceDefinitionV1 = ProtoReflectionServiceV1.newInstance()
.bindService();
MethodDescriptor<ServerReflectionRequest, ServerReflectionResponse> methodDescriptorV1 =
ServerReflectionGrpc.getServerReflectionInfoMethod();
// Retain the v1 proto marshallers but change the method name and schema descriptor to v1alpha.
MethodDescriptor<io.grpc.reflection.v1alpha.ServerReflectionRequest,
io.grpc.reflection.v1alpha.ServerReflectionResponse> methodDescriptorV1AlphaGenerated =
io.grpc.reflection.v1alpha.ServerReflectionGrpc.getServerReflectionInfoMethod();
MethodDescriptor<ServerReflectionRequest, ServerReflectionResponse> methodDescriptorV1Alpha =
methodDescriptorV1.toBuilder()
.setFullMethodName(methodDescriptorV1AlphaGenerated.getFullMethodName())
.setSchemaDescriptor(methodDescriptorV1AlphaGenerated.getSchemaDescriptor())
.build();
// Retain the v1 server call handler but change the service name schema descriptor in the
// service descriptor to v1alpha.
ServiceDescriptor serviceDescriptorV1AlphaGenerated =
io.grpc.reflection.v1alpha.ServerReflectionGrpc.getServiceDescriptor();
ServiceDescriptor serviceDescriptorV1Alpha =
ServiceDescriptor.newBuilder(serviceDescriptorV1AlphaGenerated.getName())
.setSchemaDescriptor(serviceDescriptorV1AlphaGenerated.getSchemaDescriptor())
.addMethod(methodDescriptorV1Alpha)
.build();
return ServerServiceDefinition.builder(serviceDescriptorV1Alpha)
.addMethod(methodDescriptorV1Alpha, createServerCallHandler(serverServiceDefinitionV1))
.build();
}
private static class ProtoReflectionStreamObserver
implements Runnable, StreamObserver<ServerReflectionRequest> {
private final ServerReflectionIndex serverReflectionIndex;
private final ServerCallStreamObserver<ServerReflectionResponse> serverCallStreamObserver;
private boolean closeAfterSend = false;
private ServerReflectionRequest request;
ProtoReflectionStreamObserver(
ServerReflectionIndex serverReflectionIndex,
ServerCallStreamObserver<ServerReflectionResponse> serverCallStreamObserver) {
this.serverReflectionIndex = serverReflectionIndex;
this.serverCallStreamObserver = checkNotNull(serverCallStreamObserver, "observer");
}
@Override
public void run() {
if (request != null) {
handleReflectionRequest();
}
}
@Override
public void onNext(ServerReflectionRequest request) {
checkState(this.request == null);
this.request = checkNotNull(request);
handleReflectionRequest();
}
private void handleReflectionRequest() {
if (serverCallStreamObserver.isReady()) {
switch (request.getMessageRequestCase()) {
case FILE_BY_FILENAME:
getFileByName(request);
break;
case FILE_CONTAINING_SYMBOL:
getFileContainingSymbol(request);
break;
case FILE_CONTAINING_EXTENSION:
getFileByExtension(request);
break;
case ALL_EXTENSION_NUMBERS_OF_TYPE:
getAllExtensions(request);
break;
case LIST_SERVICES:
listServices(request);
break;
default:
sendErrorResponse(
request,
Status.Code.UNIMPLEMENTED,
"not implemented " + request.getMessageRequestCase());
}
request = null;
if (closeAfterSend) {
serverCallStreamObserver.onCompleted();
} else {
serverCallStreamObserver.request(1);
}
}
}
@Override
public void onCompleted() {
if (request != null) {
closeAfterSend = true;
} else {
serverCallStreamObserver.onCompleted();
}
}
@Override
public void onError(Throwable cause) {
serverCallStreamObserver.onError(cause);
}
private void getFileByName(ServerReflectionRequest request) {
String name = request.getFileByFilename();
FileDescriptor fd = serverReflectionIndex.getFileDescriptorByName(name);
if (fd != null) {
serverCallStreamObserver.onNext(createServerReflectionResponse(request, fd));
} else {
sendErrorResponse(request, Status.Code.NOT_FOUND, "File not found.");
}
}
private void getFileContainingSymbol(ServerReflectionRequest request) {
String symbol = request.getFileContainingSymbol();
FileDescriptor fd = serverReflectionIndex.getFileDescriptorBySymbol(symbol);
if (fd != null) {
serverCallStreamObserver.onNext(createServerReflectionResponse(request, fd));
} else {
sendErrorResponse(request, Status.Code.NOT_FOUND, "Symbol not found.");
}
}
private void getFileByExtension(ServerReflectionRequest request) {
ExtensionRequest extensionRequest = request.getFileContainingExtension();
String type = extensionRequest.getContainingType();
int extension = extensionRequest.getExtensionNumber();
FileDescriptor fd =
serverReflectionIndex.getFileDescriptorByExtensionAndNumber(type, extension);
if (fd != null) {
serverCallStreamObserver.onNext(createServerReflectionResponse(request, fd));
} else {
sendErrorResponse(request, Status.Code.NOT_FOUND, "Extension not found.");
}
}
private void getAllExtensions(ServerReflectionRequest request) {
String type = request.getAllExtensionNumbersOfType();
Set<Integer> extensions = serverReflectionIndex.getExtensionNumbersOfType(type);
if (extensions != null) {
ExtensionNumberResponse.Builder builder =
ExtensionNumberResponse.newBuilder()
.setBaseTypeName(type)
.addAllExtensionNumber(extensions);
serverCallStreamObserver.onNext(
ServerReflectionResponse.newBuilder()
.setValidHost(request.getHost())
.setOriginalRequest(request)
.setAllExtensionNumbersResponse(builder)
.build());
} else {
sendErrorResponse(request, Status.Code.NOT_FOUND, "Type not found.");
}
}
private void listServices(ServerReflectionRequest request) {
ListServiceResponse.Builder builder = ListServiceResponse.newBuilder();
for (String serviceName : serverReflectionIndex.getServiceNames()) {
builder.addService(ServiceResponse.newBuilder().setName(serviceName));
}
serverCallStreamObserver.onNext(
ServerReflectionResponse.newBuilder()
.setValidHost(request.getHost())
.setOriginalRequest(request)
.setListServicesResponse(builder)
.build());
}
private void sendErrorResponse(
ServerReflectionRequest request, Status.Code code, String message) {
ServerReflectionResponse response =
ServerReflectionResponse.newBuilder()
.setValidHost(request.getHost())
.setOriginalRequest(request)
.setErrorResponse(
ErrorResponse.newBuilder()
.setErrorCode(code.value())
.setErrorMessage(message))
.build();
serverCallStreamObserver.onNext(response);
}
private ServerReflectionResponse createServerReflectionResponse(
ServerReflectionRequest request, FileDescriptor fd) {
FileDescriptorResponse.Builder fdRBuilder = FileDescriptorResponse.newBuilder();
Set<String> seenFiles = new HashSet<>();
Queue<FileDescriptor> frontier = new ArrayDeque<>();
seenFiles.add(fd.getName());
frontier.add(fd);
while (!frontier.isEmpty()) {
FileDescriptor nextFd = frontier.remove();
fdRBuilder.addFileDescriptorProto(nextFd.toProto().toByteString());
for (FileDescriptor dependencyFd : nextFd.getDependencies()) {
if (!seenFiles.contains(dependencyFd.getName())) {
seenFiles.add(dependencyFd.getName());
frontier.add(dependencyFd);
}
}
}
return ServerReflectionResponse.newBuilder()
.setValidHost(request.getHost())
.setOriginalRequest(request)
.setFileDescriptorResponse(fdRBuilder)
.build();
}
}
/**
* Indexes the server's services and allows lookups of file descriptors by filename, symbol, type,
* and extension number.
*
* <p>Internally, this stores separate indices for the immutable and mutable services. When
* queried, the immutable service index is checked for a matching value. Only if there is no match
* in the immutable service index are the mutable services checked.
*/
private static final class ServerReflectionIndex {
private final FileDescriptorIndex immutableServicesIndex;
private final FileDescriptorIndex mutableServicesIndex;
public ServerReflectionIndex(
List<ServerServiceDefinition> immutableServices,
List<ServerServiceDefinition> mutableServices) {
immutableServicesIndex = new FileDescriptorIndex(immutableServices);
mutableServicesIndex = new FileDescriptorIndex(mutableServices);
}
private FileDescriptorIndex getMutableServicesIndex() {
return mutableServicesIndex;
}
private Set<String> getServiceNames() {
Set<String> immutableServiceNames = immutableServicesIndex.getServiceNames();
Set<String> mutableServiceNames = mutableServicesIndex.getServiceNames();
Set<String> serviceNames =
new HashSet<>(immutableServiceNames.size() + mutableServiceNames.size());
serviceNames.addAll(immutableServiceNames);
serviceNames.addAll(mutableServiceNames);
return serviceNames;
}
@Nullable
private FileDescriptor getFileDescriptorByName(String name) {
FileDescriptor fd = immutableServicesIndex.getFileDescriptorByName(name);
if (fd == null) {
fd = mutableServicesIndex.getFileDescriptorByName(name);
}
return fd;
}
@Nullable
private FileDescriptor getFileDescriptorBySymbol(String symbol) {
FileDescriptor fd = immutableServicesIndex.getFileDescriptorBySymbol(symbol);
if (fd == null) {
fd = mutableServicesIndex.getFileDescriptorBySymbol(symbol);
}
return fd;
}
@Nullable
private FileDescriptor getFileDescriptorByExtensionAndNumber(String type, int extension) {
FileDescriptor fd =
immutableServicesIndex.getFileDescriptorByExtensionAndNumber(type, extension);
if (fd == null) {
fd = mutableServicesIndex.getFileDescriptorByExtensionAndNumber(type, extension);
}
return fd;
}
@Nullable
private Set<Integer> getExtensionNumbersOfType(String type) {
Set<Integer> extensionNumbers = immutableServicesIndex.getExtensionNumbersOfType(type);
if (extensionNumbers == null) {
extensionNumbers = mutableServicesIndex.getExtensionNumbersOfType(type);
}
return extensionNumbers;
}
}
/**
* Provides a set of methods for answering reflection queries for the file descriptors underlying
* a set of services. Used by {@link ServerReflectionIndex} to separately index immutable and
* mutable services.
*/
private static final class FileDescriptorIndex {
private final Set<String> serviceNames = new HashSet<>();
private final Set<FileDescriptor> serviceFileDescriptors = new HashSet<>();
private final Map<String, FileDescriptor> fileDescriptorsByName =
new HashMap<>();
private final Map<String, FileDescriptor> fileDescriptorsBySymbol =
new HashMap<>();
private final Map<String, Map<Integer, FileDescriptor>> fileDescriptorsByExtensionAndNumber =
new HashMap<>();
FileDescriptorIndex(List<ServerServiceDefinition> services) {
Queue<FileDescriptor> fileDescriptorsToProcess = new ArrayDeque<>();
Set<String> seenFiles = new HashSet<>();
for (ServerServiceDefinition service : services) {
io.grpc.ServiceDescriptor serviceDescriptor = service.getServiceDescriptor();
if (serviceDescriptor.getSchemaDescriptor() instanceof ProtoFileDescriptorSupplier) {
FileDescriptor fileDescriptor =
((ProtoFileDescriptorSupplier) serviceDescriptor.getSchemaDescriptor())
.getFileDescriptor();
String serviceName = serviceDescriptor.getName();
checkState(
!serviceNames.contains(serviceName), "Service already defined: %s", serviceName);
serviceFileDescriptors.add(fileDescriptor);
serviceNames.add(serviceName);
if (!seenFiles.contains(fileDescriptor.getName())) {
seenFiles.add(fileDescriptor.getName());
fileDescriptorsToProcess.add(fileDescriptor);
}
}
}
while (!fileDescriptorsToProcess.isEmpty()) {
FileDescriptor currentFd = fileDescriptorsToProcess.remove();
processFileDescriptor(currentFd);
for (FileDescriptor dependencyFd : currentFd.getDependencies()) {
if (!seenFiles.contains(dependencyFd.getName())) {
seenFiles.add(dependencyFd.getName());
fileDescriptorsToProcess.add(dependencyFd);
}
}
}
}
/**
* Returns the file descriptors for the indexed services, but not their dependencies. This is
* used to check if the server's mutable services have changed.
*/
private Set<FileDescriptor> getServiceFileDescriptors() {
return Collections.unmodifiableSet(serviceFileDescriptors);
}
private Set<String> getServiceNames() {
return Collections.unmodifiableSet(serviceNames);
}
@Nullable
private FileDescriptor getFileDescriptorByName(String name) {
return fileDescriptorsByName.get(name);
}
@Nullable
private FileDescriptor getFileDescriptorBySymbol(String symbol) {
return fileDescriptorsBySymbol.get(symbol);
}
@Nullable
private FileDescriptor getFileDescriptorByExtensionAndNumber(String type, int number) {
if (fileDescriptorsByExtensionAndNumber.containsKey(type)) {
return fileDescriptorsByExtensionAndNumber.get(type).get(number);
}
return null;
}
@Nullable
private Set<Integer> getExtensionNumbersOfType(String type) {
if (fileDescriptorsByExtensionAndNumber.containsKey(type)) {
return Collections.unmodifiableSet(fileDescriptorsByExtensionAndNumber.get(type).keySet());
}
return null;
}
private void processFileDescriptor(FileDescriptor fd) {
String fdName = fd.getName();
checkState(!fileDescriptorsByName.containsKey(fdName), "File name already used: %s", fdName);
fileDescriptorsByName.put(fdName, fd);
for (ServiceDescriptor service : fd.getServices()) {
processService(service, fd);
}
for (Descriptor type : fd.getMessageTypes()) {
processType(type, fd);
}
for (FieldDescriptor extension : fd.getExtensions()) {
processExtension(extension, fd);
}
}
private void processService(ServiceDescriptor service, FileDescriptor fd) {
String serviceName = service.getFullName();
checkState(
!fileDescriptorsBySymbol.containsKey(serviceName),
"Service already defined: %s",
serviceName);
fileDescriptorsBySymbol.put(serviceName, fd);
for (MethodDescriptor method : service.getMethods()) {
String methodName = method.getFullName();
checkState(
!fileDescriptorsBySymbol.containsKey(methodName),
"Method already defined: %s",
methodName);
fileDescriptorsBySymbol.put(methodName, fd);
}
}
private void processType(Descriptor type, FileDescriptor fd) {
String typeName = type.getFullName();
checkState(
!fileDescriptorsBySymbol.containsKey(typeName), "Type already defined: %s", typeName);
fileDescriptorsBySymbol.put(typeName, fd);
for (FieldDescriptor extension : type.getExtensions()) {
processExtension(extension, fd);
}
for (Descriptor nestedType : type.getNestedTypes()) {
processType(nestedType, fd);
}
}
private void processExtension(FieldDescriptor extension, FileDescriptor fd) {
String extensionName = extension.getContainingType().getFullName();
int extensionNumber = extension.getNumber();
if (!fileDescriptorsByExtensionAndNumber.containsKey(extensionName)) {
fileDescriptorsByExtensionAndNumber.put(
extensionName, new HashMap<Integer, FileDescriptor>());
}
checkState(
!fileDescriptorsByExtensionAndNumber.get(extensionName).containsKey(extensionNumber),
"Extension name and number already defined: %s, %s",
extensionName,
extensionNumber);
fileDescriptorsByExtensionAndNumber.get(extensionName).put(extensionNumber, fd);
}
@SuppressWarnings("unchecked")
private ServerCallHandler<ServerReflectionRequest, ServerReflectionResponse>
createServerCallHandler(
ServerServiceDefinition serverServiceDefinition) {
return (ServerCallHandler<ServerReflectionRequest, ServerReflectionResponse>)
serverServiceDefinition.getMethod(
ServerReflectionGrpc.getServerReflectionInfoMethod().getFullMethodName())
.getServerCallHandler();
}
}

View File

@ -0,0 +1,539 @@
/*
* Copyright 2016 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.protobuf.services;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import com.google.protobuf.Descriptors.FileDescriptor;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Descriptors.ServiceDescriptor;
import io.grpc.BindableService;
import io.grpc.ExperimentalApi;
import io.grpc.InternalServer;
import io.grpc.Server;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import io.grpc.protobuf.ProtoFileDescriptorSupplier;
import io.grpc.reflection.v1.ErrorResponse;
import io.grpc.reflection.v1.ExtensionNumberResponse;
import io.grpc.reflection.v1.ExtensionRequest;
import io.grpc.reflection.v1.FileDescriptorResponse;
import io.grpc.reflection.v1.ListServiceResponse;
import io.grpc.reflection.v1.ServerReflectionGrpc;
import io.grpc.reflection.v1.ServerReflectionRequest;
import io.grpc.reflection.v1.ServerReflectionResponse;
import io.grpc.reflection.v1.ServiceResponse;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.WeakHashMap;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/**
* Provides a reflection service for Protobuf services (including the reflection service itself).
*
* <p>Separately tracks mutable and immutable services. Throws an exception if either group of
* services contains multiple Protobuf files with declarations of the same service, method, type, or
* extension.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2222")
public final class ProtoReflectionServiceV1 extends ServerReflectionGrpc.ServerReflectionImplBase {
private final Object lock = new Object();
@GuardedBy("lock")
private final Map<Server, ServerReflectionIndex> serverReflectionIndexes = new WeakHashMap<>();
private ProtoReflectionServiceV1() {}
/**
* Creates a instance of {@link ProtoReflectionServiceV1}.
*/
public static BindableService newInstance() {
return new ProtoReflectionServiceV1();
}
/**
* Retrieves the index for services of the server that dispatches the current call. Computes
* one if not exist. The index is updated if any changes to the server's mutable services are
* detected. A change is any addition or removal in the set of file descriptors attached to the
* mutable services or a change in the service names.
*/
private ServerReflectionIndex getRefreshedIndex() {
synchronized (lock) {
Server server = InternalServer.SERVER_CONTEXT_KEY.get();
ServerReflectionIndex index = serverReflectionIndexes.get(server);
if (index == null) {
index =
new ServerReflectionIndex(server.getImmutableServices(), server.getMutableServices());
serverReflectionIndexes.put(server, index);
return index;
}
Set<FileDescriptor> serverFileDescriptors = new HashSet<>();
Set<String> serverServiceNames = new HashSet<>();
List<ServerServiceDefinition> serverMutableServices = server.getMutableServices();
for (ServerServiceDefinition mutableService : serverMutableServices) {
io.grpc.ServiceDescriptor serviceDescriptor = mutableService.getServiceDescriptor();
if (serviceDescriptor.getSchemaDescriptor() instanceof ProtoFileDescriptorSupplier) {
String serviceName = serviceDescriptor.getName();
FileDescriptor fileDescriptor =
((ProtoFileDescriptorSupplier) serviceDescriptor.getSchemaDescriptor())
.getFileDescriptor();
serverFileDescriptors.add(fileDescriptor);
serverServiceNames.add(serviceName);
}
}
// Replace the index if the underlying mutable services have changed. Check both the file
// descriptors and the service names, because one file descriptor can define multiple
// services.
FileDescriptorIndex mutableServicesIndex = index.getMutableServicesIndex();
if (!mutableServicesIndex.getServiceFileDescriptors().equals(serverFileDescriptors)
|| !mutableServicesIndex.getServiceNames().equals(serverServiceNames)) {
index =
new ServerReflectionIndex(server.getImmutableServices(), serverMutableServices);
serverReflectionIndexes.put(server, index);
}
return index;
}
}
@Override
public StreamObserver<ServerReflectionRequest> serverReflectionInfo(
final StreamObserver<ServerReflectionResponse> responseObserver) {
final ServerCallStreamObserver<ServerReflectionResponse> serverCallStreamObserver =
(ServerCallStreamObserver<ServerReflectionResponse>) responseObserver;
ProtoReflectionStreamObserver requestObserver =
new ProtoReflectionStreamObserver(getRefreshedIndex(), serverCallStreamObserver);
serverCallStreamObserver.setOnReadyHandler(requestObserver);
serverCallStreamObserver.disableAutoRequest();
serverCallStreamObserver.request(1);
return requestObserver;
}
private static class ProtoReflectionStreamObserver
implements Runnable, StreamObserver<ServerReflectionRequest> {
private final ServerReflectionIndex serverReflectionIndex;
private final ServerCallStreamObserver<ServerReflectionResponse> serverCallStreamObserver;
private boolean closeAfterSend = false;
private ServerReflectionRequest request;
ProtoReflectionStreamObserver(
ServerReflectionIndex serverReflectionIndex,
ServerCallStreamObserver<ServerReflectionResponse> serverCallStreamObserver) {
this.serverReflectionIndex = serverReflectionIndex;
this.serverCallStreamObserver = checkNotNull(serverCallStreamObserver, "observer");
}
@Override
public void run() {
if (request != null) {
handleReflectionRequest();
}
}
@Override
public void onNext(ServerReflectionRequest request) {
checkState(this.request == null);
this.request = checkNotNull(request);
handleReflectionRequest();
}
private void handleReflectionRequest() {
if (serverCallStreamObserver.isReady()) {
switch (request.getMessageRequestCase()) {
case FILE_BY_FILENAME:
getFileByName(request);
break;
case FILE_CONTAINING_SYMBOL:
getFileContainingSymbol(request);
break;
case FILE_CONTAINING_EXTENSION:
getFileByExtension(request);
break;
case ALL_EXTENSION_NUMBERS_OF_TYPE:
getAllExtensions(request);
break;
case LIST_SERVICES:
listServices(request);
break;
default:
sendErrorResponse(
request,
Status.Code.UNIMPLEMENTED,
"not implemented " + request.getMessageRequestCase());
}
request = null;
if (closeAfterSend) {
serverCallStreamObserver.onCompleted();
} else {
serverCallStreamObserver.request(1);
}
}
}
@Override
public void onCompleted() {
if (request != null) {
closeAfterSend = true;
} else {
serverCallStreamObserver.onCompleted();
}
}
@Override
public void onError(Throwable cause) {
serverCallStreamObserver.onError(cause);
}
private void getFileByName(ServerReflectionRequest request) {
String name = request.getFileByFilename();
FileDescriptor fd = serverReflectionIndex.getFileDescriptorByName(name);
if (fd != null) {
serverCallStreamObserver.onNext(createServerReflectionResponse(request, fd));
} else {
sendErrorResponse(request, Status.Code.NOT_FOUND, "File not found.");
}
}
private void getFileContainingSymbol(ServerReflectionRequest request) {
String symbol = request.getFileContainingSymbol();
FileDescriptor fd = serverReflectionIndex.getFileDescriptorBySymbol(symbol);
if (fd != null) {
serverCallStreamObserver.onNext(createServerReflectionResponse(request, fd));
} else {
sendErrorResponse(request, Status.Code.NOT_FOUND, "Symbol not found.");
}
}
private void getFileByExtension(ServerReflectionRequest request) {
ExtensionRequest extensionRequest = request.getFileContainingExtension();
String type = extensionRequest.getContainingType();
int extension = extensionRequest.getExtensionNumber();
FileDescriptor fd =
serverReflectionIndex.getFileDescriptorByExtensionAndNumber(type, extension);
if (fd != null) {
serverCallStreamObserver.onNext(createServerReflectionResponse(request, fd));
} else {
sendErrorResponse(request, Status.Code.NOT_FOUND, "Extension not found.");
}
}
private void getAllExtensions(ServerReflectionRequest request) {
String type = request.getAllExtensionNumbersOfType();
Set<Integer> extensions = serverReflectionIndex.getExtensionNumbersOfType(type);
if (extensions != null) {
ExtensionNumberResponse.Builder builder =
ExtensionNumberResponse.newBuilder()
.setBaseTypeName(type)
.addAllExtensionNumber(extensions);
serverCallStreamObserver.onNext(
ServerReflectionResponse.newBuilder()
.setValidHost(request.getHost())
.setOriginalRequest(request)
.setAllExtensionNumbersResponse(builder)
.build());
} else {
sendErrorResponse(request, Status.Code.NOT_FOUND, "Type not found.");
}
}
private void listServices(ServerReflectionRequest request) {
ListServiceResponse.Builder builder = ListServiceResponse.newBuilder();
for (String serviceName : serverReflectionIndex.getServiceNames()) {
builder.addService(ServiceResponse.newBuilder().setName(serviceName));
}
serverCallStreamObserver.onNext(
ServerReflectionResponse.newBuilder()
.setValidHost(request.getHost())
.setOriginalRequest(request)
.setListServicesResponse(builder)
.build());
}
private void sendErrorResponse(
ServerReflectionRequest request, Status.Code code, String message) {
ServerReflectionResponse response =
ServerReflectionResponse.newBuilder()
.setValidHost(request.getHost())
.setOriginalRequest(request)
.setErrorResponse(
ErrorResponse.newBuilder()
.setErrorCode(code.value())
.setErrorMessage(message))
.build();
serverCallStreamObserver.onNext(response);
}
private ServerReflectionResponse createServerReflectionResponse(
ServerReflectionRequest request, FileDescriptor fd) {
FileDescriptorResponse.Builder fdRBuilder = FileDescriptorResponse.newBuilder();
Set<String> seenFiles = new HashSet<>();
Queue<FileDescriptor> frontier = new ArrayDeque<>();
seenFiles.add(fd.getName());
frontier.add(fd);
while (!frontier.isEmpty()) {
FileDescriptor nextFd = frontier.remove();
fdRBuilder.addFileDescriptorProto(nextFd.toProto().toByteString());
for (FileDescriptor dependencyFd : nextFd.getDependencies()) {
if (!seenFiles.contains(dependencyFd.getName())) {
seenFiles.add(dependencyFd.getName());
frontier.add(dependencyFd);
}
}
}
return ServerReflectionResponse.newBuilder()
.setValidHost(request.getHost())
.setOriginalRequest(request)
.setFileDescriptorResponse(fdRBuilder)
.build();
}
}
/**
* Indexes the server's services and allows lookups of file descriptors by filename, symbol, type,
* and extension number.
*
* <p>Internally, this stores separate indices for the immutable and mutable services. When
* queried, the immutable service index is checked for a matching value. Only if there is no match
* in the immutable service index are the mutable services checked.
*/
private static final class ServerReflectionIndex {
private final FileDescriptorIndex immutableServicesIndex;
private final FileDescriptorIndex mutableServicesIndex;
public ServerReflectionIndex(
List<ServerServiceDefinition> immutableServices,
List<ServerServiceDefinition> mutableServices) {
immutableServicesIndex = new FileDescriptorIndex(immutableServices);
mutableServicesIndex = new FileDescriptorIndex(mutableServices);
}
private FileDescriptorIndex getMutableServicesIndex() {
return mutableServicesIndex;
}
private Set<String> getServiceNames() {
Set<String> immutableServiceNames = immutableServicesIndex.getServiceNames();
Set<String> mutableServiceNames = mutableServicesIndex.getServiceNames();
Set<String> serviceNames =
new HashSet<>(immutableServiceNames.size() + mutableServiceNames.size());
serviceNames.addAll(immutableServiceNames);
serviceNames.addAll(mutableServiceNames);
return serviceNames;
}
@Nullable
private FileDescriptor getFileDescriptorByName(String name) {
FileDescriptor fd = immutableServicesIndex.getFileDescriptorByName(name);
if (fd == null) {
fd = mutableServicesIndex.getFileDescriptorByName(name);
}
return fd;
}
@Nullable
private FileDescriptor getFileDescriptorBySymbol(String symbol) {
FileDescriptor fd = immutableServicesIndex.getFileDescriptorBySymbol(symbol);
if (fd == null) {
fd = mutableServicesIndex.getFileDescriptorBySymbol(symbol);
}
return fd;
}
@Nullable
private FileDescriptor getFileDescriptorByExtensionAndNumber(String type, int extension) {
FileDescriptor fd =
immutableServicesIndex.getFileDescriptorByExtensionAndNumber(type, extension);
if (fd == null) {
fd = mutableServicesIndex.getFileDescriptorByExtensionAndNumber(type, extension);
}
return fd;
}
@Nullable
private Set<Integer> getExtensionNumbersOfType(String type) {
Set<Integer> extensionNumbers = immutableServicesIndex.getExtensionNumbersOfType(type);
if (extensionNumbers == null) {
extensionNumbers = mutableServicesIndex.getExtensionNumbersOfType(type);
}
return extensionNumbers;
}
}
/**
* Provides a set of methods for answering reflection queries for the file descriptors underlying
* a set of services. Used by {@link ServerReflectionIndex} to separately index immutable and
* mutable services.
*/
private static final class FileDescriptorIndex {
private final Set<String> serviceNames = new HashSet<>();
private final Set<FileDescriptor> serviceFileDescriptors = new HashSet<>();
private final Map<String, FileDescriptor> fileDescriptorsByName =
new HashMap<>();
private final Map<String, FileDescriptor> fileDescriptorsBySymbol =
new HashMap<>();
private final Map<String, Map<Integer, FileDescriptor>> fileDescriptorsByExtensionAndNumber =
new HashMap<>();
FileDescriptorIndex(List<ServerServiceDefinition> services) {
Queue<FileDescriptor> fileDescriptorsToProcess = new ArrayDeque<>();
Set<String> seenFiles = new HashSet<>();
for (ServerServiceDefinition service : services) {
io.grpc.ServiceDescriptor serviceDescriptor = service.getServiceDescriptor();
if (serviceDescriptor.getSchemaDescriptor() instanceof ProtoFileDescriptorSupplier) {
FileDescriptor fileDescriptor =
((ProtoFileDescriptorSupplier) serviceDescriptor.getSchemaDescriptor())
.getFileDescriptor();
String serviceName = serviceDescriptor.getName();
checkState(
!serviceNames.contains(serviceName), "Service already defined: %s", serviceName);
serviceFileDescriptors.add(fileDescriptor);
serviceNames.add(serviceName);
if (!seenFiles.contains(fileDescriptor.getName())) {
seenFiles.add(fileDescriptor.getName());
fileDescriptorsToProcess.add(fileDescriptor);
}
}
}
while (!fileDescriptorsToProcess.isEmpty()) {
FileDescriptor currentFd = fileDescriptorsToProcess.remove();
processFileDescriptor(currentFd);
for (FileDescriptor dependencyFd : currentFd.getDependencies()) {
if (!seenFiles.contains(dependencyFd.getName())) {
seenFiles.add(dependencyFd.getName());
fileDescriptorsToProcess.add(dependencyFd);
}
}
}
}
/**
* Returns the file descriptors for the indexed services, but not their dependencies. This is
* used to check if the server's mutable services have changed.
*/
private Set<FileDescriptor> getServiceFileDescriptors() {
return Collections.unmodifiableSet(serviceFileDescriptors);
}
private Set<String> getServiceNames() {
return Collections.unmodifiableSet(serviceNames);
}
@Nullable
private FileDescriptor getFileDescriptorByName(String name) {
return fileDescriptorsByName.get(name);
}
@Nullable
private FileDescriptor getFileDescriptorBySymbol(String symbol) {
return fileDescriptorsBySymbol.get(symbol);
}
@Nullable
private FileDescriptor getFileDescriptorByExtensionAndNumber(String type, int number) {
if (fileDescriptorsByExtensionAndNumber.containsKey(type)) {
return fileDescriptorsByExtensionAndNumber.get(type).get(number);
}
return null;
}
@Nullable
private Set<Integer> getExtensionNumbersOfType(String type) {
if (fileDescriptorsByExtensionAndNumber.containsKey(type)) {
return Collections.unmodifiableSet(fileDescriptorsByExtensionAndNumber.get(type).keySet());
}
return null;
}
private void processFileDescriptor(FileDescriptor fd) {
String fdName = fd.getName();
checkState(!fileDescriptorsByName.containsKey(fdName), "File name already used: %s", fdName);
fileDescriptorsByName.put(fdName, fd);
for (ServiceDescriptor service : fd.getServices()) {
processService(service, fd);
}
for (Descriptor type : fd.getMessageTypes()) {
processType(type, fd);
}
for (FieldDescriptor extension : fd.getExtensions()) {
processExtension(extension, fd);
}
}
private void processService(ServiceDescriptor service, FileDescriptor fd) {
String serviceName = service.getFullName();
checkState(
!fileDescriptorsBySymbol.containsKey(serviceName),
"Service already defined: %s",
serviceName);
fileDescriptorsBySymbol.put(serviceName, fd);
for (MethodDescriptor method : service.getMethods()) {
String methodName = method.getFullName();
checkState(
!fileDescriptorsBySymbol.containsKey(methodName),
"Method already defined: %s",
methodName);
fileDescriptorsBySymbol.put(methodName, fd);
}
}
private void processType(Descriptor type, FileDescriptor fd) {
String typeName = type.getFullName();
checkState(
!fileDescriptorsBySymbol.containsKey(typeName), "Type already defined: %s", typeName);
fileDescriptorsBySymbol.put(typeName, fd);
for (FieldDescriptor extension : type.getExtensions()) {
processExtension(extension, fd);
}
for (Descriptor nestedType : type.getNestedTypes()) {
processType(nestedType, fd);
}
}
private void processExtension(FieldDescriptor extension, FileDescriptor fd) {
String extensionName = extension.getContainingType().getFullName();
int extensionNumber = extension.getNumber();
if (!fileDescriptorsByExtensionAndNumber.containsKey(extensionName)) {
fileDescriptorsByExtensionAndNumber.put(
extensionName, new HashMap<Integer, FileDescriptor>());
}
checkState(
!fileDescriptorsByExtensionAndNumber.get(extensionName).containsKey(extensionNumber),
"Extension name and number already defined: %s, %s",
extensionName,
extensionNumber);
fileDescriptorsByExtensionAndNumber.get(extensionName).put(extensionNumber, fd);
}
}
}

View File

@ -0,0 +1,147 @@
// Copyright 2016 The gRPC Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Service exported by server reflection. A more complete description of how
// server reflection works can be found at
// https://github.com/grpc/grpc/blob/master/doc/server-reflection.md
//
// The canonical version of this proto can be found at
// https://github.com/grpc/grpc-proto/blob/master/grpc/reflection/v1/reflection.proto
syntax = "proto3";
package grpc.reflection.v1;
option go_package = "google.golang.org/grpc/reflection/grpc_reflection_v1";
option java_multiple_files = true;
option java_package = "io.grpc.reflection.v1";
option java_outer_classname = "ServerReflectionProto";
service ServerReflection {
// The reflection service is structured as a bidirectional stream, ensuring
// all related requests go to a single server.
rpc ServerReflectionInfo(stream ServerReflectionRequest)
returns (stream ServerReflectionResponse);
}
// The message sent by the client when calling ServerReflectionInfo method.
message ServerReflectionRequest {
string host = 1;
// To use reflection service, the client should set one of the following
// fields in message_request. The server distinguishes requests by their
// defined field and then handles them using corresponding methods.
oneof message_request {
// Find a proto file by the file name.
string file_by_filename = 3;
// Find the proto file that declares the given fully-qualified symbol name.
// This field should be a fully-qualified symbol name
// (e.g. <package>.<service>[.<method>] or <package>.<type>).
string file_containing_symbol = 4;
// Find the proto file which defines an extension extending the given
// message type with the given field number.
ExtensionRequest file_containing_extension = 5;
// Finds the tag numbers used by all known extensions of the given message
// type, and appends them to ExtensionNumberResponse in an undefined order.
// Its corresponding method is best-effort: it's not guaranteed that the
// reflection service will implement this method, and it's not guaranteed
// that this method will provide all extensions. Returns
// StatusCode::UNIMPLEMENTED if it's not implemented.
// This field should be a fully-qualified type name. The format is
// <package>.<type>
string all_extension_numbers_of_type = 6;
// List the full names of registered services. The content will not be
// checked.
string list_services = 7;
}
}
// The type name and extension number sent by the client when requesting
// file_containing_extension.
message ExtensionRequest {
// Fully-qualified type name. The format should be <package>.<type>
string containing_type = 1;
int32 extension_number = 2;
}
// The message sent by the server to answer ServerReflectionInfo method.
message ServerReflectionResponse {
string valid_host = 1;
ServerReflectionRequest original_request = 2;
// The server sets one of the following fields according to the message_request
// in the request.
oneof message_response {
// This message is used to answer file_by_filename, file_containing_symbol,
// file_containing_extension requests with transitive dependencies.
// As the repeated label is not allowed in oneof fields, we use a
// FileDescriptorResponse message to encapsulate the repeated fields.
// The reflection service is allowed to avoid sending FileDescriptorProtos
// that were previously sent in response to earlier requests in the stream.
FileDescriptorResponse file_descriptor_response = 4;
// This message is used to answer all_extension_numbers_of_type requests.
ExtensionNumberResponse all_extension_numbers_response = 5;
// This message is used to answer list_services requests.
ListServiceResponse list_services_response = 6;
// This message is used when an error occurs.
ErrorResponse error_response = 7;
}
}
// Serialized FileDescriptorProto messages sent by the server answering
// a file_by_filename, file_containing_symbol, or file_containing_extension
// request.
message FileDescriptorResponse {
// Serialized FileDescriptorProto messages. We avoid taking a dependency on
// descriptor.proto, which uses proto2 only features, by making them opaque
// bytes instead.
repeated bytes file_descriptor_proto = 1;
}
// A list of extension numbers sent by the server answering
// all_extension_numbers_of_type request.
message ExtensionNumberResponse {
// Full name of the base type, including the package name. The format
// is <package>.<type>
string base_type_name = 1;
repeated int32 extension_number = 2;
}
// A list of ServiceResponse sent by the server answering list_services request.
message ListServiceResponse {
// The information of each service may be expanded in the future, so we use
// ServiceResponse message to encapsulate it.
repeated ServiceResponse service = 1;
}
// The information of a single service used by ListServiceResponse to answer
// list_services request.
message ServiceResponse {
// Full name of a registered service, including its package name. The format
// is <package>.<service>
string name = 1;
}
// The error code and error message sent by the server when an error occurs.
message ErrorResponse {
// This field uses the error codes defined in grpc::StatusCode.
int32 error_code = 1;
string error_message = 2;
}

View File

@ -0,0 +1,670 @@
/*
* Copyright 2016 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.protobuf.services;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.protobuf.ByteString;
import io.grpc.BindableService;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.ServerServiceDefinition;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.testing.StreamRecorder;
import io.grpc.reflection.testing.AnotherDynamicServiceGrpc;
import io.grpc.reflection.testing.AnotherReflectableServiceGrpc;
import io.grpc.reflection.testing.DynamicReflectionTestDepthTwoProto;
import io.grpc.reflection.testing.DynamicServiceGrpc;
import io.grpc.reflection.testing.ReflectableServiceGrpc;
import io.grpc.reflection.testing.ReflectionTestDepthThreeProto;
import io.grpc.reflection.testing.ReflectionTestDepthTwoAlternateProto;
import io.grpc.reflection.testing.ReflectionTestDepthTwoProto;
import io.grpc.reflection.testing.ReflectionTestProto;
import io.grpc.reflection.v1.ExtensionNumberResponse;
import io.grpc.reflection.v1.ExtensionRequest;
import io.grpc.reflection.v1.FileDescriptorResponse;
import io.grpc.reflection.v1.ServerReflectionGrpc;
import io.grpc.reflection.v1.ServerReflectionRequest;
import io.grpc.reflection.v1.ServerReflectionResponse;
import io.grpc.reflection.v1.ServiceResponse;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.util.MutableHandlerRegistry;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link ProtoReflectionServiceV1}. */
@RunWith(JUnit4.class)
public class ProtoReflectionServiceV1Test {
@Rule
public GrpcCleanupRule grpcCleanupRule = new GrpcCleanupRule();
private static final String TEST_HOST = "localhost";
private MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry();
private BindableService reflectionService;
private ServerServiceDefinition dynamicService =
new DynamicServiceGrpc.DynamicServiceImplBase() {}.bindService();
private ServerServiceDefinition anotherDynamicService =
new AnotherDynamicServiceGrpc.AnotherDynamicServiceImplBase() {}.bindService();
private ServerReflectionGrpc.ServerReflectionStub stub;
@Before
public void setUp() throws Exception {
reflectionService = ProtoReflectionServiceV1.newInstance();
Server server =
InProcessServerBuilder.forName("proto-reflection-test")
.directExecutor()
.addService(reflectionService)
.addService(new ReflectableServiceGrpc.ReflectableServiceImplBase() {})
.fallbackHandlerRegistry(handlerRegistry)
.build()
.start();
grpcCleanupRule.register(server);
ManagedChannel channel =
grpcCleanupRule.register(
InProcessChannelBuilder.forName("proto-reflection-test").directExecutor().build());
stub = ServerReflectionGrpc.newStub(channel);
}
@Test
public void listServices() throws Exception {
Set<ServiceResponse> originalServices =
new HashSet<>(
Arrays.asList(
ServiceResponse.newBuilder()
.setName("grpc.reflection.v1.ServerReflection")
.build(),
ServiceResponse.newBuilder()
.setName("grpc.reflection.testing.ReflectableService")
.build()));
assertServiceResponseEquals(originalServices);
handlerRegistry.addService(dynamicService);
assertServiceResponseEquals(
new HashSet<>(
Arrays.asList(
ServiceResponse.newBuilder()
.setName("grpc.reflection.v1.ServerReflection")
.build(),
ServiceResponse.newBuilder()
.setName("grpc.reflection.testing.ReflectableService")
.build(),
ServiceResponse.newBuilder()
.setName("grpc.reflection.testing.DynamicService")
.build())));
handlerRegistry.addService(anotherDynamicService);
assertServiceResponseEquals(
new HashSet<>(
Arrays.asList(
ServiceResponse.newBuilder()
.setName("grpc.reflection.v1.ServerReflection")
.build(),
ServiceResponse.newBuilder()
.setName("grpc.reflection.testing.ReflectableService")
.build(),
ServiceResponse.newBuilder()
.setName("grpc.reflection.testing.DynamicService")
.build(),
ServiceResponse.newBuilder()
.setName("grpc.reflection.testing.AnotherDynamicService")
.build())));
handlerRegistry.removeService(dynamicService);
assertServiceResponseEquals(
new HashSet<>(
Arrays.asList(
ServiceResponse.newBuilder()
.setName("grpc.reflection.v1.ServerReflection")
.build(),
ServiceResponse.newBuilder()
.setName("grpc.reflection.testing.ReflectableService")
.build(),
ServiceResponse.newBuilder()
.setName("grpc.reflection.testing.AnotherDynamicService")
.build())));
handlerRegistry.removeService(anotherDynamicService);
assertServiceResponseEquals(originalServices);
}
@Test
public void fileByFilename() throws Exception {
ServerReflectionRequest request =
ServerReflectionRequest.newBuilder()
.setHost(TEST_HOST)
.setFileByFilename("io/grpc/reflection/testing/reflection_test_depth_three.proto")
.build();
ServerReflectionResponse goldenResponse =
ServerReflectionResponse.newBuilder()
.setValidHost(TEST_HOST)
.setOriginalRequest(request)
.setFileDescriptorResponse(
FileDescriptorResponse.newBuilder()
.addFileDescriptorProto(
ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString())
.build())
.build();
StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver =
stub.serverReflectionInfo(responseObserver);
requestObserver.onNext(request);
requestObserver.onCompleted();
assertEquals(goldenResponse, responseObserver.firstValue().get());
}
@Test
public void fileByFilenameConsistentForMutableServices() throws Exception {
ServerReflectionRequest request =
ServerReflectionRequest.newBuilder()
.setHost(TEST_HOST)
.setFileByFilename("io/grpc/reflection/testing/dynamic_reflection_test_depth_two.proto")
.build();
ServerReflectionResponse goldenResponse =
ServerReflectionResponse.newBuilder()
.setValidHost(TEST_HOST)
.setOriginalRequest(request)
.setFileDescriptorResponse(
FileDescriptorResponse.newBuilder()
.addFileDescriptorProto(
DynamicReflectionTestDepthTwoProto.getDescriptor().toProto().toByteString())
.build())
.build();
StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver =
stub.serverReflectionInfo(responseObserver);
handlerRegistry.addService(dynamicService);
requestObserver.onNext(request);
requestObserver.onCompleted();
StreamRecorder<ServerReflectionResponse> responseObserver2 = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver2 =
stub.serverReflectionInfo(responseObserver2);
handlerRegistry.removeService(dynamicService);
requestObserver2.onNext(request);
requestObserver2.onCompleted();
StreamRecorder<ServerReflectionResponse> responseObserver3 = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver3 =
stub.serverReflectionInfo(responseObserver3);
requestObserver3.onNext(request);
requestObserver3.onCompleted();
assertEquals(
ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE,
responseObserver.firstValue().get().getMessageResponseCase());
assertEquals(goldenResponse, responseObserver2.firstValue().get());
assertEquals(
ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE,
responseObserver3.firstValue().get().getMessageResponseCase());
}
@Test
public void fileContainingSymbol() throws Exception {
ServerReflectionRequest request =
ServerReflectionRequest.newBuilder()
.setHost(TEST_HOST)
.setFileContainingSymbol("grpc.reflection.testing.ReflectableService.Method")
.build();
List<ByteString> goldenResponse =
Arrays.asList(
ReflectionTestProto.getDescriptor().toProto().toByteString(),
ReflectionTestDepthTwoProto.getDescriptor().toProto().toByteString(),
ReflectionTestDepthTwoAlternateProto.getDescriptor().toProto().toByteString(),
ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString());
StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver =
stub.serverReflectionInfo(responseObserver);
requestObserver.onNext(request);
requestObserver.onCompleted();
List<ByteString> response =
responseObserver
.firstValue()
.get()
.getFileDescriptorResponse()
.getFileDescriptorProtoList();
assertEquals(goldenResponse.size(), response.size());
assertEquals(new HashSet<>(goldenResponse), new HashSet<>(response));
}
@Test
public void fileContainingNestedSymbol() throws Exception {
ServerReflectionRequest request =
ServerReflectionRequest.newBuilder()
.setHost(TEST_HOST)
.setFileContainingSymbol("grpc.reflection.testing.NestedTypeOuter.Middle.Inner")
.build();
ServerReflectionResponse goldenResponse =
ServerReflectionResponse.newBuilder()
.setValidHost(TEST_HOST)
.setOriginalRequest(request)
.setFileDescriptorResponse(
FileDescriptorResponse.newBuilder()
.addFileDescriptorProto(
ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString())
.build())
.build();
StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver =
stub.serverReflectionInfo(responseObserver);
requestObserver.onNext(request);
requestObserver.onCompleted();
assertEquals(goldenResponse, responseObserver.firstValue().get());
}
@Test
public void fileContainingSymbolForMutableServices() throws Exception {
ServerReflectionRequest request =
ServerReflectionRequest.newBuilder()
.setHost(TEST_HOST)
.setFileContainingSymbol("grpc.reflection.testing.DynamicRequest")
.build();
ServerReflectionResponse goldenResponse =
ServerReflectionResponse.newBuilder()
.setValidHost(TEST_HOST)
.setOriginalRequest(request)
.setFileDescriptorResponse(
FileDescriptorResponse.newBuilder()
.addFileDescriptorProto(
DynamicReflectionTestDepthTwoProto.getDescriptor().toProto().toByteString())
.build())
.build();
StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver =
stub.serverReflectionInfo(responseObserver);
handlerRegistry.addService(dynamicService);
requestObserver.onNext(request);
requestObserver.onCompleted();
StreamRecorder<ServerReflectionResponse> responseObserver2 = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver2 =
stub.serverReflectionInfo(responseObserver2);
handlerRegistry.removeService(dynamicService);
requestObserver2.onNext(request);
requestObserver2.onCompleted();
StreamRecorder<ServerReflectionResponse> responseObserver3 = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver3 =
stub.serverReflectionInfo(responseObserver3);
requestObserver3.onNext(request);
requestObserver3.onCompleted();
assertEquals(
ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE,
responseObserver.firstValue().get().getMessageResponseCase());
assertEquals(goldenResponse, responseObserver2.firstValue().get());
assertEquals(
ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE,
responseObserver3.firstValue().get().getMessageResponseCase());
}
@Test
public void fileContainingExtension() throws Exception {
ServerReflectionRequest request =
ServerReflectionRequest.newBuilder()
.setHost(TEST_HOST)
.setFileContainingExtension(
ExtensionRequest.newBuilder()
.setContainingType("grpc.reflection.testing.ThirdLevelType")
.setExtensionNumber(100)
.build())
.build();
List<ByteString> goldenResponse =
Arrays.asList(
ReflectionTestProto.getDescriptor().toProto().toByteString(),
ReflectionTestDepthTwoProto.getDescriptor().toProto().toByteString(),
ReflectionTestDepthTwoAlternateProto.getDescriptor().toProto().toByteString(),
ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString());
StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver =
stub.serverReflectionInfo(responseObserver);
requestObserver.onNext(request);
requestObserver.onCompleted();
List<ByteString> response =
responseObserver
.firstValue()
.get()
.getFileDescriptorResponse()
.getFileDescriptorProtoList();
assertEquals(goldenResponse.size(), response.size());
assertEquals(new HashSet<>(goldenResponse), new HashSet<>(response));
}
@Test
public void fileContainingNestedExtension() throws Exception {
ServerReflectionRequest request =
ServerReflectionRequest.newBuilder()
.setHost(TEST_HOST)
.setFileContainingExtension(
ExtensionRequest.newBuilder()
.setContainingType("grpc.reflection.testing.ThirdLevelType")
.setExtensionNumber(101)
.build())
.build();
ServerReflectionResponse goldenResponse =
ServerReflectionResponse.newBuilder()
.setValidHost(TEST_HOST)
.setOriginalRequest(request)
.setFileDescriptorResponse(
FileDescriptorResponse.newBuilder()
.addFileDescriptorProto(
ReflectionTestDepthTwoProto.getDescriptor().toProto().toByteString())
.addFileDescriptorProto(
ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString())
.build())
.build();
StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver =
stub.serverReflectionInfo(responseObserver);
requestObserver.onNext(request);
requestObserver.onCompleted();
assertEquals(goldenResponse, responseObserver.firstValue().get());
}
@Test
public void fileContainingExtensionForMutableServices() throws Exception {
ServerReflectionRequest request =
ServerReflectionRequest.newBuilder()
.setHost(TEST_HOST)
.setFileContainingExtension(
ExtensionRequest.newBuilder()
.setContainingType("grpc.reflection.testing.TypeWithExtensions")
.setExtensionNumber(200)
.build())
.build();
ServerReflectionResponse goldenResponse =
ServerReflectionResponse.newBuilder()
.setValidHost(TEST_HOST)
.setOriginalRequest(request)
.setFileDescriptorResponse(
FileDescriptorResponse.newBuilder()
.addFileDescriptorProto(
DynamicReflectionTestDepthTwoProto.getDescriptor().toProto().toByteString())
.build())
.build();
StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver =
stub.serverReflectionInfo(responseObserver);
handlerRegistry.addService(dynamicService);
requestObserver.onNext(request);
requestObserver.onCompleted();
StreamRecorder<ServerReflectionResponse> responseObserver2 = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver2 =
stub.serverReflectionInfo(responseObserver2);
handlerRegistry.removeService(dynamicService);
requestObserver2.onNext(request);
requestObserver2.onCompleted();
StreamRecorder<ServerReflectionResponse> responseObserver3 = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver3 =
stub.serverReflectionInfo(responseObserver3);
requestObserver3.onNext(request);
requestObserver3.onCompleted();
assertEquals(
ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE,
responseObserver.firstValue().get().getMessageResponseCase());
assertEquals(goldenResponse, responseObserver2.firstValue().get());
assertEquals(
ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE,
responseObserver3.firstValue().get().getMessageResponseCase());
}
@Test
public void allExtensionNumbersOfType() throws Exception {
ServerReflectionRequest request =
ServerReflectionRequest.newBuilder()
.setHost(TEST_HOST)
.setAllExtensionNumbersOfType("grpc.reflection.testing.ThirdLevelType")
.build();
Set<Integer> goldenResponse = new HashSet<>(Arrays.asList(100, 101));
StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver =
stub.serverReflectionInfo(responseObserver);
requestObserver.onNext(request);
requestObserver.onCompleted();
Set<Integer> extensionNumberResponseSet =
new HashSet<>(
responseObserver
.firstValue()
.get()
.getAllExtensionNumbersResponse()
.getExtensionNumberList());
assertEquals(goldenResponse, extensionNumberResponseSet);
}
@Test
public void allExtensionNumbersOfTypeForMutableServices() throws Exception {
String type = "grpc.reflection.testing.TypeWithExtensions";
ServerReflectionRequest request =
ServerReflectionRequest.newBuilder()
.setHost(TEST_HOST)
.setAllExtensionNumbersOfType(type)
.build();
ServerReflectionResponse goldenResponse =
ServerReflectionResponse.newBuilder()
.setValidHost(TEST_HOST)
.setOriginalRequest(request)
.setAllExtensionNumbersResponse(
ExtensionNumberResponse.newBuilder()
.setBaseTypeName(type)
.addExtensionNumber(200)
.build())
.build();
StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver =
stub.serverReflectionInfo(responseObserver);
handlerRegistry.addService(dynamicService);
requestObserver.onNext(request);
requestObserver.onCompleted();
StreamRecorder<ServerReflectionResponse> responseObserver2 = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver2 =
stub.serverReflectionInfo(responseObserver2);
handlerRegistry.removeService(dynamicService);
requestObserver2.onNext(request);
requestObserver2.onCompleted();
StreamRecorder<ServerReflectionResponse> responseObserver3 = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver3 =
stub.serverReflectionInfo(responseObserver3);
requestObserver3.onNext(request);
requestObserver3.onCompleted();
assertEquals(
ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE,
responseObserver.firstValue().get().getMessageResponseCase());
assertEquals(goldenResponse, responseObserver2.firstValue().get());
assertEquals(
ServerReflectionResponse.MessageResponseCase.ERROR_RESPONSE,
responseObserver3.firstValue().get().getMessageResponseCase());
}
@Test
public void sharedServiceBetweenServers()
throws IOException, ExecutionException, InterruptedException {
Server anotherServer = InProcessServerBuilder.forName("proto-reflection-test-2")
.directExecutor()
.addService(reflectionService)
.addService(new AnotherReflectableServiceGrpc.AnotherReflectableServiceImplBase() {})
.build()
.start();
grpcCleanupRule.register(anotherServer);
ManagedChannel anotherChannel = grpcCleanupRule.register(
InProcessChannelBuilder.forName("proto-reflection-test-2").directExecutor().build());
ServerReflectionGrpc.ServerReflectionStub stub2 = ServerReflectionGrpc.newStub(anotherChannel);
ServerReflectionRequest request =
ServerReflectionRequest.newBuilder().setHost(TEST_HOST).setListServices("services").build();
StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver =
stub2.serverReflectionInfo(responseObserver);
requestObserver.onNext(request);
requestObserver.onCompleted();
List<ServiceResponse> response =
responseObserver.firstValue().get().getListServicesResponse().getServiceList();
assertEquals(new HashSet<>(
Arrays.asList(
ServiceResponse.newBuilder()
.setName("grpc.reflection.v1.ServerReflection")
.build(),
ServiceResponse.newBuilder()
.setName("grpc.reflection.testing.AnotherReflectableService")
.build())),
new HashSet<>(response));
}
@Test
public void flowControl() throws Exception {
FlowControlClientResponseObserver clientResponseObserver =
new FlowControlClientResponseObserver();
ClientCallStreamObserver<ServerReflectionRequest> requestObserver =
(ClientCallStreamObserver<ServerReflectionRequest>)
stub.serverReflectionInfo(clientResponseObserver);
// Verify we don't receive a response until we request it.
requestObserver.onNext(flowControlRequest);
assertEquals(0, clientResponseObserver.getResponses().size());
requestObserver.request(1);
assertEquals(1, clientResponseObserver.getResponses().size());
assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));
// Verify we don't receive an additional response until we request it.
requestObserver.onNext(flowControlRequest);
assertEquals(1, clientResponseObserver.getResponses().size());
requestObserver.request(1);
assertEquals(2, clientResponseObserver.getResponses().size());
assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(1));
requestObserver.onCompleted();
assertTrue(clientResponseObserver.onCompleteCalled());
}
@Test
public void flowControlOnCompleteWithPendingRequest() throws Exception {
FlowControlClientResponseObserver clientResponseObserver =
new FlowControlClientResponseObserver();
ClientCallStreamObserver<ServerReflectionRequest> requestObserver =
(ClientCallStreamObserver<ServerReflectionRequest>)
stub.serverReflectionInfo(clientResponseObserver);
requestObserver.onNext(flowControlRequest);
requestObserver.onCompleted();
assertEquals(0, clientResponseObserver.getResponses().size());
assertFalse(clientResponseObserver.onCompleteCalled());
requestObserver.request(1);
assertTrue(clientResponseObserver.onCompleteCalled());
assertEquals(1, clientResponseObserver.getResponses().size());
assertEquals(flowControlGoldenResponse, clientResponseObserver.getResponses().get(0));
}
private final ServerReflectionRequest flowControlRequest =
ServerReflectionRequest.newBuilder()
.setHost(TEST_HOST)
.setFileByFilename("io/grpc/reflection/testing/reflection_test_depth_three.proto")
.build();
private final ServerReflectionResponse flowControlGoldenResponse =
ServerReflectionResponse.newBuilder()
.setValidHost(TEST_HOST)
.setOriginalRequest(flowControlRequest)
.setFileDescriptorResponse(
FileDescriptorResponse.newBuilder()
.addFileDescriptorProto(
ReflectionTestDepthThreeProto.getDescriptor().toProto().toByteString())
.build())
.build();
private static class FlowControlClientResponseObserver
implements ClientResponseObserver<ServerReflectionRequest, ServerReflectionResponse> {
private final List<ServerReflectionResponse> responses =
new ArrayList<>();
private boolean onCompleteCalled = false;
@Override
public void beforeStart(final ClientCallStreamObserver<ServerReflectionRequest> requestStream) {
requestStream.disableAutoRequestWithInitial(0);
}
@Override
public void onNext(ServerReflectionResponse value) {
responses.add(value);
}
@Override
public void onError(Throwable t) {
fail("onError called");
}
@Override
public void onCompleted() {
onCompleteCalled = true;
}
public List<ServerReflectionResponse> getResponses() {
return responses;
}
public boolean onCompleteCalled() {
return onCompleteCalled;
}
}
private void assertServiceResponseEquals(Set<ServiceResponse> goldenResponse) throws Exception {
ServerReflectionRequest request =
ServerReflectionRequest.newBuilder().setHost(TEST_HOST).setListServices("services").build();
StreamRecorder<ServerReflectionResponse> responseObserver = StreamRecorder.create();
StreamObserver<ServerReflectionRequest> requestObserver =
stub.serverReflectionInfo(responseObserver);
requestObserver.onNext(request);
requestObserver.onCompleted();
List<ServiceResponse> response =
responseObserver.firstValue().get().getListServicesResponse().getServiceList();
assertEquals(goldenResponse.size(), response.size());
assertEquals(goldenResponse, new HashSet<>(response));
}
}