diff --git a/core/src/main/java/com/google/net/stubby/ChannelImpl.java b/core/src/main/java/com/google/net/stubby/ChannelImpl.java index acf48615e7..013d3c4e6b 100644 --- a/core/src/main/java/com/google/net/stubby/ChannelImpl.java +++ b/core/src/main/java/com/google/net/stubby/ChannelImpl.java @@ -43,6 +43,8 @@ public final class ChannelImpl extends AbstractService implements Channel { public ChannelImpl(ClientTransportFactory transportFactory, ExecutorService executor) { this.transportFactory = transportFactory; this.executor = executor; + // FIXME(ejona): Remove once we have our top-level lifecycle. + startAsync(); } @Override diff --git a/stub/src/main/java/com/google/net/stubby/stub/SessionCall.java b/core/src/main/java/com/google/net/stubby/SessionClientStream.java similarity index 50% rename from stub/src/main/java/com/google/net/stubby/stub/SessionCall.java rename to core/src/main/java/com/google/net/stubby/SessionClientStream.java index 6ea93810bc..94f8f2e5ad 100644 --- a/stub/src/main/java/com/google/net/stubby/stub/SessionCall.java +++ b/core/src/main/java/com/google/net/stubby/SessionClientStream.java @@ -1,72 +1,95 @@ -package com.google.net.stubby.stub; +package com.google.net.stubby; -import com.google.common.util.concurrent.SettableFuture; import com.google.net.stubby.AbstractResponse; -import com.google.net.stubby.Call; -import com.google.net.stubby.MethodDescriptor; import com.google.net.stubby.Operation; import com.google.net.stubby.Request; import com.google.net.stubby.Response; import com.google.net.stubby.Session; import com.google.net.stubby.Status; +import com.google.net.stubby.newtransport.ClientStream; +import com.google.net.stubby.newtransport.StreamListener; +import com.google.net.stubby.newtransport.StreamState; import com.google.net.stubby.transport.Transport; import java.io.InputStream; +import java.io.IOException; /** * A temporary shim layer between the new (Channel) and the old (Session). Will go away when the * new transport layer is created. */ // TODO(user): Delete this class when new transport interfaces are introduced -public class SessionCall extends Call { +class SessionClientStream implements ClientStream { + private final StreamListener listener; /** * The {@link Request} used by the stub to dispatch the call */ private Request request; + private Response response; - private Listener responseListener; - - private final MethodDescriptor methodDescriptor; - private final Session session; - - protected SessionCall(MethodDescriptor methodDescriptor, Session session) { - // This will go away when we introduce new transport API.... nothing to see here - this.methodDescriptor = methodDescriptor; - this.session = session; + public SessionClientStream(StreamListener listener) { + this.listener = listener; } - @Override - public void start(Listener responseListener) { - request = session.startRequest(methodDescriptor.getName(), new Response.ResponseBuilder() { + public void start(Request request) { + this.request = request; + } + + public Response.ResponseBuilder responseBuilder() { + return new Response.ResponseBuilder() { @Override public Response build(int id) { - return new CallResponse(id); + response = new SessionResponse(id); + return response; } @Override public Response build() { - return new CallResponse(-1); + response = new SessionResponse(-1); + return response; } - }); - this.responseListener = responseListener; + }; } @Override - public void sendPayload(RequestT value, SettableFuture future) { - request.addPayload(methodDescriptor.streamRequest(value), Operation.Phase.PAYLOAD); - if (future != null) { - future.set(null); + public StreamState state() { + boolean requestOpen = request.getPhase() != Operation.Phase.CLOSED; + boolean responseOpen = response.getPhase() != Operation.Phase.CLOSED; + if (requestOpen && responseOpen) { + return StreamState.OPEN; + } else if (requestOpen) { + return StreamState.WRITE_ONLY; + } else if (responseOpen) { + return StreamState.READ_ONLY; + } else { + return StreamState.CLOSED; } } @Override - public void sendContext(String name, InputStream value, SettableFuture future) { + public void close() { + request.close(Status.OK); + } + + @Override + public void writeContext(String name, InputStream value, int length, Runnable accepted) { request.addContext(name, value, Operation.Phase.HEADERS); - if (future != null) { - future.set(null); + if (accepted != null) { + accepted.run(); } } + @Override + public void writeMessage(InputStream message, int length, Runnable accepted) { + request.addPayload(message, Operation.Phase.PAYLOAD); + if (accepted != null) { + accepted.run(); + } + } + + @Override + public void flush() {} + /** * An error occurred while producing the request output. Cancel the request * and close the response stream. @@ -76,25 +99,28 @@ public class SessionCall extends Call request.close(new Status(Transport.Code.CANCELLED)); } - @Override - public void halfClose() { - request.close(Status.OK); - } - /** * Adapts the transport layer response to calls on the response observer or * recorded context state. */ - private class CallResponse extends AbstractResponse { + private class SessionResponse extends AbstractResponse { - private CallResponse(int id) { + private SessionResponse(int id) { super(id); } + private int available(InputStream is) { + try { + return is.available(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + @Override public Operation addContext(String type, InputStream message, Phase nextPhase) { try { - responseListener.onContext(type, message); + listener.contextRead(type, message, available(message)); return super.addContext(type, message, nextPhase); } finally { if (getPhase() == Phase.CLOSED) { @@ -106,7 +132,7 @@ public class SessionCall extends Call @Override public Operation addPayload(InputStream payload, Phase nextPhase) { try { - responseListener.onPayload(methodDescriptor.parseResponse(payload)); + listener.messageRead(payload, available(payload)); return super.addPayload(payload, nextPhase); } finally { if (getPhase() == Phase.CLOSED) { @@ -125,7 +151,7 @@ public class SessionCall extends Call } private void propagateClosed() { - responseListener.onClose(getStatus()); + listener.closed(getStatus()); } } } diff --git a/core/src/main/java/com/google/net/stubby/SessionClientTransportFactory.java b/core/src/main/java/com/google/net/stubby/SessionClientTransportFactory.java new file mode 100644 index 0000000000..9c32b77c55 --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/SessionClientTransportFactory.java @@ -0,0 +1,48 @@ +package com.google.net.stubby; + +import com.google.common.util.concurrent.AbstractService; +import com.google.net.stubby.MethodDescriptor; +import com.google.net.stubby.newtransport.ClientStream; +import com.google.net.stubby.newtransport.ClientTransport; +import com.google.net.stubby.newtransport.ClientTransportFactory; +import com.google.net.stubby.newtransport.StreamListener; + +/** + * Shim between Session and Channel. Will be removed when Session is removed. + * + *

This factory always returns the same instance, which does not adhere to the API. + */ +public class SessionClientTransportFactory implements ClientTransportFactory { + private final SessionClientTransport transport; + + public SessionClientTransportFactory(Session session) { + transport = new SessionClientTransport(session); + } + + @Override + public ClientTransport newClientTransport() { + return transport; + } + + private static class SessionClientTransport extends AbstractService implements ClientTransport { + private final Session session; + + public SessionClientTransport(Session session) { + this.session = session; + } + + @Override + protected void doStart() {} + + @Override + public void doStop() {} + + @Override + public ClientStream newStream(MethodDescriptor method, StreamListener listener) { + final SessionClientStream stream = new SessionClientStream(listener); + Request request = session.startRequest(method.getName(), stream.responseBuilder()); + stream.start(request); + return stream; + } + } +} diff --git a/stub/src/main/java/com/google/net/stubby/stub/SessionChannel.java b/stub/src/main/java/com/google/net/stubby/stub/SessionChannel.java deleted file mode 100644 index 088418e820..0000000000 --- a/stub/src/main/java/com/google/net/stubby/stub/SessionChannel.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.google.net.stubby.stub; - -import com.google.net.stubby.Channel; -import com.google.net.stubby.MethodDescriptor; -import com.google.net.stubby.Session; - -/** - * This class is a shim between Session & Channel. Will be removed when the new transport - * API is introduced. - */ -public class SessionChannel implements Channel { - private final Session session; - - public SessionChannel(Session session) { - this.session = session; - } - - @Override - public SessionCall newCall(MethodDescriptor method) { - return new SessionCall(method, session); - } -}