diff --git a/api/src/main/java/io/cloudevents/format/BinaryMarshaller.java b/api/src/main/java/io/cloudevents/format/BinaryMarshaller.java new file mode 100644 index 00000000..0af13106 --- /dev/null +++ b/api/src/main/java/io/cloudevents/format/BinaryMarshaller.java @@ -0,0 +1,159 @@ +package io.cloudevents.format; + +import java.net.URI; +import java.util.Map; +import java.util.Map.Entry; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import io.cloudevents.Attributes; +import io.cloudevents.CloudEvent; +import io.cloudevents.extensions.DistributedTracingExtension; +import io.cloudevents.extensions.ExtensionFormat; +import io.cloudevents.fun.AttributeMarshaller; +import io.cloudevents.fun.BinaryFormatHeaderMapper; +import io.cloudevents.fun.DataMarshaller; +import io.cloudevents.fun.ExtensionMarshaller; +import io.cloudevents.fun.FormatBuilder; +import io.cloudevents.json.Json; +import io.cloudevents.v02.Accessor; +import io.cloudevents.v02.AttributesImpl; +import io.cloudevents.v02.CloudEventBuilder; +import io.cloudevents.v02.CloudEventImpl; +import io.cloudevents.v02.http.BinaryFormatHeaderMapperImpl; + +/** + * + * @author fabiojose + * + */ +public class BinaryMarshaller { + + public static EventStep builder() { + return new Builder<>(); + } + + public static interface EventStep { + AttributeMarshalStep + withEvent(Supplier> event); + } + + public static interface AttributeMarshalStep { + ExtensionsStep attributes(AttributeMarshaller marshaller); + } + + public static interface ExtensionsStep { + HeaderMapStep extensions(ExtensionMarshaller marshaller); + } + + public static interface HeaderMapStep { + PayloadStep headers(BinaryFormatHeaderMapper mapper); + } + + public static interface PayloadStep { + Build payload(DataMarshaller marshaller); + } + + public static interface Build { + Format

build(FormatBuilder

builder); + } + + public static class Builder implements + EventStep, AttributeMarshalStep, + ExtensionsStep, HeaderMapStep, + PayloadStep, Build { + + private CloudEvent event; + + private Map attributesMap; + + private Map headers; + + private P payload; + + private Map extensionsMap; + + @Override + public AttributeMarshalStep + withEvent(Supplier> event) { + this.event = event.get(); + return this; + } + + @Override + public ExtensionsStep attributes(AttributeMarshaller marshaller) { + this.attributesMap = marshaller.marshal(event.getAttributes()); + return this; + } + + @Override + public HeaderMapStep extensions(ExtensionMarshaller marshaller) { + this.extensionsMap = marshaller.marshal(Accessor.extensionsOf(event)); + return this; + } + + @Override + public PayloadStep headers(BinaryFormatHeaderMapper mapper) { + this.headers = mapper.map(attributesMap, extensionsMap); + return this; + } + + @Override + public Build payload(DataMarshaller marshaller) { + event.getData().ifPresent((data) -> { + try { + payload = marshaller.marshal(data, headers); + }catch(Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + }); + return this; + } + + @Override + public Format

build(FormatBuilder

builder) { + return builder.build(payload, headers); + } + } + + public static void main(String[] args) { + + final DistributedTracingExtension dt = new DistributedTracingExtension(); + dt.setTraceparent("0"); + dt.setTracestate("congo=4"); + + final ExtensionFormat tracing = new DistributedTracingExtension.Format(dt); + + final CloudEventImpl ce = + new CloudEventBuilder() + .withId("x10") + .withSource(URI.create("/source")) + .withType("event-type") + .withSchemaurl(URI.create("/schema")) + .withContenttype("text/plain") + .withExtension(tracing) + .withData("my-data") + .build(); + + Format format = + BinaryMarshaller.builder() + .withEvent(() -> ce) + .attributes(AttributesImpl.marshaller()::marshal) + .extensions((extensions) -> { + return extensions.stream() + .map(ExtensionFormat::transport) + .flatMap(t -> t.entrySet().stream()) + .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + }) + .headers(BinaryFormatHeaderMapperImpl.mapper()::map) + .payload((data, headers) -> { + return Json.encode(data); + }) + .build((payload, headers) -> { + return new Format<>(payload, headers); + }); + + System.out.println(format.getPayload()); + System.out.print(format.getHeaders()); + } +} diff --git a/api/src/main/java/io/cloudevents/format/BinaryUnmashaller.java b/api/src/main/java/io/cloudevents/format/BinaryUnmashaller.java new file mode 100644 index 00000000..3ddd54d9 --- /dev/null +++ b/api/src/main/java/io/cloudevents/format/BinaryUnmashaller.java @@ -0,0 +1,290 @@ +package io.cloudevents.format; + +import static java.util.stream.Collectors.toMap; + +import java.util.ArrayList; +import java.util.AbstractMap.SimpleEntry; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; + +import com.fasterxml.jackson.annotation.JsonUnwrapped; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.cloudevents.Attributes; +import io.cloudevents.CloudEvent; +import io.cloudevents.extensions.DistributedTracingExtension; +import io.cloudevents.extensions.ExtensionFormat; +import io.cloudevents.extensions.InMemoryFormat; +import io.cloudevents.fun.BinaryFormatAttributeMapper; +import io.cloudevents.fun.BinaryFormatExtensionMapper; +import io.cloudevents.fun.AttributeUnmarshaller; +import io.cloudevents.fun.DataUnmarshaller; +import io.cloudevents.fun.EventBuilder; +import io.cloudevents.fun.ExtensionUmarshaller; +import io.cloudevents.v02.http.BinaryFormatAttributeMapperImpl; +import io.cloudevents.v02.http.BinaryFormatExtensionMapperImpl; +import io.cloudevents.v02.AttributesImpl; +import io.cloudevents.v02.CloudEventBuilder; + +/** + * + * + * @author fabiojose + * + * @param + * @param + */ +public class BinaryUnmashaller { + + private BinaryUnmashaller(){} + + public static HeadersStep builder() { + return new Builder<>(); + } + + public interface HeadersStep { + PayloadStep withHeaders(Supplier> headers); + } + + public interface PayloadStep { + AttributeMapStep withPayload(Supplier

payload); + } + + public interface AttributeMapStep { + AttributeUmarshallStep attributes(BinaryFormatAttributeMapper un); + } + + public interface AttributeUmarshallStep { + DataUnmarshallStep attributes(AttributeUnmarshaller un); + } + + public interface DataUnmarshallStep { + ExtensionsMapStep data(DataUnmarshaller un); + } + + public interface ExtensionsMapStep { + ExtensionsStepBegin extensions(BinaryFormatExtensionMapper mapper); + } + + public interface ExtensionsStepBegin { + ExtensionsStep begin(); + } + + public interface ExtensionsStep { + ExtensionsStep extension(ExtensionUmarshaller un); + Build end(); + } + + public interface Build { + CloudEvent build(EventBuilder builder); + } + + public static class Wrapper { + private String name; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + } + + public static class Dummy { + private String foo; + private Wrapper wrap; + + public Dummy() { + this.wrap = new Wrapper(); + this.wrap.setName("common.name"); + this.setFoo("bar"); + } + + public String getFoo() { + return foo; + } + + public void setFoo(String foo) { + this.foo = foo; + } + + @JsonUnwrapped + public Wrapper getWrap() { + return wrap; + } + + public void setWrap(Wrapper wrap) { + this.wrap = wrap; + } + } + + public static class Builder + implements HeadersStep, PayloadStep, + AttributeMapStep, AttributeUmarshallStep, + DataUnmarshallStep, + ExtensionsMapStep, ExtensionsStepBegin, + ExtensionsStep, + Build { + + private Map headers; + private Map attributesMap; + + private P payload; + private T data; + + private A attributes; + + private Map extensionsMap; + private List extensions = new ArrayList<>(); + + @Override + public PayloadStep withHeaders(Supplier> headers) { + + this.headers = headers.get() + .entrySet() + .stream() + .map((entry) -> + new SimpleEntry<>(entry.getKey(), entry.getValue())) + .collect(toMap(Map.Entry::getKey, + Map.Entry::getValue)); + + return this; + } + + @Override + public AttributeMapStep withPayload(Supplier

payload) { + this.payload = payload.get(); + return this; + } + + @Override + public AttributeUmarshallStep attributes(BinaryFormatAttributeMapper un) { + this.attributesMap = un.map(headers); + return this; + } + + @Override + public DataUnmarshallStep attributes(AttributeUnmarshaller un) { + this.attributes = un.unmarshal(attributesMap); + return this; + } + + @Override + public ExtensionsMapStep data(DataUnmarshaller un) { + try { + this.data = un.unmarshall(this.payload, this.attributes); + return this; + }catch(Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + @Override + public ExtensionsStepBegin extensions(BinaryFormatExtensionMapper mapper) { + this.extensionsMap = mapper.map(headers); + return this; + } + + @Override + public ExtensionsStep begin() { + return this; + } + + @Override + public ExtensionsStep extension(ExtensionUmarshaller un) { + Optional ef = un.unmarshal(extensionsMap); + ef.ifPresent((value) ->{ + this.extensions.add(value); + }); + return this; + } + + @Override + public Build end() { + return this; + } + + @Override + public CloudEvent build(EventBuilder builder) { + return builder.build(data, attributes, extensions); + } + + } + + public static void main(String[] args) throws Exception { + + Map myHeaders = new HashMap<>(); + myHeaders.put("ce-id", "0x11"); + myHeaders.put("ce-source", "/source"); + myHeaders.put("ce-specversion", "0.3"); + myHeaders.put("ce-type", "br.my"); + myHeaders.put("ce-time", "2019-09-16T20:49:00Z"); + myHeaders.put("ce-schemaurl", "http://my.br"); + myHeaders.put("my-ext", "my-custom extension"); + myHeaders.put("traceparent", "0"); + myHeaders.put("tracestate", "congo=4"); + myHeaders.put("Content-Type", "application/json"); + + String myPayload = "{\"foo\" : \"rocks\", \"name\" : \"jocker\"}"; + + //String payload = "{\"id\":\"0x19\",\"type\":\"aws.s3.object.created\",\"time\":\"2018-04-26T14:48:09.769Z\",\"source\":\"/source\",\"contenttype\":\"application/json\",\"specversion\":\"0.2\"}"; + + ObjectMapper mapper = new ObjectMapper(); + System.out.println(mapper.writeValueAsString(new Dummy())); + + CloudEvent event = BinaryUnmashaller. + builder() + .withHeaders(() -> myHeaders) + .withPayload(() -> myPayload) + .attributes(new BinaryFormatAttributeMapperImpl()::map) + .attributes(AttributesImpl.unmarshaller()::unmarshal) + .data((payload, attributes) -> { + return mapper.readValue(payload, Dummy.class); + }) + .extensions(new BinaryFormatExtensionMapperImpl()::map) + .begin() + .extension((exts) -> { + String key = "my-ext"; + String value = exts.get(key); + + ExtensionFormat result = null; + if(null!= value) { + result = ExtensionFormat.of(InMemoryFormat + .of(key, value, Object.class), key, value); + } + return Optional.ofNullable(result); + }) + .extension((exts) -> { + String traceparent = exts.get("traceparent"); + String tracestate = exts.get("tracestate"); + + if(null!= traceparent && null!= tracestate) { + DistributedTracingExtension dte = new DistributedTracingExtension(); + dte.setTraceparent(traceparent); + dte.setTracestate(tracestate); + + InMemoryFormat inMemory = + InMemoryFormat.of("distributedTracing", dte, Object.class); + + return Optional.of( + ExtensionFormat.of(inMemory, + new SimpleEntry<>("traceparent", traceparent), + new SimpleEntry<>("tracestate", tracestate)) + ); + + } + + return Optional.empty(); + }) + .end() + .build(CloudEventBuilder.builder()::build); + + System.out.println(event.getAttributes()); + System.out.println(event.getData()); + System.out.println(event.getExtensions()); + } +} diff --git a/api/src/main/java/io/cloudevents/format/Format.java b/api/src/main/java/io/cloudevents/format/Format.java new file mode 100644 index 00000000..c43253cf --- /dev/null +++ b/api/src/main/java/io/cloudevents/format/Format.java @@ -0,0 +1,50 @@ +/** + * Copyright 2019 The CloudEvents Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.cloudevents.format; + +import java.util.Map; +import java.util.Optional; + +/** + * Represents a result of binary marshalling, to be used to wire serialization + * + * @author fabiojose + * + */ +public class Format { + + private final T payload; + private final Map headers; + + public Format(T payload, Map headers) { + this.payload = payload; + this.headers = headers; + } + + /** + * The payload + */ + public Optional getPayload() { + return Optional.ofNullable(payload); + } + + /** + * The headers + */ + public Map getHeaders() { + return headers; + } +}