diff --git a/docs/supported-libraries.md b/docs/supported-libraries.md index 68065b2389..725423a4c4 100644 --- a/docs/supported-libraries.md +++ b/docs/supported-libraries.md @@ -107,6 +107,7 @@ These are the supported libraries and frameworks: | [OSHI](https://github.com/oshi/oshi/) | 5.3.1+ | [opentelemetry-oshi](../instrumentation/oshi/library) | [System Metrics] (partial support) | | [Play MVC](https://github.com/playframework/playframework) | 2.4+ | N/A | Provides `http.route` [2], Controller Spans [3] | | [Play WS](https://github.com/playframework/play-ws) | 1.0+ | N/A | [HTTP Client Spans], [HTTP Client Metrics] | +| [PowerJob](http://www.powerjob.tech/) | 4.0.0+ | N/A | none | | [Quarkus Resteasy Reactive](https://quarkus.io/extensions/io.quarkus/quarkus-resteasy-reactive/) | 2.16.7+ | N/A | Provides `http.route` [2] | | [Quartz](https://www.quartz-scheduler.org/) | 2.0+ | [opentelemetry-quartz-2.0](../instrumentation/quartz-2.0/library) | none | | [R2DBC](https://r2dbc.io/) | 1.0+ | [opentelemetry-r2dbc-1.0](../instrumentation/r2dbc-1.0/library) | [Database Client Spans] | diff --git a/instrumentation/powerjob-4.0/README.md b/instrumentation/powerjob-4.0/README.md new file mode 100644 index 0000000000..f95b6da8b1 --- /dev/null +++ b/instrumentation/powerjob-4.0/README.md @@ -0,0 +1,5 @@ +# Settings for the PowerJob instrumentation + +| System property | Type | Default | Description | +|--------------------------------------------------------------|---------|---------|-----------------------------------------------------| +| `otel.instrumentation.powerjob.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. | diff --git a/instrumentation/powerjob-4.0/javaagent/build.gradle.kts b/instrumentation/powerjob-4.0/javaagent/build.gradle.kts new file mode 100644 index 0000000000..68b5361439 --- /dev/null +++ b/instrumentation/powerjob-4.0/javaagent/build.gradle.kts @@ -0,0 +1,25 @@ +plugins { + id("otel.javaagent-instrumentation") +} + +muzzle { + pass { + group.set("tech.powerjob") + module.set("powerjob-worker") + versions.set("[4.0.0,)") + assertInverse.set(true) + extraDependency("tech.powerjob:powerjob-official-processors:1.1.0") + } +} + +dependencies { + library("tech.powerjob:powerjob-worker:4.0.0") + library("tech.powerjob:powerjob-official-processors:1.1.0") +} + +tasks.withType().configureEach { + // required on jdk17 + jvmArgs("--add-opens=java.base/java.lang=ALL-UNNAMED") + jvmArgs("-XX:+IgnoreUnrecognizedVMOptions") + jvmArgs("-Dotel.instrumentation.powerjob.experimental-span-attributes=true") +} diff --git a/instrumentation/powerjob-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/BasicProcessorInstrumentation.java b/instrumentation/powerjob-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/BasicProcessorInstrumentation.java new file mode 100644 index 0000000000..4fd168a2da --- /dev/null +++ b/instrumentation/powerjob-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/BasicProcessorInstrumentation.java @@ -0,0 +1,87 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; + +import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext; +import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface; +import static io.opentelemetry.javaagent.instrumentation.powerjob.v4_0.PowerJobSingletons.instrumenter; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.Scope; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import tech.powerjob.worker.core.processor.ProcessResult; +import tech.powerjob.worker.core.processor.TaskContext; +import tech.powerjob.worker.core.processor.sdk.BasicProcessor; + +public class BasicProcessorInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return implementsInterface(named("tech.powerjob.worker.core.processor.sdk.BasicProcessor")); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + named("process") + .and(isPublic()) + .and( + takesArguments(1) + .and( + takesArgument( + 0, named("tech.powerjob.worker.core.processor.TaskContext")))), + BasicProcessorInstrumentation.class.getName() + "$ProcessAdvice"); + } + + public static class ProcessAdvice { + + @SuppressWarnings("unused") + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onSchedule( + @Advice.This BasicProcessor handler, + @Advice.Argument(0) TaskContext taskContext, + @Advice.Local("otelRequest") PowerJobProcessRequest request, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelScope") Scope scope) { + Context parentContext = currentContext(); + request = + PowerJobProcessRequest.createRequest( + taskContext.getJobId(), + handler, + "process", + taskContext.getJobParams(), + taskContext.getInstanceParams()); + + if (!instrumenter().shouldStart(parentContext, request)) { + return; + } + context = instrumenter().start(parentContext, request); + scope = context.makeCurrent(); + } + + @SuppressWarnings("unused") + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void stopSpan( + @Advice.Return ProcessResult result, + @Advice.Thrown Throwable throwable, + @Advice.Local("otelRequest") PowerJobProcessRequest request, + @Advice.Local("otelContext") Context context, + @Advice.Local("otelScope") Scope scope) { + if (scope == null) { + return; + } + scope.close(); + instrumenter().end(context, request, result, throwable); + } + } +} diff --git a/instrumentation/powerjob-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/PowerJobCodeAttributesGetter.java b/instrumentation/powerjob-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/PowerJobCodeAttributesGetter.java new file mode 100644 index 0000000000..79d3316d10 --- /dev/null +++ b/instrumentation/powerjob-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/PowerJobCodeAttributesGetter.java @@ -0,0 +1,24 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; + +import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeAttributesGetter; +import javax.annotation.Nullable; + +class PowerJobCodeAttributesGetter implements CodeAttributesGetter { + + @Nullable + @Override + public Class getCodeClass(PowerJobProcessRequest powerJobProcessRequest) { + return powerJobProcessRequest.getDeclaringClass(); + } + + @Nullable + @Override + public String getMethodName(PowerJobProcessRequest powerJobProcessRequest) { + return powerJobProcessRequest.getMethodName(); + } +} diff --git a/instrumentation/powerjob-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/PowerJobExperimentalAttributeExtractor.java b/instrumentation/powerjob-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/PowerJobExperimentalAttributeExtractor.java new file mode 100644 index 0000000000..50f71ca847 --- /dev/null +++ b/instrumentation/powerjob-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/PowerJobExperimentalAttributeExtractor.java @@ -0,0 +1,45 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import javax.annotation.Nullable; +import tech.powerjob.worker.core.processor.ProcessResult; + +class PowerJobExperimentalAttributeExtractor + implements AttributesExtractor { + + private static final AttributeKey POWERJOB_JOB_ID = + AttributeKey.longKey("scheduling.powerjob.job.id"); + private static final AttributeKey POWERJOB_JOB_PARAM = + AttributeKey.stringKey("scheduling.powerjob.job.param"); + private static final AttributeKey POWERJOB_JOB_INSTANCE_PARAM = + AttributeKey.stringKey("scheduling.powerjob.job.instance.param"); + private static final AttributeKey POWERJOB_JOB_INSTANCE_TYPE = + AttributeKey.stringKey("scheduling.powerjob.job.type"); + + @Override + public void onStart( + AttributesBuilder attributes, + Context parentContext, + PowerJobProcessRequest powerJobProcessRequest) { + attributes.put(POWERJOB_JOB_ID, powerJobProcessRequest.getJobId()); + attributes.put(POWERJOB_JOB_PARAM, powerJobProcessRequest.getJobParams()); + attributes.put(POWERJOB_JOB_INSTANCE_PARAM, powerJobProcessRequest.getInstanceParams()); + attributes.put(POWERJOB_JOB_INSTANCE_TYPE, powerJobProcessRequest.getJobType()); + } + + @Override + public void onEnd( + AttributesBuilder attributes, + Context context, + PowerJobProcessRequest powerJobProcessRequest, + @Nullable ProcessResult unused, + @Nullable Throwable error) {} +} diff --git a/instrumentation/powerjob-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/PowerJobInstrumentationModule.java b/instrumentation/powerjob-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/PowerJobInstrumentationModule.java new file mode 100644 index 0000000000..4838b489f0 --- /dev/null +++ b/instrumentation/powerjob-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/PowerJobInstrumentationModule.java @@ -0,0 +1,24 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import java.util.Collections; +import java.util.List; + +@AutoService(InstrumentationModule.class) +public class PowerJobInstrumentationModule extends InstrumentationModule { + public PowerJobInstrumentationModule() { + super("powerjob", "powerjob-4.0"); + } + + @Override + public List typeInstrumentations() { + return Collections.singletonList(new BasicProcessorInstrumentation()); + } +} diff --git a/instrumentation/powerjob-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/PowerJobProcessRequest.java b/instrumentation/powerjob-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/PowerJobProcessRequest.java new file mode 100644 index 0000000000..76145e1bf4 --- /dev/null +++ b/instrumentation/powerjob-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/PowerJobProcessRequest.java @@ -0,0 +1,95 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; + +import java.util.Arrays; +import java.util.List; +import tech.powerjob.official.processors.impl.FileCleanupProcessor; +import tech.powerjob.official.processors.impl.HttpProcessor; +import tech.powerjob.official.processors.impl.script.PythonProcessor; +import tech.powerjob.official.processors.impl.script.ShellProcessor; +import tech.powerjob.official.processors.impl.sql.DynamicDatasourceSqlProcessor; +import tech.powerjob.official.processors.impl.sql.SpringDatasourceSqlProcessor; +import tech.powerjob.worker.core.processor.sdk.BasicProcessor; +import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor; +import tech.powerjob.worker.core.processor.sdk.MapProcessor; +import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor; + +public final class PowerJobProcessRequest { + private final String methodName; + private final Long jobId; + private final String jobType; + private final Class declaringClass; + private final String jobParams; + private final String instanceParams; + private static final List> KNOWN_PROCESSORS = + Arrays.asList( + FileCleanupProcessor.class, + BroadcastProcessor.class, + MapReduceProcessor.class, + MapProcessor.class, + ShellProcessor.class, + PythonProcessor.class, + HttpProcessor.class, + SpringDatasourceSqlProcessor.class, + DynamicDatasourceSqlProcessor.class); + + private PowerJobProcessRequest( + Long jobId, + String methodName, + Class declaringClass, + String jobParams, + String instanceParams, + String jobType) { + this.jobId = jobId; + this.methodName = methodName; + this.jobType = jobType; + this.declaringClass = declaringClass; + this.jobParams = jobParams; + this.instanceParams = instanceParams; + } + + public static PowerJobProcessRequest createRequest( + Long jobId, + BasicProcessor handler, + String methodName, + String jobParams, + String instanceParams) { + String jobType = "BasicProcessor"; + for (Class processorClass : KNOWN_PROCESSORS) { + if (processorClass.isInstance(handler)) { + jobType = processorClass.getSimpleName(); + break; + } + } + return new PowerJobProcessRequest( + jobId, methodName, handler.getClass(), jobParams, instanceParams, jobType); + } + + public String getMethodName() { + return methodName; + } + + public Long getJobId() { + return jobId; + } + + public Class getDeclaringClass() { + return declaringClass; + } + + public String getJobParams() { + return jobParams; + } + + public String getInstanceParams() { + return instanceParams; + } + + public String getJobType() { + return jobType; + } +} diff --git a/instrumentation/powerjob-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/PowerJobSingletons.java b/instrumentation/powerjob-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/PowerJobSingletons.java new file mode 100644 index 0000000000..cf49c5d333 --- /dev/null +++ b/instrumentation/powerjob-4.0/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/PowerJobSingletons.java @@ -0,0 +1,62 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.StatusCode; +import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeAttributesExtractor; +import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeSpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.SpanStatusExtractor; +import io.opentelemetry.javaagent.bootstrap.internal.AgentInstrumentationConfig; +import tech.powerjob.worker.core.processor.ProcessResult; + +public final class PowerJobSingletons { + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.powerjob-4.0"; + + private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES = + AgentInstrumentationConfig.get() + .getBoolean("otel.instrumentation.powerjob.experimental-span-attributes", false); + private static final Instrumenter INSTRUMENTER = create(); + + public static Instrumenter instrumenter() { + return INSTRUMENTER; + } + + private static Instrumenter create() { + PowerJobCodeAttributesGetter codeAttributesGetter = new PowerJobCodeAttributesGetter(); + SpanNameExtractor spanNameExtractor = + CodeSpanNameExtractor.create(codeAttributesGetter); + + InstrumenterBuilder builder = + Instrumenter.builder( + GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor) + .addAttributesExtractor(CodeAttributesExtractor.create(codeAttributesGetter)) + .setSpanStatusExtractor( + (spanStatusBuilder, powerJobProcessRequest, response, error) -> { + if (response != null && !response.isSuccess()) { + spanStatusBuilder.setStatus(StatusCode.ERROR); + } else { + SpanStatusExtractor.getDefault() + .extract(spanStatusBuilder, powerJobProcessRequest, response, error); + } + }); + + if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { + builder.addAttributesExtractor( + AttributesExtractor.constant(AttributeKey.stringKey("job.system"), "powerjob")); + builder.addAttributesExtractor(new PowerJobExperimentalAttributeExtractor()); + } + + return builder.buildInstrumenter(); + } + + private PowerJobSingletons() {} +} diff --git a/instrumentation/powerjob-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/PowerJobBasicProcessorTest.java b/instrumentation/powerjob-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/PowerJobBasicProcessorTest.java new file mode 100644 index 0000000000..f38eecfd18 --- /dev/null +++ b/instrumentation/powerjob-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/PowerJobBasicProcessorTest.java @@ -0,0 +1,373 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; + +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static java.util.Arrays.asList; + +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import com.zaxxer.hikari.HikariDataSource; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.internal.StringUtils; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import io.opentelemetry.sdk.trace.data.StatusData; +import java.util.ArrayList; +import java.util.List; +import javax.sql.DataSource; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import tech.powerjob.official.processors.impl.FileCleanupProcessor; +import tech.powerjob.official.processors.impl.HttpProcessor; +import tech.powerjob.official.processors.impl.script.PythonProcessor; +import tech.powerjob.official.processors.impl.script.ShellProcessor; +import tech.powerjob.official.processors.impl.sql.DynamicDatasourceSqlProcessor; +import tech.powerjob.official.processors.impl.sql.SpringDatasourceSqlProcessor; +import tech.powerjob.worker.core.processor.TaskContext; +import tech.powerjob.worker.core.processor.WorkflowContext; +import tech.powerjob.worker.core.processor.sdk.BasicProcessor; +import tech.powerjob.worker.log.OmsLogger; + +class PowerJobBasicProcessorTest { + @RegisterExtension + private static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + private static final String BASIC_PROCESSOR = "BasicProcessor"; + private static final String BROADCAST_PROCESSOR = "BroadcastProcessor"; + private static final String MAP_PROCESSOR = "MapProcessor"; + private static final String MAP_REDUCE_PROCESSOR = "MapReduceProcessor"; + private static final String SHELL_PROCESSOR = "ShellProcessor"; + private static final String PYTHON_PROCESSOR = "PythonProcessor"; + private static final String HTTP_PROCESSOR = "HttpProcessor"; + private static final String FILE_CLEANUP_PROCESSOR = "FileCleanupProcessor"; + private static final String SPRING_DATASOURCE_SQL_PROCESSOR = "SpringDatasourceSqlProcessor"; + private static final String DYNAMIC_DATASOURCE_SQL_PROCESSOR = "DynamicDatasourceSqlProcessor"; + + @Test + void testBasicProcessor() throws Exception { + long jobId = 1; + String jobParam = "abc"; + TaskContext taskContext = genTaskContext(jobId, jobParam); + BasicProcessor testBasicProcessor = new TestBasicProcessor(); + testBasicProcessor.process(taskContext); + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName(String.format("%s.process", TestBasicProcessor.class.getSimpleName())) + .hasKind(SpanKind.INTERNAL) + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly( + attributeAssertions( + TestBasicProcessor.class.getName(), jobId, jobParam, BASIC_PROCESSOR)); + }); + }); + } + + @Test + void testBasicFailProcessor() throws Exception { + long jobId = 1; + String jobParam = "abc"; + TaskContext taskContext = genTaskContext(jobId, jobParam); + BasicProcessor testBasicFailProcessor = new TestBasicFailProcessor(); + testBasicFailProcessor.process(taskContext); + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName( + String.format("%s.process", TestBasicFailProcessor.class.getSimpleName())) + .hasKind(SpanKind.INTERNAL) + .hasStatus(StatusData.error()) + .hasAttributesSatisfyingExactly( + attributeAssertions( + TestBasicFailProcessor.class.getName(), + jobId, + jobParam, + BASIC_PROCESSOR)); + }); + }); + } + + @Test + void testBroadcastProcessor() throws Exception { + long jobId = 1; + String jobParam = "abc"; + TaskContext taskContext = genTaskContext(jobId, jobParam); + BasicProcessor testBroadcastProcessor = new TestBroadcastProcessor(); + testBroadcastProcessor.process(taskContext); + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName( + String.format("%s.process", TestBroadcastProcessor.class.getSimpleName())) + .hasKind(SpanKind.INTERNAL) + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly( + attributeAssertions( + TestBroadcastProcessor.class.getName(), + jobId, + jobParam, + BROADCAST_PROCESSOR)); + }); + }); + } + + @Test + void testMapProcessor() throws Exception { + long jobId = 1; + String jobParam = "abc"; + TaskContext taskContext = genTaskContext(jobId, jobParam); + BasicProcessor testMapProcessProcessor = new TestMapProcessProcessor(); + testMapProcessProcessor.process(taskContext); + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName( + String.format("%s.process", TestMapProcessProcessor.class.getSimpleName())) + .hasKind(SpanKind.INTERNAL) + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly( + attributeAssertions( + TestMapProcessProcessor.class.getName(), + jobId, + jobParam, + MAP_PROCESSOR)); + }); + }); + } + + @Test + void testMapReduceProcessor() throws Exception { + long jobId = 1; + String jobParam = "abc"; + TaskContext taskContext = genTaskContext(jobId, jobParam); + BasicProcessor testMapReduceProcessProcessor = new TestMapReduceProcessProcessor(); + testMapReduceProcessProcessor.process(taskContext); + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName( + String.format( + "%s.process", TestMapReduceProcessProcessor.class.getSimpleName())) + .hasKind(SpanKind.INTERNAL) + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly( + attributeAssertions( + TestMapReduceProcessProcessor.class.getName(), + jobId, + jobParam, + MAP_REDUCE_PROCESSOR)); + }); + }); + } + + @Test + void testShellProcessor() throws Exception { + long jobId = 1; + String jobParam = "ls"; + TaskContext taskContext = genTaskContext(jobId, jobParam); + taskContext.setWorkflowContext(new WorkflowContext(jobId, "")); + taskContext.setOmsLogger(new TestOmsLogger()); + BasicProcessor shellProcessor = new ShellProcessor(); + shellProcessor.process(taskContext); + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName(String.format("%s.process", ShellProcessor.class.getSimpleName())) + .hasKind(SpanKind.INTERNAL) + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly( + attributeAssertions( + ShellProcessor.class.getName(), jobId, jobParam, SHELL_PROCESSOR)); + }); + }); + } + + @Test + void testPythonProcessor() throws Exception { + long jobId = 1; + String jobParam = "1+1"; + TaskContext taskContext = genTaskContext(jobId, jobParam); + taskContext.setWorkflowContext(new WorkflowContext(jobId, "")); + taskContext.setOmsLogger(new TestOmsLogger()); + BasicProcessor pythonProcessor = new PythonProcessor(); + pythonProcessor.process(taskContext); + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName(String.format("%s.process", PythonProcessor.class.getSimpleName())) + .hasKind(SpanKind.INTERNAL) + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly( + attributeAssertions( + PythonProcessor.class.getName(), jobId, jobParam, PYTHON_PROCESSOR)); + }); + }); + } + + @Test + void testHttpProcessor() throws Exception { + + long jobId = 1; + String jobParam = "{\"method\":\"GET\"}"; + TaskContext taskContext = genTaskContext(jobId, jobParam); + taskContext.setWorkflowContext(new WorkflowContext(jobId, "")); + taskContext.setOmsLogger(new TestOmsLogger()); + BasicProcessor httpProcessor = new HttpProcessor(); + httpProcessor.process(taskContext); + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName(String.format("%s.process", HttpProcessor.class.getSimpleName())) + .hasKind(SpanKind.INTERNAL) + .hasStatus(StatusData.error()) + .hasAttributesSatisfyingExactly( + attributeAssertions( + HttpProcessor.class.getName(), jobId, jobParam, HTTP_PROCESSOR)); + }); + }); + } + + @Test + void testFileCleanerProcessor() throws Exception { + + long jobId = 1; + JSONObject params = new JSONObject(); + params.put("dirPath", "/abc"); + params.put("filePattern", "[\\s\\S]*log"); + params.put("retentionTime", 0); + JSONArray array = new JSONArray(); + array.add(params); + String jobParam = array.toJSONString(); + TaskContext taskContext = genTaskContext(jobId, jobParam); + taskContext.setOmsLogger(new TestOmsLogger()); + BasicProcessor fileCleanupProcessor = new FileCleanupProcessor(); + fileCleanupProcessor.process(taskContext); + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName( + String.format("%s.process", FileCleanupProcessor.class.getSimpleName())) + .hasKind(SpanKind.INTERNAL) + .hasStatus(StatusData.unset()) + .hasAttributesSatisfyingExactly( + attributeAssertions( + FileCleanupProcessor.class.getName(), + jobId, + jobParam, + FILE_CLEANUP_PROCESSOR)); + }); + }); + } + + @Test + void testSpringDataSourceProcessor() throws Exception { + DataSource dataSource = new HikariDataSource(); + long jobId = 1; + String jobParam = "{\"dirPath\":\"/abc\"}"; + TaskContext taskContext = genTaskContext(jobId, jobParam); + taskContext.setWorkflowContext(new WorkflowContext(jobId, "")); + taskContext.setOmsLogger(new TestOmsLogger()); + BasicProcessor springDatasourceSqlProcessor = new SpringDatasourceSqlProcessor(dataSource); + springDatasourceSqlProcessor.process(taskContext); + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName( + String.format( + "%s.process", SpringDatasourceSqlProcessor.class.getSimpleName())) + .hasKind(SpanKind.INTERNAL) + .hasStatus(StatusData.error()) + .hasAttributesSatisfyingExactly( + attributeAssertions( + SpringDatasourceSqlProcessor.class.getName(), + jobId, + jobParam, + SPRING_DATASOURCE_SQL_PROCESSOR)); + }); + }); + } + + @Test + void testDynamicDataSourceProcessor() throws Exception { + + long jobId = 1; + String jobParam = "{\"dirPath\":\"/abc\"}"; + TaskContext taskContext = genTaskContext(jobId, jobParam); + taskContext.setWorkflowContext(new WorkflowContext(jobId, "")); + taskContext.setOmsLogger(new TestOmsLogger()); + BasicProcessor dynamicDatasourceSqlProcessor = new DynamicDatasourceSqlProcessor(); + dynamicDatasourceSqlProcessor.process(taskContext); + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName( + String.format( + "%s.process", DynamicDatasourceSqlProcessor.class.getSimpleName())) + .hasKind(SpanKind.INTERNAL) + .hasStatus(StatusData.error()) + .hasAttributesSatisfyingExactly( + attributeAssertions( + DynamicDatasourceSqlProcessor.class.getName(), + jobId, + jobParam, + DYNAMIC_DATASOURCE_SQL_PROCESSOR)); + }); + }); + } + + private static TaskContext genTaskContext(long jobId, String jobParam) { + TaskContext taskContext = new TaskContext(); + taskContext.setJobId(jobId); + taskContext.setJobParams(jobParam); + return taskContext; + } + + private static List attributeAssertions( + String codeNamespace, long jobId, String jobParam, String jobType) { + List attributeAssertions = + new ArrayList<>( + asList( + equalTo(AttributeKey.stringKey("code.namespace"), codeNamespace), + equalTo(AttributeKey.stringKey("code.function"), "process"), + equalTo(AttributeKey.stringKey("job.system"), "powerjob"), + equalTo(AttributeKey.longKey("scheduling.powerjob.job.id"), jobId), + equalTo(AttributeKey.stringKey("scheduling.powerjob.job.type"), jobType))); + if (!StringUtils.isNullOrEmpty(jobParam)) { + attributeAssertions.add( + equalTo(AttributeKey.stringKey("scheduling.powerjob.job.param"), jobParam)); + } + return attributeAssertions; + } + + private static class TestOmsLogger implements OmsLogger { + + @Override + public void debug(String s, Object... objects) {} + + @Override + public void info(String s, Object... objects) {} + + @Override + public void warn(String s, Object... objects) {} + + @Override + public void error(String s, Object... objects) {} + } +} diff --git a/instrumentation/powerjob-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/TestBasicFailProcessor.java b/instrumentation/powerjob-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/TestBasicFailProcessor.java new file mode 100644 index 0000000000..9c2e2ff360 --- /dev/null +++ b/instrumentation/powerjob-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/TestBasicFailProcessor.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; + +import tech.powerjob.worker.core.processor.ProcessResult; +import tech.powerjob.worker.core.processor.TaskContext; +import tech.powerjob.worker.core.processor.sdk.BasicProcessor; + +class TestBasicFailProcessor implements BasicProcessor { + + @Override + public ProcessResult process(TaskContext context) { + return new ProcessResult(false, "fail"); + } +} diff --git a/instrumentation/powerjob-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/TestBasicProcessor.java b/instrumentation/powerjob-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/TestBasicProcessor.java new file mode 100644 index 0000000000..b15848cdde --- /dev/null +++ b/instrumentation/powerjob-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/TestBasicProcessor.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; + +import tech.powerjob.worker.core.processor.ProcessResult; +import tech.powerjob.worker.core.processor.TaskContext; +import tech.powerjob.worker.core.processor.sdk.BasicProcessor; + +class TestBasicProcessor implements BasicProcessor { + + @Override + public ProcessResult process(TaskContext context) { + return new ProcessResult(true, System.currentTimeMillis() + "success"); + } +} diff --git a/instrumentation/powerjob-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/TestBroadcastProcessor.java b/instrumentation/powerjob-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/TestBroadcastProcessor.java new file mode 100644 index 0000000000..5c170974be --- /dev/null +++ b/instrumentation/powerjob-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/TestBroadcastProcessor.java @@ -0,0 +1,30 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; + +import java.util.List; +import tech.powerjob.worker.core.processor.ProcessResult; +import tech.powerjob.worker.core.processor.TaskContext; +import tech.powerjob.worker.core.processor.TaskResult; +import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor; + +class TestBroadcastProcessor implements BroadcastProcessor { + + @Override + public ProcessResult preProcess(TaskContext taskContext) { + return new ProcessResult(true, "preProcess success"); + } + + @Override + public ProcessResult postProcess(TaskContext taskContext, List taskResults) { + return new ProcessResult(true, "postProcess success"); + } + + @Override + public ProcessResult process(TaskContext context) { + return new ProcessResult(true, "processSuccess"); + } +} diff --git a/instrumentation/powerjob-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/TestMapProcessProcessor.java b/instrumentation/powerjob-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/TestMapProcessProcessor.java new file mode 100644 index 0000000000..2b0855efa7 --- /dev/null +++ b/instrumentation/powerjob-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/TestMapProcessProcessor.java @@ -0,0 +1,18 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; + +import tech.powerjob.worker.core.processor.ProcessResult; +import tech.powerjob.worker.core.processor.TaskContext; +import tech.powerjob.worker.core.processor.sdk.MapProcessor; + +class TestMapProcessProcessor implements MapProcessor { + + @Override + public ProcessResult process(TaskContext context) { + return new ProcessResult(true, "processSuccess"); + } +} diff --git a/instrumentation/powerjob-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/TestMapReduceProcessProcessor.java b/instrumentation/powerjob-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/TestMapReduceProcessProcessor.java new file mode 100644 index 0000000000..06497121e5 --- /dev/null +++ b/instrumentation/powerjob-4.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/powerjob/v4_0/TestMapReduceProcessProcessor.java @@ -0,0 +1,25 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0; + +import java.util.List; +import tech.powerjob.worker.core.processor.ProcessResult; +import tech.powerjob.worker.core.processor.TaskContext; +import tech.powerjob.worker.core.processor.TaskResult; +import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor; + +class TestMapReduceProcessProcessor implements MapReduceProcessor { + + @Override + public ProcessResult process(TaskContext context) { + return new ProcessResult(true, "processSuccess"); + } + + @Override + public ProcessResult reduce(TaskContext taskContext, List list) { + return new ProcessResult(true, "reduceSuccess"); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 5882499605..c89064ca61 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -479,6 +479,7 @@ include(":instrumentation:play:play-ws:play-ws-2.0:javaagent") include(":instrumentation:play:play-ws:play-ws-2.1:javaagent") include(":instrumentation:play:play-ws:play-ws-common:javaagent") include(":instrumentation:play:play-ws:play-ws-common:testing") +include(":instrumentation:powerjob-4.0:javaagent") include(":instrumentation:pulsar:pulsar-2.8:javaagent") include(":instrumentation:pulsar:pulsar-2.8:javaagent-unit-tests") include(":instrumentation:quarkus-resteasy-reactive:common-testing")