From 13ca42aff69e934086055f0c3a73772987d9686c Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Tue, 19 Jun 2018 15:30:12 -0700 Subject: [PATCH] protobuf{,lite,nano}: make more classes final - Split anonymous classes in named and final classes - Fix some Javadocs and annotate when things were added. --- .../grpc/protobuf/lite/ProtoInputStream.java | 6 +- .../io/grpc/protobuf/lite/ProtoLiteUtils.java | 288 ++++++++++-------- .../protobuf/nano/MessageNanoFactory.java | 2 + .../protobuf/nano/NanoProtoInputStream.java | 6 +- .../java/io/grpc/protobuf/nano/NanoUtils.java | 107 ++++--- .../protobuf/ProtoFileDescriptorSupplier.java | 2 + .../java/io/grpc/protobuf/ProtoUtils.java | 10 +- 7 files changed, 233 insertions(+), 188 deletions(-) diff --git a/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoInputStream.java b/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoInputStream.java index 41fabd1bda..27ec32ac3c 100644 --- a/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoInputStream.java +++ b/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoInputStream.java @@ -30,7 +30,7 @@ import javax.annotation.Nullable; /** * An {@link InputStream} backed by a protobuf. */ -class ProtoInputStream extends InputStream implements Drainable, KnownLength { +final class ProtoInputStream extends InputStream implements Drainable, KnownLength { // ProtoInputStream is first initialized with a *message*. *partial* is initially null. // Once there has been a read operation on this stream, *message* is serialized to *partial* and @@ -39,7 +39,7 @@ class ProtoInputStream extends InputStream implements Drainable, KnownLength { private final Parser parser; @Nullable private ByteArrayInputStream partial; - public ProtoInputStream(MessageLite message, Parser parser) { + ProtoInputStream(MessageLite message, Parser parser) { this.message = message; this.parser = parser; } @@ -103,7 +103,7 @@ class ProtoInputStream extends InputStream implements Drainable, KnownLength { } @Override - public int available() throws IOException { + public int available() { if (message != null) { return message.getSerializedSize(); } else if (partial != null) { diff --git a/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java b/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java index 1f58095ac0..402105f327 100644 --- a/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java +++ b/protobuf-lite/src/main/java/io/grpc/protobuf/lite/ProtoLiteUtils.java @@ -40,9 +40,10 @@ import java.lang.ref.WeakReference; * Utility methods for using protobuf with grpc. */ @ExperimentalApi("Experimental until Lite is stable in protobuf") -public class ProtoLiteUtils { +public final class ProtoLiteUtils { - private static volatile ExtensionRegistryLite globalRegistry = + // default visibility to avoid synthetic accessors + static volatile ExtensionRegistryLite globalRegistry = ExtensionRegistryLite.getEmptyRegistry(); private static final int BUF_SIZE = 8192; @@ -66,149 +67,31 @@ public class ProtoLiteUtils { *

If you need custom parsing behavior for protos, you will need to make your own * {@code MethodDescriptor.Marshaller} for the time being. * + * @since 1.0.0 */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/1787") public static void setExtensionRegistry(ExtensionRegistryLite newRegistry) { globalRegistry = checkNotNull(newRegistry, "newRegistry"); } - private static final ThreadLocal> bufs = new ThreadLocal>() { - - @Override - protected Reference initialValue() { - return new WeakReference(new byte[4096]); // Picked at random. - } - }; - - /** Create a {@code Marshaller} for protos of the same type as {@code defaultInstance}. */ - public static Marshaller marshaller(final T defaultInstance) { - @SuppressWarnings("unchecked") - final Parser parser = (Parser) defaultInstance.getParserForType(); + /** + * Creates a {@link Marshaller} for protos of the same type as {@code defaultInstance}. + * + * @since 1.0.0 + */ + public static Marshaller marshaller(T defaultInstance) { // TODO(ejona): consider changing return type to PrototypeMarshaller (assuming ABI safe) - return new PrototypeMarshaller() { - @SuppressWarnings("unchecked") - @Override - public Class getMessageClass() { - // Precisely T since protobuf doesn't let messages extend other messages. - return (Class) defaultInstance.getClass(); - } - - @Override - public T getMessagePrototype() { - return defaultInstance; - } - - @Override - public InputStream stream(T value) { - return new ProtoInputStream(value, parser); - } - - @Override - public T parse(InputStream stream) { - if (stream instanceof ProtoInputStream) { - ProtoInputStream protoStream = (ProtoInputStream) stream; - // Optimization for in-memory transport. Returning provided object is safe since protobufs - // are immutable. - // - // However, we can't assume the types match, so we have to verify the parser matches. - // Today the parser is always the same for a given proto, but that isn't guaranteed. Even - // if not, using the same MethodDescriptor would ensure the parser matches and permit us - // to enable this optimization. - if (protoStream.parser() == parser) { - try { - @SuppressWarnings("unchecked") - T message = (T) ((ProtoInputStream) stream).message(); - return message; - } catch (IllegalStateException ex) { - // Stream must have been read from, which is a strange state. Since the point of this - // optimization is to be transparent, instead of throwing an error we'll continue, - // even though it seems likely there's a bug. - } - } - } - CodedInputStream cis = null; - try { - if (stream instanceof KnownLength) { - int size = stream.available(); - if (size > 0 && size <= DEFAULT_MAX_MESSAGE_SIZE) { - // buf should not be used after this method has returned. - byte[] buf = bufs.get().get(); - if (buf == null || buf.length < size) { - buf = new byte[size]; - bufs.set(new WeakReference(buf)); - } - - int remaining = size; - while (remaining > 0) { - int position = size - remaining; - int count = stream.read(buf, position, remaining); - if (count == -1) { - break; - } - remaining -= count; - } - - if (remaining != 0) { - int position = size - remaining; - throw new RuntimeException("size inaccurate: " + size + " != " + position); - } - cis = CodedInputStream.newInstance(buf, 0, size); - } else if (size == 0) { - return defaultInstance; - } - } - } catch (IOException e) { - throw new RuntimeException(e); - } - if (cis == null) { - cis = CodedInputStream.newInstance(stream); - } - // Pre-create the CodedInputStream so that we can remove the size limit restriction - // when parsing. - cis.setSizeLimit(Integer.MAX_VALUE); - - try { - return parseFrom(cis); - } catch (InvalidProtocolBufferException ipbe) { - throw Status.INTERNAL.withDescription("Invalid protobuf byte sequence") - .withCause(ipbe).asRuntimeException(); - } - } - - private T parseFrom(CodedInputStream stream) throws InvalidProtocolBufferException { - T message = parser.parseFrom(stream, globalRegistry); - try { - stream.checkLastTagWas(0); - return message; - } catch (InvalidProtocolBufferException e) { - e.setUnfinishedMessage(message); - throw e; - } - } - }; + return new MessageMarshaller(defaultInstance); } /** * Produce a metadata marshaller for a protobuf type. + * + * @since 1.0.0 */ public static Metadata.BinaryMarshaller metadataMarshaller( - final T instance) { - return new Metadata.BinaryMarshaller() { - @Override - public byte[] toBytes(T value) { - return value.toByteArray(); - } - - @Override - @SuppressWarnings("unchecked") - public T parseBytes(byte[] serialized) { - try { - return (T) instance.getParserForType().parseFrom(serialized, globalRegistry); - } catch (InvalidProtocolBufferException ipbe) { - throw new IllegalArgumentException(ipbe); - } - } - }; + T defaultInstance) { + return new MetadataMarshaller(defaultInstance); } /** Copies the data from input stream to output stream. */ @@ -231,4 +114,145 @@ public class ProtoLiteUtils { private ProtoLiteUtils() { } + + private static final class MessageMarshaller + implements PrototypeMarshaller { + private static final ThreadLocal> bufs = new ThreadLocal>(); + + private final Parser parser; + private final T defaultInstance; + + @SuppressWarnings("unchecked") + MessageMarshaller(T defaultInstance) { + this.defaultInstance = defaultInstance; + parser = (Parser) defaultInstance.getParserForType(); + } + + + @SuppressWarnings("unchecked") + @Override + public Class getMessageClass() { + // Precisely T since protobuf doesn't let messages extend other messages. + return (Class) defaultInstance.getClass(); + } + + @Override + public T getMessagePrototype() { + return defaultInstance; + } + + @Override + public InputStream stream(T value) { + return new ProtoInputStream(value, parser); + } + + @Override + public T parse(InputStream stream) { + if (stream instanceof ProtoInputStream) { + ProtoInputStream protoStream = (ProtoInputStream) stream; + // Optimization for in-memory transport. Returning provided object is safe since protobufs + // are immutable. + // + // However, we can't assume the types match, so we have to verify the parser matches. + // Today the parser is always the same for a given proto, but that isn't guaranteed. Even + // if not, using the same MethodDescriptor would ensure the parser matches and permit us + // to enable this optimization. + if (protoStream.parser() == parser) { + try { + @SuppressWarnings("unchecked") + T message = (T) ((ProtoInputStream) stream).message(); + return message; + } catch (IllegalStateException ex) { + // Stream must have been read from, which is a strange state. Since the point of this + // optimization is to be transparent, instead of throwing an error we'll continue, + // even though it seems likely there's a bug. + } + } + } + CodedInputStream cis = null; + try { + if (stream instanceof KnownLength) { + int size = stream.available(); + if (size > 0 && size <= DEFAULT_MAX_MESSAGE_SIZE) { + Reference ref; + // buf should not be used after this method has returned. + byte[] buf; + if ((ref = bufs.get()) == null || (buf = ref.get()) == null || buf.length < size) { + buf = new byte[size]; + bufs.set(new WeakReference(buf)); + } + + int remaining = size; + while (remaining > 0) { + int position = size - remaining; + int count = stream.read(buf, position, remaining); + if (count == -1) { + break; + } + remaining -= count; + } + + if (remaining != 0) { + int position = size - remaining; + throw new RuntimeException("size inaccurate: " + size + " != " + position); + } + cis = CodedInputStream.newInstance(buf, 0, size); + } else if (size == 0) { + return defaultInstance; + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + if (cis == null) { + cis = CodedInputStream.newInstance(stream); + } + // Pre-create the CodedInputStream so that we can remove the size limit restriction + // when parsing. + cis.setSizeLimit(Integer.MAX_VALUE); + + try { + return parseFrom(cis); + } catch (InvalidProtocolBufferException ipbe) { + throw Status.INTERNAL.withDescription("Invalid protobuf byte sequence") + .withCause(ipbe).asRuntimeException(); + } + } + + private T parseFrom(CodedInputStream stream) throws InvalidProtocolBufferException { + T message = parser.parseFrom(stream, globalRegistry); + try { + stream.checkLastTagWas(0); + return message; + } catch (InvalidProtocolBufferException e) { + e.setUnfinishedMessage(message); + throw e; + } + } + } + + private static final class MetadataMarshaller + implements Metadata.BinaryMarshaller { + + private final T defaultInstance; + + MetadataMarshaller(T defaultInstance) { + this.defaultInstance = defaultInstance; + } + + @Override + public byte[] toBytes(T value) { + return value.toByteArray(); + } + + @Override + @SuppressWarnings("unchecked") + public T parseBytes(byte[] serialized) { + try { + return (T) defaultInstance.getParserForType().parseFrom(serialized, globalRegistry); + } catch (InvalidProtocolBufferException ipbe) { + throw new IllegalArgumentException(ipbe); + } + } + } } diff --git a/protobuf-nano/src/main/java/io/grpc/protobuf/nano/MessageNanoFactory.java b/protobuf-nano/src/main/java/io/grpc/protobuf/nano/MessageNanoFactory.java index a5b2078018..ce2a9b576c 100644 --- a/protobuf-nano/src/main/java/io/grpc/protobuf/nano/MessageNanoFactory.java +++ b/protobuf-nano/src/main/java/io/grpc/protobuf/nano/MessageNanoFactory.java @@ -22,6 +22,8 @@ import com.google.protobuf.nano.MessageNano; * Produce new message instances. Used by a marshaller to deserialize incoming messages. * *

Should be implemented by generated code. + * + * @since 1.0.0 */ public interface MessageNanoFactory { T newInstance(); diff --git a/protobuf-nano/src/main/java/io/grpc/protobuf/nano/NanoProtoInputStream.java b/protobuf-nano/src/main/java/io/grpc/protobuf/nano/NanoProtoInputStream.java index 69ea3002f4..a684649f5b 100644 --- a/protobuf-nano/src/main/java/io/grpc/protobuf/nano/NanoProtoInputStream.java +++ b/protobuf-nano/src/main/java/io/grpc/protobuf/nano/NanoProtoInputStream.java @@ -27,7 +27,7 @@ import javax.annotation.Nullable; /** * An {@link InputStream} backed by a nano proto. */ -class NanoProtoInputStream extends InputStream implements KnownLength { +final class NanoProtoInputStream extends InputStream implements KnownLength { // NanoProtoInputStream is first initialized with a *message*. *partial* is initially null. // Once there has been a read operation on this stream, *message* is serialized to *partial* and @@ -35,7 +35,7 @@ class NanoProtoInputStream extends InputStream implements KnownLength { @Nullable private MessageNano message; @Nullable private ByteArrayInputStream partial; - public NanoProtoInputStream(MessageNano message) { + NanoProtoInputStream(MessageNano message) { this.message = message; } @@ -84,7 +84,7 @@ class NanoProtoInputStream extends InputStream implements KnownLength { } @Override - public int available() throws IOException { + public int available() { if (message != null) { return message.getSerializedSize(); } else if (partial != null) { diff --git a/protobuf-nano/src/main/java/io/grpc/protobuf/nano/NanoUtils.java b/protobuf-nano/src/main/java/io/grpc/protobuf/nano/NanoUtils.java index 2a34c7b467..2195707e95 100644 --- a/protobuf-nano/src/main/java/io/grpc/protobuf/nano/NanoUtils.java +++ b/protobuf-nano/src/main/java/io/grpc/protobuf/nano/NanoUtils.java @@ -30,60 +30,71 @@ import java.io.OutputStream; /** * Utility methods for using nano proto with grpc. */ -public class NanoUtils { - - private static final int BUF_SIZE = 8192; +public final class NanoUtils { private NanoUtils() {} - /** Adapt {@code parser} to a {@code Marshaller}. */ - public static Marshaller marshaller( - final MessageNanoFactory factory) { - return new Marshaller() { - @Override - public InputStream stream(T value) { - return new NanoProtoInputStream(value); - } - - @Override - public T parse(InputStream stream) { - try { - // TODO(simonma): Investigate whether we can do 0-copy here. - CodedInputByteBufferNano input = - CodedInputByteBufferNano.newInstance(toByteArray(stream)); - input.setSizeLimit(Integer.MAX_VALUE); - T message = factory.newInstance(); - message.mergeFrom(input); - return message; - } catch (IOException ipbe) { - throw Status.INTERNAL.withDescription("Failed parsing nano proto message").withCause(ipbe) - .asRuntimeException(); - } - } - }; + /** + * Adapt {@code parser} to a {@link Marshaller}. + * + * @since 1.0.0 + */ + public static Marshaller marshaller(MessageNanoFactory factory) { + return new MessageMarshaller(factory); } - // Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta) - private static byte[] toByteArray(InputStream in) throws IOException { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - copy(in, out); - return out.toByteArray(); - } + private static final class MessageMarshaller implements Marshaller { + private static final int BUF_SIZE = 8192; - // Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta) - private static long copy(InputStream from, OutputStream to) throws IOException { - checkNotNull(from); - checkNotNull(to); - byte[] buf = new byte[BUF_SIZE]; - long total = 0; - while (true) { - int r = from.read(buf); - if (r == -1) { - break; - } - to.write(buf, 0, r); - total += r; + private final MessageNanoFactory factory; + + MessageMarshaller(MessageNanoFactory factory) { + this.factory = factory; + } + + @Override + public InputStream stream(T value) { + return new NanoProtoInputStream(value); + } + + @Override + public T parse(InputStream stream) { + try { + // TODO(simonma): Investigate whether we can do 0-copy here. + CodedInputByteBufferNano input = + CodedInputByteBufferNano.newInstance(toByteArray(stream)); + input.setSizeLimit(Integer.MAX_VALUE); + T message = factory.newInstance(); + message.mergeFrom(input); + return message; + } catch (IOException ipbe) { + throw Status.INTERNAL.withDescription("Failed parsing nano proto message").withCause(ipbe) + .asRuntimeException(); + } + } + + // Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta) + private static byte[] toByteArray(InputStream in) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + copy(in, out); + return out.toByteArray(); + } + + // Copied from guava com.google.common.io.ByteStreams because its API is unstable (beta) + private static long copy(InputStream from, OutputStream to) throws IOException { + checkNotNull(from); + checkNotNull(to); + byte[] buf = new byte[BUF_SIZE]; + long total = 0; + while (true) { + int r = from.read(buf); + if (r == -1) { + break; + } + to.write(buf, 0, r); + total += r; + } + return total; } - return total; } } diff --git a/protobuf/src/main/java/io/grpc/protobuf/ProtoFileDescriptorSupplier.java b/protobuf/src/main/java/io/grpc/protobuf/ProtoFileDescriptorSupplier.java index fc8f99a3fd..b87cf333ce 100644 --- a/protobuf/src/main/java/io/grpc/protobuf/ProtoFileDescriptorSupplier.java +++ b/protobuf/src/main/java/io/grpc/protobuf/ProtoFileDescriptorSupplier.java @@ -20,6 +20,8 @@ import com.google.protobuf.Descriptors.FileDescriptor; /** * Provides access to the underlying proto file descriptor. + * + * @since 1.1.0 */ public interface ProtoFileDescriptorSupplier { /** diff --git a/protobuf/src/main/java/io/grpc/protobuf/ProtoUtils.java b/protobuf/src/main/java/io/grpc/protobuf/ProtoUtils.java index 21ea6f0201..a230d5043f 100644 --- a/protobuf/src/main/java/io/grpc/protobuf/ProtoUtils.java +++ b/protobuf/src/main/java/io/grpc/protobuf/ProtoUtils.java @@ -25,15 +25,21 @@ import io.grpc.protobuf.lite.ProtoLiteUtils; /** * Utility methods for using protobuf with grpc. */ -public class ProtoUtils { +public final class ProtoUtils { - /** Create a {@code Marshaller} for protos of the same type as {@code defaultInstance}. */ + /** + * Create a {@link Marshaller} for protos of the same type as {@code defaultInstance}. + * + * @since 1.0.0 + */ public static Marshaller marshaller(final T defaultInstance) { return ProtoLiteUtils.marshaller(defaultInstance); } /** * Produce a metadata key for a generated protobuf type. + * + * @since 1.0.0 */ public static Metadata.Key keyForProto(T instance) { return Metadata.Key.of(