diff --git a/core/src/main/java/com/google/net/stubby/newtransport/okhttp/AsyncFrameWriter.java b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/AsyncFrameWriter.java new file mode 100644 index 0000000000..63226c3add --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/AsyncFrameWriter.java @@ -0,0 +1,200 @@ +package com.google.net.stubby.newtransport.okhttp; + +import com.google.common.util.concurrent.SerializingExecutor; +import com.google.common.util.concurrent.Service; + +import com.squareup.okhttp.internal.spdy.ErrorCode; +import com.squareup.okhttp.internal.spdy.FrameWriter; +import com.squareup.okhttp.internal.spdy.Header; +import com.squareup.okhttp.internal.spdy.Settings; + +import okio.Buffer; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.Executor; + +class AsyncFrameWriter implements FrameWriter { + private final FrameWriter frameWriter; + private final Executor executor; + private final Service transport; + + public AsyncFrameWriter(FrameWriter frameWriter, Service transport, Executor executor) { + this.frameWriter = frameWriter; + this.transport = transport; + // Although writes are thread-safe, we serialize them to prevent consuming many Threads that are + // just waiting on each other. + this.executor = new SerializingExecutor(executor); + } + + @Override + public void connectionHeader() { + executor.execute(new WriteRunnable() { + @Override + public void doRun() throws IOException { + frameWriter.connectionHeader(); + } + }); + } + + @Override + public void ackSettings() { + executor.execute(new WriteRunnable() { + @Override + public void doRun() throws IOException { + frameWriter.ackSettings(); + } + }); + } + + @Override + public void pushPromise(final int streamId, final int promisedStreamId, + final List
requestHeaders) { + executor.execute(new WriteRunnable() { + @Override + public void doRun() throws IOException { + frameWriter.pushPromise(streamId, promisedStreamId, requestHeaders); + } + }); + } + + @Override + public void flush() { + executor.execute(new WriteRunnable() { + @Override + public void doRun() throws IOException { + frameWriter.flush(); + } + }); + } + + @Override + public void synStream(final boolean outFinished, final boolean inFinished, final int streamId, + final int associatedStreamId, final int priority, final int slot, + final List
headerBlock) { + executor.execute(new WriteRunnable() { + @Override + public void doRun() throws IOException { + frameWriter.synStream(outFinished, inFinished, streamId, associatedStreamId, priority, + slot, headerBlock); + } + }); + } + + @Override + public void synReply(final boolean outFinished, final int streamId, + final List
headerBlock) { + executor.execute(new WriteRunnable() { + @Override + public void doRun() throws IOException { + frameWriter.synReply(outFinished, streamId, headerBlock); + } + }); + } + + @Override + public void headers(final int streamId, final List
headerBlock) { + executor.execute(new WriteRunnable() { + @Override + public void doRun() throws IOException { + frameWriter.headers(streamId, headerBlock); + } + }); + } + + @Override + public void rstStream(final int streamId, final ErrorCode errorCode) { + executor.execute(new WriteRunnable() { + @Override + public void doRun() throws IOException { + frameWriter.rstStream(streamId, errorCode); + } + }); + } + + @Override + public void data(final boolean outFinished, final int streamId, final Buffer source, + final int byteCount) { + executor.execute(new WriteRunnable() { + @Override + public void doRun() throws IOException { + frameWriter.data(outFinished, streamId, source, byteCount); + } + }); + } + + @Override + public void data(final boolean outFinished, final int streamId, final Buffer source) { + executor.execute(new WriteRunnable() { + @Override + public void doRun() throws IOException { + frameWriter.data(outFinished, streamId, source); + } + }); + } + + @Override + public void settings(final Settings okHttpSettings) { + executor.execute(new WriteRunnable() { + @Override + public void doRun() throws IOException { + frameWriter.settings(okHttpSettings); + } + }); + } + + @Override + public void ping(final boolean ack, final int payload1, final int payload2) { + executor.execute(new WriteRunnable() { + @Override + public void doRun() throws IOException { + frameWriter.ping(ack, payload1, payload2); + } + }); + } + + @Override + public void goAway(final int lastGoodStreamId, final ErrorCode errorCode, + final byte[] debugData) { + executor.execute(new WriteRunnable() { + @Override + public void doRun() throws IOException { + frameWriter.goAway(lastGoodStreamId, errorCode, debugData); + } + }); + } + + @Override + public void windowUpdate(final int streamId, final long windowSizeIncrement) { + executor.execute(new WriteRunnable() { + @Override + public void doRun() throws IOException { + frameWriter.windowUpdate(streamId, windowSizeIncrement); + } + }); + } + + @Override + public void close() { + executor.execute(new WriteRunnable() { + @Override + public void doRun() throws IOException { + frameWriter.close(); + } + }); + } + + private abstract class WriteRunnable implements Runnable { + @Override + public final void run() { + try { + doRun(); + } catch (IOException ex) { + transport.stopAsync(); + throw new RuntimeException(ex); + } + } + + public abstract void doRun() throws IOException; + } +} diff --git a/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java new file mode 100644 index 0000000000..b60897b082 --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransport.java @@ -0,0 +1,333 @@ +package com.google.net.stubby.newtransport.okhttp; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.ByteBuffers; +import com.google.common.io.ByteStreams; +import com.google.net.stubby.MethodDescriptor; +import com.google.net.stubby.Status; +import com.google.net.stubby.http2.okhttp.Headers; +import com.google.net.stubby.newtransport.AbstractClientTransport; +import com.google.net.stubby.newtransport.AbstractStream; +import com.google.net.stubby.newtransport.ClientStream; +import com.google.net.stubby.newtransport.ClientTransport; +import com.google.net.stubby.newtransport.InputStreamDeframer; +import com.google.net.stubby.newtransport.StreamListener; +import com.google.net.stubby.transport.Transport; +import com.google.net.stubby.transport.Transport.Code; + +import com.squareup.okhttp.internal.spdy.ErrorCode; +import com.squareup.okhttp.internal.spdy.FrameReader; +import com.squareup.okhttp.internal.spdy.Header; +import com.squareup.okhttp.internal.spdy.HeadersMode; +import com.squareup.okhttp.internal.spdy.Http20Draft10; +import com.squareup.okhttp.internal.spdy.Settings; +import com.squareup.okhttp.internal.spdy.Variant; + +import okio.ByteString; +import okio.BufferedSink; +import okio.BufferedSource; +import okio.Okio; +import okio.Buffer; + +import java.io.IOException; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import javax.annotation.concurrent.GuardedBy; + +/** + * A okhttp-based {@link ClientTransport} implementation. + */ +public class OkHttpClientTransport extends AbstractClientTransport { + /** The default initial window size in HTTP/2 is 64 KiB for the stream and connection. */ + static final int DEFAULT_INITIAL_WINDOW_SIZE = 64 * 1024; + + private static final ImmutableMap ERROR_CODE_TO_STATUS = ImmutableMap + .builder() + .put(ErrorCode.NO_ERROR, Status.OK) + .put(ErrorCode.PROTOCOL_ERROR, new Status(Transport.Code.INTERNAL, "Protocol error")) + .put(ErrorCode.INVALID_STREAM, new Status(Transport.Code.INTERNAL, "Invalid stream")) + .put(ErrorCode.UNSUPPORTED_VERSION, + new Status(Transport.Code.INTERNAL, "Unsupported version")) + .put(ErrorCode.STREAM_IN_USE, new Status(Transport.Code.INTERNAL, "Stream in use")) + .put(ErrorCode.STREAM_ALREADY_CLOSED, + new Status(Transport.Code.INTERNAL, "Stream already closed")) + .put(ErrorCode.INTERNAL_ERROR, new Status(Transport.Code.INTERNAL, "Internal error")) + .put(ErrorCode.FLOW_CONTROL_ERROR, new Status(Transport.Code.INTERNAL, "Flow control error")) + .put(ErrorCode.STREAM_CLOSED, new Status(Transport.Code.INTERNAL, "Stream closed")) + .put(ErrorCode.FRAME_TOO_LARGE, new Status(Transport.Code.INTERNAL, "Frame too large")) + .put(ErrorCode.REFUSED_STREAM, new Status(Transport.Code.INTERNAL, "Refused stream")) + .put(ErrorCode.CANCEL, new Status(Transport.Code.CANCELLED, "Cancelled")) + .put(ErrorCode.COMPRESSION_ERROR, new Status(Transport.Code.INTERNAL, "Compression error")) + .put(ErrorCode.INVALID_CREDENTIALS, + new Status(Transport.Code.PERMISSION_DENIED, "Invalid credentials")) + .build(); + + private final String host; + private final int port; + private FrameReader frameReader; + private AsyncFrameWriter frameWriter; + @GuardedBy("this") + private int nextStreamId; + private final Map streams = + Collections.synchronizedMap(new HashMap()); + private final ExecutorService executor = Executors.newCachedThreadPool(); + private int unacknowledgedBytesRead; + + public OkHttpClientTransport(String host, int port) { + this.host = host; + this.port = port; + // Client initiated streams are odd, server initiated ones are even. Server should not need to + // use it. We start clients at 3 to avoid conflicting with HTTP negotiation. + nextStreamId = 3; + } + + @Override + protected ClientStream newStreamInternal(MethodDescriptor method, StreamListener listener) { + return new OkHttpClientStream(method, listener); + } + + @Override + protected void doStart() { + BufferedSource source; + BufferedSink sink; + try { + Socket socket = new Socket(host, port); + // TODO(user): use SpdyConnection. + source = Okio.buffer(Okio.source(socket)); + sink = Okio.buffer(Okio.sink(socket)); + } catch (IOException e) { + throw new RuntimeException(e); + } + Variant variant = new Http20Draft10(); + frameReader = variant.newReader(source, true); + frameWriter = new AsyncFrameWriter(variant.newWriter(sink, true), this, executor); + + executor.execute(new ClientFrameHandler()); + notifyStarted(); + } + + @Override + protected void doStop() { + closeAllStreams(new Status(Code.INTERNAL, "Transport stopped")); + frameWriter.close(); + try { + frameReader.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + executor.shutdown(); + notifyStopped(); + } + + /** + * Close and remove all streams. + */ + private void closeAllStreams(Status status) { + Collection streamsCopy; + synchronized (streams) { + streamsCopy = streams.values(); + streams.clear(); + } + for (OkHttpClientStream stream : streamsCopy) { + stream.setStatus(status); + } + } + + /** + * Called when a HTTP2 stream is closed. + * + *

Return false if the stream has already finished. + */ + private boolean finishStream(int streamId, Status status) { + OkHttpClientStream stream; + stream = streams.remove(streamId); + if (stream != null) { + // This is mainly for failed streams, for successfully finished streams, it's a no-op. + stream.setStatus(status); + return true; + } + return false; + } + + /** + * Runnable which reads frames and dispatches them to in flight calls + */ + private class ClientFrameHandler implements FrameReader.Handler, Runnable { + private ClientFrameHandler() {} + + @Override + public void run() { + String threadName = Thread.currentThread().getName(); + Thread.currentThread().setName("OkHttpClientTransport"); + try { + // Read until the underlying socket closes. + while (frameReader.nextFrame(this)) { + } + } catch (IOException ioe) { + ioe.printStackTrace(); + closeAllStreams(new Status(Code.INTERNAL, ioe.getMessage())); + } finally { + // Restore the original thread name. + Thread.currentThread().setName(threadName); + } + } + + /** + * Handle a HTTP2 DATA frame + */ + @Override + public void data(boolean inFinished, int streamId, BufferedSource in, int length) + throws IOException { + final OkHttpClientStream stream; + stream = streams.get(streamId); + if (stream == null) { + frameWriter.rstStream(streamId, ErrorCode.INVALID_STREAM); + return; + } + InputStreamDeframer deframer = stream.getDeframer(); + + // Wait until the frame is complete. + in.require(length); + + deframer.deliverFrame(ByteStreams.limit(in.inputStream(), length), inFinished); + unacknowledgedBytesRead += length; + stream.unacknowledgedBytesRead += length; + if (unacknowledgedBytesRead >= DEFAULT_INITIAL_WINDOW_SIZE / 2) { + frameWriter.windowUpdate(0, unacknowledgedBytesRead); + unacknowledgedBytesRead = 0; + } + if (stream.unacknowledgedBytesRead >= DEFAULT_INITIAL_WINDOW_SIZE / 2) { + frameWriter.windowUpdate(streamId, stream.unacknowledgedBytesRead); + stream.unacknowledgedBytesRead = 0; + } + if (inFinished) { + finishStream(streamId, Status.OK); + } + } + + /** + * Handle HTTP2 HEADER and CONTINUATION frames + */ + @Override + public void headers(boolean outFinished, + boolean inFinished, + int streamId, + int associatedStreamId, + int priority, + List

headers, + HeadersMode headersMode) { + // TODO(user): handle received headers. + } + + @Override + public void rstStream(int streamId, ErrorCode errorCode) { + finishStream(streamId, ERROR_CODE_TO_STATUS.get(errorCode)); + } + + @Override + public void settings(boolean clearPrevious, Settings settings) { + // not impl + frameWriter.ackSettings(); + } + + @Override + public void ping(boolean ack, int payload1, int payload2) { + if (!ack) { + frameWriter.ping(true, payload1, payload2); + } + } + + @Override + public void ackSettings() { + // Do nothing currently. + } + + @Override + public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) { + // TODO(user): Log here and implement the real Go away behavior: streams have + // id <= lastGoodStreamId should not be closed. + closeAllStreams(new Status(Code.UNAVAILABLE, "Go away")); + stopAsync(); + } + + @Override + public void pushPromise(int streamId, int promisedStreamId, List
requestHeaders) + throws IOException { + // TODO(user): should send SETTINGS_ENABLE_PUSH=0, then here we should reset it with + // PROTOCOL_ERROR. + frameWriter.rstStream(streamId, ErrorCode.REFUSED_STREAM); + } + + @Override + public void windowUpdate(int arg0, long arg1) { + // TODO(user): flow control. + } + + @Override + public void priority(int streamId, int priority) { + // Ignore priority change. + // TODO(user): log + } + } + + /** + * Client stream for the okhttp transport. + */ + private class OkHttpClientStream extends AbstractStream implements ClientStream { + int streamId; + final InputStreamDeframer deframer; + int unacknowledgedBytesRead; + + public OkHttpClientStream(MethodDescriptor method, StreamListener listener) { + super(listener); + Preconditions.checkState(streamId == 0, "StreamId should be 0"); + synchronized (OkHttpClientTransport.this) { + streamId = nextStreamId; + nextStreamId += 2; + streams.put(streamId, this); + frameWriter.synStream(false, false, streamId, 0, 0, 0, + Headers.createRequestHeaders(method.getName())); + } + deframer = new InputStreamDeframer(inboundMessageHandler()); + } + + public InputStreamDeframer getDeframer() { + return deframer; + } + + @Override + protected void sendFrame(ByteBuffer frame, boolean endOfStream) { + Preconditions.checkState(streamId != 0, "streamId should be set"); + Buffer buffer; + try { + // Read the data into a buffer. + // TODO(user): swap to NIO buffers or zero-copy if/when okhttp/okio supports it + buffer = new Buffer().readFrom(ByteBuffers.newConsumingInputStream(frame)); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // Write the data to the remote endpoint. + frameWriter.data(endOfStream, streamId, buffer); + frameWriter.flush(); + } + + @Override + public void cancel() { + Preconditions.checkState(streamId != 0, "streamId should be set"); + outboundPhase = Phase.STATUS; + if (finishStream(streamId, ERROR_CODE_TO_STATUS.get(ErrorCode.CANCEL))) { + frameWriter.rstStream(streamId, ErrorCode.CANCEL); + } + } + } +} diff --git a/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportFactory.java b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportFactory.java new file mode 100644 index 0000000000..93db9ac8b5 --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/newtransport/okhttp/OkHttpClientTransportFactory.java @@ -0,0 +1,23 @@ +package com.google.net.stubby.newtransport.okhttp; + +import com.google.net.stubby.newtransport.ClientTransport; +import com.google.net.stubby.newtransport.ClientTransportFactory; + +/** + * Factory that manufactures instances of {@link OkHttpClientTransport}. + */ +public class OkHttpClientTransportFactory implements ClientTransportFactory { + private final String host; + private final int port; + + public OkHttpClientTransportFactory(String host, int port) { + this.host = host; + this.port = port; + } + + @Override + public ClientTransport newClientTransport() { + return new OkHttpClientTransport(host, port); + } + +}