From 6fc356b13de337aed441713a7f1ef77eb651f056 Mon Sep 17 00:00:00 2001 From: ejona Date: Mon, 22 Sep 2014 12:49:20 -0700 Subject: [PATCH] Split out a ClientStreamListener from StreamListener. Headers and trailers are only received on client-side, so we need a client-specific listener. Close() also has slightly different semantics between server-side and client-side. Most of the changes are simple name changes, but AbstractServerStream does update to the new close() semantics. ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=76094225 --- .../com/google/net/stubby/ChannelImpl.java | 8 ++--- .../net/stubby/SessionClientStream.java | 6 ++-- .../net/stubby/SessionClientTransport.java | 4 +-- .../newtransport/AbstractClientStream.java | 4 +-- .../newtransport/AbstractClientTransport.java | 4 +-- .../newtransport/AbstractServerStream.java | 27 +++++--------- .../newtransport/ClientStreamListener.java | 35 +++++++++++++++++++ .../stubby/newtransport/ClientTransport.java | 6 ++-- .../ForwardingStreamListener.java | 34 ------------------ .../newtransport/ServerStreamListener.java | 15 ++++++++ .../stubby/newtransport/StreamListener.java | 27 -------------- .../http/HttpClientTransport.java | 6 ++-- .../newtransport/netty/NettyClientStream.java | 4 +-- .../netty/NettyClientTransport.java | 4 +-- .../okhttp/OkHttpClientTransport.java | 6 ++-- .../netty/NettyClientStreamTest.java | 9 +++++ .../netty/NettyServerHandlerTest.java | 3 +- .../netty/NettyServerStreamTest.java | 13 +++---- .../netty/NettyStreamTestBase.java | 7 +--- .../okhttp/OkHttpClientTransportTest.java | 4 +-- 20 files changed, 103 insertions(+), 123 deletions(-) create mode 100644 core/src/main/java/com/google/net/stubby/newtransport/ClientStreamListener.java delete mode 100644 core/src/main/java/com/google/net/stubby/newtransport/ForwardingStreamListener.java diff --git a/core/src/main/java/com/google/net/stubby/ChannelImpl.java b/core/src/main/java/com/google/net/stubby/ChannelImpl.java index f44af56f36..5be4ee69eb 100644 --- a/core/src/main/java/com/google/net/stubby/ChannelImpl.java +++ b/core/src/main/java/com/google/net/stubby/ChannelImpl.java @@ -7,9 +7,9 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.net.stubby.newtransport.ClientStream; +import com.google.net.stubby.newtransport.ClientStreamListener; import com.google.net.stubby.newtransport.ClientTransport; import com.google.net.stubby.newtransport.ClientTransportFactory; -import com.google.net.stubby.newtransport.StreamListener; import java.io.IOException; import java.io.InputStream; @@ -139,7 +139,7 @@ public final class ChannelImpl extends AbstractService implements Channel { public void start(Listener observer, Metadata.Headers headers) { Preconditions.checkState(stream == null, "Already started"); stream = obtainActiveTransport().newStream(method, headers, - new StreamListenerImpl(observer)); + new ClientStreamListenerImpl(observer)); } @Override @@ -191,10 +191,10 @@ public final class ChannelImpl extends AbstractService implements Channel { } } - private class StreamListenerImpl implements StreamListener { + private class ClientStreamListenerImpl implements ClientStreamListener { private final Listener observer; - public StreamListenerImpl(Listener observer) { + public ClientStreamListenerImpl(Listener observer) { Preconditions.checkNotNull(observer); this.observer = observer; } diff --git a/core/src/main/java/com/google/net/stubby/SessionClientStream.java b/core/src/main/java/com/google/net/stubby/SessionClientStream.java index c794bad0d6..17883be3f0 100644 --- a/core/src/main/java/com/google/net/stubby/SessionClientStream.java +++ b/core/src/main/java/com/google/net/stubby/SessionClientStream.java @@ -1,7 +1,7 @@ package com.google.net.stubby; import com.google.net.stubby.newtransport.ClientStream; -import com.google.net.stubby.newtransport.StreamListener; +import com.google.net.stubby.newtransport.ClientStreamListener; import com.google.net.stubby.newtransport.StreamState; import com.google.net.stubby.transport.Transport; @@ -14,14 +14,14 @@ import java.io.InputStream; */ // TODO(user): Delete this class when new transport interfaces are introduced public class SessionClientStream implements ClientStream { - private final StreamListener listener; + private final ClientStreamListener listener; /** * The {@link Request} used by the stub to dispatch the call */ private Request request; private Response response; - public SessionClientStream(StreamListener listener) { + public SessionClientStream(ClientStreamListener listener) { this.listener = listener; } diff --git a/core/src/main/java/com/google/net/stubby/SessionClientTransport.java b/core/src/main/java/com/google/net/stubby/SessionClientTransport.java index e92e96de6a..fc6da298dc 100644 --- a/core/src/main/java/com/google/net/stubby/SessionClientTransport.java +++ b/core/src/main/java/com/google/net/stubby/SessionClientTransport.java @@ -2,8 +2,8 @@ package com.google.net.stubby; import com.google.common.util.concurrent.AbstractService; import com.google.net.stubby.newtransport.ClientStream; +import com.google.net.stubby.newtransport.ClientStreamListener; import com.google.net.stubby.newtransport.ClientTransport; -import com.google.net.stubby.newtransport.StreamListener; /** * Shim between Session and Channel. Will be removed when Session is removed. @@ -28,7 +28,7 @@ public class SessionClientTransport extends AbstractService implements ClientTra @Override public ClientStream newStream(MethodDescriptor method, Metadata.Headers headers, - StreamListener listener) { + ClientStreamListener listener) { final SessionClientStream stream = new SessionClientStream(listener); Request request = session.startRequest(method.getName(), headers, stream.responseBuilder()); diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java index 0b81babd5a..6f2d143d59 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientStream.java @@ -19,7 +19,7 @@ import javax.annotation.concurrent.GuardedBy; */ public abstract class AbstractClientStream extends AbstractStream implements ClientStream { - private final StreamListener listener; + private final ClientStreamListener listener; @GuardedBy("stateLock") private Status status; @@ -30,7 +30,7 @@ public abstract class AbstractClientStream extends AbstractStream implements Cli private Status stashedStatus; private Metadata.Trailers stashedTrailers; - protected AbstractClientStream(StreamListener listener) { + protected AbstractClientStream(ClientStreamListener listener) { this.listener = Preconditions.checkNotNull(listener); } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientTransport.java index ebd235c755..c75faea7f8 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientTransport.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractClientTransport.java @@ -15,7 +15,7 @@ public abstract class AbstractClientTransport extends AbstractService implements @Override public final ClientStream newStream(MethodDescriptor method, Metadata.Headers headers, - StreamListener listener) { + ClientStreamListener listener) { Preconditions.checkNotNull(method, "method"); Preconditions.checkNotNull(listener, "listener"); if (state() == State.STARTING) { @@ -42,5 +42,5 @@ public abstract class AbstractClientTransport extends AbstractService implements */ protected abstract ClientStream newStreamInternal(MethodDescriptor method, Metadata.Headers headers, - StreamListener listener); + ClientStreamListener listener); } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java b/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java index 12048448aa..4c7d33fbf4 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/AbstractServerStream.java @@ -27,9 +27,8 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser /** Whether listener.closed() has been called. */ @GuardedBy("stateLock") private boolean listenerClosed; - /** Saved application status for notifying when graceful stream termination completes. */ - @GuardedBy("stateLock") - private Status gracefulStatus; + /** Whether the stream was closed gracefull by the application (vs. a transport-level failure). */ + private boolean gracefulClose; /** Saved trailers from close() that need to be sent once the framer has sent all messages. */ private Metadata.Trailers stashedTrailers; @@ -58,13 +57,8 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser Preconditions.checkState(!status.isOk() || state == WRITE_ONLY, "Cannot close with OK before client half-closes"); state = CLOSED; - if (!listenerClosed) { - // Delay calling listener.closed() until the status has been flushed to the network (which - // is notified via complete()). Since there may be large buffers involved, the actual - // completion of the RPC could be much later than this call. - gracefulStatus = status; - } } + gracefulClose = true; trailers.removeAll(Status.CODE_KEY); trailers.removeAll(Status.MESSAGE_KEY); trailers.put(Status.CODE_KEY, status.getCode()); @@ -109,24 +103,21 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser * The Stream is considered completely closed and there is no further opportunity for error. It * calls the listener's {@code closed()} if it was not already done by {@link #abortStream}. Note * that it is expected that either {@code closed()} or {@code abortStream()} was previously - * called, as otherwise there is no status to provide to the listener. + * called, since {@code closed()} is required for a normal stream closure and {@code + * abortStream()} for abnormal. */ public void complete() { - Status status; synchronized (stateLock) { if (listenerClosed) { return; } listenerClosed = true; - status = gracefulStatus; - gracefulStatus = null; } - if (status == null) { - listener.closed(new Status(Transport.Code.INTERNAL, "successful complete() without close()"), - new Metadata.Trailers()); + if (!gracefulClose) { + listener.closed(new Status(Transport.Code.INTERNAL, "successful complete() without close()")); throw new IllegalStateException("successful complete() without close()"); } - listener.closed(status, new Metadata.Trailers()); + listener.closed(Status.OK); } @Override @@ -179,7 +170,7 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser dispose(); } finally { if (closeListener) { - listener.closed(status, new Metadata.Trailers()); + listener.closed(status); } } } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ClientStreamListener.java b/core/src/main/java/com/google/net/stubby/newtransport/ClientStreamListener.java new file mode 100644 index 0000000000..063ab7f324 --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/ClientStreamListener.java @@ -0,0 +1,35 @@ +package com.google.net.stubby.newtransport; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.net.stubby.Metadata; +import com.google.net.stubby.Status; + +import javax.annotation.Nullable; + +/** An observer of client-side stream events. */ +public interface ClientStreamListener extends StreamListener { + /** + * Called upon receiving all header information from the remote end-point. + *

This method should return quickly, as the same thread may be used to process other streams. + * + * @param headers the fully buffered received headers. + * @return a processing completion future, or {@code null} to indicate that processing of the + * headers is immediately complete. + */ + @Nullable + ListenableFuture headersRead(Metadata.Headers headers); + + /** + * Called when the stream is fully closed. {@link + * com.google.net.stubby.transport.Transport.Code#OK} is the only status code that is guaranteed + * to have been sent from the remote server. Any other status code may have been caused by + * abnormal stream termination. This is guaranteed to always be the final call on a listener. No + * further callbacks will be issued. + * + *

This method should return quickly, as the same thread may be used to process other streams. + * + * @param status details about the remote closure + * @param trailers trailing metadata + */ + void closed(Status status, Metadata.Trailers trailers); +} diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/ClientTransport.java index a596c6da54..418cf6e9ef 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/ClientTransport.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/ClientTransport.java @@ -19,8 +19,8 @@ public interface ClientTransport extends Service { * TODO(user): Consider also throwing for stopping. *

* This method returns immediately and does not wait for any validation of the request. If - * creation fails for any reason, {@link StreamListener#closed} will be called to provide the - * error information. Any sent messages for this stream will be buffered until creation has + * creation fails for any reason, {@link ClientStreamListener#closed} will be called to provide + * the error information. Any sent messages for this stream will be buffered until creation has * completed (either successfully or unsuccessfully). * * @param method the descriptor of the remote method to be called for this stream. @@ -30,5 +30,5 @@ public interface ClientTransport extends Service { */ ClientStream newStream(MethodDescriptor method, Metadata.Headers headers, - StreamListener listener); + ClientStreamListener listener); } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ForwardingStreamListener.java b/core/src/main/java/com/google/net/stubby/newtransport/ForwardingStreamListener.java deleted file mode 100644 index 162d662dd4..0000000000 --- a/core/src/main/java/com/google/net/stubby/newtransport/ForwardingStreamListener.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.google.net.stubby.newtransport; - -import com.google.common.util.concurrent.ListenableFuture; -import com.google.net.stubby.Metadata; -import com.google.net.stubby.Status; - -import java.io.InputStream; - -/** - * A decorator around another {@link StreamListener}. - */ -public class ForwardingStreamListener implements StreamListener { - - private final StreamListener delegate; - - public ForwardingStreamListener(StreamListener delegate) { - this.delegate = delegate; - } - - @Override - public ListenableFuture headersRead(Metadata.Headers headers) { - return delegate.headersRead(headers); - } - - @Override - public ListenableFuture messageRead(InputStream message, int length) { - return delegate.messageRead(message, length); - } - - @Override - public void closed(Status status, Metadata.Trailers trailers) { - delegate.closed(status, trailers); - } -} diff --git a/core/src/main/java/com/google/net/stubby/newtransport/ServerStreamListener.java b/core/src/main/java/com/google/net/stubby/newtransport/ServerStreamListener.java index 9ef1417558..1af44763d5 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/ServerStreamListener.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/ServerStreamListener.java @@ -11,4 +11,19 @@ public interface ServerStreamListener extends StreamListener { *

This method should return quickly, as the same thread may be used to process other streams. */ void halfClosed(); + + /** + * Called when the stream is fully closed. A status code of {@link + * com.google.net.stubby.transport.Transport.Code#OK} implies normal termination of the stream. + * Any other value implies abnormal termination. Since clients cannot send status, the passed + * status is always library-generated and only is concerned with transport-level stream shutdown + * (the call itself may have had a failing status, but if the stream terminated cleanly with the + * status appearing to have been sent, then the passed status here would be OK). This is + * guaranteed to always be the final call on a listener. No further callbacks will be issued. + * + *

This method should return quickly, as the same thread may be used to process other streams. + * + * @param status details about the remote closure + */ + void closed(Status status); } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java b/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java index e4b9a7b07e..f9ef2610b1 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/StreamListener.java @@ -1,8 +1,6 @@ package com.google.net.stubby.newtransport; import com.google.common.util.concurrent.ListenableFuture; -import com.google.net.stubby.Metadata; -import com.google.net.stubby.Status; import java.io.InputStream; @@ -13,18 +11,6 @@ import javax.annotation.Nullable; * time. */ public interface StreamListener { - - /** - * Called upon receiving all header information from the remote end-point. - *

This method should return quickly, as the same thread may be used to process other streams. - * - * @param headers the fully buffered received headers. - * @return a processing completion future, or {@code null} to indicate that processing of the - * headers is immediately complete. - */ - @Nullable - ListenableFuture headersRead(Metadata.Headers headers); - /** * Called upon receiving a message from the remote end-point. The {@link InputStream} is * non-blocking and contains the entire message. @@ -46,17 +32,4 @@ public interface StreamListener { */ @Nullable ListenableFuture messageRead(InputStream message, int length); - - /** - * Called when the stream is fully closed. A status code of {@link - * com.google.net.stubby.transport.Transport.Code#OK} implies normal termination of the stream. - * Any other value implies abnormal termination. This is guaranteed to always be the final call on - * a listener. No further callbacks will be issued. - * - *

This method should return quickly, as the same thread may be used to process other streams. - * - * @param status details about the remote closure - * @param trailers trailing metadata - */ - void closed(Status status, Metadata.Trailers trailers); } diff --git a/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransport.java index 399840f986..cbb0666919 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransport.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/http/HttpClientTransport.java @@ -13,8 +13,8 @@ import com.google.net.stubby.Status; import com.google.net.stubby.newtransport.AbstractClientStream; import com.google.net.stubby.newtransport.AbstractClientTransport; import com.google.net.stubby.newtransport.ClientStream; +import com.google.net.stubby.newtransport.ClientStreamListener; import com.google.net.stubby.newtransport.InputStreamDeframer; -import com.google.net.stubby.newtransport.StreamListener; import com.google.net.stubby.newtransport.StreamState; import com.google.net.stubby.transport.Transport; @@ -44,7 +44,7 @@ public class HttpClientTransport extends AbstractClientTransport { @Override protected ClientStream newStreamInternal(MethodDescriptor method, Metadata.Headers headers, - StreamListener listener) { + ClientStreamListener listener) { URI uri = baseUri.resolve(method.getName()); HttpClientStream stream = new HttpClientStream(uri, headers.serializeAscii(), listener); synchronized (streams) { @@ -83,7 +83,7 @@ public class HttpClientTransport extends AbstractClientTransport { final DataOutputStream outputStream; boolean connected; - HttpClientStream(URI uri, String[] headers, StreamListener listener) { + HttpClientStream(URI uri, String[] headers, ClientStreamListener listener) { super(listener); try { diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java index 1d990c3612..7c1ff34f29 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientStream.java @@ -8,10 +8,10 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; import com.google.net.stubby.newtransport.AbstractClientStream; +import com.google.net.stubby.newtransport.ClientStreamListener; import com.google.net.stubby.newtransport.GrpcDeframer; import com.google.net.stubby.newtransport.HttpUtil; import com.google.net.stubby.newtransport.MessageDeframer2; -import com.google.net.stubby.newtransport.StreamListener; import com.google.net.stubby.transport.Transport; import io.netty.buffer.ByteBuf; @@ -40,7 +40,7 @@ class NettyClientStream extends AbstractClientStream implements NettyStream { private boolean isGrpcResponse; private StringBuilder nonGrpcErrorMessage = new StringBuilder(); - NettyClientStream(StreamListener listener, Channel channel, + NettyClientStream(ClientStreamListener listener, Channel channel, DefaultHttp2InboundFlowController inboundFlow) { super(listener); this.channel = Preconditions.checkNotNull(channel, "channel"); diff --git a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java index 3e24f9ba6c..9762968c8d 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/netty/NettyClientTransport.java @@ -7,8 +7,8 @@ import com.google.net.stubby.Metadata; import com.google.net.stubby.MethodDescriptor; import com.google.net.stubby.newtransport.AbstractClientTransport; import com.google.net.stubby.newtransport.ClientStream; +import com.google.net.stubby.newtransport.ClientStreamListener; import com.google.net.stubby.newtransport.ClientTransport; -import com.google.net.stubby.newtransport.StreamListener; import com.google.net.stubby.newtransport.netty.NettyClientTransportFactory.NegotiationType; import com.google.net.stubby.testing.utils.ssl.SslContextFactory; @@ -89,7 +89,7 @@ class NettyClientTransport extends AbstractClientTransport { @Override protected ClientStream newStreamInternal(MethodDescriptor method, Metadata.Headers headers, - StreamListener listener) { + ClientStreamListener listener) { // Create the stream. NettyClientStream stream = new NettyClientStream(listener, channel, handler.inboundFlow()); diff --git a/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java index 852591f0ea..65940b2e67 100644 --- a/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java +++ b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java @@ -10,9 +10,9 @@ import com.google.net.stubby.Status; import com.google.net.stubby.newtransport.AbstractClientStream; import com.google.net.stubby.newtransport.AbstractClientTransport; import com.google.net.stubby.newtransport.ClientStream; +import com.google.net.stubby.newtransport.ClientStreamListener; import com.google.net.stubby.newtransport.ClientTransport; import com.google.net.stubby.newtransport.InputStreamDeframer; -import com.google.net.stubby.newtransport.StreamListener; import com.google.net.stubby.newtransport.StreamState; import com.google.net.stubby.transport.Transport; import com.google.net.stubby.transport.Transport.Code; @@ -129,7 +129,7 @@ public class OkHttpClientTransport extends AbstractClientTransport { @Override protected ClientStream newStreamInternal(MethodDescriptor method, Metadata.Headers headers, - StreamListener listener) { + ClientStreamListener listener) { return new OkHttpClientStream(method, headers, listener); } @@ -403,7 +403,7 @@ public class OkHttpClientTransport extends AbstractClientTransport { int unacknowledgedBytesRead; OkHttpClientStream(MethodDescriptor method, Metadata.Headers headers, - StreamListener listener) { + ClientStreamListener listener) { super(listener); deframer = new InputStreamDeframer(inboundMessageHandler()); synchronized (lock) { diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientStreamTest.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientStreamTest.java index 6799422bc9..bb0e2a9a11 100644 --- a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientStreamTest.java +++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyClientStreamTest.java @@ -12,6 +12,7 @@ import static org.mockito.Mockito.verify; import com.google.net.stubby.Metadata; import com.google.net.stubby.Status; +import com.google.net.stubby.newtransport.ClientStreamListener; import com.google.net.stubby.newtransport.StreamState; import com.google.net.stubby.transport.Transport; @@ -23,12 +24,20 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; +import org.mockito.Mock; /** * Tests for {@link NettyClientStream}. */ @RunWith(JUnit4.class) public class NettyClientStreamTest extends NettyStreamTestBase { + @Mock + protected ClientStreamListener listener; + + @Override + protected ClientStreamListener listener() { + return listener; + } @Test public void closeShouldSucceed() { diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerHandlerTest.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerHandlerTest.java index bd5b68b6ea..9f39747be2 100644 --- a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerHandlerTest.java +++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerHandlerTest.java @@ -9,7 +9,6 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.notNull; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -159,7 +158,7 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase { handler.channelRead(ctx, rstStreamFrame(STREAM_ID, Http2Error.CANCEL.code())); verify(streamListener, never()).messageRead(any(InputStream.class), anyInt()); - verify(streamListener).closed(eq(Status.CANCELLED), notNull(Metadata.Trailers.class)); + verify(streamListener).closed(Status.CANCELLED); verifyNoMoreInteractions(streamListener); } diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java index 40ce287405..fb34310d85 100644 --- a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java +++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyServerStreamTest.java @@ -2,8 +2,6 @@ package com.google.net.stubby.newtransport.netty; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.notNull; import static org.mockito.Matchers.same; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; @@ -27,7 +25,6 @@ import org.mockito.Mock; /** Unit tests for {@link NettyServerStream}. */ @RunWith(JUnit4.class) public class NettyServerStreamTest extends NettyStreamTestBase { - @Mock protected ServerStreamListener serverListener; private Metadata.Trailers trailers = new Metadata.Trailers(); @@ -61,7 +58,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase { verifyZeroInteractions(serverListener); // Sending complete. Listener gets closed() stream().complete(); - verify(serverListener).closed(eq(Status.CANCELLED), notNull(Metadata.Trailers.class)); + verify(serverListener).closed(Status.OK); assertEquals(StreamState.CLOSED, stream.state()); verifyZeroInteractions(serverListener); } @@ -80,7 +77,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase { new SendGrpcFrameCommand(STREAM_ID, statusFrame(Status.OK), true)); // Sending and receiving complete. Listener gets closed() stream().complete(); - verify(serverListener).closed(eq(Status.OK), notNull(Metadata.Trailers.class)); + verify(serverListener).closed(Status.OK); verifyNoMoreInteractions(serverListener); } @@ -89,7 +86,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase { Status status = new Status(Transport.Code.INTERNAL, new Throwable()); stream().abortStream(status, true); assertEquals(StreamState.CLOSED, stream.state()); - verify(serverListener).closed(same(status), notNull(Metadata.Trailers.class)); + verify(serverListener).closed(same(status)); verify(channel).writeAndFlush(new SendGrpcFrameCommand(STREAM_ID, statusFrame(status), true)); verifyNoMoreInteractions(serverListener); } @@ -99,7 +96,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase { Status status = new Status(Transport.Code.INTERNAL, new Throwable()); stream().abortStream(status, false); assertEquals(StreamState.CLOSED, stream.state()); - verify(serverListener).closed(same(status), notNull(Metadata.Trailers.class)); + verify(serverListener).closed(same(status)); verify(channel, never()).writeAndFlush( new SendGrpcFrameCommand(STREAM_ID, statusFrame(status), true)); verifyNoMoreInteractions(serverListener); @@ -114,7 +111,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase { verify(serverListener).halfClosed(); // Abort stream().abortStream(status, true); - verify(serverListener).closed(same(status), notNull(Metadata.Trailers.class)); + verify(serverListener).closed(same(status)); assertEquals(StreamState.CLOSED, stream.state()); verifyNoMoreInteractions(serverListener); } diff --git a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyStreamTestBase.java b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyStreamTestBase.java index 5b737fe7ab..81725f2e5b 100644 --- a/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyStreamTestBase.java +++ b/core/src/test/java/com/google/net/stubby/newtransport/netty/NettyStreamTestBase.java @@ -66,9 +66,6 @@ public abstract class NettyStreamTestBase { @Mock protected ChannelFuture future; - @Mock - protected StreamListener listener; - @Mock protected Runnable accepted; @@ -137,9 +134,7 @@ public abstract class NettyStreamTestBase { protected abstract NettyStream createStream(); - protected StreamListener listener() { - return listener; - } + protected abstract StreamListener listener(); private String toString(InputStream in) throws Exception { byte[] bytes = new byte[in.available()]; diff --git a/core/src/test/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportTest.java b/core/src/test/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportTest.java index 630518d43a..0e9ad60e9b 100644 --- a/core/src/test/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportTest.java +++ b/core/src/test/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportTest.java @@ -17,7 +17,7 @@ import com.google.common.util.concurrent.Service; import com.google.net.stubby.Metadata; import com.google.net.stubby.MethodDescriptor; import com.google.net.stubby.Status; -import com.google.net.stubby.newtransport.StreamListener; +import com.google.net.stubby.newtransport.ClientStreamListener; import com.google.net.stubby.newtransport.okhttp.OkHttpClientTransport.ClientFrameHandler; import com.google.net.stubby.newtransport.okhttp.OkHttpClientTransport.OkHttpClientStream; import com.google.net.stubby.transport.Transport; @@ -437,7 +437,7 @@ public class OkHttpClientTransportTest { } } - private static class MockStreamListener implements StreamListener { + private static class MockStreamListener implements ClientStreamListener { Status status; CountDownLatch closed = new CountDownLatch(1); ArrayList messages = new ArrayList();