Serialize log body any value (#5938)

This commit is contained in:
jack-berg 2023-12-07 13:29:41 -06:00 committed by GitHub
parent 247ef4d26e
commit 9a3391d5e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 711 additions and 197 deletions

View File

@ -109,7 +109,7 @@ final class JsonSerializer extends Serializer {
}
@Override
protected void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
generator.writeBinaryField(field.getJsonName(), value);
}

View File

@ -123,7 +123,7 @@ final class ProtoSerializer extends Serializer implements AutoCloseable {
}
@Override
protected void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
output.writeUInt32NoTag(field.getTag());
output.writeByteArrayNoTag(value);
}

View File

@ -177,7 +177,7 @@ public abstract class Serializer implements AutoCloseable {
writeBytes(field, value);
}
protected abstract void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException;
public abstract void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException;
protected abstract void writeStartMessage(ProtoFieldInfo field, int protoMessageSize)
throws IOException;

View File

@ -16,6 +16,7 @@ dependencies {
protoSource("io.opentelemetry.proto:opentelemetry-proto:${versions["io.opentelemetry.proto"]}")
api(project(":exporters:common"))
implementation(project(":extensions:incubator"))
compileOnly(project(":sdk:metrics"))
compileOnly(project(":sdk:trace"))

View File

@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.internal.otlp;
import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize;
import io.opentelemetry.extension.incubator.logs.AnyValue;
import io.opentelemetry.extension.incubator.logs.KeyAnyValue;
import java.nio.ByteBuffer;
import java.util.List;
/**
* Utility methods for obtaining AnyValue marshaler.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class AnyValueMarshaler {
private AnyValueMarshaler() {}
@SuppressWarnings("unchecked")
public static MarshalerWithSize create(AnyValue<?> anyValue) {
switch (anyValue.getType()) {
case STRING:
return StringAnyValueMarshaler.create((String) anyValue.getValue());
case BOOLEAN:
return BoolAnyValueMarshaler.create((boolean) anyValue.getValue());
case LONG:
return IntAnyValueMarshaler.create((long) anyValue.getValue());
case DOUBLE:
return DoubleAnyValueMarshaler.create((double) anyValue.getValue());
case ARRAY:
return ArrayAnyValueMarshaler.createAnyValue((List<AnyValue<?>>) anyValue.getValue());
case KEY_VALUE_LIST:
return KeyValueListAnyValueMarshaler.create((List<KeyAnyValue>) anyValue.getValue());
case BYTES:
return BytesAnyValueMarshaler.create((ByteBuffer) anyValue.getValue());
}
throw new IllegalArgumentException("Unsupported AnyValue type: " + anyValue.getType());
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.internal.otlp;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.marshal.MarshalerUtil;
import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.proto.common.v1.internal.AnyValue;
import io.opentelemetry.proto.common.v1.internal.ArrayValue;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
final class ArrayAnyValueMarshaler extends MarshalerWithSize {
private final Marshaler value;
private ArrayAnyValueMarshaler(ArrayValueMarshaler value) {
super(calculateSize(value));
this.value = value;
}
static MarshalerWithSize createAnyValue(
List<io.opentelemetry.extension.incubator.logs.AnyValue<?>> values) {
return createInternal(values, AnyValueMarshaler::create);
}
static MarshalerWithSize createString(List<String> values) {
return createInternal(values, StringAnyValueMarshaler::create);
}
static MarshalerWithSize createBool(List<Boolean> values) {
return createInternal(values, BoolAnyValueMarshaler::create);
}
static MarshalerWithSize createInt(List<Long> values) {
return createInternal(values, IntAnyValueMarshaler::create);
}
static MarshalerWithSize createDouble(List<Double> values) {
return createInternal(values, DoubleAnyValueMarshaler::create);
}
private static <T, M extends MarshalerWithSize> MarshalerWithSize createInternal(
List<T> values, Function<T, M> initializer) {
int len = values.size();
Marshaler[] marshalers = new Marshaler[len];
for (int i = 0; i < len; i++) {
marshalers[i] = initializer.apply(values.get(i));
}
return new ArrayAnyValueMarshaler(new ArrayValueMarshaler(marshalers));
}
@Override
public void writeTo(Serializer output) throws IOException {
output.serializeMessage(AnyValue.ARRAY_VALUE, value);
}
private static int calculateSize(Marshaler value) {
return MarshalerUtil.sizeMessage(AnyValue.ARRAY_VALUE, value);
}
private static class ArrayValueMarshaler extends MarshalerWithSize {
private final Marshaler[] values;
private ArrayValueMarshaler(Marshaler[] values) {
super(calculateSize(values));
this.values = values;
}
@Override
public void writeTo(Serializer output) throws IOException {
output.serializeRepeatedMessage(ArrayValue.VALUES, values);
}
private static int calculateSize(Marshaler[] values) {
return MarshalerUtil.sizeRepeatedMessage(ArrayValue.VALUES, values);
}
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.internal.otlp;
import io.opentelemetry.exporter.internal.marshal.CodedOutputStream;
import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.proto.common.v1.internal.AnyValue;
import java.io.IOException;
final class BoolAnyValueMarshaler extends MarshalerWithSize {
private final boolean value;
private BoolAnyValueMarshaler(boolean value) {
super(calculateSize(value));
this.value = value;
}
static MarshalerWithSize create(boolean value) {
return new BoolAnyValueMarshaler(value);
}
@Override
public void writeTo(Serializer output) throws IOException {
// Do not call serialize* method because we always have to write the message tag even if the
// value is empty since it's a oneof.
output.writeBool(AnyValue.BOOL_VALUE, value);
}
private static int calculateSize(boolean value) {
return AnyValue.BOOL_VALUE.getTagSize() + CodedOutputStream.computeBoolSizeNoTag(value);
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.internal.otlp;
import io.opentelemetry.exporter.internal.marshal.CodedOutputStream;
import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.proto.common.v1.internal.AnyValue;
import java.io.IOException;
import java.nio.ByteBuffer;
final class BytesAnyValueMarshaler extends MarshalerWithSize {
private final byte[] value;
private BytesAnyValueMarshaler(byte[] value) {
super(calculateSize(value));
this.value = value;
}
static MarshalerWithSize create(ByteBuffer value) {
byte[] bytes = new byte[value.remaining()];
value.get(bytes);
return new BytesAnyValueMarshaler(bytes);
}
@Override
public void writeTo(Serializer output) throws IOException {
// Do not call serialize* method because we always have to write the message tag even if the
// value is empty since it's a oneof.
output.writeBytes(AnyValue.BYTES_VALUE, value);
}
private static int calculateSize(byte[] value) {
return AnyValue.BYTES_VALUE.getTagSize() + CodedOutputStream.computeByteArraySizeNoTag(value);
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.internal.otlp;
import io.opentelemetry.exporter.internal.marshal.CodedOutputStream;
import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.proto.common.v1.internal.AnyValue;
import java.io.IOException;
final class DoubleAnyValueMarshaler extends MarshalerWithSize {
private final double value;
private DoubleAnyValueMarshaler(double value) {
super(calculateSize(value));
this.value = value;
}
static MarshalerWithSize create(double value) {
return new DoubleAnyValueMarshaler(value);
}
@Override
public void writeTo(Serializer output) throws IOException {
// Do not call serialize* method because we always have to write the message tag even if the
// value is empty since it's a oneof.
output.writeDouble(AnyValue.DOUBLE_VALUE, value);
}
private static int calculateSize(double value) {
return AnyValue.DOUBLE_VALUE.getTagSize() + CodedOutputStream.computeDoubleSizeNoTag(value);
}
}

View File

@ -37,7 +37,8 @@ public final class InstrumentationScopeMarshaler extends MarshalerWithSize {
// a few times until the cache gets filled which is fine.
byte[] name = MarshalerUtil.toBytes(scopeInfo.getName());
byte[] version = MarshalerUtil.toBytes(scopeInfo.getVersion());
KeyValueMarshaler[] attributes = KeyValueMarshaler.createRepeated(scopeInfo.getAttributes());
KeyValueMarshaler[] attributes =
KeyValueMarshaler.createForAttributes(scopeInfo.getAttributes());
RealInstrumentationScopeMarshaler realMarshaler =
new RealInstrumentationScopeMarshaler(name, version, attributes);

View File

@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.internal.otlp;
import io.opentelemetry.exporter.internal.marshal.CodedOutputStream;
import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.proto.common.v1.internal.AnyValue;
import java.io.IOException;
final class IntAnyValueMarshaler extends MarshalerWithSize {
private final long value;
private IntAnyValueMarshaler(long value) {
super(calculateSize(value));
this.value = value;
}
static MarshalerWithSize create(long value) {
return new IntAnyValueMarshaler(value);
}
@Override
public void writeTo(Serializer output) throws IOException {
// Do not call serialize* method because we always have to write the message tag even if the
// value is empty since it's a oneof.
output.writeInt64(AnyValue.INT_VALUE, value);
}
private static int calculateSize(long value) {
return AnyValue.INT_VALUE.getTagSize() + CodedOutputStream.computeInt64SizeNoTag(value);
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.internal.otlp;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.marshal.MarshalerUtil;
import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.extension.incubator.logs.KeyAnyValue;
import io.opentelemetry.proto.common.v1.internal.AnyValue;
import io.opentelemetry.proto.common.v1.internal.KeyValueList;
import java.io.IOException;
import java.util.List;
final class KeyValueListAnyValueMarshaler extends MarshalerWithSize {
private final Marshaler value;
private KeyValueListAnyValueMarshaler(KeyValueListMarshaler value) {
super(calculateSize(value));
this.value = value;
}
static MarshalerWithSize create(List<KeyAnyValue> values) {
int len = values.size();
KeyValueMarshaler[] marshalers = new KeyValueMarshaler[values.size()];
for (int i = 0; i < len; i++) {
marshalers[i] = KeyValueMarshaler.createForKeyAnyValue(values.get(i));
}
return new KeyValueListAnyValueMarshaler(new KeyValueListMarshaler(marshalers));
}
@Override
public void writeTo(Serializer output) throws IOException {
output.serializeMessage(AnyValue.KVLIST_VALUE, value);
}
private static int calculateSize(Marshaler value) {
return MarshalerUtil.sizeMessage(AnyValue.KVLIST_VALUE, value);
}
private static class KeyValueListMarshaler extends MarshalerWithSize {
private final Marshaler[] values;
private KeyValueListMarshaler(KeyValueMarshaler[] values) {
super(calculateSize(values));
this.values = values;
}
@Override
public void writeTo(Serializer output) throws IOException {
output.serializeRepeatedMessage(KeyValueList.VALUES, values);
}
private static int calculateSize(Marshaler[] values) {
return MarshalerUtil.sizeRepeatedMessage(KeyValueList.VALUES, values);
}
}
}

View File

@ -8,13 +8,11 @@ package io.opentelemetry.exporter.internal.otlp;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.internal.InternalAttributeKeyImpl;
import io.opentelemetry.exporter.internal.marshal.CodedOutputStream;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.marshal.MarshalerUtil;
import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.proto.common.v1.internal.AnyValue;
import io.opentelemetry.proto.common.v1.internal.ArrayValue;
import io.opentelemetry.extension.incubator.logs.KeyAnyValue;
import io.opentelemetry.proto.common.v1.internal.KeyValue;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -22,38 +20,52 @@ import java.util.List;
import java.util.function.BiConsumer;
/**
* A Marshaler of {@link Attributes}.
* A Marshaler of key value pairs.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class KeyValueMarshaler extends MarshalerWithSize {
private static final byte[] EMPTY_BYTES = new byte[0];
private static final KeyValueMarshaler[] EMPTY_REPEATED = new KeyValueMarshaler[0];
private final byte[] keyUtf8;
private final Marshaler value;
private KeyValueMarshaler(byte[] keyUtf8, Marshaler value) {
super(calculateSize(keyUtf8, value));
this.keyUtf8 = keyUtf8;
this.value = value;
}
/** Returns Marshaler for the given KeyAnyValue. */
public static KeyValueMarshaler createForKeyAnyValue(KeyAnyValue keyAnyValue) {
return new KeyValueMarshaler(
keyAnyValue.getKey().getBytes(StandardCharsets.UTF_8),
AnyValueMarshaler.create(keyAnyValue.getAnyValue()));
}
/** Returns Marshalers for the given Attributes. */
@SuppressWarnings("AvoidObjectArrays")
public static KeyValueMarshaler[] createRepeated(Attributes attributes) {
public static KeyValueMarshaler[] createForAttributes(Attributes attributes) {
if (attributes.isEmpty()) {
return EMPTY_REPEATED;
}
KeyValueMarshaler[] attributeMarshalers = new KeyValueMarshaler[attributes.size()];
KeyValueMarshaler[] marshalers = new KeyValueMarshaler[attributes.size()];
attributes.forEach(
new BiConsumer<AttributeKey<?>, Object>() {
int index = 0;
@Override
public void accept(AttributeKey<?> attributeKey, Object o) {
attributeMarshalers[index++] = KeyValueMarshaler.create(attributeKey, o);
marshalers[index++] = create(attributeKey, o);
}
});
return attributeMarshalers;
return marshalers;
}
private final byte[] keyUtf8;
private final Marshaler value;
@SuppressWarnings("unchecked")
private static KeyValueMarshaler create(AttributeKey<?> attributeKey, Object value) {
byte[] keyUtf8;
@ -66,42 +78,30 @@ public final class KeyValueMarshaler extends MarshalerWithSize {
}
switch (attributeKey.getType()) {
case STRING:
return new KeyValueMarshaler(
keyUtf8, new StringAnyValueMarshaler(MarshalerUtil.toBytes((String) value)));
return new KeyValueMarshaler(keyUtf8, StringAnyValueMarshaler.create((String) value));
case LONG:
return new KeyValueMarshaler(keyUtf8, new Int64AnyValueMarshaler((long) value));
return new KeyValueMarshaler(keyUtf8, IntAnyValueMarshaler.create((long) value));
case BOOLEAN:
return new KeyValueMarshaler(keyUtf8, new BoolAnyValueMarshaler((boolean) value));
return new KeyValueMarshaler(keyUtf8, BoolAnyValueMarshaler.create((boolean) value));
case DOUBLE:
return new KeyValueMarshaler(keyUtf8, new AnyDoubleFieldMarshaler((double) value));
return new KeyValueMarshaler(keyUtf8, DoubleAnyValueMarshaler.create((double) value));
case STRING_ARRAY:
return new KeyValueMarshaler(
keyUtf8,
new ArrayAnyValueMarshaler(ArrayValueMarshaler.createString((List<String>) value)));
keyUtf8, ArrayAnyValueMarshaler.createString((List<String>) value));
case LONG_ARRAY:
return new KeyValueMarshaler(
keyUtf8,
new ArrayAnyValueMarshaler(ArrayValueMarshaler.createInt64((List<Long>) value)));
return new KeyValueMarshaler(keyUtf8, ArrayAnyValueMarshaler.createInt((List<Long>) value));
case BOOLEAN_ARRAY:
return new KeyValueMarshaler(
keyUtf8,
new ArrayAnyValueMarshaler(ArrayValueMarshaler.createBool((List<Boolean>) value)));
keyUtf8, ArrayAnyValueMarshaler.createBool((List<Boolean>) value));
case DOUBLE_ARRAY:
return new KeyValueMarshaler(
keyUtf8,
new ArrayAnyValueMarshaler(ArrayValueMarshaler.createDouble((List<Double>) value)));
keyUtf8, ArrayAnyValueMarshaler.createDouble((List<Double>) value));
}
// Error prone ensures the switch statement is complete, otherwise only can happen with
// unaligned versions which are not supported.
throw new IllegalArgumentException("Unsupported attribute type.");
}
private KeyValueMarshaler(byte[] keyUtf8, Marshaler value) {
super(calculateSize(keyUtf8, value));
this.keyUtf8 = keyUtf8;
this.value = value;
}
@Override
public void writeTo(Serializer output) throws IOException {
output.serializeString(KeyValue.KEY, keyUtf8);
@ -114,140 +114,4 @@ public final class KeyValueMarshaler extends MarshalerWithSize {
size += MarshalerUtil.sizeMessage(KeyValue.VALUE, value);
return size;
}
private static class BoolAnyValueMarshaler extends MarshalerWithSize {
private final boolean value;
BoolAnyValueMarshaler(boolean value) {
super(calculateSize(value));
this.value = value;
}
@Override
public void writeTo(Serializer output) throws IOException {
// Do not call serialize* method because we always have to write the message tag even if the
// value is empty since it's a oneof.
output.writeBool(AnyValue.BOOL_VALUE, value);
}
private static int calculateSize(boolean value) {
return AnyValue.BOOL_VALUE.getTagSize() + CodedOutputStream.computeBoolSizeNoTag(value);
}
}
private static class Int64AnyValueMarshaler extends MarshalerWithSize {
private final long value;
Int64AnyValueMarshaler(long value) {
super(calculateSize(value));
this.value = value;
}
@Override
public void writeTo(Serializer output) throws IOException {
// Do not call serialize* method because we always have to write the message tag even if the
// value is empty since it's a oneof.
output.writeInt64(AnyValue.INT_VALUE, value);
}
private static int calculateSize(long value) {
return AnyValue.INT_VALUE.getTagSize() + CodedOutputStream.computeInt64SizeNoTag(value);
}
}
private static class AnyDoubleFieldMarshaler extends MarshalerWithSize {
private final double value;
AnyDoubleFieldMarshaler(double value) {
super(calculateSize(value));
this.value = value;
}
@Override
public void writeTo(Serializer output) throws IOException {
// Do not call serialize* method because we always have to write the message tag even if the
// value is empty since it's a oneof.
output.writeDouble(AnyValue.DOUBLE_VALUE, value);
}
private static int calculateSize(double value) {
return AnyValue.DOUBLE_VALUE.getTagSize() + CodedOutputStream.computeDoubleSizeNoTag(value);
}
}
private static class ArrayAnyValueMarshaler extends MarshalerWithSize {
private final Marshaler value;
private ArrayAnyValueMarshaler(Marshaler value) {
super(calculateSize(value));
this.value = value;
}
@Override
public void writeTo(Serializer output) throws IOException {
output.serializeMessage(AnyValue.ARRAY_VALUE, value);
}
private static int calculateSize(Marshaler value) {
return MarshalerUtil.sizeMessage(AnyValue.ARRAY_VALUE, value);
}
}
private static class ArrayValueMarshaler extends MarshalerWithSize {
static ArrayValueMarshaler createString(List<String> values) {
int len = values.size();
Marshaler[] marshalers = new StringAnyValueMarshaler[len];
for (int i = 0; i < len; i++) {
marshalers[i] = new StringAnyValueMarshaler(values.get(i).getBytes(StandardCharsets.UTF_8));
}
return new ArrayValueMarshaler(marshalers);
}
static ArrayValueMarshaler createBool(List<Boolean> values) {
int len = values.size();
Marshaler[] marshalers = new BoolAnyValueMarshaler[len];
for (int i = 0; i < len; i++) {
marshalers[i] = new BoolAnyValueMarshaler(values.get(i));
}
return new ArrayValueMarshaler(marshalers);
}
static ArrayValueMarshaler createInt64(List<Long> values) {
int len = values.size();
Marshaler[] marshalers = new Int64AnyValueMarshaler[len];
for (int i = 0; i < len; i++) {
marshalers[i] = new Int64AnyValueMarshaler(values.get(i));
}
return new ArrayValueMarshaler(marshalers);
}
static ArrayValueMarshaler createDouble(List<Double> values) {
int len = values.size();
Marshaler[] marshalers = new AnyDoubleFieldMarshaler[len];
for (int i = 0; i < len; i++) {
marshalers[i] = new AnyDoubleFieldMarshaler(values.get(i));
}
return new ArrayValueMarshaler(marshalers);
}
private final Marshaler[] values;
private ArrayValueMarshaler(Marshaler[] values) {
super(calculateSize(values));
this.values = values;
}
@Override
public void writeTo(Serializer output) throws IOException {
output.serializeRepeatedMessage(ArrayValue.VALUES, values);
}
private static int calculateSize(Marshaler[] values) {
return MarshalerUtil.sizeRepeatedMessage(ArrayValue.VALUES, values);
}
}
}

View File

@ -36,7 +36,8 @@ public final class ResourceMarshaler extends MarshalerWithSize {
// a few times until the cache gets filled which is fine.
RealResourceMarshaler realMarshaler =
new RealResourceMarshaler(KeyValueMarshaler.createRepeated(resource.getAttributes()));
new RealResourceMarshaler(
KeyValueMarshaler.createForAttributes(resource.getAttributes()));
ByteArrayOutputStream binaryBos =
new ByteArrayOutputStream(realMarshaler.getBinarySerializedSize());

View File

@ -6,6 +6,7 @@
package io.opentelemetry.exporter.internal.otlp;
import io.opentelemetry.exporter.internal.marshal.CodedOutputStream;
import io.opentelemetry.exporter.internal.marshal.MarshalerUtil;
import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.proto.common.v1.internal.AnyValue;
@ -17,15 +18,19 @@ import java.io.IOException;
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class StringAnyValueMarshaler extends MarshalerWithSize {
final class StringAnyValueMarshaler extends MarshalerWithSize {
private final byte[] valueUtf8;
public StringAnyValueMarshaler(byte[] valueUtf8) {
private StringAnyValueMarshaler(byte[] valueUtf8) {
super(calculateSize(valueUtf8));
this.valueUtf8 = valueUtf8;
}
static MarshalerWithSize create(String value) {
return new StringAnyValueMarshaler(MarshalerUtil.toBytes(value));
}
@Override
public void writeTo(Serializer output) throws IOException {
// Do not call serialize* method because we always have to write the message tag even if the

View File

@ -14,17 +14,22 @@ import io.opentelemetry.exporter.internal.marshal.MarshalerUtil;
import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize;
import io.opentelemetry.exporter.internal.marshal.ProtoEnumInfo;
import io.opentelemetry.exporter.internal.marshal.Serializer;
import io.opentelemetry.exporter.internal.otlp.AnyValueMarshaler;
import io.opentelemetry.exporter.internal.otlp.KeyValueMarshaler;
import io.opentelemetry.exporter.internal.otlp.StringAnyValueMarshaler;
import io.opentelemetry.extension.incubator.logs.AnyValue;
import io.opentelemetry.proto.logs.v1.internal.LogRecord;
import io.opentelemetry.proto.logs.v1.internal.SeverityNumber;
import io.opentelemetry.sdk.logs.data.Body;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.internal.AnyValueBody;
import java.io.IOException;
import javax.annotation.Nullable;
final class LogMarshaler extends MarshalerWithSize {
private static final String INVALID_TRACE_ID = TraceId.getInvalid();
private static final String INVALID_SPAN_ID = SpanId.getInvalid();
private static final MarshalerWithSize EMPTY_BODY_MARSHALER =
AnyValueMarshaler.create(AnyValue.of(""));
private final long timeUnixNano;
private final long observedTimeUnixNano;
@ -39,11 +44,9 @@ final class LogMarshaler extends MarshalerWithSize {
static LogMarshaler create(LogRecordData logRecordData) {
KeyValueMarshaler[] attributeMarshalers =
KeyValueMarshaler.createRepeated(logRecordData.getAttributes());
KeyValueMarshaler.createForAttributes(logRecordData.getAttributes());
// TODO(jack-berg): handle AnyValue log body
StringAnyValueMarshaler anyValueMarshaler =
new StringAnyValueMarshaler(MarshalerUtil.toBytes(logRecordData.getBody().asString()));
MarshalerWithSize bodyMarshaler = body(logRecordData.getBody());
SpanContext spanContext = logRecordData.getSpanContext();
return new LogMarshaler(
@ -51,7 +54,7 @@ final class LogMarshaler extends MarshalerWithSize {
logRecordData.getObservedTimestampEpochNanos(),
toProtoSeverityNumber(logRecordData.getSeverity()),
MarshalerUtil.toBytes(logRecordData.getSeverityText()),
anyValueMarshaler,
bodyMarshaler,
attributeMarshalers,
logRecordData.getTotalAttributeCount() - logRecordData.getAttributes().size(),
spanContext.getTraceFlags(),
@ -59,6 +62,19 @@ final class LogMarshaler extends MarshalerWithSize {
spanContext.getSpanId().equals(INVALID_SPAN_ID) ? null : spanContext.getSpanId());
}
private static MarshalerWithSize body(Body body) {
if (body instanceof AnyValueBody) {
return AnyValueMarshaler.create(((AnyValueBody) body).asAnyValue());
}
switch (body.getType()) {
case STRING:
return AnyValueMarshaler.create(AnyValue.of(body.asString()));
case EMPTY:
return EMPTY_BODY_MARSHALER;
}
throw new IllegalStateException("Unsupported Body type: " + body.getType());
}
private LogMarshaler(
long timeUnixNano,
long observedTimeUnixNano,

View File

@ -39,7 +39,7 @@ final class ExemplarMarshaler extends MarshalerWithSize {
private static ExemplarMarshaler create(ExemplarData exemplar) {
KeyValueMarshaler[] attributeMarshalers =
KeyValueMarshaler.createRepeated(exemplar.getFilteredAttributes());
KeyValueMarshaler.createForAttributes(exemplar.getFilteredAttributes());
ProtoFieldInfo valueField;
if (exemplar instanceof LongExemplarData) {

View File

@ -83,7 +83,7 @@ public class ExponentialHistogramDataPointMarshaler extends MarshalerWithSize {
}
static ExponentialHistogramDataPointMarshaler create(ExponentialHistogramPointData point) {
KeyValueMarshaler[] attributes = KeyValueMarshaler.createRepeated(point.getAttributes());
KeyValueMarshaler[] attributes = KeyValueMarshaler.createForAttributes(point.getAttributes());
ExemplarMarshaler[] exemplars = ExemplarMarshaler.createRepeated(point.getExemplars());
ExponentialHistogramBucketsMarshaler positiveBuckets =

View File

@ -41,7 +41,7 @@ final class HistogramDataPointMarshaler extends MarshalerWithSize {
static HistogramDataPointMarshaler create(HistogramPointData point) {
KeyValueMarshaler[] attributeMarshalers =
KeyValueMarshaler.createRepeated(point.getAttributes());
KeyValueMarshaler.createForAttributes(point.getAttributes());
ExemplarMarshaler[] exemplarMarshalers = ExemplarMarshaler.createRepeated(point.getExemplars());
return new HistogramDataPointMarshaler(

View File

@ -40,7 +40,7 @@ final class NumberDataPointMarshaler extends MarshalerWithSize {
static NumberDataPointMarshaler create(PointData point) {
ExemplarMarshaler[] exemplarMarshalers = ExemplarMarshaler.createRepeated(point.getExemplars());
KeyValueMarshaler[] attributeMarshalers =
KeyValueMarshaler.createRepeated(point.getAttributes());
KeyValueMarshaler.createForAttributes(point.getAttributes());
ProtoFieldInfo valueField;
if (point instanceof LongPointData) {

View File

@ -20,7 +20,7 @@ final class SummaryDataPointMarshaler extends MarshalerWithSize {
private final long count;
private final double sum;
private final ValueAtQuantileMarshaler[] quantileValues;
private final KeyValueMarshaler[] attributes;
private final MarshalerWithSize[] attributes;
static SummaryDataPointMarshaler[] createRepeated(Collection<SummaryPointData> points) {
SummaryDataPointMarshaler[] marshalers = new SummaryDataPointMarshaler[points.size()];
@ -34,8 +34,8 @@ final class SummaryDataPointMarshaler extends MarshalerWithSize {
static SummaryDataPointMarshaler create(SummaryPointData point) {
ValueAtQuantileMarshaler[] quantileMarshalers =
ValueAtQuantileMarshaler.createRepeated(point.getValues());
KeyValueMarshaler[] attributeMarshalers =
KeyValueMarshaler.createRepeated(point.getAttributes());
MarshalerWithSize[] attributeMarshalers =
KeyValueMarshaler.createForAttributes(point.getAttributes());
return new SummaryDataPointMarshaler(
point.getStartEpochNanos(),
@ -52,7 +52,7 @@ final class SummaryDataPointMarshaler extends MarshalerWithSize {
long count,
double sum,
ValueAtQuantileMarshaler[] quantileValues,
KeyValueMarshaler[] attributes) {
MarshalerWithSize[] attributes) {
super(calculateSize(startTimeUnixNano, timeUnixNano, count, sum, quantileValues, attributes));
this.startTimeUnixNano = startTimeUnixNano;
this.timeUnixNano = timeUnixNano;
@ -78,7 +78,7 @@ final class SummaryDataPointMarshaler extends MarshalerWithSize {
long count,
double sum,
ValueAtQuantileMarshaler[] quantileValues,
KeyValueMarshaler[] attributes) {
MarshalerWithSize[] attributes) {
int size = 0;
size += MarshalerUtil.sizeFixed64(SummaryDataPoint.START_TIME_UNIX_NANO, startTimeUnixNano);
size += MarshalerUtil.sizeFixed64(SummaryDataPoint.TIME_UNIX_NANO, timeUnixNano);

View File

@ -40,7 +40,7 @@ final class SpanEventMarshaler extends MarshalerWithSize {
return new SpanEventMarshaler(
event.getEpochNanos(),
MarshalerUtil.toBytes(event.getName()),
KeyValueMarshaler.createRepeated(event.getAttributes()),
KeyValueMarshaler.createForAttributes(event.getAttributes()),
event.getTotalAttributeCount() - event.getAttributes().size());
}

View File

@ -52,7 +52,7 @@ final class SpanLinkMarshaler extends MarshalerWithSize {
link.getSpanContext().getTraceId(),
link.getSpanContext().getSpanId(),
traceStateUtf8,
KeyValueMarshaler.createRepeated(link.getAttributes()),
KeyValueMarshaler.createForAttributes(link.getAttributes()),
link.getTotalAttributeCount() - link.getAttributes().size());
}

View File

@ -41,7 +41,7 @@ final class SpanMarshaler extends MarshalerWithSize {
// Because SpanMarshaler is always part of a repeated field, it cannot return "null".
static SpanMarshaler create(SpanData spanData) {
KeyValueMarshaler[] attributeMarshalers =
KeyValueMarshaler.createRepeated(spanData.getAttributes());
KeyValueMarshaler.createForAttributes(spanData.getAttributes());
SpanEventMarshaler[] spanEventMarshalers =
SpanEventMarshaler.createRepeated(spanData.getEvents());
SpanLinkMarshaler[] spanLinkMarshalers = SpanLinkMarshaler.createRepeated(spanData.getLinks());

View File

@ -0,0 +1,171 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.internal.otlp;
import static io.opentelemetry.extension.incubator.logs.AnyValue.of;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.util.JsonFormat;
import io.opentelemetry.exporter.internal.marshal.Marshaler;
import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize;
import io.opentelemetry.extension.incubator.logs.KeyAnyValue;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.ArrayValue;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.common.v1.KeyValueList;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@SuppressWarnings("BadImport")
class AnyValueMarshalerTest {
@ParameterizedTest
@MethodSource("serializeAnyValueArgs")
void anyValueString(
io.opentelemetry.extension.incubator.logs.AnyValue<?> anyValue,
AnyValue expectedSerializedValue) {
MarshalerWithSize marshaler = AnyValueMarshaler.create(anyValue);
AnyValue serializedValue = parse(AnyValue.getDefaultInstance(), marshaler);
assertThat(serializedValue).isEqualTo(expectedSerializedValue);
}
private static Stream<Arguments> serializeAnyValueArgs() {
return Stream.of(
// primitives
arguments(of("str"), AnyValue.newBuilder().setStringValue("str").build()),
arguments(of(true), AnyValue.newBuilder().setBoolValue(true).build()),
arguments(of(1), AnyValue.newBuilder().setIntValue(1).build()),
arguments(of(1.1), AnyValue.newBuilder().setDoubleValue(1.1).build()),
// heterogeneous array
arguments(
of(of("str"), of(true), of(1), of(1.1)),
AnyValue.newBuilder()
.setArrayValue(
ArrayValue.newBuilder()
.addValues(AnyValue.newBuilder().setStringValue("str").build())
.addValues(AnyValue.newBuilder().setBoolValue(true).build())
.addValues(AnyValue.newBuilder().setIntValue(1).build())
.addValues(AnyValue.newBuilder().setDoubleValue(1.1).build())
.build())
.build()),
// map
arguments(
of(KeyAnyValue.of("key1", of("val1")), KeyAnyValue.of("key2", of(2))),
AnyValue.newBuilder()
.setKvlistValue(
KeyValueList.newBuilder()
.addValues(
KeyValue.newBuilder()
.setKey("key1")
.setValue(AnyValue.newBuilder().setStringValue("val1").build())
.build())
.addValues(
KeyValue.newBuilder()
.setKey("key2")
.setValue(AnyValue.newBuilder().setIntValue(2).build())
.build())
.build())
.build()),
// map of maps
arguments(
of(
Collections.singletonMap(
"child", of(Collections.singletonMap("grandchild", of("str"))))),
AnyValue.newBuilder()
.setKvlistValue(
KeyValueList.newBuilder()
.addValues(
KeyValue.newBuilder()
.setKey("child")
.setValue(
AnyValue.newBuilder()
.setKvlistValue(
KeyValueList.newBuilder()
.addValues(
KeyValue.newBuilder()
.setKey("grandchild")
.setValue(
AnyValue.newBuilder()
.setStringValue("str")
.build())
.build())
.build())
.build())
.build())
.build())
.build()),
// bytes
arguments(
of("hello world".getBytes(StandardCharsets.UTF_8)),
AnyValue.newBuilder()
.setBytesValue(ByteString.copyFrom("hello world".getBytes(StandardCharsets.UTF_8)))
.build()));
}
@SuppressWarnings("unchecked")
private static <T extends Message> T parse(T prototype, Marshaler marshaler) {
byte[] serialized = toByteArray(marshaler);
T result;
try {
result = (T) prototype.newBuilderForType().mergeFrom(serialized).build();
} catch (InvalidProtocolBufferException e) {
throw new UncheckedIOException(e);
}
// Our marshaler should produce the exact same length of serialized output (for example, field
// default values are not outputted), so we check that here. The output itself may have slightly
// different ordering, mostly due to the way we don't output oneof values in field order all the
// tieme. If the lengths are equal and the resulting protos are equal, the marshaling is
// guaranteed to be valid.
assertThat(result.getSerializedSize()).isEqualTo(serialized.length);
// We don't compare JSON strings due to some differences (particularly serializing enums as
// numbers instead of names). This may improve in the future but what matters is what we produce
// can be parsed.
String json = toJson(marshaler);
Message.Builder builder = prototype.newBuilderForType();
try {
JsonFormat.parser().merge(json, builder);
} catch (InvalidProtocolBufferException e) {
throw new UncheckedIOException(e);
}
assertThat(builder.build()).isEqualTo(result);
return result;
}
private static byte[] toByteArray(Marshaler marshaler) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
marshaler.writeBinaryTo(bos);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return bos.toByteArray();
}
private static String toJson(Marshaler marshaler) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
try {
marshaler.writeJsonTo(bos);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return new String(bos.toByteArray(), StandardCharsets.UTF_8);
}
}

View File

@ -10,6 +10,7 @@ dependencies {
implementation(project(":exporters:otlp:all"))
implementation(project(":api:events"))
implementation(project(":extensions:incubator"))
compileOnly("com.google.errorprone:error_prone_annotations")

View File

@ -5,11 +5,13 @@
package io.opentelemetry.integrationtest;
import static io.opentelemetry.extension.incubator.logs.AnyValue.of;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.testcontainers.Testcontainers.exposeHostPorts;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.ServiceRequestContext;
@ -38,6 +40,8 @@ import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter;
import io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporter;
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter;
import io.opentelemetry.extension.incubator.logs.ExtendedLogRecordBuilder;
import io.opentelemetry.extension.incubator.logs.KeyAnyValue;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest;
import io.opentelemetry.proto.collector.logs.v1.ExportLogsServiceResponse;
import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest;
@ -45,7 +49,9 @@ import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceResponse;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.ArrayValue;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.common.v1.KeyValueList;
import io.opentelemetry.proto.logs.v1.ResourceLogs;
import io.opentelemetry.proto.logs.v1.ScopeLogs;
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
@ -73,6 +79,7 @@ import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
@ -514,6 +521,7 @@ abstract class OtlpExporterIntegrationTest {
testLogRecordExporter(exporter);
}
@SuppressWarnings("BadImport")
private static void testLogRecordExporter(LogRecordExporter logRecordExporter) {
SdkLoggerProvider loggerProvider =
SdkLoggerProvider.builder()
@ -539,10 +547,23 @@ abstract class OtlpExporterIntegrationTest {
TraceState.getDefault());
try (Scope unused = Span.wrap(spanContext).makeCurrent()) {
logger
.logRecordBuilder()
((ExtendedLogRecordBuilder) logger.logRecordBuilder())
.setBody(
of(
KeyAnyValue.of("str_key", of("value")),
KeyAnyValue.of("bool_key", of(true)),
KeyAnyValue.of("int_key", of(1L)),
KeyAnyValue.of("double_key", of(1.1)),
KeyAnyValue.of("bytes_key", of("value".getBytes(StandardCharsets.UTF_8))),
KeyAnyValue.of("arr_key", of(of("value"), of(1L))),
KeyAnyValue.of(
"kv_list",
of(
KeyAnyValue.of("child_str_key", of("value")),
KeyAnyValue.of(
"child_kv_list",
of(KeyAnyValue.of("grandchild_str_key", of("value"))))))))
.setTimestamp(100, TimeUnit.NANOSECONDS)
.setBody("log body")
.setAllAttributes(Attributes.builder().put("key", "value").build())
.setSeverity(Severity.DEBUG)
.setSeverityText("DEBUG")
@ -576,7 +597,98 @@ abstract class OtlpExporterIntegrationTest {
// LogRecord via Logger.logRecordBuilder()...emit()
io.opentelemetry.proto.logs.v1.LogRecord protoLog1 = ilLogs.getLogRecords(0);
assertThat(protoLog1.getBody().getStringValue()).isEqualTo("log body");
assertThat(protoLog1.getBody())
.isEqualTo(
AnyValue.newBuilder()
.setKvlistValue(
KeyValueList.newBuilder()
.addValues(
KeyValue.newBuilder()
.setKey("str_key")
.setValue(AnyValue.newBuilder().setStringValue("value").build())
.build())
.addValues(
KeyValue.newBuilder()
.setKey("bool_key")
.setValue(AnyValue.newBuilder().setBoolValue(true).build())
.build())
.addValues(
KeyValue.newBuilder()
.setKey("int_key")
.setValue(AnyValue.newBuilder().setIntValue(1).build())
.build())
.addValues(
KeyValue.newBuilder()
.setKey("double_key")
.setValue(AnyValue.newBuilder().setDoubleValue(1.1).build())
.build())
.addValues(
KeyValue.newBuilder()
.setKey("bytes_key")
.setValue(
AnyValue.newBuilder()
.setBytesValue(
ByteString.copyFrom(
"value".getBytes(StandardCharsets.UTF_8)))
.build())
.build())
.addValues(
KeyValue.newBuilder()
.setKey("arr_key")
.setValue(
AnyValue.newBuilder()
.setArrayValue(
ArrayValue.newBuilder()
.addValues(
AnyValue.newBuilder()
.setStringValue("value")
.build())
.addValues(
AnyValue.newBuilder().setIntValue(1).build())
.build())
.build())
.build())
.addValues(
KeyValue.newBuilder()
.setKey("kv_list")
.setValue(
AnyValue.newBuilder()
.setKvlistValue(
KeyValueList.newBuilder()
.addValues(
KeyValue.newBuilder()
.setKey("child_str_key")
.setValue(
AnyValue.newBuilder()
.setStringValue("value")
.build())
.build())
.addValues(
KeyValue.newBuilder()
.setKey("child_kv_list")
.setValue(
AnyValue.newBuilder()
.setKvlistValue(
KeyValueList.newBuilder()
.addValues(
KeyValue.newBuilder()
.setKey(
"grandchild_str_key")
.setValue(
AnyValue
.newBuilder()
.setStringValue(
"value")
.build())
.build())
.build())
.build())
.build())
.build())
.build())
.build())
.build())
.build());
assertThat(protoLog1.getAttributesList())
.isEqualTo(
Collections.singletonList(

View File

@ -88,7 +88,7 @@ final class SdkLogRecordBuilder implements ExtendedLogRecordBuilder {
@Override
public SdkLogRecordBuilder setBody(String body) {
this.body = Body.string(body);
this.body = AnyValueBody.create(AnyValue.of(body));
return this;
}