Turning the marshaller to lazy eval

Signed-off-by: Fabio José <fabiojose@gmail.com>
This commit is contained in:
Fabio José 2019-08-29 20:57:03 -03:00
parent 9c11207c84
commit a0047bef6e
1 changed files with 187 additions and 87 deletions

View File

@ -1,10 +1,24 @@
/**
* Copyright 2019 The CloudEvents Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.cloudevents.format; package io.cloudevents.format;
import java.net.URI; import java.net.URI;
import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.Collectors;
import io.cloudevents.Attributes; import io.cloudevents.Attributes;
import io.cloudevents.CloudEvent; import io.cloudevents.CloudEvent;
@ -13,8 +27,9 @@ import io.cloudevents.extensions.ExtensionFormat;
import io.cloudevents.fun.AttributeMarshaller; import io.cloudevents.fun.AttributeMarshaller;
import io.cloudevents.fun.BinaryFormatHeaderMapper; import io.cloudevents.fun.BinaryFormatHeaderMapper;
import io.cloudevents.fun.DataMarshaller; import io.cloudevents.fun.DataMarshaller;
import io.cloudevents.fun.ExtensionFormatAccessor;
import io.cloudevents.fun.ExtensionMarshaller; import io.cloudevents.fun.ExtensionMarshaller;
import io.cloudevents.fun.FormatBuilder; import io.cloudevents.fun.WireBuilder;
import io.cloudevents.json.Json; import io.cloudevents.json.Json;
import io.cloudevents.v02.Accessor; import io.cloudevents.v02.Accessor;
import io.cloudevents.v02.AttributesImpl; import io.cloudevents.v02.AttributesImpl;
@ -27,131 +42,216 @@ import io.cloudevents.v02.http.BinaryFormatHeaderMapperImpl;
* @author fabiojose * @author fabiojose
* *
*/ */
public class BinaryMarshaller<P, T, A extends Attributes> { public final class BinaryMarshaller {
private BinaryMarshaller() {}
public static <P, T, A extends Attributes> EventStep<P, T, A> builder() {
return new Builder<>();
}
public static interface EventStep<P, T, A extends Attributes> { /**
AttributeMarshalStep<P, T, A> * Gets a new builder instance
withEvent(Supplier<CloudEvent<A, T>> event); * @param <A> The attributes type
* @param <T> The 'data' type
* @param <P> The payload type
* @return
*/
public static <A extends Attributes, T, P>
AttributeMarshalStep<A, T, P> builder() {
return new Builder<A, T, P>();
} }
public static interface AttributeMarshalStep<P, T, A extends Attributes> { public static interface AttributeMarshalStep<A extends Attributes, T, P> {
ExtensionsStep<P, T, A> attributes(AttributeMarshaller<A> marshaller); /**
* Marshals the {@link Attributes} instance into a
* {@code Map<String, String>}
* @param marshaller
* @return
*/
ExtensionsAccessorStep<A, T, P> map(AttributeMarshaller<A> marshaller);
} }
public static interface ExtensionsStep<P, T, A extends Attributes> { public static interface ExtensionsAccessorStep<A extends Attributes, T, P> {
HeaderMapStep<P, T, A> extensions(ExtensionMarshaller marshaller);
/**
* To get access of internal collection of {@link ExtensionFormat}
* @param accessor
* @return
*/
ExtensionsStep<A, T, P> map(ExtensionFormatAccessor<A, T> accessor);
} }
public static interface HeaderMapStep<P, T, A extends Attributes> { public static interface ExtensionsStep<A extends Attributes, T, P> {
PayloadStep<P, T, A> headers(BinaryFormatHeaderMapper mapper); /**
* Marshals the collection of {@link ExtensionFormat} into a
* {@code Map<String, String>}
* @param marshaller
* @return
*/
HeaderMapStep<A, T, P> map(ExtensionMarshaller marshaller);
} }
public static interface PayloadStep<P, T, A extends Attributes> { public static interface HeaderMapStep<A extends Attributes, T, P> {
Build<P, T, A> payload(DataMarshaller<P, T> marshaller); /**
* Marshals the map of attributes and extensions into a map of headers
* @param mapper
* @return
*/
PayloadStep<A, T, P> map(BinaryFormatHeaderMapper mapper);
} }
public static interface Build<P, T, A extends Attributes> { public static interface PayloadStep<A extends Attributes, T, P> {
Format<P> build(FormatBuilder<P> builder); /**
* Marshals the 'data' into payload
* @param marshaller
* @return
*/
BuilderStep<A, T, P> map(DataMarshaller<P, T> marshaller);
} }
public static class Builder<P, T, A extends Attributes> implements public static interface BuilderStep<A extends Attributes, T, P> {
EventStep<P, T, A>, AttributeMarshalStep<P, T, A>, /**
ExtensionsStep<P, T, A>, HeaderMapStep<P, T, A>, * Builds the {@link Wire} to use for wire transfer
PayloadStep<P, T, A>, Build<P, T, A> { * @param builder
* @return
private CloudEvent<A, T> event; */
EventStep<A, T, P> builder(WireBuilder<P> builder);
private Map<String, String> attributesMap; }
private Map<String, Object> headers; public static interface EventStep<A extends Attributes, T, P> {
/**
private P payload; * Takes the {@link CloudEvent} instance to marshal
* @param event
private Map<String, String> extensionsMap; * @return
*/
Marshaller<P> withEvent(Supplier<CloudEvent<A, T>> event);
}
public static interface Marshaller<P> {
/**
* Builds an instance of {@link Wire}, doing all the computation at
* this method call.
* @return
*/
Wire<P> marshal();
}
private static final class Builder<A extends Attributes, T, P> implements
AttributeMarshalStep<A, T, P>,
ExtensionsAccessorStep<A, T, P>,
ExtensionsStep<A, T, P>,
PayloadStep<A, T, P>,
HeaderMapStep<A, T, P>,
BuilderStep<A, T, P>,
EventStep<A, T, P>,
Marshaller<P> {
private AttributeMarshaller<A> attributeMarshaller;
private ExtensionFormatAccessor<A, T> extensionsAccessor;
private ExtensionMarshaller extensionMarshaller;
private BinaryFormatHeaderMapper headerMapper;
private DataMarshaller<P, T> dataMarshaller;
private WireBuilder<P> wireBuilder;
private Supplier<CloudEvent<A, T>> eventSupplier;
@Override @Override
public AttributeMarshalStep<P, T, A> public ExtensionsAccessorStep<A, T, P> map(AttributeMarshaller<A> marshaller) {
withEvent(Supplier<CloudEvent<A, T>> event) { this.attributeMarshaller = marshaller;
this.event = event.get();
return this; return this;
} }
@Override @Override
public ExtensionsStep<P, T, A> attributes(AttributeMarshaller<A> marshaller) { public ExtensionsStep<A, T, P> map(ExtensionFormatAccessor<A, T> accessor) {
this.attributesMap = marshaller.marshal(event.getAttributes()); this.extensionsAccessor = accessor;
return this;
}
@Override
public HeaderMapStep<P, T, A> extensions(ExtensionMarshaller marshaller) {
this.extensionsMap = marshaller.marshal(Accessor.extensionsOf(event));
return this; return this;
} }
@Override @Override
public PayloadStep<P, T, A> headers(BinaryFormatHeaderMapper mapper) { public HeaderMapStep<A, T, P> map(ExtensionMarshaller marshaller) {
this.headers = mapper.map(attributesMap, extensionsMap); this.extensionMarshaller = marshaller;
return this;
}
@Override
public PayloadStep<A, T, P> map(BinaryFormatHeaderMapper mapper) {
this.headerMapper = mapper;
return this;
}
@Override
public BuilderStep<A, T, P> map(DataMarshaller<P, T> marshaller) {
this.dataMarshaller = marshaller;
return this;
}
@Override
public EventStep<A, T, P> builder(WireBuilder<P> builder) {
this.wireBuilder = builder;
return this; return this;
} }
@Override @Override
public Build<P, T, A> payload(DataMarshaller<P, T> marshaller) { public Marshaller<P> withEvent(Supplier<CloudEvent<A, T>> event) {
event.getData().ifPresent((data) -> { this.eventSupplier = event;
try {
payload = marshaller.marshal(data, headers);
}catch(Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
});
return this; return this;
} }
@Override @Override
public Format<P> build(FormatBuilder<P> builder) { public Wire<P> marshal() {
return builder.build(payload, headers); CloudEvent<A, T> event = eventSupplier.get();
Map<String, String> attributesMap =
attributeMarshaller.marshal(event.getAttributes());
Collection<ExtensionFormat> extensionsFormat =
extensionsAccessor.extensionsOf(event);
Map<String, String> extensionsMap =
extensionMarshaller.marshal(extensionsFormat);
Map<String, Object> headers =
headerMapper.map(attributesMap, extensionsMap);
P payload = null;
if(event.getData().isPresent()) {
payload = dataMarshaller.marshal(event.getData().get(),
headers);
}
return wireBuilder.build(payload, headers);
} }
} }
public static void main(String[] args) { public static void main(String[] args) {
final DistributedTracingExtension dt =
final DistributedTracingExtension dt = new DistributedTracingExtension(); new DistributedTracingExtension();
dt.setTraceparent("0"); dt.setTraceparent("0");
dt.setTracestate("congo=4"); dt.setTracestate("congo=4");
final ExtensionFormat tracing = new DistributedTracingExtension.Format(dt); final ExtensionFormat tracing =
new DistributedTracingExtension.Format(dt);
final CloudEventImpl<String> ce = final CloudEventImpl<String> ce =
CloudEventBuilder.<String>builder() CloudEventBuilder.<String>builder()
.withId("x10") .withId("x10")
.withSource(URI.create("/source")) .withSource(URI.create("/source"))
.withType("event-type") .withType("event-type")
.withSchemaurl(URI.create("/schema")) .withSchemaurl(URI.create("/schema"))
.withContenttype("text/plain") .withContenttype("text/plain")
.withExtension(tracing) .withData("my-data")
.withData("my-data") .withExtension(tracing)
.build(); .build();
Format<String> format = Wire<String> wire =
BinaryMarshaller.<String, String, AttributesImpl>builder() BinaryMarshaller.<AttributesImpl, String, String>builder()
.map(AttributesImpl::marshal)
.map(Accessor::extensionsOf)
.map(ExtensionFormat::marshal)
.map(BinaryFormatHeaderMapperImpl::map)
.map(Json.marshaller()::marshal)
.builder(Wire<String>::new)
.withEvent(() -> ce) .withEvent(() -> ce)
.attributes(AttributesImpl.marshaller()::marshal) .marshal();
.extensions((extensions) -> {
return extensions.stream()
.map(ExtensionFormat::transport)
.flatMap(t -> t.entrySet().stream())
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
})
.headers(BinaryFormatHeaderMapperImpl.mapper()::map)
.payload(Json.marshaller()::marshal)
.build((payload, headers) -> {
return new Format<>(payload, headers);
});
System.out.println(format.getPayload()); System.out.println(wire.getPayload());
System.out.print(format.getHeaders()); System.out.println(wire.getHeaders());
} }
} }