First steps in reducing dependency on proto from runtime.

- Remove transport.proto and move status codes into Status.java with a little refactoring to make
status easier & more precise to use
- Move DeferredProtoInputStream into a proto subpackage
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=76392172
This commit is contained in:
lryan 2014-09-25 18:25:54 -07:00 committed by Eric Anderson
parent 65be3c72d9
commit 71e4a92c10
38 changed files with 370 additions and 222 deletions

View File

@ -2,7 +2,6 @@ package com.google.net.stubby;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.MapMaker; import com.google.common.collect.MapMaker;
import com.google.net.stubby.transport.Transport;
import java.io.InputStream; import java.io.InputStream;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -45,7 +44,7 @@ public abstract class AbstractOperation implements Operation {
*/ */
protected Operation progressTo(Phase desiredPhase) { protected Operation progressTo(Phase desiredPhase) {
if (desiredPhase.ordinal() < phase.ordinal()) { if (desiredPhase.ordinal() < phase.ordinal()) {
close(new Status(Transport.Code.INTERNAL, close(Status.INTERNAL.withDescription(
"Canot move to " + desiredPhase.name() + " from " + phase.name())); "Canot move to " + desiredPhase.name() + " from " + phase.name()));
} else { } else {
phase = desiredPhase; phase = desiredPhase;

View File

@ -4,12 +4,14 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import javax.annotation.Nullable;
/** /**
* Extension to {@link InputStream} to allow for deferred production of data. Allows for * Extension to {@link InputStream} to allow for deferred production of data. Allows for
* zero-copy conversions when the goal is to copy the contents of a resource to a * zero-copy conversions when the goal is to copy the contents of a resource to a
* stream or buffer. * stream or buffer.
*/ */
public abstract class DeferredInputStream extends InputStream { public abstract class DeferredInputStream<T> extends InputStream {
/** /**
* Produce the entire contents of this stream to the specified target * Produce the entire contents of this stream to the specified target
@ -17,4 +19,11 @@ public abstract class DeferredInputStream extends InputStream {
* @return number of bytes written * @return number of bytes written
*/ */
public abstract int flushTo(OutputStream target) throws IOException; public abstract int flushTo(OutputStream target) throws IOException;
/**
* Returns the object that backs the stream. If any bytes have been read from the stream
* then {@code null} is returned.
*/
@Nullable
public abstract T getDeferred();
} }

View File

@ -3,7 +3,6 @@ package com.google.net.stubby;
import com.google.net.stubby.newtransport.ClientStream; import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.ClientStreamListener; import com.google.net.stubby.newtransport.ClientStreamListener;
import com.google.net.stubby.newtransport.StreamState; import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -82,7 +81,7 @@ public class SessionClientStream implements ClientStream {
*/ */
@Override @Override
public void cancel() { public void cancel() {
request.close(new Status(Transport.Code.CANCELLED)); request.close(Status.CANCELLED);
} }
/** /**

View File

@ -2,9 +2,11 @@ package com.google.net.stubby;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.net.stubby.transport.Transport; import com.google.common.collect.Lists;
import java.util.logging.Logger; import java.util.List;
import java.util.Objects;
import java.util.TreeMap;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable; import javax.annotation.concurrent.Immutable;
@ -13,16 +15,198 @@ import javax.annotation.concurrent.Immutable;
* Defines the status of an operation using the canonical error space. * Defines the status of an operation using the canonical error space.
*/ */
@Immutable @Immutable
public class Status { public final class Status {
public static final Status OK = new Status(Transport.Code.OK);
public static final Status CANCELLED = new Status(Transport.Code.CANCELLED); /**
public static final Metadata.Key<Transport.Code> CODE_KEY * The set of canonical error codes. If new codes are added over time they must choose
= Metadata.Key.of("grpc-status", new CodeMarshaller()); * a numerical value that does not collide with any previously defined code.
*/
public enum Code {
OK(0),
// The operation was cancelled (typically by the caller).
CANCELLED(1),
// Unknown error. An example of where this error may be returned is
// if a Status value received from another address space belongs to
// an error-space that is not known in this address space. Also
// errors raised by APIs that do not return enough error information
// may be converted to this error.
UNKNOWN(2),
// Client specified an invalid argument. Note that this differs
// from FAILED_PRECONDITION. INVALID_ARGUMENT indicates arguments
// that are problematic regardless of the state of the system
// (e.g., a malformed file name).
INVALID_ARGUMENT(3),
// Deadline expired before operation could complete. For operations
// that change the state of the system, this error may be returned
// even if the operation has completed successfully. For example, a
// successful response from a server could have been delayed long
// enough for the deadline to expire.
DEADLINE_EXCEEDED(4),
// Some requested entity (e.g., file or directory) was not found.
NOT_FOUND(5),
// Some entity that we attempted to create (e.g., file or directory)
// already exists.
ALREADY_EXISTS(6),
// The caller does not have permission to execute the specified
// operation. PERMISSION_DENIED must not be used for rejections
// caused by exhausting some resource (use RESOURCE_EXHAUSTED
// instead for those errors). PERMISSION_DENIED must not be
// used if the caller cannot be identified (use UNAUTHENTICATED
// instead for those errors).
PERMISSION_DENIED(7),
// Some resource has been exhausted, perhaps a per-user quota, or
// perhaps the entire file system is out of space.
RESOURCE_EXHAUSTED(8),
// Operation was rejected because the system is not in a state
// required for the operation's execution. For example, directory
// to be deleted may be non-empty, an rmdir operation is applied to
// a non-directory, etc.
//
// A litmus test that may help a service implementor in deciding
// between FAILED_PRECONDITION, ABORTED, and UNAVAILABLE:
// (a) Use UNAVAILABLE if the client can retry just the failing call.
// (b) Use ABORTED if the client should retry at a higher-level
// (e.g., restarting a read-modify-write sequence).
// (c) Use FAILED_PRECONDITION if the client should not retry until
// the system state has been explicitly fixed. E.g., if an "rmdir"
// fails because the directory is non-empty, FAILED_PRECONDITION
// should be returned since the client should not retry unless
// they have first fixed up the directory by deleting files from it.
FAILED_PRECONDITION(9),
// The operation was aborted, typically due to a concurrency issue
// like sequencer check failures, transaction aborts, etc.
//
// See litmus test above for deciding between FAILED_PRECONDITION,
// ABORTED, and UNAVAILABLE.
ABORTED(10),
// Operation was attempted past the valid range. E.g., seeking or
// reading past end of file.
//
// Unlike INVALID_ARGUMENT, this error indicates a problem that may
// be fixed if the system state changes. For example, a 32-bit file
// system will generate INVALID_ARGUMENT if asked to read at an
// offset that is not in the range [0,2^32-1], but it will generate
// OUT_OF_RANGE if asked to read from an offset past the current
// file size.
//
// There is a fair bit of overlap between FAILED_PRECONDITION and
// OUT_OF_RANGE. We recommend using OUT_OF_RANGE (the more specific
// error) when it applies so that callers who are iterating through
// a space can easily look for an OUT_OF_RANGE error to detect when
// they are done.
OUT_OF_RANGE(11),
// Operation is not implemented or not supported/enabled in this service.
UNIMPLEMENTED(12),
// Internal errors. Means some invariants expected by underlying
// system has been broken. If you see one of these errors,
// something is very broken.
INTERNAL(13),
// The service is currently unavailable. This is a most likely a
// transient condition and may be corrected by retrying with
// a backoff.
//
// See litmus test above for deciding between FAILED_PRECONDITION,
// ABORTED, and UNAVAILABLE.
UNAVAILABLE(14),
// Unrecoverable data loss or corruption.
DATA_LOSS(15),
// The request does not have valid authentication credentials for the
// operation.
UNAUTHENTICATED(16);
private final int value;
private Code(int value) {
this.value = value;
}
public int value() {
return value;
}
private Status status() {
return STATUS_LIST.get(value);
}
}
// Create the canonical list of Status instances indexed by their code values.
private static List<Status> STATUS_LIST;
static {
TreeMap<Integer, Status> canonicalizer = new TreeMap<>();
for (Code code : Code.values()) {
Status replaced = canonicalizer.put(code.value(), new Status(code));
if (replaced != null) {
throw new IllegalStateException("Code value duplication between " +
replaced.getCode().name() + " & " + code.name());
}
}
STATUS_LIST = Lists.newArrayList(canonicalizer.values());
}
// A pseudo-enum of Status instances mapped 1:1 with values in Code. This simplifies construction
// patterns for derived implementations of Status.
public static final Status OK = Code.OK.status();
public static final Status CANCELLED = Code.CANCELLED.status();
public static final Status UNKNOWN = Code.UNKNOWN.status();
public static final Status INVALID_ARGUMENT = Code.INVALID_ARGUMENT.status();
public static final Status DEADLINE_EXCEEDED = Code.DEADLINE_EXCEEDED.status();
public static final Status NOT_FOUND = Code.NOT_FOUND.status();
public static final Status ALREADY_EXISTS = Code.ALREADY_EXISTS.status();
public static final Status PERMISSION_DENIED = Code.PERMISSION_DENIED.status();
public static final Status UNAUTHENTICATED = Code.PERMISSION_DENIED.status();
public static final Status RESOURCE_EXHAUSTED = Code.RESOURCE_EXHAUSTED.status();
public static final Status FAILED_PRECONDITION =
Code.FAILED_PRECONDITION.status();
public static final Status ABORTED = Code.ABORTED.status();
public static final Status OUT_OF_RANGE = Code.OUT_OF_RANGE.status();
public static final Status UNIMPLEMENTED = Code.UNIMPLEMENTED.status();
public static final Status INTERNAL = Code.INTERNAL.status();
public static final Status UNAVAILABLE = Code.UNAVAILABLE.status();
public static final Status DATA_LOSS = Code.DATA_LOSS.status();
/**
* Return a {@link Status} given a canonical error {@link Code} value.
*/
public static Status fromCodeValue(int codeValue) {
Status status;
if (codeValue < 0 || codeValue > STATUS_LIST.size()) {
return UNKNOWN.withDescription("Unknown code " + codeValue);
} else {
return status = STATUS_LIST.get(codeValue);
}
}
/**
* Key to bind status code to trailers.
*/
public static final Metadata.Key<Status> CODE_KEY
= Metadata.Key.of("grpc-status", new StatusCodeMarshaller());
/**
* Key to bind status message to trailers.
*/
public static final Metadata.Key<String> MESSAGE_KEY public static final Metadata.Key<String> MESSAGE_KEY
= Metadata.Key.of("grpc-message", Metadata.STRING_MARSHALLER); = Metadata.Key.of("grpc-message", Metadata.STRING_MARSHALLER);
private static final Logger log = Logger.getLogger(Status.class.getName()); /**
* Extract an error {@link Status} from the causal chain of a {@link Throwable}.
*/
public static Status fromThrowable(Throwable t) { public static Status fromThrowable(Throwable t) {
for (Throwable cause : Throwables.getCausalChain(t)) { for (Throwable cause : Throwables.getCausalChain(t)) {
if (cause instanceof OperationException) { if (cause instanceof OperationException) {
@ -32,32 +216,44 @@ public class Status {
} }
} }
// Couldn't find a cause with a Status // Couldn't find a cause with a Status
return new Status(Transport.Code.INTERNAL, t); return INTERNAL.withCause(t);
} }
private final Transport.Code code; private final Code code;
private final String description; private final String description;
private final Throwable cause; private final Throwable cause;
public Status(Transport.Code code) { private Status(Code code) {
this(code, null, null); this(code, null, null);
} }
public Status(Transport.Code code, @Nullable String description) { private Status(Code code, @Nullable String description, @Nullable Throwable cause) {
this(code, description, null);
}
public Status(Transport.Code code, @Nullable Throwable cause) {
this(code, null, cause);
}
public Status(Transport.Code code, @Nullable String description, @Nullable Throwable cause) {
this.code = Preconditions.checkNotNull(code); this.code = Preconditions.checkNotNull(code);
this.description = description; this.description = description;
this.cause = cause; this.cause = cause;
} }
public Transport.Code getCode() { /**
* Create a derived instance of {@link Status} with the given cause.
*/
public Status withCause(Throwable cause) {
if (Objects.equals(this.cause, cause)) {
return this;
}
return new Status(this.code, this.description, cause);
}
/**
* Create a derived instance of {@link Status} with the given description.
*/
public Status withDescription(String description) {
if (Objects.equals(this.description, description)) {
return this;
}
return new Status(this.code, description, this.cause);
}
public Code getCode() {
return code; return code;
} }
@ -71,25 +267,25 @@ public class Status {
return cause; return cause;
} }
/**
* Is this status OK, i.e. not an error.
*/
public boolean isOk() { public boolean isOk() {
return OK.getCode() == getCode(); return Code.OK == code;
} }
/** /**
* Override this status with another if allowed. * Convert this {@link Status} to a {@link RuntimeException}. Use {@code #fromThrowable}
* to recover this {@link Status} instance when the returned exception is in the causal chain.
*/ */
public Status overrideWith(Status newStatus) {
if (this.getCode() == Transport.Code.OK || newStatus.code == Transport.Code.OK) {
return this;
} else {
return newStatus;
}
}
public RuntimeException asRuntimeException() { public RuntimeException asRuntimeException() {
return new OperationRuntimeException(this); return new OperationRuntimeException(this);
} }
/**
* Convert this {@link Status} to an {@link Exception}. Use {@code #fromThrowable}
* to recover this {@link Status} instance when the returned exception is in the causal chain.
*/
public Exception asException() { public Exception asException() {
return new OperationException(this); return new OperationException(this);
} }
@ -142,34 +338,25 @@ public class Status {
return builder.toString(); return builder.toString();
} }
private static class CodeMarshaller implements Metadata.Marshaller<Transport.Code> { private static class StatusCodeMarshaller implements Metadata.Marshaller<Status> {
@Override @Override
public byte[] toBytes(Transport.Code value) { public byte[] toBytes(Status status) {
return Metadata.INTEGER_MARSHALLER.toBytes(value.getNumber()); return Metadata.INTEGER_MARSHALLER.toBytes(status.getCode().value());
} }
@Override @Override
public String toAscii(Transport.Code value) { public String toAscii(Status status) {
return Metadata.INTEGER_MARSHALLER.toAscii(value.getNumber()); return Metadata.INTEGER_MARSHALLER.toAscii(status.getCode().value());
} }
@Override @Override
public Transport.Code parseBytes(byte[] serialized) { public Status parseBytes(byte[] serialized) {
return intToCode(Metadata.INTEGER_MARSHALLER.parseBytes(serialized)); return fromCodeValue(Metadata.INTEGER_MARSHALLER.parseBytes(serialized));
} }
@Override @Override
public Transport.Code parseAscii(String ascii) { public Status parseAscii(String ascii) {
return intToCode(Metadata.INTEGER_MARSHALLER.parseAscii(ascii)); return fromCodeValue(Metadata.INTEGER_MARSHALLER.parseAscii(ascii));
}
private Transport.Code intToCode(Integer i) {
Transport.Code code = Transport.Code.valueOf(i);
if (code == null) {
log.warning("Unknown Code: " + i);
code = Transport.Code.UNKNOWN;
}
return code;
} }
} }
} }

View File

@ -12,7 +12,6 @@ import com.google.net.stubby.Session;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Framer; import com.google.net.stubby.transport.Framer;
import com.google.net.stubby.transport.MessageFramer; import com.google.net.stubby.transport.MessageFramer;
import com.google.net.stubby.transport.Transport;
import com.google.net.stubby.transport.TransportFrameUtil; import com.google.net.stubby.transport.TransportFrameUtil;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
@ -156,7 +155,7 @@ public class ServletSession extends HttpServlet {
try { try {
responseStream.write(TransportFrameUtil.NO_COMPRESS_FLAG); responseStream.write(TransportFrameUtil.NO_COMPRESS_FLAG);
} catch (IOException ioe) { } catch (IOException ioe) {
close(new Status(Transport.Code.INTERNAL, ioe)); close(Status.INTERNAL.withCause(ioe));
} }
} }
@ -188,7 +187,7 @@ public class ServletSession extends HttpServlet {
frame.position(1); frame.position(1);
ByteBuffers.asByteSource(frame).copyTo(responseStream); ByteBuffers.asByteSource(frame).copyTo(responseStream);
} catch (Throwable t) { } catch (Throwable t) {
close(new Status(Transport.Code.INTERNAL, t)); close(Status.INTERNAL.withCause(t));
} finally { } finally {
if (closed && endOfMessage) { if (closed && endOfMessage) {
framer.close(); framer.close();

View File

@ -9,7 +9,6 @@ import com.google.net.stubby.Session;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Framer; import com.google.net.stubby.transport.Framer;
import com.google.net.stubby.transport.MessageFramer; import com.google.net.stubby.transport.MessageFramer;
import com.google.net.stubby.transport.Transport;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
@ -101,7 +100,7 @@ public class UrlConnectionClientSession implements Session {
connection.disconnect(); connection.disconnect();
} }
} catch (IOException ioe) { } catch (IOException ioe) {
close(new Status(Transport.Code.INTERNAL, ioe)); close(Status.INTERNAL.withCause(ioe));
} }
} }
} }

View File

@ -10,7 +10,6 @@ import com.google.net.stubby.Response;
import com.google.net.stubby.Session; import com.google.net.stubby.Session;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.transport.MessageFramer; import com.google.net.stubby.transport.MessageFramer;
import com.google.net.stubby.transport.Transport.Code;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -111,7 +110,7 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler {
operation = serverStart(ctx, streamId, headers); operation = serverStart(ctx, streamId, headers);
if (operation == null) { if (operation == null) {
closeWithError(new NoOpRequest(createResponse(new Http2Writer(ctx), streamId).build()), closeWithError(new NoOpRequest(createResponse(new Http2Writer(ctx), streamId).build()),
new Status(Code.NOT_FOUND)); Status.NOT_FOUND);
} }
} }
} }
@ -131,7 +130,7 @@ public class Http2Codec extends AbstractHttp2ConnectionHandler {
throws Http2Exception { throws Http2Exception {
Request request = requestRegistry.lookup(streamId); Request request = requestRegistry.lookup(streamId);
if (request != null) { if (request != null) {
closeWithError(request, new Status(Code.CANCELLED, "Stream reset")); closeWithError(request, Status.CANCELLED.withDescription("Stream reset"));
requestRegistry.remove(streamId); requestRegistry.remove(streamId);
} }
} }

View File

@ -4,14 +4,13 @@ import com.google.net.stubby.AbstractOperation;
import com.google.net.stubby.Operation; import com.google.net.stubby.Operation;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Framer; import com.google.net.stubby.transport.Framer;
import com.google.net.stubby.transport.Transport;
import java.io.InputStream;
import java.nio.ByteBuffer;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import java.io.InputStream;
import java.nio.ByteBuffer;
/** /**
* Base implementation of {@link Operation} that writes HTTP2 frames * Base implementation of {@link Operation} that writes HTTP2 frames
*/ */
@ -55,7 +54,7 @@ abstract class Http2Operation extends AbstractOperation implements Framer.Sink {
channelFuture.get(); channelFuture.get();
} }
} catch (Exception e) { } catch (Exception e) {
close(new Status(Transport.Code.INTERNAL, e)); close(Status.INTERNAL.withCause(e));
} finally { } finally {
if (closed) { if (closed) {
framer.close(); framer.close();

View File

@ -5,7 +5,6 @@ import com.google.net.stubby.AbstractOperation;
import com.google.net.stubby.Operation; import com.google.net.stubby.Operation;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Framer; import com.google.net.stubby.transport.Framer;
import com.google.net.stubby.transport.Transport;
import com.squareup.okhttp.internal.spdy.FrameWriter; import com.squareup.okhttp.internal.spdy.FrameWriter;
@ -58,7 +57,7 @@ abstract class Http2Operation extends AbstractOperation implements Framer.Sink {
frameWriter.data(closed && endOfMessage, getId(), buffer); frameWriter.data(closed && endOfMessage, getId(), buffer);
frameWriter.flush(); frameWriter.flush();
} catch (IOException ioe) { } catch (IOException ioe) {
close(new Status(Transport.Code.INTERNAL, ioe)); close(Status.INTERNAL.withCause(ioe));
} finally { } finally {
if (closed && endOfMessage) { if (closed && endOfMessage) {
framer.close(); framer.close();

View File

@ -7,7 +7,6 @@ import com.google.net.stubby.Response;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.okhttp.Headers; import com.google.net.stubby.newtransport.okhttp.Headers;
import com.google.net.stubby.transport.Framer; import com.google.net.stubby.transport.Framer;
import com.google.net.stubby.transport.Transport;
import com.squareup.okhttp.internal.spdy.FrameWriter; import com.squareup.okhttp.internal.spdy.FrameWriter;
import com.squareup.okhttp.internal.spdy.Header; import com.squareup.okhttp.internal.spdy.Header;
@ -37,7 +36,7 @@ public class Http2Request extends Http2Operation implements Request {
Headers.createRequestHeaders(headers, defaultPath, defaultAuthority); Headers.createRequestHeaders(headers, defaultPath, defaultAuthority);
frameWriter.synStream(false, false, getId(), 0, requestHeaders); frameWriter.synStream(false, false, getId(), 0, requestHeaders);
} catch (IOException ioe) { } catch (IOException ioe) {
close(new Status(Transport.Code.UNKNOWN, ioe)); close(Status.UNKNOWN.withCause(ioe));
} }
} }

View File

@ -4,7 +4,6 @@ import com.google.net.stubby.Response;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.okhttp.Headers; import com.google.net.stubby.newtransport.okhttp.Headers;
import com.google.net.stubby.transport.Framer; import com.google.net.stubby.transport.Framer;
import com.google.net.stubby.transport.Transport;
import com.squareup.okhttp.internal.spdy.FrameWriter; import com.squareup.okhttp.internal.spdy.FrameWriter;
@ -35,7 +34,7 @@ public class Http2Response extends Http2Operation implements Response {
try { try {
frameWriter.synStream(false, false, getId(), 0, Headers.createResponseHeaders()); frameWriter.synStream(false, false, getId(), 0, Headers.createResponseHeaders());
} catch (IOException ioe) { } catch (IOException ioe) {
close(new Status(Transport.Code.INTERNAL, ioe)); close(Status.INTERNAL.withCause(ioe));
} }
} }
} }

View File

@ -14,8 +14,6 @@ import com.google.net.stubby.Session;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.transport.InputStreamDeframer; import com.google.net.stubby.transport.InputStreamDeframer;
import com.google.net.stubby.transport.MessageFramer; import com.google.net.stubby.transport.MessageFramer;
import com.google.net.stubby.transport.Transport;
import com.google.net.stubby.transport.Transport.Code;
import com.squareup.okhttp.internal.spdy.ErrorCode; import com.squareup.okhttp.internal.spdy.ErrorCode;
import com.squareup.okhttp.internal.spdy.FrameReader; import com.squareup.okhttp.internal.spdy.FrameReader;
@ -46,22 +44,22 @@ public class OkHttpSession implements Session {
private static final ImmutableMap<ErrorCode, Status> ERROR_CODE_TO_STATUS = ImmutableMap private static final ImmutableMap<ErrorCode, Status> ERROR_CODE_TO_STATUS = ImmutableMap
.<ErrorCode, Status>builder() .<ErrorCode, Status>builder()
.put(ErrorCode.NO_ERROR, Status.OK) .put(ErrorCode.NO_ERROR, Status.OK)
.put(ErrorCode.PROTOCOL_ERROR, new Status(Transport.Code.INTERNAL, "Protocol error")) .put(ErrorCode.PROTOCOL_ERROR, Status.INTERNAL.withDescription("Protocol error"))
.put(ErrorCode.INVALID_STREAM, new Status(Transport.Code.INTERNAL, "Invalid stream")) .put(ErrorCode.INVALID_STREAM, Status.INTERNAL.withDescription("Invalid stream"))
.put(ErrorCode.UNSUPPORTED_VERSION, .put(ErrorCode.UNSUPPORTED_VERSION,
new Status(Transport.Code.INTERNAL, "Unsupported version")) Status.INTERNAL.withDescription("Unsupported version"))
.put(ErrorCode.STREAM_IN_USE, new Status(Transport.Code.INTERNAL, "Stream in use")) .put(ErrorCode.STREAM_IN_USE, Status.INTERNAL.withDescription("Stream in use"))
.put(ErrorCode.STREAM_ALREADY_CLOSED, .put(ErrorCode.STREAM_ALREADY_CLOSED,
new Status(Transport.Code.INTERNAL, "Stream already closed")) Status.INTERNAL.withDescription("Stream already closed"))
.put(ErrorCode.INTERNAL_ERROR, new Status(Transport.Code.INTERNAL, "Internal error")) .put(ErrorCode.INTERNAL_ERROR, Status.INTERNAL.withDescription("Internal error"))
.put(ErrorCode.FLOW_CONTROL_ERROR, new Status(Transport.Code.INTERNAL, "Flow control error")) .put(ErrorCode.FLOW_CONTROL_ERROR, Status.INTERNAL.withDescription("Flow control error"))
.put(ErrorCode.STREAM_CLOSED, new Status(Transport.Code.INTERNAL, "Stream closed")) .put(ErrorCode.STREAM_CLOSED, Status.INTERNAL.withDescription("Stream closed"))
.put(ErrorCode.FRAME_TOO_LARGE, new Status(Transport.Code.INTERNAL, "Frame too large")) .put(ErrorCode.FRAME_TOO_LARGE, Status.INTERNAL.withDescription("Frame too large"))
.put(ErrorCode.REFUSED_STREAM, new Status(Transport.Code.INTERNAL, "Refused stream")) .put(ErrorCode.REFUSED_STREAM, Status.INTERNAL.withDescription("Refused stream"))
.put(ErrorCode.CANCEL, new Status(Transport.Code.CANCELLED, "Cancelled")) .put(ErrorCode.CANCEL, Status.CANCELLED.withDescription("Cancelled"))
.put(ErrorCode.COMPRESSION_ERROR, new Status(Transport.Code.INTERNAL, "Compression error")) .put(ErrorCode.COMPRESSION_ERROR, Status.INTERNAL.withDescription("Compression error"))
.put(ErrorCode.INVALID_CREDENTIALS, .put(ErrorCode.INVALID_CREDENTIALS,
new Status(Transport.Code.PERMISSION_DENIED, "Invalid credentials")) Status.PERMISSION_DENIED.withDescription("Invalid credentials"))
.build(); .build();
public static Session startClient(Socket socket, RequestRegistry requestRegistry, public static Session startClient(Socket socket, RequestRegistry requestRegistry,
@ -200,7 +198,7 @@ public class OkHttpSession implements Session {
} }
} catch (Throwable ioe) { } catch (Throwable ioe) {
ioe.printStackTrace(); ioe.printStackTrace();
closeAllRequests(new Status(Code.INTERNAL, ioe.getMessage())); closeAllRequests(Status.INTERNAL.withCause(ioe));
} finally { } finally {
// Restore the original thread name. // Restore the original thread name.
Thread.currentThread().setName(threadName); Thread.currentThread().setName(threadName);

View File

@ -53,7 +53,8 @@ public abstract class AbstractClientStream extends AbstractStream implements Cli
*/ */
public void stashTrailers(Metadata.Trailers trailers) { public void stashTrailers(Metadata.Trailers trailers) {
Preconditions.checkNotNull(status, "trailers"); Preconditions.checkNotNull(status, "trailers");
stashedStatus = new Status(trailers.get(Status.CODE_KEY), trailers.get(Status.MESSAGE_KEY)); stashedStatus = trailers.get(Status.CODE_KEY)
.withDescription(trailers.get(Status.MESSAGE_KEY));
trailers.removeAll(Status.CODE_KEY); trailers.removeAll(Status.CODE_KEY);
trailers.removeAll(Status.MESSAGE_KEY); trailers.removeAll(Status.MESSAGE_KEY);
stashedTrailers = trailers; stashedTrailers = trailers;

View File

@ -8,7 +8,6 @@ import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata; import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
import java.io.InputStream; import java.io.InputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -61,7 +60,7 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser
gracefulClose = true; gracefulClose = true;
trailers.removeAll(Status.CODE_KEY); trailers.removeAll(Status.CODE_KEY);
trailers.removeAll(Status.MESSAGE_KEY); trailers.removeAll(Status.MESSAGE_KEY);
trailers.put(Status.CODE_KEY, status.getCode()); trailers.put(Status.CODE_KEY, status);
if (status.getDescription() != null) { if (status.getDescription() != null) {
trailers.put(Status.MESSAGE_KEY, status.getDescription()); trailers.put(Status.MESSAGE_KEY, status.getDescription());
} }
@ -114,7 +113,7 @@ public abstract class AbstractServerStream extends AbstractStream implements Ser
listenerClosed = true; listenerClosed = true;
} }
if (!gracefulClose) { if (!gracefulClose) {
listener.closed(new Status(Transport.Code.INTERNAL, "successful complete() without close()")); listener.closed(Status.INTERNAL.withDescription("successful complete() without close()"));
throw new IllegalStateException("successful complete() without close()"); throw new IllegalStateException("successful complete() without close()");
} }
listener.closed(Status.OK); listener.closed(Status.OK);

View File

@ -4,7 +4,6 @@ import com.google.common.io.ByteStreams;
import com.google.net.stubby.GrpcFramingUtil; import com.google.net.stubby.GrpcFramingUtil;
import com.google.net.stubby.Operation; import com.google.net.stubby.Operation;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
@ -37,7 +36,7 @@ public abstract class Deframer<F> implements Framer.Sink<F> {
int remaining = internalDeliverFrame(frame); int remaining = internalDeliverFrame(frame);
if (endOfStream) { if (endOfStream) {
if (remaining > 0) { if (remaining > 0) {
writeStatus(new Status(Transport.Code.UNKNOWN, "EOF on incomplete frame")); writeStatus(Status.UNKNOWN.withDescription("EOF on incomplete frame"));
} else if (!statusDelivered) { } else if (!statusDelivered) {
writeStatus(Status.OK); writeStatus(Status.OK);
} }
@ -90,16 +89,9 @@ public abstract class Deframer<F> implements Framer.Sink<F> {
inFrame = false; inFrame = false;
} }
} else if (GrpcFramingUtil.isStatusFrame(currentFlags)) { } else if (GrpcFramingUtil.isStatusFrame(currentFlags)) {
int status = framedChunk.read() << 8 | framedChunk.read(); int code = framedChunk.read() << 8 | framedChunk.read();
Transport.Code code = Transport.Code.valueOf(status);
// TODO(user): Resolve what to do with remainder of framedChunk
try { try {
if (code == null) { writeStatus(Status.fromCodeValue(code));
// Log for unknown code
writeStatus(new Status(Transport.Code.UNKNOWN, "Unknown status code " + status));
} else {
writeStatus(new Status(code));
}
} finally { } finally {
currentLength = LENGTH_NOT_SET; currentLength = LENGTH_NOT_SET;
inFrame = false; inFrame = false;
@ -111,7 +103,7 @@ public abstract class Deframer<F> implements Framer.Sink<F> {
} }
} }
} catch (IOException ioe) { } catch (IOException ioe) {
Status status = new Status(Transport.Code.UNKNOWN, ioe); Status status = Status.UNKNOWN.withCause(ioe);
writeStatus(status); writeStatus(status);
throw status.asRuntimeException(); throw status.asRuntimeException();
} }

View File

@ -8,9 +8,7 @@ import static com.google.net.stubby.GrpcFramingUtil.STATUS_FRAME;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
import java.io.Closeable; import java.io.Closeable;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -215,10 +213,7 @@ public class GrpcDeframer implements Closeable {
*/ */
private void processStatus() { private void processStatus() {
try { try {
int statusCode = nextFrame.readUnsignedShort(); notifyStatus(Status.fromCodeValue(nextFrame.readUnsignedShort()));
Transport.Code code = Transport.Code.valueOf(statusCode);
notifyStatus(code != null ? new Status(code)
: new Status(Transport.Code.UNKNOWN, "Unknown status code " + statusCode));
} finally { } finally {
nextFrame.close(); nextFrame.close();
nextFrame = null; nextFrame = null;

View File

@ -1,6 +1,6 @@
package com.google.net.stubby.newtransport; package com.google.net.stubby.newtransport;
import com.google.net.stubby.transport.Transport; import com.google.net.stubby.Status;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
@ -38,29 +38,29 @@ public final class HttpUtil {
/** /**
* Maps HTTP error response status codes to transport codes. * Maps HTTP error response status codes to transport codes.
*/ */
public static Transport.Code httpStatusToTransportCode(int httpStatusCode) { public static Status httpStatusToGrpcStatus(int httpStatusCode) {
// Specific HTTP code handling. // Specific HTTP code handling.
switch (httpStatusCode) { switch (httpStatusCode) {
case HttpURLConnection.HTTP_UNAUTHORIZED: // 401 case HttpURLConnection.HTTP_UNAUTHORIZED: // 401
return Transport.Code.UNAUTHENTICATED; return Status.UNAUTHENTICATED;
case HttpURLConnection.HTTP_FORBIDDEN: // 403 case HttpURLConnection.HTTP_FORBIDDEN: // 403
return Transport.Code.PERMISSION_DENIED; return Status.PERMISSION_DENIED;
default: default:
} }
// Generic HTTP code handling. // Generic HTTP code handling.
if (httpStatusCode < 300) { if (httpStatusCode < 300) {
return Transport.Code.OK; return Status.OK;
} }
if (httpStatusCode < 400) { if (httpStatusCode < 400) {
return Transport.Code.UNAVAILABLE; return Status.UNAVAILABLE;
} }
if (httpStatusCode < 500) { if (httpStatusCode < 500) {
return Transport.Code.INVALID_ARGUMENT; return Status.INVALID_ARGUMENT;
} }
if (httpStatusCode < 600) { if (httpStatusCode < 600) {
return Transport.Code.FAILED_PRECONDITION; return Status.FAILED_PRECONDITION;
} }
return Transport.Code.INTERNAL; return Status.INTERNAL;
} }
private HttpUtil() {} private HttpUtil() {}

View File

@ -6,8 +6,8 @@ import com.google.common.util.concurrent.ListenableFuture;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.Closeable; import java.io.Closeable;
import java.io.InputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.zip.GZIPInputStream; import java.util.zip.GZIPInputStream;

View File

@ -49,7 +49,7 @@ public class MessageFramer implements Framer {
@Override @Override
public void writeStatus(Status status) { public void writeStatus(Status status) {
verifyNotClosed(); verifyNotClosed();
short code = (short) status.getCode().getNumber(); short code = (short) status.getCode().value();
scratch.clear(); scratch.clear();
scratch.put(GrpcFramingUtil.STATUS_FRAME); scratch.put(GrpcFramingUtil.STATUS_FRAME);
int length = 2; int length = 2;

View File

@ -16,7 +16,6 @@ import com.google.net.stubby.newtransport.ClientStream;
import com.google.net.stubby.newtransport.ClientStreamListener; import com.google.net.stubby.newtransport.ClientStreamListener;
import com.google.net.stubby.newtransport.InputStreamDeframer; import com.google.net.stubby.newtransport.InputStreamDeframer;
import com.google.net.stubby.newtransport.StreamState; import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
@ -142,7 +141,7 @@ public class HttpClientTransport extends AbstractClientTransport {
} }
} }
} catch (IOException ioe) { } catch (IOException ioe) {
setStatus(new Status(Transport.Code.INTERNAL, ioe), new Metadata.Trailers()); setStatus(Status.INTERNAL.withCause(ioe), new Metadata.Trailers());
} }
} }

View File

@ -5,7 +5,6 @@ import static com.google.net.stubby.newtransport.netty.NettyClientStream.PENDING
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.net.stubby.Metadata; import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -34,7 +33,7 @@ import java.util.Iterator;
* the context of the Netty Channel thread. * the context of the Netty Channel thread.
*/ */
class NettyClientHandler extends AbstractHttp2ConnectionHandler { class NettyClientHandler extends AbstractHttp2ConnectionHandler {
private static final Status GOAWAY_STATUS = new Status(Transport.Code.UNAVAILABLE); private static final Status GOAWAY_STATUS = Status.UNAVAILABLE;
/** /**
* A pending stream creation. * A pending stream creation.
@ -144,7 +143,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
// TODO(user): do something with errorCode? // TODO(user): do something with errorCode?
Http2Stream http2Stream = connection().requireStream(streamId); Http2Stream http2Stream = connection().requireStream(streamId);
NettyClientStream stream = clientStream(http2Stream); NettyClientStream stream = clientStream(http2Stream);
stream.setStatus(new Status(Transport.Code.UNKNOWN), new Metadata.Trailers()); stream.setStatus(Status.UNKNOWN, new Metadata.Trailers());
} }
/** /**
@ -389,7 +388,7 @@ class NettyClientHandler extends AbstractHttp2ConnectionHandler {
case RESERVED_REMOTE: case RESERVED_REMOTE:
// Disallowed state, terminate the stream. // Disallowed state, terminate the stream.
clientStream(stream).setStatus( clientStream(stream).setStatus(
new Status(Transport.Code.INTERNAL, "Stream in invalid state: " + stream.state()), Status.INTERNAL.withDescription("Stream in invalid state: " + stream.state()),
new Metadata.Trailers()); new Metadata.Trailers());
writeRstStream(ctx(), stream.id(), Http2Error.INTERNAL_ERROR.code(), ctx().newPromise()); writeRstStream(ctx(), stream.id(), Http2Error.INTERNAL_ERROR.code(), ctx().newPromise());
ctx().flush(); ctx().flush();

View File

@ -15,7 +15,6 @@ import com.google.net.stubby.newtransport.ClientStreamListener;
import com.google.net.stubby.newtransport.GrpcDeframer; import com.google.net.stubby.newtransport.GrpcDeframer;
import com.google.net.stubby.newtransport.HttpUtil; import com.google.net.stubby.newtransport.HttpUtil;
import com.google.net.stubby.newtransport.MessageDeframer2; import com.google.net.stubby.newtransport.MessageDeframer2;
import com.google.net.stubby.transport.Transport;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -39,7 +38,7 @@ class NettyClientStream extends AbstractClientStream implements NettyStream {
private final GrpcDeframer deframer; private final GrpcDeframer deframer;
private final MessageDeframer2 deframer2; private final MessageDeframer2 deframer2;
private final WindowUpdateManager windowUpdateManager; private final WindowUpdateManager windowUpdateManager;
private Transport.Code responseCode = Transport.Code.UNKNOWN; private Status responseStatus = Status.UNKNOWN;
private boolean isGrpcResponse; private boolean isGrpcResponse;
private StringBuilder nonGrpcErrorMessage = new StringBuilder(); private StringBuilder nonGrpcErrorMessage = new StringBuilder();
@ -83,15 +82,15 @@ class NettyClientStream extends AbstractClientStream implements NettyStream {
* Called in the channel thread to process headers received from the server. * Called in the channel thread to process headers received from the server.
*/ */
public void inboundHeadersRecieved(Http2Headers headers, boolean endOfStream) { public void inboundHeadersRecieved(Http2Headers headers, boolean endOfStream) {
responseCode = responseCode(headers, responseCode); responseStatus = responseStatus(headers, responseStatus);
isGrpcResponse = isGrpcResponse(headers, responseCode); isGrpcResponse = isGrpcResponse(headers, responseStatus);
if (endOfStream) { if (endOfStream) {
if (isGrpcResponse) { if (isGrpcResponse) {
// TODO(user): call stashTrailers() as appropriate, then provide endOfStream to // TODO(user): call stashTrailers() as appropriate, then provide endOfStream to
// deframer. // deframer.
setStatus(new Status(responseCode), new Metadata.Trailers()); setStatus(responseStatus, new Metadata.Trailers());
} else { } else {
setStatus(new Status(responseCode), new Metadata.Trailers()); setStatus(responseStatus, new Metadata.Trailers());
} }
} }
} }
@ -125,7 +124,7 @@ class NettyClientStream extends AbstractClientStream implements NettyStream {
if (endOfStream) { if (endOfStream) {
String msg = nonGrpcErrorMessage.toString(); String msg = nonGrpcErrorMessage.toString();
setStatus(new Status(responseCode, msg), new Metadata.Trailers()); setStatus(responseStatus.withDescription(msg), new Metadata.Trailers());
} }
} }
} }
@ -145,7 +144,7 @@ class NettyClientStream extends AbstractClientStream implements NettyStream {
/** /**
* Determines whether or not the response from the server is a GRPC response. * Determines whether or not the response from the server is a GRPC response.
*/ */
private boolean isGrpcResponse(Http2Headers headers, Transport.Code code) { private boolean isGrpcResponse(Http2Headers headers, Status status) {
if (isGrpcResponse) { if (isGrpcResponse) {
// Already verified that it's a gRPC response. // Already verified that it's a gRPC response.
return true; return true;
@ -157,7 +156,7 @@ class NettyClientStream extends AbstractClientStream implements NettyStream {
} }
// GRPC responses should always return OK. Updated this code once b/16290036 is fixed. // GRPC responses should always return OK. Updated this code once b/16290036 is fixed.
if (code == Transport.Code.OK) { if (status.isOk()) {
// ESF currently returns the wrong content-type for grpc. // ESF currently returns the wrong content-type for grpc.
return true; return true;
} }
@ -169,7 +168,7 @@ class NettyClientStream extends AbstractClientStream implements NettyStream {
/** /**
* Parses the response status and converts it to a transport code. * Parses the response status and converts it to a transport code.
*/ */
private Transport.Code responseCode(Http2Headers headers, Transport.Code defaultValue) { private static Status responseStatus(Http2Headers headers, Status defaultValue) {
if (headers == null) { if (headers == null) {
return defaultValue; return defaultValue;
} }
@ -177,9 +176,7 @@ class NettyClientStream extends AbstractClientStream implements NettyStream {
// First, check to see if we found a v2 protocol grpc-status header. // First, check to see if we found a v2 protocol grpc-status header.
AsciiString grpcStatus = headers.get(GRPC_STATUS_HEADER); AsciiString grpcStatus = headers.get(GRPC_STATUS_HEADER);
if (grpcStatus != null) { if (grpcStatus != null) {
int code = grpcStatus.parseInt(); return Status.fromCodeValue(grpcStatus.parseInt());
Transport.Code value = Transport.Code.valueOf(code);
return value != null ? value : Transport.Code.UNKNOWN;
} }
// Next, check the HTTP/2 status. // Next, check the HTTP/2 status.
@ -188,6 +185,6 @@ class NettyClientStream extends AbstractClientStream implements NettyStream {
return defaultValue; return defaultValue;
} }
HttpResponseStatus status = HttpResponseStatus.parseLine(statusLine); HttpResponseStatus status = HttpResponseStatus.parseLine(statusLine);
return HttpUtil.httpStatusToTransportCode(status.code()); return HttpUtil.httpStatusToGrpcStatus(status.code());
} }
} }

View File

@ -10,7 +10,6 @@ import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.ServerStreamListener; import com.google.net.stubby.newtransport.ServerStreamListener;
import com.google.net.stubby.newtransport.ServerTransportListener; import com.google.net.stubby.newtransport.ServerTransportListener;
import com.google.net.stubby.newtransport.TransportFrameUtil; import com.google.net.stubby.newtransport.TransportFrameUtil;
import com.google.net.stubby.transport.Transport;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
@ -43,7 +42,7 @@ class NettyServerHandler extends AbstractHttp2ConnectionHandler {
private static Logger logger = Logger.getLogger(NettyServerHandler.class.getName()); private static Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
private static final Status GOAWAY_STATUS = new Status(Transport.Code.UNAVAILABLE); private static final Status GOAWAY_STATUS = Status.UNAVAILABLE;
private final ServerTransportListener transportListener; private final ServerTransportListener transportListener;
private final DefaultHttp2InboundFlowController inboundFlow; private final DefaultHttp2InboundFlowController inboundFlow;

View File

@ -14,8 +14,6 @@ import com.google.net.stubby.newtransport.ClientStreamListener;
import com.google.net.stubby.newtransport.ClientTransport; import com.google.net.stubby.newtransport.ClientTransport;
import com.google.net.stubby.newtransport.InputStreamDeframer; import com.google.net.stubby.newtransport.InputStreamDeframer;
import com.google.net.stubby.newtransport.StreamState; import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
import com.google.net.stubby.transport.Transport.Code;
import com.squareup.okhttp.internal.spdy.ErrorCode; import com.squareup.okhttp.internal.spdy.ErrorCode;
import com.squareup.okhttp.internal.spdy.FrameReader; import com.squareup.okhttp.internal.spdy.FrameReader;
@ -58,30 +56,30 @@ public class OkHttpClientTransport extends AbstractClientTransport {
Map<ErrorCode, Status> errorToStatus = new HashMap<ErrorCode, Status>(); Map<ErrorCode, Status> errorToStatus = new HashMap<ErrorCode, Status>();
errorToStatus.put(ErrorCode.NO_ERROR, Status.OK); errorToStatus.put(ErrorCode.NO_ERROR, Status.OK);
errorToStatus.put(ErrorCode.PROTOCOL_ERROR, errorToStatus.put(ErrorCode.PROTOCOL_ERROR,
new Status(Transport.Code.INTERNAL, "Protocol error")); Status.INTERNAL.withDescription("Protocol error"));
errorToStatus.put(ErrorCode.INVALID_STREAM, errorToStatus.put(ErrorCode.INVALID_STREAM,
new Status(Transport.Code.INTERNAL, "Invalid stream")); Status.INTERNAL.withDescription("Invalid stream"));
errorToStatus.put(ErrorCode.UNSUPPORTED_VERSION, errorToStatus.put(ErrorCode.UNSUPPORTED_VERSION,
new Status(Transport.Code.INTERNAL, "Unsupported version")); Status.INTERNAL.withDescription("Unsupported version"));
errorToStatus.put(ErrorCode.STREAM_IN_USE, errorToStatus.put(ErrorCode.STREAM_IN_USE,
new Status(Transport.Code.INTERNAL, "Stream in use")); Status.INTERNAL.withDescription("Stream in use"));
errorToStatus.put(ErrorCode.STREAM_ALREADY_CLOSED, errorToStatus.put(ErrorCode.STREAM_ALREADY_CLOSED,
new Status(Transport.Code.INTERNAL, "Stream already closed")); Status.INTERNAL.withDescription("Stream already closed"));
errorToStatus.put(ErrorCode.INTERNAL_ERROR, errorToStatus.put(ErrorCode.INTERNAL_ERROR,
new Status(Transport.Code.INTERNAL, "Internal error")); Status.INTERNAL.withDescription("Internal error"));
errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR, errorToStatus.put(ErrorCode.FLOW_CONTROL_ERROR,
new Status(Transport.Code.INTERNAL, "Flow control error")); Status.INTERNAL.withDescription("Flow control error"));
errorToStatus.put(ErrorCode.STREAM_CLOSED, errorToStatus.put(ErrorCode.STREAM_CLOSED,
new Status(Transport.Code.INTERNAL, "Stream closed")); Status.INTERNAL.withDescription("Stream closed"));
errorToStatus.put(ErrorCode.FRAME_TOO_LARGE, errorToStatus.put(ErrorCode.FRAME_TOO_LARGE,
new Status(Transport.Code.INTERNAL, "Frame too large")); Status.INTERNAL.withDescription("Frame too large"));
errorToStatus.put(ErrorCode.REFUSED_STREAM, errorToStatus.put(ErrorCode.REFUSED_STREAM,
new Status(Transport.Code.INTERNAL, "Refused stream")); Status.INTERNAL.withDescription("Refused stream"));
errorToStatus.put(ErrorCode.CANCEL, new Status(Transport.Code.CANCELLED, "Cancelled")); errorToStatus.put(ErrorCode.CANCEL, Status.CANCELLED.withDescription("Cancelled"));
errorToStatus.put(ErrorCode.COMPRESSION_ERROR, errorToStatus.put(ErrorCode.COMPRESSION_ERROR,
new Status(Transport.Code.INTERNAL, "Compression error")); Status.INTERNAL.withDescription("Compression error"));
errorToStatus.put(ErrorCode.INVALID_CREDENTIALS, errorToStatus.put(ErrorCode.INVALID_CREDENTIALS,
new Status(Transport.Code.PERMISSION_DENIED, "Invalid credentials")); Status.PERMISSION_DENIED.withDescription("Invalid credentials"));
ERROR_CODE_TO_STATUS = Collections.unmodifiableMap(errorToStatus); ERROR_CODE_TO_STATUS = Collections.unmodifiableMap(errorToStatus);
} }
@ -163,7 +161,7 @@ public class OkHttpClientTransport extends AbstractClientTransport {
normalClose = !goAway; normalClose = !goAway;
} }
if (normalClose) { if (normalClose) {
abort(new Status(Code.INTERNAL, "Transport stopped")); abort(Status.INTERNAL.withDescription("Transport stopped"));
// Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated streams. // Send GOAWAY with lastGoodStreamId of 0, since we don't expect any server-initiated streams.
// The GOAWAY is part of graceful shutdown. // The GOAWAY is part of graceful shutdown.
frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]); frameWriter.goAway(0, ErrorCode.NO_ERROR, new byte[0]);
@ -353,7 +351,7 @@ public class OkHttpClientTransport extends AbstractClientTransport {
@Override @Override
public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) { public void goAway(int lastGoodStreamId, ErrorCode errorCode, ByteString debugData) {
onGoAway(lastGoodStreamId, new Status(Code.UNAVAILABLE, "Go away")); onGoAway(lastGoodStreamId, Status.UNAVAILABLE.withDescription("Go away"));
} }
@Override @Override
@ -387,7 +385,7 @@ public class OkHttpClientTransport extends AbstractClientTransport {
stream.streamId = nextStreamId; stream.streamId = nextStreamId;
streams.put(stream.streamId, stream); streams.put(stream.streamId, stream);
if (nextStreamId >= Integer.MAX_VALUE - 2) { if (nextStreamId >= Integer.MAX_VALUE - 2) {
onGoAway(Integer.MAX_VALUE, new Status(Code.INTERNAL, "Stream id exhaust")); onGoAway(Integer.MAX_VALUE, Status.INTERNAL.withDescription("Stream id exhaust"));
} else { } else {
nextStreamId += 2; nextStreamId += 2;
} }

View File

@ -1,6 +1,7 @@
package com.google.net.stubby; package com.google.net.stubby.proto;
import com.google.common.io.ByteStreams; import com.google.common.io.ByteStreams;
import com.google.net.stubby.DeferredInputStream;
import com.google.protobuf.CodedOutputStream; import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.MessageLite; import com.google.protobuf.MessageLite;
@ -11,9 +12,9 @@ import java.io.OutputStream;
import javax.annotation.Nullable; import javax.annotation.Nullable;
/** /**
* Implementation of {@link DeferredInputStream} backed by a protobuf. * Implementation of {@link com.google.net.stubby.DeferredInputStream} backed by a protobuf.
*/ */
public class DeferredProtoInputStream extends DeferredInputStream { public class DeferredProtoInputStream extends DeferredInputStream<MessageLite> {
// DeferredProtoInputStream is first initialized with a *message*. *partial* is initially null. // DeferredProtoInputStream is first initialized with a *message*. *partial* is initially null.
// Once there has been a read operation on this stream, *message* is serialized to *partial* and // Once there has been a read operation on this stream, *message* is serialized to *partial* and
@ -29,7 +30,7 @@ public class DeferredProtoInputStream extends DeferredInputStream {
* Returns the original protobuf message. Returns null after this stream has been read. * Returns the original protobuf message. Returns null after this stream has been read.
*/ */
@Nullable @Nullable
public MessageLite getMessage() { public MessageLite getDeferred() {
return message; return message;
} }

View File

@ -74,16 +74,10 @@ public abstract class Deframer<F> {
inFrame = false; inFrame = false;
} }
} else if (GrpcFramingUtil.isStatusFrame(currentFlags)) { } else if (GrpcFramingUtil.isStatusFrame(currentFlags)) {
int status = framedChunk.read() << 8 | framedChunk.read(); int code = framedChunk.read() << 8 | framedChunk.read();
Transport.Code code = Transport.Code.valueOf(status);
// TODO(user): Resolve what to do with remainder of framedChunk // TODO(user): Resolve what to do with remainder of framedChunk
try { try {
if (code == null) { target.close(Status.fromCodeValue(code));
// Log for unknown code
target.close(new Status(Transport.Code.UNKNOWN, "Unknown status code " + status));
} else {
target.close(new Status(code));
}
} finally { } finally {
currentLength = LENGTH_NOT_SET; currentLength = LENGTH_NOT_SET;
inFrame = false; inFrame = false;
@ -95,7 +89,7 @@ public abstract class Deframer<F> {
} }
} }
} catch (IOException ioe) { } catch (IOException ioe) {
Status status = new Status(Transport.Code.UNKNOWN, ioe); Status status = Status.UNKNOWN.withCause(ioe);
target.close(status); target.close(status);
throw status.asRuntimeException(); throw status.asRuntimeException();
} }

View File

@ -49,7 +49,7 @@ public class MessageFramer implements Framer {
@Override @Override
public void writeStatus(Status status, boolean flush, Sink sink) { public void writeStatus(Status status, boolean flush, Sink sink) {
short code = (short) status.getCode().getNumber(); short code = (short) status.getCode().value();
scratch.clear(); scratch.clear();
scratch.put(GrpcFramingUtil.STATUS_FRAME); scratch.put(GrpcFramingUtil.STATUS_FRAME);
int length = 2; int length = 2;

View File

@ -15,7 +15,6 @@ import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import org.junit.Before; import org.junit.Before;
@ -42,7 +41,7 @@ import javax.annotation.Nullable;
public class GrpcDeframerTest { public class GrpcDeframerTest {
private static final String MESSAGE = "hello world"; private static final String MESSAGE = "hello world";
private static final ByteString MESSAGE_BSTR = ByteString.copyFromUtf8(MESSAGE); private static final ByteString MESSAGE_BSTR = ByteString.copyFromUtf8(MESSAGE);
private static final Transport.Code STATUS_CODE = Transport.Code.CANCELLED; private static final Status STATUS_CODE = Status.CANCELLED;
private GrpcDeframer reader; private GrpcDeframer reader;
@ -79,7 +78,7 @@ public class GrpcDeframerTest {
verifyNoStatus(); verifyNoStatus();
messageFuture.set(null); messageFuture.set(null);
verifyStatus(Transport.Code.OK); verifyStatus(Status.Code.OK);
} }
@Test @Test
@ -107,7 +106,7 @@ public class GrpcDeframerTest {
writeFrame(PAYLOAD_FRAME, MESSAGE_BSTR.toByteArray(), dos); writeFrame(PAYLOAD_FRAME, MESSAGE_BSTR.toByteArray(), dos);
// Write a status frame. // Write a status frame.
byte[] statusBytes = new byte[] {0, (byte) STATUS_CODE.getNumber()}; byte[] statusBytes = new byte[] {0, (byte) STATUS_CODE.getCode().value()};
writeFrame(STATUS_FRAME, statusBytes, dos); writeFrame(STATUS_FRAME, statusBytes, dos);
// Now write the complete frame: compression header followed by the 3 message frames. // Now write the complete frame: compression header followed by the 3 message frames.
@ -169,10 +168,10 @@ public class GrpcDeframerTest {
} }
private void verifyStatus() { private void verifyStatus() {
verifyStatus(Transport.Code.CANCELLED); verifyStatus(Status.Code.CANCELLED);
} }
private void verifyStatus(Transport.Code code) { private void verifyStatus(Status.Code code) {
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class); ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(sink).statusRead(captor.capture()); verify(sink).statusRead(captor.capture());
verify(sink).endOfStream(); verify(sink).endOfStream();
@ -193,7 +192,7 @@ public class GrpcDeframerTest {
} }
private static byte[] statusFrame() throws IOException { private static byte[] statusFrame() throws IOException {
byte[] bytes = new byte[] {0, (byte) STATUS_CODE.getNumber()}; byte[] bytes = new byte[] {0, (byte) STATUS_CODE.getCode().value()};
return frame(STATUS_FRAME, bytes); return frame(STATUS_FRAME, bytes);
} }

View File

@ -7,7 +7,6 @@ import static org.junit.Assert.assertTrue;
import com.google.common.primitives.Bytes; import com.google.common.primitives.Bytes;
import com.google.net.stubby.GrpcFramingUtil; import com.google.net.stubby.GrpcFramingUtil;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
@ -60,7 +59,7 @@ public class MessageFramerTest {
new byte[]{0, 0, 0, 2}, // Len is 2 bytes new byte[]{0, 0, 0, 2}, // Len is 2 bytes
new byte[]{0, 13}); // Internal==13 new byte[]{0, 13}); // Internal==13
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
framer.writeStatus(new Status(Transport.Code.INTERNAL)); framer.writeStatus(Status.INTERNAL);
if ((i + 1) % 13 == 0) { if ((i + 1) % 13 == 0) {
framer.flush(); framer.flush();
} }

View File

@ -19,7 +19,6 @@ import static org.mockito.Mockito.when;
import com.google.net.stubby.Metadata; import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.StreamState; import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
@ -228,7 +227,7 @@ public class NettyClientHandlerTest extends NettyHandlerTestBase {
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class); ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
InOrder inOrder = inOrder(stream); InOrder inOrder = inOrder(stream);
inOrder.verify(stream, calls(1)).setStatus(captor.capture(), notNull(Metadata.Trailers.class)); inOrder.verify(stream, calls(1)).setStatus(captor.capture(), notNull(Metadata.Trailers.class));
assertEquals(Transport.Code.UNAVAILABLE, captor.getValue().getCode()); assertEquals(Status.UNAVAILABLE.getCode(), captor.getValue().getCode());
} }
private void setMaxConcurrentStreams(int max) throws Exception { private void setMaxConcurrentStreams(int max) throws Exception {

View File

@ -14,7 +14,6 @@ import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.ClientStreamListener; import com.google.net.stubby.newtransport.ClientStreamListener;
import com.google.net.stubby.newtransport.StreamState; import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http2.DefaultHttp2Headers; import io.netty.handler.codec.http2.DefaultHttp2Headers;
@ -73,7 +72,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
@Test @Test
public void setStatusWithErrorShouldCloseStream() { public void setStatusWithErrorShouldCloseStream() {
Status errorStatus = new Status(Transport.Code.INTERNAL); Status errorStatus = Status.INTERNAL;
stream().setStatus(errorStatus, new Metadata.Trailers()); stream().setStatus(errorStatus, new Metadata.Trailers());
verify(listener).closed(eq(errorStatus), any(Metadata.Trailers.class)); verify(listener).closed(eq(errorStatus), any(Metadata.Trailers.class));
assertEquals(StreamState.CLOSED, stream.state()); assertEquals(StreamState.CLOSED, stream.state());
@ -81,7 +80,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
@Test @Test
public void setStatusWithOkShouldNotOverrideError() { public void setStatusWithOkShouldNotOverrideError() {
Status errorStatus = new Status(Transport.Code.INTERNAL); Status errorStatus = Status.INTERNAL;
stream().setStatus(errorStatus, new Metadata.Trailers()); stream().setStatus(errorStatus, new Metadata.Trailers());
stream().setStatus(Status.OK, new Metadata.Trailers()); stream().setStatus(Status.OK, new Metadata.Trailers());
verify(listener).closed(any(Status.class), any(Metadata.Trailers.class)); verify(listener).closed(any(Status.class), any(Metadata.Trailers.class));
@ -90,7 +89,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
@Test @Test
public void setStatusWithErrorShouldNotOverridePreviousError() { public void setStatusWithErrorShouldNotOverridePreviousError() {
Status errorStatus = new Status(Transport.Code.INTERNAL); Status errorStatus = Status.INTERNAL;
stream().setStatus(errorStatus, new Metadata.Trailers()); stream().setStatus(errorStatus, new Metadata.Trailers());
stream().setStatus(Status.fromThrowable(new RuntimeException("fake")), stream().setStatus(Status.fromThrowable(new RuntimeException("fake")),
new Metadata.Trailers()); new Metadata.Trailers());
@ -115,10 +114,10 @@ public class NettyClientStreamTest extends NettyStreamTestBase {
// Receive headers first so that it's a valid GRPC response. // Receive headers first so that it's a valid GRPC response.
stream().inboundHeadersRecieved(grpcResponseHeaders(), false); stream().inboundHeadersRecieved(grpcResponseHeaders(), false);
stream.inboundDataReceived(statusFrame(new Status(Transport.Code.INTERNAL)), false); stream.inboundDataReceived(statusFrame(Status.INTERNAL), false);
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class); ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
verify(listener).closed(captor.capture(), any(Metadata.Trailers.class)); verify(listener).closed(captor.capture(), any(Metadata.Trailers.class));
assertEquals(Transport.Code.INTERNAL, captor.getValue().getCode()); assertEquals(Status.INTERNAL.getCode(), captor.getValue().getCode());
assertEquals(StreamState.CLOSED, stream.state()); assertEquals(StreamState.CLOSED, stream.state());
} }

View File

@ -12,7 +12,6 @@ import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.ServerStreamListener; import com.google.net.stubby.newtransport.ServerStreamListener;
import com.google.net.stubby.newtransport.StreamState; import com.google.net.stubby.newtransport.StreamState;
import com.google.net.stubby.transport.Transport;
import io.netty.buffer.EmptyByteBuf; import io.netty.buffer.EmptyByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator;
@ -83,7 +82,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
@Test @Test
public void abortStreamAndSendStatus() throws Exception { public void abortStreamAndSendStatus() throws Exception {
Status status = new Status(Transport.Code.INTERNAL, new Throwable()); Status status = Status.INTERNAL.withCause(new Throwable());
stream().abortStream(status, true); stream().abortStream(status, true);
assertEquals(StreamState.CLOSED, stream.state()); assertEquals(StreamState.CLOSED, stream.state());
verify(serverListener).closed(same(status)); verify(serverListener).closed(same(status));
@ -93,7 +92,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
@Test @Test
public void abortStreamAndNotSendStatus() throws Exception { public void abortStreamAndNotSendStatus() throws Exception {
Status status = new Status(Transport.Code.INTERNAL, new Throwable()); Status status = Status.INTERNAL.withCause(new Throwable());
stream().abortStream(status, false); stream().abortStream(status, false);
assertEquals(StreamState.CLOSED, stream.state()); assertEquals(StreamState.CLOSED, stream.state());
verify(serverListener).closed(same(status)); verify(serverListener).closed(same(status));
@ -104,7 +103,7 @@ public class NettyServerStreamTest extends NettyStreamTestBase {
@Test @Test
public void abortStreamAfterClientHalfCloseShouldCallClose() { public void abortStreamAfterClientHalfCloseShouldCallClose() {
Status status = new Status(Transport.Code.INTERNAL, new Throwable()); Status status = Status.INTERNAL.withCause(new Throwable());
// Client half-closes. Listener gets halfClosed() // Client half-closes. Listener gets halfClosed()
stream().inboundDataReceived(new EmptyByteBuf(UnpooledByteBufAllocator.DEFAULT), true); stream().inboundDataReceived(new EmptyByteBuf(UnpooledByteBufAllocator.DEFAULT), true);
assertEquals(StreamState.WRITE_ONLY, stream.state()); assertEquals(StreamState.WRITE_ONLY, stream.state());

View File

@ -26,8 +26,11 @@ import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise; import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -37,10 +40,6 @@ import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import io.netty.handler.codec.http2.DefaultHttp2InboundFlowController;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
@ -157,7 +156,7 @@ public abstract class NettyStreamTestBase {
protected final ByteBuf statusFrame(Status status) throws Exception { protected final ByteBuf statusFrame(Status status) throws Exception {
ByteArrayOutputStream os = new ByteArrayOutputStream(); ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(os); DataOutputStream dos = new DataOutputStream(os);
short code = (short) status.getCode().getNumber(); short code = (short) status.getCode().value();
dos.write(STATUS_FRAME); dos.write(STATUS_FRAME);
int length = 2; int length = 2;
dos.writeInt(length); dos.writeInt(length);

View File

@ -9,7 +9,6 @@ import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset; import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
@ -20,8 +19,6 @@ import com.google.net.stubby.Status;
import com.google.net.stubby.newtransport.ClientStreamListener; import com.google.net.stubby.newtransport.ClientStreamListener;
import com.google.net.stubby.newtransport.okhttp.OkHttpClientTransport.ClientFrameHandler; import com.google.net.stubby.newtransport.okhttp.OkHttpClientTransport.ClientFrameHandler;
import com.google.net.stubby.newtransport.okhttp.OkHttpClientTransport.OkHttpClientStream; import com.google.net.stubby.newtransport.okhttp.OkHttpClientTransport.OkHttpClientStream;
import com.google.net.stubby.transport.Transport;
import com.google.net.stubby.transport.Transport.Code;
import com.squareup.okhttp.internal.spdy.ErrorCode; import com.squareup.okhttp.internal.spdy.ErrorCode;
import com.squareup.okhttp.internal.spdy.FrameReader; import com.squareup.okhttp.internal.spdy.FrameReader;
@ -113,9 +110,9 @@ public class OkHttpClientTransportTest {
listener1.waitUntilStreamClosed(); listener1.waitUntilStreamClosed();
listener2.waitUntilStreamClosed(); listener2.waitUntilStreamClosed();
assertEquals(0, streams.size()); assertEquals(0, streams.size());
assertEquals(Code.INTERNAL, listener1.status.getCode()); assertEquals(Status.INTERNAL.getCode(), listener1.status.getCode());
assertEquals(NETWORK_ISSUE_MESSAGE, listener2.status.getCause().getMessage()); assertEquals(NETWORK_ISSUE_MESSAGE, listener2.status.getCause().getMessage());
assertEquals(Code.INTERNAL, listener1.status.getCode()); assertEquals(Status.INTERNAL.getCode(), listener1.status.getCode());
assertEquals(NETWORK_ISSUE_MESSAGE, listener2.status.getCause().getMessage()); assertEquals(NETWORK_ISSUE_MESSAGE, listener2.status.getCause().getMessage());
assertTrue("Service state: " + clientTransport.state(), assertTrue("Service state: " + clientTransport.state(),
Service.State.TERMINATED == clientTransport.state()); Service.State.TERMINATED == clientTransport.state());
@ -148,11 +145,11 @@ public class OkHttpClientTransportTest {
clientTransport.newStream(method,new Metadata.Headers(), listener); clientTransport.newStream(method,new Metadata.Headers(), listener);
assertTrue(streams.containsKey(3)); assertTrue(streams.containsKey(3));
BufferedSource source = mock(BufferedSource.class); BufferedSource source = mock(BufferedSource.class);
InputStream inputStream = createStatusFrame((short) Transport.Code.UNAVAILABLE.getNumber()); InputStream inputStream = createStatusFrame((short) Status.UNAVAILABLE.getCode().value());
when(source.inputStream()).thenReturn(inputStream); when(source.inputStream()).thenReturn(inputStream);
frameHandler.data(true, 3, source, inputStream.available()); frameHandler.data(true, 3, source, inputStream.available());
listener.waitUntilStreamClosed(); listener.waitUntilStreamClosed();
assertEquals(Transport.Code.UNAVAILABLE, listener.status.getCode()); assertEquals(Status.UNAVAILABLE.getCode(), listener.status.getCode());
} }
@Test @Test
@ -259,8 +256,8 @@ public class OkHttpClientTransportTest {
listener2.waitUntilStreamClosed(); listener2.waitUntilStreamClosed();
verify(frameWriter).goAway(eq(0), eq(ErrorCode.NO_ERROR), (byte[]) any()); verify(frameWriter).goAway(eq(0), eq(ErrorCode.NO_ERROR), (byte[]) any());
assertEquals(0, streams.size()); assertEquals(0, streams.size());
assertEquals(Code.INTERNAL, listener1.status.getCode()); assertEquals(Status.INTERNAL.getCode(), listener1.status.getCode());
assertEquals(Code.INTERNAL, listener2.status.getCode()); assertEquals(Status.INTERNAL.getCode(), listener2.status.getCode());
assertEquals(Service.State.TERMINATED, clientTransport.state()); assertEquals(Service.State.TERMINATED, clientTransport.state());
} }
@ -282,7 +279,7 @@ public class OkHttpClientTransportTest {
// Stream 2 should be closed. // Stream 2 should be closed.
listener2.waitUntilStreamClosed(); listener2.waitUntilStreamClosed();
assertEquals(1, streams.size()); assertEquals(1, streams.size());
assertEquals(Code.UNAVAILABLE, listener2.status.getCode()); assertEquals(Status.UNAVAILABLE.getCode(), listener2.status.getCode());
// New stream should be failed. // New stream should be failed.
MockStreamListener listener3 = new MockStreamListener(); MockStreamListener listener3 = new MockStreamListener();

View File

@ -60,7 +60,7 @@ public class MessageFramerTest {
new byte[]{0, 13}); // Internal==13 new byte[]{0, 13}); // Internal==13
CapturingSink sink = new CapturingSink(); CapturingSink sink = new CapturingSink();
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
framer.writeStatus(new Status(Transport.Code.INTERNAL), (i % 17 == 11), sink); framer.writeStatus(Status.INTERNAL, (i % 17 == 11), sink);
if ((i + 1) % 13 == 0) { if ((i + 1) % 13 == 0) {
framer.flush(sink); framer.flush(sink);
} }

View File

@ -9,7 +9,6 @@ import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.net.stubby.Call; import com.google.net.stubby.Call;
import com.google.net.stubby.Metadata; import com.google.net.stubby.Metadata;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport;
import java.util.Iterator; import java.util.Iterator;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
@ -249,7 +248,7 @@ public class Calls {
@Override @Override
public ListenableFuture<Void> onPayload(RespT value) { public ListenableFuture<Void> onPayload(RespT value) {
if (this.value != null) { if (this.value != null) {
throw new Status(Transport.Code.INTERNAL, "More than one value received for unary call") throw Status.INTERNAL.withDescription("More than one value received for unary call")
.asRuntimeException(); .asRuntimeException();
} }
this.value = value; this.value = value;
@ -262,7 +261,7 @@ public class Calls {
if (value == null) { if (value == null) {
// No value received so mark the future as an error // No value received so mark the future as an error
responseFuture.setException( responseFuture.setException(
new Status(Transport.Code.INTERNAL, "No value received for unary call") Status.INTERNAL.withDescription("No value received for unary call")
.asRuntimeException().fillInStackTrace()); .asRuntimeException().fillInStackTrace());
} }
responseFuture.set(value); responseFuture.set(value);

View File

@ -1,9 +1,8 @@
package com.google.net.stubby.stub; package com.google.net.stubby.stub;
import com.google.net.stubby.DeferredProtoInputStream;
import com.google.net.stubby.Marshaller; import com.google.net.stubby.Marshaller;
import com.google.net.stubby.Status; import com.google.net.stubby.Status;
import com.google.net.stubby.transport.Transport; import com.google.net.stubby.proto.DeferredProtoInputStream;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite; import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser; import com.google.protobuf.Parser;
@ -29,8 +28,8 @@ public class Marshallers {
try { try {
return parser.parseFrom(stream); return parser.parseFrom(stream);
} catch (InvalidProtocolBufferException ipbe) { } catch (InvalidProtocolBufferException ipbe) {
throw new Status(Transport.Code.INTERNAL, "Invalid protobuf byte sequence", ipbe) throw Status.INTERNAL.withDescription("Invalid protobuf byte sequence")
.asRuntimeException(); .withCause(ipbe).asRuntimeException();
} }
} }
}; };

View File

@ -1,6 +1,6 @@
package com.google.net.stubby.stub; package com.google.net.stubby.stub;
import com.google.net.stubby.DeferredProtoInputStream; import com.google.net.stubby.proto.DeferredProtoInputStream;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite; import com.google.protobuf.MessageLite;