Fix some flakiness in Lettuce tests

It looks like automatic reconnection was enabled which lead to random
traces popping up in random places when Redis server was shutdown.

Also make sure that server persists only during single test to weedout
all inter-test dependencies.
This commit is contained in:
Nikolay Martynov 2018-06-21 17:33:03 -04:00
parent 367d4bf247
commit 389f65687a
3 changed files with 195 additions and 93 deletions

View File

@ -1,11 +1,13 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.TestUtils
import io.lettuce.core.ClientOptions
import io.lettuce.core.ConnectionFuture
import io.lettuce.core.RedisClient
import io.lettuce.core.RedisFuture
import io.lettuce.core.RedisURI
import io.lettuce.core.api.StatefulConnection
import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.api.sync.RedisCommands
import io.lettuce.core.codec.StringCodec
import io.lettuce.core.protocol.AsyncCommand
import redis.embedded.RedisServer
@ -30,35 +32,25 @@ class LettuceAsyncClientTest extends AgentTestRunner {
System.setProperty("dd.integration.lettuce.enabled", "true")
}
@Shared
public static final String HOST = "127.0.0.1"
public static final int PORT = TestUtils.randomOpenPort()
public static final int INCORRECT_PORT = TestUtils.randomOpenPort()
public static final int DB_INDEX = 0
@Shared
public static final String DB_ADDR = HOST + ":" + PORT + "/" + DB_INDEX
@Shared
public static final String DB_ADDR_NON_EXISTENT = HOST + ":" + INCORRECT_PORT + "/" + DB_INDEX
@Shared
public static final String DB_URI_NON_EXISTENT = "redis://" + DB_ADDR_NON_EXISTENT
public static final String EMBEDDED_DB_URI = "redis://" + DB_ADDR
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()
@Shared
RedisServer redisServer = RedisServer.builder()
// bind to localhost to avoid firewall popup
// bind to localhost to avoid firewall popup
.setting("bind " + HOST)
// set max memory to avoid problems in CI
// set max memory to avoid problems in CI
.setting("maxmemory 128M")
.port(PORT).build()
@Shared
RedisClient redisClient = RedisClient.create(EMBEDDED_DB_URI)
@Shared
StatefulConnection connection
@Shared
RedisAsyncCommands<String, ?> asyncCommands = null
@Shared
Map<String, String> testHashMap = [
firstname: "John",
@ -66,16 +58,25 @@ class LettuceAsyncClientTest extends AgentTestRunner {
age: "53"
]
def setupSpec() {
RedisClient redisClient = RedisClient.create(EMBEDDED_DB_URI)
StatefulConnection connection
RedisAsyncCommands<String, ?> asyncCommands
RedisCommands<String, ?> syncCommands
def setup() {
println "Using redis: $redisServer.args"
redisServer.start()
redisClient.setOptions(CLIENT_OPTIONS)
connection = redisClient.connect()
asyncCommands = connection.async()
syncCommands = connection.sync()
TEST_WRITER.waitForTraces(1)
TEST_WRITER.clear()
}
def cleanupSpec() {
def cleanup() {
connection.close()
redisServer.stop()
}
@ -83,11 +84,14 @@ class LettuceAsyncClientTest extends AgentTestRunner {
def "connect using get on ConnectionFuture"() {
setup:
RedisClient testConnectionClient = RedisClient.create(EMBEDDED_DB_URI)
testConnectionClient.setOptions(CLIENT_OPTIONS)
when:
ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8,
new RedisURI(HOST, PORT, 3, TimeUnit.SECONDS))
def connection = connectionFuture.get()
StatefulConnection connection = connectionFuture.get()
expect:
then:
connection != null
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
@ -112,17 +116,20 @@ class LettuceAsyncClientTest extends AgentTestRunner {
}
}
}
cleanup:
connection.close()
}
def "connect exception inside the connection future"() {
setup:
RedisClient testConnectionClient = RedisClient.create(DB_URI_NON_EXISTENT)
StatefulConnection connection = null
testConnectionClient.setOptions(CLIENT_OPTIONS)
when:
ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8,
new RedisURI(HOST, INCORRECT_PORT, 3, TimeUnit.SECONDS))
connection = connectionFuture.get()
StatefulConnection connection = connectionFuture.get()
then:
connection == null
@ -183,6 +190,8 @@ class LettuceAsyncClientTest extends AgentTestRunner {
def "get command chained with thenAccept"() {
setup:
syncCommands.set("TESTKEY", "TESTVAL")
def conds = new AsyncConditions()
Consumer<String> consumer = new Consumer<String>() {
@Override
@ -199,8 +208,25 @@ class LettuceAsyncClientTest extends AgentTestRunner {
then:
conds.await()
assertTraces(TEST_WRITER, 1) {
assertTraces(TEST_WRITER, 2) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "SET"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"span.kind" "client"
"span.type" "redis"
}
}
}
trace(1, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
@ -271,11 +297,12 @@ class LettuceAsyncClientTest extends AgentTestRunner {
}
}
}
}
def "command with no arguments using a biconsumer"() {
setup:
syncCommands.set("TESTKEY", "TESTVAL")
def conds = new AsyncConditions()
BiConsumer<String, Throwable> biConsumer = new BiConsumer<String, Throwable>() {
@Override
@ -292,8 +319,25 @@ class LettuceAsyncClientTest extends AgentTestRunner {
then:
conds.await()
assertTraces(TEST_WRITER, 1) {
assertTraces(TEST_WRITER, 2) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "SET"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"span.kind" "client"
"span.type" "redis"
}
}
}
trace(1, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
@ -433,9 +477,6 @@ class LettuceAsyncClientTest extends AgentTestRunner {
}
}
}
cleanup:
asyncCommands.setAutoFlushCommands(true)
}
def "cancel command before it finishes"() {
@ -477,9 +518,6 @@ class LettuceAsyncClientTest extends AgentTestRunner {
}
}
}
cleanup:
asyncCommands.setAutoFlushCommands(true)
}
def "debug segfault command (returns void) with no argument should produce span"() {
@ -506,11 +544,6 @@ class LettuceAsyncClientTest extends AgentTestRunner {
}
}
}
cleanup:
if (!redisServer.active) {
redisServer.start()
}
}
@ -538,10 +571,5 @@ class LettuceAsyncClientTest extends AgentTestRunner {
}
}
}
cleanup:
if (!redisServer.active) {
redisServer.start()
}
}
}

View File

@ -3,6 +3,7 @@ import datadog.trace.agent.test.TestUtils
import io.lettuce.core.*
import io.lettuce.core.api.StatefulConnection
import io.lettuce.core.api.reactive.RedisReactiveCommands
import io.lettuce.core.api.sync.RedisCommands
import redis.embedded.RedisServer
import spock.lang.Shared
import spock.util.concurrent.AsyncConditions
@ -18,40 +19,41 @@ class LettuceReactiveClientTest extends AgentTestRunner {
System.setProperty("dd.integration.lettuce.enabled", "true")
}
@Shared
public static final String HOST = "127.0.0.1"
public static final int PORT = TestUtils.randomOpenPort()
public static final int DB_INDEX = 0
@Shared
public static final String DB_ADDR = HOST + ":" + PORT + "/" + DB_INDEX
public static final String EMBEDDED_DB_URI = "redis://" + DB_ADDR
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()
@Shared
RedisServer redisServer = RedisServer.builder()
// bind to localhost to avoid firewall popup
// bind to localhost to avoid firewall popup
.setting("bind " + HOST)
// set max memory to avoid problems in CI
// set max memory to avoid problems in CI
.setting("maxmemory 128M")
.port(PORT).build()
@Shared
RedisClient redisClient = RedisClient.create(EMBEDDED_DB_URI)
@Shared
StatefulConnection connection
@Shared
RedisReactiveCommands<String, ?> reactiveCommands = null
RedisReactiveCommands<String, ?> reactiveCommands
RedisCommands<String, ?> syncCommands
def setupSpec() {
def setup() {
println "Using redis: $redisServer.args"
redisServer.start()
redisClient.setOptions(CLIENT_OPTIONS)
connection = redisClient.connect()
reactiveCommands = connection.reactive()
syncCommands = connection.sync()
TEST_WRITER.waitForTraces(1)
TEST_WRITER.clear()
}
def cleanupSpec() {
def cleanup() {
connection.close()
redisServer.stop()
}
@ -96,6 +98,7 @@ class LettuceReactiveClientTest extends AgentTestRunner {
def "get command with lambda function"() {
setup:
syncCommands.set("TESTKEY", "TESTVAL")
def conds = new AsyncConditions()
when:
@ -103,8 +106,25 @@ class LettuceReactiveClientTest extends AgentTestRunner {
then:
conds.await()
assertTraces(TEST_WRITER, 1) {
assertTraces(TEST_WRITER, 2) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "SET"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"span.kind" "client"
"span.type" "redis"
}
}
}
trace(1, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
@ -164,6 +184,7 @@ class LettuceReactiveClientTest extends AgentTestRunner {
def "command with no arguments"() {
setup:
syncCommands.set("TESTKEY", "TESTVAL")
def conds = new AsyncConditions()
when:
@ -175,8 +196,25 @@ class LettuceReactiveClientTest extends AgentTestRunner {
then:
conds.await()
assertTraces(TEST_WRITER, 1) {
assertTraces(TEST_WRITER, 2) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "SET"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"span.kind" "client"
"span.type" "redis"
}
}
}
trace(1, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
@ -287,11 +325,6 @@ class LettuceReactiveClientTest extends AgentTestRunner {
}
}
}
cleanup:
if (!redisServer.active) {
redisServer.start()
}
}
def "shutdown command (returns void) with argument should produce span"() {
@ -318,11 +351,6 @@ class LettuceReactiveClientTest extends AgentTestRunner {
}
}
}
cleanup:
if (!redisServer.active) {
redisServer.start()
}
}
}

View File

