cronet: open-source experimental Cronet transport code

This commit is contained in:
Eric Gribkoff 2017-10-06 11:18:03 -07:00 committed by GitHub
parent b07c70a09f
commit 4d67c3d63f
7 changed files with 1125 additions and 0 deletions

46
cronet/README.md Normal file
View File

@ -0,0 +1,46 @@
gRPC Cronet Transport
========================
**EXPERIMENTAL:** *gRPC's Cronet transport is an experimental API, and is not
yet integrated with our build system. Using Cronet with gRPC requires manually
integrating the Cronet libraries and the gRPC code in this directory into your
Android application.*
This code enables using the [Chromium networking stack
(Cronet)](https://chromium.googlesource.com/chromium/src/+/master/components/cronet)
as the transport layer for gRPC on Android. This lets your Android app make
RPCs using the same networking stack as used in the Chrome browser.
Some advantages of using Cronet with gRPC:
* Bundles an OpenSSL implementation, enabling TLS connections even on older
versions of Android without additional configuration
* Robust to Android network connectivity changes
* Support for [QUIC](https://www.chromium.org/quic)
Cronet jars are not currently available on Maven. The instructions at
https://github.com/GoogleChrome/cronet-sample/blob/master/README.md describe
how to manually download the Cronet binaries and add them to your Android
application. You will also need to copy the gRPC source files contained in this
directory into your application's code, as we do not currently provide a
`grpc-cronet` dependency.
To use Cronet, you must have the `ACCESS_NETWORK_STATE` permission set in
`AndroidManifest.xml`:
```
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
```
Once the above steps are completed, you can create a gRPC Cronet channel as
follows:
```
import io.grpc.cronet.CronetChannelBuilder;
import org.chromium.net.ExperimentalCronetEngine;
...
ExperimentalCronetEngine engine =
new ExperimentalCronetEngine.Builder(context /* Android Context */).build();
ManagedChannel channel = CronetChannelBuilder.forAddress("localhost", 8080, engine).build();
```

View File

@ -0,0 +1,34 @@
/*
* Copyright 2017, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.cronet;
import io.grpc.CallOptions;
/** Call options for use with the Cronet transport. */
public final class CronetCallOptions {
private CronetCallOptions() {}
/**
* Used for attaching annotation objects to Cronet streams. When the stream finishes, the user can
* get Cronet metrics from {@link org.chromium.net.RequestFinishedInfo.Listener} with the same
* annotation object.
*
* The Object must not be null.
*/
public static final CallOptions.Key<Object> CRONET_ANNOTATION_KEY =
CallOptions.Key.of("cronet-annotation", null);
}

View File

@ -0,0 +1,179 @@
/*
* Copyright 2016, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.cronet;
import static com.google.common.base.Preconditions.checkArgument;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import io.grpc.Attributes;
import io.grpc.ExperimentalApi;
import io.grpc.NameResolver;
import io.grpc.internal.AbstractManagedChannelImplBuilder;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.SharedResourceHolder;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nullable;
import org.chromium.net.BidirectionalStream;
import org.chromium.net.CronetEngine;
import org.chromium.net.ExperimentalCronetEngine;
/** Convenience class for building channels with the cronet transport. */
@ExperimentalApi("There is no plan to make this API stable, given transport API instability")
public class CronetChannelBuilder extends
AbstractManagedChannelImplBuilder<CronetChannelBuilder> {
/** BidirectionalStream.Builder factory used for getting the gRPC BidirectionalStream. */
public static abstract class StreamBuilderFactory {
public abstract BidirectionalStream.Builder newBidirectionalStreamBuilder(
String url, BidirectionalStream.Callback callback, Executor executor);
}
/** Creates a new builder for the given server host, port and CronetEngine. */
public static CronetChannelBuilder forAddress(
String host, int port, final CronetEngine cronetEngine) {
Preconditions.checkNotNull(cronetEngine, "cronetEngine");
return new CronetChannelBuilder(
host,
port,
new StreamBuilderFactory() {
@Override
public BidirectionalStream.Builder newBidirectionalStreamBuilder(
String url, BidirectionalStream.Callback callback, Executor executor) {
return ((ExperimentalCronetEngine) cronetEngine)
.newBidirectionalStreamBuilder(url, callback, executor);
}
});
}
/** Creates a new builder for the given server host, port and StreamBuilderFactory. */
public static CronetChannelBuilder forAddress(
String host, int port, StreamBuilderFactory streamFactory) {
return new CronetChannelBuilder(host, port, streamFactory);
}
/**
* Always fails. Call {@link #forAddress(String, int, CronetEngine)} instead.
*/
public static CronetChannelBuilder forTarget(String target) {
throw new UnsupportedOperationException("call forAddress() instead");
}
/**
* Always fails. Call {@link #forAddress(String, int, CronetEngine)} instead.
*/
public static CronetChannelBuilder forAddress(String name, int port) {
throw new UnsupportedOperationException("call forAddress(String, int, CronetEngine) instead");
}
private boolean alwaysUsePut = false;
private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
private StreamBuilderFactory streamFactory;
private CronetChannelBuilder(String host, int port, StreamBuilderFactory streamFactory) {
super(
InetSocketAddress.createUnresolved(host, port),
GrpcUtil.authorityFromHostAndPort(host, port));
this.streamFactory = Preconditions.checkNotNull(streamFactory, "streamFactory");
}
/**
* Sets the maximum message size allowed to be received on the channel. If not called,
* defaults to {@link io.grpc.internal.GrpcUtil#DEFAULT_MAX_MESSAGE_SIZE}.
*/
public final CronetChannelBuilder maxMessageSize(int maxMessageSize) {
checkArgument(maxMessageSize >= 0, "maxMessageSize must be >= 0");
this.maxMessageSize = maxMessageSize;
return this;
}
/**
* Sets the Cronet channel to always use PUT instead of POST. Defaults to false.
*/
public final CronetChannelBuilder alwaysUsePut(boolean enable) {
this.alwaysUsePut = enable;
return this;
}
/**
* Not supported for building cronet channel.
*/
@Override
public final CronetChannelBuilder usePlaintext(boolean skipNegotiation) {
throw new IllegalArgumentException("Plaintext not currently supported");
}
@Override
protected final ClientTransportFactory buildTransportFactory() {
return new CronetTransportFactory(streamFactory, MoreExecutors.directExecutor(),
maxMessageSize, alwaysUsePut);
}
@Override
protected Attributes getNameResolverParams() {
return Attributes.newBuilder()
.set(NameResolver.Factory.PARAMS_DEFAULT_PORT, GrpcUtil.DEFAULT_PORT_SSL).build();
}
@VisibleForTesting
static class CronetTransportFactory implements ClientTransportFactory {
private final ScheduledExecutorService timeoutService =
SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
private final Executor executor;
private final int maxMessageSize;
private final boolean alwaysUsePut;
private final StreamBuilderFactory streamFactory;
private CronetTransportFactory(
StreamBuilderFactory streamFactory,
Executor executor,
int maxMessageSize,
boolean alwaysUsePut) {
this.maxMessageSize = maxMessageSize;
this.alwaysUsePut = alwaysUsePut;
this.streamFactory = streamFactory;
this.executor = Preconditions.checkNotNull(executor, "executor");
}
@Override
public ConnectionClientTransport newClientTransport(SocketAddress addr, String authority,
@Nullable String userAgent) {
InetSocketAddress inetSocketAddr = (InetSocketAddress) addr;
return new CronetClientTransport(streamFactory, inetSocketAddr, authority, userAgent,
executor, maxMessageSize, alwaysUsePut);
}
@Override
public ScheduledExecutorService getScheduledExecutorService() {
return timeoutService;
}
@Override
public void close() {
SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, timeoutService);
}
}
}

View File

@ -0,0 +1,523 @@
/*
* Copyright 2016, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.cronet;
import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
import static io.grpc.internal.GrpcUtil.TE_HEADER;
import static io.grpc.internal.GrpcUtil.USER_AGENT_KEY;
// TODO(ericgribkoff): Consider changing from android.util.Log to java logging.
import android.util.Log;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.BaseEncoding;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.InternalMetadata;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory;
import io.grpc.internal.AbstractClientStream;
import io.grpc.internal.Http2ClientStreamTransportState;
import io.grpc.internal.ReadableBuffers;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.WritableBuffer;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.TransportFrameUtil;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.Map;
import java.util.List;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.chromium.net.BidirectionalStream;
import org.chromium.net.CronetException;
import org.chromium.net.ExperimentalBidirectionalStream;
import org.chromium.net.UrlResponseInfo;
/**
* Client stream for the cronet transport.
*/
class CronetClientStream extends AbstractClientStream {
private static final int READ_BUFFER_CAPACITY = 4 * 1024;
private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocateDirect(0);
private static final String LOG_TAG = "grpc-java-cronet";
private final String url;
private final String userAgent;
private final Executor executor;
private final Metadata headers;
private final CronetClientTransport transport;
private final Runnable startCallback;
@VisibleForTesting
final boolean idempotent;
private BidirectionalStream stream;
private final boolean delayRequestHeader;
private final Object annotation;
private final TransportState state;
private final Sink sink = new Sink();
private StreamBuilderFactory streamFactory;
CronetClientStream(
final String url,
@Nullable String userAgent,
Executor executor,
final Metadata headers,
CronetClientTransport transport,
Runnable startCallback,
Object lock,
int maxMessageSize,
boolean alwaysUsePut,
MethodDescriptor<?, ?> method,
StatsTraceContext statsTraceCtx,
CallOptions callOptions) {
super(new CronetWritableBufferAllocator(), statsTraceCtx, headers, method.isSafe());
this.url = Preconditions.checkNotNull(url, "url");
this.userAgent = Preconditions.checkNotNull(userAgent, "userAgent");
this.executor = Preconditions.checkNotNull(executor, "executor");
this.headers = Preconditions.checkNotNull(headers, "headers");
this.transport = Preconditions.checkNotNull(transport, "transport");
this.startCallback = Preconditions.checkNotNull(startCallback, "startCallback");
this.idempotent = method.isIdempotent() || alwaysUsePut;
// Only delay flushing header for unary rpcs.
this.delayRequestHeader = (method.getType() == MethodDescriptor.MethodType.UNARY);
this.annotation = callOptions.getOption(CronetCallOptions.CRONET_ANNOTATION_KEY);
this.state = new TransportState(maxMessageSize, statsTraceCtx, lock);
}
@Override
protected TransportState transportState() {
return state;
}
@Override
protected Sink abstractClientStreamSink() {
return sink;
}
@Override
public void setAuthority(String authority) {
throw new UnsupportedOperationException("Cronet does not support overriding authority");
}
class Sink implements AbstractClientStream.Sink {
@Override
public void writeHeaders(Metadata metadata, byte[] payload) {
startCallback.run();
BidirectionalStreamCallback callback = new BidirectionalStreamCallback();
String path = url;
if (payload != null) {
path += "?" + BaseEncoding.base64().encode(payload);
}
BidirectionalStream.Builder builder =
streamFactory.newBidirectionalStreamBuilder(path, callback, executor);
if (payload != null) {
builder.setHttpMethod("GET");
} else if (idempotent) {
builder.setHttpMethod("PUT");
}
if (delayRequestHeader) {
builder.delayRequestHeadersUntilFirstFlush(true);
}
if (annotation != null) {
((ExperimentalBidirectionalStream.Builder) builder).addRequestAnnotation(annotation);
}
setGrpcHeaders(builder);
stream = builder.build();
stream.start();
}
@Override
public void writeFrame(WritableBuffer buffer, boolean endOfStream, boolean flush) {
synchronized (state.lock) {
if (state.cancelSent) {
return;
}
ByteBuffer byteBuffer;
if (buffer != null) {
byteBuffer = ((CronetWritableBuffer) buffer).buffer();
byteBuffer.flip();
} else {
byteBuffer = EMPTY_BUFFER;
}
onSendingBytes(byteBuffer.remaining());
if (!state.streamReady) {
state.enqueuePendingData(new PendingData(byteBuffer, endOfStream, flush));
} else {
streamWrite(byteBuffer, endOfStream, flush);
}
}
}
@Override
public void request(final int numMessages) {
synchronized (state.lock) {
state.requestMessagesFromDeframer(numMessages);
}
}
@Override
public void cancel(Status reason) {
synchronized (state.lock) {
if (state.cancelSent) {
return;
}
state.cancelSent = true;
state.cancelReason = reason;
state.clearPendingData();
if (stream != null) {
// Will report stream finish when BidirectionalStreamCallback.onCanceled is called.
stream.cancel();
} else {
transport.finishStream(CronetClientStream.this, reason);
}
}
}
}
class TransportState extends Http2ClientStreamTransportState {
private final Object lock;
@GuardedBy("lock")
private Queue<PendingData> pendingData = new LinkedList<PendingData>();
@GuardedBy("lock")
private boolean streamReady;
@GuardedBy("lock")
private boolean cancelSent = false;
@GuardedBy("lock")
private int bytesPendingProcess;
@GuardedBy("lock")
private Status cancelReason;
@GuardedBy("lock")
private boolean readClosed;
public TransportState(int maxMessageSize, StatsTraceContext statsTraceCtx, Object lock) {
super(maxMessageSize, statsTraceCtx);
this.lock = Preconditions.checkNotNull(lock, "lock");
}
@GuardedBy("lock")
public void start(StreamBuilderFactory factory) {
streamFactory = factory;
}
@GuardedBy("lock")
@Override
protected void onStreamAllocated() {
super.onStreamAllocated();
}
@GuardedBy("lock")
@Override
protected void http2ProcessingFailed(Status status, boolean stopDelivery, Metadata trailers) {
stream.cancel();
transportReportStatus(status, stopDelivery, trailers);
}
@GuardedBy("lock")
@Override
public void deframeFailed(Throwable cause) {
http2ProcessingFailed(Status.fromThrowable(cause), true, new Metadata());
}
@Override
public void runOnTransportThread(final Runnable r) {
synchronized (lock) {
r.run();
}
}
@GuardedBy("lock")
@Override
public void bytesRead(int processedBytes) {
bytesPendingProcess -= processedBytes;
if (bytesPendingProcess == 0 && !readClosed) {
if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
Log.v(LOG_TAG, "BidirectionalStream.read");
}
stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
}
}
@GuardedBy("lock")
private void transportHeadersReceived(Metadata metadata, boolean endOfStream) {
if (endOfStream) {
transportTrailersReceived(metadata);
} else {
transportHeadersReceived(metadata);
}
}
@GuardedBy("lock")
private void transportDataReceived(ByteBuffer buffer, boolean endOfStream) {
bytesPendingProcess += buffer.remaining();
super.transportDataReceived(ReadableBuffers.wrap(buffer), endOfStream);
}
@GuardedBy("lock")
private void clearPendingData() {
for (PendingData data : pendingData) {
data.buffer.clear();
}
pendingData.clear();
}
@GuardedBy("lock")
private void enqueuePendingData(PendingData data) {
pendingData.add(data);
}
@GuardedBy("lock")
private void writeAllPendingData() {
for (PendingData data : pendingData) {
streamWrite(data.buffer, data.endOfStream, data.flush);
}
pendingData.clear();
}
}
// TODO(ericgribkoff): move header related method to a common place like GrpcUtil.
private static boolean isApplicationHeader(String key) {
// Don't allow reserved non HTTP/2 pseudo headers to be added
// HTTP/2 headers can not be created as keys because Header.Key disallows the ':' character.
return !CONTENT_TYPE_KEY.name().equalsIgnoreCase(key)
&& !USER_AGENT_KEY.name().equalsIgnoreCase(key)
&& !TE_HEADER.name().equalsIgnoreCase(key);
}
private void setGrpcHeaders(BidirectionalStream.Builder builder) {
// Psuedo-headers are set by cronet.
// All non-pseudo headers must come after pseudo headers.
// TODO(ericgribkoff): remove this and set it on CronetEngine after crbug.com/588204 gets fixed.
builder.addHeader(USER_AGENT_KEY.name(), userAgent);
builder.addHeader(CONTENT_TYPE_KEY.name(), GrpcUtil.CONTENT_TYPE_GRPC);
builder.addHeader("te", GrpcUtil.TE_TRAILERS);
// Now add any application-provided headers.
// TODO(ericgribkoff): make a String-based version to avoid unnecessary conversion between
// String and byte array.
byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(headers);
for (int i = 0; i < serializedHeaders.length; i += 2) {
String key = new String(serializedHeaders[i], Charset.forName("UTF-8"));
// TODO(ericgribkoff): log an error or throw an exception
if (isApplicationHeader(key)) {
String value = new String(serializedHeaders[i + 1], Charset.forName("UTF-8"));
builder.addHeader(key, value);
}
}
}
private void streamWrite(ByteBuffer buffer, boolean endOfStream, boolean flush) {
if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
Log.v(LOG_TAG, "BidirectionalStream.write");
}
stream.write(buffer, endOfStream);
if (flush) {
if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
Log.v(LOG_TAG, "BidirectionalStream.flush");
}
stream.flush();
}
}
private void finishStream(Status status) {
transport.finishStream(this, status);
}
@Override
public Attributes getAttributes() {
return Attributes.EMPTY;
}
class BidirectionalStreamCallback extends BidirectionalStream.Callback {
private List<Map.Entry<String, String>> trailerList;
@Override
public void onStreamReady(BidirectionalStream stream) {
if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
Log.v(LOG_TAG, "onStreamReady");
}
synchronized (state.lock) {
// Now that the stream is ready, call the listener's onReady callback if
// appropriate.
state.onStreamAllocated();
state.streamReady = true;
state.writeAllPendingData();
}
}
@Override
public void onResponseHeadersReceived(BidirectionalStream stream, UrlResponseInfo info) {
if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
Log.v(LOG_TAG, "onResponseHeadersReceived. Header=" + info.getAllHeadersAsList());
Log.v(LOG_TAG, "BidirectionalStream.read");
}
reportHeaders(info.getAllHeadersAsList(), false);
stream.read(ByteBuffer.allocateDirect(READ_BUFFER_CAPACITY));
}
@Override
public void onReadCompleted(BidirectionalStream stream, UrlResponseInfo info,
ByteBuffer buffer, boolean endOfStream) {
buffer.flip();
if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
Log.v(LOG_TAG, "onReadCompleted. Size=" + buffer.remaining());
}
synchronized (state.lock) {
state.readClosed = endOfStream;
// The endOfStream in gRPC has a different meaning so we always call transportDataReceived
// with endOfStream=false.
if (buffer.remaining() != 0) {
state.transportDataReceived(buffer, false);
}
}
if (endOfStream && trailerList != null) {
// Process trailers if we have already received any.
reportHeaders(trailerList, true);
}
}
@Override
public void onWriteCompleted(BidirectionalStream stream, UrlResponseInfo info,
ByteBuffer buffer, boolean endOfStream) {
if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
Log.v(LOG_TAG, "onWriteCompleted");
}
synchronized (state.lock) {
state.onSentBytes(buffer.position());
}
}
@Override
public void onResponseTrailersReceived(BidirectionalStream stream, UrlResponseInfo info,
UrlResponseInfo.HeaderBlock trailers) {
processTrailers(trailers.getAsList());
}
// We need this method because UrlResponseInfo.HeaderBlock is a final class and cannot be
// mocked.
@VisibleForTesting
void processTrailers(List<Map.Entry<String, String>> trailerList) {
this.trailerList = trailerList;
boolean readClosed;
synchronized (state.lock) {
readClosed = state.readClosed;
}
if (readClosed) {
// There's no pending onReadCompleted callback so we can report trailers now.
reportHeaders(trailerList, true);
}
// Otherwise report trailers in onReadCompleted, or onSucceeded.
if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
Log.v(LOG_TAG, "onResponseTrailersReceived. Trailer=" + trailerList.toString());
}
}
@Override
public void onSucceeded(BidirectionalStream stream, UrlResponseInfo info) {
if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
Log.v(LOG_TAG, "onSucceeded");
}
if (!haveTrailersBeenReported()) {
if (trailerList != null) {
reportHeaders(trailerList, true);
} else if (info != null) {
reportHeaders(info.getAllHeadersAsList(), true);
} else {
throw new AssertionError("No response header or trailer");
}
}
finishStream(toGrpcStatus(info));
}
@Override
public void onFailed(BidirectionalStream stream, UrlResponseInfo info,
CronetException error) {
if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
Log.v(LOG_TAG, "onFailed");
}
finishStream(Status.UNAVAILABLE.withCause(error));
}
@Override
public void onCanceled(BidirectionalStream stream, UrlResponseInfo info) {
if (Log.isLoggable(LOG_TAG, Log.VERBOSE)) {
Log.v(LOG_TAG, "onCanceled");
}
Status status;
synchronized (state.lock) {
if (state.cancelReason != null) {
status = state.cancelReason;
} else if (info != null) {
status = toGrpcStatus(info);
} else {
status = Status.CANCELLED;
}
}
finishStream(status);
}
private void reportHeaders(List<Map.Entry<String, String>> headers, boolean endOfStream) {
// TODO(ericgribkoff): create new utility methods to eliminate all these conversions
List<String> headerList = new ArrayList<String>();
for (Map.Entry<String, String> entry : headers) {
headerList.add(entry.getKey());
headerList.add(entry.getValue());
}
byte[][] headerValues = new byte[headerList.size()][];
for (int i = 0; i < headerList.size(); i += 2) {
headerValues[i] = headerList.get(i).getBytes(Charset.forName("UTF-8"));
headerValues[i + 1] = headerList.get(i + 1).getBytes(Charset.forName("UTF-8"));
}
Metadata metadata =
InternalMetadata.newMetadata(TransportFrameUtil.toRawSerializedHeaders(headerValues));
synchronized (state.lock) {
// There's no pending onReadCompleted callback so we can report trailers now.
state.transportHeadersReceived(metadata, endOfStream);
}
}
private boolean haveTrailersBeenReported() {
synchronized (state.lock) {
return trailerList != null && state.readClosed;
}
}
private Status toGrpcStatus(UrlResponseInfo info) {
return GrpcUtil.httpStatusToGrpcStatus(info.getHttpStatusCode());
}
}
private static class PendingData {
ByteBuffer buffer;
boolean endOfStream;
boolean flush;
PendingData(ByteBuffer buffer, boolean endOfStream, boolean flush) {
this.buffer = buffer;
this.endOfStream = endOfStream;
this.flush = flush;
}
}
}

View File

@ -0,0 +1,245 @@
/*
* Copyright 2016, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.cronet;
import com.google.common.base.Preconditions;
import io.grpc.Attributes;
import io.grpc.CallOptions;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.cronet.CronetChannelBuilder.StreamBuilderFactory;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.LogId;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.WithLogId;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
/**
* A cronet-based {@link ConnectionClientTransport} implementation.
*/
class CronetClientTransport implements ConnectionClientTransport, WithLogId {
private final LogId logId = LogId.allocate(getClass().getName());
private final InetSocketAddress address;
private final String authority;
private final String userAgent;
private Listener listener;
private final Object lock = new Object();
@GuardedBy("lock")
private final Set<CronetClientStream> streams =
new HashSet<CronetClientStream>();
private final Executor executor;
private final int maxMessageSize;
private final boolean alwaysUsePut;
// Indicates the transport is in go-away state: no new streams will be processed,
// but existing streams may continue.
@GuardedBy("lock")
private boolean goAway;
// Used to indicate the special phase while we are going to enter go-away state but before
// goAway is turned to true, see the comment at where this is set about why it is needed.
@GuardedBy("lock")
private boolean startedGoAway;
@GuardedBy("lock")
private Status goAwayStatus;
@GuardedBy("lock")
private boolean stopped;
@GuardedBy("lock")
// Whether this transport has started.
private boolean started;
private StreamBuilderFactory streamFactory;
CronetClientTransport(
StreamBuilderFactory streamFactory,
InetSocketAddress address,
String authority,
@Nullable String userAgent,
Executor executor,
int maxMessageSize,
boolean alwaysUsePut) {
this.address = Preconditions.checkNotNull(address, "address");
this.authority = authority;
this.userAgent = GrpcUtil.getGrpcUserAgent("cronet", userAgent);
this.maxMessageSize = maxMessageSize;
this.alwaysUsePut = alwaysUsePut;
this.executor = Preconditions.checkNotNull(executor, "executor");
this.streamFactory = Preconditions.checkNotNull(streamFactory, "streamFactory");
}
@Override
public CronetClientStream newStream(final MethodDescriptor<?, ?> method, final Metadata headers,
final CallOptions callOptions) {
Preconditions.checkNotNull(method, "method");
Preconditions.checkNotNull(headers, "headers");
final String defaultPath = "/" + method.getFullMethodName();
final String url = "https://" + authority + defaultPath;
final StatsTraceContext statsTraceCtx =
StatsTraceContext.newClientContext(callOptions, headers);
class StartCallback implements Runnable {
final CronetClientStream clientStream = new CronetClientStream(
url, userAgent, executor, headers, CronetClientTransport.this, this, lock, maxMessageSize,
alwaysUsePut, method, statsTraceCtx, callOptions);
@Override
public void run() {
synchronized (lock) {
if (goAway) {
clientStream.transportState().transportReportStatus(goAwayStatus, true, new Metadata());
} else if (started) {
startStream(clientStream);
} else {
throw new AssertionError("Transport is not started");
}
}
}
}
return new StartCallback().clientStream;
}
@GuardedBy("lock")
private void startStream(CronetClientStream stream) {
streams.add(stream);
stream.transportState().start(streamFactory);
}
@Override
public Runnable start(Listener listener) {
this.listener = Preconditions.checkNotNull(listener, "listener");
synchronized (lock) {
started = true;
}
return new Runnable() {
@Override
public void run() {
// Listener callbacks should not be called simultaneously
CronetClientTransport.this.listener.transportReady();
}
};
}
@Override
public String toString() {
return super.toString() + "(" + address + ")";
}
public void shutdown() {
shutdown(Status.UNAVAILABLE.withDescription("Transport stopped"));
}
@Override
public void shutdown(Status status) {
synchronized (lock) {
if (goAway) {
return;
}
}
startGoAway(status);
}
@Override
public void shutdownNow(Status status) {
shutdown(status);
ArrayList<CronetClientStream> streamsCopy;
synchronized (lock) {
// A copy is always necessary since cancel() can call finishStream() which calls
// streams.remove()
streamsCopy = new ArrayList<CronetClientStream>(streams);
}
for (int i = 0; i < streamsCopy.size(); i++) {
// Avoid deadlock by calling into stream without lock held
streamsCopy.get(i).cancel(status);
}
stopIfNecessary();
}
@Override
public Attributes getAttributes() {
// TODO(zhangkun83): fill channel security attributes
return Attributes.EMPTY;
}
private void startGoAway(Status status) {
synchronized (lock) {
if (startedGoAway) {
// Another go-away is in progress, ignore this one.
return;
}
// We use startedGoAway here instead of goAway, because once the goAway becomes true, other
// thread in stopIfNecessary() may stop the transport and cause the
// listener.transportTerminated() be called before listener.transportShutdown().
startedGoAway = true;
}
listener.transportShutdown(status);
synchronized (lock) {
goAway = true;
goAwayStatus = status;
}
stopIfNecessary();
}
@Override
public void ping(final PingCallback callback, Executor executor) {
// TODO(ericgribkoff): depend on cronet implemenetation
throw new UnsupportedOperationException();
}
@Override
public LogId getLogId() {
return logId;
}
/**
* When the transport is in goAway state, we should stop it once all active streams finish.
*/
void stopIfNecessary() {
synchronized (lock) {
if (goAway && !stopped && streams.size() == 0) {
stopped = true;
} else {
return;
}
}
listener.transportTerminated();
}
void finishStream(CronetClientStream stream, Status status) {
synchronized (lock) {
if (streams.remove(stream)) {
boolean isCancelled = (status.getCode() == Code.CANCELLED
|| status.getCode() == Code.DEADLINE_EXCEEDED);
stream.transportState().transportReportStatus(status, isCancelled, new Metadata());
} else {
return;
}
}
stopIfNecessary();
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright 2016, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.cronet;
import com.google.common.base.Preconditions;
import io.grpc.internal.WritableBuffer;
import java.nio.ByteBuffer;
class CronetWritableBuffer implements WritableBuffer {
private final ByteBuffer buffer;
public CronetWritableBuffer(ByteBuffer buffer, int capacity) {
this.buffer = Preconditions.checkNotNull(buffer, "buffer");
}
@Override
public void write(byte[] src, int srcIndex, int length) {
buffer.put(src, srcIndex, length);
}
@Override
public void write(byte b) {
buffer.put(b);
}
@Override
public int writableBytes() {
return buffer.remaining();
}
@Override
public int readableBytes() {
return buffer.position();
}
@Override
public void release() {
}
ByteBuffer buffer() {
return buffer;
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright 2016, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.cronet;
import io.grpc.internal.WritableBuffer;
import io.grpc.internal.WritableBufferAllocator;
import java.nio.ByteBuffer;
/**
* The default allocator for {@link CronetWritableBuffer}s used by the Cronet transport.
*/
class CronetWritableBufferAllocator implements WritableBufferAllocator {
// Set the maximum buffer size to 1MB
private static final int MAX_BUFFER = 1024 * 1024;
/**
* Construct a new instance.
*/
CronetWritableBufferAllocator() {
}
@Override
public WritableBuffer allocate(int capacityHint) {
capacityHint = Math.min(MAX_BUFFER, capacityHint);
return new CronetWritableBuffer(ByteBuffer.allocateDirect(capacityHint), capacityHint);
}
}