Phase 2 of converting to new Headers mechanism for side-channel.

Remove 'context' from interfaces/tests/framers/...

Next phases
- Switch the wire format (ESF needs to be done in near lock-step)
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=75164642
This commit is contained in:
lryan 2014-09-09 19:25:43 -07:00 committed by Eric Anderson
parent e4bd1c7d69
commit a7d735e69b
33 changed files with 25 additions and 911 deletions

View File

@ -56,20 +56,6 @@ public abstract class AbstractOperation implements Operation {
return this; return this;
} }
@Override
public Operation addContext(String type, InputStream message, Phase nextPhase) {
if (getPhase() == Phase.CLOSED) {
throw new RuntimeException("addContext called after operation closed");
}
if (phase == Phase.PAYLOAD) {
progressTo(Phase.FOOTERS);
}
if (phase == Phase.HEADERS || phase == Phase.FOOTERS) {
return progressTo(nextPhase);
}
throw new IllegalStateException("Cannot add context in phase " + phase.name());
}
@Override @Override
public Operation addPayload(InputStream payload, Phase nextPhase) { public Operation addPayload(InputStream payload, Phase nextPhase) {
if (getPhase() == Phase.CLOSED) { if (getPhase() == Phase.CLOSED) {

View File

@ -3,8 +3,6 @@ package com.google.net.stubby;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import java.io.InputStream;
import javax.annotation.Nullable; import javax.annotation.Nullable;
/** /**
@ -43,17 +41,6 @@ public abstract class Call<RequestT, ResponseT> {
*/ */
public abstract ListenableFuture<Void> onHeaders(Metadata.Headers headers); public abstract ListenableFuture<Void> onHeaders(Metadata.Headers headers);
/**
* A response context has been received. Any context messages will precede payload messages.
*
* <p>The {@code value} {@link InputStream} will be closed when the returned future completes.
* If no future is returned, the value will be closed immediately after returning from this
* method.
*/
@Nullable
@Deprecated
public abstract ListenableFuture<Void> onContext(String name, InputStream value);
/** /**
* A response payload has been received. For streaming calls, there may be zero payload * A response payload has been received. For streaming calls, there may be zero payload
* messages. * messages.
@ -80,40 +67,6 @@ public abstract class Call<RequestT, ResponseT> {
// TODO(user): Might be better to put into Channel#newCall, might reduce decoration burden // TODO(user): Might be better to put into Channel#newCall, might reduce decoration burden
public abstract void start(Listener<ResponseT> responseListener, Metadata.Headers headers); public abstract void start(Listener<ResponseT> responseListener, Metadata.Headers headers);
/**
* Send a context message. Context messages are intended for side-channel information like
* statistics and authentication.
*
* @param name key identifier of context
* @param value context value bytes
* @throws IllegalStateException if call is {@link #halfClose}d or explicitly {@link #cancel}ed,
* or after {@link #sendPayload}
*/
@Deprecated
public void sendContext(String name, InputStream value) {
sendContext(name, value, null);
}
/**
* Send a context message. Context messages are intended for side-channel information like
* statistics and authentication.
*
* <p>If {@code accepted} is non-{@code null}, then the future will be completed when the flow
* control window is able to fully permit the context message. If the Call fails before being
* accepted, then the future will be cancelled. Callers concerned with unbounded buffering should
* wait until the future completes before sending more messages.
*
* @param name key identifier of context
* @param value context value bytes
* @param accepted notification for adhering to flow control, or {@code null}
* @throws IllegalStateException if call is {@link #halfClose}d or explicitly {@link #cancel}ed,
* or after {@link #sendPayload}
*/
@Deprecated
public abstract void sendContext(String name, InputStream value,
@Nullable SettableFuture<Void> accepted);
/** /**
* Prevent any further processing for this Call. No further messages may be sent or will be * Prevent any further processing for this Call. No further messages may be sent or will be
* received. The server is informed of cancellations, but may not stop processing the call. * received. The server is informed of cancellations, but may not stop processing the call.

View File

@ -167,17 +167,6 @@ public final class ChannelImpl extends AbstractService implements Channel {
} }
} }
@Override
public void sendContext(String name, InputStream value, SettableFuture<Void> accepted) {
Preconditions.checkState(stream != null, "Not started");
if (accepted == null) {
stream.writeContext(name, value, available(value), null);
} else {
inProcessFutures.add(accepted);
stream.writeContext(name, value, available(value), new AcceptedRunnable(accepted));
}
}
@Override @Override
public void sendPayload(ReqT payload, SettableFuture<Void> accepted) { public void sendPayload(ReqT payload, SettableFuture<Void> accepted) {
Preconditions.checkState(stream != null, "Not started"); Preconditions.checkState(stream != null, "Not started");
@ -251,17 +240,6 @@ public final class ChannelImpl extends AbstractService implements Channel {
}); });
} }
@Override
public ListenableFuture<Void> contextRead(final String name, final InputStream value,
final int length) {
return dispatchCallable(new Callable<ListenableFuture<Void>>() {
@Override
public ListenableFuture<Void> call() {
return observer.onContext(name, value);
}
});
}
@Override @Override
public ListenableFuture<Void> messageRead(final InputStream message, final int length) { public ListenableFuture<Void> messageRead(final InputStream message, final int length) {
return dispatchCallable(new Callable<ListenableFuture<Void>>() { return dispatchCallable(new Callable<ListenableFuture<Void>>() {

View File

@ -26,8 +26,6 @@ public class GrpcFramingUtil {
// Flags // Flags
public static final byte PAYLOAD_FRAME = 0x0; public static final byte PAYLOAD_FRAME = 0x0;
public static final byte CONTEXT_VALUE_FRAME = 0x1;
public static final byte CALL_HEADER_FRAME = 0x2;
public static final byte STATUS_FRAME = 0x3; public static final byte STATUS_FRAME = 0x3;
public static final byte FRAME_TYPE_MASK = 0x3; public static final byte FRAME_TYPE_MASK = 0x3;
@ -36,10 +34,6 @@ public class GrpcFramingUtil {
*/ */
public static final int FRAME_LENGTH = 4; public static final int FRAME_LENGTH = 4;
public static boolean isContextValueFrame(int flags) {
return (flags & FRAME_TYPE_MASK) == CONTEXT_VALUE_FRAME;
}
public static boolean isPayloadFrame(byte flags) { public static boolean isPayloadFrame(byte flags) {
return (flags & FRAME_TYPE_MASK) == PAYLOAD_FRAME; return (flags & FRAME_TYPE_MASK) == PAYLOAD_FRAME;
} }

View File

@ -14,7 +14,7 @@ public interface Operation {
public static enum Phase { public static enum Phase {
/** /**
* Used to communicate key-value pairs that define common context for the operation but * Used to communicate key-value pairs that define metadata for the call but
* that are not strictly part of the interface. Provided prior to delivering any formal * that are not strictly part of the interface. Provided prior to delivering any formal
* parameters * parameters
*/ */
@ -24,11 +24,11 @@ public interface Operation {
*/ */
PAYLOAD, PAYLOAD,
/** /**
* Used to communicate key-value pairs that define common context for the call but * Used to communicate key-value pairs that define metadata for the call but
* that are not strictly part of the interface. Provided after all formal parameters have * that are not strictly part of the interface. Provided after all formal parameters have
* been delivered. * been delivered.
*/ */
FOOTERS, TRAILERS,
/** /**
* Indicates that the operation is closed and will not accept further input. * Indicates that the operation is closed and will not accept further input.
*/ */
@ -46,25 +46,6 @@ public interface Operation {
*/ */
public Phase getPhase(); public Phase getPhase();
/**
* Add a key-value context value.
* Allowed when phase = HEADERS | FOOTERS.
* Valid next phases
* HEADERS -> PAYLOAD_FRAME | FOOTERS | CLOSED
* FOOTERS -> CLOSED
* <p>
* The {@link InputStream} message must be entirely consumed before this call returns.
* Implementations should not pass references to this stream across thread boundaries without
* taking a copy.
* <p>
* {@code payload.available()} must return the number of remaining bytes to be read.
*
* @return this object
*/
// TODO(user): Context is an incredibly general term. Consider having two signatures
// addHeader and addTrailer to follow HTTP nomenclature more closely.
public Operation addContext(String type, InputStream message, Phase nextPhase);
/** /**
* Send a payload to the receiver, indicates that more may follow. * Send a payload to the receiver, indicates that more may follow.
* Allowed when phase = PAYLOAD_FRAME * Allowed when phase = PAYLOAD_FRAME

View File

@ -17,7 +17,6 @@ public class ProtocolConstants {
// Flags // Flags
public static final int PAYLOAD_FRAME = 0x0; public static final int PAYLOAD_FRAME = 0x0;
public static final int CONTEXT_VALUE_FRAME = 0x1;
public static final int RESPONSE_STATUS_FRAME = 0x2; public static final int RESPONSE_STATUS_FRAME = 0x2;
public static final int RESERVED_FRAME = 0x3; public static final int RESERVED_FRAME = 0x3;
public static final int FRAME_TYPE_MASK = 0x3; public static final int FRAME_TYPE_MASK = 0x3;
@ -28,10 +27,6 @@ public class ProtocolConstants {
*/ */
public static final int FRAME_LENGTH = 4; public static final int FRAME_LENGTH = 4;
public static boolean isContextValueFrame(int flags) {
return (flags & FRAME_TYPE_MASK) == CONTEXT_VALUE_FRAME;
}
public static boolean isPayloadFrame(byte flags) { public static boolean isPayloadFrame(byte flags) {
return (flags & FRAME_TYPE_MASK) == PAYLOAD_FRAME; return (flags & FRAME_TYPE_MASK) == PAYLOAD_FRAME;
} }

View File

@ -2,8 +2,6 @@ package com.google.net.stubby;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import java.io.InputStream;
import javax.annotation.Nullable; import javax.annotation.Nullable;
/** /**
@ -46,18 +44,6 @@ public abstract class ServerCall<ResponseT> {
@Nullable @Nullable
public abstract ListenableFuture<Void> headersRead(Metadata.Headers headers); public abstract ListenableFuture<Void> headersRead(Metadata.Headers headers);
/**
* A request context has been received. Any context messages will precede payload messages.
*
* <p>The {@code value} {@link InputStream} will be closed when the returned future completes.
* If no future is returned, the value will be closed immediately after returning from this
* method.
*/
@Nullable
@Deprecated
public abstract ListenableFuture<Void> onContext(String name, InputStream value);
/** /**
* A request payload has been receiveed. For streaming calls, there may be zero payload * A request payload has been receiveed. For streaming calls, there may be zero payload
* messages. * messages.
@ -100,17 +86,6 @@ public abstract class ServerCall<ResponseT> {
*/ */
public abstract void close(Status status, Metadata.Trailers trailers); public abstract void close(Status status, Metadata.Trailers trailers);
/**
* Send a context message. Context messages are intended for side-channel information like
* statistics and authentication.
*
* @param name key identifier of context
* @param value context value bytes
* @throws IllegalStateException if call is {@link #close}d, or after {@link #sendPayload}
*/
@Deprecated
public abstract void sendContext(String name, InputStream value);
/** /**
* Send a payload message. Payload messages are the primary form of communication associated with * Send a payload message. Payload messages are the primary form of communication associated with
* RPCs. Multiple payload messages may exist for streaming calls. * RPCs. Multiple payload messages may exist for streaming calls.

View File

@ -65,14 +65,6 @@ public class SessionClientStream implements ClientStream {
request.close(Status.OK); request.close(Status.OK);
} }
@Override
public void writeContext(String name, InputStream value, int length, Runnable accepted) {
request.addContext(name, value, Operation.Phase.HEADERS);
if (accepted != null) {
accepted.run();
}
}
@Override @Override
public void writeMessage(InputStream message, int length, Runnable accepted) { public void writeMessage(InputStream message, int length, Runnable accepted) {
request.addPayload(message, Operation.Phase.PAYLOAD); request.addPayload(message, Operation.Phase.PAYLOAD);
@ -111,18 +103,6 @@ public class SessionClientStream implements ClientStream {
} }
} }
@Override
public Operation addContext(String type, InputStream message, Phase nextPhase) {
try {
listener.contextRead(type, message, available(message));
return super.addContext(type, message, nextPhase);
} finally {
if (getPhase() == Phase.CLOSED) {
propagateClosed();
}
}
}
@Override @Override
public Operation addPayload(InputStream payload, Phase nextPhase) { public Operation addPayload(InputStream payload, Phase nextPhase) {
try { try {

View File

@ -7,8 +7,6 @@ import com.google.net.stubby.Channel;
import com.google.net.stubby.Metadata; import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import java.io.InputStream;
import javax.annotation.Nullable; import javax.annotation.Nullable;
/** /**
@ -50,11 +48,6 @@ public abstract class ForwardingChannel implements Channel {
this.delegate.halfClose(); this.delegate.halfClose();
} }
@Override
public void sendContext(String name, InputStream value, @Nullable SettableFuture<Void> accepted) {
this.delegate.sendContext(name, value, accepted);
}
@Override @Override
public void sendPayload(RequestT payload, @Nullable SettableFuture<Void> accepted) { public void sendPayload(RequestT payload, @Nullable SettableFuture<Void> accepted) {
this.delegate.sendPayload(payload, accepted); this.delegate.sendPayload(payload, accepted);
@ -77,11 +70,6 @@ public abstract class ForwardingChannel implements Channel {
return delegate.onHeaders(headers); return delegate.onHeaders(headers);
} }
@Override
public ListenableFuture<Void> onContext(String name, InputStream value) {
return delegate.onContext(name, value);
}
@Override @Override
public ListenableFuture<Void> onPayload(T payload) { public ListenableFuture<Void> onPayload(T payload) {
return delegate.onPayload(payload); return delegate.onPayload(payload);

View File

@ -160,13 +160,6 @@ public class ServletSession extends HttpServlet {
} }
} }
@Override
public Operation addContext(String type, InputStream message, Phase nextPhase) {
super.addContext(type, message, nextPhase);
framer.writeContext(type, message, getPhase() == Phase.CLOSED, this);
return this;
}
@Override @Override
public Operation addPayload(InputStream payload, Phase nextPhase) { public Operation addPayload(InputStream payload, Phase nextPhase) {
super.addPayload(payload, Phase.PAYLOAD); super.addPayload(payload, Phase.PAYLOAD);

View File

@ -66,13 +66,6 @@ public class UrlConnectionClientSession implements Session {
framer.setAllowCompression(false); framer.setAllowCompression(false);
} }
@Override
public Operation addContext(String type, InputStream message, Phase nextPhase) {
super.addContext(type, message, nextPhase);
framer.writeContext(type, message, getPhase() == Phase.CLOSED, this);
return this;
}
@Override @Override
public Operation addPayload(InputStream payload, Phase nextPhase) { public Operation addPayload(InputStream payload, Phase nextPhase) {
super.addPayload(payload, nextPhase); super.addPayload(payload, nextPhase);

View File

@ -26,13 +26,6 @@ abstract class Http2Operation extends AbstractOperation implements Framer.Sink {
this.framer = framer; this.framer = framer;
} }
@Override
public Operation addContext(String type, InputStream message, Phase nextPhase) {
super.addContext(type, message, nextPhase);
framer.writeContext(type, message, getPhase() == Phase.CLOSED, this);
return this;
}
@Override @Override
public Operation addPayload(InputStream payload, Phase nextPhase) { public Operation addPayload(InputStream payload, Phase nextPhase) {
super.addPayload(payload, nextPhase); super.addPayload(payload, nextPhase);

View File

@ -29,13 +29,6 @@ abstract class Http2Operation extends AbstractOperation implements Framer.Sink {
this.framer = framer; this.framer = framer;
} }
@Override
public Operation addContext(String type, InputStream message, Phase nextPhase) {
super.addContext(type, message, nextPhase);
framer.writeContext(type, message, getPhase() == Phase.CLOSED, this);
return this;
}
@Override @Override
public Operation addPayload(InputStream payload, Phase nextPhase) { public Operation addPayload(InputStream payload, Phase nextPhase) {
super.addPayload(payload, nextPhase); super.addPayload(payload, nextPhase);

View File

@ -21,13 +21,13 @@ public abstract class AbstractStream implements Stream {
* Indicates the phase of the GRPC stream in one direction. * Indicates the phase of the GRPC stream in one direction.
*/ */
protected enum Phase { protected enum Phase {
CONTEXT, MESSAGE, STATUS HEADERS, MESSAGE, STATUS
} }
private final Object writeLock = new Object(); private final Object writeLock = new Object();
private final MessageFramer framer; private final MessageFramer framer;
protected Phase inboundPhase = Phase.CONTEXT; protected Phase inboundPhase = Phase.HEADERS;
protected Phase outboundPhase = Phase.CONTEXT; protected Phase outboundPhase = Phase.HEADERS;
/** /**
* Handler for Framer output. * Handler for Framer output.
@ -46,25 +46,12 @@ public abstract class AbstractStream implements Stream {
@Override @Override
public ListenableFuture<Void> headersRead(Metadata.Headers headers) { public ListenableFuture<Void> headersRead(Metadata.Headers headers) {
inboundPhase(Phase.CONTEXT); inboundPhase(Phase.HEADERS);
ListenableFuture<Void> future = listener().headersRead(headers); ListenableFuture<Void> future = listener().headersRead(headers);
disableWindowUpdate(future); disableWindowUpdate(future);
return future; return future;
} }
@Override
public ListenableFuture<Void> contextRead(String name, InputStream value, int length) {
ListenableFuture<Void> future = null;
try {
inboundPhase(Phase.CONTEXT);
future = listener().contextRead(name, value, length);
disableWindowUpdate(future);
return future;
} finally {
closeWhenDone(future, value);
}
}
@Override @Override
public ListenableFuture<Void> messageRead(InputStream input, int length) { public ListenableFuture<Void> messageRead(InputStream input, int length) {
ListenableFuture<Void> future = null; ListenableFuture<Void> future = null;
@ -101,25 +88,6 @@ public abstract class AbstractStream implements Stream {
} }
} }
@Override
public final void writeContext(String name, InputStream value, int length,
@Nullable Runnable accepted) {
Preconditions.checkNotNull(name, "name");
Preconditions.checkNotNull(value, "value");
Preconditions.checkArgument(length >= 0, "length must be >= 0");
outboundPhase(Phase.CONTEXT);
synchronized (writeLock) {
if (!framer.isClosed()) {
framer.writeContext(name, value, length);
}
}
// TODO(user): add flow control.
if (accepted != null) {
accepted.run();
}
}
@Override @Override
public final void writeMessage(InputStream message, int length, @Nullable Runnable accepted) { public final void writeMessage(InputStream message, int length, @Nullable Runnable accepted) {
Preconditions.checkNotNull(message, "message"); Preconditions.checkNotNull(message, "message");

View File

@ -6,7 +6,6 @@ import com.google.net.stubby.Metadata;
import com.google.net.stubby.Operation; import com.google.net.stubby.Operation;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport; import com.google.net.stubby.transport.Transport;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
@ -91,19 +90,6 @@ public abstract class Deframer<F> implements Framer.Sink<F> {
currentLength = LENGTH_NOT_SET; currentLength = LENGTH_NOT_SET;
inFrame = false; inFrame = false;
} }
} else if (GrpcFramingUtil.isContextValueFrame(currentFlags)) {
// Not clear if using proto encoding here is of any benefit.
// Using ContextValue.parseFrom requires copying out of the framed chunk
// Writing a custom parser would have to do varint handling and potentially
// deal with out-of-order tags etc.
Transport.ContextValue contextValue = Transport.ContextValue.parseFrom(framedChunk);
try {
ByteString value = contextValue.getValue();
target.contextRead(contextValue.getKey(), value.newInput(), value.size());
} finally {
currentLength = LENGTH_NOT_SET;
inFrame = false;
}
} else if (GrpcFramingUtil.isStatusFrame(currentFlags)) { } else if (GrpcFramingUtil.isStatusFrame(currentFlags)) {
int status = framedChunk.read() << 8 | framedChunk.read(); int status = framedChunk.read() << 8 | framedChunk.read();
Transport.Code code = Transport.Code.valueOf(status); Transport.Code code = Transport.Code.valueOf(status);

View File

@ -22,11 +22,6 @@ public class ForwardingStreamListener implements StreamListener {
return delegate.headersRead(headers); return delegate.headersRead(headers);
} }
@Override
public ListenableFuture<Void> contextRead(String name, InputStream value, int length) {
return delegate.contextRead(name, value, length);
}
@Override @Override
public ListenableFuture<Void> messageRead(InputStream message, int length) { public ListenableFuture<Void> messageRead(InputStream message, int length) {
return delegate.messageRead(message, length); return delegate.messageRead(message, length);

View File

@ -23,12 +23,6 @@ public interface Framer {
public void deliverFrame(T frame, boolean endOfStream); public void deliverFrame(T frame, boolean endOfStream);
} }
/**
* Write out a Context-Value message. {@code message} will be completely consumed.
* {@code message.available()} must return the number of remaining bytes to be read.
*/
public void writeContext(String type, InputStream message, int length);
/** /**
* Write out a Payload message. {@code payload} will be completely consumed. * Write out a Payload message. {@code payload} will be completely consumed.
* {@code payload.available()} must return the number of remaining bytes to be read. * {@code payload.available()} must return the number of remaining bytes to be read.

View File

@ -1,6 +1,5 @@
package com.google.net.stubby.newtransport; package com.google.net.stubby.newtransport;
import static com.google.net.stubby.GrpcFramingUtil.CONTEXT_VALUE_FRAME;
import static com.google.net.stubby.GrpcFramingUtil.FRAME_LENGTH; import static com.google.net.stubby.GrpcFramingUtil.FRAME_LENGTH;
import static com.google.net.stubby.GrpcFramingUtil.FRAME_TYPE_LENGTH; import static com.google.net.stubby.GrpcFramingUtil.FRAME_TYPE_LENGTH;
import static com.google.net.stubby.GrpcFramingUtil.FRAME_TYPE_MASK; import static com.google.net.stubby.GrpcFramingUtil.FRAME_TYPE_MASK;
@ -14,7 +13,6 @@ import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport; import com.google.net.stubby.transport.Transport;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
/** /**
@ -181,9 +179,6 @@ public class GrpcDeframer implements Closeable {
private ListenableFuture<Void> processBody() { private ListenableFuture<Void> processBody() {
ListenableFuture<Void> future = null; ListenableFuture<Void> future = null;
switch (frameType) { switch (frameType) {
case CONTEXT_VALUE_FRAME:
future = processContext();
break;
case PAYLOAD_FRAME: case PAYLOAD_FRAME:
future = processMessage(); future = processMessage();
break; break;
@ -200,30 +195,6 @@ public class GrpcDeframer implements Closeable {
return future; return future;
} }
/**
* Processes the payload of a context frame.
*/
private ListenableFuture<Void> processContext() {
Transport.ContextValue ctx;
try {
// Not clear if using proto encoding here is of any benefit.
// Using ContextValue.parseFrom requires copying out of the framed chunk
// Writing a custom parser would have to do varint handling and potentially
// deal with out-of-order tags etc.
ctx = Transport.ContextValue.parseFrom(Buffers.openStream(nextFrame, false));
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
nextFrame.close();
nextFrame = null;
}
// Call the handler.
Buffer ctxBuffer = Buffers.wrap(ctx.getValue());
return listener.contextRead(ctx.getKey(), Buffers.openStream(ctxBuffer, true),
ctxBuffer.readableBytes());
}
/** /**
* Processes the payload of a message frame. * Processes the payload of a message frame.
*/ */

View File

@ -2,59 +2,16 @@ package com.google.net.stubby.newtransport;
import com.google.net.stubby.GrpcFramingUtil; import com.google.net.stubby.GrpcFramingUtil;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.WireFormat;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
/** /**
* Default {@link Framer} implementation. * Default {@link Framer} implementation.
*/ */
public class MessageFramer implements Framer { public class MessageFramer implements Framer {
/**
* Size of the GRPC message frame header which consists of
* 1 byte for the type (payload, context, status)
* 4 bytes for the length of the message
*/
private static final int MESSAGE_HEADER_SIZE = 5;
/**
* UTF-8 charset which is used for key name encoding in context values
*/
private static final Charset UTF_8 = Charset.forName("UTF-8");
/**
* Precomputed protobuf tags for ContextValue
*/
private static final byte[] VALUE_TAG;
private static final byte[] KEY_TAG;
static {
// Initialize constants for serializing context-value in a protobuf compatible manner
try {
byte[] buf = new byte[8];
CodedOutputStream coded = CodedOutputStream.newInstance(buf);
coded.writeTag(Transport.ContextValue.KEY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
coded.flush();
KEY_TAG = Arrays.copyOf(buf, coded.getTotalBytesWritten());
coded = CodedOutputStream.newInstance(buf);
coded.writeTag(Transport.ContextValue.VALUE_FIELD_NUMBER,
WireFormat.WIRETYPE_LENGTH_DELIMITED);
coded.flush();
VALUE_TAG = Arrays.copyOf(buf, coded.getTotalBytesWritten());
} catch (IOException ioe) {
// Unrecoverable
throw new RuntimeException(ioe);
}
}
private CompressionFramer framer; private CompressionFramer framer;
private final ByteBuffer scratch = ByteBuffer.allocate(16); private final ByteBuffer scratch = ByteBuffer.allocate(16);
@ -71,16 +28,6 @@ public class MessageFramer implements Framer {
framer.setAllowCompression(enable); framer.setAllowCompression(enable);
} }
/**
* Set the preferred compression level for when compression is enabled.
* @param level the preferred compression level, or {@code -1} to use the framing default
* @see java.util.zip.Deflater#setLevel
*/
public void setCompressionLevel(int level) {
verifyNotClosed();
framer.setCompressionLevel(level);
}
@Override @Override
public void writePayload(InputStream message, int messageLength) { public void writePayload(InputStream message, int messageLength) {
verifyNotClosed(); verifyNotClosed();
@ -99,41 +46,6 @@ public class MessageFramer implements Framer {
} }
@Override
public void writeContext(String key, InputStream message, int messageLen) {
verifyNotClosed();
try {
scratch.clear();
scratch.put(GrpcFramingUtil.CONTEXT_VALUE_FRAME);
byte[] keyBytes = key.getBytes(UTF_8);
int lenKeyPrefix = KEY_TAG.length +
CodedOutputStream.computeRawVarint32Size(keyBytes.length);
int lenValPrefix = VALUE_TAG.length + CodedOutputStream.computeRawVarint32Size(messageLen);
int totalLen = lenKeyPrefix + keyBytes.length + lenValPrefix + messageLen;
scratch.putInt(totalLen);
framer.write(scratch.array(), 0, scratch.position());
// Write key
scratch.clear();
scratch.put(KEY_TAG);
writeRawVarInt32(keyBytes.length, scratch);
framer.write(scratch.array(), 0, scratch.position());
framer.write(keyBytes, 0, keyBytes.length);
// Write value
scratch.clear();
scratch.put(VALUE_TAG);
writeRawVarInt32(messageLen, scratch);
framer.write(scratch.array(), 0, scratch.position());
if (messageLen != framer.write(message)) {
throw new RuntimeException("Message length was inaccurate");
}
framer.endOfMessage();
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
@Override @Override
public void writeStatus(Status status) { public void writeStatus(Status status) {
verifyNotClosed(); verifyNotClosed();
@ -178,19 +90,4 @@ public class MessageFramer implements Framer {
throw new IllegalStateException("Framer already closed"); throw new IllegalStateException("Framer already closed");
} }
} }
/**
* Write a raw VarInt32 to the buffer
*/
private static void writeRawVarInt32(int value, ByteBuffer bytebuf) {
while (true) {
if ((value & ~0x7F) == 0) {
bytebuf.put((byte) value);
return;
} else {
bytebuf.put((byte) ((value & 0x7F) | 0x80));
value >>>= 7;
}
}
}
} }

View File

@ -16,26 +16,6 @@ public interface Stream {
*/ */
StreamState state(); StreamState state();
/**
* Writes the context name/value pair to the remote end-point. The bytes from the stream are
* immediate read by the Transport. This method will always return immediately and will not wait
* for the write to complete.
*
* <p>When the write is "accepted" by the transport, the given callback (if provided) will be
* called. The definition of what it means to be "accepted" is up to the transport implementation,
* but this is a general indication that the transport is capable of handling more out-bound data
* on the stream. If the stream/connection is closed for any reason before the write could be
* accepted, the callback will never be invoked. Any writes that are still pending upon receiving
* a {@link StreamListener#closed} callback are assumed to be cancelled.
*
* @param name the unique application-defined name for the context property.
* @param value the value of the context property.
* @param length the length of the {@link InputStream}.
* @param accepted an optional callback for when the transport has accepted the write.
*/
@Deprecated
void writeContext(String name, InputStream value, int length, @Nullable Runnable accepted);
/** /**
* Writes a message payload to the remote end-point. The bytes from the stream are immediate read * Writes a message payload to the remote end-point. The bytes from the stream are immediate read
* by the Transport. This method will always return immediately and will not wait for the write to * by the Transport. This method will always return immediately and will not wait for the write to

View File

@ -25,29 +25,6 @@ public interface StreamListener {
@Nullable @Nullable
ListenableFuture<Void> headersRead(Metadata.Headers headers); ListenableFuture<Void> headersRead(Metadata.Headers headers);
/**
* Called upon receiving context information from the remote end-point. The {@link InputStream} is
* non-blocking and contains the entire context.
*
* <p>The method optionally returns a future that can be observed by flow control to determine
* when the context has been processed by the application. If {@code null} is returned, processing
* of this context is assumed to be complete upon returning from this method.
*
* <p>The {@code value} {@link InputStream} will be closed when the returned future completes. If
* no future is returned, the stream will be closed immediately after returning from this method.
*
* <p>This method should return quickly, as the same thread may be used to process other streams.
*
* @param name the unique name of the context
* @param value the value of the context.
* @param length the length of the value {@link InputStream}.
* @return a processing completion future, or {@code null} to indicate that processing of the
* context is immediately complete.
*/
@Nullable
@Deprecated
ListenableFuture<Void> contextRead(String name, InputStream value, int length);
/** /**
* Called upon receiving a message from the remote end-point. The {@link InputStream} is * Called upon receiving a message from the remote end-point. The {@link InputStream} is
* non-blocking and contains the entire message. * non-blocking and contains the entire message.

View File

@ -39,39 +39,9 @@ public final class TransportFrameUtil {
public static final int COMPRESSION_HEADER_LENGTH = public static final int COMPRESSION_HEADER_LENGTH =
COMPRESSION_TYPE_LENGTH + COMPRESSION_FRAME_LENGTH; COMPRESSION_TYPE_LENGTH + COMPRESSION_FRAME_LENGTH;
/**
* Length of flags block in bytes
*/
public static final int FRAME_TYPE_LENGTH = 1;
// Flags // Flags
public static final byte PAYLOAD_FRAME = 0x0; public static final byte PAYLOAD_FRAME = 0x0;
public static final byte CONTEXT_VALUE_FRAME = 0x1;
public static final byte CALL_HEADER_FRAME = 0x2;
public static final byte STATUS_FRAME = 0x3; public static final byte STATUS_FRAME = 0x3;
public static final byte FRAME_TYPE_MASK = 0x3;
/**
* Number of bytes for the length field within a frame
*/
public static final int FRAME_LENGTH = 4;
/**
* Full length of the GRPC frame header.
*/
public static final int FRAME_HEADER_LENGTH = FRAME_TYPE_LENGTH + FRAME_LENGTH;
public static boolean isContextValueFrame(int flags) {
return (flags & FRAME_TYPE_MASK) == CONTEXT_VALUE_FRAME;
}
public static boolean isPayloadFrame(byte flags) {
return (flags & FRAME_TYPE_MASK) == PAYLOAD_FRAME;
}
public static boolean isStatusFrame(byte flags) {
return (flags & FRAME_TYPE_MASK) == STATUS_FRAME;
}
// TODO(user): This needs proper namespacing support, this is currently just a hack // TODO(user): This needs proper namespacing support, this is currently just a hack
/** /**

View File

@ -73,20 +73,6 @@ public abstract class Deframer<F> {
currentLength = LENGTH_NOT_SET; currentLength = LENGTH_NOT_SET;
inFrame = false; inFrame = false;
} }
} else if (GrpcFramingUtil.isContextValueFrame(currentFlags)) {
// Not clear if using proto encoding here is of any benefit.
// Using ContextValue.parseFrom requires copying out of the framed chunk
// Writing a custom parser would have to do varint handling and potentially
// deal with out-of-order tags etc.
Transport.ContextValue contextValue = Transport.ContextValue.parseFrom(framedChunk);
try {
target.addContext(contextValue.getKey(),
contextValue.getValue().newInput(),
target.getPhase());
} finally {
currentLength = LENGTH_NOT_SET;
inFrame = false;
}
} else if (GrpcFramingUtil.isStatusFrame(currentFlags)) { } else if (GrpcFramingUtil.isStatusFrame(currentFlags)) {
int status = framedChunk.read() << 8 | framedChunk.read(); int status = framedChunk.read() << 8 | framedChunk.read();
Transport.Code code = Transport.Code.valueOf(status); Transport.Code code = Transport.Code.valueOf(status);

View File

@ -24,12 +24,6 @@ public interface Framer {
public void deliverFrame(ByteBuffer frame, boolean endOfMessage); public void deliverFrame(ByteBuffer frame, boolean endOfMessage);
} }
/**
* Write out a Context-Value message. {@code message} will be completely consumed.
* {@code message.available()} must return the number of remaining bytes to be read.
*/
public void writeContext(String type, InputStream message, boolean flush, Sink sink);
/** /**
* Write out a Payload message. {@code payload} will be completely consumed. * Write out a Payload message. {@code payload} will be completely consumed.
* {@code payload.available()} must return the number of remaining bytes to be read. * {@code payload.available()} must return the number of remaining bytes to be read.

View File

@ -2,58 +2,16 @@ package com.google.net.stubby.transport;
import com.google.net.stubby.GrpcFramingUtil; import com.google.net.stubby.GrpcFramingUtil;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.WireFormat;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.Arrays;
/** /**
* Default {@link Framer} implementation. * Default {@link Framer} implementation.
*/ */
public class MessageFramer implements Framer { public class MessageFramer implements Framer {
/**
* Size of the GRPC message frame header which consists of
* 1 byte for the type (payload, context, status)
* 4 bytes for the length of the message
*/
private static final int MESSAGE_HEADER_SIZE = 5;
/**
* UTF-8 charset which is used for key name encoding in context values
*/
private static final Charset UTF_8 = Charset.forName("UTF-8");
/**
* Precomputed protobuf tags for ContextValue
*/
private static final byte[] VALUE_TAG;
private static final byte[] KEY_TAG;
static {
// Initialize constants for serializing context-value in a protobuf compatible manner
try {
byte[] buf = new byte[8];
CodedOutputStream coded = CodedOutputStream.newInstance(buf);
coded.writeTag(Transport.ContextValue.KEY_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
coded.flush();
KEY_TAG = Arrays.copyOf(buf, coded.getTotalBytesWritten());
coded = CodedOutputStream.newInstance(buf);
coded.writeTag(Transport.ContextValue.VALUE_FIELD_NUMBER,
WireFormat.WIRETYPE_LENGTH_DELIMITED);
coded.flush();
VALUE_TAG = Arrays.copyOf(buf, coded.getTotalBytesWritten());
} catch (IOException ioe) {
// Unrecoverable
throw new RuntimeException(ioe);
}
}
private CompressionFramer framer; private CompressionFramer framer;
private final ByteBuffer scratch = ByteBuffer.allocate(16); private final ByteBuffer scratch = ByteBuffer.allocate(16);
@ -69,15 +27,6 @@ public class MessageFramer implements Framer {
framer.setAllowCompression(enable); framer.setAllowCompression(enable);
} }
/**
* Set the preferred compression level for when compression is enabled.
* @param level the preferred compression level, or {@code -1} to use the framing default
* @see java.util.zip.Deflater#setLevel
*/
public void setCompressionLevel(int level) {
framer.setCompressionLevel(level);
}
@Override @Override
public void writePayload(InputStream message, boolean flush, Sink sink) { public void writePayload(InputStream message, boolean flush, Sink sink) {
try { try {
@ -98,45 +47,6 @@ public class MessageFramer implements Framer {
} }
} }
@Override
public void writeContext(String key, InputStream message, boolean flush, Sink sink) {
try {
scratch.clear();
scratch.put(GrpcFramingUtil.CONTEXT_VALUE_FRAME);
byte[] keyBytes = key.getBytes(UTF_8);
int lenKeyPrefix = KEY_TAG.length +
CodedOutputStream.computeRawVarint32Size(keyBytes.length);
int messageLen = message.available();
int lenValPrefix = VALUE_TAG.length + CodedOutputStream.computeRawVarint32Size(messageLen);
int totalLen = lenKeyPrefix + keyBytes.length + lenValPrefix + messageLen;
scratch.putInt(totalLen);
framer.write(scratch.array(), 0, scratch.position(), sink);
// Write key
scratch.clear();
scratch.put(KEY_TAG);
writeRawVarInt32(keyBytes.length, scratch);
framer.write(scratch.array(), 0, scratch.position(), sink);
framer.write(keyBytes, 0, keyBytes.length, sink);
// Write value
scratch.clear();
scratch.put(VALUE_TAG);
writeRawVarInt32(messageLen, scratch);
framer.write(scratch.array(), 0, scratch.position(), sink);
if (messageLen != framer.write(message, sink)) {
throw new RuntimeException("InputStream's available() was inaccurate");
}
framer.endOfMessage(sink);
if (flush && framer != null) {
framer.flush(sink);
}
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}
@Override @Override
public void writeStatus(Status status, boolean flush, Sink sink) { public void writeStatus(Status status, boolean flush, Sink sink) {
short code = (short) status.getCode().getNumber(); short code = (short) status.getCode().getNumber();
@ -162,19 +72,4 @@ public class MessageFramer implements Framer {
// TODO(user): Returning buffer to a pool would go here // TODO(user): Returning buffer to a pool would go here
framer = null; framer = null;
} }
/**
* Write a raw VarInt32 to the buffer
*/
private static void writeRawVarInt32(int value, ByteBuffer bytebuf) {
while (true) {
if ((value & ~0x7F) == 0) {
bytebuf.put((byte) value);
return;
} else {
bytebuf.put((byte) ((value & 0x7F) | 0x80));
value >>>= 7;
}
}
}
} }

View File

@ -1,13 +1,11 @@
package com.google.net.stubby.newtransport; package com.google.net.stubby.newtransport;
import static com.google.net.stubby.newtransport.TransportFrameUtil.CONTEXT_VALUE_FRAME;
import static com.google.net.stubby.newtransport.TransportFrameUtil.PAYLOAD_FRAME; import static com.google.net.stubby.newtransport.TransportFrameUtil.PAYLOAD_FRAME;
import static com.google.net.stubby.newtransport.TransportFrameUtil.STATUS_FRAME; import static com.google.net.stubby.newtransport.TransportFrameUtil.STATUS_FRAME;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.notNull; import static org.mockito.Matchers.notNull;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -44,66 +42,33 @@ import javax.annotation.Nullable;
*/ */
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public class GrpcDeframerTest { public class GrpcDeframerTest {
private static final String KEY = "key";
private static final String MESSAGE = "hello world"; private static final String MESSAGE = "hello world";
private static final ByteString MESSAGE_BSTR = ByteString.copyFromUtf8(MESSAGE); private static final ByteString MESSAGE_BSTR = ByteString.copyFromUtf8(MESSAGE);
private static final Transport.Code STATUS_CODE = Transport.Code.CANCELLED; private static final Transport.Code STATUS_CODE = Transport.Code.CANCELLED;
private GrpcDeframer reader; private GrpcDeframer reader;
private Transport.ContextValue contextProto;
private StubDecompressor decompressor; private StubDecompressor decompressor;
@Mock @Mock
private StreamListener listener; private StreamListener listener;
private SettableFuture<Void> contextFuture;
private SettableFuture<Void> messageFuture; private SettableFuture<Void> messageFuture;
@Before @Before
public void setup() { public void setup() {
MockitoAnnotations.initMocks(this); MockitoAnnotations.initMocks(this);
contextFuture = SettableFuture.create();
messageFuture = SettableFuture.create(); messageFuture = SettableFuture.create();
when(listener.contextRead(anyString(), any(InputStream.class), anyInt())).thenReturn(
contextFuture);
when(listener.messageRead(any(InputStream.class), anyInt())).thenReturn(messageFuture); when(listener.messageRead(any(InputStream.class), anyInt())).thenReturn(messageFuture);
decompressor = new StubDecompressor(); decompressor = new StubDecompressor();
reader = new GrpcDeframer(decompressor, listener, MoreExecutors.directExecutor()); reader = new GrpcDeframer(decompressor, listener, MoreExecutors.directExecutor());
contextProto = Transport.ContextValue.newBuilder().setKey(KEY).setValue(MESSAGE_BSTR).build();
}
@Test
public void contextShouldCallTarget() throws Exception {
decompressor.init(contextFrame());
reader.deframe(Buffers.empty(), false);
verifyContext();
verifyNoPayload();
verifyNoStatus();
}
@Test
public void contextWithEndOfStreamShouldNotifyStatus() throws Exception {
decompressor.init(contextFrame());
reader.deframe(Buffers.empty(), true);
verifyContext();
verifyNoPayload();
verifyNoStatus();
contextFuture.set(null);
verifyStatus(Transport.Code.OK);
} }
@Test @Test
public void payloadShouldCallTarget() throws Exception { public void payloadShouldCallTarget() throws Exception {
decompressor.init(payloadFrame()); decompressor.init(payloadFrame());
reader.deframe(Buffers.empty(), false); reader.deframe(Buffers.empty(), false);
verifyNoContext();
verifyPayload(); verifyPayload();
verifyNoStatus(); verifyNoStatus();
} }
@ -112,7 +77,6 @@ public class GrpcDeframerTest {
public void payloadWithEndOfStreamShouldNotifyStatus() throws Exception { public void payloadWithEndOfStreamShouldNotifyStatus() throws Exception {
decompressor.init(payloadFrame()); decompressor.init(payloadFrame());
reader.deframe(Buffers.empty(), true); reader.deframe(Buffers.empty(), true);
verifyNoContext();
verifyPayload(); verifyPayload();
verifyNoStatus(); verifyNoStatus();
@ -124,7 +88,6 @@ public class GrpcDeframerTest {
public void statusShouldCallTarget() throws Exception { public void statusShouldCallTarget() throws Exception {
decompressor.init(statusFrame()); decompressor.init(statusFrame());
reader.deframe(Buffers.empty(), false); reader.deframe(Buffers.empty(), false);
verifyNoContext();
verifyNoPayload(); verifyNoPayload();
verifyStatus(); verifyStatus();
} }
@ -133,7 +96,6 @@ public class GrpcDeframerTest {
public void statusWithEndOfStreamShouldNotifyStatusOnce() throws Exception { public void statusWithEndOfStreamShouldNotifyStatusOnce() throws Exception {
decompressor.init(statusFrame()); decompressor.init(statusFrame());
reader.deframe(Buffers.empty(), true); reader.deframe(Buffers.empty(), true);
verifyNoContext();
verifyNoPayload(); verifyNoPayload();
verifyStatus(); verifyStatus();
} }
@ -143,9 +105,6 @@ public class GrpcDeframerTest {
ByteArrayOutputStream os = new ByteArrayOutputStream(); ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(os); DataOutputStream dos = new DataOutputStream(os);
// Write a context frame.
writeFrame(CONTEXT_VALUE_FRAME, contextProto.toByteArray(), dos);
// Write a payload frame. // Write a payload frame.
writeFrame(PAYLOAD_FRAME, MESSAGE_BSTR.toByteArray(), dos); writeFrame(PAYLOAD_FRAME, MESSAGE_BSTR.toByteArray(), dos);
@ -161,11 +120,6 @@ public class GrpcDeframerTest {
reader.deframe(Buffers.empty(), false); reader.deframe(Buffers.empty(), false);
// Verify that all callbacks were called. // Verify that all callbacks were called.
verifyContext();
verifyNoPayload();
verifyNoStatus();
contextFuture.set(null);
verifyPayload(); verifyPayload();
verifyNoStatus(); verifyNoStatus();
@ -187,7 +141,6 @@ public class GrpcDeframerTest {
byte[] chunk = Arrays.copyOfRange(fullBuffer, startIx, endIx); byte[] chunk = Arrays.copyOfRange(fullBuffer, startIx, endIx);
decompressor.init(chunk); decompressor.init(chunk);
reader.deframe(Buffers.empty(), false); reader.deframe(Buffers.empty(), false);
verifyNoContext();
verifyNoPayload(); verifyNoPayload();
verifyNoStatus(); verifyNoStatus();
@ -197,17 +150,10 @@ public class GrpcDeframerTest {
chunk = Arrays.copyOfRange(fullBuffer, startIx, endIx); chunk = Arrays.copyOfRange(fullBuffer, startIx, endIx);
decompressor.init(chunk); decompressor.init(chunk);
reader.deframe(Buffers.empty(), false); reader.deframe(Buffers.empty(), false);
verifyNoContext();
verifyPayload(); verifyPayload();
verifyNoStatus(); verifyNoStatus();
} }
private void verifyContext() {
ArgumentCaptor<InputStream> captor = ArgumentCaptor.forClass(InputStream.class);
verify(listener).contextRead(eq(KEY), captor.capture(), eq(MESSAGE.length()));
assertEquals(MESSAGE, readString(captor.getValue(), MESSAGE.length()));
}
private void verifyPayload() { private void verifyPayload() {
ArgumentCaptor<InputStream> captor = ArgumentCaptor.forClass(InputStream.class); ArgumentCaptor<InputStream> captor = ArgumentCaptor.forClass(InputStream.class);
verify(listener).messageRead(captor.capture(), eq(MESSAGE.length())); verify(listener).messageRead(captor.capture(), eq(MESSAGE.length()));
@ -234,10 +180,6 @@ public class GrpcDeframerTest {
assertEquals(code, captor.getValue().getCode()); assertEquals(code, captor.getValue().getCode());
} }
private void verifyNoContext() {
verify(listener, never()).contextRead(any(String.class), any(InputStream.class), anyInt());
}
private void verifyNoPayload() { private void verifyNoPayload() {
verify(listener, never()).messageRead(any(InputStream.class), anyInt()); verify(listener, never()).messageRead(any(InputStream.class), anyInt());
} }
@ -246,10 +188,6 @@ public class GrpcDeframerTest {
verify(listener, never()).closed(any(Status.class), notNull(Metadata.Trailers.class)); verify(listener, never()).closed(any(Status.class), notNull(Metadata.Trailers.class));
} }
private byte[] contextFrame() throws IOException {
return frame(CONTEXT_VALUE_FRAME, contextProto.toByteArray());
}
private static byte[] payloadFrame() throws IOException { private static byte[] payloadFrame() throws IOException {
return frame(PAYLOAD_FRAME, MESSAGE_BSTR.toByteArray()); return frame(PAYLOAD_FRAME, MESSAGE_BSTR.toByteArray());
} }

View File

@ -9,7 +9,6 @@ import com.google.common.primitives.Bytes;
import com.google.net.stubby.GrpcFramingUtil; import com.google.net.stubby.GrpcFramingUtil;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport; import com.google.net.stubby.transport.Transport;
import com.google.protobuf.ByteString;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -53,37 +52,6 @@ public class MessageFramerTest {
} }
} }
@Test
public void testContext() throws Exception {
CapturingSink sink = new CapturingSink();
MessageFramer framer = new MessageFramer(sink, TRANSPORT_FRAME_SIZE);
byte[] payload = new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
byte[] contextValue = Transport.ContextValue.newBuilder()
.setKey("somekey")
.setValue(ByteString.copyFrom(payload))
.build().toByteArray();
byte[] unframedStream =
Bytes.concat(
new byte[]{GrpcFramingUtil.CONTEXT_VALUE_FRAME},
new byte[]{0, 0,
(byte) (contextValue.length >> 8 & 0xff),
(byte) (contextValue.length & 0xff)},
contextValue);
for (int i = 0; i < 1000; i++) {
framer.writeContext("somekey", new ByteArrayInputStream(payload), payload.length);
if ((i + 1) % 13 == 0) {
framer.flush();
}
}
framer.flush();
assertEquals(unframedStream.length * 1000, sink.deframedStream.length);
for (int i = 0; i < 1000; i++) {
assertArrayEquals(unframedStream,
Arrays.copyOfRange(sink.deframedStream, i * unframedStream.length,
(i + 1) * unframedStream.length));
}
}
@Test @Test
public void testStatus() throws Exception { public void testStatus() throws Exception {
CapturingSink sink = new CapturingSink(); CapturingSink sink = new CapturingSink();

View File

@ -42,16 +42,6 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
verify(channel).writeAndFlush(any(CancelStreamCommand.class)); verify(channel).writeAndFlush(any(CancelStreamCommand.class));
} }
@Test
public void writeContextShouldSendRequest() throws Exception {
// Force stream creation.
stream().id(STREAM_ID);
stream.writeContext(CONTEXT_KEY, input, input.available(), accepted);
stream.flush();
verify(channel).writeAndFlush(new SendGrpcFrameCommand(1, contextFrame(), false));
verify(accepted).run();
}
@Test @Test
public void writeMessageShouldSendRequest() throws Exception { public void writeMessageShouldSendRequest() throws Exception {
// Force stream creation. // Force stream creation.
@ -97,16 +87,6 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
assertEquals(StreamState.CLOSED, stream.state()); assertEquals(StreamState.CLOSED, stream.state());
} }
@Override
@Test
public void inboundContextShouldCallListener() throws Exception {
// Receive headers first so that it's a valid GRPC response.
stream().id(1);
stream().inboundHeadersRecieved(grpcResponseHeaders(), false);
super.inboundContextShouldCallListener();
}
@Override @Override
@Test @Test
public void inboundMessageShouldCallListener() throws Exception { public void inboundMessageShouldCallListener() throws Exception {

View File

@ -23,15 +23,6 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
public class NettyServerStreamTest extends NettyStreamTestBase { public class NettyServerStreamTest extends NettyStreamTestBase {
@Test
public void writeContextShouldSendResponse() throws Exception {
stream.writeContext(CONTEXT_KEY, input, input.available(), accepted);
stream.flush();
verify(channel).write(new SendResponseHeadersCommand(STREAM_ID));
verify(channel).writeAndFlush(new SendGrpcFrameCommand(STREAM_ID, contextFrame(), false));
verify(accepted).run();
}
@Test @Test
public void writeMessageShouldSendResponse() throws Exception { public void writeMessageShouldSendResponse() throws Exception {
stream.writeMessage(input, input.available(), accepted); stream.writeMessage(input, input.available(), accepted);

View File

@ -1,6 +1,5 @@
package com.google.net.stubby.newtransport.netty; package com.google.net.stubby.newtransport.netty;
import static com.google.net.stubby.GrpcFramingUtil.CONTEXT_VALUE_FRAME;
import static com.google.net.stubby.GrpcFramingUtil.PAYLOAD_FRAME; import static com.google.net.stubby.GrpcFramingUtil.PAYLOAD_FRAME;
import static com.google.net.stubby.GrpcFramingUtil.STATUS_FRAME; import static com.google.net.stubby.GrpcFramingUtil.STATUS_FRAME;
import static io.netty.handler.codec.http2.DefaultHttp2InboundFlowController.DEFAULT_WINDOW_UPDATE_RATIO; import static io.netty.handler.codec.http2.DefaultHttp2InboundFlowController.DEFAULT_WINDOW_UPDATE_RATIO;
@ -10,7 +9,6 @@ import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
@ -21,8 +19,6 @@ import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.StreamListener; import com.google.net.stubby.newtransport.StreamListener;
import com.google.net.stubby.transport.Transport.ContextValue;
import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
@ -55,7 +51,6 @@ import java.util.concurrent.TimeUnit;
* Base class for Netty stream unit tests. * Base class for Netty stream unit tests.
*/ */
public abstract class NettyStreamTestBase { public abstract class NettyStreamTestBase {
protected static final String CONTEXT_KEY = "key";
protected static final String MESSAGE = "hello world"; protected static final String MESSAGE = "hello world";
protected static final int STREAM_ID = 1; protected static final int STREAM_ID = 1;
@ -106,8 +101,6 @@ public abstract class NettyStreamTestBase {
when(eventLoop.inEventLoop()).thenReturn(true); when(eventLoop.inEventLoop()).thenReturn(true);
processingFuture = SettableFuture.create(); processingFuture = SettableFuture.create();
when(listener.contextRead(anyString(), any(InputStream.class), anyInt())).thenReturn(
processingFuture);
when(listener.messageRead(any(InputStream.class), anyInt())).thenReturn(processingFuture); when(listener.messageRead(any(InputStream.class), anyInt())).thenReturn(processingFuture);
doAnswer(new Answer<Void>() { doAnswer(new Answer<Void>() {
@ -123,25 +116,6 @@ public abstract class NettyStreamTestBase {
stream = createStream(); stream = createStream();
} }
@Test
public void inboundContextShouldCallListener() throws Exception {
stream.inboundDataReceived(contextFrame(), false);
ArgumentCaptor<InputStream> captor = ArgumentCaptor.forClass(InputStream.class);
verify(listener).contextRead(eq(CONTEXT_KEY), captor.capture(), eq(MESSAGE.length()));
// Verify that inbound flow control window update has been disabled for the stream.
verify(inboundFlow).setWindowUpdateRatio(eq(ctx), eq(STREAM_ID), eq(WINDOW_UPDATE_OFF));
verify(inboundFlow, never()).setWindowUpdateRatio(eq(ctx), eq(STREAM_ID),
eq(DEFAULT_WINDOW_UPDATE_RATIO));
assertEquals(MESSAGE, toString(captor.getValue()));
// Verify that inbound flow control window update has been re-enabled for the stream after
// the future completes.
processingFuture.set(null);
verify(inboundFlow).setWindowUpdateRatio(eq(ctx), eq(STREAM_ID),
eq(DEFAULT_WINDOW_UPDATE_RATIO));
}
@Test @Test
public void inboundMessageShouldCallListener() throws Exception { public void inboundMessageShouldCallListener() throws Exception {
stream.inboundDataReceived(messageFrame(), false); stream.inboundDataReceived(messageFrame(), false);
@ -169,24 +143,6 @@ public abstract class NettyStreamTestBase {
return new String(bytes, UTF_8); return new String(bytes, UTF_8);
} }
protected final ByteBuf contextFrame() throws Exception {
byte[] body = ContextValue
.newBuilder()
.setKey(CONTEXT_KEY)
.setValue(ByteString.copyFromUtf8(MESSAGE))
.build()
.toByteArray();
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(os);
dos.write(CONTEXT_VALUE_FRAME);
dos.writeInt(body.length);
dos.write(body);
dos.close();
// Write the compression header followed by the context frame.
return compressionFrame(os.toByteArray());
}
protected final ByteBuf messageFrame() throws Exception { protected final ByteBuf messageFrame() throws Exception {
ByteArrayOutputStream os = new ByteArrayOutputStream(); ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(os); DataOutputStream dos = new DataOutputStream(os);

View File

@ -7,7 +7,9 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.any; import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq; import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
@ -20,8 +22,6 @@ import com.google.net.stubby.newtransport.okhttp.OkHttpClientTransport.ClientFra
import com.google.net.stubby.newtransport.okhttp.OkHttpClientTransport.OkHttpClientStream; import com.google.net.stubby.newtransport.okhttp.OkHttpClientTransport.OkHttpClientStream;
import com.google.net.stubby.transport.Transport; import com.google.net.stubby.transport.Transport;
import com.google.net.stubby.transport.Transport.Code; import com.google.net.stubby.transport.Transport.Code;
import com.google.net.stubby.transport.Transport.ContextValue;
import com.google.protobuf.ByteString;
import com.squareup.okhttp.internal.spdy.ErrorCode; import com.squareup.okhttp.internal.spdy.ErrorCode;
import com.squareup.okhttp.internal.spdy.FrameReader; import com.squareup.okhttp.internal.spdy.FrameReader;
@ -64,7 +64,6 @@ public class OkHttpClientTransportTest {
// Flags // Flags
private static final byte PAYLOAD_FRAME = 0x0; private static final byte PAYLOAD_FRAME = 0x0;
public static final byte CONTEXT_VALUE_FRAME = 0x1;
public static final byte STATUS_FRAME = 0x3; public static final byte STATUS_FRAME = 0x3;
@Mock @Mock
@ -143,30 +142,6 @@ public class OkHttpClientTransportTest {
} }
} }
@Test
public void readContexts() throws Exception {
final int numContexts = 10;
final String key = "KEY";
final String value = "value";
MockStreamListener listener = new MockStreamListener();
clientTransport.newStream(method,new Metadata.Headers(), listener);
assertTrue(streams.containsKey(3));
for (int i = 0; i < numContexts; i++) {
BufferedSource source = mock(BufferedSource.class);
InputStream inputStream = createContextFrame(key + i, value + i);
when(source.inputStream()).thenReturn(inputStream);
frameHandler.data(i == numContexts - 1 ? true : false, 3, source, inputStream.available());
}
listener.waitUntilStreamClosed();
assertEquals(Status.OK, listener.status);
assertEquals(numContexts, listener.contexts.size());
for (int i = 0; i < numContexts; i++) {
String val = listener.contexts.get(key + i);
assertNotNull(val);
assertEquals(value + i, val);
}
}
@Test @Test
public void readStatus() throws Exception { public void readStatus() throws Exception {
MockStreamListener listener = new MockStreamListener(); MockStreamListener listener = new MockStreamListener();
@ -218,25 +193,6 @@ public class OkHttpClientTransportTest {
checkSameInputStream(createMessageFrame(message), sentFrame.inputStream()); checkSameInputStream(createMessageFrame(message), sentFrame.inputStream());
} }
@Test
public void writeContext() throws Exception {
final String key = "KEY";
final String value = "VALUE";
MockStreamListener listener = new MockStreamListener();
clientTransport.newStream(method,new Metadata.Headers(), listener);
OkHttpClientStream stream = streams.get(3);
InputStream input = new ByteArrayInputStream(value.getBytes(StandardCharsets.UTF_8));
stream.writeContext(key, input, input.available(), null);
stream.flush();
ArgumentCaptor<Buffer> captor =
ArgumentCaptor.forClass(Buffer.class);
verify(frameWriter).data(eq(false), eq(3), captor.capture());
stream.cancel();
verify(frameWriter).rstStream(eq(3), eq(ErrorCode.CANCEL));
listener.waitUntilStreamClosed();
assertEquals(OkHttpClientTransport.toGrpcStatus(ErrorCode.CANCEL), listener.status);
}
@Test @Test
public void windowUpdate() throws Exception { public void windowUpdate() throws Exception {
MockStreamListener listener1 = new MockStreamListener(); MockStreamListener listener1 = new MockStreamListener();
@ -249,26 +205,7 @@ public class OkHttpClientTransportTest {
int messageLength = OkHttpClientTransport.DEFAULT_INITIAL_WINDOW_SIZE / 4; int messageLength = OkHttpClientTransport.DEFAULT_INITIAL_WINDOW_SIZE / 4;
byte[] fakeMessage = new byte[messageLength]; byte[] fakeMessage = new byte[messageLength];
byte[] contextBody = ContextValue
.newBuilder()
.setKey("KEY")
.setValue(ByteString.copyFrom(fakeMessage))
.build()
.toByteArray();
// Stream 1 receives context
InputStream contextFrame = createContextFrame(contextBody);
int contextFrameLength = contextFrame.available();
BufferedSource source = mock(BufferedSource.class); BufferedSource source = mock(BufferedSource.class);
when(source.inputStream()).thenReturn(contextFrame);
frameHandler.data(false, 3, source, contextFrame.available());
// Stream 2 receives context
contextFrame = createContextFrame(contextBody);
when(source.inputStream()).thenReturn(contextFrame);
frameHandler.data(false, 5, source, contextFrame.available());
verify(frameWriter).windowUpdate(eq(0), eq((long) 2 * contextFrameLength));
// Stream 1 receives a message // Stream 1 receives a message
InputStream messageFrame = createMessageFrame(fakeMessage); InputStream messageFrame = createMessageFrame(fakeMessage);
@ -276,14 +213,27 @@ public class OkHttpClientTransportTest {
when(source.inputStream()).thenReturn(messageFrame); when(source.inputStream()).thenReturn(messageFrame);
frameHandler.data(false, 3, source, messageFrame.available()); frameHandler.data(false, 3, source, messageFrame.available());
verify(frameWriter).windowUpdate(eq(3), eq((long) contextFrameLength + messageFrameLength));
// Stream 2 receives a message // Stream 2 receives a message
messageFrame = createMessageFrame(fakeMessage); messageFrame = createMessageFrame(fakeMessage);
when(source.inputStream()).thenReturn(messageFrame); when(source.inputStream()).thenReturn(messageFrame);
frameHandler.data(false, 5, source, messageFrame.available()); frameHandler.data(false, 5, source, messageFrame.available());
verify(frameWriter).windowUpdate(eq(5), eq((long) contextFrameLength + messageFrameLength)); verify(frameWriter).windowUpdate(eq(0), eq((long) 2 * messageFrameLength));
reset(frameWriter);
// Stream 1 receives another message
messageFrame = createMessageFrame(fakeMessage);
when(source.inputStream()).thenReturn(messageFrame);
frameHandler.data(false, 3, source, messageFrame.available());
verify(frameWriter).windowUpdate(eq(3), eq((long) 2 * messageFrameLength));
// Stream 2 receives another message
messageFrame = createMessageFrame(fakeMessage);
when(source.inputStream()).thenReturn(messageFrame);
frameHandler.data(false, 5, source, messageFrame.available());
verify(frameWriter).windowUpdate(eq(5), eq((long) 2 * messageFrameLength));
verify(frameWriter).windowUpdate(eq(0), eq((long) 2 * messageFrameLength)); verify(frameWriter).windowUpdate(eq(0), eq((long) 2 * messageFrameLength));
stream1.cancel(); stream1.cancel();
@ -426,29 +376,6 @@ public class OkHttpClientTransportTest {
return addCompressionHeader(messageFrame); return addCompressionHeader(messageFrame);
} }
private static InputStream createContextFrame(String key, String value) throws IOException {
byte[] body = ContextValue
.newBuilder()
.setKey(key)
.setValue(ByteString.copyFromUtf8(value))
.build()
.toByteArray();
return createContextFrame(body);
}
private static InputStream createContextFrame(byte[] body) throws IOException {
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(os);
dos.write(CONTEXT_VALUE_FRAME);
dos.writeInt(body.length);
dos.write(body);
dos.close();
byte[] contextFrame = os.toByteArray();
// Write the compression header followed by the context frame.
return addCompressionHeader(contextFrame);
}
private static InputStream createStatusFrame(short code) throws IOException { private static InputStream createStatusFrame(short code) throws IOException {
ByteArrayOutputStream os = new ByteArrayOutputStream(); ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(os); DataOutputStream dos = new DataOutputStream(os);
@ -516,16 +443,6 @@ public class OkHttpClientTransportTest {
ArrayList<String> messages = new ArrayList<String>(); ArrayList<String> messages = new ArrayList<String>();
Map<String, String> contexts = new HashMap<String, String>(); Map<String, String> contexts = new HashMap<String, String>();
@Override
public ListenableFuture<Void> contextRead(String name, InputStream value, int length) {
String valueStr = getContent(value);
if (valueStr != null) {
// We assume only one context for each name.
contexts.put(name, valueStr);
}
return null;
}
@Override @Override
public ListenableFuture<Void> headersRead(Metadata.Headers headers) { public ListenableFuture<Void> headersRead(Metadata.Headers headers) {
return null; return null;

View File

@ -8,7 +8,6 @@ import com.google.common.io.ByteBuffers;
import com.google.common.primitives.Bytes; import com.google.common.primitives.Bytes;
import com.google.net.stubby.GrpcFramingUtil; import com.google.net.stubby.GrpcFramingUtil;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.protobuf.ByteString;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -52,37 +51,6 @@ public class MessageFramerTest {
} }
} }
@Test
public void testContext() throws Exception {
MessageFramer framer = new MessageFramer(TRANSPORT_FRAME_SIZE);
byte[] payload = new byte[]{11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23};
byte[] contextValue = Transport.ContextValue.newBuilder()
.setKey("somekey")
.setValue(ByteString.copyFrom(payload))
.build().toByteArray();
byte[] unframedStream =
Bytes.concat(
new byte[]{GrpcFramingUtil.CONTEXT_VALUE_FRAME},
new byte[]{0, 0,
(byte) (contextValue.length >> 8 & 0xff),
(byte) (contextValue.length & 0xff)},
contextValue);
CapturingSink sink = new CapturingSink();
for (int i = 0; i < 1000; i++) {
framer.writeContext("somekey", new ByteArrayInputStream(payload), (i % 17 == 11), sink);
if ((i + 1) % 13 == 0) {
framer.flush(sink);
}
}
framer.flush(sink);
assertEquals(unframedStream.length * 1000, sink.deframedStream.length);
for (int i = 0; i < 1000; i++) {
assertArrayEquals(unframedStream,
Arrays.copyOfRange(sink.deframedStream, i * unframedStream.length,
(i + 1) * unframedStream.length));
}
}
@Test @Test
public void testStatus() throws Exception { public void testStatus() throws Exception {
MessageFramer framer = new MessageFramer(TRANSPORT_FRAME_SIZE); MessageFramer framer = new MessageFramer(TRANSPORT_FRAME_SIZE);

View File

@ -11,7 +11,6 @@ import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport; import com.google.net.stubby.transport.Transport;
import java.io.InputStream;
import java.util.Iterator; import java.util.Iterator;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
@ -215,12 +214,6 @@ public class Calls {
return null; return null;
} }
@Override
public ListenableFuture<Void> onContext(String name, InputStream value) {
// StreamObservers don't receive contexts.
return null;
}
@Override @Override
public ListenableFuture<Void> onPayload(T payload) { public ListenableFuture<Void> onPayload(T payload) {
observer.onValue(payload); observer.onValue(payload);
@ -253,12 +246,6 @@ public class Calls {
return null; return null;
} }
@Override
public ListenableFuture<Void> onContext(String name, InputStream value) {
// Don't care about contexts.
return null;
}
@Override @Override
public ListenableFuture<Void> onPayload(RespT value) { public ListenableFuture<Void> onPayload(RespT value) {
if (this.value != null) { if (this.value != null) {
@ -338,12 +325,6 @@ public class Calls {
private class QueuingListener extends Call.Listener<T> { private class QueuingListener extends Call.Listener<T> {
private boolean done = false; private boolean done = false;
@Override
public ListenableFuture<Void> onContext(String name, InputStream value) {
// Don't care about contexts.
return null;
}
@Override @Override
public ListenableFuture<Void> onHeaders(Metadata.Headers headers) { public ListenableFuture<Void> onHeaders(Metadata.Headers headers) {
return null; return null;