From bbcbeb51e3a8319e30af5d22705703deb81cfa8e Mon Sep 17 00:00:00 2001 From: Sergei Malafeev Date: Mon, 25 Jan 2021 21:55:03 +0800 Subject: [PATCH] MongoDB 4 driver instrumentation (#2046) * MongoDB 4 driver instrumentation Signed-off-by: Sergei Malafeev * fix getting constructor Signed-off-by: Sergei Malafeev * fix formatting Signed-off-by: Sergei Malafeev * Update instrumentation/mongo/mongo-4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/mongo/v4/MongoClientInstrumentationModule.java Co-authored-by: Trask Stalnaker * Update settings.gradle Co-authored-by: Trask Stalnaker * Update instrumentation/mongo/mongo-4/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/mongo/v4/MongoClientInstrumentationModule.java Co-authored-by: Trask Stalnaker * use mongo-4.0 Signed-off-by: Sergei Malafeev * use public api Signed-off-by: Sergei Malafeev * use testImplementation Signed-off-by: Sergei Malafeev * use declaresField(named("commandListeners")) in typeMatcher Signed-off-by: Sergei Malafeev * migrate to mongo-4.0-testing Signed-off-by: Sergei Malafeev Co-authored-by: Trask Stalnaker --- .../javaagent/mongo-3.7-javaagent.gradle | 10 + .../mongo-4.0-testing.gradle | 14 + .../groovy/Mongo4ReactiveClientTest.groovy | 308 ++++++++++++++++++ .../src/test/groovy/MongoClientTest.groovy | 227 +++++++++++++ .../mongo/MongoClientTracer.java | 12 +- settings.gradle | 1 + 6 files changed, 571 insertions(+), 1 deletion(-) create mode 100644 instrumentation/mongo/mongo-4.0-testing/mongo-4.0-testing.gradle create mode 100644 instrumentation/mongo/mongo-4.0-testing/src/test/groovy/Mongo4ReactiveClientTest.groovy create mode 100644 instrumentation/mongo/mongo-4.0-testing/src/test/groovy/MongoClientTest.groovy diff --git a/instrumentation/mongo/mongo-3.7/javaagent/mongo-3.7-javaagent.gradle b/instrumentation/mongo/mongo-3.7/javaagent/mongo-3.7-javaagent.gradle index f1f50afac7..9afbf7cf3a 100644 --- a/instrumentation/mongo/mongo-3.7/javaagent/mongo-3.7-javaagent.gradle +++ b/instrumentation/mongo/mongo-3.7/javaagent/mongo-3.7-javaagent.gradle @@ -7,6 +7,16 @@ muzzle { versions = "[3.7,)" assertInverse = true } + pass { + group = "org.mongodb" + module = "mongodb-driver-core" + // this instrumentation is backwards compatible with early versions of the new API that shipped in 3.7 + // the legacy API instrumented in mongo-3.1 continues to be shipped in 4.x, but doesn't conflict here + // because they are triggered by different types: MongoClientSettings(new) vs MongoClientOptions(legacy) + versions = "[3.7,)" + extraDependency "org.mongodb:bson:3.7.0" + assertInverse = true + } } dependencies { diff --git a/instrumentation/mongo/mongo-4.0-testing/mongo-4.0-testing.gradle b/instrumentation/mongo/mongo-4.0-testing/mongo-4.0-testing.gradle new file mode 100644 index 0000000000..5c182f758b --- /dev/null +++ b/instrumentation/mongo/mongo-4.0-testing/mongo-4.0-testing.gradle @@ -0,0 +1,14 @@ +apply from: "$rootDir/gradle/instrumentation.gradle" + + + +dependencies { + testInstrumentation(project(':instrumentation:mongo:mongo-3.7:javaagent')) { + exclude group: 'org.mongodb', module: 'mongo-java-driver' + } + testImplementation project(':instrumentation:mongo:mongo-testing') + testImplementation group: 'org.mongodb', name: 'mongodb-driver-core', version: '4.0.0' + testImplementation group: 'org.mongodb', name: 'mongodb-driver-sync', version: '4.0.0' + testImplementation group: 'org.mongodb', name: 'mongodb-driver-reactivestreams', version: '4.0.0' + testImplementation group: 'de.flapdoodle.embed', name: 'de.flapdoodle.embed.mongo', version: '1.50.5' +} diff --git a/instrumentation/mongo/mongo-4.0-testing/src/test/groovy/Mongo4ReactiveClientTest.groovy b/instrumentation/mongo/mongo-4.0-testing/src/test/groovy/Mongo4ReactiveClientTest.groovy new file mode 100644 index 0000000000..81db19d3f8 --- /dev/null +++ b/instrumentation/mongo/mongo-4.0-testing/src/test/groovy/Mongo4ReactiveClientTest.groovy @@ -0,0 +1,308 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import com.mongodb.client.result.DeleteResult +import com.mongodb.client.result.UpdateResult +import com.mongodb.reactivestreams.client.MongoClient +import com.mongodb.reactivestreams.client.MongoClients +import com.mongodb.reactivestreams.client.MongoCollection +import com.mongodb.reactivestreams.client.MongoDatabase +import io.opentelemetry.api.trace.attributes.SemanticAttributes +import io.opentelemetry.instrumentation.test.asserts.TraceAssert +import io.opentelemetry.sdk.trace.data.SpanData +import org.bson.BsonDocument +import org.bson.BsonString +import org.bson.Document +import org.reactivestreams.Subscriber +import org.reactivestreams.Subscription +import spock.lang.Shared +import spock.lang.Timeout + +import java.util.concurrent.CompletableFuture +import java.util.concurrent.CountDownLatch + +import static io.opentelemetry.api.trace.Span.Kind.CLIENT +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace + +@Timeout(10) +class Mongo4ReactiveClientTest extends MongoBaseTest { + + @Shared + MongoClient client + + def setup() throws Exception { + client = MongoClients.create("mongodb://localhost:$port") + } + + def cleanup() throws Exception { + client?.close() + client = null + } + + def "test create collection"() { + setup: + MongoDatabase db = client.getDatabase(dbName) + + when: + db.createCollection(collectionName).subscribe(toSubscriber {}) + + then: + assertTraces(1) { + trace(0, 1) { + mongoSpan(it, 0, "create", collectionName, dbName) { + assert it.replaceAll(" ", "") == "{\"create\":\"$collectionName\",\"capped\":\"?\"}" || + it == "{\"create\": \"$collectionName\", \"capped\": \"?\", \"\$db\": \"?\", \"\$readPreference\": {\"mode\": \"?\"}}" + true + } + } + } + + where: + dbName = "test_db" + collectionName = "testCollection" + } + + def "test create collection no description"() { + setup: + MongoDatabase db = MongoClients.create("mongodb://localhost:$port").getDatabase(dbName) + + when: + db.createCollection(collectionName).subscribe(toSubscriber {}) + + then: + assertTraces(1) { + trace(0, 1) { + mongoSpan(it, 0, "create", collectionName, dbName, { + assert it.replaceAll(" ", "") == "{\"create\":\"$collectionName\",\"capped\":\"?\"}" || + it == "{\"create\": \"$collectionName\", \"capped\": \"?\", \"\$db\": \"?\", \"\$readPreference\": {\"mode\": \"?\"}}" + true + }) + } + } + + where: + dbName = "test_db" + collectionName = "testCollection" + } + + def "test get collection"() { + setup: + MongoDatabase db = client.getDatabase(dbName) + + when: + def count = new CompletableFuture() + db.getCollection(collectionName).estimatedDocumentCount().subscribe(toSubscriber { count.complete(it) }) + + then: + count.get() == 0 + assertTraces(1) { + trace(0, 1) { + mongoSpan(it, 0, "count", collectionName, dbName) { + assert it.replaceAll(" ", "") == "{\"count\":\"$collectionName\",\"query\":{}}" || + it == "{\"count\": \"$collectionName\", \"query\": {}, \"\$db\": \"?\", \"\$readPreference\": {\"mode\": \"?\"}}" + true + } + } + } + + where: + dbName = "test_db" + collectionName = "testCollection" + } + + def "test insert"() { + setup: + MongoCollection collection = runUnderTrace("setup") { + MongoDatabase db = client.getDatabase(dbName) + def latch1 = new CountDownLatch(1) + // This creates a trace that isn't linked to the parent... using NIO internally that we don't handle. + db.createCollection(collectionName).subscribe(toSubscriber { latch1.countDown() }) + latch1.await() + return db.getCollection(collectionName) + } + TEST_WRITER.waitForTraces(2) + TEST_WRITER.clear() + + when: + def count = new CompletableFuture() + collection.insertOne(new Document("password", "SECRET")).subscribe(toSubscriber { + collection.estimatedDocumentCount().subscribe(toSubscriber { count.complete(it) }) + }) + + then: + count.get() == 1 + assertTraces(2) { + trace(0, 1) { + mongoSpan(it, 0, "insert", collectionName, dbName) { + assert it.replaceAll(" ", "") == "{\"insert\":\"$collectionName\",\"ordered\":\"?\",\"documents\":[{\"_id\":\"?\",\"password\":\"?\"}]}" || + it == "{\"insert\": \"$collectionName\", \"ordered\": \"?\", \"\$db\": \"?\", \"documents\": [{\"_id\": \"?\", \"password\": \"?\"}]}" + true + } + } + trace(1, 1) { + mongoSpan(it, 0, "count", collectionName, dbName) { + assert it.replaceAll(" ", "") == "{\"count\":\"$collectionName\",\"query\":{}}" || + it == "{\"count\": \"$collectionName\", \"query\": {}, \"\$db\": \"?\", \"\$readPreference\": {\"mode\": \"?\"}}" + true + } + } + } + + where: + dbName = "test_db" + collectionName = "testCollection" + } + + def "test update"() { + setup: + MongoCollection collection = runUnderTrace("setup") { + MongoDatabase db = client.getDatabase(dbName) + def latch1 = new CountDownLatch(1) + db.createCollection(collectionName).subscribe(toSubscriber { latch1.countDown() }) + latch1.await() + def coll = db.getCollection(collectionName) + def latch2 = new CountDownLatch(1) + coll.insertOne(new Document("password", "OLDPW")).subscribe(toSubscriber { latch2.countDown() }) + latch2.await() + return coll + } + TEST_WRITER.waitForTraces(1) + TEST_WRITER.clear() + + when: + def result = new CompletableFuture() + def count = new CompletableFuture() + collection.updateOne( + new BsonDocument("password", new BsonString("OLDPW")), + new BsonDocument('$set', new BsonDocument("password", new BsonString("NEWPW")))).subscribe(toSubscriber { + result.complete(it) + collection.estimatedDocumentCount().subscribe(toSubscriber { count.complete(it) }) + }) + + then: + result.get().modifiedCount == 1 + count.get() == 1 + assertTraces(2) { + trace(0, 1) { + mongoSpan(it, 0, "update", collectionName, dbName) { + assert it.replaceAll(" ", "") == "{\"update\":\"$collectionName\",\"ordered\":\"?\",\"updates\":[{\"q\":{\"password\":\"?\"},\"u\":{\"\$set\":{\"password\":\"?\"}}}]}" || + it == "{\"update\": \"?\", \"ordered\": \"?\", \"\$db\": \"?\", \"updates\": [{\"q\": {\"password\": \"?\"}, \"u\": {\"\$set\": {\"password\": \"?\"}}}]}" + true + } + } + trace(1, 1) { + mongoSpan(it, 0, "count", collectionName, dbName) { + assert it.replaceAll(" ", "") == "{\"count\":\"$collectionName\",\"query\":{}}" || + it == "{\"count\": \"$collectionName\", \"query\": {}, \"\$db\": \"?\", \"\$readPreference\": {\"mode\": \"?\"}}" + true + } + } + } + + where: + dbName = "test_db" + collectionName = "testCollection" + } + + def "test delete"() { + setup: + MongoCollection collection = runUnderTrace("setup") { + MongoDatabase db = client.getDatabase(dbName) + def latch1 = new CountDownLatch(1) + db.createCollection(collectionName).subscribe(toSubscriber { latch1.countDown() }) + latch1.await() + def coll = db.getCollection(collectionName) + def latch2 = new CountDownLatch(1) + coll.insertOne(new Document("password", "SECRET")).subscribe(toSubscriber { latch2.countDown() }) + latch2.await() + return coll + } + TEST_WRITER.waitForTraces(1) + TEST_WRITER.clear() + + when: + def result = new CompletableFuture() + def count = new CompletableFuture() + collection.deleteOne(new BsonDocument("password", new BsonString("SECRET"))).subscribe(toSubscriber { + result.complete(it) + collection.estimatedDocumentCount().subscribe(toSubscriber { count.complete(it) }) + }) + + then: + result.get().deletedCount == 1 + count.get() == 0 + assertTraces(2) { + trace(0, 1) { + mongoSpan(it, 0, "delete", collectionName, dbName) { + assert it.replaceAll(" ", "") == "{\"delete\":\"$collectionName\",\"ordered\":\"?\",\"deletes\":[{\"q\":{\"password\":\"?\"},\"limit\":\"?\"}]}" || + it == "{\"delete\": \"?\", \"ordered\": \"?\", \"\$db\": \"?\", \"deletes\": [{\"q\": {\"password\": \"?\"}, \"limit\": \"?\"}]}" + true + } + } + trace(1, 1) { + mongoSpan(it, 0, "count", collectionName, dbName) { + assert it.replaceAll(" ", "") == "{\"count\":\"$collectionName\",\"query\":{}}" || + it == "{\"count\": \"$collectionName\", \"query\": {}, \"\$db\": \"?\", \"\$readPreference\": {\"mode\": \"?\"}}" + true + } + } + } + + where: + dbName = "test_db" + collectionName = "testCollection" + } + + def Subscriber toSubscriber(Closure closure) { + return new Subscriber() { + boolean hasResult + + @Override + void onSubscribe(Subscription s) { + s.request(1) // must request 1 value to trigger async call + } + + @Override + void onNext(Object o) { hasResult = true; closure.call(o) } + + @Override + void onError(Throwable t) { hasResult = true; closure.call(t) } + + @Override + void onComplete() { + if (!hasResult) { + hasResult = true + closure.call() + } + } + } + } + + def mongoSpan(TraceAssert trace, int index, + String operation, String collection, + String dbName, Closure statementEval, + Object parentSpan = null, Throwable exception = null) { + trace.span(index) { + name statementEval + kind CLIENT + if (parentSpan == null) { + hasNoParent() + } else { + childOf((SpanData) parentSpan) + } + attributes { + "$SemanticAttributes.NET_PEER_NAME.key" "localhost" + "$SemanticAttributes.NET_PEER_IP.key" "127.0.0.1" + "$SemanticAttributes.NET_PEER_PORT.key" port + "$SemanticAttributes.DB_CONNECTION_STRING.key" "mongodb://localhost:" + port + "$SemanticAttributes.DB_STATEMENT.key" statementEval + "$SemanticAttributes.DB_SYSTEM.key" "mongodb" + "$SemanticAttributes.DB_NAME.key" dbName + "$SemanticAttributes.DB_OPERATION.key" operation + "$SemanticAttributes.DB_MONGODB_COLLECTION.key" collection + } + } + } +} diff --git a/instrumentation/mongo/mongo-4.0-testing/src/test/groovy/MongoClientTest.groovy b/instrumentation/mongo/mongo-4.0-testing/src/test/groovy/MongoClientTest.groovy new file mode 100644 index 0000000000..b19dff7ff6 --- /dev/null +++ b/instrumentation/mongo/mongo-4.0-testing/src/test/groovy/MongoClientTest.groovy @@ -0,0 +1,227 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import com.mongodb.MongoTimeoutException +import org.bson.BsonDocument +import org.bson.BsonString + +import static io.opentelemetry.instrumentation.test.utils.PortUtils.UNUSABLE_PORT +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace +import com.mongodb.client.MongoClient +import com.mongodb.client.MongoClients +import com.mongodb.client.MongoCollection +import com.mongodb.client.MongoDatabase +import org.bson.Document +import spock.lang.Shared + +class MongoClientTest extends MongoBaseTest { + + @Shared + MongoClient client + + def setup() throws Exception { + client = MongoClients.create("mongodb://localhost:$port") + } + + def cleanup() throws Exception { + client?.close() + client = null + } + + def "test create collection"() { + setup: + MongoDatabase db = client.getDatabase(dbName) + + when: + db.createCollection(collectionName) + + then: + assertTraces(1) { + trace(0, 1) { + mongoSpan(it, 0, "create", collectionName, dbName, "{\"create\":\"$collectionName\",\"capped\":\"?\"}") + } + } + + where: + dbName = "test_db" + collectionName = "testCollection" + } + + def "test create collection no description"() { + setup: + MongoDatabase db = MongoClients.create("mongodb://localhost:$port").getDatabase(dbName) + + when: + db.createCollection(collectionName) + + then: + assertTraces(1) { + trace(0, 1) { + mongoSpan(it, 0, "create", collectionName, dbName, "{\"create\":\"$collectionName\",\"capped\":\"?\"}") + } + } + + where: + dbName = "test_db" + collectionName = "testCollection" + } + + def "test get collection"() { + setup: + MongoDatabase db = client.getDatabase(dbName) + + when: + int count = db.getCollection(collectionName).estimatedDocumentCount() + + then: + count == 0 + assertTraces(1) { + trace(0,1) { + mongoSpan(it, 0, "count", collectionName, dbName, "{\"count\":\"$collectionName\",\"query\":{}}") + } + } + + where: + dbName = "test_db" + collectionName = "testCollection" + } + + def "test insert"() { + setup: + MongoCollection collection = runUnderTrace("setup") { + MongoDatabase db = client.getDatabase(dbName) + db.createCollection(collectionName) + return db.getCollection(collectionName) + } + TEST_WRITER.waitForTraces(1) + TEST_WRITER.clear() + + when: + collection.insertOne(new Document("password", "SECRET")) + + then: + collection.estimatedDocumentCount() == 1 + assertTraces(2) { + trace(0, 1) { + mongoSpan(it, 0, "insert", collectionName, dbName, "{\"insert\":\"$collectionName\",\"ordered\":\"?\",\"documents\":[{\"_id\":\"?\",\"password\":\"?\"}]}") + } + trace(1, 1) { + mongoSpan(it, 0, "count", collectionName, dbName, "{\"count\":\"$collectionName\",\"query\":{}}") + } + } + + where: + dbName = "test_db" + collectionName = "testCollection" + } + + def "test update"() { + setup: + MongoCollection collection = runUnderTrace("setup") { + MongoDatabase db = client.getDatabase(dbName) + db.createCollection(collectionName) + def coll = db.getCollection(collectionName) + coll.insertOne(new Document("password", "OLDPW")) + return coll + } + TEST_WRITER.waitForTraces(1) + TEST_WRITER.clear() + + when: + def result = collection.updateOne( + new BsonDocument("password", new BsonString("OLDPW")), + new BsonDocument('$set', new BsonDocument("password", new BsonString("NEWPW")))) + + then: + result.modifiedCount == 1 + collection.estimatedDocumentCount() == 1 + assertTraces(2) { + trace(0, 1) { + mongoSpan(it, 0, "update", collectionName, dbName, "{\"update\":\"$collectionName\",\"ordered\":\"?\",\"updates\":[{\"q\":{\"password\":\"?\"},\"u\":{\"\$set\":{\"password\":\"?\"}}}]}") + } + trace(1, 1) { + mongoSpan(it, 0, "count", collectionName, dbName, "{\"count\":\"$collectionName\",\"query\":{}}") + } + } + + where: + dbName = "test_db" + collectionName = "testCollection" + } + + def "test delete"() { + setup: + MongoCollection collection = runUnderTrace("setup") { + MongoDatabase db = client.getDatabase(dbName) + db.createCollection(collectionName) + def coll = db.getCollection(collectionName) + coll.insertOne(new Document("password", "SECRET")) + return coll + } + TEST_WRITER.waitForTraces(1) + TEST_WRITER.clear() + + when: + def result = collection.deleteOne(new BsonDocument("password", new BsonString("SECRET"))) + + then: + result.deletedCount == 1 + collection.estimatedDocumentCount() == 0 + assertTraces(2) { + trace(0, 1) { + mongoSpan(it, 0, "delete", collectionName, dbName, "{\"delete\":\"$collectionName\",\"ordered\":\"?\",\"deletes\":[{\"q\":{\"password\":\"?\"},\"limit\":\"?\"}]}") + } + trace(1, 1) { + mongoSpan(it, 0, "count", collectionName, dbName, "{\"count\":\"$collectionName\",\"query\":{}}") + } + } + + where: + dbName = "test_db" + collectionName = "testCollection" + } + + def "test error"() { + setup: + MongoCollection collection = runUnderTrace("setup") { + MongoDatabase db = client.getDatabase(dbName) + db.createCollection(collectionName) + return db.getCollection(collectionName) + } + TEST_WRITER.waitForTraces(1) + TEST_WRITER.clear() + + when: + collection.updateOne(new BsonDocument(), new BsonDocument()) + + then: + thrown(IllegalArgumentException) + // Unfortunately not caught by our instrumentation. + assertTraces(0) {} + + where: + dbName = "test_db" + collectionName = "testCollection" + } + + def "test client failure"() { + setup: + def client = MongoClients.create("mongodb://localhost:$UNUSABLE_PORT/?serverselectiontimeoutms=10") + + when: + MongoDatabase db = client.getDatabase(dbName) + db.createCollection(collectionName) + + then: + thrown(MongoTimeoutException) + // Unfortunately not caught by our instrumentation. + assertTraces(0) {} + + where: + dbName = "test_db" + collectionName = "testCollection" + } + +} diff --git a/instrumentation/mongo/mongo-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/mongo/MongoClientTracer.java b/instrumentation/mongo/mongo-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/mongo/MongoClientTracer.java index 169e4e0ea7..6046f24ac3 100644 --- a/instrumentation/mongo/mongo-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/mongo/MongoClientTracer.java +++ b/instrumentation/mongo/mongo-common/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/mongo/MongoClientTracer.java @@ -119,7 +119,7 @@ public class MongoClientTracer extends DatabaseClientTracer buildMethod = @@ -154,6 +154,16 @@ public class MongoClientTracer extends DatabaseClientTracer