diff --git a/core/src/main/java/com/google/net/stubby/AbstractOperation.java b/core/src/main/java/com/google/net/stubby/AbstractOperation.java index e45fa186d8..31ef6815f8 100644 --- a/core/src/main/java/com/google/net/stubby/AbstractOperation.java +++ b/core/src/main/java/com/google/net/stubby/AbstractOperation.java @@ -56,20 +56,6 @@ public abstract class AbstractOperation implements Operation { return this; } - @Override - public Operation addContext(String type, InputStream message, Phase nextPhase) { - if (getPhase() == Phase.CLOSED) { - throw new RuntimeException("addContext called after operation closed"); - } - if (phase == Phase.PAYLOAD) { - progressTo(Phase.FOOTERS); - } - if (phase == Phase.HEADERS || phase == Phase.FOOTERS) { - return progressTo(nextPhase); - } - throw new IllegalStateException("Cannot add context in phase " + phase.name()); - } - @Override public Operation addPayload(InputStream payload, Phase nextPhase) { if (getPhase() == Phase.CLOSED) { diff --git a/core/src/main/java/com/google/net/stubby/Call.java b/core/src/main/java/com/google/net/stubby/Call.java index 6be24a8500..424020b8bb 100644 --- a/core/src/main/java/com/google/net/stubby/Call.java +++ b/core/src/main/java/com/google/net/stubby/Call.java @@ -3,8 +3,6 @@ package com.google.net.stubby; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; -import java.io.InputStream; - import javax.annotation.Nullable; /** @@ -43,17 +41,6 @@ public abstract class Call { */ public abstract ListenableFuture onHeaders(Metadata.Headers headers); - /** - * A response context has been received. Any context messages will precede payload messages. - * - *

The {@code value} {@link InputStream} will be closed when the returned future completes. - * If no future is returned, the value will be closed immediately after returning from this - * method. - */ - @Nullable - @Deprecated - public abstract ListenableFuture onContext(String name, InputStream value); - /** * A response payload has been received. For streaming calls, there may be zero payload * messages. @@ -80,40 +67,6 @@ public abstract class Call { // TODO(user): Might be better to put into Channel#newCall, might reduce decoration burden public abstract void start(Listener responseListener, Metadata.Headers headers); - - /** - * Send a context message. Context messages are intended for side-channel information like - * statistics and authentication. - * - * @param name key identifier of context - * @param value context value bytes - * @throws IllegalStateException if call is {@link #halfClose}d or explicitly {@link #cancel}ed, - * or after {@link #sendPayload} - */ - @Deprecated - public void sendContext(String name, InputStream value) { - sendContext(name, value, null); - } - - /** - * Send a context message. Context messages are intended for side-channel information like - * statistics and authentication. - * - *

If {@code accepted} is non-{@code null}, then the future will be completed when the flow - * control window is able to fully permit the context message. If the Call fails before being - * accepted, then the future will be cancelled. Callers concerned with unbounded buffering should - * wait until the future completes before sending more messages. - * - * @param name key identifier of context - * @param value context value bytes - * @param accepted notification for adhering to flow control, or {@code null} - * @throws IllegalStateException if call is {@link #halfClose}d or explicitly {@link #cancel}ed, - * or after {@link #sendPayload} - */ - @Deprecated - public abstract void sendContext(String name, InputStream value, - @Nullable SettableFuture accepted); - /** * Prevent any further processing for this Call. No further messages may be sent or will be * received. The server is informed of cancellations, but may not stop processing the call. diff --git a/core/src/main/java/com/google/net/stubby/ChannelImpl.java b/core/src/main/java/com/google/net/stubby/ChannelImpl.java index 41f0b22e40..b9b8c3e3ce 100644 --- a/core/src/main/java/com/google/net/stubby/ChannelImpl.java +++ b/core/src/main/java/com/google/net/stubby/ChannelImpl.java @@ -167,17 +167,6 @@ public final class ChannelImpl extends AbstractService implements Channel { } } - @Override - public void sendContext(String name, InputStream value, SettableFuture accepted) { - Preconditions.checkState(stream != null, "Not started"); - if (accepted == null) { - stream.writeContext(name, value, available(value), null); - } else { - inProcessFutures.add(accepted); - stream.writeContext(name, value, available(value), new AcceptedRunnable(accepted)); - } - } - @Override public void sendPayload(ReqT payload, SettableFuture accepted) { Preconditions.checkState(stream != null, "Not started"); @@ -251,17 +240,6 @@ public final class ChannelImpl extends AbstractService implements Channel { }); } - @Override - public ListenableFuture contextRead(final String name, final InputStream value, - final int length) { - return dispatchCallable(new Callable>() { - @Override - public ListenableFuture call() { - return observer.onContext(name, value); - } - }); - } - @Override public ListenableFuture messageRead(final InputStream message, final int length) { return dispatchCallable(new Callable>() { diff --git a/core/src/main/java/com/google/net/stubby/GrpcFramingUtil.java b/core/src/main/java/com/google/net/stubby/GrpcFramingUtil.java index 33e19df670..03fab49a6d 100644 --- a/core/src/main/java/com/google/net/stubby/GrpcFramingUtil.java +++ b/core/src/main/java/com/google/net/stubby/GrpcFramingUtil.java @@ -26,8 +26,6 @@ public class GrpcFramingUtil { // Flags public static final byte PAYLOAD_FRAME = 0x0; - public static final byte CONTEXT_VALUE_FRAME = 0x1; - public static final byte CALL_HEADER_FRAME = 0x2; public static final byte STATUS_FRAME = 0x3; public static final byte FRAME_TYPE_MASK = 0x3; @@ -36,10 +34,6 @@ public class GrpcFramingUtil { */ public static final int FRAME_LENGTH = 4; - public static boolean isContextValueFrame(int flags) { - return (flags & FRAME_TYPE_MASK) == CONTEXT_VALUE_FRAME; - } - public static boolean isPayloadFrame(byte flags) { return (flags & FRAME_TYPE_MASK) == PAYLOAD_FRAME; } diff --git a/core/src/main/java/com/google/net/stubby/Operation.java b/core/src/main/java/com/google/net/stubby/Operation.java index 05e230beee..1bbfbca262 100644 --- a/core/src/main/java/com/google/net/stubby/Operation.java +++ b/core/src/main/java/com/google/net/stubby/Operation.java @@ -14,7 +14,7 @@ public interface Operation { public static enum Phase { /** - * Used to communicate key-value pairs that define common context for the operation but + * Used to communicate key-value pairs that define metadata for the call but * that are not strictly part of the interface. Provided prior to delivering any formal * parameters */ @@ -24,11 +24,11 @@ public interface Operation { */ PAYLOAD, /** - * Used to communicate key-value pairs that define common context for the call but + * Used to communicate key-value pairs that define metadata for the call but * that are not strictly part of the interface. Provided after all formal parameters have * been delivered. */ - FOOTERS, + TRAILERS, /** * Indicates that the operation is closed and will not accept further input. */ @@ -46,25 +46,6 @@ public interface Operation { */ public Phase getPhase(); - /** - * Add a key-value context value. - * Allowed when phase = HEADERS | FOOTERS. - * Valid next phases - * HEADERS -> PAYLOAD_FRAME | FOOTERS | CLOSED - * FOOTERS -> CLOSED - *

- * The {@link InputStream} message must be entirely consumed before this call returns. - * Implementations should not pass references to this stream across thread boundaries without - * taking a copy. - *

- * {@code payload.available()} must return the number of remaining bytes to be read. - * - * @return this object - */ - // TODO(user): Context is an incredibly general term. Consider having two signatures - // addHeader and addTrailer to follow HTTP nomenclature more closely. - public Operation addContext(String type, InputStream message, Phase nextPhase); - /** * Send a payload to the receiver, indicates that more may follow. * Allowed when phase = PAYLOAD_FRAME diff --git a/core/src/main/java/com/google/net/stubby/ProtocolConstants.java b/core/src/main/java/com/google/net/stubby/ProtocolConstants.java index 154ebe9c61..6383a0b6cc 100644 --- a/core/src/main/java/com/google/net/stubby/ProtocolConstants.java +++ b/core/src/main/java/com/google/net/stubby/ProtocolConstants.java @@ -17,7 +17,6 @@ public class ProtocolConstants { // Flags public static final int PAYLOAD_FRAME = 0x0; - public static final int CONTEXT_VALUE_FRAME = 0x1; public static final int RESPONSE_STATUS_FRAME = 0x2; public static final int RESERVED_FRAME = 0x3; public static final int FRAME_TYPE_MASK = 0x3; @@ -28,10 +27,6 @@ public class ProtocolConstants { */ public static final int FRAME_LENGTH = 4; - public static boolean isContextValueFrame(int flags) { - return (flags & FRAME_TYPE_MASK) == CONTEXT_VALUE_FRAME; - } - public static boolean isPayloadFrame(byte flags) { return (flags & FRAME_TYPE_MASK) == PAYLOAD_FRAME; } diff --git a/core/src/main/java/com/google/net/stubby/ServerCall.java b/core/src/main/java/com/google/net/stubby/ServerCall.java index 5e9183afc6..bb06c2c132 100644 --- a/core/src/main/java/com/google/net/stubby/ServerCall.java +++ b/core/src/main/java/com/google/net/stubby/ServerCall.java @@ -2,8 +2,6 @@ package com.google.net.stubby; import com.google.common.util.concurrent.ListenableFuture; -import java.io.InputStream; - import javax.annotation.Nullable; /** @@ -46,18 +44,6 @@ public abstract class ServerCall { @Nullable public abstract ListenableFuture headersRead(Metadata.Headers headers); - - /** - * A request context has been received. Any context messages will precede payload messages. - * - *

The {@code value} {@link InputStream} will be closed when the returned future completes. - * If no future is returned, the value will be closed immediately after returning from this - * method. - */ - @Nullable - @Deprecated - public abstract ListenableFuture onContext(String name, InputStream value); - /** * A request payload has been receiveed. For streaming calls, there may be zero payload * messages. @@ -100,17 +86,6 @@ public abstract class ServerCall { */ public abstract void close(Status status, Metadata.Trailers trailers); - /** - * Send a context message. Context messages are intended for side-channel information like - * statistics and authentication. - * - * @param name key identifier of context - * @param value context value bytes - * @throws IllegalStateException if call is {@link #close}d, or after {@link #sendPayload} - */ - @Deprecated - public abstract void sendContext(String name, InputStream value); - /** * Send a payload message. Payload messages are the primary form of communication associated with * RPCs. Multiple payload messages may exist for streaming calls. diff --git a/core/src/main/java/com/google/net/stubby/SessionClientStream.java b/core/src/main/java/com/google/net/stubby/SessionClientStream.java index 20b6922582..c794bad0d6 100644 --- a/core/src/main/java/com/google/net/stubby/SessionClientStream.java +++ b/core/src/main/java/com/google/net/stubby/SessionClientStream.java @@ -65,14 +65,6 @@ public class SessionClientStream implements ClientStream { request.close(Status.OK); } - @Override - public void writeContext(String name, InputStream value, int length, Runnable accepted) { - request.addContext(name, value, Operation.Phase.HEADERS); - if (accepted != null) { - accepted.run(); - } - } - @Override public void writeMessage(InputStream message, int length, Runnable accepted) { request.addPayload(message, Operation.Phase.PAYLOAD); @@ -111,18 +103,6 @@ public class SessionClientStream implements ClientStream { } } - @Override - public Operation addContext(String type, InputStream message, Phase nextPhase) { - try { - listener.contextRead(type, message, available(message)); - return super.addContext(type, message, nextPhase); - } finally { - if (getPhase() == Phase.CLOSED) { - propagateClosed(); - } - } - } - @Override public Operation addPayload(InputStream payload, Phase nextPhase) { try { diff --git a/core/src/main/java/com/google/net/stubby/context/ForwardingChannel.java b/core/src/main/java/com/google/net/stubby/context/ForwardingChannel.java index fd7ab6d618..07b821adb7 100644 --- a/core/src/main/java/com/google/net/stubby/context/ForwardingChannel.java +++ b/core/src/main/java/com/google/net/stubby/context/ForwardingChannel.java @@ -7,8 +7,6 @@ import com.google.net.stubby.Channel; import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; -import java.io.InputStream; - import javax.annotation.Nullable; /** @@ -50,11 +48,6 @@ public abstract class ForwardingChannel implements Channel { this.delegate.halfClose(); } - @Override - public void sendContext(String name, InputStream value, @Nullable SettableFuture accepted) { - this.delegate.sendContext(name, value, accepted); - } - @Override public void sendPayload(RequestT payload, @Nullable SettableFuture accepted) { this.delegate.sendPayload(payload, accepted); @@ -77,11 +70,6 @@ public abstract class ForwardingChannel implements Channel { return delegate.onHeaders(headers); } - @Override - public ListenableFuture onContext(String name, InputStream value) { - return delegate.onContext(name, value); - } - @Override public ListenableFuture onPayload(T payload) { return delegate.onPayload(payload); diff --git a/core/src/main/java/com/google/net/stubby/http/ServletSession.java b/core/src/main/java/com/google/net/stubby/http/ServletSession.java index 2307487ae6..fe2a4732da 100644 --- a/core/src/main/java/com/google/net/stubby/http/ServletSession.java +++ b/core/src/main/java/com/google/net/stubby/http/ServletSession.java @@ -160,13 +160,6 @@ public class ServletSession extends HttpServlet { } } - @Override - public Operation addContext(String type, InputStream message, Phase nextPhase) { - super.addContext(type, message, nextPhase); - framer.writeContext(type, message, getPhase() == Phase.CLOSED, this); - return this; - } - @Override public Operation addPayload(InputStream payload, Phase nextPhase) { super.addPayload(payload, Phase.PAYLOAD); diff --git a/core/src/main/java/com/google/net/stubby/http/UrlConnectionClientSession.java b/core/src/main/java/com/google/net/stubby/http/UrlConnectionClientSession.java index cb3287021d..2461d55300 100644 --- a/core/src/main/java/com/google/net/stubby/http/UrlConnectionClientSession.java +++ b/core/src/main/java/com/google/net/stubby/http/UrlConnectionClientSession.java @@ -66,13 +66,6 @@ public class UrlConnectionClientSession implements Session { framer.setAllowCompression(false); } - @Override - public Operation addContext(String type, InputStream message, Phase nextPhase) { - super.addContext(type, message, nextPhase); - framer.writeContext(type, message, getPhase() == Phase.CLOSED, this); - return this; - } - @Override public Operation addPayload(InputStream payload, Phase nextPhase) { super.addPayload(payload, nextPhase); diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Operation.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Operation.java index 9a17a7083f..2573d19559 100644 --- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Operation.java +++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Operation.java @@ -26,13 +26,6 @@ abstract class Http2Operation extends AbstractOperation implements Framer.Sink { this.framer = framer; } - @Override - public Operation addContext(String type, InputStream message, Phase nextPhase) { - super.addContext(type, message, nextPhase); - framer.writeContext(type, message, getPhase() == Phase.CLOSED, this); - return this; - } - @Override public Operation addPayload(InputStream payload, Phase nextPhase) { super.addPayload(payload, nextPhase); diff --git a/core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Operation.java b/core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Operation.java index 8c52ba887d..67d8ef10cd 100644 --- a/core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Operation.java +++ b/core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Operation.java @@ -29,13 +29,6 @@ abstract class Http2Operation extends AbstractOperation implements Framer.Sink { this.framer = framer; } - @Override - public Operation addContext(String type, InputStream message, Phase nextPhase) { - super.addContext(type, message, nextPhase); - framer.writeContext(type, message, getPhase() == Phase.CLOSED, this); - return this; - } - @Override public Operation addPayload(InputStream payload, Phase nextPhase) { super.addPayload(payload, nextPhase); diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java index 10687c9857..01e7025d34 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractStream.java @@ -21,13 +21,13 @@ public abstract class AbstractStream implements Stream { * Indicates the phase of the GRPC stream in one direction. */ protected enum Phase { - CONTEXT, MESSAGE, STATUS + HEADERS, MESSAGE, STATUS } private final Object writeLock = new Object(); private final MessageFramer framer; - protected Phase inboundPhase = Phase.CONTEXT; - protected Phase outboundPhase = Phase.CONTEXT; + protected Phase inboundPhase = Phase.HEADERS; + protected Phase outboundPhase = Phase.HEADERS; /** * Handler for Framer output. @@ -46,25 +46,12 @@ public abstract class AbstractStream implements Stream { @Override public ListenableFuture headersRead(Metadata.Headers headers) { - inboundPhase(Phase.CONTEXT); + inboundPhase(Phase.HEADERS); ListenableFuture future = listener().headersRead(headers); disableWindowUpdate(future); return future; } - @Override - public ListenableFuture contextRead(String name, InputStream value, int length) { - ListenableFuture future = null; - try { - inboundPhase(Phase.CONTEXT); - future = listener().contextRead(name, value, length); - disableWindowUpdate(future); - return future; - } finally { - closeWhenDone(future, value); - } - } - @Override public ListenableFuture messageRead(InputStream input, int length) { ListenableFuture future = null; @@ -101,25 +88,6 @@ public abstract class AbstractStream implements Stream { } } - @Override - public final void writeContext(String name, InputStream value, int length, - @Nullable Runnable accepted) { - Preconditions.checkNotNull(name, "name"); - Preconditions.checkNotNull(value, "value"); - Preconditions.checkArgument(length >= 0, "length must be >= 0"); - outboundPhase(Phase.CONTEXT); - synchronized (writeLock) { - if (!framer.isClosed()) { - framer.writeContext(name, value, length); - } - } - - // TODO(user): add flow control. - if (accepted != null) { - accepted.run(); - } - } - @Override public final void writeMessage(InputStream message, int length, @Nullable Runnable accepted) { Preconditions.checkNotNull(message, "message"); diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java b/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java index 72e44c8321..817ff9422f 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/Deframer.java @@ -6,7 +6,6 @@ import com.google.net.stubby.Metadata; import com.google.net.stubby.Operation; import com.google.net.stubby.Status; import com.google.net.stubby.transport.Transport; -import com.google.protobuf.ByteString; import java.io.ByteArrayInputStream; import java.io.DataInputStream; @@ -91,19 +90,6 @@ public abstract class Deframer implements Framer.Sink { currentLength = LENGTH_NOT_SET; inFrame = false; } - } else if (GrpcFramingUtil.isContextValueFrame(currentFlags)) { - // Not clear if using proto encoding here is of any benefit. - // Using ContextValue.parseFrom requires copying out of the framed chunk - // Writing a custom parser would have to do varint handling and potentially - // deal with out-of-order tags etc. - Transport.ContextValue contextValue = Transport.ContextValue.parseFrom(framedChunk); - try { - ByteString value = contextValue.getValue(); - target.contextRead(contextValue.getKey(), value.newInput(), value.size()); - } finally { - currentLength = LENGTH_NOT_SET; - inFrame = false; - } } else if (GrpcFramingUtil.isStatusFrame(currentFlags)) { int status = framedChunk.read() << 8 | framedChunk.read(); Transport.Code code = Transport.Code.valueOf(status); diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ForwardingStreamListener.java b/core/src/main/java/com/google/net/stubby/newtransport/ForwardingStreamListener.java index 42dbf4b5fe..162d662dd4 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/ForwardingStreamListener.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/ForwardingStreamListener.java @@ -22,11 +22,6 @@ public class ForwardingStreamListener implements StreamListener { return delegate.headersRead(headers); } - @Override - public ListenableFuture contextRead(String name, InputStream value, int length) { - return delegate.contextRead(name, value, length); - } - @Override public ListenableFuture messageRead(InputStream message, int length) { return delegate.messageRead(message, length); diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Framer.java b/core/src/main/java/com/google/net/stubby/newtransport/Framer.java index c5b273dade..1e9bdf4b0f 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/Framer.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/Framer.java @@ -23,12 +23,6 @@ public interface Framer { public void deliverFrame(T frame, boolean endOfStream); } - /** - * Write out a Context-Value message. {@code message} will be completely consumed. - * {@code message.available()} must return the number of remaining bytes to be read. - */ - public void writeContext(String type, InputStream message, int length); - /** * Write out a Payload message. {@code payload} will be completely consumed. * {@code payload.available()} must return the number of remaining bytes to be read. diff --git a/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java b/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java index 512ac90a0a..cc1a3062ef 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/GrpcDeframer.java @@ -1,6 +1,5 @@ package com.google.net.stubby.newtransport; -import static com.google.net.stubby.GrpcFramingUtil.CONTEXT_VALUE_FRAME; import static com.google.net.stubby.GrpcFramingUtil.FRAME_LENGTH; import static com.google.net.stubby.GrpcFramingUtil.FRAME_TYPE_LENGTH; import static com.google.net.stubby.GrpcFramingUtil.FRAME_TYPE_MASK; @@ -14,7 +13,6 @@ import com.google.net.stubby.Status; import com.google.net.stubby.transport.Transport; import java.io.Closeable; -import java.io.IOException; import java.util.concurrent.Executor; /** @@ -181,9 +179,6 @@ public class GrpcDeframer implements Closeable { private ListenableFuture processBody() { ListenableFuture future = null; switch (frameType) { - case CONTEXT_VALUE_FRAME: - future = processContext(); - break; case PAYLOAD_FRAME: future = processMessage(); break; @@ -200,30 +195,6 @@ public class GrpcDeframer implements Closeable { return future; } - /** - * Processes the payload of a context frame. - */ - private ListenableFuture processContext() { - Transport.ContextValue ctx; - try { - // Not clear if using proto encoding here is of any benefit. - // Using ContextValue.parseFrom requires copying out of the framed chunk - // Writing a custom parser would have to do varint handling and potentially - // deal with out-of-order tags etc. - ctx = Transport.ContextValue.parseFrom(Buffers.openStream(nextFrame, false)); - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - nextFrame.close(); - nextFrame = null; - } - - // Call the handler. - Buffer ctxBuffer = Buffers.wrap(ctx.getValue()); - return listener.contextRead(ctx.getKey(), Buffers.openStream(ctxBuffer, true), - ctxBuffer.readableBytes()); - } - /** * Processes the payload of a message frame. */ diff --git a/core/src/main/java/com/google/net/stubby/newtransport/MessageFramer.java b/core/src/main/java/com/google/net/stubby/newtransport/MessageFramer.java index 87cb153d82..786861ec18 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/MessageFramer.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/MessageFramer.java @@ -2,59 +2,16 @@ package com.google.net.stubby.newtransport; import com.google.net.stubby.GrpcFramingUtil; import com.google.net.stubby.Status; -import com.google.net.stubby.transport.Transport; -import com.google.protobuf.CodedOutputStream; -import com.google.protobuf.WireFormat; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.Arrays; /** * Default {@link Framer} implementation. */ public class MessageFramer implements Framer { - /** - * Size of the GRPC message frame header which consists of - * 1 byte for the type (payload, context, status) - * 4 bytes for the length of the message - */ - private static final int MESSAGE_HEADER_SIZE = 5; - - /** - * UTF-8 charset which is used for key name encoding in context values - */ - private static final Charset UTF_8 = Charset.forName("UTF-8"); - - /** - * Precomputed protobuf tags for ContextValue - */ - private static final byte[] VALUE_TAG; - private static final byte[] KEY_TAG; - - - static { - // Initialize constants for serializing context-value in a protobuf compatible manner - try { - byte[] buf = new byte[8]; - CodedOutputStream coded = CodedOutputStream.newInstance(buf); - coded.writeTag(Transport.ContextValue.KEY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED); - coded.flush(); - KEY_TAG = Arrays.copyOf(buf, coded.getTotalBytesWritten()); - coded = CodedOutputStream.newInstance(buf); - coded.writeTag(Transport.ContextValue.VALUE_FIELD_NUMBER, - WireFormat.WIRETYPE_LENGTH_DELIMITED); - coded.flush(); - VALUE_TAG = Arrays.copyOf(buf, coded.getTotalBytesWritten()); - } catch (IOException ioe) { - // Unrecoverable - throw new RuntimeException(ioe); - } - } - private CompressionFramer framer; private final ByteBuffer scratch = ByteBuffer.allocate(16); @@ -71,16 +28,6 @@ public class MessageFramer implements Framer { framer.setAllowCompression(enable); } - /** - * Set the preferred compression level for when compression is enabled. - * @param level the preferred compression level, or {@code -1} to use the framing default - * @see java.util.zip.Deflater#setLevel - */ - public void setCompressionLevel(int level) { - verifyNotClosed(); - framer.setCompressionLevel(level); - } - @Override public void writePayload(InputStream message, int messageLength) { verifyNotClosed(); @@ -99,41 +46,6 @@ public class MessageFramer implements Framer { } - @Override - public void writeContext(String key, InputStream message, int messageLen) { - verifyNotClosed(); - try { - scratch.clear(); - scratch.put(GrpcFramingUtil.CONTEXT_VALUE_FRAME); - byte[] keyBytes = key.getBytes(UTF_8); - int lenKeyPrefix = KEY_TAG.length + - CodedOutputStream.computeRawVarint32Size(keyBytes.length); - int lenValPrefix = VALUE_TAG.length + CodedOutputStream.computeRawVarint32Size(messageLen); - int totalLen = lenKeyPrefix + keyBytes.length + lenValPrefix + messageLen; - scratch.putInt(totalLen); - framer.write(scratch.array(), 0, scratch.position()); - - // Write key - scratch.clear(); - scratch.put(KEY_TAG); - writeRawVarInt32(keyBytes.length, scratch); - framer.write(scratch.array(), 0, scratch.position()); - framer.write(keyBytes, 0, keyBytes.length); - - // Write value - scratch.clear(); - scratch.put(VALUE_TAG); - writeRawVarInt32(messageLen, scratch); - framer.write(scratch.array(), 0, scratch.position()); - if (messageLen != framer.write(message)) { - throw new RuntimeException("Message length was inaccurate"); - } - framer.endOfMessage(); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - @Override public void writeStatus(Status status) { verifyNotClosed(); @@ -178,19 +90,4 @@ public class MessageFramer implements Framer { throw new IllegalStateException("Framer already closed"); } } - - /** - * Write a raw VarInt32 to the buffer - */ - private static void writeRawVarInt32(int value, ByteBuffer bytebuf) { - while (true) { - if ((value & ~0x7F) == 0) { - bytebuf.put((byte) value); - return; - } else { - bytebuf.put((byte) ((value & 0x7F) | 0x80)); - value >>>= 7; - } - } - } } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/Stream.java b/core/src/main/java/com/google/net/stubby/newtransport/Stream.java index e2853fecc9..eb0eb67e3f 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/Stream.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/Stream.java @@ -16,26 +16,6 @@ public interface Stream { */ StreamState state(); - /** - * Writes the context name/value pair to the remote end-point. The bytes from the stream are - * immediate read by the Transport. This method will always return immediately and will not wait - * for the write to complete. - * - *

When the write is "accepted" by the transport, the given callback (if provided) will be - * called. The definition of what it means to be "accepted" is up to the transport implementation, - * but this is a general indication that the transport is capable of handling more out-bound data - * on the stream. If the stream/connection is closed for any reason before the write could be - * accepted, the callback will never be invoked. Any writes that are still pending upon receiving - * a {@link StreamListener#closed} callback are assumed to be cancelled. - * - * @param name the unique application-defined name for the context property. - * @param value the value of the context property. - * @param length the length of the {@link InputStream}. - * @param accepted an optional callback for when the transport has accepted the write. - */ - @Deprecated - void writeContext(String name, InputStream value, int length, @Nullable Runnable accepted); - /** * Writes a message payload to the remote end-point. The bytes from the stream are immediate read * by the Transport. This method will always return immediately and will not wait for the write to diff --git a/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java b/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java index 7a3d71f570..980991a04e 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java @@ -25,29 +25,6 @@ public interface StreamListener { @Nullable ListenableFuture headersRead(Metadata.Headers headers); - /** - * Called upon receiving context information from the remote end-point. The {@link InputStream} is - * non-blocking and contains the entire context. - * - *

The method optionally returns a future that can be observed by flow control to determine - * when the context has been processed by the application. If {@code null} is returned, processing - * of this context is assumed to be complete upon returning from this method. - * - *

The {@code value} {@link InputStream} will be closed when the returned future completes. If - * no future is returned, the stream will be closed immediately after returning from this method. - * - *

This method should return quickly, as the same thread may be used to process other streams. - * - * @param name the unique name of the context - * @param value the value of the context. - * @param length the length of the value {@link InputStream}. - * @return a processing completion future, or {@code null} to indicate that processing of the - * context is immediately complete. - */ - @Nullable - @Deprecated - ListenableFuture contextRead(String name, InputStream value, int length); - /** * Called upon receiving a message from the remote end-point. The {@link InputStream} is * non-blocking and contains the entire message. diff --git a/core/src/main/java/com/google/net/stubby/newtransport/TransportFrameUtil.java b/core/src/main/java/com/google/net/stubby/newtransport/TransportFrameUtil.java index b80e34cec6..75618fba76 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/TransportFrameUtil.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/TransportFrameUtil.java @@ -39,39 +39,9 @@ public final class TransportFrameUtil { public static final int COMPRESSION_HEADER_LENGTH = COMPRESSION_TYPE_LENGTH + COMPRESSION_FRAME_LENGTH; - /** - * Length of flags block in bytes - */ - public static final int FRAME_TYPE_LENGTH = 1; - // Flags public static final byte PAYLOAD_FRAME = 0x0; - public static final byte CONTEXT_VALUE_FRAME = 0x1; - public static final byte CALL_HEADER_FRAME = 0x2; public static final byte STATUS_FRAME = 0x3; - public static final byte FRAME_TYPE_MASK = 0x3; - - /** - * Number of bytes for the length field within a frame - */ - public static final int FRAME_LENGTH = 4; - - /** - * Full length of the GRPC frame header. - */ - public static final int FRAME_HEADER_LENGTH = FRAME_TYPE_LENGTH + FRAME_LENGTH; - - public static boolean isContextValueFrame(int flags) { - return (flags & FRAME_TYPE_MASK) == CONTEXT_VALUE_FRAME; - } - - public static boolean isPayloadFrame(byte flags) { - return (flags & FRAME_TYPE_MASK) == PAYLOAD_FRAME; - } - - public static boolean isStatusFrame(byte flags) { - return (flags & FRAME_TYPE_MASK) == STATUS_FRAME; - } // TODO(user): This needs proper namespacing support, this is currently just a hack /** diff --git a/core/src/main/java/com/google/net/stubby/transport/Deframer.java b/core/src/main/java/com/google/net/stubby/transport/Deframer.java index 3e3fe9db13..da1c0e5c87 100644 --- a/core/src/main/java/com/google/net/stubby/transport/Deframer.java +++ b/core/src/main/java/com/google/net/stubby/transport/Deframer.java @@ -73,20 +73,6 @@ public abstract class Deframer { currentLength = LENGTH_NOT_SET; inFrame = false; } - } else if (GrpcFramingUtil.isContextValueFrame(currentFlags)) { - // Not clear if using proto encoding here is of any benefit. - // Using ContextValue.parseFrom requires copying out of the framed chunk - // Writing a custom parser would have to do varint handling and potentially - // deal with out-of-order tags etc. - Transport.ContextValue contextValue = Transport.ContextValue.parseFrom(framedChunk); - try { - target.addContext(contextValue.getKey(), - contextValue.getValue().newInput(), - target.getPhase()); - } finally { - currentLength = LENGTH_NOT_SET; - inFrame = false; - } } else if (GrpcFramingUtil.isStatusFrame(currentFlags)) { int status = framedChunk.read() << 8 | framedChunk.read(); Transport.Code code = Transport.Code.valueOf(status); diff --git a/core/src/main/java/com/google/net/stubby/transport/Framer.java b/core/src/main/java/com/google/net/stubby/transport/Framer.java index 2ef03653d8..3ed64c61bb 100644 --- a/core/src/main/java/com/google/net/stubby/transport/Framer.java +++ b/core/src/main/java/com/google/net/stubby/transport/Framer.java @@ -24,12 +24,6 @@ public interface Framer { public void deliverFrame(ByteBuffer frame, boolean endOfMessage); } - /** - * Write out a Context-Value message. {@code message} will be completely consumed. - * {@code message.available()} must return the number of remaining bytes to be read. - */ - public void writeContext(String type, InputStream message, boolean flush, Sink sink); - /** * Write out a Payload message. {@code payload} will be completely consumed. * {@code payload.available()} must return the number of remaining bytes to be read. diff --git a/core/src/main/java/com/google/net/stubby/transport/MessageFramer.java b/core/src/main/java/com/google/net/stubby/transport/MessageFramer.java index 1dce95c1c3..37de113975 100644 --- a/core/src/main/java/com/google/net/stubby/transport/MessageFramer.java +++ b/core/src/main/java/com/google/net/stubby/transport/MessageFramer.java @@ -2,58 +2,16 @@ package com.google.net.stubby.transport; import com.google.net.stubby.GrpcFramingUtil; import com.google.net.stubby.Status; -import com.google.protobuf.CodedOutputStream; -import com.google.protobuf.WireFormat; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.util.Arrays; /** * Default {@link Framer} implementation. */ public class MessageFramer implements Framer { - /** - * Size of the GRPC message frame header which consists of - * 1 byte for the type (payload, context, status) - * 4 bytes for the length of the message - */ - private static final int MESSAGE_HEADER_SIZE = 5; - - /** - * UTF-8 charset which is used for key name encoding in context values - */ - private static final Charset UTF_8 = Charset.forName("UTF-8"); - - /** - * Precomputed protobuf tags for ContextValue - */ - private static final byte[] VALUE_TAG; - private static final byte[] KEY_TAG; - - - static { - // Initialize constants for serializing context-value in a protobuf compatible manner - try { - byte[] buf = new byte[8]; - CodedOutputStream coded = CodedOutputStream.newInstance(buf); - coded.writeTag(Transport.ContextValue.KEY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED); - coded.flush(); - KEY_TAG = Arrays.copyOf(buf, coded.getTotalBytesWritten()); - coded = CodedOutputStream.newInstance(buf); - coded.writeTag(Transport.ContextValue.VALUE_FIELD_NUMBER, - WireFormat.WIRETYPE_LENGTH_DELIMITED); - coded.flush(); - VALUE_TAG = Arrays.copyOf(buf, coded.getTotalBytesWritten()); - } catch (IOException ioe) { - // Unrecoverable - throw new RuntimeException(ioe); - } - } - private CompressionFramer framer; private final ByteBuffer scratch = ByteBuffer.allocate(16); @@ -69,15 +27,6 @@ public class MessageFramer implements Framer { framer.setAllowCompression(enable); } - /** - * Set the preferred compression level for when compression is enabled. - * @param level the preferred compression level, or {@code -1} to use the framing default - * @see java.util.zip.Deflater#setLevel - */ - public void setCompressionLevel(int level) { - framer.setCompressionLevel(level); - } - @Override public void writePayload(InputStream message, boolean flush, Sink sink) { try { @@ -98,45 +47,6 @@ public class MessageFramer implements Framer { } } - - @Override - public void writeContext(String key, InputStream message, boolean flush, Sink sink) { - try { - scratch.clear(); - scratch.put(GrpcFramingUtil.CONTEXT_VALUE_FRAME); - byte[] keyBytes = key.getBytes(UTF_8); - int lenKeyPrefix = KEY_TAG.length + - CodedOutputStream.computeRawVarint32Size(keyBytes.length); - int messageLen = message.available(); - int lenValPrefix = VALUE_TAG.length + CodedOutputStream.computeRawVarint32Size(messageLen); - int totalLen = lenKeyPrefix + keyBytes.length + lenValPrefix + messageLen; - scratch.putInt(totalLen); - framer.write(scratch.array(), 0, scratch.position(), sink); - - // Write key - scratch.clear(); - scratch.put(KEY_TAG); - writeRawVarInt32(keyBytes.length, scratch); - framer.write(scratch.array(), 0, scratch.position(), sink); - framer.write(keyBytes, 0, keyBytes.length, sink); - - // Write value - scratch.clear(); - scratch.put(VALUE_TAG); - writeRawVarInt32(messageLen, scratch); - framer.write(scratch.array(), 0, scratch.position(), sink); - if (messageLen != framer.write(message, sink)) { - throw new RuntimeException("InputStream's available() was inaccurate"); - } - framer.endOfMessage(sink); - if (flush && framer != null) { - framer.flush(sink); - } - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - @Override public void writeStatus(Status status, boolean flush, Sink sink) { short code = (short) status.getCode().getNumber(); @@ -162,19 +72,4 @@ public class MessageFramer implements Framer { // TODO(user): Returning buffer to a pool would go here framer = null; } - - /** - * Write a raw VarInt32 to the buffer - */ - private static void writeRawVarInt32(int value, ByteBuffer bytebuf) { - while (true) { - if ((value & ~0x7F) == 0) { - bytebuf.put((byte) value); - return; - } else { - bytebuf.put((byte) ((value & 0x7F) | 0x80)); - value >>>= 7; - } - } - } } diff --git a/core/src/test/java/com/google/net/stubby/newtransport/GrpcDeframerTest.java b/core/src/test/java/com/google/net/stubby/newtransport/GrpcDeframerTest.java index 24d4f0d84d..f360f47cde 100644 --- a/core/src/test/java/com/google/net/stubby/newtransport/GrpcDeframerTest.java +++ b/core/src/test/java/com/google/net/stubby/newtransport/GrpcDeframerTest.java @@ -1,13 +1,11 @@ package com.google.net.stubby.newtransport; -import static com.google.net.stubby.newtransport.TransportFrameUtil.CONTEXT_VALUE_FRAME; import static com.google.net.stubby.newtransport.TransportFrameUtil.PAYLOAD_FRAME; import static com.google.net.stubby.newtransport.TransportFrameUtil.STATUS_FRAME; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.notNull; import static org.mockito.Mockito.never; @@ -44,66 +42,33 @@ import javax.annotation.Nullable; */ @RunWith(JUnit4.class) public class GrpcDeframerTest { - private static final String KEY = "key"; private static final String MESSAGE = "hello world"; private static final ByteString MESSAGE_BSTR = ByteString.copyFromUtf8(MESSAGE); private static final Transport.Code STATUS_CODE = Transport.Code.CANCELLED; private GrpcDeframer reader; - private Transport.ContextValue contextProto; - private StubDecompressor decompressor; @Mock private StreamListener listener; - private SettableFuture contextFuture; - private SettableFuture messageFuture; @Before public void setup() { MockitoAnnotations.initMocks(this); - - contextFuture = SettableFuture.create(); messageFuture = SettableFuture.create(); - when(listener.contextRead(anyString(), any(InputStream.class), anyInt())).thenReturn( - contextFuture); when(listener.messageRead(any(InputStream.class), anyInt())).thenReturn(messageFuture); decompressor = new StubDecompressor(); reader = new GrpcDeframer(decompressor, listener, MoreExecutors.directExecutor()); - - contextProto = Transport.ContextValue.newBuilder().setKey(KEY).setValue(MESSAGE_BSTR).build(); - } - - @Test - public void contextShouldCallTarget() throws Exception { - decompressor.init(contextFrame()); - reader.deframe(Buffers.empty(), false); - verifyContext(); - verifyNoPayload(); - verifyNoStatus(); - } - - @Test - public void contextWithEndOfStreamShouldNotifyStatus() throws Exception { - decompressor.init(contextFrame()); - reader.deframe(Buffers.empty(), true); - verifyContext(); - verifyNoPayload(); - verifyNoStatus(); - - contextFuture.set(null); - verifyStatus(Transport.Code.OK); } @Test public void payloadShouldCallTarget() throws Exception { decompressor.init(payloadFrame()); reader.deframe(Buffers.empty(), false); - verifyNoContext(); verifyPayload(); verifyNoStatus(); } @@ -112,7 +77,6 @@ public class GrpcDeframerTest { public void payloadWithEndOfStreamShouldNotifyStatus() throws Exception { decompressor.init(payloadFrame()); reader.deframe(Buffers.empty(), true); - verifyNoContext(); verifyPayload(); verifyNoStatus(); @@ -124,7 +88,6 @@ public class GrpcDeframerTest { public void statusShouldCallTarget() throws Exception { decompressor.init(statusFrame()); reader.deframe(Buffers.empty(), false); - verifyNoContext(); verifyNoPayload(); verifyStatus(); } @@ -133,7 +96,6 @@ public class GrpcDeframerTest { public void statusWithEndOfStreamShouldNotifyStatusOnce() throws Exception { decompressor.init(statusFrame()); reader.deframe(Buffers.empty(), true); - verifyNoContext(); verifyNoPayload(); verifyStatus(); } @@ -143,9 +105,6 @@ public class GrpcDeframerTest { ByteArrayOutputStream os = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(os); - // Write a context frame. - writeFrame(CONTEXT_VALUE_FRAME, contextProto.toByteArray(), dos); - // Write a payload frame. writeFrame(PAYLOAD_FRAME, MESSAGE_BSTR.toByteArray(), dos); @@ -161,11 +120,6 @@ public class GrpcDeframerTest { reader.deframe(Buffers.empty(), false); // Verify that all callbacks were called. - verifyContext(); - verifyNoPayload(); - verifyNoStatus(); - - contextFuture.set(null); verifyPayload(); verifyNoStatus(); @@ -187,7 +141,6 @@ public class GrpcDeframerTest { byte[] chunk = Arrays.copyOfRange(fullBuffer, startIx, endIx); decompressor.init(chunk); reader.deframe(Buffers.empty(), false); - verifyNoContext(); verifyNoPayload(); verifyNoStatus(); @@ -197,17 +150,10 @@ public class GrpcDeframerTest { chunk = Arrays.copyOfRange(fullBuffer, startIx, endIx); decompressor.init(chunk); reader.deframe(Buffers.empty(), false); - verifyNoContext(); verifyPayload(); verifyNoStatus(); } - private void verifyContext() { - ArgumentCaptor captor = ArgumentCaptor.forClass(InputStream.class); - verify(listener).contextRead(eq(KEY), captor.capture(), eq(MESSAGE.length())); - assertEquals(MESSAGE, readString(captor.getValue(), MESSAGE.length())); - } - private void verifyPayload() { ArgumentCaptor captor = ArgumentCaptor.forClass(InputStream.class); verify(listener).messageRead(captor.capture(), eq(MESSAGE.length())); @@ -234,10 +180,6 @@ public class GrpcDeframerTest { assertEquals(code, captor.getValue().getCode()); } - private void verifyNoContext() { - verify(listener, never()).contextRead(any(String.class), any(InputStream.class), anyInt()); - } - private void verifyNoPayload() { verify(listener, never()).messageRead(any(InputStream.class), anyInt()); } @@ -246,10 +188,6 @@ public class GrpcDeframerTest { verify(listener, never()).closed(any(Status.class), notNull(Metadata.Trailers.class)); } - private byte[] contextFrame() throws IOException { - return frame(CONTEXT_VALUE_FRAME, contextProto.toByteArray()); - } - private static byte[] payloadFrame() throws IOException { return frame(PAYLOAD_FRAME, MESSAGE_BSTR.toByteArray()); } diff --git a/core/src/test/java/com/google/net/stubby/newtransport/MessageFramerTest.java b/core/src/test/java/com/google/net/stubby/newtransport/MessageFramerTest.java index 11730a7c12..419f71fa9b 100644 --- a/core/src/test/java/com/google/net/stubby/newtransport/MessageFramerTest.java +++ b/core/src/test/java/com/google/net/stubby/newtransport/MessageFramerTest.java @@ -9,7 +9,6 @@ import com.google.common.primitives.Bytes; import com.google.net.stubby.GrpcFramingUtil; import com.google.net.stubby.Status; import com.google.net.stubby.transport.Transport; -import com.google.protobuf.ByteString; import org.junit.Test; import org.junit.runner.RunWith; @@ -53,37 +52,6 @@ public class MessageFramerTest { } } - @Test - public void testContext() throws Exception { - CapturingSink sink = new CapturingSink(); - MessageFramer framer = new MessageFramer(sink, TRANSPORT_FRAME_SIZE); - byte[] payload = new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23}; - byte[] contextValue = Transport.ContextValue.newBuilder() - .setKey("somekey") - .setValue(ByteString.copyFrom(payload)) - .build().toByteArray(); - byte[] unframedStream = - Bytes.concat( - new byte[]{GrpcFramingUtil.CONTEXT_VALUE_FRAME}, - new byte[]{0, 0, - (byte) (contextValue.length >> 8 & 0xff), - (byte) (contextValue.length & 0xff)}, - contextValue); - for (int i = 0; i < 1000; i++) { - framer.writeContext("somekey", new ByteArrayInputStream(payload), payload.length); - if ((i + 1) % 13 == 0) { - framer.flush(); - } - } - framer.flush(); - assertEquals(unframedStream.length * 1000, sink.deframedStream.length); - for (int i = 0; i < 1000; i++) { - assertArrayEquals(unframedStream, - Arrays.copyOfRange(sink.deframedStream, i * unframedStream.length, - (i + 1) * unframedStream.length)); - } - } - @Test public void testStatus() throws Exception { CapturingSink sink = new CapturingSink(); diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientStreamTest.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientStreamTest.java index 1e7f767d92..9702c64ad5 100644 --- a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientStreamTest.java +++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientStreamTest.java @@ -42,16 +42,6 @@ public class NettyClientStreamTest extends NettyStreamTestBase { verify(channel).writeAndFlush(any(CancelStreamCommand.class)); } - @Test - public void writeContextShouldSendRequest() throws Exception { - // Force stream creation. - stream().id(STREAM_ID); - stream.writeContext(CONTEXT_KEY, input, input.available(), accepted); - stream.flush(); - verify(channel).writeAndFlush(new SendGrpcFrameCommand(1, contextFrame(), false)); - verify(accepted).run(); - } - @Test public void writeMessageShouldSendRequest() throws Exception { // Force stream creation. @@ -97,16 +87,6 @@ public class NettyClientStreamTest extends NettyStreamTestBase { assertEquals(StreamState.CLOSED, stream.state()); } - @Override - @Test - public void inboundContextShouldCallListener() throws Exception { - // Receive headers first so that it's a valid GRPC response. - stream().id(1); - stream().inboundHeadersRecieved(grpcResponseHeaders(), false); - - super.inboundContextShouldCallListener(); - } - @Override @Test public void inboundMessageShouldCallListener() throws Exception { diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java index 52871b05a8..0030cd12ce 100644 --- a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java +++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java @@ -23,15 +23,6 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class NettyServerStreamTest extends NettyStreamTestBase { - @Test - public void writeContextShouldSendResponse() throws Exception { - stream.writeContext(CONTEXT_KEY, input, input.available(), accepted); - stream.flush(); - verify(channel).write(new SendResponseHeadersCommand(STREAM_ID)); - verify(channel).writeAndFlush(new SendGrpcFrameCommand(STREAM_ID, contextFrame(), false)); - verify(accepted).run(); - } - @Test public void writeMessageShouldSendResponse() throws Exception { stream.writeMessage(input, input.available(), accepted); diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyStreamTestBase.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyStreamTestBase.java index 9380d65044..ddc5208ded 100644 --- a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyStreamTestBase.java +++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyStreamTestBase.java @@ -1,6 +1,5 @@ package com.google.net.stubby.newtransport.netty; -import static com.google.net.stubby.GrpcFramingUtil.CONTEXT_VALUE_FRAME; import static com.google.net.stubby.GrpcFramingUtil.PAYLOAD_FRAME; import static com.google.net.stubby.GrpcFramingUtil.STATUS_FRAME; import static io.netty.handler.codec.http2.DefaultHttp2InboundFlowController.DEFAULT_WINDOW_UPDATE_RATIO; @@ -10,7 +9,6 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.never; @@ -21,8 +19,6 @@ import com.google.common.io.ByteStreams; import com.google.common.util.concurrent.SettableFuture; import com.google.net.stubby.Status; import com.google.net.stubby.newtransport.StreamListener; -import com.google.net.stubby.transport.Transport.ContextValue; -import com.google.protobuf.ByteString; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -55,7 +51,6 @@ import java.util.concurrent.TimeUnit; * Base class for Netty stream unit tests. */ public abstract class NettyStreamTestBase { - protected static final String CONTEXT_KEY = "key"; protected static final String MESSAGE = "hello world"; protected static final int STREAM_ID = 1; @@ -106,8 +101,6 @@ public abstract class NettyStreamTestBase { when(eventLoop.inEventLoop()).thenReturn(true); processingFuture = SettableFuture.create(); - when(listener.contextRead(anyString(), any(InputStream.class), anyInt())).thenReturn( - processingFuture); when(listener.messageRead(any(InputStream.class), anyInt())).thenReturn(processingFuture); doAnswer(new Answer() { @@ -123,25 +116,6 @@ public abstract class NettyStreamTestBase { stream = createStream(); } - @Test - public void inboundContextShouldCallListener() throws Exception { - stream.inboundDataReceived(contextFrame(), false); - ArgumentCaptor captor = ArgumentCaptor.forClass(InputStream.class); - verify(listener).contextRead(eq(CONTEXT_KEY), captor.capture(), eq(MESSAGE.length())); - - // Verify that inbound flow control window update has been disabled for the stream. - verify(inboundFlow).setWindowUpdateRatio(eq(ctx), eq(STREAM_ID), eq(WINDOW_UPDATE_OFF)); - verify(inboundFlow, never()).setWindowUpdateRatio(eq(ctx), eq(STREAM_ID), - eq(DEFAULT_WINDOW_UPDATE_RATIO)); - assertEquals(MESSAGE, toString(captor.getValue())); - - // Verify that inbound flow control window update has been re-enabled for the stream after - // the future completes. - processingFuture.set(null); - verify(inboundFlow).setWindowUpdateRatio(eq(ctx), eq(STREAM_ID), - eq(DEFAULT_WINDOW_UPDATE_RATIO)); - } - @Test public void inboundMessageShouldCallListener() throws Exception { stream.inboundDataReceived(messageFrame(), false); @@ -169,24 +143,6 @@ public abstract class NettyStreamTestBase { return new String(bytes, UTF_8); } - protected final ByteBuf contextFrame() throws Exception { - byte[] body = ContextValue - .newBuilder() - .setKey(CONTEXT_KEY) - .setValue(ByteString.copyFromUtf8(MESSAGE)) - .build() - .toByteArray(); - ByteArrayOutputStream os = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(os); - dos.write(CONTEXT_VALUE_FRAME); - dos.writeInt(body.length); - dos.write(body); - dos.close(); - - // Write the compression header followed by the context frame. - return compressionFrame(os.toByteArray()); - } - protected final ByteBuf messageFrame() throws Exception { ByteArrayOutputStream os = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(os); diff --git a/core/src/test/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportTest.java b/core/src/test/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportTest.java index 1407b07ce7..630518d43a 100644 --- a/core/src/test/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportTest.java +++ b/core/src/test/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportTest.java @@ -7,7 +7,9 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.any; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; import com.google.common.util.concurrent.ListenableFuture; @@ -20,8 +22,6 @@ import com.google.net.stubby.newtransport.okhttp.OkHttpClientTransport.ClientFra import com.google.net.stubby.newtransport.okhttp.OkHttpClientTransport.OkHttpClientStream; import com.google.net.stubby.transport.Transport; import com.google.net.stubby.transport.Transport.Code; -import com.google.net.stubby.transport.Transport.ContextValue; -import com.google.protobuf.ByteString; import com.squareup.okhttp.internal.spdy.ErrorCode; import com.squareup.okhttp.internal.spdy.FrameReader; @@ -64,7 +64,6 @@ public class OkHttpClientTransportTest { // Flags private static final byte PAYLOAD_FRAME = 0x0; - public static final byte CONTEXT_VALUE_FRAME = 0x1; public static final byte STATUS_FRAME = 0x3; @Mock @@ -143,30 +142,6 @@ public class OkHttpClientTransportTest { } } - @Test - public void readContexts() throws Exception { - final int numContexts = 10; - final String key = "KEY"; - final String value = "value"; - MockStreamListener listener = new MockStreamListener(); - clientTransport.newStream(method,new Metadata.Headers(), listener); - assertTrue(streams.containsKey(3)); - for (int i = 0; i < numContexts; i++) { - BufferedSource source = mock(BufferedSource.class); - InputStream inputStream = createContextFrame(key + i, value + i); - when(source.inputStream()).thenReturn(inputStream); - frameHandler.data(i == numContexts - 1 ? true : false, 3, source, inputStream.available()); - } - listener.waitUntilStreamClosed(); - assertEquals(Status.OK, listener.status); - assertEquals(numContexts, listener.contexts.size()); - for (int i = 0; i < numContexts; i++) { - String val = listener.contexts.get(key + i); - assertNotNull(val); - assertEquals(value + i, val); - } - } - @Test public void readStatus() throws Exception { MockStreamListener listener = new MockStreamListener(); @@ -218,25 +193,6 @@ public class OkHttpClientTransportTest { checkSameInputStream(createMessageFrame(message), sentFrame.inputStream()); } - @Test - public void writeContext() throws Exception { - final String key = "KEY"; - final String value = "VALUE"; - MockStreamListener listener = new MockStreamListener(); - clientTransport.newStream(method,new Metadata.Headers(), listener); - OkHttpClientStream stream = streams.get(3); - InputStream input = new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8)); - stream.writeContext(key, input, input.available(), null); - stream.flush(); - ArgumentCaptor captor = - ArgumentCaptor.forClass(Buffer.class); - verify(frameWriter).data(eq(false), eq(3), captor.capture()); - stream.cancel(); - verify(frameWriter).rstStream(eq(3), eq(ErrorCode.CANCEL)); - listener.waitUntilStreamClosed(); - assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL), listener.status); - } - @Test public void windowUpdate() throws Exception { MockStreamListener listener1 = new MockStreamListener(); @@ -249,26 +205,7 @@ public class OkHttpClientTransportTest { int messageLength = OkHttpClientTransport.DEFAULT_INITIAL_WINDOW_SIZE / 4; byte[] fakeMessage = new byte[messageLength]; - byte[] contextBody = ContextValue - .newBuilder() - .setKey("KEY") - .setValue(ByteString.copyFrom(fakeMessage)) - .build() - .toByteArray(); - - // Stream 1 receives context - InputStream contextFrame = createContextFrame(contextBody); - int contextFrameLength = contextFrame.available(); BufferedSource source = mock(BufferedSource.class); - when(source.inputStream()).thenReturn(contextFrame); - frameHandler.data(false, 3, source, contextFrame.available()); - - // Stream 2 receives context - contextFrame = createContextFrame(contextBody); - when(source.inputStream()).thenReturn(contextFrame); - frameHandler.data(false, 5, source, contextFrame.available()); - - verify(frameWriter).windowUpdate(eq(0), eq((long) 2 * contextFrameLength)); // Stream 1 receives a message InputStream messageFrame = createMessageFrame(fakeMessage); @@ -276,14 +213,27 @@ public class OkHttpClientTransportTest { when(source.inputStream()).thenReturn(messageFrame); frameHandler.data(false, 3, source, messageFrame.available()); - verify(frameWriter).windowUpdate(eq(3), eq((long) contextFrameLength + messageFrameLength)); - // Stream 2 receives a message messageFrame = createMessageFrame(fakeMessage); when(source.inputStream()).thenReturn(messageFrame); frameHandler.data(false, 5, source, messageFrame.available()); - verify(frameWriter).windowUpdate(eq(5), eq((long) contextFrameLength + messageFrameLength)); + verify(frameWriter).windowUpdate(eq(0), eq((long) 2 * messageFrameLength)); + reset(frameWriter); + + // Stream 1 receives another message + messageFrame = createMessageFrame(fakeMessage); + when(source.inputStream()).thenReturn(messageFrame); + frameHandler.data(false, 3, source, messageFrame.available()); + + verify(frameWriter).windowUpdate(eq(3), eq((long) 2 * messageFrameLength)); + + // Stream 2 receives another message + messageFrame = createMessageFrame(fakeMessage); + when(source.inputStream()).thenReturn(messageFrame); + frameHandler.data(false, 5, source, messageFrame.available()); + + verify(frameWriter).windowUpdate(eq(5), eq((long) 2 * messageFrameLength)); verify(frameWriter).windowUpdate(eq(0), eq((long) 2 * messageFrameLength)); stream1.cancel(); @@ -426,29 +376,6 @@ public class OkHttpClientTransportTest { return addCompressionHeader(messageFrame); } - private static InputStream createContextFrame(String key, String value) throws IOException { - byte[] body = ContextValue - .newBuilder() - .setKey(key) - .setValue(ByteString.copyFromUtf8(value)) - .build() - .toByteArray(); - return createContextFrame(body); - } - - private static InputStream createContextFrame(byte[] body) throws IOException { - ByteArrayOutputStream os = new ByteArrayOutputStream(); - DataOutputStream dos = new DataOutputStream(os); - dos.write(CONTEXT_VALUE_FRAME); - dos.writeInt(body.length); - dos.write(body); - dos.close(); - byte[] contextFrame = os.toByteArray(); - - // Write the compression header followed by the context frame. - return addCompressionHeader(contextFrame); - } - private static InputStream createStatusFrame(short code) throws IOException { ByteArrayOutputStream os = new ByteArrayOutputStream(); DataOutputStream dos = new DataOutputStream(os); @@ -516,16 +443,6 @@ public class OkHttpClientTransportTest { ArrayList messages = new ArrayList(); Map contexts = new HashMap(); - @Override - public ListenableFuture contextRead(String name, InputStream value, int length) { - String valueStr = getContent(value); - if (valueStr != null) { - // We assume only one context for each name. - contexts.put(name, valueStr); - } - return null; - } - @Override public ListenableFuture headersRead(Metadata.Headers headers) { return null; diff --git a/core/src/test/java/com/google/net/stubby/transport/MessageFramerTest.java b/core/src/test/java/com/google/net/stubby/transport/MessageFramerTest.java index fae878cdfb..7707cfcf27 100644 --- a/core/src/test/java/com/google/net/stubby/transport/MessageFramerTest.java +++ b/core/src/test/java/com/google/net/stubby/transport/MessageFramerTest.java @@ -8,7 +8,6 @@ import com.google.common.io.ByteBuffers; import com.google.common.primitives.Bytes; import com.google.net.stubby.GrpcFramingUtil; import com.google.net.stubby.Status; -import com.google.protobuf.ByteString; import org.junit.Test; import org.junit.runner.RunWith; @@ -52,37 +51,6 @@ public class MessageFramerTest { } } - @Test - public void testContext() throws Exception { - MessageFramer framer = new MessageFramer(TRANSPORT_FRAME_SIZE); - byte[] payload = new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23}; - byte[] contextValue = Transport.ContextValue.newBuilder() - .setKey("somekey") - .setValue(ByteString.copyFrom(payload)) - .build().toByteArray(); - byte[] unframedStream = - Bytes.concat( - new byte[]{GrpcFramingUtil.CONTEXT_VALUE_FRAME}, - new byte[]{0, 0, - (byte) (contextValue.length >> 8 & 0xff), - (byte) (contextValue.length & 0xff)}, - contextValue); - CapturingSink sink = new CapturingSink(); - for (int i = 0; i < 1000; i++) { - framer.writeContext("somekey", new ByteArrayInputStream(payload), (i % 17 == 11), sink); - if ((i + 1) % 13 == 0) { - framer.flush(sink); - } - } - framer.flush(sink); - assertEquals(unframedStream.length * 1000, sink.deframedStream.length); - for (int i = 0; i < 1000; i++) { - assertArrayEquals(unframedStream, - Arrays.copyOfRange(sink.deframedStream, i * unframedStream.length, - (i + 1) * unframedStream.length)); - } - } - @Test public void testStatus() throws Exception { MessageFramer framer = new MessageFramer(TRANSPORT_FRAME_SIZE); diff --git a/stub/src/main/java/com/google/net/stubby/stub/Calls.java b/stub/src/main/java/com/google/net/stubby/stub/Calls.java index bba5c57a2b..c6def232b6 100644 --- a/stub/src/main/java/com/google/net/stubby/stub/Calls.java +++ b/stub/src/main/java/com/google/net/stubby/stub/Calls.java @@ -11,7 +11,6 @@ import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; import com.google.net.stubby.transport.Transport; -import java.io.InputStream; import java.util.Iterator; import java.util.NoSuchElementException; import java.util.concurrent.CancellationException; @@ -215,12 +214,6 @@ public class Calls { return null; } - @Override - public ListenableFuture onContext(String name, InputStream value) { - // StreamObservers don't receive contexts. - return null; - } - @Override public ListenableFuture onPayload(T payload) { observer.onValue(payload); @@ -253,12 +246,6 @@ public class Calls { return null; } - @Override - public ListenableFuture onContext(String name, InputStream value) { - // Don't care about contexts. - return null; - } - @Override public ListenableFuture onPayload(RespT value) { if (this.value != null) { @@ -338,12 +325,6 @@ public class Calls { private class QueuingListener extends Call.Listener { private boolean done = false; - @Override - public ListenableFuture onContext(String name, InputStream value) { - // Don't care about contexts. - return null; - } - @Override public ListenableFuture onHeaders(Metadata.Headers headers) { return null;