diff --git a/examples/pom.xml b/examples/pom.xml index b748259de..6bf4a7b61 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -22,7 +22,12 @@ ${java.version} ${java.version} false +<<<<<<< HEAD 0.14.0 +======= + 1.41.0 + 3.4.0 +>>>>>>> 3a8fd611 (Ensure OTEL version from examples aligns with SDK and ITs (#1403)) @@ -78,11 +83,29 @@ ${opentelemetry.version} + io.opentelemetry + opentelemetry-sdk-metrics + ${opentelemetry.version} + + +<<<<<<< HEAD +======= io.opentelemetry opentelemetry-exporter-zipkin ${opentelemetry.version} + io.zipkin.reporter2 + zipkin-reporter + ${zipkin.version} + + + io.zipkin.reporter2 + zipkin-sender-urlconnection + ${zipkin.version} + + +>>>>>>> 3a8fd611 (Ensure OTEL version from examples aligns with SDK and ITs (#1403)) org.junit.jupiter junit-jupiter test diff --git a/examples/src/main/java/io/dapr/examples/OpenTelemetryConfig.java b/examples/src/main/java/io/dapr/examples/OpenTelemetryConfig.java index 84909e291..a9cd028b3 100644 --- a/examples/src/main/java/io/dapr/examples/OpenTelemetryConfig.java +++ b/examples/src/main/java/io/dapr/examples/OpenTelemetryConfig.java @@ -20,9 +20,10 @@ import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; import io.opentelemetry.context.Context; import io.opentelemetry.context.propagation.ContextPropagators; -import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.context.propagation.TextMapSetter; import io.opentelemetry.exporter.zipkin.ZipkinSpanExporter; import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import org.springframework.beans.factory.annotation.Autowired; @@ -59,18 +60,23 @@ public class OpenTelemetryConfig { * Creates an opentelemetry instance. * @return OpenTelemetry. */ - public static OpenTelemetry createOpenTelemetry() { + public static OpenTelemetrySdk createOpenTelemetry() { // Only exports to Zipkin if it is up. Otherwise, ignore it. // This is helpful to avoid exceptions for examples that do not require Zipkin. if (isZipkinUp()) { + Resource serviceResource = Resource.getDefault() + .toBuilder() + .put("service.name", InvokeClient.class.getName()) // Use ResourceAttributes constant + .build(); String httpUrl = String.format("http://localhost:%d", ZIPKIN_PORT); + ZipkinSpanExporter zipkinExporter = ZipkinSpanExporter.builder() .setEndpoint(httpUrl + ENDPOINT_V2_SPANS) - .setServiceName(InvokeClient.class.getName()) .build(); SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() + .setResource(serviceResource) .addSpanProcessor(SimpleSpanProcessor.create(zipkinExporter)) .build(); @@ -100,7 +106,7 @@ public class OpenTelemetryConfig { */ public static reactor.util.context.Context getReactorContext(Context context) { Map map = new HashMap<>(); - TextMapPropagator.Setter> setter = + TextMapSetter> setter = (carrier, key, value) -> map.put(key, value); GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(context, map, setter); diff --git a/examples/src/main/java/io/dapr/examples/OpenTelemetryInterceptor.java b/examples/src/main/java/io/dapr/examples/OpenTelemetryInterceptor.java index b2da80a40..740fee826 100644 --- a/examples/src/main/java/io/dapr/examples/OpenTelemetryInterceptor.java +++ b/examples/src/main/java/io/dapr/examples/OpenTelemetryInterceptor.java @@ -15,6 +15,7 @@ package io.dapr.examples; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapGetter; import io.opentelemetry.context.propagation.TextMapPropagator; import jakarta.servlet.DispatcherType; import jakarta.servlet.http.HttpServletRequest; @@ -34,8 +35,8 @@ public class OpenTelemetryInterceptor implements HandlerInterceptor { @Autowired private OpenTelemetry openTelemetry; - private static final TextMapPropagator.Getter HTTP_SERVLET_REQUEST_GETTER = - new TextMapPropagator.Getter<>() { + private static final TextMapGetter HTTP_SERVLET_REQUEST_GETTER = + new TextMapGetter<>() { @Override public Iterable keys(HttpServletRequest carrier) { return Collections.list(carrier.getHeaderNames()); diff --git a/examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java b/examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java index 4864a1d2a..91b19f9ce 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/BulkPublisher.java @@ -19,8 +19,8 @@ import io.dapr.client.DaprPreviewClient; import io.dapr.client.domain.BulkPublishResponse; import io.dapr.client.domain.BulkPublishResponseFailedEntry; import io.dapr.examples.OpenTelemetryConfig; -import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Scope; import io.opentelemetry.sdk.OpenTelemetrySdk; @@ -55,28 +55,41 @@ public class BulkPublisher { * @throws Exception any exception */ public static void main(String[] args) throws Exception { - OpenTelemetry openTelemetry = OpenTelemetryConfig.createOpenTelemetry(); - Tracer tracer = openTelemetry.getTracer(BulkPublisher.class.getCanonicalName()); - Span span = tracer.spanBuilder("Bulk Publisher's Main").setSpanKind(Span.Kind.CLIENT).startSpan(); + OpenTelemetrySdk openTelemetrySdk = OpenTelemetryConfig.createOpenTelemetry(); + Tracer tracer = openTelemetrySdk.getTracer(BulkPublisher.class.getCanonicalName()); + Span span = tracer.spanBuilder("Bulk Publisher's Main").setSpanKind(SpanKind.CLIENT).startSpan(); + try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) { DaprClient c = (DaprClient) client; + c.waitForSidecar(10000); + try (Scope scope = span.makeCurrent()) { System.out.println("Using preview client..."); + List messages = new ArrayList<>(); + System.out.println("Constructing the list of messages to publish"); + for (int i = 0; i < NUM_MESSAGES; i++) { String message = String.format("This is message #%d", i); + messages.add(message); + System.out.println("Going to publish message : " + message); } - BulkPublishResponse res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages) + + BulkPublishResponse res = client + .publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages) .contextWrite(getReactorContext()).block(); + System.out.println("Published the set of messages in a single call to Dapr"); + if (res != null) { if (res.getFailedEntries().size() > 0) { // Ideally this condition will not happen in examples System.out.println("Some events failed to be published"); + for (BulkPublishResponseFailedEntry entry : res.getFailedEntries()) { System.out.println("EntryId : " + entry.getEntry().getEntryId() + " Error message : " + entry.getErrorMessage()); @@ -86,16 +99,11 @@ public class BulkPublisher { throw new Exception("null response from dapr"); } } - // Close the span. span.end(); - // Allow plenty of time for Dapr to export all relevant spans to the tracing infra. Thread.sleep(10000); - // Shutdown the OpenTelemetry tracer. - OpenTelemetrySdk.getGlobalTracerManagement().shutdown(); - + openTelemetrySdk.getSdkTracerProvider().shutdown(); System.out.println("Done"); } } } - diff --git a/examples/src/main/java/io/dapr/examples/pubsub/PublisherWithTracing.java b/examples/src/main/java/io/dapr/examples/pubsub/PublisherWithTracing.java index fd79b076b..601382408 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/PublisherWithTracing.java +++ b/examples/src/main/java/io/dapr/examples/pubsub/PublisherWithTracing.java @@ -16,8 +16,8 @@ package io.dapr.examples.pubsub; import io.dapr.client.DaprClient; import io.dapr.client.DaprClientBuilder; import io.dapr.examples.OpenTelemetryConfig; -import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Scope; import io.opentelemetry.sdk.OpenTelemetrySdk; @@ -51,19 +51,21 @@ public class PublisherWithTracing { * @throws Exception A startup Exception. */ public static void main(String[] args) throws Exception { - OpenTelemetry openTelemetry = OpenTelemetryConfig.createOpenTelemetry(); - Tracer tracer = openTelemetry.getTracer(PublisherWithTracing.class.getCanonicalName()); - Span span = tracer.spanBuilder("Publisher's Main").setSpanKind(Span.Kind.CLIENT).startSpan(); + OpenTelemetrySdk openTelemetrySdk = OpenTelemetryConfig.createOpenTelemetry(); + Tracer tracer = openTelemetrySdk.getTracer(PublisherWithTracing.class.getCanonicalName()); + Span span = tracer.spanBuilder("Publisher's Main").setSpanKind(SpanKind.CLIENT).startSpan(); try (DaprClient client = new DaprClientBuilder().build()) { try (Scope scope = span.makeCurrent()) { for (int i = 0; i < NUM_MESSAGES; i++) { String message = String.format("This is message #%d", i); + // Publishing messages, notice the use of subscriberContext() for tracing. client.publishEvent( PUBSUB_NAME, TOPIC_NAME, message).contextWrite(getReactorContext()).block(); + System.out.println("Published message: " + message); try { @@ -71,19 +73,14 @@ public class PublisherWithTracing { } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); + return; } } } - // Close the span. span.end(); - - // Shutdown the OpenTelemetry tracer. - OpenTelemetrySdk.getGlobalTracerManagement().shutdown(); - - // This is an example, so for simplicity we are just exiting here. - // Normally a dapr app would be a web service and not exit main. + openTelemetrySdk.getSdkTracerProvider().shutdown(); System.out.println("Done."); } } diff --git a/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java b/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java index 9461af4c2..62a976874 100644 --- a/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java +++ b/examples/src/main/java/io/dapr/examples/tracing/InvokeClient.java @@ -19,8 +19,8 @@ import io.dapr.client.domain.HttpExtension; import io.dapr.client.domain.InvokeMethodRequest; import io.dapr.examples.OpenTelemetryConfig; import io.dapr.utils.TypeRef; -import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Scope; import io.opentelemetry.sdk.OpenTelemetrySdk; @@ -48,10 +48,10 @@ public class InvokeClient { * @param args Messages to be sent as request for the invoke API. */ public static void main(String[] args) throws Exception { - final OpenTelemetry openTelemetry = OpenTelemetryConfig.createOpenTelemetry(); - final Tracer tracer = openTelemetry.getTracer(InvokeClient.class.getCanonicalName()); + OpenTelemetrySdk openTelemetrySdk = OpenTelemetryConfig.createOpenTelemetry(); + Tracer tracer = openTelemetrySdk.getTracer(InvokeClient.class.getCanonicalName()); + Span span = tracer.spanBuilder("Example's Main").setSpanKind(SpanKind.CLIENT).startSpan(); - Span span = tracer.spanBuilder("Example's Main").setSpanKind(Span.Kind.CLIENT).startSpan(); try (DaprClient client = (new DaprClientBuilder()).build()) { for (String message : args) { try (Scope scope = span.makeCurrent()) { @@ -77,14 +77,5 @@ public class InvokeClient { System.out.println("Done"); System.exit(0); } - span.end(); - shutdown(); - System.out.println("Done"); } - - private static void shutdown() throws Exception { - OpenTelemetrySdk.getGlobalTracerManagement().shutdown(); - Validation.validate(); - } - } diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index 09edb7892..20f64c757 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -1346,11 +1346,9 @@ public class DaprClientImpl extends AbstractDaprClient { } if (scheduleJobRequest.getFailurePolicy() != null) { - scheduleJobRequestBuilder.setFailurePolicy(getJobFailurePolicy(scheduleJobRequest.getFailurePolicy())); + jobBuilder.setFailurePolicy(getJobFailurePolicy(scheduleJobRequest.getFailurePolicy())); } - scheduleJobRequestBuilder.setOverwrite(scheduleJobRequest.getOverwrite()); - Mono scheduleJobResponseMono = Mono.deferContextual(context -> this.createMono( it -> intercept(context, asyncStub)