💨 adding extension support (0.2) and shipping distributred tracing
Signed-off-by: Matthias Wessendorf <mwessend@redhat.com>
This commit is contained in:
		
							parent
							
								
									90c63496d2
								
							
						
					
					
						commit
						5aef5b2fe7
					
				| 
						 | 
				
			
			@ -17,7 +17,7 @@ package io.cloudevents;
 | 
			
		|||
 | 
			
		||||
import java.net.URI;
 | 
			
		||||
import java.time.ZonedDateTime;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
| 
						 | 
				
			
			@ -66,4 +66,9 @@ public interface CloudEvent<T> {
 | 
			
		|||
     * The event payload. The payload depends on the eventType, schemaURL and eventTypeVersion, the payload is encoded into a media format which is specified by the contentType attribute (e.g. application/json).
 | 
			
		||||
     */
 | 
			
		||||
    Optional<T> getData();
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     *
 | 
			
		||||
     */
 | 
			
		||||
    Optional<List<Extension>> getExtensions();
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -19,7 +19,9 @@ import io.cloudevents.impl.DefaultCloudEventImpl;
 | 
			
		|||
 | 
			
		||||
import java.net.URI;
 | 
			
		||||
import java.time.ZonedDateTime;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.LinkedHashMap;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
| 
						 | 
				
			
			@ -36,6 +38,7 @@ public class CloudEventBuilder<T> {
 | 
			
		|||
    private ZonedDateTime time;
 | 
			
		||||
    private URI schemaURL;
 | 
			
		||||
    private T data;
 | 
			
		||||
    private final List<Extension> extensions = new ArrayList<>();
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * The version of the CloudEvents specification which the event uses.
 | 
			
		||||
| 
						 | 
				
			
			@ -103,6 +106,11 @@ public class CloudEventBuilder<T> {
 | 
			
		|||
        return this;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public CloudEventBuilder<T> extension(final Extension extension) {
 | 
			
		||||
        this.extensions.add(extension);
 | 
			
		||||
        return this;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
     * Constructs a new {@link CloudEvent} with the previously-set configuration.
 | 
			
		||||
     */
 | 
			
		||||
| 
						 | 
				
			
			@ -117,6 +125,6 @@ public class CloudEventBuilder<T> {
 | 
			
		|||
            throw new IllegalArgumentException("please provide all required fields");
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        return new DefaultCloudEventImpl<T>(type, specversion, source, id, time, schemaURL, contentType, data);
 | 
			
		||||
        return new DefaultCloudEventImpl<T>(type, specversion, source, id, time, schemaURL, contentType, data, extensions);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,19 @@
 | 
			
		|||
/**
 | 
			
		||||
 * Copyright 2018 The CloudEvents Authors
 | 
			
		||||
 *
 | 
			
		||||
 * Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
 * you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at
 | 
			
		||||
 *
 | 
			
		||||
 * http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
 *
 | 
			
		||||
 * Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
 * distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
 * See the License for the specific language governing permissions and
 | 
			
		||||
 * limitations under the License.
 | 
			
		||||
 */
 | 
			
		||||
package io.cloudevents;
 | 
			
		||||
 | 
			
		||||
public interface Extension {
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,33 @@
 | 
			
		|||
package io.cloudevents.extensions;
 | 
			
		||||
 | 
			
		||||
import io.cloudevents.Extension;
 | 
			
		||||
 | 
			
		||||
public class DistributedTracingExtension implements Extension {
 | 
			
		||||
 | 
			
		||||
    private String traceparent;
 | 
			
		||||
    private String tracestate;
 | 
			
		||||
 | 
			
		||||
    public String getTraceparent() {
 | 
			
		||||
        return traceparent;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void setTraceparent(String traceparent) {
 | 
			
		||||
        this.traceparent = traceparent;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public String getTracestate() {
 | 
			
		||||
        return tracestate;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    public void setTracestate(String tracestate) {
 | 
			
		||||
        this.tracestate = tracestate;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public String toString() {
 | 
			
		||||
        return "DistributedTracingExtension{" +
 | 
			
		||||
                "traceparent='" + traceparent + '\'' +
 | 
			
		||||
                ", tracestate='" + tracestate + '\'' +
 | 
			
		||||
                '}';
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -19,11 +19,13 @@ import com.fasterxml.jackson.annotation.JsonAlias;
 | 
			
		|||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 | 
			
		||||
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 | 
			
		||||
import io.cloudevents.CloudEvent;
 | 
			
		||||
import io.cloudevents.Extension;
 | 
			
		||||
import io.cloudevents.SpecVersion;
 | 
			
		||||
 | 
			
		||||
import java.io.Serializable;
 | 
			
		||||
import java.net.URI;
 | 
			
		||||
import java.time.ZonedDateTime;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
import java.util.Optional;
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -45,8 +47,9 @@ public class DefaultCloudEventImpl<T> implements CloudEvent<T>, Serializable {
 | 
			
		|||
    private URI schemaURL = null;
 | 
			
		||||
    private String contentType = null;
 | 
			
		||||
    private T data = null;
 | 
			
		||||
    private List<Extension> extensions = null;
 | 
			
		||||
 | 
			
		||||
    public DefaultCloudEventImpl(final String type, final String specversion, final URI source, final String id, final ZonedDateTime time, final URI schemaURL, final String contentType, final T data) {
 | 
			
		||||
    public DefaultCloudEventImpl(final String type, final String specversion, final URI source, final String id, final ZonedDateTime time, final URI schemaURL, final String contentType, final T data, final List<Extension> extensions) {
 | 
			
		||||
        this.specversion = specversion;
 | 
			
		||||
        this.type = type;
 | 
			
		||||
        this.source = source;
 | 
			
		||||
| 
						 | 
				
			
			@ -55,6 +58,7 @@ public class DefaultCloudEventImpl<T> implements CloudEvent<T>, Serializable {
 | 
			
		|||
        this.schemaURL = schemaURL;
 | 
			
		||||
        this.contentType = contentType;
 | 
			
		||||
        this.data = data;
 | 
			
		||||
        this.extensions = extensions;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    DefaultCloudEventImpl() {
 | 
			
		||||
| 
						 | 
				
			
			@ -101,6 +105,11 @@ public class DefaultCloudEventImpl<T> implements CloudEvent<T>, Serializable {
 | 
			
		|||
        return Optional.ofNullable(data);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public Optional<List<Extension>> getExtensions() {
 | 
			
		||||
        return Optional.ofNullable(extensions);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // protected setters, used for (JSON) deserialization
 | 
			
		||||
 | 
			
		||||
    @JsonAlias({"specversion", "cloudEventsVersion"})
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -15,6 +15,7 @@
 | 
			
		|||
 */
 | 
			
		||||
package io.cloudevents;
 | 
			
		||||
 | 
			
		||||
import io.cloudevents.extensions.DistributedTracingExtension;
 | 
			
		||||
import org.junit.Test;
 | 
			
		||||
 | 
			
		||||
import java.net.URI;
 | 
			
		||||
| 
						 | 
				
			
			@ -168,4 +169,31 @@ public class CloudEventBuilderTest {
 | 
			
		|||
        assertThat(simpleKeyValueEvent.getSource()).isEqualTo(src);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    public void testBuilderWithoutDataAndDistributedTracingExtension() {
 | 
			
		||||
 | 
			
		||||
        // given
 | 
			
		||||
        final String id = UUID.randomUUID().toString();
 | 
			
		||||
        final URI src = URI.create("mailto:cncf-wg-serverless@lists.cncf.io");
 | 
			
		||||
        final String type = "My.Cloud.Event.Type";
 | 
			
		||||
        final DistributedTracingExtension dte = new DistributedTracingExtension();
 | 
			
		||||
        dte.setTraceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01");
 | 
			
		||||
        dte.setTracestate("congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4");
 | 
			
		||||
 | 
			
		||||
        // when
 | 
			
		||||
        final CloudEvent<Map<String, String>> simpleKeyValueEvent = new CloudEventBuilder()
 | 
			
		||||
                .type(type)
 | 
			
		||||
                .id(id)
 | 
			
		||||
                .source(src)
 | 
			
		||||
                .extension(dte)
 | 
			
		||||
                .build();
 | 
			
		||||
        // than
 | 
			
		||||
        assertThat(simpleKeyValueEvent.getSource()).isEqualTo(src);
 | 
			
		||||
        assertThat(simpleKeyValueEvent.getExtensions().get()).contains(dte);
 | 
			
		||||
 | 
			
		||||
        Extension receivedDte = simpleKeyValueEvent.getExtensions().get().get(0);
 | 
			
		||||
        assertThat(receivedDte).extracting("traceparent", "tracestate")
 | 
			
		||||
                .contains("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01", "congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4");
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -16,6 +16,7 @@
 | 
			
		|||
package io.cloudevents.http.vertx;
 | 
			
		||||
 | 
			
		||||
import io.cloudevents.CloudEvent;
 | 
			
		||||
import io.cloudevents.Extension;
 | 
			
		||||
import io.cloudevents.http.vertx.impl.VertxCloudEventsImpl;
 | 
			
		||||
import io.vertx.codegen.annotations.GenIgnore;
 | 
			
		||||
import io.vertx.codegen.annotations.VertxGen;
 | 
			
		||||
| 
						 | 
				
			
			@ -24,6 +25,8 @@ import io.vertx.core.Handler;
 | 
			
		|||
import io.vertx.core.http.HttpClientRequest;
 | 
			
		||||
import io.vertx.core.http.HttpServerRequest;
 | 
			
		||||
 | 
			
		||||
import java.util.List;
 | 
			
		||||
 | 
			
		||||
@VertxGen
 | 
			
		||||
public interface VertxCloudEvents {
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -34,6 +37,9 @@ public interface VertxCloudEvents {
 | 
			
		|||
    @GenIgnore(GenIgnore.PERMITTED_TYPE)
 | 
			
		||||
    <T> void readFromRequest(HttpServerRequest request, Handler<AsyncResult<CloudEvent<T>>> resultHandler);
 | 
			
		||||
 | 
			
		||||
    @GenIgnore(GenIgnore.PERMITTED_TYPE)
 | 
			
		||||
    <T> void readFromRequest(HttpServerRequest request, Class[] extensions, Handler<AsyncResult<CloudEvent<T>>> resultHandler);
 | 
			
		||||
 | 
			
		||||
    @GenIgnore(GenIgnore.PERMITTED_TYPE)
 | 
			
		||||
    <T> void writeToHttpClientRequest(CloudEvent<T> ce, HttpClientRequest request);
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -17,7 +17,9 @@ package io.cloudevents.http.vertx.impl;
 | 
			
		|||
 | 
			
		||||
import io.cloudevents.CloudEvent;
 | 
			
		||||
import io.cloudevents.CloudEventBuilder;
 | 
			
		||||
import io.cloudevents.Extension;
 | 
			
		||||
import io.cloudevents.SpecVersion;
 | 
			
		||||
import io.cloudevents.extensions.DistributedTracingExtension;
 | 
			
		||||
import io.cloudevents.http.HttpTransportAttributes;
 | 
			
		||||
import io.cloudevents.http.V02HttpTransportMappers;
 | 
			
		||||
import io.cloudevents.http.vertx.VertxCloudEvents;
 | 
			
		||||
| 
						 | 
				
			
			@ -29,10 +31,19 @@ import io.vertx.core.buffer.Buffer;
 | 
			
		|||
import io.vertx.core.http.HttpClientRequest;
 | 
			
		||||
import io.vertx.core.http.HttpHeaders;
 | 
			
		||||
import io.vertx.core.http.HttpServerRequest;
 | 
			
		||||
import io.vertx.core.json.Json;
 | 
			
		||||
import io.vertx.core.json.JsonObject;
 | 
			
		||||
 | 
			
		||||
import java.io.Serializable;
 | 
			
		||||
import java.lang.reflect.Field;
 | 
			
		||||
import java.net.URI;
 | 
			
		||||
import java.time.ZonedDateTime;
 | 
			
		||||
import java.time.format.DateTimeFormatter;
 | 
			
		||||
import java.util.ArrayList;
 | 
			
		||||
import java.util.Arrays;
 | 
			
		||||
import java.util.Collections;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.Map;
 | 
			
		||||
 | 
			
		||||
public final class VertxCloudEventsImpl implements VertxCloudEvents {
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -50,6 +61,12 @@ public final class VertxCloudEventsImpl implements VertxCloudEvents {
 | 
			
		|||
 | 
			
		||||
    @Override
 | 
			
		||||
    public <T> void readFromRequest(HttpServerRequest request, Handler<AsyncResult<CloudEvent<T>>> resultHandler) {
 | 
			
		||||
        this.readFromRequest(request, null, resultHandler);
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Override
 | 
			
		||||
    public <T> void readFromRequest(HttpServerRequest request, Class[] extensions, Handler<AsyncResult<CloudEvent<T>>> resultHandler) {
 | 
			
		||||
 | 
			
		||||
        final MultiMap headers = request.headers();
 | 
			
		||||
        final CloudEventBuilder builder = new CloudEventBuilder();
 | 
			
		||||
| 
						 | 
				
			
			@ -85,6 +102,34 @@ public final class VertxCloudEventsImpl implements VertxCloudEvents {
 | 
			
		|||
                builder.schemaURL(URI.create(schemaURL));
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
            if (extensions != null && extensions.length > 0) {
 | 
			
		||||
 | 
			
		||||
                // move this out
 | 
			
		||||
                Arrays.asList(extensions).forEach(ext -> {
 | 
			
		||||
 | 
			
		||||
                    try {
 | 
			
		||||
                        Object extObj  = ext.newInstance();
 | 
			
		||||
                        final JsonObject extension = new JsonObject();
 | 
			
		||||
                        Field[] fields = ext.getDeclaredFields();
 | 
			
		||||
 | 
			
		||||
                        for (Field field : fields) {
 | 
			
		||||
                            boolean accessible = field.isAccessible();
 | 
			
		||||
                            field.setAccessible(true);
 | 
			
		||||
                            field.set(extObj, request.headers().get(field.getName()));
 | 
			
		||||
                            field.setAccessible(accessible);
 | 
			
		||||
                        }
 | 
			
		||||
                        builder.extension((Extension) extObj);
 | 
			
		||||
                    } catch (InstantiationException e) {
 | 
			
		||||
                        e.printStackTrace();
 | 
			
		||||
                    } catch (IllegalAccessException e) {
 | 
			
		||||
                        e.printStackTrace();
 | 
			
		||||
                    }
 | 
			
		||||
                });
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
            request.bodyHandler((Buffer buff) -> {
 | 
			
		||||
 | 
			
		||||
                if (buff.length()>0) {
 | 
			
		||||
| 
						 | 
				
			
			@ -126,8 +171,21 @@ public final class VertxCloudEventsImpl implements VertxCloudEvents {
 | 
			
		|||
            request.putHeader(HttpHeaders.createOptimized(httpTransportAttributes.schemaUrlKey()), HttpHeaders.createOptimized(schemaUrl.toString()));
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
        ce.getExtensions().ifPresent(extensions -> {
 | 
			
		||||
 | 
			
		||||
            extensions.forEach(ext -> {
 | 
			
		||||
                JsonObject.mapFrom(ext).forEach(extEntry -> {
 | 
			
		||||
                    request.putHeader(HttpHeaders.createOptimized(extEntry.getKey()), HttpHeaders.createOptimized(extEntry.getValue().toString()));
 | 
			
		||||
                });
 | 
			
		||||
            });
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
        ce.getData().ifPresent(data -> {
 | 
			
		||||
            request.write(data.toString());
 | 
			
		||||
        });
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -17,6 +17,7 @@ package io.cloudevents.http.vertx;
 | 
			
		|||
 | 
			
		||||
import io.cloudevents.CloudEvent;
 | 
			
		||||
import io.cloudevents.CloudEventBuilder;
 | 
			
		||||
import io.cloudevents.extensions.DistributedTracingExtension;
 | 
			
		||||
import io.cloudevents.http.V01HttpTransportMappers;
 | 
			
		||||
import io.cloudevents.http.V02HttpTransportMappers;
 | 
			
		||||
import io.cloudevents.http.reactivex.vertx.VertxCloudEvents;
 | 
			
		||||
| 
						 | 
				
			
			@ -31,6 +32,7 @@ import org.junit.jupiter.api.Test;
 | 
			
		|||
import org.junit.jupiter.api.extension.ExtendWith;
 | 
			
		||||
 | 
			
		||||
import java.net.URI;
 | 
			
		||||
import java.util.List;
 | 
			
		||||
import java.util.logging.Logger;
 | 
			
		||||
 | 
			
		||||
import static io.cloudevents.SpecVersion.V_01;
 | 
			
		||||
| 
						 | 
				
			
			@ -211,4 +213,59 @@ class VertxCloudEventsTests {
 | 
			
		|||
                    req.end();
 | 
			
		||||
                });
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    @Test
 | 
			
		||||
    @DisplayName("Post a 0.2 CloudEvents object with a payload")
 | 
			
		||||
    void cloudEventWithExtension(Vertx vertx, VertxTestContext testContext) {
 | 
			
		||||
        Checkpoint serverCheckpoint = testContext.checkpoint();
 | 
			
		||||
        Checkpoint clientCheckpoint = testContext.checkpoint();
 | 
			
		||||
 | 
			
		||||
        final DistributedTracingExtension dte = new DistributedTracingExtension();
 | 
			
		||||
        dte.setTraceparent("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01");
 | 
			
		||||
        dte.setTracestate("congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4");
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
        final CloudEvent<String> cloudEvent = new CloudEventBuilder<String>()
 | 
			
		||||
                .specVersion("0.2")
 | 
			
		||||
                .source(URI.create("http://knative-eventing.com"))
 | 
			
		||||
                .id("foo-bar")
 | 
			
		||||
                .extension(dte)
 | 
			
		||||
                .type("pushevent")
 | 
			
		||||
                .build();
 | 
			
		||||
 | 
			
		||||
        Class[] exceptions =  {DistributedTracingExtension.class};
 | 
			
		||||
 | 
			
		||||
        vertx.createHttpServer()
 | 
			
		||||
                .requestHandler(req -> VertxCloudEvents
 | 
			
		||||
                        .create()
 | 
			
		||||
                        .rxReadFromRequest(req, exceptions)
 | 
			
		||||
                        .doOnError(testContext::failNow)
 | 
			
		||||
                        .subscribe(event -> testContext.verify(() -> {
 | 
			
		||||
                            assertThat(event.getId()).isEqualTo(cloudEvent.getId());
 | 
			
		||||
 | 
			
		||||
                            // extension headers
 | 
			
		||||
                            assertThat(req.headers().get("traceparent")).isEqualTo("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01");
 | 
			
		||||
                            assertThat(req.headers().get("tracestate")).isEqualTo("congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4");
 | 
			
		||||
 | 
			
		||||
                            assertThat(event.getExtensions().get().get(0)).isNotNull();
 | 
			
		||||
 | 
			
		||||
                            assertThat(event.getExtensions().get().get(0)).extracting("traceparent", "tracestate")
 | 
			
		||||
                                    .contains("00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01", "congo=BleGNlZWRzIHRohbCBwbGVhc3VyZS4");
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
                            req.response().end();
 | 
			
		||||
                            serverCheckpoint.flag();
 | 
			
		||||
                        })))
 | 
			
		||||
                .rxListen(8080)
 | 
			
		||||
                .doOnError(testContext::failNow)
 | 
			
		||||
                .subscribe(server -> {
 | 
			
		||||
                    HttpClientRequest req = vertx.createHttpClient().post(server.actualPort(), "localhost", "/");
 | 
			
		||||
                    req.handler(resp -> testContext.verify(() -> {
 | 
			
		||||
                        assertThat(resp.statusCode()).isEqualTo(200);
 | 
			
		||||
                        clientCheckpoint.flag();
 | 
			
		||||
                    }));
 | 
			
		||||
                    VertxCloudEvents.create().writeToHttpClientRequest(cloudEvent, req);
 | 
			
		||||
                    req.end();
 | 
			
		||||
                });
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue