feat: add rocketmq binding (#554)

Spec details: a6978cf562/rocketmq-cloudevents-binding/rocketmq-transport-binding.md

Signed-off-by: Aaron Ai <yangkun.ayk@alibaba-inc.com>
This commit is contained in:
Aaron Ai 2023-05-23 16:49:08 +08:00 committed by GitHub
parent 4ebeab0e0f
commit 698cdf7ad4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 914 additions and 0 deletions

View File

@ -29,6 +29,7 @@
<module>spring-reactive</module>
<module>spring-rsocket</module>
<module>spring-function</module>
<module>rocketmq</module>
</modules>
</project>

View File

@ -0,0 +1,27 @@
# RocketMQ + CloudEvents Sample
This example demonstrates the integration of [RocketMQ 5.x client library](https://github.com/apache/rocketmq-clients)
with CloudEvents to create a RocketMQ binding.
## Building the Project
```shell
mvn package
```
## Setting Up a RocketMQ Instance
Follow the [quickstart guide](https://rocketmq.apache.org/docs/quick-start/01quickstart) on the official RocketMQ
website to set up the necessary components, including nameserver, proxy, and broker.
## Event Production
```shell
mvn exec:java -Dexec.mainClass="io.cloudevents.examples.rocketmq.RocketmqProducer" -Dexec.args="foobar:8081 sample-topic"
```
## Event Consumption
```shell
mvn exec:java -Dexec.mainClass="io.cloudevents.examples.rocketmq.RocketmqConsumer" -Dexec.args="foobar:8081 sample-topic sample-consumer-group"
```

21
examples/rocketmq/pom.xml Normal file
View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloudevents-examples</artifactId>
<groupId>io.cloudevents</groupId>
<version>2.5.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloudevents-rocketmq-example</artifactId>
<dependencies>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-rocketmq</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,50 @@
package io.cloudevents.examples.rocketmq;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.rocketmq.RocketMqMessageFactory;
import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
public class RocketmqConsumer {
private RocketmqConsumer() {
}
public static void main(String[] args) throws InterruptedException, ClientException, IOException {
if (args.length < 3) {
System.out.println("Usage: rocketmq_consumer <endpoints> <topic> <consumer_group>");
return;
}
final ClientServiceProvider provider = ClientServiceProvider.loadService();
String endpoints = args[0];
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
FilterExpression filterExpression = new FilterExpression();
String topic = args[1];
String consumerGroup = args[2];
// Create the RocketMQ Consumer.
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup(consumerGroup)
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(messageView -> {
final MessageReader reader = RocketMqMessageFactory.createReader(messageView);
final CloudEvent event = reader.toEvent();
System.out.println("Received event=" + event + ", messageId=" + messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
// Block the main thread, no need for production environment.
Thread.sleep(Long.MAX_VALUE);
// Close the push consumer when you don't need it anymore.
pushConsumer.close();
}
}

View File

@ -0,0 +1,64 @@
package io.cloudevents.examples.rocketmq;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.v1.CloudEventBuilder;
import io.cloudevents.rocketmq.RocketMqMessageFactory;
import io.cloudevents.types.Time;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.shaded.com.google.gson.Gson;
public class RocketmqProducer {
private RocketmqProducer() {
}
public static void main(String[] args) throws ClientException, IOException {
if (args.length < 2) {
System.out.println("Usage: rocketmq_producer <endpoints> <topic>");
return;
}
final ClientServiceProvider provider = ClientServiceProvider.loadService();
String endpoints = args[0];
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
String topic = args[1];
// Create the RocketMQ Producer.
final Producer producer = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTopics(topic)
.build();
final Gson gson = new Gson();
Map<String, String> payload = new HashMap<>();
payload.put("foo", "bar");
final CloudEvent event = new CloudEventBuilder()
.withId("client-id")
.withSource(URI.create("http://127.0.0.1/rocketmq-client"))
.withType("com.foobar")
.withTime(Time.parseTime("2022-11-09T21:47:12.032198+00:00"))
.withData(gson.toJson(payload).getBytes(StandardCharsets.UTF_8))
.build();
// Transform event into message.
final Message message = RocketMqMessageFactory.createWriter(topic).writeBinary(event);
try {
// Send the message.
final SendReceipt sendReceipt = producer.send(message);
System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());
} catch (Exception e) {
System.out.println("Failed to send message");
e.printStackTrace();
}
// Close the producer when you don't need it anymore.
producer.close();
}
}

View File

@ -80,6 +80,7 @@
<module>spring</module>
<module>sql</module>
<module>bom</module>
<module>rocketmq</module>
</modules>
<properties>

73
rocketmq/pom.xml Normal file
View File

@ -0,0 +1,73 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2018-Present The CloudEvents Authors
~ <p>
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~ <p>
~ http://www.apache.org/licenses/LICENSE-2.0
~ <p>
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>cloudevents-parent</artifactId>
<groupId>io.cloudevents</groupId>
<version>2.5.0-SNAPSHOT</version>
</parent>
<artifactId>cloudevents-rocketmq</artifactId>
<name>CloudEvents - RocketMQ Binding</name>
<packaging>jar</packaging>
<properties>
<rocketmq.version>5.0.4</rocketmq.version>
<module-name>io.cloudevents.rocketmq</module-name>
</properties>
<dependencies>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-core</artifactId>
<classifier>tests</classifier>
<type>test-jar</type>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj-core.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,80 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.rocketmq;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.message.impl.GenericStructuredMessageReader;
import io.cloudevents.core.message.impl.MessageUtils;
import io.cloudevents.rw.CloudEventWriter;
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageView;
/**
* A factory class providing convenience methods for creating {@link MessageReader} and {@link MessageWriter} instances
* based on RocketMQ {@link MessageView} and {@link Message}.
*/
public class RocketMqMessageFactory {
private RocketMqMessageFactory() {
// prevent instantiation
}
/**
* Creates a {@link MessageReader} to read a RocketMQ {@link MessageView}.
*
* @param message The RocketMQ {@link MessageView} to read from.
* @return A {@link MessageReader} that can read the given {@link MessageView} to a {@link io.cloudevents.CloudEvent} representation.
*/
public static MessageReader createReader(final MessageView message) {
final ByteBuffer byteBuffer = message.getBody();
byte[] body = new byte[byteBuffer.remaining()];
byteBuffer.get(body);
final Map<String, String> properties = message.getProperties();
final String contentType = properties.get(RocketmqConstants.PROPERTY_CONTENT_TYPE);
return createReader(contentType, properties, body);
}
/**
* Creates a {@link MessageReader} using the content type, properties, and body of a RocketMQ {@link MessageView}.
*
* @param contentType The content type of the message payload.
* @param properties The properties of the RocketMQ message containing CloudEvent metadata (attributes and/or extensions).
* @param body The message body as byte array.
* @return A {@link MessageReader} capable of parsing a {@link io.cloudevents.CloudEvent} from the content-type, properties, and payload of a RocketMQ message.
*/
public static MessageReader createReader(final String contentType, final Map<String, String> properties, final byte[] body) {
return MessageUtils.parseStructuredOrBinaryMessage(
() -> contentType,
format -> new GenericStructuredMessageReader(format, body),
() -> properties.get(RocketmqConstants.MESSAGE_PROPERTY_SPEC_VERSION),
sv -> new RocketmqBinaryMessageReader(sv, properties, contentType, body)
);
}
/**
* Creates a {@link MessageWriter} instance capable of translating a {@link io.cloudevents.CloudEvent} to a RocketMQ {@link Message}.
*
* @param topic The topic to which the created RocketMQ message will be sent.
* @return A {@link MessageWriter} capable of converting a {@link io.cloudevents.CloudEvent} to a RocketMQ {@link Message}.
*/
public static MessageWriter<CloudEventWriter<Message>, Message> createWriter(final String topic) {
return new RocketmqMessageWriter(topic);
}
}

View File

@ -0,0 +1,107 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.rocketmq;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.data.BytesCloudEventData;
import io.cloudevents.core.message.impl.BaseGenericBinaryMessageReaderImpl;
import java.util.Arrays;
import java.util.Map;
import java.util.function.BiConsumer;
/**
* A RocketMQ message reader that can be read as a <em>CloudEvent</em>.
*/
final class RocketmqBinaryMessageReader extends BaseGenericBinaryMessageReaderImpl<String, Object> {
private final String contentType;
private final Map<String, String> messageProperties;
/**
* Create an instance of an RocketMQ message reader.
*
* @param specVersion The version of the cloud event message.
* @param messageProperties The properties of the RocketMQ message that contains.
* @param contentType The content-type property of the RocketMQ message or {@code null} if the message content type if unknown.
* @param body The message payload or {@code null}/{@link RocketmqConstants#EMPTY_BODY} if the message does not contain any payload.
*/
RocketmqBinaryMessageReader(final SpecVersion specVersion, Map<String, String> messageProperties,
final String contentType, final byte[] body) {
super(specVersion, body != null && !Arrays.equals(RocketmqConstants.EMPTY_BODY, body) && body.length > 0 ? BytesCloudEventData.wrap(body) : null);
this.contentType = contentType;
this.messageProperties = messageProperties;
}
@Override
protected boolean isContentTypeHeader(String key) {
return key.equals(RocketmqConstants.PROPERTY_CONTENT_TYPE);
}
/**
* Tests whether the given property key belongs to cloud events headers.
*
* @param key The key to test for.
* @return True if the specified key belongs to cloud events headers.
*/
@Override
protected boolean isCloudEventsHeader(String key) {
final int prefixLength = RocketmqConstants.CE_PREFIX.length();
return key.length() > prefixLength && key.startsWith(RocketmqConstants.CE_PREFIX);
}
/**
* Transforms a RocketMQ message property key into a CloudEvents attribute or extension key.
* <p>
* This method removes the {@link RocketmqConstants#CE_PREFIX} prefix from the given key,
* assuming that the key has already been determined to be a CloudEvents header by
* {@link #isCloudEventsHeader(String)}.
*
* @param key The RocketMQ message property key with the CloudEvents header prefix.
* @return The CloudEvents attribute or extension key without the prefix.
*/
@Override
protected String toCloudEventsKey(String key) {
return key.substring(RocketmqConstants.CE_PREFIX.length());
}
@Override
protected void forEachHeader(BiConsumer<String, Object> fn) {
if (contentType != null) {
// visit the content-type message property
fn.accept(RocketmqConstants.PROPERTY_CONTENT_TYPE, contentType);
}
// visit message properties
messageProperties.forEach((k, v) -> {
if (k != null && v != null) {
fn.accept(k, v);
}
});
}
/**
* Gets the cloud event representation of the value.
* <p>
* This method simply returns the string representation of the type of value passed as argument.
*
* @param value The value of a CloudEvent attribute or extension.
* @return The string representation of the specified value.
*/
@Override
protected String toCloudEventsValue(Object value) {
return value.toString();
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.rocketmq;
import io.cloudevents.core.message.impl.MessageUtils;
import java.util.Map;
/**
* Constants and methods used throughout the RocketMQ binding for cloud events.
*/
final class RocketmqConstants {
private RocketmqConstants() {
// prevent instantiation
}
static final byte[] EMPTY_BODY = new byte[] {(byte) '\0'};
/**
* The prefix name for CloudEvent attributes for use in properties of a RocketMQ message.
*/
static final String CE_PREFIX = "CE_";
static final Map<String, String> ATTRIBUTES_TO_PROPERTY_NAMES = MessageUtils.generateAttributesToHeadersMapping(CEA -> CE_PREFIX + CEA);
static final String PROPERTY_CONTENT_TYPE = "CE_contenttype";
static final String MESSAGE_PROPERTY_SPEC_VERSION = ATTRIBUTES_TO_PROPERTY_NAMES.get("specversion");
}

View File

@ -0,0 +1,98 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.rocketmq;
import io.cloudevents.CloudEventData;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.v1.CloudEventV1;
import io.cloudevents.rw.CloudEventContextWriter;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.rw.CloudEventWriter;
import java.util.HashMap;
import java.util.Map;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
/**
* The RocketmqMessageWriter class is a CloudEvents message writer for RocketMQ.
* It allows CloudEvents attributes, context attributes, and the event payload to be populated
* in a RocketMQ {@link Message} instance. This class implements the
* {@link MessageWriter} interface for creating and completing CloudEvents messages in a
* RocketMQ-compatible format.
*/
final class RocketmqMessageWriter implements MessageWriter<CloudEventWriter<Message>, Message>, CloudEventWriter<Message> {
private final Map<String, String> messageProperties;
private final MessageBuilder messageBuilder;
/**
* Create a RocketMQ message writer.
*
* @param topic message's topic.
*/
RocketmqMessageWriter(String topic) {
this.messageProperties = new HashMap<>();
final ClientServiceProvider provider = ClientServiceProvider.loadService();
this.messageBuilder = provider.newMessageBuilder();
messageBuilder.setTopic(topic);
}
@Override
public CloudEventContextWriter withContextAttribute(String name, String value) throws CloudEventRWException {
if (name.equals(CloudEventV1.DATACONTENTTYPE)) {
messageProperties.put(RocketmqConstants.PROPERTY_CONTENT_TYPE, value);
return this;
}
String propertyName = RocketmqConstants.ATTRIBUTES_TO_PROPERTY_NAMES.get(name);
if (propertyName == null) {
propertyName = name;
}
messageProperties.put(propertyName, value);
return this;
}
@Override
public CloudEventWriter<Message> create(SpecVersion version) throws CloudEventRWException {
messageProperties.put(RocketmqConstants.MESSAGE_PROPERTY_SPEC_VERSION, version.toString());
return this;
}
@Override
public Message setEvent(EventFormat format, byte[] value) throws CloudEventRWException {
messageProperties.put(RocketmqConstants.PROPERTY_CONTENT_TYPE, format.serializedContentType());
messageBuilder.setBody(value);
messageProperties.forEach(messageBuilder::addProperty);
return messageBuilder.build();
}
@Override
public Message end(CloudEventData data) throws CloudEventRWException {
messageBuilder.setBody(data.toBytes());
messageProperties.forEach(messageBuilder::addProperty);
return messageBuilder.build();
}
@Override
public Message end() throws CloudEventRWException {
messageBuilder.setBody(RocketmqConstants.EMPTY_BODY);
messageProperties.forEach(messageBuilder::addProperty);
return messageBuilder.build();
}
}

View File

@ -0,0 +1,233 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.rocketmq;
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.message.Encoding;
import io.cloudevents.core.message.MessageReader;
import io.cloudevents.core.mock.CSVFormat;
import io.cloudevents.core.test.Data;
import io.cloudevents.core.v03.CloudEventV03;
import io.cloudevents.core.v1.CloudEventV1;
import io.cloudevents.types.Time;
import java.util.AbstractMap;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Tests verifying the behavior of the {@code RocketmqMessageFactory}.
*/
public class RocketmqMessageFactoryTest {
private static final String PREFIX_TEMPLATE = RocketmqConstants.CE_PREFIX + "%s";
private static final String DATA_CONTENT_TYPE_NULL = null;
private static final byte[] DATA_PAYLOAD_NULL = null;
@ParameterizedTest()
@MethodSource("binaryTestArguments")
public void readBinary(final Map<String, String> props, final String contentType, final byte[] body, final CloudEvent event) {
final MessageReader reader = RocketMqMessageFactory.createReader(contentType, props, body);
assertThat(reader.getEncoding()).isEqualTo(Encoding.BINARY);
assertThat(reader.toEvent()).isEqualTo(event);
}
@ParameterizedTest()
@MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
public void readStructured(final CloudEvent event) {
final String contentType = CSVFormat.INSTANCE.serializedContentType() + "; charset=utf8";
final byte[] contentPayload = CSVFormat.INSTANCE.serialize(event);
final MessageReader reader = RocketMqMessageFactory.createReader(contentType, null, contentPayload);
assertThat(reader.getEncoding()).isEqualTo(Encoding.STRUCTURED);
assertThat(reader.toEvent()).isEqualTo(event);
}
private static Stream<Arguments> binaryTestArguments() {
return Stream.of(
// V03
Arguments.of(
properties(
property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
property(CloudEventV03.ID, Data.ID),
property(CloudEventV03.TYPE, Data.TYPE),
property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
property("ignored", "ignore")
),
DATA_CONTENT_TYPE_NULL,
DATA_PAYLOAD_NULL,
Data.V03_MIN
),
Arguments.of(
properties(
property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
property(CloudEventV03.ID, Data.ID),
property(CloudEventV03.TYPE, Data.TYPE),
property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
property(CloudEventV03.SUBJECT, Data.SUBJECT),
property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
property("ignored", "ignore")
),
Data.DATACONTENTTYPE_JSON,
Data.DATA_JSON_SERIALIZED,
Data.V03_WITH_JSON_DATA
),
Arguments.of(
properties(
property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
property(CloudEventV03.ID, Data.ID),
property(CloudEventV03.TYPE, Data.TYPE),
property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
property(CloudEventV03.SUBJECT, Data.SUBJECT),
property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
property("astring", "aaa"),
property("aboolean", "true"),
property("anumber", "10"),
property("ignored", "ignored")
),
Data.DATACONTENTTYPE_JSON,
Data.DATA_JSON_SERIALIZED,
Data.V03_WITH_JSON_DATA_WITH_EXT_STRING
),
Arguments.of(
properties(
property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
property(CloudEventV03.ID, Data.ID),
property(CloudEventV03.TYPE, Data.TYPE),
property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
property(CloudEventV03.SUBJECT, Data.SUBJECT),
property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
property("ignored", "ignored")
),
Data.DATACONTENTTYPE_XML,
Data.DATA_XML_SERIALIZED,
Data.V03_WITH_XML_DATA
),
Arguments.of(
properties(
property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
property(CloudEventV03.ID, Data.ID),
property(CloudEventV03.TYPE, Data.TYPE),
property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
property(CloudEventV03.SUBJECT, Data.SUBJECT),
property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
property("ignored", "ignored")
),
Data.DATACONTENTTYPE_TEXT,
Data.DATA_TEXT_SERIALIZED,
Data.V03_WITH_TEXT_DATA
),
// V1
Arguments.of(
properties(
property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
property(CloudEventV1.ID, Data.ID),
property(CloudEventV1.TYPE, Data.TYPE),
property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
property("ignored", "ignored")
),
DATA_CONTENT_TYPE_NULL,
DATA_PAYLOAD_NULL,
Data.V1_MIN
),
Arguments.of(
properties(
property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
property(CloudEventV1.ID, Data.ID),
property(CloudEventV1.TYPE, Data.TYPE),
property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
property(CloudEventV1.SUBJECT, Data.SUBJECT),
property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
property("ignored", "ignored")
),
Data.DATACONTENTTYPE_JSON,
Data.DATA_JSON_SERIALIZED,
Data.V1_WITH_JSON_DATA
),
Arguments.of(
properties(
property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
property(CloudEventV1.ID, Data.ID),
property(CloudEventV1.TYPE, Data.TYPE),
property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
property(CloudEventV1.SUBJECT, Data.SUBJECT),
property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
property("astring", "aaa"),
property("aboolean", "true"),
property("anumber", "10"),
property("ignored", "ignored")
),
Data.DATACONTENTTYPE_JSON,
Data.DATA_JSON_SERIALIZED,
Data.V1_WITH_JSON_DATA_WITH_EXT_STRING
),
Arguments.of(
properties(
property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
property(CloudEventV1.ID, Data.ID),
property(CloudEventV1.TYPE, Data.TYPE),
property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
property(CloudEventV1.SUBJECT, Data.SUBJECT),
property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
property("ignored", "ignored")
),
Data.DATACONTENTTYPE_XML,
Data.DATA_XML_SERIALIZED,
Data.V1_WITH_XML_DATA
),
Arguments.of(
properties(
property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
property(CloudEventV1.ID, Data.ID),
property(CloudEventV1.TYPE, Data.TYPE),
property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
property(CloudEventV1.SUBJECT, Data.SUBJECT),
property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
property("ignored", "ignored")
),
Data.DATACONTENTTYPE_TEXT,
Data.DATA_TEXT_SERIALIZED,
Data.V1_WITH_TEXT_DATA
)
);
}
private static AbstractMap.SimpleEntry<String, String> property(final String name, final String value) {
return name.equalsIgnoreCase("ignored") ?
new AbstractMap.SimpleEntry<>(name, value) :
new AbstractMap.SimpleEntry<>(String.format(PREFIX_TEMPLATE, name), value);
}
@SafeVarargs
private static Map<String, String> properties(final AbstractMap.SimpleEntry<String, String>... entries) {
return Stream.of(entries)
.collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue));
}
}

View File

@ -0,0 +1,117 @@
/*
* Copyright 2018-Present The CloudEvents Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.cloudevents.rocketmq;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.format.EventFormat;
import io.cloudevents.core.message.MessageWriter;
import io.cloudevents.core.mock.CSVFormat;
import io.cloudevents.core.v1.CloudEventV1;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import static org.assertj.core.api.Assertions.assertThat;
public class RocketmqMessageWriterTest {
/**
* Verifies that a binary CloudEvent message can be successfully represented
* as a RocketMQ message.
*/
@ParameterizedTest()
@MethodSource("io.cloudevents.core.test.Data#allEventsWithStringExtensions")
public void testWriteBinaryCloudEventToRocketmqRepresentation(final CloudEvent binaryEvent) {
String topic = "foobar";
final Message expectedMessage = translateBinaryEvent(topic, binaryEvent);
final MessageWriter<?, Message> writer = RocketMqMessageFactory.createWriter(topic);
final Message actualMessage = writer.writeBinary(binaryEvent);
assertThat(Objects.toString(actualMessage.getBody())).isEqualTo(Objects.toString(expectedMessage.getBody()));
assertThat(actualMessage.getProperties()).isEqualTo(expectedMessage.getProperties());
}
/**
* Verifies that a structured CloudEvent message (in CSV format) can be successfully represented
* as a RocketMQ message.
*/
@ParameterizedTest()
@MethodSource("io.cloudevents.core.test.Data#allEventsWithoutExtensions")
public void testWriteStructuredCloudEventToRocketmqRepresentation(final CloudEvent event) {
final EventFormat format = CSVFormat.INSTANCE;
final Message expectedMessage = translateStructured(event, format);
String topic = "foobar";
final MessageWriter<?, Message> writer = RocketMqMessageFactory.createWriter(topic);
final Message actualMessage = writer.writeStructured(event, format.serializedContentType());
assertThat(Objects.toString(actualMessage.getBody())).isEqualTo(Objects.toString(expectedMessage.getBody()));
assertThat(actualMessage.getProperties()).isEqualTo(expectedMessage.getProperties());
}
private Message translateBinaryEvent(final String topic, final CloudEvent event) {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
final MessageBuilder messageBuilder = provider.newMessageBuilder();
messageBuilder.setTopic(topic);
messageBuilder.setBody(RocketmqConstants.EMPTY_BODY);
final Map<String, String> map = new HashMap<>();
if (!event.getAttributeNames().isEmpty()) {
event.getAttributeNames().forEach(name -> {
if (name.equals(CloudEventV1.DATACONTENTTYPE) && event.getAttribute(name) != null) {
map.put(RocketmqConstants.PROPERTY_CONTENT_TYPE, event.getAttribute(name).toString());
} else {
addProperty(map, name, Objects.toString(event.getAttribute(name)), true);
}
});
}
if (!event.getExtensionNames().isEmpty()) {
event.getExtensionNames().forEach(name -> addProperty(map, name, Objects.toString(event.getExtension(name)), false));
}
map.forEach(messageBuilder::addProperty);
if (event.getData() != null) {
messageBuilder.setBody(event.getData().toBytes());
}
return messageBuilder.build();
}
private Message translateStructured(final CloudEvent event, final EventFormat format) {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
final MessageBuilder messageBuilder = provider.newMessageBuilder();
messageBuilder.setTopic("foobar");
messageBuilder.addProperty(RocketmqConstants.PROPERTY_CONTENT_TYPE, format.serializedContentType());
messageBuilder.setBody(format.serialize(event));
return messageBuilder.build();
}
private void addProperty(final Map<String, String> map, final String name, final String value, final boolean prefix) {
if (prefix) {
map.put(String.format(RocketmqConstants.CE_PREFIX + "%s", name), value);
} else {
map.put(name, value);
}
}
}