Add R2dbc statement javaagent instrumentation (#7977)

This PR resolves #2515 .
It adds javaagent instrumentation for
[r2dbc-spi](https://github.com/r2dbc/r2dbc-spi) >= v1.0

As suggested by @mp911de in
https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2515#issuecomment-1141723561
I used the [r2dbc-proxy](https://github.com/r2dbc/r2dbc-proxy)
`ProxyConnectionFactory` to intercept Database query executions and
create according spans.

Example span from example project using
[spring-boot-starter-data-r2dbc](https://github.com/spring-projects/spring-data-relational)
Application:

![r2dbc-example](https://user-images.githubusercontent.com/39240633/222902361-a3878ecd-a8d0-4d33-b0c0-e410d908e05e.png)

---------

Co-authored-by: Lauri Tulmin <ltulmin@splunk.com>
This commit is contained in:
Phil 2023-03-23 12:52:53 +01:00 committed by GitHub
parent bd3117b165
commit a6bc3b197d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1055 additions and 0 deletions

View File

@ -96,6 +96,7 @@ These are the supported libraries and frameworks:
| [Play](https://github.com/playframework/playframework) | 2.4+ | N/A | [HTTP Client Spans], [HTTP Client Metrics], Provides `http.route` [2] |
| [Play WS](https://github.com/playframework/play-ws) | 1.0+ | N/A | [HTTP Client Spans], [HTTP Client Metrics] |
| [Quartz](https://www.quartz-scheduler.org/) | 2.0+ | [opentelemetry-quartz-2.0](../instrumentation/quartz-2.0/library) | none |
| [R2DBC](https://r2dbc.io/) | 1.0+ | [opentelemetry-r2dbc-1.0](../instrumentation/r2dbc-1.0/library) | [Database Client Spans] |
| [RabbitMQ Client](https://github.com/rabbitmq/rabbitmq-java-client) | 2.7+ | N/A | [Messaging Spans] |
| [Ratpack](https://github.com/ratpack/ratpack) | 1.4+ | [opentelemetry-ratpack-1.7](../instrumentation/ratpack/ratpack-1.7/library) | [HTTP Client Spans], [HTTP Client Metrics], [HTTP Server Spans], [HTTP Server Metrics] |
| [Reactor](https://github.com/reactor/reactor-core) | 3.1+ | [opentelemetry-reactor-3.1](../instrumentation/reactor/reactor-3.1/library) | Context propagation |

View File

@ -0,0 +1,31 @@
plugins {
id("otel.javaagent-instrumentation")
id("otel.java-conventions")
}
muzzle {
pass {
group.set("io.r2dbc")
module.set("r2dbc-spi")
versions.set("[1.0.0.RELEASE,)")
extraDependency("io.projectreactor:reactor-core:3.4.12")
}
}
sourceSets {
main {
val shadedDep = project(":instrumentation:r2dbc-1.0:library-instrumentation-shaded")
output.dir(
shadedDep.file("build/extracted/shadow"),
"builtBy" to ":instrumentation:r2dbc-1.0:library-instrumentation-shaded:extractShadowJar",
)
}
}
dependencies {
library("io.r2dbc:r2dbc-spi:1.0.0.RELEASE")
compileOnly(project(path = ":instrumentation:r2dbc-1.0:library-instrumentation-shaded", configuration = "shadow"))
testImplementation(project(":instrumentation:r2dbc-1.0:testing"))
testInstrumentation(project(":instrumentation:reactor:reactor-3.1:javaagent"))
}

View File

@ -0,0 +1,51 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
class R2dbcInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("io.r2dbc.spi.ConnectionFactories");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod()
.and(named("find"))
.and(takesArguments(1))
.and(takesArgument(0, named("io.r2dbc.spi.ConnectionFactoryOptions"))),
this.getClass().getName() + "$FactoryAdvice");
}
@SuppressWarnings("unused")
public static class FactoryAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void methodExit(
@Advice.Return(readOnly = false) ConnectionFactory factory,
@Advice.Argument(0) ConnectionFactoryOptions factoryOptions) {
if (factory != null) {
factory = R2dbcSingletons.telemetry().wrapConnectionFactory(factory, factoryOptions);
}
}
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0;
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.Collections;
import java.util.List;
@AutoService(InstrumentationModule.class)
public class R2dbcInstrumentationModule extends InstrumentationModule {
public R2dbcInstrumentationModule() {
super("r2dbc", "r2dbc-1.0");
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return Collections.singletonList(new R2dbcInstrumentation());
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.net.PeerServiceAttributesExtractor;
import io.opentelemetry.javaagent.bootstrap.internal.CommonConfig;
import io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0.internal.R2dbcNetAttributesGetter;
public final class R2dbcSingletons {
private static final R2dbcTelemetry TELEMETRY =
R2dbcTelemetry.builder(GlobalOpenTelemetry.get())
.setStatementSanitizationEnabled(CommonConfig.get().isStatementSanitizationEnabled())
.addAttributeExtractor(
PeerServiceAttributesExtractor.create(
R2dbcNetAttributesGetter.INSTANCE, CommonConfig.get().getPeerServiceMapping()))
.build();
public static R2dbcTelemetry telemetry() {
return TELEMETRY;
}
private R2dbcSingletons() {}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
class R2dbcStatementTest extends AbstractR2dbcStatementTest {
@RegisterExtension
private static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@Override
protected InstrumentationExtension getTesting() {
return testing;
}
}

View File

@ -0,0 +1,33 @@
plugins {
id("com.github.johnrengelman.shadow")
id("otel.java-conventions")
}
group = "io.opentelemetry.javaagent.instrumentation"
dependencies {
implementation(project(":instrumentation:r2dbc-1.0:library"))
}
tasks {
shadowJar {
exclude("META-INF/**/*")
dependencies {
// including only :r2dbc-1.0:library excludes its transitive dependencies
include(project(":instrumentation:r2dbc-1.0:library"))
include(dependency("io.r2dbc:r2dbc-proxy"))
}
relocate(
"io.r2dbc.proxy",
"io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0.shaded.io.r2dbc.proxy"
)
}
val extractShadowJar by registering(Copy::class) {
dependsOn(shadowJar)
from(zipTree(shadowJar.get().archiveFile))
into("build/extracted/shadow")
}
}

View File

@ -0,0 +1,52 @@
# Library Instrumentation for R2dbc version 1.0 and higher
Provides OpenTelemetry instrumentation for [R2dbc](https://r2dbc.io/).
## Quickstart
### Add these dependencies to your project
Replace `OPENTELEMETRY_VERSION` with the [latest
release](https://search.maven.org/search?q=g:io.opentelemetry.instrumentation%20AND%20a:opentelemetry-r2dbc-1.0).
For Maven, add to your `pom.xml` dependencies:
```xml
<dependencies>
<dependency>
<groupId>io.opentelemetry.instrumentation</groupId>
<artifactId>opentelemetry-r2dbc-1.0</artifactId>
<version>OPENTELEMETRY_VERSION</version>
</dependency>
</dependencies>
```
For Gradle, add to your dependencies:
```groovy
implementation("io.opentelemetry.instrumentation:opentelemetry-r2dbc-1.0:OPENTELEMETRY_VERSION")
```
### Usage
The instrumentation library provides a R2dbc `ProxyConnectionFactory` that gets wrapped around the original
`ConnectionFactory`.
```java
ConnectionFactory wrapWithProxyFactory(OpenTelemetry openTelemetry, ConnectionFactory originalFactory, ConnectionFactoryOptions factoryOptions) {
return R2dbcTelemetryBuilder
.create(openTelemetry)
.wrapConnectionFactory(originalFactory, factoryOptions);
}
```
If you use R2dbc in a Spring application you can wrap the `ConnectionFactory` in the `ConnectionFactoryInitializer` Bean:
```java
@Bean
ConnectionFactoryInitializer initializer(OpenTelemetry openTelemetry, ConnectionFactory connectionFactory) {
ConnectionFactoryInitializer initializer = new ConnectionFactoryInitializer();
ConnectionFactoryOptions factoryOptions = ConnectionFactoryOptions.parse("r2dbc:mariadb://localhost:3306/db");
initializer.setConnectionFactory(wrapWithProxyFactory(openTelemetry, connectionFactory, factoryOptions));
return initializer;
}
```

View File

@ -0,0 +1,11 @@
plugins {
id("otel.library-instrumentation")
}
dependencies {
library("io.r2dbc:r2dbc-spi:1.0.0.RELEASE")
implementation("io.r2dbc:r2dbc-proxy:1.0.1.RELEASE")
testImplementation(project(":instrumentation:r2dbc-1.0:testing"))
testImplementation(project(":instrumentation:reactor:reactor-3.1:library"))
}

View File

@ -0,0 +1,43 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0.internal.DbExecution;
import io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0.internal.TraceProxyListener;
import io.r2dbc.proxy.ProxyConnectionFactory;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
/** Entrypoint for instrumenting R2dbc. */
public final class R2dbcTelemetry {
/** Returns a new {@link R2dbcTelemetry} configured with the given {@link OpenTelemetry}. */
public static R2dbcTelemetry create(OpenTelemetry openTelemetry) {
return builder(openTelemetry).build();
}
/**
* Returns a new {@link R2dbcTelemetryBuilder} configured with the given {@link OpenTelemetry}.
*/
public static R2dbcTelemetryBuilder builder(OpenTelemetry openTelemetry) {
return new R2dbcTelemetryBuilder(openTelemetry);
}
private final Instrumenter<DbExecution, Void> instrumenter;
R2dbcTelemetry(Instrumenter<DbExecution, Void> instrumenter) {
this.instrumenter = instrumenter;
}
public ConnectionFactory wrapConnectionFactory(
ConnectionFactory originalFactory, ConnectionFactoryOptions factoryOptions) {
return ProxyConnectionFactory.builder(originalFactory)
.listener(new TraceProxyListener(instrumenter, factoryOptions))
.build();
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0.internal.DbExecution;
import io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0.internal.R2dbcInstrumenterBuilder;
/** A builder of {@link R2dbcTelemetry}. */
public final class R2dbcTelemetryBuilder {
private final R2dbcInstrumenterBuilder instrumenterBuilder;
private boolean statementSanitizationEnabled = true;
R2dbcTelemetryBuilder(OpenTelemetry openTelemetry) {
instrumenterBuilder = new R2dbcInstrumenterBuilder(openTelemetry);
}
@CanIgnoreReturnValue
public R2dbcTelemetryBuilder addAttributeExtractor(
AttributesExtractor<DbExecution, Void> attributesExtractor) {
instrumenterBuilder.addAttributeExtractor(attributesExtractor);
return this;
}
/**
* Sets whether the {@code db.statement} attribute on the spans emitted by the constructed {@link
* R2dbcTelemetry} should be sanitized. If set to {@code true}, all parameters that can
* potentially contain sensitive information will be masked. Enabled by default.
*/
@CanIgnoreReturnValue
public R2dbcTelemetryBuilder setStatementSanitizationEnabled(boolean enabled) {
this.statementSanitizationEnabled = enabled;
return this;
}
/**
* Returns a new {@link R2dbcTelemetry} with the settings of this {@link R2dbcTelemetryBuilder}.
*/
public R2dbcTelemetry build() {
return new R2dbcTelemetry(instrumenterBuilder.build(statementSanitizationEnabled));
}
}

View File

@ -0,0 +1,134 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0.internal;
import static io.r2dbc.spi.ConnectionFactoryOptions.DATABASE;
import static io.r2dbc.spi.ConnectionFactoryOptions.DRIVER;
import static io.r2dbc.spi.ConnectionFactoryOptions.HOST;
import static io.r2dbc.spi.ConnectionFactoryOptions.PORT;
import static io.r2dbc.spi.ConnectionFactoryOptions.PROTOCOL;
import static io.r2dbc.spi.ConnectionFactoryOptions.USER;
import io.opentelemetry.context.Context;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import io.r2dbc.proxy.core.QueryExecutionInfo;
import io.r2dbc.proxy.core.QueryInfo;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactoryOptions;
import java.util.Locale;
import java.util.stream.Collectors;
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public final class DbExecution {
private final String system;
private final String user;
private final String name;
private final String host;
private final Integer port;
private final String connectionString;
private final String rawStatement;
private Context context;
public DbExecution(QueryExecutionInfo queryInfo, ConnectionFactoryOptions factoryOptions) {
Connection originalConnection = queryInfo.getConnectionInfo().getOriginalConnection();
this.system =
originalConnection != null
? originalConnection
.getMetadata()
.getDatabaseProductName()
.toLowerCase(Locale.ROOT)
.split(" ")[0]
: SemanticAttributes.DbSystemValues.OTHER_SQL;
this.user = factoryOptions.hasOption(USER) ? (String) factoryOptions.getValue(USER) : null;
this.name =
factoryOptions.hasOption(DATABASE)
? ((String) factoryOptions.getValue(DATABASE)).toLowerCase(Locale.ROOT)
: null;
String driver =
factoryOptions.hasOption(DRIVER) ? (String) factoryOptions.getValue(DRIVER) : null;
String protocol =
factoryOptions.hasOption(PROTOCOL) ? (String) factoryOptions.getValue(PROTOCOL) : null;
this.host = factoryOptions.hasOption(HOST) ? (String) factoryOptions.getValue(HOST) : null;
this.port = factoryOptions.hasOption(PORT) ? (Integer) factoryOptions.getValue(PORT) : null;
this.connectionString =
String.format(
"%s%s:%s%s",
driver != null ? driver : "",
protocol != null ? ":" + protocol : "",
host != null ? "//" + host : "",
port != null ? ":" + port : "");
this.rawStatement =
queryInfo.getQueries().stream().map(QueryInfo::getQuery).collect(Collectors.joining(";\n"));
}
public Integer getPort() {
return port;
}
public String getHost() {
return host;
}
public String getSystem() {
return system;
}
public String getUser() {
return user;
}
public String getName() {
return name;
}
public String getConnectionString() {
return connectionString;
}
public String getRawStatement() {
return rawStatement;
}
public Context getContext() {
return context;
}
public void setContext(Context context) {
this.context = context;
}
@Override
public String toString() {
return "DbExecution{"
+ "system='"
+ system
+ '\''
+ ", user='"
+ user
+ '\''
+ ", name='"
+ name
+ '\''
+ ", host='"
+ host
+ '\''
+ ", port="
+ port
+ ", connectionString='"
+ connectionString
+ '\''
+ ", rawStatement='"
+ rawStatement
+ '\''
+ ", context="
+ context
+ '}';
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0.internal;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.db.DbClientSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.db.SqlClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesExtractor;
import java.util.ArrayList;
import java.util.List;
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public final class R2dbcInstrumenterBuilder {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.r2dbc-1.0";
private final OpenTelemetry openTelemetry;
private final List<AttributesExtractor<DbExecution, Void>> additionalExtractors =
new ArrayList<>();
public R2dbcInstrumenterBuilder(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
}
@CanIgnoreReturnValue
public R2dbcInstrumenterBuilder addAttributeExtractor(
AttributesExtractor<DbExecution, Void> attributesExtractor) {
additionalExtractors.add(attributesExtractor);
return this;
}
public Instrumenter<DbExecution, Void> build(boolean statementSanitizationEnabled) {
return Instrumenter.<DbExecution, Void>builder(
openTelemetry,
INSTRUMENTATION_NAME,
DbClientSpanNameExtractor.create(R2dbcSqlAttributesGetter.INSTANCE))
.addAttributesExtractor(
SqlClientAttributesExtractor.builder(R2dbcSqlAttributesGetter.INSTANCE)
.setStatementSanitizationEnabled(statementSanitizationEnabled)
.build())
.addAttributesExtractor(
NetClientAttributesExtractor.create(R2dbcNetAttributesGetter.INSTANCE))
.buildInstrumenter(SpanKindExtractor.alwaysClient());
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0.internal;
import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesGetter;
import javax.annotation.Nullable;
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public enum R2dbcNetAttributesGetter implements NetClientAttributesGetter<DbExecution, Void> {
INSTANCE;
@Nullable
@Override
public String getTransport(DbExecution request, @Nullable Void unused) {
return null;
}
@Nullable
@Override
public String getPeerName(DbExecution request) {
return request.getHost();
}
@Nullable
@Override
public Integer getPeerPort(DbExecution request) {
return request.getPort();
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0.internal;
import io.opentelemetry.instrumentation.api.instrumenter.db.SqlClientAttributesGetter;
import javax.annotation.Nullable;
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public enum R2dbcSqlAttributesGetter implements SqlClientAttributesGetter<DbExecution> {
INSTANCE;
@Override
public String getSystem(DbExecution request) {
return request.getSystem();
}
@Override
@Nullable
public String getUser(DbExecution request) {
return request.getUser();
}
@Override
@Nullable
public String getName(DbExecution request) {
return request.getName();
}
@Override
@Nullable
public String getConnectionString(DbExecution request) {
return request.getConnectionString();
}
@Override
@Nullable
public String getRawStatement(DbExecution request) {
return request.getRawStatement();
}
}

View File

@ -0,0 +1,49 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0.internal;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.r2dbc.proxy.core.QueryExecutionInfo;
import io.r2dbc.proxy.listener.ProxyMethodExecutionListener;
import io.r2dbc.spi.ConnectionFactoryOptions;
/**
* This class is internal and is hence not for public use. Its APIs are unstable and can change at
* any time.
*/
public final class TraceProxyListener implements ProxyMethodExecutionListener {
private static final String KEY_DB_EXECUTION = "dbExecution";
private final Instrumenter<DbExecution, Void> instrumenter;
private final ConnectionFactoryOptions factoryOptions;
public TraceProxyListener(
Instrumenter<DbExecution, Void> instrumenter, ConnectionFactoryOptions factoryOptions) {
this.instrumenter = instrumenter;
this.factoryOptions = factoryOptions;
}
@Override
public void beforeQuery(QueryExecutionInfo queryInfo) {
Context parentContext = Context.current();
DbExecution dbExecution = new DbExecution(queryInfo, factoryOptions);
if (!instrumenter.shouldStart(parentContext, dbExecution)) {
return;
}
dbExecution.setContext(instrumenter.start(parentContext, dbExecution));
queryInfo.getValueStore().put(KEY_DB_EXECUTION, dbExecution);
}
@Override
public void afterQuery(QueryExecutionInfo queryInfo) {
DbExecution dbExecution = (DbExecution) queryInfo.getValueStore().get(KEY_DB_EXECUTION);
if (dbExecution != null && dbExecution.getContext() != null) {
instrumenter.end(dbExecution.getContext(), dbExecution, null, queryInfo.getThrowable());
}
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;
import io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0.internal.DbExecution;
import io.r2dbc.proxy.core.QueryExecutionInfo;
import io.r2dbc.proxy.core.QueryInfo;
import io.r2dbc.proxy.test.MockConnectionInfo;
import io.r2dbc.proxy.test.MockQueryExecutionInfo;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.ConnectionMetadata;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class DbExecutionTest {
@Mock Connection connection;
@Mock ConnectionMetadata metadata;
@Test
void dbExecution() {
when(connection.getMetadata()).thenReturn(metadata);
when(metadata.getDatabaseProductName()).thenReturn("testdb");
QueryExecutionInfo queryExecutionInfo =
MockQueryExecutionInfo.builder()
.queryInfo(new QueryInfo("SELECT * from person where last_name = 'tom'"))
.connectionInfo(MockConnectionInfo.builder().originalConnection(connection).build())
.build();
ConnectionFactoryOptions factoryOptions =
ConnectionFactoryOptions.parse("r2dbc:mariadb://root:root@localhost:3306/db");
io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0.internal.DbExecution dbExecution =
new DbExecution(queryExecutionInfo, factoryOptions);
assertEquals("testdb", dbExecution.getSystem());
assertEquals("root", dbExecution.getUser());
assertEquals("db", dbExecution.getName());
assertEquals("localhost", dbExecution.getHost());
assertEquals(3306, dbExecution.getPort());
assertEquals("mariadb://localhost:3306", dbExecution.getConnectionString());
assertEquals("SELECT * from person where last_name = 'tom'", dbExecution.getRawStatement());
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0;
import io.opentelemetry.instrumentation.reactor.v3_1.ContextPropagationOperator;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.RegisterExtension;
class R2dbcStatementTest extends AbstractR2dbcStatementTest {
@RegisterExtension
private static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
private final ContextPropagationOperator tracingOperator = ContextPropagationOperator.create();
@Override
protected InstrumentationExtension getTesting() {
return testing;
}
@BeforeAll
void setup() {
tracingOperator.registerOnEachOperator();
}
@AfterAll
void stop() {
tracingOperator.resetOnEachOperator();
}
@Override
protected ConnectionFactory createProxyConnectionFactory(
ConnectionFactoryOptions connectionFactoryOptions) {
return R2dbcTelemetry.create(testing.getOpenTelemetry())
.wrapConnectionFactory(
super.createProxyConnectionFactory(connectionFactoryOptions), connectionFactoryOptions);
}
}

View File

@ -0,0 +1,17 @@
plugins {
id("otel.java-conventions")
}
dependencies {
implementation(project(":testing-common"))
implementation("io.r2dbc:r2dbc-spi:1.0.0.RELEASE")
implementation(project(":instrumentation-api-semconv"))
implementation("org.testcontainers:junit-jupiter")
compileOnly("io.projectreactor:reactor-core:3.4.12")
runtimeOnly("dev.miku:r2dbc-mysql:0.8.2.RELEASE")
runtimeOnly("org.mariadb:r2dbc-mariadb:1.1.3")
runtimeOnly("org.postgresql:r2dbc-postgresql:1.0.1.RELEASE")
}

View File

@ -0,0 +1,272 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.r2dbc.v1_0;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_CONNECTION_STRING;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_NAME;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_OPERATION;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_SQL_TABLE;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_STATEMENT;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_SYSTEM;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.DB_USER;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_PEER_NAME;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.NET_PEER_PORT;
import static io.r2dbc.spi.ConnectionFactoryOptions.DATABASE;
import static io.r2dbc.spi.ConnectionFactoryOptions.DRIVER;
import static io.r2dbc.spi.ConnectionFactoryOptions.HOST;
import static io.r2dbc.spi.ConnectionFactoryOptions.PASSWORD;
import static io.r2dbc.spi.ConnectionFactoryOptions.PORT;
import static io.r2dbc.spi.ConnectionFactoryOptions.USER;
import static org.junit.jupiter.api.Named.named;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import reactor.core.publisher.Mono;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractR2dbcStatementTest {
private static final Logger logger = LoggerFactory.getLogger(AbstractR2dbcStatementTest.class);
protected abstract InstrumentationExtension getTesting();
private static final String USER_DB = "SA";
private static final String PW_DB = "password123";
private static final String DB = "tempdb";
private static final DbSystemProps POSTGRESQL =
new DbSystemProps("postgresql", "postgres:9.6.8", 5432)
.envVariables(
"POSTGRES_USER", USER_DB,
"POSTGRES_PASSWORD", PW_DB,
"POSTGRES_DB", DB);
private static final DbSystemProps MARIADB =
new DbSystemProps("mariadb", "mariadb:10.3.6", 3306)
.envVariables(
"MYSQL_ROOT_PASSWORD", PW_DB,
"MYSQL_USER", USER_DB,
"MYSQL_PASSWORD", PW_DB,
"MYSQL_DATABASE", DB);
private static final DbSystemProps MYSQL =
new DbSystemProps("mysql", "mysql:8.0.32", 3306)
.envVariables(
"MYSQL_ROOT_PASSWORD", PW_DB,
"MYSQL_USER", USER_DB,
"MYSQL_PASSWORD", PW_DB,
"MYSQL_DATABASE", DB);
private static final Map<String, DbSystemProps> SYSTEMS = new HashMap<>();
static {
SYSTEMS.put(POSTGRESQL.system, POSTGRESQL);
SYSTEMS.put(MYSQL.system, MYSQL);
SYSTEMS.put(MARIADB.system, MARIADB);
}
private static Integer port;
private static GenericContainer<?> container;
protected ConnectionFactory createProxyConnectionFactory(
ConnectionFactoryOptions connectionFactoryOptions) {
return ConnectionFactories.find(connectionFactoryOptions);
}
@AfterAll
void stopContainer() {
if (container != null) {
container.stop();
}
}
void startContainer(DbSystemProps props) {
if (container != null && container.getDockerImageName().equals(props.image)) {
return;
}
if (container != null) {
container.stop();
}
if (props.image != null) {
container =
new GenericContainer<>(props.image)
.withEnv(props.envVariables)
.withExposedPorts(props.port)
.withLogConsumer(new Slf4jLogConsumer(logger))
.withStartupTimeout(Duration.ofMinutes(2));
container.start();
port = container.getMappedPort(props.port);
}
}
@ParameterizedTest(name = "{index}: {0}")
@MethodSource("provideParameters")
void testQueries(Parameter parameter) {
DbSystemProps props = SYSTEMS.get(parameter.system);
startContainer(props);
ConnectionFactory connectionFactory =
createProxyConnectionFactory(
ConnectionFactoryOptions.builder()
.option(DRIVER, props.system)
.option(HOST, "localhost")
.option(PORT, port)
.option(USER, USER_DB)
.option(PASSWORD, PW_DB)
.option(DATABASE, DB)
.build());
getTesting()
.runWithSpan(
"parent",
() -> {
Mono.from(connectionFactory.create())
.flatMapMany(
connection ->
Mono.from(connection.createStatement(parameter.statement).execute())
// Subscribe to the Statement.execute()
.flatMapMany(result -> result.map((row, metadata) -> ""))
.concatWith(Mono.from(connection.close()).cast(String.class)))
.doFinally(e -> getTesting().runWithSpan("child", () -> {}))
.blockLast();
});
getTesting()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL),
span ->
span.hasName(parameter.spanName)
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(
DB_CONNECTION_STRING,
parameter.system + "://localhost:" + port),
equalTo(DB_SYSTEM, parameter.system),
equalTo(DB_NAME, DB),
equalTo(DB_USER, USER_DB),
equalTo(DB_STATEMENT, parameter.expectedStatement),
equalTo(DB_OPERATION, parameter.operation),
equalTo(DB_SQL_TABLE, parameter.table),
equalTo(NET_PEER_NAME, "localhost"),
equalTo(NET_PEER_PORT, port)),
span ->
span.hasName("child")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0))));
}
private static Stream<Arguments> provideParameters() {
return SYSTEMS.values().stream()
.flatMap(
system ->
Stream.of(
Arguments.of(
named(
system.system + " Simple Select",
new Parameter(
system.system,
"SELECT 3",
"SELECT ?",
"SELECT " + DB,
null,
"SELECT"))),
Arguments.of(
named(
system.system + " Create Table",
new Parameter(
system.system,
"CREATE TABLE person (id SERIAL PRIMARY KEY, first_name VARCHAR(255), last_name VARCHAR(255))",
"CREATE TABLE person (id SERIAL PRIMARY KEY, first_name VARCHAR(?), last_name VARCHAR(?))",
DB,
null,
null))),
Arguments.of(
named(
system.system + " Insert",
new Parameter(
system.system,
"INSERT INTO person (id, first_name, last_name) values (1, 'tom', 'johnson')",
"INSERT INTO person (id, first_name, last_name) values (?, ?, ?)",
"INSERT " + DB + ".person",
"person",
"INSERT"))),
Arguments.of(
named(
system.system + " Select from Table",
new Parameter(
system.system,
"SELECT * FROM person where first_name = 'tom'",
"SELECT * FROM person where first_name = ?",
"SELECT " + DB + ".person",
"person",
"SELECT")))));
}
private static class Parameter {
public final String system;
public final String statement;
public final String expectedStatement;
public final String spanName;
public final String table;
public final String operation;
public Parameter(
String system,
String statement,
String expectedStatement,
String spanName,
String table,
String operation) {
this.system = system;
this.statement = statement;
this.expectedStatement = expectedStatement;
this.spanName = spanName;
this.table = table;
this.operation = operation;
}
}
private static class DbSystemProps {
public final String system;
public final String image;
public final int port;
public final Map<String, String> envVariables = new HashMap<>();
public DbSystemProps(String system, String image, int port) {
this.system = system;
this.image = image;
this.port = port;
}
@CanIgnoreReturnValue
public DbSystemProps envVariables(String... keyValues) {
for (int i = 0; i < keyValues.length / 2; i++) {
envVariables.put(keyValues[2 * i], keyValues[2 * i + 1]);
}
return this;
}
}
}

View File

@ -399,6 +399,10 @@ hideFromDependabot(":instrumentation:play:play-ws:play-ws-common:testing")
hideFromDependabot(":instrumentation:quartz-2.0:javaagent")
hideFromDependabot(":instrumentation:quartz-2.0:library")
hideFromDependabot(":instrumentation:quartz-2.0:testing")
hideFromDependabot(":instrumentation:r2dbc-1.0:javaagent")
hideFromDependabot(":instrumentation:r2dbc-1.0:library")
hideFromDependabot(":instrumentation:r2dbc-1.0:library-instrumentation-shaded")
hideFromDependabot(":instrumentation:r2dbc-1.0:testing")
hideFromDependabot(":instrumentation:rabbitmq-2.7:javaagent")
hideFromDependabot(":instrumentation:ratpack:ratpack-1.4:javaagent")
hideFromDependabot(":instrumentation:ratpack:ratpack-1.4:testing")