convert spring batch tests to java (#12004)
Co-authored-by: Steve Rao <raozihao.rzh@alibaba-inc.com> Co-authored-by: Lauri Tulmin <ltulmin@splunk.com>
This commit is contained in:
parent
68e844e129
commit
e36be9c3f6
|
@ -1,42 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import org.springframework.batch.core.Job
|
||||
import org.springframework.batch.core.JobParameter
|
||||
import org.springframework.batch.core.JobParameters
|
||||
import org.springframework.batch.core.launch.JobLauncher
|
||||
import org.springframework.context.ConfigurableApplicationContext
|
||||
|
||||
trait ApplicationConfigTrait {
|
||||
static ConfigurableApplicationContext applicationContext
|
||||
static JobLauncher jobLauncher
|
||||
|
||||
abstract ConfigurableApplicationContext createApplicationContext()
|
||||
|
||||
def setupSpec() {
|
||||
applicationContext = createApplicationContext()
|
||||
applicationContext.start()
|
||||
|
||||
jobLauncher = applicationContext.getBean(JobLauncher)
|
||||
}
|
||||
|
||||
def cleanupSpec() {
|
||||
applicationContext.stop()
|
||||
applicationContext.close()
|
||||
|
||||
additionalCleanup()
|
||||
}
|
||||
|
||||
def additionalCleanup() {}
|
||||
|
||||
def runJob(String jobName, Map<String, JobParameter> params) {
|
||||
def job = applicationContext.getBean(jobName, Job)
|
||||
postProcessJob(jobName, job)
|
||||
jobLauncher.run(job, new JobParameters(params))
|
||||
}
|
||||
|
||||
def postProcessJob(String jobName, Job job) {
|
||||
}
|
||||
}
|
|
@ -1,94 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
||||
import org.springframework.batch.core.JobParameter
|
||||
import org.springframework.context.ConfigurableApplicationContext
|
||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext
|
||||
|
||||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
|
||||
import static java.util.Collections.emptyMap
|
||||
|
||||
abstract class ChunkRootSpanTest extends AgentInstrumentationSpecification {
|
||||
|
||||
abstract runJob(String jobName, Map<String, JobParameter> params = emptyMap())
|
||||
|
||||
def "should create separate traces for each chunk"() {
|
||||
when:
|
||||
runJob("itemsAndTaskletJob")
|
||||
|
||||
then:
|
||||
assertTraces(5) {
|
||||
def itemStepSpan = null
|
||||
def taskletStepSpan = null
|
||||
|
||||
trace(0, 3) {
|
||||
itemStepSpan = span(1)
|
||||
taskletStepSpan = span(2)
|
||||
|
||||
span(0) {
|
||||
name "BatchJob itemsAndTaskletJob"
|
||||
kind INTERNAL
|
||||
}
|
||||
span(1) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep"
|
||||
kind INTERNAL
|
||||
childOf span(0)
|
||||
}
|
||||
span(2) {
|
||||
name "BatchJob itemsAndTaskletJob.taskletStep"
|
||||
kind INTERNAL
|
||||
childOf span(0)
|
||||
}
|
||||
}
|
||||
trace(1, 1) {
|
||||
span(0) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.Chunk"
|
||||
kind INTERNAL
|
||||
hasLink itemStepSpan
|
||||
}
|
||||
}
|
||||
trace(2, 1) {
|
||||
span(0) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.Chunk"
|
||||
kind INTERNAL
|
||||
hasLink itemStepSpan
|
||||
}
|
||||
}
|
||||
trace(3, 1) {
|
||||
span(0) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.Chunk"
|
||||
kind INTERNAL
|
||||
hasLink itemStepSpan
|
||||
}
|
||||
}
|
||||
trace(4, 1) {
|
||||
span(0) {
|
||||
name "BatchJob itemsAndTaskletJob.taskletStep.Tasklet"
|
||||
kind INTERNAL
|
||||
hasLink taskletStepSpan
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class JavaConfigChunkRootSpanTest extends ChunkRootSpanTest implements ApplicationConfigTrait {
|
||||
@Override
|
||||
ConfigurableApplicationContext createApplicationContext() {
|
||||
new AnnotationConfigApplicationContext(SpringBatchApplication)
|
||||
}
|
||||
}
|
||||
|
||||
class XmlConfigChunkRootSpanTest extends ChunkRootSpanTest implements ApplicationConfigTrait {
|
||||
@Override
|
||||
ConfigurableApplicationContext createApplicationContext() {
|
||||
new ClassPathXmlApplicationContext("spring-batch.xml")
|
||||
}
|
||||
}
|
||||
|
||||
class JsrConfigChunkRootSpanTest extends ChunkRootSpanTest implements JavaxBatchConfigTrait {
|
||||
}
|
|
@ -1,219 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
||||
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
|
||||
import org.springframework.batch.core.JobParameter
|
||||
import org.springframework.context.ConfigurableApplicationContext
|
||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext
|
||||
|
||||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
|
||||
import static java.util.Collections.emptyMap
|
||||
|
||||
abstract class CustomSpanEventTest extends AgentInstrumentationSpecification {
|
||||
static final boolean VERSION_GREATER_THAN_4_0 = Boolean.getBoolean("testLatestDeps")
|
||||
|
||||
abstract runJob(String jobName, Map<String, JobParameter> params = emptyMap())
|
||||
|
||||
def "should be able to call Span.current() and add custom info to spans"() {
|
||||
when:
|
||||
runJob("customSpanEventsItemsJob")
|
||||
|
||||
then:
|
||||
assertTraces(1) {
|
||||
trace(0, 7) {
|
||||
span(0) {
|
||||
name "BatchJob customSpanEventsItemsJob"
|
||||
kind INTERNAL
|
||||
events(2)
|
||||
event(0) {
|
||||
eventName "job.before"
|
||||
}
|
||||
event(1) {
|
||||
eventName "job.after"
|
||||
}
|
||||
}
|
||||
span(1) {
|
||||
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep"
|
||||
kind INTERNAL
|
||||
childOf span(0)
|
||||
|
||||
// CompositeChunkListener has broken ordering that causes listeners that do not override order() to appear first at all times
|
||||
// because of that a custom ChunkListener will always see a Step span when using spring-batch versions [3, 4)
|
||||
// that bug was fixed in 4.0
|
||||
if (VERSION_GREATER_THAN_4_0) {
|
||||
events(2)
|
||||
event(0) {
|
||||
eventName "step.before"
|
||||
}
|
||||
event(1) {
|
||||
eventName "step.after"
|
||||
}
|
||||
} else {
|
||||
events(4)
|
||||
event(0) {
|
||||
eventName "step.before"
|
||||
}
|
||||
event(1) {
|
||||
eventName "chunk.before"
|
||||
}
|
||||
event(2) {
|
||||
eventName "chunk.after"
|
||||
}
|
||||
event(3) {
|
||||
eventName "step.after"
|
||||
}
|
||||
}
|
||||
}
|
||||
span(2) {
|
||||
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.Chunk"
|
||||
kind INTERNAL
|
||||
childOf span(1)
|
||||
|
||||
// CompositeChunkListener has broken ordering that causes listeners that do not override order() to appear first at all times
|
||||
// because of that a custom ChunkListener will always see a Step span when using spring-batch versions [3, 4)
|
||||
// that bug was fixed in 4.0
|
||||
if (VERSION_GREATER_THAN_4_0) {
|
||||
events(2)
|
||||
event(0) {
|
||||
eventName "chunk.before"
|
||||
}
|
||||
event(1) {
|
||||
eventName "chunk.after"
|
||||
}
|
||||
} else {
|
||||
events(0)
|
||||
}
|
||||
}
|
||||
|
||||
itemSpans(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Spring Batch Java & XML configs have slightly different ordering from JSR config
|
||||
protected void itemSpans(TraceAssert trace) {
|
||||
trace.with {
|
||||
span(3) {
|
||||
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemRead"
|
||||
kind INTERNAL
|
||||
childOf span(2)
|
||||
events(2)
|
||||
event(0) {
|
||||
eventName "item.read.before"
|
||||
}
|
||||
event(1) {
|
||||
eventName "item.read.after"
|
||||
}
|
||||
}
|
||||
// second read that returns null and signifies end of stream
|
||||
span(4) {
|
||||
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemRead"
|
||||
kind INTERNAL
|
||||
childOf span(2)
|
||||
// spring batch does not call ItemReadListener after() methods when read() returns end-of-stream
|
||||
events(1)
|
||||
event(0) {
|
||||
eventName "item.read.before"
|
||||
}
|
||||
}
|
||||
span(5) {
|
||||
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemProcess"
|
||||
kind INTERNAL
|
||||
childOf span(2)
|
||||
events(2)
|
||||
event(0) {
|
||||
eventName "item.process.before"
|
||||
}
|
||||
event(1) {
|
||||
eventName "item.process.after"
|
||||
}
|
||||
}
|
||||
span(6) {
|
||||
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemWrite"
|
||||
kind INTERNAL
|
||||
childOf span(2)
|
||||
events(2)
|
||||
event(0) {
|
||||
eventName "item.write.before"
|
||||
}
|
||||
event(1) {
|
||||
eventName "item.write.after"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class JavaConfigCustomSpanEventTest extends CustomSpanEventTest implements ApplicationConfigTrait {
|
||||
@Override
|
||||
ConfigurableApplicationContext createApplicationContext() {
|
||||
new AnnotationConfigApplicationContext(SpringBatchApplication)
|
||||
}
|
||||
}
|
||||
|
||||
class XmlConfigCustomSpanEventTest extends CustomSpanEventTest implements ApplicationConfigTrait {
|
||||
@Override
|
||||
ConfigurableApplicationContext createApplicationContext() {
|
||||
new ClassPathXmlApplicationContext("spring-batch.xml")
|
||||
}
|
||||
}
|
||||
|
||||
class JsrConfigCustomSpanEventTest extends CustomSpanEventTest implements JavaxBatchConfigTrait {
|
||||
|
||||
// JSR config has different item span ordering
|
||||
protected void itemSpans(TraceAssert trace) {
|
||||
trace.with {
|
||||
span(3) {
|
||||
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemRead"
|
||||
kind INTERNAL
|
||||
childOf span(2)
|
||||
events(2)
|
||||
event(0) {
|
||||
eventName "item.read.before"
|
||||
}
|
||||
event(1) {
|
||||
eventName "item.read.after"
|
||||
}
|
||||
}
|
||||
span(4) {
|
||||
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemProcess"
|
||||
kind INTERNAL
|
||||
childOf span(2)
|
||||
events(2)
|
||||
event(0) {
|
||||
eventName "item.process.before"
|
||||
}
|
||||
event(1) {
|
||||
eventName "item.process.after"
|
||||
}
|
||||
}
|
||||
// second read that returns null and signifies end of stream
|
||||
span(5) {
|
||||
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemRead"
|
||||
kind INTERNAL
|
||||
childOf span(2)
|
||||
// spring batch does not call ItemReadListener after() methods when read() returns end-of-stream
|
||||
events(1)
|
||||
event(0) {
|
||||
eventName "item.read.before"
|
||||
}
|
||||
}
|
||||
span(6) {
|
||||
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemWrite"
|
||||
kind INTERNAL
|
||||
childOf span(2)
|
||||
events(2)
|
||||
event(0) {
|
||||
eventName "item.write.before"
|
||||
}
|
||||
event(1) {
|
||||
eventName "item.write.after"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,421 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
|
||||
import io.opentelemetry.sdk.trace.data.SpanData
|
||||
import org.springframework.batch.core.Job
|
||||
import org.springframework.batch.core.JobParameter
|
||||
import org.springframework.batch.core.Step
|
||||
import org.springframework.batch.core.job.AbstractJob
|
||||
import org.springframework.batch.core.step.tasklet.TaskletStep
|
||||
import org.springframework.batch.repeat.policy.SimpleCompletionPolicy
|
||||
import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate
|
||||
import org.springframework.context.ConfigurableApplicationContext
|
||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext
|
||||
|
||||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
|
||||
import static java.util.Collections.emptyMap
|
||||
|
||||
abstract class ItemLevelSpanTest extends AgentInstrumentationSpecification {
|
||||
abstract runJob(String jobName, Map<String, JobParameter> params = emptyMap())
|
||||
|
||||
def "should trace item read, process and write calls"() {
|
||||
when:
|
||||
runJob("itemsAndTaskletJob")
|
||||
|
||||
then:
|
||||
assertTraces(1) {
|
||||
trace(0, 37) {
|
||||
span(0) {
|
||||
name "BatchJob itemsAndTaskletJob"
|
||||
kind INTERNAL
|
||||
}
|
||||
|
||||
// item step
|
||||
span(1) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep"
|
||||
kind INTERNAL
|
||||
childOf span(0)
|
||||
}
|
||||
|
||||
// chunk 1, items 0-5
|
||||
span(2) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.Chunk"
|
||||
kind INTERNAL
|
||||
childOf span(1)
|
||||
}
|
||||
(3..7).forEach {
|
||||
span(it) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemRead"
|
||||
kind INTERNAL
|
||||
childOf span(2)
|
||||
}
|
||||
}
|
||||
(8..12).forEach {
|
||||
span(it) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemProcess"
|
||||
kind INTERNAL
|
||||
childOf span(2)
|
||||
}
|
||||
}
|
||||
span(13) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemWrite"
|
||||
kind INTERNAL
|
||||
childOf span(2)
|
||||
}
|
||||
|
||||
// chunk 2, items 5-10
|
||||
span(14) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.Chunk"
|
||||
kind INTERNAL
|
||||
childOf span(1)
|
||||
}
|
||||
(15..19).forEach {
|
||||
span(it) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemRead"
|
||||
kind INTERNAL
|
||||
childOf span(14)
|
||||
}
|
||||
}
|
||||
(20..24).forEach {
|
||||
span(it) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemProcess"
|
||||
kind INTERNAL
|
||||
childOf span(14)
|
||||
}
|
||||
}
|
||||
span(25) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemWrite"
|
||||
kind INTERNAL
|
||||
childOf span(14)
|
||||
}
|
||||
|
||||
// chunk 3, items 10-13
|
||||
span(26) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.Chunk"
|
||||
kind INTERNAL
|
||||
childOf span(1)
|
||||
}
|
||||
// +1 for last read returning end of stream marker
|
||||
(27..30).forEach {
|
||||
span(it) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemRead"
|
||||
kind INTERNAL
|
||||
childOf span(26)
|
||||
}
|
||||
}
|
||||
(31..33).forEach {
|
||||
span(it) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemProcess"
|
||||
kind INTERNAL
|
||||
childOf span(26)
|
||||
}
|
||||
}
|
||||
span(34) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemWrite"
|
||||
kind INTERNAL
|
||||
childOf span(26)
|
||||
}
|
||||
|
||||
// tasklet step
|
||||
span(35) {
|
||||
name "BatchJob itemsAndTaskletJob.taskletStep"
|
||||
kind INTERNAL
|
||||
childOf span(0)
|
||||
}
|
||||
span(36) {
|
||||
name "BatchJob itemsAndTaskletJob.taskletStep.Tasklet"
|
||||
kind INTERNAL
|
||||
childOf span(35)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "should trace all item operations on a parallel items job"() {
|
||||
when:
|
||||
runJob("parallelItemsJob")
|
||||
|
||||
then:
|
||||
assertTraces(1) {
|
||||
trace(0, 19) {
|
||||
// as chunks are processed in parallel we need to sort them to guarantee that they are
|
||||
// in the expected order
|
||||
// firstly compute child span count for each chunk, we'll sort chunks from larger to smaller
|
||||
// based on child count
|
||||
def childCount = new HashMap<SpanData, Number>()
|
||||
spans.forEach { span ->
|
||||
if (span.name == "BatchJob parallelItemsJob.parallelItemsStep.Chunk") {
|
||||
childCount.put(span, spans.count { it.parentSpanId == span.spanId })
|
||||
}
|
||||
}
|
||||
// sort spans with a ranking function
|
||||
spans.sort({
|
||||
// job span is first
|
||||
if (it.name == "BatchJob parallelItemsJob") {
|
||||
return 0
|
||||
}
|
||||
// step span is second
|
||||
if (it.name == "BatchJob parallelItemsJob.parallelItemsStep") {
|
||||
return 1
|
||||
}
|
||||
|
||||
// find the chunk this span belongs to
|
||||
def chunkSpan = it
|
||||
while (chunkSpan != null && chunkSpan.name != "BatchJob parallelItemsJob.parallelItemsStep.Chunk") {
|
||||
chunkSpan = spans.find { it.spanId == chunkSpan.parentSpanId }
|
||||
}
|
||||
if (chunkSpan != null) {
|
||||
// sort larger chunks first
|
||||
return 100 - childCount.get(chunkSpan)
|
||||
}
|
||||
throw new IllegalStateException("item spans should have a parent chunk span")
|
||||
})
|
||||
|
||||
span(0) {
|
||||
name "BatchJob parallelItemsJob"
|
||||
kind INTERNAL
|
||||
}
|
||||
span(1) {
|
||||
name "BatchJob parallelItemsJob.parallelItemsStep"
|
||||
kind INTERNAL
|
||||
childOf span(0)
|
||||
}
|
||||
|
||||
// chunk 1, first two items; thread 1
|
||||
span(2) {
|
||||
name "BatchJob parallelItemsJob.parallelItemsStep.Chunk"
|
||||
kind INTERNAL
|
||||
childOf span(1)
|
||||
}
|
||||
[3, 4].forEach {
|
||||
span(it) {
|
||||
name "BatchJob parallelItemsJob.parallelItemsStep.ItemRead"
|
||||
kind INTERNAL
|
||||
childOf span(2)
|
||||
}
|
||||
}
|
||||
[5, 6].forEach {
|
||||
span(it) {
|
||||
name "BatchJob parallelItemsJob.parallelItemsStep.ItemProcess"
|
||||
kind INTERNAL
|
||||
childOf span(2)
|
||||
}
|
||||
}
|
||||
span(7) {
|
||||
name "BatchJob parallelItemsJob.parallelItemsStep.ItemWrite"
|
||||
kind INTERNAL
|
||||
childOf span(2)
|
||||
}
|
||||
|
||||
// chunk 2, items 3 & 4; thread 2
|
||||
span(8) {
|
||||
name "BatchJob parallelItemsJob.parallelItemsStep.Chunk"
|
||||
kind INTERNAL
|
||||
childOf span(1)
|
||||
}
|
||||
[9, 10].forEach {
|
||||
span(it) {
|
||||
name "BatchJob parallelItemsJob.parallelItemsStep.ItemRead"
|
||||
kind INTERNAL
|
||||
childOf span(8)
|
||||
}
|
||||
}
|
||||
[11, 12].forEach {
|
||||
span(it) {
|
||||
name "BatchJob parallelItemsJob.parallelItemsStep.ItemProcess"
|
||||
kind INTERNAL
|
||||
childOf span(8)
|
||||
}
|
||||
}
|
||||
span(13) {
|
||||
name "BatchJob parallelItemsJob.parallelItemsStep.ItemWrite"
|
||||
kind INTERNAL
|
||||
childOf span(8)
|
||||
}
|
||||
|
||||
// chunk 3, 5th item; thread 1
|
||||
span(14) {
|
||||
name "BatchJob parallelItemsJob.parallelItemsStep.Chunk"
|
||||
kind INTERNAL
|
||||
childOf span(1)
|
||||
}
|
||||
// +1 for last read returning end of stream marker
|
||||
[15, 16].forEach {
|
||||
span(it) {
|
||||
name "BatchJob parallelItemsJob.parallelItemsStep.ItemRead"
|
||||
kind INTERNAL
|
||||
childOf span(14)
|
||||
}
|
||||
}
|
||||
span(17) {
|
||||
name "BatchJob parallelItemsJob.parallelItemsStep.ItemProcess"
|
||||
kind INTERNAL
|
||||
childOf span(14)
|
||||
}
|
||||
span(18) {
|
||||
name "BatchJob parallelItemsJob.parallelItemsStep.ItemWrite"
|
||||
kind INTERNAL
|
||||
childOf span(14)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def postProcessParallelItemsJob(String jobName, Job job) {
|
||||
if ("parallelItemsJob" == jobName) {
|
||||
Step step = ((AbstractJob) job).getStep("parallelItemsStep")
|
||||
TaskletStep taskletStep = (TaskletStep) step
|
||||
// explicitly set the number of chunks we expect from this test to ensure we always get
|
||||
// the same number of spans
|
||||
((TaskExecutorRepeatTemplate) taskletStep.stepOperations).completionPolicy = new SimpleCompletionPolicy(3)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class JavaConfigItemLevelSpanTest extends ItemLevelSpanTest implements ApplicationConfigTrait {
|
||||
@Override
|
||||
def postProcessJob(String jobName, Job job) {
|
||||
postProcessParallelItemsJob(jobName, job)
|
||||
}
|
||||
|
||||
@Override
|
||||
ConfigurableApplicationContext createApplicationContext() {
|
||||
new AnnotationConfigApplicationContext(SpringBatchApplication)
|
||||
}
|
||||
}
|
||||
|
||||
class XmlConfigItemLevelSpanTest extends ItemLevelSpanTest implements ApplicationConfigTrait {
|
||||
@Override
|
||||
def postProcessJob(String jobName, Job job) {
|
||||
postProcessParallelItemsJob(jobName, job)
|
||||
}
|
||||
|
||||
@Override
|
||||
ConfigurableApplicationContext createApplicationContext() {
|
||||
new ClassPathXmlApplicationContext("spring-batch.xml")
|
||||
}
|
||||
}
|
||||
|
||||
// JsrChunkProcessor works a bit differently than the "standard" one and does not read the whole
|
||||
// chunk at once, it reads every item separately; it results in a different span ordering, that's
|
||||
// why it has a completely separate test class
|
||||
class JsrConfigItemLevelSpanTest extends AgentInstrumentationSpecification implements JavaxBatchConfigTrait {
|
||||
def "should trace item read, process and write calls"() {
|
||||
when:
|
||||
runJob("itemsAndTaskletJob", [:])
|
||||
|
||||
then:
|
||||
assertTraces(1) {
|
||||
trace(0, 37) {
|
||||
span(0) {
|
||||
name "BatchJob itemsAndTaskletJob"
|
||||
kind INTERNAL
|
||||
}
|
||||
|
||||
// item step
|
||||
span(1) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep"
|
||||
kind INTERNAL
|
||||
childOf span(0)
|
||||
}
|
||||
|
||||
// chunk 1, items 0-5
|
||||
span(2) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.Chunk"
|
||||
kind INTERNAL
|
||||
childOf span(1)
|
||||
}
|
||||
(3..11).step(2) {
|
||||
println it
|
||||
span(it) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemRead"
|
||||
kind INTERNAL
|
||||
childOf span(2)
|
||||
}
|
||||
span(it + 1) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemProcess"
|
||||
kind INTERNAL
|
||||
childOf span(2)
|
||||
}
|
||||
}
|
||||
span(13) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemWrite"
|
||||
kind INTERNAL
|
||||
childOf span(2)
|
||||
}
|
||||
|
||||
// chunk 2, items 5-10
|
||||
span(14) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.Chunk"
|
||||
kind INTERNAL
|
||||
childOf span(1)
|
||||
}
|
||||
(15..23).step(2) {
|
||||
println it
|
||||
span(it) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemRead"
|
||||
kind INTERNAL
|
||||
childOf span(14)
|
||||
}
|
||||
span(it + 1) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemProcess"
|
||||
kind INTERNAL
|
||||
childOf span(14)
|
||||
}
|
||||
}
|
||||
span(25) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemWrite"
|
||||
kind INTERNAL
|
||||
childOf span(14)
|
||||
}
|
||||
|
||||
// chunk 3, items 10-13
|
||||
span(26) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.Chunk"
|
||||
kind INTERNAL
|
||||
childOf span(1)
|
||||
}
|
||||
(27..32).step(2) {
|
||||
println it
|
||||
span(it) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemRead"
|
||||
kind INTERNAL
|
||||
childOf span(26)
|
||||
}
|
||||
span(it + 1) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemProcess"
|
||||
kind INTERNAL
|
||||
childOf span(26)
|
||||
}
|
||||
}
|
||||
// last read returning end of stream marker
|
||||
span(33) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemRead"
|
||||
kind INTERNAL
|
||||
childOf span(26)
|
||||
}
|
||||
span(34) {
|
||||
name "BatchJob itemsAndTaskletJob.itemStep.ItemWrite"
|
||||
kind INTERNAL
|
||||
childOf span(26)
|
||||
}
|
||||
|
||||
// tasklet step
|
||||
span(35) {
|
||||
name "BatchJob itemsAndTaskletJob.taskletStep"
|
||||
kind INTERNAL
|
||||
childOf span(0)
|
||||
}
|
||||
span(36) {
|
||||
name "BatchJob itemsAndTaskletJob.taskletStep.Tasklet"
|
||||
kind INTERNAL
|
||||
childOf span(35)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,36 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import org.springframework.batch.core.JobParameter
|
||||
|
||||
import javax.batch.operations.JobOperator
|
||||
import javax.batch.runtime.BatchRuntime
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
trait JavaxBatchConfigTrait {
|
||||
static JobOperator jobOperator
|
||||
static AtomicInteger counter = new AtomicInteger()
|
||||
|
||||
def setupSpec() {
|
||||
jobOperator = BatchRuntime.jobOperator
|
||||
}
|
||||
|
||||
// just for consistency with ApplicationConfigTrait
|
||||
def cleanupSpec() {
|
||||
additionalCleanup()
|
||||
}
|
||||
|
||||
def additionalCleanup() {}
|
||||
|
||||
def runJob(String jobName, Map<String, JobParameter> params) {
|
||||
def jobParams = new Properties()
|
||||
params.forEach({ k, v ->
|
||||
jobParams.setProperty(k, v.toString())
|
||||
})
|
||||
// each job instance with the same name needs to be unique
|
||||
jobParams.setProperty("uniqueJobIdCounter", counter.getAndIncrement().toString())
|
||||
jobOperator.start(jobName, jobParams)
|
||||
}
|
||||
}
|
|
@ -1,304 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
import org.springframework.batch.core.Job
|
||||
import org.springframework.batch.core.Step
|
||||
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing
|
||||
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory
|
||||
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory
|
||||
import org.springframework.batch.core.job.builder.FlowBuilder
|
||||
import org.springframework.batch.core.job.flow.Flow
|
||||
import org.springframework.batch.core.job.flow.support.SimpleFlow
|
||||
import org.springframework.batch.core.launch.JobLauncher
|
||||
import org.springframework.batch.core.launch.support.SimpleJobLauncher
|
||||
import org.springframework.batch.core.partition.support.Partitioner
|
||||
import org.springframework.batch.core.repository.JobRepository
|
||||
import org.springframework.batch.item.ItemProcessor
|
||||
import org.springframework.batch.item.ItemReader
|
||||
import org.springframework.batch.item.ItemWriter
|
||||
import org.springframework.beans.factory.annotation.Autowired
|
||||
import org.springframework.context.annotation.Bean
|
||||
import org.springframework.context.annotation.Configuration
|
||||
import org.springframework.core.task.AsyncTaskExecutor
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
|
||||
import springbatch.CustomEventChunkListener
|
||||
import springbatch.CustomEventItemProcessListener
|
||||
import springbatch.CustomEventItemReadListener
|
||||
import springbatch.CustomEventItemWriteListener
|
||||
import springbatch.CustomEventJobListener
|
||||
import springbatch.CustomEventStepListener
|
||||
import springbatch.SingleItemReader
|
||||
import springbatch.TestDecider
|
||||
import springbatch.TestItemProcessor
|
||||
import springbatch.TestItemReader
|
||||
import springbatch.TestItemWriter
|
||||
import springbatch.TestPartitionedItemReader
|
||||
import springbatch.TestPartitioner
|
||||
import springbatch.TestSyncItemReader
|
||||
import springbatch.TestTasklet
|
||||
|
||||
@Configuration
|
||||
@EnableBatchProcessing
|
||||
class SpringBatchApplication {
|
||||
|
||||
@Autowired
|
||||
JobBuilderFactory jobs
|
||||
@Autowired
|
||||
StepBuilderFactory steps
|
||||
@Autowired
|
||||
JobRepository jobRepository
|
||||
|
||||
@Bean
|
||||
AsyncTaskExecutor asyncTaskExecutor() {
|
||||
def executor = new ThreadPoolTaskExecutor()
|
||||
executor.corePoolSize = 10
|
||||
executor.maxPoolSize = 10
|
||||
executor
|
||||
}
|
||||
|
||||
@Bean
|
||||
JobLauncher jobLauncher() {
|
||||
def launcher = new SimpleJobLauncher()
|
||||
launcher.jobRepository = jobRepository
|
||||
launcher.taskExecutor = asyncTaskExecutor()
|
||||
launcher
|
||||
}
|
||||
|
||||
// common
|
||||
@Bean
|
||||
ItemReader<String> itemReader() {
|
||||
new TestItemReader()
|
||||
}
|
||||
|
||||
@Bean
|
||||
ItemProcessor<String, Integer> itemProcessor() {
|
||||
new TestItemProcessor()
|
||||
}
|
||||
|
||||
@Bean
|
||||
ItemWriter<Integer> itemWriter() {
|
||||
new TestItemWriter()
|
||||
}
|
||||
|
||||
// simple tasklet job
|
||||
@Bean
|
||||
Job taskletJob() {
|
||||
jobs.get("taskletJob")
|
||||
.start(step())
|
||||
.build()
|
||||
}
|
||||
|
||||
@Bean
|
||||
Step step() {
|
||||
steps.get("step")
|
||||
.tasklet(new TestTasklet())
|
||||
.build()
|
||||
}
|
||||
|
||||
// 2-step tasklet + chunked items job
|
||||
@Bean
|
||||
Job itemsAndTaskletJob() {
|
||||
jobs.get("itemsAndTaskletJob")
|
||||
.start(itemStep())
|
||||
.next(taskletStep())
|
||||
.build()
|
||||
}
|
||||
|
||||
@Bean
|
||||
Step taskletStep() {
|
||||
steps.get("taskletStep")
|
||||
.tasklet(new TestTasklet())
|
||||
.build()
|
||||
}
|
||||
|
||||
@Bean
|
||||
Step itemStep() {
|
||||
steps.get("itemStep")
|
||||
.chunk(5)
|
||||
.reader(itemReader())
|
||||
.processor(itemProcessor())
|
||||
.writer(itemWriter())
|
||||
.build()
|
||||
}
|
||||
|
||||
// parallel items job
|
||||
@Bean
|
||||
Job parallelItemsJob() {
|
||||
jobs.get("parallelItemsJob")
|
||||
.start(parallelItemsStep())
|
||||
.build()
|
||||
}
|
||||
|
||||
@Bean
|
||||
Step parallelItemsStep() {
|
||||
steps.get("parallelItemsStep")
|
||||
.chunk(2)
|
||||
.reader(new TestSyncItemReader(5))
|
||||
.processor(itemProcessor())
|
||||
.writer(itemWriter())
|
||||
.taskExecutor(asyncTaskExecutor())
|
||||
.throttleLimit(2)
|
||||
.build()
|
||||
}
|
||||
|
||||
// job using a flow
|
||||
@Bean
|
||||
Job flowJob() {
|
||||
jobs.get("flowJob")
|
||||
.start(flow())
|
||||
.build()
|
||||
.build()
|
||||
}
|
||||
|
||||
@Bean
|
||||
Flow flow() {
|
||||
new FlowBuilder<SimpleFlow>("flow")
|
||||
.start(flowStep1())
|
||||
.on("*")
|
||||
.to(flowStep2())
|
||||
.build()
|
||||
}
|
||||
|
||||
@Bean
|
||||
Step flowStep1() {
|
||||
steps.get("flowStep1")
|
||||
.tasklet(new TestTasklet())
|
||||
.build()
|
||||
}
|
||||
|
||||
@Bean
|
||||
Step flowStep2() {
|
||||
steps.get("flowStep2")
|
||||
.tasklet(new TestTasklet())
|
||||
.build()
|
||||
}
|
||||
|
||||
// split job
|
||||
@Bean
|
||||
Job splitJob() {
|
||||
jobs.get("splitJob")
|
||||
.start(splitFlowStep1())
|
||||
.split(asyncTaskExecutor())
|
||||
.add(splitFlow2())
|
||||
.build()
|
||||
.build()
|
||||
}
|
||||
|
||||
@Bean
|
||||
Step splitFlowStep1() {
|
||||
steps.get("splitFlowStep1")
|
||||
.tasklet(new TestTasklet())
|
||||
.build()
|
||||
}
|
||||
|
||||
@Bean
|
||||
Flow splitFlow2() {
|
||||
new FlowBuilder<SimpleFlow>("splitFlow2")
|
||||
.start(splitFlowStep2())
|
||||
.build()
|
||||
}
|
||||
|
||||
@Bean
|
||||
Step splitFlowStep2() {
|
||||
steps.get("splitFlowStep2")
|
||||
.tasklet(new TestTasklet())
|
||||
.build()
|
||||
}
|
||||
|
||||
// job with decisions
|
||||
@Bean
|
||||
Job decisionJob() {
|
||||
jobs.get("decisionJob")
|
||||
.start(decisionStepStart())
|
||||
.next(new TestDecider())
|
||||
.on("LEFT").to(decisionStepLeft())
|
||||
.on("RIGHT").to(decisionStepRight())
|
||||
.end()
|
||||
.build()
|
||||
}
|
||||
|
||||
@Bean
|
||||
Step decisionStepStart() {
|
||||
steps.get("decisionStepStart")
|
||||
.tasklet(new TestTasklet())
|
||||
.build()
|
||||
}
|
||||
|
||||
@Bean
|
||||
Step decisionStepLeft() {
|
||||
steps.get("decisionStepLeft")
|
||||
.tasklet(new TestTasklet())
|
||||
.build()
|
||||
}
|
||||
|
||||
@Bean
|
||||
Step decisionStepRight() {
|
||||
steps.get("decisionStepRight")
|
||||
.tasklet(new TestTasklet())
|
||||
.build()
|
||||
}
|
||||
|
||||
// partitioned job
|
||||
@Bean
|
||||
Job partitionedJob() {
|
||||
jobs.get("partitionedJob")
|
||||
.start(partitionManagerStep())
|
||||
.build()
|
||||
}
|
||||
|
||||
@Bean
|
||||
Step partitionManagerStep() {
|
||||
steps.get("partitionManagerStep")
|
||||
.partitioner("partitionWorkerStep", partitioner())
|
||||
.step(partitionWorkerStep())
|
||||
.gridSize(2)
|
||||
.taskExecutor(asyncTaskExecutor())
|
||||
.build()
|
||||
}
|
||||
|
||||
@Bean
|
||||
Partitioner partitioner() {
|
||||
new TestPartitioner()
|
||||
}
|
||||
|
||||
@Bean
|
||||
Step partitionWorkerStep() {
|
||||
steps.get("partitionWorkerStep")
|
||||
.chunk(5)
|
||||
.reader(partitionedItemReader())
|
||||
.processor(itemProcessor())
|
||||
.writer(itemWriter())
|
||||
.build()
|
||||
}
|
||||
|
||||
@Bean
|
||||
ItemReader<String> partitionedItemReader() {
|
||||
new TestPartitionedItemReader()
|
||||
}
|
||||
|
||||
// custom span events items job
|
||||
@Bean
|
||||
Job customSpanEventsItemsJob() {
|
||||
jobs.get("customSpanEventsItemsJob")
|
||||
.start(customSpanEventsItemStep())
|
||||
.listener(new CustomEventJobListener())
|
||||
.build()
|
||||
}
|
||||
|
||||
@Bean
|
||||
Step customSpanEventsItemStep() {
|
||||
steps.get("customSpanEventsItemStep")
|
||||
.chunk(5)
|
||||
.reader(new SingleItemReader())
|
||||
.processor(itemProcessor())
|
||||
.writer(itemWriter())
|
||||
.listener(new CustomEventStepListener())
|
||||
.listener(new CustomEventChunkListener())
|
||||
.listener(new CustomEventItemReadListener())
|
||||
.listener(new CustomEventItemProcessListener())
|
||||
.listener(new CustomEventItemWriteListener())
|
||||
.build()
|
||||
}
|
||||
}
|
|
@ -1,27 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package jsr
|
||||
|
||||
import io.opentelemetry.api.trace.Span
|
||||
|
||||
import javax.batch.api.chunk.listener.ChunkListener
|
||||
|
||||
class CustomEventChunkListener implements ChunkListener {
|
||||
@Override
|
||||
void beforeChunk() throws Exception {
|
||||
Span.current().addEvent("chunk.before")
|
||||
}
|
||||
|
||||
@Override
|
||||
void onError(Exception e) throws Exception {
|
||||
Span.current().addEvent("chunk.error")
|
||||
}
|
||||
|
||||
@Override
|
||||
void afterChunk() throws Exception {
|
||||
Span.current().addEvent("chunk.after")
|
||||
}
|
||||
}
|
|
@ -1,27 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package jsr
|
||||
|
||||
import io.opentelemetry.api.trace.Span
|
||||
|
||||
import javax.batch.api.chunk.listener.ItemProcessListener
|
||||
|
||||
class CustomEventItemProcessListener implements ItemProcessListener {
|
||||
@Override
|
||||
void beforeProcess(Object o) throws Exception {
|
||||
Span.current().addEvent("item.process.before")
|
||||
}
|
||||
|
||||
@Override
|
||||
void afterProcess(Object o, Object o1) throws Exception {
|
||||
Span.current().addEvent("item.process.after")
|
||||
}
|
||||
|
||||
@Override
|
||||
void onProcessError(Object o, Exception e) throws Exception {
|
||||
Span.current().addEvent("item.process.error")
|
||||
}
|
||||
}
|
|
@ -1,27 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package jsr
|
||||
|
||||
import io.opentelemetry.api.trace.Span
|
||||
|
||||
import javax.batch.api.chunk.listener.ItemReadListener
|
||||
|
||||
class CustomEventItemReadListener implements ItemReadListener {
|
||||
@Override
|
||||
void beforeRead() throws Exception {
|
||||
Span.current().addEvent("item.read.before")
|
||||
}
|
||||
|
||||
@Override
|
||||
void afterRead(Object o) throws Exception {
|
||||
Span.current().addEvent("item.read.after")
|
||||
}
|
||||
|
||||
@Override
|
||||
void onReadError(Exception e) throws Exception {
|
||||
Span.current().addEvent("item.read.error")
|
||||
}
|
||||
}
|
|
@ -1,27 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package jsr
|
||||
|
||||
import io.opentelemetry.api.trace.Span
|
||||
|
||||
import javax.batch.api.chunk.listener.ItemWriteListener
|
||||
|
||||
class CustomEventItemWriteListener implements ItemWriteListener {
|
||||
@Override
|
||||
void beforeWrite(List<Object> list) throws Exception {
|
||||
Span.current().addEvent("item.write.before")
|
||||
}
|
||||
|
||||
@Override
|
||||
void afterWrite(List<Object> list) throws Exception {
|
||||
Span.current().addEvent("item.write.after")
|
||||
}
|
||||
|
||||
@Override
|
||||
void onWriteError(List<Object> list, Exception e) throws Exception {
|
||||
Span.current().addEvent("item.write.error")
|
||||
}
|
||||
}
|
|
@ -1,22 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package jsr
|
||||
|
||||
import io.opentelemetry.api.trace.Span
|
||||
|
||||
import javax.batch.api.listener.JobListener
|
||||
|
||||
class CustomEventJobListener implements JobListener {
|
||||
@Override
|
||||
void beforeJob() throws Exception {
|
||||
Span.current().addEvent("job.before")
|
||||
}
|
||||
|
||||
@Override
|
||||
void afterJob() throws Exception {
|
||||
Span.current().addEvent("job.after")
|
||||
}
|
||||
}
|
|
@ -1,22 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package jsr
|
||||
|
||||
import io.opentelemetry.api.trace.Span
|
||||
|
||||
import javax.batch.api.listener.StepListener
|
||||
|
||||
class CustomEventStepListener implements StepListener {
|
||||
@Override
|
||||
void beforeStep() throws Exception {
|
||||
Span.current().addEvent("step.before")
|
||||
}
|
||||
|
||||
@Override
|
||||
void afterStep() throws Exception {
|
||||
Span.current().addEvent("step.after")
|
||||
}
|
||||
}
|
|
@ -1,31 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package jsr
|
||||
|
||||
import javax.batch.api.chunk.ItemReader
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
class SingleItemReader implements ItemReader {
|
||||
final AtomicReference<String> item = new AtomicReference<>("42")
|
||||
|
||||
@Override
|
||||
void open(Serializable serializable) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
void close() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
Object readItem() throws Exception {
|
||||
return item.getAndSet(null)
|
||||
}
|
||||
|
||||
@Override
|
||||
Serializable checkpointInfo() throws Exception {
|
||||
return null
|
||||
}
|
||||
}
|
|
@ -1,28 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package jsr
|
||||
|
||||
import javax.batch.api.BatchProperty
|
||||
import javax.batch.api.Batchlet
|
||||
import javax.inject.Inject
|
||||
|
||||
class TestBatchlet implements Batchlet {
|
||||
@Inject
|
||||
@BatchProperty(name = "fail")
|
||||
String fail
|
||||
|
||||
@Override
|
||||
String process() throws Exception {
|
||||
if (fail != null && Integer.valueOf(fail) == 1) {
|
||||
throw new IllegalStateException("fail")
|
||||
}
|
||||
return "FINISHED"
|
||||
}
|
||||
|
||||
@Override
|
||||
void stop() throws Exception {
|
||||
}
|
||||
}
|
|
@ -1,16 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package jsr
|
||||
|
||||
import javax.batch.api.Decider
|
||||
import javax.batch.runtime.StepExecution
|
||||
|
||||
class TestDecider implements Decider {
|
||||
@Override
|
||||
String decide(StepExecution[] stepExecutions) throws Exception {
|
||||
"LEFT"
|
||||
}
|
||||
}
|
|
@ -1,15 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package jsr
|
||||
|
||||
import javax.batch.api.chunk.ItemProcessor
|
||||
|
||||
class TestItemProcessor implements ItemProcessor {
|
||||
@Override
|
||||
Object processItem(Object item) throws Exception {
|
||||
Integer.parseInt(item as String)
|
||||
}
|
||||
}
|
|
@ -1,38 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package jsr
|
||||
|
||||
import javax.batch.api.chunk.ItemReader
|
||||
import java.util.stream.Collectors
|
||||
import java.util.stream.IntStream
|
||||
|
||||
class TestItemReader implements ItemReader {
|
||||
private final List<String> items = IntStream.range(0, 13).mapToObj(String.&valueOf).collect(Collectors.toList())
|
||||
private Iterator<String> itemsIt
|
||||
|
||||
@Override
|
||||
void open(Serializable serializable) throws Exception {
|
||||
itemsIt = items.iterator()
|
||||
}
|
||||
|
||||
@Override
|
||||
void close() throws Exception {
|
||||
itemsIt = null
|
||||
}
|
||||
|
||||
@Override
|
||||
Object readItem() throws Exception {
|
||||
if (itemsIt == null) {
|
||||
return null
|
||||
}
|
||||
return itemsIt.hasNext() ? itemsIt.next() : null
|
||||
}
|
||||
|
||||
@Override
|
||||
Serializable checkpointInfo() throws Exception {
|
||||
return null
|
||||
}
|
||||
}
|
|
@ -1,32 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package jsr
|
||||
|
||||
import javax.batch.api.chunk.ItemWriter
|
||||
|
||||
class TestItemWriter implements ItemWriter {
|
||||
final List<Integer> items = new ArrayList()
|
||||
|
||||
@Override
|
||||
void open(Serializable checkpoint) throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
void close() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
void writeItems(List<Object> items) throws Exception {
|
||||
for (item in items) {
|
||||
this.items.add(item as Integer)
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
Serializable checkpointInfo() throws Exception {
|
||||
return null
|
||||
}
|
||||
}
|
|
@ -1,45 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package jsr
|
||||
|
||||
import javax.batch.api.BatchProperty
|
||||
import javax.batch.api.chunk.ItemReader
|
||||
import javax.inject.Inject
|
||||
|
||||
class TestPartitionedItemReader implements ItemReader {
|
||||
@Inject
|
||||
@BatchProperty(name = "start")
|
||||
String startStr
|
||||
@Inject
|
||||
@BatchProperty(name = "end")
|
||||
String endStr
|
||||
|
||||
int start
|
||||
int end
|
||||
|
||||
@Override
|
||||
void open(Serializable checkpoint) throws Exception {
|
||||
start = Integer.parseInt(startStr)
|
||||
end = Integer.parseInt(endStr)
|
||||
}
|
||||
|
||||
@Override
|
||||
void close() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
Object readItem() throws Exception {
|
||||
if (start >= end) {
|
||||
return null
|
||||
}
|
||||
return String.valueOf(start++)
|
||||
}
|
||||
|
||||
@Override
|
||||
Serializable checkpointInfo() throws Exception {
|
||||
return null
|
||||
}
|
||||
}
|
|
@ -1,27 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package springbatch
|
||||
|
||||
import io.opentelemetry.api.trace.Span
|
||||
import org.springframework.batch.core.ChunkListener
|
||||
import org.springframework.batch.core.scope.context.ChunkContext
|
||||
|
||||
class CustomEventChunkListener implements ChunkListener {
|
||||
@Override
|
||||
void beforeChunk(ChunkContext context) {
|
||||
Span.current().addEvent("chunk.before")
|
||||
}
|
||||
|
||||
@Override
|
||||
void afterChunk(ChunkContext context) {
|
||||
Span.current().addEvent("chunk.after")
|
||||
}
|
||||
|
||||
@Override
|
||||
void afterChunkError(ChunkContext context) {
|
||||
Span.current().addEvent("chunk.error")
|
||||
}
|
||||
}
|
|
@ -1,26 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package springbatch
|
||||
|
||||
import io.opentelemetry.api.trace.Span
|
||||
import org.springframework.batch.core.ItemProcessListener
|
||||
|
||||
class CustomEventItemProcessListener implements ItemProcessListener<String, Integer> {
|
||||
@Override
|
||||
void beforeProcess(String item) {
|
||||
Span.current().addEvent("item.process.before")
|
||||
}
|
||||
|
||||
@Override
|
||||
void afterProcess(String item, Integer result) {
|
||||
Span.current().addEvent("item.process.after")
|
||||
}
|
||||
|
||||
@Override
|
||||
void onProcessError(String item, Exception e) {
|
||||
Span.current().addEvent("item.process.error")
|
||||
}
|
||||
}
|
|
@ -1,26 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package springbatch
|
||||
|
||||
import io.opentelemetry.api.trace.Span
|
||||
import org.springframework.batch.core.ItemReadListener
|
||||
|
||||
class CustomEventItemReadListener implements ItemReadListener<String> {
|
||||
@Override
|
||||
void beforeRead() {
|
||||
Span.current().addEvent("item.read.before")
|
||||
}
|
||||
|
||||
@Override
|
||||
void afterRead(String item) {
|
||||
Span.current().addEvent("item.read.after")
|
||||
}
|
||||
|
||||
@Override
|
||||
void onReadError(Exception ex) {
|
||||
Span.current().addEvent("item.read.error")
|
||||
}
|
||||
}
|
|
@ -1,26 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package springbatch
|
||||
|
||||
import io.opentelemetry.api.trace.Span
|
||||
import org.springframework.batch.core.ItemWriteListener
|
||||
|
||||
class CustomEventItemWriteListener implements ItemWriteListener<Integer> {
|
||||
@Override
|
||||
void beforeWrite(List<? extends Integer> items) {
|
||||
Span.current().addEvent("item.write.before")
|
||||
}
|
||||
|
||||
@Override
|
||||
void afterWrite(List<? extends Integer> items) {
|
||||
Span.current().addEvent("item.write.after")
|
||||
}
|
||||
|
||||
@Override
|
||||
void onWriteError(Exception exception, List<? extends Integer> items) {
|
||||
Span.current().addEvent("item.write.error")
|
||||
}
|
||||
}
|
|
@ -1,22 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package springbatch
|
||||
|
||||
import io.opentelemetry.api.trace.Span
|
||||
import org.springframework.batch.core.JobExecution
|
||||
import org.springframework.batch.core.JobExecutionListener
|
||||
|
||||
class CustomEventJobListener implements JobExecutionListener {
|
||||
@Override
|
||||
void beforeJob(JobExecution jobExecution) {
|
||||
Span.current().addEvent("job.before")
|
||||
}
|
||||
|
||||
@Override
|
||||
void afterJob(JobExecution jobExecution) {
|
||||
Span.current().addEvent("job.after")
|
||||
}
|
||||
}
|
|
@ -1,24 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package springbatch
|
||||
|
||||
import io.opentelemetry.api.trace.Span
|
||||
import org.springframework.batch.core.ExitStatus
|
||||
import org.springframework.batch.core.StepExecution
|
||||
import org.springframework.batch.core.StepExecutionListener
|
||||
|
||||
class CustomEventStepListener implements StepExecutionListener {
|
||||
@Override
|
||||
void beforeStep(StepExecution stepExecution) {
|
||||
Span.current().addEvent("step.before")
|
||||
}
|
||||
|
||||
@Override
|
||||
ExitStatus afterStep(StepExecution stepExecution) {
|
||||
Span.current().addEvent("step.after")
|
||||
return null
|
||||
}
|
||||
}
|
|
@ -1,19 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package springbatch
|
||||
|
||||
import org.springframework.batch.item.ItemReader
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
class SingleItemReader implements ItemReader<String> {
|
||||
final AtomicReference<String> item = new AtomicReference<>("42")
|
||||
|
||||
@Override
|
||||
String read() {
|
||||
return item.getAndSet(null)
|
||||
}
|
||||
}
|
|
@ -1,18 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package springbatch
|
||||
|
||||
import org.springframework.batch.core.JobExecution
|
||||
import org.springframework.batch.core.StepExecution
|
||||
import org.springframework.batch.core.job.flow.FlowExecutionStatus
|
||||
import org.springframework.batch.core.job.flow.JobExecutionDecider
|
||||
|
||||
class TestDecider implements JobExecutionDecider {
|
||||
@Override
|
||||
FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
|
||||
new FlowExecutionStatus("LEFT")
|
||||
}
|
||||
}
|
|
@ -1,15 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package springbatch
|
||||
|
||||
import org.springframework.batch.item.ItemProcessor
|
||||
|
||||
class TestItemProcessor implements ItemProcessor<String, Integer> {
|
||||
@Override
|
||||
Integer process(String item) throws Exception {
|
||||
Integer.parseInt(item)
|
||||
}
|
||||
}
|
|
@ -1,17 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package springbatch
|
||||
|
||||
import org.springframework.batch.item.support.ListItemReader
|
||||
|
||||
import java.util.stream.Collectors
|
||||
import java.util.stream.IntStream
|
||||
|
||||
class TestItemReader extends ListItemReader<String> {
|
||||
TestItemReader() {
|
||||
super(IntStream.range(0, 13).mapToObj(String.&valueOf).collect(Collectors.toList()) as List<String>)
|
||||
}
|
||||
}
|
|
@ -1,17 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package springbatch
|
||||
|
||||
import org.springframework.batch.item.ItemWriter
|
||||
|
||||
class TestItemWriter implements ItemWriter<Integer> {
|
||||
final List<Integer> items = Collections.synchronizedList(new ArrayList())
|
||||
|
||||
@Override
|
||||
void write(List<? extends Integer> items) throws Exception {
|
||||
this.items.addAll(items)
|
||||
}
|
||||
}
|
|
@ -1,43 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package springbatch
|
||||
|
||||
import org.springframework.batch.item.ExecutionContext
|
||||
import org.springframework.batch.item.ItemReader
|
||||
import org.springframework.batch.item.ItemStream
|
||||
import org.springframework.batch.item.ItemStreamException
|
||||
import org.springframework.batch.item.NonTransientResourceException
|
||||
import org.springframework.batch.item.ParseException
|
||||
import org.springframework.batch.item.UnexpectedInputException
|
||||
|
||||
class TestPartitionedItemReader implements ItemReader<String>, ItemStream {
|
||||
ThreadLocal<Integer> start = new ThreadLocal<>()
|
||||
ThreadLocal<Integer> end = new ThreadLocal<>()
|
||||
|
||||
@Override
|
||||
String read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
|
||||
if (start.get() >= end.get()) {
|
||||
return null
|
||||
}
|
||||
def value = start.get()
|
||||
start.set(value + 1)
|
||||
return String.valueOf(value)
|
||||
}
|
||||
|
||||
@Override
|
||||
void open(ExecutionContext executionContext) throws ItemStreamException {
|
||||
start.set(executionContext.getInt("start"))
|
||||
end.set(executionContext.getInt("end"))
|
||||
}
|
||||
|
||||
@Override
|
||||
void update(ExecutionContext executionContext) throws ItemStreamException {
|
||||
}
|
||||
|
||||
@Override
|
||||
void close() throws ItemStreamException {
|
||||
}
|
||||
}
|
|
@ -1,23 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package springbatch
|
||||
|
||||
import org.springframework.batch.core.partition.support.Partitioner
|
||||
import org.springframework.batch.item.ExecutionContext
|
||||
|
||||
class TestPartitioner implements Partitioner {
|
||||
@Override
|
||||
Map<String, ExecutionContext> partition(int gridSize) {
|
||||
return [
|
||||
"partition0": new ExecutionContext([
|
||||
"start": 0, "end": 8
|
||||
]),
|
||||
"partition1": new ExecutionContext([
|
||||
"start": 8, "end": 13
|
||||
])
|
||||
]
|
||||
}
|
||||
}
|
|
@ -1,26 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package springbatch
|
||||
|
||||
import org.springframework.batch.item.ItemReader
|
||||
|
||||
import java.util.stream.Collectors
|
||||
import java.util.stream.IntStream
|
||||
|
||||
class TestSyncItemReader implements ItemReader<String> {
|
||||
private final Iterator<String> items
|
||||
|
||||
TestSyncItemReader(int max) {
|
||||
items = IntStream.range(0, max).mapToObj(String.&valueOf).collect(Collectors.toList()).iterator()
|
||||
}
|
||||
|
||||
synchronized String read() {
|
||||
if (items.hasNext()) {
|
||||
return items.next()
|
||||
}
|
||||
return null
|
||||
}
|
||||
}
|
|
@ -1,21 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package springbatch
|
||||
|
||||
import org.springframework.batch.core.StepContribution
|
||||
import org.springframework.batch.core.scope.context.ChunkContext
|
||||
import org.springframework.batch.core.step.tasklet.Tasklet
|
||||
import org.springframework.batch.repeat.RepeatStatus
|
||||
|
||||
class TestTasklet implements Tasklet {
|
||||
@Override
|
||||
RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
|
||||
if (chunkContext.stepContext.stepExecution.jobParameters.getLong("fail") == 1) {
|
||||
throw new IllegalStateException("fail")
|
||||
}
|
||||
RepeatStatus.FINISHED
|
||||
}
|
||||
}
|
|
@ -0,0 +1,70 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.chunk;
|
||||
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
|
||||
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.JobRunner;
|
||||
import io.opentelemetry.sdk.testing.assertj.TraceAssert;
|
||||
import io.opentelemetry.sdk.trace.data.LinkData;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Consumer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
|
||||
abstract class AbstractChunkRootSpanTest {
|
||||
|
||||
@RegisterExtension
|
||||
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
|
||||
|
||||
private final JobRunner jobRunner;
|
||||
|
||||
AbstractChunkRootSpanTest(JobRunner jobRunner) {
|
||||
this.jobRunner = jobRunner;
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldCreateSeparateTracesForEachChunk() {
|
||||
jobRunner.runJob("itemsAndTaskletJob");
|
||||
AtomicReference<SpanData> itemStepSpan = new AtomicReference<>();
|
||||
AtomicReference<SpanData> taskletStepSpan = new AtomicReference<>();
|
||||
|
||||
Consumer<TraceAssert> chunk =
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("BatchJob itemsAndTaskletJob.itemStep.Chunk")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasLinks(LinkData.create(itemStepSpan.get().getSpanContext())));
|
||||
testing.waitAndAssertTraces(
|
||||
trace -> {
|
||||
itemStepSpan.set(trace.getSpan(1));
|
||||
taskletStepSpan.set(trace.getSpan(2));
|
||||
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span -> span.hasName("BatchJob itemsAndTaskletJob").hasKind(SpanKind.INTERNAL),
|
||||
span ->
|
||||
span.hasName("BatchJob itemsAndTaskletJob.itemStep")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(0)),
|
||||
span ->
|
||||
span.hasName("BatchJob itemsAndTaskletJob.taskletStep")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasParent(trace.getSpan(0)));
|
||||
},
|
||||
chunk,
|
||||
chunk,
|
||||
chunk,
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("BatchJob itemsAndTaskletJob.taskletStep.Tasklet")
|
||||
.hasKind(SpanKind.INTERNAL)
|
||||
.hasLinks(LinkData.create(taskletStepSpan.get().getSpanContext()))));
|
||||
}
|
||||
}
|
|
@@ -0,0 +1,23 @@
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.chunk;

import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.ApplicationConfigRunner;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.SpringBatchApplication;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

class JavaConfigChunkRootSpanTest extends AbstractChunkRootSpanTest {

  @RegisterExtension
  static final ApplicationConfigRunner runner =
      new ApplicationConfigRunner(
          () -> new AnnotationConfigApplicationContext(SpringBatchApplication.class));

  JavaConfigChunkRootSpanTest() {
    super(runner);
  }
}
@@ -0,0 +1,18 @@
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.chunk;

import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.JavaxBatchConfigRunner;
import org.junit.jupiter.api.extension.RegisterExtension;

class JsrConfigChunkRootSpanTest extends AbstractChunkRootSpanTest {

  @RegisterExtension static final JavaxBatchConfigRunner runner = new JavaxBatchConfigRunner();

  JsrConfigChunkRootSpanTest() {
    super(runner);
  }
}
@@ -0,0 +1,21 @@
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.chunk;

import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.ApplicationConfigRunner;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.context.support.ClassPathXmlApplicationContext;

class XmlConfigChunkRootSpanTest extends AbstractChunkRootSpanTest {

  @RegisterExtension
  static final ApplicationConfigRunner runner =
      new ApplicationConfigRunner(() -> new ClassPathXmlApplicationContext("spring-batch.xml"));

  XmlConfigChunkRootSpanTest() {
    super(runner);
  }
}
@@ -0,0 +1,123 @@
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.event;

import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;

import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.JobRunner;
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
import io.opentelemetry.sdk.testing.assertj.TraceAssert;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

abstract class CustomSpanEventTest {

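  // the expected events differ between spring-batch [3, 4) and 4+; the testLatestDeps system
  // property is assumed to be set by the test setup when running against the latest spring-batch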
  private static final boolean VERSION_GREATER_THAN_4_0 = Boolean.getBoolean("testLatestDeps");

  @RegisterExtension
  static final InstrumentationExtension testing = AgentInstrumentationExtension.create();

  private final JobRunner runner;

  CustomSpanEventTest(JobRunner runner) {
    this.runner = runner;
  }

  @Test
  void shouldBeAbleToCallSpanCurrentAndAddCustomInfoToSpans() {
    runner.runJob("customSpanEventsItemsJob");

    testing.waitAndAssertTraces(
        trace -> {
          List<Consumer<SpanDataAssert>> assertions =
              new ArrayList<>(
                  Arrays.asList(
                      span ->
                          span.hasName("BatchJob customSpanEventsItemsJob")
                              .hasKind(SpanKind.INTERNAL)
                              .hasEventsSatisfyingExactly(
                                  event -> event.hasName("job.before"),
                                  event -> event.hasName("job.after")),
                      span ->
                          span.hasName("BatchJob customSpanEventsItemsJob.customSpanEventsItemStep")
                              .hasKind(SpanKind.INTERNAL)
                              .hasParent(trace.getSpan(0))
                              .satisfies(
                                  spanData -> {
                                    // CompositeChunkListener has broken ordering that causes
                                    // listeners that do not override order() to appear first at all
                                    // times because of that a custom ChunkListener will always see
                                    // a Step span when using spring-batch versions [3, 4)
                                    // that bug was fixed in 4.0
                                    if (VERSION_GREATER_THAN_4_0) {
                                      assertThat(spanData)
                                          .hasEventsSatisfyingExactly(
                                              event -> event.hasName("step.before"),
                                              event -> event.hasName("step.after"));
                                    } else {
                                      assertThat(spanData)
                                          .hasEventsSatisfyingExactly(
                                              event -> event.hasName("step.before"),
                                              event -> event.hasName("chunk.before"),
                                              event -> event.hasName("chunk.after"),
                                              event -> event.hasName("step.after"));
                                    }
                                  }),
                      span ->
                          span.hasName(
                                  "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.Chunk")
                              .hasKind(SpanKind.INTERNAL)
                              .hasParent(trace.getSpan(1))
                              .satisfies(
                                  spanData -> {
                                    // CompositeChunkListener has broken ordering that causes
                                    // listeners that do not override order() to appear first at all
                                    // times because of that a custom ChunkListener will always see
                                    // a Step span when using spring-batch versions [3, 4)
                                    // that bug was fixed in 4.0
                                    if (VERSION_GREATER_THAN_4_0) {
                                      assertThat(spanData)
                                          .hasEventsSatisfyingExactly(
                                              event -> event.hasName("chunk.before"),
                                              event -> event.hasName("chunk.after"));
                                    } else {
                                      assertThat(spanData.getEvents()).isEmpty();
                                    }
                                  })));
          itemSpans(trace, assertions);
          trace.hasSpansSatisfyingExactly(assertions);
        });
  }

  protected void itemSpans(TraceAssert trace, List<Consumer<SpanDataAssert>> assertions) {
    assertions.addAll(
        Arrays.asList(
            span ->
                span.hasName("BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemRead")
                    .hasKind(SpanKind.INTERNAL)
                    .hasParent(trace.getSpan(2)),
            span ->
                span.hasName("BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemRead")
                    .hasKind(SpanKind.INTERNAL)
                    .hasParent(trace.getSpan(2)),
            span ->
                span.hasName(
                        "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemProcess")
                    .hasKind(SpanKind.INTERNAL)
                    .hasParent(trace.getSpan(2)),
            span ->
                span.hasName("BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemWrite")
                    .hasKind(SpanKind.INTERNAL)
                    .hasParent(trace.getSpan(2))));
  }
}
@@ -0,0 +1,23 @@
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.event;

import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.ApplicationConfigRunner;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.SpringBatchApplication;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

class JavaConfigCustomSpanEventTest extends CustomSpanEventTest {

  @RegisterExtension
  static final ApplicationConfigRunner runner =
      new ApplicationConfigRunner(
          () -> new AnnotationConfigApplicationContext(SpringBatchApplication.class));

  JavaConfigCustomSpanEventTest() {
    super(runner);
  }
}
@@ -0,0 +1,57 @@
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.event;

import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.JavaxBatchConfigRunner;
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
import io.opentelemetry.sdk.testing.assertj.TraceAssert;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import org.junit.jupiter.api.extension.RegisterExtension;

class JsrConfigCustomSpanEventTest extends CustomSpanEventTest {

  @RegisterExtension static final JavaxBatchConfigRunner runner = new JavaxBatchConfigRunner();

  JsrConfigCustomSpanEventTest() {
    super(runner);
  }

  @Override
  protected void itemSpans(TraceAssert trace, List<Consumer<SpanDataAssert>> assertions) {
    assertions.addAll(
        Arrays.asList(
            span ->
                span.hasName("BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemRead")
                    .hasKind(SpanKind.INTERNAL)
                    .hasParent(trace.getSpan(2))
                    .hasEventsSatisfyingExactly(
                        event -> event.hasName("item.read.before"),
                        event -> event.hasName("item.read.after")),
            span ->
                span.hasName(
                        "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemProcess")
                    .hasKind(SpanKind.INTERNAL)
                    .hasParent(trace.getSpan(2))
                    .hasEventsSatisfyingExactly(
                        event -> event.hasName("item.process.before"),
                        event -> event.hasName("item.process.after")),
            span ->
                span.hasName("BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemRead")
                    .hasKind(SpanKind.INTERNAL)
                    .hasParent(trace.getSpan(2))
                    .hasEventsSatisfyingExactly(event -> event.hasName("item.read.before")),
            span ->
                span.hasName("BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemWrite")
                    .hasKind(SpanKind.INTERNAL)
                    .hasParent(trace.getSpan(2))
                    .hasEventsSatisfyingExactly(
                        event -> event.hasName("item.write.before"),
                        event -> event.hasName("item.write.after"))));
  }
}
@@ -0,0 +1,21 @@
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.event;

import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.ApplicationConfigRunner;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.context.support.ClassPathXmlApplicationContext;

class XmlConfigCustomSpanEventTest extends CustomSpanEventTest {

  @RegisterExtension
  static final ApplicationConfigRunner runner =
      new ApplicationConfigRunner(() -> new ClassPathXmlApplicationContext("spring-batch.xml"));

  XmlConfigCustomSpanEventTest() {
    super(runner);
  }
}
@@ -0,0 +1,320 @@
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.item;

import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.JobRunner;
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.assertj.core.api.AssertAccess;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.AbstractJob;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.repeat.policy.SimpleCompletionPolicy;
import org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate;

abstract class ItemLevelSpanTest {
  private final JobRunner runner;

  @RegisterExtension
  static final InstrumentationExtension testing = AgentInstrumentationExtension.create();

  ItemLevelSpanTest(JobRunner runner) {
    this.runner = runner;
  }

  @Test
  void shouldTraceItemReadProcessAndWriteCalls() {
    runner.runJob("itemsAndTaskletJob");

    testing.waitAndAssertTraces(
        trace -> {
          List<Consumer<SpanDataAssert>> assertions = new ArrayList<>();
          assertions.add(
              span -> span.hasName("BatchJob itemsAndTaskletJob").hasKind(SpanKind.INTERNAL));

          // item step
          assertions.add(
              span ->
                  span.hasName("BatchJob itemsAndTaskletJob.itemStep")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(0)));

          // chunk 1, items 0-5
          assertions.add(
              span ->
                  span.hasName("BatchJob itemsAndTaskletJob.itemStep.Chunk")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(1)));
          for (int i = 3; i <= 7; i++) {
            assertions.add(
                span ->
                    span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemRead")
                        .hasKind(SpanKind.INTERNAL)
                        .hasParent(trace.getSpan(2)));
          }
          for (int i = 8; i <= 12; i++) {
            assertions.add(
                span ->
                    span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemProcess")
                        .hasKind(SpanKind.INTERNAL)
                        .hasParent(trace.getSpan(2)));
          }
          assertions.add(
              span ->
                  span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemWrite")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(2)));

          // chunk 2, items 5-10
          assertions.add(
              span ->
                  span.hasName("BatchJob itemsAndTaskletJob.itemStep.Chunk")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(1)));
          for (int i = 15; i <= 19; i++) {
            assertions.add(
                span ->
                    span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemRead")
                        .hasKind(SpanKind.INTERNAL)
                        .hasParent(trace.getSpan(14)));
          }
          for (int i = 20; i <= 24; i++) {
            assertions.add(
                span ->
                    span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemProcess")
                        .hasKind(SpanKind.INTERNAL)
                        .hasParent(trace.getSpan(14)));
          }
          assertions.add(
              span ->
                  span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemWrite")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(14)));

          // chunk 3, items 10-13
          assertions.add(
              span ->
                  span.hasName("BatchJob itemsAndTaskletJob.itemStep.Chunk")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(1)));
          // +1 for last read returning end of stream marker
          for (int i = 27; i <= 30; i++) {
            assertions.add(
                span ->
                    span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemRead")
                        .hasKind(SpanKind.INTERNAL)
                        .hasParent(trace.getSpan(26)));
          }
          for (int i = 31; i <= 33; i++) {
            assertions.add(
                span ->
                    span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemProcess")
                        .hasKind(SpanKind.INTERNAL)
                        .hasParent(trace.getSpan(26)));
          }
          assertions.add(
              span ->
                  span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemWrite")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(26)));

          // tasklet step
          assertions.add(
              span ->
                  span.hasName("BatchJob itemsAndTaskletJob.taskletStep")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(0)));
          assertions.add(
              span ->
                  span.hasName("BatchJob itemsAndTaskletJob.taskletStep.Tasklet")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(35)));

          trace.hasSpansSatisfyingExactly(assertions);
        });
  }

  @Test
  void shouldTraceAllItemOperationsOnAParallelItemsJob() {
    runner.runJob("parallelItemsJob");

    testing.waitAndAssertTraces(
        trace -> {
          // as chunks are processed in parallel we need to sort them to guarantee that they are
          // in the expected order
          // firstly compute child span count for each chunk, we'll sort chunks from larger to
          // smaller based on child count

          List<SpanData> spans = AssertAccess.getActual(trace);
          Map<SpanData, Long> childCount = new HashMap<>();

          for (SpanData span : spans) {
            if (span.getName().equals("BatchJob parallelItemsJob.parallelItemsStep.Chunk")) {
              childCount.put(
                  span,
                  spans.stream()
                      .filter(it -> it.getParentSpanId().equals(span.getSpanId()))
                      .count());
            }
          }

          spans.sort(
              Comparator.comparingLong(
                  it -> {
                    // job span is first
                    if (it.getName().equals("BatchJob parallelItemsJob")) {
                      return 0;
                    }
                    // step span is second
                    if (it.getName().equals("BatchJob parallelItemsJob.parallelItemsStep")) {
                      return 1;
                    }

                    // find the chunk this span belongs to
                    SpanData chunkSpan = it;
                    while (chunkSpan != null
                        && !chunkSpan
                            .getName()
                            .equals("BatchJob parallelItemsJob.parallelItemsStep.Chunk")) {
                      SpanData currentChunkSpan = chunkSpan;
                      chunkSpan =
                          spans.stream()
                              .filter(
                                  candidate ->
                                      candidate
                                          .getSpanId()
                                          .equals(currentChunkSpan.getParentSpanId()))
                              .findFirst()
                              .orElse(null);
                    }
                    if (chunkSpan != null) {
                      // sort larger chunks first
                      return 100 - childCount.get(chunkSpan);
                    }
                    throw new IllegalStateException("item spans should have a parent chunk span");
                  }));

          List<Consumer<SpanDataAssert>> assertions = new ArrayList<>();
          assertions.add(
              span -> span.hasName("BatchJob parallelItemsJob").hasKind(SpanKind.INTERNAL));
          assertions.add(
              span ->
                  span.hasName("BatchJob parallelItemsJob.parallelItemsStep")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(0)));

          // chunk 1, first two items; thread 1
          assertions.add(
              span ->
                  span.hasName("BatchJob parallelItemsJob.parallelItemsStep.Chunk")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(1)));
          for (int i = 3; i <= 4; i++) {
            assertions.add(
                span ->
                    span.hasName("BatchJob parallelItemsJob.parallelItemsStep.ItemRead")
                        .hasKind(SpanKind.INTERNAL)
                        .hasParent(trace.getSpan(2)));
          }
          for (int i = 5; i <= 6; i++) {
            assertions.add(
                span ->
                    span.hasName("BatchJob parallelItemsJob.parallelItemsStep.ItemProcess")
                        .hasKind(SpanKind.INTERNAL)
                        .hasParent(trace.getSpan(2)));
          }
          assertions.add(
              span ->
                  span.hasName("BatchJob parallelItemsJob.parallelItemsStep.ItemWrite")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(2)));

          // chunk 2, items 3 & 4; thread 2
          assertions.add(
              span ->
                  span.hasName("BatchJob parallelItemsJob.parallelItemsStep.Chunk")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(1)));
          for (int i = 9; i <= 10; i++) {
            assertions.add(
                span ->
                    span.hasName("BatchJob parallelItemsJob.parallelItemsStep.ItemRead")
                        .hasKind(SpanKind.INTERNAL)
                        .hasParent(trace.getSpan(8)));
          }
          for (int i = 11; i <= 12; i++) {
            assertions.add(
                span ->
                    span.hasName("BatchJob parallelItemsJob.parallelItemsStep.ItemProcess")
                        .hasKind(SpanKind.INTERNAL)
                        .hasParent(trace.getSpan(8)));
          }
          assertions.add(
              span ->
                  span.hasName("BatchJob parallelItemsJob.parallelItemsStep.ItemWrite")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(8)));

          // chunk 3, 5th item; thread 1
          assertions.add(
              span ->
                  span.hasName("BatchJob parallelItemsJob.parallelItemsStep.Chunk")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(1)));
          // +1 for last read returning end of stream marker
          for (int i = 15; i <= 16; i++) {
            assertions.add(
                span ->
                    span.hasName("BatchJob parallelItemsJob.parallelItemsStep.ItemRead")
                        .hasKind(SpanKind.INTERNAL)
                        .hasParent(trace.getSpan(14)));
          }
          assertions.add(
              span ->
                  span.hasName("BatchJob parallelItemsJob.parallelItemsStep.ItemProcess")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(14)));
          assertions.add(
              span ->
                  span.hasName("BatchJob parallelItemsJob.parallelItemsStep.ItemWrite")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(14)));

          trace.hasSpansSatisfyingExactly(assertions);
        });
  }

  protected void postProcessParallelItemsJob(String jobName, Job job) {
    if ("parallelItemsJob".equals(jobName)) {
      Step step = ((AbstractJob) job).getStep("parallelItemsStep");
      TaskletStep taskletStep = (TaskletStep) step;
      // explicitly set the number of chunks we expect from this test to ensure we always get
      // the same number of spans
      try {
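        // TaskletStep does not expose a getter for its stepOperations field, so read it
        // reflectively before overriding the completion policy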
        Field field = taskletStep.getClass().getDeclaredField("stepOperations");
        field.setAccessible(true);
        TaskExecutorRepeatTemplate stepOperations =
            (TaskExecutorRepeatTemplate) field.get(taskletStep);
        stepOperations.setCompletionPolicy(new SimpleCompletionPolicy(3));
      } catch (Exception e) {
        throw new IllegalStateException(e);
      }
    }
  }
}
@@ -0,0 +1,25 @@
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.item;

import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.ApplicationConfigRunner;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.context.support.ClassPathXmlApplicationContext;

class JavaConfigItemLevelSpanTest extends ItemLevelSpanTest {
  static JavaConfigItemLevelSpanTest instance;

  @RegisterExtension
  static final ApplicationConfigRunner runner =
      new ApplicationConfigRunner(
          () -> new ClassPathXmlApplicationContext("spring-batch.xml"),
          (jobName, job) -> instance.postProcessParallelItemsJob(jobName, job));

  JavaConfigItemLevelSpanTest() {
    super(runner);
    instance = this;
  }
}
@@ -0,0 +1,135 @@
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.item;

import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.JavaxBatchConfigRunner;
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

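// unlike the other item-level tests this class does not extend ItemLevelSpanTest: with the
// javax.batch (JSR-352) configuration item reads and processes are expected to alternate within a
// chunk, so the span order asserted here is different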
class JsrConfigItemLevelSpanTest {

  @RegisterExtension
  static final InstrumentationExtension testing = AgentInstrumentationExtension.create();

  @RegisterExtension static final JavaxBatchConfigRunner runner = new JavaxBatchConfigRunner();

  @Test
  void shouldTraceItemReadProcessAndWriteCalls() {
    runner.runJob("itemsAndTaskletJob");

    testing.waitAndAssertTraces(
        trace -> {
          List<Consumer<SpanDataAssert>> assertions = new ArrayList<>();
          assertions.add(
              span -> span.hasName("BatchJob itemsAndTaskletJob").hasKind(SpanKind.INTERNAL));

          // item step
          assertions.add(
              span ->
                  span.hasName("BatchJob itemsAndTaskletJob.itemStep")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(0)));

          // chunk 1, items 0-5
          assertions.add(
              span ->
                  span.hasName("BatchJob itemsAndTaskletJob.itemStep.Chunk")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(1)));

          for (int i = 3; i <= 11; i += 2) {
            assertions.add(
                span ->
                    span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemRead")
                        .hasKind(SpanKind.INTERNAL)
                        .hasParent(trace.getSpan(2)));
            assertions.add(
                span ->
                    span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemProcess")
                        .hasKind(SpanKind.INTERNAL)
                        .hasParent(trace.getSpan(2)));
          }

          assertions.add(
              span ->
                  span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemWrite")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(2)));

          // chunk 2, items 6-10
          assertions.add(
              span ->
                  span.hasName("BatchJob itemsAndTaskletJob.itemStep.Chunk")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(1)));
          for (int i = 15; i <= 23; i += 2) {
            assertions.add(
                span ->
                    span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemRead")
                        .hasKind(SpanKind.INTERNAL)
                        .hasParent(trace.getSpan(14)));
            assertions.add(
                span ->
                    span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemProcess")
                        .hasKind(SpanKind.INTERNAL)
                        .hasParent(trace.getSpan(14)));
          }
          assertions.add(
              span ->
                  span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemWrite")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(14)));

          // chunk 3, items 11-13
          assertions.add(
              span ->
                  span.hasName("BatchJob itemsAndTaskletJob.itemStep.Chunk")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(1)));
          for (int i = 27; i <= 32; i += 2) {
            assertions.add(
                span ->
                    span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemRead")
                        .hasKind(SpanKind.INTERNAL)
                        .hasParent(trace.getSpan(26)));
            assertions.add(
                span ->
                    span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemProcess")
                        .hasKind(SpanKind.INTERNAL)
                        .hasParent(trace.getSpan(26)));
          }

          // last read returning end of stream marker
          assertions.add(span -> span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemRead"));
          assertions.add(
              span ->
                  span.hasName("BatchJob itemsAndTaskletJob.itemStep.ItemWrite")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(26)));

          // tasklet step
          assertions.add(
              span ->
                  span.hasName("BatchJob itemsAndTaskletJob.taskletStep")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(0)));
          assertions.add(
              span ->
                  span.hasName("BatchJob itemsAndTaskletJob.taskletStep.Tasklet")
                      .hasKind(SpanKind.INTERNAL)
                      .hasParent(trace.getSpan(35)));

          trace.hasSpansSatisfyingExactly(assertions);
        });
  }
}
@@ -0,0 +1,26 @@
/*
 * Copyright The OpenTelemetry Authors
 * SPDX-License-Identifier: Apache-2.0
 */

package io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.item;

import io.opentelemetry.javaagent.instrumentation.spring.batch.v3_0.runner.ApplicationConfigRunner;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.context.support.ClassPathXmlApplicationContext;

class XmlConfigItemLevelSpanTest extends ItemLevelSpanTest {

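  // the runner extension is static and its job post-processor is created before any test instance
  // exists, so the current instance is captured here for the postProcessParallelItemsJob callback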
  static XmlConfigItemLevelSpanTest instance;

  @RegisterExtension
  static final ApplicationConfigRunner runner =
      new ApplicationConfigRunner(
          () -> new ClassPathXmlApplicationContext("spring-batch.xml"),
          (jobName, job) -> instance.postProcessParallelItemsJob(jobName, job));

  XmlConfigItemLevelSpanTest() {
    super(runner);
    instance = this;
  }
}