Use marshaler for json logging. (#3616)

This commit is contained in:
Anuraag Agrawal 2021-09-15 10:40:53 +09:00 committed by GitHub
parent 72d33c4153
commit 0ae5e511a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 183 additions and 227 deletions

View File

@ -98,7 +98,6 @@ val DEPENDENCIES = listOf(
"org.awaitility:awaitility:4.1.0",
"org.bouncycastle:bcpkix-jdk15on:1.69",
"org.codehaus.mojo:animal-sniffer-annotations:1.20",
"org.curioswitch.curiostack:protobuf-jackson:1.2.0",
"org.jctools:jctools-core:3.3.0",
"org.junit-pioneer:junit-pioneer:1.4.2",
"org.skyscreamer:jsonassert:1.5.0",

View File

@ -13,9 +13,8 @@ dependencies {
compileOnly(project(":sdk:metrics"))
implementation(project(":exporters:otlp:common"))
implementation(project(":proto"))
implementation("org.curioswitch.curiostack:protobuf-jackson")
implementation("com.fasterxml.jackson.core:jackson-core")
testImplementation(project(":sdk:testing"))

View File

@ -1,52 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.logging.otlp;
import com.fasterxml.jackson.core.Base64Variant;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.io.SegmentedStringWriter;
import com.fasterxml.jackson.core.util.JsonGeneratorDelegate;
import io.opentelemetry.api.internal.TemporaryBuffers;
import java.io.IOException;
final class HexEncodingStringJsonGenerator extends JsonGeneratorDelegate {
static final JsonFactory JSON_FACTORY = new JsonFactory();
static JsonGenerator create(SegmentedStringWriter stringWriter) {
final JsonGenerator delegate;
try {
delegate = JSON_FACTORY.createGenerator(stringWriter);
} catch (IOException e) {
throw new IllegalStateException("Unable to create in-memory JsonGenerator, can't happen.", e);
}
return new HexEncodingStringJsonGenerator(delegate);
}
private HexEncodingStringJsonGenerator(JsonGenerator delegate) {
super(delegate);
}
@Override
public void writeBinary(Base64Variant b64variant, byte[] data, int offset, int len)
throws IOException {
writeString(bytesToHex(data, offset, len));
}
private static final char[] HEX_ARRAY = "0123456789abcdef".toCharArray();
private static String bytesToHex(byte[] bytes, int offset, int len) {
int hexLength = len * 2;
char[] hexChars = TemporaryBuffers.chars(hexLength);
for (int i = 0; i < len; i++) {
int v = bytes[offset + i] & 0xFF;
hexChars[i * 2] = HEX_ARRAY[v >>> 4];
hexChars[i * 2 + 1] = HEX_ARRAY[v & 0x0F];
}
return new String(hexChars, 0, hexLength);
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.exporter.logging.otlp;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.io.SegmentedStringWriter;
import java.io.IOException;
final class JsonUtil {
static final JsonFactory JSON_FACTORY = new JsonFactory();
static JsonGenerator create(SegmentedStringWriter stringWriter) {
try {
return JSON_FACTORY.createGenerator(stringWriter);
} catch (IOException e) {
throw new IllegalStateException("Unable to create in-memory JsonGenerator, can't happen.", e);
}
}
private JsonUtil() {}
}

View File

@ -5,34 +5,25 @@
package io.opentelemetry.exporter.logging.otlp;
import static io.opentelemetry.exporter.logging.otlp.HexEncodingStringJsonGenerator.JSON_FACTORY;
import static io.opentelemetry.exporter.logging.otlp.JsonUtil.JSON_FACTORY;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.io.SegmentedStringWriter;
import io.opentelemetry.exporter.otlp.internal.MetricAdapter;
import io.opentelemetry.proto.metrics.v1.ResourceMetrics;
import io.opentelemetry.exporter.otlp.internal.metrics.ResourceMetricsMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.curioswitch.common.protobuf.json.MessageMarshaller;
/**
* A {@link MetricExporter} which writes {@linkplain MetricData spans} to a {@link Logger} in OTLP
* JSON format. Each log line will include a single {@link ResourceMetrics}.
* JSON format. Each log line will include a single {@code ResourceMetrics}.
*/
public final class OtlpJsonLoggingMetricExporter implements MetricExporter {
private static final MessageMarshaller marshaller =
MessageMarshaller.builder()
.register(ResourceMetrics.class)
.omittingInsignificantWhitespace(true)
.build();
private static final Logger logger =
Logger.getLogger(OtlpJsonLoggingMetricExporter.class.getName());
@ -45,11 +36,11 @@ public final class OtlpJsonLoggingMetricExporter implements MetricExporter {
@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
List<ResourceMetrics> allResourceMetrics = MetricAdapter.toProtoResourceMetrics(metrics);
for (ResourceMetrics resourceMetrics : allResourceMetrics) {
ResourceMetricsMarshaler[] allResourceMetrics = ResourceMetricsMarshaler.create(metrics);
for (ResourceMetricsMarshaler resourceMetrics : allResourceMetrics) {
SegmentedStringWriter sw = new SegmentedStringWriter(JSON_FACTORY._getBufferRecycler());
try (JsonGenerator gen = HexEncodingStringJsonGenerator.create(sw)) {
marshaller.writeValue(resourceMetrics, gen);
try (JsonGenerator gen = JsonUtil.create(sw)) {
resourceMetrics.writeJsonTo(gen);
} catch (IOException e) {
// Shouldn't happen in practice, just skip it.
continue;

View File

@ -7,30 +7,21 @@ package io.opentelemetry.exporter.logging.otlp;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.io.SegmentedStringWriter;
import io.opentelemetry.exporter.otlp.internal.SpanAdapter;
import io.opentelemetry.proto.trace.v1.ResourceSpans;
import io.opentelemetry.exporter.otlp.internal.traces.ResourceSpansMarshaler;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.curioswitch.common.protobuf.json.MessageMarshaller;
/**
* A {@link SpanExporter} which writes {@linkplain SpanData spans} to a {@link Logger} in OTLP JSON
* format. Each log line will include a single {@link ResourceSpans}.
* format. Each log line will include a single {@code ResourceSpans}.
*/
public final class OtlpJsonLoggingSpanExporter implements SpanExporter {
private static final MessageMarshaller marshaller =
MessageMarshaller.builder()
.register(ResourceSpans.class)
.omittingInsignificantWhitespace(true)
.build();
private static final Logger logger =
Logger.getLogger(OtlpJsonLoggingSpanExporter.class.getName());
@ -43,13 +34,12 @@ public final class OtlpJsonLoggingSpanExporter implements SpanExporter {
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
List<ResourceSpans> allResourceSpans = SpanAdapter.toProtoResourceSpans(spans);
for (ResourceSpans resourceSpans : allResourceSpans) {
ResourceSpansMarshaler[] allResourceSpans = ResourceSpansMarshaler.create(spans);
for (ResourceSpansMarshaler resourceSpans : allResourceSpans) {
SegmentedStringWriter sw =
new SegmentedStringWriter(
HexEncodingStringJsonGenerator.JSON_FACTORY._getBufferRecycler());
try (JsonGenerator gen = HexEncodingStringJsonGenerator.create(sw)) {
marshaller.writeValue(resourceSpans, gen);
new SegmentedStringWriter(JsonUtil.JSON_FACTORY._getBufferRecycler());
try (JsonGenerator gen = JsonUtil.create(sw)) {
resourceSpans.writeJsonTo(gen);
} catch (IOException e) {
// Shouldn't happen in practice, just skip it.
continue;

View File

@ -101,7 +101,7 @@ class OtlpJsonLoggingMetricExporterTest {
+ " \"timeUnixNano\": \"2\","
+ " \"asDouble\": 4.0"
+ " }],"
+ " \"aggregationTemporality\": \"AGGREGATION_TEMPORALITY_CUMULATIVE\","
+ " \"aggregationTemporality\": 2,"
+ " \"isMonotonic\": true"
+ " }"
+ " }]"
@ -124,7 +124,7 @@ class OtlpJsonLoggingMetricExporterTest {
+ " \"timeUnixNano\": \"2\","
+ " \"asDouble\": 4.0"
+ " }],"
+ " \"aggregationTemporality\": \"AGGREGATION_TEMPORALITY_CUMULATIVE\","
+ " \"aggregationTemporality\": 2,"
+ " \"isMonotonic\": true"
+ " }"
+ " }]"

View File

@ -118,12 +118,12 @@ class OtlpJsonLoggingSpanExporterTest {
+ " \"traceId\": \"12340000000043211234000000004321\","
+ " \"spanId\": \"8765000000005678\","
+ " \"name\": \"testSpan2\","
+ " \"kind\": \"SPAN_KIND_CLIENT\","
+ " \"kind\": 3,"
+ " \"startTimeUnixNano\": \"500\","
+ " \"endTimeUnixNano\": \"1501\","
+ " \"status\": {"
+ " \"deprecatedCode\": \"DEPRECATED_STATUS_CODE_UNKNOWN_ERROR\","
+ " \"code\": \"STATUS_CODE_ERROR\""
+ " \"deprecatedCode\": 2,"
+ " \"code\": 2"
+ " }"
+ " }]"
+ " }, {"
@ -135,7 +135,7 @@ class OtlpJsonLoggingSpanExporterTest {
+ " \"traceId\": \"12345678876543211234567887654321\","
+ " \"spanId\": \"8765432112345678\","
+ " \"name\": \"testSpan1\","
+ " \"kind\": \"SPAN_KIND_INTERNAL\","
+ " \"kind\": 1,"
+ " \"startTimeUnixNano\": \"100\","
+ " \"endTimeUnixNano\": \"1100\","
+ " \"attributes\": [{"
@ -160,7 +160,7 @@ class OtlpJsonLoggingSpanExporterTest {
+ " }]"
+ " }],"
+ " \"status\": {"
+ " \"code\": \"STATUS_CODE_OK\""
+ " \"code\": 1"
+ " }"
+ " }]"
+ " }]"

View File

@ -51,8 +51,6 @@ dependencies {
jmhImplementation(project(":sdk:testing"))
jmhImplementation(project(":sdk-extensions:resources"))
jmhImplementation("com.fasterxml.jackson.core:jackson-core")
jmhImplementation("org.curioswitch.curiostack:protobuf-jackson")
jmhImplementation("com.google.protobuf:protobuf-java-util")
jmhRuntimeOnly("io.grpc:grpc-netty")
}

View File

@ -5,15 +5,11 @@
package io.opentelemetry.exporter.otlp.internal;
import com.google.protobuf.util.JsonFormat;
import io.opentelemetry.exporter.otlp.internal.traces.TraceRequestMarshaler;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import org.curioswitch.common.protobuf.json.MessageMarshaller;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
@ -30,12 +26,6 @@ import org.openjdk.jmh.annotations.Warmup;
@Fork(1)
public class RequestMarshalBenchmarks {
private static final MessageMarshaller BYTEBUDDY_MARSHALLER =
MessageMarshaller.builder()
.register(ExportTraceServiceRequest.class)
.omittingInsignificantWhitespace(true)
.build();
@Benchmark
@Threads(1)
public ByteArrayOutputStream createProtoMarshal(RequestMarshalState state) {
@ -83,32 +73,4 @@ public class RequestMarshalBenchmarks {
requestMarshaler.writeJsonTo(customOutput);
return customOutput;
}
@Benchmark
@Threads(1)
public ByteArrayOutputStream marshalJsonProtoByteBuddy(RequestMarshalState state)
throws IOException {
ExportTraceServiceRequest protoRequest =
ExportTraceServiceRequest.newBuilder()
.addAllResourceSpans(SpanAdapter.toProtoResourceSpans(state.spanDataList))
.build();
ByteArrayOutputStream customOutput = new ByteArrayOutputStream();
BYTEBUDDY_MARSHALLER.writeValue(protoRequest, customOutput);
return customOutput;
}
@Benchmark
@Threads(1)
public ByteArrayOutputStream marshalJsonProtoReflection(RequestMarshalState state)
throws IOException {
ExportTraceServiceRequest protoRequest =
ExportTraceServiceRequest.newBuilder()
.addAllResourceSpans(SpanAdapter.toProtoResourceSpans(state.spanDataList))
.build();
ByteArrayOutputStream customOutput = new ByteArrayOutputStream();
JsonFormat.printer()
.omittingInsignificantWhitespace()
.appendTo(protoRequest, new OutputStreamWriter(customOutput, StandardCharsets.UTF_8));
return customOutput;
}
}

View File

@ -9,6 +9,7 @@ import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
final class JsonSerializer extends Serializer {
@ -18,7 +19,11 @@ final class JsonSerializer extends Serializer {
private final JsonGenerator generator;
JsonSerializer(OutputStream output) throws IOException {
generator = JSON_FACTORY.createGenerator(output);
this(JSON_FACTORY.createGenerator(output));
}
JsonSerializer(JsonGenerator generator) {
this.generator = generator;
}
@Override
@ -74,7 +79,13 @@ final class JsonSerializer extends Serializer {
@Override
protected void writeString(ProtoFieldInfo field, byte[] utf8Bytes) throws IOException {
generator.writeFieldName(field.getJsonName());
generator.writeUTF8String(utf8Bytes, 0, utf8Bytes.length);
// Marshalers encoded String into UTF-8 bytes to optimize for binary serialization where
// we are able to avoid the encoding process happening twice, one for size computation and one
// for actual writing. JsonGenerator actually has a writeUTF8String that would be able to accept
// this, but it only works when writing to an OutputStream, but not to a String like we do for
// writing to logs. It's wasteful to take a String, convert it to bytes, and convert back to
// the same String but we can see if this can be improved in the future.
generator.writeString(new String(utf8Bytes, StandardCharsets.UTF_8));
}
@Override

View File

@ -5,6 +5,7 @@
package io.opentelemetry.exporter.otlp.internal;
import com.fasterxml.jackson.core.JsonGenerator;
import java.io.IOException;
import java.io.OutputStream;
@ -30,6 +31,13 @@ public abstract class Marshaler {
}
}
/** Marshals into the {@link JsonGenerator} in proto JSON format. */
public final void writeJsonTo(JsonGenerator output) throws IOException {
try (JsonSerializer serializer = new JsonSerializer(output)) {
serializer.writeMessageValue(this);
}
}
/** Returns the number of bytes this Marshaler will write in proto binary format. */
public abstract int getBinarySerializedSize();

View File

@ -5,20 +5,14 @@
package io.opentelemetry.exporter.otlp.internal.metrics;
import io.opentelemetry.exporter.otlp.internal.InstrumentationLibraryMarshaler;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.MarshalerUtil;
import io.opentelemetry.exporter.otlp.internal.MarshalerWithSize;
import io.opentelemetry.exporter.otlp.internal.ResourceMarshaler;
import io.opentelemetry.exporter.otlp.internal.Serializer;
import io.opentelemetry.proto.collector.metrics.v1.internal.ExportMetricsServiceRequest;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.resources.Resource;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
* {@link Marshaler} to convert SDK {@link MetricData} to OTLP ExportMetricsServiceRequest.
@ -35,33 +29,7 @@ public final class MetricsRequestMarshaler extends MarshalerWithSize {
* MetricData} into a serialized OTLP ExportMetricsServiceRequest.
*/
public static MetricsRequestMarshaler create(Collection<MetricData> metricDataList) {
Map<Resource, Map<InstrumentationLibraryInfo, List<Marshaler>>> resourceAndLibraryMap =
groupByResourceAndLibrary(metricDataList);
ResourceMetricsMarshaler[] resourceMetricsMarshalers =
new ResourceMetricsMarshaler[resourceAndLibraryMap.size()];
int posResource = 0;
for (Map.Entry<Resource, Map<InstrumentationLibraryInfo, List<Marshaler>>> entry :
resourceAndLibraryMap.entrySet()) {
final InstrumentationLibraryMetricsMarshaler[] instrumentationLibrarySpansMarshalers =
new InstrumentationLibraryMetricsMarshaler[entry.getValue().size()];
int posInstrumentation = 0;
for (Map.Entry<InstrumentationLibraryInfo, List<Marshaler>> entryIs :
entry.getValue().entrySet()) {
instrumentationLibrarySpansMarshalers[posInstrumentation++] =
new InstrumentationLibraryMetricsMarshaler(
InstrumentationLibraryMarshaler.create(entryIs.getKey()),
MarshalerUtil.toBytes(entryIs.getKey().getSchemaUrl()),
entryIs.getValue());
}
resourceMetricsMarshalers[posResource++] =
new ResourceMetricsMarshaler(
ResourceMarshaler.create(entry.getKey()),
MarshalerUtil.toBytes(entry.getKey().getSchemaUrl()),
instrumentationLibrarySpansMarshalers);
}
return new MetricsRequestMarshaler(resourceMetricsMarshalers);
return new MetricsRequestMarshaler(ResourceMetricsMarshaler.create(metricDataList));
}
private MetricsRequestMarshaler(ResourceMetricsMarshaler[] resourceMetricsMarshalers) {
@ -82,15 +50,4 @@ public final class MetricsRequestMarshaler extends MarshalerWithSize {
ExportMetricsServiceRequest.RESOURCE_METRICS, resourceMetricsMarshalers);
return size;
}
private static Map<Resource, Map<InstrumentationLibraryInfo, List<Marshaler>>>
groupByResourceAndLibrary(Collection<MetricData> metricDataList) {
return MarshalerUtil.groupByResourceAndLibrary(
metricDataList,
// TODO(anuraaga): Replace with an internal SdkData type of interface that exposes these
// two.
MetricData::getResource,
MetricData::getInstrumentationLibraryInfo,
MetricMarshaler::create);
}
}

View File

@ -5,18 +5,63 @@
package io.opentelemetry.exporter.otlp.internal.metrics;
import io.opentelemetry.exporter.otlp.internal.InstrumentationLibraryMarshaler;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.MarshalerUtil;
import io.opentelemetry.exporter.otlp.internal.MarshalerWithSize;
import io.opentelemetry.exporter.otlp.internal.ResourceMarshaler;
import io.opentelemetry.exporter.otlp.internal.Serializer;
import io.opentelemetry.proto.metrics.v1.internal.ResourceMetrics;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.resources.Resource;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
final class ResourceMetricsMarshaler extends MarshalerWithSize {
/**
* A Marshaler of ResourceMetrics.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class ResourceMetricsMarshaler extends MarshalerWithSize {
private final ResourceMarshaler resourceMarshaler;
private final byte[] schemaUrl;
private final InstrumentationLibraryMetricsMarshaler[] instrumentationLibraryMetricsMarshalers;
/** Returns Marshalers of ResourceMetrics created by grouping the provided metricData. */
public static ResourceMetricsMarshaler[] create(Collection<MetricData> metricDataList) {
Map<Resource, Map<InstrumentationLibraryInfo, List<Marshaler>>> resourceAndLibraryMap =
groupByResourceAndLibrary(metricDataList);
ResourceMetricsMarshaler[] resourceMetricsMarshalers =
new ResourceMetricsMarshaler[resourceAndLibraryMap.size()];
int posResource = 0;
for (Map.Entry<Resource, Map<InstrumentationLibraryInfo, List<Marshaler>>> entry :
resourceAndLibraryMap.entrySet()) {
final InstrumentationLibraryMetricsMarshaler[] instrumentationLibrarySpansMarshalers =
new InstrumentationLibraryMetricsMarshaler[entry.getValue().size()];
int posInstrumentation = 0;
for (Map.Entry<InstrumentationLibraryInfo, List<Marshaler>> entryIs :
entry.getValue().entrySet()) {
instrumentationLibrarySpansMarshalers[posInstrumentation++] =
new InstrumentationLibraryMetricsMarshaler(
InstrumentationLibraryMarshaler.create(entryIs.getKey()),
MarshalerUtil.toBytes(entryIs.getKey().getSchemaUrl()),
entryIs.getValue());
}
resourceMetricsMarshalers[posResource++] =
new ResourceMetricsMarshaler(
ResourceMarshaler.create(entry.getKey()),
MarshalerUtil.toBytes(entry.getKey().getSchemaUrl()),
instrumentationLibrarySpansMarshalers);
}
return resourceMetricsMarshalers;
}
ResourceMetricsMarshaler(
ResourceMarshaler resourceMarshaler,
byte[] schemaUrl,
@ -48,4 +93,15 @@ final class ResourceMetricsMarshaler extends MarshalerWithSize {
instrumentationLibraryMetricsMarshalers);
return size;
}
private static Map<Resource, Map<InstrumentationLibraryInfo, List<Marshaler>>>
groupByResourceAndLibrary(Collection<MetricData> metricDataList) {
return MarshalerUtil.groupByResourceAndLibrary(
metricDataList,
// TODO(anuraaga): Replace with an internal SdkData type of interface that exposes these
// two.
MetricData::getResource,
MetricData::getInstrumentationLibraryInfo,
MetricMarshaler::create);
}
}

View File

@ -5,18 +5,61 @@
package io.opentelemetry.exporter.otlp.internal.traces;
import io.opentelemetry.exporter.otlp.internal.InstrumentationLibraryMarshaler;
import io.opentelemetry.exporter.otlp.internal.MarshalerUtil;
import io.opentelemetry.exporter.otlp.internal.MarshalerWithSize;
import io.opentelemetry.exporter.otlp.internal.ResourceMarshaler;
import io.opentelemetry.exporter.otlp.internal.Serializer;
import io.opentelemetry.proto.trace.v1.internal.ResourceSpans;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
final class ResourceSpansMarshaler extends MarshalerWithSize {
/**
* A Marshaler of ResourceSpans.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class ResourceSpansMarshaler extends MarshalerWithSize {
private final ResourceMarshaler resourceMarshaler;
private final byte[] schemaUrlUtf8;
private final InstrumentationLibrarySpansMarshaler[] instrumentationLibrarySpansMarshalers;
/** Returns Marshalers of ResourceSpans created by grouping the provided SpanData. */
public static ResourceSpansMarshaler[] create(Collection<SpanData> spanDataList) {
Map<Resource, Map<InstrumentationLibraryInfo, List<SpanMarshaler>>> resourceAndLibraryMap =
groupByResourceAndLibrary(spanDataList);
final ResourceSpansMarshaler[] resourceSpansMarshalers =
new ResourceSpansMarshaler[resourceAndLibraryMap.size()];
int posResource = 0;
for (Map.Entry<Resource, Map<InstrumentationLibraryInfo, List<SpanMarshaler>>> entry :
resourceAndLibraryMap.entrySet()) {
final InstrumentationLibrarySpansMarshaler[] instrumentationLibrarySpansMarshalers =
new InstrumentationLibrarySpansMarshaler[entry.getValue().size()];
int posInstrumentation = 0;
for (Map.Entry<InstrumentationLibraryInfo, List<SpanMarshaler>> entryIs :
entry.getValue().entrySet()) {
instrumentationLibrarySpansMarshalers[posInstrumentation++] =
new InstrumentationLibrarySpansMarshaler(
InstrumentationLibraryMarshaler.create(entryIs.getKey()),
MarshalerUtil.toBytes(entryIs.getKey().getSchemaUrl()),
entryIs.getValue());
}
resourceSpansMarshalers[posResource++] =
new ResourceSpansMarshaler(
ResourceMarshaler.create(entry.getKey()),
MarshalerUtil.toBytes(entry.getKey().getSchemaUrl()),
instrumentationLibrarySpansMarshalers);
}
return resourceSpansMarshalers;
}
ResourceSpansMarshaler(
ResourceMarshaler resourceMarshaler,
byte[] schemaUrlUtf8,
@ -47,4 +90,15 @@ final class ResourceSpansMarshaler extends MarshalerWithSize {
ResourceSpans.INSTRUMENTATION_LIBRARY_SPANS, instrumentationLibrarySpansMarshalers);
return size;
}
private static Map<Resource, Map<InstrumentationLibraryInfo, List<SpanMarshaler>>>
groupByResourceAndLibrary(Collection<SpanData> spanDataList) {
return MarshalerUtil.groupByResourceAndLibrary(
spanDataList,
// TODO(anuraaga): Replace with an internal SdkData type of interface that exposes these
// two.
SpanData::getResource,
SpanData::getInstrumentationLibraryInfo,
SpanMarshaler::create);
}
}

View File

@ -5,20 +5,14 @@
package io.opentelemetry.exporter.otlp.internal.traces;
import io.opentelemetry.exporter.otlp.internal.InstrumentationLibraryMarshaler;
import io.opentelemetry.exporter.otlp.internal.Marshaler;
import io.opentelemetry.exporter.otlp.internal.MarshalerUtil;
import io.opentelemetry.exporter.otlp.internal.MarshalerWithSize;
import io.opentelemetry.exporter.otlp.internal.ResourceMarshaler;
import io.opentelemetry.exporter.otlp.internal.Serializer;
import io.opentelemetry.proto.collector.trace.v1.internal.ExportTraceServiceRequest;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
* {@link Marshaler} to convert SDK {@link SpanData} to OTLP ExportTraceServiceRequest.
@ -35,33 +29,7 @@ public final class TraceRequestMarshaler extends MarshalerWithSize {
* SpanData} into a serialized OTLP ExportTraceServiceRequest.
*/
public static TraceRequestMarshaler create(Collection<SpanData> spanDataList) {
Map<Resource, Map<InstrumentationLibraryInfo, List<SpanMarshaler>>> resourceAndLibraryMap =
groupByResourceAndLibrary(spanDataList);
final ResourceSpansMarshaler[] resourceSpansMarshalers =
new ResourceSpansMarshaler[resourceAndLibraryMap.size()];
int posResource = 0;
for (Map.Entry<Resource, Map<InstrumentationLibraryInfo, List<SpanMarshaler>>> entry :
resourceAndLibraryMap.entrySet()) {
final InstrumentationLibrarySpansMarshaler[] instrumentationLibrarySpansMarshalers =
new InstrumentationLibrarySpansMarshaler[entry.getValue().size()];
int posInstrumentation = 0;
for (Map.Entry<InstrumentationLibraryInfo, List<SpanMarshaler>> entryIs :
entry.getValue().entrySet()) {
instrumentationLibrarySpansMarshalers[posInstrumentation++] =
new InstrumentationLibrarySpansMarshaler(
InstrumentationLibraryMarshaler.create(entryIs.getKey()),
MarshalerUtil.toBytes(entryIs.getKey().getSchemaUrl()),
entryIs.getValue());
}
resourceSpansMarshalers[posResource++] =
new ResourceSpansMarshaler(
ResourceMarshaler.create(entry.getKey()),
MarshalerUtil.toBytes(entry.getKey().getSchemaUrl()),
instrumentationLibrarySpansMarshalers);
}
return new TraceRequestMarshaler(resourceSpansMarshalers);
return new TraceRequestMarshaler(ResourceSpansMarshaler.create(spanDataList));
}
private TraceRequestMarshaler(ResourceSpansMarshaler[] resourceSpansMarshalers) {
@ -76,15 +44,4 @@ public final class TraceRequestMarshaler extends MarshalerWithSize {
output.serializeRepeatedMessage(
ExportTraceServiceRequest.RESOURCE_SPANS, resourceSpansMarshalers);
}
private static Map<Resource, Map<InstrumentationLibraryInfo, List<SpanMarshaler>>>
groupByResourceAndLibrary(Collection<SpanData> spanDataList) {
return MarshalerUtil.groupByResourceAndLibrary(
spanDataList,
// TODO(anuraaga): Replace with an internal SdkData type of interface that exposes these
// two.
SpanData::getResource,
SpanData::getInstrumentationLibraryInfo,
data -> SpanMarshaler.create(data));
}
}