Fix capturing context in log4j library instrumentation with async logger (#12176)

This commit is contained in:
Lauri Tulmin 2024-09-13 01:09:41 +03:00 committed by GitHub
parent c2713e16d8
commit e8c5c066d6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 174 additions and 42 deletions

View File

@ -37,7 +37,7 @@ dependencies {
}
// this is needed for the async logging test
testImplementation("com.lmax:disruptor:3.4.2")
testLibrary("com.lmax:disruptor:3.4.2")
}
tasks.withType<Test>().configureEach {

View File

@ -9,6 +9,7 @@ import static java.util.Collections.emptyList;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.logs.LogRecordBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.incubator.config.internal.InstrumentationConfig;
import io.opentelemetry.instrumentation.log4j.appender.v2_17.internal.ContextDataAccessor;
import io.opentelemetry.instrumentation.log4j.appender.v2_17.internal.LogEventMapper;
@ -77,7 +78,15 @@ public final class Log4jHelper {
threadId = currentThread.getId();
}
mapper.mapLogEvent(
builder, message, level, marker, throwable, contextData, threadName, threadId);
builder,
message,
level,
marker,
throwable,
contextData,
threadName,
threadId,
Context.current());
builder.setTimestamp(Instant.now());
builder.emit();
}
@ -87,12 +96,12 @@ public final class Log4jHelper {
@Override
@Nullable
public Object getValue(Map<String, String> contextData, String key) {
public String getValue(Map<String, String> contextData, String key) {
return contextData.get(key);
}
@Override
public void forEach(Map<String, String> contextData, BiConsumer<String, Object> action) {
public void forEach(Map<String, String> contextData, BiConsumer<String, String> action) {
contextData.forEach(action);
}
}

View File

@ -6,13 +6,26 @@ dependencies {
library("org.apache.logging.log4j:log4j-core:2.17.0")
annotationProcessor("org.apache.logging.log4j:log4j-core:2.17.0")
implementation(project(":instrumentation:log4j:log4j-context-data:log4j-context-data-2.17:library-autoconfigure"))
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
testLibrary("com.lmax:disruptor:3.3.4")
if (findProperty("testLatestDeps") as Boolean) {
testCompileOnly("biz.aQute.bnd:biz.aQute.bnd.annotation:7.0.0")
}
}
tasks.withType<Test>().configureEach {
jvmArgs("-Dotel.instrumentation.common.experimental.controller-telemetry.enabled=true")
tasks {
withType<Test>().configureEach {
jvmArgs("-Dotel.instrumentation.common.experimental.controller-telemetry.enabled=true")
}
val testAsyncLogger by registering(Test::class) {
jvmArgs("-DLog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector")
}
check {
dependsOn(testAsyncLogger)
}
}

View File

@ -18,6 +18,7 @@ import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.StringMapMessage;
import org.apache.logging.log4j.message.StructuredDataMessage;
import org.apache.logging.log4j.util.ReadOnlyStringMap;
import org.apache.logging.log4j.util.SortedArrayStringMap;
class LogEventToReplay implements LogEvent {
@ -59,7 +60,8 @@ class LogEventToReplay implements LogEvent {
this.instant = logEvent.getInstant();
this.thrown = logEvent.getThrown();
this.marker = logEvent.getMarker();
this.contextData = logEvent.getContextData();
// copy context data, context data map may be reused
this.contextData = new SortedArrayStringMap(logEvent.getContextData());
this.threadName = logEvent.getThreadName();
this.threadId = logEvent.getThreadId();
}

View File

@ -10,8 +10,14 @@ import static java.util.Collections.emptyList;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.logs.LogRecordBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.log4j.appender.v2_17.internal.ContextDataAccessor;
import io.opentelemetry.instrumentation.log4j.appender.v2_17.internal.LogEventMapper;
import io.opentelemetry.instrumentation.log4j.contextdata.v2_17.internal.ContextDataKeys;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@ -272,6 +278,28 @@ public class OpenTelemetryAppender extends AbstractAppender {
LogRecordBuilder builder =
openTelemetry.getLogsBridge().loggerBuilder(instrumentationName).build().logRecordBuilder();
ReadOnlyStringMap contextData = event.getContextData();
Context context = Context.current();
// when using async logger we'll be executing on a different thread than what started logging
// reconstruct the context from context data
if (context == Context.root()) {
ContextDataAccessor<ReadOnlyStringMap> contextDataAccessor = ContextDataAccessorImpl.INSTANCE;
String traceId = contextDataAccessor.getValue(contextData, ContextDataKeys.TRACE_ID_KEY);
String spanId = contextDataAccessor.getValue(contextData, ContextDataKeys.SPAN_ID_KEY);
String traceFlags =
contextDataAccessor.getValue(contextData, ContextDataKeys.TRACE_FLAGS_KEY);
if (traceId != null && spanId != null && traceFlags != null) {
context =
Context.root()
.with(
Span.wrap(
SpanContext.create(
traceId,
spanId,
TraceFlags.fromHex(traceFlags, 0),
TraceState.getDefault())));
}
}
mapper.mapLogEvent(
builder,
event.getMessage(),
@ -280,7 +308,8 @@ public class OpenTelemetryAppender extends AbstractAppender {
event.getThrown(),
contextData,
event.getThreadName(),
event.getThreadId());
event.getThreadId(),
context);
Instant timestamp = event.getInstant();
if (timestamp != null) {
@ -297,12 +326,12 @@ public class OpenTelemetryAppender extends AbstractAppender {
@Override
@Nullable
public Object getValue(ReadOnlyStringMap contextData, String key) {
public String getValue(ReadOnlyStringMap contextData, String key) {
return contextData.getValue(key);
}
@Override
public void forEach(ReadOnlyStringMap contextData, BiConsumer<String, Object> action) {
public void forEach(ReadOnlyStringMap contextData, BiConsumer<String, String> action) {
contextData.forEach(action::accept);
}
}

View File

@ -15,7 +15,7 @@ import javax.annotation.Nullable;
public interface ContextDataAccessor<T> {
@Nullable
Object getValue(T contextData, String key);
String getValue(T contextData, String key);
void forEach(T contextData, BiConsumer<String, Object> action);
void forEach(T contextData, BiConsumer<String, String> action);
}

View File

@ -86,7 +86,8 @@ public final class LogEventMapper<T> {
@Nullable Throwable throwable,
T contextData,
String threadName,
long threadId) {
long threadId,
Context context) {
AttributesBuilder attributes = Attributes.builder();
@ -116,8 +117,7 @@ public final class LogEventMapper<T> {
}
builder.setAllAttributes(attributes.build());
builder.setContext(Context.current());
builder.setContext(context);
}
// visible for testing
@ -165,16 +165,16 @@ public final class LogEventMapper<T> {
contextData,
(key, value) -> {
if (value != null) {
attributes.put(getContextDataAttributeKey(key), value.toString());
attributes.put(getContextDataAttributeKey(key), value);
}
});
return;
}
for (String key : captureContextDataAttributes) {
Object value = contextDataAccessor.getValue(contextData, key);
String value = contextDataAccessor.getValue(contextData, key);
if (value != null) {
attributes.put(getContextDataAttributeKey(key), value.toString());
attributes.put(getContextDataAttributeKey(key), value);
}
}
}

View File

@ -11,7 +11,6 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equal
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
@ -19,6 +18,7 @@ import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.semconv.ExceptionAttributes;
import io.opentelemetry.semconv.incubating.ThreadIncubatingAttributes;
import java.time.Instant;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -93,7 +93,12 @@ abstract class AbstractOpenTelemetryAppenderTest {
.hasResource(resource)
.hasInstrumentationScope(instrumentationScopeInfo)
.hasBody("log message 1")
.hasAttributes(Attributes.empty()));
.hasAttributesSatisfyingExactly(
equalTo(
ThreadIncubatingAttributes.THREAD_NAME,
Thread.currentThread().getName()),
equalTo(
ThreadIncubatingAttributes.THREAD_ID, Thread.currentThread().getId())));
}
@Test
@ -123,6 +128,9 @@ abstract class AbstractOpenTelemetryAppenderTest {
.hasSeverity(Severity.INFO)
.hasSeverityText("INFO")
.hasAttributesSatisfyingExactly(
equalTo(
ThreadIncubatingAttributes.THREAD_NAME, Thread.currentThread().getName()),
equalTo(ThreadIncubatingAttributes.THREAD_ID, Thread.currentThread().getId()),
equalTo(
ExceptionAttributes.EXCEPTION_TYPE,
IllegalStateException.class.getName()),
@ -158,7 +166,13 @@ abstract class AbstractOpenTelemetryAppenderTest {
.hasInstrumentationScope(instrumentationScopeInfo)
.hasBody("log message 1")
.hasAttributesSatisfyingExactly(
equalTo(stringKey("key1"), "val1"), equalTo(stringKey("key2"), "val2")));
equalTo(
ThreadIncubatingAttributes.THREAD_NAME,
Thread.currentThread().getName()),
equalTo(
ThreadIncubatingAttributes.THREAD_ID, Thread.currentThread().getId()),
equalTo(stringKey("key1"), "val1"),
equalTo(stringKey("key2"), "val2")));
}
@Test
@ -177,6 +191,11 @@ abstract class AbstractOpenTelemetryAppenderTest {
.hasResource(resource)
.hasInstrumentationScope(instrumentationScopeInfo)
.hasAttributesSatisfyingExactly(
equalTo(
ThreadIncubatingAttributes.THREAD_NAME,
Thread.currentThread().getName()),
equalTo(
ThreadIncubatingAttributes.THREAD_ID, Thread.currentThread().getId()),
equalTo(stringKey("log4j.map_message.key1"), "val1"),
equalTo(stringKey("log4j.map_message.key2"), "val2")));
}
@ -198,6 +217,11 @@ abstract class AbstractOpenTelemetryAppenderTest {
.hasInstrumentationScope(instrumentationScopeInfo)
.hasBody("val2")
.hasAttributesSatisfyingExactly(
equalTo(
ThreadIncubatingAttributes.THREAD_NAME,
Thread.currentThread().getName()),
equalTo(
ThreadIncubatingAttributes.THREAD_ID, Thread.currentThread().getId()),
equalTo(stringKey("log4j.map_message.key1"), "val1")));
}
@ -233,6 +257,11 @@ abstract class AbstractOpenTelemetryAppenderTest {
.hasInstrumentationScope(instrumentationScopeInfo)
.hasBody("a message")
.hasAttributesSatisfyingExactly(
equalTo(
ThreadIncubatingAttributes.THREAD_NAME,
Thread.currentThread().getName()),
equalTo(
ThreadIncubatingAttributes.THREAD_ID, Thread.currentThread().getId()),
equalTo(stringKey("log4j.map_message.key1"), "val1"),
equalTo(stringKey("log4j.map_message.key2"), "val2")));
}

