Fix integration test
This commit is contained in:
parent
c8bbce0549
commit
cddd368fc0
|
@ -16,158 +16,136 @@ import java.time.Duration
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
|
|
||||||
class DDApiIntegrationTest {
|
// Do not run tests locally on Java7 since testcontainers are not compatible with Java7
|
||||||
// Do not run tests locally on Java7 since testcontainers are not compatible with Java7
|
// It is fine to run on CI because CI provides agent externally, not through testcontainers
|
||||||
// It is fine to run on CI because CI provides rabbitmq externally, not through testcontainers
|
@Requires({ "true" == System.getenv("CI") || jvm.java8Compatible })
|
||||||
@Requires({ "true" == System.getenv("CI") || jvm.java8Compatible })
|
class DDApiIntegrationTest extends DDSpecification {
|
||||||
static class DDAgentApiIntegrationV4Test extends DDSpecification {
|
static final WRITER = new ListWriter()
|
||||||
static final WRITER = new ListWriter()
|
static final TRACER = DDTracer.builder().writer(WRITER).build()
|
||||||
static final TRACER = DDTracer.builder().writer(WRITER).build()
|
static final CONTEXT = new DDSpanContext(
|
||||||
static final CONTEXT = new DDSpanContext(
|
1G,
|
||||||
1G,
|
1G,
|
||||||
1G,
|
0G,
|
||||||
0G,
|
"fakeService",
|
||||||
"fakeService",
|
"fakeOperation",
|
||||||
"fakeOperation",
|
"fakeResource",
|
||||||
"fakeResource",
|
PrioritySampling.UNSET,
|
||||||
PrioritySampling.UNSET,
|
null,
|
||||||
null,
|
[:],
|
||||||
[:],
|
false,
|
||||||
false,
|
"fakeType",
|
||||||
"fakeType",
|
[:],
|
||||||
[:],
|
new PendingTrace(TRACER, 1G),
|
||||||
new PendingTrace(TRACER, 1G),
|
TRACER,
|
||||||
TRACER,
|
[:])
|
||||||
[:])
|
|
||||||
|
|
||||||
// Looks like okHttp needs to resolve this, even for connection over socket
|
// Looks like okHttp needs to resolve this, even for connection over socket
|
||||||
static final SOMEHOST = "datadoghq.com"
|
static final SOMEHOST = "datadoghq.com"
|
||||||
static final SOMEPORT = 123
|
static final SOMEPORT = 123
|
||||||
|
|
||||||
|
/*
|
||||||
|
Note: type here has to stay undefined, otherwise tests will fail in CI in Java 7 because
|
||||||
|
'testcontainers' are built for Java 8 and Java 7 cannot load this class.
|
||||||
|
*/
|
||||||
|
@Shared
|
||||||
|
def agentContainer
|
||||||
|
@Shared
|
||||||
|
def agentContainerHost = "localhost"
|
||||||
|
@Shared
|
||||||
|
def agentContainerPort = 8126
|
||||||
|
@Shared
|
||||||
|
Process process
|
||||||
|
@Shared
|
||||||
|
File socketPath
|
||||||
|
|
||||||
|
def api
|
||||||
|
def unixDomainSocketApi
|
||||||
|
|
||||||
|
def endpoint = new AtomicReference<String>(null)
|
||||||
|
def agentResponse = new AtomicReference<Map<String, Map<String, Number>>>(null)
|
||||||
|
|
||||||
|
DDAgentResponseListener responseListener = { String receivedEndpoint, Map<String, Map<String, Number>> responseJson ->
|
||||||
|
endpoint.set(receivedEndpoint)
|
||||||
|
agentResponse.set(responseJson)
|
||||||
|
}
|
||||||
|
|
||||||
|
def setupSpec() {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Note: type here has to stay undefined, otherwise tests will fail in CI in Java 7 because
|
CI will provide us with agent container running along side our build.
|
||||||
'testcontainers' are built for Java 8 and Java 7 cannot load this class.
|
When building locally, however, we need to take matters into our own hands
|
||||||
*/
|
and we use 'testcontainers' for this.
|
||||||
@Shared
|
*/
|
||||||
def agentContainer
|
if ("true" != System.getenv("CI")) {
|
||||||
@Shared
|
agentContainer = new GenericContainer("datadog/docker-dd-agent:latest")
|
||||||
def agentContainerHost = "localhost"
|
.withEnv(["DD_APM_ENABLED": "true",
|
||||||
@Shared
|
"DD_BIND_HOST" : "0.0.0.0",
|
||||||
def agentContainerPort = 8126
|
"DD_API_KEY" : "invalid_key_but_this_is_fine",
|
||||||
@Shared
|
"DD_LOGS_STDOUT": "yes"])
|
||||||
Process process
|
.withExposedPorts(datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT)
|
||||||
@Shared
|
.withStartupTimeout(Duration.ofSeconds(120))
|
||||||
File socketPath
|
// Apparently we need to sleep for a bit so agent's response `{"service:,env:":1}` in rate_by_service.
|
||||||
|
// This is clearly a race-condition and maybe we should av oid verifying complete response
|
||||||
def api
|
.withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(10)))
|
||||||
def unixDomainSocketApi
|
// .withLogConsumer { output ->
|
||||||
|
// print output.utf8String
|
||||||
def endpoint = new AtomicReference<String>(null)
|
// }
|
||||||
def agentResponse = new AtomicReference<Map<String, Map<String, Number>>>(null)
|
agentContainer.start()
|
||||||
|
agentContainerHost = agentContainer.containerIpAddress
|
||||||
DDAgentResponseListener responseListener = { String receivedEndpoint, Map<String, Map<String, Number>> responseJson ->
|
agentContainerPort = agentContainer.getMappedPort(datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT)
|
||||||
endpoint.set(receivedEndpoint)
|
|
||||||
agentResponse.set(responseJson)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
def setupSpec() {
|
File tmpDir = File.createTempDir()
|
||||||
|
tmpDir.deleteOnExit()
|
||||||
|
socketPath = new File(tmpDir, "socket")
|
||||||
|
process = Runtime.getRuntime().exec("socat UNIX-LISEN:${socketPath},reuseaddr,fork TCP-CONNECT:${agentContainerHost}:${agentContainerPort}")
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
def cleanupSpec() {
|
||||||
CI will provide us with rabbitmq container running along side our build.
|
if (agentContainer) {
|
||||||
When building locally, however, we need to take matters into our own hands
|
agentContainer.stop()
|
||||||
and we use 'testcontainers' for this.
|
|
||||||
*/
|
|
||||||
if ("true" != System.getenv("CI")) {
|
|
||||||
agentContainer = new GenericContainer("datadog/docker-dd-agent:latest")
|
|
||||||
.withEnv(["DD_APM_ENABLED": "true",
|
|
||||||
"DD_BIND_HOST" : "0.0.0.0",
|
|
||||||
"DD_API_KEY" : "invalid_key_but_this_is_fine",
|
|
||||||
"DD_LOGS_STDOUT": "yes"])
|
|
||||||
.withExposedPorts(datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT)
|
|
||||||
.withStartupTimeout(Duration.ofSeconds(120))
|
|
||||||
// Apparently we need to sleep for a bit so agent's response `{"service:,env:":1}` in rate_by_service.
|
|
||||||
// This is clearly a race-condition and maybe we should av oid verifying complete response
|
|
||||||
.withStartupCheckStrategy(new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(10)))
|
|
||||||
// .withLogConsumer { output ->
|
|
||||||
// print output.utf8String
|
|
||||||
// }
|
|
||||||
agentContainer.start()
|
|
||||||
agentContainerHost = agentContainer.containerIpAddress
|
|
||||||
agentContainerPort = agentContainer.getMappedPort(datadog.trace.api.Config.DEFAULT_TRACE_AGENT_PORT)
|
|
||||||
}
|
|
||||||
|
|
||||||
File tmpDir = File.createTempDir()
|
|
||||||
tmpDir.deleteOnExit()
|
|
||||||
socketPath = new File(tmpDir, "socket")
|
|
||||||
process = Runtime.getRuntime().exec("socat UNIX-LISTEN:${socketPath},reuseaddr,fork TCP-CONNECT:${agentContainerHost}:${agentContainerPort}")
|
|
||||||
}
|
}
|
||||||
|
process.destroy()
|
||||||
|
}
|
||||||
|
|
||||||
def cleanupSpec() {
|
def setup() {
|
||||||
if (agentContainer) {
|
api = new DDAgentApi(agentContainerHost, agentContainerPort, null)
|
||||||
agentContainer.stop()
|
api.addResponseListener(responseListener)
|
||||||
}
|
|
||||||
process.destroy()
|
|
||||||
}
|
|
||||||
|
|
||||||
def setup() {
|
unixDomainSocketApi = new DDAgentApi(SOMEHOST, SOMEPORT, socketPath.toString())
|
||||||
api = new DDAgentApi(agentContainerHost, agentContainerPort, v4(), null)
|
unixDomainSocketApi.addResponseListener(responseListener)
|
||||||
api.addResponseListener(responseListener)
|
}
|
||||||
|
|
||||||
unixDomainSocketApi = new DDAgentApi(SOMEHOST, SOMEPORT, v4(), socketPath.toString())
|
def "Sending traces succeeds (test #test)"() {
|
||||||
unixDomainSocketApi.addResponseListener(responseListener)
|
expect:
|
||||||
}
|
api.sendTraces(traces)
|
||||||
|
assert endpoint.get() == "http://${agentContainerHost}:${agentContainerPort}/v0.4/traces"
|
||||||
|
assert agentResponse.get() == [rate_by_service: ["service:,env:": 1]]
|
||||||
|
|
||||||
boolean v4() {
|
where:
|
||||||
return true
|
traces | test
|
||||||
}
|
[] | 1
|
||||||
|
[[], []] | 2
|
||||||
def "Sending traces succeeds (test #test)"() {
|
[[new DDSpan(1, CONTEXT)]] | 3
|
||||||
expect:
|
[[new DDSpan(TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()), CONTEXT)]] | 4
|
||||||
api.sendTraces(traces)
|
(1..15).collect { [] } | 5
|
||||||
if (v4()) {
|
(1..16).collect { [] } | 6
|
||||||
assert endpoint.get() == "http://${agentContainerHost}:${agentContainerPort}/v0.4/traces"
|
// Larger traces take more than 1 second to send to the agent and get a timeout exception:
|
||||||
assert agentResponse.get() == [rate_by_service: ["service:,env:": 1]]
|
|
||||||
}
|
|
||||||
|
|
||||||
where:
|
|
||||||
traces | test
|
|
||||||
[] | 1
|
|
||||||
[[], []] | 2
|
|
||||||
[[new DDSpan(1, CONTEXT)]] | 3
|
|
||||||
[[new DDSpan(TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()), CONTEXT)]] | 4
|
|
||||||
(1..15).collect { [] } | 5
|
|
||||||
(1..16).collect { [] } | 6
|
|
||||||
// Larger traces take more than 1 second to send to the agent and get a timeout exception:
|
|
||||||
// (1..((1 << 16) - 1)).collect { [] } | 7
|
// (1..((1 << 16) - 1)).collect { [] } | 7
|
||||||
// (1..(1 << 16)).collect { [] } | 8
|
// (1..(1 << 16)).collect { [] } | 8
|
||||||
}
|
|
||||||
|
|
||||||
def "Sending traces to unix domain socket succeeds (test #test)"() {
|
|
||||||
expect:
|
|
||||||
unixDomainSocketApi.sendTraces(traces)
|
|
||||||
if (v4()) {
|
|
||||||
assert endpoint.get() == "http://${SOMEHOST}:${SOMEPORT}/v0.4/traces"
|
|
||||||
assert agentResponse.get() == [rate_by_service: ["service:,env:": 1]]
|
|
||||||
}
|
|
||||||
|
|
||||||
where:
|
|
||||||
traces | test
|
|
||||||
[] | 1
|
|
||||||
[[], []] | 2
|
|
||||||
[[new DDSpan(1, CONTEXT)]] | 3
|
|
||||||
[[new DDSpan(TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()), CONTEXT)]] | 4
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Requires({ "true" == System.getenv("CI") || jvm.java8Compatible })
|
def "Sending traces to unix domain socket succeeds (test #test)"() {
|
||||||
static class DDAgentApiIntegrationV3Test extends DDAgentApiIntegrationV4Test {
|
expect:
|
||||||
boolean v4() {
|
unixDomainSocketApi.sendTraces(traces)
|
||||||
return false
|
assert endpoint.get() == "http://${SOMEHOST}:${SOMEPORT}/v0.4/traces"
|
||||||
}
|
assert agentResponse.get() == [rate_by_service: ["service:,env:": 1]]
|
||||||
|
|
||||||
def cleanup() {
|
where:
|
||||||
assert endpoint.get() == null
|
traces | test
|
||||||
assert agentResponse.get() == null
|
[] | 1
|
||||||
}
|
[[], []] | 2
|
||||||
|
[[new DDSpan(1, CONTEXT)]] | 3
|
||||||
|
[[new DDSpan(TimeUnit.MILLISECONDS.toMicros(System.currentTimeMillis()), CONTEXT)]] | 4
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue