Use of marshallers and unmarshallers for HTTP
Signed-off-by: Fabio José <fabiojose@gmail.com>
This commit is contained in:
parent
0b11866d83
commit
efd60454ee
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright 2018 The CloudEvents Authors
|
||||
* Copyright 2019 The CloudEvents Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
|
@ -15,30 +15,7 @@
|
|||
*/
|
||||
package io.cloudevents.http.vertx.impl;
|
||||
|
||||
import io.cloudevents.CloudEvent;
|
||||
import io.cloudevents.extensions.DistributedTracingExtension;
|
||||
import io.cloudevents.extensions.ExtensionFormat;
|
||||
import io.cloudevents.format.BinaryMarshaller;
|
||||
import io.cloudevents.format.BinaryUnmarshaller;
|
||||
import io.cloudevents.format.Wire;
|
||||
import io.cloudevents.http.vertx.VertxCloudEvents;
|
||||
import io.cloudevents.json.Json;
|
||||
import io.cloudevents.v02.Accessor;
|
||||
import io.cloudevents.v02.AttributesImpl;
|
||||
import io.cloudevents.v02.CloudEventBuilder;
|
||||
import io.cloudevents.v02.CloudEventImpl;
|
||||
import io.cloudevents.v02.http.AttributeMapper;
|
||||
import io.cloudevents.v02.http.ExtensionMapper;
|
||||
import io.cloudevents.v02.http.HeaderMapper;
|
||||
import io.vertx.core.AsyncResult;
|
||||
import io.vertx.core.Future;
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.MultiMap;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.http.HttpClientRequest;
|
||||
import static io.vertx.core.http.HttpHeaders.createOptimized;
|
||||
import io.vertx.core.http.HttpHeaders;
|
||||
import io.vertx.core.http.HttpServerRequest;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
|
|
@ -46,6 +23,23 @@ import java.util.Map;
|
|||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
|
||||
import io.cloudevents.CloudEvent;
|
||||
import io.cloudevents.format.Wire;
|
||||
import io.cloudevents.http.vertx.VertxCloudEvents;
|
||||
import io.cloudevents.json.Json;
|
||||
import io.cloudevents.v02.AttributesImpl;
|
||||
import io.cloudevents.v02.CloudEventImpl;
|
||||
import io.cloudevents.v02.http.Marshallers;
|
||||
import io.cloudevents.v02.http.Unmarshallers;
|
||||
import io.vertx.core.AsyncResult;
|
||||
import io.vertx.core.Future;
|
||||
import io.vertx.core.Handler;
|
||||
import io.vertx.core.MultiMap;
|
||||
import io.vertx.core.buffer.Buffer;
|
||||
import io.vertx.core.http.HttpClientRequest;
|
||||
import io.vertx.core.http.HttpHeaders;
|
||||
import io.vertx.core.http.HttpServerRequest;
|
||||
|
||||
public final class VertxCloudEventsImpl implements VertxCloudEvents {
|
||||
|
||||
private final static CharSequence BINARY_TYPE = HttpHeaders.createOptimized("application/json");
|
||||
|
|
@ -66,16 +60,7 @@ public final class VertxCloudEventsImpl implements VertxCloudEvents {
|
|||
if (headers.get(HttpHeaders.CONTENT_TYPE).equalsIgnoreCase(BINARY_TYPE.toString())) {
|
||||
request.bodyHandler((Buffer buff) -> {
|
||||
CloudEvent<AttributesImpl, String> event =
|
||||
BinaryUnmarshaller.<AttributesImpl, String, String>builder()
|
||||
.map(AttributeMapper::map)
|
||||
.map(AttributesImpl::unmarshal)
|
||||
.map("application/json",
|
||||
Json.umarshaller(String.class)::unmarshal)
|
||||
.next()
|
||||
.map(ExtensionMapper::map)
|
||||
.map(DistributedTracingExtension::unmarshall)
|
||||
.next()
|
||||
.builder(CloudEventBuilder.<String>builder()::build)
|
||||
Unmarshallers.binary(String.class)
|
||||
.withHeaders(() -> {
|
||||
final Map<String, Object> result = new HashMap<>();
|
||||
|
||||
|
|
@ -119,14 +104,8 @@ public final class VertxCloudEventsImpl implements VertxCloudEvents {
|
|||
public void writeToHttpClientRequest(CloudEvent<AttributesImpl, String> cloudEvent, boolean binary, HttpClientRequest request) {
|
||||
|
||||
if (binary) {
|
||||
Wire<String, String, Object> wire =
|
||||
BinaryMarshaller.<AttributesImpl, String, String>builder()
|
||||
.map(AttributesImpl::marshal)
|
||||
.map(Accessor::extensionsOf)
|
||||
.map(ExtensionFormat::marshal)
|
||||
.map(HeaderMapper::map)
|
||||
.map(Json.marshaller())
|
||||
.builder(Wire<String, String, Object>::new)
|
||||
Wire<String, String, String> wire =
|
||||
Marshallers.<String>binary()
|
||||
.withEvent(() -> cloudEvent)
|
||||
.marshal();
|
||||
|
||||
|
|
@ -142,7 +121,7 @@ public final class VertxCloudEventsImpl implements VertxCloudEvents {
|
|||
.stream()
|
||||
.forEach(header -> {
|
||||
request.putHeader(createOptimized(header.getKey()),
|
||||
createOptimized(header.getValue().toString()));
|
||||
createOptimized(header.getValue()));
|
||||
});
|
||||
|
||||
wire.getPayload().ifPresent((payload) -> {
|
||||
|
|
|
|||
Loading…
Reference in New Issue