diff --git a/core/src/main/java/io/grpc/transport/AbstractServerStream.java b/core/src/main/java/io/grpc/transport/AbstractServerStream.java index 5a050712f0..b6252a7e99 100644 --- a/core/src/main/java/io/grpc/transport/AbstractServerStream.java +++ b/core/src/main/java/io/grpc/transport/AbstractServerStream.java @@ -31,6 +31,8 @@ package io.grpc.transport; +import static com.google.common.base.Preconditions.checkNotNull; + import com.google.common.base.Preconditions; import io.grpc.Metadata; @@ -68,7 +70,7 @@ public abstract class AbstractServerStream extends AbstractStream * thread. */ public final void setListener(ServerStreamListener listener) { - this.listener = Preconditions.checkNotNull(listener, "listener"); + this.listener = checkNotNull(listener); // Now that the stream has actually been initialized, call the listener's onReady callback if // appropriate. @@ -77,7 +79,7 @@ public abstract class AbstractServerStream extends AbstractStream @Override protected ServerStreamListener listener() { - return this.listener; + return listener; } @Override diff --git a/core/src/main/java/io/grpc/transport/AbstractStream.java b/core/src/main/java/io/grpc/transport/AbstractStream.java index 70f0fe7610..b0c29cdabb 100644 --- a/core/src/main/java/io/grpc/transport/AbstractStream.java +++ b/core/src/main/java/io/grpc/transport/AbstractStream.java @@ -31,11 +31,11 @@ package io.grpc.transport; -import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; -import com.google.common.base.Preconditions; import java.io.InputStream; @@ -44,6 +44,8 @@ import javax.annotation.concurrent.GuardedBy; /** * Abstract base class for {@link Stream} implementations. + * + * @param type of the unique identifier of this stream. */ public abstract class AbstractStream implements Stream { /** @@ -84,12 +86,6 @@ public abstract class AbstractStream implements Stream { */ private int numSentBytesQueued; - /** - * Indicates whether the listener is currently eligible for notification of - * {@link StreamListener#onReady()}. - */ - @GuardedBy("onReadyLock") - private boolean shouldNotifyOnReady = true; /** * Indicates the stream has been created on the connection. This implies that the stream is no * longer limited by MAX_CONCURRENT_STREAMS. @@ -132,6 +128,11 @@ public abstract class AbstractStream implements Stream { deframer = new MessageDeframer(inboundMessageHandler); } + /** + * Override this method to provide a stream listener. + */ + protected abstract StreamListener listener(); + /** * Returns the internal ID for this stream. Note that ID can be {@code null} for client streams * as the transport may defer creating the stream to the remote side until it has a payload or @@ -145,34 +146,14 @@ public abstract class AbstractStream implements Stream { * will be called. Defaults to {@link #DEFAULT_ONREADY_THRESHOLD}. */ public int getOnReadyThreshold() { - return onReadyThreshold; - } - - /** - * Sets the number of queued bytes for a given stream, below which - * {@link StreamListener#onReady()} will be called. If not called, defaults to - * {@link #DEFAULT_ONREADY_THRESHOLD}. - * - *

This must be called from the transport thread, since a listener may be called back directly. - */ - public void setOnReadyThreshold(int onReadyThreshold) { - checkArgument(onReadyThreshold > 0, "onReadyThreshold must be > 0"); - boolean doNotify; synchronized (onReadyLock) { - if (this.onReadyThreshold <= numSentBytesQueued && onReadyThreshold > numSentBytesQueued) { - shouldNotifyOnReady = true; - } - this.onReadyThreshold = onReadyThreshold; - doNotify = needToNotifyOnReady(); - } - if (doNotify) { - listener().onReady(); + return onReadyThreshold; } } @Override public void writeMessage(InputStream message) { - Preconditions.checkNotNull(message, "message"); + checkNotNull(message); outboundPhase(Phase.MESSAGE); if (!framer.isClosed()) { framer.writePayload(message); @@ -190,9 +171,7 @@ public abstract class AbstractStream implements Stream { public final boolean isReady() { if (listener() != null && outboundPhase() != Phase.STATUS) { synchronized (onReadyLock) { - if (allocated && numSentBytesQueued < onReadyThreshold) { - return true; - } + return allocated && numSentBytesQueued < onReadyThreshold; } } return false; @@ -221,11 +200,6 @@ public abstract class AbstractStream implements Stream { framer.dispose(); } - /** - * Gets the listener to this stream. - */ - protected abstract StreamListener listener(); - /** * Sends an outbound frame to the remote end point. * @@ -312,18 +286,13 @@ public abstract class AbstractStream implements Stream { * StreamListener#onReady()} handler if appropriate. This must be called from the transport * thread, since the listener may be called back directly. */ - protected void onStreamAllocated() { - boolean doNotify; + protected final void onStreamAllocated() { + checkState(listener() != null); synchronized (onReadyLock) { - if (allocated) { - throw new IllegalStateException("Already allocated"); - } + checkState(!allocated, "Already allocated"); allocated = true; - doNotify = needToNotifyOnReady(); - } - if (doNotify) { - listener().onReady(); } + notifyIfReady(); } /** @@ -335,9 +304,6 @@ public abstract class AbstractStream implements Stream { protected final void onSendingBytes(int numBytes) { synchronized (onReadyLock) { numSentBytesQueued += numBytes; - if (!isReady()) { - shouldNotifyOnReady = true; - } } } @@ -351,30 +317,27 @@ public abstract class AbstractStream implements Stream { protected final void onSentBytes(int numBytes) { boolean doNotify; synchronized (onReadyLock) { + boolean belowThresholdBefore = numSentBytesQueued < onReadyThreshold; numSentBytesQueued -= numBytes; - doNotify = needToNotifyOnReady(); + boolean belowThresholdAfter = numSentBytesQueued < onReadyThreshold; + doNotify = !belowThresholdBefore && belowThresholdAfter; + } + if (doNotify) { + notifyIfReady(); + } + } + + @VisibleForTesting + final void notifyIfReady() { + boolean doNotify = false; + synchronized (onReadyLock) { + doNotify = isReady(); } if (doNotify) { listener().onReady(); } } - /** - * Determines whether or not we need to call the {@link StreamListener#onReady()} handler now. - * Calling this method has the side-effect of unsetting {@link #shouldNotifyOnReady} so the - * handler should always be invoked immediately after calling this method. - */ - @GuardedBy("onReadyLock") - private boolean needToNotifyOnReady() { - if (shouldNotifyOnReady && isReady()) { - // Returning true here counts as a call to the onReady callback, so - // unset the flag. - shouldNotifyOnReady = false; - return true; - } - return false; - } - final Phase inboundPhase() { return inboundPhase; } @@ -405,7 +368,8 @@ public abstract class AbstractStream implements Stream { return tmp; } - private Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) { + @VisibleForTesting + Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) { if (nextPhase.ordinal() < currentPhase.ordinal()) { throw new IllegalStateException( String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase)); @@ -449,6 +413,5 @@ public abstract class AbstractStream implements Stream { .add("id", id()) .add("inboundPhase", inboundPhase().name()) .add("outboundPhase", outboundPhase().name()); - } } diff --git a/core/src/test/java/io/grpc/transport/AbstractStreamTest.java b/core/src/test/java/io/grpc/transport/AbstractStreamTest.java new file mode 100644 index 0000000000..7f5b121e05 --- /dev/null +++ b/core/src/test/java/io/grpc/transport/AbstractStreamTest.java @@ -0,0 +1,155 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + + +package io.grpc.transport; + +import static org.junit.Assert.fail; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.Multimap; + +import io.grpc.transport.AbstractStream.Phase; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.io.InputStream; + +import javax.annotation.Nullable; + +@RunWith(JUnit4.class) +public class AbstractStreamTest { + @Mock private StreamListener streamListener; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void onStreamAllocated_shouldNotifyReady() { + AbstractStream stream = new AbstractStreamBase(null); + + stream.onStreamAllocated(); + + verify(streamListener).onReady(); + } + + @Test + public void validPhaseTransitions() { + AbstractStream stream = new AbstractStreamBase(null); + Multimap validTransitions = ImmutableMultimap.builder() + .put(Phase.HEADERS, Phase.HEADERS) + .put(Phase.HEADERS, Phase.MESSAGE) + .put(Phase.HEADERS, Phase.STATUS) + .put(Phase.MESSAGE, Phase.MESSAGE) + .put(Phase.MESSAGE, Phase.STATUS) + .put(Phase.STATUS, Phase.STATUS) + .build(); + + for (Phase startPhase : Phase.values()) { + for (Phase endPhase : Phase.values()) { + if (validTransitions.containsEntry(startPhase, endPhase)) { + stream.verifyNextPhase(startPhase, endPhase); + } else { + try { + stream.verifyNextPhase(startPhase, endPhase); + fail(); + } catch (IllegalStateException expected) { + // continue + } + } + } + } + } + + /** + * Base class for testing. + */ + private class AbstractStreamBase extends AbstractStream { + private AbstractStreamBase(WritableBufferAllocator bufferAllocator) { + super(bufferAllocator); + } + + @Override + public void request(int numMessages) { + throw new UnsupportedOperationException(); + } + + @Override + @Nullable + public IdT id() { + throw new UnsupportedOperationException(); + } + + @Override + protected StreamListener listener() { + return streamListener; + } + + @Override + protected void internalSendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) { + throw new UnsupportedOperationException(); + } + + @Override + protected void receiveMessage(InputStream is) { + throw new UnsupportedOperationException(); + } + + @Override + protected void inboundDeliveryPaused() { + throw new UnsupportedOperationException(); + } + + @Override + protected void remoteEndClosed() { + throw new UnsupportedOperationException(); + } + + @Override + protected void returnProcessedBytes(int processedBytes) { + throw new UnsupportedOperationException(); + } + + @Override + protected void deframeFailed(Throwable cause) { + throw new UnsupportedOperationException(); + } + } +} + diff --git a/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpClientTransportTest.java index 09526c222e..cec61987d4 100644 --- a/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/transport/okhttp/OkHttpClientTransportTest.java @@ -72,6 +72,7 @@ import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; import io.grpc.StatusException; +import io.grpc.transport.AbstractStream; import io.grpc.transport.ClientStreamListener; import io.grpc.transport.ClientTransport; import io.grpc.transport.HttpUtil; @@ -926,12 +927,12 @@ public class OkHttpClientTransportTest { @Test public void notifyOnReady() throws Exception { initTransport(); - final int messageLength = 15; + // exactly one byte below the threshold + int messageLength = AbstractStream.DEFAULT_ONREADY_THRESHOLD - HEADER_LENGTH - 1; setInitialWindowSize(0); MockStreamListener listener = new MockStreamListener(); OkHttpClientStream stream = clientTransport.newStream( - method,new Metadata.Headers(), listener); - stream.setOnReadyThreshold(HEADER_LENGTH + 20); + method, new Metadata.Headers(), listener); assertTrue(stream.isReady()); // Be notified at the beginning. assertTrue(listener.isOnReadyCalled()); @@ -964,18 +965,6 @@ public class OkHttpClientTransportTest { assertTrue(stream.isReady()); assertTrue(listener.isOnReadyCalled()); - // Now the first message is still in the queue, and it's size is smaller than the threshold. - // Increase the threshold should have no affection. - stream.setOnReadyThreshold(messageLength * 10); - assertFalse(listener.isOnReadyCalled()); - // Decrease the threshold should have no affection too. - stream.setOnReadyThreshold(HEADER_LENGTH); - assertFalse(listener.isOnReadyCalled()); - // But now increase the threshold to larger than the queued message size, onReady should be - // triggered. - stream.setOnReadyThreshold(HEADER_LENGTH + messageLength + 1); - assertTrue(listener.isOnReadyCalled()); - stream.cancel(Status.CANCELLED); }