Wait for rocketmq message to arrive before asserting spans (#5591)

* Wait for rocketmq message to arrive before asserting spans

* Apply suggestions from code review

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>

* trigger build

Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
This commit is contained in:
Lauri Tulmin 2022-03-17 12:38:58 +02:00 committed by GitHub
parent 1ee60aa6e6
commit 649779450b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 2 deletions

View File

@ -8,10 +8,13 @@ package io.opentelemetry.instrumentation.rocketmq
import base.BaseConf import base.BaseConf
import io.opentelemetry.instrumentation.test.InstrumentationSpecification import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
import org.apache.rocketmq.client.producer.DefaultMQProducer import org.apache.rocketmq.client.producer.DefaultMQProducer
import org.apache.rocketmq.client.producer.SendCallback import org.apache.rocketmq.client.producer.SendCallback
import org.apache.rocketmq.client.producer.SendResult import org.apache.rocketmq.client.producer.SendResult
import org.apache.rocketmq.client.producer.SendStatus
import org.apache.rocketmq.common.message.Message import org.apache.rocketmq.common.message.Message
import org.apache.rocketmq.remoting.common.RemotingHelper import org.apache.rocketmq.remoting.common.RemotingHelper
import spock.lang.Shared import spock.lang.Shared
@ -66,17 +69,27 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
BaseConf.deleteTempDir() BaseConf.deleteTempDir()
} }
def setup() {
tracingMessageListener.reset()
}
def "test rocketmq produce callback"() { def "test rocketmq produce callback"() {
CompletableFuture<SendResult> result = new CompletableFuture<>()
when: when:
producer.send(msg, new SendCallback() { producer.send(msg, new SendCallback() {
@Override @Override
void onSuccess(SendResult sendResult) { void onSuccess(SendResult sendResult) {
result.complete(sendResult)
} }
@Override @Override
void onException(Throwable throwable) { void onException(Throwable throwable) {
result.completeExceptionally(throwable)
} }
}) })
result.get(10, TimeUnit.SECONDS).sendStatus == SendStatus.SEND_OK
// waiting longer than assertTraces below does on its own because of CI flakiness
tracingMessageListener.waitForMessages()
then: then:
assertTraces(1) { assertTraces(1) {
@ -123,8 +136,11 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
def "test rocketmq produce and consume"() { def "test rocketmq produce and consume"() {
when: when:
runWithSpan("parent") { runWithSpan("parent") {
producer.send(msg) SendResult sendResult = producer.send(msg)
assert sendResult.sendStatus == SendStatus.SEND_OK
} }
// waiting longer than assertTraces below does on its own because of CI flakiness
tracingMessageListener.waitForMessages()
then: then:
assertTraces(1) { assertTraces(1) {

View File

@ -5,6 +5,7 @@
package io.opentelemetry.instrumentation.rocketmq package io.opentelemetry.instrumentation.rocketmq
import java.util.concurrent.TimeUnit
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly
@ -33,7 +34,7 @@ class TracingMessageListener implements MessageListenerOrderly {
} }
void waitForMessages() { void waitForMessages() {
messageReceived.await() messageReceived.await(30, TimeUnit.SECONDS)
} }
int getLastBatchSize() { int getLastBatchSize() {