core: name more anonymous classes in ServerImpl

This makes it much easier to read stack traces since the name of the
event is in the frame.
This commit is contained in:
Carl Mastrangelo 2017-09-01 15:32:07 -07:00 committed by GitHub
parent b579528c89
commit aaebf6e967
1 changed files with 86 additions and 49 deletions

View File

@ -302,7 +302,7 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
} }
} }
private class ServerListenerImpl implements ServerListener { private final class ServerListenerImpl implements ServerListener {
@Override @Override
public ServerTransportListener transportCreated(ServerTransport transport) { public ServerTransportListener transportCreated(ServerTransport transport) {
synchronized (lock) { synchronized (lock) {
@ -336,11 +336,11 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
} }
} }
private class ServerTransportListenerImpl implements ServerTransportListener { private final class ServerTransportListenerImpl implements ServerTransportListener {
private final ServerTransport transport; private final ServerTransport transport;
private Attributes attributes; private Attributes attributes;
public ServerTransportListenerImpl(ServerTransport transport) { ServerTransportListenerImpl(ServerTransport transport) {
this.transport = transport; this.transport = transport;
} }
@ -398,41 +398,49 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
// Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks // Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks
// are delivered, including any errors. Callbacks can still be triggered, but they will be // are delivered, including any errors. Callbacks can still be triggered, but they will be
// queued. // queued.
wrappedExecutor.execute(new ContextRunnable(context) {
@Override final class StreamCreated extends ContextRunnable {
public void runInContext() {
ServerStreamListener listener = NOOP_LISTENER; StreamCreated() {
try { super(context);
ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName); }
if (method == null) {
method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority()); @Override
} public void runInContext() {
if (method == null) { ServerStreamListener listener = NOOP_LISTENER;
Status status = Status.UNIMPLEMENTED.withDescription( try {
"Method not found: " + methodName); ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
// TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in if (method == null) {
// memory as a map whose key is the method name, this would allow a misbehaving method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority());
// client to blow up the server in-memory stats storage by sending large number of
// distinct unimplemented method
// names. (https://github.com/grpc/grpc-java/issues/2285)
stream.close(status, new Metadata());
context.cancel(null);
return;
}
listener = startCall(stream, methodName, method, headers, context, statsTraceCtx);
} catch (RuntimeException e) {
stream.close(Status.fromThrowable(e), new Metadata());
context.cancel(null);
throw e;
} catch (Error e) {
stream.close(Status.fromThrowable(e), new Metadata());
context.cancel(null);
throw e;
} finally {
jumpListener.setListener(listener);
} }
if (method == null) {
Status status = Status.UNIMPLEMENTED.withDescription(
"Method not found: " + methodName);
// TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in
// memory as a map whose key is the method name, this would allow a misbehaving
// client to blow up the server in-memory stats storage by sending large number of
// distinct unimplemented method
// names. (https://github.com/grpc/grpc-java/issues/2285)
stream.close(status, new Metadata());
context.cancel(null);
return;
}
listener = startCall(stream, methodName, method, headers, context, statsTraceCtx);
} catch (RuntimeException e) {
stream.close(Status.fromThrowable(e), new Metadata());
context.cancel(null);
throw e;
} catch (Error e) {
stream.close(Status.fromThrowable(e), new Metadata());
context.cancel(null);
throw e;
} finally {
jumpListener.setListener(listener);
} }
}); }
}
wrappedExecutor.execute(new StreamCreated());
} }
private Context.CancellableContext createContext( private Context.CancellableContext createContext(
@ -447,7 +455,7 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
Context.CancellableContext context = baseContext.withDeadlineAfter( Context.CancellableContext context = baseContext.withDeadlineAfter(
timeoutNanos, NANOSECONDS, transport.getScheduledExecutorService()); timeoutNanos, NANOSECONDS, transport.getScheduledExecutorService());
context.addListener(new Context.CancellationListener() { final class ServerStreamCancellationListener implements Context.CancellationListener {
@Override @Override
public void cancelled(Context context) { public void cancelled(Context context) {
Status status = statusFromCancelled(context); Status status = statusFromCancelled(context);
@ -457,7 +465,9 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
stream.cancel(status); stream.cancel(status);
} }
} }
}, directExecutor()); }
context.addListener(new ServerStreamCancellationListener(), directExecutor());
return context; return context;
} }
@ -489,7 +499,7 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
return logId; return logId;
} }
private static class NoopListener implements ServerStreamListener { private static final class NoopListener implements ServerStreamListener {
@Override @Override
public void messagesAvailable(MessageProducer producer) { public void messagesAvailable(MessageProducer producer) {
InputStream message; InputStream message;
@ -526,7 +536,7 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
* exceptions. * exceptions.
*/ */
@VisibleForTesting @VisibleForTesting
static class JumpToApplicationThreadServerStreamListener implements ServerStreamListener { static final class JumpToApplicationThreadServerStreamListener implements ServerStreamListener {
private final Executor callExecutor; private final Executor callExecutor;
private final Executor cancelExecutor; private final Executor cancelExecutor;
private final Context.CancellableContext context; private final Context.CancellableContext context;
@ -569,7 +579,13 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
@Override @Override
public void messagesAvailable(final MessageProducer producer) { public void messagesAvailable(final MessageProducer producer) {
callExecutor.execute(new ContextRunnable(context) {
final class MessagesAvailable extends ContextRunnable {
MessagesAvailable() {
super(context);
}
@Override @Override
public void runInContext() { public void runInContext() {
try { try {
@ -582,12 +598,18 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
throw e; throw e;
} }
} }
}); }
callExecutor.execute(new MessagesAvailable());
} }
@Override @Override
public void halfClosed() { public void halfClosed() {
callExecutor.execute(new ContextRunnable(context) { final class HalfClosed extends ContextRunnable {
HalfClosed() {
super(context);
}
@Override @Override
public void runInContext() { public void runInContext() {
try { try {
@ -600,7 +622,9 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
throw e; throw e;
} }
} }
}); }
callExecutor.execute(new HalfClosed());
} }
@Override @Override
@ -612,17 +636,28 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
// is not serializing. // is not serializing.
cancelExecutor.execute(new ContextCloser(context, status.getCause())); cancelExecutor.execute(new ContextCloser(context, status.getCause()));
} }
callExecutor.execute(new ContextRunnable(context) {
final class Closed extends ContextRunnable {
Closed() {
super(context);
}
@Override @Override
public void runInContext() { public void runInContext() {
getListener().closed(status); getListener().closed(status);
} }
}); }
callExecutor.execute(new Closed());
} }
@Override @Override
public void onReady() { public void onReady() {
callExecutor.execute(new ContextRunnable(context) { final class OnReady extends ContextRunnable {
OnReady() {
super(context);
}
@Override @Override
public void runInContext() { public void runInContext() {
try { try {
@ -635,12 +670,14 @@ public final class ServerImpl extends io.grpc.Server implements WithLogId {
throw e; throw e;
} }
} }
}); }
callExecutor.execute(new OnReady());
} }
} }
@VisibleForTesting @VisibleForTesting
static class ContextCloser implements Runnable { static final class ContextCloser implements Runnable {
private final Context.CancellableContext context; private final Context.CancellableContext context;
private final Throwable cause; private final Throwable cause;