Merge pull request #20 from matzew/0.2_start
WIP: Moving towards Spec 0.2
This commit is contained in:
commit
9eac0fe83a
|
|
@ -17,36 +17,24 @@ package io.cloudevents;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.time.ZonedDateTime;
|
import java.time.ZonedDateTime;
|
||||||
import java.util.Map;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An abstract event envelope, representing the 0.1 version of the <a href="https://github.com/cloudevents/spec/blob/master/spec.md">CNCF CloudEvent spec</a>.
|
* An abstract event envelope, representing the 0.2 version of the <a href="https://github.com/cloudevents/spec/blob/master/spec.md">CNCF CloudEvent spec</a>.
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public interface CloudEvent<T> {
|
public interface CloudEvent<T> {
|
||||||
|
|
||||||
// required
|
|
||||||
String EVENT_TYPE_KEY = "ce-eventType";
|
|
||||||
String CLOUD_EVENTS_VERSION_KEY = "ce-cloudEventsVersion";
|
|
||||||
String SOURCE_KEY = "ce-source";
|
|
||||||
String EVENT_ID_KEY = "ce-eventID";
|
|
||||||
|
|
||||||
// none-required
|
|
||||||
String EVENT_TYPE_VERSION_KEY = "ce-eventTypeVersion";
|
|
||||||
String EVENT_TIME_KEY = "ce-eventTime";
|
|
||||||
String SCHEMA_URL_KEY = "ce-schemaURL";
|
|
||||||
String HEADER_PREFIX = "ce-x-";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Type of occurrence which has happened. Often this property is used for routing, observability, policy enforcement, etc.
|
* Type of occurrence which has happened. Often this property is used for routing, observability, policy enforcement, etc.
|
||||||
*/
|
*/
|
||||||
String getEventType();
|
String getType();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The version of the CloudEvents specification which the event uses. This enables the interpretation of the context.
|
* The version of the CloudEvents specification which the event uses. This enables the interpretation of the context.
|
||||||
*/
|
*/
|
||||||
String getCloudEventsVersion();
|
String getSepcVersion();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This describes the event producer. Often this will include information such as the type of the event source, the organization publishing the event, and some unique identifiers.
|
* This describes the event producer. Often this will include information such as the type of the event source, the organization publishing the event, and some unique identifiers.
|
||||||
|
|
@ -57,16 +45,12 @@ public interface CloudEvent<T> {
|
||||||
/**
|
/**
|
||||||
* ID of the event. The semantics of this string are explicitly undefined to ease the implementation of producers. Enables deduplication.
|
* ID of the event. The semantics of this string are explicitly undefined to ease the implementation of producers. Enables deduplication.
|
||||||
*/
|
*/
|
||||||
String getEventID();
|
String getId();
|
||||||
|
|
||||||
/**
|
|
||||||
* The version of the eventType. This enables the interpretation of data by eventual consumers, requires the consumer to be knowledgeable about the producer.
|
|
||||||
*/
|
|
||||||
Optional<String> getEventTypeVersion();
|
|
||||||
/**
|
/**
|
||||||
* Timestamp of when the event happened.
|
* Timestamp of when the event happened.
|
||||||
*/
|
*/
|
||||||
Optional<ZonedDateTime> getEventTime();
|
Optional<ZonedDateTime> getTime();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A link to the schema that the data attribute adheres to.
|
* A link to the schema that the data attribute adheres to.
|
||||||
|
|
@ -78,13 +62,13 @@ public interface CloudEvent<T> {
|
||||||
*/
|
*/
|
||||||
Optional<String> getContentType();
|
Optional<String> getContentType();
|
||||||
|
|
||||||
/**
|
|
||||||
* This is for additional metadata and this does not have a mandated structure. This enables a place for custom fields a producer or middleware might want to include and provides a place to test metadata before adding them to the CloudEvents specification.
|
|
||||||
*/
|
|
||||||
Optional<Map> getExtensions();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The event payload. The payload depends on the eventType, schemaURL and eventTypeVersion, the payload is encoded into a media format which is specified by the contentType attribute (e.g. application/json).
|
* The event payload. The payload depends on the eventType, schemaURL and eventTypeVersion, the payload is encoded into a media format which is specified by the contentType attribute (e.g. application/json).
|
||||||
*/
|
*/
|
||||||
Optional<T> getData();
|
Optional<T> getData();
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
Optional<List<Extension>> getExtensions();
|
||||||
}
|
}
|
||||||
|
|
@ -19,39 +19,40 @@ import io.cloudevents.impl.DefaultCloudEventImpl;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.time.ZonedDateTime;
|
import java.time.ZonedDateTime;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Builder class to create a Java Object representing a CloudEvent implementation
|
* Builder class to create a Java Object representing a CloudEvent implementation
|
||||||
* @param <T> type of the data field
|
* @param <T> type of the data field
|
||||||
*/
|
*/
|
||||||
public class CloudEventBuilder<T> {
|
public class CloudEventBuilder<T> {
|
||||||
|
|
||||||
private final String cloudEventsVersion = "0.1";
|
private String specversion;
|
||||||
private Map<?,?> extensions = new LinkedHashMap();
|
|
||||||
private String contentType;
|
private String contentType;
|
||||||
private String eventType;
|
private String type;
|
||||||
private URI source;
|
private URI source;
|
||||||
private String eventID;
|
private String id;
|
||||||
private String eventTypeVersion;
|
private ZonedDateTime time;
|
||||||
private ZonedDateTime eventTime;
|
|
||||||
private URI schemaURL;
|
private URI schemaURL;
|
||||||
private T data;
|
private T data;
|
||||||
|
private final List<Extension> extensions = new ArrayList<>();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Type of occurrence which has happened. Often this property is used for routing, observability, policy enforcement, etc.
|
* The version of the CloudEvents specification which the event uses.
|
||||||
*/
|
*/
|
||||||
public CloudEventBuilder<T> eventType(final String eventType) {
|
public CloudEventBuilder<T> specVersion(final String specVersion) {
|
||||||
this.eventType = eventType;
|
this.specversion = specVersion;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The version of the eventType. This enables the interpretation of data by eventual consumers, requires the consumer to be knowledgeable about the producer.
|
* Type of occurrence which has happened. Often this property is used for routing, observability, policy enforcement, etc.
|
||||||
*/
|
*/
|
||||||
public CloudEventBuilder<T> eventTypeVersion(final String eventTypeVersion) {
|
public CloudEventBuilder<T> type(final String type) {
|
||||||
this.eventTypeVersion = eventTypeVersion;
|
this.type = type;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -68,16 +69,16 @@ public class CloudEventBuilder<T> {
|
||||||
/**
|
/**
|
||||||
* ID of the event. The semantics of this string are explicitly undefined to ease the implementation of producers. Enables deduplication.
|
* ID of the event. The semantics of this string are explicitly undefined to ease the implementation of producers. Enables deduplication.
|
||||||
*/
|
*/
|
||||||
public CloudEventBuilder<T> eventID(final String eventID) {
|
public CloudEventBuilder<T> id(final String id) {
|
||||||
this.eventID = eventID;
|
this.id = id;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Timestamp of when the event happened.
|
* Timestamp of when the event happened.
|
||||||
*/
|
*/
|
||||||
public CloudEventBuilder<T> eventTime(final ZonedDateTime eventTime) {
|
public CloudEventBuilder<T> time(final ZonedDateTime time) {
|
||||||
this.eventTime = eventTime;
|
this.time = time;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -98,32 +99,32 @@ public class CloudEventBuilder<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is for additional metadata and this does not have a mandated structure. This enables a place for custom
|
* The event payload. The payload depends on the type and schemaURL, the payload is encoded into a media format which is specified by the contenttype attribute (e.g. application/json).
|
||||||
* fields a producer or middleware might want to include and provides a place to test metadata before adding them
|
|
||||||
* to the CloudEvents specification.
|
|
||||||
*/
|
|
||||||
public CloudEventBuilder<T> extensions(final Map extensions) {
|
|
||||||
this.extensions = extensions;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The event payload. The payload depends on the eventType, schemaURL and eventTypeVersion, the payload is encoded into a media format which is specified by the contentType attribute (e.g. application/json).
|
|
||||||
*/
|
*/
|
||||||
public CloudEventBuilder<T> data(final T data) {
|
public CloudEventBuilder<T> data(final T data) {
|
||||||
this.data = data;
|
this.data = data;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public CloudEventBuilder<T> extension(final Extension extension) {
|
||||||
|
this.extensions.add(extension);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a new {@link CloudEvent} with the previously-set configuration.
|
* Constructs a new {@link CloudEvent} with the previously-set configuration.
|
||||||
*/
|
*/
|
||||||
public CloudEvent<T> build() {
|
public CloudEvent<T> build() {
|
||||||
|
|
||||||
if (eventType == null || cloudEventsVersion == null || source == null || eventID == null) {
|
// forcing latest (default) version
|
||||||
|
if (specversion == null) {
|
||||||
|
specversion = SpecVersion.DEFAULT.toString();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (type == null || source == null || id == null) {
|
||||||
throw new IllegalArgumentException("please provide all required fields");
|
throw new IllegalArgumentException("please provide all required fields");
|
||||||
}
|
}
|
||||||
|
|
||||||
return new DefaultCloudEventImpl<T>(eventType, cloudEventsVersion, source, eventID, eventTypeVersion, eventTime, schemaURL, contentType, extensions, data);
|
return new DefaultCloudEventImpl<T>(type, specversion, source, id, time, schemaURL, contentType, data, extensions);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,19 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2018 The CloudEvents Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package io.cloudevents;
|
||||||
|
|
||||||
|
public interface Extension {
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,68 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2018 The CloudEvents Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package io.cloudevents;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public enum SpecVersion {
|
||||||
|
|
||||||
|
V_01("0.1"),
|
||||||
|
V_02("0.2"),
|
||||||
|
DEFAULT(V_02.toString());
|
||||||
|
|
||||||
|
private final String version;
|
||||||
|
|
||||||
|
SpecVersion(final String version) {
|
||||||
|
this.version = version;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return version;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String version() {
|
||||||
|
return version;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static SpecVersion fromVersion(final String version) {
|
||||||
|
if (version == null)
|
||||||
|
return null;
|
||||||
|
|
||||||
|
final SpecVersion specVersion= VERSION_TO_SPEC.get(version);
|
||||||
|
|
||||||
|
if (specVersion == null)
|
||||||
|
throw new IllegalArgumentException();
|
||||||
|
|
||||||
|
return specVersion;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final Map<String, SpecVersion> VERSION_TO_SPEC =
|
||||||
|
new HashMap<>();
|
||||||
|
|
||||||
|
static
|
||||||
|
{
|
||||||
|
SpecVersion[] instances = SpecVersion.class.getEnumConstants();
|
||||||
|
|
||||||
|
for (int i = 0; i < instances.length; i++)
|
||||||
|
{
|
||||||
|
VERSION_TO_SPEC.put(instances[i].toString(), instances[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,33 @@
|
||||||
|
package io.cloudevents.extensions;
|
||||||
|
|
||||||
|
import io.cloudevents.Extension;
|
||||||
|
|
||||||
|
public class DistributedTracingExtension implements Extension {
|
||||||
|
|
||||||
|
private String traceparent;
|
||||||
|
private String tracestate;
|
||||||
|
|
||||||
|
public String getTraceparent() {
|
||||||
|
return traceparent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTraceparent(String traceparent) {
|
||||||
|
this.traceparent = traceparent;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTracestate() {
|
||||||
|
return tracestate;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setTracestate(String tracestate) {
|
||||||
|
this.tracestate = tracestate;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "DistributedTracingExtension{" +
|
||||||
|
"traceparent='" + traceparent + '\'' +
|
||||||
|
", tracestate='" + tracestate + '\'' +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,46 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2018 The CloudEvents Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package io.cloudevents.http;
|
||||||
|
|
||||||
|
import io.cloudevents.SpecVersion;
|
||||||
|
|
||||||
|
import static io.cloudevents.SpecVersion.V_01;
|
||||||
|
|
||||||
|
public interface HttpTransportAttributes {
|
||||||
|
|
||||||
|
// required attrs
|
||||||
|
String typeKey();
|
||||||
|
String specVersionKey();
|
||||||
|
String sourceKey();
|
||||||
|
String idKey();
|
||||||
|
|
||||||
|
// none-required attrs
|
||||||
|
String timeKey();
|
||||||
|
String schemaUrlKey();
|
||||||
|
|
||||||
|
static HttpTransportAttributes getHttpAttributesForSpec(final SpecVersion specVersion) {
|
||||||
|
|
||||||
|
switch (specVersion) {
|
||||||
|
|
||||||
|
case V_01: return new V01HttpTransportMappers();
|
||||||
|
case V_02:
|
||||||
|
case DEFAULT: return new V02HttpTransportMappers();
|
||||||
|
}
|
||||||
|
|
||||||
|
// you should not be here!
|
||||||
|
throw new IllegalArgumentException("Could not find proper version");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,51 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2018 The CloudEvents Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package io.cloudevents.http;
|
||||||
|
|
||||||
|
public class V01HttpTransportMappers implements HttpTransportAttributes {
|
||||||
|
|
||||||
|
public static final String SPEC_VERSION_KEY = "ce-cloudEventsVersion";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String typeKey() {
|
||||||
|
return "ce-eventType";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String specVersionKey() {
|
||||||
|
return SPEC_VERSION_KEY;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String sourceKey() {
|
||||||
|
return "ce-source";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String idKey() {
|
||||||
|
return "ce-eventID";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String timeKey() {
|
||||||
|
return "ce-eventTime";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String schemaUrlKey() {
|
||||||
|
return "ce-schemaURL";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,47 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2018 The CloudEvents Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package io.cloudevents.http;
|
||||||
|
|
||||||
|
public class V02HttpTransportMappers extends V01HttpTransportMappers {
|
||||||
|
|
||||||
|
public static final String SPEC_VERSION_KEY = "ce-specversion";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String typeKey() {
|
||||||
|
return "ce-type";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String specVersionKey() {
|
||||||
|
return SPEC_VERSION_KEY;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String idKey() {
|
||||||
|
return "ce-id";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String timeKey() {
|
||||||
|
return "ce-time";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String schemaUrlKey() {
|
||||||
|
return "ce-schemaurl";
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -15,12 +15,17 @@
|
||||||
*/
|
*/
|
||||||
package io.cloudevents.impl;
|
package io.cloudevents.impl;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonAlias;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||||
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
|
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
|
||||||
import io.cloudevents.CloudEvent;
|
import io.cloudevents.CloudEvent;
|
||||||
|
import io.cloudevents.Extension;
|
||||||
|
import io.cloudevents.SpecVersion;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.time.ZonedDateTime;
|
import java.time.ZonedDateTime;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
|
|
@ -29,32 +34,31 @@ import java.util.Optional;
|
||||||
*
|
*
|
||||||
* @param <T> generic type of the underlying data field.
|
* @param <T> generic type of the underlying data field.
|
||||||
*/
|
*/
|
||||||
|
@JsonIgnoreProperties(value = { "eventTypeVersion", "extensions" }) // was removed from 0.1
|
||||||
public class DefaultCloudEventImpl<T> implements CloudEvent<T>, Serializable {
|
public class DefaultCloudEventImpl<T> implements CloudEvent<T>, Serializable {
|
||||||
|
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 2L;
|
||||||
|
|
||||||
private String cloudEventsVersion = "0.1";
|
private String specversion;
|
||||||
private Map extensions = null;
|
private String type = null;
|
||||||
private String eventType = null;
|
|
||||||
private URI source = null;
|
private URI source = null;
|
||||||
private String eventID = null;
|
private String id = null;
|
||||||
private String eventTypeVersion = null;
|
private ZonedDateTime time = null;
|
||||||
private ZonedDateTime eventTime = null;
|
|
||||||
private URI schemaURL = null;
|
private URI schemaURL = null;
|
||||||
private String contentType = null;
|
private String contentType = null;
|
||||||
private T data = null;
|
private T data = null;
|
||||||
|
private List<Extension> extensions = null;
|
||||||
|
|
||||||
public DefaultCloudEventImpl(final String eventType, final String cloudEventsVersion, final URI source, final String eventID, final String eventTypeVersion, final ZonedDateTime eventTime, final URI schemaURL, final String contentType, final Map extensions, final T data) {
|
public DefaultCloudEventImpl(final String type, final String specversion, final URI source, final String id, final ZonedDateTime time, final URI schemaURL, final String contentType, final T data, final List<Extension> extensions) {
|
||||||
this.cloudEventsVersion = cloudEventsVersion;
|
this.specversion = specversion;
|
||||||
this.extensions = extensions;
|
this.type = type;
|
||||||
this.eventType = eventType;
|
|
||||||
this.source = source;
|
this.source = source;
|
||||||
this.eventID = eventID;
|
this.id = id;
|
||||||
this.eventTypeVersion = eventTypeVersion;
|
this.time = time;
|
||||||
this.eventTime = eventTime;
|
|
||||||
this.schemaURL = schemaURL;
|
this.schemaURL = schemaURL;
|
||||||
this.contentType = contentType;
|
this.contentType = contentType;
|
||||||
this.data = data;
|
this.data = data;
|
||||||
|
this.extensions = extensions;
|
||||||
}
|
}
|
||||||
|
|
||||||
DefaultCloudEventImpl() {
|
DefaultCloudEventImpl() {
|
||||||
|
|
@ -62,18 +66,13 @@ public class DefaultCloudEventImpl<T> implements CloudEvent<T>, Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getCloudEventsVersion() {
|
public String getSepcVersion() {
|
||||||
return cloudEventsVersion;
|
return specversion;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<Map> getExtensions() {
|
public String getType() {
|
||||||
return Optional.ofNullable(extensions);
|
return type;
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getEventType() {
|
|
||||||
return eventType;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -82,18 +81,13 @@ public class DefaultCloudEventImpl<T> implements CloudEvent<T>, Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getEventID() {
|
public String getId() {
|
||||||
return eventID;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<String> getEventTypeVersion() {
|
public Optional<ZonedDateTime> getTime() {
|
||||||
return Optional.ofNullable(eventTypeVersion);
|
return Optional.ofNullable(time);
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Optional<ZonedDateTime> getEventTime() {
|
|
||||||
return Optional.ofNullable(eventTime);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
@ -111,36 +105,36 @@ public class DefaultCloudEventImpl<T> implements CloudEvent<T>, Serializable {
|
||||||
return Optional.ofNullable(data);
|
return Optional.ofNullable(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<List<Extension>> getExtensions() {
|
||||||
|
return Optional.ofNullable(extensions);
|
||||||
|
}
|
||||||
|
|
||||||
// protected setters, used for (JSON) deserialization
|
// protected setters, used for (JSON) deserialization
|
||||||
|
|
||||||
void setCloudEventsVersion(String cloudEventsVersion) {
|
@JsonAlias({"specversion", "cloudEventsVersion"})
|
||||||
this.cloudEventsVersion = cloudEventsVersion;
|
void setSpecversion(String specversion) {
|
||||||
|
this.specversion = specversion;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setExtensions(Map extensions) {
|
@JsonAlias({"type", "eventType"})
|
||||||
this.extensions = extensions;
|
void setType(String type) {
|
||||||
}
|
this.type = type;
|
||||||
|
|
||||||
void setEventType(String eventType) {
|
|
||||||
this.eventType = eventType;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void setSource(URI source) {
|
void setSource(URI source) {
|
||||||
this.source = source;
|
this.source = source;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setEventID(String eventID) {
|
@JsonAlias({"id", "eventID"})
|
||||||
this.eventID = eventID;
|
void setId(String id) {
|
||||||
}
|
this.id = id;
|
||||||
|
|
||||||
void setEventTypeVersion(String eventTypeVersion) {
|
|
||||||
this.eventTypeVersion = eventTypeVersion;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonDeserialize(using = ZonedDateTimeDeserializer.class)
|
@JsonDeserialize(using = ZonedDateTimeDeserializer.class)
|
||||||
void setEventTime(ZonedDateTime eventTime) {
|
@JsonAlias({"time", "eventTime"})
|
||||||
this.eventTime = eventTime;
|
void setTime(ZonedDateTime time) {
|
||||||
|
this.time = time;
|
||||||
}
|
}
|
||||||
|
|
||||||
void setSchemaURL(URI schemaURL) {
|
void setSchemaURL(URI schemaURL) {
|
||||||
|
|
@ -158,13 +152,11 @@ public class DefaultCloudEventImpl<T> implements CloudEvent<T>, Serializable {
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "DefaultCloudEventImpl{" +
|
return "DefaultCloudEventImpl{" +
|
||||||
"cloudEventsVersion='" + cloudEventsVersion + '\'' +
|
"specversion='" + specversion + '\'' +
|
||||||
", extensions=" + extensions +
|
", type='" + type + '\'' +
|
||||||
", eventType='" + eventType + '\'' +
|
|
||||||
", source=" + source +
|
", source=" + source +
|
||||||
", eventID='" + eventID + '\'' +
|
", id='" + id + '\'' +
|
||||||
", eventTypeVersion='" + eventTypeVersion + '\'' +
|
", time=" + time +
|
||||||
", eventTime=" + eventTime +
|
|
||||||
", schemaURL=" + schemaURL +
|
", schemaURL=" + schemaURL +
|
||||||
", contentType='" + contentType + '\'' +
|
", contentType='" + contentType + '\'' +
|
||||||
", data=" + data +
|
", data=" + data +
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@
|
||||||
*/
|
*/
|
||||||
package io.cloudevents;
|
package io.cloudevents;
|
||||||
|
|
||||||
|
import io.cloudevents.extensions.DistributedTracingExtension;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
|
@ -34,26 +35,21 @@ public class CloudEventBuilderTest {
|
||||||
final Map<String, String> keyValueStore = new HashMap<>();
|
final Map<String, String> keyValueStore = new HashMap<>();
|
||||||
keyValueStore.put("key1", "value1");
|
keyValueStore.put("key1", "value1");
|
||||||
keyValueStore.put("key2", "value2");
|
keyValueStore.put("key2", "value2");
|
||||||
final String eventId = UUID.randomUUID().toString();
|
final String id = UUID.randomUUID().toString();
|
||||||
final URI src = URI.create("/trigger");
|
final URI src = URI.create("/trigger");
|
||||||
final String eventType = "My.Cloud.Event.Type";
|
final String type = "My.Cloud.Event.Type";
|
||||||
final String eventTypeVersion = "2.0";
|
|
||||||
final ZonedDateTime eventTime = ZonedDateTime.now();
|
final ZonedDateTime eventTime = ZonedDateTime.now();
|
||||||
final String contentType = "application/json";
|
final String contentType = "application/json";
|
||||||
final URI schemaUri = URI.create("http://cloudevents.io/schema");
|
final URI schemaUri = URI.create("http://cloudevents.io/schema");
|
||||||
final Map<String, String> extensionData = new HashMap<>();
|
|
||||||
extensionData.put("foo", "bar");
|
|
||||||
|
|
||||||
// when
|
// when
|
||||||
final CloudEvent<Map<String, String>> simpleKeyValueEvent = new CloudEventBuilder()
|
final CloudEvent<Map<String, String>> simpleKeyValueEvent = new CloudEventBuilder()
|
||||||
.data(keyValueStore)
|
.data(keyValueStore)
|
||||||
.contentType(contentType)
|
.contentType(contentType)
|
||||||
.eventType(eventType)
|
.type(type)
|
||||||
.schemaURL(schemaUri)
|
.schemaURL(schemaUri)
|
||||||
.eventTypeVersion(eventTypeVersion)
|
.time(eventTime)
|
||||||
.eventTime(eventTime)
|
.id(id)
|
||||||
.extensions(extensionData)
|
|
||||||
.eventID(eventId)
|
|
||||||
.source(src)
|
.source(src)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
|
@ -65,36 +61,139 @@ public class CloudEventBuilderTest {
|
||||||
});
|
});
|
||||||
|
|
||||||
assertThat(simpleKeyValueEvent.getContentType().get()).isEqualTo(contentType);
|
assertThat(simpleKeyValueEvent.getContentType().get()).isEqualTo(contentType);
|
||||||
assertThat(simpleKeyValueEvent.getEventTime().get()).isEqualTo(eventTime);
|
assertThat(simpleKeyValueEvent.getTime().get()).isEqualTo(eventTime);
|
||||||
assertThat(simpleKeyValueEvent.getEventID()).isEqualTo(eventId);
|
assertThat(simpleKeyValueEvent.getId()).isEqualTo(id);
|
||||||
assertThat(simpleKeyValueEvent.getSchemaURL().get()).isEqualTo(schemaUri);
|
assertThat(simpleKeyValueEvent.getSchemaURL().get()).isEqualTo(schemaUri);
|
||||||
assertThat(simpleKeyValueEvent.getEventType()).isEqualTo(eventType);
|
assertThat(simpleKeyValueEvent.getType()).isEqualTo(type);
|
||||||
assertThat(simpleKeyValueEvent.getSource()).isEqualTo(src);
|
assertThat(simpleKeyValueEvent.getSource()).isEqualTo(src);
|
||||||
assertThat(simpleKeyValueEvent.getExtensions().get()).isEqualTo(extensionData);
|
assertThat(simpleKeyValueEvent.getSepcVersion()).isEqualTo(SpecVersion.DEFAULT.toString());
|
||||||
assertThat(simpleKeyValueEvent.getCloudEventsVersion()).isEqualTo("0.1");
|
|
||||||
assertThat(simpleKeyValueEvent.getEventTypeVersion().get()).isEqualTo("2.0");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBuilderWithoutData() {
|
public void testBuilderWithoutData() {
|
||||||
|
|
||||||
// given
|
// given
|
||||||
final String eventId = UUID.randomUUID().toString();
|
final String id = UUID.randomUUID().toString();
|
||||||
final URI src = URI.create("/trigger");
|
final URI src = URI.create("/trigger");
|
||||||
final String eventType = "My.Cloud.Event.Type";
|
final String type = "My.Cloud.Event.Type";
|
||||||
|
|
||||||
// when
|
// when
|
||||||
final CloudEvent<Map<String, String>> simpleKeyValueEvent = new CloudEventBuilder()
|
final CloudEvent<Map<String, String>> simpleKeyValueEvent = new CloudEventBuilder()
|
||||||
.eventType(eventType)
|
.type(type)
|
||||||
.eventID(eventId)
|
.id(id)
|
||||||
.source(src)
|
.source(src)
|
||||||
.build();
|
.build();
|
||||||
// than
|
// than
|
||||||
assertThat(simpleKeyValueEvent.getData().isPresent()).isFalse();
|
assertThat(simpleKeyValueEvent.getData().isPresent()).isFalse();
|
||||||
assertThat(simpleKeyValueEvent.getEventTime().isPresent()).isFalse();
|
assertThat(simpleKeyValueEvent.getTime().isPresent()).isFalse();
|
||||||
assertThat(simpleKeyValueEvent.getEventID()).isEqualTo(eventId);
|
assertThat(simpleKeyValueEvent.getId()).isEqualTo(id);
|
||||||
assertThat(simpleKeyValueEvent.getEventType()).isEqualTo(eventType);
|
assertThat(simpleKeyValueEvent.getType()).isEqualTo(type);
|
||||||
assertThat(simpleKeyValueEvent.getSource()).isEqualTo(src);
|
assertThat(simpleKeyValueEvent.getSource()).isEqualTo(src);
|
||||||
assertThat(simpleKeyValueEvent.getCloudEventsVersion()).isEqualTo("0.1");
|
assertThat(simpleKeyValueEvent.getSepcVersion()).isEqualTo(SpecVersion.DEFAULT.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBuilderWithoutDataAndUrn() {
|
||||||
|
|
||||||
|
// given
|
||||||
|
final String id = UUID.randomUUID().toString();
|
||||||
|
final URI src = URI.create("urn:event:from:myapi/resourse/123");
|
||||||
|
final String type = "some.Cloud.Event.Type";
|
||||||
|
|
||||||
|
// when
|
||||||
|
final CloudEvent<Map<String, String>> simpleKeyValueEvent = new CloudEventBuilder()
|
||||||
|
.type(type)
|
||||||
|
.id(id)
|
||||||
|
.source(src)
|
||||||
|
.build();
|
||||||
|
// than
|
||||||
|
assertThat(simpleKeyValueEvent.getSource()).isEqualTo(src);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test01BuilderWithoutDataAndUrn() {
|
||||||
|
|
||||||
|
// given
|
||||||
|
final String id = UUID.randomUUID().toString();
|
||||||
|
final URI src = URI.create("urn:event:from:myapi/resourse/123");
|
||||||
|
final String type = "some.Cloud.Event.Type";
|
||||||
|
|
||||||
|
// when
|
||||||
|
final CloudEvent<Map<String, String>> simpleKeyValueEvent = new CloudEventBuilder()
|
||||||
|
.specVersion("0.1")
|
||||||
|
.type(type)
|
||||||
|
.id(id)
|
||||||
|
.source(src)
|
||||||
|
.build();
|
||||||
|
// than
|
||||||
|
assertThat(simpleKeyValueEvent.getSource()).isEqualTo(src);
|
||||||
|
assertThat(simpleKeyValueEvent.getSepcVersion()).isEqualTo(SpecVersion.V_01.toString());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBuilderWithoutDataAndURISchema() {
|
||||||
|
|
||||||
|
// given
|
||||||
|
final String id = UUID.randomUUID().toString();
|
||||||
|
final URI src = URI.create("urn:event:from:myapi/resourse/123");
|
||||||
|
final String type = "some.Cloud.Event.Type";
|
||||||
|
final URI schema = URI.create("urn:oasis:names:specification:docbook:dtd:xml:4.1.2");
|
||||||
|
|
||||||
|
// when
|
||||||
|
final CloudEvent<Map<String, String>> simpleKeyValueEvent = new CloudEventBuilder()
|
||||||
|
.type(type)
|
||||||
|
.id(id)
|
||||||
|
.source(src)
|
||||||
|
.schemaURL(schema)
|
||||||
|
.build();
|
||||||
|
// than
|
||||||
|
assertThat(simpleKeyValueEvent.getSchemaURL().get()).isEqualTo(schema);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBuilderWithoutDataAndMailto() {
|
||||||
|
|
||||||
|
// given
|
||||||
|
final String id = UUID.randomUUID().toString();
|
||||||
|
final URI src = URI.create("mailto:cncf-wg-serverless@lists.cncf.io");
|
||||||
|
final String type = "My.Cloud.Event.Type";
|
||||||
|
|
||||||
|
// when
|
||||||
|
final CloudEvent<Map<String, String>> simpleKeyValueEvent = new CloudEventBuilder()
|
||||||
|
.type(type)
|
||||||
|
.id(id)
|
||||||
|
.source(src)
|
||||||
|
.build();
|
||||||
|
// than
|
||||||
|
assertThat(simpleKeyValueEvent.getSource()).isEqualTo(src);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBuilderWithoutDataAndDistributedTracingExtension() {
|
||||||
|
|
||||||
|
// given
|
||||||
|
final String id = UUID.randomUUID().toString();
|
||||||
|
final URI src = URI.create("mailto:cncf-wg-serverless@lists.cncf.io");
|
||||||
|
final String type = "My.Cloud.Event.Type";
|
||||||
|
final DistributedTracingExtension dte = new DistributedTracingExtension();
|
||||||
|
dte.setTraceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01");
|
||||||
|
dte.setTracestate("congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4");
|
||||||
|
|
||||||
|
// when
|
||||||
|
final CloudEvent<Map<String, String>> simpleKeyValueEvent = new CloudEventBuilder()
|
||||||
|
.type(type)
|
||||||
|
.id(id)
|
||||||
|
.source(src)
|
||||||
|
.extension(dte)
|
||||||
|
.build();
|
||||||
|
// than
|
||||||
|
assertThat(simpleKeyValueEvent.getSource()).isEqualTo(src);
|
||||||
|
assertThat(simpleKeyValueEvent.getExtensions().get()).contains(dte);
|
||||||
|
|
||||||
|
Extension receivedDte = simpleKeyValueEvent.getExtensions().get().get(0);
|
||||||
|
assertThat(receivedDte).extracting("traceparent", "tracestate")
|
||||||
|
.contains("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01", "congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4");
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -28,9 +28,21 @@ import static org.assertj.core.api.Assertions.assertThat;
|
||||||
public class CloudEventJacksonTest {
|
public class CloudEventJacksonTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testParseAzureJSON() {
|
public void testParseAzure01JSON() {
|
||||||
CloudEvent<Map<String, ?>> ce = JacksonMapper.fromInputStream(Thread.currentThread().getContextClassLoader().getResourceAsStream("azure.json"));
|
CloudEvent<Map<String, ?>> ce = JacksonMapper.fromInputStream(Thread.currentThread().getContextClassLoader().getResourceAsStream("01_azure.json"));
|
||||||
assertThat(ce.getEventType()).isEqualTo("Microsoft.Storage.BlobCreated");
|
assertThat(ce.getSepcVersion()).isEqualTo(SpecVersion.V_01.toString());
|
||||||
|
assertAzureCloudEvent(ce);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testParseAzure02JSON() {
|
||||||
|
CloudEvent<Map<String, ?>> ce = JacksonMapper.fromInputStream(Thread.currentThread().getContextClassLoader().getResourceAsStream("02_azure.json"));
|
||||||
|
assertThat(ce.getSepcVersion()).isEqualTo(SpecVersion.V_02.toString());
|
||||||
|
assertAzureCloudEvent(ce);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertAzureCloudEvent(CloudEvent<Map<String, ?>> ce) {
|
||||||
|
assertThat(ce.getType()).isEqualTo("Microsoft.Storage.BlobCreated");
|
||||||
|
|
||||||
ce.getData().ifPresent(data -> {
|
ce.getData().ifPresent(data -> {
|
||||||
assertThat(Map.class).isAssignableFrom(data.getClass());
|
assertThat(Map.class).isAssignableFrom(data.getClass());
|
||||||
|
|
@ -40,16 +52,25 @@ public class CloudEventJacksonTest {
|
||||||
assertThat(storageDiagnostics).containsOnlyKeys("batchId");
|
assertThat(storageDiagnostics).containsOnlyKeys("batchId");
|
||||||
assertThat(storageDiagnostics.get("batchId")).isEqualTo("ba4fb664-f289-4742-8067-6c859411b066");
|
assertThat(storageDiagnostics.get("batchId")).isEqualTo("ba4fb664-f289-4742-8067-6c859411b066");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testParseAmazonJSON() {
|
public void testParseAmazon01JSON() {
|
||||||
CloudEvent ce = JacksonMapper.fromInputStream(Thread.currentThread().getContextClassLoader().getResourceAsStream("aws.json"));
|
CloudEvent ce = JacksonMapper.fromInputStream(Thread.currentThread().getContextClassLoader().getResourceAsStream("01_aws.json"));
|
||||||
assertThat(ce.getEventType()).isEqualTo("aws.s3.object.created");
|
assertAmazonCloudEvent(ce);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testParseAmazon02JSON() {
|
||||||
|
CloudEvent ce = JacksonMapper.fromInputStream(Thread.currentThread().getContextClassLoader().getResourceAsStream("02_aws.json"));
|
||||||
|
assertAmazonCloudEvent(ce);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertAmazonCloudEvent(CloudEvent ce) {
|
||||||
|
assertThat(ce.getType()).isEqualTo("aws.s3.object.created");
|
||||||
|
assertThat(ce.getId()).isEqualTo("C234-1234-1234");
|
||||||
|
assertThat(ce.getData().isPresent());
|
||||||
assertThat(ce.getSource().equals(URI.create("https://serverless.com")));
|
assertThat(ce.getSource().equals(URI.create("https://serverless.com")));
|
||||||
assertThat(ce.getEventTime().get()).isEqualTo(ZonedDateTime.parse("2018-04-26T14:48:09.769Z", ISO_ZONED_DATE_TIME));
|
assertThat(ce.getTime().get()).isEqualTo(ZonedDateTime.parse("2018-04-26T14:48:09.769Z", ISO_ZONED_DATE_TIME));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,52 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2018 The CloudEvents Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package io.cloudevents.http;
|
||||||
|
|
||||||
|
import io.cloudevents.SpecVersion;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
|
public class HttpTransportAttributesTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testVersion01Headers() {
|
||||||
|
|
||||||
|
final HttpTransportAttributes v01 = HttpTransportAttributes.getHttpAttributesForSpec(SpecVersion.V_01);
|
||||||
|
assertThat(v01.specVersionKey()).isEqualTo("ce-cloudEventsVersion");
|
||||||
|
assertThat(v01.timeKey()).isEqualTo("ce-eventTime");
|
||||||
|
assertThat(v01.idKey()).isEqualTo("ce-eventID");
|
||||||
|
assertThat(v01.schemaUrlKey()).isEqualTo("ce-schemaURL");
|
||||||
|
assertThat(v01.typeKey()).isEqualTo("ce-eventType");
|
||||||
|
|
||||||
|
// non-changed between 01 / 02
|
||||||
|
assertThat(v01.sourceKey()).isEqualTo("ce-source");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testVersion02Headers() {
|
||||||
|
|
||||||
|
final HttpTransportAttributes v02 = HttpTransportAttributes.getHttpAttributesForSpec(SpecVersion.V_02);
|
||||||
|
assertThat(v02.specVersionKey()).isEqualTo("ce-specversion");
|
||||||
|
assertThat(v02.timeKey()).isEqualTo("ce-time");
|
||||||
|
assertThat(v02.idKey()).isEqualTo("ce-id");
|
||||||
|
assertThat(v02.schemaUrlKey()).isEqualTo("ce-schemaurl");
|
||||||
|
assertThat(v02.typeKey()).isEqualTo("ce-type");
|
||||||
|
|
||||||
|
// non-changed between 01 / 02
|
||||||
|
assertThat(v02.sourceKey()).isEqualTo("ce-source");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -18,4 +18,4 @@
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"source": "/subscriptions/326100e2-f69d-4268-8503-075374f62b6e/resourceGroups/cvtest34/providers/Microsoft.Storage/storageAccounts/cvtest34#/blobServices/default/containers/myfiles/blobs/IMG_20180224_0004.jpg"
|
"source": "/subscriptions/326100e2-f69d-4268-8503-075374f62b6e/resourceGroups/cvtest34/providers/Microsoft.Storage/storageAccounts/cvtest34#/blobServices/default/containers/myfiles/blobs/IMG_20180224_0004.jpg"
|
||||||
}
|
}
|
||||||
|
|
@ -0,0 +1,22 @@
|
||||||
|
{
|
||||||
|
"type": "aws.s3.object.created",
|
||||||
|
"id": "C234-1234-1234",
|
||||||
|
"time": "2018-04-26T14:48:09.769Z",
|
||||||
|
"source": "https://serverless.com",
|
||||||
|
"contentType": "application/json",
|
||||||
|
"specversion": "0.2",
|
||||||
|
"data":
|
||||||
|
{ "s3SchemaVersion": "1.0",
|
||||||
|
"configurationId": "cd267a38-30df-400e-9e3d-d0f1ca6e2410",
|
||||||
|
"bucket":
|
||||||
|
{ "name": "cloudevents",
|
||||||
|
"ownerIdentity": {},
|
||||||
|
"arn": "arn:aws:s3:::cloudevents" },
|
||||||
|
"object":
|
||||||
|
{ "key": "dan_kohn.jpg",
|
||||||
|
"size": 444684,
|
||||||
|
"eTag": "38b01ff16138d7ca0a0eb3f7a88ff815",
|
||||||
|
"sequencer": "005AE1E6A9A3D61490"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,21 @@
|
||||||
|
{
|
||||||
|
"id": "96fb5f0b-001e-0108-6dfe-da6e2806f124",
|
||||||
|
"time": "2018-04-23T12:28:22.4579346Z",
|
||||||
|
"type": "Microsoft.Storage.BlobCreated",
|
||||||
|
"specversion": "0.2",
|
||||||
|
"data": {
|
||||||
|
"api": "PutBlockList",
|
||||||
|
"clientRequestId": "a23b4aba-2755-4107-8020-8ba6c54b203d",
|
||||||
|
"requestId": "96fb5f0b-001e-0108-6dfe-da6e28000000",
|
||||||
|
"eTag": "0x8D5A915B425AFFD",
|
||||||
|
"contentType": "image/jpeg",
|
||||||
|
"contentLength": 2779325,
|
||||||
|
"blobType": "BlockBlob",
|
||||||
|
"url": "https://cvtest34.blob.core.windows.net/myfiles/IMG_20180224_0004.jpg",
|
||||||
|
"sequencer": "000000000000000000000000000000BA00000000003db46c",
|
||||||
|
"storageDiagnostics": {
|
||||||
|
"batchId": "ba4fb664-f289-4742-8067-6c859411b066"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"source": "/subscriptions/326100e2-f69d-4268-8503-075374f62b6e/resourceGroups/cvtest34/providers/Microsoft.Storage/storageAccounts/cvtest34#/blobServices/default/containers/myfiles/blobs/IMG_20180224_0004.jpg"
|
||||||
|
}
|
||||||
|
|
@ -2,28 +2,29 @@
|
||||||
|
|
||||||
## Receiving CloudEvents
|
## Receiving CloudEvents
|
||||||
|
|
||||||
Below is a sample on how to read CloudEvents from an HttpRequest:
|
Below is a sample on how to use (Vert.x API for RxJava 2)[https://vertx.io/docs/vertx-rx/java2/] for reading CloudEvents from an HttpServerRequest:
|
||||||
|
|
||||||
```java
|
```java
|
||||||
import io.vertx.core.AbstractVerticle;
|
import io.cloudevents.http.reactivex.vertx.VertxCloudEvents;
|
||||||
public class Server extends AbstractVerticle {
|
import io.vertx.core.http.HttpHeaders;
|
||||||
|
import io.vertx.reactivex.core.AbstractVerticle;
|
||||||
|
|
||||||
|
public class CloudEventVerticle extends AbstractVerticle {
|
||||||
|
|
||||||
public void start() {
|
public void start() {
|
||||||
vertx.createHttpServer().requestHandler(req -> {
|
|
||||||
|
|
||||||
VertxCloudEvents.create().readFromRequest(req, reply -> {
|
vertx.createHttpServer()
|
||||||
|
.requestHandler(req -> VertxCloudEvents.create().rxReadFromRequest(req)
|
||||||
if (reply.succeeded()) {
|
.subscribe((receivedEvent, throwable) -> {
|
||||||
|
if (receivedEvent != null) {
|
||||||
final CloudEvent<?> receivedEvent = reply.result();
|
// I got a CloudEvent object:
|
||||||
// access the attributes:
|
System.out.println("The event type: " + receivedEvent.getEventType())
|
||||||
System.out.println(receivedEvent.getEventID());
|
}
|
||||||
/// ...
|
}))
|
||||||
});
|
.rxListen(8080)
|
||||||
|
.subscribe(server -> {
|
||||||
req.response()
|
System.out.println("Server running!");
|
||||||
.putHeader("content-type", "text/plain")
|
});
|
||||||
.end("Got a CloudEvent!");
|
|
||||||
}).listen(8080);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
@ -35,10 +36,12 @@ Below is a sample on how to use the client to send a CloudEvent:
|
||||||
```java
|
```java
|
||||||
final HttpClientRequest request = vertx.createHttpClient().post(8080, "localhost", "/");
|
final HttpClientRequest request = vertx.createHttpClient().post(8080, "localhost", "/");
|
||||||
|
|
||||||
|
// add a client response handler
|
||||||
|
request.handler(resp -> {
|
||||||
|
// react on the server response
|
||||||
|
});
|
||||||
|
|
||||||
|
// write the CloudEvent to the given HTTP Post request object
|
||||||
VertxCloudEvents.create().writeToHttpClientRequest(cloudEvent, request);
|
VertxCloudEvents.create().writeToHttpClientRequest(cloudEvent, request);
|
||||||
request.handler(resp -> {
|
|
||||||
context.assertEquals(resp.statusCode(), 200);
|
|
||||||
});
|
|
||||||
request.end();
|
request.end();
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
/*
|
/**
|
||||||
* Copyright 2018 The CloudEvents Authors
|
* Copyright 2018 The CloudEvents Authors
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
|
@ -16,6 +16,7 @@
|
||||||
package io.cloudevents.http.vertx;
|
package io.cloudevents.http.vertx;
|
||||||
|
|
||||||
import io.cloudevents.CloudEvent;
|
import io.cloudevents.CloudEvent;
|
||||||
|
import io.cloudevents.Extension;
|
||||||
import io.cloudevents.http.vertx.impl.VertxCloudEventsImpl;
|
import io.cloudevents.http.vertx.impl.VertxCloudEventsImpl;
|
||||||
import io.vertx.codegen.annotations.GenIgnore;
|
import io.vertx.codegen.annotations.GenIgnore;
|
||||||
import io.vertx.codegen.annotations.VertxGen;
|
import io.vertx.codegen.annotations.VertxGen;
|
||||||
|
|
@ -24,6 +25,8 @@ import io.vertx.core.Handler;
|
||||||
import io.vertx.core.http.HttpClientRequest;
|
import io.vertx.core.http.HttpClientRequest;
|
||||||
import io.vertx.core.http.HttpServerRequest;
|
import io.vertx.core.http.HttpServerRequest;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
@VertxGen
|
@VertxGen
|
||||||
public interface VertxCloudEvents {
|
public interface VertxCloudEvents {
|
||||||
|
|
||||||
|
|
@ -34,6 +37,9 @@ public interface VertxCloudEvents {
|
||||||
@GenIgnore(GenIgnore.PERMITTED_TYPE)
|
@GenIgnore(GenIgnore.PERMITTED_TYPE)
|
||||||
<T> void readFromRequest(HttpServerRequest request, Handler<AsyncResult<CloudEvent<T>>> resultHandler);
|
<T> void readFromRequest(HttpServerRequest request, Handler<AsyncResult<CloudEvent<T>>> resultHandler);
|
||||||
|
|
||||||
|
@GenIgnore(GenIgnore.PERMITTED_TYPE)
|
||||||
|
<T> void readFromRequest(HttpServerRequest request, Class[] extensions, Handler<AsyncResult<CloudEvent<T>>> resultHandler);
|
||||||
|
|
||||||
@GenIgnore(GenIgnore.PERMITTED_TYPE)
|
@GenIgnore(GenIgnore.PERMITTED_TYPE)
|
||||||
<T> void writeToHttpClientRequest(CloudEvent<T> ce, HttpClientRequest request);
|
<T> void writeToHttpClientRequest(CloudEvent<T> ce, HttpClientRequest request);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
/*
|
/**
|
||||||
* Copyright 2018 The CloudEvents Authors
|
* Copyright 2018 The CloudEvents Authors
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
|
@ -17,6 +17,11 @@ package io.cloudevents.http.vertx.impl;
|
||||||
|
|
||||||
import io.cloudevents.CloudEvent;
|
import io.cloudevents.CloudEvent;
|
||||||
import io.cloudevents.CloudEventBuilder;
|
import io.cloudevents.CloudEventBuilder;
|
||||||
|
import io.cloudevents.Extension;
|
||||||
|
import io.cloudevents.SpecVersion;
|
||||||
|
import io.cloudevents.extensions.DistributedTracingExtension;
|
||||||
|
import io.cloudevents.http.HttpTransportAttributes;
|
||||||
|
import io.cloudevents.http.V02HttpTransportMappers;
|
||||||
import io.cloudevents.http.vertx.VertxCloudEvents;
|
import io.cloudevents.http.vertx.VertxCloudEvents;
|
||||||
import io.vertx.core.AsyncResult;
|
import io.vertx.core.AsyncResult;
|
||||||
import io.vertx.core.Future;
|
import io.vertx.core.Future;
|
||||||
|
|
@ -26,21 +31,19 @@ import io.vertx.core.buffer.Buffer;
|
||||||
import io.vertx.core.http.HttpClientRequest;
|
import io.vertx.core.http.HttpClientRequest;
|
||||||
import io.vertx.core.http.HttpHeaders;
|
import io.vertx.core.http.HttpHeaders;
|
||||||
import io.vertx.core.http.HttpServerRequest;
|
import io.vertx.core.http.HttpServerRequest;
|
||||||
|
import io.vertx.core.json.Json;
|
||||||
|
import io.vertx.core.json.JsonObject;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.time.ZonedDateTime;
|
import java.time.ZonedDateTime;
|
||||||
import java.time.format.DateTimeFormatter;
|
import java.time.format.DateTimeFormatter;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import static io.cloudevents.CloudEvent.CLOUD_EVENTS_VERSION_KEY;
|
|
||||||
import static io.cloudevents.CloudEvent.EVENT_ID_KEY;
|
|
||||||
import static io.cloudevents.CloudEvent.EVENT_TIME_KEY;
|
|
||||||
import static io.cloudevents.CloudEvent.EVENT_TYPE_KEY;
|
|
||||||
import static io.cloudevents.CloudEvent.EVENT_TYPE_VERSION_KEY;
|
|
||||||
import static io.cloudevents.CloudEvent.HEADER_PREFIX;
|
|
||||||
import static io.cloudevents.CloudEvent.SCHEMA_URL_KEY;
|
|
||||||
import static io.cloudevents.CloudEvent.SOURCE_KEY;
|
|
||||||
|
|
||||||
public final class VertxCloudEventsImpl implements VertxCloudEvents {
|
public final class VertxCloudEventsImpl implements VertxCloudEvents {
|
||||||
|
|
||||||
|
|
@ -58,41 +61,75 @@ public final class VertxCloudEventsImpl implements VertxCloudEvents {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public <T> void readFromRequest(HttpServerRequest request, Handler<AsyncResult<CloudEvent<T>>> resultHandler) {
|
public <T> void readFromRequest(HttpServerRequest request, Handler<AsyncResult<CloudEvent<T>>> resultHandler) {
|
||||||
|
this.readFromRequest(request, null, resultHandler);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> void readFromRequest(HttpServerRequest request, Class[] extensions, Handler<AsyncResult<CloudEvent<T>>> resultHandler) {
|
||||||
|
|
||||||
final MultiMap headers = request.headers();
|
final MultiMap headers = request.headers();
|
||||||
final CloudEventBuilder builder = new CloudEventBuilder();
|
final CloudEventBuilder builder = new CloudEventBuilder();
|
||||||
|
|
||||||
try {
|
|
||||||
// just check, no need to set the version
|
|
||||||
readRequiredHeaderValue(headers, CLOUD_EVENTS_VERSION_KEY);
|
|
||||||
|
|
||||||
|
final HttpTransportAttributes httpTransportKeys;
|
||||||
|
{
|
||||||
|
if (headers.contains(V02HttpTransportMappers.SPEC_VERSION_KEY)) {
|
||||||
|
httpTransportKeys = HttpTransportAttributes.getHttpAttributesForSpec(SpecVersion.V_02);
|
||||||
|
} else {
|
||||||
|
httpTransportKeys = HttpTransportAttributes.getHttpAttributesForSpec(SpecVersion.V_01);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
builder
|
builder
|
||||||
// set required values
|
// set required values
|
||||||
.eventType(readRequiredHeaderValue(headers, EVENT_TYPE_KEY))
|
.specVersion(readRequiredHeaderValue(headers, httpTransportKeys.specVersionKey()))
|
||||||
.source(URI.create(readRequiredHeaderValue(headers ,SOURCE_KEY)))
|
.type(readRequiredHeaderValue(headers, httpTransportKeys.typeKey()))
|
||||||
.eventID(readRequiredHeaderValue(headers, EVENT_ID_KEY))
|
.source(URI.create(readRequiredHeaderValue(headers ,httpTransportKeys.sourceKey())))
|
||||||
|
.id(readRequiredHeaderValue(headers, httpTransportKeys.idKey()))
|
||||||
|
|
||||||
// set optional values
|
// set optional values
|
||||||
.eventTypeVersion(headers.get(EVENT_TYPE_VERSION_KEY))
|
|
||||||
.contentType(headers.get(HttpHeaders.CONTENT_TYPE));
|
.contentType(headers.get(HttpHeaders.CONTENT_TYPE));
|
||||||
|
|
||||||
final String eventTime = headers.get(EVENT_TIME_KEY);
|
final String eventTime = headers.get(httpTransportKeys.timeKey());
|
||||||
if (eventTime != null) {
|
if (eventTime != null) {
|
||||||
builder.eventTime(ZonedDateTime.parse(eventTime, DateTimeFormatter.ISO_OFFSET_DATE_TIME));
|
builder.time(ZonedDateTime.parse(eventTime, DateTimeFormatter.ISO_OFFSET_DATE_TIME));
|
||||||
}
|
}
|
||||||
|
|
||||||
final String schemaURL = headers.get(SCHEMA_URL_KEY);
|
final String schemaURL = headers.get(httpTransportKeys.schemaUrlKey());
|
||||||
if (schemaURL != null) {
|
if (schemaURL != null) {
|
||||||
builder.schemaURL(URI.create(schemaURL));
|
builder.schemaURL(URI.create(schemaURL));
|
||||||
}
|
}
|
||||||
|
|
||||||
// get the extensions
|
|
||||||
final Map<String, String> extensions =
|
|
||||||
headers.entries().stream()
|
|
||||||
.filter(header -> header.getKey().startsWith(HEADER_PREFIX))
|
|
||||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
|
||||||
|
|
||||||
builder.extensions(extensions);
|
if (extensions != null && extensions.length > 0) {
|
||||||
|
|
||||||
|
// move this out
|
||||||
|
Arrays.asList(extensions).forEach(ext -> {
|
||||||
|
|
||||||
|
try {
|
||||||
|
Object extObj = ext.newInstance();
|
||||||
|
final JsonObject extension = new JsonObject();
|
||||||
|
Field[] fields = ext.getDeclaredFields();
|
||||||
|
|
||||||
|
for (Field field : fields) {
|
||||||
|
boolean accessible = field.isAccessible();
|
||||||
|
field.setAccessible(true);
|
||||||
|
field.set(extObj, request.headers().get(field.getName()));
|
||||||
|
field.setAccessible(accessible);
|
||||||
|
}
|
||||||
|
builder.extension((Extension) extObj);
|
||||||
|
} catch (InstantiationException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
} catch (IllegalAccessException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
request.bodyHandler((Buffer buff) -> {
|
request.bodyHandler((Buffer buff) -> {
|
||||||
|
|
||||||
if (buff.length()>0) {
|
if (buff.length()>0) {
|
||||||
|
|
@ -115,29 +152,40 @@ public final class VertxCloudEventsImpl implements VertxCloudEvents {
|
||||||
request.putHeader(HttpHeaders.CONTENT_LENGTH, HttpHeaders.createOptimized("0"));
|
request.putHeader(HttpHeaders.CONTENT_LENGTH, HttpHeaders.createOptimized("0"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
HttpTransportAttributes httpTransportAttributes = HttpTransportAttributes.getHttpAttributesForSpec(SpecVersion.fromVersion(ce.getSepcVersion()));
|
||||||
|
|
||||||
// read required headers
|
// read required headers
|
||||||
request
|
request
|
||||||
.putHeader(HttpHeaders.CONTENT_TYPE, HttpHeaders.createOptimized("application/json"))
|
.putHeader(HttpHeaders.CONTENT_TYPE, HttpHeaders.createOptimized("application/json"))
|
||||||
.putHeader(HttpHeaders.createOptimized(CLOUD_EVENTS_VERSION_KEY), HttpHeaders.createOptimized(ce.getCloudEventsVersion()))
|
.putHeader(HttpHeaders.createOptimized(httpTransportAttributes.specVersionKey()), HttpHeaders.createOptimized(ce.getSepcVersion()))
|
||||||
.putHeader(HttpHeaders.createOptimized(EVENT_TYPE_KEY), HttpHeaders.createOptimized(ce.getEventType()))
|
.putHeader(HttpHeaders.createOptimized(httpTransportAttributes.typeKey()), HttpHeaders.createOptimized(ce.getType()))
|
||||||
.putHeader(HttpHeaders.createOptimized(SOURCE_KEY), HttpHeaders.createOptimized(ce.getSource().toString()))
|
.putHeader(HttpHeaders.createOptimized(httpTransportAttributes.sourceKey()), HttpHeaders.createOptimized(ce.getSource().toString()))
|
||||||
.putHeader(HttpHeaders.createOptimized(EVENT_ID_KEY), HttpHeaders.createOptimized(ce.getEventID()));
|
.putHeader(HttpHeaders.createOptimized(httpTransportAttributes.idKey()), HttpHeaders.createOptimized(ce.getId()));
|
||||||
|
|
||||||
// read optional headers
|
// read optional headers
|
||||||
ce.getEventTypeVersion().ifPresent(eventTypeVersion -> {
|
ce.getTime().ifPresent(eventTime -> {
|
||||||
request.putHeader(HttpHeaders.createOptimized(EVENT_TYPE_VERSION_KEY), HttpHeaders.createOptimized(eventTypeVersion));
|
request.putHeader(HttpHeaders.createOptimized(httpTransportAttributes.timeKey()), HttpHeaders.createOptimized(eventTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)));
|
||||||
});
|
|
||||||
|
|
||||||
ce.getEventTime().ifPresent(eventTime -> {
|
|
||||||
request.putHeader(HttpHeaders.createOptimized(EVENT_TIME_KEY), HttpHeaders.createOptimized(eventTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)));
|
|
||||||
});
|
});
|
||||||
|
|
||||||
ce.getSchemaURL().ifPresent(schemaUrl -> {
|
ce.getSchemaURL().ifPresent(schemaUrl -> {
|
||||||
request.putHeader(HttpHeaders.createOptimized(SCHEMA_URL_KEY), HttpHeaders.createOptimized(schemaUrl.toString()));
|
request.putHeader(HttpHeaders.createOptimized(httpTransportAttributes.schemaUrlKey()), HttpHeaders.createOptimized(schemaUrl.toString()));
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
|
ce.getExtensions().ifPresent(extensions -> {
|
||||||
|
|
||||||
|
extensions.forEach(ext -> {
|
||||||
|
JsonObject.mapFrom(ext).forEach(extEntry -> {
|
||||||
|
request.putHeader(HttpHeaders.createOptimized(extEntry.getKey()), HttpHeaders.createOptimized(extEntry.getValue().toString()));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
ce.getData().ifPresent(data -> {
|
ce.getData().ifPresent(data -> {
|
||||||
request.write(data.toString());
|
request.write(data.toString());
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
/*
|
/**
|
||||||
* Copyright 2018 The CloudEvents Authors
|
* Copyright 2018 The CloudEvents Authors
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
/*
|
/**
|
||||||
* Copyright 2018 The CloudEvents Authors
|
* Copyright 2018 The CloudEvents Authors
|
||||||
*
|
*
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
|
@ -17,6 +17,9 @@ package io.cloudevents.http.vertx;
|
||||||
|
|
||||||
import io.cloudevents.CloudEvent;
|
import io.cloudevents.CloudEvent;
|
||||||
import io.cloudevents.CloudEventBuilder;
|
import io.cloudevents.CloudEventBuilder;
|
||||||
|
import io.cloudevents.extensions.DistributedTracingExtension;
|
||||||
|
import io.cloudevents.http.V01HttpTransportMappers;
|
||||||
|
import io.cloudevents.http.V02HttpTransportMappers;
|
||||||
import io.cloudevents.http.reactivex.vertx.VertxCloudEvents;
|
import io.cloudevents.http.reactivex.vertx.VertxCloudEvents;
|
||||||
import io.vertx.core.http.HttpHeaders;
|
import io.vertx.core.http.HttpHeaders;
|
||||||
import io.vertx.junit5.Checkpoint;
|
import io.vertx.junit5.Checkpoint;
|
||||||
|
|
@ -29,10 +32,11 @@ import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.api.extension.ExtendWith;
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.util.List;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
import static io.cloudevents.CloudEvent.CLOUD_EVENTS_VERSION_KEY;
|
import static io.cloudevents.SpecVersion.V_01;
|
||||||
import static io.cloudevents.CloudEvent.EVENT_TYPE_KEY;
|
import static io.cloudevents.SpecVersion.V_02;
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
@ExtendWith(VertxExtension.class)
|
@ExtendWith(VertxExtension.class)
|
||||||
|
|
@ -41,15 +45,16 @@ class VertxCloudEventsTests {
|
||||||
private final static Logger logger = Logger.getLogger(VertxCloudEventsTests.class.getName());
|
private final static Logger logger = Logger.getLogger(VertxCloudEventsTests.class.getName());
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName("Post a cloud event with a payload")
|
@DisplayName("Post a 0.2 CloudEvents object with a payload")
|
||||||
void cloudEventWithPayload(Vertx vertx, VertxTestContext testContext) {
|
void cloudEventWithPayload(Vertx vertx, VertxTestContext testContext) {
|
||||||
Checkpoint serverCheckpoint = testContext.checkpoint();
|
Checkpoint serverCheckpoint = testContext.checkpoint();
|
||||||
Checkpoint clientCheckpoint = testContext.checkpoint();
|
Checkpoint clientCheckpoint = testContext.checkpoint();
|
||||||
|
|
||||||
CloudEvent<String> cloudEvent = new CloudEventBuilder<String>()
|
CloudEvent<String> cloudEvent = new CloudEventBuilder<String>()
|
||||||
|
.specVersion("0.2")
|
||||||
.source(URI.create("http://knative-eventing.com"))
|
.source(URI.create("http://knative-eventing.com"))
|
||||||
.eventID("foo-bar")
|
.id("foo-bar")
|
||||||
.eventType("pushevent")
|
.type("pushevent")
|
||||||
.data("{\"foo\":\"bar\"}}")
|
.data("{\"foo\":\"bar\"}}")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
|
@ -59,9 +64,9 @@ class VertxCloudEventsTests {
|
||||||
.rxReadFromRequest(req)
|
.rxReadFromRequest(req)
|
||||||
.doOnError(testContext::failNow)
|
.doOnError(testContext::failNow)
|
||||||
.subscribe(event -> testContext.verify(() -> {
|
.subscribe(event -> testContext.verify(() -> {
|
||||||
assertThat(event.getEventID()).isEqualTo(cloudEvent.getEventID());
|
assertThat(event.getId()).isEqualTo(cloudEvent.getId());
|
||||||
assertThat(event.getSource().toString()).isEqualTo(cloudEvent.getSource().toString());
|
assertThat(event.getSource().toString()).isEqualTo(cloudEvent.getSource().toString());
|
||||||
assertThat(event.getEventType()).isEqualTo(cloudEvent.getEventType());
|
assertThat(event.getType()).isEqualTo(cloudEvent.getType());
|
||||||
assertThat(event.getData()).isPresent();
|
assertThat(event.getData()).isPresent();
|
||||||
req.response().end();
|
req.response().end();
|
||||||
serverCheckpoint.flag();
|
serverCheckpoint.flag();
|
||||||
|
|
@ -80,15 +85,16 @@ class VertxCloudEventsTests {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@DisplayName("Post a cloud event without a payload")
|
@DisplayName("Post a 0.2 CloudEvents object without a payload")
|
||||||
void cloudEventWithoutPayload(Vertx vertx, VertxTestContext testContext) {
|
void cloudEventWithoutPayload(Vertx vertx, VertxTestContext testContext) {
|
||||||
Checkpoint serverCheckpoint = testContext.checkpoint();
|
Checkpoint serverCheckpoint = testContext.checkpoint();
|
||||||
Checkpoint clientCheckpoint = testContext.checkpoint();
|
Checkpoint clientCheckpoint = testContext.checkpoint();
|
||||||
|
|
||||||
CloudEvent<String> cloudEvent = new CloudEventBuilder<String>()
|
CloudEvent<String> cloudEvent = new CloudEventBuilder<String>()
|
||||||
|
.specVersion("0.2")
|
||||||
.source(URI.create("http://knative-eventing.com"))
|
.source(URI.create("http://knative-eventing.com"))
|
||||||
.eventID("foo-bar")
|
.id("foo-bar")
|
||||||
.eventType("pushevent")
|
.type("pushevent")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
vertx.createHttpServer()
|
vertx.createHttpServer()
|
||||||
|
|
@ -97,9 +103,65 @@ class VertxCloudEventsTests {
|
||||||
.rxReadFromRequest(req)
|
.rxReadFromRequest(req)
|
||||||
.doOnError(testContext::failNow)
|
.doOnError(testContext::failNow)
|
||||||
.subscribe(event -> testContext.verify(() -> {
|
.subscribe(event -> testContext.verify(() -> {
|
||||||
assertThat(event.getEventID()).isEqualTo(cloudEvent.getEventID());
|
|
||||||
|
// check headers
|
||||||
|
assertThat(req.headers().get(V02HttpTransportMappers.SPEC_VERSION_KEY)).isEqualTo(V_02.toString());
|
||||||
|
assertThat(req.headers().get(V01HttpTransportMappers.SPEC_VERSION_KEY)).isNull();
|
||||||
|
assertThat(req.headers().get("ce-id")).isEqualTo("foo-bar");
|
||||||
|
assertThat(req.headers().get("ce-eventID")).isNull();
|
||||||
|
|
||||||
|
// check parsed object
|
||||||
|
assertThat(event.getId()).isEqualTo(cloudEvent.getId());
|
||||||
assertThat(event.getSource().toString()).isEqualTo(cloudEvent.getSource().toString());
|
assertThat(event.getSource().toString()).isEqualTo(cloudEvent.getSource().toString());
|
||||||
assertThat(event.getEventType()).isEqualTo(cloudEvent.getEventType());
|
assertThat(event.getType()).isEqualTo(cloudEvent.getType());
|
||||||
|
assertThat(event.getData()).isNotPresent();
|
||||||
|
req.response().end();
|
||||||
|
serverCheckpoint.flag();
|
||||||
|
})))
|
||||||
|
.rxListen(8080)
|
||||||
|
.doOnError(testContext::failNow)
|
||||||
|
.subscribe(server -> {
|
||||||
|
HttpClientRequest req = vertx.createHttpClient().post(server.actualPort(), "localhost", "/");
|
||||||
|
req.handler(resp -> testContext.verify(() -> {
|
||||||
|
assertThat(resp.statusCode()).isEqualTo(200);
|
||||||
|
clientCheckpoint.flag();
|
||||||
|
}));
|
||||||
|
VertxCloudEvents.create().writeToHttpClientRequest(cloudEvent, req);
|
||||||
|
req.end();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("Post a 0.1 CloudEvents object without a payload")
|
||||||
|
void cloudEventWithoutPayload01(Vertx vertx, VertxTestContext testContext) {
|
||||||
|
Checkpoint serverCheckpoint = testContext.checkpoint();
|
||||||
|
Checkpoint clientCheckpoint = testContext.checkpoint();
|
||||||
|
|
||||||
|
CloudEvent<String> cloudEvent = new CloudEventBuilder<String>()
|
||||||
|
.specVersion("0.1")
|
||||||
|
.source(URI.create("http://knative-eventing.com"))
|
||||||
|
.id("foo-bar")
|
||||||
|
.type("pushevent")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
vertx.createHttpServer()
|
||||||
|
.requestHandler(req -> VertxCloudEvents
|
||||||
|
.create()
|
||||||
|
.rxReadFromRequest(req)
|
||||||
|
.doOnError(testContext::failNow)
|
||||||
|
.subscribe(event -> testContext.verify(() -> {
|
||||||
|
|
||||||
|
// check headers
|
||||||
|
assertThat(req.headers().get(V01HttpTransportMappers.SPEC_VERSION_KEY)).isEqualTo(V_01.toString());
|
||||||
|
assertThat(req.headers().get(V02HttpTransportMappers.SPEC_VERSION_KEY)).isNull();
|
||||||
|
assertThat(req.headers().get("ce-eventID")).isEqualTo("foo-bar");
|
||||||
|
assertThat(req.headers().get("ce-id")).isNull();
|
||||||
|
|
||||||
|
// check parsed object
|
||||||
|
assertThat(event.getId()).isEqualTo(cloudEvent.getId());
|
||||||
|
assertThat(event.getSepcVersion().toString()).isEqualTo(cloudEvent.getSepcVersion());
|
||||||
|
assertThat(event.getSource().toString()).isEqualTo(cloudEvent.getSource().toString());
|
||||||
|
assertThat(event.getType()).isEqualTo(cloudEvent.getType());
|
||||||
assertThat(event.getData()).isNotPresent();
|
assertThat(event.getData()).isNotPresent();
|
||||||
req.response().end();
|
req.response().end();
|
||||||
serverCheckpoint.flag();
|
serverCheckpoint.flag();
|
||||||
|
|
@ -140,8 +202,9 @@ class VertxCloudEventsTests {
|
||||||
.subscribe(server -> {
|
.subscribe(server -> {
|
||||||
HttpClientRequest req = vertx.createHttpClient().post(server.actualPort(), "localhost", "/");
|
HttpClientRequest req = vertx.createHttpClient().post(server.actualPort(), "localhost", "/");
|
||||||
// create incomplete CloudEvent request
|
// create incomplete CloudEvent request
|
||||||
req.putHeader(HttpHeaders.createOptimized(CLOUD_EVENTS_VERSION_KEY), HttpHeaders.createOptimized("0.1"));
|
req.putHeader(HttpHeaders.createOptimized("ce-specversion"), HttpHeaders.createOptimized("0.2"));
|
||||||
req.putHeader(HttpHeaders.createOptimized(EVENT_TYPE_KEY), HttpHeaders.createOptimized("pushevent"));
|
req.putHeader(HttpHeaders.createOptimized("ce-type"), HttpHeaders.createOptimized("pushevent"));
|
||||||
|
req.putHeader(HttpHeaders.createOptimized("foo"), HttpHeaders.createOptimized("bar"));
|
||||||
req.putHeader(HttpHeaders.CONTENT_LENGTH, HttpHeaders.createOptimized("0"));
|
req.putHeader(HttpHeaders.CONTENT_LENGTH, HttpHeaders.createOptimized("0"));
|
||||||
req.handler(resp -> testContext.verify(() -> {
|
req.handler(resp -> testContext.verify(() -> {
|
||||||
assertThat(resp.statusCode()).isEqualTo(200);
|
assertThat(resp.statusCode()).isEqualTo(200);
|
||||||
|
|
@ -150,4 +213,59 @@ class VertxCloudEventsTests {
|
||||||
req.end();
|
req.end();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@DisplayName("Post a 0.2 CloudEvents object with a payload")
|
||||||
|
void cloudEventWithExtension(Vertx vertx, VertxTestContext testContext) {
|
||||||
|
Checkpoint serverCheckpoint = testContext.checkpoint();
|
||||||
|
Checkpoint clientCheckpoint = testContext.checkpoint();
|
||||||
|
|
||||||
|
final DistributedTracingExtension dte = new DistributedTracingExtension();
|
||||||
|
dte.setTraceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01");
|
||||||
|
dte.setTracestate("congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4");
|
||||||
|
|
||||||
|
|
||||||
|
final CloudEvent<String> cloudEvent = new CloudEventBuilder<String>()
|
||||||
|
.specVersion("0.2")
|
||||||
|
.source(URI.create("http://knative-eventing.com"))
|
||||||
|
.id("foo-bar")
|
||||||
|
.extension(dte)
|
||||||
|
.type("pushevent")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
Class[] exceptions = {DistributedTracingExtension.class};
|
||||||
|
|
||||||
|
vertx.createHttpServer()
|
||||||
|
.requestHandler(req -> VertxCloudEvents
|
||||||
|
.create()
|
||||||
|
.rxReadFromRequest(req, exceptions)
|
||||||
|
.doOnError(testContext::failNow)
|
||||||
|
.subscribe(event -> testContext.verify(() -> {
|
||||||
|
assertThat(event.getId()).isEqualTo(cloudEvent.getId());
|
||||||
|
|
||||||
|
// extension headers
|
||||||
|
assertThat(req.headers().get("traceparent")).isEqualTo("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01");
|
||||||
|
assertThat(req.headers().get("tracestate")).isEqualTo("congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4");
|
||||||
|
|
||||||
|
assertThat(event.getExtensions().get().get(0)).isNotNull();
|
||||||
|
|
||||||
|
assertThat(event.getExtensions().get().get(0)).extracting("traceparent", "tracestate")
|
||||||
|
.contains("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01", "congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4");
|
||||||
|
|
||||||
|
|
||||||
|
req.response().end();
|
||||||
|
serverCheckpoint.flag();
|
||||||
|
})))
|
||||||
|
.rxListen(8080)
|
||||||
|
.doOnError(testContext::failNow)
|
||||||
|
.subscribe(server -> {
|
||||||
|
HttpClientRequest req = vertx.createHttpClient().post(server.actualPort(), "localhost", "/");
|
||||||
|
req.handler(resp -> testContext.verify(() -> {
|
||||||
|
assertThat(resp.statusCode()).isEqualTo(200);
|
||||||
|
clientCheckpoint.flag();
|
||||||
|
}));
|
||||||
|
VertxCloudEvents.create().writeToHttpClientRequest(cloudEvent, req);
|
||||||
|
req.end();
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue