Adding simple Transport for HTTP. Also creating abstract base classes for common stream/transport code.

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=70983246
This commit is contained in:
nathanmittler 2014-07-11 14:57:47 -07:00 committed by Eric Anderson
parent e7a43e4e38
commit c0a06819b7
6 changed files with 509 additions and 266 deletions

View File

@ -0,0 +1,42 @@
package com.google.net.stubby.newtransport;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import com.google.net.stubby.MethodDescriptor;
/**
* Abstract base class for all {@link ClientTransport} implementations. Implements the
* {@link #newStream} method to perform a state check on the service before allowing stream
* creation.
*/
public abstract class AbstractClientTransport extends AbstractService implements ClientTransport {
@Override
public final ClientStream newStream(MethodDescriptor<?, ?> method, StreamListener listener) {
Preconditions.checkNotNull(method, "method");
Preconditions.checkNotNull(listener, "listener");
if (state() == State.STARTING) {
// Wait until the transport is running before creating the new stream.
awaitRunning();
}
if (state() != State.RUNNING) {
throw new IllegalStateException("Invalid state for creating new stream: " + state());
}
// Create the stream.
return newStreamInternal(method, listener);
}
/**
* Called by {@link #newStream} to perform the actual creation of the new {@link ClientStream}.
* This is only called after the transport has successfully transitioned to the {@code RUNNING}
* state.
*
* @param method the RPC method to be invoked on the server by the new stream.
* @param listener the listener for events on the new stream.
* @return the new stream.
*/
protected abstract ClientStream newStreamInternal(MethodDescriptor<?, ?> method,
StreamListener listener);
}

View File

@ -0,0 +1,263 @@
package com.google.net.stubby.newtransport;
import static com.google.net.stubby.newtransport.StreamState.CLOSED;
import static com.google.net.stubby.newtransport.StreamState.OPEN;
import static com.google.net.stubby.newtransport.StreamState.READ_ONLY;
import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.net.stubby.Status;
import java.io.InputStream;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
/**
* Abstract base class for {@link Stream} implementations.
*/
public abstract class AbstractStream implements Stream {
/**
* Indicates the phase of the GRPC stream in one direction.
*/
protected enum Phase {
CONTEXT, MESSAGE, STATUS
}
private volatile StreamState state = StreamState.OPEN;
private Status status;
private final Object stateLock = new Object();
private final Object writeLock = new Object();
private final MessageFramer framer;
private final StreamListener listener;
protected Phase inboundPhase = Phase.CONTEXT;
protected Phase outboundPhase = Phase.CONTEXT;
/**
* Handler for Framer output.
*/
private final Framer.Sink<ByteBuffer> outboundFrameHandler = new Framer.Sink<ByteBuffer>() {
@Override
public void deliverFrame(ByteBuffer frame, boolean endOfStream) {
sendFrame(frame, endOfStream);
}
};
/**
* Handler for Deframer output.
*/
private final Framer inboundMessageHandler = new Framer() {
@Override
public void writeContext(String name, InputStream value, int length) {
ListenableFuture<Void> future = null;
try {
inboundPhase(Phase.CONTEXT);
future = listener.contextRead(name, value, length);
} finally {
closeWhenDone(future, value);
}
}
@Override
public void writePayload(InputStream input, int length) {
ListenableFuture<Void> future = null;
try {
inboundPhase(Phase.MESSAGE);
future = listener.messageRead(input, length);
} finally {
closeWhenDone(future, input);
}
}
@Override
public void writeStatus(Status status) {
inboundPhase(Phase.STATUS);
setStatus(status);
}
@Override
public void flush() {}
@Override
public boolean isClosed() {
return false;
}
@Override
public void close() {}
@Override
public void dispose() {}
};
protected AbstractStream(StreamListener listener) {
this.listener = Preconditions.checkNotNull(listener, "listener");
framer = new MessageFramer(outboundFrameHandler, 4096);
// No compression at the moment.
framer.setAllowCompression(false);
}
@Override
public StreamState state() {
return state;
}
@Override
public final void close() {
outboundPhase(Phase.STATUS);
synchronized (stateLock) {
state = state == OPEN ? READ_ONLY : CLOSED;
}
synchronized (writeLock) {
framer.close();
}
}
/**
* Free any resources associated with this stream. Subclass implementations must call this
* version.
*/
public void dispose() {
synchronized (writeLock) {
framer.dispose();
}
}
@Override
public final void writeContext(String name, InputStream value, int length,
@Nullable Runnable accepted) {
Preconditions.checkNotNull(name, "name");
Preconditions.checkNotNull(value, "value");
Preconditions.checkArgument(length >= 0, "length must be >= 0");
outboundPhase(Phase.CONTEXT);
synchronized (writeLock) {
if (!framer.isClosed()) {
framer.writeContext(name, value, length);
}
}
// TODO(user): add flow control.
if (accepted != null) {
accepted.run();
}
}
@Override
public final void writeMessage(InputStream message, int length, @Nullable Runnable accepted) {
Preconditions.checkNotNull(message, "message");
Preconditions.checkArgument(length >= 0, "length must be >= 0");
outboundPhase(Phase.MESSAGE);
synchronized (writeLock) {
if (!framer.isClosed()) {
framer.writePayload(message, length);
}
}
// TODO(user): add flow control.
if (accepted != null) {
accepted.run();
}
}
@Override
public final void flush() {
synchronized (writeLock) {
if (!framer.isClosed()) {
framer.flush();
}
}
}
/**
* Sets the status if not already set and notifies the stream listener that the stream was closed.
* This method must be called from the transport thread.
*
* @param newStatus the new status to set
* @return {@code} true if the status was not already set.
*/
public boolean setStatus(final Status newStatus) {
Preconditions.checkNotNull(newStatus, "newStatus");
synchronized (stateLock) {
if (status != null) {
// Disallow override of current status.
return false;
}
status = newStatus;
state = CLOSED;
}
// Invoke the observer callback.
listener.closed(newStatus);
// Free any resources.
dispose();
return true;
}
/**
* Sends an outbound frame to the server.
*
* @param frame a buffer containing the chunk of data to be sent.
* @param endOfStream if {@code true} indicates that no more data will be sent on the stream by
* this endpoint.
*/
protected abstract void sendFrame(ByteBuffer frame, boolean endOfStream);
/**
* Gets the handler for inbound messages. Subclasses must use this as the target for a
* {@link com.google.net.stubby.newtransport.Deframer}.
*/
protected final Framer inboundMessageHandler() {
return inboundMessageHandler;
}
/**
* Transitions the inbound phase. If the transition is disallowed, throws a
* {@link IllegalStateException}.
*/
protected final void inboundPhase(Phase nextPhase) {
inboundPhase = verifyNextPhase(inboundPhase, nextPhase);
}
/**
* Transitions the outbound phase. If the transition is disallowed, throws a
* {@link IllegalStateException}.
*/
protected final void outboundPhase(Phase nextPhase) {
outboundPhase = verifyNextPhase(outboundPhase, nextPhase);
}
private Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) {
if (nextPhase.ordinal() < currentPhase.ordinal() || currentPhase == Phase.STATUS) {
throw new IllegalStateException(
String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase));
}
return nextPhase;
}
/**
* If the given future is provided, closes the {@link InputStream} when it completes. Otherwise
* the {@link InputStream} is closed immediately.
*/
private static void closeWhenDone(@Nullable ListenableFuture<Void> future,
final InputStream input) {
if (future == null) {
Closeables.closeQuietly(input);
return;
}
// Close the buffer when the future completes.
future.addListener(new Runnable() {
@Override
public void run() {
Closeables.closeQuietly(input);
}
}, MoreExecutors.sameThreadExecutor());
}
}

View File

@ -0,0 +1,164 @@
package com.google.net.stubby.newtransport.http;
import static com.google.net.stubby.Status.CANCELLED;
import static com.google.net.stubby.newtransport.HttpUtil.CONTENT_TYPE_HEADER;
import static com.google.net.stubby.newtransport.HttpUtil.CONTENT_TYPE_PROTORPC;
import static com.google.net.stubby.newtransport.HttpUtil.HTTP_METHOD;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteBuffers;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.AbstractClientTransport;
import com.google.net.stubby.newtransport.AbstractStream;
import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.InputStreamDeframer;
import com.google.net.stubby.newtransport.StreamListener;
import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
* A simple client-side transport for RPC-over-HTTP/1.1. All execution (including listener
* callbacks) are executed in the application thread context.
*/
public class HttpClientTransport extends AbstractClientTransport {
private final URI baseUri;
private final Set<HttpClientStream> streams =
Collections.synchronizedSet(new HashSet<HttpClientStream>());
public HttpClientTransport(URI baseUri) {
this.baseUri = Preconditions.checkNotNull(baseUri, "baseUri");
}
@Override
protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method, StreamListener listener) {
URI uri = baseUri.resolve(method.getName());
HttpClientStream stream = new HttpClientStream(uri, listener);
synchronized (streams) {
// Check for RUNNING to deal with race condition of this being executed right after doStop
// cancels all the streams.
if (state() != State.RUNNING) {
throw new IllegalStateException("Invalid state for creating new stream: " + state());
}
streams.add(stream);
return stream;
}
}
@Override
protected void doStart() {
notifyStarted();
}
@Override
protected void doStop() {
// Cancel all of the streams for this transport.
synchronized (streams) {
// Guaranteed to be in the STOPPING state here.
for (HttpClientStream stream : streams.toArray(new HttpClientStream[0])) {
stream.cancel();
}
}
notifyStopped();
}
/**
* Client stream implementation for an HTTP transport.
*/
private class HttpClientStream extends AbstractStream implements ClientStream {
final HttpURLConnection connection;
final DataOutputStream outputStream;
boolean connected;
HttpClientStream(URI uri, StreamListener listener) {
super(listener);
try {
connection = (HttpURLConnection) uri.toURL().openConnection();
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setRequestMethod(HTTP_METHOD);
connection.setRequestProperty(CONTENT_TYPE_HEADER, CONTENT_TYPE_PROTORPC);
outputStream = new DataOutputStream(connection.getOutputStream());
connected = true;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void cancel() {
outboundPhase = Phase.STATUS;
if (setStatus(CANCELLED)) {
disconnect();
}
}
@Override
protected void sendFrame(ByteBuffer frame, boolean endOfStream) {
if (state() == StreamState.CLOSED) {
// Ignore outbound frames after the stream has closed.
return;
}
try {
// Synchronizing here to protect against cancellation due to the transport shutting down.
synchronized (connection) {
// Write the data to the connection output stream.
ByteBuffers.asByteSource(frame).copyTo(outputStream);
if (endOfStream) {
// Close the output stream on this connection.
connection.getOutputStream().close();
// The request has completed so now process the response. This results in the listener's
// closed() callback being invoked since we're indicating that this is the end of the
// response stream.
//
// NOTE: Must read the response in the sending thread, since URLConnection has threading
// issues.
new InputStreamDeframer(inboundMessageHandler()).deliverFrame(
connection.getInputStream(), true);
// Close the input stream and disconnect.
connection.getInputStream().close();
disconnect();
}
}
} catch (IOException ioe) {
setStatus(new Status(Transport.Code.INTERNAL, ioe));
}
}
@Override
public void dispose() {
super.dispose();
disconnect();
}
/**
* Disconnects the HTTP connection if currently connected.
*/
private void disconnect() {
// Synchronizing since this may be called for the stream (i.e. cancel or read complete) or
// due to shutting down the transport (i.e. cancel).
synchronized (connection) {
if (connected) {
connected = false;
streams.remove(this);
connection.disconnect();
}
}
}
}
}

View File

@ -0,0 +1,27 @@
package com.google.net.stubby.newtransport.http;
import com.google.net.stubby.newtransport.ClientTransportFactory;
import java.net.URI;
import java.net.URISyntaxException;
/**
* Factory that manufactures instances of {@link HttpClientTransport}.
*/
public class HttpClientTransportFactory implements ClientTransportFactory {
private final URI baseUri;
public HttpClientTransportFactory(String host, int port, boolean ssl) {
try {
String scheme = ssl ? "https" : "http";
baseUri = new URI(scheme, null, host, port, "/", null, null);
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
@Override
public HttpClientTransport newClientTransport() {
return new HttpClientTransport(baseUri);
}
}

View File

@ -1,120 +1,33 @@
package com.google.net.stubby.newtransport.netty;
import static com.google.net.stubby.newtransport.StreamState.CLOSED;
import static com.google.net.stubby.newtransport.StreamState.OPEN;
import static com.google.net.stubby.newtransport.StreamState.READ_ONLY;
import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.AbstractStream;
import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.Deframer;
import com.google.net.stubby.newtransport.Framer;
import com.google.net.stubby.newtransport.MessageFramer;
import com.google.net.stubby.newtransport.StreamListener;
import com.google.net.stubby.newtransport.StreamState;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import java.io.InputStream;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;
/**
* Client stream for a Netty transport.
*/
class NettyClientStream implements ClientStream {
class NettyClientStream extends AbstractStream implements ClientStream {
public static final int PENDING_STREAM_ID = -1;
/**
* Indicates the phase of the GRPC stream in one direction.
*/
private enum Phase {
CONTEXT, MESSAGE, STATUS
}
/**
* Guards transition of stream state.
*/
private final Object stateLock = new Object();
/**
* Guards access to the frame writer.
*/
private final Object writeLock = new Object();
private volatile StreamState state = OPEN;
private volatile int id = PENDING_STREAM_ID;
private Status status;
private Phase inboundPhase = Phase.CONTEXT;
private Phase outboundPhase = Phase.CONTEXT;
private final StreamListener listener;
private final Channel channel;
private final Framer framer;
private final Deframer<ByteBuf> deframer;
private final Framer.Sink<ByteBuffer> outboundFrameHandler = new Framer.Sink<ByteBuffer>() {
@Override
public void deliverFrame(ByteBuffer buffer, boolean endStream) {
ByteBuf buf = toByteBuf(buffer);
send(buf, endStream, endStream);
}
};
private final Framer inboundMessageHandler = new Framer() {
@Override
public void writeContext(String name, InputStream value, int length) {
ListenableFuture<Void> future = null;
try {
inboundPhase(Phase.CONTEXT);
future = listener.contextRead(name, value, length);
} finally {
closeWhenDone(future, value);
}
}
@Override
public void writePayload(InputStream input, int length) {
ListenableFuture<Void> future = null;
try {
inboundPhase(Phase.MESSAGE);
future = listener.messageRead(input, length);
} finally {
closeWhenDone(future, input);
}
}
@Override
public void writeStatus(Status status) {
inboundPhase(Phase.STATUS);
setStatus(status);
}
@Override
public void flush() {}
@Override
public boolean isClosed() {
return false;
}
@Override
public void close() {}
@Override
public void dispose() {}
};
NettyClientStream(StreamListener listener, Channel channel) {
this.listener = Preconditions.checkNotNull(listener, "listener");
super(listener);
this.channel = Preconditions.checkNotNull(channel, "channel");
this.deframer = new ByteBufDeframer(channel.alloc(), inboundMessageHandler);
this.framer = new MessageFramer(outboundFrameHandler, 4096);
this.deframer = new ByteBufDeframer(channel.alloc(), inboundMessageHandler());
}
/**
@ -128,25 +41,6 @@ class NettyClientStream implements ClientStream {
this.id = id;
}
@Override
public StreamState state() {
return state;
}
@Override
public void close() {
outboundPhase(Phase.STATUS);
// Transition the state to mark the close the local side of the stream.
synchronized (stateLock) {
state = state == OPEN ? READ_ONLY : CLOSED;
}
// Close the frame writer and send any buffered frames.
synchronized (writeLock) {
framer.close();
}
}
@Override
public void cancel() {
outboundPhase = Phase.STATUS;
@ -155,60 +49,6 @@ class NettyClientStream implements ClientStream {
channel.writeAndFlush(new CancelStreamCommand(this));
}
/**
* Free any resources associated with this stream.
*/
public void dispose() {
synchronized (writeLock) {
framer.dispose();
}
}
@Override
public void writeContext(String name, InputStream value, int length,
@Nullable final Runnable accepted) {
Preconditions.checkNotNull(name, "name");
Preconditions.checkNotNull(value, "value");
Preconditions.checkArgument(length >= 0, "length must be >= 0");
outboundPhase(Phase.CONTEXT);
synchronized (writeLock) {
if (!framer.isClosed()) {
framer.writeContext(name, value, length);
}
}
// TODO(user): add flow control.
if (accepted != null) {
accepted.run();
}
}
@Override
public void writeMessage(InputStream message, int length, @Nullable final Runnable accepted) {
Preconditions.checkNotNull(message, "message");
Preconditions.checkArgument(length >= 0, "length must be >= 0");
outboundPhase(Phase.MESSAGE);
synchronized (writeLock) {
if (!framer.isClosed()) {
framer.writePayload(message, length);
}
}
// TODO(user): add flow control.
if (accepted != null) {
accepted.run();
}
}
@Override
public void flush() {
synchronized (writeLock) {
if (!framer.isClosed()) {
framer.flush();
}
}
}
/**
* Called in the channel thread to process the content of an inbound DATA frame.
*
@ -219,7 +59,7 @@ class NettyClientStream implements ClientStream {
public void inboundDataReceived(ByteBuf frame, boolean endOfStream, ChannelPromise promise) {
Preconditions.checkNotNull(frame, "frame");
Preconditions.checkNotNull(promise, "promise");
if (state == CLOSED) {
if (state() == CLOSED) {
promise.setSuccess();
return;
}
@ -231,44 +71,11 @@ class NettyClientStream implements ClientStream {
promise.setSuccess();
}
/**
* Sets the status if not already set and notifies the stream listener that the stream was closed.
* This method must be called from the Netty channel thread.
*
* @param newStatus the new status to set
* @return {@code} true if the status was not already set.
*/
public boolean setStatus(final Status newStatus) {
Preconditions.checkNotNull(newStatus, "newStatus");
synchronized (stateLock) {
if (status != null) {
// Disallow override of current status.
return false;
}
status = newStatus;
state = CLOSED;
}
// Invoke the observer callback.
listener.closed(newStatus);
// Free any resources.
dispose();
return true;
}
/**
* Writes the given frame to the channel.
*
* @param data the grpc frame to be written.
* @param endStream indicates whether this is the last frame to be sent for this stream.
* @param endMessage indicates whether the data ends at a message boundary.
*/
private void send(ByteBuf data, boolean endStream, boolean endMessage) {
SendGrpcFrameCommand frame = new SendGrpcFrameCommand(this, data, endStream, endMessage);
channel.writeAndFlush(frame);
@Override
protected void sendFrame(ByteBuffer frame, boolean endOfStream) {
SendGrpcFrameCommand cmd =
new SendGrpcFrameCommand(this, toByteBuf(frame), endOfStream, endOfStream);
channel.writeAndFlush(cmd);
}
/**
@ -279,49 +86,4 @@ class NettyClientStream implements ClientStream {
buf.writeBytes(source);
return buf;
}
/**
* Transitions the inbound phase. If the transition is disallowed, throws a
* {@link IllegalStateException}.
*/
private void inboundPhase(Phase nextPhase) {
inboundPhase = verifyNextPhase(inboundPhase, nextPhase);
}
/**
* Transitions the outbound phase. If the transition is disallowed, throws a
* {@link IllegalStateException}.
*/
private void outboundPhase(Phase nextPhase) {
outboundPhase = verifyNextPhase(outboundPhase, nextPhase);
}
private Phase verifyNextPhase(Phase currentPhase, Phase nextPhase) {
// Only allow forward progression.
if (nextPhase.ordinal() < currentPhase.ordinal() || currentPhase == Phase.STATUS) {
throw new IllegalStateException(
String.format("Cannot transition phase from %s to %s", currentPhase, nextPhase));
}
return nextPhase;
}
/**
* If the given future is provided, closes the {@link InputStream} when it completes. Otherwise
* the {@link InputStream} is closed immediately.
*/
private static void closeWhenDone(@Nullable ListenableFuture<Void> future,
final InputStream input) {
if (future == null) {
Closeables.closeQuietly(input);
return;
}
// Close the buffer when the future completes.
future.addListener(new Runnable() {
@Override
public void run() {
Closeables.closeQuietly(input);
}
}, MoreExecutors.sameThreadExecutor());
}
}

View File

@ -3,8 +3,8 @@ package com.google.net.stubby.newtransport.netty;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import com.google.net.stubby.MethodDescriptor;
import com.google.net.stubby.newtransport.AbstractClientTransport;
import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.ClientTransport;
import com.google.net.stubby.newtransport.StreamListener;
@ -25,7 +25,7 @@ import java.util.concurrent.ExecutionException;
/**
* A Netty-based {@link ClientTransport} implementation.
*/
class NettyClientTransport extends AbstractService implements ClientTransport {
class NettyClientTransport extends AbstractClientTransport {
private final String host;
private final int port;
@ -58,22 +58,7 @@ class NettyClientTransport extends AbstractService implements ClientTransport {
}
@Override
public ClientStream newStream(MethodDescriptor<?, ?> method, StreamListener listener) {
Preconditions.checkNotNull(method, "method");
Preconditions.checkNotNull(listener, "listener");
switch (state()) {
case STARTING:
// Wait until the transport is running before creating the new stream.
awaitRunning();
break;
case NEW:
case TERMINATED:
case FAILED:
throw new IllegalStateException("Unable to create new stream in state: " + state());
default:
break;
}
protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method, StreamListener listener) {
// Create the stream.
NettyClientStream stream = new NettyClientStream(listener, channel);