Merge pull request #366 from DataDog/mar-kolya/fixes-for-lettuce-tests

Fix some flakiness in Lettuce tests
This commit is contained in:
Nikolay Martynov 2018-06-22 11:01:30 -04:00 committed by GitHub
commit c050b4acd8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 195 additions and 93 deletions

View File

@ -1,11 +1,13 @@
import datadog.trace.agent.test.AgentTestRunner import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.TestUtils import datadog.trace.agent.test.TestUtils
import io.lettuce.core.ClientOptions
import io.lettuce.core.ConnectionFuture import io.lettuce.core.ConnectionFuture
import io.lettuce.core.RedisClient import io.lettuce.core.RedisClient
import io.lettuce.core.RedisFuture import io.lettuce.core.RedisFuture
import io.lettuce.core.RedisURI import io.lettuce.core.RedisURI
import io.lettuce.core.api.StatefulConnection import io.lettuce.core.api.StatefulConnection
import io.lettuce.core.api.async.RedisAsyncCommands import io.lettuce.core.api.async.RedisAsyncCommands
import io.lettuce.core.api.sync.RedisCommands
import io.lettuce.core.codec.StringCodec import io.lettuce.core.codec.StringCodec
import io.lettuce.core.protocol.AsyncCommand import io.lettuce.core.protocol.AsyncCommand
import redis.embedded.RedisServer import redis.embedded.RedisServer
@ -30,35 +32,25 @@ class LettuceAsyncClientTest extends AgentTestRunner {
System.setProperty("dd.integration.lettuce.enabled", "true") System.setProperty("dd.integration.lettuce.enabled", "true")
} }
@Shared
public static final String HOST = "127.0.0.1" public static final String HOST = "127.0.0.1"
public static final int PORT = TestUtils.randomOpenPort() public static final int PORT = TestUtils.randomOpenPort()
public static final int INCORRECT_PORT = TestUtils.randomOpenPort() public static final int INCORRECT_PORT = TestUtils.randomOpenPort()
public static final int DB_INDEX = 0 public static final int DB_INDEX = 0
@Shared
public static final String DB_ADDR = HOST + ":" + PORT + "/" + DB_INDEX public static final String DB_ADDR = HOST + ":" + PORT + "/" + DB_INDEX
@Shared
public static final String DB_ADDR_NON_EXISTENT = HOST + ":" + INCORRECT_PORT + "/" + DB_INDEX public static final String DB_ADDR_NON_EXISTENT = HOST + ":" + INCORRECT_PORT + "/" + DB_INDEX
@Shared
public static final String DB_URI_NON_EXISTENT = "redis://" + DB_ADDR_NON_EXISTENT public static final String DB_URI_NON_EXISTENT = "redis://" + DB_ADDR_NON_EXISTENT
public static final String EMBEDDED_DB_URI = "redis://" + DB_ADDR public static final String EMBEDDED_DB_URI = "redis://" + DB_ADDR
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()
@Shared @Shared
RedisServer redisServer = RedisServer.builder() RedisServer redisServer = RedisServer.builder()
// bind to localhost to avoid firewall popup // bind to localhost to avoid firewall popup
.setting("bind " + HOST) .setting("bind " + HOST)
// set max memory to avoid problems in CI // set max memory to avoid problems in CI
.setting("maxmemory 128M") .setting("maxmemory 128M")
.port(PORT).build() .port(PORT).build()
@Shared
RedisClient redisClient = RedisClient.create(EMBEDDED_DB_URI)
@Shared
StatefulConnection connection
@Shared
RedisAsyncCommands<String, ?> asyncCommands = null
@Shared @Shared
Map<String, String> testHashMap = [ Map<String, String> testHashMap = [
firstname: "John", firstname: "John",
@ -66,16 +58,25 @@ class LettuceAsyncClientTest extends AgentTestRunner {
age: "53" age: "53"
] ]
def setupSpec() { RedisClient redisClient = RedisClient.create(EMBEDDED_DB_URI)
StatefulConnection connection
RedisAsyncCommands<String, ?> asyncCommands
RedisCommands<String, ?> syncCommands
def setup() {
println "Using redis: $redisServer.args" println "Using redis: $redisServer.args"
redisServer.start() redisServer.start()
redisClient.setOptions(CLIENT_OPTIONS)
connection = redisClient.connect() connection = redisClient.connect()
asyncCommands = connection.async() asyncCommands = connection.async()
syncCommands = connection.sync()
TEST_WRITER.waitForTraces(1) TEST_WRITER.waitForTraces(1)
TEST_WRITER.clear() TEST_WRITER.clear()
} }
def cleanupSpec() { def cleanup() {
connection.close() connection.close()
redisServer.stop() redisServer.stop()
} }
@ -83,11 +84,14 @@ class LettuceAsyncClientTest extends AgentTestRunner {
def "connect using get on ConnectionFuture"() { def "connect using get on ConnectionFuture"() {
setup: setup:
RedisClient testConnectionClient = RedisClient.create(EMBEDDED_DB_URI) RedisClient testConnectionClient = RedisClient.create(EMBEDDED_DB_URI)
testConnectionClient.setOptions(CLIENT_OPTIONS)
when:
ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8, ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8,
new RedisURI(HOST, PORT, 3, TimeUnit.SECONDS)) new RedisURI(HOST, PORT, 3, TimeUnit.SECONDS))
def connection = connectionFuture.get() StatefulConnection connection = connectionFuture.get()
expect: then:
connection != null connection != null
assertTraces(TEST_WRITER, 1) { assertTraces(TEST_WRITER, 1) {
trace(0, 1) { trace(0, 1) {
@ -112,17 +116,20 @@ class LettuceAsyncClientTest extends AgentTestRunner {
} }
} }
} }
cleanup:
connection.close()
} }
def "connect exception inside the connection future"() { def "connect exception inside the connection future"() {
setup: setup:
RedisClient testConnectionClient = RedisClient.create(DB_URI_NON_EXISTENT) RedisClient testConnectionClient = RedisClient.create(DB_URI_NON_EXISTENT)
StatefulConnection connection = null testConnectionClient.setOptions(CLIENT_OPTIONS)
when: when:
ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8, ConnectionFuture connectionFuture = testConnectionClient.connectAsync(StringCodec.UTF8,
new RedisURI(HOST, INCORRECT_PORT, 3, TimeUnit.SECONDS)) new RedisURI(HOST, INCORRECT_PORT, 3, TimeUnit.SECONDS))
connection = connectionFuture.get() StatefulConnection connection = connectionFuture.get()
then: then:
connection == null connection == null
@ -183,6 +190,8 @@ class LettuceAsyncClientTest extends AgentTestRunner {
def "get command chained with thenAccept"() { def "get command chained with thenAccept"() {
setup: setup:
syncCommands.set("TESTKEY", "TESTVAL")
def conds = new AsyncConditions() def conds = new AsyncConditions()
Consumer<String> consumer = new Consumer<String>() { Consumer<String> consumer = new Consumer<String>() {
@Override @Override
@ -199,8 +208,25 @@ class LettuceAsyncClientTest extends AgentTestRunner {
then: then:
conds.await() conds.await()
assertTraces(TEST_WRITER, 1) { assertTraces(TEST_WRITER, 2) {
trace(0, 1) { trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "SET"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"span.kind" "client"
"span.type" "redis"
}
}
}
trace(1, 1) {
span(0) { span(0) {
serviceName "redis" serviceName "redis"
operationName "redis.query" operationName "redis.query"
@ -271,11 +297,12 @@ class LettuceAsyncClientTest extends AgentTestRunner {
} }
} }
} }
} }
def "command with no arguments using a biconsumer"() { def "command with no arguments using a biconsumer"() {
setup: setup:
syncCommands.set("TESTKEY", "TESTVAL")
def conds = new AsyncConditions() def conds = new AsyncConditions()
BiConsumer<String, Throwable> biConsumer = new BiConsumer<String, Throwable>() { BiConsumer<String, Throwable> biConsumer = new BiConsumer<String, Throwable>() {
@Override @Override
@ -292,8 +319,25 @@ class LettuceAsyncClientTest extends AgentTestRunner {
then: then:
conds.await() conds.await()
assertTraces(TEST_WRITER, 1) { assertTraces(TEST_WRITER, 2) {
trace(0, 1) { trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "SET"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"span.kind" "client"
"span.type" "redis"
}
}
}
trace(1, 1) {
span(0) { span(0) {
serviceName "redis" serviceName "redis"
operationName "redis.query" operationName "redis.query"
@ -433,9 +477,6 @@ class LettuceAsyncClientTest extends AgentTestRunner {
} }
} }
} }
cleanup:
asyncCommands.setAutoFlushCommands(true)
} }
def "cancel command before it finishes"() { def "cancel command before it finishes"() {
@ -477,9 +518,6 @@ class LettuceAsyncClientTest extends AgentTestRunner {
} }
} }
} }
cleanup:
asyncCommands.setAutoFlushCommands(true)
} }
def "debug segfault command (returns void) with no argument should produce span"() { def "debug segfault command (returns void) with no argument should produce span"() {
@ -506,11 +544,6 @@ class LettuceAsyncClientTest extends AgentTestRunner {
} }
} }
} }
cleanup:
if (!redisServer.active) {
redisServer.start()
}
} }
@ -538,10 +571,5 @@ class LettuceAsyncClientTest extends AgentTestRunner {
} }
} }
} }
cleanup:
if (!redisServer.active) {
redisServer.start()
}
} }
} }

