diff --git a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy index 51b248f10f..09708ac91f 100644 --- a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy +++ b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy @@ -8,10 +8,13 @@ package io.opentelemetry.instrumentation.rocketmq import base.BaseConf import io.opentelemetry.instrumentation.test.InstrumentationSpecification import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import java.util.concurrent.CompletableFuture +import java.util.concurrent.TimeUnit import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer import org.apache.rocketmq.client.producer.DefaultMQProducer import org.apache.rocketmq.client.producer.SendCallback import org.apache.rocketmq.client.producer.SendResult +import org.apache.rocketmq.client.producer.SendStatus import org.apache.rocketmq.common.message.Message import org.apache.rocketmq.remoting.common.RemotingHelper import spock.lang.Shared @@ -66,17 +69,27 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { BaseConf.deleteTempDir() } + def setup() { + tracingMessageListener.reset() + } + def "test rocketmq produce callback"() { + CompletableFuture result = new CompletableFuture<>() when: producer.send(msg, new SendCallback() { @Override void onSuccess(SendResult sendResult) { + result.complete(sendResult) } @Override void onException(Throwable throwable) { + result.completeExceptionally(throwable) } }) + result.get(10, TimeUnit.SECONDS).sendStatus == SendStatus.SEND_OK + // waiting longer than assertTraces below does on its own because of CI flakiness + tracingMessageListener.waitForMessages() then: assertTraces(1) { @@ -123,8 +136,11 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification { def "test rocketmq produce and consume"() { when: runWithSpan("parent") { - producer.send(msg) + SendResult sendResult = producer.send(msg) + assert sendResult.sendStatus == SendStatus.SEND_OK } + // waiting longer than assertTraces below does on its own because of CI flakiness + tracingMessageListener.waitForMessages() then: assertTraces(1) { diff --git a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/TracingMessageListener.groovy b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/TracingMessageListener.groovy index daaaa53f94..21a67c509c 100644 --- a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/TracingMessageListener.groovy +++ b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/TracingMessageListener.groovy @@ -5,6 +5,7 @@ package io.opentelemetry.instrumentation.rocketmq +import java.util.concurrent.TimeUnit import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly @@ -33,7 +34,7 @@ class TracingMessageListener implements MessageListenerOrderly { } void waitForMessages() { - messageReceived.await() + messageReceived.await(30, TimeUnit.SECONDS) } int getLastBatchSize() {