core: add ServerImpl to channelz (#4147)

This allows servers to be identified by ids.
This commit is contained in:
zpencer 2018-02-27 08:22:25 -08:00 committed by GitHub
parent 39decadaf6
commit 4dc1b50e1a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 198 additions and 13 deletions

View File

@ -110,6 +110,9 @@ public abstract class AbstractServerImplBuilder<T extends AbstractServerImplBuil
protected BinaryLogProvider binlogProvider = BinaryLogProvider.provider();
protected TransportTracer.Factory transportTracerFactory = TransportTracer.getDefaultFactory();
protected Channelz channelz = Channelz.instance();
protected CallTracer.Factory callTracerFactory = CallTracer.getDefaultFactory();
@Override
public final T directExecutor() {
return executor(MoreExecutors.directExecutor());

View File

@ -18,6 +18,7 @@ package io.grpc.internal;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.internal.Channelz.ChannelStats;
import io.grpc.internal.Channelz.ServerStats;
/**
* A collection of call stats for channelz.
@ -54,6 +55,14 @@ final class CallTracer {
.setLastCallStartedMillis(lastCallStartedMillis);
}
void updateBuilder(ServerStats.Builder builder) {
builder
.setCallsStarted(callsStarted.value())
.setCallsSucceeded(callsSucceeded.value())
.setCallsFailed(callsFailed.value())
.setLastCallStartedMillis(lastCallStartedMillis);
}
@VisibleForTesting
interface TimeProvider {
/** Returns the current milli time. */

View File

@ -30,6 +30,8 @@ public final class Channelz {
private static final Logger log = Logger.getLogger(Channelz.class.getName());
private static final Channelz INSTANCE = new Channelz();
private final ConcurrentMap<Long, Instrumented<ServerStats>> servers =
new ConcurrentHashMap<Long, Instrumented<ServerStats>>();
private final ConcurrentMap<Long, Instrumented<ChannelStats>> rootChannels =
new ConcurrentHashMap<Long, Instrumented<ChannelStats>>();
private final ConcurrentMap<Long, Instrumented<ChannelStats>> channels =
@ -45,6 +47,10 @@ public final class Channelz {
return INSTANCE;
}
public void addServer(Instrumented<ServerStats> server) {
add(servers, server);
}
public void addChannel(Instrumented<ChannelStats> channel) {
add(channels, channel);
}
@ -58,6 +64,10 @@ public final class Channelz {
add(transports, transport);
}
public void removeServer(Instrumented<ServerStats> server) {
remove(servers, server);
}
public void removeChannel(Instrumented<ChannelStats> channel) {
remove(channels, channel);
}
@ -71,6 +81,11 @@ public final class Channelz {
remove(transports, transport);
}
@VisibleForTesting
public boolean containsServer(LogId serverRef) {
return contains(servers, serverRef);
}
@VisibleForTesting
public boolean containsChannel(LogId channelRef) {
return contains(channels, channelRef);
@ -98,6 +113,69 @@ public final class Channelz {
return map.containsKey(id.getId());
}
@Immutable
public static final class ServerStats {
public final long callsStarted;
public final long callsSucceeded;
public final long callsFailed;
public final long lastCallStartedMillis;
/**
* Creates an instance.
*/
public ServerStats(
long callsStarted,
long callsSucceeded,
long callsFailed,
long lastCallStartedMillis) {
this.callsStarted = callsStarted;
this.callsSucceeded = callsSucceeded;
this.callsFailed = callsFailed;
this.lastCallStartedMillis = lastCallStartedMillis;
}
public static final class Builder {
private String target;
private ConnectivityState state;
private long callsStarted;
private long callsSucceeded;
private long callsFailed;
private long lastCallStartedMillis;
public List<LogId> subchannels;
public Builder setCallsStarted(long callsStarted) {
this.callsStarted = callsStarted;
return this;
}
public Builder setCallsSucceeded(long callsSucceeded) {
this.callsSucceeded = callsSucceeded;
return this;
}
public Builder setCallsFailed(long callsFailed) {
this.callsFailed = callsFailed;
return this;
}
public Builder setLastCallStartedMillis(long lastCallStartedMillis) {
this.lastCallStartedMillis = lastCallStartedMillis;
return this;
}
/**
* Builds an instance.
*/
public ServerStats build() {
return new ServerStats(
callsStarted,
callsSucceeded,
callsFailed,
lastCallStartedMillis);
}
}
}
/**
* A data class to represent a channel's stats.
*/

View File

@ -55,6 +55,7 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
private final byte[] messageAcceptEncoding;
private final DecompressorRegistry decompressorRegistry;
private final CompressorRegistry compressorRegistry;
private CallTracer serverCallTracer;
// state
private volatile boolean cancelled;
@ -65,13 +66,16 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
ServerCallImpl(ServerStream stream, MethodDescriptor<ReqT, RespT> method,
Metadata inboundHeaders, Context.CancellableContext context,
DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry) {
DecompressorRegistry decompressorRegistry, CompressorRegistry compressorRegistry,
CallTracer serverCallTracer) {
this.stream = stream;
this.method = method;
this.context = context;
this.messageAcceptEncoding = inboundHeaders.get(MESSAGE_ACCEPT_ENCODING_KEY);
this.decompressorRegistry = decompressorRegistry;
this.compressorRegistry = compressorRegistry;
this.serverCallTracer = serverCallTracer;
this.serverCallTracer.reportCallStarted();
}
@Override
@ -166,14 +170,18 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
@Override
public void close(Status status, Metadata trailers) {
checkState(!closeCalled, "call already closed");
closeCalled = true;
try {
closeCalled = true;
if (status.isOk() && method.getType().serverSendsOneMessage() && !messageSent) {
internalClose(Status.INTERNAL.withDescription(MISSING_RESPONSE));
return;
if (status.isOk() && method.getType().serverSendsOneMessage() && !messageSent) {
internalClose(Status.INTERNAL.withDescription(MISSING_RESPONSE));
return;
}
stream.close(status, trailers);
} finally {
serverCallTracer.reportCallEnded(status.isOk());
}
stream.close(status, trailers);
}
@Override
@ -208,6 +216,7 @@ final class ServerCallImpl<ReqT, RespT> extends ServerCall<ReqT, RespT> {
private void internalClose(Status internalError) {
log.log(Level.WARNING, "Cancelling the stream with status {0}", new Object[] {internalError});
stream.cancel(internalError);
serverCallTracer.reportCallEnded(internalError.isOk()); // error so always false
}
/**

View File

@ -26,6 +26,8 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.CompressorRegistry;
import io.grpc.Context;
@ -41,6 +43,7 @@ import io.grpc.ServerMethodDefinition;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerTransportFilter;
import io.grpc.Status;
import io.grpc.internal.Channelz.ServerStats;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@ -70,7 +73,7 @@ import javax.annotation.concurrent.GuardedBy;
* <p>Starting the server starts the underlying transport for servicing requests. Stopping the
* server stops servicing new requests and waits for all connections to terminate.
*/
public final class ServerImpl extends io.grpc.Server implements WithLogId {
public final class ServerImpl extends io.grpc.Server implements Instrumented<ServerStats> {
private static final Logger log = Logger.getLogger(ServerImpl.class.getName());
private static final ServerStreamListener NOOP_LISTENER = new NoopListener();
@ -106,6 +109,9 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
private final CompressorRegistry compressorRegistry;
private final BinaryLogProvider binlogProvider;
private final Channelz channelz;
private final CallTracer serverCallTracer;
/**
* Construct a server.
*
@ -133,6 +139,10 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
builder.interceptors.toArray(new ServerInterceptor[builder.interceptors.size()]);
this.handshakeTimeoutMillis = builder.handshakeTimeoutMillis;
this.binlogProvider = builder.binlogProvider;
this.channelz = builder.channelz;
this.serverCallTracer = builder.callTracerFactory.create();
channelz.addServer(this);
}
/**
@ -304,6 +314,7 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
}
// TODO(carl-mastrangelo): move this outside the synchronized block.
lock.notifyAll();
channelz.removeServer(this);
}
}
}
@ -538,7 +549,8 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
headers,
context,
decompressorRegistry,
compressorRegistry);
compressorRegistry,
serverCallTracer);
ServerCall.Listener<WReqT> listener =
methodDef.getServerCallHandler().startCall(call, headers);
@ -555,6 +567,15 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
return logId;
}
@Override
public ListenableFuture<ServerStats> getStats() {
ServerStats.Builder builder = new ServerStats.Builder();
serverCallTracer.updateBuilder(builder);
SettableFuture<ServerStats> ret = SettableFuture.create();
ret.set(builder.build());
return ret;
}
private static final class NoopListener implements ServerStreamListener {
@Override
public void messagesAvailable(MessageProducer producer) {

View File

@ -40,6 +40,8 @@ import io.grpc.MethodDescriptor.Marshaller;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.ServerCall;
import io.grpc.Status;
import io.grpc.internal.Channelz.ServerStats;
import io.grpc.internal.Channelz.ServerStats.Builder;
import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl;
import io.grpc.internal.testing.SingleMessageProducer;
import java.io.ByteArrayInputStream;
@ -61,6 +63,7 @@ public class ServerCallImplTest {
@Mock private ServerStream stream;
@Mock private ServerCall.Listener<Long> callListener;
private final CallTracer serverCallTracer = CallTracer.getDefaultFactory().create();
private ServerCallImpl<Long, Long> call;
private Context.CancellableContext context;
@ -87,7 +90,47 @@ public class ServerCallImplTest {
MockitoAnnotations.initMocks(this);
context = Context.ROOT.withCancellation();
call = new ServerCallImpl<Long, Long>(stream, UNARY_METHOD, requestHeaders, context,
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance());
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
serverCallTracer);
}
@Test
public void callTracer_success() {
callTracer0(Status.OK);
}
@Test
public void callTracer_failure() {
callTracer0(Status.UNKNOWN);
}
private void callTracer0(Status status) {
CallTracer tracer = CallTracer.getDefaultFactory().create();
Builder beforeBuilder = new Builder();
tracer.updateBuilder(beforeBuilder);
ServerStats before = beforeBuilder.build();
assertEquals(0, before.callsStarted);
assertEquals(0, before.lastCallStartedMillis);
call = new ServerCallImpl<Long, Long>(stream, UNARY_METHOD, requestHeaders, context,
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
tracer);
// required boilerplate
call.sendHeaders(new Metadata());
call.sendMessage(123L);
// end: required boilerplate
call.close(status, new Metadata());
Builder afterBuilder = new Builder();
tracer.updateBuilder(afterBuilder);
ServerStats after = afterBuilder.build();
assertEquals(1, after.callsStarted);
if (status.isOk()) {
assertEquals(1, after.callsSucceeded);
} else {
assertEquals(1, after.callsFailed);
}
}
@Test
@ -181,7 +224,8 @@ public class ServerCallImplTest {
requestHeaders,
context,
DecompressorRegistry.getDefaultInstance(),
CompressorRegistry.getDefaultInstance());
CompressorRegistry.getDefaultInstance(),
serverCallTracer);
serverCall.sendHeaders(new Metadata());
serverCall.sendMessage(1L);
verify(stream, times(1)).writeMessage(any(InputStream.class));
@ -215,7 +259,8 @@ public class ServerCallImplTest {
requestHeaders,
context,
DecompressorRegistry.getDefaultInstance(),
CompressorRegistry.getDefaultInstance());
CompressorRegistry.getDefaultInstance(),
serverCallTracer);
serverCall.sendHeaders(new Metadata());
serverCall.sendMessage(1L);
serverCall.sendMessage(1L);
@ -251,7 +296,8 @@ public class ServerCallImplTest {
requestHeaders,
context,
DecompressorRegistry.getDefaultInstance(),
CompressorRegistry.getDefaultInstance());
CompressorRegistry.getDefaultInstance(),
serverCallTracer);
serverCall.close(Status.OK, new Metadata());
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(stream, times(1)).cancel(statusCaptor.capture());

View File

@ -139,6 +139,8 @@ public class ServerImplTest {
private final FakeClock executor = new FakeClock();
private final FakeClock timer = new FakeClock();
private final Channelz channelz = new Channelz();
@Mock
private ServerStreamTracer.Factory streamTracerFactory;
private List<ServerStreamTracer.Factory> streamTracerFactories;
@ -188,6 +190,7 @@ public class ServerImplTest {
@Before
public void startUp() throws IOException {
MockitoAnnotations.initMocks(this);
builder.channelz = channelz;
streamTracerFactories = Arrays.asList(streamTracerFactory);
when(executorPool.getObject()).thenReturn(executor.getScheduledExecutorService());
when(streamTracerFactory.newServerStreamTracer(anyString(), any(Metadata.class)))
@ -1361,6 +1364,22 @@ public class ServerImplTest {
userInterceptor.interceptedMethods.get(0).getResponseMarshaller());
}
@Test
public void channelz_membership() throws Exception {
createServer();
assertTrue(builder.channelz.containsServer(server.getLogId()));
server.shutdownNow().awaitTermination();
assertFalse(builder.channelz.containsServer(server.getLogId()));
}
@Test
public void channelz_serverStats() throws Exception {
createAndStartServer();
assertEquals(0, server.getStats().get().callsSucceeded);
basicExchangeHelper(METHOD, "Lots of pizza, please", 314, null);
assertEquals(1, server.getStats().get().callsSucceeded);
}
private void createAndStartServer() throws IOException {
createServer();
server.start();