View File

@ -3,6 +3,7 @@ import datadog.trace.agent.test.TestUtils
import io.lettuce.core.* import io.lettuce.core.*
import io.lettuce.core.api.StatefulConnection import io.lettuce.core.api.StatefulConnection
import io.lettuce.core.api.reactive.RedisReactiveCommands import io.lettuce.core.api.reactive.RedisReactiveCommands
import io.lettuce.core.api.sync.RedisCommands
import redis.embedded.RedisServer import redis.embedded.RedisServer
import spock.lang.Shared import spock.lang.Shared
import spock.util.concurrent.AsyncConditions import spock.util.concurrent.AsyncConditions
@ -18,40 +19,41 @@ class LettuceReactiveClientTest extends AgentTestRunner {
System.setProperty("dd.integration.lettuce.enabled", "true") System.setProperty("dd.integration.lettuce.enabled", "true")
} }
@Shared
public static final String HOST = "127.0.0.1" public static final String HOST = "127.0.0.1"
public static final int PORT = TestUtils.randomOpenPort() public static final int PORT = TestUtils.randomOpenPort()
public static final int DB_INDEX = 0 public static final int DB_INDEX = 0
@Shared
public static final String DB_ADDR = HOST + ":" + PORT + "/" + DB_INDEX public static final String DB_ADDR = HOST + ":" + PORT + "/" + DB_INDEX
public static final String EMBEDDED_DB_URI = "redis://" + DB_ADDR public static final String EMBEDDED_DB_URI = "redis://" + DB_ADDR
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()
@Shared @Shared
RedisServer redisServer = RedisServer.builder() RedisServer redisServer = RedisServer.builder()
// bind to localhost to avoid firewall popup // bind to localhost to avoid firewall popup
.setting("bind " + HOST) .setting("bind " + HOST)
// set max memory to avoid problems in CI // set max memory to avoid problems in CI
.setting("maxmemory 128M") .setting("maxmemory 128M")
.port(PORT).build() .port(PORT).build()
@Shared
RedisClient redisClient = RedisClient.create(EMBEDDED_DB_URI) RedisClient redisClient = RedisClient.create(EMBEDDED_DB_URI)
@Shared
StatefulConnection connection StatefulConnection connection
@Shared RedisReactiveCommands<String, ?> reactiveCommands
RedisReactiveCommands<String, ?> reactiveCommands = null RedisCommands<String, ?> syncCommands
def setupSpec() { def setup() {
println "Using redis: $redisServer.args" println "Using redis: $redisServer.args"
redisServer.start() redisServer.start()
redisClient.setOptions(CLIENT_OPTIONS)
connection = redisClient.connect() connection = redisClient.connect()
reactiveCommands = connection.reactive() reactiveCommands = connection.reactive()
syncCommands = connection.sync()
TEST_WRITER.waitForTraces(1) TEST_WRITER.waitForTraces(1)
TEST_WRITER.clear() TEST_WRITER.clear()
} }
def cleanupSpec() { def cleanup() {
connection.close() connection.close()
redisServer.stop() redisServer.stop()
} }
@ -96,6 +98,7 @@ class LettuceReactiveClientTest extends AgentTestRunner {
def "get command with lambda function"() { def "get command with lambda function"() {
setup: setup:
syncCommands.set("TESTKEY", "TESTVAL")
def conds = new AsyncConditions() def conds = new AsyncConditions()
when: when:
@ -103,8 +106,25 @@ class LettuceReactiveClientTest extends AgentTestRunner {
then: then:
conds.await() conds.await()
assertTraces(TEST_WRITER, 1) { assertTraces(TEST_WRITER, 2) {
trace(0, 1) { trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "SET"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"span.kind" "client"
"span.type" "redis"
}
}
}
trace(1, 1) {
span(0) { span(0) {
serviceName "redis" serviceName "redis"
operationName "redis.query" operationName "redis.query"
@ -164,6 +184,7 @@ class LettuceReactiveClientTest extends AgentTestRunner {
def "command with no arguments"() { def "command with no arguments"() {
setup: setup:
syncCommands.set("TESTKEY", "TESTVAL")
def conds = new AsyncConditions() def conds = new AsyncConditions()
when: when:
@ -175,8 +196,25 @@ class LettuceReactiveClientTest extends AgentTestRunner {
then: then:
conds.await() conds.await()
assertTraces(TEST_WRITER, 1) { assertTraces(TEST_WRITER, 2) {
trace(0, 1) { trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "SET"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"span.kind" "client"
"span.type" "redis"
}
}
}
trace(1, 1) {
span(0) { span(0) {
serviceName "redis" serviceName "redis"
operationName "redis.query" operationName "redis.query"
@ -287,11 +325,6 @@ class LettuceReactiveClientTest extends AgentTestRunner {
} }
} }
} }
cleanup:
if (!redisServer.active) {
redisServer.start()
}
} }
def "shutdown command (returns void) with argument should produce span"() { def "shutdown command (returns void) with argument should produce span"() {
@ -318,11 +351,6 @@ class LettuceReactiveClientTest extends AgentTestRunner {
} }
} }
} }
cleanup:
if (!redisServer.active) {
redisServer.start()
}
} }
} }

