Flaky spring kafka batch test (#5740)
* Flaky spring kafka batch test * Update instrumentation/spring/spring-kafka-2.7/javaagent/src/test/groovy/SpringKafkaInstrumentationTest.groovy Co-authored-by: Mateusz Rzeszutek <mrzeszutek@splunk.com> Co-authored-by: Mateusz Rzeszutek <mrzeszutek@splunk.com>
This commit is contained in:
parent
00b6e6e043
commit
2b7fe695b9
|
@ -8,6 +8,9 @@ import io.opentelemetry.instrumentation.testing.GlobalTraceUtil
|
|||
import io.opentelemetry.sdk.trace.data.SpanData
|
||||
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
|
||||
import java.time.Duration
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import org.apache.kafka.clients.admin.NewTopic
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord
|
||||
import org.springframework.boot.SpringApplication
|
||||
|
@ -64,12 +67,28 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
|
|||
def kafkaTemplate = applicationContext.getBean("kafkaTemplate", KafkaTemplate)
|
||||
|
||||
when:
|
||||
runWithSpan("producer") {
|
||||
// wrapping in a transaction is needed to remove the possibility of messages being picked up separately by the consumer
|
||||
kafkaTemplate.executeInTransaction({ ops ->
|
||||
ops.send("testTopic", "10", "testSpan1")
|
||||
ops.send("testTopic", "20", "testSpan2")
|
||||
})
|
||||
// This test assumes that messages are sent and received as a batch. Occasionally it happens
|
||||
// that the messages are not received as a batch, but one by one. This doesn't match what the
|
||||
// assertion expects. To reduce flakiness we retry the test when messages weren't received as
|
||||
// a batch.
|
||||
def maxAttempts = 5
|
||||
for (i in 1..maxAttempts) {
|
||||
Listener.reset()
|
||||
|
||||
runWithSpan("producer") {
|
||||
kafkaTemplate.executeInTransaction({ ops ->
|
||||
ops.send("testTopic", "10", "testSpan1")
|
||||
ops.send("testTopic", "20", "testSpan2")
|
||||
})
|
||||
}
|
||||
|
||||
Listener.waitForMessages()
|
||||
if (Listener.getLastBatchSize() == 2) {
|
||||
break
|
||||
} else if (i < maxAttempts) {
|
||||
ignoreTracesAndClear(2)
|
||||
System.err.println("Messages weren't received as batch, retrying")
|
||||
}
|
||||
}
|
||||
|
||||
then:
|
||||
|
@ -242,9 +261,16 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
|
|||
}
|
||||
|
||||
static class Listener {
|
||||
static AtomicInteger lastBatchSize = new AtomicInteger()
|
||||
static CountDownLatch messageReceived = new CountDownLatch(2)
|
||||
|
||||
@KafkaListener(id = "testListener", topics = "testTopic", containerFactory = "batchFactory")
|
||||
void listener(List<ConsumerRecord<String, String>> records) {
|
||||
lastBatchSize.set(records.size())
|
||||
records.size().times {
|
||||
messageReceived.countDown()
|
||||
}
|
||||
|
||||
GlobalTraceUtil.runWithSpan("consumer") {}
|
||||
records.forEach({ record ->
|
||||
if (record.value() == "error") {
|
||||
|
@ -252,5 +278,18 @@ class SpringKafkaInstrumentationTest extends AgentInstrumentationSpecification {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
static void reset() {
|
||||
messageReceived = new CountDownLatch(2)
|
||||
lastBatchSize.set(0)
|
||||
}
|
||||
|
||||
static void waitForMessages() {
|
||||
messageReceived.await(30, TimeUnit.SECONDS)
|
||||
}
|
||||
|
||||
static int getLastBatchSize() {
|
||||
return lastBatchSize.get()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue