Low allocation OTLP trace marshaler (#6410)
This commit is contained in:
parent
e1f707e8dc
commit
2e59f5477f
|
@ -89,12 +89,17 @@ public final class OtelEncodingUtils {
|
|||
/** Returns the {@code byte[]} decoded from the given hex {@link CharSequence}. */
|
||||
public static byte[] bytesFromBase16(CharSequence value, int length) {
|
||||
byte[] result = new byte[length / 2];
|
||||
for (int i = 0; i < length; i += 2) {
|
||||
result[i / 2] = byteFromBase16(value.charAt(i), value.charAt(i + 1));
|
||||
}
|
||||
bytesFromBase16(value, length, result);
|
||||
return result;
|
||||
}
|
||||
|
||||
/** Fills {@code bytes} with bytes decoded from the given hex {@link CharSequence}. */
|
||||
public static void bytesFromBase16(CharSequence value, int length, byte[] bytes) {
|
||||
for (int i = 0; i < length; i += 2) {
|
||||
bytes[i / 2] = byteFromBase16(value.charAt(i), value.charAt(i + 1));
|
||||
}
|
||||
}
|
||||
|
||||
/** Fills {@code dest} with the hex encoding of {@code bytes}. */
|
||||
public static void bytesToBase16(byte[] bytes, char[] dest, int length) {
|
||||
for (int i = 0; i < length; i++) {
|
||||
|
|
|
@ -108,6 +108,14 @@ final class JsonSerializer extends Serializer {
|
|||
generator.writeString(new String(utf8Bytes, StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeString(
|
||||
ProtoFieldInfo field, String string, int utf8Length, MarshalerContext context)
|
||||
throws IOException {
|
||||
generator.writeFieldName(field.getJsonName());
|
||||
generator.writeString(string);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
|
||||
generator.writeBinaryField(field.getJsonName(), value);
|
||||
|
@ -165,6 +173,44 @@ final class JsonSerializer extends Serializer {
|
|||
generator.writeEndArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void serializeRepeatedMessageWithContext(
|
||||
ProtoFieldInfo field,
|
||||
List<? extends T> messages,
|
||||
StatelessMarshaler<T> marshaler,
|
||||
MarshalerContext context)
|
||||
throws IOException {
|
||||
generator.writeArrayFieldStart(field.getJsonName());
|
||||
for (int i = 0; i < messages.size(); i++) {
|
||||
T message = messages.get(i);
|
||||
generator.writeStartObject();
|
||||
marshaler.writeTo(this, message, context);
|
||||
generator.writeEndObject();
|
||||
}
|
||||
generator.writeEndArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeStartRepeated(ProtoFieldInfo field) throws IOException {
|
||||
generator.writeArrayFieldStart(field.getJsonName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeEndRepeated() throws IOException {
|
||||
generator.writeEndArray();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeStartRepeatedElement(ProtoFieldInfo field, int protoMessageSize)
|
||||
throws IOException {
|
||||
generator.writeStartObject();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeEndRepeatedElement() throws IOException {
|
||||
generator.writeEndObject();
|
||||
}
|
||||
|
||||
// Not a field.
|
||||
void writeMessageValue(Marshaler message) throws IOException {
|
||||
generator.writeStartObject();
|
||||
|
|
|
@ -0,0 +1,226 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.marshal;
|
||||
|
||||
import io.opentelemetry.api.trace.SpanId;
|
||||
import io.opentelemetry.api.trace.TraceId;
|
||||
import java.util.ArrayList;
|
||||
import java.util.IdentityHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Supplier;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* Class for keeping marshaling state. The state consists of integers, that we call sizes, and
|
||||
* objects, that we call data. Both integers and objects can be read from the state in the order
|
||||
* they were added (first in, first out). Additionally, this class provides various pools and caches
|
||||
* for objects that can be reused between marshalling attempts.
|
||||
*/
|
||||
public final class MarshalerContext {
|
||||
private final boolean marshalStringNoAllocation;
|
||||
|
||||
private int[] sizes = new int[16];
|
||||
private int sizeReadIndex;
|
||||
private int sizeWriteIndex;
|
||||
private Object[] data = new Object[16];
|
||||
private int dataReadIndex;
|
||||
private int dataWriteIndex;
|
||||
|
||||
@SuppressWarnings("BooleanParameter")
|
||||
public MarshalerContext() {
|
||||
this(true);
|
||||
}
|
||||
|
||||
public MarshalerContext(boolean marshalStringNoAllocation) {
|
||||
this.marshalStringNoAllocation = marshalStringNoAllocation;
|
||||
}
|
||||
|
||||
public boolean marshalStringNoAllocation() {
|
||||
return marshalStringNoAllocation;
|
||||
}
|
||||
|
||||
public void addSize(int size) {
|
||||
growSizeIfNeeded();
|
||||
sizes[sizeWriteIndex++] = size;
|
||||
}
|
||||
|
||||
public int addSize() {
|
||||
growSizeIfNeeded();
|
||||
return sizeWriteIndex++;
|
||||
}
|
||||
|
||||
private void growSizeIfNeeded() {
|
||||
if (sizeWriteIndex == sizes.length) {
|
||||
int[] newSizes = new int[sizes.length * 2];
|
||||
System.arraycopy(sizes, 0, newSizes, 0, sizes.length);
|
||||
sizes = newSizes;
|
||||
}
|
||||
}
|
||||
|
||||
public void setSize(int index, int size) {
|
||||
sizes[index] = size;
|
||||
}
|
||||
|
||||
public int getSize() {
|
||||
return sizes[sizeReadIndex++];
|
||||
}
|
||||
|
||||
public void addData(@Nullable Object o) {
|
||||
growDataIfNeeded();
|
||||
data[dataWriteIndex++] = o;
|
||||
}
|
||||
|
||||
private void growDataIfNeeded() {
|
||||
if (dataWriteIndex == data.length) {
|
||||
Object[] newData = new Object[data.length * 2];
|
||||
System.arraycopy(data, 0, newData, 0, data.length);
|
||||
data = newData;
|
||||
}
|
||||
}
|
||||
|
||||
public <T> T getData(Class<T> type) {
|
||||
return type.cast(data[dataReadIndex++]);
|
||||
}
|
||||
|
||||
private final IdPool traceIdPool = new IdPool(TraceId.getLength() / 2);
|
||||
|
||||
/** Returns a buffer that can be used to hold a trace id. */
|
||||
public byte[] getTraceIdBuffer() {
|
||||
return traceIdPool.get();
|
||||
}
|
||||
|
||||
private final IdPool spanIdPool = new IdPool(SpanId.getLength() / 2);
|
||||
|
||||
/** Returns a buffer that can be used to hold a span id. */
|
||||
public byte[] getSpanIdBuffer() {
|
||||
return spanIdPool.get();
|
||||
}
|
||||
|
||||
private static class IdPool {
|
||||
private final List<byte[]> pool = new ArrayList<>();
|
||||
int index;
|
||||
final int idSize;
|
||||
|
||||
IdPool(int idSize) {
|
||||
this.idSize = idSize;
|
||||
}
|
||||
|
||||
byte[] get() {
|
||||
if (index < pool.size()) {
|
||||
return pool.get(index++);
|
||||
}
|
||||
byte[] result = new byte[idSize];
|
||||
pool.add(result);
|
||||
index++;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
void reset() {
|
||||
index = 0;
|
||||
}
|
||||
}
|
||||
|
||||
private final Pool<Map<?, ?>> mapPool = new Pool<>(IdentityHashMap::new, Map::clear);
|
||||
|
||||
/** Returns a pooled identity map. */
|
||||
@SuppressWarnings("unchecked")
|
||||
public <K, V> Map<K, V> getIdentityMap() {
|
||||
return (Map<K, V>) mapPool.get();
|
||||
}
|
||||
|
||||
private final Pool<List<?>> listPool = new Pool<>(ArrayList::new, List::clear);
|
||||
|
||||
/** Returns a pooled list. */
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> List<T> getList() {
|
||||
return (List<T>) listPool.get();
|
||||
}
|
||||
|
||||
private static class Pool<T> {
|
||||
private final List<T> pool = new ArrayList<>();
|
||||
private int index;
|
||||
private final Supplier<T> factory;
|
||||
private final Consumer<T> clean;
|
||||
|
||||
Pool(Supplier<T> factory, Consumer<T> clean) {
|
||||
this.factory = factory;
|
||||
this.clean = clean;
|
||||
}
|
||||
|
||||
T get() {
|
||||
if (index < pool.size()) {
|
||||
return pool.get(index++);
|
||||
}
|
||||
T result = factory.get();
|
||||
pool.add(result);
|
||||
index++;
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
void reset() {
|
||||
for (int i = 0; i < index; i++) {
|
||||
clean.accept(pool.get(i));
|
||||
}
|
||||
index = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/** Reset context so that serialization could be re-run. */
|
||||
public void resetReadIndex() {
|
||||
sizeReadIndex = 0;
|
||||
dataReadIndex = 0;
|
||||
}
|
||||
|
||||
/** Reset context so that it could be reused. */
|
||||
public void reset() {
|
||||
sizeReadIndex = 0;
|
||||
sizeWriteIndex = 0;
|
||||
for (int i = 0; i < dataWriteIndex; i++) {
|
||||
data[i] = null;
|
||||
}
|
||||
dataReadIndex = 0;
|
||||
dataWriteIndex = 0;
|
||||
|
||||
traceIdPool.reset();
|
||||
spanIdPool.reset();
|
||||
|
||||
mapPool.reset();
|
||||
listPool.reset();
|
||||
}
|
||||
|
||||
private static final AtomicInteger KEY_INDEX = new AtomicInteger();
|
||||
|
||||
public static class Key {
|
||||
final int index = KEY_INDEX.getAndIncrement();
|
||||
}
|
||||
|
||||
public static Key key() {
|
||||
return new Key();
|
||||
}
|
||||
|
||||
private Object[] instances = new Object[16];
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> T getInstance(Key key, Supplier<T> supplier) {
|
||||
if (key.index >= instances.length) {
|
||||
Object[] newData = new Object[instances.length * 2];
|
||||
System.arraycopy(instances, 0, newData, 0, instances.length);
|
||||
instances = newData;
|
||||
}
|
||||
|
||||
T result = (T) instances[key.index];
|
||||
if (result == null) {
|
||||
result = supplier.get();
|
||||
instances[key.index] = result;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
|
@ -40,6 +40,18 @@ final class ProtoSerializer extends Serializer implements AutoCloseable {
|
|||
writeBytes(field, traceIdBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeTraceId(ProtoFieldInfo field, String traceId, MarshalerContext context)
|
||||
throws IOException {
|
||||
byte[] traceIdBytes = idCache.get(traceId);
|
||||
if (traceIdBytes == null) {
|
||||
traceIdBytes = context.getTraceIdBuffer();
|
||||
OtelEncodingUtils.bytesFromBase16(traceId, TraceId.getLength(), traceIdBytes);
|
||||
idCache.put(traceId, traceIdBytes);
|
||||
}
|
||||
writeBytes(field, traceIdBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeSpanId(ProtoFieldInfo field, String spanId) throws IOException {
|
||||
byte[] spanIdBytes =
|
||||
|
@ -48,6 +60,18 @@ final class ProtoSerializer extends Serializer implements AutoCloseable {
|
|||
writeBytes(field, spanIdBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeSpanId(ProtoFieldInfo field, String spanId, MarshalerContext context)
|
||||
throws IOException {
|
||||
byte[] spanIdBytes = idCache.get(spanId);
|
||||
if (spanIdBytes == null) {
|
||||
spanIdBytes = context.getSpanIdBuffer();
|
||||
OtelEncodingUtils.bytesFromBase16(spanId, SpanId.getLength(), spanIdBytes);
|
||||
idCache.put(spanId, spanIdBytes);
|
||||
}
|
||||
writeBytes(field, spanIdBytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBool(ProtoFieldInfo field, boolean value) throws IOException {
|
||||
output.writeUInt32NoTag(field.getTag());
|
||||
|
@ -122,6 +146,16 @@ final class ProtoSerializer extends Serializer implements AutoCloseable {
|
|||
writeBytes(field, utf8Bytes);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeString(
|
||||
ProtoFieldInfo field, String string, int utf8Length, MarshalerContext context)
|
||||
throws IOException {
|
||||
output.writeUInt32NoTag(field.getTag());
|
||||
output.writeUInt32NoTag(utf8Length);
|
||||
|
||||
StatelessMarshalerUtil.writeUtf8(output, string, utf8Length, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
|
||||
output.writeUInt32NoTag(field.getTag());
|
||||
|
@ -179,6 +213,42 @@ final class ProtoSerializer extends Serializer implements AutoCloseable {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> void serializeRepeatedMessageWithContext(
|
||||
ProtoFieldInfo field,
|
||||
List<? extends T> messages,
|
||||
StatelessMarshaler<T> marshaler,
|
||||
MarshalerContext context)
|
||||
throws IOException {
|
||||
for (int i = 0; i < messages.size(); i++) {
|
||||
T message = messages.get(i);
|
||||
writeStartMessage(field, context.getSize());
|
||||
marshaler.writeTo(this, message, context);
|
||||
writeEndMessage();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeStartRepeated(ProtoFieldInfo field) {
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeEndRepeated() {
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeStartRepeatedElement(ProtoFieldInfo field, int protoMessageSize)
|
||||
throws IOException {
|
||||
writeStartMessage(field, protoMessageSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeEndRepeatedElement() {
|
||||
writeEndMessage();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeSerializedMessage(byte[] protoSerialized, String jsonSerialized)
|
||||
throws IOException {
|
||||
|
|
|
@ -5,9 +5,15 @@
|
|||
|
||||
package io.opentelemetry.exporter.internal.marshal;
|
||||
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.common.Attributes;
|
||||
import io.opentelemetry.sdk.internal.DynamicPrimitiveLongList;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
|
@ -23,6 +29,7 @@ import javax.annotation.Nullable;
|
|||
* at any time.
|
||||
*/
|
||||
public abstract class Serializer implements AutoCloseable {
|
||||
private static final MarshalerContext.Key ATTRIBUTES_WRITER_KEY = MarshalerContext.key();
|
||||
|
||||
Serializer() {}
|
||||
|
||||
|
@ -34,8 +41,21 @@ public abstract class Serializer implements AutoCloseable {
|
|||
writeTraceId(field, traceId);
|
||||
}
|
||||
|
||||
public void serializeTraceId(
|
||||
ProtoFieldInfo field, @Nullable String traceId, MarshalerContext context) throws IOException {
|
||||
if (traceId == null) {
|
||||
return;
|
||||
}
|
||||
writeTraceId(field, traceId, context);
|
||||
}
|
||||
|
||||
protected abstract void writeTraceId(ProtoFieldInfo field, String traceId) throws IOException;
|
||||
|
||||
protected void writeTraceId(ProtoFieldInfo field, String traceId, MarshalerContext context)
|
||||
throws IOException {
|
||||
writeTraceId(field, traceId);
|
||||
}
|
||||
|
||||
/** Serializes a span ID field. */
|
||||
public void serializeSpanId(ProtoFieldInfo field, @Nullable String spanId) throws IOException {
|
||||
if (spanId == null) {
|
||||
|
@ -44,8 +64,21 @@ public abstract class Serializer implements AutoCloseable {
|
|||
writeSpanId(field, spanId);
|
||||
}
|
||||
|
||||
public void serializeSpanId(
|
||||
ProtoFieldInfo field, @Nullable String spanId, MarshalerContext context) throws IOException {
|
||||
if (spanId == null) {
|
||||
return;
|
||||
}
|
||||
writeSpanId(field, spanId, context);
|
||||
}
|
||||
|
||||
protected abstract void writeSpanId(ProtoFieldInfo field, String spanId) throws IOException;
|
||||
|
||||
protected void writeSpanId(ProtoFieldInfo field, String spanId, MarshalerContext context)
|
||||
throws IOException {
|
||||
writeSpanId(field, spanId);
|
||||
}
|
||||
|
||||
/** Serializes a protobuf {@code bool} field. */
|
||||
public void serializeBool(ProtoFieldInfo field, boolean value) throws IOException {
|
||||
if (!value) {
|
||||
|
@ -175,9 +208,32 @@ public abstract class Serializer implements AutoCloseable {
|
|||
writeString(field, utf8Bytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes a protobuf {@code string} field. {@code string} is the value to be serialized and
|
||||
* {@code utf8Length} is the length of the string after it is encoded in UTF8. This method reads
|
||||
* elements from context, use together with {@link
|
||||
* StatelessMarshalerUtil#sizeStringWithContext(ProtoFieldInfo, String, MarshalerContext)}.
|
||||
*/
|
||||
public void serializeStringWithContext(
|
||||
ProtoFieldInfo field, @Nullable String string, MarshalerContext context) throws IOException {
|
||||
if (string == null || string.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
if (context.marshalStringNoAllocation()) {
|
||||
writeString(field, string, context.getSize(), context);
|
||||
} else {
|
||||
byte[] valueUtf8 = context.getData(byte[].class);
|
||||
writeString(field, valueUtf8);
|
||||
}
|
||||
}
|
||||
|
||||
/** Writes a protobuf {@code string} field, even if it matches the default value. */
|
||||
public abstract void writeString(ProtoFieldInfo field, byte[] utf8Bytes) throws IOException;
|
||||
|
||||
public abstract void writeString(
|
||||
ProtoFieldInfo field, String string, int utf8Length, MarshalerContext context)
|
||||
throws IOException;
|
||||
|
||||
/** Serializes a protobuf {@code bytes} field. */
|
||||
public void serializeBytes(ProtoFieldInfo field, byte[] value) throws IOException {
|
||||
if (value.length == 0) {
|
||||
|
@ -200,6 +256,36 @@ public abstract class Serializer implements AutoCloseable {
|
|||
writeEndMessage();
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes a protobuf embedded {@code message}. This method adds elements to context, use
|
||||
* together with {@link StatelessMarshalerUtil#sizeMessageWithContext(ProtoFieldInfo, Object,
|
||||
* StatelessMarshaler, MarshalerContext)}.
|
||||
*/
|
||||
public <T> void serializeMessageWithContext(
|
||||
ProtoFieldInfo field, T message, StatelessMarshaler<T> marshaler, MarshalerContext context)
|
||||
throws IOException {
|
||||
writeStartMessage(field, context.getSize());
|
||||
marshaler.writeTo(this, message, context);
|
||||
writeEndMessage();
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes a protobuf embedded {@code message}. This method adds elements to context, use
|
||||
* together with {@link StatelessMarshalerUtil#sizeMessageWithContext(ProtoFieldInfo, Object,
|
||||
* Object, StatelessMarshaler2, MarshalerContext)}.
|
||||
*/
|
||||
public <K, V> void serializeMessageWithContext(
|
||||
ProtoFieldInfo field,
|
||||
K key,
|
||||
V value,
|
||||
StatelessMarshaler2<K, V> marshaler,
|
||||
MarshalerContext context)
|
||||
throws IOException {
|
||||
writeStartMessage(field, context.getSize());
|
||||
marshaler.writeTo(this, key, value, context);
|
||||
writeEndMessage();
|
||||
}
|
||||
|
||||
@SuppressWarnings("SameParameterValue")
|
||||
protected abstract void writeStartRepeatedPrimitive(
|
||||
ProtoFieldInfo field, int protoSizePerElement, int numElements) throws IOException;
|
||||
|
@ -217,7 +303,8 @@ public abstract class Serializer implements AutoCloseable {
|
|||
return;
|
||||
}
|
||||
writeStartRepeatedPrimitive(field, WireFormat.FIXED64_SIZE, values.size());
|
||||
for (long value : values) {
|
||||
for (int i = 0; i < values.size(); i++) {
|
||||
Long value = values.get(i);
|
||||
writeFixed64Value(value);
|
||||
}
|
||||
writeEndRepeatedPrimitive();
|
||||
|
@ -286,7 +373,8 @@ public abstract class Serializer implements AutoCloseable {
|
|||
return;
|
||||
}
|
||||
writeStartRepeatedPrimitive(field, WireFormat.FIXED64_SIZE, values.size());
|
||||
for (double value : values) {
|
||||
for (int i = 0; i < values.size(); i++) {
|
||||
Double value = values.get(i);
|
||||
writeDoubleValue(value);
|
||||
}
|
||||
writeEndRepeatedPrimitive();
|
||||
|
@ -301,6 +389,179 @@ public abstract class Serializer implements AutoCloseable {
|
|||
public abstract void serializeRepeatedMessage(
|
||||
ProtoFieldInfo field, List<? extends Marshaler> repeatedMessage) throws IOException;
|
||||
|
||||
/**
|
||||
* Serializes {@code repeated message} field. This method reads elements from context, use
|
||||
* together with {@link StatelessMarshalerUtil#sizeRepeatedMessageWithContext(ProtoFieldInfo,
|
||||
* List, StatelessMarshaler, MarshalerContext)}.
|
||||
*/
|
||||
public abstract <T> void serializeRepeatedMessageWithContext(
|
||||
ProtoFieldInfo field,
|
||||
List<? extends T> messages,
|
||||
StatelessMarshaler<T> marshaler,
|
||||
MarshalerContext context)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Serializes {@code repeated message} field. This method reads elements from context, use
|
||||
* together with {@link StatelessMarshalerUtil#sizeRepeatedMessageWithContext(ProtoFieldInfo,
|
||||
* Collection, StatelessMarshaler, MarshalerContext, MarshalerContext.Key)}.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> void serializeRepeatedMessageWithContext(
|
||||
ProtoFieldInfo field,
|
||||
Collection<? extends T> messages,
|
||||
StatelessMarshaler<T> marshaler,
|
||||
MarshalerContext context,
|
||||
MarshalerContext.Key key)
|
||||
throws IOException {
|
||||
if (messages instanceof List) {
|
||||
serializeRepeatedMessageWithContext(field, (List<T>) messages, marshaler, context);
|
||||
return;
|
||||
}
|
||||
|
||||
writeStartRepeated(field);
|
||||
|
||||
if (!messages.isEmpty()) {
|
||||
RepeatedElementWriter<T> writer = context.getInstance(key, RepeatedElementWriter::new);
|
||||
writer.initialize(field, this, marshaler, context);
|
||||
messages.forEach(writer);
|
||||
}
|
||||
|
||||
writeEndRepeated();
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes {@code repeated message} field. This method reads elements from context, use
|
||||
* together with {@link StatelessMarshalerUtil#sizeRepeatedMessageWithContext(ProtoFieldInfo, Map,
|
||||
* StatelessMarshaler2, MarshalerContext, MarshalerContext.Key)}.
|
||||
*/
|
||||
public <K, V> void serializeRepeatedMessageWithContext(
|
||||
ProtoFieldInfo field,
|
||||
Map<K, V> messages,
|
||||
StatelessMarshaler2<K, V> marshaler,
|
||||
MarshalerContext context,
|
||||
MarshalerContext.Key key)
|
||||
throws IOException {
|
||||
writeStartRepeated(field);
|
||||
|
||||
if (!messages.isEmpty()) {
|
||||
RepeatedElementPairWriter<K, V> writer =
|
||||
context.getInstance(key, RepeatedElementPairWriter::new);
|
||||
writer.initialize(field, this, marshaler, context);
|
||||
messages.forEach(writer);
|
||||
}
|
||||
|
||||
writeEndRepeated();
|
||||
}
|
||||
|
||||
/**
|
||||
* Serializes {@code repeated message} field. This method reads elements from context, use
|
||||
* together with {@link StatelessMarshalerUtil#sizeRepeatedMessageWithContext(ProtoFieldInfo,
|
||||
* Attributes, StatelessMarshaler2, MarshalerContext)}.
|
||||
*/
|
||||
public void serializeRepeatedMessageWithContext(
|
||||
ProtoFieldInfo field,
|
||||
Attributes attributes,
|
||||
StatelessMarshaler2<AttributeKey<?>, Object> marshaler,
|
||||
MarshalerContext context)
|
||||
throws IOException {
|
||||
writeStartRepeated(field);
|
||||
|
||||
if (!attributes.isEmpty()) {
|
||||
RepeatedElementPairWriter<AttributeKey<?>, Object> writer =
|
||||
context.getInstance(ATTRIBUTES_WRITER_KEY, RepeatedElementPairWriter::new);
|
||||
writer.initialize(field, this, marshaler, context);
|
||||
attributes.forEach(writer);
|
||||
}
|
||||
|
||||
writeEndRepeated();
|
||||
}
|
||||
|
||||
private static class RepeatedElementWriter<T> implements Consumer<T> {
|
||||
@SuppressWarnings("NullAway")
|
||||
private ProtoFieldInfo field;
|
||||
|
||||
@SuppressWarnings("NullAway")
|
||||
private Serializer output;
|
||||
|
||||
@SuppressWarnings("NullAway")
|
||||
private StatelessMarshaler<T> marshaler;
|
||||
|
||||
@SuppressWarnings("NullAway")
|
||||
private MarshalerContext context;
|
||||
|
||||
void initialize(
|
||||
ProtoFieldInfo field,
|
||||
Serializer output,
|
||||
StatelessMarshaler<T> marshaler,
|
||||
MarshalerContext context) {
|
||||
this.field = field;
|
||||
this.output = output;
|
||||
this.marshaler = marshaler;
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(T element) {
|
||||
try {
|
||||
output.writeStartRepeatedElement(field, context.getSize());
|
||||
marshaler.writeTo(output, element, context);
|
||||
output.writeEndRepeatedElement();
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class RepeatedElementPairWriter<K, V> implements BiConsumer<K, V> {
|
||||
@SuppressWarnings("NullAway")
|
||||
private ProtoFieldInfo field;
|
||||
|
||||
@SuppressWarnings("NullAway")
|
||||
private Serializer output;
|
||||
|
||||
@SuppressWarnings("NullAway")
|
||||
private StatelessMarshaler2<K, V> marshaler;
|
||||
|
||||
@SuppressWarnings("NullAway")
|
||||
private MarshalerContext context;
|
||||
|
||||
void initialize(
|
||||
ProtoFieldInfo field,
|
||||
Serializer output,
|
||||
StatelessMarshaler2<K, V> marshaler,
|
||||
MarshalerContext context) {
|
||||
this.field = field;
|
||||
this.output = output;
|
||||
this.marshaler = marshaler;
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(K key, V value) {
|
||||
try {
|
||||
output.writeStartRepeatedElement(field, context.getSize());
|
||||
marshaler.writeTo(output, key, value, context);
|
||||
output.writeEndRepeatedElement();
|
||||
} catch (IOException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Writes start of repeated messages. */
|
||||
protected abstract void writeStartRepeated(ProtoFieldInfo field) throws IOException;
|
||||
|
||||
/** Writes end of repeated messages. */
|
||||
protected abstract void writeEndRepeated() throws IOException;
|
||||
|
||||
/** Writes start of a repeated message element. */
|
||||
protected abstract void writeStartRepeatedElement(ProtoFieldInfo field, int protoMessageSize)
|
||||
throws IOException;
|
||||
|
||||
/** Writes end of a repeated message element. */
|
||||
protected abstract void writeEndRepeatedElement() throws IOException;
|
||||
|
||||
/** Writes the value for a message field that has been pre-serialized. */
|
||||
public abstract void writeSerializedMessage(byte[] protoSerialized, String jsonSerialized)
|
||||
throws IOException;
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.marshal;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Marshaler from an SDK structure to protobuf wire format. It is intended that the instances of
|
||||
* this interface don't keep marshaling state and can be singletons. Any state needed for marshaling
|
||||
* should be stored in {@link MarshalerContext}. Marshaler should be used so that first {@link
|
||||
* #getBinarySerializedSize} is called and after that {@link #writeTo} is called. Calling {@link
|
||||
* #getBinarySerializedSize} may add values to {@link MarshalerContext} that are later used in
|
||||
* {@link #writeTo}.
|
||||
*
|
||||
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
|
||||
* at any time.
|
||||
*/
|
||||
public interface StatelessMarshaler<T> {
|
||||
|
||||
/** Returns the number of bytes marshaling given value will write in proto binary format. */
|
||||
int getBinarySerializedSize(T value, MarshalerContext context);
|
||||
|
||||
/** Marshal given value using the provided {@link Serializer}. */
|
||||
void writeTo(Serializer output, T value, MarshalerContext context) throws IOException;
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.marshal;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Marshaler from an SDK structure to protobuf wire format. It is intended that the instances of
|
||||
* this interface don't keep marshaling state and can be singletons. Any state needed for marshaling
|
||||
* should be stored in {@link MarshalerContext}. Marshaler should be used so that first {@link
|
||||
* #getBinarySerializedSize} is called and after that {@link #writeTo} is called. Calling {@link
|
||||
* #getBinarySerializedSize} may add values to {@link MarshalerContext} that are later used in
|
||||
* {@link #writeTo}.
|
||||
*
|
||||
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
|
||||
* at any time.
|
||||
*/
|
||||
public interface StatelessMarshaler2<K, V> {
|
||||
|
||||
/** Returns the number of bytes this Marshaler will write. */
|
||||
int getBinarySerializedSize(K key, V value, MarshalerContext context);
|
||||
|
||||
/** Marshal given key and value using the provided {@link Serializer}. */
|
||||
void writeTo(Serializer output, K key, V value, MarshalerContext context) throws IOException;
|
||||
}
|
|
@ -0,0 +1,436 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.marshal;
|
||||
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.common.Attributes;
|
||||
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
|
||||
import io.opentelemetry.sdk.resources.Resource;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* Marshaler utilities.
|
||||
*
|
||||
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
|
||||
* at any time.
|
||||
*/
|
||||
public final class StatelessMarshalerUtil {
|
||||
private static final MarshalerContext.Key GROUPER_KEY = MarshalerContext.key();
|
||||
private static final MarshalerContext.Key ATTRIBUTES_SIZE_CALCULATOR_KEY = MarshalerContext.key();
|
||||
|
||||
/** Groups SDK items by resource and instrumentation scope. */
|
||||
public static <T> Map<Resource, Map<InstrumentationScopeInfo, List<T>>> groupByResourceAndScope(
|
||||
Collection<T> dataList,
|
||||
Function<T, Resource> getResource,
|
||||
Function<T, InstrumentationScopeInfo> getInstrumentationScope,
|
||||
MarshalerContext context) {
|
||||
Map<Resource, Map<InstrumentationScopeInfo, List<T>>> result = context.getIdentityMap();
|
||||
|
||||
Grouper<T> grouper = context.getInstance(GROUPER_KEY, Grouper::new);
|
||||
grouper.initialize(result, getResource, getInstrumentationScope, context);
|
||||
dataList.forEach(grouper);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
private static class Grouper<T> implements Consumer<T> {
|
||||
@SuppressWarnings("NullAway")
|
||||
private Map<Resource, Map<InstrumentationScopeInfo, List<T>>> result;
|
||||
|
||||
@SuppressWarnings("NullAway")
|
||||
private Function<T, Resource> getResource;
|
||||
|
||||
@SuppressWarnings("NullAway")
|
||||
private Function<T, InstrumentationScopeInfo> getInstrumentationScope;
|
||||
|
||||
@SuppressWarnings("NullAway")
|
||||
private MarshalerContext context;
|
||||
|
||||
void initialize(
|
||||
Map<Resource, Map<InstrumentationScopeInfo, List<T>>> result,
|
||||
Function<T, Resource> getResource,
|
||||
Function<T, InstrumentationScopeInfo> getInstrumentationScope,
|
||||
MarshalerContext context) {
|
||||
this.result = result;
|
||||
this.getResource = getResource;
|
||||
this.getInstrumentationScope = getInstrumentationScope;
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(T data) {
|
||||
Resource resource = getResource.apply(data);
|
||||
Map<InstrumentationScopeInfo, List<T>> scopeInfoListMap = result.get(resource);
|
||||
if (scopeInfoListMap == null) {
|
||||
scopeInfoListMap = context.getIdentityMap();
|
||||
result.put(resource, scopeInfoListMap);
|
||||
}
|
||||
InstrumentationScopeInfo instrumentationScopeInfo = getInstrumentationScope.apply(data);
|
||||
List<T> elementList = scopeInfoListMap.get(instrumentationScopeInfo);
|
||||
if (elementList == null) {
|
||||
elementList = context.getList();
|
||||
scopeInfoListMap.put(instrumentationScopeInfo, elementList);
|
||||
}
|
||||
elementList.add(data);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the size of a string field. This method adds elements to context, use together with
|
||||
* {@link Serializer#serializeStringWithContext(ProtoFieldInfo, String, MarshalerContext)}.
|
||||
*/
|
||||
public static int sizeStringWithContext(
|
||||
ProtoFieldInfo field, @Nullable String value, MarshalerContext context) {
|
||||
if (value == null || value.isEmpty()) {
|
||||
return sizeBytes(field, 0);
|
||||
}
|
||||
if (context.marshalStringNoAllocation()) {
|
||||
int utf8Size = getUtf8Size(value, context);
|
||||
context.addSize(utf8Size);
|
||||
return sizeBytes(field, utf8Size);
|
||||
} else {
|
||||
byte[] valueUtf8 = MarshalerUtil.toBytes(value);
|
||||
context.addData(valueUtf8);
|
||||
return sizeBytes(field, valueUtf8.length);
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns the size of a bytes field. */
|
||||
private static int sizeBytes(ProtoFieldInfo field, int length) {
|
||||
if (length == 0) {
|
||||
return 0;
|
||||
}
|
||||
return field.getTagSize() + CodedOutputStream.computeLengthDelimitedFieldSize(length);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the size of a repeated message field. This method adds elements to context, use
|
||||
* together with {@link Serializer#serializeRepeatedMessageWithContext(ProtoFieldInfo, List,
|
||||
* StatelessMarshaler, MarshalerContext)}.
|
||||
*/
|
||||
public static <T> int sizeRepeatedMessageWithContext(
|
||||
ProtoFieldInfo field,
|
||||
List<? extends T> messages,
|
||||
StatelessMarshaler<T> marshaler,
|
||||
MarshalerContext context) {
|
||||
if (messages.isEmpty()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int size = 0;
|
||||
int fieldTagSize = field.getTagSize();
|
||||
for (int i = 0; i < messages.size(); i++) {
|
||||
T message = messages.get(i);
|
||||
int sizeIndex = context.addSize();
|
||||
int fieldSize = marshaler.getBinarySerializedSize(message, context);
|
||||
context.setSize(sizeIndex, fieldSize);
|
||||
size += fieldTagSize + CodedOutputStream.computeUInt32SizeNoTag(fieldSize) + fieldSize;
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the size of a repeated message field. This method adds elements to context, use
|
||||
* together with {@link Serializer#serializeRepeatedMessageWithContext(ProtoFieldInfo, Collection,
|
||||
* StatelessMarshaler, MarshalerContext, MarshalerContext.Key)}.
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> int sizeRepeatedMessageWithContext(
|
||||
ProtoFieldInfo field,
|
||||
Collection<? extends T> messages,
|
||||
StatelessMarshaler<T> marshaler,
|
||||
MarshalerContext context,
|
||||
MarshalerContext.Key key) {
|
||||
if (messages instanceof List) {
|
||||
return sizeRepeatedMessageWithContext(field, (List<T>) messages, marshaler, context);
|
||||
}
|
||||
|
||||
if (messages.isEmpty()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
RepeatedElementSizeCalculator<T> sizeCalculator =
|
||||
context.getInstance(key, RepeatedElementSizeCalculator::new);
|
||||
sizeCalculator.initialize(field, marshaler, context);
|
||||
messages.forEach(sizeCalculator);
|
||||
|
||||
return sizeCalculator.size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the size of a repeated message field. This method adds elements to context, use
|
||||
* together with {@link Serializer#serializeRepeatedMessageWithContext(ProtoFieldInfo, Map,
|
||||
* StatelessMarshaler2, MarshalerContext, MarshalerContext.Key)}.
|
||||
*/
|
||||
public static <K, V> int sizeRepeatedMessageWithContext(
|
||||
ProtoFieldInfo field,
|
||||
Map<K, V> messages,
|
||||
StatelessMarshaler2<K, V> marshaler,
|
||||
MarshalerContext context,
|
||||
MarshalerContext.Key key) {
|
||||
if (messages.isEmpty()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
RepeatedElementPairSizeCalculator<K, V> sizeCalculator =
|
||||
context.getInstance(key, RepeatedElementPairSizeCalculator::new);
|
||||
sizeCalculator.initialize(field, marshaler, context);
|
||||
messages.forEach(sizeCalculator);
|
||||
|
||||
return sizeCalculator.size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the size of a repeated message field. This method adds elements to context, use
|
||||
* together with {@link Serializer#serializeRepeatedMessageWithContext(ProtoFieldInfo, Attributes,
|
||||
* StatelessMarshaler2, MarshalerContext)}.
|
||||
*/
|
||||
public static int sizeRepeatedMessageWithContext(
|
||||
ProtoFieldInfo field,
|
||||
Attributes attributes,
|
||||
StatelessMarshaler2<AttributeKey<?>, Object> marshaler,
|
||||
MarshalerContext context) {
|
||||
if (attributes.isEmpty()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
RepeatedElementPairSizeCalculator<AttributeKey<?>, Object> sizeCalculator =
|
||||
context.getInstance(ATTRIBUTES_SIZE_CALCULATOR_KEY, RepeatedElementPairSizeCalculator::new);
|
||||
sizeCalculator.initialize(field, marshaler, context);
|
||||
attributes.forEach(sizeCalculator);
|
||||
|
||||
return sizeCalculator.size;
|
||||
}
|
||||
|
||||
private static class RepeatedElementSizeCalculator<T> implements Consumer<T> {
|
||||
private int size;
|
||||
private int fieldTagSize;
|
||||
|
||||
@SuppressWarnings("NullAway")
|
||||
private StatelessMarshaler<T> marshaler;
|
||||
|
||||
@SuppressWarnings("NullAway")
|
||||
private MarshalerContext context;
|
||||
|
||||
void initialize(
|
||||
ProtoFieldInfo field, StatelessMarshaler<T> marshaler, MarshalerContext context) {
|
||||
this.size = 0;
|
||||
this.fieldTagSize = field.getTagSize();
|
||||
this.marshaler = marshaler;
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(T element) {
|
||||
int sizeIndex = context.addSize();
|
||||
int fieldSize = marshaler.getBinarySerializedSize(element, context);
|
||||
context.setSize(sizeIndex, fieldSize);
|
||||
size += fieldTagSize + CodedOutputStream.computeUInt32SizeNoTag(fieldSize) + fieldSize;
|
||||
}
|
||||
}
|
||||
|
||||
private static class RepeatedElementPairSizeCalculator<K, V> implements BiConsumer<K, V> {
|
||||
private int size;
|
||||
private int fieldTagSize;
|
||||
|
||||
@SuppressWarnings("NullAway")
|
||||
private StatelessMarshaler2<K, V> marshaler;
|
||||
|
||||
@SuppressWarnings("NullAway")
|
||||
private MarshalerContext context;
|
||||
|
||||
void initialize(
|
||||
ProtoFieldInfo field, StatelessMarshaler2<K, V> marshaler, MarshalerContext context) {
|
||||
this.size = 0;
|
||||
this.fieldTagSize = field.getTagSize();
|
||||
this.marshaler = marshaler;
|
||||
this.context = context;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void accept(K key, V value) {
|
||||
int sizeIndex = context.addSize();
|
||||
int fieldSize = marshaler.getBinarySerializedSize(key, value, context);
|
||||
context.setSize(sizeIndex, fieldSize);
|
||||
size += fieldTagSize + CodedOutputStream.computeUInt32SizeNoTag(fieldSize) + fieldSize;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the size of a message field. This method adds elements to context, use together with
|
||||
* {@link Serializer#serializeMessageWithContext(ProtoFieldInfo, Object, StatelessMarshaler,
|
||||
* MarshalerContext)}.
|
||||
*/
|
||||
public static <T> int sizeMessageWithContext(
|
||||
ProtoFieldInfo field, T element, StatelessMarshaler<T> marshaler, MarshalerContext context) {
|
||||
int sizeIndex = context.addSize();
|
||||
int fieldSize = marshaler.getBinarySerializedSize(element, context);
|
||||
int size = field.getTagSize() + CodedOutputStream.computeUInt32SizeNoTag(fieldSize) + fieldSize;
|
||||
context.setSize(sizeIndex, fieldSize);
|
||||
return size;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the size of a message field. This method adds elements to context, use together with
|
||||
* {@link Serializer#serializeMessageWithContext(ProtoFieldInfo, Object, Object,
|
||||
* StatelessMarshaler2, MarshalerContext)}.
|
||||
*/
|
||||
public static <K, V> int sizeMessageWithContext(
|
||||
ProtoFieldInfo field,
|
||||
K key,
|
||||
V value,
|
||||
StatelessMarshaler2<K, V> marshaler,
|
||||
MarshalerContext context) {
|
||||
int sizeIndex = context.addSize();
|
||||
int fieldSize = marshaler.getBinarySerializedSize(key, value, context);
|
||||
int size = field.getTagSize() + CodedOutputStream.computeUInt32SizeNoTag(fieldSize) + fieldSize;
|
||||
context.setSize(sizeIndex, fieldSize);
|
||||
return size;
|
||||
}
|
||||
|
||||
/** Returns the size of utf8 encoded string in bytes. */
|
||||
@SuppressWarnings("UnusedVariable")
|
||||
private static int getUtf8Size(String string, MarshalerContext context) {
|
||||
return getUtf8Size(string);
|
||||
}
|
||||
|
||||
// Visible for testing
|
||||
static int getUtf8Size(String string) {
|
||||
return encodedUtf8Length(string);
|
||||
}
|
||||
|
||||
// adapted from
|
||||
// https://github.com/protocolbuffers/protobuf/blob/b618f6750aed641a23d5f26fbbaf654668846d24/java/core/src/main/java/com/google/protobuf/Utf8.java#L217
|
||||
private static int encodedUtf8Length(String string) {
|
||||
// Warning to maintainers: this implementation is highly optimized.
|
||||
int utf16Length = string.length();
|
||||
int utf8Length = utf16Length;
|
||||
int i = 0;
|
||||
|
||||
// This loop optimizes for pure ASCII.
|
||||
while (i < utf16Length && string.charAt(i) < 0x80) {
|
||||
i++;
|
||||
}
|
||||
|
||||
// This loop optimizes for chars less than 0x800.
|
||||
for (; i < utf16Length; i++) {
|
||||
char c = string.charAt(i);
|
||||
if (c < 0x800) {
|
||||
utf8Length += ((0x7f - c) >>> 31); // branch free!
|
||||
} else {
|
||||
utf8Length += encodedUtf8LengthGeneral(string, i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (utf8Length < utf16Length) {
|
||||
// Necessary and sufficient condition for overflow because of maximum 3x expansion
|
||||
throw new IllegalArgumentException(
|
||||
"UTF-8 length does not fit in int: " + (utf8Length + (1L << 32)));
|
||||
}
|
||||
|
||||
return utf8Length;
|
||||
}
|
||||
|
||||
// adapted from
|
||||
// https://github.com/protocolbuffers/protobuf/blob/b618f6750aed641a23d5f26fbbaf654668846d24/java/core/src/main/java/com/google/protobuf/Utf8.java#L247
|
||||
private static int encodedUtf8LengthGeneral(String string, int start) {
|
||||
int utf16Length = string.length();
|
||||
int utf8Length = 0;
|
||||
for (int i = start; i < utf16Length; i++) {
|
||||
char c = string.charAt(i);
|
||||
if (c < 0x800) {
|
||||
utf8Length += (0x7f - c) >>> 31; // branch free!
|
||||
} else {
|
||||
utf8Length += 2;
|
||||
if (Character.isSurrogate(c)) {
|
||||
// Check that we have a well-formed surrogate pair.
|
||||
if (Character.codePointAt(string, i) != c) {
|
||||
i++;
|
||||
} else {
|
||||
// invalid sequence
|
||||
// At this point we have accumulated 3 byes of length (2 in this method and 1 in caller)
|
||||
// for current character, reduce the length to 1 bytes as we are going to encode the
|
||||
// invalid character as ?
|
||||
utf8Length -= 2;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return utf8Length;
|
||||
}
|
||||
|
||||
/** Write utf8 encoded string to output stream. */
|
||||
@SuppressWarnings("UnusedVariable") // context argument is added for future use
|
||||
static void writeUtf8(
|
||||
CodedOutputStream output, String string, int utf8Length, MarshalerContext context)
|
||||
throws IOException {
|
||||
writeUtf8(output, string, utf8Length);
|
||||
}
|
||||
|
||||
// Visible for testing
|
||||
@SuppressWarnings("UnusedVariable") // utf8Length argument is added for future use
|
||||
static void writeUtf8(CodedOutputStream output, String string, int utf8Length)
|
||||
throws IOException {
|
||||
encodeUtf8(output, string);
|
||||
}
|
||||
|
||||
// encode utf8 the same way as length is computed in encodedUtf8Length
|
||||
// adapted from
|
||||
// https://github.com/protocolbuffers/protobuf/blob/b618f6750aed641a23d5f26fbbaf654668846d24/java/core/src/main/java/com/google/protobuf/Utf8.java#L1016
|
||||
private static void encodeUtf8(CodedOutputStream output, String in) throws IOException {
|
||||
int utf16Length = in.length();
|
||||
int i = 0;
|
||||
// Designed to take advantage of
|
||||
// https://wiki.openjdk.java.net/display/HotSpotInternals/RangeCheckElimination
|
||||
for (char c; i < utf16Length && (c = in.charAt(i)) < 0x80; i++) {
|
||||
output.write((byte) c);
|
||||
}
|
||||
if (i == utf16Length) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (char c; i < utf16Length; i++) {
|
||||
c = in.charAt(i);
|
||||
if (c < 0x80) {
|
||||
// 1 byte, 7 bits
|
||||
output.write((byte) c);
|
||||
} else if (c < 0x800) { // 11 bits, two UTF-8 bytes
|
||||
output.write((byte) ((0xF << 6) | (c >>> 6)));
|
||||
output.write((byte) (0x80 | (0x3F & c)));
|
||||
} else if (!Character.isSurrogate(c)) {
|
||||
// Maximum single-char code point is 0xFFFF, 16 bits, three UTF-8 bytes
|
||||
output.write((byte) ((0xF << 5) | (c >>> 12)));
|
||||
output.write((byte) (0x80 | (0x3F & (c >>> 6))));
|
||||
output.write((byte) (0x80 | (0x3F & c)));
|
||||
} else {
|
||||
// Minimum code point represented by a surrogate pair is 0x10000, 17 bits,
|
||||
// four UTF-8 bytes
|
||||
int codePoint = Character.codePointAt(in, i);
|
||||
if (codePoint != c) {
|
||||
output.write((byte) ((0xF << 4) | (codePoint >>> 18)));
|
||||
output.write((byte) (0x80 | (0x3F & (codePoint >>> 12))));
|
||||
output.write((byte) (0x80 | (0x3F & (codePoint >>> 6))));
|
||||
output.write((byte) (0x80 | (0x3F & codePoint)));
|
||||
i++;
|
||||
} else {
|
||||
// invalid sequence
|
||||
output.write((byte) '?');
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private StatelessMarshalerUtil() {}
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.marshal;
|
||||
|
||||
import static io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil.getUtf8Size;
|
||||
import static io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil.writeUtf8;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Random;
|
||||
import org.junit.jupiter.api.RepeatedTest;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class StatelessMarshalerUtilTest {
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("AvoidEscapedUnicodeCharacters")
|
||||
void encodeUtf8() {
|
||||
assertThat(getUtf8Size("")).isEqualTo(0);
|
||||
assertThat(testUtf8("", 0)).isEqualTo("");
|
||||
|
||||
assertThat(getUtf8Size("a")).isEqualTo(1);
|
||||
assertThat(testUtf8("a", 1)).isEqualTo("a");
|
||||
|
||||
assertThat(getUtf8Size("©")).isEqualTo(2);
|
||||
assertThat(testUtf8("©", 2)).isEqualTo("©");
|
||||
|
||||
assertThat(getUtf8Size("∆")).isEqualTo(3);
|
||||
assertThat(testUtf8("∆", 3)).isEqualTo("∆");
|
||||
|
||||
assertThat(getUtf8Size("😀")).isEqualTo(4);
|
||||
assertThat(testUtf8("😀", 4)).isEqualTo("😀");
|
||||
|
||||
// test that invalid characters are replaced with ?
|
||||
assertThat(getUtf8Size("\uD83D😀\uDE00")).isEqualTo(6);
|
||||
assertThat(testUtf8("\uD83D😀\uDE00", 6)).isEqualTo("?😀?");
|
||||
|
||||
// the same invalid sequence as encoded by the jdk
|
||||
byte[] bytes = "\uD83D😀\uDE00".getBytes(StandardCharsets.UTF_8);
|
||||
assertThat(bytes.length).isEqualTo(6);
|
||||
assertThat(new String(bytes, StandardCharsets.UTF_8)).isEqualTo("?😀?");
|
||||
}
|
||||
|
||||
@RepeatedTest(1000)
|
||||
void testUtf8SizeLatin1() {
|
||||
Random random = new Random();
|
||||
byte[] bytes = new byte[15001];
|
||||
random.nextBytes(bytes);
|
||||
String string = new String(bytes, StandardCharsets.ISO_8859_1);
|
||||
int utf8Size = string.getBytes(StandardCharsets.UTF_8).length;
|
||||
assertThat(getUtf8Size(string)).isEqualTo(utf8Size);
|
||||
}
|
||||
|
||||
private static String testUtf8(String string, int utf8Length) {
|
||||
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
|
||||
CodedOutputStream codedOutputStream = CodedOutputStream.newInstance(outputStream);
|
||||
writeUtf8(codedOutputStream, string, utf8Length);
|
||||
codedOutputStream.flush();
|
||||
return new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
|
||||
} catch (Exception exception) {
|
||||
throw new IllegalArgumentException(exception);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -5,6 +5,7 @@
|
|||
|
||||
package io.opentelemetry.exporter.internal.otlp;
|
||||
|
||||
import io.opentelemetry.exporter.internal.otlp.traces.LowAllocationTraceRequestMarshaler;
|
||||
import io.opentelemetry.exporter.internal.otlp.traces.TraceRequestMarshaler;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -24,29 +25,72 @@ import org.openjdk.jmh.annotations.Warmup;
|
|||
@Fork(1)
|
||||
public class RequestMarshalBenchmarks {
|
||||
|
||||
private static final LowAllocationTraceRequestMarshaler MARSHALER =
|
||||
new LowAllocationTraceRequestMarshaler();
|
||||
private static final TestOutputStream OUTPUT = new TestOutputStream();
|
||||
|
||||
@Benchmark
|
||||
@Threads(1)
|
||||
public TestOutputStream createCustomMarshal(RequestMarshalState state) {
|
||||
public int createStatefulMarshaler(RequestMarshalState state) {
|
||||
TraceRequestMarshaler requestMarshaler = TraceRequestMarshaler.create(state.spanDataList);
|
||||
return new TestOutputStream(requestMarshaler.getBinarySerializedSize());
|
||||
return requestMarshaler.getBinarySerializedSize();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(1)
|
||||
public TestOutputStream marshalCustom(RequestMarshalState state) throws IOException {
|
||||
public int marshalStatefulBinary(RequestMarshalState state) throws IOException {
|
||||
TraceRequestMarshaler requestMarshaler = TraceRequestMarshaler.create(state.spanDataList);
|
||||
TestOutputStream customOutput =
|
||||
new TestOutputStream(requestMarshaler.getBinarySerializedSize());
|
||||
requestMarshaler.writeBinaryTo(customOutput);
|
||||
return customOutput;
|
||||
OUTPUT.reset(requestMarshaler.getBinarySerializedSize());
|
||||
requestMarshaler.writeBinaryTo(OUTPUT);
|
||||
return OUTPUT.getCount();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(1)
|
||||
public TestOutputStream marshalJson(RequestMarshalState state) throws IOException {
|
||||
public int marshalStatefulJson(RequestMarshalState state) throws IOException {
|
||||
TraceRequestMarshaler requestMarshaler = TraceRequestMarshaler.create(state.spanDataList);
|
||||
TestOutputStream customOutput = new TestOutputStream();
|
||||
requestMarshaler.writeJsonTo(customOutput);
|
||||
return customOutput;
|
||||
OUTPUT.reset();
|
||||
requestMarshaler.writeJsonTo(OUTPUT);
|
||||
return OUTPUT.getCount();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(1)
|
||||
public int createStatelessMarshaler(RequestMarshalState state) {
|
||||
LowAllocationTraceRequestMarshaler requestMarshaler = MARSHALER;
|
||||
requestMarshaler.initialize(state.spanDataList);
|
||||
try {
|
||||
return requestMarshaler.getBinarySerializedSize();
|
||||
} finally {
|
||||
requestMarshaler.reset();
|
||||
}
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(1)
|
||||
public int marshalStatelessBinary(RequestMarshalState state) throws IOException {
|
||||
LowAllocationTraceRequestMarshaler requestMarshaler = MARSHALER;
|
||||
requestMarshaler.initialize(state.spanDataList);
|
||||
try {
|
||||
OUTPUT.reset();
|
||||
requestMarshaler.writeBinaryTo(OUTPUT);
|
||||
return OUTPUT.getCount();
|
||||
} finally {
|
||||
requestMarshaler.reset();
|
||||
}
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(1)
|
||||
public int marshalStatelessJson(RequestMarshalState state) throws IOException {
|
||||
LowAllocationTraceRequestMarshaler requestMarshaler = MARSHALER;
|
||||
requestMarshaler.initialize(state.spanDataList);
|
||||
try {
|
||||
OUTPUT.reset();
|
||||
requestMarshaler.writeJsonTo(OUTPUT);
|
||||
return OUTPUT.getCount();
|
||||
} finally {
|
||||
requestMarshaler.reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,121 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.otlp;
|
||||
|
||||
import io.opentelemetry.exporter.internal.marshal.Marshaler;
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
|
||||
import io.opentelemetry.exporter.internal.marshal.Serializer;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.BenchmarkMode;
|
||||
import org.openjdk.jmh.annotations.Fork;
|
||||
import org.openjdk.jmh.annotations.Measurement;
|
||||
import org.openjdk.jmh.annotations.Mode;
|
||||
import org.openjdk.jmh.annotations.OutputTimeUnit;
|
||||
import org.openjdk.jmh.annotations.Threads;
|
||||
import org.openjdk.jmh.annotations.Warmup;
|
||||
|
||||
@BenchmarkMode({Mode.AverageTime})
|
||||
@OutputTimeUnit(TimeUnit.MICROSECONDS)
|
||||
@Warmup(iterations = 5, time = 1)
|
||||
@Measurement(iterations = 10, time = 1)
|
||||
@Fork(1)
|
||||
public class StringMarshalBenchmark {
|
||||
private static final TestMarshaler MARSHALER = new TestMarshaler();
|
||||
private static final TestOutputStream OUTPUT = new TestOutputStream();
|
||||
|
||||
@Benchmark
|
||||
@Threads(1)
|
||||
public int marshalAsciiString(StringMarshalState state) throws IOException {
|
||||
OUTPUT.reset();
|
||||
Marshaler marshaler = StringAnyValueMarshaler.create(state.asciiString);
|
||||
marshaler.writeBinaryTo(OUTPUT);
|
||||
return OUTPUT.getCount();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(1)
|
||||
public int marshalLatin1String(StringMarshalState state) throws IOException {
|
||||
OUTPUT.reset();
|
||||
Marshaler marshaler = StringAnyValueMarshaler.create(state.latin1String);
|
||||
marshaler.writeBinaryTo(OUTPUT);
|
||||
return OUTPUT.getCount();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(1)
|
||||
public int marshalUnicodeString(StringMarshalState state) throws IOException {
|
||||
OUTPUT.reset();
|
||||
Marshaler marshaler = StringAnyValueMarshaler.create(state.unicodeString);
|
||||
marshaler.writeBinaryTo(OUTPUT);
|
||||
return OUTPUT.getCount();
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(1)
|
||||
public int marshalAsciiStringLowAllocation(StringMarshalState state) throws IOException {
|
||||
OUTPUT.reset();
|
||||
try {
|
||||
MARSHALER.initialize(state.asciiString);
|
||||
MARSHALER.writeBinaryTo(OUTPUT);
|
||||
return OUTPUT.getCount();
|
||||
} finally {
|
||||
MARSHALER.reset();
|
||||
}
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(1)
|
||||
public int marshalLatin1StringLowAllocation(StringMarshalState state) throws IOException {
|
||||
OUTPUT.reset();
|
||||
try {
|
||||
MARSHALER.initialize(state.latin1String);
|
||||
MARSHALER.writeBinaryTo(OUTPUT);
|
||||
return OUTPUT.getCount();
|
||||
} finally {
|
||||
MARSHALER.reset();
|
||||
}
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@Threads(1)
|
||||
public int marshalUnicodeStringLowAllocation(StringMarshalState state) throws IOException {
|
||||
OUTPUT.reset();
|
||||
try {
|
||||
MARSHALER.initialize(state.unicodeString);
|
||||
MARSHALER.writeBinaryTo(OUTPUT);
|
||||
return OUTPUT.getCount();
|
||||
} finally {
|
||||
MARSHALER.reset();
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestMarshaler extends Marshaler {
|
||||
private final MarshalerContext context = new MarshalerContext();
|
||||
private int size;
|
||||
private String value;
|
||||
|
||||
public void initialize(String string) {
|
||||
value = string;
|
||||
size = StringAnyValueStatelessMarshaler.INSTANCE.getBinarySerializedSize(string, context);
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
context.reset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBinarySerializedSize() {
|
||||
return size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(Serializer output) throws IOException {
|
||||
StringAnyValueStatelessMarshaler.INSTANCE.writeTo(output, value, context);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.otlp;
|
||||
|
||||
import org.openjdk.jmh.annotations.Param;
|
||||
import org.openjdk.jmh.annotations.Scope;
|
||||
import org.openjdk.jmh.annotations.Setup;
|
||||
import org.openjdk.jmh.annotations.State;
|
||||
|
||||
@State(Scope.Benchmark)
|
||||
public class StringMarshalState {
|
||||
|
||||
@Param({"16", "512"})
|
||||
int stringSize;
|
||||
|
||||
String asciiString;
|
||||
String latin1String;
|
||||
String unicodeString;
|
||||
|
||||
@Setup
|
||||
public void setup() {
|
||||
asciiString = makeString('a', stringSize);
|
||||
latin1String = makeString('ä', stringSize);
|
||||
unicodeString = makeString('∆', stringSize);
|
||||
}
|
||||
|
||||
private static String makeString(char c, int size) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (int i = 0; i < size; i++) {
|
||||
sb.append(c);
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
|
@ -8,7 +8,7 @@ package io.opentelemetry.exporter.internal.otlp;
|
|||
import java.io.OutputStream;
|
||||
|
||||
class TestOutputStream extends OutputStream {
|
||||
private final int size;
|
||||
private int size;
|
||||
private int count;
|
||||
|
||||
TestOutputStream() {
|
||||
|
@ -26,4 +26,17 @@ class TestOutputStream extends OutputStream {
|
|||
throw new IllegalStateException("max size exceeded");
|
||||
}
|
||||
}
|
||||
|
||||
void reset(int size) {
|
||||
this.size = size;
|
||||
this.count = 0;
|
||||
}
|
||||
|
||||
void reset() {
|
||||
reset(-1);
|
||||
}
|
||||
|
||||
int getCount() {
|
||||
return count;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.otlp;
|
||||
|
||||
import io.opentelemetry.api.common.AttributeType;
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
|
||||
import io.opentelemetry.exporter.internal.marshal.Serializer;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler2;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil;
|
||||
import io.opentelemetry.proto.common.v1.internal.ArrayValue;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/** See {@link ArrayAnyValueMarshaler}. */
|
||||
// TODO: add support for List<io.opentelemetry.api.incubator.logs.AnyValue<?>>
|
||||
final class ArrayAnyValueStatelessMarshaler<T>
|
||||
implements StatelessMarshaler2<AttributeType, List<T>> {
|
||||
static final ArrayAnyValueStatelessMarshaler<Object> INSTANCE =
|
||||
new ArrayAnyValueStatelessMarshaler<>();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void writeTo(Serializer output, AttributeType type, List<T> list, MarshalerContext context)
|
||||
throws IOException {
|
||||
switch (type) {
|
||||
case STRING_ARRAY:
|
||||
output.serializeRepeatedMessageWithContext(
|
||||
ArrayValue.VALUES,
|
||||
(List<String>) list,
|
||||
StringAnyValueStatelessMarshaler.INSTANCE,
|
||||
context);
|
||||
return;
|
||||
case LONG_ARRAY:
|
||||
output.serializeRepeatedMessageWithContext(
|
||||
ArrayValue.VALUES, (List<Long>) list, IntAnyValueStatelessMarshaler.INSTANCE, context);
|
||||
return;
|
||||
case BOOLEAN_ARRAY:
|
||||
output.serializeRepeatedMessageWithContext(
|
||||
ArrayValue.VALUES,
|
||||
(List<Boolean>) list,
|
||||
BoolAnyValueStatelessMarshaler.INSTANCE,
|
||||
context);
|
||||
return;
|
||||
case DOUBLE_ARRAY:
|
||||
output.serializeRepeatedMessageWithContext(
|
||||
ArrayValue.VALUES,
|
||||
(List<Double>) list,
|
||||
DoubleAnyValueStatelessMarshaler.INSTANCE,
|
||||
context);
|
||||
return;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unsupported attribute type.");
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public int getBinarySerializedSize(AttributeType type, List<T> list, MarshalerContext context) {
|
||||
switch (type) {
|
||||
case STRING_ARRAY:
|
||||
return StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
|
||||
ArrayValue.VALUES,
|
||||
(List<String>) list,
|
||||
StringAnyValueStatelessMarshaler.INSTANCE,
|
||||
context);
|
||||
case LONG_ARRAY:
|
||||
return StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
|
||||
ArrayValue.VALUES, (List<Long>) list, IntAnyValueStatelessMarshaler.INSTANCE, context);
|
||||
case BOOLEAN_ARRAY:
|
||||
return StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
|
||||
ArrayValue.VALUES,
|
||||
(List<Boolean>) list,
|
||||
BoolAnyValueStatelessMarshaler.INSTANCE,
|
||||
context);
|
||||
case DOUBLE_ARRAY:
|
||||
return StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
|
||||
ArrayValue.VALUES,
|
||||
(List<Double>) list,
|
||||
DoubleAnyValueStatelessMarshaler.INSTANCE,
|
||||
context);
|
||||
default:
|
||||
throw new IllegalArgumentException("Unsupported attribute type.");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.otlp;
|
||||
|
||||
import io.opentelemetry.exporter.internal.marshal.CodedOutputStream;
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
|
||||
import io.opentelemetry.exporter.internal.marshal.Serializer;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler;
|
||||
import io.opentelemetry.proto.common.v1.internal.AnyValue;
|
||||
import java.io.IOException;
|
||||
|
||||
/** See {@link BoolAnyValueMarshaler}. */
|
||||
final class BoolAnyValueStatelessMarshaler implements StatelessMarshaler<Boolean> {
|
||||
static final BoolAnyValueStatelessMarshaler INSTANCE = new BoolAnyValueStatelessMarshaler();
|
||||
|
||||
@Override
|
||||
public void writeTo(Serializer output, Boolean value, MarshalerContext context)
|
||||
throws IOException {
|
||||
output.writeBool(AnyValue.BOOL_VALUE, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBinarySerializedSize(Boolean value, MarshalerContext context) {
|
||||
return AnyValue.BOOL_VALUE.getTagSize() + CodedOutputStream.computeBoolSizeNoTag(value);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.otlp;
|
||||
|
||||
import io.opentelemetry.exporter.internal.marshal.CodedOutputStream;
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
|
||||
import io.opentelemetry.exporter.internal.marshal.Serializer;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler;
|
||||
import io.opentelemetry.proto.common.v1.internal.AnyValue;
|
||||
import java.io.IOException;
|
||||
|
||||
/** See {@link DoubleAnyValueMarshaler}. */
|
||||
final class DoubleAnyValueStatelessMarshaler implements StatelessMarshaler<Double> {
|
||||
static final DoubleAnyValueStatelessMarshaler INSTANCE = new DoubleAnyValueStatelessMarshaler();
|
||||
|
||||
@Override
|
||||
public void writeTo(Serializer output, Double value, MarshalerContext context)
|
||||
throws IOException {
|
||||
output.writeDouble(AnyValue.DOUBLE_VALUE, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBinarySerializedSize(Double value, MarshalerContext context) {
|
||||
return AnyValue.DOUBLE_VALUE.getTagSize() + CodedOutputStream.computeDoubleSizeNoTag(value);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.otlp;
|
||||
|
||||
import io.opentelemetry.exporter.internal.marshal.CodedOutputStream;
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
|
||||
import io.opentelemetry.exporter.internal.marshal.Serializer;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler;
|
||||
import io.opentelemetry.proto.common.v1.internal.AnyValue;
|
||||
import java.io.IOException;
|
||||
|
||||
/** See {@link IntAnyValueMarshaler}. */
|
||||
final class IntAnyValueStatelessMarshaler implements StatelessMarshaler<Long> {
|
||||
static final IntAnyValueStatelessMarshaler INSTANCE = new IntAnyValueStatelessMarshaler();
|
||||
|
||||
@Override
|
||||
public void writeTo(Serializer output, Long value, MarshalerContext context) throws IOException {
|
||||
output.writeInt64(AnyValue.INT_VALUE, value);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBinarySerializedSize(Long value, MarshalerContext context) {
|
||||
return AnyValue.INT_VALUE.getTagSize() + CodedOutputStream.computeInt64SizeNoTag(value);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,142 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.otlp;
|
||||
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.common.AttributeType;
|
||||
import io.opentelemetry.api.internal.InternalAttributeKeyImpl;
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerUtil;
|
||||
import io.opentelemetry.exporter.internal.marshal.Serializer;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler2;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil;
|
||||
import io.opentelemetry.proto.common.v1.internal.AnyValue;
|
||||
import io.opentelemetry.proto.common.v1.internal.KeyValue;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A Marshaler of key value pairs. See {@link KeyValueMarshaler}.
|
||||
*
|
||||
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
|
||||
* at any time.
|
||||
*/
|
||||
public final class KeyValueStatelessMarshaler
|
||||
implements StatelessMarshaler2<AttributeKey<?>, Object> {
|
||||
public static final KeyValueStatelessMarshaler INSTANCE = new KeyValueStatelessMarshaler();
|
||||
private static final byte[] EMPTY_BYTES = new byte[0];
|
||||
|
||||
@Override
|
||||
public void writeTo(
|
||||
Serializer output, AttributeKey<?> attributeKey, Object value, MarshalerContext context)
|
||||
throws IOException {
|
||||
if (attributeKey.getKey().isEmpty()) {
|
||||
output.serializeString(KeyValue.KEY, EMPTY_BYTES);
|
||||
} else if (attributeKey instanceof InternalAttributeKeyImpl) {
|
||||
byte[] keyUtf8 = ((InternalAttributeKeyImpl<?>) attributeKey).getKeyUtf8();
|
||||
output.serializeString(KeyValue.KEY, keyUtf8);
|
||||
} else {
|
||||
output.serializeStringWithContext(KeyValue.KEY, attributeKey.getKey(), context);
|
||||
}
|
||||
output.serializeMessageWithContext(
|
||||
KeyValue.VALUE, attributeKey, value, ValueStatelessMarshaler.INSTANCE, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBinarySerializedSize(
|
||||
AttributeKey<?> attributeKey, Object value, MarshalerContext context) {
|
||||
int size = 0;
|
||||
if (!attributeKey.getKey().isEmpty()) {
|
||||
if (attributeKey instanceof InternalAttributeKeyImpl) {
|
||||
byte[] keyUtf8 = ((InternalAttributeKeyImpl<?>) attributeKey).getKeyUtf8();
|
||||
size += MarshalerUtil.sizeBytes(KeyValue.KEY, keyUtf8);
|
||||
} else {
|
||||
return StatelessMarshalerUtil.sizeStringWithContext(
|
||||
KeyValue.KEY, attributeKey.getKey(), context);
|
||||
}
|
||||
}
|
||||
size +=
|
||||
StatelessMarshalerUtil.sizeMessageWithContext(
|
||||
KeyValue.VALUE, attributeKey, value, ValueStatelessMarshaler.INSTANCE, context);
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
private static class ValueStatelessMarshaler
|
||||
implements StatelessMarshaler2<AttributeKey<?>, Object> {
|
||||
static final ValueStatelessMarshaler INSTANCE = new ValueStatelessMarshaler();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public int getBinarySerializedSize(
|
||||
AttributeKey<?> attributeKey, Object value, MarshalerContext context) {
|
||||
AttributeType attributeType = attributeKey.getType();
|
||||
switch (attributeType) {
|
||||
case STRING:
|
||||
return StringAnyValueStatelessMarshaler.INSTANCE.getBinarySerializedSize(
|
||||
(String) value, context);
|
||||
case LONG:
|
||||
return IntAnyValueStatelessMarshaler.INSTANCE.getBinarySerializedSize(
|
||||
(Long) value, context);
|
||||
case BOOLEAN:
|
||||
return BoolAnyValueStatelessMarshaler.INSTANCE.getBinarySerializedSize(
|
||||
(Boolean) value, context);
|
||||
case DOUBLE:
|
||||
return DoubleAnyValueStatelessMarshaler.INSTANCE.getBinarySerializedSize(
|
||||
(Double) value, context);
|
||||
case STRING_ARRAY:
|
||||
case LONG_ARRAY:
|
||||
case BOOLEAN_ARRAY:
|
||||
case DOUBLE_ARRAY:
|
||||
return StatelessMarshalerUtil.sizeMessageWithContext(
|
||||
AnyValue.ARRAY_VALUE,
|
||||
attributeType,
|
||||
(List<Object>) value,
|
||||
ArrayAnyValueStatelessMarshaler.INSTANCE,
|
||||
context);
|
||||
}
|
||||
// Error prone ensures the switch statement is complete, otherwise only can happen with
|
||||
// unaligned versions which are not supported.
|
||||
throw new IllegalArgumentException("Unsupported attribute type.");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void writeTo(
|
||||
Serializer output, AttributeKey<?> attributeKey, Object value, MarshalerContext context)
|
||||
throws IOException {
|
||||
AttributeType attributeType = attributeKey.getType();
|
||||
switch (attributeType) {
|
||||
case STRING:
|
||||
StringAnyValueStatelessMarshaler.INSTANCE.writeTo(output, (String) value, context);
|
||||
return;
|
||||
case LONG:
|
||||
IntAnyValueStatelessMarshaler.INSTANCE.writeTo(output, (Long) value, context);
|
||||
return;
|
||||
case BOOLEAN:
|
||||
BoolAnyValueStatelessMarshaler.INSTANCE.writeTo(output, (Boolean) value, context);
|
||||
return;
|
||||
case DOUBLE:
|
||||
DoubleAnyValueStatelessMarshaler.INSTANCE.writeTo(output, (Double) value, context);
|
||||
return;
|
||||
case STRING_ARRAY:
|
||||
case LONG_ARRAY:
|
||||
case BOOLEAN_ARRAY:
|
||||
case DOUBLE_ARRAY:
|
||||
output.serializeMessageWithContext(
|
||||
AnyValue.ARRAY_VALUE,
|
||||
attributeType,
|
||||
(List<Object>) value,
|
||||
ArrayAnyValueStatelessMarshaler.INSTANCE,
|
||||
context);
|
||||
return;
|
||||
}
|
||||
// Error prone ensures the switch statement is complete, otherwise only can happen with
|
||||
// unaligned versions which are not supported.
|
||||
throw new IllegalArgumentException("Unsupported attribute type.");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.otlp;
|
||||
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
|
||||
import io.opentelemetry.exporter.internal.marshal.Serializer;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil;
|
||||
import io.opentelemetry.proto.common.v1.internal.AnyValue;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* A Marshaler of string-valued {@link AnyValue}. See {@link StringAnyValueMarshaler}.
|
||||
*
|
||||
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
|
||||
* at any time.
|
||||
*/
|
||||
final class StringAnyValueStatelessMarshaler implements StatelessMarshaler<String> {
|
||||
static final StringAnyValueStatelessMarshaler INSTANCE = new StringAnyValueStatelessMarshaler();
|
||||
|
||||
@Override
|
||||
public void writeTo(Serializer output, String value, MarshalerContext context)
|
||||
throws IOException {
|
||||
output.serializeStringWithContext(AnyValue.STRING_VALUE, value, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBinarySerializedSize(String value, MarshalerContext context) {
|
||||
return StatelessMarshalerUtil.sizeStringWithContext(AnyValue.STRING_VALUE, value, context);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,63 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.otlp.traces;
|
||||
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerUtil;
|
||||
import io.opentelemetry.exporter.internal.marshal.Serializer;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler2;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil;
|
||||
import io.opentelemetry.exporter.internal.otlp.InstrumentationScopeMarshaler;
|
||||
import io.opentelemetry.proto.trace.v1.internal.ScopeSpans;
|
||||
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/** See {@link InstrumentationScopeSpansMarshaler}. */
|
||||
final class InstrumentationScopeSpansStatelessMarshaler
|
||||
implements StatelessMarshaler2<InstrumentationScopeInfo, List<SpanData>> {
|
||||
static final InstrumentationScopeSpansStatelessMarshaler INSTANCE =
|
||||
new InstrumentationScopeSpansStatelessMarshaler();
|
||||
|
||||
@Override
|
||||
public void writeTo(
|
||||
Serializer output,
|
||||
InstrumentationScopeInfo instrumentationScope,
|
||||
List<SpanData> spans,
|
||||
MarshalerContext context)
|
||||
throws IOException {
|
||||
InstrumentationScopeMarshaler instrumentationScopeMarshaler =
|
||||
context.getData(InstrumentationScopeMarshaler.class);
|
||||
|
||||
output.serializeMessage(ScopeSpans.SCOPE, instrumentationScopeMarshaler);
|
||||
output.serializeRepeatedMessageWithContext(
|
||||
ScopeSpans.SPANS, spans, SpanStatelessMarshaler.INSTANCE, context);
|
||||
output.serializeStringWithContext(
|
||||
ScopeSpans.SCHEMA_URL, instrumentationScope.getSchemaUrl(), context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBinarySerializedSize(
|
||||
InstrumentationScopeInfo instrumentationScope,
|
||||
List<SpanData> spans,
|
||||
MarshalerContext context) {
|
||||
InstrumentationScopeMarshaler instrumentationScopeMarshaler =
|
||||
InstrumentationScopeMarshaler.create(instrumentationScope);
|
||||
context.addData(instrumentationScopeMarshaler);
|
||||
|
||||
int size = 0;
|
||||
size += MarshalerUtil.sizeMessage(ScopeSpans.SCOPE, instrumentationScopeMarshaler);
|
||||
size +=
|
||||
StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
|
||||
ScopeSpans.SPANS, spans, SpanStatelessMarshaler.INSTANCE, context);
|
||||
size +=
|
||||
StatelessMarshalerUtil.sizeStringWithContext(
|
||||
ScopeSpans.SCHEMA_URL, instrumentationScope.getSchemaUrl(), context);
|
||||
|
||||
return size;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,107 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.otlp.traces;
|
||||
|
||||
import io.opentelemetry.exporter.internal.marshal.Marshaler;
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
|
||||
import io.opentelemetry.exporter.internal.marshal.Serializer;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil;
|
||||
import io.opentelemetry.proto.collector.trace.v1.internal.ExportTraceServiceRequest;
|
||||
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
|
||||
import io.opentelemetry.sdk.resources.Resource;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* {@link Marshaler} to convert SDK {@link SpanData} to OTLP ExportTraceServiceRequest. See {@link
|
||||
* TraceRequestMarshaler}.
|
||||
*
|
||||
* <p>Example usage:
|
||||
*
|
||||
* <pre>{@code
|
||||
* void marshal(LowAllocationTraceRequestMarshaler requestMarshaler, OutputStream output,
|
||||
* List<SpanData> spanList) throws IOException {
|
||||
* requestMarshaler.initialize(spanList);
|
||||
* try {
|
||||
* requestMarshaler.writeBinaryTo(output);
|
||||
* } finally {
|
||||
* requestMarshaler.reset();
|
||||
* }
|
||||
* }
|
||||
* }</pre>
|
||||
*
|
||||
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
|
||||
* at any time.
|
||||
*/
|
||||
public final class LowAllocationTraceRequestMarshaler extends Marshaler {
|
||||
private static final MarshalerContext.Key RESOURCE_SPAN_SIZE_CALCULATOR_KEY =
|
||||
MarshalerContext.key();
|
||||
private static final MarshalerContext.Key RESOURCE_SPAN_WRITER_KEY = MarshalerContext.key();
|
||||
|
||||
private final MarshalerContext context = new MarshalerContext();
|
||||
|
||||
@SuppressWarnings("NullAway")
|
||||
private Map<Resource, Map<InstrumentationScopeInfo, List<SpanData>>> resourceAndScopeMap;
|
||||
|
||||
private int size;
|
||||
|
||||
public void initialize(Collection<SpanData> spanDataList) {
|
||||
resourceAndScopeMap = groupByResourceAndScope(context, spanDataList);
|
||||
size = calculateSize(context, resourceAndScopeMap);
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
context.reset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBinarySerializedSize() {
|
||||
return size;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(Serializer output) throws IOException {
|
||||
// serializing can be retried, reset the indexes, so we could call writeTo multiple times
|
||||
context.resetReadIndex();
|
||||
output.serializeRepeatedMessageWithContext(
|
||||
ExportTraceServiceRequest.RESOURCE_SPANS,
|
||||
resourceAndScopeMap,
|
||||
ResourceSpansStatelessMarshaler.INSTANCE,
|
||||
context,
|
||||
RESOURCE_SPAN_WRITER_KEY);
|
||||
}
|
||||
|
||||
private static int calculateSize(
|
||||
MarshalerContext context,
|
||||
Map<Resource, Map<InstrumentationScopeInfo, List<SpanData>>> resourceAndScopeMap) {
|
||||
return StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
|
||||
ExportTraceServiceRequest.RESOURCE_SPANS,
|
||||
resourceAndScopeMap,
|
||||
ResourceSpansStatelessMarshaler.INSTANCE,
|
||||
context,
|
||||
RESOURCE_SPAN_SIZE_CALCULATOR_KEY);
|
||||
}
|
||||
|
||||
private static Map<Resource, Map<InstrumentationScopeInfo, List<SpanData>>>
|
||||
groupByResourceAndScope(MarshalerContext context, Collection<SpanData> spanDataList) {
|
||||
|
||||
if (spanDataList.isEmpty()) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
|
||||
return StatelessMarshalerUtil.groupByResourceAndScope(
|
||||
spanDataList,
|
||||
// TODO(anuraaga): Replace with an internal SdkData type of interface that exposes these
|
||||
// two.
|
||||
SpanData::getResource,
|
||||
SpanData::getInstrumentationScopeInfo,
|
||||
context);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.otlp.traces;
|
||||
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerUtil;
|
||||
import io.opentelemetry.exporter.internal.marshal.Serializer;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler2;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil;
|
||||
import io.opentelemetry.exporter.internal.otlp.ResourceMarshaler;
|
||||
import io.opentelemetry.proto.trace.v1.internal.ResourceSpans;
|
||||
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
|
||||
import io.opentelemetry.sdk.resources.Resource;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A Marshaler of ResourceSpans. See {@link ResourceSpansMarshaler}.
|
||||
*
|
||||
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
|
||||
* at any time.
|
||||
*/
|
||||
public final class ResourceSpansStatelessMarshaler
|
||||
implements StatelessMarshaler2<Resource, Map<InstrumentationScopeInfo, List<SpanData>>> {
|
||||
static final ResourceSpansStatelessMarshaler INSTANCE = new ResourceSpansStatelessMarshaler();
|
||||
private static final MarshalerContext.Key SCOPE_SPAN_WRITER_KEY = MarshalerContext.key();
|
||||
private static final MarshalerContext.Key SCOPE_SPAN_SIZE_CALCULATOR_KEY = MarshalerContext.key();
|
||||
|
||||
@Override
|
||||
public void writeTo(
|
||||
Serializer output,
|
||||
Resource resource,
|
||||
Map<InstrumentationScopeInfo, List<SpanData>> scopeMap,
|
||||
MarshalerContext context)
|
||||
throws IOException {
|
||||
ResourceMarshaler resourceMarshaler = context.getData(ResourceMarshaler.class);
|
||||
output.serializeMessage(ResourceSpans.RESOURCE, resourceMarshaler);
|
||||
|
||||
output.serializeRepeatedMessageWithContext(
|
||||
ResourceSpans.SCOPE_SPANS,
|
||||
scopeMap,
|
||||
InstrumentationScopeSpansStatelessMarshaler.INSTANCE,
|
||||
context,
|
||||
SCOPE_SPAN_WRITER_KEY);
|
||||
|
||||
output.serializeStringWithContext(ResourceSpans.SCHEMA_URL, resource.getSchemaUrl(), context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBinarySerializedSize(
|
||||
Resource resource,
|
||||
Map<InstrumentationScopeInfo, List<SpanData>> scopeMap,
|
||||
MarshalerContext context) {
|
||||
|
||||
int size = 0;
|
||||
|
||||
ResourceMarshaler resourceMarshaler = ResourceMarshaler.create(resource);
|
||||
context.addData(resourceMarshaler);
|
||||
size += MarshalerUtil.sizeMessage(ResourceSpans.RESOURCE, resourceMarshaler);
|
||||
|
||||
size +=
|
||||
StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
|
||||
ResourceSpans.SCOPE_SPANS,
|
||||
scopeMap,
|
||||
InstrumentationScopeSpansStatelessMarshaler.INSTANCE,
|
||||
context,
|
||||
SCOPE_SPAN_SIZE_CALCULATOR_KEY);
|
||||
|
||||
size +=
|
||||
StatelessMarshalerUtil.sizeStringWithContext(
|
||||
ResourceSpans.SCHEMA_URL, resource.getSchemaUrl(), context);
|
||||
|
||||
return size;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.otlp.traces;
|
||||
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerUtil;
|
||||
import io.opentelemetry.exporter.internal.marshal.Serializer;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil;
|
||||
import io.opentelemetry.exporter.internal.otlp.KeyValueStatelessMarshaler;
|
||||
import io.opentelemetry.proto.trace.v1.internal.Span;
|
||||
import io.opentelemetry.sdk.trace.data.EventData;
|
||||
import java.io.IOException;
|
||||
|
||||
/** See {@link SpanEventMarshaler}. */
|
||||
final class SpanEventStatelessMarshaler implements StatelessMarshaler<EventData> {
|
||||
static final SpanEventStatelessMarshaler INSTANCE = new SpanEventStatelessMarshaler();
|
||||
|
||||
@Override
|
||||
public void writeTo(Serializer output, EventData event, MarshalerContext context)
|
||||
throws IOException {
|
||||
output.serializeFixed64(Span.Event.TIME_UNIX_NANO, event.getEpochNanos());
|
||||
output.serializeStringWithContext(Span.Event.NAME, event.getName(), context);
|
||||
output.serializeRepeatedMessageWithContext(
|
||||
Span.Event.ATTRIBUTES, event.getAttributes(), KeyValueStatelessMarshaler.INSTANCE, context);
|
||||
int droppedAttributesCount = event.getTotalAttributeCount() - event.getAttributes().size();
|
||||
output.serializeUInt32(Span.Event.DROPPED_ATTRIBUTES_COUNT, droppedAttributesCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBinarySerializedSize(EventData event, MarshalerContext context) {
|
||||
int size = 0;
|
||||
size += MarshalerUtil.sizeFixed64(Span.Event.TIME_UNIX_NANO, event.getEpochNanos());
|
||||
size += StatelessMarshalerUtil.sizeStringWithContext(Span.Event.NAME, event.getName(), context);
|
||||
size +=
|
||||
StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
|
||||
Span.Event.ATTRIBUTES,
|
||||
event.getAttributes(),
|
||||
KeyValueStatelessMarshaler.INSTANCE,
|
||||
context);
|
||||
int droppedAttributesCount = event.getTotalAttributeCount() - event.getAttributes().size();
|
||||
size += MarshalerUtil.sizeUInt32(Span.Event.DROPPED_ATTRIBUTES_COUNT, droppedAttributesCount);
|
||||
|
||||
return size;
|
||||
}
|
||||
}
|
|
@ -5,7 +5,7 @@
|
|||
|
||||
package io.opentelemetry.exporter.internal.otlp.traces;
|
||||
|
||||
import static io.opentelemetry.api.trace.propagation.internal.W3CTraceContextEncoding.encodeTraceState;
|
||||
import static io.opentelemetry.exporter.internal.otlp.traces.SpanMarshaler.encodeTraceState;
|
||||
|
||||
import io.opentelemetry.api.trace.TraceFlags;
|
||||
import io.opentelemetry.api.trace.TraceState;
|
||||
|
@ -16,12 +16,11 @@ import io.opentelemetry.exporter.internal.otlp.KeyValueMarshaler;
|
|||
import io.opentelemetry.proto.trace.v1.internal.Span;
|
||||
import io.opentelemetry.sdk.trace.data.LinkData;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.List;
|
||||
|
||||
final class SpanLinkMarshaler extends MarshalerWithSize {
|
||||
private static final SpanLinkMarshaler[] EMPTY = new SpanLinkMarshaler[0];
|
||||
private static final byte[] EMPTY_BYTES = new byte[0];
|
||||
|
||||
private final String traceId;
|
||||
private final String spanId;
|
||||
private final byte[] traceStateUtf8;
|
||||
|
@ -46,11 +45,7 @@ final class SpanLinkMarshaler extends MarshalerWithSize {
|
|||
|
||||
// Visible for testing
|
||||
static SpanLinkMarshaler create(LinkData link) {
|
||||
TraceState traceState = link.getSpanContext().getTraceState();
|
||||
byte[] traceStateUtf8 =
|
||||
traceState.isEmpty()
|
||||
? EMPTY_BYTES
|
||||
: encodeTraceState(traceState).getBytes(StandardCharsets.UTF_8);
|
||||
byte[] traceStateUtf8 = encodeSpanLinkTraceState(link);
|
||||
|
||||
return new SpanLinkMarshaler(
|
||||
link.getSpanContext().getTraceId(),
|
||||
|
@ -118,4 +113,9 @@ final class SpanLinkMarshaler extends MarshalerWithSize {
|
|||
Span.Link.FLAGS, SpanFlags.withParentIsRemoteFlags(flags, isLinkContextRemote));
|
||||
return size;
|
||||
}
|
||||
|
||||
static byte[] encodeSpanLinkTraceState(LinkData link) {
|
||||
TraceState traceState = link.getSpanContext().getTraceState();
|
||||
return encodeTraceState(traceState);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.otlp.traces;
|
||||
|
||||
import static io.opentelemetry.exporter.internal.otlp.traces.SpanLinkMarshaler.encodeSpanLinkTraceState;
|
||||
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerUtil;
|
||||
import io.opentelemetry.exporter.internal.marshal.Serializer;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil;
|
||||
import io.opentelemetry.exporter.internal.otlp.KeyValueStatelessMarshaler;
|
||||
import io.opentelemetry.proto.trace.v1.internal.Span;
|
||||
import io.opentelemetry.sdk.trace.data.LinkData;
|
||||
import java.io.IOException;
|
||||
|
||||
/** See {@link SpanLinkMarshaler}. */
|
||||
final class SpanLinkStatelessMarshaler implements StatelessMarshaler<LinkData> {
|
||||
static final SpanLinkStatelessMarshaler INSTANCE = new SpanLinkStatelessMarshaler();
|
||||
|
||||
@Override
|
||||
public void writeTo(Serializer output, LinkData link, MarshalerContext context)
|
||||
throws IOException {
|
||||
output.serializeTraceId(Span.Link.TRACE_ID, link.getSpanContext().getTraceId(), context);
|
||||
output.serializeSpanId(Span.Link.SPAN_ID, link.getSpanContext().getSpanId(), context);
|
||||
output.serializeString(Span.Link.TRACE_STATE, context.getData(byte[].class));
|
||||
output.serializeRepeatedMessageWithContext(
|
||||
Span.Link.ATTRIBUTES, link.getAttributes(), KeyValueStatelessMarshaler.INSTANCE, context);
|
||||
int droppedAttributesCount = link.getTotalAttributeCount() - link.getAttributes().size();
|
||||
output.serializeUInt32(Span.Link.DROPPED_ATTRIBUTES_COUNT, droppedAttributesCount);
|
||||
output.serializeFixed32(
|
||||
Span.Link.FLAGS,
|
||||
SpanFlags.withParentIsRemoteFlags(
|
||||
link.getSpanContext().getTraceFlags(), link.getSpanContext().isRemote()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBinarySerializedSize(LinkData link, MarshalerContext context) {
|
||||
byte[] traceStateUtf8 = encodeSpanLinkTraceState(link);
|
||||
context.addData(traceStateUtf8);
|
||||
|
||||
int size = 0;
|
||||
size += MarshalerUtil.sizeTraceId(Span.Link.TRACE_ID, link.getSpanContext().getTraceId());
|
||||
size += MarshalerUtil.sizeSpanId(Span.Link.SPAN_ID, link.getSpanContext().getSpanId());
|
||||
size += MarshalerUtil.sizeBytes(Span.Link.TRACE_STATE, traceStateUtf8);
|
||||
size +=
|
||||
StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
|
||||
Span.Link.ATTRIBUTES,
|
||||
link.getAttributes(),
|
||||
KeyValueStatelessMarshaler.INSTANCE,
|
||||
context);
|
||||
int droppedAttributesCount = link.getTotalAttributeCount() - link.getAttributes().size();
|
||||
size += MarshalerUtil.sizeUInt32(Span.Link.DROPPED_ATTRIBUTES_COUNT, droppedAttributesCount);
|
||||
size +=
|
||||
MarshalerUtil.sizeFixed32(
|
||||
Span.Link.FLAGS,
|
||||
SpanFlags.withParentIsRemoteFlags(
|
||||
link.getSpanContext().getTraceFlags(), link.getSpanContext().isRemote()));
|
||||
|
||||
return size;
|
||||
}
|
||||
}
|
|
@ -5,11 +5,10 @@
|
|||
|
||||
package io.opentelemetry.exporter.internal.otlp.traces;
|
||||
|
||||
import static io.opentelemetry.api.trace.propagation.internal.W3CTraceContextEncoding.encodeTraceState;
|
||||
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.api.trace.TraceFlags;
|
||||
import io.opentelemetry.api.trace.TraceState;
|
||||
import io.opentelemetry.api.trace.propagation.internal.W3CTraceContextEncoding;
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerUtil;
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize;
|
||||
import io.opentelemetry.exporter.internal.marshal.ProtoEnumInfo;
|
||||
|
@ -54,11 +53,7 @@ final class SpanMarshaler extends MarshalerWithSize {
|
|||
? spanData.getParentSpanContext().getSpanId()
|
||||
: null;
|
||||
|
||||
TraceState traceState = spanData.getSpanContext().getTraceState();
|
||||
byte[] traceStateUtf8 =
|
||||
traceState.isEmpty()
|
||||
? EMPTY_BYTES
|
||||
: encodeTraceState(traceState).getBytes(StandardCharsets.UTF_8);
|
||||
byte[] traceStateUtf8 = encodeSpanTraceState(spanData);
|
||||
|
||||
return new SpanMarshaler(
|
||||
spanData.getSpanContext().getTraceId(),
|
||||
|
@ -226,4 +221,15 @@ final class SpanMarshaler extends MarshalerWithSize {
|
|||
// NB: Should not be possible with aligned versions.
|
||||
return Span.SpanKind.SPAN_KIND_UNSPECIFIED;
|
||||
}
|
||||
|
||||
static byte[] encodeSpanTraceState(SpanData span) {
|
||||
TraceState traceState = span.getSpanContext().getTraceState();
|
||||
return encodeTraceState(traceState);
|
||||
}
|
||||
|
||||
static byte[] encodeTraceState(TraceState traceState) {
|
||||
return traceState.isEmpty()
|
||||
? EMPTY_BYTES
|
||||
: W3CTraceContextEncoding.encodeTraceState(traceState).getBytes(StandardCharsets.UTF_8);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,117 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.otlp.traces;
|
||||
|
||||
import static io.opentelemetry.exporter.internal.otlp.traces.SpanMarshaler.encodeSpanTraceState;
|
||||
import static io.opentelemetry.exporter.internal.otlp.traces.SpanMarshaler.toProtoSpanKind;
|
||||
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerUtil;
|
||||
import io.opentelemetry.exporter.internal.marshal.Serializer;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshalerUtil;
|
||||
import io.opentelemetry.exporter.internal.otlp.KeyValueStatelessMarshaler;
|
||||
import io.opentelemetry.proto.trace.v1.internal.Span;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import java.io.IOException;
|
||||
|
||||
/** See {@link SpanMarshaler}. */
|
||||
final class SpanStatelessMarshaler implements StatelessMarshaler<SpanData> {
|
||||
static final SpanStatelessMarshaler INSTANCE = new SpanStatelessMarshaler();
|
||||
|
||||
@Override
|
||||
public void writeTo(Serializer output, SpanData span, MarshalerContext context)
|
||||
throws IOException {
|
||||
output.serializeTraceId(Span.TRACE_ID, span.getTraceId(), context);
|
||||
output.serializeSpanId(Span.SPAN_ID, span.getSpanId(), context);
|
||||
|
||||
byte[] traceStateUtf8 = context.getData(byte[].class);
|
||||
output.serializeString(Span.TRACE_STATE, traceStateUtf8);
|
||||
String parentSpanId =
|
||||
span.getParentSpanContext().isValid() ? span.getParentSpanContext().getSpanId() : null;
|
||||
output.serializeSpanId(Span.PARENT_SPAN_ID, parentSpanId, context);
|
||||
|
||||
output.serializeStringWithContext(Span.NAME, span.getName(), context);
|
||||
output.serializeEnum(Span.KIND, toProtoSpanKind(span.getKind()));
|
||||
|
||||
output.serializeFixed64(Span.START_TIME_UNIX_NANO, span.getStartEpochNanos());
|
||||
output.serializeFixed64(Span.END_TIME_UNIX_NANO, span.getEndEpochNanos());
|
||||
|
||||
output.serializeRepeatedMessageWithContext(
|
||||
Span.ATTRIBUTES, span.getAttributes(), KeyValueStatelessMarshaler.INSTANCE, context);
|
||||
int droppedAttributesCount = span.getTotalAttributeCount() - span.getAttributes().size();
|
||||
output.serializeUInt32(Span.DROPPED_ATTRIBUTES_COUNT, droppedAttributesCount);
|
||||
|
||||
output.serializeRepeatedMessageWithContext(
|
||||
Span.EVENTS, span.getEvents(), SpanEventStatelessMarshaler.INSTANCE, context);
|
||||
int droppedEventsCount = span.getTotalRecordedEvents() - span.getEvents().size();
|
||||
output.serializeUInt32(Span.DROPPED_EVENTS_COUNT, droppedEventsCount);
|
||||
|
||||
output.serializeRepeatedMessageWithContext(
|
||||
Span.LINKS, span.getLinks(), SpanLinkStatelessMarshaler.INSTANCE, context);
|
||||
int droppedLinksCount = span.getTotalRecordedLinks() - span.getLinks().size();
|
||||
output.serializeUInt32(Span.DROPPED_LINKS_COUNT, droppedLinksCount);
|
||||
|
||||
output.serializeMessageWithContext(
|
||||
Span.STATUS, span.getStatus(), SpanStatusStatelessMarshaler.INSTANCE, context);
|
||||
|
||||
output.serializeFixed32(
|
||||
Span.FLAGS,
|
||||
SpanFlags.withParentIsRemoteFlags(
|
||||
span.getSpanContext().getTraceFlags(), span.getParentSpanContext().isRemote()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBinarySerializedSize(SpanData span, MarshalerContext context) {
|
||||
int size = 0;
|
||||
size += MarshalerUtil.sizeTraceId(Span.TRACE_ID, span.getTraceId());
|
||||
size += MarshalerUtil.sizeSpanId(Span.SPAN_ID, span.getSpanId());
|
||||
|
||||
byte[] traceStateUtf8 = encodeSpanTraceState(span);
|
||||
context.addData(traceStateUtf8);
|
||||
|
||||
size += MarshalerUtil.sizeBytes(Span.TRACE_STATE, traceStateUtf8);
|
||||
String parentSpanId =
|
||||
span.getParentSpanContext().isValid() ? span.getParentSpanContext().getSpanId() : null;
|
||||
size += MarshalerUtil.sizeSpanId(Span.PARENT_SPAN_ID, parentSpanId);
|
||||
|
||||
size += StatelessMarshalerUtil.sizeStringWithContext(Span.NAME, span.getName(), context);
|
||||
size += MarshalerUtil.sizeEnum(Span.KIND, toProtoSpanKind(span.getKind()));
|
||||
|
||||
size += MarshalerUtil.sizeFixed64(Span.START_TIME_UNIX_NANO, span.getStartEpochNanos());
|
||||
size += MarshalerUtil.sizeFixed64(Span.END_TIME_UNIX_NANO, span.getEndEpochNanos());
|
||||
|
||||
size +=
|
||||
StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
|
||||
Span.ATTRIBUTES, span.getAttributes(), KeyValueStatelessMarshaler.INSTANCE, context);
|
||||
int droppedAttributesCount = span.getTotalAttributeCount() - span.getAttributes().size();
|
||||
size += MarshalerUtil.sizeUInt32(Span.DROPPED_ATTRIBUTES_COUNT, droppedAttributesCount);
|
||||
|
||||
size +=
|
||||
StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
|
||||
Span.EVENTS, span.getEvents(), SpanEventStatelessMarshaler.INSTANCE, context);
|
||||
int droppedEventsCount = span.getTotalRecordedEvents() - span.getEvents().size();
|
||||
size += MarshalerUtil.sizeUInt32(Span.DROPPED_EVENTS_COUNT, droppedEventsCount);
|
||||
|
||||
size +=
|
||||
StatelessMarshalerUtil.sizeRepeatedMessageWithContext(
|
||||
Span.LINKS, span.getLinks(), SpanLinkStatelessMarshaler.INSTANCE, context);
|
||||
int droppedLinksCount = span.getTotalRecordedLinks() - span.getLinks().size();
|
||||
size += MarshalerUtil.sizeUInt32(Span.DROPPED_LINKS_COUNT, droppedLinksCount);
|
||||
|
||||
size +=
|
||||
StatelessMarshalerUtil.sizeMessageWithContext(
|
||||
Span.STATUS, span.getStatus(), SpanStatusStatelessMarshaler.INSTANCE, context);
|
||||
|
||||
size +=
|
||||
MarshalerUtil.sizeFixed32(
|
||||
Span.FLAGS,
|
||||
SpanFlags.withParentIsRemoteFlags(
|
||||
span.getSpanContext().getTraceFlags(), span.getParentSpanContext().isRemote()));
|
||||
|
||||
return size;
|
||||
}
|
||||
}
|
|
@ -19,12 +19,7 @@ final class SpanStatusMarshaler extends MarshalerWithSize {
|
|||
private final byte[] descriptionUtf8;
|
||||
|
||||
static SpanStatusMarshaler create(StatusData status) {
|
||||
ProtoEnumInfo protoStatusCode = Status.StatusCode.STATUS_CODE_UNSET;
|
||||
if (status.getStatusCode() == StatusCode.OK) {
|
||||
protoStatusCode = Status.StatusCode.STATUS_CODE_OK;
|
||||
} else if (status.getStatusCode() == StatusCode.ERROR) {
|
||||
protoStatusCode = Status.StatusCode.STATUS_CODE_ERROR;
|
||||
}
|
||||
ProtoEnumInfo protoStatusCode = toProtoSpanStatus(status);
|
||||
byte[] description = MarshalerUtil.toBytes(status.getDescription());
|
||||
return new SpanStatusMarshaler(protoStatusCode, description);
|
||||
}
|
||||
|
@ -47,4 +42,14 @@ final class SpanStatusMarshaler extends MarshalerWithSize {
|
|||
size += MarshalerUtil.sizeEnum(Status.CODE, protoStatusCode);
|
||||
return size;
|
||||
}
|
||||
|
||||
static ProtoEnumInfo toProtoSpanStatus(StatusData status) {
|
||||
ProtoEnumInfo protoStatusCode = Status.StatusCode.STATUS_CODE_UNSET;
|
||||
if (status.getStatusCode() == StatusCode.OK) {
|
||||
protoStatusCode = Status.StatusCode.STATUS_CODE_OK;
|
||||
} else if (status.getStatusCode() == StatusCode.ERROR) {
|
||||
protoStatusCode = Status.StatusCode.STATUS_CODE_ERROR;
|
||||
}
|
||||
return protoStatusCode;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.otlp.traces;
|
||||
|
||||
import static io.opentelemetry.exporter.internal.otlp.traces.SpanStatusMarshaler.toProtoSpanStatus;
|
||||
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerUtil;
|
||||
import io.opentelemetry.exporter.internal.marshal.ProtoEnumInfo;
|
||||
import io.opentelemetry.exporter.internal.marshal.Serializer;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler;
|
||||
import io.opentelemetry.proto.trace.v1.internal.Status;
|
||||
import io.opentelemetry.sdk.trace.data.StatusData;
|
||||
import java.io.IOException;
|
||||
|
||||
/** See {@link SpanStatusMarshaler}. */
|
||||
final class SpanStatusStatelessMarshaler implements StatelessMarshaler<StatusData> {
|
||||
static final SpanStatusStatelessMarshaler INSTANCE = new SpanStatusStatelessMarshaler();
|
||||
|
||||
@Override
|
||||
public void writeTo(Serializer output, StatusData status, MarshalerContext context)
|
||||
throws IOException {
|
||||
ProtoEnumInfo protoStatusCode = toProtoSpanStatus(status);
|
||||
byte[] descriptionUtf8 = context.getData(byte[].class);
|
||||
|
||||
output.serializeString(Status.MESSAGE, descriptionUtf8);
|
||||
output.serializeEnum(Status.CODE, protoStatusCode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getBinarySerializedSize(StatusData status, MarshalerContext context) {
|
||||
ProtoEnumInfo protoStatusCode = toProtoSpanStatus(status);
|
||||
byte[] descriptionUtf8 = MarshalerUtil.toBytes(status.getDescription());
|
||||
context.addData(descriptionUtf8);
|
||||
|
||||
int size = 0;
|
||||
size += MarshalerUtil.sizeBytes(Status.MESSAGE, descriptionUtf8);
|
||||
size += MarshalerUtil.sizeEnum(Status.CODE, protoStatusCode);
|
||||
|
||||
return size;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,159 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.exporter.internal.otlp.traces;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.api.common.Attributes;
|
||||
import io.opentelemetry.api.trace.SpanContext;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.api.trace.TraceFlags;
|
||||
import io.opentelemetry.api.trace.TraceState;
|
||||
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
|
||||
import io.opentelemetry.sdk.resources.Resource;
|
||||
import io.opentelemetry.sdk.testing.trace.TestSpanData;
|
||||
import io.opentelemetry.sdk.trace.data.EventData;
|
||||
import io.opentelemetry.sdk.trace.data.LinkData;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import io.opentelemetry.sdk.trace.data.StatusData;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class LowAllocationTraceRequestMarshalerTest {
|
||||
|
||||
private static final AttributeKey<Boolean> KEY_BOOL = AttributeKey.booleanKey("key_bool");
|
||||
private static final AttributeKey<String> KEY_STRING = AttributeKey.stringKey("key_string");
|
||||
private static final AttributeKey<Long> KEY_INT = AttributeKey.longKey("key_int");
|
||||
private static final AttributeKey<Double> KEY_DOUBLE = AttributeKey.doubleKey("key_double");
|
||||
private static final AttributeKey<List<String>> KEY_STRING_ARRAY =
|
||||
AttributeKey.stringArrayKey("key_string_array");
|
||||
private static final AttributeKey<List<Long>> KEY_LONG_ARRAY =
|
||||
AttributeKey.longArrayKey("key_long_array");
|
||||
private static final AttributeKey<List<Double>> KEY_DOUBLE_ARRAY =
|
||||
AttributeKey.doubleArrayKey("key_double_array");
|
||||
private static final AttributeKey<List<Boolean>> KEY_BOOLEAN_ARRAY =
|
||||
AttributeKey.booleanArrayKey("key_boolean_array");
|
||||
private static final AttributeKey<String> LINK_ATTR_KEY = AttributeKey.stringKey("link_attr_key");
|
||||
|
||||
private static final Resource RESOURCE =
|
||||
Resource.create(
|
||||
Attributes.builder()
|
||||
.put(KEY_BOOL, true)
|
||||
.put(KEY_STRING, "string")
|
||||
.put(KEY_INT, 100L)
|
||||
.put(KEY_DOUBLE, 100.3)
|
||||
.put(KEY_STRING_ARRAY, Arrays.asList("string", "string"))
|
||||
.put(KEY_LONG_ARRAY, Arrays.asList(12L, 23L))
|
||||
.put(KEY_DOUBLE_ARRAY, Arrays.asList(12.3, 23.1))
|
||||
.put(KEY_BOOLEAN_ARRAY, Arrays.asList(true, false))
|
||||
.build());
|
||||
|
||||
private static final InstrumentationScopeInfo INSTRUMENTATION_SCOPE_INFO =
|
||||
InstrumentationScopeInfo.create("name");
|
||||
private static final String TRACE_ID = "7b2e170db4df2d593ddb4ddf2ddf2d59";
|
||||
private static final String SPAN_ID = "170d3ddb4d23e81f";
|
||||
private static final SpanContext SPAN_CONTEXT =
|
||||
SpanContext.create(TRACE_ID, SPAN_ID, TraceFlags.getSampled(), TraceState.getDefault());
|
||||
|
||||
private final List<SpanData> spanDataList = createSpanDataList();
|
||||
|
||||
private static List<SpanData> createSpanDataList() {
|
||||
List<SpanData> spanDataList = new ArrayList<>();
|
||||
for (int i = 0; i < 5; i++) {
|
||||
spanDataList.add(createSpanData());
|
||||
}
|
||||
return spanDataList;
|
||||
}
|
||||
|
||||
private static SpanData createSpanData() {
|
||||
return TestSpanData.builder()
|
||||
.setResource(RESOURCE)
|
||||
.setInstrumentationScopeInfo(INSTRUMENTATION_SCOPE_INFO)
|
||||
.setHasEnded(true)
|
||||
.setSpanContext(SPAN_CONTEXT)
|
||||
.setParentSpanContext(SpanContext.getInvalid())
|
||||
.setName("GET /api/endpoint")
|
||||
.setKind(SpanKind.SERVER)
|
||||
.setStartEpochNanos(12345)
|
||||
.setEndEpochNanos(12349)
|
||||
.setAttributes(
|
||||
Attributes.builder()
|
||||
.put(KEY_BOOL, true)
|
||||
.put(KEY_STRING, "string")
|
||||
.put(KEY_INT, 100L)
|
||||
.put(KEY_DOUBLE, 100.3)
|
||||
.build())
|
||||
.setTotalAttributeCount(2)
|
||||
.setEvents(
|
||||
Arrays.asList(
|
||||
EventData.create(12347, "my_event_1", Attributes.empty()),
|
||||
EventData.create(12348, "my_event_2", Attributes.of(KEY_INT, 1234L)),
|
||||
EventData.create(12349, "my_event_3", Attributes.empty())))
|
||||
.setTotalRecordedEvents(4)
|
||||
.setLinks(
|
||||
Arrays.asList(
|
||||
LinkData.create(SPAN_CONTEXT),
|
||||
LinkData.create(SPAN_CONTEXT, Attributes.of(LINK_ATTR_KEY, "value"))))
|
||||
.setTotalRecordedLinks(3)
|
||||
.setStatus(StatusData.ok())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Test
|
||||
void validateOutput() throws Exception {
|
||||
byte[] result;
|
||||
{
|
||||
TraceRequestMarshaler requestMarshaler = TraceRequestMarshaler.create(spanDataList);
|
||||
ByteArrayOutputStream customOutput =
|
||||
new ByteArrayOutputStream(requestMarshaler.getBinarySerializedSize());
|
||||
requestMarshaler.writeBinaryTo(customOutput);
|
||||
result = customOutput.toByteArray();
|
||||
}
|
||||
|
||||
byte[] lowAllocationResult;
|
||||
{
|
||||
LowAllocationTraceRequestMarshaler requestMarshaler =
|
||||
new LowAllocationTraceRequestMarshaler();
|
||||
requestMarshaler.initialize(spanDataList);
|
||||
ByteArrayOutputStream customOutput =
|
||||
new ByteArrayOutputStream(requestMarshaler.getBinarySerializedSize());
|
||||
requestMarshaler.writeBinaryTo(customOutput);
|
||||
lowAllocationResult = customOutput.toByteArray();
|
||||
}
|
||||
|
||||
assertThat(lowAllocationResult).isEqualTo(result);
|
||||
}
|
||||
|
||||
@Test
|
||||
void validateJsonOutput() throws Exception {
|
||||
String result;
|
||||
{
|
||||
TraceRequestMarshaler requestMarshaler = TraceRequestMarshaler.create(spanDataList);
|
||||
ByteArrayOutputStream customOutput =
|
||||
new ByteArrayOutputStream(requestMarshaler.getBinarySerializedSize());
|
||||
requestMarshaler.writeJsonTo(customOutput);
|
||||
result = new String(customOutput.toByteArray(), StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
String lowAllocationResult;
|
||||
{
|
||||
LowAllocationTraceRequestMarshaler requestMarshaler =
|
||||
new LowAllocationTraceRequestMarshaler();
|
||||
requestMarshaler.initialize(spanDataList);
|
||||
ByteArrayOutputStream customOutput =
|
||||
new ByteArrayOutputStream(requestMarshaler.getBinarySerializedSize());
|
||||
requestMarshaler.writeJsonTo(customOutput);
|
||||
lowAllocationResult = new String(customOutput.toByteArray(), StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
assertThat(lowAllocationResult).isEqualTo(result);
|
||||
}
|
||||
}
|
|
@ -27,6 +27,9 @@ import io.opentelemetry.api.trace.TraceFlags;
|
|||
import io.opentelemetry.api.trace.TraceId;
|
||||
import io.opentelemetry.api.trace.TraceState;
|
||||
import io.opentelemetry.exporter.internal.marshal.Marshaler;
|
||||
import io.opentelemetry.exporter.internal.marshal.MarshalerContext;
|
||||
import io.opentelemetry.exporter.internal.marshal.Serializer;
|
||||
import io.opentelemetry.exporter.internal.marshal.StatelessMarshaler;
|
||||
import io.opentelemetry.proto.common.v1.AnyValue;
|
||||
import io.opentelemetry.proto.common.v1.ArrayValue;
|
||||
import io.opentelemetry.proto.common.v1.InstrumentationScope;
|
||||
|
@ -40,6 +43,7 @@ import io.opentelemetry.sdk.resources.Resource;
|
|||
import io.opentelemetry.sdk.testing.trace.TestSpanData;
|
||||
import io.opentelemetry.sdk.trace.data.EventData;
|
||||
import io.opentelemetry.sdk.trace.data.LinkData;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import io.opentelemetry.sdk.trace.data.StatusData;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
|
@ -49,6 +53,8 @@ import java.util.Base64;
|
|||
import java.util.Collections;
|
||||
import java.util.Locale;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.EnumSource;
|
||||
|
||||
class TraceRequestMarshalerTest {
|
||||
|
||||
|
@ -116,12 +122,13 @@ class TraceRequestMarshalerTest {
|
|||
.build());
|
||||
}
|
||||
|
||||
@Test
|
||||
void toProtoSpan() {
|
||||
@ParameterizedTest
|
||||
@EnumSource(MarshalerSource.class)
|
||||
void toProtoSpan(MarshalerSource marshalerSource) {
|
||||
Span protoSpan =
|
||||
parse(
|
||||
Span.getDefaultInstance(),
|
||||
SpanMarshaler.create(
|
||||
marshalerSource.create(
|
||||
TestSpanData.builder()
|
||||
.setHasEnded(true)
|
||||
.setSpanContext(SPAN_CONTEXT)
|
||||
|
@ -245,12 +252,13 @@ class TraceRequestMarshalerTest {
|
|||
.isEqualTo(Status.newBuilder().setCode(STATUS_CODE_OK).build());
|
||||
}
|
||||
|
||||
@Test
|
||||
void toProtoSpan_withRemoteParent() {
|
||||
@ParameterizedTest
|
||||
@EnumSource(MarshalerSource.class)
|
||||
void toProtoSpan_withRemoteParent(MarshalerSource marshalerSource) {
|
||||
Span protoSpan =
|
||||
parse(
|
||||
Span.getDefaultInstance(),
|
||||
SpanMarshaler.create(
|
||||
marshalerSource.create(
|
||||
TestSpanData.builder()
|
||||
.setHasEnded(true)
|
||||
.setSpanContext(SPAN_CONTEXT)
|
||||
|
@ -292,9 +300,10 @@ class TraceRequestMarshalerTest {
|
|||
.isEqualTo(io.opentelemetry.proto.trace.v1.internal.Span.SpanKind.SPAN_KIND_CONSUMER);
|
||||
}
|
||||
|
||||
@Test
|
||||
void toProtoStatus() {
|
||||
assertThat(parse(Status.getDefaultInstance(), SpanStatusMarshaler.create(StatusData.unset())))
|
||||
@ParameterizedTest
|
||||
@EnumSource(MarshalerSource.class)
|
||||
void toProtoStatus(MarshalerSource marshalerSource) {
|
||||
assertThat(parse(Status.getDefaultInstance(), marshalerSource.create(StatusData.unset())))
|
||||
.isEqualTo(Status.newBuilder().setCode(STATUS_CODE_UNSET).build());
|
||||
assertThat(
|
||||
parse(
|
||||
|
@ -313,12 +322,13 @@ class TraceRequestMarshalerTest {
|
|||
.isEqualTo(Status.newBuilder().setCode(STATUS_CODE_OK).setMessage("OK_OVERRIDE").build());
|
||||
}
|
||||
|
||||
@Test
|
||||
void toProtoSpanEvent_WithoutAttributes() {
|
||||
@ParameterizedTest
|
||||
@EnumSource(MarshalerSource.class)
|
||||
void toProtoSpanEvent_WithoutAttributes(MarshalerSource marshalerSource) {
|
||||
assertThat(
|
||||
parse(
|
||||
Span.Event.getDefaultInstance(),
|
||||
SpanEventMarshaler.create(
|
||||
marshalerSource.create(
|
||||
EventData.create(12345, "test_without_attributes", Attributes.empty()))))
|
||||
.isEqualTo(
|
||||
Span.Event.newBuilder()
|
||||
|
@ -327,12 +337,13 @@ class TraceRequestMarshalerTest {
|
|||
.build());
|
||||
}
|
||||
|
||||
@Test
|
||||
void toProtoSpanEvent_WithAttributes() {
|
||||
@ParameterizedTest
|
||||
@EnumSource(MarshalerSource.class)
|
||||
void toProtoSpanEvent_WithAttributes(MarshalerSource marshalerSource) {
|
||||
assertThat(
|
||||
parse(
|
||||
Span.Event.getDefaultInstance(),
|
||||
SpanEventMarshaler.create(
|
||||
marshalerSource.create(
|
||||
EventData.create(
|
||||
12345,
|
||||
"test_with_attributes",
|
||||
|
@ -351,12 +362,13 @@ class TraceRequestMarshalerTest {
|
|||
.build());
|
||||
}
|
||||
|
||||
@Test
|
||||
void toProtoSpanLink_WithoutAttributes() {
|
||||
@ParameterizedTest
|
||||
@EnumSource(MarshalerSource.class)
|
||||
void toProtoSpanLink_WithoutAttributes(MarshalerSource marshalerSource) {
|
||||
assertThat(
|
||||
parse(
|
||||
Span.Link.getDefaultInstance(),
|
||||
SpanLinkMarshaler.create(LinkData.create(SPAN_CONTEXT))))
|
||||
marshalerSource.create(LinkData.create(SPAN_CONTEXT))))
|
||||
.isEqualTo(
|
||||
Span.Link.newBuilder()
|
||||
.setTraceId(ByteString.copyFrom(TRACE_ID_BYTES))
|
||||
|
@ -368,12 +380,13 @@ class TraceRequestMarshalerTest {
|
|||
.build());
|
||||
}
|
||||
|
||||
@Test
|
||||
void toProtoSpanLink_WithRemoteContext() {
|
||||
@ParameterizedTest
|
||||
@EnumSource(MarshalerSource.class)
|
||||
void toProtoSpanLink_WithRemoteContext(MarshalerSource marshalerSource) {
|
||||
assertThat(
|
||||
parse(
|
||||
Span.Link.getDefaultInstance(),
|
||||
SpanLinkMarshaler.create(LinkData.create(PARENT_SPAN_CONTEXT))))
|
||||
marshalerSource.create(LinkData.create(PARENT_SPAN_CONTEXT))))
|
||||
.isEqualTo(
|
||||
Span.Link.newBuilder()
|
||||
.setTraceId(ByteString.copyFrom(TRACE_ID_BYTES))
|
||||
|
@ -384,12 +397,13 @@ class TraceRequestMarshalerTest {
|
|||
.build());
|
||||
}
|
||||
|
||||
@Test
|
||||
void toProtoSpanLink_WithAttributes() {
|
||||
@ParameterizedTest
|
||||
@EnumSource(MarshalerSource.class)
|
||||
void toProtoSpanLink_WithAttributes(MarshalerSource marshalerSource) {
|
||||
assertThat(
|
||||
parse(
|
||||
Span.Link.getDefaultInstance(),
|
||||
SpanLinkMarshaler.create(
|
||||
marshalerSource.create(
|
||||
LinkData.create(
|
||||
SPAN_CONTEXT, Attributes.of(stringKey("key_string"), "string"), 5))))
|
||||
.isEqualTo(
|
||||
|
@ -494,7 +508,6 @@ class TraceRequestMarshalerTest {
|
|||
}
|
||||
|
||||
private static String toJson(Marshaler marshaler) {
|
||||
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
try {
|
||||
marshaler.writeJsonTo(bos);
|
||||
|
@ -503,4 +516,75 @@ class TraceRequestMarshalerTest {
|
|||
}
|
||||
return new String(bos.toByteArray(), StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
private static <T> Marshaler createMarshaler(StatelessMarshaler<T> marshaler, T data) {
|
||||
return new Marshaler() {
|
||||
private final MarshalerContext context = new MarshalerContext();
|
||||
private final int size = marshaler.getBinarySerializedSize(data, context);
|
||||
|
||||
@Override
|
||||
public int getBinarySerializedSize() {
|
||||
return size;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void writeTo(Serializer output) throws IOException {
|
||||
context.resetReadIndex();
|
||||
marshaler.writeTo(output, data, context);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private enum MarshalerSource {
|
||||
MARSHALER {
|
||||
@Override
|
||||
Marshaler create(SpanData spanData) {
|
||||
return SpanMarshaler.create(spanData);
|
||||
}
|
||||
|
||||
@Override
|
||||
Marshaler create(StatusData statusData) {
|
||||
return SpanStatusMarshaler.create(statusData);
|
||||
}
|
||||
|
||||
@Override
|
||||
Marshaler create(EventData eventData) {
|
||||
return SpanEventMarshaler.create(eventData);
|
||||
}
|
||||
|
||||
@Override
|
||||
Marshaler create(LinkData linkData) {
|
||||
return SpanLinkMarshaler.create(linkData);
|
||||
}
|
||||
},
|
||||
LOW_ALLOCATION_MARSHALER {
|
||||
@Override
|
||||
Marshaler create(SpanData spanData) {
|
||||
return createMarshaler(SpanStatelessMarshaler.INSTANCE, spanData);
|
||||
}
|
||||
|
||||
@Override
|
||||
Marshaler create(StatusData statusData) {
|
||||
return createMarshaler(SpanStatusStatelessMarshaler.INSTANCE, statusData);
|
||||
}
|
||||
|
||||
@Override
|
||||
Marshaler create(EventData eventData) {
|
||||
return createMarshaler(SpanEventStatelessMarshaler.INSTANCE, eventData);
|
||||
}
|
||||
|
||||
@Override
|
||||
Marshaler create(LinkData linkData) {
|
||||
return createMarshaler(SpanLinkStatelessMarshaler.INSTANCE, linkData);
|
||||
}
|
||||
};
|
||||
|
||||
abstract Marshaler create(SpanData spanData);
|
||||
|
||||
abstract Marshaler create(StatusData statusData);
|
||||
|
||||
abstract Marshaler create(EventData eventData);
|
||||
|
||||
abstract Marshaler create(LinkData linkData);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue