The format, to use as a result of binary marshalling
Signed-off-by: Fabio José <fabiojose@gmail.com>
This commit is contained in:
parent
0d2e1d7579
commit
c941799977
|
|
@ -0,0 +1,159 @@
|
|||
package io.cloudevents.format;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import io.cloudevents.Attributes;
|
||||
import io.cloudevents.CloudEvent;
|
||||
import io.cloudevents.extensions.DistributedTracingExtension;
|
||||
import io.cloudevents.extensions.ExtensionFormat;
|
||||
import io.cloudevents.fun.AttributeMarshaller;
|
||||
import io.cloudevents.fun.BinaryFormatHeaderMapper;
|
||||
import io.cloudevents.fun.DataMarshaller;
|
||||
import io.cloudevents.fun.ExtensionMarshaller;
|
||||
import io.cloudevents.fun.FormatBuilder;
|
||||
import io.cloudevents.json.Json;
|
||||
import io.cloudevents.v02.Accessor;
|
||||
import io.cloudevents.v02.AttributesImpl;
|
||||
import io.cloudevents.v02.CloudEventBuilder;
|
||||
import io.cloudevents.v02.CloudEventImpl;
|
||||
import io.cloudevents.v02.http.BinaryFormatHeaderMapperImpl;
|
||||
|
||||
/**
|
||||
*
|
||||
* @author fabiojose
|
||||
*
|
||||
*/
|
||||
public class BinaryMarshaller<P, T, A extends Attributes> {
|
||||
|
||||
public static <P, T, A extends Attributes> EventStep<P, T, A> builder() {
|
||||
return new Builder<>();
|
||||
}
|
||||
|
||||
public static interface EventStep<P, T, A extends Attributes> {
|
||||
AttributeMarshalStep<P, T, A>
|
||||
withEvent(Supplier<CloudEvent<A, T>> event);
|
||||
}
|
||||
|
||||
public static interface AttributeMarshalStep<P, T, A extends Attributes> {
|
||||
ExtensionsStep<P, T, A> attributes(AttributeMarshaller<A> marshaller);
|
||||
}
|
||||
|
||||
public static interface ExtensionsStep<P, T, A extends Attributes> {
|
||||
HeaderMapStep<P, T, A> extensions(ExtensionMarshaller marshaller);
|
||||
}
|
||||
|
||||
public static interface HeaderMapStep<P, T, A extends Attributes> {
|
||||
PayloadStep<P, T, A> headers(BinaryFormatHeaderMapper mapper);
|
||||
}
|
||||
|
||||
public static interface PayloadStep<P, T, A extends Attributes> {
|
||||
Build<P, T, A> payload(DataMarshaller<P, T> marshaller);
|
||||
}
|
||||
|
||||
public static interface Build<P, T, A extends Attributes> {
|
||||
Format<P> build(FormatBuilder<P> builder);
|
||||
}
|
||||
|
||||
public static class Builder<P, T, A extends Attributes> implements
|
||||
EventStep<P, T, A>, AttributeMarshalStep<P, T, A>,
|
||||
ExtensionsStep<P, T, A>, HeaderMapStep<P, T, A>,
|
||||
PayloadStep<P, T, A>, Build<P, T, A> {
|
||||
|
||||
private CloudEvent<A, T> event;
|
||||
|
||||
private Map<String, String> attributesMap;
|
||||
|
||||
private Map<String, Object> headers;
|
||||
|
||||
private P payload;
|
||||
|
||||
private Map<String, String> extensionsMap;
|
||||
|
||||
@Override
|
||||
public AttributeMarshalStep<P, T, A>
|
||||
withEvent(Supplier<CloudEvent<A, T>> event) {
|
||||
this.event = event.get();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtensionsStep<P, T, A> attributes(AttributeMarshaller<A> marshaller) {
|
||||
this.attributesMap = marshaller.marshal(event.getAttributes());
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HeaderMapStep<P, T, A> extensions(ExtensionMarshaller marshaller) {
|
||||
this.extensionsMap = marshaller.marshal(Accessor.extensionsOf(event));
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PayloadStep<P, T, A> headers(BinaryFormatHeaderMapper mapper) {
|
||||
this.headers = mapper.map(attributesMap, extensionsMap);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Build<P, T, A> payload(DataMarshaller<P, T> marshaller) {
|
||||
event.getData().ifPresent((data) -> {
|
||||
try {
|
||||
payload = marshaller.marshal(data, headers);
|
||||
}catch(Exception e) {
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
});
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Format<P> build(FormatBuilder<P> builder) {
|
||||
return builder.build(payload, headers);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
||||
final DistributedTracingExtension dt = new DistributedTracingExtension();
|
||||
dt.setTraceparent("0");
|
||||
dt.setTracestate("congo=4");
|
||||
|
||||
final ExtensionFormat tracing = new DistributedTracingExtension.Format(dt);
|
||||
|
||||
final CloudEventImpl<String> ce =
|
||||
new CloudEventBuilder<String>()
|
||||
.withId("x10")
|
||||
.withSource(URI.create("/source"))
|
||||
.withType("event-type")
|
||||
.withSchemaurl(URI.create("/schema"))
|
||||
.withContenttype("text/plain")
|
||||
.withExtension(tracing)
|
||||
.withData("my-data")
|
||||
.build();
|
||||
|
||||
Format<String> format =
|
||||
BinaryMarshaller.<String, String, AttributesImpl>builder()
|
||||
.withEvent(() -> ce)
|
||||
.attributes(AttributesImpl.marshaller()::marshal)
|
||||
.extensions((extensions) -> {
|
||||
return extensions.stream()
|
||||
.map(ExtensionFormat::transport)
|
||||
.flatMap(t -> t.entrySet().stream())
|
||||
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
|
||||
})
|
||||
.headers(BinaryFormatHeaderMapperImpl.mapper()::map)
|
||||
.payload((data, headers) -> {
|
||||
return Json.encode(data);
|
||||
})
|
||||
.build((payload, headers) -> {
|
||||
return new Format<>(payload, headers);
|
||||
});
|
||||
|
||||
System.out.println(format.getPayload());
|
||||
System.out.print(format.getHeaders());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,290 @@
|
|||
package io.cloudevents.format;
|
||||
|
||||
import static java.util.stream.Collectors.toMap;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.AbstractMap.SimpleEntry;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonUnwrapped;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import io.cloudevents.Attributes;
|
||||
import io.cloudevents.CloudEvent;
|
||||
import io.cloudevents.extensions.DistributedTracingExtension;
|
||||
import io.cloudevents.extensions.ExtensionFormat;
|
||||
import io.cloudevents.extensions.InMemoryFormat;
|
||||
import io.cloudevents.fun.BinaryFormatAttributeMapper;
|
||||
import io.cloudevents.fun.BinaryFormatExtensionMapper;
|
||||
import io.cloudevents.fun.AttributeUnmarshaller;
|
||||
import io.cloudevents.fun.DataUnmarshaller;
|
||||
import io.cloudevents.fun.EventBuilder;
|
||||
import io.cloudevents.fun.ExtensionUmarshaller;
|
||||
import io.cloudevents.v02.http.BinaryFormatAttributeMapperImpl;
|
||||
import io.cloudevents.v02.http.BinaryFormatExtensionMapperImpl;
|
||||
import io.cloudevents.v02.AttributesImpl;
|
||||
import io.cloudevents.v02.CloudEventBuilder;
|
||||
|
||||
/**
|
||||
*
|
||||
*
|
||||
* @author fabiojose
|
||||
*
|
||||
* @param <E>
|
||||
* @param <R>
|
||||
*/
|
||||
public class BinaryUnmashaller<P, T, A extends Attributes> {
|
||||
|
||||
private BinaryUnmashaller(){}
|
||||
|
||||
public static <P, T, A extends Attributes> HeadersStep<P, T, A> builder() {
|
||||
return new Builder<>();
|
||||
}
|
||||
|
||||
public interface HeadersStep<P, T, A extends Attributes> {
|
||||
PayloadStep<P, T, A> withHeaders(Supplier<Map<String, Object>> headers);
|
||||
}
|
||||
|
||||
public interface PayloadStep<P, T, A extends Attributes> {
|
||||
AttributeMapStep<P, T, A> withPayload(Supplier<P> payload);
|
||||
}
|
||||
|
||||
public interface AttributeMapStep<P, T, A extends Attributes> {
|
||||
AttributeUmarshallStep<P, T, A> attributes(BinaryFormatAttributeMapper un);
|
||||
}
|
||||
|
||||
public interface AttributeUmarshallStep<P, T, A extends Attributes> {
|
||||
DataUnmarshallStep<P, T, A> attributes(AttributeUnmarshaller<A> un);
|
||||
}
|
||||
|
||||
public interface DataUnmarshallStep<P, T, A extends Attributes> {
|
||||
ExtensionsMapStep<P, T, A> data(DataUnmarshaller<P, T, A> un);
|
||||
}
|
||||
|
||||
public interface ExtensionsMapStep<P, T, A extends Attributes> {
|
||||
ExtensionsStepBegin<P, T, A> extensions(BinaryFormatExtensionMapper mapper);
|
||||
}
|
||||
|
||||
public interface ExtensionsStepBegin<P, T, A extends Attributes> {
|
||||
ExtensionsStep<P, T, A> begin();
|
||||
}
|
||||
|
||||
public interface ExtensionsStep<P, T, A extends Attributes> {
|
||||
ExtensionsStep<P, T, A> extension(ExtensionUmarshaller un);
|
||||
Build<T, A> end();
|
||||
}
|
||||
|
||||
public interface Build<T, A extends Attributes> {
|
||||
CloudEvent<A, T> build(EventBuilder<T, A> builder);
|
||||
}
|
||||
|
||||
public static class Wrapper {
|
||||
private String name;
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
}
|
||||
|
||||
public void setName(String name) {
|
||||
this.name = name;
|
||||
}
|
||||
}
|
||||
|
||||
public static class Dummy {
|
||||
private String foo;
|
||||
private Wrapper wrap;
|
||||
|
||||
public Dummy() {
|
||||
this.wrap = new Wrapper();
|
||||
this.wrap.setName("common.name");
|
||||
this.setFoo("bar");
|
||||
}
|
||||
|
||||
public String getFoo() {
|
||||
return foo;
|
||||
}
|
||||
|
||||
public void setFoo(String foo) {
|
||||
this.foo = foo;
|
||||
}
|
||||
|
||||
@JsonUnwrapped
|
||||
public Wrapper getWrap() {
|
||||
return wrap;
|
||||
}
|
||||
|
||||
public void setWrap(Wrapper wrap) {
|
||||
this.wrap = wrap;
|
||||
}
|
||||
}
|
||||
|
||||
public static class Builder<P, T, A extends Attributes>
|
||||
implements HeadersStep<P, T, A>, PayloadStep<P, T, A>,
|
||||
AttributeMapStep<P, T, A>, AttributeUmarshallStep<P, T, A>,
|
||||
DataUnmarshallStep<P, T, A>,
|
||||
ExtensionsMapStep<P, T, A>, ExtensionsStepBegin<P, T, A>,
|
||||
ExtensionsStep<P, T, A>,
|
||||
Build<T, A> {
|
||||
|
||||
private Map<String, Object> headers;
|
||||
private Map<String, String> attributesMap;
|
||||
|
||||
private P payload;
|
||||
private T data;
|
||||
|
||||
private A attributes;
|
||||
|
||||
private Map<String, String> extensionsMap;
|
||||
private List<ExtensionFormat> extensions = new ArrayList<>();
|
||||
|
||||
@Override
|
||||
public PayloadStep<P, T, A> withHeaders(Supplier<Map<String, Object>> headers) {
|
||||
|
||||
this.headers = headers.get()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.map((entry) ->
|
||||
new SimpleEntry<>(entry.getKey(), entry.getValue()))
|
||||
.collect(toMap(Map.Entry::getKey,
|
||||
Map.Entry::getValue));
|
||||
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AttributeMapStep<P, T, A> withPayload(Supplier<P> payload) {
|
||||
this.payload = payload.get();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public AttributeUmarshallStep<P, T, A> attributes(BinaryFormatAttributeMapper un) {
|
||||
this.attributesMap = un.map(headers);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataUnmarshallStep<P, T, A> attributes(AttributeUnmarshaller<A> un) {
|
||||
this.attributes = un.unmarshal(attributesMap);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtensionsMapStep<P, T, A> data(DataUnmarshaller<P, T, A> un) {
|
||||
try {
|
||||
this.data = un.unmarshall(this.payload, this.attributes);
|
||||
return this;
|
||||
}catch(Exception e) {
|
||||
throw new RuntimeException(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtensionsStepBegin<P, T, A> extensions(BinaryFormatExtensionMapper mapper) {
|
||||
this.extensionsMap = mapper.map(headers);
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtensionsStep<P, T, A> begin() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExtensionsStep<P, T, A> extension(ExtensionUmarshaller un) {
|
||||
Optional<ExtensionFormat> ef = un.unmarshal(extensionsMap);
|
||||
ef.ifPresent((value) ->{
|
||||
this.extensions.add(value);
|
||||
});
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Build<T, A> end() {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public CloudEvent<A, T> build(EventBuilder<T, A> builder) {
|
||||
return builder.build(data, attributes, extensions);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
Map<String, Object> myHeaders = new HashMap<>();
|
||||
myHeaders.put("ce-id", "0x11");
|
||||
myHeaders.put("ce-source", "/source");
|
||||
myHeaders.put("ce-specversion", "0.3");
|
||||
myHeaders.put("ce-type", "br.my");
|
||||
myHeaders.put("ce-time", "2019-09-16T20:49:00Z");
|
||||
myHeaders.put("ce-schemaurl", "http://my.br");
|
||||
myHeaders.put("my-ext", "my-custom extension");
|
||||
myHeaders.put("traceparent", "0");
|
||||
myHeaders.put("tracestate", "congo=4");
|
||||
myHeaders.put("Content-Type", "application/json");
|
||||
|
||||
String myPayload = "{\"foo\" : \"rocks\", \"name\" : \"jocker\"}";
|
||||
|
||||
//String payload = "{\"id\":\"0x19\",\"type\":\"aws.s3.object.created\",\"time\":\"2018-04-26T14:48:09.769Z\",\"source\":\"/source\",\"contenttype\":\"application/json\",\"specversion\":\"0.2\"}";
|
||||
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
System.out.println(mapper.writeValueAsString(new Dummy()));
|
||||
|
||||
CloudEvent<AttributesImpl, Dummy> event = BinaryUnmashaller.<String, Dummy, AttributesImpl>
|
||||
builder()
|
||||
.withHeaders(() -> myHeaders)
|
||||
.withPayload(() -> myPayload)
|
||||
.attributes(new BinaryFormatAttributeMapperImpl()::map)
|
||||
.attributes(AttributesImpl.unmarshaller()::unmarshal)
|
||||
.data((payload, attributes) -> {
|
||||
return mapper.readValue(payload, Dummy.class);
|
||||
})
|
||||
.extensions(new BinaryFormatExtensionMapperImpl()::map)
|
||||
.begin()
|
||||
.extension((exts) -> {
|
||||
String key = "my-ext";
|
||||
String value = exts.get(key);
|
||||
|
||||
ExtensionFormat result = null;
|
||||
if(null!= value) {
|
||||
result = ExtensionFormat.of(InMemoryFormat
|
||||
.of(key, value, Object.class), key, value);
|
||||
}
|
||||
return Optional.ofNullable(result);
|
||||
})
|
||||
.extension((exts) -> {
|
||||
String traceparent = exts.get("traceparent");
|
||||
String tracestate = exts.get("tracestate");
|
||||
|
||||
if(null!= traceparent && null!= tracestate) {
|
||||
DistributedTracingExtension dte = new DistributedTracingExtension();
|
||||
dte.setTraceparent(traceparent);
|
||||
dte.setTracestate(tracestate);
|
||||
|
||||
InMemoryFormat inMemory =
|
||||
InMemoryFormat.of("distributedTracing", dte, Object.class);
|
||||
|
||||
return Optional.of(
|
||||
ExtensionFormat.of(inMemory,
|
||||
new SimpleEntry<>("traceparent", traceparent),
|
||||
new SimpleEntry<>("tracestate", tracestate))
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
return Optional.empty();
|
||||
})
|
||||
.end()
|
||||
.build(CloudEventBuilder.<Dummy>builder()::build);
|
||||
|
||||
System.out.println(event.getAttributes());
|
||||
System.out.println(event.getData());
|
||||
System.out.println(event.getExtensions());
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,50 @@
|
|||
/**
|
||||
* Copyright 2019 The CloudEvents Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package io.cloudevents.format;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Represents a result of binary marshalling, to be used to wire serialization
|
||||
*
|
||||
* @author fabiojose
|
||||
*
|
||||
*/
|
||||
public class Format<T> {
|
||||
|
||||
private final T payload;
|
||||
private final Map<String, Object> headers;
|
||||
|
||||
public Format(T payload, Map<String, Object> headers) {
|
||||
this.payload = payload;
|
||||
this.headers = headers;
|
||||
}
|
||||
|
||||
/**
|
||||
* The payload
|
||||
*/
|
||||
public Optional<T> getPayload() {
|
||||
return Optional.ofNullable(payload);
|
||||
}
|
||||
|
||||
/**
|
||||
* The headers
|
||||
*/
|
||||
public Map<String, Object> getHeaders() {
|
||||
return headers;
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue