diff --git a/instrumentation/spring/spring-batch-3.0/javaagent/src/test/groovy/ItemLevelSpanTest.groovy b/instrumentation/spring/spring-batch-3.0/javaagent/src/test/groovy/ItemLevelSpanTest.groovy index 1fc40ec637..4907867319 100644 --- a/instrumentation/spring/spring-batch-3.0/javaagent/src/test/groovy/ItemLevelSpanTest.groovy +++ b/instrumentation/spring/spring-batch-3.0/javaagent/src/test/groovy/ItemLevelSpanTest.groovy @@ -127,6 +127,129 @@ abstract class ItemLevelSpanTest extends AgentTestRunner { } } } + + def "should trace all item operations on a parallel items job"() { + when: + runJob("parallelItemsJob") + + then: + assertTraces(1) { + trace(0, 23) { + span(0) { + name "BatchJob parallelItemsJob" + kind INTERNAL + } + span(1) { + name "BatchJob parallelItemsJob.parallelItemsStep" + kind INTERNAL + childOf span(0) + } + + // chunk 1, first two items; thread 1 + span(2) { + name "BatchJob parallelItemsJob.parallelItemsStep.Chunk" + kind INTERNAL + childOf span(1) + } + [3, 4].forEach { + span(it) { + name "BatchJob parallelItemsJob.parallelItemsStep.ItemRead" + kind INTERNAL + childOf span(2) + } + } + [5, 6].forEach { + span(it) { + name "BatchJob parallelItemsJob.parallelItemsStep.ItemProcess" + kind INTERNAL + childOf span(2) + } + } + span(7) { + name "BatchJob parallelItemsJob.parallelItemsStep.ItemWrite" + kind INTERNAL + childOf span(2) + } + + // chunk 2, items 3 & 4; thread 2 + span(8) { + name "BatchJob parallelItemsJob.parallelItemsStep.Chunk" + kind INTERNAL + childOf span(1) + } + [9, 10].forEach { + span(it) { + name "BatchJob parallelItemsJob.parallelItemsStep.ItemRead" + kind INTERNAL + childOf span(8) + } + } + [11, 12].forEach { + span(it) { + name "BatchJob parallelItemsJob.parallelItemsStep.ItemProcess" + kind INTERNAL + childOf span(8) + } + } + span(13) { + name "BatchJob parallelItemsJob.parallelItemsStep.ItemWrite" + kind INTERNAL + childOf span(8) + } + + // chunk 3, 5th item; thread 1 + span(14) { + name "BatchJob parallelItemsJob.parallelItemsStep.Chunk" + kind INTERNAL + childOf span(1) + } + // +1 for last read returning end of stream marker + [15, 16].forEach { + span(it) { + name "BatchJob parallelItemsJob.parallelItemsStep.ItemRead" + kind INTERNAL + childOf span(14) + } + } + span(17) { + name "BatchJob parallelItemsJob.parallelItemsStep.ItemProcess" + kind INTERNAL + childOf span(14) + } + span(18) { + name "BatchJob parallelItemsJob.parallelItemsStep.ItemWrite" + kind INTERNAL + childOf span(14) + } + + // empty chunk on thread 2, end processing + span(19) { + name "BatchJob parallelItemsJob.parallelItemsStep.Chunk" + kind INTERNAL + childOf span(1) + } + // end of stream marker + span(20) { + name "BatchJob parallelItemsJob.parallelItemsStep.ItemRead" + kind INTERNAL + childOf span(19) + } + + // empty chunk on thread 1, end processing + span(21) { + name "BatchJob parallelItemsJob.parallelItemsStep.Chunk" + kind INTERNAL + childOf span(1) + } + // end of stream marker + span(22) { + name "BatchJob parallelItemsJob.parallelItemsStep.ItemRead" + kind INTERNAL + childOf span(21) + } + } + } + } } class JavaConfigItemLevelSpanTest extends ItemLevelSpanTest implements ApplicationConfigTrait { diff --git a/instrumentation/spring/spring-batch-3.0/javaagent/src/test/groovy/SpringBatchApplication.groovy b/instrumentation/spring/spring-batch-3.0/javaagent/src/test/groovy/SpringBatchApplication.groovy index 3f402055e1..8f89dbb6d3 100644 --- a/instrumentation/spring/spring-batch-3.0/javaagent/src/test/groovy/SpringBatchApplication.groovy +++ b/instrumentation/spring/spring-batch-3.0/javaagent/src/test/groovy/SpringBatchApplication.groovy @@ -11,7 +11,10 @@ import org.springframework.batch.core.configuration.annotation.StepBuilderFactor import org.springframework.batch.core.job.builder.FlowBuilder import org.springframework.batch.core.job.flow.Flow import org.springframework.batch.core.job.flow.support.SimpleFlow +import org.springframework.batch.core.launch.JobLauncher +import org.springframework.batch.core.launch.support.SimpleJobLauncher import org.springframework.batch.core.partition.support.Partitioner +import org.springframework.batch.core.repository.JobRepository import org.springframework.batch.item.ItemProcessor import org.springframework.batch.item.ItemReader import org.springframework.batch.item.ItemWriter @@ -26,6 +29,7 @@ import springbatch.TestItemReader import springbatch.TestItemWriter import springbatch.TestPartitionedItemReader import springbatch.TestPartitioner +import springbatch.TestSyncItemReader import springbatch.TestTasklet @Configuration @@ -36,6 +40,24 @@ class SpringBatchApplication { JobBuilderFactory jobs @Autowired StepBuilderFactory steps + @Autowired + JobRepository jobRepository + + @Bean + AsyncTaskExecutor asyncTaskExecutor() { + def executor = new ThreadPoolTaskExecutor() + executor.corePoolSize = 10 + executor.maxPoolSize = 10 + executor + } + + @Bean + JobLauncher jobLauncher() { + def launcher = new SimpleJobLauncher() + launcher.jobRepository = jobRepository + launcher.taskExecutor = asyncTaskExecutor() + launcher + } // common @Bean @@ -53,11 +75,6 @@ class SpringBatchApplication { new TestItemWriter() } - @Bean - AsyncTaskExecutor asyncTaskExecutor() { - new ThreadPoolTaskExecutor() - } - // simple tasklet job @Bean Job taskletJob() { @@ -99,6 +116,26 @@ class SpringBatchApplication { .build() } + // parallel items job + @Bean + Job parallelItemsJob() { + jobs.get("parallelItemsJob") + .start(parallelItemsStep()) + .build() + } + + @Bean + Step parallelItemsStep() { + steps.get("parallelItemsStep") + .chunk(2) + .reader(new TestSyncItemReader(5)) + .processor(itemProcessor()) + .writer(itemWriter()) + .taskExecutor(asyncTaskExecutor()) + .throttleLimit(2) + .build() + } + // job using a flow @Bean Job flowJob() { diff --git a/instrumentation/spring/spring-batch-3.0/javaagent/src/test/groovy/springbatch/TestPartitionedItemReader.groovy b/instrumentation/spring/spring-batch-3.0/javaagent/src/test/groovy/springbatch/TestPartitionedItemReader.groovy index af1cddf18d..bce1797cba 100644 --- a/instrumentation/spring/spring-batch-3.0/javaagent/src/test/groovy/springbatch/TestPartitionedItemReader.groovy +++ b/instrumentation/spring/spring-batch-3.0/javaagent/src/test/groovy/springbatch/TestPartitionedItemReader.groovy @@ -14,21 +14,23 @@ import org.springframework.batch.item.ParseException import org.springframework.batch.item.UnexpectedInputException class TestPartitionedItemReader implements ItemReader, ItemStream { - int start - int end + ThreadLocal start = new ThreadLocal<>() + ThreadLocal end = new ThreadLocal<>() @Override String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { - if (start >= end) { + if (start.get() >= end.get()) { return null } - return String.valueOf(start++) + def value = start.get() + start.set(value + 1) + return String.valueOf(value) } @Override void open(ExecutionContext executionContext) throws ItemStreamException { - start = executionContext.getInt("start") - end = executionContext.getInt("end") + start.set(executionContext.getInt("start")) + end.set(executionContext.getInt("end")) } @Override diff --git a/instrumentation/spring/spring-batch-3.0/javaagent/src/test/groovy/springbatch/TestSyncItemReader.groovy b/instrumentation/spring/spring-batch-3.0/javaagent/src/test/groovy/springbatch/TestSyncItemReader.groovy new file mode 100644 index 0000000000..66e997d1b6 --- /dev/null +++ b/instrumentation/spring/spring-batch-3.0/javaagent/src/test/groovy/springbatch/TestSyncItemReader.groovy @@ -0,0 +1,25 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package springbatch + +import java.util.stream.Collectors +import java.util.stream.IntStream +import org.springframework.batch.item.ItemReader + +class TestSyncItemReader implements ItemReader { + private final Iterator items + + TestSyncItemReader(int max) { + items = IntStream.range(0, max).mapToObj(String.&valueOf).collect(Collectors.toList()).iterator() + } + + synchronized String read() { + if (items.hasNext()) { + return items.next() + } + return null + } +} diff --git a/instrumentation/spring/spring-batch-3.0/javaagent/src/test/resources/META-INF/batch-jobs/partitionedJob.xml b/instrumentation/spring/spring-batch-3.0/javaagent/src/test/resources/META-INF/batch-jobs/partitionedJob.xml index 9fd8d18039..330f99c2c6 100644 --- a/instrumentation/spring/spring-batch-3.0/javaagent/src/test/resources/META-INF/batch-jobs/partitionedJob.xml +++ b/instrumentation/spring/spring-batch-3.0/javaagent/src/test/resources/META-INF/batch-jobs/partitionedJob.xml @@ -12,7 +12,7 @@ - + diff --git a/instrumentation/spring/spring-batch-3.0/javaagent/src/test/resources/baseContext.xml b/instrumentation/spring/spring-batch-3.0/javaagent/src/test/resources/baseContext.xml index eb1ce9bdc6..c6396ad058 100644 --- a/instrumentation/spring/spring-batch-3.0/javaagent/src/test/resources/baseContext.xml +++ b/instrumentation/spring/spring-batch-3.0/javaagent/src/test/resources/baseContext.xml @@ -3,12 +3,16 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> - + + + + + diff --git a/instrumentation/spring/spring-batch-3.0/javaagent/src/test/resources/spring-batch.xml b/instrumentation/spring/spring-batch-3.0/javaagent/src/test/resources/spring-batch.xml index 0a182a7aaa..e8f24b0d65 100644 --- a/instrumentation/spring/spring-batch-3.0/javaagent/src/test/resources/spring-batch.xml +++ b/instrumentation/spring/spring-batch-3.0/javaagent/src/test/resources/spring-batch.xml @@ -25,6 +25,15 @@ + + + + + + + + @@ -70,7 +79,9 @@ - + + + @@ -83,6 +94,9 @@ + + +