core: pass ServerCall to ServerStreamTracer. (#2930)

This is needed for GRPCLB server-side load reporting, which needs to record
the authority and peer identity.
This commit is contained in:
Kun Zhang 2017-04-20 09:30:46 -07:00 committed by GitHub
parent a92b0488ed
commit 7cf35510f7
4 changed files with 24 additions and 2 deletions

View File

@ -47,6 +47,13 @@ public abstract class ServerStreamTracer extends StreamTracer {
return context;
}
/**
* Called when {@link ServerCall} is created. This is for the tracer to access information
* about the {@code ServerCall}.
*/
public void serverCallStarted(ServerCall<?, ?> call) {
}
public abstract static class Factory {
/**
* Creates a {@link ServerStreamTracer} for a new server stream.

View File

@ -418,7 +418,7 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
context.cancel(null);
return;
}
listener = startCall(stream, methodName, method, headers, context);
listener = startCall(stream, methodName, method, headers, context, statsTraceCtx);
} catch (RuntimeException e) {
stream.close(Status.fromThrowable(e), new Metadata());
context.cancel(null);
@ -464,11 +464,12 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
/** Never returns {@code null}. */
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers,
Context.CancellableContext context) {
Context.CancellableContext context, StatsTraceContext statsTraceCtx) {
// TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<ReqT, RespT>(
stream, methodDef.getMethodDescriptor(), headers, context,
decompressorRegistry, compressorRegistry);
statsTraceCtx.serverCallStarted(call);
ServerCall.Listener<ReqT> listener =
methodDef.getServerCallHandler().startCall(call, headers);
if (listener == null) {

View File

@ -38,6 +38,7 @@ import io.grpc.CallOptions;
import io.grpc.ClientStreamTracer;
import io.grpc.Context;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.StreamTracer;
@ -138,6 +139,17 @@ public final class StatsTraceContext {
return ctx;
}
/**
* See {@link ServerStreamTracer#serverCallStarted}. For server-side only.
*
* <p>Called from {@link io.grpc.internal.ServerImpl}.
*/
public void serverCallStarted(ServerCall<?, ?> call) {
for (StreamTracer tracer : tracers) {
((ServerStreamTracer) tracer).serverCallStarted(call);
}
}
/**
* See {@link StreamTracer#streamClosed}. This may be called multiple times, and only the first
* value will be taken.

View File

@ -373,6 +373,7 @@ public class ServerImplTest {
assertEquals("Method not found: Waiter/nonexist", status.getDescription());
verify(streamTracerFactory).newServerStreamTracer(eq("Waiter/nonexist"), same(requestHeaders));
verify(streamTracer, never()).serverCallStarted(any(ServerCall.class));
assertEquals(Status.Code.UNIMPLEMENTED, statusCaptor.getValue().getCode());
}
@ -426,6 +427,7 @@ public class ServerImplTest {
assertEquals(1, executor.runDueTasks());
ServerCall<String, Integer> call = callReference.get();
assertNotNull(call);
verify(streamTracer).serverCallStarted(same(call));
verify(stream).getAuthority();
Context callContext = callContextReference.get();
assertNotNull(callContext);