From f8bbc124681d99b4aa22d81d5b5086e644b3852c Mon Sep 17 00:00:00 2001 From: lryan Date: Tue, 3 Jun 2014 14:22:33 -0700 Subject: [PATCH] Remove SPDY support from GRPC Will re-create the E2E tests through GFE when we can properly initiate HTTP2 with GFE. ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=68488736 --- .../com/google/net/stubby/NoOpRequest.java | 11 + .../stubby/http2/netty/ByteBufDeframer.java | 3 - .../net/stubby/http2/netty/Http2Codec.java | 49 +++-- .../net/stubby/http2/netty/Http2Request.java | 2 +- .../net/stubby/http2/netty/Http2Response.java | 2 +- .../net/stubby/http2/netty/Http2Session.java | 2 +- .../{spdy => http2}/okhttp/Headers.java | 2 +- .../okhttp/Http2Operation.java} | 8 +- .../okhttp/Http2Request.java} | 8 +- .../okhttp/Http2Response.java} | 10 +- .../{spdy => http2}/okhttp/OkHttpSession.java | 52 ++--- .../stubby/spdy/netty/ByteBufDeframer.java | 53 ----- .../net/stubby/spdy/netty/SpdyClient.java | 100 --------- .../net/stubby/spdy/netty/SpdyCodec.java | 199 ------------------ .../net/stubby/spdy/netty/SpdyOperation.java | 78 ------- .../net/stubby/spdy/netty/SpdyRequest.java | 55 ----- .../net/stubby/spdy/netty/SpdyResponse.java | 41 ---- .../net/stubby/spdy/netty/SpdyServer.java | 75 ------- .../net/stubby/spdy/netty/SpdySession.java | 46 ---- .../stubby/transport/CompressionFramer.java | 2 +- 20 files changed, 79 insertions(+), 719 deletions(-) create mode 100644 core/src/main/java/com/google/net/stubby/NoOpRequest.java rename core/src/main/java/com/google/net/stubby/{spdy => http2}/okhttp/Headers.java (95%) rename core/src/main/java/com/google/net/stubby/{spdy/okhttp/SpdyOperation.java => http2/okhttp/Http2Operation.java} (88%) rename core/src/main/java/com/google/net/stubby/{spdy/okhttp/SpdyRequest.java => http2/okhttp/Http2Request.java} (80%) rename core/src/main/java/com/google/net/stubby/{spdy/okhttp/SpdyResponse.java => http2/okhttp/Http2Response.java} (74%) rename core/src/main/java/com/google/net/stubby/{spdy => http2}/okhttp/OkHttpSession.java (85%) delete mode 100644 core/src/main/java/com/google/net/stubby/spdy/netty/ByteBufDeframer.java delete mode 100644 core/src/main/java/com/google/net/stubby/spdy/netty/SpdyClient.java delete mode 100644 core/src/main/java/com/google/net/stubby/spdy/netty/SpdyCodec.java delete mode 100644 core/src/main/java/com/google/net/stubby/spdy/netty/SpdyOperation.java delete mode 100644 core/src/main/java/com/google/net/stubby/spdy/netty/SpdyRequest.java delete mode 100644 core/src/main/java/com/google/net/stubby/spdy/netty/SpdyResponse.java delete mode 100644 core/src/main/java/com/google/net/stubby/spdy/netty/SpdyServer.java delete mode 100644 core/src/main/java/com/google/net/stubby/spdy/netty/SpdySession.java diff --git a/core/src/main/java/com/google/net/stubby/NoOpRequest.java b/core/src/main/java/com/google/net/stubby/NoOpRequest.java new file mode 100644 index 0000000000..00389fe5b5 --- /dev/null +++ b/core/src/main/java/com/google/net/stubby/NoOpRequest.java @@ -0,0 +1,11 @@ +package com.google.net.stubby; + +/** + * A request that does no work. + */ +public class NoOpRequest extends AbstractRequest { + + public NoOpRequest(Response response) { + super(response); + } +} diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/ByteBufDeframer.java b/core/src/main/java/com/google/net/stubby/http2/netty/ByteBufDeframer.java index 57a703ae05..e39e8fb6d2 100644 --- a/core/src/main/java/com/google/net/stubby/http2/netty/ByteBufDeframer.java +++ b/core/src/main/java/com/google/net/stubby/http2/netty/ByteBufDeframer.java @@ -15,9 +15,6 @@ import java.nio.ByteOrder; /** * Parse a sequence of {@link ByteBuf} instances that represent the frames of a GRPC call - * - * TODO(user): This is essentially a duplicate of the spdy deframer. Should find a way to - * share common code. */ public class ByteBufDeframer extends Deframer { diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java index d0ad9440e9..f2cfd7774a 100644 --- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java +++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Codec.java @@ -1,9 +1,11 @@ package com.google.net.stubby.http2.netty; +import com.google.net.stubby.NoOpRequest; import com.google.net.stubby.Operation; import com.google.net.stubby.Operation.Phase; import com.google.net.stubby.Request; import com.google.net.stubby.RequestRegistry; +import com.google.net.stubby.Response; import com.google.net.stubby.Session; import com.google.net.stubby.Status; import com.google.net.stubby.transport.MessageFramer; @@ -66,7 +68,8 @@ public class Http2Codec extends ChannelHandlerAdapter { } this.alloc = ctx.alloc(); Http2StreamFrame frame = (Http2StreamFrame) msg; - Request operation = requestRegistry.lookup(frame.getStreamId()); + int streamId = frame.getStreamId(); + Request operation = requestRegistry.lookup(streamId); try { if (operation == null) { if (client) { @@ -75,8 +78,8 @@ public class Http2Codec extends ChannelHandlerAdapter { } else { operation = serverStart(ctx, frame); if (operation == null) { - // Unknown operation, refuse the stream - sendRstStream(ctx, frame.getStreamId(), Http2Error.REFUSED_STREAM); + closeWithError(new NoOpRequest(createResponse(ctx, streamId).build()), + new Status(Code.NOT_FOUND)); } } } else { @@ -84,22 +87,33 @@ public class Http2Codec extends ChannelHandlerAdapter { progress(client ? operation.getResponse() : operation, frame); } } catch (Throwable e) { - closeWithInternalError(operation, e); - sendRstStream(ctx, frame.getStreamId(), Http2Error.INTERNAL_ERROR); - throw e; + Status status = Status.fromThrowable(e); + if (operation == null) { + // Create a no-op request so we can use common error handling + operation = new NoOpRequest(createResponse(ctx, streamId).build()); + } + closeWithError(operation, status); + } + } + + + /** + * Closes the request and its associated response with an internal error. + */ + private void closeWithError(Request request, Status status) { + try { + request.close(status); + request.getResponse().close(status); + } finally { + requestRegistry.remove(request.getId()); } } /** - * Closes the request and its associate response with an internal error. + * Create an HTTP2 response handler */ - private void closeWithInternalError(Request request, Throwable e) { - if (request != null) { - Status status = new Status(Code.INTERNAL, e); - request.close(status); - request.getResponse().close(status); - requestRegistry.remove(request.getId()); - } + private Response.ResponseBuilder createResponse(ChannelHandlerContext ctx, int streamId) { + return Http2Response.builder(streamId, ctx.channel(), new MessageFramer(4096)); } /** @@ -130,8 +144,7 @@ public class Http2Codec extends ChannelHandlerAdapter { return null; } // Create the operation and bind a HTTP2 response operation - Request op = session.startRequest(operationName, - Http2Response.builder(frame.getStreamId(), ctx.channel(), new MessageFramer(4096))); + Request op = session.startRequest(operationName, createResponse(ctx, frame.getStreamId())); if (op == null) { return null; } @@ -157,11 +170,11 @@ public class Http2Codec extends ChannelHandlerAdapter { progressPayload(operation, (Http2DataFrame) frame); } else if (frame instanceof Http2RstStreamFrame) { // Cancel - operation.close(null); + operation.close(new Status(Code.ABORTED, "HTTP2 stream reset")); finish(operation); } else { // TODO(user): More refined handling for PING, GO_AWAY, SYN_STREAM, WINDOW_UPDATE, SETTINGS - operation.close(null); + operation.close(Status.OK); finish(operation); } } diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java index f62449d614..af7b13a0d0 100644 --- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java +++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Request.java @@ -13,7 +13,7 @@ import java.net.InetAddress; import java.net.UnknownHostException; /** - * A SPDY based implementation of {@link Request} + * A HTTP2 based implementation of {@link Request} */ class Http2Request extends Http2Operation implements Request { diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Response.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Response.java index 5efc50f8c3..f7822ce581 100644 --- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Response.java +++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Response.java @@ -6,7 +6,7 @@ import com.google.net.stubby.transport.Framer; import io.netty.channel.Channel; /** - * A SPDY based implementation of a {@link Response}. + * A HTTP2 based implementation of a {@link Response}. */ class Http2Response extends Http2Operation implements Response { diff --git a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java index 225bb1be09..44fa751980 100644 --- a/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java +++ b/core/src/main/java/com/google/net/stubby/http2/netty/Http2Session.java @@ -14,7 +14,7 @@ import java.util.concurrent.atomic.AtomicInteger; * An implementation of {@link Session} that can be used by clients to start * a {@link Request} */ -class Http2Session implements Session { +public class Http2Session implements Session { public static final String PROTORPC = "application/protorpc"; diff --git a/core/src/main/java/com/google/net/stubby/spdy/okhttp/Headers.java b/core/src/main/java/com/google/net/stubby/http2/okhttp/Headers.java similarity index 95% rename from core/src/main/java/com/google/net/stubby/spdy/okhttp/Headers.java rename to core/src/main/java/com/google/net/stubby/http2/okhttp/Headers.java index f5e176c7a9..dd62408ed4 100644 --- a/core/src/main/java/com/google/net/stubby/spdy/okhttp/Headers.java +++ b/core/src/main/java/com/google/net/stubby/http2/okhttp/Headers.java @@ -1,4 +1,4 @@ -package com.google.net.stubby.spdy.okhttp; +package com.google.net.stubby.http2.okhttp; import com.google.common.collect.Lists; diff --git a/core/src/main/java/com/google/net/stubby/spdy/okhttp/SpdyOperation.java b/core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Operation.java similarity index 88% rename from core/src/main/java/com/google/net/stubby/spdy/okhttp/SpdyOperation.java rename to core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Operation.java index d82934cfb5..8c52ba887d 100644 --- a/core/src/main/java/com/google/net/stubby/spdy/okhttp/SpdyOperation.java +++ b/core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Operation.java @@ -1,4 +1,4 @@ -package com.google.net.stubby.spdy.okhttp; +package com.google.net.stubby.http2.okhttp; import com.google.common.io.ByteBuffers; import com.google.net.stubby.AbstractOperation; @@ -16,14 +16,14 @@ import java.io.InputStream; import java.nio.ByteBuffer; /** - * Base implementation of {@link Operation} that writes SPDY frames + * Base implementation of {@link Operation} that writes HTTP2 frames */ -abstract class SpdyOperation extends AbstractOperation implements Framer.Sink { +abstract class Http2Operation extends AbstractOperation implements Framer.Sink { protected final Framer framer; private final FrameWriter frameWriter; - SpdyOperation(int id, FrameWriter frameWriter, Framer framer) { + Http2Operation(int id, FrameWriter frameWriter, Framer framer) { super(id); this.frameWriter = frameWriter; this.framer = framer; diff --git a/core/src/main/java/com/google/net/stubby/spdy/okhttp/SpdyRequest.java b/core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Request.java similarity index 80% rename from core/src/main/java/com/google/net/stubby/spdy/okhttp/SpdyRequest.java rename to core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Request.java index 198d614892..82271f231b 100644 --- a/core/src/main/java/com/google/net/stubby/spdy/okhttp/SpdyRequest.java +++ b/core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Request.java @@ -1,4 +1,4 @@ -package com.google.net.stubby.spdy.okhttp; +package com.google.net.stubby.http2.okhttp; import com.google.net.stubby.Request; import com.google.net.stubby.RequestRegistry; @@ -12,12 +12,12 @@ import com.squareup.okhttp.internal.spdy.FrameWriter; import java.io.IOException; /** - * A SPDY based implementation of {@link Request} + * A HTTP2 based implementation of {@link Request} */ -public class SpdyRequest extends SpdyOperation implements Request { +public class Http2Request extends Http2Operation implements Request { private final Response response; - public SpdyRequest(FrameWriter frameWriter, String operationName, + public Http2Request(FrameWriter frameWriter, String operationName, Response response, RequestRegistry requestRegistry, Framer framer) { super(response.getId(), frameWriter, framer); diff --git a/core/src/main/java/com/google/net/stubby/spdy/okhttp/SpdyResponse.java b/core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Response.java similarity index 74% rename from core/src/main/java/com/google/net/stubby/spdy/okhttp/SpdyResponse.java rename to core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Response.java index 9994c4887e..1ef52569e5 100644 --- a/core/src/main/java/com/google/net/stubby/spdy/okhttp/SpdyResponse.java +++ b/core/src/main/java/com/google/net/stubby/http2/okhttp/Http2Response.java @@ -1,4 +1,4 @@ -package com.google.net.stubby.spdy.okhttp; +package com.google.net.stubby.http2.okhttp; import com.google.net.stubby.Response; import com.google.net.stubby.Status; @@ -10,9 +10,9 @@ import com.squareup.okhttp.internal.spdy.FrameWriter; import java.io.IOException; /** - * A SPDY based implementation of a {@link Response}. + * A HTTP2 based implementation of a {@link Response}. */ -public class SpdyResponse extends SpdyOperation implements Response { +public class Http2Response extends Http2Operation implements Response { public static ResponseBuilder builder(final int id, final FrameWriter framewriter, final Framer framer) { @@ -24,12 +24,12 @@ public class SpdyResponse extends SpdyOperation implements Response { @Override public Response build() { - return new SpdyResponse(id, framewriter, framer); + return new Http2Response(id, framewriter, framer); } }; } - private SpdyResponse(int id, FrameWriter frameWriter, Framer framer) { + private Http2Response(int id, FrameWriter frameWriter, Framer framer) { super(id, frameWriter, framer); try { frameWriter.synStream(false, false, getId(), 0, 0, 0, Headers.createResponseHeaders()); diff --git a/core/src/main/java/com/google/net/stubby/spdy/okhttp/OkHttpSession.java b/core/src/main/java/com/google/net/stubby/http2/okhttp/OkHttpSession.java similarity index 85% rename from core/src/main/java/com/google/net/stubby/spdy/okhttp/OkHttpSession.java rename to core/src/main/java/com/google/net/stubby/http2/okhttp/OkHttpSession.java index 8ae40bb76d..0f979e15fb 100644 --- a/core/src/main/java/com/google/net/stubby/spdy/okhttp/OkHttpSession.java +++ b/core/src/main/java/com/google/net/stubby/http2/okhttp/OkHttpSession.java @@ -1,4 +1,4 @@ -package com.google.net.stubby.spdy.okhttp; +package com.google.net.stubby.http2.okhttp; import com.google.common.collect.ImmutableMap; import com.google.common.io.ByteStreams; @@ -16,7 +16,6 @@ import com.google.net.stubby.transport.MessageFramer; import com.google.net.stubby.transport.Transport; import com.google.net.stubby.transport.Transport.Code; -import com.squareup.okhttp.Protocol; import com.squareup.okhttp.internal.spdy.ErrorCode; import com.squareup.okhttp.internal.spdy.FrameReader; import com.squareup.okhttp.internal.spdy.FrameWriter; @@ -24,7 +23,6 @@ import com.squareup.okhttp.internal.spdy.Header; import com.squareup.okhttp.internal.spdy.HeadersMode; import com.squareup.okhttp.internal.spdy.Http20Draft10; import com.squareup.okhttp.internal.spdy.Settings; -import com.squareup.okhttp.internal.spdy.Spdy3; import com.squareup.okhttp.internal.spdy.Variant; import okio.BufferedSink; @@ -64,19 +62,19 @@ public class OkHttpSession implements Session { new Status(Transport.Code.PERMISSION_DENIED, "Invalid credentials")) .build(); - public static Session startClient(Protocol protocol, Socket socket, - RequestRegistry requestRegistry, Executor executor) { + public static Session startClient(Socket socket, RequestRegistry requestRegistry, + Executor executor) { try { - return new OkHttpSession(protocol, socket, requestRegistry, executor); + return new OkHttpSession(socket, requestRegistry, executor); } catch (IOException ioe) { throw new RuntimeException(ioe); } } - public static Session startServer(Protocol protocol, Socket socket, Session server, - RequestRegistry requestRegistry, Executor executor) { + public static Session startServer(Socket socket, Session server, RequestRegistry requestRegistry, + Executor executor) { try { - return new OkHttpSession(protocol, socket, server, requestRegistry, executor); + return new OkHttpSession(socket, server, requestRegistry, executor); } catch (IOException ioe) { throw new RuntimeException(ioe); } @@ -93,9 +91,9 @@ public class OkHttpSession implements Session { /** * Construct a client-side session */ - private OkHttpSession(Protocol protocol, Socket socket, RequestRegistry requestRegistry, + private OkHttpSession(Socket socket, RequestRegistry requestRegistry, Executor executor) throws IOException { - Variant variant = getProtocolVariant(protocol); + Variant variant = new Http20Draft10(); // TODO(user): use Okio.buffer(Socket) countingInputStream = new CountingInputStream(socket.getInputStream()); countingOutputStream = new CountingOutputStream(socket.getOutputStream()); @@ -114,9 +112,9 @@ public class OkHttpSession implements Session { /** * Construct a server-side session */ - private OkHttpSession(Protocol protocol, Socket socket, Session server, + private OkHttpSession(Socket socket, Session server, RequestRegistry requestRegistry, Executor executor) throws IOException { - Variant variant = getProtocolVariant(protocol); + Variant variant = new Http20Draft10(); // TODO(user): use Okio.buffer(Socket) countingInputStream = new CountingInputStream(socket.getInputStream()); countingOutputStream = new CountingOutputStream(socket.getOutputStream()); @@ -137,17 +135,6 @@ public class OkHttpSession implements Session { return "in=" + countingInputStream.getCount() + ";out=" + countingOutputStream.getCount(); } - private Variant getProtocolVariant(Protocol protocol) { - switch (protocol) { - case HTTP_2: - return new Http20Draft10(); - case SPDY_3: - return new Spdy3(); - default: - throw new IllegalArgumentException("Unsupported protocol: " + protocol); - } - } - private int getNextStreamId() { // Client initiated streams are odd, server initiated ones are even // We start clients at 3 to avoid conflicting with HTTP negotiation @@ -162,9 +149,9 @@ public class OkHttpSession implements Session { public Request startRequest(String operationName, Response.ResponseBuilder responseBuilder) { int nextStreamId = getNextStreamId(); Response response = responseBuilder.build(nextStreamId); - SpdyRequest spdyRequest = new SpdyRequest(frameWriter, operationName, response, requestRegistry, - new MessageFramer(4096)); - return spdyRequest; + Http2Request request = new Http2Request(frameWriter, operationName, response, + requestRegistry, new MessageFramer(4096)); + return request; } /** @@ -219,7 +206,7 @@ public class OkHttpSession implements Session { /** - * Handle a SPDY DATA frame + * Handle a HTTP2 DATA frame */ @Override public void data(boolean inFinished, int streamId, BufferedSource in, int length) @@ -246,7 +233,7 @@ public class OkHttpSession implements Session { } /** - * Called when a SPDY stream is closed. + * Called when a HTTP2 stream is closed. */ private void finish(int streamId) { Request request = requestRegistry.remove(streamId); @@ -256,7 +243,7 @@ public class OkHttpSession implements Session { } /** - * Handle a SPDY HEADER or SYN_STREAM frame + * Handle HTTP2 HEADER & CONTINUATION frames */ @Override public void headers(boolean arg0, @@ -269,12 +256,11 @@ public class OkHttpSession implements Session { Operation op = getOperation(streamId); // Start an Operation for SYN_STREAM - if (op == null && (headersMode == HeadersMode.SPDY_SYN_STREAM - || headersMode == HeadersMode.HTTP_20_HEADERS)) { + if (op == null && headersMode == HeadersMode.HTTP_20_HEADERS) { for (Header header : headers) { if (header.name.equals(Header.TARGET_PATH)) { Request request = serverSession.startRequest(header.value.utf8(), - SpdyResponse.builder(streamId, frameWriter, new MessageFramer(4096))); + Http2Response.builder(streamId, frameWriter, new MessageFramer(4096))); requestRegistry.register(request); op = request; break; diff --git a/core/src/main/java/com/google/net/stubby/spdy/netty/ByteBufDeframer.java b/core/src/main/java/com/google/net/stubby/spdy/netty/ByteBufDeframer.java deleted file mode 100644 index 5863e8c2a7..0000000000 --- a/core/src/main/java/com/google/net/stubby/spdy/netty/ByteBufDeframer.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.google.net.stubby.spdy.netty; - -import com.google.net.stubby.transport.Deframer; -import com.google.net.stubby.transport.TransportFrameUtil; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufInputStream; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.Unpooled; - -import java.io.DataInputStream; -import java.io.IOException; -import java.nio.ByteOrder; - -/** - * Parse a sequence of {@link ByteBuf} instances that represent the frames of a GRPC call - */ -public class ByteBufDeframer extends Deframer { - - private final CompositeByteBuf buffer; - - public ByteBufDeframer() { - buffer = Unpooled.compositeBuffer(); - } - - @Override - protected DataInputStream prefix(ByteBuf frame) throws IOException { - buffer.addComponent(frame); - buffer.writerIndex(buffer.writerIndex() + frame.writerIndex() - frame.readerIndex()); - return new DataInputStream(new ByteBufInputStream(buffer)); - } - - @Override - protected int consolidate() { - buffer.consolidate(); - return buffer.readableBytes(); - } - - @Override - protected ByteBuf decompress(ByteBuf frame) throws IOException { - frame = frame.order(ByteOrder.BIG_ENDIAN); - int compressionType = frame.readUnsignedByte(); - int frameLength = frame.readUnsignedMedium(); - if (frameLength != frame.readableBytes()) { - throw new IllegalArgumentException("GRPC and buffer lengths misaligned. Frame length=" - + frameLength + ", readableBytes=" + frame.readableBytes()); - } - if (TransportFrameUtil.isNotCompressed(compressionType)) { - return frame; - } - throw new IOException("Unknown compression type " + compressionType); - } -} diff --git a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyClient.java b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyClient.java deleted file mode 100644 index 1b6f8cb08e..0000000000 --- a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyClient.java +++ /dev/null @@ -1,100 +0,0 @@ -package com.google.net.stubby.spdy.netty; - -import com.google.common.base.Throwables; -import com.google.net.stubby.RequestRegistry; -import com.google.net.stubby.Session; - -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.UnpooledByteBufAllocator; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.spdy.SpdyFrameCodec; -import io.netty.handler.codec.spdy.SpdyVersion; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.concurrent.GenericFutureListener; - -import javax.annotation.Nullable; -import javax.net.ssl.SSLEngine; - -/** - * Simple client connection startup that creates a {@link SpdySession} for use - * with protocol bindings. - */ -public class SpdyClient { - private final String host; - private final int port; - private final RequestRegistry requestRegistry; - private ChannelFuture channelFuture; - private final SSLEngine sslEngine; - - public SpdyClient(String host, int port, RequestRegistry requestRegistry) { - this(host, port, requestRegistry, null); - } - - public SpdyClient(String host, int port, RequestRegistry requestRegistry, - @Nullable SSLEngine sslEngine) { - this.host = host; - this.port = port; - this.requestRegistry = requestRegistry; - this.sslEngine = sslEngine; - // TODO(user): NPN support - if (sslEngine != null) { - sslEngine.setUseClientMode(true); - } - } - - public Session startAndWait() { - EventLoopGroup workerGroup = new NioEventLoopGroup(); - try { - Bootstrap b = new Bootstrap(); // (1) - b.group(workerGroup); // (2) - b.channel(NioSocketChannel.class); // (3) - b.option(ChannelOption.SO_KEEPALIVE, true); // (4) - // TODO(user): Evaluate use of pooled allocator - b.option(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT); - b.handler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - if (sslEngine != null) { - // Assume TLS when using SSL - ch.pipeline().addLast(new SslHandler(sslEngine, false)); - } - ch.pipeline().addLast( - new SpdyFrameCodec(SpdyVersion.SPDY_3_1), - new SpdyCodec(requestRegistry)); - } - }); - // Start the client. - channelFuture = b.connect(host, port); - // Wait for the connection - channelFuture.sync(); // (5) - ChannelFuture closeFuture = channelFuture.channel().closeFuture(); - closeFuture.addListener(new WorkerCleanupListener(workerGroup)); - return new SpdySession(channelFuture.channel(), requestRegistry); - } catch (Throwable t) { - workerGroup.shutdownGracefully(); - throw Throwables.propagate(t); - } - } - - private static class WorkerCleanupListener - implements GenericFutureListener> { - private final EventLoopGroup workerGroup; - - public WorkerCleanupListener(EventLoopGroup workerGroup) { - this.workerGroup = workerGroup; - } - - @Override - public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { - workerGroup.shutdownGracefully(); - } - } - - -} diff --git a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyCodec.java b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyCodec.java deleted file mode 100644 index bd45c0aa94..0000000000 --- a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyCodec.java +++ /dev/null @@ -1,199 +0,0 @@ -package com.google.net.stubby.spdy.netty; - -import com.google.net.stubby.Operation; -import com.google.net.stubby.Operation.Phase; -import com.google.net.stubby.Request; -import com.google.net.stubby.RequestRegistry; -import com.google.net.stubby.Session; -import com.google.net.stubby.Status; -import com.google.net.stubby.transport.MessageFramer; -import com.google.net.stubby.transport.Transport; -import com.google.net.stubby.transport.Transport.Code; - -import io.netty.channel.ChannelHandlerAdapter; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.spdy.DefaultSpdyRstStreamFrame; -import io.netty.handler.codec.spdy.SpdyDataFrame; -import io.netty.handler.codec.spdy.SpdyHeaders; -import io.netty.handler.codec.spdy.SpdyHeadersFrame; -import io.netty.handler.codec.spdy.SpdyRstStreamFrame; -import io.netty.handler.codec.spdy.SpdyStreamFrame; -import io.netty.handler.codec.spdy.SpdyStreamStatus; -import io.netty.handler.codec.spdy.SpdySynStreamFrame; - -/** - * Codec used by clients and servers to interpret SPDY frames in the context of an ongoing - * request-response dialog - */ -public class SpdyCodec extends ChannelHandlerAdapter { - - private final boolean client; - private final RequestRegistry requestRegistry; - private final Session session; - - /** - * Constructor used by servers, takes a session which will receive operation events. - */ - public SpdyCodec(Session session, RequestRegistry requestRegistry) { - this.client = false; - this.session = session; - this.requestRegistry = requestRegistry; - } - - /** - * Constructor used by clients to send operations to a remote server - */ - public SpdyCodec(RequestRegistry requestRegistry) { - this.client = true; - this.session = null; - this.requestRegistry = requestRegistry; - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - // Abort any active requests. - requestRegistry.drainAllRequests(new Status(Transport.Code.ABORTED)); - - super.channelInactive(ctx); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (!(msg instanceof SpdyStreamFrame)) { - return; - } - SpdyStreamFrame frame = (SpdyStreamFrame) msg; - Request operation = requestRegistry.lookup(frame.getStreamId()); - try { - if (operation == null) { - if (client) { - // For clients an operation must already exist in the registry - throw new IllegalStateException("Response operation must already be bound"); - } else { - operation = serverStart(ctx, frame); - if (operation == null) { - // Unknown operation, refuse the stream - sendRstStream(ctx, frame.getStreamId(), SpdyStreamStatus.REFUSED_STREAM); - } - } - } else { - // Consume the frame - progress(client ? operation.getResponse() : operation, frame); - } - } catch (Throwable e) { - closeWithInternalError(operation, e); - sendRstStream(ctx, frame.getStreamId(), SpdyStreamStatus.INTERNAL_ERROR); - throw e; - } - } - - /** - * Closes the request and its associate response with an internal error. - */ - private void closeWithInternalError(Request request, Throwable e) { - if (request != null) { - Status status = new Status(Code.INTERNAL, e); - request.close(status); - request.getResponse().close(status); - requestRegistry.remove(request.getId()); - } - } - - /** - * Writes the Spdy RST Stream frame to the remote endpoint, indicating a stream failure. - */ - private void sendRstStream(ChannelHandlerContext ctx, int streamId, SpdyStreamStatus status) { - DefaultSpdyRstStreamFrame frame = new DefaultSpdyRstStreamFrame(streamId, status.getCode()); - ctx.writeAndFlush(frame); - } - - /** - * Start the Request operation on the server - */ - private Request serverStart(ChannelHandlerContext ctx, SpdyStreamFrame frame) { - if (!(frame instanceof SpdySynStreamFrame)) { - // TODO(user): Better error detail to client here - return null; - } - SpdySynStreamFrame headers = (SpdySynStreamFrame) frame; - if (!SpdySession.PROTORPC.equals(headers.headers().get("content-type"))) { - return null; - } - // Use Path to specify the operation - String operationName = - normalizeOperationName(headers.headers().get(SpdyHeaders.HttpNames.PATH)); - if (operationName == null) { - return null; - } - // Create the operation and bind a SPDY response operation - Request op = session.startRequest(operationName, - SpdyResponse.builder(frame.getStreamId(), ctx.channel(), new MessageFramer(4096))); - if (op == null) { - return null; - } - requestRegistry.register(op); - // Immediately deframe the remaining headers in the frame - progressHeaders(op, (SpdyHeadersFrame) frame); - return op; - } - - // TODO(user): This needs proper namespacing support, this is currently just a hack - private static String normalizeOperationName(String path) { - return path.substring(1); - } - - - /** - * Consume a received frame - */ - private void progress(Operation operation, SpdyStreamFrame frame) { - if (frame instanceof SpdyHeadersFrame) { - progressHeaders(operation, (SpdyHeadersFrame) frame); - } else if (frame instanceof SpdyDataFrame) { - progressPayload(operation, (SpdyDataFrame) frame); - } else if (frame instanceof SpdyRstStreamFrame) { - // Cancel - operation.close(null); - finish(operation); - } else { - // TODO(user): More refined handling for PING, GO_AWAY, SYN_STREAM, WINDOW_UPDATE, SETTINGS - operation.close(null); - finish(operation); - } - } - - /** - * Consume headers in the frame. Any header starting with ';' is considered reserved - */ - private void progressHeaders(Operation operation, SpdyHeadersFrame frame) { - // TODO(user): Currently we do not do anything with SPDY headers - if (frame.isLast()) { - finish(operation); - } - } - - private void progressPayload(Operation operation, SpdyDataFrame frame) { - if (operation == null) { - return; - } - ByteBufDeframer deframer = operation.get(ByteBufDeframer.class); - if (deframer == null) { - deframer = new ByteBufDeframer(); - operation.put(ByteBufDeframer.class, deframer); - } - deframer.deframe(frame.content(), operation); - if (frame.isLast()) { - finish(operation); - } - } - - /** - * Called when a SPDY stream is closed. - */ - private void finish(Operation operation) { - requestRegistry.remove(operation.getId()); - if (operation.getPhase() != Phase.CLOSED) { - operation.close(Status.OK); - } - } -} diff --git a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyOperation.java b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyOperation.java deleted file mode 100644 index 74d9187663..0000000000 --- a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyOperation.java +++ /dev/null @@ -1,78 +0,0 @@ -package com.google.net.stubby.spdy.netty; - -import com.google.net.stubby.AbstractOperation; -import com.google.net.stubby.Operation; -import com.google.net.stubby.Status; -import com.google.net.stubby.transport.Framer; -import com.google.net.stubby.transport.Transport; - -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.handler.codec.spdy.DefaultSpdyDataFrame; -import io.netty.handler.codec.spdy.SpdyHeadersFrame; - -import java.io.InputStream; -import java.nio.ByteBuffer; - -/** - * Base implementation of {@link Operation} that writes SPDY frames - */ -abstract class SpdyOperation extends AbstractOperation implements Framer.Sink { - - protected final Framer framer; - private final Channel channel; - - SpdyOperation(SpdyHeadersFrame headersFrame, Channel channel, Framer framer) { - super(headersFrame.getStreamId()); - this.channel = channel; - this.framer = framer; - channel.write(headersFrame); - } - - @Override - public Operation addContext(String type, InputStream message, Phase nextPhase) { - super.addContext(type, message, nextPhase); - framer.writeContext(type, message, getPhase() == Phase.CLOSED, this); - return this; - } - - @Override - public Operation addPayload(InputStream payload, Phase nextPhase) { - super.addPayload(payload, nextPhase); - framer.writePayload(payload, getPhase() == Phase.CLOSED, this); - return this; - } - - @Override - public Operation close(Status status) { - boolean alreadyClosed = getPhase() == Phase.CLOSED; - super.close(status); - if (!alreadyClosed) { - framer.writeStatus(status, true, this); - } - return this; - } - - @Override - public void deliverFrame(ByteBuffer frame, boolean endOfMessage) { - boolean closed = getPhase() == Phase.CLOSED; - DefaultSpdyDataFrame dataFrame = new DefaultSpdyDataFrame(getId(), - Unpooled.wrappedBuffer(frame)); - boolean streamClosed = closed && endOfMessage; - dataFrame.setLast(streamClosed); - try { - ChannelFuture channelFuture = channel.writeAndFlush(dataFrame); - if (!streamClosed) { - // Sync for all except the last frame to prevent buffer corruption. - channelFuture.get(); - } - } catch (Exception e) { - close(new Status(Transport.Code.INTERNAL, e)); - } finally { - if (streamClosed) { - framer.close(); - } - } - } -} diff --git a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyRequest.java b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyRequest.java deleted file mode 100644 index 4d67f21c6b..0000000000 --- a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyRequest.java +++ /dev/null @@ -1,55 +0,0 @@ -package com.google.net.stubby.spdy.netty; - -import com.google.net.stubby.Request; -import com.google.net.stubby.Response; -import com.google.net.stubby.transport.Framer; - -import io.netty.channel.Channel; -import io.netty.handler.codec.spdy.DefaultSpdySynStreamFrame; -import io.netty.handler.codec.spdy.SpdyHeaders; - -import java.net.InetAddress; -import java.net.UnknownHostException; - -/** - * A SPDY based implementation of {@link Request} - */ -class SpdyRequest extends SpdyOperation implements Request { - - // TODO(user): Inject this - private static final String HOST_NAME; - static { - String hostName; - try { - hostName = InetAddress.getLocalHost().getHostName(); - } catch (UnknownHostException uhe) { - hostName = "localhost"; - } - HOST_NAME = hostName; - } - - private static DefaultSpdySynStreamFrame createHeadersFrame(int id, String operationName) { - DefaultSpdySynStreamFrame headersFrame = new DefaultSpdySynStreamFrame(id, 0, (byte) 0); - headersFrame.headers().add(SpdyHeaders.HttpNames.METHOD, "POST"); - // TODO(user) Convert operation names to URIs - headersFrame.headers().add(SpdyHeaders.HttpNames.PATH, "/" + operationName); - headersFrame.headers().add(SpdyHeaders.HttpNames.VERSION, "HTTP/1.1"); - headersFrame.headers().add(SpdyHeaders.HttpNames.HOST, HOST_NAME); - headersFrame.headers().add(SpdyHeaders.HttpNames.SCHEME, "https"); - headersFrame.headers().add("content-type", SpdySession.PROTORPC); - return headersFrame; - } - - private final Response response; - - public SpdyRequest(Response response, Channel channel, String operationName, - Framer framer) { - super(createHeadersFrame(response.getId(), operationName), channel, framer); - this.response = response; - } - - @Override - public Response getResponse() { - return response; - } -} diff --git a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyResponse.java b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyResponse.java deleted file mode 100644 index e861ccb908..0000000000 --- a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyResponse.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.google.net.stubby.spdy.netty; - -import com.google.net.stubby.Response; -import com.google.net.stubby.transport.Framer; - -import io.netty.channel.Channel; -import io.netty.handler.codec.spdy.DefaultSpdySynReplyFrame; -import io.netty.handler.codec.spdy.SpdyHeaders; - -/** - * A SPDY based implementation of a {@link Response}. - */ -class SpdyResponse extends SpdyOperation implements Response { - - public static ResponseBuilder builder(final int id, final Channel channel, - final Framer framer) { - return new ResponseBuilder() { - @Override - public Response build(int id) { - throw new UnsupportedOperationException(); - } - - @Override - public Response build() { - return new SpdyResponse(id, channel, framer); - } - }; - } - - public static DefaultSpdySynReplyFrame createSynReply(int id) { - DefaultSpdySynReplyFrame synReplyFrame = new DefaultSpdySynReplyFrame(id); - // TODO(user): Need to review status code handling - synReplyFrame.headers().add(SpdyHeaders.HttpNames.STATUS, "200"); - return synReplyFrame; - } - - private SpdyResponse(int id, Channel channel, Framer framer) { - super(createSynReply(id), channel, framer); - } - -} diff --git a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyServer.java b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyServer.java deleted file mode 100644 index 9b566f0f2a..0000000000 --- a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdyServer.java +++ /dev/null @@ -1,75 +0,0 @@ -package com.google.net.stubby.spdy.netty; - -import com.google.net.stubby.RequestRegistry; -import com.google.net.stubby.Session; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.UnpooledByteBufAllocator; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.codec.spdy.SpdyFrameCodec; -import io.netty.handler.codec.spdy.SpdyVersion; - -/** - * Simple server connection startup that attaches a {@link Session} implementation to - * a connection. - */ -public class SpdyServer implements Runnable { - private final int port; - private final Session session; - private final RequestRegistry operations; - private Channel channel; - - public SpdyServer(int port, Session session, RequestRegistry operations) { - this.port = port; - this.session = session; - this.operations = operations; - } - - @Override - public void run() { - EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) - EventLoopGroup workerGroup = new NioEventLoopGroup(); - try { - ServerBootstrap b = new ServerBootstrap(); // (2) - // TODO(user): Evaluate use of pooled allocator - b.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT); - b.group(bossGroup, workerGroup) - .channel(NioServerSocketChannel.class) // (3) - .childHandler(new ChannelInitializer() { // (4) - @Override - public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast( - new SpdyFrameCodec(SpdyVersion.SPDY_3_1), - new SpdyCodec(session, operations)); - } - }) - .option(ChannelOption.SO_BACKLOG, 128) // (5) - .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) - - // Bind and startContext to accept incoming connections. - ChannelFuture f = b.bind(port).sync(); // (7) - - // Wait until the server socket is closed. - channel = f.channel(); - channel.closeFuture().sync(); - } catch (Exception e) { - e.printStackTrace(); - } finally { - workerGroup.shutdownGracefully(); - bossGroup.shutdownGracefully(); - } - } - - public void stop() throws Exception { - if (channel != null) { - channel.close().get(); - } - } -} diff --git a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdySession.java b/core/src/main/java/com/google/net/stubby/spdy/netty/SpdySession.java deleted file mode 100644 index 25a77812a3..0000000000 --- a/core/src/main/java/com/google/net/stubby/spdy/netty/SpdySession.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.google.net.stubby.spdy.netty; - -import com.google.net.stubby.Request; -import com.google.net.stubby.RequestRegistry; -import com.google.net.stubby.Response; -import com.google.net.stubby.Session; -import com.google.net.stubby.transport.MessageFramer; - -import io.netty.channel.Channel; - -import java.util.concurrent.atomic.AtomicInteger; - -/** - * An implementation of {@link Session} that can be used by clients to start - * a {@link Request} - */ -public class SpdySession implements Session { - - public static final String PROTORPC = "application/protorpc"; - - private final Channel channel; - private final boolean clientSession; - private final RequestRegistry requestRegistry; - private AtomicInteger sessionId; - - public SpdySession(Channel channel, RequestRegistry requestRegistry) { - this.channel = channel; - this.clientSession = true; - this.requestRegistry = requestRegistry; - // Clients are odd numbers starting at 1, servers are even numbers stating at 2 - sessionId = new AtomicInteger(1); - } - - private int getNextStreamId() { - return (sessionId.getAndIncrement() * 2) + (clientSession ? -1 : 0); - } - - @Override - public Request startRequest(String operationName, Response.ResponseBuilder response) { - int nextSessionId = getNextStreamId(); - Request operation = new SpdyRequest(response.build(nextSessionId), channel, operationName, - new MessageFramer(4096)); - requestRegistry.register(operation); - return operation; - } -} diff --git a/core/src/main/java/com/google/net/stubby/transport/CompressionFramer.java b/core/src/main/java/com/google/net/stubby/transport/CompressionFramer.java index 3cb292ea0a..bd2fb57cf9 100644 --- a/core/src/main/java/com/google/net/stubby/transport/CompressionFramer.java +++ b/core/src/main/java/com/google/net/stubby/transport/CompressionFramer.java @@ -15,7 +15,7 @@ import java.util.logging.Logger; import java.util.zip.Deflater; /** - * Compression framer for SPDY and HTTP/2 transport frames, for use in both compression and + * Compression framer for HTTP/2 transport frames, for use in both compression and * non-compression scenarios. Receives message-stream as input. It is able to change compression * configuration on-the-fly, but will not actually begin using the new configuration until the next * full frame.