Use metrics batch API in connection pool instrumentations (#6231)

* Use metrics batch API in connection pool instrumentations

* Fix vibur muzzle

* Fix vibur muzzle
This commit is contained in:
Mateusz Rzeszutek 2022-07-05 13:47:15 +02:00 committed by GitHub
parent 814985e620
commit afee828846
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 230 additions and 160 deletions

View File

@ -10,13 +10,14 @@ import static io.opentelemetry.api.common.AttributeKey.stringKey;
import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterBuilder; import io.opentelemetry.api.metrics.MeterBuilder;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter; import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.api.metrics.ObservableMeasurement;
import io.opentelemetry.instrumentation.api.internal.EmbeddedInstrumentationProperties; import io.opentelemetry.instrumentation.api.internal.EmbeddedInstrumentationProperties;
import java.util.function.LongSupplier;
/** /**
* A helper class that models the <a * A helper class that models the <a
@ -54,64 +55,53 @@ public final class DbConnectionPoolMetrics {
idleConnectionsAttributes = attributes.toBuilder().put(CONNECTION_STATE, STATE_IDLE).build(); idleConnectionsAttributes = attributes.toBuilder().put(CONNECTION_STATE, STATE_IDLE).build();
} }
public ObservableLongUpDownCounter usedConnections(LongSupplier usedConnectionsGetter) { public ObservableLongMeasurement connections() {
return meter return meter
.upDownCounterBuilder("db.client.connections.usage") .upDownCounterBuilder("db.client.connections.usage")
.setUnit("connections") .setUnit("connections")
.setDescription( .setDescription(
"The number of connections that are currently in state described by the state attribute.") "The number of connections that are currently in state described by the state attribute.")
.buildWithCallback( .buildObserver();
measurement ->
measurement.record(usedConnectionsGetter.getAsLong(), usedConnectionsAttributes));
} }
public ObservableLongUpDownCounter idleConnections(LongSupplier idleConnectionsGetter) { public ObservableLongMeasurement minIdleConnections() {
return meter
.upDownCounterBuilder("db.client.connections.usage")
.setUnit("connections")
.setDescription(
"The number of connections that are currently in state described by the state attribute.")
.buildWithCallback(
measurement ->
measurement.record(idleConnectionsGetter.getAsLong(), idleConnectionsAttributes));
}
public ObservableLongUpDownCounter minIdleConnections(LongSupplier minIdleConnectionsGetter) {
return meter return meter
.upDownCounterBuilder("db.client.connections.idle.min") .upDownCounterBuilder("db.client.connections.idle.min")
.setUnit("connections") .setUnit("connections")
.setDescription("The minimum number of idle open connections allowed.") .setDescription("The minimum number of idle open connections allowed.")
.buildWithCallback( .buildObserver();
measurement -> measurement.record(minIdleConnectionsGetter.getAsLong(), attributes));
} }
public ObservableLongUpDownCounter maxIdleConnections(LongSupplier maxIdleConnectionsGetter) { public ObservableLongMeasurement maxIdleConnections() {
return meter return meter
.upDownCounterBuilder("db.client.connections.idle.max") .upDownCounterBuilder("db.client.connections.idle.max")
.setUnit("connections") .setUnit("connections")
.setDescription("The maximum number of idle open connections allowed.") .setDescription("The maximum number of idle open connections allowed.")
.buildWithCallback( .buildObserver();
measurement -> measurement.record(maxIdleConnectionsGetter.getAsLong(), attributes));
} }
public ObservableLongUpDownCounter maxConnections(LongSupplier maxConnectionsGetter) { public ObservableLongMeasurement maxConnections() {
return meter return meter
.upDownCounterBuilder("db.client.connections.max") .upDownCounterBuilder("db.client.connections.max")
.setUnit("connections") .setUnit("connections")
.setDescription("The maximum number of open connections allowed.") .setDescription("The maximum number of open connections allowed.")
.buildWithCallback( .buildObserver();
measurement -> measurement.record(maxConnectionsGetter.getAsLong(), attributes));
} }
public ObservableLongUpDownCounter pendingRequestsForConnection( public ObservableLongMeasurement pendingRequestsForConnection() {
LongSupplier pendingRequestsGetter) {
return meter return meter
.upDownCounterBuilder("db.client.connections.pending_requests") .upDownCounterBuilder("db.client.connections.pending_requests")
.setUnit("requests") .setUnit("requests")
.setDescription( .setDescription(
"The number of pending requests for an open connection, cumulative for the entire pool.") "The number of pending requests for an open connection, cumulative for the entire pool.")
.buildWithCallback( .buildObserver();
measurement -> measurement.record(pendingRequestsGetter.getAsLong(), attributes)); }
public BatchCallback batchCallback(
Runnable callback,
ObservableMeasurement observableMeasurement,
ObservableMeasurement... additionalMeasurements) {
return meter.batchCallback(callback, observableMeasurement, additionalMeasurements);
} }
// TODO: should be a BoundLongCounter // TODO: should be a BoundLongCounter
@ -151,8 +141,15 @@ public final class DbConnectionPoolMetrics {
.build(); .build();
} }
// TODO: should be removed once bound instruments are back
public Attributes getAttributes() { public Attributes getAttributes() {
return attributes; return attributes;
} }
public Attributes getUsedConnectionsAttributes() {
return usedConnectionsAttributes;
}
public Attributes getIdleConnectionsAttributes() {
return idleConnectionsAttributes;
}
} }

View File

@ -6,10 +6,10 @@
package io.opentelemetry.instrumentation.apachedbcp; package io.opentelemetry.instrumentation.apachedbcp;
import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter; import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.instrumentation.api.metrics.db.DbConnectionPoolMetrics; import io.opentelemetry.instrumentation.api.metrics.db.DbConnectionPoolMetrics;
import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.dbcp2.BasicDataSourceMXBean; import org.apache.commons.dbcp2.BasicDataSourceMXBean;
@ -20,31 +20,44 @@ final class DataSourceMetrics {
// a weak map does not make sense here because each Meter holds a reference to the dataSource // a weak map does not make sense here because each Meter holds a reference to the dataSource
// all instrumented/known implementations of BasicDataSourceMXBean do not implement // all instrumented/known implementations of BasicDataSourceMXBean do not implement
// equals()/hashCode(), so it's safe to keep them in a plain ConcurrentHashMap // equals()/hashCode(), so it's safe to keep them in a plain ConcurrentHashMap
private static final Map<BasicDataSourceMXBean, List<ObservableLongUpDownCounter>> private static final Map<BasicDataSourceMXBean, BatchCallback> dataSourceMetrics =
dataSourceMetrics = new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
public static void registerMetrics( public static void registerMetrics(
OpenTelemetry openTelemetry, BasicDataSourceMXBean dataSource, String dataSourceName) { OpenTelemetry openTelemetry, BasicDataSourceMXBean dataSource, String dataSourceName) {
DbConnectionPoolMetrics metrics = DbConnectionPoolMetrics metrics =
DbConnectionPoolMetrics.create(openTelemetry, INSTRUMENTATION_NAME, dataSourceName); DbConnectionPoolMetrics.create(openTelemetry, INSTRUMENTATION_NAME, dataSourceName);
List<ObservableLongUpDownCounter> meters = ObservableLongMeasurement connections = metrics.connections();
Arrays.asList( ObservableLongMeasurement minIdleConnections = metrics.minIdleConnections();
metrics.usedConnections(dataSource::getNumActive), ObservableLongMeasurement maxIdleConnections = metrics.maxIdleConnections();
metrics.idleConnections(dataSource::getNumIdle), ObservableLongMeasurement maxConnections = metrics.maxConnections();
metrics.minIdleConnections(dataSource::getMinIdle),
metrics.maxIdleConnections(dataSource::getMaxIdle),
metrics.maxConnections(dataSource::getMaxTotal));
dataSourceMetrics.put(dataSource, meters); Attributes attributes = metrics.getAttributes();
Attributes usedConnectionsAttributes = metrics.getUsedConnectionsAttributes();
Attributes idleConnectionsAttributes = metrics.getIdleConnectionsAttributes();
BatchCallback callback =
metrics.batchCallback(
() -> {
connections.record(dataSource.getNumActive(), usedConnectionsAttributes);
connections.record(dataSource.getNumIdle(), idleConnectionsAttributes);
minIdleConnections.record(dataSource.getMinIdle(), attributes);
maxIdleConnections.record(dataSource.getMaxIdle(), attributes);
maxConnections.record(dataSource.getMaxTotal(), attributes);
},
connections,
minIdleConnections,
maxIdleConnections,
maxConnections);
dataSourceMetrics.put(dataSource, callback);
} }
public static void unregisterMetrics(BasicDataSourceMXBean dataSource) { public static void unregisterMetrics(BasicDataSourceMXBean dataSource) {
List<ObservableLongUpDownCounter> observableInstruments = dataSourceMetrics.remove(dataSource); BatchCallback callback = dataSourceMetrics.remove(dataSource);
if (observableInstruments != null) { if (callback != null) {
for (ObservableLongUpDownCounter observable : observableInstruments) { callback.close();
observable.close();
}
} }
} }

View File

@ -7,38 +7,42 @@ package io.opentelemetry.instrumentation.c3p0;
import com.mchange.v2.c3p0.PooledDataSource; import com.mchange.v2.c3p0.PooledDataSource;
import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter; import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.instrumentation.api.metrics.db.DbConnectionPoolMetrics; import io.opentelemetry.instrumentation.api.metrics.db.DbConnectionPoolMetrics;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier; import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable; import javax.annotation.Nullable;
final class ConnectionPoolMetrics { final class ConnectionPoolMetrics {
private static final Logger logger = Logger.getLogger(ConnectionPoolMetrics.class.getName());
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.c3p0-0.9"; private static final String INSTRUMENTATION_NAME = "io.opentelemetry.c3p0-0.9";
// a weak map does not make sense here because each Meter holds a reference to the dataSource // a weak map does not make sense here because each Meter holds a reference to the dataSource
// PooledDataSource implements equals() & hashCode() in IdentityTokenResolvable, // PooledDataSource implements equals() & hashCode() in IdentityTokenResolvable,
// that's why we wrap it with IdentityDataSourceKey that uses identity comparison instead // that's why we wrap it with IdentityDataSourceKey that uses identity comparison instead
private static final Map<IdentityDataSourceKey, List<ObservableLongUpDownCounter>> private static final Map<IdentityDataSourceKey, BatchCallback> dataSourceMetrics =
dataSourceMetrics = new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
public static void registerMetrics(OpenTelemetry openTelemetry, PooledDataSource dataSource) { public static void registerMetrics(OpenTelemetry openTelemetry, PooledDataSource dataSource) {
dataSourceMetrics.compute( dataSourceMetrics.compute(
new IdentityDataSourceKey(dataSource), new IdentityDataSourceKey(dataSource),
(key, existingCounters) -> (key, existingCallback) ->
ConnectionPoolMetrics.createMeters(openTelemetry, key, existingCounters)); ConnectionPoolMetrics.createMeters(openTelemetry, key, existingCallback));
} }
private static List<ObservableLongUpDownCounter> createMeters( private static BatchCallback createMeters(
OpenTelemetry openTelemetry, OpenTelemetry openTelemetry,
IdentityDataSourceKey key, IdentityDataSourceKey key,
List<ObservableLongUpDownCounter> existingCounters) { @Nullable BatchCallback existingCallback) {
// remove old counters from the registry in case they were already there // remove old counters from the registry in case they were already there
removeMetersFromRegistry(existingCounters); removeMetersFromRegistry(existingCallback);
PooledDataSource dataSource = key.dataSource; PooledDataSource dataSource = key.dataSource;
@ -46,25 +50,38 @@ final class ConnectionPoolMetrics {
DbConnectionPoolMetrics.create( DbConnectionPoolMetrics.create(
openTelemetry, INSTRUMENTATION_NAME, dataSource.getDataSourceName()); openTelemetry, INSTRUMENTATION_NAME, dataSource.getDataSourceName());
return Arrays.asList( ObservableLongMeasurement connections = metrics.connections();
metrics.usedConnections(wrapThrowingSupplier(dataSource::getNumBusyConnectionsDefaultUser)), ObservableLongMeasurement pendingRequestsForConnection = metrics.pendingRequestsForConnection();
metrics.idleConnections(wrapThrowingSupplier(dataSource::getNumIdleConnectionsDefaultUser)),
metrics.pendingRequestsForConnection( Attributes attributes = metrics.getAttributes();
wrapThrowingSupplier(dataSource::getNumThreadsAwaitingCheckoutDefaultUser))); Attributes usedConnectionsAttributes = metrics.getUsedConnectionsAttributes();
Attributes idleConnectionsAttributes = metrics.getIdleConnectionsAttributes();
return metrics.batchCallback(
() -> {
try {
connections.record(
dataSource.getNumBusyConnectionsDefaultUser(), usedConnectionsAttributes);
connections.record(
dataSource.getNumIdleConnectionsDefaultUser(), idleConnectionsAttributes);
pendingRequestsForConnection.record(
dataSource.getNumThreadsAwaitingCheckoutDefaultUser(), attributes);
} catch (SQLException e) {
logger.log(Level.FINE, "Failed to get C3P0 datasource metric", e);
}
},
connections,
pendingRequestsForConnection);
} }
public static void unregisterMetrics(PooledDataSource dataSource) { public static void unregisterMetrics(PooledDataSource dataSource) {
List<ObservableLongUpDownCounter> meters = BatchCallback callback = dataSourceMetrics.remove(new IdentityDataSourceKey(dataSource));
dataSourceMetrics.remove(new IdentityDataSourceKey(dataSource)); removeMetersFromRegistry(callback);
removeMetersFromRegistry(meters);
} }
private static void removeMetersFromRegistry( private static void removeMetersFromRegistry(@Nullable BatchCallback callback) {
@Nullable List<ObservableLongUpDownCounter> observableInstruments) { if (callback != null) {
if (observableInstruments != null) { callback.close();
for (ObservableLongUpDownCounter observable : observableInstruments) {
observable.close();
}
} }
} }
@ -103,20 +120,5 @@ final class ConnectionPoolMetrics {
} }
} }
static LongSupplier wrapThrowingSupplier(DataSourceIntSupplier supplier) {
return () -> {
try {
return supplier.getAsInt();
} catch (SQLException e) {
throw new IllegalStateException("Failed to get C3P0 datasource metric", e);
}
};
}
@FunctionalInterface
interface DataSourceIntSupplier {
int getAsInt() throws SQLException;
}
private ConnectionPoolMetrics() {} private ConnectionPoolMetrics() {}
} }

View File

@ -7,10 +7,9 @@ package io.opentelemetry.instrumentation.hikaricp;
import com.zaxxer.hikari.metrics.IMetricsTracker; import com.zaxxer.hikari.metrics.IMetricsTracker;
import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
final class OpenTelemetryMetricsTracker implements IMetricsTracker { final class OpenTelemetryMetricsTracker implements IMetricsTracker {
@ -19,7 +18,7 @@ final class OpenTelemetryMetricsTracker implements IMetricsTracker {
private final IMetricsTracker userMetricsTracker; private final IMetricsTracker userMetricsTracker;
private final List<ObservableLongUpDownCounter> observableInstruments; private final BatchCallback callback;
private final LongCounter timeouts; private final LongCounter timeouts;
private final DoubleHistogram createTime; private final DoubleHistogram createTime;
private final DoubleHistogram waitTime; private final DoubleHistogram waitTime;
@ -28,14 +27,14 @@ final class OpenTelemetryMetricsTracker implements IMetricsTracker {
OpenTelemetryMetricsTracker( OpenTelemetryMetricsTracker(
IMetricsTracker userMetricsTracker, IMetricsTracker userMetricsTracker,
List<ObservableLongUpDownCounter> observableInstruments, BatchCallback callback,
LongCounter timeouts, LongCounter timeouts,
DoubleHistogram createTime, DoubleHistogram createTime,
DoubleHistogram waitTime, DoubleHistogram waitTime,
DoubleHistogram useTime, DoubleHistogram useTime,
Attributes attributes) { Attributes attributes) {
this.userMetricsTracker = userMetricsTracker; this.userMetricsTracker = userMetricsTracker;
this.observableInstruments = observableInstruments; this.callback = callback;
this.timeouts = timeouts; this.timeouts = timeouts;
this.createTime = createTime; this.createTime = createTime;
this.waitTime = waitTime; this.waitTime = waitTime;
@ -70,9 +69,7 @@ final class OpenTelemetryMetricsTracker implements IMetricsTracker {
@Override @Override
public void close() { public void close() {
for (ObservableLongUpDownCounter observable : observableInstruments) { callback.close();
observable.close();
}
userMetricsTracker.close(); userMetricsTracker.close();
} }
} }

View File

@ -9,10 +9,10 @@ import com.zaxxer.hikari.metrics.IMetricsTracker;
import com.zaxxer.hikari.metrics.MetricsTrackerFactory; import com.zaxxer.hikari.metrics.MetricsTrackerFactory;
import com.zaxxer.hikari.metrics.PoolStats; import com.zaxxer.hikari.metrics.PoolStats;
import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter; import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.instrumentation.api.metrics.db.DbConnectionPoolMetrics; import io.opentelemetry.instrumentation.api.metrics.db.DbConnectionPoolMetrics;
import java.util.Arrays;
import java.util.List;
import javax.annotation.Nullable; import javax.annotation.Nullable;
final class OpenTelemetryMetricsTrackerFactory implements MetricsTrackerFactory { final class OpenTelemetryMetricsTrackerFactory implements MetricsTrackerFactory {
@ -38,17 +38,32 @@ final class OpenTelemetryMetricsTrackerFactory implements MetricsTrackerFactory
DbConnectionPoolMetrics metrics = DbConnectionPoolMetrics metrics =
DbConnectionPoolMetrics.create(openTelemetry, INSTRUMENTATION_NAME, poolName); DbConnectionPoolMetrics.create(openTelemetry, INSTRUMENTATION_NAME, poolName);
List<ObservableLongUpDownCounter> observableInstruments = ObservableLongMeasurement connections = metrics.connections();
Arrays.asList( ObservableLongMeasurement minIdleConnections = metrics.minIdleConnections();
metrics.usedConnections(poolStats::getActiveConnections), ObservableLongMeasurement maxConnections = metrics.maxConnections();
metrics.idleConnections(poolStats::getIdleConnections), ObservableLongMeasurement pendingRequestsForConnection = metrics.pendingRequestsForConnection();
metrics.minIdleConnections(poolStats::getMinConnections),
metrics.maxConnections(poolStats::getMaxConnections), Attributes attributes = metrics.getAttributes();
metrics.pendingRequestsForConnection(poolStats::getPendingThreads)); Attributes usedConnectionsAttributes = metrics.getUsedConnectionsAttributes();
Attributes idleConnectionsAttributes = metrics.getIdleConnectionsAttributes();
BatchCallback callback =
metrics.batchCallback(
() -> {
connections.record(poolStats.getActiveConnections(), usedConnectionsAttributes);
connections.record(poolStats.getIdleConnections(), idleConnectionsAttributes);
minIdleConnections.record(poolStats.getMinConnections(), attributes);
maxConnections.record(poolStats.getMaxConnections(), attributes);
pendingRequestsForConnection.record(poolStats.getPendingThreads(), attributes);
},
connections,
minIdleConnections,
maxConnections,
pendingRequestsForConnection);
return new OpenTelemetryMetricsTracker( return new OpenTelemetryMetricsTracker(
userMetricsTracker, userMetricsTracker,
observableInstruments, callback,
metrics.connectionTimeouts(), metrics.connectionTimeouts(),
metrics.connectionCreateTime(), metrics.connectionCreateTime(),
metrics.connectionWaitTime(), metrics.connectionWaitTime(),

View File

@ -6,10 +6,10 @@
package io.opentelemetry.instrumentation.oracleucp; package io.opentelemetry.instrumentation.oracleucp;
import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter; import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.instrumentation.api.metrics.db.DbConnectionPoolMetrics; import io.opentelemetry.instrumentation.api.metrics.db.DbConnectionPoolMetrics;
import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import oracle.ucp.UniversalConnectionPool; import oracle.ucp.UniversalConnectionPool;
@ -20,8 +20,8 @@ final class ConnectionPoolMetrics {
// a weak map does not make sense here because each Meter holds a reference to the connection pool // a weak map does not make sense here because each Meter holds a reference to the connection pool
// none of the UniversalConnectionPool implementations contain equals()/hashCode(), so it's safe // none of the UniversalConnectionPool implementations contain equals()/hashCode(), so it's safe
// to keep them in a plain ConcurrentHashMap // to keep them in a plain ConcurrentHashMap
private static final Map<UniversalConnectionPool, List<ObservableLongUpDownCounter>> private static final Map<UniversalConnectionPool, BatchCallback> dataSourceMetrics =
dataSourceMetrics = new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
public static void registerMetrics( public static void registerMetrics(
OpenTelemetry openTelemetry, UniversalConnectionPool connectionPool) { OpenTelemetry openTelemetry, UniversalConnectionPool connectionPool) {
@ -29,27 +29,40 @@ final class ConnectionPoolMetrics {
connectionPool, (unused) -> createMeters(openTelemetry, connectionPool)); connectionPool, (unused) -> createMeters(openTelemetry, connectionPool));
} }
private static List<ObservableLongUpDownCounter> createMeters( private static BatchCallback createMeters(
OpenTelemetry openTelemetry, UniversalConnectionPool connectionPool) { OpenTelemetry openTelemetry, UniversalConnectionPool connectionPool) {
DbConnectionPoolMetrics metrics = DbConnectionPoolMetrics metrics =
DbConnectionPoolMetrics.create( DbConnectionPoolMetrics.create(
openTelemetry, INSTRUMENTATION_NAME, connectionPool.getName()); openTelemetry, INSTRUMENTATION_NAME, connectionPool.getName());
return Arrays.asList( ObservableLongMeasurement connections = metrics.connections();
metrics.usedConnections(connectionPool::getBorrowedConnectionsCount), ObservableLongMeasurement maxConnections = metrics.maxConnections();
metrics.idleConnections(connectionPool::getAvailableConnectionsCount), ObservableLongMeasurement pendingRequestsForConnection = metrics.pendingRequestsForConnection();
metrics.maxConnections(() -> connectionPool.getStatistics().getPeakConnectionsCount()),
metrics.pendingRequestsForConnection( Attributes attributes = metrics.getAttributes();
() -> connectionPool.getStatistics().getPendingRequestsCount())); Attributes usedConnectionsAttributes = metrics.getUsedConnectionsAttributes();
Attributes idleConnectionsAttributes = metrics.getIdleConnectionsAttributes();
return metrics.batchCallback(
() -> {
connections.record(
connectionPool.getBorrowedConnectionsCount(), usedConnectionsAttributes);
connections.record(
connectionPool.getAvailableConnectionsCount(), idleConnectionsAttributes);
maxConnections.record(
connectionPool.getStatistics().getPeakConnectionsCount(), attributes);
pendingRequestsForConnection.record(
connectionPool.getStatistics().getPendingRequestsCount(), attributes);
},
connections,
maxConnections,
pendingRequestsForConnection);
} }
public static void unregisterMetrics(UniversalConnectionPool connectionPool) { public static void unregisterMetrics(UniversalConnectionPool connectionPool) {
List<ObservableLongUpDownCounter> observableInstruments = BatchCallback callback = dataSourceMetrics.remove(connectionPool);
dataSourceMetrics.remove(connectionPool); if (callback != null) {
if (observableInstruments != null) { callback.close();
for (ObservableLongUpDownCounter observable : observableInstruments) {
observable.close();
}
} }
} }

View File

@ -7,10 +7,10 @@ package io.opentelemetry.javaagent.instrumentation.tomcat.jdbc;
import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter; import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.instrumentation.api.metrics.db.DbConnectionPoolMetrics; import io.opentelemetry.instrumentation.api.metrics.db.DbConnectionPoolMetrics;
import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.apache.tomcat.jdbc.pool.DataSourceProxy; import org.apache.tomcat.jdbc.pool.DataSourceProxy;
@ -23,34 +23,48 @@ public final class TomcatConnectionPoolMetrics {
// a weak map does not make sense here because each Meter holds a reference to the dataSource // a weak map does not make sense here because each Meter holds a reference to the dataSource
// DataSourceProxy does not implement equals()/hashCode(), so it's safe to keep them in a plain // DataSourceProxy does not implement equals()/hashCode(), so it's safe to keep them in a plain
// ConcurrentHashMap // ConcurrentHashMap
private static final Map<DataSourceProxy, List<ObservableLongUpDownCounter>> dataSourceMetrics = private static final Map<DataSourceProxy, BatchCallback> dataSourceMetrics =
new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
public static void registerMetrics(DataSourceProxy dataSource) { public static void registerMetrics(DataSourceProxy dataSource) {
dataSourceMetrics.computeIfAbsent(dataSource, TomcatConnectionPoolMetrics::createCounters); dataSourceMetrics.computeIfAbsent(dataSource, TomcatConnectionPoolMetrics::createInstruments);
} }
private static List<ObservableLongUpDownCounter> createCounters(DataSourceProxy dataSource) { private static BatchCallback createInstruments(DataSourceProxy dataSource) {
DbConnectionPoolMetrics metrics = DbConnectionPoolMetrics metrics =
DbConnectionPoolMetrics.create( DbConnectionPoolMetrics.create(
openTelemetry, INSTRUMENTATION_NAME, dataSource.getPoolName()); openTelemetry, INSTRUMENTATION_NAME, dataSource.getPoolName());
return Arrays.asList( ObservableLongMeasurement connections = metrics.connections();
metrics.usedConnections(dataSource::getActive), ObservableLongMeasurement minIdleConnections = metrics.minIdleConnections();
metrics.idleConnections(dataSource::getIdle), ObservableLongMeasurement maxIdleConnections = metrics.maxIdleConnections();
metrics.minIdleConnections(dataSource::getMinIdle), ObservableLongMeasurement maxConnections = metrics.maxConnections();
metrics.maxIdleConnections(dataSource::getMaxIdle), ObservableLongMeasurement pendingRequestsForConnection = metrics.pendingRequestsForConnection();
metrics.maxConnections(dataSource::getMaxActive),
metrics.pendingRequestsForConnection(dataSource::getWaitCount)); Attributes attributes = metrics.getAttributes();
Attributes usedConnectionsAttributes = metrics.getUsedConnectionsAttributes();
Attributes idleConnectionsAttributes = metrics.getIdleConnectionsAttributes();
return metrics.batchCallback(
() -> {
connections.record(dataSource.getActive(), usedConnectionsAttributes);
connections.record(dataSource.getIdle(), idleConnectionsAttributes);
minIdleConnections.record(dataSource.getMinIdle(), attributes);
maxIdleConnections.record(dataSource.getMaxIdle(), attributes);
maxConnections.record(dataSource.getMaxActive(), attributes);
pendingRequestsForConnection.record(dataSource.getWaitCount(), attributes);
},
connections,
minIdleConnections,
maxIdleConnections,
maxConnections,
pendingRequestsForConnection);
} }
public static void unregisterMetrics(DataSourceProxy dataSource) { public static void unregisterMetrics(DataSourceProxy dataSource) {
List<ObservableLongUpDownCounter> counters = dataSourceMetrics.remove(dataSource); BatchCallback callback = dataSourceMetrics.remove(dataSource);
if (counters != null) { if (callback != null) {
for (ObservableLongUpDownCounter meter : counters) { callback.close();
meter.close();
}
} }
} }

View File

@ -5,12 +5,14 @@
package io.opentelemetry.javaagent.instrumentation.viburdbcp; package io.opentelemetry.javaagent.instrumentation.viburdbcp;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
import com.google.auto.service.AutoService; import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.List; import java.util.List;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(InstrumentationModule.class) @AutoService(InstrumentationModule.class)
public class ViburDbcpInstrumentationModule extends InstrumentationModule { public class ViburDbcpInstrumentationModule extends InstrumentationModule {
@ -18,6 +20,14 @@ public class ViburDbcpInstrumentationModule extends InstrumentationModule {
super("vibur-dbcp", "vibur-dbcp-11.0"); super("vibur-dbcp", "vibur-dbcp-11.0");
} }
@Override
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
// ViburDBCPConfig was renamed to ViburConfig in 10.0; this matcher excludes all versions < 10.0
// in 11.0, the ViburDBCPDataSource#getPool() method signature was changed - this is detected by
// muzzle
return hasClassesNamed("org.vibur.dbcp.ViburConfig");
}
@Override @Override
public List<TypeInstrumentation> typeInstrumentations() { public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new ViburDbcpDataSourceInstrumentation()); return singletonList(new ViburDbcpDataSourceInstrumentation());

View File

@ -6,10 +6,10 @@
package io.opentelemetry.instrumentation.viburdbcp; package io.opentelemetry.instrumentation.viburdbcp;
import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter; import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.BatchCallback;
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.instrumentation.api.metrics.db.DbConnectionPoolMetrics; import io.opentelemetry.instrumentation.api.metrics.db.DbConnectionPoolMetrics;
import java.util.Arrays;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.vibur.dbcp.ViburDBCPDataSource; import org.vibur.dbcp.ViburDBCPDataSource;
@ -20,31 +20,40 @@ final class ConnectionPoolMetrics {
// a weak map does not make sense here because each Meter holds a reference to the dataSource // a weak map does not make sense here because each Meter holds a reference to the dataSource
// ViburDBCPDataSource does not implement equals()/hashCode(), so it's safe to keep them in a // ViburDBCPDataSource does not implement equals()/hashCode(), so it's safe to keep them in a
// plain ConcurrentHashMap // plain ConcurrentHashMap
private static final Map<ViburDBCPDataSource, List<ObservableLongUpDownCounter>> private static final Map<ViburDBCPDataSource, BatchCallback> dataSourceMetrics =
dataSourceMetrics = new ConcurrentHashMap<>(); new ConcurrentHashMap<>();
public static void registerMetrics(OpenTelemetry openTelemetry, ViburDBCPDataSource dataSource) { public static void registerMetrics(OpenTelemetry openTelemetry, ViburDBCPDataSource dataSource) {
dataSourceMetrics.computeIfAbsent( dataSourceMetrics.computeIfAbsent(
dataSource, (unused) -> createMeters(openTelemetry, dataSource)); dataSource, (unused) -> createMeters(openTelemetry, dataSource));
} }
private static List<ObservableLongUpDownCounter> createMeters( private static BatchCallback createMeters(
OpenTelemetry openTelemetry, ViburDBCPDataSource dataSource) { OpenTelemetry openTelemetry, ViburDBCPDataSource dataSource) {
DbConnectionPoolMetrics metrics = DbConnectionPoolMetrics metrics =
DbConnectionPoolMetrics.create(openTelemetry, INSTRUMENTATION_NAME, dataSource.getName()); DbConnectionPoolMetrics.create(openTelemetry, INSTRUMENTATION_NAME, dataSource.getName());
return Arrays.asList( ObservableLongMeasurement connections = metrics.connections();
metrics.usedConnections(() -> dataSource.getPool().taken()), ObservableLongMeasurement maxConnections = metrics.maxConnections();
metrics.idleConnections(() -> dataSource.getPool().remainingCreated()),
metrics.maxConnections(dataSource::getPoolMaxSize)); Attributes attributes = metrics.getAttributes();
Attributes usedConnectionsAttributes = metrics.getUsedConnectionsAttributes();
Attributes idleConnectionsAttributes = metrics.getIdleConnectionsAttributes();
return metrics.batchCallback(
() -> {
connections.record(dataSource.getPool().taken(), usedConnectionsAttributes);
connections.record(dataSource.getPool().remainingCreated(), idleConnectionsAttributes);
maxConnections.record(dataSource.getPoolMaxSize(), attributes);
},
connections,
maxConnections);
} }
public static void unregisterMetrics(ViburDBCPDataSource dataSource) { public static void unregisterMetrics(ViburDBCPDataSource dataSource) {
List<ObservableLongUpDownCounter> observableInstruments = dataSourceMetrics.remove(dataSource); BatchCallback callback = dataSourceMetrics.remove(dataSource);
if (observableInstruments != null) { if (callback != null) {
for (ObservableLongUpDownCounter observable : observableInstruments) { callback.close();
observable.close();
}
} }
} }