Add support for PowerJob (#12086)

Co-authored-by: Steve Rao <raozihao.rzh@alibaba-inc.com>
Co-authored-by: Helen <56097766+heyams@users.noreply.github.com>
This commit is contained in:
crossoverJie 2024-09-13 00:05:17 +08:00 committed by GitHub
parent 7ecc678866
commit 844c28c65f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 851 additions and 0 deletions

View File

@ -107,6 +107,7 @@ These are the supported libraries and frameworks:
| [OSHI](https://github.com/oshi/oshi/) | 5.3.1+ | [opentelemetry-oshi](../instrumentation/oshi/library) | [System Metrics] (partial support) |
| [Play MVC](https://github.com/playframework/playframework) | 2.4+ | N/A | Provides `http.route` [2], Controller Spans [3] |
| [Play WS](https://github.com/playframework/play-ws) | 1.0+ | N/A | [HTTP Client Spans], [HTTP Client Metrics] |
| [PowerJob](http://www.powerjob.tech/) | 4.0.0+ | N/A | none |
| [Quarkus Resteasy Reactive](https://quarkus.io/extensions/io.quarkus/quarkus-resteasy-reactive/) | 2.16.7+ | N/A | Provides `http.route` [2] |
| [Quartz](https://www.quartz-scheduler.org/) | 2.0+ | [opentelemetry-quartz-2.0](../instrumentation/quartz-2.0/library) | none |
| [R2DBC](https://r2dbc.io/) | 1.0+ | [opentelemetry-r2dbc-1.0](../instrumentation/r2dbc-1.0/library) | [Database Client Spans] |

View File

@ -0,0 +1,5 @@
# Settings for the PowerJob instrumentation
| System property | Type | Default | Description |
|--------------------------------------------------------------|---------|---------|-----------------------------------------------------|
| `otel.instrumentation.powerjob.experimental-span-attributes` | Boolean | `false` | Enable the capture of experimental span attributes. |

View File

@ -0,0 +1,25 @@
plugins {
id("otel.javaagent-instrumentation")
}
muzzle {
pass {
group.set("tech.powerjob")
module.set("powerjob-worker")
versions.set("[4.0.0,)")
assertInverse.set(true)
extraDependency("tech.powerjob:powerjob-official-processors:1.1.0")
}
}
dependencies {
library("tech.powerjob:powerjob-worker:4.0.0")
library("tech.powerjob:powerjob-official-processors:1.1.0")
}
tasks.withType<Test>().configureEach {
// required on jdk17
jvmArgs("--add-opens=java.base/java.lang=ALL-UNNAMED")
jvmArgs("-XX:+IgnoreUnrecognizedVMOptions")
jvmArgs("-Dotel.instrumentation.powerjob.experimental-span-attributes=true")
}

View File

@ -0,0 +1,87 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0;
import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
import static io.opentelemetry.javaagent.instrumentation.powerjob.v4_0.PowerJobSingletons.instrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
public class BasicProcessorInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return implementsInterface(named("tech.powerjob.worker.core.processor.sdk.BasicProcessor"));
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("process")
.and(isPublic())
.and(
takesArguments(1)
.and(
takesArgument(
0, named("tech.powerjob.worker.core.processor.TaskContext")))),
BasicProcessorInstrumentation.class.getName() + "$ProcessAdvice");
}
public static class ProcessAdvice {
@SuppressWarnings("unused")
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onSchedule(
@Advice.This BasicProcessor handler,
@Advice.Argument(0) TaskContext taskContext,
@Advice.Local("otelRequest") PowerJobProcessRequest request,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
Context parentContext = currentContext();
request =
PowerJobProcessRequest.createRequest(
taskContext.getJobId(),
handler,
"process",
taskContext.getJobParams(),
taskContext.getInstanceParams());
if (!instrumenter().shouldStart(parentContext, request)) {
return;
}
context = instrumenter().start(parentContext, request);
scope = context.makeCurrent();
}
@SuppressWarnings("unused")
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Return ProcessResult result,
@Advice.Thrown Throwable throwable,
@Advice.Local("otelRequest") PowerJobProcessRequest request,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {
if (scope == null) {
return;
}
scope.close();
instrumenter().end(context, request, result, throwable);
}
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0;
import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeAttributesGetter;
import javax.annotation.Nullable;
class PowerJobCodeAttributesGetter implements CodeAttributesGetter<PowerJobProcessRequest> {
@Nullable
@Override
public Class<?> getCodeClass(PowerJobProcessRequest powerJobProcessRequest) {
return powerJobProcessRequest.getDeclaringClass();
}
@Nullable
@Override
public String getMethodName(PowerJobProcessRequest powerJobProcessRequest) {
return powerJobProcessRequest.getMethodName();
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import javax.annotation.Nullable;
import tech.powerjob.worker.core.processor.ProcessResult;
class PowerJobExperimentalAttributeExtractor
implements AttributesExtractor<PowerJobProcessRequest, ProcessResult> {
private static final AttributeKey<Long> POWERJOB_JOB_ID =
AttributeKey.longKey("scheduling.powerjob.job.id");
private static final AttributeKey<String> POWERJOB_JOB_PARAM =
AttributeKey.stringKey("scheduling.powerjob.job.param");
private static final AttributeKey<String> POWERJOB_JOB_INSTANCE_PARAM =
AttributeKey.stringKey("scheduling.powerjob.job.instance.param");
private static final AttributeKey<String> POWERJOB_JOB_INSTANCE_TYPE =
AttributeKey.stringKey("scheduling.powerjob.job.type");
@Override
public void onStart(
AttributesBuilder attributes,
Context parentContext,
PowerJobProcessRequest powerJobProcessRequest) {
attributes.put(POWERJOB_JOB_ID, powerJobProcessRequest.getJobId());
attributes.put(POWERJOB_JOB_PARAM, powerJobProcessRequest.getJobParams());
attributes.put(POWERJOB_JOB_INSTANCE_PARAM, powerJobProcessRequest.getInstanceParams());
attributes.put(POWERJOB_JOB_INSTANCE_TYPE, powerJobProcessRequest.getJobType());
}
@Override
public void onEnd(
AttributesBuilder attributes,
Context context,
PowerJobProcessRequest powerJobProcessRequest,
@Nullable ProcessResult unused,
@Nullable Throwable error) {}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0;
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.Collections;
import java.util.List;
@AutoService(InstrumentationModule.class)
public class PowerJobInstrumentationModule extends InstrumentationModule {
public PowerJobInstrumentationModule() {
super("powerjob", "powerjob-4.0");
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return Collections.singletonList(new BasicProcessorInstrumentation());
}
}

View File

@ -0,0 +1,95 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0;
import java.util.Arrays;
import java.util.List;
import tech.powerjob.official.processors.impl.FileCleanupProcessor;
import tech.powerjob.official.processors.impl.HttpProcessor;
import tech.powerjob.official.processors.impl.script.PythonProcessor;
import tech.powerjob.official.processors.impl.script.ShellProcessor;
import tech.powerjob.official.processors.impl.sql.DynamicDatasourceSqlProcessor;
import tech.powerjob.official.processors.impl.sql.SpringDatasourceSqlProcessor;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor;
import tech.powerjob.worker.core.processor.sdk.MapProcessor;
import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
public final class PowerJobProcessRequest {
private final String methodName;
private final Long jobId;
private final String jobType;
private final Class<?> declaringClass;
private final String jobParams;
private final String instanceParams;
private static final List<Class<?>> KNOWN_PROCESSORS =
Arrays.asList(
FileCleanupProcessor.class,
BroadcastProcessor.class,
MapReduceProcessor.class,
MapProcessor.class,
ShellProcessor.class,
PythonProcessor.class,
HttpProcessor.class,
SpringDatasourceSqlProcessor.class,
DynamicDatasourceSqlProcessor.class);
private PowerJobProcessRequest(
Long jobId,
String methodName,
Class<?> declaringClass,
String jobParams,
String instanceParams,
String jobType) {
this.jobId = jobId;
this.methodName = methodName;
this.jobType = jobType;
this.declaringClass = declaringClass;
this.jobParams = jobParams;
this.instanceParams = instanceParams;
}
public static PowerJobProcessRequest createRequest(
Long jobId,
BasicProcessor handler,
String methodName,
String jobParams,
String instanceParams) {
String jobType = "BasicProcessor";
for (Class<?> processorClass : KNOWN_PROCESSORS) {
if (processorClass.isInstance(handler)) {
jobType = processorClass.getSimpleName();
break;
}
}
return new PowerJobProcessRequest(
jobId, methodName, handler.getClass(), jobParams, instanceParams, jobType);
}
public String getMethodName() {
return methodName;
}
public Long getJobId() {
return jobId;
}
public Class<?> getDeclaringClass() {
return declaringClass;
}
public String getJobParams() {
return jobParams;
}
public String getInstanceParams() {
return instanceParams;
}
public String getJobType() {
return jobType;
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeAttributesExtractor;
import io.opentelemetry.instrumentation.api.incubator.semconv.code.CodeSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanStatusExtractor;
import io.opentelemetry.javaagent.bootstrap.internal.AgentInstrumentationConfig;
import tech.powerjob.worker.core.processor.ProcessResult;
public final class PowerJobSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.powerjob-4.0";
private static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES =
AgentInstrumentationConfig.get()
.getBoolean("otel.instrumentation.powerjob.experimental-span-attributes", false);
private static final Instrumenter<PowerJobProcessRequest, ProcessResult> INSTRUMENTER = create();
public static Instrumenter<PowerJobProcessRequest, ProcessResult> instrumenter() {
return INSTRUMENTER;
}
private static Instrumenter<PowerJobProcessRequest, ProcessResult> create() {
PowerJobCodeAttributesGetter codeAttributesGetter = new PowerJobCodeAttributesGetter();
SpanNameExtractor<PowerJobProcessRequest> spanNameExtractor =
CodeSpanNameExtractor.create(codeAttributesGetter);
InstrumenterBuilder<PowerJobProcessRequest, ProcessResult> builder =
Instrumenter.<PowerJobProcessRequest, ProcessResult>builder(
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractor(CodeAttributesExtractor.create(codeAttributesGetter))
.setSpanStatusExtractor(
(spanStatusBuilder, powerJobProcessRequest, response, error) -> {
if (response != null && !response.isSuccess()) {
spanStatusBuilder.setStatus(StatusCode.ERROR);
} else {
SpanStatusExtractor.getDefault()
.extract(spanStatusBuilder, powerJobProcessRequest, response, error);
}
});
if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {
builder.addAttributesExtractor(
AttributesExtractor.constant(AttributeKey.stringKey("job.system"), "powerjob"));
builder.addAttributesExtractor(new PowerJobExperimentalAttributeExtractor());
}
return builder.buildInstrumenter();
}
private PowerJobSingletons() {}
}

View File

@ -0,0 +1,373 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static java.util.Arrays.asList;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.zaxxer.hikari.HikariDataSource;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.internal.StringUtils;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
import io.opentelemetry.sdk.trace.data.StatusData;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import tech.powerjob.official.processors.impl.FileCleanupProcessor;
import tech.powerjob.official.processors.impl.HttpProcessor;
import tech.powerjob.official.processors.impl.script.PythonProcessor;
import tech.powerjob.official.processors.impl.script.ShellProcessor;
import tech.powerjob.official.processors.impl.sql.DynamicDatasourceSqlProcessor;
import tech.powerjob.official.processors.impl.sql.SpringDatasourceSqlProcessor;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.WorkflowContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
import tech.powerjob.worker.log.OmsLogger;
class PowerJobBasicProcessorTest {
@RegisterExtension
private static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
private static final String BASIC_PROCESSOR = "BasicProcessor";
private static final String BROADCAST_PROCESSOR = "BroadcastProcessor";
private static final String MAP_PROCESSOR = "MapProcessor";
private static final String MAP_REDUCE_PROCESSOR = "MapReduceProcessor";
private static final String SHELL_PROCESSOR = "ShellProcessor";
private static final String PYTHON_PROCESSOR = "PythonProcessor";
private static final String HTTP_PROCESSOR = "HttpProcessor";
private static final String FILE_CLEANUP_PROCESSOR = "FileCleanupProcessor";
private static final String SPRING_DATASOURCE_SQL_PROCESSOR = "SpringDatasourceSqlProcessor";
private static final String DYNAMIC_DATASOURCE_SQL_PROCESSOR = "DynamicDatasourceSqlProcessor";
@Test
void testBasicProcessor() throws Exception {
long jobId = 1;
String jobParam = "abc";
TaskContext taskContext = genTaskContext(jobId, jobParam);
BasicProcessor testBasicProcessor = new TestBasicProcessor();
testBasicProcessor.process(taskContext);
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(String.format("%s.process", TestBasicProcessor.class.getSimpleName()))
.hasKind(SpanKind.INTERNAL)
.hasStatus(StatusData.unset())
.hasAttributesSatisfyingExactly(
attributeAssertions(
TestBasicProcessor.class.getName(), jobId, jobParam, BASIC_PROCESSOR));
});
});
}
@Test
void testBasicFailProcessor() throws Exception {
long jobId = 1;
String jobParam = "abc";
TaskContext taskContext = genTaskContext(jobId, jobParam);
BasicProcessor testBasicFailProcessor = new TestBasicFailProcessor();
testBasicFailProcessor.process(taskContext);
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(
String.format("%s.process", TestBasicFailProcessor.class.getSimpleName()))
.hasKind(SpanKind.INTERNAL)
.hasStatus(StatusData.error())
.hasAttributesSatisfyingExactly(
attributeAssertions(
TestBasicFailProcessor.class.getName(),
jobId,
jobParam,
BASIC_PROCESSOR));
});
});
}
@Test
void testBroadcastProcessor() throws Exception {
long jobId = 1;
String jobParam = "abc";
TaskContext taskContext = genTaskContext(jobId, jobParam);
BasicProcessor testBroadcastProcessor = new TestBroadcastProcessor();
testBroadcastProcessor.process(taskContext);
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(
String.format("%s.process", TestBroadcastProcessor.class.getSimpleName()))
.hasKind(SpanKind.INTERNAL)
.hasStatus(StatusData.unset())
.hasAttributesSatisfyingExactly(
attributeAssertions(
TestBroadcastProcessor.class.getName(),
jobId,
jobParam,
BROADCAST_PROCESSOR));
});
});
}
@Test
void testMapProcessor() throws Exception {
long jobId = 1;
String jobParam = "abc";
TaskContext taskContext = genTaskContext(jobId, jobParam);
BasicProcessor testMapProcessProcessor = new TestMapProcessProcessor();
testMapProcessProcessor.process(taskContext);
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(
String.format("%s.process", TestMapProcessProcessor.class.getSimpleName()))
.hasKind(SpanKind.INTERNAL)
.hasStatus(StatusData.unset())
.hasAttributesSatisfyingExactly(
attributeAssertions(
TestMapProcessProcessor.class.getName(),
jobId,
jobParam,
MAP_PROCESSOR));
});
});
}
@Test
void testMapReduceProcessor() throws Exception {
long jobId = 1;
String jobParam = "abc";
TaskContext taskContext = genTaskContext(jobId, jobParam);
BasicProcessor testMapReduceProcessProcessor = new TestMapReduceProcessProcessor();
testMapReduceProcessProcessor.process(taskContext);
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(
String.format(
"%s.process", TestMapReduceProcessProcessor.class.getSimpleName()))
.hasKind(SpanKind.INTERNAL)
.hasStatus(StatusData.unset())
.hasAttributesSatisfyingExactly(
attributeAssertions(
TestMapReduceProcessProcessor.class.getName(),
jobId,
jobParam,
MAP_REDUCE_PROCESSOR));
});
});
}
@Test
void testShellProcessor() throws Exception {
long jobId = 1;
String jobParam = "ls";
TaskContext taskContext = genTaskContext(jobId, jobParam);
taskContext.setWorkflowContext(new WorkflowContext(jobId, ""));
taskContext.setOmsLogger(new TestOmsLogger());
BasicProcessor shellProcessor = new ShellProcessor();
shellProcessor.process(taskContext);
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(String.format("%s.process", ShellProcessor.class.getSimpleName()))
.hasKind(SpanKind.INTERNAL)
.hasStatus(StatusData.unset())
.hasAttributesSatisfyingExactly(
attributeAssertions(
ShellProcessor.class.getName(), jobId, jobParam, SHELL_PROCESSOR));
});
});
}
@Test
void testPythonProcessor() throws Exception {
long jobId = 1;
String jobParam = "1+1";
TaskContext taskContext = genTaskContext(jobId, jobParam);
taskContext.setWorkflowContext(new WorkflowContext(jobId, ""));
taskContext.setOmsLogger(new TestOmsLogger());
BasicProcessor pythonProcessor = new PythonProcessor();
pythonProcessor.process(taskContext);
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(String.format("%s.process", PythonProcessor.class.getSimpleName()))
.hasKind(SpanKind.INTERNAL)
.hasStatus(StatusData.unset())
.hasAttributesSatisfyingExactly(
attributeAssertions(
PythonProcessor.class.getName(), jobId, jobParam, PYTHON_PROCESSOR));
});
});
}
@Test
void testHttpProcessor() throws Exception {
long jobId = 1;
String jobParam = "{\"method\":\"GET\"}";
TaskContext taskContext = genTaskContext(jobId, jobParam);
taskContext.setWorkflowContext(new WorkflowContext(jobId, ""));
taskContext.setOmsLogger(new TestOmsLogger());
BasicProcessor httpProcessor = new HttpProcessor();
httpProcessor.process(taskContext);
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(String.format("%s.process", HttpProcessor.class.getSimpleName()))
.hasKind(SpanKind.INTERNAL)
.hasStatus(StatusData.error())
.hasAttributesSatisfyingExactly(
attributeAssertions(
HttpProcessor.class.getName(), jobId, jobParam, HTTP_PROCESSOR));
});
});
}
@Test
void testFileCleanerProcessor() throws Exception {
long jobId = 1;
JSONObject params = new JSONObject();
params.put("dirPath", "/abc");
params.put("filePattern", "[\\s\\S]*log");
params.put("retentionTime", 0);
JSONArray array = new JSONArray();
array.add(params);
String jobParam = array.toJSONString();
TaskContext taskContext = genTaskContext(jobId, jobParam);
taskContext.setOmsLogger(new TestOmsLogger());
BasicProcessor fileCleanupProcessor = new FileCleanupProcessor();
fileCleanupProcessor.process(taskContext);
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(
String.format("%s.process", FileCleanupProcessor.class.getSimpleName()))
.hasKind(SpanKind.INTERNAL)
.hasStatus(StatusData.unset())
.hasAttributesSatisfyingExactly(
attributeAssertions(
FileCleanupProcessor.class.getName(),
jobId,
jobParam,
FILE_CLEANUP_PROCESSOR));
});
});
}
@Test
void testSpringDataSourceProcessor() throws Exception {
DataSource dataSource = new HikariDataSource();
long jobId = 1;
String jobParam = "{\"dirPath\":\"/abc\"}";
TaskContext taskContext = genTaskContext(jobId, jobParam);
taskContext.setWorkflowContext(new WorkflowContext(jobId, ""));
taskContext.setOmsLogger(new TestOmsLogger());
BasicProcessor springDatasourceSqlProcessor = new SpringDatasourceSqlProcessor(dataSource);
springDatasourceSqlProcessor.process(taskContext);
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(
String.format(
"%s.process", SpringDatasourceSqlProcessor.class.getSimpleName()))
.hasKind(SpanKind.INTERNAL)
.hasStatus(StatusData.error())
.hasAttributesSatisfyingExactly(
attributeAssertions(
SpringDatasourceSqlProcessor.class.getName(),
jobId,
jobParam,
SPRING_DATASOURCE_SQL_PROCESSOR));
});
});
}
@Test
void testDynamicDataSourceProcessor() throws Exception {
long jobId = 1;
String jobParam = "{\"dirPath\":\"/abc\"}";
TaskContext taskContext = genTaskContext(jobId, jobParam);
taskContext.setWorkflowContext(new WorkflowContext(jobId, ""));
taskContext.setOmsLogger(new TestOmsLogger());
BasicProcessor dynamicDatasourceSqlProcessor = new DynamicDatasourceSqlProcessor();
dynamicDatasourceSqlProcessor.process(taskContext);
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> {
span.hasName(
String.format(
"%s.process", DynamicDatasourceSqlProcessor.class.getSimpleName()))
.hasKind(SpanKind.INTERNAL)
.hasStatus(StatusData.error())
.hasAttributesSatisfyingExactly(
attributeAssertions(
DynamicDatasourceSqlProcessor.class.getName(),
jobId,
jobParam,
DYNAMIC_DATASOURCE_SQL_PROCESSOR));
});
});
}
private static TaskContext genTaskContext(long jobId, String jobParam) {
TaskContext taskContext = new TaskContext();
taskContext.setJobId(jobId);
taskContext.setJobParams(jobParam);
return taskContext;
}
private static List<AttributeAssertion> attributeAssertions(
String codeNamespace, long jobId, String jobParam, String jobType) {
List<AttributeAssertion> attributeAssertions =
new ArrayList<>(
asList(
equalTo(AttributeKey.stringKey("code.namespace"), codeNamespace),
equalTo(AttributeKey.stringKey("code.function"), "process"),
equalTo(AttributeKey.stringKey("job.system"), "powerjob"),
equalTo(AttributeKey.longKey("scheduling.powerjob.job.id"), jobId),
equalTo(AttributeKey.stringKey("scheduling.powerjob.job.type"), jobType)));
if (!StringUtils.isNullOrEmpty(jobParam)) {
attributeAssertions.add(
equalTo(AttributeKey.stringKey("scheduling.powerjob.job.param"), jobParam));
}
return attributeAssertions;
}
private static class TestOmsLogger implements OmsLogger {
@Override
public void debug(String s, Object... objects) {}
@Override
public void info(String s, Object... objects) {}
@Override
public void warn(String s, Object... objects) {}
@Override
public void error(String s, Object... objects) {}
}
}