@ -1,5 +1,6 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.TestUtils
import io.lettuce.core.ClientOptions
import io.lettuce.core.RedisClient
import io.lettuce.core.RedisConnectionException
import io.lettuce.core.api.StatefulConnection
@ -18,35 +19,25 @@ class LettuceSyncClientTest extends AgentTestRunner {
System.setProperty("dd.integration.lettuce.enabled", "true")
}
@Shared
public static final String HOST = "127.0.0.1"
public static final int PORT = TestUtils.randomOpenPort()
public static final int INCORRECT_PORT = TestUtils.randomOpenPort()
public static final int DB_INDEX = 0
@Shared
public static final String DB_ADDR = HOST + ":" + PORT + "/" + DB_INDEX
@Shared
public static final String DB_ADDR_NON_EXISTENT = HOST + ":" + INCORRECT_PORT + "/" + DB_INDEX
@Shared
public static final String DB_URI_NON_EXISTENT = "redis://" + DB_ADDR_NON_EXISTENT
public static final String EMBEDDED_DB_URI = "redis://" + DB_ADDR
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()
@Shared
RedisServer redisServer = RedisServer.builder()
// bind to localhost to avoid firewall popup
// bind to localhost to avoid firewall popup
.setting("bind " + HOST)
// set max memory to avoid problems in CI
// set max memory to avoid problems in CI
.setting("maxmemory 128M")
.port(PORT).build()
@Shared
RedisClient redisClient = RedisClient.create(EMBEDDED_DB_URI)
@Shared
StatefulConnection connection
@Shared
RedisCommands<String, ?> syncCommands = null
@Shared
Map<String, String> testHashMap = [
firstname: "John",
@ -54,7 +45,11 @@ class LettuceSyncClientTest extends AgentTestRunner {
age: "53"
]
def setupSpec() {
RedisClient redisClient = RedisClient.create(EMBEDDED_DB_URI)
StatefulConnection connection
RedisCommands<String, ?> syncCommands
def setup() {
redisServer.start()
connection = redisClient.connect()
syncCommands = connection.sync()
@ -62,7 +57,7 @@ class LettuceSyncClientTest extends AgentTestRunner {
TEST_WRITER.clear()
}
def cleanupSpec() {
def cleanup() {
connection.close()
redisServer.stop()
}
@ -70,9 +65,12 @@ class LettuceSyncClientTest extends AgentTestRunner {
def "connect"() {
setup:
RedisClient testConnectionClient = RedisClient.create(EMBEDDED_DB_URI)
testConnectionClient.connect()
testConnectionClient.setOptions(CLIENT_OPTIONS)
expect:
when:
StatefulConnection connection = testConnectionClient.connect()
then:
assertTraces(TEST_WRITER, 1) {
trace(0, 1) {
span(0) {
@ -96,11 +94,15 @@ class LettuceSyncClientTest extends AgentTestRunner {
}
}
}
cleanup:
connection.close()
}
def "connect exception"() {
setup:
RedisClient testConnectionClient = RedisClient.create(DB_URI_NON_EXISTENT)
testConnectionClient.setOptions(CLIENT_OPTIONS)
when:
testConnectionClient.connect()
@ -162,12 +164,30 @@ class LettuceSyncClientTest extends AgentTestRunner {
def "get command"() {
setup:
syncCommands.set("TESTKEY", "TESTVAL")
String res = syncCommands.get("TESTKEY")
expect:
res == "TESTVAL"
assertTraces(TEST_WRITER, 1) {
assertTraces(TEST_WRITER, 2) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "SET"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"span.kind" "client"
"span.type" "redis"
}
}
}
trace(1, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
@ -216,12 +236,30 @@ class LettuceSyncClientTest extends AgentTestRunner {
def "command with no arguments"() {
setup:
syncCommands.set("TESTKEY", "TESTVAL")
def keyRetrieved = syncCommands.randomkey()
expect:
keyRetrieved == "TESTKEY"
assertTraces(TEST_WRITER, 1) {
assertTraces(TEST_WRITER, 2) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "SET"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"span.kind" "client"
"span.type" "redis"
}
}
}
trace(1, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
@ -297,12 +335,30 @@ class LettuceSyncClientTest extends AgentTestRunner {
def "hash getall command"() {
setup:
syncCommands.hmset("user", testHashMap)
Map<String, String> res = syncCommands.hgetall("user")
expect:
res == testHashMap
assertTraces(TEST_WRITER, 1) {
assertTraces(TEST_WRITER, 2) {
trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "HMSET"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"span.kind" "client"
"span.type" "redis"
}
}
}
trace(1, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
@ -346,11 +402,6 @@ class LettuceSyncClientTest extends AgentTestRunner {
}
}
}
cleanup:
if (!redisServer.active) {
redisServer.start()
}
}
def "shutdown command (returns void) should produce a span"() {
@ -377,10 +428,5 @@ class LettuceSyncClientTest extends AgentTestRunner {
}
}
}
cleanup:
if (!redisServer.active) {
redisServer.start()
}
}
}