Fix TODO attribution

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=84172357
This commit is contained in:
ejona 2015-01-16 16:17:35 -08:00 committed by Eric Anderson
parent 080e33effd
commit 4de2026492
27 changed files with 40 additions and 40 deletions

View File

@ -64,7 +64,7 @@ public class OAuth2ChannelInterceptor implements ClientInterceptor {
@Override @Override
public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method, public <ReqT, RespT> Call<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
Channel next) { Channel next) {
// TODO(user): If the call fails for Auth reasons, this does not properly propagate info that // TODO(ejona): If the call fails for Auth reasons, this does not properly propagate info that
// would be in WWW-Authenticate, because it does not yet have access to the header. // would be in WWW-Authenticate, because it does not yet have access to the header.
return new ForwardingCall<ReqT, RespT>(next.newCall(method)) { return new ForwardingCall<ReqT, RespT>(next.newCall(method)) {
@Override @Override

View File

@ -337,7 +337,7 @@ static void PrintStub(const google::protobuf::ServiceDescriptor* service,
p->Print("public "); p->Print("public ");
switch (call_type) { switch (call_type) {
case BLOCKING_CALL: case BLOCKING_CALL:
// TODO(user): decide the blocking server interface // TODO(zhangkun): decide the blocking server interface
CHECK(type != BLOCKING_SERVER_INTERFACE) CHECK(type != BLOCKING_SERVER_INTERFACE)
<< "Blocking server interface is not available"; << "Blocking server interface is not available";
CHECK(!client_streaming) CHECK(!client_streaming)

View File

@ -94,7 +94,7 @@ public abstract class Call<RequestT, ResponseT> {
* @param headers which can contain extra information like authentication. * @param headers which can contain extra information like authentication.
* @throws IllegalStateException if call is already started * @throws IllegalStateException if call is already started
*/ */
// TODO(user): Might be better to put into Channel#newCall, might reduce decoration burden // TODO(lryan): Might be better to put into Channel#newCall, might reduce decoration burden
public abstract void start(Listener<ResponseT> responseListener, Metadata.Headers headers); public abstract void start(Listener<ResponseT> responseListener, Metadata.Headers headers);
/** /**

View File

@ -44,6 +44,6 @@ public interface Channel {
/** /**
* Create a call to the given service method. * Create a call to the given service method.
*/ */
// TODO(user): perform start() as part of new Call creation? // TODO(ejona): perform start() as part of new Call creation?
public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method); public <ReqT, RespT> Call<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method);
} }

View File

@ -94,7 +94,7 @@ public final class ChannelImpl implements Channel {
} }
/** Hack to allow executors to auto-shutdown. Not for general use. */ /** Hack to allow executors to auto-shutdown. Not for general use. */
// TODO(user): Replace with a real API. // TODO(ejona): Replace with a real API.
void setTerminationRunnable(Runnable runnable) { void setTerminationRunnable(Runnable runnable) {
this.terminationRunnable = runnable; this.terminationRunnable = runnable;
} }
@ -125,7 +125,7 @@ public final class ChannelImpl implements Channel {
* *
* <p>NOT YET IMPLEMENTED. This method currently behaves identically to shutdown(). * <p>NOT YET IMPLEMENTED. This method currently behaves identically to shutdown().
*/ */
// TODO(user): cancel preexisting calls. // TODO(ejona): cancel preexisting calls.
public synchronized ChannelImpl shutdownNow() { public synchronized ChannelImpl shutdownNow() {
shutdown(); shutdown();
return this; return this;
@ -263,7 +263,7 @@ public final class ChannelImpl implements Channel {
stream = transport.newStream(method, headers, listener); stream = transport.newStream(method, headers, listener);
} catch (IllegalStateException ex) { } catch (IllegalStateException ex) {
// We can race with the transport and end up trying to use a terminated transport. // We can race with the transport and end up trying to use a terminated transport.
// TODO(user): Improve the API to remove the possibility of the race. // TODO(ejona): Improve the API to remove the possibility of the race.
stream = new NoopClientStream(); stream = new NoopClientStream();
listener.closed(Status.fromThrowable(ex), new Metadata.Trailers()); listener.closed(Status.fromThrowable(ex), new Metadata.Trailers());
return; return;

View File

@ -41,7 +41,7 @@ public interface Marshaller<T> {
/** /**
* Given a message produce an {@link InputStream} for it. * Given a message produce an {@link InputStream} for it.
*/ */
// TODO(user): Switch to ByteSource equivalent when ready // TODO(lryan): Switch to ByteSource equivalent when ready
public InputStream stream(T value); public InputStream stream(T value);
/** /**

View File

@ -104,7 +104,7 @@ public abstract class Metadata {
/** /**
* Constructor called by the transport layer when it receives binary metadata. * Constructor called by the transport layer when it receives binary metadata.
*/ */
// TODO(user): Convert to use ByteString so we can cache transformations // TODO(lryan): Convert to use ByteString so we can cache transformations
private Metadata(byte[]... binaryValues) { private Metadata(byte[]... binaryValues) {
store = LinkedListMultimap.create(); store = LinkedListMultimap.create();
for (int i = 0; i < binaryValues.length; i++) { for (int i = 0; i < binaryValues.length; i++) {

View File

@ -49,7 +49,7 @@ import javax.annotation.concurrent.GuardedBy;
* @author JJ Furman * @author JJ Furman
*/ */
// TODO(user): figure out a way to not expose it or move it to transport package. // TODO(simonma): figure out a way to not expose it or move it to transport package.
public final class SerializingExecutor implements Executor { public final class SerializingExecutor implements Executor {
private static final Logger log = private static final Logger log =
Logger.getLogger(SerializingExecutor.class.getName()); Logger.getLogger(SerializingExecutor.class.getName());

View File

@ -59,7 +59,7 @@ public abstract class ServerCall<ResponseT> {
* <p>Implementations are free to block for extended periods of time. Implementations are not * <p>Implementations are free to block for extended periods of time. Implementations are not
* required to be thread-safe. * required to be thread-safe.
*/ */
// TODO(user): We need to decide what to do in the case of server closing with non-cancellation // TODO(ejona): We need to decide what to do in the case of server closing with non-cancellation
// before client half closes. It may be that we treat such a case as an error. If we permit such // before client half closes. It may be that we treat such a case as an error. If we permit such
// a case then we either get to generate a half close or purposefully omit it. // a case then we either get to generate a half close or purposefully omit it.
public abstract static class Listener<RequestT> { public abstract static class Listener<RequestT> {

View File

@ -221,7 +221,7 @@ public class ServerImpl extends AbstractService implements Server {
@Override @Override
public void failed(Service.State from, Throwable failure) { public void failed(Service.State from, Throwable failure) {
// TODO(user): Ideally we would want to force-stop transports before notifying application of // TODO(ejona): Ideally we would want to force-stop transports before notifying application of
// failure, but that would cause us to have an unrepresentative state since we would be // failure, but that would cause us to have an unrepresentative state since we would be
// RUNNING but not accepting connections. // RUNNING but not accepting connections.
notifyFailed(failure); notifyFailed(failure);
@ -285,7 +285,7 @@ public class ServerImpl extends AbstractService implements Server {
/** Never returns {@code null}. */ /** Never returns {@code null}. */
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName, private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
ServerMethodDefinition<ReqT, RespT> methodDef, Metadata.Headers headers) { ServerMethodDefinition<ReqT, RespT> methodDef, Metadata.Headers headers) {
// TODO(user): should we update fullMethodName to have the canonical path of the method? // TODO(ejona): should we update fullMethodName to have the canonical path of the method?
final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<ReqT, RespT>(stream, methodDef); final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<ReqT, RespT>(stream, methodDef);
ServerCall.Listener<ReqT> listener ServerCall.Listener<ReqT> listener
= methodDef.getServerCallHandler().startCall(fullMethodName, call, headers); = methodDef.getServerCallHandler().startCall(fullMethodName, call, headers);
@ -344,7 +344,7 @@ public class ServerImpl extends AbstractService implements Server {
* Like {@link ServerCall#close(Status, Metadata.Trailers)}, but thread-safe for internal use. * Like {@link ServerCall#close(Status, Metadata.Trailers)}, but thread-safe for internal use.
*/ */
private void internalClose(Status status, Metadata.Trailers trailers) { private void internalClose(Status status, Metadata.Trailers trailers) {
// TODO(user): this is not thread-safe :) // TODO(ejona): this is not thread-safe :)
stream.close(status, trailers); stream.close(status, trailers);
} }

View File

@ -128,7 +128,7 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
frame.close(); frame.close();
return; return;
} }
// TODO(user): It sounds sub-optimal to deframe in the network thread. That means // TODO(zhangkun): It sounds sub-optimal to deframe in the network thread. That means
// decompression is serialized. // decompression is serialized.
deframe(frame, endOfStream); deframe(frame, endOfStream);
} }
@ -213,7 +213,7 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
* about stream closure and send the status * about stream closure and send the status
*/ */
public final void abortStream(Status status, boolean notifyClient) { public final void abortStream(Status status, boolean notifyClient) {
// TODO(user): Investigate whether we can remove the notification to the client // TODO(lryan): Investigate whether we can remove the notification to the client
// and rely on a transport layer stream reset instead. // and rely on a transport layer stream reset instead.
Preconditions.checkArgument(!status.isOk(), "status must not be OK"); Preconditions.checkArgument(!status.isOk(), "status must not be OK");
if (!listenerClosed) { if (!listenerClosed) {
@ -221,7 +221,7 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
listener.closed(status); listener.closed(status);
} }
if (notifyClient) { if (notifyClient) {
// TODO(user): Remove // TODO(lryan): Remove
if (stashedTrailers == null) { if (stashedTrailers == null) {
stashedTrailers = new Metadata.Trailers(); stashedTrailers = new Metadata.Trailers();
} }

View File

@ -147,7 +147,7 @@ public abstract class AbstractStream<IdT> implements Stream {
framer.writePayload(message, length); framer.writePayload(message, length);
} }
// TODO(user): add flow control. // TODO(nathanmittler): add flow control.
if (accepted != null) { if (accepted != null) {
accepted.run(); accepted.run();
} }

View File

@ -47,7 +47,7 @@ public interface ClientTransport extends Service {
/** /**
* Creates a new stream for sending messages to the remote end-point. If the service is already * Creates a new stream for sending messages to the remote end-point. If the service is already
* stopped, throws an {@link IllegalStateException}. * stopped, throws an {@link IllegalStateException}.
* TODO(user): Consider also throwing for stopping. * TODO(nathanmittler): Consider also throwing for stopping.
* <p> * <p>
* This method returns immediately and does not wait for any validation of the request. If * This method returns immediately and does not wait for any validation of the request. If
* creation fails for any reason, {@link ClientStreamListener#closed} will be called to provide * creation fails for any reason, {@link ClientStreamListener#closed} will be called to provide

View File

@ -194,7 +194,7 @@ public class MessageFramer {
* closed or disposed, additional calls to this method will have no affect. * closed or disposed, additional calls to this method will have no affect.
*/ */
public void dispose() { public void dispose() {
// TODO(user): Returning buffer to a pool would go here // TODO(lryan): Returning buffer to a pool would go here
bytebuf = null; bytebuf = null;
} }

View File

@ -55,7 +55,7 @@ public final class TransportFrameUtil {
private static final byte[] binaryHeaderSuffixBytes = private static final byte[] binaryHeaderSuffixBytes =
Metadata.BINARY_HEADER_SUFFIX.getBytes(US_ASCII); Metadata.BINARY_HEADER_SUFFIX.getBytes(US_ASCII);
// TODO(user): This needs proper namespacing support, this is currently just a hack // TODO(lryan): This needs proper namespacing support, this is currently just a hack
/** /**
* Converts the path from the HTTP request to the full qualified method name. * Converts the path from the HTTP request to the full qualified method name.
* *
@ -88,7 +88,7 @@ public final class TransportFrameUtil {
} else { } else {
// Non-binary header. // Non-binary header.
// Filter out headers that contain non-spec-compliant ASCII characters. // Filter out headers that contain non-spec-compliant ASCII characters.
// TODO(user): only do such check in development mode since it's expensive // TODO(zhangkun): only do such check in development mode since it's expensive
if (isSpecCompliantAscii(value)) { if (isSpecCompliantAscii(value)) {
result.add(key); result.add(key);
result.add(value); result.add(value);

View File

@ -217,7 +217,7 @@ public class MathClient {
* <p> The asynchronous usage is similar to {@link #divMany}. * <p> The asynchronous usage is similar to {@link #divMany}.
*/ */
public void blockingFib() { public void blockingFib() {
// TODO(user): Support "send until cancel". Currently, client application can not // TODO(simonma): Support "send until cancel". Currently, client application can not
// cancel a server streaming call. // cancel a server streaming call.
int limit = rand.nextInt(20) + 10; int limit = rand.nextInt(20) + 10;
logger.info("*** Blocking Fib, print the first " + limit + " fibonacci numbers."); logger.info("*** Blocking Fib, print the first " + limit + " fibonacci numbers.");

View File

@ -61,7 +61,7 @@ public class MathServer {
.addService(CalcGrpc.bindService(new CalcService())) .addService(CalcGrpc.bindService(new CalcService()))
.buildAndWaitForRunning(); .buildAndWaitForRunning();
logger.info("Server started, listening on " + port); logger.info("Server started, listening on " + port);
// TODO(user): gRPC server should register JVM shutdown hook to shutdown itself, remove this // TODO(simonma): gRPC server should register JVM shutdown hook to shutdown itself, remove this
// after we support that. // after we support that.
Runtime.getRuntime().addShutdownHook(new Thread() { Runtime.getRuntime().addShutdownHook(new Thread() {
@Override @Override
@ -131,7 +131,7 @@ public class MathServer {
public void fib(FibArgs request, StreamObserver<Num> responseObserver) { public void fib(FibArgs request, StreamObserver<Num> responseObserver) {
int limit = (int) request.getLimit(); int limit = (int) request.getLimit();
if (limit <= 0) { if (limit <= 0) {
// TODO(user): Support "send until cancel". Currently, client application can not // TODO(simonma): Support "send until cancel". Currently, client application can not
// cancel a server streaming call. // cancel a server streaming call.
return; return;
} }

View File

@ -93,7 +93,7 @@ public class TestServiceImpl implements TestServiceGrpc.TestService {
boolean compressable = compressableResponse(req.getResponseType()); boolean compressable = compressableResponse(req.getResponseType());
ByteString dataBuffer = compressable ? compressableBuffer : uncompressableBuffer; ByteString dataBuffer = compressable ? compressableBuffer : uncompressableBuffer;
// For consistency with the c++ TestServiceImpl, use a random offset for unary calls. // For consistency with the c++ TestServiceImpl, use a random offset for unary calls.
// TODO(user): whether or not this is a good approach needs further discussion. // TODO(wonderfly): whether or not this is a good approach needs further discussion.
int offset = random.nextInt( int offset = random.nextInt(
compressable ? compressableBuffer.size() : uncompressableBuffer.size()); compressable ? compressableBuffer.size() : uncompressableBuffer.size());
ByteString payload = generatePayload(dataBuffer, offset, req.getResponseSize()); ByteString payload = generatePayload(dataBuffer, offset, req.getResponseSize());

View File

@ -178,7 +178,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
*/ */
private void onRstStreamRead(int streamId) private void onRstStreamRead(int streamId)
throws Http2Exception { throws Http2Exception {
// TODO(user): do something with errorCode? // TODO(nathanmittler): do something with errorCode?
Http2Stream http2Stream = connection().requireStream(streamId); Http2Stream http2Stream = connection().requireStream(streamId);
NettyClientStream stream = clientStream(http2Stream); NettyClientStream stream = clientStream(http2Stream);
stream.transportReportStatus(Status.UNKNOWN, false, new Metadata.Trailers()); stream.transportReportStatus(Status.UNKNOWN, false, new Metadata.Trailers());
@ -318,7 +318,7 @@ class NettyClientHandler extends Http2ConnectionHandler {
if (streamId <= 0) { if (streamId <= 0) {
// The HTTP/2 connection has exhausted its stream IDs. Permanently fail all stream creation // The HTTP/2 connection has exhausted its stream IDs. Permanently fail all stream creation
// attempts for this transport. // attempts for this transport.
// TODO(user): send GO_AWAY? // TODO(nathanmittler): send GO_AWAY?
failPendingStreams(goAwayStatus); failPendingStreams(goAwayStatus);
return; return;
} }

View File

@ -127,7 +127,7 @@ class NettyClientTransport extends AbstractClientTransport {
throw new RuntimeException(ex); throw new RuntimeException(ex);
} }
} }
// TODO(user): specify allocator. The method currently ignores it though. // TODO(ejona): specify allocator. The method currently ignores it though.
SSLEngine sslEngine SSLEngine sslEngine
= sslContext.newEngine(null, inetAddress.getHostString(), inetAddress.getPort()); = sslContext.newEngine(null, inetAddress.getHostString(), inetAddress.getPort());
SSLParameters sslParams = new SSLParameters(); SSLParameters sslParams = new SSLParameters();

View File

@ -80,7 +80,7 @@ public class NettyServer extends AbstractService {
public void initChannel(Channel ch) throws Exception { public void initChannel(Channel ch) throws Exception {
NettyServerTransport transport = new NettyServerTransport(ch, serverListener, sslContext); NettyServerTransport transport = new NettyServerTransport(ch, serverListener, sslContext);
transport.startAsync(); transport.startAsync();
// TODO(user): Should we wait for transport shutdown before shutting down server? // TODO(nathanmittler): Should we wait for transport shutdown before shutting down server?
} }
}; };
this.bossGroup = bossGroup; this.bossGroup = bossGroup;

View File

@ -88,7 +88,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
private NettyClientHandler handler; private NettyClientHandler handler;
// TODO(user): mocking concrete classes is not safe. Consider making NettyClientStream an // TODO(zhangkun): mocking concrete classes is not safe. Consider making NettyClientStream an
// interface. // interface.
@Mock @Mock
private NettyClientStream stream; private NettyClientStream stream;

View File

@ -132,7 +132,7 @@ class OkHttpClientStream extends Http2ClientStream {
Preconditions.checkState(id() != 0, "streamId should be set"); Preconditions.checkState(id() != 0, "streamId should be set");
okio.Buffer buffer = new okio.Buffer(); okio.Buffer buffer = new okio.Buffer();
// Read the data into a buffer. // Read the data into a buffer.
// TODO(user): swap to NIO buffers or zero-copy if/when okhttp/okio supports it // TODO(simonma): swap to NIO buffers or zero-copy if/when okhttp/okio supports it
buffer.write(frame.array(), frame.arrayOffset(), frame.remaining()); buffer.write(frame.array(), frame.arrayOffset(), frame.remaining());
// Write the data to the remote endpoint. // Write the data to the remote endpoint.
// Per http2 SPEC, the max data length should be larger than 64K, while our frame size is // Per http2 SPEC, the max data length should be larger than 64K, while our frame size is

View File

@ -448,13 +448,13 @@ public class OkHttpClientTransport extends AbstractClientTransport {
@Override @Override
public void priority(int streamId, int streamDependency, int weight, boolean exclusive) { public void priority(int streamId, int streamDependency, int weight, boolean exclusive) {
// Ignore priority change. // Ignore priority change.
// TODO(user): log // TODO(simonma): log
} }
@Override @Override
public void alternateService(int streamId, String origin, ByteString protocol, String host, public void alternateService(int streamId, String origin, ByteString protocol, String host,
int port, long maxAge) { int port, long maxAge) {
// TODO(user): Deal with alternateService propagation // TODO(simonma): Deal with alternateService propagation
} }
} }

View File

@ -48,7 +48,7 @@ import java.util.concurrent.TimeUnit;
* @param <S> the concrete type of this stub. * @param <S> the concrete type of this stub.
* @param <C> the service descriptor type * @param <C> the service descriptor type
*/ */
// TODO(user): Move into 3rd party when tidy // TODO(lryan): Move into 3rd party when tidy
// TODO(lryan/kevinb): Excessive parameterization can be a pain, try to eliminate once the generated // TODO(lryan/kevinb): Excessive parameterization can be a pain, try to eliminate once the generated
// code is more tangible. // code is more tangible.
public abstract class AbstractStub<S extends AbstractStub<?, ?>, public abstract class AbstractStub<S extends AbstractStub<?, ?>,

View File

@ -65,7 +65,7 @@ public class Calls {
*/ */
public static <RequestT, ResponseT> MethodDescriptor<RequestT, ResponseT> createMethodDescriptor( public static <RequestT, ResponseT> MethodDescriptor<RequestT, ResponseT> createMethodDescriptor(
String fullServiceName, Method<RequestT, ResponseT> method) { String fullServiceName, Method<RequestT, ResponseT> method) {
// TODO(user): if timeout is not defined in proto file, use a default timeout here. // TODO(zhangkun): if timeout is not defined in proto file, use a default timeout here.
// If timeout is defined in proto file, Method should carry the timeout. // If timeout is defined in proto file, Method should carry the timeout.
return MethodDescriptor.create(method.getType(), fullServiceName + "/" + method.getName(), return MethodDescriptor.create(method.getType(), fullServiceName + "/" + method.getName(),
1, TimeUnit.SECONDS, method.getRequestMarshaller(), method.getResponseMarshaller()); 1, TimeUnit.SECONDS, method.getRequestMarshaller(), method.getResponseMarshaller());
@ -144,7 +144,7 @@ public class Calls {
* response stream. * response stream.
* @return an iterator over the response stream. * @return an iterator over the response stream.
*/ */
// TODO(user): Not clear if we want to use this idiom for 'simple' stubs. // TODO(lryan): Not clear if we want to use this idiom for 'simple' stubs.
public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall( public static <ReqT, RespT> Iterator<RespT> blockingServerStreamingCall(
Call<ReqT, RespT> call, ReqT param) { Call<ReqT, RespT> call, ReqT param) {
BlockingResponseStream<RespT> result = new BlockingResponseStream<RespT>(); BlockingResponseStream<RespT> result = new BlockingResponseStream<RespT>();
@ -238,7 +238,7 @@ public class Calls {
@Override @Override
public void onError(Throwable t) { public void onError(Throwable t) {
// TODO(user): log? // TODO(ejona): log?
call.cancel(); call.cancel();
} }
@ -325,7 +325,7 @@ public class Calls {
* <p>The class is not thread-safe, but it does permit Call.Listener calls in a separate thread * <p>The class is not thread-safe, but it does permit Call.Listener calls in a separate thread
* from Iterator calls. * from Iterator calls.
*/ */
// TODO(user): determine how to allow Call.cancel() in case of application error. // TODO(ejona): determine how to allow Call.cancel() in case of application error.
private static class BlockingResponseStream<T> implements Iterator<T> { private static class BlockingResponseStream<T> implements Iterator<T> {
private final LinkedBlockingQueue<Object> buffer = new LinkedBlockingQueue<Object>(); private final LinkedBlockingQueue<Object> buffer = new LinkedBlockingQueue<Object>();
private final Call.Listener<T> listener = new QueuingListener(); private final Call.Listener<T> listener = new QueuingListener();

View File

@ -37,7 +37,7 @@ package com.google.net.stubby.stub;
* <p>Implementations are expected to be thread-compatible. Separate StreamObservers do not need to * <p>Implementations are expected to be thread-compatible. Separate StreamObservers do not need to
* be sychronized together; incoming and outgoing directions are independent. * be sychronized together; incoming and outgoing directions are independent.
*/ */
// TODO(user): Consider whether we need to interact with flow-control at this layer. E.g. // TODO(lryan): Consider whether we need to interact with flow-control at this layer. E.g.
// public ListenableFuture<Void> onValue(V value). Do we layer it in here or as an additional // public ListenableFuture<Void> onValue(V value). Do we layer it in here or as an additional
// interface? Interaction with flow control can be done by blocking here. // interface? Interaction with flow control can be done by blocking here.
public interface StreamObserver<V> { public interface StreamObserver<V> {