Add LogRecordProcessor to record event log records as span events (#1551)

This commit is contained in:
jack-berg 2024-11-20 17:43:36 -06:00 committed by GitHub
parent c2f8c17c45
commit 4c72e59721
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 450 additions and 2 deletions

View File

@ -56,6 +56,7 @@ components:
processors:
- LikeTheSalad
- breedx-splk
- jack-berg
prometheus-collector:
- jkwatson
resource-providers:

View File

@ -1,10 +1,38 @@
# Processors
This module provides tools to intercept and process signals globally.
## Interceptable exporters
This module provides tools to intercept and process signals globally:
* `InterceptableSpanExporter`
* `InterceptableMetricExporter`
* `InterceptableLogRecordExporter`
## Event to SpanEvent Bridge
`EventToSpanEventBridge` is a `LogRecordProcessor` which records events (i.e. log records with an `event.name` attribute) as span events for the current span if:
* The log record has a valid span context
* `Span.current()` returns a span where `Span.isRecording()` is true
For details of how the event log record is translated to span event, see [EventToSpanEventBridge Javadoc](./src/main/java/io/opentelemetry/contrib/eventbridge/EventToSpanEventBridge.java).
`EventToSpanEventBridge` can be referenced in [declarative configuration](https://opentelemetry.io/docs/languages/java/configuration/#declarative-configuration) as follows:
```yaml
# Configure tracer provider as usual, omitted for brevity
tracer_provider: ...
logger_provider:
processors:
# TODO(jack-berg): remove "{}" after releasing [opentelemetry-java#6891](https://github.com/open-telemetry/opentelemetry-java/pull/6891/files)
- event_to_span_event_bridge: {}
```
## Component owners
- [Cesar Munoz](https://github.com/LikeTheSalad), Elastic
- [Jack Berg](https://github.com/jack-berg), New Relic
- [Jason Plumb](https://github.com/breedx-splk), Splunk
Learn more about component owners in [component_owners.yml](../.github/component_owners.yml).

View File

@ -13,5 +13,13 @@ java {
dependencies {
api("io.opentelemetry:opentelemetry-sdk")
compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi")
// For EventToSpanEventBridge
implementation("io.opentelemetry:opentelemetry-exporter-otlp-common")
implementation("com.fasterxml.jackson.core:jackson-core")
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-incubator")
}

View File

@ -0,0 +1,136 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.eventbridge;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.common.Value;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize;
import io.opentelemetry.exporter.internal.otlp.AnyValueMarshaler;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
import io.opentelemetry.sdk.logs.ReadWriteLogRecord;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* A processor that records events (i.e. log records with an {@code event.name} attribute) as span
* events for the current span if:
*
* <ul>
* <li>The log record has a valid span context
* <li>{@link Span#current()} returns a span where {@link Span#isRecording()} is true
* <li>The log record's span context is the same as {@link Span#current()}
* </ul>
*
* <p>The event {@link LogRecordData} is converted to a span event as follows:
*
* <ul>
* <li>{@code event.name} attribute is mapped to span event name
* <li>{@link LogRecordData#getTimestampEpochNanos()} is mapped to span event timestamp
* <li>{@link LogRecordData#getAttributes()} are mapped to span event attributes, excluding {@code
* event.name}
* <li>{@link LogRecordData#getObservedTimestampEpochNanos()} is mapped to span event attribute
* with key {@code log.record.observed_timestamp}
* <li>{@link LogRecordData#getSeverity()} is mapped to span event attribute with key {@code
* log.record.severity_number}
* <li>{@link LogRecordData#getBodyValue()} is mapped to span event attribute with key {@code
* log.record.body}, as an escaped JSON string following the standard protobuf JSON encoding
* <li>{@link LogRecordData#getTotalAttributeCount()} - {@link
* LogRecordData#getAttributes()}.size() is mapped to span event attribute with key {@code
* log.record.dropped_attributes_count}
* </ul>
*/
public final class EventToSpanEventBridge implements LogRecordProcessor {
private static final Logger logger = Logger.getLogger(EventToSpanEventBridge.class.getName());
private static final AttributeKey<String> EVENT_NAME = AttributeKey.stringKey("event.name");
private static final AttributeKey<Long> LOG_RECORD_OBSERVED_TIME_UNIX_NANO =
AttributeKey.longKey("log.record.observed_time_unix_nano");
private static final AttributeKey<Long> LOG_RECORD_SEVERITY_NUMBER =
AttributeKey.longKey("log.record.severity_number");
private static final AttributeKey<String> LOG_RECORD_BODY =
AttributeKey.stringKey("log.record.body");
private static final AttributeKey<Long> LOG_RECORD_DROPPED_ATTRIBUTES_COUNT =
AttributeKey.longKey("log.record.dropped_attributes_count");
private EventToSpanEventBridge() {}
/** Create an instance. */
public static EventToSpanEventBridge create() {
return new EventToSpanEventBridge();
}
@Override
public void onEmit(Context context, ReadWriteLogRecord logRecord) {
LogRecordData logRecordData = logRecord.toLogRecordData();
String eventName = logRecordData.getAttributes().get(EVENT_NAME);
if (eventName == null) {
return;
}
SpanContext logSpanContext = logRecordData.getSpanContext();
if (!logSpanContext.isValid()) {
return;
}
Span currentSpan = Span.current();
if (!currentSpan.isRecording()) {
return;
}
if (!currentSpan.getSpanContext().equals(logSpanContext)) {
return;
}
currentSpan.addEvent(
eventName,
toSpanEventAttributes(logRecordData),
logRecordData.getTimestampEpochNanos(),
TimeUnit.NANOSECONDS);
}
private static Attributes toSpanEventAttributes(LogRecordData logRecord) {
AttributesBuilder builder =
logRecord.getAttributes().toBuilder().removeIf(key -> key.equals(EVENT_NAME));
builder.put(LOG_RECORD_OBSERVED_TIME_UNIX_NANO, logRecord.getObservedTimestampEpochNanos());
builder.put(LOG_RECORD_SEVERITY_NUMBER, logRecord.getSeverity().getSeverityNumber());
// Add bridging for logRecord.getSeverityText() if EventBuilder adds severity text setter
Value<?> body = logRecord.getBodyValue();
if (body != null) {
MarshalerWithSize marshaler = AnyValueMarshaler.create(body);
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
marshaler.writeJsonTo(out);
builder.put(LOG_RECORD_BODY, out.toString(StandardCharsets.UTF_8.name()));
} catch (IOException e) {
logger.log(Level.WARNING, "Error converting log record body to JSON", e);
}
}
int droppedAttributesCount =
logRecord.getTotalAttributeCount() - logRecord.getAttributes().size();
if (droppedAttributesCount > 0) {
builder.put(LOG_RECORD_DROPPED_ATTRIBUTES_COUNT, droppedAttributesCount);
}
return builder.build();
}
@Override
public String toString() {
return "EventToSpanEventBridge{}";
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.eventbridge.internal;
import io.opentelemetry.contrib.eventbridge.EventToSpanEventBridge;
import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider;
import io.opentelemetry.sdk.autoconfigure.spi.internal.StructuredConfigProperties;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
/**
* Declarative configuration SPI implementation for {@link EventToSpanEventBridge}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public class EventToSpanEventBridgeComponentProvider
implements ComponentProvider<LogRecordProcessor> {
@Override
public Class<LogRecordProcessor> getType() {
return LogRecordProcessor.class;
}
@Override
public String getName() {
return "event_to_span_event_bridge";
}
@Override
public LogRecordProcessor create(StructuredConfigProperties config) {
return EventToSpanEventBridge.create();
}
}

View File

@ -0,0 +1 @@
io.opentelemetry.contrib.eventbridge.internal.EventToSpanEventBridgeComponentProvider

View File

@ -0,0 +1,203 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.eventbridge;
import static io.opentelemetry.api.common.AttributeKey.longKey;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.Value;
import io.opentelemetry.api.incubator.events.EventLogger;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
import io.opentelemetry.sdk.logs.internal.SdkEventLoggerProvider;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.testing.time.TestClock;
import io.opentelemetry.sdk.trace.IdGenerator;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;
class EventToSpanEventBridgeTest {
private final InMemorySpanExporter spanExporter = InMemorySpanExporter.create();
private final SdkTracerProvider tracerProvider =
SdkTracerProvider.builder()
.setSampler(onlyServerSpans())
.addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
.build();
private final TestClock testClock = TestClock.create();
private final SdkEventLoggerProvider eventLoggerProvider =
SdkEventLoggerProvider.create(
SdkLoggerProvider.builder()
.setClock(testClock)
.addLogRecordProcessor(EventToSpanEventBridge.create())
.build());
private final Tracer tracer = tracerProvider.get("tracer");
private final EventLogger eventLogger = eventLoggerProvider.get("event-logger");
private static Sampler onlyServerSpans() {
return new Sampler() {
@Override
public SamplingResult shouldSample(
Context parentContext,
String traceId,
String name,
SpanKind spanKind,
Attributes attributes,
List<LinkData> parentLinks) {
return SpanKind.SERVER.equals(spanKind)
? SamplingResult.recordAndSample()
: SamplingResult.drop();
}
@Override
public String getDescription() {
return "description";
}
};
}
@Test
void withRecordingSpan_BridgesEvent() {
testClock.setTime(Instant.ofEpochMilli(1));
// The test tracerProvider has a sampler which records and samples SERVER spans, and drops all
// others. We create a recording span by setting kind to SERVER.
Span span = tracer.spanBuilder("span").setSpanKind(SpanKind.SERVER).startSpan();
try (Scope unused = span.makeCurrent()) {
eventLogger
.builder("my.event-name")
.setTimestamp(100, TimeUnit.NANOSECONDS)
.setSeverity(Severity.DEBUG)
.put("foo", "bar")
.put("number", 1)
.put("map", Value.of(Collections.singletonMap("key", Value.of("value"))))
.setAttributes(Attributes.builder().put("color", "red").build())
.setAttributes(Attributes.builder().put("shape", "square").build())
.emit();
} finally {
span.end();
}
assertThat(spanExporter.getFinishedSpanItems())
.satisfiesExactly(
spanData ->
assertThat(spanData)
.hasName("span")
.hasEventsSatisfyingExactly(
spanEvent ->
spanEvent
.hasName("my.event-name")
.hasTimestamp(100, TimeUnit.NANOSECONDS)
.hasAttributesSatisfying(
attributes -> {
assertThat(attributes.get(stringKey("color")))
.isEqualTo("red");
assertThat(attributes.get(stringKey("shape")))
.isEqualTo("square");
assertThat(
attributes.get(
longKey("log.record.observed_time_unix_nano")))
.isEqualTo(1000000L);
assertThat(
attributes.get(longKey("log.record.severity_number")))
.isEqualTo(Severity.DEBUG.getSeverityNumber());
assertThat(attributes.get(stringKey("log.record.body")))
.isEqualTo(
"{\"kvlistValue\":{\"values\":[{\"key\":\"number\",\"value\":{\"intValue\":\"1\"}},{\"key\":\"foo\",\"value\":{\"stringValue\":\"bar\"}},{\"key\":\"map\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"key\",\"value\":{\"stringValue\":\"value\"}}]}}}]}}");
})));
}
@Test
void nonRecordingSpan_doesNotBridgeEvent() {
// The test tracerProvider has a sampler which records and samples server spans, and drops all
// others. We create a non-recording span by setting kind to INTERNAL.
Span span = tracer.spanBuilder("span").setSpanKind(SpanKind.INTERNAL).startSpan();
try (Scope unused = span.makeCurrent()) {
eventLogger
.builder("my.event-name")
.setTimestamp(100, TimeUnit.NANOSECONDS)
.setSeverity(Severity.DEBUG)
.put("foo", "bar")
.put("number", 1)
.put("map", Value.of(Collections.singletonMap("key", Value.of("value"))))
.setAttributes(Attributes.builder().put("color", "red").build())
.setAttributes(Attributes.builder().put("shape", "square").build())
.emit();
} finally {
span.end();
}
assertThat(spanExporter.getFinishedSpanItems())
.allSatisfy(spanData -> assertThat(spanData.getEvents()).isEmpty());
}
@Test
void differentSpanContext_doesNotBridgeEvent() {
// The test tracerProvider has a sampler which records and samples SERVER spans, and drops all
// others. We create a recording span by setting kind to SERVER.
Span span = tracer.spanBuilder("span").setSpanKind(SpanKind.SERVER).startSpan();
try (Scope unused = span.makeCurrent()) {
eventLogger
.builder("my.event-name")
// Manually override the context
.setContext(
Span.wrap(
SpanContext.create(
IdGenerator.random().generateTraceId(),
IdGenerator.random().generateSpanId(),
TraceFlags.getDefault(),
TraceState.getDefault()))
.storeInContext(Context.current()))
.setTimestamp(100, TimeUnit.NANOSECONDS)
.setSeverity(Severity.DEBUG)
.put("foo", "bar")
.put("number", 1)
.put("map", Value.of(Collections.singletonMap("key", Value.of("value"))))
.setAttributes(Attributes.builder().put("color", "red").build())
.setAttributes(Attributes.builder().put("shape", "square").build())
.emit();
} finally {
span.end();
}
assertThat(spanExporter.getFinishedSpanItems())
.allSatisfy(spanData -> assertThat(spanData.getEvents()).isEmpty());
}
@Test
void noSpan_doesNotBridgeEvent() {
eventLogger
.builder("my.event-name")
.setTimestamp(100, TimeUnit.NANOSECONDS)
.setSeverity(Severity.DEBUG)
.put("foo", "bar")
.put("number", 1)
.put("map", Value.of(Collections.singletonMap("key", Value.of("value"))))
.setAttributes(Attributes.builder().put("color", "red").build())
.setAttributes(Attributes.builder().put("shape", "square").build())
.emit();
assertThat(spanExporter.getFinishedSpanItems()).isEmpty();
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.eventbridge.internal;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.extension.incubator.fileconfig.FileConfiguration;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.Test;
class EventToSpanBridgeComponentProviderTest {
@Test
void endToEnd() {
String yaml =
"file_format: 0.3\n"
+ "logger_provider:\n"
+ " processors:\n"
// TODO(jack-berg): remove "{}" after releasing
// https://github.com/open-telemetry/opentelemetry-java/pull/6891/files
+ " - event_to_span_event_bridge: {}\n";
OpenTelemetrySdk openTelemetrySdk =
FileConfiguration.parseAndCreate(
new ByteArrayInputStream(yaml.getBytes(StandardCharsets.UTF_8)));
assertThat(openTelemetrySdk.getSdkLoggerProvider().toString())
.matches("SdkLoggerProvider\\{.*logRecordProcessor=EventToSpanEventBridge\\{}.*}");
}
}

View File

@ -2,7 +2,7 @@
## Declarative configuration
The following samplers support [declarative configuration](https://github.com/open-telemetry/opentelemetry-specification/tree/main/specification/configuration#declarative-configuration):
The following samplers support [declarative configuration](https://opentelemetry.io/docs/languages/java/configuration/#declarative-configuration):
* `RuleBasedRoutingSampler`