xds: do not add a new CallMetricRecorder instance to context if there is already one (#6042)

* xds: do not add a new CallMetricRecorder instance to context if there is already one

* save a local reference to CallMetricRecorder instance to allow ServerCall#close() being called in a different context

* added test coverage for having ServerInterceptor/ServerStreamTracerFactory both install CallMetricRecorder
This commit is contained in:
Chengyuan Zhang 2019-08-08 13:00:46 -07:00 committed by GitHub
parent 4faad27078
commit c5317e4935
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 62 additions and 3 deletions

View File

@ -64,12 +64,19 @@ public final class OrcaMetricReportingServerInterceptor implements ServerInterce
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) {
Context ctx = Context.current();
CallMetricRecorder callMetricRecorder = InternalCallMetricRecorder.CONTEXT_KEY.get(ctx);
if (callMetricRecorder == null) {
callMetricRecorder = InternalCallMetricRecorder.newCallMetricRecorder();
ctx = ctx.withValue(InternalCallMetricRecorder.CONTEXT_KEY, callMetricRecorder);
}
final CallMetricRecorder finalCallMetricRecorder = callMetricRecorder;
ServerCall<ReqT, RespT> trailerAttachingCall =
new SimpleForwardingServerCall<ReqT, RespT>(call) {
@Override
public void close(Status status, Metadata trailers) {
Map<String, Double> metricValues =
InternalCallMetricRecorder.finalizeAndDump(CallMetricRecorder.getCurrent());
InternalCallMetricRecorder.finalizeAndDump(finalCallMetricRecorder);
// Only attach a metric report if there are some metric values to be reported.
if (!metricValues.isEmpty()) {
OrcaLoadReport report =
@ -79,9 +86,8 @@ public final class OrcaMetricReportingServerInterceptor implements ServerInterce
super.close(status, trailers);
}
};
final CallMetricRecorder recorder = InternalCallMetricRecorder.newCallMetricRecorder();
return Contexts.interceptCall(
Context.current().withValue(InternalCallMetricRecorder.CONTEXT_KEY, recorder),
ctx,
trailerAttachingCall,
headers,
next);

View File

@ -24,6 +24,7 @@ import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.Context;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.ForwardingClientCallListener.SimpleForwardingClientCallListener;
import io.grpc.ManagedChannel;
@ -31,16 +32,19 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.services.CallMetricRecorder;
import io.grpc.services.InternalCallMetricRecorder;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.testing.protobuf.SimpleRequest;
import io.grpc.testing.protobuf.SimpleResponse;
import io.grpc.testing.protobuf.SimpleServiceGrpc;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
@ -105,6 +109,55 @@ public class OrcaMetricReportingServerInterceptorTest {
baseChannel, new TrailersCapturingClientInterceptor(trailersCapture));
}
@Test
public void shareCallMetricRecorderInContext() throws IOException {
final CallMetricRecorder callMetricRecorder =
InternalCallMetricRecorder.newCallMetricRecorder();
ServerStreamTracer.Factory callMetricRecorderSharingStreamTracerFactory =
new ServerStreamTracer.Factory() {
@Override
public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata headers) {
return new ServerStreamTracer() {
@Override
public Context filterContext(Context context) {
return context.withValue(InternalCallMetricRecorder.CONTEXT_KEY, callMetricRecorder);
}
};
}
};
final AtomicReference<CallMetricRecorder> callMetricRecorderCapture = new AtomicReference<>();
SimpleServiceGrpc.SimpleServiceImplBase simpleServiceImpl =
new SimpleServiceGrpc.SimpleServiceImplBase() {
@Override
public void unaryRpc(
SimpleRequest request, StreamObserver<SimpleResponse> responseObserver) {
callMetricRecorderCapture.set(CallMetricRecorder.getCurrent());
SimpleResponse response =
SimpleResponse.newBuilder().setResponseMessage("Simple response").build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
};
ServerInterceptor metricReportingServerInterceptor = new OrcaMetricReportingServerInterceptor();
String serverName = InProcessServerBuilder.generateName();
grpcCleanupRule.register(
InProcessServerBuilder
.forName(serverName)
.directExecutor()
.addStreamTracerFactory(callMetricRecorderSharingStreamTracerFactory)
.addService(
ServerInterceptors.intercept(simpleServiceImpl, metricReportingServerInterceptor))
.build().start());
ManagedChannel channel =
grpcCleanupRule.register(InProcessChannelBuilder.forName(serverName).build());
ClientCalls.blockingUnaryCall(channel, SIMPLE_METHOD, CallOptions.DEFAULT, REQUEST);
assertThat(callMetricRecorderCapture.get()).isSameInstanceAs(callMetricRecorder);
}
@Test
public void noTrailerReportIfNoRecordedMetrics() {
ClientCalls.blockingUnaryCall(channelToUse, SIMPLE_METHOD, CallOptions.DEFAULT, REQUEST);