View File

@ -10,9 +10,11 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equal
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import io.opentelemetry.semconv.incubating.ThreadIncubatingAttributes;
import org.apache.logging.log4j.message.StringMapMessage;
import org.apache.logging.log4j.message.StructuredDataMessage;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@ -43,8 +45,16 @@ class LogReplayOpenTelemetryAppenderTest extends AbstractOpenTelemetryAppenderTe
OpenTelemetryAppender.install(testing.getOpenTelemetry());
}
private static boolean isAsyncLogger() {
return logger.getClass().getName().contains("AsyncLogger");
}
@Test
void twoLogs() {
// with async logger OpenTelemetryAppender.install may be called before second log message is
// captured, so we get 2 log records instead of the expected 1
Assumptions.assumeFalse(isAsyncLogger());
logger.info("log message 1");
logger.info(
"log message 2"); // Won't be instrumented because cache size is 1 (see log4j2.xml file)
@ -61,6 +71,10 @@ class LogReplayOpenTelemetryAppenderTest extends AbstractOpenTelemetryAppenderTe
@Test
void twoLogsStringMapMessage() {
// with async logger OpenTelemetryAppender.install may be called before second log message is
// captured, so we get 2 log records instead of the expected 1
Assumptions.assumeFalse(isAsyncLogger());
StringMapMessage message = new StringMapMessage();
message.put("key1", "val1");
message.put("key2", "val2");
@ -81,12 +95,19 @@ class LogReplayOpenTelemetryAppenderTest extends AbstractOpenTelemetryAppenderTe
.hasResource(resource)
.hasInstrumentationScope(instrumentationScopeInfo)
.hasAttributesSatisfyingExactly(
equalTo(
ThreadIncubatingAttributes.THREAD_NAME, Thread.currentThread().getName()),
equalTo(ThreadIncubatingAttributes.THREAD_ID, Thread.currentThread().getId()),
equalTo(stringKey("log4j.map_message.key1"), "val1"),
equalTo(stringKey("log4j.map_message.key2"), "val2")));
}
@Test
void twoLogsStructuredDataMessage() {
// with async logger OpenTelemetryAppender.install may be called before second log message is
// captured, so we get 2 log records instead of the expected 1
Assumptions.assumeFalse(isAsyncLogger());
StructuredDataMessage message = new StructuredDataMessage("an id", "a message", "a type");
message.put("key1", "val1");
message.put("key2", "val2");
@ -107,6 +128,9 @@ class LogReplayOpenTelemetryAppenderTest extends AbstractOpenTelemetryAppenderTe
.hasInstrumentationScope(instrumentationScopeInfo)
.hasBody("a message")
.hasAttributesSatisfyingExactly(
equalTo(
ThreadIncubatingAttributes.THREAD_NAME, Thread.currentThread().getName()),
equalTo(ThreadIncubatingAttributes.THREAD_ID, Thread.currentThread().getId()),
equalTo(stringKey("log4j.map_message.key1"), "val1"),
equalTo(stringKey("log4j.map_message.key2"), "val2")));
}

View File

@ -5,9 +5,12 @@
package io.opentelemetry.instrumentation.log4j.appender.v2_17;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import io.opentelemetry.semconv.incubating.ThreadIncubatingAttributes;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@ -30,7 +33,7 @@ class OpenTelemetryAppenderTest extends AbstractOpenTelemetryAppenderTest {
}
@Test
void logWithSpan() { // Does not work for log replay but it is not likely to occur because
void logWithSpan() { // Does not work for log replay, but it is not likely to occur because
// the log replay is related to the case where an OpenTelemetry object is not yet available
// at the time the log is executed (and if no OpenTelemetry is available, the context
// propagation can't happen)
@ -44,6 +47,13 @@ class OpenTelemetryAppenderTest extends AbstractOpenTelemetryAppenderTest {
executeAfterLogsExecution();
testing.waitAndAssertLogRecords(logRecord -> logRecord.hasSpanContext(span1.getSpanContext()));
testing.waitAndAssertLogRecords(
logRecord ->
logRecord
.hasSpanContext(span1.getSpanContext())
.hasAttributesSatisfying(
equalTo(
ThreadIncubatingAttributes.THREAD_NAME, Thread.currentThread().getName()),
equalTo(ThreadIncubatingAttributes.THREAD_ID, Thread.currentThread().getId())));
}
}

View File

@ -180,12 +180,12 @@ class LogEventMapperTest {
@Override
@Nullable
public Object getValue(Map<String, String> contextData, String key) {
public String getValue(Map<String, String> contextData, String key) {
return contextData.get(key);
}
@Override
public void forEach(Map<String, String> contextData, BiConsumer<String, Object> action) {
public void forEach(Map<String, String> contextData, BiConsumer<String, String> action) {
contextData.forEach(action);
}
}

View File

@ -6,7 +6,7 @@
pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} traceId: %X{trace_id} spanId: %X{span_id} flags: %X{trace_flags} - %msg%n"/>
</Console>
<!-- TODO run tests both with and without experimental log attributes -->
<OpenTelemetry name="OpenTelemetryAppender" numLogsCapturedBeforeOtelInstall="1" captureMapMessageAttributes="true" captureMarkerAttribute="true" captureContextDataAttributes="*"/>
<OpenTelemetry name="OpenTelemetryAppender" numLogsCapturedBeforeOtelInstall="1" captureMapMessageAttributes="true" captureMarkerAttribute="true" captureContextDataAttributes="*" captureExperimentalAttributes="true"/>
</Appenders>
<Loggers>
<Logger name="TestLogger" level="All">

View File

@ -10,8 +10,8 @@ import io.opentelemetry.api.baggage.BaggageEntry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.incubator.log.LoggingContextConstants;
import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
import io.opentelemetry.instrumentation.log4j.contextdata.v2_17.internal.ContextDataKeys;
import io.opentelemetry.javaagent.bootstrap.internal.ConfiguredResourceAttributesHolder;
import java.util.Collections;
import java.util.HashMap;
@ -25,15 +25,6 @@ import org.apache.logging.log4j.core.util.ContextDataProvider;
public class OpenTelemetryContextDataProvider implements ContextDataProvider {
private static final boolean BAGGAGE_ENABLED =
ConfigPropertiesUtil.getBoolean("otel.instrumentation.log4j-context-data.add-baggage", false);
private static final String TRACE_ID_KEY =
ConfigPropertiesUtil.getString(
"otel.instrumentation.common.logging.trace-id", LoggingContextConstants.TRACE_ID);
private static final String SPAN_ID_KEY =
ConfigPropertiesUtil.getString(
"otel.instrumentation.common.logging.span-id", LoggingContextConstants.SPAN_ID);
private static final String TRACE_FLAGS_KEY =
ConfigPropertiesUtil.getString(
"otel.instrumentation.common.logging.trace-flags", LoggingContextConstants.TRACE_FLAGS);
private static final boolean configuredResourceAttributeAccessible =
isConfiguredResourceAttributeAccessible();
private static final Map<String, String> staticContextData = getStaticContextData();
@ -76,13 +67,11 @@ public class OpenTelemetryContextDataProvider implements ContextDataProvider {
return staticContextData;
}
Map<String, String> contextData = new HashMap<>();
contextData.putAll(staticContextData);
Map<String, String> contextData = new HashMap<>(staticContextData);
SpanContext spanContext = currentSpan.getSpanContext();
contextData.put(TRACE_ID_KEY, spanContext.getTraceId());
contextData.put(SPAN_ID_KEY, spanContext.getSpanId());
contextData.put(TRACE_FLAGS_KEY, spanContext.getTraceFlags().asHex());
contextData.put(ContextDataKeys.TRACE_ID_KEY, spanContext.getTraceId());
contextData.put(ContextDataKeys.SPAN_ID_KEY, spanContext.getSpanId());
contextData.put(ContextDataKeys.TRACE_FLAGS_KEY, spanContext.getTraceFlags().asHex());
if (BAGGAGE_ENABLED) {
Baggage baggage = Baggage.fromContext(context);

View File

@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.log4j.contextdata.v2_17.internal;
import io.opentelemetry.instrumentation.api.incubator.log.LoggingContextConstants;
import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public final class ContextDataKeys {
public static final String TRACE_ID_KEY =
ConfigPropertiesUtil.getString(
"otel.instrumentation.common.logging.trace-id", LoggingContextConstants.TRACE_ID);
public static final String SPAN_ID_KEY =
ConfigPropertiesUtil.getString(
"otel.instrumentation.common.logging.span-id", LoggingContextConstants.SPAN_ID);
public static final String TRACE_FLAGS_KEY =
ConfigPropertiesUtil.getString(
"otel.instrumentation.common.logging.trace-flags", LoggingContextConstants.TRACE_FLAGS);
private ContextDataKeys() {}
}