core: Delete AbstractStream (replaced by AbstractStream2)

AbstractStream2 will be renamed in a later commit.
This commit is contained in:
Eric Anderson 2017-04-13 16:04:46 -07:00
parent 55e3b71888
commit b661ac7d73
7 changed files with 6 additions and 1503 deletions

View File

@ -1,336 +0,0 @@
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import io.grpc.Metadata;
import io.grpc.Status;
import java.io.InputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* The abstract base class for {@link ClientStream} implementations.
*/
public abstract class AbstractClientStream extends AbstractStream
implements ClientStream {
private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName());
private final StatsTraceContext statsTraceCtx;
private ClientStreamListener listener;
private boolean listenerClosed;
// Stored status & trailers to report when deframer completes or
// transportReportStatus is directly called.
private Status status;
private Metadata trailers;
private Runnable closeListenerTask;
private volatile boolean cancelled;
protected AbstractClientStream(WritableBufferAllocator bufferAllocator, int maxMessageSize,
StatsTraceContext statsTraceCtx) {
super(bufferAllocator, maxMessageSize, statsTraceCtx);
this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx");
}
@Override
public void setMaxInboundMessageSize(int maxSize) {
setMaxInboundMessageSizeProtected(maxSize);
}
@Override
public void setMaxOutboundMessageSize(int maxSize) {
setMaxOutboundMessageSizeProtected(maxSize);
}
@Override
protected final ClientStreamListener listener() {
return listener;
}
/**
* Indicates that the stream may proceed to do traffic. Sub classes should override and invoke
* this method after they have been created.
*/
@Override
public void start(ClientStreamListener listener) {
checkState(this.listener == null, "stream already started");
this.listener = checkNotNull(listener, "listener");
}
@Override
protected void receiveMessage(InputStream is) {
if (!listenerClosed) {
checkState(listener != null, "stream not started");
listener.messageRead(is);
}
}
/**
* The transport implementation has detected a protocol error on the stream. Transports are
* responsible for properly closing streams when protocol errors occur.
*
* @param errorStatus the error to report
* @param metadata any metadata received
*/
protected void inboundTransportError(Status errorStatus, Metadata metadata) {
Preconditions.checkNotNull(metadata, "metadata");
if (inboundPhase() == Phase.STATUS) {
log.log(Level.INFO, "Received transport error on closed stream {0} {1}",
new Object[]{id(), errorStatus});
return;
}
// For transport errors we immediately report status to the application layer
// and do not wait for additional payloads.
transportReportStatus(errorStatus, false, metadata);
}
/**
* Called by transport implementations when they receive headers.
*
* @param headers the parsed headers
*/
protected void inboundHeadersReceived(Metadata headers) {
checkState(listener != null, "stream not started");
if (inboundPhase() == Phase.STATUS) {
log.log(Level.INFO, "Received headers on closed stream {0} {1}",
new Object[]{id(), headers});
}
inboundPhase(Phase.MESSAGE);
listener.headersRead(headers);
}
/**
* Processes the contents of a received data frame from the server.
*
* @param frame the received data frame. Its ownership is transferred to this method.
*/
protected void inboundDataReceived(ReadableBuffer frame) {
Preconditions.checkNotNull(frame, "frame");
boolean needToCloseFrame = true;
try {
if (inboundPhase() == Phase.STATUS) {
return;
}
if (inboundPhase() == Phase.HEADERS) {
// Have not received headers yet so error
inboundTransportError(Status.INTERNAL
.withDescription("headers not received before payload"), new Metadata());
return;
}
inboundPhase(Phase.MESSAGE);
needToCloseFrame = false;
deframe(frame, false);
} finally {
if (needToCloseFrame) {
frame.close();
}
}
}
@Override
protected void inboundDeliveryPaused() {
runCloseListenerTask();
}
@Override
protected final void deframeFailed(Throwable cause) {
cancel(Status.fromThrowable(cause));
}
/**
* Processes the trailers and status from the server.
*
* @param trailers the received trailers
* @param status the status extracted from the trailers
*/
protected void inboundTrailersReceived(Metadata trailers, Status status) {
Preconditions.checkNotNull(trailers, "trailers");
if (inboundPhase() == Phase.STATUS) {
log.log(Level.INFO, "Received trailers on closed stream {0}\n {1}\n {2}",
new Object[]{id(), status, trailers});
}
// Stash the status & trailers so they can be delivered by the deframer calls
// remoteEndClosed
this.status = status;
this.trailers = trailers;
deframe(ReadableBuffers.empty(), true);
}
@Override
protected void remoteEndClosed() {
transportReportStatus(status, true, trailers);
}
@Override
protected final void internalSendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
Preconditions.checkArgument(frame != null || endOfStream, "null frame before EOS");
sendFrame(frame, endOfStream, flush);
}
/**
* Sends an outbound frame to the remote end point.
*
* @param frame a buffer containing the chunk of data to be sent or {@code null} if the framer is
* closing and has no data to send.
* @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
* this endpoint.
* @param flush {@code true} if more data may not be arriving soon
*/
protected abstract void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush);
/**
* Report stream closure with status to the application layer if not already reported. This method
* must be called from the transport thread.
*
* @param newStatus the new status to set
* @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that
* may already be queued up in the deframer. If {@code false}, the listener will be
* notified immediately after all currently completed messages in the deframer have been
* delivered to the application.
* @param trailers new instance of {@code Trailers}, either empty or those returned by the server
*/
public void transportReportStatus(final Status newStatus, boolean stopDelivery,
final Metadata trailers) {
Preconditions.checkNotNull(newStatus, "newStatus");
boolean closingLater = closeListenerTask != null && !stopDelivery;
if (listenerClosed || closingLater) {
// We already closed (or are about to close) the listener.
return;
}
inboundPhase(Phase.STATUS);
status = newStatus;
closeListenerTask = null;
// Determine if the deframer is stalled (i.e. currently has no complete messages to deliver).
boolean deliveryStalled = isDeframerStalled();
if (stopDelivery || deliveryStalled) {
// Close the listener immediately.
closeListener(newStatus, trailers);
} else {
// Delay close until inboundDeliveryStalled()
closeListenerTask = newCloseListenerTask(newStatus, trailers);
}
}
/**
* Creates a new {@link Runnable} to close the listener with the given status/trailers.
*/
private Runnable newCloseListenerTask(final Status status, final Metadata trailers) {
return new Runnable() {
@Override
public void run() {
closeListener(status, trailers);
}
};
}
/**
* Closes the listener if not previously closed.
*
* @throws IllegalStateException if the call has not yet been started.
*/
private void closeListener(Status newStatus, Metadata trailers) {
checkState(listener != null, "stream not started");
if (!listenerClosed) {
listenerClosed = true;
closeDeframer();
statsTraceCtx.streamClosed(newStatus);
listener.closed(newStatus, trailers);
}
}
/**
* Executes the pending listener close task, if one exists.
*/
private void runCloseListenerTask() {
if (closeListenerTask != null) {
closeListenerTask.run();
closeListenerTask = null;
}
}
@Override
public final void halfClose() {
if (outboundPhase(Phase.STATUS) != Phase.STATUS) {
closeFramer();
}
}
/**
* Cancel the stream. Called by the application layer, never called by the transport.
*/
@Override
public final void cancel(Status reason) {
checkArgument(!reason.isOk(), "Should not cancel with OK status");
cancelled = true;
sendCancel(reason);
dispose();
}
@Override
public final boolean isReady() {
return !cancelled && super.isReady();
}
/**
* Cancel the stream and send a stream cancellation message to the remote server, if necessary.
* Can be called by either the application or transport layers. This method is safe to be called
* at any time and multiple times.
*/
protected abstract void sendCancel(Status reason);
@Override
protected MoreObjects.ToStringHelper toStringHelper() {
MoreObjects.ToStringHelper toStringHelper = super.toStringHelper();
if (status != null) {
toStringHelper.add("status", status);
}
return toStringHelper;
}
@Override
public boolean isClosed() {
return super.isClosed() || listenerClosed;
}
}

