Signed-off-by: Alfusainey Jallow <alf.jallow@gmail.com>
This commit is contained in:
parent
7f65c92dec
commit
2d68c4843f
|
@ -28,6 +28,7 @@ import io.cloudevents.CloudEventData;
|
|||
import io.cloudevents.SpecVersion;
|
||||
import io.cloudevents.core.format.EventFormat;
|
||||
import io.cloudevents.core.message.MessageWriter;
|
||||
import io.cloudevents.core.v1.CloudEventV1;
|
||||
import io.cloudevents.rw.CloudEventAttributesWriter;
|
||||
import io.cloudevents.rw.CloudEventExtensionsWriter;
|
||||
import io.cloudevents.rw.CloudEventRWException;
|
||||
|
@ -53,7 +54,7 @@ public final class ProtonAmqpMessageWriter<R> implements MessageWriter<CloudEven
|
|||
|
||||
@Override
|
||||
public CloudEventAttributesWriter withAttribute(final String name, final String value) throws CloudEventRWException {
|
||||
if (name.equals("datacontenttype")) {
|
||||
if (name.equals(CloudEventV1.DATACONTENTTYPE)) {
|
||||
message.setContentType(value);
|
||||
} else {
|
||||
if (applicationProperties == null) {
|
||||
|
|
|
@ -31,10 +31,13 @@ import org.junit.jupiter.params.provider.MethodSource;
|
|||
|
||||
import io.cloudevents.CloudEvent;
|
||||
import io.cloudevents.SpecVersion;
|
||||
import io.cloudevents.amqp.impl.AmqpConstants;
|
||||
import io.cloudevents.core.message.Encoding;
|
||||
import io.cloudevents.core.message.MessageReader;
|
||||
import io.cloudevents.core.mock.CSVFormat;
|
||||
import io.cloudevents.core.test.Data;
|
||||
import io.cloudevents.core.v03.CloudEventV03;
|
||||
import io.cloudevents.core.v1.CloudEventV1;
|
||||
import io.cloudevents.types.Time;
|
||||
|
||||
/**
|
||||
|
@ -42,7 +45,7 @@ import io.cloudevents.types.Time;
|
|||
*/
|
||||
public class ProtonAmqpMessageFactoryTest {
|
||||
|
||||
private static final String PREFIX_TEMPLATE = "cloudEvents:%s";
|
||||
private static final String PREFIX_TEMPLATE = AmqpConstants.CE_PREFIX + "%s";
|
||||
private static final String DATACONTENTTYPE_NULL = null;
|
||||
private static final byte[] DATAPAYLOAD_NULL = null;
|
||||
|
||||
|
@ -72,10 +75,10 @@ public class ProtonAmqpMessageFactoryTest {
|
|||
// V03
|
||||
Arguments.of(
|
||||
properties(
|
||||
property("specversion", SpecVersion.V03.toString()),
|
||||
property("id", Data.ID),
|
||||
property("type", Data.TYPE),
|
||||
property("source", Data.SOURCE),
|
||||
property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
|
||||
property(CloudEventV03.ID, Data.ID),
|
||||
property(CloudEventV03.TYPE, Data.TYPE),
|
||||
property(CloudEventV03.SOURCE, Data.SOURCE),
|
||||
property("ignored", "ignore")
|
||||
),
|
||||
DATACONTENTTYPE_NULL,
|
||||
|
@ -84,13 +87,13 @@ public class ProtonAmqpMessageFactoryTest {
|
|||
),
|
||||
Arguments.of(
|
||||
properties(
|
||||
property("specversion", SpecVersion.V03.toString()),
|
||||
property("id", Data.ID),
|
||||
property("type", Data.TYPE),
|
||||
property("source", Data.SOURCE.toString()),
|
||||
property("schemaurl", Data.DATASCHEMA.toString()),
|
||||
property("subject", Data.SUBJECT),
|
||||
property("time", Time.writeTime(Data.TIME)),
|
||||
property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
|
||||
property(CloudEventV03.ID, Data.ID),
|
||||
property(CloudEventV03.TYPE, Data.TYPE),
|
||||
property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
|
||||
property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
|
||||
property(CloudEventV03.SUBJECT, Data.SUBJECT),
|
||||
property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
|
||||
property("ignored", "ignore")
|
||||
),
|
||||
Data.DATACONTENTTYPE_JSON,
|
||||
|
@ -99,13 +102,13 @@ public class ProtonAmqpMessageFactoryTest {
|
|||
),
|
||||
Arguments.of(
|
||||
properties(
|
||||
property("specversion", SpecVersion.V03.toString()),
|
||||
property("id", Data.ID),
|
||||
property("type", Data.TYPE),
|
||||
property("source", Data.SOURCE.toString()),
|
||||
property("schemaurl", Data.DATASCHEMA.toString()),
|
||||
property("subject", Data.SUBJECT),
|
||||
property("time", Time.writeTime(Data.TIME)),
|
||||
property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
|
||||
property(CloudEventV03.ID, Data.ID),
|
||||
property(CloudEventV03.TYPE, Data.TYPE),
|
||||
property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
|
||||
property(CloudEventV03.SCHEMAURL, Data.DATASCHEMA.toString()),
|
||||
property(CloudEventV03.SUBJECT, Data.SUBJECT),
|
||||
property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
|
||||
property("astring", "aaa"),
|
||||
property("aboolean", "true"),
|
||||
property("anumber", "10"),
|
||||
|
@ -117,12 +120,12 @@ public class ProtonAmqpMessageFactoryTest {
|
|||
),
|
||||
Arguments.of(
|
||||
properties(
|
||||
property("specversion", SpecVersion.V03.toString()),
|
||||
property("id", Data.ID),
|
||||
property("type", Data.TYPE),
|
||||
property("source", Data.SOURCE.toString()),
|
||||
property("subject", Data.SUBJECT),
|
||||
property("time", Time.writeTime(Data.TIME)),
|
||||
property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
|
||||
property(CloudEventV03.ID, Data.ID),
|
||||
property(CloudEventV03.TYPE, Data.TYPE),
|
||||
property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
|
||||
property(CloudEventV03.SUBJECT, Data.SUBJECT),
|
||||
property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
|
||||
property("ignored", "ignored")
|
||||
),
|
||||
Data.DATACONTENTTYPE_XML,
|
||||
|
@ -131,12 +134,12 @@ public class ProtonAmqpMessageFactoryTest {
|
|||
),
|
||||
Arguments.of(
|
||||
properties(
|
||||
property("specversion", SpecVersion.V03.toString()),
|
||||
property("id", Data.ID),
|
||||
property("type", Data.TYPE),
|
||||
property("source", Data.SOURCE.toString()),
|
||||
property("subject", Data.SUBJECT),
|
||||
property("time", Time.writeTime(Data.TIME)),
|
||||
property(CloudEventV03.SPECVERSION, SpecVersion.V03.toString()),
|
||||
property(CloudEventV03.ID, Data.ID),
|
||||
property(CloudEventV03.TYPE, Data.TYPE),
|
||||
property(CloudEventV03.SOURCE, Data.SOURCE.toString()),
|
||||
property(CloudEventV03.SUBJECT, Data.SUBJECT),
|
||||
property(CloudEventV03.TIME, Time.writeTime(Data.TIME)),
|
||||
property("ignored", "ignored")
|
||||
),
|
||||
Data.DATACONTENTTYPE_TEXT,
|
||||
|
@ -146,10 +149,10 @@ public class ProtonAmqpMessageFactoryTest {
|
|||
// V1
|
||||
Arguments.of(
|
||||
properties(
|
||||
property("specversion", SpecVersion.V1.toString()),
|
||||
property("id", Data.ID),
|
||||
property("type", Data.TYPE),
|
||||
property("source", Data.SOURCE.toString()),
|
||||
property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
|
||||
property(CloudEventV1.ID, Data.ID),
|
||||
property(CloudEventV1.TYPE, Data.TYPE),
|
||||
property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
|
||||
property("ignored", "ignored")
|
||||
),
|
||||
DATACONTENTTYPE_NULL,
|
||||
|
@ -158,13 +161,13 @@ public class ProtonAmqpMessageFactoryTest {
|
|||
),
|
||||
Arguments.of(
|
||||
properties(
|
||||
property("specversion", SpecVersion.V1.toString()),
|
||||
property("id", Data.ID),
|
||||
property("type", Data.TYPE),
|
||||
property("source", Data.SOURCE.toString()),
|
||||
property("dataschema", Data.DATASCHEMA.toString()),
|
||||
property("subject", Data.SUBJECT),
|
||||
property("time", Time.writeTime(Data.TIME)),
|
||||
property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
|
||||
property(CloudEventV1.ID, Data.ID),
|
||||
property(CloudEventV1.TYPE, Data.TYPE),
|
||||
property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
|
||||
property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
|
||||
property(CloudEventV1.SUBJECT, Data.SUBJECT),
|
||||
property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
|
||||
property("ignored", "ignored")
|
||||
),
|
||||
Data.DATACONTENTTYPE_JSON,
|
||||
|
@ -173,13 +176,13 @@ public class ProtonAmqpMessageFactoryTest {
|
|||
),
|
||||
Arguments.of(
|
||||
properties(
|
||||
property("specversion", SpecVersion.V1.toString()),
|
||||
property("id", Data.ID),
|
||||
property("type", Data.TYPE),
|
||||
property("source", Data.SOURCE.toString()),
|
||||
property("dataschema", Data.DATASCHEMA.toString()),
|
||||
property("subject", Data.SUBJECT),
|
||||
property("time", Time.writeTime(Data.TIME)),
|
||||
property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
|
||||
property(CloudEventV1.ID, Data.ID),
|
||||
property(CloudEventV1.TYPE, Data.TYPE),
|
||||
property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
|
||||
property(CloudEventV1.DATASCHEMA, Data.DATASCHEMA.toString()),
|
||||
property(CloudEventV1.SUBJECT, Data.SUBJECT),
|
||||
property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
|
||||
property("astring", "aaa"),
|
||||
property("aboolean", "true"),
|
||||
property("anumber", "10"),
|
||||
|
@ -191,12 +194,12 @@ public class ProtonAmqpMessageFactoryTest {
|
|||
),
|
||||
Arguments.of(
|
||||
properties(
|
||||
property("specversion", SpecVersion.V1.toString()),
|
||||
property("id", Data.ID),
|
||||
property("type", Data.TYPE),
|
||||
property("source", Data.SOURCE.toString()),
|
||||
property("subject", Data.SUBJECT),
|
||||
property("time", Time.writeTime(Data.TIME)),
|
||||
property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
|
||||
property(CloudEventV1.ID, Data.ID),
|
||||
property(CloudEventV1.TYPE, Data.TYPE),
|
||||
property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
|
||||
property(CloudEventV1.SUBJECT, Data.SUBJECT),
|
||||
property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
|
||||
property("ignored", "ignored")
|
||||
),
|
||||
Data.DATACONTENTTYPE_XML,
|
||||
|
@ -205,12 +208,12 @@ public class ProtonAmqpMessageFactoryTest {
|
|||
),
|
||||
Arguments.of(
|
||||
properties(
|
||||
property("specversion", SpecVersion.V1.toString()),
|
||||
property("id", Data.ID),
|
||||
property("type", Data.TYPE),
|
||||
property("source", Data.SOURCE.toString()),
|
||||
property("subject", Data.SUBJECT),
|
||||
property("time", Time.writeTime(Data.TIME)),
|
||||
property(CloudEventV1.SPECVERSION, SpecVersion.V1.toString()),
|
||||
property(CloudEventV1.ID, Data.ID),
|
||||
property(CloudEventV1.TYPE, Data.TYPE),
|
||||
property(CloudEventV1.SOURCE, Data.SOURCE.toString()),
|
||||
property(CloudEventV1.SUBJECT, Data.SUBJECT),
|
||||
property(CloudEventV1.TIME, Time.writeTime(Data.TIME)),
|
||||
property("ignored", "ignored")
|
||||
),
|
||||
Data.DATACONTENTTYPE_TEXT,
|
||||
|
|
|
@ -19,6 +19,7 @@ package io.cloudevents.core.message.impl;
|
|||
|
||||
import io.cloudevents.CloudEventData;
|
||||
import io.cloudevents.SpecVersion;
|
||||
import io.cloudevents.core.v1.CloudEventV1;
|
||||
import io.cloudevents.rw.CloudEventDataMapper;
|
||||
import io.cloudevents.rw.CloudEventRWException;
|
||||
import io.cloudevents.rw.CloudEventWriter;
|
||||
|
@ -56,10 +57,10 @@ public abstract class BaseGenericBinaryMessageReaderImpl<HK, HV> extends BaseBin
|
|||
// in order to complete the visit in one loop
|
||||
this.forEachHeader((key, value) -> {
|
||||
if (isContentTypeHeader(key)) {
|
||||
visitor.withAttribute("datacontenttype", toCloudEventsValue(value));
|
||||
visitor.withAttribute(CloudEventV1.DATACONTENTTYPE, toCloudEventsValue(value));
|
||||
} else if (isCloudEventsHeader(key)) {
|
||||
String name = toCloudEventsKey(key);
|
||||
if (name.equals("specversion")) {
|
||||
if (name.equals(CloudEventV1.SPECVERSION)) {
|
||||
return;
|
||||
}
|
||||
if (this.version.getAllAttributes().contains(name)) {
|
||||
|
|
|
@ -108,13 +108,13 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
|
|||
@Override
|
||||
public CloudEvent build() {
|
||||
if (id == null) {
|
||||
throw createMissingAttributeException("id");
|
||||
throw createMissingAttributeException(CloudEventV1.ID);
|
||||
}
|
||||
if (source == null) {
|
||||
throw createMissingAttributeException("source");
|
||||
throw createMissingAttributeException(CloudEventV1.SOURCE);
|
||||
}
|
||||
if (type == null) {
|
||||
throw createMissingAttributeException("type");
|
||||
throw createMissingAttributeException(CloudEventV1.TYPE);
|
||||
}
|
||||
|
||||
return new CloudEventV1(id, source, type, datacontenttype, dataschema, subject, time, this.data, this.extensions);
|
||||
|
@ -140,34 +140,34 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
|
|||
@Override
|
||||
public CloudEventBuilder withAttribute(String name, String value) throws CloudEventRWException {
|
||||
switch (name) {
|
||||
case "id":
|
||||
case CloudEventV1.ID:
|
||||
withId(value);
|
||||
return this;
|
||||
case "source":
|
||||
case CloudEventV1.SOURCE:
|
||||
try {
|
||||
withSource(new URI(value));
|
||||
} catch (URISyntaxException e) {
|
||||
throw CloudEventRWException.newInvalidAttributeValue("source", value, e);
|
||||
throw CloudEventRWException.newInvalidAttributeValue(CloudEventV1.SOURCE, value, e);
|
||||
}
|
||||
return this;
|
||||
case "type":
|
||||
case CloudEventV1.TYPE:
|
||||
withType(value);
|
||||
return this;
|
||||
case "datacontenttype":
|
||||
case CloudEventV1.DATACONTENTTYPE:
|
||||
withDataContentType(value);
|
||||
return this;
|
||||
case "dataschema":
|
||||
case CloudEventV1.DATASCHEMA:
|
||||
try {
|
||||
withDataSchema(new URI(value));
|
||||
} catch (URISyntaxException e) {
|
||||
throw CloudEventRWException.newInvalidAttributeValue("dataschema", value, e);
|
||||
throw CloudEventRWException.newInvalidAttributeValue(CloudEventV1.DATASCHEMA, value, e);
|
||||
}
|
||||
return this;
|
||||
case "subject":
|
||||
case CloudEventV1.SUBJECT:
|
||||
withSubject(value);
|
||||
return this;
|
||||
case "time":
|
||||
withTime(Time.parseTime("time", value));
|
||||
case CloudEventV1.TIME:
|
||||
withTime(Time.parseTime(CloudEventV1.TIME, value));
|
||||
return this;
|
||||
}
|
||||
throw CloudEventRWException.newInvalidAttributeName(name);
|
||||
|
@ -176,10 +176,10 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
|
|||
@Override
|
||||
public CloudEventBuilder withAttribute(String name, URI value) throws CloudEventRWException {
|
||||
switch (name) {
|
||||
case "source":
|
||||
case CloudEventV1.SOURCE:
|
||||
withSource(value);
|
||||
return this;
|
||||
case "dataschema":
|
||||
case CloudEventV1.DATASCHEMA:
|
||||
withDataSchema(value);
|
||||
return this;
|
||||
}
|
||||
|
@ -188,7 +188,7 @@ public final class CloudEventBuilder extends BaseCloudEventBuilder<CloudEventBui
|
|||
|
||||
@Override
|
||||
public CloudEventBuilder withAttribute(String name, OffsetDateTime value) throws CloudEventRWException {
|
||||
if ("time".equals(name)) {
|
||||
if (CloudEventV1.TIME.equals(name)) {
|
||||
withTime(value);
|
||||
return this;
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
package io.cloudevents.http.impl;
|
||||
|
||||
import io.cloudevents.core.message.impl.MessageUtils;
|
||||
import io.cloudevents.core.v1.CloudEventV1;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
@ -30,12 +31,12 @@ public final class CloudEventsHeaders {
|
|||
public static final String CE_PREFIX = "ce-";
|
||||
|
||||
public static final Map<String, String> ATTRIBUTES_TO_HEADERS = Collections.unmodifiableMap(MessageUtils.generateAttributesToHeadersMapping(v -> {
|
||||
if (v.equals("datacontenttype")) {
|
||||
if (v.equals(CloudEventV1.DATACONTENTTYPE)) {
|
||||
return CONTENT_TYPE;
|
||||
}
|
||||
return CE_PREFIX + v;
|
||||
}));
|
||||
|
||||
public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get("specversion");
|
||||
public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get(CloudEventV1.SPECVERSION);
|
||||
|
||||
}
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
package io.cloudevents.kafka.impl;
|
||||
|
||||
import io.cloudevents.core.message.impl.MessageUtils;
|
||||
import io.cloudevents.core.v1.CloudEventV1;
|
||||
|
||||
import org.apache.kafka.common.header.Header;
|
||||
import org.apache.kafka.common.header.Headers;
|
||||
|
||||
|
@ -37,13 +39,13 @@ public class KafkaHeaders {
|
|||
|
||||
protected static final Map<String, String> ATTRIBUTES_TO_HEADERS = MessageUtils.generateAttributesToHeadersMapping(
|
||||
v -> {
|
||||
if (v.equals("datacontenttype")) {
|
||||
if (v.equals(CloudEventV1.DATACONTENTTYPE)) {
|
||||
return CONTENT_TYPE;
|
||||
}
|
||||
return CE_PREFIX + v;
|
||||
});
|
||||
|
||||
public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get("specversion");
|
||||
public static final String SPEC_VERSION = ATTRIBUTES_TO_HEADERS.get(CloudEventV1.SPECVERSION);
|
||||
|
||||
public static String getParsedKafkaHeader(Headers headers, String key) {
|
||||
Header h = headers.lastHeader(key);
|
||||
|
|
|
@ -18,6 +18,8 @@
|
|||
package io.cloudevents.kafka.impl;
|
||||
|
||||
import io.cloudevents.SpecVersion;
|
||||
import io.cloudevents.core.v1.CloudEventV1;
|
||||
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.common.header.internals.RecordHeaders;
|
||||
|
||||
|
@ -44,7 +46,7 @@ public final class KafkaProducerMessageWriterImpl<K>
|
|||
|
||||
@Override
|
||||
public KafkaProducerMessageWriterImpl<K> create(SpecVersion version) {
|
||||
this.withAttribute("specversion", version.toString());
|
||||
this.withAttribute(CloudEventV1.SPECVERSION, version.toString());
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue