Port client tests to newtransport.

Where possible, I tried to cleanly shutdown the Channel.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=69404518
This commit is contained in:
ejona 2014-06-17 13:10:05 -07:00 committed by Eric Anderson
parent 82c87abcbf
commit c638f958fc
5 changed files with 51 additions and 30 deletions

View File

@ -43,8 +43,6 @@ public final class ChannelImpl extends AbstractService implements Channel {
public ChannelImpl(ClientTransportFactory transportFactory, ExecutorService executor) {
this.transportFactory = transportFactory;
this.executor = executor;
// FIXME(ejona): Remove once we have our top-level lifecycle.
startAsync();
}
@Override

View File

@ -19,7 +19,7 @@ import java.io.IOException;
* new transport layer is created.
*/
// TODO(user): Delete this class when new transport interfaces are introduced
class SessionClientStream implements ClientStream {
public class SessionClientStream implements ClientStream {
private final StreamListener listener;
/**
* The {@link Request} used by the stub to dispatch the call

View File

@ -0,0 +1,35 @@
package com.google.net.stubby;
import com.google.common.util.concurrent.AbstractService;
import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.ClientTransport;
import com.google.net.stubby.newtransport.StreamListener;
/**
* Shim between Session and Channel. Will be removed when Session is removed.
*/
public class SessionClientTransport extends AbstractService implements ClientTransport {
private final Session session;
public SessionClientTransport(Session session) {
this.session = session;
}
@Override
protected void doStart() {
notifyStarted();
}
@Override
public void doStop() {
notifyStopped();
}
@Override
public ClientStream newStream(MethodDescriptor<?, ?> method, StreamListener listener) {
final SessionClientStream stream = new SessionClientStream(listener);
Request request = session.startRequest(method.getName(), stream.responseBuilder());
stream.start(request);
return stream;
}
}

View File

@ -1,11 +1,7 @@
package com.google.net.stubby;
import com.google.common.util.concurrent.AbstractService;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.ClientTransport;
import com.google.net.stubby.newtransport.ClientTransportFactory;
import com.google.net.stubby.newtransport.StreamListener;
/**
* Shim between Session and Channel. Will be removed when Session is removed.
@ -23,26 +19,4 @@ public class SessionClientTransportFactory implements ClientTransportFactory {
public ClientTransport newClientTransport() {
return transport;
}
private static class SessionClientTransport extends AbstractService implements ClientTransport {
private final Session session;
public SessionClientTransport(Session session) {
this.session = session;
}
@Override
protected void doStart() {}
@Override
public void doStop() {}
@Override
public ClientStream newStream(MethodDescriptor<?, ?> method, StreamListener listener) {
final SessionClientStream stream = new SessionClientStream(listener);
Request request = session.startRequest(method.getName(), stream.responseBuilder());
stream.start(request);
return stream;
}
}
}

View File

@ -6,6 +6,7 @@ import com.google.net.stubby.Session;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@ -28,6 +29,7 @@ public class Http2Client {
private final int port;
private final RequestRegistry requestRegistry;
private final SSLEngine sslEngine;
private Channel channel;
public Http2Client(String host, int port, RequestRegistry requestRegistry) {
this(host, port, requestRegistry, null);
@ -69,7 +71,8 @@ public class Http2Client {
ChannelFuture channelFuture = b.connect(host, port);
// Wait for the connection
channelFuture.sync(); // (5)
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
channel = channelFuture.channel();
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.addListener(new WorkerCleanupListener(workerGroup));
return new Http2Session(http2Codec.getWriter(), requestRegistry);
} catch (Throwable t) {
@ -78,6 +81,17 @@ public class Http2Client {
}
}
public void stop() {
if (channel != null && channel.isOpen()) {
try {
channel.close().get();
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
channel = null;
}
private static class WorkerCleanupListener
implements GenericFutureListener<io.netty.util.concurrent.Future<Void>> {
private final EventLoopGroup workerGroup;