Implement JsonSerializer (#3587)

* Extract trace / span ID serialization logic to Serializer to allow changing for JSON.

* Implement JsonSerializer

* Drift
This commit is contained in:
Anuraag Agrawal 2021-09-13 15:25:25 +09:00 committed by GitHub
parent 12225fecaa
commit 39f3362e43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 503 additions and 88 deletions

View File

@ -23,6 +23,8 @@ dependencies {
// contains internal code.
compileOnly(project(":proto"))
compileOnly("com.fasterxml.jackson.core:jackson-core")
// Similar to above note about :proto, we include helpers shared by gRPC or okhttp exporters but
// do not want to impose these dependency on all of our consumers.
compileOnly("com.squareup.okhttp3:okhttp")
@ -36,6 +38,9 @@ dependencies {
testImplementation(project(":proto"))
testImplementation(project(":sdk:testing"))
testImplementation("com.fasterxml.jackson.core:jackson-core")
testImplementation("com.google.protobuf:protobuf-java-util")
testImplementation("org.jeasy:easy-random-randomizers")
testImplementation("com.google.api.grpc:proto-google-common-protos")
@ -45,7 +50,10 @@ dependencies {
jmhImplementation(project(":proto"))
jmhImplementation(project(":sdk:testing"))
jmhImplementation(project(":sdk-extensions:resources"))
jmhImplementation("io.grpc:grpc-api")
jmhImplementation("com.fasterxml.jackson.core:jackson-core")
jmhImplementation("org.curioswitch.curiostack:protobuf-jackson")
jmhImplementation("com.google.protobuf:protobuf-java-util")
jmhRuntimeOnly("io.grpc:grpc-netty")
}
wire {

View File

@ -3,12 +3,11 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.otlp.trace;
package io.opentelemetry.exporter.otlp.internal;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.PooledByteBufAllocator;
import io.opentelemetry.exporter.otlp.internal.TraceRequestMarshaler;
import io.opentelemetry.exporter.otlp.internal.grpc.MarshalerInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

View File

@ -3,14 +3,16 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.otlp.trace;
package io.opentelemetry.exporter.otlp.internal;
import io.opentelemetry.exporter.otlp.internal.SpanAdapter;
import io.opentelemetry.exporter.otlp.internal.TraceRequestMarshaler;
import com.google.protobuf.util.JsonFormat;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import org.curioswitch.common.protobuf.json.MessageMarshaller;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@ -27,6 +29,12 @@ import org.openjdk.jmh.annotations.Warmup;
@Fork(1)
public class RequestMarshalBenchmarks {
private static final MessageMarshaller BYTEBUDDY_MARSHALLER =
MessageMarshaller.builder()
.register(ExportTraceServiceRequest.class)
.omittingInsignificantWhitespace(true)
.build();
@Benchmark
@Threads(1)
public ByteArrayOutputStream createProtoMarshal(RequestMarshalState state) {
@ -65,4 +73,41 @@ public class RequestMarshalBenchmarks {
requestMarshaler.writeBinaryTo(customOutput);
return customOutput;
}
@Benchmark
@Threads(1)
public ByteArrayOutputStream marshalJson(RequestMarshalState state) throws IOException {
TraceRequestMarshaler requestMarshaler = TraceRequestMarshaler.create(state.spanDataList);
ByteArrayOutputStream customOutput = new ByteArrayOutputStream();
requestMarshaler.writeJsonTo(customOutput);
return customOutput;
}
@Benchmark
@Threads(1)
public ByteArrayOutputStream marshalJsonProtoByteBuddy(RequestMarshalState state)
throws IOException {
ExportTraceServiceRequest protoRequest =
ExportTraceServiceRequest.newBuilder()
.addAllResourceSpans(SpanAdapter.toProtoResourceSpans(state.spanDataList))
.build();
ByteArrayOutputStream customOutput = new ByteArrayOutputStream();
BYTEBUDDY_MARSHALLER.writeValue(protoRequest, customOutput);
return customOutput;
}
@Benchmark
@Threads(1)
public ByteArrayOutputStream marshalJsonProtoReflection(RequestMarshalState state)
throws IOException {
ExportTraceServiceRequest protoRequest =
ExportTraceServiceRequest.newBuilder()
.addAllResourceSpans(SpanAdapter.toProtoResourceSpans(state.spanDataList))
.build();
ByteArrayOutputStream customOutput = new ByteArrayOutputStream();
JsonFormat.printer()
.omittingInsignificantWhitespace()
.appendTo(protoRequest, new OutputStreamWriter(customOutput, StandardCharsets.UTF_8));
return customOutput;
}
}

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.otlp.trace;
package io.opentelemetry.exporter.otlp.internal;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;

View File

@ -18,7 +18,8 @@ final class InstrumentationLibraryMarshaler extends MarshalerWithSize {
InstrumentationLibraryInfo, InstrumentationLibraryMarshaler>
LIBRARY_MARSHALER_CACHE = new WeakConcurrentMap.WithInlinedExpunction<>();
private final byte[] serializedInfo;
private final byte[] serializedBinary;
private final String serializedJson;
static InstrumentationLibraryMarshaler create(InstrumentationLibraryInfo libraryInfo) {
InstrumentationLibraryMarshaler cached = LIBRARY_MARSHALER_CACHE.get(libraryInfo);
@ -27,33 +28,59 @@ final class InstrumentationLibraryMarshaler extends MarshalerWithSize {
// a few times until the cache gets filled which is fine.
byte[] name = MarshalerUtil.toBytes(libraryInfo.getName());
byte[] version = MarshalerUtil.toBytes(libraryInfo.getVersion());
cached = new InstrumentationLibraryMarshaler(name, version);
RealInstrumentationLibraryMarshaler realMarshaler =
new RealInstrumentationLibraryMarshaler(name, version);
ByteArrayOutputStream binaryBos =
new ByteArrayOutputStream(realMarshaler.getBinarySerializedSize());
try {
realMarshaler.writeBinaryTo(binaryBos);
} catch (IOException e) {
throw new UncheckedIOException(
"Serialization error, this is likely a bug in OpenTelemetry.", e);
}
String json = MarshalerUtil.preserializeJsonFields(realMarshaler);
cached = new InstrumentationLibraryMarshaler(binaryBos.toByteArray(), json);
LIBRARY_MARSHALER_CACHE.put(libraryInfo, cached);
}
return cached;
}
private InstrumentationLibraryMarshaler(byte[] name, byte[] version) {
super(computeSize(name, version));
ByteArrayOutputStream bos = new ByteArrayOutputStream(getBinarySerializedSize());
try (ProtoSerializer serializer = new ProtoSerializer(bos)) {
serializer.serializeString(InstrumentationLibrary.NAME, name);
serializer.serializeString(InstrumentationLibrary.VERSION, version);
} catch (IOException e) {
// Presized so can't happen (we would have already thrown OutOfMemoryError)
throw new UncheckedIOException(e);
}
serializedInfo = bos.toByteArray();
private InstrumentationLibraryMarshaler(byte[] binary, String json) {
super(binary.length);
serializedBinary = binary;
serializedJson = json;
}
@Override
public void writeTo(Serializer output) throws IOException {
// TODO(anuraaga): Preserialize JSON as well.
output.writeSerializedMessage(serializedInfo, MarshalerUtil.EMPTY_BYTES);
output.writeSerializedMessage(serializedBinary, serializedJson);
}
private static int computeSize(byte[] name, byte[] version) {
return MarshalerUtil.sizeBytes(InstrumentationLibrary.NAME, name)
+ MarshalerUtil.sizeBytes(InstrumentationLibrary.VERSION, version);
private static final class RealInstrumentationLibraryMarshaler extends MarshalerWithSize {
private final byte[] name;
private final byte[] version;
RealInstrumentationLibraryMarshaler(byte[] name, byte[] version) {
super(computeSize(name, version));
this.name = name;
this.version = version;
}
@Override
void writeTo(Serializer output) throws IOException {
output.serializeString(InstrumentationLibrary.NAME, name);
output.serializeString(InstrumentationLibrary.VERSION, version);
}
private static int computeSize(byte[] name, byte[] version) {
return MarshalerUtil.sizeBytes(InstrumentationLibrary.NAME, name)
+ MarshalerUtil.sizeBytes(InstrumentationLibrary.VERSION, version);
}
}
}

View File

@ -0,0 +1,143 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.otlp.internal;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
final class JsonSerializer extends Serializer {
private static final JsonFactory JSON_FACTORY = new JsonFactory();
private final JsonGenerator generator;
JsonSerializer(OutputStream output) throws IOException {
generator = JSON_FACTORY.createGenerator(output);
}
@Override
protected void writeTraceId(ProtoFieldInfo field, String traceId) throws IOException {
generator.writeStringField(field.getJsonName(), traceId);
}
@Override
protected void writeSpanId(ProtoFieldInfo field, String spanId) throws IOException {
generator.writeStringField(field.getJsonName(), spanId);
}
@Override
protected void writeBool(ProtoFieldInfo field, boolean value) throws IOException {
generator.writeBooleanField(field.getJsonName(), value);
}
@Override
protected void writeEnum(ProtoFieldInfo field, int enumNumber) throws IOException {
generator.writeNumberField(field.getJsonName(), enumNumber);
}
@Override
protected void writeUint32(ProtoFieldInfo field, int value) throws IOException {
generator.writeNumberField(field.getJsonName(), value);
}
@Override
protected void writeInt64(ProtoFieldInfo field, long value) throws IOException {
generator.writeStringField(field.getJsonName(), Long.toString(value));
}
@Override
protected void writeFixed64(ProtoFieldInfo field, long value) throws IOException {
generator.writeStringField(field.getJsonName(), Long.toString(value));
}
@Override
protected void writeFixed64Value(long value) throws IOException {
generator.writeString(Long.toString(value));
}
@Override
protected void writeDouble(ProtoFieldInfo field, double value) throws IOException {
generator.writeNumberField(field.getJsonName(), value);
}
@Override
protected void writeDoubleValue(double value) throws IOException {
generator.writeNumber(value);
}
@Override
protected void writeString(ProtoFieldInfo field, byte[] utf8Bytes) throws IOException {
generator.writeFieldName(field.getJsonName());
generator.writeUTF8String(utf8Bytes, 0, utf8Bytes.length);
}
@Override
protected void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
generator.writeBinaryField(field.getJsonName(), value);
}
@Override
protected void writeStartMessage(ProtoFieldInfo field, int protoMessageSize) throws IOException {
generator.writeObjectFieldStart(field.getJsonName());
}
@Override
protected void writeEndMessage() throws IOException {
generator.writeEndObject();
}
@Override
protected void writeStartRepeatedPrimitive(
ProtoFieldInfo field, int protoSizePerElement, int numElements) throws IOException {
generator.writeArrayFieldStart(field.getJsonName());
}
@Override
protected void writeEndRepeatedPrimitive() throws IOException {
generator.writeEndArray();
}
@Override
public void serializeRepeatedMessage(ProtoFieldInfo field, Marshaler[] repeatedMessage)
throws IOException {
generator.writeArrayFieldStart(field.getJsonName());
for (Marshaler marshaler : repeatedMessage) {
writeMessageValue(marshaler);
}
generator.writeEndArray();
}
@Override
public void serializeRepeatedMessage(
ProtoFieldInfo field, List<? extends Marshaler> repeatedMessage) throws IOException {
generator.writeArrayFieldStart(field.getJsonName());
for (Marshaler marshaler : repeatedMessage) {
writeMessageValue(marshaler);
}
generator.writeEndArray();
}
// Not a field.
void writeMessageValue(Marshaler message) throws IOException {
generator.writeStartObject();
message.writeTo(this);
generator.writeEndObject();
}
@Override
public void writeSerializedMessage(byte[] protoSerialized, String jsonSerialized)
throws IOException {
generator.writeRaw(jsonSerialized);
}
@Override
public void close() throws IOException {
generator.close();
}
}

View File

@ -18,11 +18,18 @@ public abstract class Marshaler {
/** Marshals into the {@link OutputStream} in proto binary format. */
public final void writeBinaryTo(OutputStream output) throws IOException {
try (ProtoSerializer serializer = new ProtoSerializer(output)) {
try (Serializer serializer = new ProtoSerializer(output)) {
writeTo(serializer);
}
}
/** Marshals into the {@link OutputStream} in proto JSON format. */
public final void writeJsonTo(OutputStream output) throws IOException {
try (JsonSerializer serializer = new JsonSerializer(output)) {
serializer.writeMessageValue(this);
}
}
/** Returns the number of bytes this Marshaler will write in proto binary format. */
public abstract int getBinarySerializedSize();

View File

@ -9,6 +9,9 @@ import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.api.trace.TraceId;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.resources.Resource;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
@ -24,6 +27,19 @@ final class MarshalerUtil {
private static final int SPAN_ID_VALUE_SIZE =
CodedOutputStream.computeLengthDelimitedFieldSize(SpanId.getLength() / 2);
static final boolean JSON_AVAILABLE;
static {
boolean jsonAvailable = false;
try {
Class.forName("com.fasterxml.jackson.core.JsonFactory");
jsonAvailable = true;
} catch (ClassNotFoundException e) {
// Not available
}
JSON_AVAILABLE = jsonAvailable;
}
static final byte[] EMPTY_BYTES = new byte[0];
static <T, U> Map<Resource, Map<InstrumentationLibraryInfo, List<U>>> groupByResourceAndLibrary(
@ -45,6 +61,28 @@ final class MarshalerUtil {
return result;
}
static String preserializeJsonFields(Marshaler marshaler) {
if (!MarshalerUtil.JSON_AVAILABLE) {
return "";
}
ByteArrayOutputStream jsonBos = new ByteArrayOutputStream();
try {
marshaler.writeJsonTo(jsonBos);
} catch (IOException e) {
throw new UncheckedIOException(
"Serialization error, this is likely a bug in OpenTelemetry.", e);
}
// We effectively cache `writeTo`, however Jackson would not allow us to only write out
// fields
// which is what writeTo does. So we need to write to an object but skip the object start
// /
// end.
byte[] jsonBytes = jsonBos.toByteArray();
return new String(jsonBytes, 1, jsonBytes.length - 2, StandardCharsets.UTF_8);
}
static int sizeRepeatedFixed64(ProtoFieldInfo field, List<Long> values) {
return sizeRepeatedFixed64(field, values.size());
}

View File

@ -621,8 +621,7 @@ public final class MetricsRequestMarshaler extends MarshalerWithSize {
private final long startTimeUnixNano;
private final long timeUnixNano;
// Always fixed64, for a double it's the bits themselves.
private final long value;
private final PointData value;
private final ProtoFieldInfo valueField;
private final ExemplarMarshaler[] exemplars;
@ -644,21 +643,18 @@ public final class MetricsRequestMarshaler extends MarshalerWithSize {
KeyValueMarshaler[] attributeMarshalers =
KeyValueMarshaler.createRepeated(point.getAttributes());
final long value;
final ProtoFieldInfo valueField;
if (point instanceof LongPointData) {
value = ((LongPointData) point).getValue();
valueField = NumberDataPoint.AS_INT;
} else {
assert point instanceof DoublePointData;
value = Double.doubleToRawLongBits(((DoublePointData) point).getValue());
valueField = NumberDataPoint.AS_DOUBLE;
}
return new NumberDataPointMarshaler(
point.getStartEpochNanos(),
point.getEpochNanos(),
value,
point,
valueField,
exemplarMarshalers,
attributeMarshalers);
@ -667,12 +663,12 @@ public final class MetricsRequestMarshaler extends MarshalerWithSize {
private NumberDataPointMarshaler(
long startTimeUnixNano,
long timeUnixNano,
long value,
PointData value,
ProtoFieldInfo valueField,
ExemplarMarshaler[] exemplars,
KeyValueMarshaler[] attributes) {
super(
calculateSize(startTimeUnixNano, timeUnixNano, value, valueField, exemplars, attributes));
calculateSize(startTimeUnixNano, timeUnixNano, valueField, value, exemplars, attributes));
this.startTimeUnixNano = startTimeUnixNano;
this.timeUnixNano = timeUnixNano;
this.value = value;
@ -685,7 +681,11 @@ public final class MetricsRequestMarshaler extends MarshalerWithSize {
public void writeTo(Serializer output) throws IOException {
output.serializeFixed64(NumberDataPoint.START_TIME_UNIX_NANO, startTimeUnixNano);
output.serializeFixed64(NumberDataPoint.TIME_UNIX_NANO, timeUnixNano);
output.serializeFixed64(valueField, value);
if (valueField == NumberDataPoint.AS_INT) {
output.serializeFixed64(valueField, ((LongPointData) value).getValue());
} else {
output.serializeDouble(valueField, ((DoublePointData) value).getValue());
}
output.serializeRepeatedMessage(NumberDataPoint.EXEMPLARS, exemplars);
output.serializeRepeatedMessage(NumberDataPoint.ATTRIBUTES, attributes);
}
@ -693,14 +693,18 @@ public final class MetricsRequestMarshaler extends MarshalerWithSize {
private static int calculateSize(
long startTimeUnixNano,
long timeUnixNano,
long value,
ProtoFieldInfo valueField,
PointData value,
ExemplarMarshaler[] exemplars,
KeyValueMarshaler[] attributes) {
int size = 0;
size += MarshalerUtil.sizeFixed64(NumberDataPoint.START_TIME_UNIX_NANO, startTimeUnixNano);
size += MarshalerUtil.sizeFixed64(NumberDataPoint.TIME_UNIX_NANO, timeUnixNano);
size += MarshalerUtil.sizeFixed64(valueField, value);
if (valueField == NumberDataPoint.AS_INT) {
size += MarshalerUtil.sizeFixed64(valueField, ((LongPointData) value).getValue());
} else {
size += MarshalerUtil.sizeDouble(valueField, ((DoublePointData) value).getValue());
}
size += MarshalerUtil.sizeRepeatedMessage(NumberDataPoint.EXEMPLARS, exemplars);
size += MarshalerUtil.sizeRepeatedMessage(NumberDataPoint.ATTRIBUTES, attributes);
return size;
@ -711,8 +715,7 @@ public final class MetricsRequestMarshaler extends MarshalerWithSize {
private final long timeUnixNano;
// Always fixed64, for a double it's the bits themselves.
private final long value;
private final Exemplar value;
private final ProtoFieldInfo valueField;
@Nullable private final String spanId;
@ -733,20 +736,17 @@ public final class MetricsRequestMarshaler extends MarshalerWithSize {
KeyValueMarshaler[] attributeMarshalers =
KeyValueMarshaler.createRepeated(exemplar.getFilteredAttributes());
final long value;
final ProtoFieldInfo valueField;
if (exemplar instanceof LongExemplar) {
value = ((LongExemplar) exemplar).getValue();
valueField = io.opentelemetry.proto.metrics.v1.internal.Exemplar.AS_INT;
} else {
assert exemplar instanceof DoubleExemplar;
value = Double.doubleToRawLongBits(((DoubleExemplar) exemplar).getValue());
valueField = io.opentelemetry.proto.metrics.v1.internal.Exemplar.AS_DOUBLE;
}
return new ExemplarMarshaler(
exemplar.getEpochNanos(),
value,
exemplar,
valueField,
exemplar.getSpanId(),
exemplar.getTraceId(),
@ -755,14 +755,14 @@ public final class MetricsRequestMarshaler extends MarshalerWithSize {
private ExemplarMarshaler(
long timeUnixNano,
long value,
Exemplar value,
ProtoFieldInfo valueField,
@Nullable String spanId,
@Nullable String traceId,
KeyValueMarshaler[] filteredAttributeMarshalers) {
super(
calculateSize(
timeUnixNano, value, valueField, spanId, traceId, filteredAttributeMarshalers));
timeUnixNano, valueField, value, spanId, traceId, filteredAttributeMarshalers));
this.timeUnixNano = timeUnixNano;
this.value = value;
this.valueField = valueField;
@ -775,7 +775,11 @@ public final class MetricsRequestMarshaler extends MarshalerWithSize {
public void writeTo(Serializer output) throws IOException {
output.serializeFixed64(
io.opentelemetry.proto.metrics.v1.internal.Exemplar.TIME_UNIX_NANO, timeUnixNano);
output.serializeFixed64(valueField, value);
if (valueField == io.opentelemetry.proto.metrics.v1.internal.Exemplar.AS_INT) {
output.serializeFixed64(valueField, ((LongExemplar) value).getValue());
} else {
output.serializeDouble(valueField, ((DoubleExemplar) value).getValue());
}
output.serializeSpanId(io.opentelemetry.proto.metrics.v1.internal.Exemplar.SPAN_ID, spanId);
output.serializeTraceId(
io.opentelemetry.proto.metrics.v1.internal.Exemplar.TRACE_ID, traceId);
@ -786,8 +790,8 @@ public final class MetricsRequestMarshaler extends MarshalerWithSize {
private static int calculateSize(
long timeUnixNano,
long value,
ProtoFieldInfo valueField,
Exemplar value,
@Nullable String spanId,
@Nullable String traceId,
KeyValueMarshaler[] filteredAttributeMarshalers) {
@ -795,7 +799,11 @@ public final class MetricsRequestMarshaler extends MarshalerWithSize {
size +=
MarshalerUtil.sizeFixed64(
io.opentelemetry.proto.metrics.v1.internal.Exemplar.TIME_UNIX_NANO, timeUnixNano);
size += MarshalerUtil.sizeFixed64(valueField, value);
if (valueField == io.opentelemetry.proto.metrics.v1.internal.Exemplar.AS_INT) {
size += MarshalerUtil.sizeFixed64(valueField, ((LongExemplar) value).getValue());
} else {
size += MarshalerUtil.sizeDouble(valueField, ((DoubleExemplar) value).getValue());
}
size +=
MarshalerUtil.sizeSpanId(
io.opentelemetry.proto.metrics.v1.internal.Exemplar.SPAN_ID, spanId);

View File

@ -145,7 +145,7 @@ final class ProtoSerializer extends Serializer implements AutoCloseable {
}
@Override
public void writeSerializedMessage(byte[] protoSerialized, byte[] jsonSerialized)
public void writeSerializedMessage(byte[] protoSerialized, String jsonSerialized)
throws IOException {
output.writeRawBytes(protoSerialized);
}

View File

@ -16,38 +16,62 @@ final class ResourceMarshaler extends MarshalerWithSize {
private static final WeakConcurrentMap<io.opentelemetry.sdk.resources.Resource, ResourceMarshaler>
RESOURCE_MARSHALER_CACHE = new WeakConcurrentMap.WithInlinedExpunction<>();
private final byte[] serializedResource;
private final byte[] serializedBinary;
private final String serializedJson;
static ResourceMarshaler create(io.opentelemetry.sdk.resources.Resource resource) {
ResourceMarshaler cached = RESOURCE_MARSHALER_CACHE.get(resource);
if (cached == null) {
// Since WeakConcurrentMap doesn't support computeIfAbsent, we may end up doing the conversion
// a few times until the cache gets filled which is fine.
cached = new ResourceMarshaler(KeyValueMarshaler.createRepeated(resource.getAttributes()));
RealResourceMarshaler realMarshaler =
new RealResourceMarshaler(KeyValueMarshaler.createRepeated(resource.getAttributes()));
ByteArrayOutputStream binaryBos =
new ByteArrayOutputStream(realMarshaler.getBinarySerializedSize());
try {
realMarshaler.writeBinaryTo(binaryBos);
} catch (IOException e) {
throw new UncheckedIOException(
"Serialization error, this is likely a bug in OpenTelemetry.", e);
}
String json = MarshalerUtil.preserializeJsonFields(realMarshaler);
cached = new ResourceMarshaler(binaryBos.toByteArray(), json);
RESOURCE_MARSHALER_CACHE.put(resource, cached);
}
return cached;
}
private ResourceMarshaler(KeyValueMarshaler[] attributeMarshalers) {
super(calculateSize(attributeMarshalers));
ByteArrayOutputStream bos = new ByteArrayOutputStream(getBinarySerializedSize());
try (ProtoSerializer serializer = new ProtoSerializer(bos)) {
serializer.serializeRepeatedMessage(Resource.ATTRIBUTES, attributeMarshalers);
} catch (IOException e) {
// Presized so can't happen (we would have already thrown OutOfMemoryError)
throw new UncheckedIOException(e);
}
serializedResource = bos.toByteArray();
private ResourceMarshaler(byte[] binary, String json) {
super(binary.length);
serializedBinary = binary;
serializedJson = json;
}
@Override
public void writeTo(Serializer output) throws IOException {
// TODO(anuraaga): Preserialize JSON as well.
output.writeSerializedMessage(serializedResource, MarshalerUtil.EMPTY_BYTES);
output.writeSerializedMessage(serializedBinary, serializedJson);
}
private static int calculateSize(KeyValueMarshaler[] attributeMarshalers) {
return MarshalerUtil.sizeRepeatedMessage(Resource.ATTRIBUTES, attributeMarshalers);
private static final class RealResourceMarshaler extends MarshalerWithSize {
private final KeyValueMarshaler[] attributes;
private RealResourceMarshaler(KeyValueMarshaler[] attributes) {
super(calculateSize(attributes));
this.attributes = attributes;
}
@Override
void writeTo(Serializer output) throws IOException {
output.serializeRepeatedMessage(Resource.ATTRIBUTES, attributes);
}
private static int calculateSize(KeyValueMarshaler[] attributeMarshalers) {
return MarshalerUtil.sizeRepeatedMessage(Resource.ATTRIBUTES, attributeMarshalers);
}
}
}

View File

@ -18,7 +18,7 @@ import javax.annotation.Nullable;
* <li>Can be implemented to serialize into protobuf JSON format (not binary)
* </ul>
*/
public abstract class Serializer {
public abstract class Serializer implements AutoCloseable {
Serializer() {}
@ -40,7 +40,7 @@ public abstract class Serializer {
writeSpanId(field, spanId);
}
protected abstract void writeSpanId(ProtoFieldInfo field, String traceId) throws IOException;
protected abstract void writeSpanId(ProtoFieldInfo field, String spanId) throws IOException;
/** Serializes a protobuf {@code bool} field. */
public void serializeBool(ProtoFieldInfo field, boolean value) throws IOException {
@ -172,6 +172,9 @@ public abstract class Serializer {
ProtoFieldInfo field, List<? extends Marshaler> repeatedMessage) throws IOException;
/** Writes the value for a message field that has been pre-serialized. */
public abstract void writeSerializedMessage(byte[] protoSerialized, byte[] jsonSerialized)
public abstract void writeSerializedMessage(byte[] protoSerialized, String jsonSerialized)
throws IOException;
@Override
public abstract void close() throws IOException;
}

View File

@ -354,7 +354,7 @@ public final class TraceRequestMarshaler extends MarshalerWithSize {
@Override
public void writeTo(Serializer output) throws IOException {
output.serializeFixed64(Span.Event.TIME_UNIX_NANO, epochNanos);
output.serializeBytes(Span.Event.NAME, name);
output.serializeString(Span.Event.NAME, name);
output.serializeRepeatedMessage(Span.Event.ATTRIBUTES, attributeMarshalers);
output.serializeUInt32(Span.Event.DROPPED_ATTRIBUTES_COUNT, droppedAttributesCount);
}

View File

@ -15,8 +15,9 @@ import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.Parser;
import com.google.protobuf.util.JsonFormat;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.internal.OtelEncodingUtils;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
@ -53,9 +54,12 @@ import io.opentelemetry.sdk.resources.Resource;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Test;
@ -727,8 +731,8 @@ class MetricsRequestMarshalerTest {
.map(
point ->
parse(
NumberDataPoint.parser(),
toByteArray(MetricsRequestMarshaler.NumberDataPointMarshaler.create(point))))
NumberDataPoint.getDefaultInstance(),
MetricsRequestMarshaler.NumberDataPointMarshaler.create(point)))
.collect(Collectors.toList());
}
@ -738,8 +742,8 @@ class MetricsRequestMarshalerTest {
.map(
point ->
parse(
SummaryDataPoint.parser(),
toByteArray(MetricsRequestMarshaler.SummaryDataPointMarshaler.create(point))))
SummaryDataPoint.getDefaultInstance(),
MetricsRequestMarshaler.SummaryDataPointMarshaler.create(point)))
.collect(Collectors.toList());
}
@ -749,29 +753,31 @@ class MetricsRequestMarshalerTest {
.map(
point ->
parse(
HistogramDataPoint.parser(),
toByteArray(MetricsRequestMarshaler.HistogramDataPointMarshaler.create(point))))
HistogramDataPoint.getDefaultInstance(),
MetricsRequestMarshaler.HistogramDataPointMarshaler.create(point)))
.collect(Collectors.toList());
}
private static Metric toProtoMetric(MetricData metricData) {
return parse(
Metric.parser(), toByteArray(MetricsRequestMarshaler.MetricMarshaler.create(metricData)));
Metric.getDefaultInstance(), MetricsRequestMarshaler.MetricMarshaler.create(metricData));
}
private static List<ResourceMetrics> toProtoResourceMetrics(
Collection<MetricData> metricDataList) {
ExportMetricsServiceRequest exportRequest =
parse(
ExportMetricsServiceRequest.parser(),
toByteArray(MetricsRequestMarshaler.create(metricDataList)));
ExportMetricsServiceRequest.getDefaultInstance(),
MetricsRequestMarshaler.create(metricDataList));
return exportRequest.getResourceMetricsList();
}
private static <T extends Message> T parse(Parser<T> parser, byte[] serialized) {
final T result;
@SuppressWarnings("unchecked")
private static <T extends Message> T parse(T prototype, Marshaler marshaler) {
byte[] serialized = toByteArray(marshaler);
T result;
try {
result = parser.parseFrom(serialized);
result = (T) prototype.newBuilderForType().mergeFrom(serialized).build();
} catch (InvalidProtocolBufferException e) {
throw new UncheckedIOException(e);
}
@ -781,9 +787,46 @@ class MetricsRequestMarshalerTest {
// tieme. If the lengths are equal and the resulting protos are equal, the marshaling is
// guaranteed to be valid.
assertThat(result.getSerializedSize()).isEqualTo(serialized.length);
// We don't compare JSON strings due to some differences (particularly serializing enums as
// numbers instead of names). This may improve in the future but what matters is what we produce
// can be parsed.
String json = toJson(marshaler);
Message.Builder builder = prototype.newBuilderForType();
try {
JsonFormat.parser().merge(json, builder);
} catch (InvalidProtocolBufferException e) {
throw new UncheckedIOException(e);
}
// Hackily swap out "hex as base64" decoded IDs with correct ones since no JSON protobuf
// libraries currently support customizing on the parse side.
if (result instanceof NumberDataPoint) {
NumberDataPoint.Builder fixed = (NumberDataPoint.Builder) builder;
for (Exemplar.Builder exemplar : fixed.getExemplarsBuilderList()) {
exemplar.setTraceId(toHex(exemplar.getTraceId()));
exemplar.setSpanId(toHex(exemplar.getSpanId()));
}
}
if (result instanceof HistogramDataPoint) {
HistogramDataPoint.Builder fixed = (HistogramDataPoint.Builder) builder;
for (Exemplar.Builder exemplar : fixed.getExemplarsBuilderList()) {
exemplar.setTraceId(toHex(exemplar.getTraceId()));
exemplar.setSpanId(toHex(exemplar.getSpanId()));
}
}
assertThat(builder.build()).isEqualTo(result);
return result;
}
private static ByteString toHex(ByteString hexReadAsBase64) {
String hex =
Base64.getEncoder().encodeToString(hexReadAsBase64.toByteArray()).toLowerCase(Locale.ROOT);
return ByteString.copyFrom(OtelEncodingUtils.bytesFromBase16(hex, hex.length()));
}
private static byte[] toByteArray(Marshaler marshaler) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
@ -793,4 +836,15 @@ class MetricsRequestMarshalerTest {
}
return bos.toByteArray();
}
private static String toJson(Marshaler marshaler) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
marshaler.writeJsonTo(bos);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return new String(bos.toByteArray(), StandardCharsets.UTF_8);
}
}

View File

@ -39,6 +39,7 @@ import org.jeasy.random.randomizers.number.ByteRandomizer;
import org.jeasy.random.randomizers.number.DoubleRandomizer;
import org.jeasy.random.randomizers.number.LongRandomizer;
import org.jeasy.random.randomizers.range.IntegerRangeRandomizer;
import org.jeasy.random.randomizers.range.LongRangeRandomizer;
import org.jeasy.random.randomizers.text.StringRandomizer;
final class SpanDataRandomizerRegistry implements RandomizerRegistry {
@ -57,6 +58,8 @@ final class SpanDataRandomizerRegistry implements RandomizerRegistry {
private DoubleRandomizer doubleRandomizer;
private LongRandomizer longRandomizer;
private StringRandomizer stringRandomizer;
private IntegerRangeRandomizer unsignedInt32Randomizer;
private LongRangeRandomizer unsignedInt64Randomizer;
private AttributesRandomizer attributesRandomizer;
private Map<AttributeType, Supplier<Randomizer<?>>> attributeValueRandomizers;
@ -94,6 +97,9 @@ final class SpanDataRandomizerRegistry implements RandomizerRegistry {
parameters.getStringLengthRange().getMin(),
parameters.getStringLengthRange().getMax(),
parameters.getSeed());
unsignedInt32Randomizer =
new IntegerRangeRandomizer(0, Integer.MAX_VALUE, parameters.getSeed());
unsignedInt64Randomizer = new LongRangeRandomizer(0L, Long.MAX_VALUE, parameters.getSeed());
attributeValueRandomizers = new EnumMap<>(AttributeType.class);
for (AttributeType type : AttributeType.values()) {
@ -161,12 +167,21 @@ final class SpanDataRandomizerRegistry implements RandomizerRegistry {
@Override
@Nullable
public Randomizer<?> getRandomizer(Field field) {
if (field.getName().contains("Count") || field.getName().contains("totalRecorded")) {
return unsignedInt32Randomizer;
}
if (field.getName().contains("Nanos")) {
return unsignedInt64Randomizer;
}
// TODO(anuraaga): Make work for autovalue, unfortunately it only adds Nullable to constructor
// parameters / getters but not to fields.
// https://github.com/open-telemetry/opentelemetry-java/issues/3498
if (field.getAnnotation(Nullable.class) == null) {
return null;
}
Randomizer<?> delegate = getRandomizer(field.getType());
if (delegate == null) {
return null;

View File

@ -7,13 +7,20 @@ package io.opentelemetry.exporter.otlp.internal;
import static org.assertj.core.api.Assertions.assertThat;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.internal.OtelEncodingUtils;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.trace.v1.InstrumentationLibrarySpans;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.proto.trace.v1.Span;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.trace.TestSpanData;
@ -23,9 +30,13 @@ import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Random;
import org.jeasy.random.EasyRandom;
import org.jeasy.random.EasyRandomParameters;
@ -172,6 +183,45 @@ class TraceRequestMarshalerTest {
+ "\nparsed from marshaler output: "
+ reverse);
}
// We don't compare JSON strings due to some differences (particularly serializing enums as
// numbers instead of names). This may improve in the future but what matters is what we produce
// can be parsed.
ByteArrayOutputStream jsonOutput = new ByteArrayOutputStream();
requestMarshaler.writeJsonTo(jsonOutput);
String json = new String(jsonOutput.toByteArray(), StandardCharsets.UTF_8);
ExportTraceServiceRequest.Builder builder = ExportTraceServiceRequest.newBuilder();
try {
JsonFormat.parser().merge(json, builder);
} catch (InvalidProtocolBufferException e) {
throw new UncheckedIOException(e);
}
// Hackily swap out "hex as base64" decoded IDs with correct ones since no JSON protobuf
// libraries currently support customizing on the parse side.
for (ResourceSpans.Builder rs : builder.getResourceSpansBuilderList()) {
for (InstrumentationLibrarySpans.Builder ils :
rs.getInstrumentationLibrarySpansBuilderList()) {
for (Span.Builder s : ils.getSpansBuilderList()) {
s.setTraceId(toHex(s.getTraceId()));
s.setSpanId(toHex(s.getSpanId()));
s.setParentSpanId(toHex(s.getParentSpanId()));
for (Span.Link.Builder l : s.getLinksBuilderList()) {
l.setTraceId(toHex(l.getTraceId()));
l.setSpanId(toHex(l.getSpanId()));
}
}
}
}
assertThat(builder.build()).isEqualTo(ExportTraceServiceRequest.parseFrom(customOutputBytes));
}
private static ByteString toHex(ByteString hexReadAsBase64) {
String hex =
Base64.getEncoder().encodeToString(hexReadAsBase64.toByteArray()).toLowerCase(Locale.ROOT);
return ByteString.copyFrom(OtelEncodingUtils.bytesFromBase16(hex, hex.length()));
}
private static SpanData testSpanDataWithInstrumentationLibrary(

View File

@ -2,7 +2,6 @@ plugins {
id("otel.java-conventions")
id("otel.publish-conventions")
id("otel.jmh-conventions")
id("otel.animalsniffer-conventions")
id("org.unbroken-dome.test-sets")
@ -58,11 +57,6 @@ dependencies {
add("testGrpcOkhttpRuntimeOnly", "io.grpc:grpc-okhttp")
add("testGrpcOkhttpRuntimeOnly", "org.bouncycastle:bcpkix-jdk15on")
jmhImplementation(project(":proto"))
jmhImplementation(project(":sdk:testing"))
jmhRuntimeOnly("io.grpc:grpc-netty")
jmhImplementation("io.grpc:grpc-testing")
add("testSpanPipeline", project(":proto"))
add("testSpanPipeline", "io.grpc:grpc-protobuf")
add("testSpanPipeline", "io.grpc:grpc-testing")