add logs filtering (#1823)

This commit is contained in:
oliver zhang 2025-04-10 03:08:45 +08:00 committed by GitHub
parent b3fbfe3888
commit e009336016
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 150 additions and 0 deletions

View File

@ -28,6 +28,10 @@ logger_provider:
- event_to_span_event_bridge:
```
## Filtering Log Processor
`FilteringLogRecordProcessor` is a `LogRecordProcessor` that only keep logs based on a predicate
## Component owners
- [Cesar Munoz](https://github.com/LikeTheSalad), Elastic

View File

@ -24,4 +24,5 @@ dependencies {
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-incubator")
testImplementation("io.opentelemetry:opentelemetry-exporter-logging")
}

View File

@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.filter;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
import io.opentelemetry.sdk.logs.ReadWriteLogRecord;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import java.util.function.Predicate;
public class FilteringLogRecordProcessor implements LogRecordProcessor {
public final LogRecordProcessor delegate;
public final Predicate<LogRecordData> predicate;
public FilteringLogRecordProcessor(
LogRecordProcessor delegate, Predicate<LogRecordData> predicate) {
this.delegate = delegate;
this.predicate = predicate;
}
@Override
public void onEmit(Context context, ReadWriteLogRecord readWriteLogRecord) {
if (predicate.test(readWriteLogRecord.toLogRecordData())) {
delegate.onEmit(context, readWriteLogRecord);
}
}
}

View File

@ -0,0 +1,114 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.filter;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.opentelemetry.api.logs.Logger;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdk;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
import io.opentelemetry.sdk.logs.SdkLoggerProviderBuilder;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor;
import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
public class FilteringLogRecordProcessorTest {
private final InMemoryLogRecordExporter memoryLogRecordExporter =
InMemoryLogRecordExporter.create();
;
private final LogRecordProcessor logRecordProcessor =
SimpleLogRecordProcessor.create(memoryLogRecordExporter);
;
private final InMemorySpanExporter spansExporter = InMemorySpanExporter.create();
private AutoConfiguredOpenTelemetrySdkBuilder sdkBuilder;
private Logger logger;
@BeforeEach
void setUp() {
sdkBuilder = AutoConfiguredOpenTelemetrySdk.builder();
sdkBuilder
.addPropertiesSupplier(
() -> {
Map<String, String> configMap = new HashMap<>();
configMap.put("otel.metrics.exporter", "none");
configMap.put("otel.traces.exporter", "logging");
configMap.put("otel.logs.exporter", "logging");
return configMap;
})
.addSpanExporterCustomizer((exporter, c) -> spansExporter)
.addLogRecordExporterCustomizer(
(logRecordExporter, configProperties) -> memoryLogRecordExporter)
.addLoggerProviderCustomizer(
new BiFunction<SdkLoggerProviderBuilder, ConfigProperties, SdkLoggerProviderBuilder>() {
@Override
public SdkLoggerProviderBuilder apply(
SdkLoggerProviderBuilder sdkLoggerProviderBuilder,
ConfigProperties configProperties) {
return sdkLoggerProviderBuilder.addLogRecordProcessor(
new FilteringLogRecordProcessor(
logRecordProcessor,
logRecordData -> logRecordData.getSpanContext().isSampled()));
}
});
logger =
SdkLoggerProvider.builder()
.addLogRecordProcessor(
new FilteringLogRecordProcessor(
logRecordProcessor,
logRecordData -> {
SpanContext spanContext = logRecordData.getSpanContext();
return spanContext.isSampled();
}) {})
.build()
.get("TestScope");
}
@Test
void verifyLogFilteringExistSpanContext() {
try (OpenTelemetrySdk sdk = sdkBuilder.build().getOpenTelemetrySdk()) {
Tracer tracer = sdk.getTracer("test");
Span span = tracer.spanBuilder("test").startSpan();
sdk.getLogsBridge().get("test").logRecordBuilder().setBody("One Log").emit();
List<LogRecordData> finishedLogRecordItems =
memoryLogRecordExporter.getFinishedLogRecordItems();
assertEquals(1, finishedLogRecordItems.size());
try (Scope scope = span.makeCurrent()) {
} finally {
span.end();
}
List<SpanData> finishedSpans = spansExporter.getFinishedSpanItems();
assertEquals(1, finishedSpans.size());
}
}
@Test
void verifyFilteringNotExitSpanContext() {
logger.logRecordBuilder().setBody("One Log").emit();
List<LogRecordData> finishedLogRecordItems =
memoryLogRecordExporter.getFinishedLogRecordItems();
assertEquals(0, finishedLogRecordItems.size());
}
}