Instrument spring-batch: partitioning (#2034)

This commit is contained in:
Mateusz Rzeszutek 2021-01-14 18:33:59 +01:00 committed by GitHub
parent 8addc67eba
commit 99bcf7dcf2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 270 additions and 20 deletions

View File

@ -11,6 +11,7 @@ import org.springframework.batch.core.configuration.annotation.StepBuilderFactor
import org.springframework.batch.core.job.builder.FlowBuilder
import org.springframework.batch.core.job.flow.Flow
import org.springframework.batch.core.job.flow.support.SimpleFlow
import org.springframework.batch.core.partition.support.Partitioner
import org.springframework.batch.item.ItemProcessor
import org.springframework.batch.item.ItemReader
import org.springframework.batch.item.ItemWriter
@ -23,6 +24,8 @@ import springbatch.TestDecider
import springbatch.TestItemProcessor
import springbatch.TestItemReader
import springbatch.TestItemWriter
import springbatch.TestPartitionedItemReader
import springbatch.TestPartitioner
import springbatch.TestTasklet
@Configuration
@ -34,6 +37,27 @@ class SpringBatchApplication {
@Autowired
StepBuilderFactory steps
// common
@Bean
ItemReader<String> itemReader() {
new TestItemReader()
}
@Bean
ItemProcessor<String, Integer> itemProcessor() {
new TestItemProcessor()
}
@Bean
ItemWriter<Integer> itemWriter() {
new TestItemWriter()
}
@Bean
AsyncTaskExecutor asyncTaskExecutor() {
new ThreadPoolTaskExecutor()
}
// simple tasklet job
@Bean
Job taskletJob() {
@ -75,21 +99,6 @@ class SpringBatchApplication {
.build()
}
@Bean
ItemReader<String> itemReader() {
new TestItemReader()
}
@Bean
ItemProcessor<String, Integer> itemProcessor() {
new TestItemProcessor()
}
@Bean
ItemWriter<Integer> itemWriter() {
new TestItemWriter()
}
// job using a flow
@Bean
Job flowJob() {
@ -154,11 +163,6 @@ class SpringBatchApplication {
.build()
}
@Bean
AsyncTaskExecutor asyncTaskExecutor() {
new ThreadPoolTaskExecutor()
}
// job with decisions
@Bean
Job decisionJob() {
@ -191,4 +195,42 @@ class SpringBatchApplication {
.tasklet(new TestTasklet())
.build()
}
// partitioned job
@Bean
Job partitionedJob() {
jobs.get("partitionedJob")
.start(partitionManagerStep())
.build()
}
@Bean
Step partitionManagerStep() {
steps.get("partitionManagerStep")
.partitioner("partitionWorkerStep", partitioner())
.step(partitionWorkerStep())
.gridSize(2)
.taskExecutor(asyncTaskExecutor())
.build()
}
@Bean
Partitioner partitioner() {
new TestPartitioner()
}
@Bean
Step partitionWorkerStep() {
steps.get("partitionWorkerStep")
.chunk(5)
.reader(partitionedItemReader())
.processor(itemProcessor())
.writer(itemWriter())
.build()
}
@Bean
ItemReader<String> partitionedItemReader() {
new TestPartitionedItemReader()
}
}

View File

@ -217,6 +217,61 @@ abstract class SpringBatchTest extends AgentTestRunner {
}
}
}
def "should trace partitioned job"() {
when:
runJob("partitionedJob")
then:
assertTraces(1) {
trace(0, 8) {
span(0) {
name "BatchJob partitionedJob"
kind INTERNAL
}
span(1) {
def stepName = hasPartitionManagerStep() ? "partitionManagerStep" : "partitionWorkerStep"
name "BatchJob partitionedJob.$stepName"
kind INTERNAL
childOf span(0)
}
span(2) {
name ~/BatchJob partitionedJob.partitionWorkerStep:partition[01]/
kind INTERNAL
childOf span(1)
}
span(3) {
name ~/BatchJob partitionedJob.partitionWorkerStep:partition[01].Chunk/
kind INTERNAL
childOf span(2)
}
span(4) {
name ~/BatchJob partitionedJob.partitionWorkerStep:partition[01].Chunk/
kind INTERNAL
childOf span(2)
}
span(5) {
name ~/BatchJob partitionedJob.partitionWorkerStep:partition[01]/
kind INTERNAL
childOf span(1)
}
span(6) {
name ~/BatchJob partitionedJob.partitionWorkerStep:partition[01].Chunk/
kind INTERNAL
childOf span(5)
}
span(7) {
name ~/BatchJob partitionedJob.partitionWorkerStep:partition[01].Chunk/
kind INTERNAL
childOf span(5)
}
}
}
}
protected boolean hasPartitionManagerStep() {
true
}
}
class JavaConfigBatchJobTest extends SpringBatchTest implements ApplicationConfigTrait {
@ -234,4 +289,7 @@ class XmlConfigBatchJobTest extends SpringBatchTest implements ApplicationConfig
}
class JsrConfigBatchJobTest extends SpringBatchTest implements JavaxBatchConfigTrait {
protected boolean hasPartitionManagerStep() {
false
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package jsr
import javax.batch.api.BatchProperty
import javax.batch.api.chunk.ItemReader
import javax.inject.Inject
class TestPartitionedItemReader implements ItemReader {
@Inject
@BatchProperty(name = "start")
String startStr
@Inject
@BatchProperty(name = "end")
String endStr
int start
int end
@Override
void open(Serializable checkpoint) throws Exception {
start = Integer.parseInt(startStr)
end = Integer.parseInt(endStr)
}
@Override
void close() throws Exception {
}
@Override
Object readItem() throws Exception {
if (start >= end) {
return null
}
return String.valueOf(start++)
}
@Override
Serializable checkpointInfo() throws Exception {
return null
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package springbatch
import org.springframework.batch.item.ExecutionContext
import org.springframework.batch.item.ItemReader
import org.springframework.batch.item.ItemStream
import org.springframework.batch.item.ItemStreamException
import org.springframework.batch.item.NonTransientResourceException
import org.springframework.batch.item.ParseException
import org.springframework.batch.item.UnexpectedInputException
class TestPartitionedItemReader implements ItemReader<String>, ItemStream {
int start
int end
@Override
String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
if (start >= end) {
return null
}
return String.valueOf(start++)
}
@Override
void open(ExecutionContext executionContext) throws ItemStreamException {
start = executionContext.getInt("start")
end = executionContext.getInt("end")
}
@Override
void update(ExecutionContext executionContext) throws ItemStreamException {
}
@Override
void close() throws ItemStreamException {
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package springbatch
import org.springframework.batch.core.partition.support.Partitioner
import org.springframework.batch.item.ExecutionContext
class TestPartitioner implements Partitioner {
@Override
Map<String, ExecutionContext> partition(int gridSize) {
return [
"partition0": new ExecutionContext([
"start": 0, "end": 8
]),
"partition1": new ExecutionContext([
"start": 8, "end": 13
])
]
}
}

View File

@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="partitionedJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
<step id="partitionWorkerStep">
<chunk item-count="5">
<reader ref="jsr.TestPartitionedItemReader">
<properties>
<property name="start" value="#{partitionPlan['start']}"/>
<property name="end" value="#{partitionPlan['end']}"/>
</properties>
</reader>
<processor ref="jsr.TestItemProcessor"/>
<writer ref="jsr.TestItemWriter"/>
</chunk>
<partition>
<plan partitions="2">
<properties partition="0">
<property name="start" value="0"/>
<property name="end" value="8"/>
</properties>
<properties partition="1">
<property name="start" value="8"/>
<property name="end" value="13"/>
</properties>
</plan>
</partition>
</step>
</job>

View File

@ -68,6 +68,20 @@
</b:step>
</b:job>
<b:job id="partitionedJob">
<b:step id="partitionManagerStep">
<b:partition step="partitionWorkerStep" partitioner="testPartitioner"/>
</b:step>
</b:job>
<b:step id="partitionWorkerStep">
<b:tasklet>
<b:chunk commit-interval="5" reader="testPartitionedItemReader" processor="itemProcessor"
writer="itemWriter"/>
</b:tasklet>
</b:step>
<bean id="testPartitioner" class="springbatch.TestPartitioner"/>
<bean id="testPartitionedItemReader" class="springbatch.TestPartitionedItemReader"/>
<bean id="itemReader" class="springbatch.TestItemReader"/>
<bean id="itemProcessor" class="springbatch.TestItemProcessor"/>
<bean id="itemWriter" class="springbatch.TestItemWriter"/>