Signals processing (#1014)

This commit is contained in:
LikeTheSalad 2023-09-08 00:56:55 +02:00 committed by GitHub
parent 7e4637a56a
commit d56ea370f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 607 additions and 0 deletions

View File

@ -47,6 +47,9 @@ components:
- HaloFour
noop-api:
- jack-berg
processors:
- LikeTheSalad
- breedx-splk
prometheus-collector:
- jkwatson
resource-providers:

10
processors/README.md Normal file
View File

@ -0,0 +1,10 @@
# Processors
This module provides tools to intercept and process signals globally.
## Component owners
- [Cesar Munoz](https://github.com/LikeTheSalad), Elastic
- [Jason Plumb](https://github.com/breedx-splk), Splunk
Learn more about component owners in [component_owners.yml](../.github/component_owners.yml).

View File

@ -0,0 +1,17 @@
plugins {
id("otel.java-conventions")
id("otel.publish-conventions")
}
description = "Tools to intercept and process signals globally."
otelJava.moduleName.set("io.opentelemetry.contrib.processors")
java {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}
dependencies {
api("io.opentelemetry:opentelemetry-sdk")
testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
}

View File

@ -0,0 +1,39 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.interceptor;
import io.opentelemetry.contrib.interceptor.api.Interceptor;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.LogRecordExporter;
import java.util.Collection;
/** Intercepts logs before delegating them to the real exporter. */
public final class InterceptableLogRecordExporter implements LogRecordExporter {
private final LogRecordExporter delegate;
private final Interceptor<LogRecordData> interceptor;
public InterceptableLogRecordExporter(
LogRecordExporter delegate, Interceptor<LogRecordData> interceptor) {
this.delegate = delegate;
this.interceptor = interceptor;
}
@Override
public CompletableResultCode export(Collection<LogRecordData> logs) {
return delegate.export(interceptor.interceptAll(logs));
}
@Override
public CompletableResultCode flush() {
return delegate.flush();
}
@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.interceptor;
import io.opentelemetry.contrib.interceptor.api.Interceptor;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.util.Collection;
/** Intercepts metrics before delegating them to the real exporter. */
public final class InterceptableMetricExporter implements MetricExporter {
private final MetricExporter delegate;
private final Interceptor<MetricData> interceptor;
public InterceptableMetricExporter(MetricExporter delegate, Interceptor<MetricData> interceptor) {
this.delegate = delegate;
this.interceptor = interceptor;
}
@Override
public CompletableResultCode export(Collection<MetricData> metrics) {
return delegate.export(interceptor.interceptAll(metrics));
}
@Override
public CompletableResultCode flush() {
return delegate.flush();
}
@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}
@Override
public AggregationTemporality getAggregationTemporality(InstrumentType instrumentType) {
return delegate.getAggregationTemporality(instrumentType);
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.interceptor;
import io.opentelemetry.contrib.interceptor.api.Interceptor;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.util.Collection;
/** Intercepts spans before delegating them to the real exporter. */
public final class InterceptableSpanExporter implements SpanExporter {
private final SpanExporter delegate;
private final Interceptor<SpanData> interceptor;
public InterceptableSpanExporter(SpanExporter delegate, Interceptor<SpanData> interceptor) {
this.delegate = delegate;
this.interceptor = interceptor;
}
@Override
public CompletableResultCode export(Collection<SpanData> spans) {
return delegate.export(interceptor.interceptAll(spans));
}
@Override
public CompletableResultCode flush() {
return delegate.flush();
}
@Override
public CompletableResultCode shutdown() {
return delegate.shutdown();
}
}

View File

@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.interceptor.api;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;
/**
* Intercepts a signal before it gets exported. The signal can get updated and/or filtered out based
* on each interceptor implementation.
*/
public interface Interceptor<T> {
/**
* Intercepts a signal.
*
* @param item The signal object.
* @return The received signal modified (or null for excluding this signal from getting exported).
* If there's no operation needed to be done for a specific signal, it should be returned as
* is.
*/
@Nullable
T intercept(T item);
/** Intercepts a collection of signals. */
default Collection<T> interceptAll(Collection<T> items) {
List<T> result = new ArrayList<>();
for (T item : items) {
T intercepted = intercept(item);
if (intercepted != null) {
result.add(intercepted);
}
}
return result;
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.interceptor.common;
import io.opentelemetry.contrib.interceptor.api.Interceptor;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.annotation.Nullable;
/** Allows to run an item through a list of interceptors in the order they were added. */
public final class ComposableInterceptor<T> implements Interceptor<T> {
private final CopyOnWriteArrayList<Interceptor<T>> interceptors = new CopyOnWriteArrayList<>();
public void add(Interceptor<T> interceptor) {
interceptors.addIfAbsent(interceptor);
}
@Nullable
@Override
public T intercept(T item) {
T intercepted = item;
for (Interceptor<T> interceptor : interceptors) {
intercepted = interceptor.intercept(intercepted);
if (intercepted == null) {
break;
}
}
return intercepted;
}
}

View File

@ -0,0 +1,153 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.interceptor;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.logs.Logger;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.contrib.interceptor.common.ComposableInterceptor;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
import io.opentelemetry.sdk.logs.data.Body;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.logs.export.SimpleLogRecordProcessor;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.exporter.InMemoryLogRecordExporter;
import java.util.List;
import javax.annotation.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class InterceptableLogRecordExporterTest {
private InMemoryLogRecordExporter memoryLogRecordExporter;
private Logger logger;
private ComposableInterceptor<LogRecordData> interceptor;
@BeforeEach
void setUp() {
memoryLogRecordExporter = InMemoryLogRecordExporter.create();
interceptor = new ComposableInterceptor<>();
logger =
SdkLoggerProvider.builder()
.addLogRecordProcessor(
SimpleLogRecordProcessor.create(
new InterceptableLogRecordExporter(memoryLogRecordExporter, interceptor)))
.build()
.get("TestScope");
}
@Test
void verifyLogModification() {
interceptor.add(
item -> {
ModifiableLogRecordData modified = new ModifiableLogRecordData(item);
modified.attributes.put("global.attr", "from interceptor");
return modified;
});
logger
.logRecordBuilder()
.setBody("One log")
.setAttribute(AttributeKey.stringKey("local.attr"), "local")
.emit();
List<LogRecordData> finishedLogRecordItems =
memoryLogRecordExporter.getFinishedLogRecordItems();
assertEquals(1, finishedLogRecordItems.size());
LogRecordData logRecordData = finishedLogRecordItems.get(0);
assertEquals(2, logRecordData.getAttributes().size());
assertEquals(
"from interceptor",
logRecordData.getAttributes().get(AttributeKey.stringKey("global.attr")));
assertEquals("local", logRecordData.getAttributes().get(AttributeKey.stringKey("local.attr")));
}
@Test
void verifyLogFiltering() {
interceptor.add(
item -> {
if (item.getBody().asString().contains("deleted")) {
return null;
}
return item;
});
logger.logRecordBuilder().setBody("One log").emit();
logger.logRecordBuilder().setBody("This log will be deleted").emit();
logger.logRecordBuilder().setBody("Another log").emit();
List<LogRecordData> finishedLogRecordItems =
memoryLogRecordExporter.getFinishedLogRecordItems();
assertEquals(2, finishedLogRecordItems.size());
assertEquals("One log", finishedLogRecordItems.get(0).getBody().asString());
assertEquals("Another log", finishedLogRecordItems.get(1).getBody().asString());
}
private static class ModifiableLogRecordData implements LogRecordData {
private final LogRecordData delegate;
private final AttributesBuilder attributes = Attributes.builder();
private ModifiableLogRecordData(LogRecordData delegate) {
this.delegate = delegate;
}
@Override
public Resource getResource() {
return delegate.getResource();
}
@Override
public InstrumentationScopeInfo getInstrumentationScopeInfo() {
return delegate.getInstrumentationScopeInfo();
}
@Override
public long getTimestampEpochNanos() {
return delegate.getTimestampEpochNanos();
}
@Override
public long getObservedTimestampEpochNanos() {
return delegate.getObservedTimestampEpochNanos();
}
@Override
public SpanContext getSpanContext() {
return delegate.getSpanContext();
}
@Override
public Severity getSeverity() {
return delegate.getSeverity();
}
@Nullable
@Override
public String getSeverityText() {
return delegate.getSeverityText();
}
@Override
public Body getBody() {
return delegate.getBody();
}
@Override
public Attributes getAttributes() {
return attributes.putAll(delegate.getAttributes()).build();
}
@Override
public int getTotalAttributeCount() {
return delegate.getTotalAttributeCount();
}
}
}

View File

@ -0,0 +1,132 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.interceptor;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.contrib.interceptor.common.ComposableInterceptor;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.Data;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.MetricDataType;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricExporter;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class InterceptableMetricExporterTest {
private InMemoryMetricExporter memoryMetricExporter;
private SdkMeterProvider meterProvider;
private Meter meter;
private ComposableInterceptor<MetricData> interceptor;
@BeforeEach
void setUp() {
memoryMetricExporter = InMemoryMetricExporter.create();
interceptor = new ComposableInterceptor<>();
meterProvider =
SdkMeterProvider.builder()
.registerMetricReader(
PeriodicMetricReader.create(
new InterceptableMetricExporter(memoryMetricExporter, interceptor)))
.build();
meter = meterProvider.get("TestScope");
}
@Test
void verifyMetricModification() {
interceptor.add(
item -> {
ModifiableMetricData modified = new ModifiableMetricData(item);
modified.name = "ModifiedName";
return modified;
});
meter.counterBuilder("OneCounter").build().add(1);
meterProvider.forceFlush();
List<MetricData> finishedMetricItems = memoryMetricExporter.getFinishedMetricItems();
assertEquals(1, finishedMetricItems.size());
assertEquals("ModifiedName", finishedMetricItems.get(0).getName());
}
@Test
void verifyMetricFiltering() {
interceptor.add(
item -> {
if (item.getName().contains("Deleted")) {
return null;
}
return item;
});
meter.counterBuilder("OneCounter").build().add(1);
meter.counterBuilder("DeletedCounter").build().add(1);
meter.counterBuilder("AnotherCounter").build().add(1);
meterProvider.forceFlush();
List<MetricData> finishedMetricItems = memoryMetricExporter.getFinishedMetricItems();
assertEquals(2, finishedMetricItems.size());
List<String> names = new ArrayList<>();
for (MetricData item : finishedMetricItems) {
names.add(item.getName());
}
assertThat(names).containsExactlyInAnyOrder("OneCounter", "AnotherCounter");
}
private static class ModifiableMetricData implements MetricData {
private final MetricData delegate;
private String name;
private ModifiableMetricData(MetricData delegate) {
this.delegate = delegate;
}
@Override
public Resource getResource() {
return delegate.getResource();
}
@Override
public InstrumentationScopeInfo getInstrumentationScopeInfo() {
return delegate.getInstrumentationScopeInfo();
}
@Override
public String getName() {
if (name != null) {
return name;
}
return delegate.getName();
}
@Override
public String getDescription() {
return delegate.getDescription();
}
@Override
public String getUnit() {
return delegate.getUnit();
}
@Override
public MetricDataType getType() {
return delegate.getType();
}
@Override
public Data<?> getData() {
return delegate.getData();
}
}
}

View File

@ -0,0 +1,94 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.contrib.interceptor;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.contrib.interceptor.common.ComposableInterceptor;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.data.DelegatingSpanData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class InterceptableSpanExporterTest {
private InMemorySpanExporter memorySpanExporter;
private Tracer tracer;
private ComposableInterceptor<SpanData> interceptor;
@BeforeEach
void setUp() {
memorySpanExporter = InMemorySpanExporter.create();
interceptor = new ComposableInterceptor<>();
tracer =
SdkTracerProvider.builder()
.addSpanProcessor(
SimpleSpanProcessor.create(
new InterceptableSpanExporter(memorySpanExporter, interceptor)))
.build()
.get("TestScope");
}
@Test
void verifySpanModification() {
interceptor.add(
item -> {
ModifiableSpanData modified = new ModifiableSpanData(item);
modified.attributes.put("global.attr", "from interceptor");
return modified;
});
tracer.spanBuilder("Test span").setAttribute("local.attr", 10).startSpan().end();
List<SpanData> finishedSpanItems = memorySpanExporter.getFinishedSpanItems();
assertEquals(1, finishedSpanItems.size());
SpanData spanData = finishedSpanItems.get(0);
assertEquals(2, spanData.getAttributes().size());
assertEquals(
"from interceptor", spanData.getAttributes().get(AttributeKey.stringKey("global.attr")));
assertEquals(10, spanData.getAttributes().get(AttributeKey.longKey("local.attr")));
}
@Test
void verifySpanFiltering() {
interceptor.add(
item -> {
if (item.getName().contains("deleted")) {
return null;
}
return item;
});
tracer.spanBuilder("One span").startSpan().end();
tracer.spanBuilder("This will get deleted").startSpan().end();
tracer.spanBuilder("Another span").startSpan().end();
List<SpanData> finishedSpanItems = memorySpanExporter.getFinishedSpanItems();
assertEquals(2, finishedSpanItems.size());
assertEquals("One span", finishedSpanItems.get(0).getName());
assertEquals("Another span", finishedSpanItems.get(1).getName());
}
private static class ModifiableSpanData extends DelegatingSpanData {
private final AttributesBuilder attributes = Attributes.builder();
protected ModifiableSpanData(SpanData delegate) {
super(delegate);
}
@Override
public Attributes getAttributes() {
return attributes.putAll(super.getAttributes()).build();
}
}
}

View File

@ -76,6 +76,7 @@ include(":jmx-metrics")
include(":maven-extension")
include(":micrometer-meter-provider")
include(":noop-api")
include(":processors")
include(":prometheus-client-bridge")
include(":resource-providers")
include(":runtime-attach:runtime-attach")