View File

@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
class TestBasicFailProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) {
return new ProcessResult(false, "fail");
}
}

View File

@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.BasicProcessor;
class TestBasicProcessor implements BasicProcessor {
@Override
public ProcessResult process(TaskContext context) {
return new ProcessResult(true, System.currentTimeMillis() + "success");
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0;
import java.util.List;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.core.processor.sdk.BroadcastProcessor;
class TestBroadcastProcessor implements BroadcastProcessor {
@Override
public ProcessResult preProcess(TaskContext taskContext) {
return new ProcessResult(true, "preProcess success");
}
@Override
public ProcessResult postProcess(TaskContext taskContext, List<TaskResult> taskResults) {
return new ProcessResult(true, "postProcess success");
}
@Override
public ProcessResult process(TaskContext context) {
return new ProcessResult(true, "processSuccess");
}
}

View File

@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.sdk.MapProcessor;
class TestMapProcessProcessor implements MapProcessor {
@Override
public ProcessResult process(TaskContext context) {
return new ProcessResult(true, "processSuccess");
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.powerjob.v4_0;
import java.util.List;
import tech.powerjob.worker.core.processor.ProcessResult;
import tech.powerjob.worker.core.processor.TaskContext;
import tech.powerjob.worker.core.processor.TaskResult;
import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;
class TestMapReduceProcessProcessor implements MapReduceProcessor {
@Override
public ProcessResult process(TaskContext context) {
return new ProcessResult(true, "processSuccess");
}
@Override
public ProcessResult reduce(TaskContext taskContext, List<TaskResult> list) {
return new ProcessResult(true, "reduceSuccess");
}
}

View File

@ -479,6 +479,7 @@ include(":instrumentation:play:play-ws:play-ws-2.0:javaagent")
include(":instrumentation:play:play-ws:play-ws-2.1:javaagent")
include(":instrumentation:play:play-ws:play-ws-common:javaagent")
include(":instrumentation:play:play-ws:play-ws-common:testing")
include(":instrumentation:powerjob-4.0:javaagent")
include(":instrumentation:pulsar:pulsar-2.8:javaagent")
include(":instrumentation:pulsar:pulsar-2.8:javaagent-unit-tests")
include(":instrumentation:quarkus-resteasy-reactive:common-testing")