Added basic unit test and reorg how onready calls are made

Forgot to add this last file

updated method name

Remove unused function

Remove helper function for threshold edge detection

Remove helper function for threshold edge detection

Re make listener abstract

Added copyright

removed unused field
This commit is contained in:
Carl Mastrangelo 2015-07-14 13:23:13 -07:00
parent 8bd8ed879f
commit 24a9f39e46
4 changed files with 195 additions and 86 deletions

View File

@ -31,6 +31,8 @@
package io.grpc.transport; package io.grpc.transport;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import io.grpc.Metadata; import io.grpc.Metadata;
@ -68,7 +70,7 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
* thread. * thread.
*/ */
public final void setListener(ServerStreamListener listener) { public final void setListener(ServerStreamListener listener) {
this.listener = Preconditions.checkNotNull(listener, "listener"); this.listener = checkNotNull(listener);
// Now that the stream has actually been initialized, call the listener's onReady callback if // Now that the stream has actually been initialized, call the listener's onReady callback if
// appropriate. // appropriate.
@ -77,7 +79,7 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
@Override @Override
protected ServerStreamListener listener() { protected ServerStreamListener listener() {
return this.listener; return listener;
} }
@Override @Override

View File

@ -31,11 +31,11 @@
package io.grpc.transport; package io.grpc.transport;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects; import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import java.io.InputStream; import java.io.InputStream;
@ -44,6 +44,8 @@ import javax.annotation.concurrent.GuardedBy;
/** /**
* Abstract base class for {@link Stream} implementations. * Abstract base class for {@link Stream} implementations.
*
* @param <IdT> type of the unique identifier of this stream.
*/ */
public abstract class AbstractStream<IdT> implements Stream { public abstract class AbstractStream<IdT> implements Stream {
/** /**
@ -84,12 +86,6 @@ public abstract class AbstractStream<IdT> implements Stream {
*/ */
private int numSentBytesQueued; private int numSentBytesQueued;
/**
* Indicates whether the listener is currently eligible for notification of
* {@link StreamListener#onReady()}.
*/
@GuardedBy("onReadyLock")
private boolean shouldNotifyOnReady = true;
/** /**
* Indicates the stream has been created on the connection. This implies that the stream is no * Indicates the stream has been created on the connection. This implies that the stream is no
* longer limited by MAX_CONCURRENT_STREAMS. * longer limited by MAX_CONCURRENT_STREAMS.
@ -132,6 +128,11 @@ public abstract class AbstractStream<IdT> implements Stream {
deframer = new MessageDeframer(inboundMessageHandler); deframer = new MessageDeframer(inboundMessageHandler);
} }
/**
* Override this method to provide a stream listener.
*/
protected abstract StreamListener listener();
/** /**
* Returns the internal ID for this stream. Note that ID can be {@code null} for client streams * Returns the internal ID for this stream. Note that ID can be {@code null} for client streams
* as the transport may defer creating the stream to the remote side until it has a payload or * as the transport may defer creating the stream to the remote side until it has a payload or
@ -145,34 +146,14 @@ public abstract class AbstractStream<IdT> implements Stream {
* will be called. Defaults to {@link #DEFAULT_ONREADY_THRESHOLD}. * will be called. Defaults to {@link #DEFAULT_ONREADY_THRESHOLD}.
*/ */
public int getOnReadyThreshold() { public int getOnReadyThreshold() {
return onReadyThreshold;
}
/**
* Sets the number of queued bytes for a given stream, below which
* {@link StreamListener#onReady()} will be called. If not called, defaults to
* {@link #DEFAULT_ONREADY_THRESHOLD}.
*
* <p>This must be called from the transport thread, since a listener may be called back directly.
*/
public void setOnReadyThreshold(int onReadyThreshold) {
checkArgument(onReadyThreshold > 0, "onReadyThreshold must be > 0");
boolean doNotify;
synchronized (onReadyLock) { synchronized (onReadyLock) {
if (this.onReadyThreshold <= numSentBytesQueued && onReadyThreshold > numSentBytesQueued) { return onReadyThreshold;
shouldNotifyOnReady = true;
}
this.onReadyThreshold = onReadyThreshold;
doNotify = needToNotifyOnReady();
}
if (doNotify) {
listener().onReady();
} }
} }
@Override @Override
public void writeMessage(InputStream message) { public void writeMessage(InputStream message) {
Preconditions.checkNotNull(message, "message"); checkNotNull(message);
outboundPhase(Phase.MESSAGE); outboundPhase(Phase.MESSAGE);
if (!framer.isClosed()) { if (!framer.isClosed()) {
framer.writePayload(message); framer.writePayload(message);
@ -190,9 +171,7 @@ public abstract class AbstractStream<IdT> implements Stream {
public final boolean isReady() { public final boolean isReady() {
if (listener() != null && outboundPhase() != Phase.STATUS) { if (listener() != null && outboundPhase() != Phase.STATUS) {
synchronized (onReadyLock) { synchronized (onReadyLock) {
if (allocated && numSentBytesQueued < onReadyThreshold) { return allocated && numSentBytesQueued < onReadyThreshold;
return true;
}
} }
} }
return false; return false;
@ -221,11 +200,6 @@ public abstract class AbstractStream<IdT> implements Stream {
framer.dispose(); framer.dispose();
} }
/**
* Gets the listener to this stream.
*/
protected abstract StreamListener listener();
/** /**
* Sends an outbound frame to the remote end point. * Sends an outbound frame to the remote end point.
* *
@ -312,18 +286,13 @@ public abstract class AbstractStream<IdT> implements Stream {
* StreamListener#onReady()} handler if appropriate. This must be called from the transport * StreamListener#onReady()} handler if appropriate. This must be called from the transport
* thread, since the listener may be called back directly. * thread, since the listener may be called back directly.
*/ */
protected void onStreamAllocated() { protected final void onStreamAllocated() {
boolean doNotify; checkState(listener() != null);
synchronized (onReadyLock) { synchronized (onReadyLock) {
if (allocated) { checkState(!allocated, "Already allocated");
throw new IllegalStateException("Already allocated");
}
allocated = true; allocated = true;
doNotify = needToNotifyOnReady();
}
if (doNotify) {
listener().onReady();
} }
notifyIfReady();
} }
/** /**
@ -335,9 +304,6 @@ public abstract class AbstractStream<IdT> implements Stream {
protected final void onSendingBytes(int numBytes) { protected final void onSendingBytes(int numBytes) {
synchronized (onReadyLock) { synchronized (onReadyLock) {
numSentBytesQueued += numBytes; numSentBytesQueued += numBytes;
if (!isReady()) {
shouldNotifyOnReady = true;
}
} }
} }
@ -351,30 +317,27 @@ public abstract class AbstractStream<IdT> implements Stream {
protected final void onSentBytes(int numBytes) { protected final void onSentBytes(int numBytes) {
boolean doNotify; boolean doNotify;
synchronized (onReadyLock) { synchronized (onReadyLock) {
boolean belowThresholdBefore = numSentBytesQueued < onReadyThreshold;
numSentBytesQueued -= numBytes; numSentBytesQueued -= numBytes;
doNotify = needToNotifyOnReady(); boolean belowThresholdAfter = numSentBytesQueued < onReadyThreshold;
doNotify = !belowThresholdBefore && belowThresholdAfter;
}
if (doNotify) {
notifyIfReady();
}
}
@VisibleForTesting
final void notifyIfReady() {
boolean doNotify = false;
synchronized (onReadyLock) {
doNotify = isReady();
} }
if (doNotify) { if (doNotify) {
listener().onReady(); listener().onReady();
} }
} }
/**
* Determines whether or not we need to call the {@link StreamListener#onReady()} handler now.
* Calling this method has the side-effect of unsetting {@link #shouldNotifyOnReady} so the
* handler should always be invoked immediately after calling this method.
*/
@GuardedBy("onReadyLock")
private boolean needToNotifyOnReady() {
if (shouldNotifyOnReady && isReady()) {
// Returning true here counts as a call to the onReady callback, so
// unset the flag.
shouldNotifyOnReady = false;
return true;
}
return false;
}
final Phase inboundPhase() { final Phase inboundPhase() {
return inboundPhase; return inboundPhase;
} }
@ -405,7 +368,8 @@ public abstract class AbstractStream<IdT> implements Stream {
return tmp; return tmp;
} }
private Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) { @VisibleForTesting
Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) {
if (nextPhase.ordinal() < currentPhase.ordinal()) { if (nextPhase.ordinal() < currentPhase.ordinal()) {
throw new IllegalStateException( throw new IllegalStateException(
String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase)); String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase));
@ -449,6 +413,5 @@ public abstract class AbstractStream<IdT> implements Stream {
.add("id", id()) .add("id", id())
.add("inboundPhase", inboundPhase().name()) .add("inboundPhase", inboundPhase().name())
.add("outboundPhase", outboundPhase().name()); .add("outboundPhase", outboundPhase().name());
} }
} }

View File

@ -0,0 +1,155 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.transport;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import io.grpc.transport.AbstractStream.Phase;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.io.InputStream;
import javax.annotation.Nullable;
@RunWith(JUnit4.class)
public class AbstractStreamTest {
@Mock private StreamListener streamListener;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}
@Test
public void onStreamAllocated_shouldNotifyReady() {
AbstractStream<Object> stream = new AbstractStreamBase<Object>(null);
stream.onStreamAllocated();
verify(streamListener).onReady();
}
@Test
public void validPhaseTransitions() {
AbstractStream<Object> stream = new AbstractStreamBase<Object>(null);
Multimap<Phase, Phase> validTransitions = ImmutableMultimap.<Phase, Phase>builder()
.put(Phase.HEADERS, Phase.HEADERS)
.put(Phase.HEADERS, Phase.MESSAGE)
.put(Phase.HEADERS, Phase.STATUS)
.put(Phase.MESSAGE, Phase.MESSAGE)
.put(Phase.MESSAGE, Phase.STATUS)
.put(Phase.STATUS, Phase.STATUS)
.build();
for (Phase startPhase : Phase.values()) {
for (Phase endPhase : Phase.values()) {
if (validTransitions.containsEntry(startPhase, endPhase)) {
stream.verifyNextPhase(startPhase, endPhase);
} else {
try {
stream.verifyNextPhase(startPhase, endPhase);
fail();
} catch (IllegalStateException expected) {
// continue
}
}
}
}
}
/**
* Base class for testing.
*/
private class AbstractStreamBase<IdT> extends AbstractStream<IdT> {
private AbstractStreamBase(WritableBufferAllocator bufferAllocator) {
super(bufferAllocator);
}
@Override
public void request(int numMessages) {
throw new UnsupportedOperationException();
}
@Override
@Nullable
public IdT id() {
throw new UnsupportedOperationException();
}
@Override
protected StreamListener listener() {
return streamListener;
}
@Override
protected void internalSendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
throw new UnsupportedOperationException();
}
@Override
protected void receiveMessage(InputStream is) {
throw new UnsupportedOperationException();
}
@Override
protected void inboundDeliveryPaused() {
throw new UnsupportedOperationException();
}
@Override
protected void remoteEndClosed() {
throw new UnsupportedOperationException();
}
@Override
protected void returnProcessedBytes(int processedBytes) {
throw new UnsupportedOperationException();
}
@Override
protected void deframeFailed(Throwable cause) {
throw new UnsupportedOperationException();
}
}
}

View File

@ -72,6 +72,7 @@ import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType; import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status; import io.grpc.Status;
import io.grpc.StatusException; import io.grpc.StatusException;
import io.grpc.transport.AbstractStream;
import io.grpc.transport.ClientStreamListener; import io.grpc.transport.ClientStreamListener;
import io.grpc.transport.ClientTransport; import io.grpc.transport.ClientTransport;
import io.grpc.transport.HttpUtil; import io.grpc.transport.HttpUtil;
@ -926,12 +927,12 @@ public class OkHttpClientTransportTest {
@Test @Test
public void notifyOnReady() throws Exception { public void notifyOnReady() throws Exception {
initTransport(); initTransport();
final int messageLength = 15; // exactly one byte below the threshold
int messageLength = AbstractStream.DEFAULT_ONREADY_THRESHOLD - HEADER_LENGTH - 1;
setInitialWindowSize(0); setInitialWindowSize(0);
MockStreamListener listener = new MockStreamListener(); MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream = clientTransport.newStream( OkHttpClientStream stream = clientTransport.newStream(
method, new Metadata.Headers(), listener); method, new Metadata.Headers(), listener);
stream.setOnReadyThreshold(HEADER_LENGTH + 20);
assertTrue(stream.isReady()); assertTrue(stream.isReady());
// Be notified at the beginning. // Be notified at the beginning.
assertTrue(listener.isOnReadyCalled()); assertTrue(listener.isOnReadyCalled());
@ -964,18 +965,6 @@ public class OkHttpClientTransportTest {
assertTrue(stream.isReady()); assertTrue(stream.isReady());
assertTrue(listener.isOnReadyCalled()); assertTrue(listener.isOnReadyCalled());
// Now the first message is still in the queue, and it's size is smaller than the threshold.
// Increase the threshold should have no affection.
stream.setOnReadyThreshold(messageLength * 10);
assertFalse(listener.isOnReadyCalled());
// Decrease the threshold should have no affection too.
stream.setOnReadyThreshold(HEADER_LENGTH);
assertFalse(listener.isOnReadyCalled());
// But now increase the threshold to larger than the queued message size, onReady should be
// triggered.
stream.setOnReadyThreshold(HEADER_LENGTH + messageLength + 1);
assertTrue(listener.isOnReadyCalled());
stream.cancel(Status.CANCELLED); stream.cancel(Status.CANCELLED);
} }