View File

@ -1,5 +1,6 @@
import datadog.trace.agent.test.AgentTestRunner import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.agent.test.TestUtils import datadog.trace.agent.test.TestUtils
import io.lettuce.core.ClientOptions
import io.lettuce.core.RedisClient import io.lettuce.core.RedisClient
import io.lettuce.core.RedisConnectionException import io.lettuce.core.RedisConnectionException
import io.lettuce.core.api.StatefulConnection import io.lettuce.core.api.StatefulConnection
@ -18,35 +19,25 @@ class LettuceSyncClientTest extends AgentTestRunner {
System.setProperty("dd.integration.lettuce.enabled", "true") System.setProperty("dd.integration.lettuce.enabled", "true")
} }
@Shared
public static final String HOST = "127.0.0.1" public static final String HOST = "127.0.0.1"
public static final int PORT = TestUtils.randomOpenPort() public static final int PORT = TestUtils.randomOpenPort()
public static final int INCORRECT_PORT = TestUtils.randomOpenPort() public static final int INCORRECT_PORT = TestUtils.randomOpenPort()
public static final int DB_INDEX = 0 public static final int DB_INDEX = 0
@Shared
public static final String DB_ADDR = HOST + ":" + PORT + "/" + DB_INDEX public static final String DB_ADDR = HOST + ":" + PORT + "/" + DB_INDEX
@Shared
public static final String DB_ADDR_NON_EXISTENT = HOST + ":" + INCORRECT_PORT + "/" + DB_INDEX public static final String DB_ADDR_NON_EXISTENT = HOST + ":" + INCORRECT_PORT + "/" + DB_INDEX
@Shared
public static final String DB_URI_NON_EXISTENT = "redis://" + DB_ADDR_NON_EXISTENT public static final String DB_URI_NON_EXISTENT = "redis://" + DB_ADDR_NON_EXISTENT
public static final String EMBEDDED_DB_URI = "redis://" + DB_ADDR public static final String EMBEDDED_DB_URI = "redis://" + DB_ADDR
// Disable autoreconnect so we do not get stray traces popping up on server shutdown
public static final ClientOptions CLIENT_OPTIONS = ClientOptions.builder().autoReconnect(false).build()
@Shared @Shared
RedisServer redisServer = RedisServer.builder() RedisServer redisServer = RedisServer.builder()
// bind to localhost to avoid firewall popup // bind to localhost to avoid firewall popup
.setting("bind " + HOST) .setting("bind " + HOST)
// set max memory to avoid problems in CI // set max memory to avoid problems in CI
.setting("maxmemory 128M") .setting("maxmemory 128M")
.port(PORT).build() .port(PORT).build()
@Shared
RedisClient redisClient = RedisClient.create(EMBEDDED_DB_URI)
@Shared
StatefulConnection connection
@Shared
RedisCommands<String, ?> syncCommands = null
@Shared @Shared
Map<String, String> testHashMap = [ Map<String, String> testHashMap = [
firstname: "John", firstname: "John",
@ -54,7 +45,11 @@ class LettuceSyncClientTest extends AgentTestRunner {
age: "53" age: "53"
] ]
def setupSpec() { RedisClient redisClient = RedisClient.create(EMBEDDED_DB_URI)
StatefulConnection connection
RedisCommands<String, ?> syncCommands
def setup() {
redisServer.start() redisServer.start()
connection = redisClient.connect() connection = redisClient.connect()
syncCommands = connection.sync() syncCommands = connection.sync()
@ -62,7 +57,7 @@ class LettuceSyncClientTest extends AgentTestRunner {
TEST_WRITER.clear() TEST_WRITER.clear()
} }
def cleanupSpec() { def cleanup() {
connection.close() connection.close()
redisServer.stop() redisServer.stop()
} }
@ -70,9 +65,12 @@ class LettuceSyncClientTest extends AgentTestRunner {
def "connect"() { def "connect"() {
setup: setup:
RedisClient testConnectionClient = RedisClient.create(EMBEDDED_DB_URI) RedisClient testConnectionClient = RedisClient.create(EMBEDDED_DB_URI)
testConnectionClient.connect() testConnectionClient.setOptions(CLIENT_OPTIONS)
expect: when:
StatefulConnection connection = testConnectionClient.connect()
then:
assertTraces(TEST_WRITER, 1) { assertTraces(TEST_WRITER, 1) {
trace(0, 1) { trace(0, 1) {
span(0) { span(0) {
@ -96,11 +94,15 @@ class LettuceSyncClientTest extends AgentTestRunner {
} }
} }
} }
cleanup:
connection.close()
} }
def "connect exception"() { def "connect exception"() {
setup: setup:
RedisClient testConnectionClient = RedisClient.create(DB_URI_NON_EXISTENT) RedisClient testConnectionClient = RedisClient.create(DB_URI_NON_EXISTENT)
testConnectionClient.setOptions(CLIENT_OPTIONS)
when: when:
testConnectionClient.connect() testConnectionClient.connect()
@ -162,12 +164,30 @@ class LettuceSyncClientTest extends AgentTestRunner {
def "get command"() { def "get command"() {
setup: setup:
syncCommands.set("TESTKEY", "TESTVAL")
String res = syncCommands.get("TESTKEY") String res = syncCommands.get("TESTKEY")
expect: expect:
res == "TESTVAL" res == "TESTVAL"
assertTraces(TEST_WRITER, 1) { assertTraces(TEST_WRITER, 2) {
trace(0, 1) { trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "SET"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"span.kind" "client"
"span.type" "redis"
}
}
}
trace(1, 1) {
span(0) { span(0) {
serviceName "redis" serviceName "redis"
operationName "redis.query" operationName "redis.query"
@ -216,12 +236,30 @@ class LettuceSyncClientTest extends AgentTestRunner {
def "command with no arguments"() { def "command with no arguments"() {
setup: setup:
syncCommands.set("TESTKEY", "TESTVAL")
def keyRetrieved = syncCommands.randomkey() def keyRetrieved = syncCommands.randomkey()
expect: expect:
keyRetrieved == "TESTKEY" keyRetrieved == "TESTKEY"
assertTraces(TEST_WRITER, 1) { assertTraces(TEST_WRITER, 2) {
trace(0, 1) { trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "SET"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"span.kind" "client"
"span.type" "redis"
}
}
}
trace(1, 1) {
span(0) { span(0) {
serviceName "redis" serviceName "redis"
operationName "redis.query" operationName "redis.query"
@ -297,12 +335,30 @@ class LettuceSyncClientTest extends AgentTestRunner {
def "hash getall command"() { def "hash getall command"() {
setup: setup:
syncCommands.hmset("user", testHashMap)
Map<String, String> res = syncCommands.hgetall("user") Map<String, String> res = syncCommands.hgetall("user")
expect: expect:
res == testHashMap res == testHashMap
assertTraces(TEST_WRITER, 1) { assertTraces(TEST_WRITER, 2) {
trace(0, 1) { trace(0, 1) {
span(0) {
serviceName "redis"
operationName "redis.query"
spanType "redis"
resourceName "HMSET"
errored false
tags {
defaultTags()
"component" "redis-client"
"db.type" "redis"
"span.kind" "client"
"span.type" "redis"
}
}
}
trace(1, 1) {
span(0) { span(0) {
serviceName "redis" serviceName "redis"
operationName "redis.query" operationName "redis.query"
@ -346,11 +402,6 @@ class LettuceSyncClientTest extends AgentTestRunner {
} }
} }
} }
cleanup:
if (!redisServer.active) {
redisServer.start()
}
} }
def "shutdown command (returns void) should produce a span"() { def "shutdown command (returns void) should produce a span"() {
@ -377,10 +428,5 @@ class LettuceSyncClientTest extends AgentTestRunner {
} }
} }
} }
cleanup:
if (!redisServer.active) {
redisServer.start()
}
} }
} }