diff --git a/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoInputStream.java b/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoInputStream.java
index 41fabd1bda..27ec32ac3c 100644
--- a/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoInputStream.java
+++ b/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoInputStream.java
@@ -30,7 +30,7 @@ import javax.annotation.Nullable;
/**
* An {@link InputStream} backed by a protobuf.
*/
-class ProtoInputStream extends InputStream implements Drainable, KnownLength {
+final class ProtoInputStream extends InputStream implements Drainable, KnownLength {
// ProtoInputStream is first initialized with a *message*. *partial* is initially null.
// Once there has been a read operation on this stream, *message* is serialized to *partial* and
@@ -39,7 +39,7 @@ class ProtoInputStream extends InputStream implements Drainable, KnownLength {
private final Parser> parser;
@Nullable private ByteArrayInputStream partial;
- public ProtoInputStream(MessageLite message, Parser> parser) {
+ ProtoInputStream(MessageLite message, Parser> parser) {
this.message = message;
this.parser = parser;
}
@@ -103,7 +103,7 @@ class ProtoInputStream extends InputStream implements Drainable, KnownLength {
}
@Override
- public int available() throws IOException {
+ public int available() {
if (message != null) {
return message.getSerializedSize();
} else if (partial != null) {
diff --git a/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java b/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java
index 1f58095ac0..402105f327 100644
--- a/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java
+++ b/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java
@@ -40,9 +40,10 @@ import java.lang.ref.WeakReference;
* Utility methods for using protobuf with grpc.
*/
@ExperimentalApi("Experimental until Lite is stable in protobuf")
-public class ProtoLiteUtils {
+public final class ProtoLiteUtils {
- private static volatile ExtensionRegistryLite globalRegistry =
+ // default visibility to avoid synthetic accessors
+ static volatile ExtensionRegistryLite globalRegistry =
ExtensionRegistryLite.getEmptyRegistry();
private static final int BUF_SIZE = 8192;
@@ -66,149 +67,31 @@ public class ProtoLiteUtils {
*
If you need custom parsing behavior for protos, you will need to make your own
* {@code MethodDescriptor.Marshaller} for the time being.
*
+ * @since 1.0.0
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1787")
public static void setExtensionRegistry(ExtensionRegistryLite newRegistry) {
globalRegistry = checkNotNull(newRegistry, "newRegistry");
}
- private static final ThreadLocal> bufs = new ThreadLocal>() {
-
- @Override
- protected Reference initialValue() {
- return new WeakReference(new byte[4096]); // Picked at random.
- }
- };
-
- /** Create a {@code Marshaller} for protos of the same type as {@code defaultInstance}. */
- public static Marshaller marshaller(final T defaultInstance) {
- @SuppressWarnings("unchecked")
- final Parser parser = (Parser) defaultInstance.getParserForType();
+ /**
+ * Creates a {@link Marshaller} for protos of the same type as {@code defaultInstance}.
+ *
+ * @since 1.0.0
+ */
+ public static Marshaller marshaller(T defaultInstance) {
// TODO(ejona): consider changing return type to PrototypeMarshaller (assuming ABI safe)
- return new PrototypeMarshaller() {
- @SuppressWarnings("unchecked")
- @Override
- public Class getMessageClass() {
- // Precisely T since protobuf doesn't let messages extend other messages.
- return (Class) defaultInstance.getClass();
- }
-
- @Override
- public T getMessagePrototype() {
- return defaultInstance;
- }
-
- @Override
- public InputStream stream(T value) {
- return new ProtoInputStream(value, parser);
- }
-
- @Override
- public T parse(InputStream stream) {
- if (stream instanceof ProtoInputStream) {
- ProtoInputStream protoStream = (ProtoInputStream) stream;
- // Optimization for in-memory transport. Returning provided object is safe since protobufs
- // are immutable.
- //
- // However, we can't assume the types match, so we have to verify the parser matches.
- // Today the parser is always the same for a given proto, but that isn't guaranteed. Even
- // if not, using the same MethodDescriptor would ensure the parser matches and permit us
- // to enable this optimization.
- if (protoStream.parser() == parser) {
- try {
- @SuppressWarnings("unchecked")
- T message = (T) ((ProtoInputStream) stream).message();
- return message;
- } catch (IllegalStateException ex) {
- // Stream must have been read from, which is a strange state. Since the point of this
- // optimization is to be transparent, instead of throwing an error we'll continue,
- // even though it seems likely there's a bug.
- }
- }
- }
- CodedInputStream cis = null;
- try {
- if (stream instanceof KnownLength) {
- int size = stream.available();
- if (size > 0 && size <= DEFAULT_MAX_MESSAGE_SIZE) {
- // buf should not be used after this method has returned.
- byte[] buf = bufs.get().get();
- if (buf == null || buf.length < size) {
- buf = new byte[size];
- bufs.set(new WeakReference(buf));
- }
-
- int remaining = size;
- while (remaining > 0) {
- int position = size - remaining;
- int count = stream.read(buf, position, remaining);
- if (count == -1) {
- break;
- }
- remaining -= count;
- }
-
- if (remaining != 0) {
- int position = size - remaining;
- throw new RuntimeException("size inaccurate: " + size + " != " + position);
- }
- cis = CodedInputStream.newInstance(buf, 0, size);
- } else if (size == 0) {
- return defaultInstance;
- }
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- if (cis == null) {
- cis = CodedInputStream.newInstance(stream);
- }
- // Pre-create the CodedInputStream so that we can remove the size limit restriction
- // when parsing.
- cis.setSizeLimit(Integer.MAX_VALUE);
-
- try {
- return parseFrom(cis);
- } catch (InvalidProtocolBufferException ipbe) {
- throw Status.INTERNAL.withDescription("Invalid protobuf byte sequence")
- .withCause(ipbe).asRuntimeException();
- }
- }
-
- private T parseFrom(CodedInputStream stream) throws InvalidProtocolBufferException {
- T message = parser.parseFrom(stream, globalRegistry);
- try {
- stream.checkLastTagWas(0);
- return message;
- } catch (InvalidProtocolBufferException e) {
- e.setUnfinishedMessage(message);
- throw e;
- }
- }
- };
+ return new MessageMarshaller(defaultInstance);
}
/**
* Produce a metadata marshaller for a protobuf type.
+ *
+ * @since 1.0.0
*/
public static Metadata.BinaryMarshaller metadataMarshaller(
- final T instance) {
- return new Metadata.BinaryMarshaller() {
- @Override
- public byte[] toBytes(T value) {
- return value.toByteArray();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public T parseBytes(byte[] serialized) {
- try {
- return (T) instance.getParserForType().parseFrom(serialized, globalRegistry);
- } catch (InvalidProtocolBufferException ipbe) {
- throw new IllegalArgumentException(ipbe);
- }
- }
- };
+ T defaultInstance) {
+ return new MetadataMarshaller(defaultInstance);
}
/** Copies the data from input stream to output stream. */
@@ -231,4 +114,145 @@ public class ProtoLiteUtils {
private ProtoLiteUtils() {
}
+
+ private static final class MessageMarshaller
+ implements PrototypeMarshaller {
+ private static final ThreadLocal> bufs = new ThreadLocal>();
+
+ private final Parser parser;
+ private final T defaultInstance;
+
+ @SuppressWarnings("unchecked")
+ MessageMarshaller(T defaultInstance) {
+ this.defaultInstance = defaultInstance;
+ parser = (Parser) defaultInstance.getParserForType();
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Class getMessageClass() {
+ // Precisely T since protobuf doesn't let messages extend other messages.
+ return (Class) defaultInstance.getClass();
+ }
+
+ @Override
+ public T getMessagePrototype() {
+ return defaultInstance;
+ }
+
+ @Override
+ public InputStream stream(T value) {
+ return new ProtoInputStream(value, parser);
+ }
+
+ @Override
+ public T parse(InputStream stream) {
+ if (stream instanceof ProtoInputStream) {
+ ProtoInputStream protoStream = (ProtoInputStream) stream;
+ // Optimization for in-memory transport. Returning provided object is safe since protobufs
+ // are immutable.
+ //
+ // However, we can't assume the types match, so we have to verify the parser matches.
+ // Today the parser is always the same for a given proto, but that isn't guaranteed. Even
+ // if not, using the same MethodDescriptor would ensure the parser matches and permit us
+ // to enable this optimization.
+ if (protoStream.parser() == parser) {
+ try {
+ @SuppressWarnings("unchecked")
+ T message = (T) ((ProtoInputStream) stream).message();
+ return message;
+ } catch (IllegalStateException ex) {
+ // Stream must have been read from, which is a strange state. Since the point of this
+ // optimization is to be transparent, instead of throwing an error we'll continue,
+ // even though it seems likely there's a bug.
+ }
+ }
+ }
+ CodedInputStream cis = null;
+ try {
+ if (stream instanceof KnownLength) {
+ int size = stream.available();
+ if (size > 0 && size <= DEFAULT_MAX_MESSAGE_SIZE) {
+ Reference ref;
+ // buf should not be used after this method has returned.
+ byte[] buf;
+ if ((ref = bufs.get()) == null || (buf = ref.get()) == null || buf.length < size) {
+ buf = new byte[size];
+ bufs.set(new WeakReference(buf));
+ }
+
+ int remaining = size;
+ while (remaining > 0) {
+ int position = size - remaining;
+ int count = stream.read(buf, position, remaining);
+ if (count == -1) {
+ break;
+ }
+ remaining -= count;
+ }
+
+ if (remaining != 0) {
+ int position = size - remaining;
+ throw new RuntimeException("size inaccurate: " + size + " != " + position);
+ }
+ cis = CodedInputStream.newInstance(buf, 0, size);
+ } else if (size == 0) {
+ return defaultInstance;
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ if (cis == null) {
+ cis = CodedInputStream.newInstance(stream);
+ }
+ // Pre-create the CodedInputStream so that we can remove the size limit restriction
+ // when parsing.
+ cis.setSizeLimit(Integer.MAX_VALUE);
+
+ try {
+ return parseFrom(cis);
+ } catch (InvalidProtocolBufferException ipbe) {
+ throw Status.INTERNAL.withDescription("Invalid protobuf byte sequence")
+ .withCause(ipbe).asRuntimeException();
+ }
+ }
+
+ private T parseFrom(CodedInputStream stream) throws InvalidProtocolBufferException {
+ T message = parser.parseFrom(stream, globalRegistry);
+ try {
+ stream.checkLastTagWas(0);
+ return message;
+ } catch (InvalidProtocolBufferException e) {
+ e.setUnfinishedMessage(message);
+ throw e;
+ }
+ }
+ }
+
+ private static final class MetadataMarshaller
+ implements Metadata.BinaryMarshaller {
+
+ private final T defaultInstance;
+
+ MetadataMarshaller(T defaultInstance) {
+ this.defaultInstance = defaultInstance;
+ }
+
+ @Override
+ public byte[] toBytes(T value) {
+ return value.toByteArray();
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public T parseBytes(byte[] serialized) {
+ try {
+ return (T) defaultInstance.getParserForType().parseFrom(serialized, globalRegistry);
+ } catch (InvalidProtocolBufferException ipbe) {
+ throw new IllegalArgumentException(ipbe);
+ }
+ }
+ }
}
diff --git a/protobuf-nano/src/main/java/io/grpc/protobuf/nano/MessageNanoFactory.java b/protobuf-nano/src/main/java/io/grpc/protobuf/nano/MessageNanoFactory.java
index a5b2078018..ce2a9b576c 100644
--- a/protobuf-nano/src/main/java/io/grpc/protobuf/nano/MessageNanoFactory.java
+++ b/protobuf-nano/src/main/java/io/grpc/protobuf/nano/MessageNanoFactory.java
@@ -22,6 +22,8 @@ import com.google.protobuf.nano.MessageNano;
* Produce new message instances. Used by a marshaller to deserialize incoming messages.
*
* Should be implemented by generated code.
+ *
+ * @since 1.0.0
*/
public interface MessageNanoFactory {
T newInstance();
diff --git a/protobuf-nano/src/main/java/io/grpc/protobuf/nano/NanoProtoInputStream.java b/protobuf-nano/src/main/java/io/grpc/protobuf/nano/NanoProtoInputStream.java
index 69ea3002f4..a684649f5b 100644
--- a/protobuf-nano/src/main/java/io/grpc/protobuf/nano/NanoProtoInputStream.java
+++ b/protobuf-nano/src/main/java/io/grpc/protobuf/nano/NanoProtoInputStream.java
@@ -27,7 +27,7 @@ import javax.annotation.Nullable;
/**
* An {@link InputStream} backed by a nano proto.
*/
-class NanoProtoInputStream extends InputStream implements KnownLength {
+final class NanoProtoInputStream extends InputStream implements KnownLength {
// NanoProtoInputStream is first initialized with a *message*. *partial* is initially null.
// Once there has been a read operation on this stream, *message* is serialized to *partial* and
@@ -35,7 +35,7 @@ class NanoProtoInputStream extends InputStream implements KnownLength {
@Nullable private MessageNano message;
@Nullable private ByteArrayInputStream partial;
- public NanoProtoInputStream(MessageNano message) {
+ NanoProtoInputStream(MessageNano message) {
this.message = message;
}
@@ -84,7 +84,7 @@ class NanoProtoInputStream extends InputStream implements KnownLength {
}
@Override
- public int available() throws IOException {
+ public int available() {
if (message != null) {
return message.getSerializedSize();
} else if (partial != null) {
diff --git a/protobuf-nano/src/main/java/io/grpc/protobuf/nano/NanoUtils.java b/protobuf-nano/src/main/java/io/grpc/protobuf/nano/NanoUtils.java
index 2a34c7b467..2195707e95 100644
--- a/protobuf-nano/src/main/java/io/grpc/protobuf/nano/NanoUtils.java
+++ b/protobuf-nano/src/main/java/io/grpc/protobuf/nano/NanoUtils.java
@@ -30,60 +30,71 @@ import java.io.OutputStream;
/**
* Utility methods for using nano proto with grpc.
*/
-public class NanoUtils {
-
- private static final int BUF_SIZE = 8192;
+public final class NanoUtils {
private NanoUtils() {}
- /** Adapt {@code parser} to a {@code Marshaller}. */
- public static Marshaller marshaller(
- final MessageNanoFactory factory) {
- return new Marshaller() {
- @Override
- public InputStream stream(T value) {
- return new NanoProtoInputStream(value);
- }
-
- @Override
- public T parse(InputStream stream) {
- try {
- // TODO(simonma): Investigate whether we can do 0-copy here.
- CodedInputByteBufferNano input =
- CodedInputByteBufferNano.newInstance(toByteArray(stream));
- input.setSizeLimit(Integer.MAX_VALUE);
- T message = factory.newInstance();
- message.mergeFrom(input);
- return message;
- } catch (IOException ipbe) {
- throw Status.INTERNAL.withDescription("Failed parsing nano proto message").withCause(ipbe)
- .asRuntimeException();
- }
- }
- };
+ /**
+ * Adapt {@code parser} to a {@link Marshaller}.
+ *
+ * @since 1.0.0
+ */
+ public static Marshaller marshaller(MessageNanoFactory factory) {
+ return new MessageMarshaller(factory);
}
- // Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta)
- private static byte[] toByteArray(InputStream in) throws IOException {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- copy(in, out);
- return out.toByteArray();
- }
+ private static final class MessageMarshaller implements Marshaller {
+ private static final int BUF_SIZE = 8192;
- // Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta)
- private static long copy(InputStream from, OutputStream to) throws IOException {
- checkNotNull(from);
- checkNotNull(to);
- byte[] buf = new byte[BUF_SIZE];
- long total = 0;
- while (true) {
- int r = from.read(buf);
- if (r == -1) {
- break;
- }
- to.write(buf, 0, r);
- total += r;
+ private final MessageNanoFactory factory;
+
+ MessageMarshaller(MessageNanoFactory factory) {
+ this.factory = factory;
+ }
+
+ @Override
+ public InputStream stream(T value) {
+ return new NanoProtoInputStream(value);
+ }
+
+ @Override
+ public T parse(InputStream stream) {
+ try {
+ // TODO(simonma): Investigate whether we can do 0-copy here.
+ CodedInputByteBufferNano input =
+ CodedInputByteBufferNano.newInstance(toByteArray(stream));
+ input.setSizeLimit(Integer.MAX_VALUE);
+ T message = factory.newInstance();
+ message.mergeFrom(input);
+ return message;
+ } catch (IOException ipbe) {
+ throw Status.INTERNAL.withDescription("Failed parsing nano proto message").withCause(ipbe)
+ .asRuntimeException();
+ }
+ }
+
+ // Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta)
+ private static byte[] toByteArray(InputStream in) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ copy(in, out);
+ return out.toByteArray();
+ }
+
+ // Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta)
+ private static long copy(InputStream from, OutputStream to) throws IOException {
+ checkNotNull(from);
+ checkNotNull(to);
+ byte[] buf = new byte[BUF_SIZE];
+ long total = 0;
+ while (true) {
+ int r = from.read(buf);
+ if (r == -1) {
+ break;
+ }
+ to.write(buf, 0, r);
+ total += r;
+ }
+ return total;
}
- return total;
}
}
diff --git a/protobuf/src/main/java/io/grpc/protobuf/ProtoFileDescriptorSupplier.java b/protobuf/src/main/java/io/grpc/protobuf/ProtoFileDescriptorSupplier.java
index fc8f99a3fd..b87cf333ce 100644
--- a/protobuf/src/main/java/io/grpc/protobuf/ProtoFileDescriptorSupplier.java
+++ b/protobuf/src/main/java/io/grpc/protobuf/ProtoFileDescriptorSupplier.java
@@ -20,6 +20,8 @@ import com.google.protobuf.Descriptors.FileDescriptor;
/**
* Provides access to the underlying proto file descriptor.
+ *
+ * @since 1.1.0
*/
public interface ProtoFileDescriptorSupplier {
/**
diff --git a/protobuf/src/main/java/io/grpc/protobuf/ProtoUtils.java b/protobuf/src/main/java/io/grpc/protobuf/ProtoUtils.java
index 21ea6f0201..a230d5043f 100644
--- a/protobuf/src/main/java/io/grpc/protobuf/ProtoUtils.java
+++ b/protobuf/src/main/java/io/grpc/protobuf/ProtoUtils.java
@@ -25,15 +25,21 @@ import io.grpc.protobuf.lite.ProtoLiteUtils;
/**
* Utility methods for using protobuf with grpc.
*/
-public class ProtoUtils {
+public final class ProtoUtils {
- /** Create a {@code Marshaller} for protos of the same type as {@code defaultInstance}. */
+ /**
+ * Create a {@link Marshaller} for protos of the same type as {@code defaultInstance}.
+ *
+ * @since 1.0.0
+ */
public static Marshaller marshaller(final T defaultInstance) {
return ProtoLiteUtils.marshaller(defaultInstance);
}
/**
* Produce a metadata key for a generated protobuf type.
+ *
+ * @since 1.0.0
*/
public static Metadata.Key keyForProto(T instance) {
return Metadata.Key.of(