View File

@ -1,448 +0,0 @@
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.Decompressor;
import java.io.InputStream;
import javax.annotation.concurrent.GuardedBy;
/**
* Abstract base class for {@link Stream} implementations.
*/
public abstract class AbstractStream implements Stream {
/**
* The default number of queued bytes for a given stream, below which
* {@link StreamListener#onReady()} will be called.
*/
public static final int DEFAULT_ONREADY_THRESHOLD = 32 * 1024;
public static final int ABSENT_ID = -1;
/**
* Indicates the phase of the GRPC stream in one direction.
*/
protected enum Phase {
HEADERS, MESSAGE, STATUS
}
private final MessageFramer framer;
private final MessageDeframer deframer;
/**
* Inbound phase is exclusively written to by the transport thread.
*/
private Phase inboundPhase = Phase.HEADERS;
/**
* Outbound phase is exclusively written to by the application thread.
*/
private Phase outboundPhase = Phase.HEADERS;
/**
* The number of queued bytes for a given stream, below which {@link StreamListener#onReady()}
* will be called.
*/
private int onReadyThreshold = DEFAULT_ONREADY_THRESHOLD;
/**
* The number of bytes currently queued, waiting to be sent. When this falls below
* onReadyThreshold, {@link StreamListener#onReady()} will be called.
*/
private int numSentBytesQueued;
/**
* Indicates the stream has been created on the connection. This implies that the stream is no
* longer limited by MAX_CONCURRENT_STREAMS.
*/
@GuardedBy("onReadyLock")
private boolean allocated;
private final Object onReadyLock = new Object();
@VisibleForTesting
class FramerSink implements MessageFramer.Sink {
@Override
public void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
internalSendFrame(frame, endOfStream, flush);
}
}
@VisibleForTesting
class DeframerListener implements MessageDeframer.Listener {
@Override
public void bytesRead(int numBytes) {
returnProcessedBytes(numBytes);
}
@Override
public void messageRead(InputStream input) {
receiveMessage(input);
}
@Override
public void deliveryStalled() {
inboundDeliveryPaused();
}
@Override
public void endOfStream() {
remoteEndClosed();
}
}
AbstractStream(WritableBufferAllocator bufferAllocator, int maxMessageSize,
StatsTraceContext statsTraceCtx) {
framer = new MessageFramer(new FramerSink(), bufferAllocator, statsTraceCtx);
deframer = new MessageDeframer(new DeframerListener(), Codec.Identity.NONE, maxMessageSize,
statsTraceCtx, getClass().getName());
}
protected final void setMaxInboundMessageSizeProtected(int maxSize) {
deframer.setMaxInboundMessageSize(maxSize);
}
protected final void setMaxOutboundMessageSizeProtected(int maxSize) {
framer.setMaxOutboundMessageSize(maxSize);
}
@VisibleForTesting
AbstractStream(MessageFramer framer, MessageDeframer deframer) {
this.framer = framer;
this.deframer = deframer;
}
/**
* Override this method to provide a stream listener.
*/
protected abstract StreamListener listener();
/**
* Returns the internal ID for this stream. Note that ID can be {@link #ABSENT_ID} for client
* streams as the transport may defer creating the stream to the remote side until it has a
* payload or metadata to send.
*/
public abstract int id();
/**
* The number of queued bytes for a given stream, below which {@link StreamListener#onReady()}
* will be called. Defaults to {@link #DEFAULT_ONREADY_THRESHOLD}.
*/
public int getOnReadyThreshold() {
synchronized (onReadyLock) {
return onReadyThreshold;
}
}
@Override
public void writeMessage(InputStream message) {
checkNotNull(message, "message");
outboundPhase(Phase.MESSAGE);
if (!framer.isClosed()) {
framer.writePayload(message);
}
}
@Override
public final void setMessageCompression(boolean enable) {
framer.setMessageCompression(enable);
}
@Override
public final void flush() {
if (!framer.isClosed()) {
framer.flush();
}
}
@Override
public boolean isReady() {
if (listener() != null && outboundPhase() != Phase.STATUS) {
synchronized (onReadyLock) {
return allocated && numSentBytesQueued < onReadyThreshold;
}
}
return false;
}
/**
* Closes the underlying framer.
*
* <p>No-op if the framer has already been closed.
*/
final void closeFramer() {
if (!framer.isClosed()) {
framer.close();
}
}
/**
* Frees any resources associated with this stream. Subclass implementations must call this
* version.
*
* <p>NOTE: Can be called by both the transport thread and the application thread. Transport
* threads need to dispose when the remote side has terminated the stream. Application threads
* will dispose when the application decides to close the stream as part of normal processing.
*/
public void dispose() {
framer.dispose();
}
/**
* Sends an outbound frame to the remote end point.
*
* @param frame a buffer containing the chunk of data to be sent.
* @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
* this endpoint.
* @param flush {@code true} if more data may not be arriving soon
*/
protected abstract void internalSendFrame(WritableBuffer frame, boolean endOfStream,
boolean flush);
/**
* Handles a message that was just deframed.
*
* @param is the stream containing the message
*/
protected abstract void receiveMessage(InputStream is);
/**
* Handles the event that the deframer has no pending deliveries.
*/
protected abstract void inboundDeliveryPaused();
/**
* Handles the event that the deframer has reached end of stream.
*/
protected abstract void remoteEndClosed();
/**
* Returns the given number of processed bytes back to inbound flow control to enable receipt of
* more data.
*/
protected abstract void returnProcessedBytes(int processedBytes);
/**
* Called when a {@link #deframe(ReadableBuffer, boolean)} operation failed.
*
* @param cause the actual failure
*/
protected abstract void deframeFailed(Throwable cause);
/**
* Closes this deframer and frees any resources. After this method is called, additional calls
* will have no effect.
*/
protected final void closeDeframer() {
deframer.close();
}
/**
* Called to parse a received frame and attempt delivery of any completed
* messages. Must be called from the transport thread.
*/
protected final void deframe(ReadableBuffer frame, boolean endOfStream) {
try {
deframer.deframe(frame, endOfStream);
} catch (Throwable t) {
deframeFailed(t);
}
}
/**
* Indicates whether delivery is currently stalled, pending receipt of more data.
*/
protected final boolean isDeframerStalled() {
return deframer.isStalled();
}
/**
* Called to request the given number of messages from the deframer. Must be called
* from the transport thread.
*/
protected final void requestMessagesFromDeframer(int numMessages) {
try {
deframer.request(numMessages);
} catch (Throwable t) {
deframeFailed(t);
}
}
@Override
public final void setCompressor(Compressor compressor) {
framer.setCompressor(checkNotNull(compressor, "compressor"));
}
@Override
public final void setDecompressor(Decompressor decompressor) {
deframer.setDecompressor(checkNotNull(decompressor, "decompressor"));
}
/**
* Event handler to be called by the subclass when the stream's headers have passed any connection
* flow control (i.e., MAX_CONCURRENT_STREAMS). It may call the listener's {@link
* StreamListener#onReady()} handler if appropriate. This must be called from the transport
* thread, since the listener may be called back directly.
*/
protected final void onStreamAllocated() {
checkState(listener() != null);
synchronized (onReadyLock) {
checkState(!allocated, "Already allocated");
allocated = true;
}
notifyIfReady();
}
/**
* Event handler to be called by the subclass when a number of bytes are being queued for sending
* to the remote endpoint.
*
* @param numBytes the number of bytes being sent.
*/
protected final void onSendingBytes(int numBytes) {
synchronized (onReadyLock) {
numSentBytesQueued += numBytes;
}
}
/**
* Event handler to be called by the subclass when a number of bytes has been sent to the remote
* endpoint. May call back the listener's {@link StreamListener#onReady()} handler if appropriate.
* This must be called from the transport thread, since the listener may be called back directly.
*
* @param numBytes the number of bytes that were sent.
*/
protected final void onSentBytes(int numBytes) {
boolean doNotify;
synchronized (onReadyLock) {
boolean belowThresholdBefore = numSentBytesQueued < onReadyThreshold;
numSentBytesQueued -= numBytes;
boolean belowThresholdAfter = numSentBytesQueued < onReadyThreshold;
doNotify = !belowThresholdBefore && belowThresholdAfter;
}
if (doNotify) {
notifyIfReady();
}
}
@VisibleForTesting
final void notifyIfReady() {
boolean doNotify = false;
synchronized (onReadyLock) {
doNotify = isReady();
}
if (doNotify) {
listener().onReady();
}
}
final Phase inboundPhase() {
return inboundPhase;
}
/**
* Transitions the inbound phase to the given phase and returns the previous phase.
*
* @throws IllegalStateException if the transition is disallowed
*/
final Phase inboundPhase(Phase nextPhase) {
Phase tmp = inboundPhase;
inboundPhase = verifyNextPhase(inboundPhase, nextPhase);
return tmp;
}
final Phase outboundPhase() {
return outboundPhase;
}
/**
* Transitions the outbound phase to the given phase and returns the previous phase.
*
* @throws IllegalStateException if the transition is disallowed
*/
final Phase outboundPhase(Phase nextPhase) {
Phase tmp = outboundPhase;
outboundPhase = verifyNextPhase(outboundPhase, nextPhase);
return tmp;
}
@VisibleForTesting
Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) {
if (nextPhase.ordinal() < currentPhase.ordinal()) {
throw new IllegalStateException(
String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase));
}
return nextPhase;
}
/**
* Returns {@code true} if the stream can receive data from its remote peer.
*/
public boolean canReceive() {
return inboundPhase() != Phase.STATUS;
}
/**
* Returns {@code true} if the stream can send data to its remote peer.
*/
public boolean canSend() {
return outboundPhase() != Phase.STATUS;
}
/**
* Whether the stream is fully closed. Note that this method is not thread-safe as {@code
* inboundPhase} and {@code outboundPhase} are mutated in different threads. Tests must account
* for thread coordination when calling.
*/
@VisibleForTesting
public boolean isClosed() {
return inboundPhase() == Phase.STATUS && outboundPhase() == Phase.STATUS;
}
@Override
public String toString() {
return toStringHelper().toString();
}
protected MoreObjects.ToStringHelper toStringHelper() {
return MoreObjects.toStringHelper(this)
.add("id", id())
.add("inboundPhase", inboundPhase().name())
.add("outboundPhase", outboundPhase().name());
}
}

