[2/2] Swap Operation wrapping from Channel to Transport.

Session is now (properly) implementing transport API, so ChannelImpl has some
testing.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=69074094
This commit is contained in:
ejona 2014-06-12 09:28:30 -07:00 committed by Eric Anderson
parent 4fd4845481
commit 4d642200e7
4 changed files with 114 additions and 60 deletions

View File

@ -43,6 +43,8 @@ public final class ChannelImpl extends AbstractService implements Channel {
public ChannelImpl(ClientTransportFactory transportFactory, ExecutorService executor) {
this.transportFactory = transportFactory;
this.executor = executor;
// FIXME(ejona): Remove once we have our top-level lifecycle.
startAsync();
}
@Override

View File

@ -1,72 +1,95 @@
package com.google.net.stubby.stub;
package com.google.net.stubby;
import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.AbstractResponse;
import com.google.net.stubby.Call;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.Operation;
import com.google.net.stubby.Request;
import com.google.net.stubby.Response;
import com.google.net.stubby.Session;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.StreamListener;
import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
import java.io.InputStream;
import java.io.IOException;
/**
* A temporary shim layer between the new (Channel) and the old (Session). Will go away when the
* new transport layer is created.
*/
// TODO(user): Delete this class when new transport interfaces are introduced
public class SessionCall<RequestT, ResponseT> extends Call<RequestT, ResponseT> {
class SessionClientStream implements ClientStream {
private final StreamListener listener;
/**
* The {@link Request} used by the stub to dispatch the call
*/
private Request request;
private Response response;
private Listener<ResponseT> responseListener;
private final MethodDescriptor<RequestT, ResponseT> methodDescriptor;
private final Session session;
protected SessionCall(MethodDescriptor<RequestT, ResponseT> methodDescriptor, Session session) {
// This will go away when we introduce new transport API.... nothing to see here
this.methodDescriptor = methodDescriptor;
this.session = session;
public SessionClientStream(StreamListener listener) {
this.listener = listener;
}
@Override
public void start(Listener<ResponseT> responseListener) {
request = session.startRequest(methodDescriptor.getName(), new Response.ResponseBuilder() {
public void start(Request request) {
this.request = request;
}
public Response.ResponseBuilder responseBuilder() {
return new Response.ResponseBuilder() {
@Override
public Response build(int id) {
return new CallResponse(id);
response = new SessionResponse(id);
return response;
}
@Override
public Response build() {
return new CallResponse(-1);
response = new SessionResponse(-1);
return response;
}
});
this.responseListener = responseListener;
};
}
@Override
public void sendPayload(RequestT value, SettableFuture<Void> future) {
request.addPayload(methodDescriptor.streamRequest(value), Operation.Phase.PAYLOAD);
if (future != null) {
future.set(null);
public StreamState state() {
boolean requestOpen = request.getPhase() != Operation.Phase.CLOSED;
boolean responseOpen = response.getPhase() != Operation.Phase.CLOSED;
if (requestOpen && responseOpen) {
return StreamState.OPEN;
} else if (requestOpen) {
return StreamState.WRITE_ONLY;
} else if (responseOpen) {
return StreamState.READ_ONLY;
} else {
return StreamState.CLOSED;
}
}
@Override
public void sendContext(String name, InputStream value, SettableFuture<Void> future) {
public void close() {
request.close(Status.OK);
}
@Override
public void writeContext(String name, InputStream value, int length, Runnable accepted) {
request.addContext(name, value, Operation.Phase.HEADERS);
if (future != null) {
future.set(null);
if (accepted != null) {
accepted.run();
}
}
@Override
public void writeMessage(InputStream message, int length, Runnable accepted) {
request.addPayload(message, Operation.Phase.PAYLOAD);
if (accepted != null) {
accepted.run();
}
}
@Override
public void flush() {}
/**
* An error occurred while producing the request output. Cancel the request
* and close the response stream.
@ -76,25 +99,28 @@ public class SessionCall<RequestT, ResponseT> extends Call<RequestT, ResponseT>
request.close(new Status(Transport.Code.CANCELLED));
}
@Override
public void halfClose() {
request.close(Status.OK);
}
/**
* Adapts the transport layer response to calls on the response observer or
* recorded context state.
*/
private class CallResponse extends AbstractResponse {
private class SessionResponse extends AbstractResponse {
private CallResponse(int id) {
private SessionResponse(int id) {
super(id);
}
private int available(InputStream is) {
try {
return is.available();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
@Override
public Operation addContext(String type, InputStream message, Phase nextPhase) {
try {
responseListener.onContext(type, message);
listener.contextRead(type, message, available(message));
return super.addContext(type, message, nextPhase);
} finally {
if (getPhase() == Phase.CLOSED) {
@ -106,7 +132,7 @@ public class SessionCall<RequestT, ResponseT> extends Call<RequestT, ResponseT>
@Override
public Operation addPayload(InputStream payload, Phase nextPhase) {
try {
responseListener.onPayload(methodDescriptor.parseResponse(payload));
listener.messageRead(payload, available(payload));
return super.addPayload(payload, nextPhase);
} finally {
if (getPhase() == Phase.CLOSED) {
@ -125,7 +151,7 @@ public class SessionCall<RequestT, ResponseT> extends Call<RequestT, ResponseT>
}
private void propagateClosed() {
responseListener.onClose(getStatus());
listener.closed(getStatus());
}
}
}

View File

@ -0,0 +1,48 @@
package com.google.net.stubby;
import com.google.common.util.concurrent.AbstractService;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.ClientTransport;
import com.google.net.stubby.newtransport.ClientTransportFactory;
import com.google.net.stubby.newtransport.StreamListener;
/**
* Shim between Session and Channel. Will be removed when Session is removed.
*
* <p>This factory always returns the same instance, which does not adhere to the API.
*/
public class SessionClientTransportFactory implements ClientTransportFactory {
private final SessionClientTransport transport;
public SessionClientTransportFactory(Session session) {
transport = new SessionClientTransport(session);
}
@Override
public ClientTransport newClientTransport() {
return transport;
}
private static class SessionClientTransport extends AbstractService implements ClientTransport {
private final Session session;
public SessionClientTransport(Session session) {
this.session = session;
}
@Override
protected void doStart() {}
@Override
public void doStop() {}
@Override
public ClientStream newStream(MethodDescriptor<?, ?> method, StreamListener listener) {
final SessionClientStream stream = new SessionClientStream(listener);
Request request = session.startRequest(method.getName(), stream.responseBuilder());
stream.start(request);
return stream;
}
}
}

View File

@ -1,22 +0,0 @@
package com.google.net.stubby.stub;
import com.google.net.stubby.Channel;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.Session;
/**
* This class is a shim between Session &amp; Channel. Will be removed when the new transport
* API is introduced.
*/
public class SessionChannel implements Channel {
private final Session session;
public SessionChannel(Session session) {
this.session = session;
}
@Override
public <ReqT, RespT> SessionCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method) {
return new SessionCall<ReqT, RespT>(method, session);
}
}