Spring batch tests to java - basic (#12076)

This commit is contained in:
Gregor Zeitlinger 2024-08-29 02:02:05 +02:00 committed by GitHub
parent 3bcbacc8d2
commit 6ae3e31dfc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
45 changed files with 1563 additions and 397 deletions

View File

@ -1,346 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import org.springframework.batch.core.JobParameter
import org.springframework.context.ConfigurableApplicationContext
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.context.support.ClassPathXmlApplicationContext
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.StatusCode.ERROR
import static java.util.Collections.emptyMap
abstract class SpringBatchTest extends AgentInstrumentationSpecification {
abstract runJob(String jobName, Map<String, JobParameter> params = emptyMap())
def "should trace tasklet job+step"() {
when:
runJob("taskletJob")
then:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "BatchJob taskletJob"
kind INTERNAL
attributes {
"job.system" "spring_batch"
}
}
span(1) {
name "BatchJob taskletJob.step"
kind INTERNAL
childOf span(0)
attributes {}
}
span(2) {
name "BatchJob taskletJob.step.Tasklet"
kind INTERNAL
childOf span(1)
attributes {}
}
}
}
}
def "should handle exception in tasklet job+step"() {
when:
runJob("taskletJob", ["fail": new JobParameter(1)])
then:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "BatchJob taskletJob"
kind INTERNAL
attributes {
"job.system" "spring_batch"
}
}
span(1) {
name "BatchJob taskletJob.step"
kind INTERNAL
childOf span(0)
attributes {}
}
span(2) {
name "BatchJob taskletJob.step.Tasklet"
kind INTERNAL
childOf span(1)
status ERROR
errorEvent IllegalStateException, "fail"
attributes {}
}
}
}
}
def "should trace chunked items job"() {
when:
runJob("itemsAndTaskletJob")
then:
assertTraces(1) {
trace(0, 7) {
span(0) {
name "BatchJob itemsAndTaskletJob"
kind INTERNAL
attributes {
"job.system" "spring_batch"
}
}
span(1) {
name "BatchJob itemsAndTaskletJob.itemStep"
kind INTERNAL
childOf span(0)
attributes {}
}
span(2) {
name "BatchJob itemsAndTaskletJob.itemStep.Chunk"
kind INTERNAL
childOf span(1)
attributes {}
}
span(3) {
name "BatchJob itemsAndTaskletJob.itemStep.Chunk"
kind INTERNAL
childOf span(1)
attributes {}
}
span(4) {
name "BatchJob itemsAndTaskletJob.itemStep.Chunk"
kind INTERNAL
childOf span(1)
attributes {}
}
span(5) {
name "BatchJob itemsAndTaskletJob.taskletStep"
kind INTERNAL
childOf span(0)
attributes {}
}
span(6) {
name "BatchJob itemsAndTaskletJob.taskletStep.Tasklet"
kind INTERNAL
childOf span(5)
attributes {}
}
}
}
}
def "should trace flow job"() {
when:
runJob("flowJob")
then:
assertTraces(1) {
trace(0, 5) {
span(0) {
name "BatchJob flowJob"
kind INTERNAL
attributes {
"job.system" "spring_batch"
}
}
span(1) {
name "BatchJob flowJob.flowStep1"
kind INTERNAL
childOf span(0)
attributes {}
}
span(2) {
name "BatchJob flowJob.flowStep1.Tasklet"
kind INTERNAL
childOf span(1)
attributes {}
}
span(3) {
name "BatchJob flowJob.flowStep2"
kind INTERNAL
childOf span(0)
attributes {}
}
span(4) {
name "BatchJob flowJob.flowStep2.Tasklet"
kind INTERNAL
childOf span(3)
attributes {}
}
}
}
}
def "should trace split flow job"() {
when:
runJob("splitJob")
then:
assertTraces(1) {
trace(0, 5) {
span(0) {
name "BatchJob splitJob"
kind INTERNAL
attributes {
"job.system" "spring_batch"
}
}
span(1) {
name ~/BatchJob splitJob\.splitFlowStep[12]/
kind INTERNAL
childOf span(0)
attributes {}
}
span(2) {
name ~/BatchJob splitJob\.splitFlowStep[12]\.Tasklet/
kind INTERNAL
childOf span(1)
attributes {}
}
span(3) {
name ~/BatchJob splitJob\.splitFlowStep[12]/
kind INTERNAL
childOf span(0)
attributes {}
}
span(4) {
name ~/BatchJob splitJob\.splitFlowStep[12]\.Tasklet/
kind INTERNAL
childOf span(3)
attributes {}
}
}
}
}
def "should trace job with decision"() {
when:
runJob("decisionJob")
then:
assertTraces(1) {
trace(0, 5) {
span(0) {
name "BatchJob decisionJob"
kind INTERNAL
attributes {
"job.system" "spring_batch"
}
}
span(1) {
name "BatchJob decisionJob.decisionStepStart"
kind INTERNAL
childOf span(0)
attributes {}
}
span(2) {
name "BatchJob decisionJob.decisionStepStart.Tasklet"
kind INTERNAL
childOf span(1)
attributes {}
}
span(3) {
name "BatchJob decisionJob.decisionStepLeft"
kind INTERNAL
childOf span(0)
attributes {}
}
span(4) {
name "BatchJob decisionJob.decisionStepLeft.Tasklet"
kind INTERNAL
childOf span(3)
attributes {}
}
}
}
}
def "should trace partitioned job"() {
when:
runJob("partitionedJob")
then:
assertTraces(1) {
trace(0, 8) {
span(0) {
name "BatchJob partitionedJob"
kind INTERNAL
attributes {
"job.system" "spring_batch"
}
}
span(1) {
def stepName = hasPartitionManagerStep() ? "partitionManagerStep" : "partitionWorkerStep"
name "BatchJob partitionedJob.$stepName"
kind INTERNAL
childOf span(0)
attributes {}
}
span(2) {
name ~/BatchJob partitionedJob.partitionWorkerStep:partition[01]/
kind INTERNAL
childOf span(1)
attributes {}
}
span(3) {
name ~/BatchJob partitionedJob.partitionWorkerStep:partition[01].Chunk/
kind INTERNAL
childOf span(2)
attributes {}
}
span(4) {
name ~/BatchJob partitionedJob.partitionWorkerStep:partition[01].Chunk/
kind INTERNAL
childOf span(2)
attributes {}
}
span(5) {
name ~/BatchJob partitionedJob.partitionWorkerStep:partition[01]/
kind INTERNAL
childOf span(1)
attributes {}
}
span(6) {
name ~/BatchJob partitionedJob.partitionWorkerStep:partition[01].Chunk/
kind INTERNAL
childOf span(5)
attributes {}
}
span(7) {
name ~/BatchJob partitionedJob.partitionWorkerStep:partition[01].Chunk/
kind INTERNAL
childOf span(5)
attributes {}
}
}
}
}
protected boolean hasPartitionManagerStep() {
true
}
}
class JavaConfigBatchJobTest extends SpringBatchTest implements ApplicationConfigTrait {
@Override
ConfigurableApplicationContext createApplicationContext() {
new AnnotationConfigApplicationContext(SpringBatchApplication)
}
}
class XmlConfigBatchJobTest extends SpringBatchTest implements ApplicationConfigTrait {
@Override
ConfigurableApplicationContext createApplicationContext() {
new ClassPathXmlApplicationContext("spring-batch.xml")
}
}
class JsrConfigBatchJobTest extends SpringBatchTest implements JavaxBatchConfigTrait {
protected boolean hasPartitionManagerStep() {
false
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.basic;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.ApplicationConfigRunner;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.SpringBatchApplication;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
class JavaConfigBatchJobTest extends SpringBatchTest {
@RegisterExtension
static final ApplicationConfigRunner runner =
new ApplicationConfigRunner(
() -> new AnnotationConfigApplicationContext(SpringBatchApplication.class));
public JavaConfigBatchJobTest() {
super(runner);
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.basic;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.JavaxBatchConfigRunner;
import org.junit.jupiter.api.extension.RegisterExtension;
class JsrConfigBatchJobTest extends SpringBatchTest {
@Override
protected boolean hasPartitionManagerStep() {
return false;
}
@RegisterExtension static final JavaxBatchConfigRunner runner = new JavaxBatchConfigRunner();
public JsrConfigBatchJobTest() {
super(runner);
}
}

View File

@ -0,0 +1,279 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.basic;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static java.util.Collections.singletonMap;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.JobRunner;
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
import io.opentelemetry.sdk.testing.assertj.TraceAssert;
import io.opentelemetry.sdk.trace.data.StatusData;
import java.util.function.Consumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.batch.core.JobParameter;
abstract class SpringBatchTest {
private final JobRunner runner;
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
public SpringBatchTest(JobRunner runner) {
this.runner = runner;
}
@Test
void shouldTraceTaskletJobStep() {
runner.runJob("taskletJob");
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("BatchJob taskletJob")
.hasKind(SpanKind.INTERNAL)
.hasAttribute(AttributeKey.stringKey("job.system"), "spring_batch"),
span ->
span.hasName("BatchJob taskletJob.step")
.hasKind(SpanKind.INTERNAL)
.hasTotalAttributeCount(0)
.hasParent(trace.getSpan(0)),
span ->
span.hasName("BatchJob taskletJob.step.Tasklet")
.hasKind(SpanKind.INTERNAL)
.hasTotalAttributeCount(0)
.hasParent(trace.getSpan(1))));
}
@Test
void shouldHandleExceptionInTaskletJobStep() {
runner.runJob("taskletJob", singletonMap("fail", new JobParameter(1L)));
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("BatchJob taskletJob")
.hasKind(SpanKind.INTERNAL)
.hasAttribute(AttributeKey.stringKey("job.system"), "spring_batch"),
span ->
span.hasName("BatchJob taskletJob.step")
.hasKind(SpanKind.INTERNAL)
.hasTotalAttributeCount(0)
.hasParent(trace.getSpan(0)),
span ->
span.hasName("BatchJob taskletJob.step.Tasklet")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(1))
.hasStatus(StatusData.error())
.hasTotalAttributeCount(0)
.hasException(new IllegalStateException("fail"))));
}
@Test
void shouldTraceChunkedItemsJob() {
runner.runJob("itemsAndTaskletJob");
testing.waitAndAssertTraces(
trace -> {
Consumer<SpanDataAssert> chunk =
span ->
span.hasName("BatchJob itemsAndTaskletJob.itemStep.Chunk")
.hasKind(SpanKind.INTERNAL)
.hasTotalAttributeCount(0)
.hasParent(trace.getSpan(1));
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("BatchJob itemsAndTaskletJob")
.hasKind(SpanKind.INTERNAL)
.hasAttribute(AttributeKey.stringKey("job.system"), "spring_batch"),
span ->
span.hasName("BatchJob itemsAndTaskletJob.itemStep")
.hasKind(SpanKind.INTERNAL)
.hasTotalAttributeCount(0)
.hasParent(trace.getSpan(0)),
chunk,
chunk,
chunk,
span ->
span.hasName("BatchJob itemsAndTaskletJob.taskletStep")
.hasKind(SpanKind.INTERNAL)
.hasTotalAttributeCount(0)
.hasParent(trace.getSpan(0)),
span ->
span.hasName("BatchJob itemsAndTaskletJob.taskletStep.Tasklet")
.hasKind(SpanKind.INTERNAL)
.hasTotalAttributeCount(0)
.hasParent(trace.getSpan(5)));
});
}
@Test
void shouldTraceFlowJob() {
runner.runJob("flowJob");
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("BatchJob flowJob")
.hasKind(SpanKind.INTERNAL)
.hasAttribute(AttributeKey.stringKey("job.system"), "spring_batch"),
span ->
span.hasName("BatchJob flowJob.flowStep1")
.hasKind(SpanKind.INTERNAL)
.hasTotalAttributeCount(0)
.hasParent(trace.getSpan(0)),
span ->
span.hasName("BatchJob flowJob.flowStep1.Tasklet")
.hasKind(SpanKind.INTERNAL)
.hasTotalAttributeCount(0)
.hasParent(trace.getSpan(1)),
span ->
span.hasName("BatchJob flowJob.flowStep2")
.hasKind(SpanKind.INTERNAL)
.hasTotalAttributeCount(0)
.hasParent(trace.getSpan(0)),
span ->
span.hasName("BatchJob flowJob.flowStep2.Tasklet")
.hasKind(SpanKind.INTERNAL)
.hasTotalAttributeCount(0)
.hasParent(trace.getSpan(3))));
}
@Test
void shouldTraceSplitFlowJob() {
runner.runJob("splitJob");
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("BatchJob splitJob")
.hasKind(SpanKind.INTERNAL)
.hasAttribute(AttributeKey.stringKey("job.system"), "spring_batch"),
span ->
span.satisfies(
spanData ->
assertThat(spanData.getName())
.matches("BatchJob splitJob.splitFlowStep[12]"))
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.satisfies(
spanData ->
assertThat(spanData.getName())
.matches("BatchJob splitJob.splitFlowStep[12].Tasklet"))
.hasKind(SpanKind.INTERNAL)
.hasTotalAttributeCount(0)
.hasParent(trace.getSpan(1)),
span ->
span.satisfies(
spanData ->
assertThat(spanData.getName())
.matches("BatchJob splitJob.splitFlowStep[12]"))
.hasKind(SpanKind.INTERNAL)
.hasTotalAttributeCount(0)
.hasParent(trace.getSpan(0)),
span ->
span.satisfies(
spanData ->
assertThat(spanData.getName())
.matches("BatchJob splitJob.splitFlowStep[12].Tasklet"))
.hasKind(SpanKind.INTERNAL)
.hasTotalAttributeCount(0)
.hasParent(trace.getSpan(3))));
}
@Test
void shouldTraceJobWithDecision() {
runner.runJob("decisionJob");
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("BatchJob decisionJob")
.hasKind(SpanKind.INTERNAL)
.hasAttribute(AttributeKey.stringKey("job.system"), "spring_batch"),
span ->
span.hasName("BatchJob decisionJob.decisionStepStart")
.hasKind(SpanKind.INTERNAL)
.hasTotalAttributeCount(0)
.hasParent(trace.getSpan(0)),
span ->
span.hasName("BatchJob decisionJob.decisionStepStart.Tasklet")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(1)),
span ->
span.hasName("BatchJob decisionJob.decisionStepLeft")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.hasName("BatchJob decisionJob.decisionStepLeft.Tasklet")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(3))));
}
@Test
void shouldTracePartitionedJob() {
runner.runJob("partitionedJob");
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("BatchJob partitionedJob")
.hasKind(SpanKind.INTERNAL)
.hasAttribute(AttributeKey.stringKey("job.system"), "spring_batch"),
span ->
span.hasName(
hasPartitionManagerStep()
? "BatchJob partitionedJob.partitionManagerStep"
: "BatchJob partitionedJob.partitionWorkerStep")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(0)),
span ->
span.satisfies(
spanData ->
assertThat(spanData.getName())
.matches(
"BatchJob partitionedJob.partitionWorkerStep:partition[01]"))
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(1)),
span -> partitionChunk(trace, span, 2),
span -> partitionChunk(trace, span, 2),
span ->
span.satisfies(
spanData ->
assertThat(spanData.getName())
.matches(
"BatchJob partitionedJob.partitionWorkerStep:partition[01]"))
.hasParent(trace.getSpan(1)),
span -> partitionChunk(trace, span, 5),
span -> partitionChunk(trace, span, 5));
});
}
private static void partitionChunk(TraceAssert trace, SpanDataAssert span, int index) {
span.satisfies(
spanData ->
assertThat(spanData.getName())
.matches("BatchJob partitionedJob.partitionWorkerStep:partition[01].Chunk"))
.hasParent(trace.getSpan(index));
}
protected boolean hasPartitionManagerStep() {
return true;
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.basic;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.ApplicationConfigRunner;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.context.support.ClassPathXmlApplicationContext;
class XmlConfigBatchJobTest extends SpringBatchTest {
@RegisterExtension
static final ApplicationConfigRunner runner =
new ApplicationConfigRunner(() -> new ClassPathXmlApplicationContext("spring-batch.xml"));
public XmlConfigBatchJobTest() {
super(runner);
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr;
import io.opentelemetry.api.trace.Span;
import javax.batch.api.chunk.listener.ChunkListener;
public class CustomEventChunkListener implements ChunkListener {
@Override
public void beforeChunk() {
Span.current().addEvent("chunk.before");
}
@Override
public void onError(Exception e) {
Span.current().addEvent("chunk.error");
}
@Override
public void afterChunk() {
Span.current().addEvent("chunk.after");
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr;
import io.opentelemetry.api.trace.Span;
import javax.batch.api.chunk.listener.ItemProcessListener;
class CustomEventItemProcessListener implements ItemProcessListener {
@Override
public void beforeProcess(Object o) {
Span.current().addEvent("item.process.before");
}
@Override
public void afterProcess(Object o, Object o1) {
Span.current().addEvent("item.process.after");
}
@Override
public void onProcessError(Object o, Exception e) {
Span.current().addEvent("item.process.error");
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr;
import io.opentelemetry.api.trace.Span;
import javax.batch.api.chunk.listener.ItemReadListener;
class CustomEventItemReadListener implements ItemReadListener {
@Override
public void beforeRead() {
Span.current().addEvent("item.read.before");
}
@Override
public void afterRead(Object o) {
Span.current().addEvent("item.read.after");
}
@Override
public void onReadError(Exception e) {
Span.current().addEvent("item.read.error");
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr;
import io.opentelemetry.api.trace.Span;
import java.util.List;
import javax.batch.api.chunk.listener.ItemWriteListener;
class CustomEventItemWriteListener implements ItemWriteListener {
@Override
public void beforeWrite(List<Object> list) {
Span.current().addEvent("item.write.before");
}
@Override
public void afterWrite(List<Object> list) {
Span.current().addEvent("item.write.after");
}
@Override
public void onWriteError(List<Object> list, Exception e) {
Span.current().addEvent("item.write.error");
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr;
import io.opentelemetry.api.trace.Span;
import javax.batch.api.listener.JobListener;
class CustomEventJobListener implements JobListener {
@Override
public void beforeJob() {
Span.current().addEvent("job.before");
}
@Override
public void afterJob() {
Span.current().addEvent("job.after");
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr;
import io.opentelemetry.api.trace.Span;
import javax.batch.api.listener.StepListener;
class CustomEventStepListener implements StepListener {
@Override
public void beforeStep() {
Span.current().addEvent("step.before");
}
@Override
public void afterStep() {
Span.current().addEvent("step.after");
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicReference;
import javax.batch.api.chunk.ItemReader;
class SingleItemReader implements ItemReader {
@Override
public void open(Serializable serializable) {}
@Override
public void close() {}
@Override
public Object readItem() {
return item.getAndSet(null);
}
@Override
public Serializable checkpointInfo() {
return null;
}
public final AtomicReference<String> getItem() {
return item;
}
private final AtomicReference<String> item = new AtomicReference<String>("42");
}

View File

@ -0,0 +1,36 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr;
import javax.batch.api.BatchProperty;
import javax.batch.api.Batchlet;
import javax.inject.Inject;
class TestBatchlet implements Batchlet {
@Inject
@BatchProperty(name = "fail")
private String fail;
@Override
public String process() {
if (fail != null && Integer.valueOf(fail) == 1) {
throw new IllegalStateException("fail");
}
return "FINISHED";
}
@Override
public void stop() {}
public String getFail() {
return fail;
}
public void setFail(String fail) {
this.fail = fail;
}
}

View File

@ -0,0 +1,16 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr;
import javax.batch.api.Decider;
import javax.batch.runtime.StepExecution;
class TestDecider implements Decider {
@Override
public String decide(StepExecution[] stepExecutions) {
return "LEFT";
}
}

View File

@ -0,0 +1,16 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr;
import javax.batch.api.chunk.ItemProcessor;
import org.codehaus.groovy.runtime.DefaultGroovyMethods;
class TestItemProcessor implements ItemProcessor {
@Override
public Object processItem(Object item) {
return Integer.parseInt(DefaultGroovyMethods.asType(item, String.class));
}
}

View File

@ -0,0 +1,44 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr;
import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.batch.api.chunk.ItemReader;
class TestItemReader implements ItemReader {
private final List<String> items =
IntStream.range(0, 13).mapToObj(String::valueOf).collect(Collectors.toList());
private Iterator<String> itemsIt;
@Override
public void open(Serializable serializable) {
itemsIt = items.iterator();
}
@Override
public void close() {
itemsIt = null;
}
@Override
public Object readItem() {
if (itemsIt == null) {
return null;
}
return itemsIt.hasNext() ? itemsIt.next() : null;
}
@Override
public Serializable checkpointInfo() {
return null;
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import javax.batch.api.chunk.ItemWriter;
import org.codehaus.groovy.runtime.DefaultGroovyMethods;
class TestItemWriter implements ItemWriter {
private final List<Integer> items = new ArrayList<>();
@Override
public void open(Serializable checkpoint) {}
@Override
public void close() {}
@Override
public void writeItems(List<Object> items) {
for (Object item : items) {
this.items.add(DefaultGroovyMethods.asType(item, Integer.class));
}
}
@Override
public Serializable checkpointInfo() {
return null;
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr;
import java.io.Serializable;
import javax.batch.api.BatchProperty;
import javax.batch.api.chunk.ItemReader;
import javax.inject.Inject;
class TestPartitionedItemReader implements ItemReader {
@Inject
@BatchProperty(name = "start")
private String startStr;
@Inject
@BatchProperty(name = "end")
private String endStr;
private int start;
private int end;
@Override
public void open(Serializable checkpoint) {
start = Integer.parseInt(startStr);
end = Integer.parseInt(endStr);
}
@Override
public void close() {}
@Override
public Object readItem() {
if (start >= end) {
return null;
}
return String.valueOf(start++);
}
@Override
public Serializable checkpointInfo() {
return null;
}
public String getStartStr() {
return startStr;
}
public void setStartStr(String startStr) {
this.startStr = startStr;
}
public String getEndStr() {
return endStr;
}
public void setEndStr(String endStr) {
this.endStr = endStr;
}
public int getStart() {
return start;
}
public void setStart(int start) {
this.start = start;
}
public int getEnd() {
return end;
}
public void setEnd(int end) {
this.end = end;
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameter;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ConfigurableApplicationContext;
public class ApplicationConfigRunner implements BeforeEachCallback, AfterEachCallback, JobRunner {
private final Supplier<ConfigurableApplicationContext> applicationContextFactory;
private final BiConsumer<String, Job> jobPostProcessor;
static JobLauncher jobLauncher;
private ConfigurableApplicationContext applicationContext;
public ApplicationConfigRunner(
Supplier<ConfigurableApplicationContext> applicationContextFactory) {
this(applicationContextFactory, (jobName, job) -> {});
}
public ApplicationConfigRunner(
Supplier<ConfigurableApplicationContext> applicationContextFactory,
BiConsumer<String, Job> jobPostProcessor) {
this.applicationContextFactory = applicationContextFactory;
this.jobPostProcessor = jobPostProcessor;
}
@Override
public void beforeEach(ExtensionContext context) {
applicationContext = applicationContextFactory.get();
applicationContext.start();
jobLauncher = applicationContext.getBean(JobLauncher.class);
}
@Override
public void afterEach(ExtensionContext context) {
applicationContext.stop();
applicationContext.close();
}
@Override
public void runJob(String jobName, Map<String, JobParameter> params) {
Job job = applicationContext.getBean(jobName, Job.class);
jobPostProcessor.accept(jobName, job);
try {
jobLauncher.run(job, new JobParameters(params));
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import javax.batch.operations.JobOperator;
import javax.batch.runtime.BatchRuntime;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.springframework.batch.core.JobParameter;
public class JavaxBatchConfigRunner implements BeforeEachCallback, JobRunner {
static JobOperator jobOperator;
static AtomicInteger counter = new AtomicInteger();
@Override
public void beforeEach(ExtensionContext context) {
jobOperator = BatchRuntime.getJobOperator();
}
@Override
public void runJob(String jobName, Map<String, JobParameter> params) {
Properties jobParams = new Properties();
params.forEach((k, v) -> jobParams.setProperty(k, v.getValue().toString()));
// each job instance with the same name needs to be unique
jobParams.setProperty("uniqueJobIdCounter", String.valueOf(counter.getAndIncrement()));
jobOperator.start(jobName, jobParams);
}
}

View File

@ -0,0 +1,19 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner;
import static java.util.Collections.emptyMap;
import java.util.Map;
import org.springframework.batch.core.JobParameter;
public interface JobRunner {
void runJob(String jobName, Map<String, JobParameter> params);
default void runJob(String jobName) {
runJob(jobName, emptyMap());
}
}

View File

@ -0,0 +1,274 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.CustomEventChunkListener;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.CustomEventItemProcessListener;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.CustomEventItemReadListener;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.CustomEventItemWriteListener;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.CustomEventJobListener;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.CustomEventStepListener;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.SingleItemReader;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.TestDecider;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.TestItemProcessor;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.TestItemReader;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.TestItemWriter;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.TestPartitionedItemReader;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.TestPartitioner;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.TestSyncItemReader;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.TestTasklet;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.core.job.flow.support.SimpleFlow;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@EnableBatchProcessing
public class SpringBatchApplication {
@Autowired JobBuilderFactory jobs;
@Autowired StepBuilderFactory steps;
@Autowired JobRepository jobRepository;
@Bean
AsyncTaskExecutor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(10);
return executor;
}
@Bean
JobLauncher jobLauncher() {
SimpleJobLauncher launcher = new SimpleJobLauncher();
launcher.setJobRepository(jobRepository);
launcher.setTaskExecutor(asyncTaskExecutor());
return launcher;
}
// common
@Bean
ItemReader<String> itemReader() {
return new TestItemReader();
}
@Bean
ItemProcessor<String, Integer> itemProcessor() {
return new TestItemProcessor();
}
@Bean
ItemWriter<Integer> itemWriter() {
return new TestItemWriter();
}
// simple tasklet job
@Bean
Job taskletJob() {
return jobs.get("taskletJob").start(step()).build();
}
@Bean
Step step() {
return steps.get("step").tasklet(new TestTasklet()).build();
}
// 2-step tasklet + chunked items job
@Bean
Job itemsAndTaskletJob() {
return jobs.get("itemsAndTaskletJob").start(itemStep()).next(taskletStep()).build();
}
@Bean
Step taskletStep() {
return steps.get("taskletStep").tasklet(new TestTasklet()).build();
}
@Bean
Step itemStep() {
return steps
.get("itemStep")
.<String, Integer>chunk(5)
.reader(itemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
// parallel items job
@Bean
Job parallelItemsJob() {
return jobs.get("parallelItemsJob").start(parallelItemsStep()).build();
}
@Bean
Step parallelItemsStep() {
return steps
.get("parallelItemsStep")
.<String, Integer>chunk(2)
.reader(new TestSyncItemReader(5))
.processor(itemProcessor())
.writer(itemWriter())
.taskExecutor(asyncTaskExecutor())
.throttleLimit(2)
.build();
}
// job using a flow
@Bean
Job flowJob() {
return jobs.get("flowJob").start(flow()).build().build();
}
@Bean
Flow flow() {
return new FlowBuilder<SimpleFlow>("flow").start(flowStep1()).on("*").to(flowStep2()).build();
}
@Bean
Step flowStep1() {
return steps.get("flowStep1").tasklet(new TestTasklet()).build();
}
@Bean
Step flowStep2() {
return steps.get("flowStep2").tasklet(new TestTasklet()).build();
}
// split job
@Bean
Job splitJob() {
return jobs.get("splitJob")
.start(splitFlowStep1())
.split(asyncTaskExecutor())
.add(splitFlow2())
.build()
.build();
}
@Bean
Step splitFlowStep1() {
return steps.get("splitFlowStep1").tasklet(new TestTasklet()).build();
}
@Bean
Flow splitFlow2() {
return new FlowBuilder<SimpleFlow>("splitFlow2").start(splitFlowStep2()).build();
}
@Bean
Step splitFlowStep2() {
return steps.get("splitFlowStep2").tasklet(new TestTasklet()).build();
}
// job with decisions
@Bean
Job decisionJob() {
return jobs.get("decisionJob")
.start(decisionStepStart())
.next(new TestDecider())
.on("LEFT")
.to(decisionStepLeft())
.on("RIGHT")
.to(decisionStepRight())
.end()
.build();
}
@Bean
Step decisionStepStart() {
return steps.get("decisionStepStart").tasklet(new TestTasklet()).build();
}
@Bean
Step decisionStepLeft() {
return steps.get("decisionStepLeft").tasklet(new TestTasklet()).build();
}
@Bean
Step decisionStepRight() {
return steps.get("decisionStepRight").tasklet(new TestTasklet()).build();
}
// partitioned job
@Bean
Job partitionedJob() {
return jobs.get("partitionedJob").start(partitionManagerStep()).build();
}
@Bean
Step partitionManagerStep() {
return steps
.get("partitionManagerStep")
.partitioner("partitionWorkerStep", partitioner())
.step(partitionWorkerStep())
.gridSize(2)
.taskExecutor(asyncTaskExecutor())
.build();
}
@Bean
Partitioner partitioner() {
return new TestPartitioner();
}
@Bean
Step partitionWorkerStep() {
return steps
.get("partitionWorkerStep")
.<String, Integer>chunk(5)
.reader(partitionedItemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build();
}
@Bean
ItemReader<String> partitionedItemReader() {
return new TestPartitionedItemReader();
}
// custom span events items job
@Bean
Job customSpanEventsItemsJob() {
return jobs.get("customSpanEventsItemsJob")
.start(customSpanEventsItemStep())
.listener(new CustomEventJobListener())
.build();
}
@Bean
Step customSpanEventsItemStep() {
return steps
.get("customSpanEventsItemStep")
.<String, Integer>chunk(5)
.reader(new SingleItemReader())
.processor(itemProcessor())
.writer(itemWriter())
.listener(new CustomEventStepListener())
.listener(new CustomEventChunkListener())
.listener(new CustomEventItemReadListener())
.listener(new CustomEventItemProcessListener())
.listener(new CustomEventItemWriteListener())
.build();
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch;
import io.opentelemetry.api.trace.Span;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;
public class CustomEventChunkListener implements ChunkListener {
@Override
public void beforeChunk(ChunkContext context) {
Span.current().addEvent("chunk.before");
}
@Override
public void afterChunk(ChunkContext context) {
Span.current().addEvent("chunk.after");
}
@Override
public void afterChunkError(ChunkContext context) {
Span.current().addEvent("chunk.error");
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch;
import io.opentelemetry.api.trace.Span;
import org.springframework.batch.core.ItemProcessListener;
public class CustomEventItemProcessListener implements ItemProcessListener<String, Integer> {
@Override
public void beforeProcess(String item) {
Span.current().addEvent("item.process.before");
}
@Override
public void afterProcess(String item, Integer result) {
Span.current().addEvent("item.process.after");
}
@Override
public void onProcessError(String item, Exception e) {
Span.current().addEvent("item.process.error");
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch;
import io.opentelemetry.api.trace.Span;
import org.springframework.batch.core.ItemReadListener;
public class CustomEventItemReadListener implements ItemReadListener<String> {
@Override
public void beforeRead() {
Span.current().addEvent("item.read.before");
}
@Override
public void afterRead(String item) {
Span.current().addEvent("item.read.after");
}
@Override
public void onReadError(Exception ex) {
Span.current().addEvent("item.read.error");
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch;
import io.opentelemetry.api.trace.Span;
import java.util.List;
import org.springframework.batch.core.ItemWriteListener;
public class CustomEventItemWriteListener implements ItemWriteListener<Integer> {
@Override
public void beforeWrite(List<? extends Integer> items) {
Span.current().addEvent("item.write.before");
}
@Override
public void afterWrite(List<? extends Integer> items) {
Span.current().addEvent("item.write.after");
}
@Override
public void onWriteError(Exception exception, List<? extends Integer> items) {
Span.current().addEvent("item.write.error");
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch;
import io.opentelemetry.api.trace.Span;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
public class CustomEventJobListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
Span.current().addEvent("job.before");
}
@Override
public void afterJob(JobExecution jobExecution) {
Span.current().addEvent("job.after");
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch;
import io.opentelemetry.api.trace.Span;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
public class CustomEventStepListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
Span.current().addEvent("step.before");
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
Span.current().addEvent("step.after");
return null;
}
}

View File

@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch;
import java.util.concurrent.atomic.AtomicReference;
import org.springframework.batch.item.ItemReader;
public class SingleItemReader implements ItemReader<String> {
final AtomicReference<String> item = new AtomicReference<>("42");
@Override
public String read() {
return item.getAndSet(null);
}
}

View File

@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.job.flow.FlowExecutionStatus;
import org.springframework.batch.core.job.flow.JobExecutionDecider;
public class TestDecider implements JobExecutionDecider {
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
return new FlowExecutionStatus("LEFT");
}
}

View File

@ -0,0 +1,15 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch;
import org.springframework.batch.item.ItemProcessor;
public class TestItemProcessor implements ItemProcessor<String, Integer> {
@Override
public Integer process(String item) {
return Integer.parseInt(item);
}
}

View File

@ -0,0 +1,16 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.springframework.batch.item.support.ListItemReader;
public class TestItemReader extends ListItemReader<String> {
public TestItemReader() {
super(IntStream.range(0, 13).mapToObj(String::valueOf).collect(Collectors.toList()));
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch;
import static java.util.Collections.synchronizedList;
import java.util.ArrayList;
import java.util.List;
import org.springframework.batch.item.ItemWriter;
public class TestItemWriter implements ItemWriter<Integer> {
final List<Integer> items = synchronizedList(new ArrayList<>());
@Override
public void write(List<? extends Integer> items) {
this.items.addAll(items);
}
}

View File

@ -0,0 +1,37 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
public class TestPartitionedItemReader implements ItemReader<String>, ItemStream {
ThreadLocal<Integer> start = new ThreadLocal<>();
ThreadLocal<Integer> end = new ThreadLocal<>();
@Override
public String read() {
if (start.get() >= end.get()) {
return null;
}
Integer value = start.get();
start.set(value + 1);
return String.valueOf(value);
}
@Override
public void open(ExecutionContext executionContext) {
start.set(executionContext.getInt("start"));
end.set(executionContext.getInt("end"));
}
@Override
public void update(ExecutionContext executionContext) {}
@Override
public void close() {}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch;
import com.google.common.collect.ImmutableMap;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
public class TestPartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<>();
map.put("partition0", new ExecutionContext(ImmutableMap.of("start", 0, "end", 8)));
map.put("partition1", new ExecutionContext(ImmutableMap.of("start", 8, "end", 13)));
return map;
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch;
import java.util.Iterator;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.springframework.batch.item.ItemReader;
public class TestSyncItemReader implements ItemReader<String> {
private final Iterator<String> items;
public TestSyncItemReader(int max) {
items =
IntStream.range(0, max).mapToObj(String::valueOf).collect(Collectors.toList()).iterator();
}
@Override
public synchronized String read() {
if (items.hasNext()) {
return items.next();
}
return null;
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch;
import java.util.Objects;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
public class TestTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
if (Objects.equals(
chunkContext.getStepContext().getStepExecution().getJobParameters().getLong("fail"), 1L)) {
throw new IllegalStateException("fail");
}
return RepeatStatus.FINISHED;
}
}

View File

@ -1,20 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="customSpanEventsItemsJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
<job id="customSpanEventsItemsJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0">
<listeners>
<listener ref="jsr.CustomEventJobListener"/>
<listener ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.CustomEventJobListener"/>
</listeners>
<step id="customSpanEventsItemStep">
<listeners>
<listener ref="jsr.CustomEventStepListener"/>
<listener ref="jsr.CustomEventChunkListener"/>
<listener ref="jsr.CustomEventItemReadListener"/>
<listener ref="jsr.CustomEventItemProcessListener"/>
<listener ref="jsr.CustomEventItemWriteListener"/>
<listener ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.CustomEventStepListener"/>
<listener ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.CustomEventChunkListener"/>
<listener ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.CustomEventItemReadListener"/>
<listener ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.CustomEventItemProcessListener"/>
<listener ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.CustomEventItemWriteListener"/>
</listeners>
<chunk item-count="5">
<reader ref="jsr.SingleItemReader"/>
<processor ref="jsr.TestItemProcessor"/>
<writer ref="jsr.TestItemWriter"/>
<reader ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.SingleItemReader"/>
<processor ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.TestItemProcessor"/>
<writer ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.TestItemWriter"/>
</chunk>
</step>
</job>

View File

@ -1,16 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="decisionJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
<job id="decisionJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0">
<step id="decisionStepStart" next="decision">
<batchlet ref="jsr.TestBatchlet"/>
<batchlet ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.TestBatchlet"/>
</step>
<decision id="decision" ref="jsr.TestDecider">
<decision id="decision" ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.TestDecider">
<next on="LEFT" to="decisionStepLeft"/>
<next on="LEFT" to="decisionStepRight"/>
</decision>
<step id="decisionStepLeft">
<batchlet ref="jsr.TestBatchlet"/>
<batchlet ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.TestBatchlet"/>
</step>
<step id="decisionStepRight">
<batchlet ref="jsr.TestBatchlet"/>
<batchlet ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.TestBatchlet"/>
</step>
</job>
</job>

View File

@ -1,11 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="flowJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
<job id="flowJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0">
<flow id="flow">
<step id="flowStep1" next="flowStep2">
<batchlet ref="jsr.TestBatchlet"/>
<batchlet ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.TestBatchlet"/>
</step>
<step id="flowStep2">
<batchlet ref="jsr.TestBatchlet"/>
<batchlet ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.TestBatchlet"/>
</step>
</flow>
</job>

View File

@ -1,14 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="itemsAndTaskletJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
<job id="itemsAndTaskletJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0">
<step id="itemStep">
<chunk item-count="5">
<reader ref="jsr.TestItemReader"/>
<processor ref="jsr.TestItemProcessor"/>
<writer ref="jsr.TestItemWriter"/>
<reader ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.TestItemReader"/>
<processor ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.TestItemProcessor"/>
<writer ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.TestItemWriter"/>
</chunk>
<next on="*" to="taskletStep"/>
</step>
<step id="taskletStep">
<batchlet ref="jsr.TestBatchlet"/>
<batchlet ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.TestBatchlet"/>
</step>
</job>
</job>

View File

@ -1,15 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="partitionedJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
<job id="partitionedJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0">
<step id="partitionWorkerStep">
<chunk item-count="5">
<reader ref="jsr.TestPartitionedItemReader">
<reader ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.TestPartitionedItemReader">
<properties>
<property name="start" value="#{partitionPlan['start']}"/>
<property name="end" value="#{partitionPlan['end']}"/>
</properties>
</reader>
<processor ref="jsr.TestItemProcessor"/>
<writer ref="jsr.TestItemWriter"/>
<processor ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.TestItemProcessor"/>
<writer ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.TestItemWriter"/>
</chunk>
<partition>
<plan partitions="2" threads="2">

View File

@ -1,14 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="splitJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
<job id="splitJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0">
<split id="split">
<flow id="splitFlow1">
<step id="splitFlowStep1">
<batchlet ref="jsr.TestBatchlet"/>
<batchlet ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.TestBatchlet"/>
</step>
</flow>
<flow id="splitFlow2">
<step id="splitFlowStep2">
<batchlet ref="jsr.TestBatchlet"/>
<batchlet ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.TestBatchlet"/>
</step>
</flow>
</split>

View File

@ -1,10 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="taskletJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
<job id="taskletJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/jobXML_1_0.xsd"
version="1.0">
<step id="step">
<batchlet ref="jsr.TestBatchlet">
<batchlet ref="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.jsr.TestBatchlet">
<properties>
<property name="fail" value="#{jobParameters['fail']}"/>
</properties>
</batchlet>
</step>
</job>
</job>

View File

@ -90,31 +90,31 @@
writer="itemWriter"/>
</b:tasklet>
</b:step>
<bean id="testPartitioner" class="springbatch.TestPartitioner"/>
<bean id="testPartitionedItemReader" class="springbatch.TestPartitionedItemReader"/>
<bean id="testPartitioner" class="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.TestPartitioner"/>
<bean id="testPartitionedItemReader" class="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.TestPartitionedItemReader"/>
<b:job id="customSpanEventsItemsJob">
<b:step id="customSpanEventsItemStep">
<b:tasklet>
<b:chunk commit-interval="5" processor="itemProcessor" writer="itemWriter">
<b:reader>
<bean class="springbatch.SingleItemReader"/>
<bean class="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.SingleItemReader"/>
</b:reader>
<b:listeners>
<b:listener>
<bean class="springbatch.CustomEventStepListener"/>
<bean class="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.CustomEventStepListener"/>
</b:listener>
<b:listener>
<bean class="springbatch.CustomEventChunkListener"/>
<bean class="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.CustomEventChunkListener"/>
</b:listener>
<b:listener>
<bean class="springbatch.CustomEventItemReadListener"/>
<bean class="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.CustomEventItemReadListener"/>
</b:listener>
<b:listener>
<bean class="springbatch.CustomEventItemProcessListener"/>
<bean class="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.CustomEventItemProcessListener"/>
</b:listener>
<b:listener>
<bean class="springbatch.CustomEventItemWriteListener"/>
<bean class="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.CustomEventItemWriteListener"/>
</b:listener>
</b:listeners>
</b:chunk>
@ -122,17 +122,17 @@
</b:step>
<b:listeners>
<b:listener>
<bean class="springbatch.CustomEventJobListener"/>
<bean class="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.CustomEventJobListener"/>
</b:listener>
</b:listeners>
</b:job>
<bean id="itemReader" class="springbatch.TestItemReader"/>
<bean id="syncItemReader" class="springbatch.TestSyncItemReader">
<bean id="itemReader" class="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.TestItemReader"/>
<bean id="syncItemReader" class="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.TestSyncItemReader">
<constructor-arg name="max" value="5"/>
</bean>
<bean id="itemProcessor" class="springbatch.TestItemProcessor"/>
<bean id="itemWriter" class="springbatch.TestItemWriter"/>
<bean id="testTasklet" class="springbatch.TestTasklet"/>
<bean id="testDecider" class="springbatch.TestDecider"/>
</beans>
<bean id="itemProcessor" class="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.TestItemProcessor"/>
<bean id="itemWriter" class="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.TestItemWriter"/>
<bean id="testTasklet" class="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.TestTasklet"/>
<bean id="testDecider" class="io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.springbatch.TestDecider"/>
</beans>