Retry rocketmq batch test when it fails (#4922)
This commit is contained in:
parent
d7756f1f2b
commit
08ba196729
|
@ -40,6 +40,9 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
|
||||||
@Shared
|
@Shared
|
||||||
def msgs = new ArrayList<Message>()
|
def msgs = new ArrayList<Message>()
|
||||||
|
|
||||||
|
@Shared
|
||||||
|
TracingMessageListener tracingMessageListener = new TracingMessageListener()
|
||||||
|
|
||||||
abstract void configureMQProducer(DefaultMQProducer producer)
|
abstract void configureMQProducer(DefaultMQProducer producer)
|
||||||
|
|
||||||
abstract void configureMQPushConsumer(DefaultMQPushConsumer consumer)
|
abstract void configureMQPushConsumer(DefaultMQPushConsumer consumer)
|
||||||
|
@ -53,7 +56,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
|
||||||
msgs.add(msg2)
|
msgs.add(msg2)
|
||||||
producer = BaseConf.getProducer(BaseConf.nsAddr)
|
producer = BaseConf.getProducer(BaseConf.nsAddr)
|
||||||
configureMQProducer(producer)
|
configureMQProducer(producer)
|
||||||
consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", new TracingMessageListener())
|
consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", tracingMessageListener)
|
||||||
configureMQPushConsumer(consumer)
|
configureMQPushConsumer(consumer)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -175,8 +178,26 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
|
||||||
consumer.setConsumeMessageBatchMaxSize(2)
|
consumer.setConsumeMessageBatchMaxSize(2)
|
||||||
|
|
||||||
when:
|
when:
|
||||||
runWithSpan("parent") {
|
// This test assumes that messages are sent and received as a batch. Occasionally it happens
|
||||||
producer.send(msgs)
|
// that the messages are not received as a batch, but one by one. This doesn't match what the
|
||||||
|
// assertion expects. To reduce flakiness we retry the test when messages weren't received as
|
||||||
|
// a batch.
|
||||||
|
def maxAttempts = 5
|
||||||
|
for (i in 1..maxAttempts) {
|
||||||
|
tracingMessageListener.reset()
|
||||||
|
|
||||||
|
runWithSpan("parent") {
|
||||||
|
producer.send(msgs)
|
||||||
|
}
|
||||||
|
|
||||||
|
tracingMessageListener.waitForMessages()
|
||||||
|
if (tracingMessageListener.getLastBatchSize() == 2) {
|
||||||
|
break
|
||||||
|
} else if (i < maxAttempts) {
|
||||||
|
// if messages weren't received as a batch we get 1 trace instead of 2
|
||||||
|
ignoreTracesAndClear(1)
|
||||||
|
System.err.println("Messages weren't received as batch, retrying")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
then:
|
then:
|
||||||
|
|
|
@ -5,6 +5,8 @@
|
||||||
|
|
||||||
package io.opentelemetry.instrumentation.rocketmq
|
package io.opentelemetry.instrumentation.rocketmq
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext
|
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext
|
||||||
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus
|
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus
|
||||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly
|
||||||
|
@ -13,9 +15,27 @@ import org.apache.rocketmq.common.message.MessageExt
|
||||||
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runInternalSpan
|
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runInternalSpan
|
||||||
|
|
||||||
class TracingMessageListener implements MessageListenerOrderly {
|
class TracingMessageListener implements MessageListenerOrderly {
|
||||||
|
private AtomicInteger lastBatchSize = new AtomicInteger()
|
||||||
|
private CountDownLatch messageReceived = new CountDownLatch(1)
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
|
ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
|
||||||
|
lastBatchSize.set(list.size())
|
||||||
|
messageReceived.countDown()
|
||||||
runInternalSpan("messageListener")
|
runInternalSpan("messageListener")
|
||||||
return ConsumeOrderlyStatus.SUCCESS
|
return ConsumeOrderlyStatus.SUCCESS
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void reset() {
|
||||||
|
messageReceived = new CountDownLatch(1)
|
||||||
|
lastBatchSize.set(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
void waitForMessages() {
|
||||||
|
messageReceived.await()
|
||||||
|
}
|
||||||
|
|
||||||
|
int getLastBatchSize() {
|
||||||
|
return lastBatchSize.get()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue