diff --git a/core/src/main/java/io/grpc/internal/AbstractClientStream.java b/core/src/main/java/io/grpc/internal/AbstractClientStream.java deleted file mode 100644 index 4ea3467c26..0000000000 --- a/core/src/main/java/io/grpc/internal/AbstractClientStream.java +++ /dev/null @@ -1,336 +0,0 @@ -/* - * Copyright 2014, Google Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package io.grpc.internal; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.base.MoreObjects; -import com.google.common.base.Preconditions; -import io.grpc.Metadata; -import io.grpc.Status; -import java.io.InputStream; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * The abstract base class for {@link ClientStream} implementations. - */ -public abstract class AbstractClientStream extends AbstractStream - implements ClientStream { - - private static final Logger log = Logger.getLogger(AbstractClientStream.class.getName()); - - private final StatsTraceContext statsTraceCtx; - private ClientStreamListener listener; - private boolean listenerClosed; - - // Stored status & trailers to report when deframer completes or - // transportReportStatus is directly called. - private Status status; - private Metadata trailers; - private Runnable closeListenerTask; - private volatile boolean cancelled; - - protected AbstractClientStream(WritableBufferAllocator bufferAllocator, int maxMessageSize, - StatsTraceContext statsTraceCtx) { - super(bufferAllocator, maxMessageSize, statsTraceCtx); - this.statsTraceCtx = checkNotNull(statsTraceCtx, "statsTraceCtx"); - } - - @Override - public void setMaxInboundMessageSize(int maxSize) { - setMaxInboundMessageSizeProtected(maxSize); - } - - @Override - public void setMaxOutboundMessageSize(int maxSize) { - setMaxOutboundMessageSizeProtected(maxSize); - } - - @Override - protected final ClientStreamListener listener() { - return listener; - } - - /** - * Indicates that the stream may proceed to do traffic. Sub classes should override and invoke - * this method after they have been created. - */ - @Override - public void start(ClientStreamListener listener) { - checkState(this.listener == null, "stream already started"); - this.listener = checkNotNull(listener, "listener"); - } - - @Override - protected void receiveMessage(InputStream is) { - if (!listenerClosed) { - checkState(listener != null, "stream not started"); - listener.messageRead(is); - } - } - - /** - * The transport implementation has detected a protocol error on the stream. Transports are - * responsible for properly closing streams when protocol errors occur. - * - * @param errorStatus the error to report - * @param metadata any metadata received - */ - protected void inboundTransportError(Status errorStatus, Metadata metadata) { - Preconditions.checkNotNull(metadata, "metadata"); - if (inboundPhase() == Phase.STATUS) { - log.log(Level.INFO, "Received transport error on closed stream {0} {1}", - new Object[]{id(), errorStatus}); - return; - } - // For transport errors we immediately report status to the application layer - // and do not wait for additional payloads. - transportReportStatus(errorStatus, false, metadata); - } - - /** - * Called by transport implementations when they receive headers. - * - * @param headers the parsed headers - */ - protected void inboundHeadersReceived(Metadata headers) { - checkState(listener != null, "stream not started"); - if (inboundPhase() == Phase.STATUS) { - log.log(Level.INFO, "Received headers on closed stream {0} {1}", - new Object[]{id(), headers}); - } - - inboundPhase(Phase.MESSAGE); - listener.headersRead(headers); - } - - /** - * Processes the contents of a received data frame from the server. - * - * @param frame the received data frame. Its ownership is transferred to this method. - */ - protected void inboundDataReceived(ReadableBuffer frame) { - Preconditions.checkNotNull(frame, "frame"); - boolean needToCloseFrame = true; - try { - if (inboundPhase() == Phase.STATUS) { - return; - } - if (inboundPhase() == Phase.HEADERS) { - // Have not received headers yet so error - inboundTransportError(Status.INTERNAL - .withDescription("headers not received before payload"), new Metadata()); - return; - } - inboundPhase(Phase.MESSAGE); - - needToCloseFrame = false; - deframe(frame, false); - } finally { - if (needToCloseFrame) { - frame.close(); - } - } - } - - @Override - protected void inboundDeliveryPaused() { - runCloseListenerTask(); - } - - @Override - protected final void deframeFailed(Throwable cause) { - cancel(Status.fromThrowable(cause)); - } - - /** - * Processes the trailers and status from the server. - * - * @param trailers the received trailers - * @param status the status extracted from the trailers - */ - protected void inboundTrailersReceived(Metadata trailers, Status status) { - Preconditions.checkNotNull(trailers, "trailers"); - if (inboundPhase() == Phase.STATUS) { - log.log(Level.INFO, "Received trailers on closed stream {0}\n {1}\n {2}", - new Object[]{id(), status, trailers}); - } - // Stash the status & trailers so they can be delivered by the deframer calls - // remoteEndClosed - this.status = status; - this.trailers = trailers; - deframe(ReadableBuffers.empty(), true); - } - - @Override - protected void remoteEndClosed() { - transportReportStatus(status, true, trailers); - } - - @Override - protected final void internalSendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { - Preconditions.checkArgument(frame != null || endOfStream, "null frame before EOS"); - sendFrame(frame, endOfStream, flush); - } - - /** - * Sends an outbound frame to the remote end point. - * - * @param frame a buffer containing the chunk of data to be sent or {@code null} if the framer is - * closing and has no data to send. - * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by - * this endpoint. - * @param flush {@code true} if more data may not be arriving soon - */ - protected abstract void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush); - - /** - * Report stream closure with status to the application layer if not already reported. This method - * must be called from the transport thread. - * - * @param newStatus the new status to set - * @param stopDelivery if {@code true}, interrupts any further delivery of inbound messages that - * may already be queued up in the deframer. If {@code false}, the listener will be - * notified immediately after all currently completed messages in the deframer have been - * delivered to the application. - * @param trailers new instance of {@code Trailers}, either empty or those returned by the server - */ - public void transportReportStatus(final Status newStatus, boolean stopDelivery, - final Metadata trailers) { - Preconditions.checkNotNull(newStatus, "newStatus"); - - boolean closingLater = closeListenerTask != null && !stopDelivery; - if (listenerClosed || closingLater) { - // We already closed (or are about to close) the listener. - return; - } - - inboundPhase(Phase.STATUS); - status = newStatus; - closeListenerTask = null; - - // Determine if the deframer is stalled (i.e. currently has no complete messages to deliver). - boolean deliveryStalled = isDeframerStalled(); - - if (stopDelivery || deliveryStalled) { - // Close the listener immediately. - closeListener(newStatus, trailers); - } else { - // Delay close until inboundDeliveryStalled() - closeListenerTask = newCloseListenerTask(newStatus, trailers); - } - } - - /** - * Creates a new {@link Runnable} to close the listener with the given status/trailers. - */ - private Runnable newCloseListenerTask(final Status status, final Metadata trailers) { - return new Runnable() { - @Override - public void run() { - closeListener(status, trailers); - } - }; - } - - /** - * Closes the listener if not previously closed. - * - * @throws IllegalStateException if the call has not yet been started. - */ - private void closeListener(Status newStatus, Metadata trailers) { - checkState(listener != null, "stream not started"); - if (!listenerClosed) { - listenerClosed = true; - closeDeframer(); - statsTraceCtx.streamClosed(newStatus); - listener.closed(newStatus, trailers); - } - } - - /** - * Executes the pending listener close task, if one exists. - */ - private void runCloseListenerTask() { - if (closeListenerTask != null) { - closeListenerTask.run(); - closeListenerTask = null; - } - } - - @Override - public final void halfClose() { - if (outboundPhase(Phase.STATUS) != Phase.STATUS) { - closeFramer(); - } - } - - /** - * Cancel the stream. Called by the application layer, never called by the transport. - */ - @Override - public final void cancel(Status reason) { - checkArgument(!reason.isOk(), "Should not cancel with OK status"); - cancelled = true; - sendCancel(reason); - dispose(); - } - - @Override - public final boolean isReady() { - return !cancelled && super.isReady(); - } - - /** - * Cancel the stream and send a stream cancellation message to the remote server, if necessary. - * Can be called by either the application or transport layers. This method is safe to be called - * at any time and multiple times. - */ - protected abstract void sendCancel(Status reason); - - @Override - protected MoreObjects.ToStringHelper toStringHelper() { - MoreObjects.ToStringHelper toStringHelper = super.toStringHelper(); - if (status != null) { - toStringHelper.add("status", status); - } - return toStringHelper; - } - - @Override - public boolean isClosed() { - return super.isClosed() || listenerClosed; - } -} diff --git a/core/src/main/java/io/grpc/internal/AbstractStream.java b/core/src/main/java/io/grpc/internal/AbstractStream.java deleted file mode 100644 index 84295e441d..0000000000 --- a/core/src/main/java/io/grpc/internal/AbstractStream.java +++ /dev/null @@ -1,448 +0,0 @@ -/* - * Copyright 2014, Google Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package io.grpc.internal; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.MoreObjects; -import io.grpc.Codec; -import io.grpc.Compressor; -import io.grpc.Decompressor; -import java.io.InputStream; -import javax.annotation.concurrent.GuardedBy; - -/** - * Abstract base class for {@link Stream} implementations. - */ -public abstract class AbstractStream implements Stream { - /** - * The default number of queued bytes for a given stream, below which - * {@link StreamListener#onReady()} will be called. - */ - public static final int DEFAULT_ONREADY_THRESHOLD = 32 * 1024; - - public static final int ABSENT_ID = -1; - - /** - * Indicates the phase of the GRPC stream in one direction. - */ - protected enum Phase { - HEADERS, MESSAGE, STATUS - } - - private final MessageFramer framer; - private final MessageDeframer deframer; - - /** - * Inbound phase is exclusively written to by the transport thread. - */ - private Phase inboundPhase = Phase.HEADERS; - - /** - * Outbound phase is exclusively written to by the application thread. - */ - private Phase outboundPhase = Phase.HEADERS; - - /** - * The number of queued bytes for a given stream, below which {@link StreamListener#onReady()} - * will be called. - */ - private int onReadyThreshold = DEFAULT_ONREADY_THRESHOLD; - - /** - * The number of bytes currently queued, waiting to be sent. When this falls below - * onReadyThreshold, {@link StreamListener#onReady()} will be called. - */ - private int numSentBytesQueued; - - /** - * Indicates the stream has been created on the connection. This implies that the stream is no - * longer limited by MAX_CONCURRENT_STREAMS. - */ - @GuardedBy("onReadyLock") - private boolean allocated; - - private final Object onReadyLock = new Object(); - - @VisibleForTesting - class FramerSink implements MessageFramer.Sink { - @Override - public void deliverFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { - internalSendFrame(frame, endOfStream, flush); - } - } - - @VisibleForTesting - class DeframerListener implements MessageDeframer.Listener { - @Override - public void bytesRead(int numBytes) { - returnProcessedBytes(numBytes); - } - - @Override - public void messageRead(InputStream input) { - receiveMessage(input); - } - - @Override - public void deliveryStalled() { - inboundDeliveryPaused(); - } - - @Override - public void endOfStream() { - remoteEndClosed(); - } - } - - AbstractStream(WritableBufferAllocator bufferAllocator, int maxMessageSize, - StatsTraceContext statsTraceCtx) { - framer = new MessageFramer(new FramerSink(), bufferAllocator, statsTraceCtx); - deframer = new MessageDeframer(new DeframerListener(), Codec.Identity.NONE, maxMessageSize, - statsTraceCtx, getClass().getName()); - } - - protected final void setMaxInboundMessageSizeProtected(int maxSize) { - deframer.setMaxInboundMessageSize(maxSize); - } - - protected final void setMaxOutboundMessageSizeProtected(int maxSize) { - framer.setMaxOutboundMessageSize(maxSize); - } - - @VisibleForTesting - AbstractStream(MessageFramer framer, MessageDeframer deframer) { - this.framer = framer; - this.deframer = deframer; - } - - /** - * Override this method to provide a stream listener. - */ - protected abstract StreamListener listener(); - - /** - * Returns the internal ID for this stream. Note that ID can be {@link #ABSENT_ID} for client - * streams as the transport may defer creating the stream to the remote side until it has a - * payload or metadata to send. - */ - public abstract int id(); - - /** - * The number of queued bytes for a given stream, below which {@link StreamListener#onReady()} - * will be called. Defaults to {@link #DEFAULT_ONREADY_THRESHOLD}. - */ - public int getOnReadyThreshold() { - synchronized (onReadyLock) { - return onReadyThreshold; - } - } - - @Override - public void writeMessage(InputStream message) { - checkNotNull(message, "message"); - outboundPhase(Phase.MESSAGE); - if (!framer.isClosed()) { - framer.writePayload(message); - } - } - - @Override - public final void setMessageCompression(boolean enable) { - framer.setMessageCompression(enable); - } - - @Override - public final void flush() { - if (!framer.isClosed()) { - framer.flush(); - } - } - - @Override - public boolean isReady() { - if (listener() != null && outboundPhase() != Phase.STATUS) { - synchronized (onReadyLock) { - return allocated && numSentBytesQueued < onReadyThreshold; - } - } - return false; - } - - /** - * Closes the underlying framer. - * - *

No-op if the framer has already been closed. - */ - final void closeFramer() { - if (!framer.isClosed()) { - framer.close(); - } - } - - /** - * Frees any resources associated with this stream. Subclass implementations must call this - * version. - * - *

NOTE: Can be called by both the transport thread and the application thread. Transport - * threads need to dispose when the remote side has terminated the stream. Application threads - * will dispose when the application decides to close the stream as part of normal processing. - */ - public void dispose() { - framer.dispose(); - } - - /** - * Sends an outbound frame to the remote end point. - * - * @param frame a buffer containing the chunk of data to be sent. - * @param endOfStream if {@code true} indicates that no more data will be sent on the stream by - * this endpoint. - * @param flush {@code true} if more data may not be arriving soon - */ - protected abstract void internalSendFrame(WritableBuffer frame, boolean endOfStream, - boolean flush); - - /** - * Handles a message that was just deframed. - * - * @param is the stream containing the message - */ - protected abstract void receiveMessage(InputStream is); - - /** - * Handles the event that the deframer has no pending deliveries. - */ - protected abstract void inboundDeliveryPaused(); - - /** - * Handles the event that the deframer has reached end of stream. - */ - protected abstract void remoteEndClosed(); - - /** - * Returns the given number of processed bytes back to inbound flow control to enable receipt of - * more data. - */ - protected abstract void returnProcessedBytes(int processedBytes); - - /** - * Called when a {@link #deframe(ReadableBuffer, boolean)} operation failed. - * - * @param cause the actual failure - */ - protected abstract void deframeFailed(Throwable cause); - - /** - * Closes this deframer and frees any resources. After this method is called, additional calls - * will have no effect. - */ - protected final void closeDeframer() { - deframer.close(); - } - - /** - * Called to parse a received frame and attempt delivery of any completed - * messages. Must be called from the transport thread. - */ - protected final void deframe(ReadableBuffer frame, boolean endOfStream) { - try { - deframer.deframe(frame, endOfStream); - } catch (Throwable t) { - deframeFailed(t); - } - } - - /** - * Indicates whether delivery is currently stalled, pending receipt of more data. - */ - protected final boolean isDeframerStalled() { - return deframer.isStalled(); - } - - /** - * Called to request the given number of messages from the deframer. Must be called - * from the transport thread. - */ - protected final void requestMessagesFromDeframer(int numMessages) { - try { - deframer.request(numMessages); - } catch (Throwable t) { - deframeFailed(t); - } - } - - @Override - public final void setCompressor(Compressor compressor) { - framer.setCompressor(checkNotNull(compressor, "compressor")); - } - - @Override - public final void setDecompressor(Decompressor decompressor) { - deframer.setDecompressor(checkNotNull(decompressor, "decompressor")); - } - - /** - * Event handler to be called by the subclass when the stream's headers have passed any connection - * flow control (i.e., MAX_CONCURRENT_STREAMS). It may call the listener's {@link - * StreamListener#onReady()} handler if appropriate. This must be called from the transport - * thread, since the listener may be called back directly. - */ - protected final void onStreamAllocated() { - checkState(listener() != null); - synchronized (onReadyLock) { - checkState(!allocated, "Already allocated"); - allocated = true; - } - notifyIfReady(); - } - - /** - * Event handler to be called by the subclass when a number of bytes are being queued for sending - * to the remote endpoint. - * - * @param numBytes the number of bytes being sent. - */ - protected final void onSendingBytes(int numBytes) { - synchronized (onReadyLock) { - numSentBytesQueued += numBytes; - } - } - - /** - * Event handler to be called by the subclass when a number of bytes has been sent to the remote - * endpoint. May call back the listener's {@link StreamListener#onReady()} handler if appropriate. - * This must be called from the transport thread, since the listener may be called back directly. - * - * @param numBytes the number of bytes that were sent. - */ - protected final void onSentBytes(int numBytes) { - boolean doNotify; - synchronized (onReadyLock) { - boolean belowThresholdBefore = numSentBytesQueued < onReadyThreshold; - numSentBytesQueued -= numBytes; - boolean belowThresholdAfter = numSentBytesQueued < onReadyThreshold; - doNotify = !belowThresholdBefore && belowThresholdAfter; - } - if (doNotify) { - notifyIfReady(); - } - } - - @VisibleForTesting - final void notifyIfReady() { - boolean doNotify = false; - synchronized (onReadyLock) { - doNotify = isReady(); - } - if (doNotify) { - listener().onReady(); - } - } - - final Phase inboundPhase() { - return inboundPhase; - } - - /** - * Transitions the inbound phase to the given phase and returns the previous phase. - * - * @throws IllegalStateException if the transition is disallowed - */ - final Phase inboundPhase(Phase nextPhase) { - Phase tmp = inboundPhase; - inboundPhase = verifyNextPhase(inboundPhase, nextPhase); - return tmp; - } - - final Phase outboundPhase() { - return outboundPhase; - } - - /** - * Transitions the outbound phase to the given phase and returns the previous phase. - * - * @throws IllegalStateException if the transition is disallowed - */ - final Phase outboundPhase(Phase nextPhase) { - Phase tmp = outboundPhase; - outboundPhase = verifyNextPhase(outboundPhase, nextPhase); - return tmp; - } - - @VisibleForTesting - Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) { - if (nextPhase.ordinal() < currentPhase.ordinal()) { - throw new IllegalStateException( - String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase)); - } - return nextPhase; - } - - /** - * Returns {@code true} if the stream can receive data from its remote peer. - */ - public boolean canReceive() { - return inboundPhase() != Phase.STATUS; - } - - /** - * Returns {@code true} if the stream can send data to its remote peer. - */ - public boolean canSend() { - return outboundPhase() != Phase.STATUS; - } - - /** - * Whether the stream is fully closed. Note that this method is not thread-safe as {@code - * inboundPhase} and {@code outboundPhase} are mutated in different threads. Tests must account - * for thread coordination when calling. - */ - @VisibleForTesting - public boolean isClosed() { - return inboundPhase() == Phase.STATUS && outboundPhase() == Phase.STATUS; - } - - @Override - public String toString() { - return toStringHelper().toString(); - } - - protected MoreObjects.ToStringHelper toStringHelper() { - return MoreObjects.toStringHelper(this) - .add("id", id()) - .add("inboundPhase", inboundPhase().name()) - .add("outboundPhase", outboundPhase().name()); - } -} diff --git a/core/src/main/java/io/grpc/internal/AbstractStream2.java b/core/src/main/java/io/grpc/internal/AbstractStream2.java index d50a027962..df50d1d7cb 100644 --- a/core/src/main/java/io/grpc/internal/AbstractStream2.java +++ b/core/src/main/java/io/grpc/internal/AbstractStream2.java @@ -34,6 +34,7 @@ package io.grpc.internal; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import com.google.common.annotations.VisibleForTesting; import io.grpc.Codec; import io.grpc.Compressor; import io.grpc.Decompressor; @@ -119,7 +120,8 @@ public abstract class AbstractStream2 implements Stream { * The default number of queued bytes for a given stream, below which * {@link StreamListener#onReady()} will be called. */ - private static final int DEFAULT_ONREADY_THRESHOLD = 32 * 1024; + @VisibleForTesting + public static final int DEFAULT_ONREADY_THRESHOLD = 32 * 1024; private final MessageDeframer deframer; private final Object onReadyLock = new Object(); diff --git a/core/src/main/java/io/grpc/internal/Http2ClientStream.java b/core/src/main/java/io/grpc/internal/Http2ClientStream.java deleted file mode 100644 index e68dbdf3f9..0000000000 --- a/core/src/main/java/io/grpc/internal/Http2ClientStream.java +++ /dev/null @@ -1,255 +0,0 @@ -/* - * Copyright 2014, Google Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package io.grpc.internal; - -import com.google.common.base.Charsets; -import com.google.common.base.Preconditions; -import io.grpc.InternalMetadata; -import io.grpc.Metadata; -import io.grpc.Status; -import java.nio.charset.Charset; -import javax.annotation.Nullable; - -/** - * Base implementation for client streams using HTTP2 as the transport. - */ -public abstract class Http2ClientStream extends AbstractClientStream { - - /** - * Metadata marshaller for HTTP status lines. - */ - private static final InternalMetadata.TrustedAsciiMarshaller HTTP_STATUS_MARSHALLER = - new InternalMetadata.TrustedAsciiMarshaller() { - @Override - public byte[] toAsciiString(Integer value) { - throw new UnsupportedOperationException(); - } - - /** - * RFC 7231 says status codes are 3 digits long. - * - * @see: RFC 7231 - */ - @Override - public Integer parseAsciiString(byte[] serialized) { - if (serialized.length >= 3) { - return (serialized[0] - '0') * 100 + (serialized[1] - '0') * 10 + (serialized[2] - '0'); - } - throw new NumberFormatException( - "Malformed status code " + new String(serialized, InternalMetadata.US_ASCII)); - } - }; - - private static final Metadata.Key HTTP2_STATUS = InternalMetadata.keyOf(":status", - HTTP_STATUS_MARSHALLER); - - /** When non-{@code null}, {@link #transportErrorMetadata} must also be non-{@code null}. */ - private Status transportError; - private Metadata transportErrorMetadata; - private Charset errorCharset = Charsets.UTF_8; - private boolean headersReceived; - - protected Http2ClientStream(WritableBufferAllocator bufferAllocator, int maxMessageSize, - StatsTraceContext statsTraceCtx) { - super(bufferAllocator, maxMessageSize, statsTraceCtx); - } - - /** - * Called by subclasses whenever {@code Headers} are received from the transport. - * - * @param headers the received headers - */ - protected void transportHeadersReceived(Metadata headers) { - Preconditions.checkNotNull(headers, "headers"); - if (transportError != null) { - // Already received a transport error so just augment it. Something is really, really strange. - transportError = transportError.augmentDescription("headers: " + headers); - return; - } - try { - if (headersReceived) { - transportError = Status.INTERNAL.withDescription("Received headers twice"); - return; - } - Integer httpStatus = headers.get(HTTP2_STATUS); - if (httpStatus != null && httpStatus >= 100 && httpStatus < 200) { - // Ignore the headers. See RFC 7540 ยง8.1 - return; - } - headersReceived = true; - - transportError = validateInitialMetadata(headers); - if (transportError != null) { - return; - } - - stripTransportDetails(headers); - inboundHeadersReceived(headers); - } finally { - if (transportError != null) { - // Note we don't immediately report the transport error, instead we wait for more data on - // the stream so we can accumulate more detail into the error before reporting it. - transportError = transportError.augmentDescription("headers: " + headers); - transportErrorMetadata = headers; - errorCharset = extractCharset(headers); - } - } - } - - /** - * Called by subclasses whenever a data frame is received from the transport. - * - * @param frame the received data frame - * @param endOfStream {@code true} if there will be no more data received for this stream - */ - protected void transportDataReceived(ReadableBuffer frame, boolean endOfStream) { - if (transportError == null && inboundPhase() == Phase.HEADERS) { - // Must receive headers prior to receiving any payload as we use headers to check for - // protocol correctness. - transportError = Status.INTERNAL.withDescription("no headers received prior to data"); - transportErrorMetadata = new Metadata(); - } - if (transportError != null) { - // We've already detected a transport error and now we're just accumulating more detail - // for it. - transportError = transportError.augmentDescription("DATA-----------------------------\n" - + ReadableBuffers.readAsString(frame, errorCharset)); - frame.close(); - if (transportError.getDescription().length() > 1000 || endOfStream) { - inboundTransportError(transportError, transportErrorMetadata); - // We have enough error detail so lets cancel. - sendCancel(Status.CANCELLED); - } - } else { - inboundDataReceived(frame); - if (endOfStream) { - // This is a protocol violation as we expect to receive trailers. - transportError = - Status.INTERNAL.withDescription("Received unexpected EOS on DATA frame from server."); - transportErrorMetadata = new Metadata(); - inboundTransportError(transportError, transportErrorMetadata); - } - } - } - - /** - * Called by subclasses for the terminal trailer metadata on a stream. - * - * @param trailers the received terminal trailer metadata - */ - protected void transportTrailersReceived(Metadata trailers) { - Preconditions.checkNotNull(trailers, "trailers"); - if (transportError == null && !headersReceived) { - transportError = validateInitialMetadata(trailers); - if (transportError != null) { - transportErrorMetadata = trailers; - } - } - if (transportError != null) { - transportError = transportError.augmentDescription("trailers: " + trailers); - inboundTransportError(transportError, transportErrorMetadata); - sendCancel(Status.CANCELLED); - } else { - Status status = statusFromTrailers(trailers); - stripTransportDetails(trailers); - inboundTrailersReceived(trailers, status); - } - } - - /** - * Extract the response status from trailers. - */ - private Status statusFromTrailers(Metadata trailers) { - Status status = trailers.get(Status.CODE_KEY); - if (status != null) { - return status.withDescription(trailers.get(Status.MESSAGE_KEY)); - } - // No status; something is broken. Try to provide a resonanable error. - if (headersReceived) { - return Status.UNKNOWN.withDescription("missing GRPC status in response"); - } - Integer httpStatus = trailers.get(HTTP2_STATUS); - if (httpStatus != null) { - status = GrpcUtil.httpStatusToGrpcStatus(httpStatus); - } else { - status = Status.INTERNAL.withDescription("missing HTTP status code"); - } - return status.augmentDescription( - "missing GRPC status, inferred error from HTTP status code"); - } - - /** - * Inspect initial headers to make sure they conform to HTTP and gRPC, returning a {@code Status} - * on failure. - * - * @return status with description of failure, or {@code null} when valid - */ - @Nullable - private Status validateInitialMetadata(Metadata headers) { - Integer httpStatus = headers.get(HTTP2_STATUS); - if (httpStatus == null) { - return Status.INTERNAL.withDescription("Missing HTTP status code"); - } - String contentType = headers.get(GrpcUtil.CONTENT_TYPE_KEY); - if (!GrpcUtil.isGrpcContentType(contentType)) { - return GrpcUtil.httpStatusToGrpcStatus(httpStatus) - .augmentDescription("invalid content-type: " + contentType); - } - return null; - } - - /** - * Inspect the raw metadata and figure out what charset is being used. - */ - private static Charset extractCharset(Metadata headers) { - String contentType = headers.get(GrpcUtil.CONTENT_TYPE_KEY); - if (contentType != null) { - String[] split = contentType.split("charset="); - try { - return Charset.forName(split[split.length - 1].trim()); - } catch (Exception t) { - // Ignore and assume UTF-8 - } - } - return Charsets.UTF_8; - } - - /** - * Strip HTTP transport implementation details so they don't leak via metadata into - * the application layer. - */ - private static void stripTransportDetails(Metadata metadata) { - metadata.discardAll(HTTP2_STATUS); - metadata.discardAll(Status.CODE_KEY); - metadata.discardAll(Status.MESSAGE_KEY); - } -} diff --git a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java deleted file mode 100644 index 0e012ee470..0000000000 --- a/core/src/test/java/io/grpc/internal/AbstractClientStreamTest.java +++ /dev/null @@ -1,288 +0,0 @@ -/* - * Copyright 2015, Google Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package io.grpc.internal; - -import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.verify; - -import io.grpc.Attributes; -import io.grpc.Codec; -import io.grpc.Metadata; -import io.grpc.Status; -import io.grpc.Status.Code; -import io.grpc.internal.AbstractStream.Phase; -import io.grpc.internal.MessageFramerTest.ByteWritableBuffer; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -/** - * Test for {@link AbstractClientStream}. This class tries to test functionality in - * AbstractClientStream, but not in any super classes. - */ -@RunWith(JUnit4.class) -public class AbstractClientStreamTest { - - @Rule public final ExpectedException thrown = ExpectedException.none(); - - @Mock private ClientStreamListener mockListener; - @Captor private ArgumentCaptor statusCaptor; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - } - - private final WritableBufferAllocator allocator = new WritableBufferAllocator() { - @Override - public WritableBuffer allocate(int capacityHint) { - return new ByteWritableBuffer(capacityHint); - } - }; - - @Test - public void cancel_doNotAcceptOk() { - for (Code code : Code.values()) { - ClientStreamListener listener = new NoopClientStreamListener(); - AbstractClientStream stream = new BaseAbstractClientStream(allocator); - stream.start(listener); - if (code != Code.OK) { - stream.cancel(Status.fromCodeValue(code.value())); - } else { - try { - stream.cancel(Status.fromCodeValue(code.value())); - fail(); - } catch (IllegalArgumentException e) { - // ignore - } - } - } - } - - @Test - public void cancel_failsOnNull() { - ClientStreamListener listener = new NoopClientStreamListener(); - AbstractClientStream stream = new BaseAbstractClientStream(allocator); - stream.start(listener); - thrown.expect(NullPointerException.class); - - stream.cancel(null); - } - - @Test - public void cancel_notifiesOnlyOnce() { - AbstractClientStream stream = new BaseAbstractClientStream(allocator) { - @Override - protected void sendCancel(Status errorStatus) { - transportReportStatus(errorStatus, true/*stop delivery*/, new Metadata()); - } - }; - stream.start(mockListener); - - stream.cancel(Status.DEADLINE_EXCEEDED); - - verify(mockListener).closed(isA(Status.class), isA(Metadata.class)); - } - - @Test - public void startFailsOnNullListener() { - AbstractClientStream stream = new BaseAbstractClientStream(allocator); - - thrown.expect(NullPointerException.class); - - stream.start(null); - } - - @Test - public void cantCallStartTwice() { - AbstractClientStream stream = new BaseAbstractClientStream(allocator); - stream.start(mockListener); - thrown.expect(IllegalStateException.class); - - stream.start(mockListener); - } - - @Test - public void deframeFailed_notifiesListener() { - AbstractClientStream stream = new BaseAbstractClientStream(allocator) { - @Override - protected void sendCancel(Status errorStatus) { - transportReportStatus(errorStatus, true/*stop delivery*/, new Metadata()); - } - }; - stream.start(mockListener); - - stream.deframeFailed(new RuntimeException("something bad")); - - verify(mockListener).closed(statusCaptor.capture(), isA(Metadata.class)); - assertEquals(Code.UNKNOWN, statusCaptor.getValue().getCode()); - } - - @Test - public void inboundDataReceived_failsOnNullFrame() { - ClientStreamListener listener = new NoopClientStreamListener(); - AbstractClientStream stream = new BaseAbstractClientStream(allocator); - stream.start(listener); - thrown.expect(NullPointerException.class); - - stream.inboundDataReceived(null); - } - - @Test - public void inboundDataReceived_failsOnNoHeaders() { - AbstractClientStream stream = new BaseAbstractClientStream(allocator); - stream.start(mockListener); - stream.inboundPhase(Phase.HEADERS); - - stream.inboundDataReceived(ReadableBuffers.empty()); - - verify(mockListener).closed(statusCaptor.capture(), isA(Metadata.class)); - assertEquals(Code.INTERNAL, statusCaptor.getValue().getCode()); - } - - @Test - public void inboundHeadersReceived_notifiesListener() { - AbstractClientStream stream = new BaseAbstractClientStream(allocator); - stream.start(mockListener); - Metadata headers = new Metadata(); - - stream.inboundHeadersReceived(headers); - verify(mockListener).headersRead(headers); - } - - @Test - public void inboundHeadersReceived_failsOnPhaseStatus() { - AbstractClientStream stream = new BaseAbstractClientStream(allocator); - stream.start(mockListener); - Metadata headers = new Metadata(); - stream.inboundPhase(Phase.STATUS); - - thrown.expect(IllegalStateException.class); - - stream.inboundHeadersReceived(headers); - } - - @Test - public void inboundHeadersReceived_succeedsOnPhaseMessage() { - AbstractClientStream stream = new BaseAbstractClientStream(allocator); - stream.start(mockListener); - Metadata headers = new Metadata(); - stream.inboundPhase(Phase.MESSAGE); - - stream.inboundHeadersReceived(headers); - - verify(mockListener).headersRead(headers); - } - - @Test - public void inboundHeadersReceived_acceptsGzipEncoding() { - AbstractClientStream stream = new BaseAbstractClientStream(allocator); - stream.start(mockListener); - Metadata headers = new Metadata(); - headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, new Codec.Gzip().getMessageEncoding()); - - stream.inboundHeadersReceived(headers); - verify(mockListener).headersRead(headers); - } - - @Test - public void inboundHeadersReceived_acceptsIdentityEncoding() { - AbstractClientStream stream = new BaseAbstractClientStream(allocator); - stream.start(mockListener); - Metadata headers = new Metadata(); - headers.put(GrpcUtil.MESSAGE_ENCODING_KEY, Codec.Identity.NONE.getMessageEncoding()); - - stream.inboundHeadersReceived(headers); - verify(mockListener).headersRead(headers); - } - - @Test - public void rstStreamClosesStream() { - AbstractClientStream stream = new BaseAbstractClientStream(allocator); - stream.start(mockListener); - // The application will call request when waiting for a message, which will in turn call this - // on the transport thread. - stream.requestMessagesFromDeframer(1); - // Send first byte of 2 byte message - stream.deframe(ReadableBuffers.wrap(new byte[] {0, 0, 0, 0, 2, 1}), false); - Status status = Status.INTERNAL; - // Simulate getting a reset - stream.transportReportStatus(status, false /*stop delivery*/, new Metadata()); - - assertTrue(stream.isClosed()); - } - - /** - * No-op base class for testing. - */ - private static class BaseAbstractClientStream extends AbstractClientStream { - protected BaseAbstractClientStream(WritableBufferAllocator allocator) { - super(allocator, DEFAULT_MAX_MESSAGE_SIZE, StatsTraceContext.NOOP); - } - - @Override - public void setAuthority(String authority) {} - - @Override - public Attributes getAttributes() { - return Attributes.EMPTY; - } - - @Override - public void request(int numMessages) {} - - @Override - protected void sendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {} - - @Override - protected void sendCancel(Status reason) {} - - @Override - public int id() { - return ABSENT_ID; - } - - @Override - protected void returnProcessedBytes(int processedBytes) {} - } -} diff --git a/core/src/test/java/io/grpc/internal/AbstractStreamTest.java b/core/src/test/java/io/grpc/internal/AbstractStreamTest.java deleted file mode 100644 index 446ede1af0..0000000000 --- a/core/src/test/java/io/grpc/internal/AbstractStreamTest.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Copyright 2015, Google Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - - -package io.grpc.internal; - -import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.verify; - -import com.google.common.collect.ImmutableMultimap; -import com.google.common.collect.Multimap; -import io.grpc.internal.AbstractStream.Phase; -import io.grpc.internal.MessageFramerTest.ByteWritableBuffer; -import java.io.InputStream; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -@RunWith(JUnit4.class) -public class AbstractStreamTest { - @Mock private StreamListener streamListener; - - @Mock MessageFramer framer; - @Mock MessageDeframer deframer; - - private final WritableBufferAllocator allocator = new WritableBufferAllocator() { - @Override - public WritableBuffer allocate(int capacityHint) { - return new ByteWritableBuffer(capacityHint); - } - }; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - } - - @Test - public void onStreamAllocated_shouldNotifyReady() { - AbstractStream stream = new AbstractStreamBase(null); - - stream.onStreamAllocated(); - - verify(streamListener).onReady(); - } - - @Test - public void setMessageCompression() { - AbstractStream as = new AbstractStreamBase(framer, deframer); - as.setMessageCompression(true); - - verify(framer).setMessageCompression(true); - } - - @Test - public void validPhaseTransitions() { - AbstractStream stream = new AbstractStreamBase(null); - Multimap validTransitions = ImmutableMultimap.builder() - .put(Phase.HEADERS, Phase.HEADERS) - .put(Phase.HEADERS, Phase.MESSAGE) - .put(Phase.HEADERS, Phase.STATUS) - .put(Phase.MESSAGE, Phase.MESSAGE) - .put(Phase.MESSAGE, Phase.STATUS) - .put(Phase.STATUS, Phase.STATUS) - .build(); - - for (Phase startPhase : Phase.values()) { - for (Phase endPhase : Phase.values()) { - if (validTransitions.containsEntry(startPhase, endPhase)) { - stream.verifyNextPhase(startPhase, endPhase); - } else { - try { - stream.verifyNextPhase(startPhase, endPhase); - fail(); - } catch (IllegalStateException expected) { - // continue - } - } - } - } - } - - /** - * Base class for testing. - */ - private class AbstractStreamBase extends AbstractStream { - private AbstractStreamBase(WritableBufferAllocator bufferAllocator) { - super(allocator, DEFAULT_MAX_MESSAGE_SIZE, StatsTraceContext.NOOP); - } - - private AbstractStreamBase(MessageFramer framer, MessageDeframer deframer) { - super(framer, deframer); - } - - @Override - public void request(int numMessages) { - throw new UnsupportedOperationException(); - } - - @Override - public int id() { - return ABSENT_ID; - } - - @Override - protected StreamListener listener() { - return streamListener; - } - - @Override - protected void internalSendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { - throw new UnsupportedOperationException(); - } - - @Override - protected void receiveMessage(InputStream is) { - throw new UnsupportedOperationException(); - } - - @Override - protected void inboundDeliveryPaused() { - throw new UnsupportedOperationException(); - } - - @Override - protected void remoteEndClosed() { - throw new UnsupportedOperationException(); - } - - @Override - protected void returnProcessedBytes(int processedBytes) { - throw new UnsupportedOperationException(); - } - - @Override - protected void deframeFailed(Throwable cause) { - throw new UnsupportedOperationException(); - } - } -} - diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index d9f08e0c03..336621266d 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -69,7 +69,7 @@ import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusException; -import io.grpc.internal.AbstractStream; +import io.grpc.internal.AbstractStream2; import io.grpc.internal.ClientStreamListener; import io.grpc.internal.ClientTransport; import io.grpc.internal.GrpcUtil; @@ -1121,7 +1121,8 @@ public class OkHttpClientTransportTest { public void notifyOnReady() throws Exception { initTransport(); // exactly one byte below the threshold - int messageLength = AbstractStream.DEFAULT_ONREADY_THRESHOLD - HEADER_LENGTH - 1; + int messageLength = + AbstractStream2.TransportState.DEFAULT_ONREADY_THRESHOLD - HEADER_LENGTH - 1; setInitialWindowSize(0); MockStreamListener listener = new MockStreamListener(); OkHttpClientStream stream = clientTransport.newStream(method, new Metadata());