From c638f958fcff578881da960df3ecd76509e8770a Mon Sep 17 00:00:00 2001 From: ejona Date: Tue, 17 Jun 2014 13:10:05 -0700 Subject: [PATCH] Port client tests to newtransport. Where possible, I tried to cleanly shutdown the Channel. ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=69404518 --- .../com/google/net/stubby/ChannelImpl.java | 2 -- .../net/stubby/SessionClientStream.java | 2 +- .../net/stubby/SessionClientTransport.java | 35 +++++++++++++++++++ .../stubby/SessionClientTransportFactory.java | 26 -------------- .../net/stubby/http2/netty/Http2Client.java | 16 ++++++++- 5 files changed, 51 insertions(+), 30 deletions(-) create mode 100644 core/src/main/java/com/google/net/stubby/SessionClientTransport.java diff --git a/core/src/main/java/com/google/net/stubby/ChannelImpl.java b/core/src/main/java/com/google/net/stubby/ChannelImpl.java index 013d3c4e6b..acf48615e7 100644 --- a/core/src/main/java/com/google/net/stubby/ChannelImpl.java +++ b/core/src/main/java/com/google/net/stubby/ChannelImpl.java @@ -43,8 +43,6 @@ public final class ChannelImpl extends AbstractService implements Channel { public ChannelImpl(ClientTransportFactory transportFactory, ExecutorService executor) { this.transportFactory = transportFactory; this.executor = executor; - // FIXME(ejona): Remove once we have our top-level lifecycle. - startAsync(); } @Override diff --git a/core/src/main/java/com/google/net/stubby/SessionClientStream.java b/core/src/main/java/com/google/net/stubby/SessionClientStream.java index 94f8f2e5ad..ca07cf1184 100644 --- a/core/src/main/java/com/google/net/stubby/SessionClientStream.java +++ b/core/src/main/java/com/google/net/stubby/SessionClientStream.java @@ -19,7 +19,7 @@ import java.io.IOException; * new transport layer is created. */ // TODO(user): Delete this class when new transport interfaces are introduced -class SessionClientStream implements ClientStream { +public class SessionClientStream implements ClientStream { private final StreamListener listener; /** * The {@link Request} used by the stub to dispatch the call diff --git a/core/src/main/java/com/google/net/stubby/SessionClientTransport.java b/core/src/main/java/com/google/net/stubby/SessionClientTransport.java new file mode 100644 index 0000000000..a6bfa4f227 --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/SessionClientTransport.java @@ -0,0 +1,35 @@ +package com.google.net.stubby; + +import com.google.common.util.concurrent.AbstractService; +import com.google.net.stubby.newtransport.ClientStream; +import com.google.net.stubby.newtransport.ClientTransport; +import com.google.net.stubby.newtransport.StreamListener; + +/** + * Shim between Session and Channel. Will be removed when Session is removed. + */ +public class SessionClientTransport extends AbstractService implements ClientTransport { + private final Session session; + + public SessionClientTransport(Session session) { + this.session = session; + } + + @Override + protected void doStart() { + notifyStarted(); + } + + @Override + public void doStop() { + notifyStopped(); + } + + @Override + public ClientStream newStream(MethodDescriptor method, StreamListener listener) { + final SessionClientStream stream = new SessionClientStream(listener); + Request request = session.startRequest(method.getName(), stream.responseBuilder()); + stream.start(request); + return stream; + } +} diff --git a/core/src/main/java/com/google/net/stubby/SessionClientTransportFactory.java b/core/src/main/java/com/google/net/stubby/SessionClientTransportFactory.java index 9c32b77c55..9fe65c4e82 100644 --- a/core/src/main/java/com/google/net/stubby/SessionClientTransportFactory.java +++ b/core/src/main/java/com/google/net/stubby/SessionClientTransportFactory.java @@ -1,11 +1,7 @@ package com.google.net.stubby; -import com.google.common.util.concurrent.AbstractService; -import com.google.net.stubby.MethodDescriptor; -import com.google.net.stubby.newtransport.ClientStream; import com.google.net.stubby.newtransport.ClientTransport; import com.google.net.stubby.newtransport.ClientTransportFactory; -import com.google.net.stubby.newtransport.StreamListener; /** * Shim between Session and Channel. Will be removed when Session is removed. @@ -23,26 +19,4 @@ public class SessionClientTransportFactory implements ClientTransportFactory { public ClientTransport newClientTransport() { return transport; } - - private static class SessionClientTransport extends AbstractService implements ClientTransport { - private final Session session; - - public SessionClientTransport(Session session) { - this.session = session; - } - - @Override - protected void doStart() {} - - @Override - public void doStop() {} - - @Override - public ClientStream newStream(MethodDescriptor method, StreamListener listener) { - final SessionClientStream stream = new SessionClientStream(listener); - Request request = session.startRequest(method.getName(), stream.responseBuilder()); - stream.start(request); - return stream; - } - } } diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java index 1fc7f59119..694c07dcec 100644 --- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java +++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Client.java @@ -6,6 +6,7 @@ import com.google.net.stubby.Session; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -28,6 +29,7 @@ public class Http2Client { private final int port; private final RequestRegistry requestRegistry; private final SSLEngine sslEngine; + private Channel channel; public Http2Client(String host, int port, RequestRegistry requestRegistry) { this(host, port, requestRegistry, null); @@ -69,7 +71,8 @@ public class Http2Client { ChannelFuture channelFuture = b.connect(host, port); // Wait for the connection channelFuture.sync(); // (5) - ChannelFuture closeFuture = channelFuture.channel().closeFuture(); + channel = channelFuture.channel(); + ChannelFuture closeFuture = channel.closeFuture(); closeFuture.addListener(new WorkerCleanupListener(workerGroup)); return new Http2Session(http2Codec.getWriter(), requestRegistry); } catch (Throwable t) { @@ -78,6 +81,17 @@ public class Http2Client { } } + public void stop() { + if (channel != null && channel.isOpen()) { + try { + channel.close().get(); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + channel = null; + } + private static class WorkerCleanupListener implements GenericFutureListener> { private final EventLoopGroup workerGroup;