Fix flaky mongo reactive test (#10897)

This commit is contained in:
Lauri Tulmin 2024-03-20 15:36:05 +02:00 committed by GitHub
parent 589df4c56a
commit 2b4f685679
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 33 additions and 17 deletions

View File

@ -23,11 +23,14 @@ import spock.lang.Shared
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<Document>> implements AgentTestTrait { class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<Document>> implements AgentTestTrait {
@Shared @Shared
MongoClient client MongoClient client
@Shared
List<Closeable> cleanup = []
def setupSpec() throws Exception { def setupSpec() throws Exception {
client = MongoClients.create("mongodb://localhost:$port") client = MongoClients.create("mongodb://localhost:$port")
@ -36,18 +39,27 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
def cleanupSpec() throws Exception { def cleanupSpec() throws Exception {
client?.close() client?.close()
client = null client = null
cleanup.forEach {
it.close()
}
} }
@Override @Override
void createCollection(String dbName, String collectionName) { void createCollection(String dbName, String collectionName) {
MongoDatabase db = client.getDatabase(dbName) MongoDatabase db = client.getDatabase(dbName)
db.createCollection(collectionName).subscribe(toSubscriber {}) def latch = new CountDownLatch(1)
db.createCollection(collectionName).subscribe(toSubscriber { latch.countDown() })
latch.await(30, TimeUnit.SECONDS)
} }
@Override @Override
void createCollectionNoDescription(String dbName, String collectionName) { void createCollectionNoDescription(String dbName, String collectionName) {
MongoDatabase db = MongoClients.create("mongodb://localhost:${port}").getDatabase(dbName) def tmpClient = MongoClients.create("mongodb://localhost:${port}")
db.createCollection(collectionName).subscribe(toSubscriber {}) cleanup.add(tmpClient)
MongoDatabase db = tmpClient.getDatabase(dbName)
def latch = new CountDownLatch(1)
db.createCollection(collectionName).subscribe(toSubscriber { latch.countDown() })
latch.await(30, TimeUnit.SECONDS)
} }
@Override @Override
@ -63,8 +75,12 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
new ServerAddress("localhost", port))) new ServerAddress("localhost", port)))
}) })
settings.build() settings.build()
MongoDatabase db = MongoClients.create(settings.build()).getDatabase(dbName) def tmpClient = MongoClients.create(settings.build())
db.createCollection(collectionName).subscribe(toSubscriber {}) cleanup.add(tmpClient)
MongoDatabase db = tmpClient.getDatabase(dbName)
def latch = new CountDownLatch(1)
db.createCollection(collectionName).subscribe(toSubscriber { latch.countDown() })
latch.await(30, TimeUnit.SECONDS)
} }
@Override @Override
@ -72,7 +88,7 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
MongoDatabase db = client.getDatabase(dbName) MongoDatabase db = client.getDatabase(dbName)
def count = new CompletableFuture<Integer>() def count = new CompletableFuture<Integer>()
db.getCollection(collectionName).estimatedDocumentCount().subscribe(toSubscriber { count.complete(it) }) db.getCollection(collectionName).estimatedDocumentCount().subscribe(toSubscriber { count.complete(it) })
return count.join() return count.get(30, TimeUnit.SECONDS)
} }
@Override @Override
@ -81,7 +97,7 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
MongoDatabase db = client.getDatabase(dbName) MongoDatabase db = client.getDatabase(dbName)
def latch1 = new CountDownLatch(1) def latch1 = new CountDownLatch(1)
db.createCollection(collectionName).subscribe(toSubscriber { latch1.countDown() }) db.createCollection(collectionName).subscribe(toSubscriber { latch1.countDown() })
latch1.await() latch1.await(30, TimeUnit.SECONDS)
return db.getCollection(collectionName) return db.getCollection(collectionName)
} }
ignoreTracesAndClear(1) ignoreTracesAndClear(1)
@ -94,7 +110,7 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
collection.insertOne(new Document("password", "SECRET")).subscribe(toSubscriber { collection.insertOne(new Document("password", "SECRET")).subscribe(toSubscriber {
collection.estimatedDocumentCount().subscribe(toSubscriber { count.complete(it) }) collection.estimatedDocumentCount().subscribe(toSubscriber { count.complete(it) })
}) })
return count.join() return count.get(30, TimeUnit.SECONDS)
} }
@Override @Override
@ -103,11 +119,11 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
MongoDatabase db = client.getDatabase(dbName) MongoDatabase db = client.getDatabase(dbName)
def latch1 = new CountDownLatch(1) def latch1 = new CountDownLatch(1)
db.createCollection(collectionName).subscribe(toSubscriber { latch1.countDown() }) db.createCollection(collectionName).subscribe(toSubscriber { latch1.countDown() })
latch1.await() latch1.await(30, TimeUnit.SECONDS)
def coll = db.getCollection(collectionName) def coll = db.getCollection(collectionName)
def latch2 = new CountDownLatch(1) def latch2 = new CountDownLatch(1)
coll.insertOne(new Document("password", "OLDPW")).subscribe(toSubscriber { latch2.countDown() }) coll.insertOne(new Document("password", "OLDPW")).subscribe(toSubscriber { latch2.countDown() })
latch2.await() latch2.await(30, TimeUnit.SECONDS)
return coll return coll
} }
ignoreTracesAndClear(1) ignoreTracesAndClear(1)
@ -124,7 +140,7 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
result.complete(it) result.complete(it)
collection.estimatedDocumentCount().subscribe(toSubscriber { count.complete(it) }) collection.estimatedDocumentCount().subscribe(toSubscriber { count.complete(it) })
}) })
return result.join().modifiedCount return result.get(30, TimeUnit.SECONDS).modifiedCount
} }
@Override @Override
@ -133,11 +149,11 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
MongoDatabase db = client.getDatabase(dbName) MongoDatabase db = client.getDatabase(dbName)
def latch1 = new CountDownLatch(1) def latch1 = new CountDownLatch(1)
db.createCollection(collectionName).subscribe(toSubscriber { latch1.countDown() }) db.createCollection(collectionName).subscribe(toSubscriber { latch1.countDown() })
latch1.await() latch1.await(30, TimeUnit.SECONDS)
def coll = db.getCollection(collectionName) def coll = db.getCollection(collectionName)
def latch2 = new CountDownLatch(1) def latch2 = new CountDownLatch(1)
coll.insertOne(new Document("password", "SECRET")).subscribe(toSubscriber { latch2.countDown() }) coll.insertOne(new Document("password", "SECRET")).subscribe(toSubscriber { latch2.countDown() })
latch2.await() latch2.await(30, TimeUnit.SECONDS)
return coll return coll
} }
ignoreTracesAndClear(1) ignoreTracesAndClear(1)
@ -152,7 +168,7 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
result.complete(it) result.complete(it)
collection.estimatedDocumentCount().subscribe(toSubscriber { count.complete(it) }) collection.estimatedDocumentCount().subscribe(toSubscriber { count.complete(it) })
}) })
return result.join().deletedCount return result.get(30, TimeUnit.SECONDS).deletedCount
} }
@Override @Override
@ -173,7 +189,7 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
db.createCollection(collectionName).subscribe(toSubscriber { db.createCollection(collectionName).subscribe(toSubscriber {
latch.countDown() latch.countDown()
}) })
latch.await() latch.await(30, TimeUnit.SECONDS)
return db.getCollection(collectionName) return db.getCollection(collectionName)
} }
ignoreTracesAndClear(1) ignoreTracesAndClear(1)
@ -181,10 +197,10 @@ class Mongo4ReactiveClientTest extends AbstractMongoClientTest<MongoCollection<D
collection.updateOne(new BsonDocument(), new BsonDocument()).subscribe(toSubscriber { collection.updateOne(new BsonDocument(), new BsonDocument()).subscribe(toSubscriber {
result.complete(it) result.complete(it)
}) })
throw result.join() throw result.get(30, TimeUnit.SECONDS)
} }
def Subscriber<?> toSubscriber(Closure closure) { Subscriber<?> toSubscriber(Closure closure) {
return new Subscriber() { return new Subscriber() {
boolean hasResult boolean hasResult