View File

@ -34,6 +34,7 @@ package io.grpc.internal;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.Codec;
import io.grpc.Compressor;
import io.grpc.Decompressor;
@ -119,7 +120,8 @@ public abstract class AbstractStream2 implements Stream {
* The default number of queued bytes for a given stream, below which
* {@link StreamListener#onReady()} will be called.
*/
private static final int DEFAULT_ONREADY_THRESHOLD = 32 * 1024;
@VisibleForTesting
public static final int DEFAULT_ONREADY_THRESHOLD = 32 * 1024;
private final MessageDeframer deframer;
private final Object onReadyLock = new Object();

View File

@ -1,255 +0,0 @@
/*
* Copyright 2014, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import io.grpc.InternalMetadata;
import io.grpc.Metadata;
import io.grpc.Status;
import java.nio.charset.Charset;
import javax.annotation.Nullable;
/**
* Base implementation for client streams using HTTP2 as the transport.
*/
public abstract class Http2ClientStream extends AbstractClientStream {
/**
* Metadata marshaller for HTTP status lines.
*/
private static final InternalMetadata.TrustedAsciiMarshaller<Integer> HTTP_STATUS_MARSHALLER =
new InternalMetadata.TrustedAsciiMarshaller<Integer>() {
@Override
public byte[] toAsciiString(Integer value) {
throw new UnsupportedOperationException();
}
/**
* RFC 7231 says status codes are 3 digits long.
*
* @see: <a href="https://tools.ietf.org/html/rfc7231#section-6">RFC 7231</a>
*/
@Override
public Integer parseAsciiString(byte[] serialized) {
if (serialized.length >= 3) {
return (serialized[0] - '0') * 100 + (serialized[1] - '0') * 10 + (serialized[2] - '0');
}
throw new NumberFormatException(
"Malformed status code " + new String(serialized, InternalMetadata.US_ASCII));
}
};
private static final Metadata.Key<Integer> HTTP2_STATUS = InternalMetadata.keyOf(":status",
HTTP_STATUS_MARSHALLER);
/** When non-{@code null}, {@link #transportErrorMetadata} must also be non-{@code null}. */
private Status transportError;
private Metadata transportErrorMetadata;
private Charset errorCharset = Charsets.UTF_8;
private boolean headersReceived;
protected Http2ClientStream(WritableBufferAllocator bufferAllocator, int maxMessageSize,
StatsTraceContext statsTraceCtx) {
super(bufferAllocator, maxMessageSize, statsTraceCtx);
}
/**
* Called by subclasses whenever {@code Headers} are received from the transport.
*
* @param headers the received headers
*/
protected void transportHeadersReceived(Metadata headers) {
Preconditions.checkNotNull(headers, "headers");
if (transportError != null) {
// Already received a transport error so just augment it. Something is really, really strange.
transportError = transportError.augmentDescription("headers: " + headers);
return;
}
try {
if (headersReceived) {
transportError = Status.INTERNAL.withDescription("Received headers twice");
return;
}
Integer httpStatus = headers.get(HTTP2_STATUS);
if (httpStatus != null && httpStatus >= 100 && httpStatus < 200) {
// Ignore the headers. See RFC 7540 §8.1
return;
}
headersReceived = true;
transportError = validateInitialMetadata(headers);
if (transportError != null) {
return;
}
stripTransportDetails(headers);
inboundHeadersReceived(headers);
} finally {
if (transportError != null) {
// Note we don't immediately report the transport error, instead we wait for more data on
// the stream so we can accumulate more detail into the error before reporting it.
transportError = transportError.augmentDescription("headers: " + headers);
transportErrorMetadata = headers;
errorCharset = extractCharset(headers);
}
}
}
/**
* Called by subclasses whenever a data frame is received from the transport.
*
* @param frame the received data frame
* @param endOfStream {@code true} if there will be no more data received for this stream
*/
protected void transportDataReceived(ReadableBuffer frame, boolean endOfStream) {
if (transportError == null && inboundPhase() == Phase.HEADERS) {
// Must receive headers prior to receiving any payload as we use headers to check for
// protocol correctness.
transportError = Status.INTERNAL.withDescription("no headers received prior to data");
transportErrorMetadata = new Metadata();
}
if (transportError != null) {
// We've already detected a transport error and now we're just accumulating more detail
// for it.
transportError = transportError.augmentDescription("DATA-----------------------------\n"
+ ReadableBuffers.readAsString(frame, errorCharset));
frame.close();
if (transportError.getDescription().length() > 1000 || endOfStream) {
inboundTransportError(transportError, transportErrorMetadata);
// We have enough error detail so lets cancel.
sendCancel(Status.CANCELLED);
}
} else {
inboundDataReceived(frame);
if (endOfStream) {
// This is a protocol violation as we expect to receive trailers.
transportError =
Status.INTERNAL.withDescription("Received unexpected EOS on DATA frame from server.");
transportErrorMetadata = new Metadata();
inboundTransportError(transportError, transportErrorMetadata);
}
}
}
/**
* Called by subclasses for the terminal trailer metadata on a stream.
*
* @param trailers the received terminal trailer metadata
*/
protected void transportTrailersReceived(Metadata trailers) {
Preconditions.checkNotNull(trailers, "trailers");
if (transportError == null && !headersReceived) {
transportError = validateInitialMetadata(trailers);
if (transportError != null) {
transportErrorMetadata = trailers;
}
}
if (transportError != null) {
transportError = transportError.augmentDescription("trailers: " + trailers);
inboundTransportError(transportError, transportErrorMetadata);
sendCancel(Status.CANCELLED);
} else {
Status status = statusFromTrailers(trailers);
stripTransportDetails(trailers);
inboundTrailersReceived(trailers, status);
}
}
/**
* Extract the response status from trailers.
*/
private Status statusFromTrailers(Metadata trailers) {
Status status = trailers.get(Status.CODE_KEY);
if (status != null) {
return status.withDescription(trailers.get(Status.MESSAGE_KEY));
}
// No status; something is broken. Try to provide a resonanable error.
if (headersReceived) {
return Status.UNKNOWN.withDescription("missing GRPC status in response");
}
Integer httpStatus = trailers.get(HTTP2_STATUS);
if (httpStatus != null) {
status = GrpcUtil.httpStatusToGrpcStatus(httpStatus);
} else {
status = Status.INTERNAL.withDescription("missing HTTP status code");
}
return status.augmentDescription(
"missing GRPC status, inferred error from HTTP status code");
}
/**
* Inspect initial headers to make sure they conform to HTTP and gRPC, returning a {@code Status}
* on failure.
*
* @return status with description of failure, or {@code null} when valid
*/
@Nullable
private Status validateInitialMetadata(Metadata headers) {
Integer httpStatus = headers.get(HTTP2_STATUS);
if (httpStatus == null) {
return Status.INTERNAL.withDescription("Missing HTTP status code");
}
String contentType = headers.get(GrpcUtil.CONTENT_TYPE_KEY);
if (!GrpcUtil.isGrpcContentType(contentType)) {
return GrpcUtil.httpStatusToGrpcStatus(httpStatus)
.augmentDescription("invalid content-type: " + contentType);
}
return null;
}
/**
* Inspect the raw metadata and figure out what charset is being used.
*/
private static Charset extractCharset(Metadata headers) {
String contentType = headers.get(GrpcUtil.CONTENT_TYPE_KEY);
if (contentType != null) {
String[] split = contentType.split("charset=");
try {
return Charset.forName(split[split.length - 1].trim());
} catch (Exception t) {
// Ignore and assume UTF-8
}
}
return Charsets.UTF_8;
}
/**
* Strip HTTP transport implementation details so they don't leak via metadata into
* the application layer.
*/
private static void stripTransportDetails(Metadata metadata) {
metadata.discardAll(HTTP2_STATUS);
metadata.discardAll(Status.CODE_KEY);
metadata.discardAll(Status.MESSAGE_KEY);
}
}

View File

@ -1,288 +0,0 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.verify;
import io.grpc.Attributes;
import io.grpc.Codec;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.internal.AbstractStream.Phase;
import io.grpc.internal.MessageFramerTest.ByteWritableBuffer;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
* Test for {@link AbstractClientStream}. This class tries to test functionality in
* AbstractClientStream, but not in any super classes.
*/
@RunWith(JUnit4.class)
public class AbstractClientStreamTest {
@Rule public final ExpectedException thrown = ExpectedException.none();
@Mock private ClientStreamListener mockListener;
@Captor private ArgumentCaptor<Status> statusCaptor;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}
private final WritableBufferAllocator allocator = new WritableBufferAllocator() {
@Override
public WritableBuffer allocate(int capacityHint) {
return new ByteWritableBuffer(capacityHint);
}
};
@Test
public void cancel_doNotAcceptOk() {
for (Code code : Code.values()) {
ClientStreamListener listener = new NoopClientStreamListener();
AbstractClientStream stream = new BaseAbstractClientStream(allocator);
stream.start(listener);
if (code != Code.OK) {
stream.cancel(Status.fromCodeValue(code.value()));
} else {
try {
stream.cancel(Status.fromCodeValue(code.value()));
fail();
} catch (IllegalArgumentException e) {
// ignore
}
}
}
}
@Test
public void cancel_failsOnNull() {
ClientStreamListener listener = new NoopClientStreamListener();
AbstractClientStream stream = new BaseAbstractClientStream(allocator);
stream.start(listener);
thrown.expect(NullPointerException.class);
stream.cancel(null);
}
@Test
public void cancel_notifiesOnlyOnce() {
AbstractClientStream stream = new BaseAbstractClientStream(allocator) {
@Override
protected void sendCancel(Status errorStatus) {
transportReportStatus(errorStatus, true/*stop delivery*/, new Metadata());
}
};
stream.start(mockListener);
stream.cancel(Status.DEADLINE_EXCEEDED);
verify(mockListener).closed(isA(Status.class), isA(Metadata.class));
}
@Test
public void startFailsOnNullListener() {
AbstractClientStream stream = new BaseAbstractClientStream(allocator);
thrown.expect(NullPointerException.class);
stream.start(null);
}
@Test
public void cantCallStartTwice() {
AbstractClientStream stream = new BaseAbstractClientStream(allocator);
stream.start(mockListener);
thrown.expect(IllegalStateException.class);
stream.start(mockListener);
}
@Test
public void deframeFailed_notifiesListener() {
AbstractClientStream stream = new BaseAbstractClientStream(allocator) {
@Override
protected void sendCancel(Status errorStatus) {
transportReportStatus(errorStatus, true/*stop delivery*/, new Metadata());
}
};
stream.start(mockListener);
stream.deframeFailed(new RuntimeException("something bad"));
verify(mockListener).closed(statusCaptor.capture(), isA(Metadata.class));
assertEquals(Code.UNKNOWN, statusCaptor.getValue().getCode());
}
@Test
public void inboundDataReceived_failsOnNullFrame() {
ClientStreamListener listener = new NoopClientStreamListener();
AbstractClientStream stream = new BaseAbstractClientStream(allocator);
stream.start(listener);
thrown.expect(NullPointerException.class);
stream.inboundDataReceived(null);
}
@Test
public void inboundDataReceived_failsOnNoHeaders() {
AbstractClientStream stream = new BaseAbstractClientStream(allocator);
stream.start(mockListener);
stream.inboundPhase(Phase.HEADERS);
stream.inboundDataReceived(ReadableBuffers.empty());
verify(mockListener).closed(statusCaptor.capture(), isA(Metadata.class));
assertEquals(Code.INTERNAL, statusCaptor.getValue().getCode());
}
@Test
public void inboundHeadersReceived_notifiesListener() {
AbstractClientStream stream = new BaseAbstractClientStream(allocator);
stream.start(mockListener);
Metadata headers = new Metadata();
stream.inboundHeadersReceived(headers);
verify(mockListener).headersRead(headers);
}
@Test
public void inboundHeadersReceived_failsOnPhaseStatus() {
AbstractClientStream stream = new BaseAbstractClientStream(allocator);
stream.start(mockListener);
Metadata headers = new Metadata();
stream.inboundPhase(Phase.STATUS);
thrown.expect(IllegalStateException.class);
stream.inboundHeadersReceived(headers);
}
@Test
public void inboundHeadersReceived_succeedsOnPhaseMessage() {
AbstractClientStream stream = new BaseAbstractClientStream(allocator);
stream.start(mockListener);
Metadata headers = new Metadata();
stream.inboundPhase(Phase.MESSAGE);
stream.inboundHeadersReceived(headers);
verify(mockListener).headersRead(headers);
}
@Test
public void inboundHeadersReceived_acceptsGzipEncoding() {
AbstractClientStream stream = new BaseAbstractClientStream(allocator);
stream.start(mockListener);
Metadata headers = new Metadata();
headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, new Codec.Gzip().getMessageEncoding());
stream.inboundHeadersReceived(headers);
verify(mockListener).headersRead(headers);
}
@Test
public void inboundHeadersReceived_acceptsIdentityEncoding() {
AbstractClientStream stream = new BaseAbstractClientStream(allocator);
stream.start(mockListener);
Metadata headers = new Metadata();
headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, Codec.Identity.NONE.getMessageEncoding());
stream.inboundHeadersReceived(headers);
verify(mockListener).headersRead(headers);
}
@Test
public void rstStreamClosesStream() {
AbstractClientStream stream = new BaseAbstractClientStream(allocator);
stream.start(mockListener);
// The application will call request when waiting for a message, which will in turn call this
// on the transport thread.
stream.requestMessagesFromDeframer(1);
// Send first byte of 2 byte message
stream.deframe(ReadableBuffers.wrap(new byte[] {0, 0, 0, 0, 2, 1}), false);
Status status = Status.INTERNAL;
// Simulate getting a reset
stream.transportReportStatus(status, false /*stop delivery*/, new Metadata());
assertTrue(stream.isClosed());
}
/**
* No-op base class for testing.
*/
private static class BaseAbstractClientStream extends AbstractClientStream {
protected BaseAbstractClientStream(WritableBufferAllocator allocator) {
super(allocator, DEFAULT_MAX_MESSAGE_SIZE, StatsTraceContext.NOOP);
}
@Override
public void setAuthority(String authority) {}
@Override
public Attributes getAttributes() {
return Attributes.EMPTY;
}
@Override
public void request(int numMessages) {}
@Override
protected void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {}
@Override
protected void sendCancel(Status reason) {}
@Override
public int id() {
return ABSENT_ID;
}
@Override
protected void returnProcessedBytes(int processedBytes) {}
}
}

View File

@ -1,173 +0,0 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.internal;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import io.grpc.internal.AbstractStream.Phase;
import io.grpc.internal.MessageFramerTest.ByteWritableBuffer;
import java.io.InputStream;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@RunWith(JUnit4.class)
public class AbstractStreamTest {
@Mock private StreamListener streamListener;
@Mock MessageFramer framer;
@Mock MessageDeframer deframer;
private final WritableBufferAllocator allocator = new WritableBufferAllocator() {
@Override
public WritableBuffer allocate(int capacityHint) {
return new ByteWritableBuffer(capacityHint);
}
};
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}
@Test
public void onStreamAllocated_shouldNotifyReady() {
AbstractStream stream = new AbstractStreamBase(null);
stream.onStreamAllocated();
verify(streamListener).onReady();
}
@Test
public void setMessageCompression() {
AbstractStream as = new AbstractStreamBase(framer, deframer);
as.setMessageCompression(true);
verify(framer).setMessageCompression(true);
}
@Test
public void validPhaseTransitions() {
AbstractStream stream = new AbstractStreamBase(null);
Multimap<Phase, Phase> validTransitions = ImmutableMultimap.<Phase, Phase>builder()
.put(Phase.HEADERS, Phase.HEADERS)
.put(Phase.HEADERS, Phase.MESSAGE)
.put(Phase.HEADERS, Phase.STATUS)
.put(Phase.MESSAGE, Phase.MESSAGE)
.put(Phase.MESSAGE, Phase.STATUS)
.put(Phase.STATUS, Phase.STATUS)
.build();
for (Phase startPhase : Phase.values()) {
for (Phase endPhase : Phase.values()) {
if (validTransitions.containsEntry(startPhase, endPhase)) {
stream.verifyNextPhase(startPhase, endPhase);
} else {
try {
stream.verifyNextPhase(startPhase, endPhase);
fail();
} catch (IllegalStateException expected) {
// continue
}
}
}
}
}
/**
* Base class for testing.
*/
private class AbstractStreamBase extends AbstractStream {
private AbstractStreamBase(WritableBufferAllocator bufferAllocator) {
super(allocator, DEFAULT_MAX_MESSAGE_SIZE, StatsTraceContext.NOOP);
}
private AbstractStreamBase(MessageFramer framer, MessageDeframer deframer) {
super(framer, deframer);
}
@Override
public void request(int numMessages) {
throw new UnsupportedOperationException();
}
@Override
public int id() {
return ABSENT_ID;
}
@Override
protected StreamListener listener() {
return streamListener;
}
@Override
protected void internalSendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
throw new UnsupportedOperationException();
}
@Override
protected void receiveMessage(InputStream is) {
throw new UnsupportedOperationException();
}
@Override
protected void inboundDeliveryPaused() {
throw new UnsupportedOperationException();
}
@Override
protected void remoteEndClosed() {
throw new UnsupportedOperationException();
}
@Override
protected void returnProcessedBytes(int processedBytes) {
throw new UnsupportedOperationException();
}
@Override
protected void deframeFailed(Throwable cause) {
throw new UnsupportedOperationException();
}
}
}

View File

@ -69,7 +69,7 @@ import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusException;
import io.grpc.internal.AbstractStream;
import io.grpc.internal.AbstractStream2;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientTransport;
import io.grpc.internal.GrpcUtil;
@ -1121,7 +1121,8 @@ public class OkHttpClientTransportTest {
public void notifyOnReady() throws Exception {
initTransport();
// exactly one byte below the threshold
int messageLength = AbstractStream.DEFAULT_ONREADY_THRESHOLD - HEADER_LENGTH - 1;
int messageLength =
AbstractStream2.TransportState.DEFAULT_ONREADY_THRESHOLD - HEADER_LENGTH - 1;
setInitialWindowSize(0);
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream = clientTransport.newStream(method, new Metadata());