Instrument spring-batch: tests for parallel steps (#2121)

This commit is contained in:
Mateusz Rzeszutek 2021-01-28 08:09:42 +01:00 committed by GitHub
parent 6b13bcca63
commit b13ef2d809
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 219 additions and 14 deletions

View File

@ -127,6 +127,129 @@ abstract class ItemLevelSpanTest extends AgentTestRunner {
}
}
}
def "should trace all item operations on a parallel items job"() {
when:
runJob("parallelItemsJob")
then:
assertTraces(1) {
trace(0, 23) {
span(0) {
name "BatchJob parallelItemsJob"
kind INTERNAL
}
span(1) {
name "BatchJob parallelItemsJob.parallelItemsStep"
kind INTERNAL
childOf span(0)
}
// chunk 1, first two items; thread 1
span(2) {
name "BatchJob parallelItemsJob.parallelItemsStep.Chunk"
kind INTERNAL
childOf span(1)
}
[3, 4].forEach {
span(it) {
name "BatchJob parallelItemsJob.parallelItemsStep.ItemRead"
kind INTERNAL
childOf span(2)
}
}
[5, 6].forEach {
span(it) {
name "BatchJob parallelItemsJob.parallelItemsStep.ItemProcess"
kind INTERNAL
childOf span(2)
}
}
span(7) {
name "BatchJob parallelItemsJob.parallelItemsStep.ItemWrite"
kind INTERNAL
childOf span(2)
}
// chunk 2, items 3 & 4; thread 2
span(8) {
name "BatchJob parallelItemsJob.parallelItemsStep.Chunk"
kind INTERNAL
childOf span(1)
}
[9, 10].forEach {
span(it) {
name "BatchJob parallelItemsJob.parallelItemsStep.ItemRead"
kind INTERNAL
childOf span(8)
}
}
[11, 12].forEach {
span(it) {
name "BatchJob parallelItemsJob.parallelItemsStep.ItemProcess"
kind INTERNAL
childOf span(8)
}
}
span(13) {
name "BatchJob parallelItemsJob.parallelItemsStep.ItemWrite"
kind INTERNAL
childOf span(8)
}
// chunk 3, 5th item; thread 1
span(14) {
name "BatchJob parallelItemsJob.parallelItemsStep.Chunk"
kind INTERNAL
childOf span(1)
}
// +1 for last read returning end of stream marker
[15, 16].forEach {
span(it) {
name "BatchJob parallelItemsJob.parallelItemsStep.ItemRead"
kind INTERNAL
childOf span(14)
}
}
span(17) {
name "BatchJob parallelItemsJob.parallelItemsStep.ItemProcess"
kind INTERNAL
childOf span(14)
}
span(18) {
name "BatchJob parallelItemsJob.parallelItemsStep.ItemWrite"
kind INTERNAL
childOf span(14)
}
// empty chunk on thread 2, end processing
span(19) {
name "BatchJob parallelItemsJob.parallelItemsStep.Chunk"
kind INTERNAL
childOf span(1)
}
// end of stream marker
span(20) {
name "BatchJob parallelItemsJob.parallelItemsStep.ItemRead"
kind INTERNAL
childOf span(19)
}
// empty chunk on thread 1, end processing
span(21) {
name "BatchJob parallelItemsJob.parallelItemsStep.Chunk"
kind INTERNAL
childOf span(1)
}
// end of stream marker
span(22) {
name "BatchJob parallelItemsJob.parallelItemsStep.ItemRead"
kind INTERNAL
childOf span(21)
}
}
}
}
}
class JavaConfigItemLevelSpanTest extends ItemLevelSpanTest implements ApplicationConfigTrait {

View File

@ -11,7 +11,10 @@ import org.springframework.batch.core.configuration.annotation.StepBuilderFactor
import org.springframework.batch.core.job.builder.FlowBuilder
import org.springframework.batch.core.job.flow.Flow
import org.springframework.batch.core.job.flow.support.SimpleFlow
import org.springframework.batch.core.launch.JobLauncher
import org.springframework.batch.core.launch.support.SimpleJobLauncher
import org.springframework.batch.core.partition.support.Partitioner
import org.springframework.batch.core.repository.JobRepository
import org.springframework.batch.item.ItemProcessor
import org.springframework.batch.item.ItemReader
import org.springframework.batch.item.ItemWriter
@ -26,6 +29,7 @@ import springbatch.TestItemReader
import springbatch.TestItemWriter
import springbatch.TestPartitionedItemReader
import springbatch.TestPartitioner
import springbatch.TestSyncItemReader
import springbatch.TestTasklet
@Configuration
@ -36,6 +40,24 @@ class SpringBatchApplication {
JobBuilderFactory jobs
@Autowired
StepBuilderFactory steps
@Autowired
JobRepository jobRepository
@Bean
AsyncTaskExecutor asyncTaskExecutor() {
def executor = new ThreadPoolTaskExecutor()
executor.corePoolSize = 10
executor.maxPoolSize = 10
executor
}
@Bean
JobLauncher jobLauncher() {
def launcher = new SimpleJobLauncher()
launcher.jobRepository = jobRepository
launcher.taskExecutor = asyncTaskExecutor()
launcher
}
// common
@Bean
@ -53,11 +75,6 @@ class SpringBatchApplication {
new TestItemWriter()
}
@Bean
AsyncTaskExecutor asyncTaskExecutor() {
new ThreadPoolTaskExecutor()
}
// simple tasklet job
@Bean
Job taskletJob() {
@ -99,6 +116,26 @@ class SpringBatchApplication {
.build()
}
// parallel items job
@Bean
Job parallelItemsJob() {
jobs.get("parallelItemsJob")
.start(parallelItemsStep())
.build()
}
@Bean
Step parallelItemsStep() {
steps.get("parallelItemsStep")
.chunk(2)
.reader(new TestSyncItemReader(5))
.processor(itemProcessor())
.writer(itemWriter())
.taskExecutor(asyncTaskExecutor())
.throttleLimit(2)
.build()
}
// job using a flow
@Bean
Job flowJob() {

View File

@ -14,21 +14,23 @@ import org.springframework.batch.item.ParseException
import org.springframework.batch.item.UnexpectedInputException
class TestPartitionedItemReader implements ItemReader<String>, ItemStream {
int start
int end
ThreadLocal<Integer> start = new ThreadLocal<>()
ThreadLocal<Integer> end = new ThreadLocal<>()
@Override
String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (start >= end) {
if (start.get() >= end.get()) {
return null
}
return String.valueOf(start++)
def value = start.get()
start.set(value + 1)
return String.valueOf(value)
}
@Override
void open(ExecutionContext executionContext) throws ItemStreamException {
start = executionContext.getInt("start")
end = executionContext.getInt("end")
start.set(executionContext.getInt("start"))
end.set(executionContext.getInt("end"))
}
@Override

View File

@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package springbatch
import java.util.stream.Collectors
import java.util.stream.IntStream
import org.springframework.batch.item.ItemReader
class TestSyncItemReader implements ItemReader<String> {
private final Iterator<String> items
TestSyncItemReader(int max) {
items = IntStream.range(0, max).mapToObj(String.&valueOf).collect(Collectors.toList()).iterator()
}
synchronized String read() {
if (items.hasNext()) {
return items.next()
}
return null
}
}

View File

@ -12,7 +12,7 @@
<writer ref="jsr.TestItemWriter"/>
</chunk>
<partition>
<plan partitions="2">
<plan partitions="2" threads="2">
<properties partition="0">
<property name="start" value="0"/>
<property name="end" value="8"/>

View File

@ -3,12 +3,16 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="asyncTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"/>
<bean id="asyncTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10"/>
<property name="maxPoolSize" value="10"/>
</bean>
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean"/>
<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository"/>
<property name="taskExecutor" ref="asyncTaskExecutor"/>
</bean>
<bean id="jobParametersConverter"
class="org.springframework.batch.core.converter.DefaultJobParametersConverter"/>

View File

@ -25,6 +25,15 @@
</b:step>
</b:job>
<b:job id="parallelItemsJob">
<b:step id="parallelItemsStep">
<b:tasklet task-executor="asyncTaskExecutor" throttle-limit="2">
<b:chunk commit-interval="2" reader="syncItemReader" processor="itemProcessor"
writer="itemWriter"/>
</b:tasklet>
</b:step>
</b:job>
<b:job id="flowJob">
<b:flow id="flow" parent="parentFlow"/>
</b:job>
@ -70,7 +79,9 @@
<b:job id="partitionedJob">
<b:step id="partitionManagerStep">
<b:partition step="partitionWorkerStep" partitioner="testPartitioner"/>
<b:partition step="partitionWorkerStep" partitioner="testPartitioner">
<b:handler task-executor="asyncTaskExecutor" grid-size="2"/>
</b:partition>
</b:step>
</b:job>
<b:step id="partitionWorkerStep">
@ -83,6 +94,9 @@
<bean id="testPartitionedItemReader" class="springbatch.TestPartitionedItemReader"/>
<bean id="itemReader" class="springbatch.TestItemReader"/>
<bean id="syncItemReader" class="springbatch.TestSyncItemReader">
<constructor-arg name="max" value="5"/>
</bean>
<bean id="itemProcessor" class="springbatch.TestItemProcessor"/>
<bean id="itemWriter" class="springbatch.TestItemWriter"/>
<bean id="testTasklet" class="springbatch.TestTasklet"/>