Merge branch 'master' of github.com:carl-mastrangelo/grpc-java

This commit is contained in:
Carl Mastrangelo 2015-07-22 15:30:30 -07:00
commit 977c3d1c77
4 changed files with 195 additions and 86 deletions

View File

@ -31,6 +31,8 @@
package io.grpc.transport;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.Preconditions;
import io.grpc.Metadata;
@ -68,7 +70,7 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
* thread.
*/
public final void setListener(ServerStreamListener listener) {
this.listener = Preconditions.checkNotNull(listener, "listener");
this.listener = checkNotNull(listener);
// Now that the stream has actually been initialized, call the listener's onReady callback if
// appropriate.
@ -77,7 +79,7 @@ public abstract class AbstractServerStream<IdT> extends AbstractStream<IdT>
@Override
protected ServerStreamListener listener() {
return this.listener;
return listener;
}
@Override

View File

@ -31,11 +31,11 @@
package io.grpc.transport;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import java.io.InputStream;
@ -44,6 +44,8 @@ import javax.annotation.concurrent.GuardedBy;
/**
* Abstract base class for {@link Stream} implementations.
*
* @param <IdT> type of the unique identifier of this stream.
*/
public abstract class AbstractStream<IdT> implements Stream {
/**
@ -84,12 +86,6 @@ public abstract class AbstractStream<IdT> implements Stream {
*/
private int numSentBytesQueued;
/**
* Indicates whether the listener is currently eligible for notification of
* {@link StreamListener#onReady()}.
*/
@GuardedBy("onReadyLock")
private boolean shouldNotifyOnReady = true;
/**
* Indicates the stream has been created on the connection. This implies that the stream is no
* longer limited by MAX_CONCURRENT_STREAMS.
@ -132,6 +128,11 @@ public abstract class AbstractStream<IdT> implements Stream {
deframer = new MessageDeframer(inboundMessageHandler);
}
/**
* Override this method to provide a stream listener.
*/
protected abstract StreamListener listener();
/**
* Returns the internal ID for this stream. Note that ID can be {@code null} for client streams
* as the transport may defer creating the stream to the remote side until it has a payload or
@ -145,34 +146,14 @@ public abstract class AbstractStream<IdT> implements Stream {
* will be called. Defaults to {@link #DEFAULT_ONREADY_THRESHOLD}.
*/
public int getOnReadyThreshold() {
return onReadyThreshold;
}
/**
* Sets the number of queued bytes for a given stream, below which
* {@link StreamListener#onReady()} will be called. If not called, defaults to
* {@link #DEFAULT_ONREADY_THRESHOLD}.
*
* <p>This must be called from the transport thread, since a listener may be called back directly.
*/
public void setOnReadyThreshold(int onReadyThreshold) {
checkArgument(onReadyThreshold > 0, "onReadyThreshold must be > 0");
boolean doNotify;
synchronized (onReadyLock) {
if (this.onReadyThreshold <= numSentBytesQueued && onReadyThreshold > numSentBytesQueued) {
shouldNotifyOnReady = true;
}
this.onReadyThreshold = onReadyThreshold;
doNotify = needToNotifyOnReady();
}
if (doNotify) {
listener().onReady();
return onReadyThreshold;
}
}
@Override
public void writeMessage(InputStream message) {
Preconditions.checkNotNull(message, "message");
checkNotNull(message);
outboundPhase(Phase.MESSAGE);
if (!framer.isClosed()) {
framer.writePayload(message);
@ -190,9 +171,7 @@ public abstract class AbstractStream<IdT> implements Stream {
public final boolean isReady() {
if (listener() != null && outboundPhase() != Phase.STATUS) {
synchronized (onReadyLock) {
if (allocated && numSentBytesQueued < onReadyThreshold) {
return true;
}
return allocated && numSentBytesQueued < onReadyThreshold;
}
}
return false;
@ -221,11 +200,6 @@ public abstract class AbstractStream<IdT> implements Stream {
framer.dispose();
}
/**
* Gets the listener to this stream.
*/
protected abstract StreamListener listener();
/**
* Sends an outbound frame to the remote end point.
*
@ -312,18 +286,13 @@ public abstract class AbstractStream<IdT> implements Stream {
* StreamListener#onReady()} handler if appropriate. This must be called from the transport
* thread, since the listener may be called back directly.
*/
protected void onStreamAllocated() {
boolean doNotify;
protected final void onStreamAllocated() {
checkState(listener() != null);
synchronized (onReadyLock) {
if (allocated) {
throw new IllegalStateException("Already allocated");
}
checkState(!allocated, "Already allocated");
allocated = true;
doNotify = needToNotifyOnReady();
}
if (doNotify) {
listener().onReady();
}
notifyIfReady();
}
/**
@ -335,9 +304,6 @@ public abstract class AbstractStream<IdT> implements Stream {
protected final void onSendingBytes(int numBytes) {
synchronized (onReadyLock) {
numSentBytesQueued += numBytes;
if (!isReady()) {
shouldNotifyOnReady = true;
}
}
}
@ -351,30 +317,27 @@ public abstract class AbstractStream<IdT> implements Stream {
protected final void onSentBytes(int numBytes) {
boolean doNotify;
synchronized (onReadyLock) {
boolean belowThresholdBefore = numSentBytesQueued < onReadyThreshold;
numSentBytesQueued -= numBytes;
doNotify = needToNotifyOnReady();
boolean belowThresholdAfter = numSentBytesQueued < onReadyThreshold;
doNotify = !belowThresholdBefore && belowThresholdAfter;
}
if (doNotify) {
notifyIfReady();
}
}
@VisibleForTesting
final void notifyIfReady() {
boolean doNotify = false;
synchronized (onReadyLock) {
doNotify = isReady();
}
if (doNotify) {
listener().onReady();
}
}
/**
* Determines whether or not we need to call the {@link StreamListener#onReady()} handler now.
* Calling this method has the side-effect of unsetting {@link #shouldNotifyOnReady} so the
* handler should always be invoked immediately after calling this method.
*/
@GuardedBy("onReadyLock")
private boolean needToNotifyOnReady() {
if (shouldNotifyOnReady && isReady()) {
// Returning true here counts as a call to the onReady callback, so
// unset the flag.
shouldNotifyOnReady = false;
return true;
}
return false;
}
final Phase inboundPhase() {
return inboundPhase;
}
@ -405,7 +368,8 @@ public abstract class AbstractStream<IdT> implements Stream {
return tmp;
}
private Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) {
@VisibleForTesting
Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) {
if (nextPhase.ordinal() < currentPhase.ordinal()) {
throw new IllegalStateException(
String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase));
@ -449,6 +413,5 @@ public abstract class AbstractStream<IdT> implements Stream {
.add("id", id())
.add("inboundPhase", inboundPhase().name())
.add("outboundPhase", outboundPhase().name());
}
}

View File

@ -0,0 +1,155 @@
/*
* Copyright 2015, Google Inc. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
*
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
package io.grpc.transport;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.verify;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import io.grpc.transport.AbstractStream.Phase;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.io.InputStream;
import javax.annotation.Nullable;
@RunWith(JUnit4.class)
public class AbstractStreamTest {
@Mock private StreamListener streamListener;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
}
@Test
public void onStreamAllocated_shouldNotifyReady() {
AbstractStream<Object> stream = new AbstractStreamBase<Object>(null);
stream.onStreamAllocated();
verify(streamListener).onReady();
}
@Test
public void validPhaseTransitions() {
AbstractStream<Object> stream = new AbstractStreamBase<Object>(null);
Multimap<Phase, Phase> validTransitions = ImmutableMultimap.<Phase, Phase>builder()
.put(Phase.HEADERS, Phase.HEADERS)
.put(Phase.HEADERS, Phase.MESSAGE)
.put(Phase.HEADERS, Phase.STATUS)
.put(Phase.MESSAGE, Phase.MESSAGE)
.put(Phase.MESSAGE, Phase.STATUS)
.put(Phase.STATUS, Phase.STATUS)
.build();
for (Phase startPhase : Phase.values()) {
for (Phase endPhase : Phase.values()) {
if (validTransitions.containsEntry(startPhase, endPhase)) {
stream.verifyNextPhase(startPhase, endPhase);
} else {
try {
stream.verifyNextPhase(startPhase, endPhase);
fail();
} catch (IllegalStateException expected) {
// continue
}
}
}
}
}
/**
* Base class for testing.
*/
private class AbstractStreamBase<IdT> extends AbstractStream<IdT> {
private AbstractStreamBase(WritableBufferAllocator bufferAllocator) {
super(bufferAllocator);
}
@Override
public void request(int numMessages) {
throw new UnsupportedOperationException();
}
@Override
@Nullable
public IdT id() {
throw new UnsupportedOperationException();
}
@Override
protected StreamListener listener() {
return streamListener;
}
@Override
protected void internalSendFrame(WritableBuffer frame, boolean endOfStream, boolean flush) {
throw new UnsupportedOperationException();
}
@Override
protected void receiveMessage(InputStream is) {
throw new UnsupportedOperationException();
}
@Override
protected void inboundDeliveryPaused() {
throw new UnsupportedOperationException();
}
@Override
protected void remoteEndClosed() {
throw new UnsupportedOperationException();
}
@Override
protected void returnProcessedBytes(int processedBytes) {
throw new UnsupportedOperationException();
}
@Override
protected void deframeFailed(Throwable cause) {
throw new UnsupportedOperationException();
}
}
}

View File

@ -72,6 +72,7 @@ import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.transport.AbstractStream;
import io.grpc.transport.ClientStreamListener;
import io.grpc.transport.ClientTransport;
import io.grpc.transport.HttpUtil;
@ -926,12 +927,12 @@ public class OkHttpClientTransportTest {
@Test
public void notifyOnReady() throws Exception {
initTransport();
final int messageLength = 15;
// exactly one byte below the threshold
int messageLength = AbstractStream.DEFAULT_ONREADY_THRESHOLD - HEADER_LENGTH - 1;
setInitialWindowSize(0);
MockStreamListener listener = new MockStreamListener();
OkHttpClientStream stream = clientTransport.newStream(
method,new Metadata.Headers(), listener);
stream.setOnReadyThreshold(HEADER_LENGTH + 20);
method, new Metadata.Headers(), listener);
assertTrue(stream.isReady());
// Be notified at the beginning.
assertTrue(listener.isOnReadyCalled());
@ -964,18 +965,6 @@ public class OkHttpClientTransportTest {
assertTrue(stream.isReady());
assertTrue(listener.isOnReadyCalled());
// Now the first message is still in the queue, and it's size is smaller than the threshold.
// Increase the threshold should have no affection.
stream.setOnReadyThreshold(messageLength * 10);
assertFalse(listener.isOnReadyCalled());
// Decrease the threshold should have no affection too.
stream.setOnReadyThreshold(HEADER_LENGTH);
assertFalse(listener.isOnReadyCalled());
// But now increase the threshold to larger than the queued message size, onReady should be
// triggered.
stream.setOnReadyThreshold(HEADER_LENGTH + messageLength + 1);
assertTrue(listener.isOnReadyCalled());
stream.cancel(Status.CANCELLED);
}