diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java index ffe455fa75..967b346549 100644 --- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java +++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java @@ -110,6 +110,9 @@ public abstract class AbstractServerImplBuilder> servers = + new ConcurrentHashMap>(); private final ConcurrentMap> rootChannels = new ConcurrentHashMap>(); private final ConcurrentMap> channels = @@ -45,6 +47,10 @@ public final class Channelz { return INSTANCE; } + public void addServer(Instrumented server) { + add(servers, server); + } + public void addChannel(Instrumented channel) { add(channels, channel); } @@ -58,6 +64,10 @@ public final class Channelz { add(transports, transport); } + public void removeServer(Instrumented server) { + remove(servers, server); + } + public void removeChannel(Instrumented channel) { remove(channels, channel); } @@ -71,6 +81,11 @@ public final class Channelz { remove(transports, transport); } + @VisibleForTesting + public boolean containsServer(LogId serverRef) { + return contains(servers, serverRef); + } + @VisibleForTesting public boolean containsChannel(LogId channelRef) { return contains(channels, channelRef); @@ -98,6 +113,69 @@ public final class Channelz { return map.containsKey(id.getId()); } + @Immutable + public static final class ServerStats { + public final long callsStarted; + public final long callsSucceeded; + public final long callsFailed; + public final long lastCallStartedMillis; + + /** + * Creates an instance. + */ + public ServerStats( + long callsStarted, + long callsSucceeded, + long callsFailed, + long lastCallStartedMillis) { + this.callsStarted = callsStarted; + this.callsSucceeded = callsSucceeded; + this.callsFailed = callsFailed; + this.lastCallStartedMillis = lastCallStartedMillis; + } + + public static final class Builder { + private String target; + private ConnectivityState state; + private long callsStarted; + private long callsSucceeded; + private long callsFailed; + private long lastCallStartedMillis; + public List subchannels; + + public Builder setCallsStarted(long callsStarted) { + this.callsStarted = callsStarted; + return this; + } + + public Builder setCallsSucceeded(long callsSucceeded) { + this.callsSucceeded = callsSucceeded; + return this; + } + + public Builder setCallsFailed(long callsFailed) { + this.callsFailed = callsFailed; + return this; + } + + public Builder setLastCallStartedMillis(long lastCallStartedMillis) { + this.lastCallStartedMillis = lastCallStartedMillis; + return this; + } + + /** + * Builds an instance. + */ + public ServerStats build() { + return new ServerStats( + callsStarted, + callsSucceeded, + callsFailed, + lastCallStartedMillis); + } + } + } + /** * A data class to represent a channel's stats. */ diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index 1380b93f9f..203ecaaed4 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -55,6 +55,7 @@ final class ServerCallImpl extends ServerCall { private final byte[] messageAcceptEncoding; private final DecompressorRegistry decompressorRegistry; private final CompressorRegistry compressorRegistry; + private CallTracer serverCallTracer; // state private volatile boolean cancelled; @@ -65,13 +66,16 @@ final class ServerCallImpl extends ServerCall { ServerCallImpl(ServerStream stream, MethodDescriptor method, Metadata inboundHeaders, Context.CancellableContext context, - DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry) { + DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry, + CallTracer serverCallTracer) { this.stream = stream; this.method = method; this.context = context; this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY); this.decompressorRegistry = decompressorRegistry; this.compressorRegistry = compressorRegistry; + this.serverCallTracer = serverCallTracer; + this.serverCallTracer.reportCallStarted(); } @Override @@ -166,14 +170,18 @@ final class ServerCallImpl extends ServerCall { @Override public void close(Status status, Metadata trailers) { checkState(!closeCalled, "call already closed"); - closeCalled = true; + try { + closeCalled = true; - if (status.isOk() && method.getType().serverSendsOneMessage() && !messageSent) { - internalClose(Status.INTERNAL.withDescription(MISSING_RESPONSE)); - return; + if (status.isOk() && method.getType().serverSendsOneMessage() && !messageSent) { + internalClose(Status.INTERNAL.withDescription(MISSING_RESPONSE)); + return; + } + + stream.close(status, trailers); + } finally { + serverCallTracer.reportCallEnded(status.isOk()); } - - stream.close(status, trailers); } @Override @@ -208,6 +216,7 @@ final class ServerCallImpl extends ServerCall { private void internalClose(Status internalError) { log.log(Level.WARNING, "Cancelling the stream with status {0}", new Object[] {internalError}); stream.cancel(internalError); + serverCallTracer.reportCallEnded(internalError.isOk()); // error so always false } /** diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 292028f85f..498516ebee 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -26,6 +26,8 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.grpc.Attributes; import io.grpc.CompressorRegistry; import io.grpc.Context; @@ -41,6 +43,7 @@ import io.grpc.ServerMethodDefinition; import io.grpc.ServerServiceDefinition; import io.grpc.ServerTransportFilter; import io.grpc.Status; +import io.grpc.internal.Channelz.ServerStats; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -70,7 +73,7 @@ import javax.annotation.concurrent.GuardedBy; *

Starting the server starts the underlying transport for servicing requests. Stopping the * server stops servicing new requests and waits for all connections to terminate. */ -public final class ServerImpl extends io.grpc.Server implements WithLogId { +public final class ServerImpl extends io.grpc.Server implements Instrumented { private static final Logger log = Logger.getLogger(ServerImpl.class.getName()); private static final ServerStreamListener NOOP_LISTENER = new NoopListener(); @@ -106,6 +109,9 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { private final CompressorRegistry compressorRegistry; private final BinaryLogProvider binlogProvider; + private final Channelz channelz; + private final CallTracer serverCallTracer; + /** * Construct a server. * @@ -133,6 +139,10 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { builder.interceptors.toArray(new ServerInterceptor[builder.interceptors.size()]); this.handshakeTimeoutMillis = builder.handshakeTimeoutMillis; this.binlogProvider = builder.binlogProvider; + this.channelz = builder.channelz; + this.serverCallTracer = builder.callTracerFactory.create(); + + channelz.addServer(this); } /** @@ -304,6 +314,7 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { } // TODO(carl-mastrangelo): move this outside the synchronized block. lock.notifyAll(); + channelz.removeServer(this); } } } @@ -538,7 +549,8 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { headers, context, decompressorRegistry, - compressorRegistry); + compressorRegistry, + serverCallTracer); ServerCall.Listener listener = methodDef.getServerCallHandler().startCall(call, headers); @@ -555,6 +567,15 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId { return logId; } + @Override + public ListenableFuture getStats() { + ServerStats.Builder builder = new ServerStats.Builder(); + serverCallTracer.updateBuilder(builder); + SettableFuture ret = SettableFuture.create(); + ret.set(builder.build()); + return ret; + } + private static final class NoopListener implements ServerStreamListener { @Override public void messagesAvailable(MessageProducer producer) { diff --git a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java index 39b4c5532a..378bbabf7e 100644 --- a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java @@ -40,6 +40,8 @@ import io.grpc.MethodDescriptor.Marshaller; import io.grpc.MethodDescriptor.MethodType; import io.grpc.ServerCall; import io.grpc.Status; +import io.grpc.internal.Channelz.ServerStats; +import io.grpc.internal.Channelz.ServerStats.Builder; import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl; import io.grpc.internal.testing.SingleMessageProducer; import java.io.ByteArrayInputStream; @@ -61,6 +63,7 @@ public class ServerCallImplTest { @Mock private ServerStream stream; @Mock private ServerCall.Listener callListener; + private final CallTracer serverCallTracer = CallTracer.getDefaultFactory().create(); private ServerCallImpl call; private Context.CancellableContext context; @@ -87,7 +90,47 @@ public class ServerCallImplTest { MockitoAnnotations.initMocks(this); context = Context.ROOT.withCancellation(); call = new ServerCallImpl(stream, UNARY_METHOD, requestHeaders, context, - DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance()); + DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), + serverCallTracer); + } + + @Test + public void callTracer_success() { + callTracer0(Status.OK); + } + + @Test + public void callTracer_failure() { + callTracer0(Status.UNKNOWN); + } + + private void callTracer0(Status status) { + CallTracer tracer = CallTracer.getDefaultFactory().create(); + Builder beforeBuilder = new Builder(); + tracer.updateBuilder(beforeBuilder); + ServerStats before = beforeBuilder.build(); + assertEquals(0, before.callsStarted); + assertEquals(0, before.lastCallStartedMillis); + + call = new ServerCallImpl(stream, UNARY_METHOD, requestHeaders, context, + DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), + tracer); + + // required boilerplate + call.sendHeaders(new Metadata()); + call.sendMessage(123L); + // end: required boilerplate + + call.close(status, new Metadata()); + Builder afterBuilder = new Builder(); + tracer.updateBuilder(afterBuilder); + ServerStats after = afterBuilder.build(); + assertEquals(1, after.callsStarted); + if (status.isOk()) { + assertEquals(1, after.callsSucceeded); + } else { + assertEquals(1, after.callsFailed); + } } @Test @@ -181,7 +224,8 @@ public class ServerCallImplTest { requestHeaders, context, DecompressorRegistry.getDefaultInstance(), - CompressorRegistry.getDefaultInstance()); + CompressorRegistry.getDefaultInstance(), + serverCallTracer); serverCall.sendHeaders(new Metadata()); serverCall.sendMessage(1L); verify(stream, times(1)).writeMessage(any(InputStream.class)); @@ -215,7 +259,8 @@ public class ServerCallImplTest { requestHeaders, context, DecompressorRegistry.getDefaultInstance(), - CompressorRegistry.getDefaultInstance()); + CompressorRegistry.getDefaultInstance(), + serverCallTracer); serverCall.sendHeaders(new Metadata()); serverCall.sendMessage(1L); serverCall.sendMessage(1L); @@ -251,7 +296,8 @@ public class ServerCallImplTest { requestHeaders, context, DecompressorRegistry.getDefaultInstance(), - CompressorRegistry.getDefaultInstance()); + CompressorRegistry.getDefaultInstance(), + serverCallTracer); serverCall.close(Status.OK, new Metadata()); ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); verify(stream, times(1)).cancel(statusCaptor.capture()); diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index ce0c7b518a..e115ed9e2c 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -139,6 +139,8 @@ public class ServerImplTest { private final FakeClock executor = new FakeClock(); private final FakeClock timer = new FakeClock(); + private final Channelz channelz = new Channelz(); + @Mock private ServerStreamTracer.Factory streamTracerFactory; private List streamTracerFactories; @@ -188,6 +190,7 @@ public class ServerImplTest { @Before public void startUp() throws IOException { MockitoAnnotations.initMocks(this); + builder.channelz = channelz; streamTracerFactories = Arrays.asList(streamTracerFactory); when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService()); when(streamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class))) @@ -1361,6 +1364,22 @@ public class ServerImplTest { userInterceptor.interceptedMethods.get(0).getResponseMarshaller()); } + @Test + public void channelz_membership() throws Exception { + createServer(); + assertTrue(builder.channelz.containsServer(server.getLogId())); + server.shutdownNow().awaitTermination(); + assertFalse(builder.channelz.containsServer(server.getLogId())); + } + + @Test + public void channelz_serverStats() throws Exception { + createAndStartServer(); + assertEquals(0, server.getStats().get().callsSucceeded); + basicExchangeHelper(METHOD, "Lots of pizza, please", 314, null); + assertEquals(1, server.getStats().get().callsSucceeded); + } + private void createAndStartServer() throws IOException { createServer(); server.start();