Fix support rocketmq client test (#2610)
* Fix test * Fix test * Fix test * Fix test
This commit is contained in:
parent
1a18841fdf
commit
8e2c86fe3b
|
@ -9,9 +9,7 @@ import io.opentelemetery.instrumentation.rocketmq.AbstractRocketMqClientTest
|
||||||
import io.opentelemetry.instrumentation.test.AgentTestTrait
|
import io.opentelemetry.instrumentation.test.AgentTestTrait
|
||||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
|
||||||
import org.apache.rocketmq.client.producer.DefaultMQProducer
|
import org.apache.rocketmq.client.producer.DefaultMQProducer
|
||||||
import spock.lang.Ignore
|
|
||||||
|
|
||||||
@Ignore("https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2586")
|
|
||||||
class RocketMqClientTest extends AbstractRocketMqClientTest implements AgentTestTrait {
|
class RocketMqClientTest extends AbstractRocketMqClientTest implements AgentTestTrait {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -10,9 +10,7 @@ import io.opentelemetry.instrumentation.api.config.Config
|
||||||
import io.opentelemetry.instrumentation.test.LibraryTestTrait
|
import io.opentelemetry.instrumentation.test.LibraryTestTrait
|
||||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
|
||||||
import org.apache.rocketmq.client.producer.DefaultMQProducer
|
import org.apache.rocketmq.client.producer.DefaultMQProducer
|
||||||
import spock.lang.Ignore
|
|
||||||
|
|
||||||
@Ignore("https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2586")
|
|
||||||
class RocketMqClientTest extends AbstractRocketMqClientTest implements LibraryTestTrait {
|
class RocketMqClientTest extends AbstractRocketMqClientTest implements LibraryTestTrait {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -31,9 +31,6 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
|
||||||
@Shared
|
@Shared
|
||||||
DefaultMQPushConsumer consumer
|
DefaultMQPushConsumer consumer
|
||||||
|
|
||||||
@Shared
|
|
||||||
DefaultMQPushConsumer batchConsumer
|
|
||||||
|
|
||||||
@Shared
|
@Shared
|
||||||
def sharedTopic
|
def sharedTopic
|
||||||
|
|
||||||
|
@ -48,14 +45,25 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
|
||||||
abstract void configureMQPushConsumer(DefaultMQPushConsumer consumer)
|
abstract void configureMQPushConsumer(DefaultMQPushConsumer consumer)
|
||||||
|
|
||||||
def setupSpec() {
|
def setupSpec() {
|
||||||
|
sharedTopic = BaseConf.initTopic()
|
||||||
|
msg = new Message(sharedTopic, "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET))
|
||||||
|
Message msg1 = new Message(sharedTopic, "TagA", ("hello world a").getBytes())
|
||||||
|
Message msg2 = new Message(sharedTopic, "TagB", ("hello world b").getBytes())
|
||||||
|
msgs.add(msg1)
|
||||||
|
msgs.add(msg2)
|
||||||
producer = BaseConf.getProducer(BaseConf.nsAddr)
|
producer = BaseConf.getProducer(BaseConf.nsAddr)
|
||||||
configureMQProducer(producer)
|
configureMQProducer(producer)
|
||||||
|
consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", new RMQOrderListener())
|
||||||
|
configureMQPushConsumer(consumer)
|
||||||
|
}
|
||||||
|
|
||||||
|
def cleanupSpec() {
|
||||||
|
producer.shutdown()
|
||||||
|
consumer.shutdown()
|
||||||
|
BaseConf.deleteTempDir()
|
||||||
}
|
}
|
||||||
|
|
||||||
def "test rocketmq produce callback"() {
|
def "test rocketmq produce callback"() {
|
||||||
setup:
|
|
||||||
sharedTopic = BaseConf.initTopic()
|
|
||||||
msg = new Message(sharedTopic, "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET))
|
|
||||||
when:
|
when:
|
||||||
producer.send(msg, new SendCallback() {
|
producer.send(msg, new SendCallback() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -87,11 +95,6 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
|
||||||
}
|
}
|
||||||
|
|
||||||
def "test rocketmq produce and consume"() {
|
def "test rocketmq produce and consume"() {
|
||||||
setup:
|
|
||||||
sharedTopic = BaseConf.initTopic()
|
|
||||||
msg = new Message(sharedTopic, "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET))
|
|
||||||
consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", new RMQOrderListener())
|
|
||||||
configureMQPushConsumer(consumer)
|
|
||||||
when:
|
when:
|
||||||
runUnderTrace("parent") {
|
runUnderTrace("parent") {
|
||||||
producer.send(msg)
|
producer.send(msg)
|
||||||
|
@ -135,14 +138,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
|
||||||
|
|
||||||
def "test rocketmq produce and batch consume"() {
|
def "test rocketmq produce and batch consume"() {
|
||||||
setup:
|
setup:
|
||||||
sharedTopic = BaseConf.initTopic()
|
consumer.setConsumeMessageBatchMaxSize(2)
|
||||||
Message msg1 = new Message(sharedTopic, "TagA", ("hello world a").getBytes())
|
|
||||||
Message msg2 = new Message(sharedTopic, "TagB", ("hello world b").getBytes())
|
|
||||||
msgs.add(msg1)
|
|
||||||
msgs.add(msg2)
|
|
||||||
batchConsumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", new RMQOrderListener())
|
|
||||||
batchConsumer.setConsumeMessageBatchMaxSize(2)
|
|
||||||
configureMQPushConsumer(batchConsumer)
|
|
||||||
when:
|
when:
|
||||||
runUnderTrace("parent") {
|
runUnderTrace("parent") {
|
||||||
producer.send(msgs)
|
producer.send(msgs)
|
||||||
|
|
|
@ -23,17 +23,17 @@ public final class BaseConf {
|
||||||
protected static String broker1Name;
|
protected static String broker1Name;
|
||||||
protected static final String clusterName;
|
protected static final String clusterName;
|
||||||
protected static final NamesrvController namesrvController;
|
protected static final NamesrvController namesrvController;
|
||||||
protected static final BrokerController brokerController1;
|
protected static final BrokerController brokerController;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
System.setProperty(
|
System.setProperty(
|
||||||
RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
|
RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
|
||||||
namesrvController = IntegrationTestBase.createAndStartNamesrv();
|
namesrvController = IntegrationTestBase.createAndStartNamesrv();
|
||||||
nsAddr = "localhost:" + namesrvController.getNettyServerConfig().getListenPort();
|
nsAddr = "localhost:" + namesrvController.getNettyServerConfig().getListenPort();
|
||||||
brokerController1 = IntegrationTestBase.createAndStartBroker(nsAddr);
|
brokerController = IntegrationTestBase.createAndStartBroker(nsAddr);
|
||||||
clusterName = brokerController1.getBrokerConfig().getBrokerClusterName();
|
clusterName = brokerController.getBrokerConfig().getBrokerClusterName();
|
||||||
broker1Name = brokerController1.getBrokerConfig().getBrokerName();
|
broker1Name = brokerController.getBrokerConfig().getBrokerName();
|
||||||
broker1Addr = "localhost:" + brokerController1.getNettyServerConfig().getListenPort();
|
broker1Addr = "localhost:" + brokerController.getNettyServerConfig().getListenPort();
|
||||||
}
|
}
|
||||||
|
|
||||||
private BaseConf() {}
|
private BaseConf() {}
|
||||||
|
@ -64,7 +64,8 @@ public final class BaseConf {
|
||||||
return producer;
|
return producer;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void deleteTempDir() {
|
public static void deleteTempDir() {
|
||||||
|
namesrvController.shutdown();
|
||||||
IntegrationTestBase.deleteTempDir();
|
IntegrationTestBase.deleteTempDir();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue