MongoDB 4 driver instrumentation (#2046)

* MongoDB 4 driver instrumentation

Signed-off-by: Sergei Malafeev <sergei@malafeev.org>

* fix getting constructor

Signed-off-by: Sergei Malafeev <sergei@malafeev.org>

* fix formatting

Signed-off-by: Sergei Malafeev <sergei@malafeev.org>

* Update instrumentation/mongo/mongo-4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/mongo/v4/MongoClientInstrumentationModule.java

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>

* Update settings.gradle

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>

* Update instrumentation/mongo/mongo-4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/mongo/v4/MongoClientInstrumentationModule.java

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>

* use mongo-4.0

Signed-off-by: Sergei Malafeev <sergei@malafeev.org>

* use public api

Signed-off-by: Sergei Malafeev <sergei@malafeev.org>

* use testImplementation

Signed-off-by: Sergei Malafeev <sergei@malafeev.org>

* use declaresField(named("commandListeners")) in typeMatcher

Signed-off-by: Sergei Malafeev <sergei@malafeev.org>

* migrate to mongo-4.0-testing

Signed-off-by: Sergei Malafeev <sergei@malafeev.org>

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
This commit is contained in:
Sergei Malafeev 2021-01-25 21:55:03 +08:00 committed by GitHub
parent 588f8847ed
commit bbcbeb51e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 571 additions and 1 deletions

View File

@ -7,6 +7,16 @@ muzzle {
versions = "[3.7,)"
assertInverse = true
}
pass {
group = "org.mongodb"
module = "mongodb-driver-core"
// this instrumentation is backwards compatible with early versions of the new API that shipped in 3.7
// the legacy API instrumented in mongo-3.1 continues to be shipped in 4.x, but doesn't conflict here
// because they are triggered by different types: MongoClientSettings(new) vs MongoClientOptions(legacy)
versions = "[3.7,)"
extraDependency "org.mongodb:bson:3.7.0"
assertInverse = true
}
}
dependencies {

View File

@ -0,0 +1,14 @@
apply from: "$rootDir/gradle/instrumentation.gradle"
dependencies {
testInstrumentation(project(':instrumentation:mongo:mongo-3.7:javaagent')) {
exclude group: 'org.mongodb', module: 'mongo-java-driver'
}
testImplementation project(':instrumentation:mongo:mongo-testing')
testImplementation group: 'org.mongodb', name: 'mongodb-driver-core', version: '4.0.0'
testImplementation group: 'org.mongodb', name: 'mongodb-driver-sync', version: '4.0.0'
testImplementation group: 'org.mongodb', name: 'mongodb-driver-reactivestreams', version: '4.0.0'
testImplementation group: 'de.flapdoodle.embed', name: 'de.flapdoodle.embed.mongo', version: '1.50.5'
}

View File

@ -0,0 +1,308 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import com.mongodb.client.result.DeleteResult
import com.mongodb.client.result.UpdateResult
import com.mongodb.reactivestreams.client.MongoClient
import com.mongodb.reactivestreams.client.MongoClients
import com.mongodb.reactivestreams.client.MongoCollection
import com.mongodb.reactivestreams.client.MongoDatabase
import io.opentelemetry.api.trace.attributes.SemanticAttributes
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import io.opentelemetry.sdk.trace.data.SpanData
import org.bson.BsonDocument
import org.bson.BsonString
import org.bson.Document
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import spock.lang.Shared
import spock.lang.Timeout
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import static io.opentelemetry.api.trace.Span.Kind.CLIENT
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
@Timeout(10)
class Mongo4ReactiveClientTest extends MongoBaseTest {
@Shared
MongoClient client
def setup() throws Exception {
client = MongoClients.create("mongodb://localhost:$port")
}
def cleanup() throws Exception {
client?.close()
client = null
}
def "test create collection"() {
setup:
MongoDatabase db = client.getDatabase(dbName)
when:
db.createCollection(collectionName).subscribe(toSubscriber {})
then:
assertTraces(1) {
trace(0, 1) {
mongoSpan(it, 0, "create", collectionName, dbName) {
assert it.replaceAll(" ", "") == "{\"create\":\"$collectionName\",\"capped\":\"?\"}" ||
it == "{\"create\": \"$collectionName\", \"capped\": \"?\", \"\$db\": \"?\", \"\$readPreference\": {\"mode\": \"?\"}}"
true
}
}
}
where:
dbName = "test_db"
collectionName = "testCollection"
}
def "test create collection no description"() {
setup:
MongoDatabase db = MongoClients.create("mongodb://localhost:$port").getDatabase(dbName)
when:
db.createCollection(collectionName).subscribe(toSubscriber {})
then:
assertTraces(1) {
trace(0, 1) {
mongoSpan(it, 0, "create", collectionName, dbName, {
assert it.replaceAll(" ", "") == "{\"create\":\"$collectionName\",\"capped\":\"?\"}" ||
it == "{\"create\": \"$collectionName\", \"capped\": \"?\", \"\$db\": \"?\", \"\$readPreference\": {\"mode\": \"?\"}}"
true
})
}
}
where:
dbName = "test_db"
collectionName = "testCollection"
}
def "test get collection"() {
setup:
MongoDatabase db = client.getDatabase(dbName)
when:
def count = new CompletableFuture()
db.getCollection(collectionName).estimatedDocumentCount().subscribe(toSubscriber { count.complete(it) })
then:
count.get() == 0
assertTraces(1) {
trace(0, 1) {
mongoSpan(it, 0, "count", collectionName, dbName) {
assert it.replaceAll(" ", "") == "{\"count\":\"$collectionName\",\"query\":{}}" ||
it == "{\"count\": \"$collectionName\", \"query\": {}, \"\$db\": \"?\", \"\$readPreference\": {\"mode\": \"?\"}}"
true
}
}
}
where:
dbName = "test_db"
collectionName = "testCollection"
}
def "test insert"() {
setup:
MongoCollection<Document> collection = runUnderTrace("setup") {
MongoDatabase db = client.getDatabase(dbName)
def latch1 = new CountDownLatch(1)
// This creates a trace that isn't linked to the parent... using NIO internally that we don't handle.
db.createCollection(collectionName).subscribe(toSubscriber { latch1.countDown() })
latch1.await()
return db.getCollection(collectionName)
}
TEST_WRITER.waitForTraces(2)
TEST_WRITER.clear()
when:
def count = new CompletableFuture()
collection.insertOne(new Document("password", "SECRET")).subscribe(toSubscriber {
collection.estimatedDocumentCount().subscribe(toSubscriber { count.complete(it) })
})
then:
count.get() == 1
assertTraces(2) {
trace(0, 1) {
mongoSpan(it, 0, "insert", collectionName, dbName) {
assert it.replaceAll(" ", "") == "{\"insert\":\"$collectionName\",\"ordered\":\"?\",\"documents\":[{\"_id\":\"?\",\"password\":\"?\"}]}" ||
it == "{\"insert\": \"$collectionName\", \"ordered\": \"?\", \"\$db\": \"?\", \"documents\": [{\"_id\": \"?\", \"password\": \"?\"}]}"
true
}
}
trace(1, 1) {
mongoSpan(it, 0, "count", collectionName, dbName) {
assert it.replaceAll(" ", "") == "{\"count\":\"$collectionName\",\"query\":{}}" ||
it == "{\"count\": \"$collectionName\", \"query\": {}, \"\$db\": \"?\", \"\$readPreference\": {\"mode\": \"?\"}}"
true
}
}
}
where:
dbName = "test_db"
collectionName = "testCollection"
}
def "test update"() {
setup:
MongoCollection<Document> collection = runUnderTrace("setup") {
MongoDatabase db = client.getDatabase(dbName)
def latch1 = new CountDownLatch(1)
db.createCollection(collectionName).subscribe(toSubscriber { latch1.countDown() })
latch1.await()
def coll = db.getCollection(collectionName)
def latch2 = new CountDownLatch(1)
coll.insertOne(new Document("password", "OLDPW")).subscribe(toSubscriber { latch2.countDown() })
latch2.await()
return coll
}
TEST_WRITER.waitForTraces(1)
TEST_WRITER.clear()
when:
def result = new CompletableFuture<UpdateResult>()
def count = new CompletableFuture()
collection.updateOne(
new BsonDocument("password", new BsonString("OLDPW")),
new BsonDocument('$set', new BsonDocument("password", new BsonString("NEWPW")))).subscribe(toSubscriber {
result.complete(it)
collection.estimatedDocumentCount().subscribe(toSubscriber { count.complete(it) })
})
then:
result.get().modifiedCount == 1
count.get() == 1
assertTraces(2) {
trace(0, 1) {
mongoSpan(it, 0, "update", collectionName, dbName) {
assert it.replaceAll(" ", "") == "{\"update\":\"$collectionName\",\"ordered\":\"?\",\"updates\":[{\"q\":{\"password\":\"?\"},\"u\":{\"\$set\":{\"password\":\"?\"}}}]}" ||
it == "{\"update\": \"?\", \"ordered\": \"?\", \"\$db\": \"?\", \"updates\": [{\"q\": {\"password\": \"?\"}, \"u\": {\"\$set\": {\"password\": \"?\"}}}]}"
true
}
}
trace(1, 1) {
mongoSpan(it, 0, "count", collectionName, dbName) {
assert it.replaceAll(" ", "") == "{\"count\":\"$collectionName\",\"query\":{}}" ||
it == "{\"count\": \"$collectionName\", \"query\": {}, \"\$db\": \"?\", \"\$readPreference\": {\"mode\": \"?\"}}"
true
}
}
}
where:
dbName = "test_db"
collectionName = "testCollection"
}
def "test delete"() {
setup:
MongoCollection<Document> collection = runUnderTrace("setup") {
MongoDatabase db = client.getDatabase(dbName)
def latch1 = new CountDownLatch(1)
db.createCollection(collectionName).subscribe(toSubscriber { latch1.countDown() })
latch1.await()
def coll = db.getCollection(collectionName)
def latch2 = new CountDownLatch(1)
coll.insertOne(new Document("password", "SECRET")).subscribe(toSubscriber { latch2.countDown() })
latch2.await()
return coll
}
TEST_WRITER.waitForTraces(1)
TEST_WRITER.clear()
when:
def result = new CompletableFuture<DeleteResult>()
def count = new CompletableFuture()
collection.deleteOne(new BsonDocument("password", new BsonString("SECRET"))).subscribe(toSubscriber {
result.complete(it)
collection.estimatedDocumentCount().subscribe(toSubscriber { count.complete(it) })
})
then:
result.get().deletedCount == 1
count.get() == 0
assertTraces(2) {
trace(0, 1) {
mongoSpan(it, 0, "delete", collectionName, dbName) {
assert it.replaceAll(" ", "") == "{\"delete\":\"$collectionName\",\"ordered\":\"?\",\"deletes\":[{\"q\":{\"password\":\"?\"},\"limit\":\"?\"}]}" ||
it == "{\"delete\": \"?\", \"ordered\": \"?\", \"\$db\": \"?\", \"deletes\": [{\"q\": {\"password\": \"?\"}, \"limit\": \"?\"}]}"
true
}
}
trace(1, 1) {
mongoSpan(it, 0, "count", collectionName, dbName) {
assert it.replaceAll(" ", "") == "{\"count\":\"$collectionName\",\"query\":{}}" ||
it == "{\"count\": \"$collectionName\", \"query\": {}, \"\$db\": \"?\", \"\$readPreference\": {\"mode\": \"?\"}}"
true
}
}
}
where:
dbName = "test_db"
collectionName = "testCollection"
}
def Subscriber<?> toSubscriber(Closure closure) {
return new Subscriber() {
boolean hasResult
@Override
void onSubscribe(Subscription s) {
s.request(1) // must request 1 value to trigger async call
}
@Override
void onNext(Object o) { hasResult = true; closure.call(o) }
@Override
void onError(Throwable t) { hasResult = true; closure.call(t) }
@Override
void onComplete() {
if (!hasResult) {
hasResult = true
closure.call()
}
}
}
}
def mongoSpan(TraceAssert trace, int index,
String operation, String collection,
String dbName, Closure<Boolean> statementEval,
Object parentSpan = null, Throwable exception = null) {
trace.span(index) {
name statementEval
kind CLIENT
if (parentSpan == null) {
hasNoParent()
} else {
childOf((SpanData) parentSpan)
}
attributes {
"$SemanticAttributes.NET_PEER_NAME.key" "localhost"
"$SemanticAttributes.NET_PEER_IP.key" "127.0.0.1"
"$SemanticAttributes.NET_PEER_PORT.key" port
"$SemanticAttributes.DB_CONNECTION_STRING.key" "mongodb://localhost:" + port
"$SemanticAttributes.DB_STATEMENT.key" statementEval
"$SemanticAttributes.DB_SYSTEM.key" "mongodb"
"$SemanticAttributes.DB_NAME.key" dbName
"$SemanticAttributes.DB_OPERATION.key" operation
"$SemanticAttributes.DB_MONGODB_COLLECTION.key" collection
}
}
}
}

View File

@ -0,0 +1,227 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import com.mongodb.MongoTimeoutException
import org.bson.BsonDocument
import org.bson.BsonString
import static io.opentelemetry.instrumentation.test.utils.PortUtils.UNUSABLE_PORT
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
import com.mongodb.client.MongoClient
import com.mongodb.client.MongoClients
import com.mongodb.client.MongoCollection
import com.mongodb.client.MongoDatabase
import org.bson.Document
import spock.lang.Shared
class MongoClientTest extends MongoBaseTest {
@Shared
MongoClient client
def setup() throws Exception {
client = MongoClients.create("mongodb://localhost:$port")
}
def cleanup() throws Exception {
client?.close()
client = null
}
def "test create collection"() {
setup:
MongoDatabase db = client.getDatabase(dbName)
when:
db.createCollection(collectionName)
then:
assertTraces(1) {
trace(0, 1) {
mongoSpan(it, 0, "create", collectionName, dbName, "{\"create\":\"$collectionName\",\"capped\":\"?\"}")
}
}
where:
dbName = "test_db"
collectionName = "testCollection"
}
def "test create collection no description"() {
setup:
MongoDatabase db = MongoClients.create("mongodb://localhost:$port").getDatabase(dbName)
when:
db.createCollection(collectionName)
then:
assertTraces(1) {
trace(0, 1) {
mongoSpan(it, 0, "create", collectionName, dbName, "{\"create\":\"$collectionName\",\"capped\":\"?\"}")
}
}
where:
dbName = "test_db"
collectionName = "testCollection"
}
def "test get collection"() {
setup:
MongoDatabase db = client.getDatabase(dbName)
when:
int count = db.getCollection(collectionName).estimatedDocumentCount()
then:
count == 0
assertTraces(1) {
trace(0,1) {
mongoSpan(it, 0, "count", collectionName, dbName, "{\"count\":\"$collectionName\",\"query\":{}}")
}
}
where:
dbName = "test_db"
collectionName = "testCollection"
}
def "test insert"() {
setup:
MongoCollection<Document> collection = runUnderTrace("setup") {
MongoDatabase db = client.getDatabase(dbName)
db.createCollection(collectionName)
return db.getCollection(collectionName)
}
TEST_WRITER.waitForTraces(1)
TEST_WRITER.clear()
when:
collection.insertOne(new Document("password", "SECRET"))
then:
collection.estimatedDocumentCount() == 1
assertTraces(2) {
trace(0, 1) {
mongoSpan(it, 0, "insert", collectionName, dbName, "{\"insert\":\"$collectionName\",\"ordered\":\"?\",\"documents\":[{\"_id\":\"?\",\"password\":\"?\"}]}")
}
trace(1, 1) {
mongoSpan(it, 0, "count", collectionName, dbName, "{\"count\":\"$collectionName\",\"query\":{}}")
}
}
where:
dbName = "test_db"
collectionName = "testCollection"
}
def "test update"() {
setup:
MongoCollection<Document> collection = runUnderTrace("setup") {
MongoDatabase db = client.getDatabase(dbName)
db.createCollection(collectionName)
def coll = db.getCollection(collectionName)
coll.insertOne(new Document("password", "OLDPW"))
return coll
}
TEST_WRITER.waitForTraces(1)
TEST_WRITER.clear()
when:
def result = collection.updateOne(
new BsonDocument("password", new BsonString("OLDPW")),
new BsonDocument('$set', new BsonDocument("password", new BsonString("NEWPW"))))
then:
result.modifiedCount == 1
collection.estimatedDocumentCount() == 1
assertTraces(2) {
trace(0, 1) {
mongoSpan(it, 0, "update", collectionName, dbName, "{\"update\":\"$collectionName\",\"ordered\":\"?\",\"updates\":[{\"q\":{\"password\":\"?\"},\"u\":{\"\$set\":{\"password\":\"?\"}}}]}")
}
trace(1, 1) {
mongoSpan(it, 0, "count", collectionName, dbName, "{\"count\":\"$collectionName\",\"query\":{}}")
}
}
where:
dbName = "test_db"
collectionName = "testCollection"
}
def "test delete"() {
setup:
MongoCollection<Document> collection = runUnderTrace("setup") {
MongoDatabase db = client.getDatabase(dbName)
db.createCollection(collectionName)
def coll = db.getCollection(collectionName)
coll.insertOne(new Document("password", "SECRET"))
return coll
}
TEST_WRITER.waitForTraces(1)
TEST_WRITER.clear()
when:
def result = collection.deleteOne(new BsonDocument("password", new BsonString("SECRET")))
then:
result.deletedCount == 1
collection.estimatedDocumentCount() == 0
assertTraces(2) {
trace(0, 1) {
mongoSpan(it, 0, "delete", collectionName, dbName, "{\"delete\":\"$collectionName\",\"ordered\":\"?\",\"deletes\":[{\"q\":{\"password\":\"?\"},\"limit\":\"?\"}]}")
}
trace(1, 1) {
mongoSpan(it, 0, "count", collectionName, dbName, "{\"count\":\"$collectionName\",\"query\":{}}")
}
}
where:
dbName = "test_db"
collectionName = "testCollection"
}
def "test error"() {
setup:
MongoCollection<Document> collection = runUnderTrace("setup") {
MongoDatabase db = client.getDatabase(dbName)
db.createCollection(collectionName)
return db.getCollection(collectionName)
}
TEST_WRITER.waitForTraces(1)
TEST_WRITER.clear()
when:
collection.updateOne(new BsonDocument(), new BsonDocument())
then:
thrown(IllegalArgumentException)
// Unfortunately not caught by our instrumentation.
assertTraces(0) {}
where:
dbName = "test_db"
collectionName = "testCollection"
}
def "test client failure"() {
setup:
def client = MongoClients.create("mongodb://localhost:$UNUSABLE_PORT/?serverselectiontimeoutms=10")
when:
MongoDatabase db = client.getDatabase(dbName)
db.createCollection(collectionName)
then:
thrown(MongoTimeoutException)
// Unfortunately not caught by our instrumentation.
assertTraces(0) {}
where:
dbName = "test_db"
collectionName = "testCollection"
}
}

View File

@ -119,7 +119,7 @@ public class MongoClientTracer extends DatabaseClientTracer<CommandStartedEvent,
asList("ordered", "insert", "count", "find", "create");
private JsonWriterSettings createJsonWriterSettings(int maxNormalizedQueryLength) {
JsonWriterSettings settings = new JsonWriterSettings(false);
JsonWriterSettings settings = null;
try {
// The static JsonWriterSettings.builder() method was introduced in the 3.5 release
Optional<Method> buildMethod =
@ -154,6 +154,16 @@ public class MongoClientTracer extends DatabaseClientTracer<CommandStartedEvent,
}
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException ignored) {
}
if (settings == null) {
try {
settings = JsonWriterSettings.class.getConstructor(Boolean.TYPE).newInstance(false);
} catch (InstantiationException
| IllegalAccessException
| InvocationTargetException
| NoSuchMethodException ignored) {
}
}
return settings;
}

View File

@ -153,6 +153,7 @@ include ':instrumentation:logback:logback-1.0:testing'
include ':instrumentation:methods:javaagent'
include ':instrumentation:mongo:mongo-3.1:javaagent'
include ':instrumentation:mongo:mongo-3.7:javaagent'
include ':instrumentation:mongo:mongo-4.0-testing'
include ':instrumentation:mongo:mongo-async-3.3:javaagent'
include ':instrumentation:mongo:mongo-common:javaagent'
include ':instrumentation:mongo:mongo-testing'