Add tests that verify that Span.current() works in spring batch listeners (#3758)

This commit is contained in:
Mateusz Rzeszutek 2021-08-03 18:17:03 +02:00 committed by GitHub
parent 455574635f
commit f960456240
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 659 additions and 1 deletions

View File

@ -32,9 +32,10 @@ tasks {
val testItemLevelSpan by registering(Test::class) {
filter {
includeTestsMatching("*ItemLevelSpanTest")
includeTestsMatching("*CustomSpanEventTest")
isFailOnNoMatchingTests = false
}
include("**/*ItemLevelSpanTest.*")
include("**/*ItemLevelSpanTest.*", "**/*CustomSpanEventTest.*")
jvmArgs("-Dotel.instrumentation.spring-batch.item.enabled=true")
}
@ -44,11 +45,13 @@ tasks {
filter {
excludeTestsMatching("*ChunkRootSpanTest")
excludeTestsMatching("*ItemLevelSpanTest")
excludeTestsMatching("*CustomSpanEventTest")
isFailOnNoMatchingTests = false
}
}
withType<Test>().configureEach {
systemProperty("testLatestDeps", findProperty("testLatestDeps"))
jvmArgs("-Dotel.instrumentation.spring-batch.enabled=true")
}
}

View File

@ -0,0 +1,219 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static java.util.Collections.emptyMap
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import org.springframework.batch.core.JobParameter
import org.springframework.context.ConfigurableApplicationContext
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.context.support.ClassPathXmlApplicationContext
abstract class CustomSpanEventTest extends AgentInstrumentationSpecification {
static final boolean VERSION_GREATER_THAN_4_0 = Boolean.getBoolean("testLatestDeps")
abstract runJob(String jobName, Map<String, JobParameter> params = emptyMap())
def "should be able to call Span.current() and add custom info to spans"() {
when:
runJob("customSpanEventsItemsJob")
then:
assertTraces(1) {
trace(0, 7) {
span(0) {
name "BatchJob customSpanEventsItemsJob"
kind INTERNAL
events(2)
event(0) {
eventName "job.before"
}
event(1) {
eventName "job.after"
}
}
span(1) {
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep"
kind INTERNAL
childOf span(0)
// CompositeChunkListener has broken ordering that causes listeners that do not override order() to appear first at all times
// because of that a custom ChunkListener will always see a Step span when using spring-batch versions [3, 4)
// that bug was fixed in 4.0
if (VERSION_GREATER_THAN_4_0) {
events(2)
event(0) {
eventName "step.before"
}
event(1) {
eventName "step.after"
}
} else {
events(4)
event(0) {
eventName "step.before"
}
event(1) {
eventName "chunk.before"
}
event(2) {
eventName "chunk.after"
}
event(3) {
eventName "step.after"
}
}
}
span(2) {
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.Chunk"
kind INTERNAL
childOf span(1)
// CompositeChunkListener has broken ordering that causes listeners that do not override order() to appear first at all times
// because of that a custom ChunkListener will always see a Step span when using spring-batch versions [3, 4)
// that bug was fixed in 4.0
if (VERSION_GREATER_THAN_4_0) {
events(2)
event(0) {
eventName "chunk.before"
}
event(1) {
eventName "chunk.after"
}
} else {
events(0)
}
}
itemSpans(it)
}
}
}
// Spring Batch Java & XML configs have slightly different ordering from JSR config
protected void itemSpans(TraceAssert trace) {
trace.with {
span(3) {
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemRead"
kind INTERNAL
childOf span(2)
events(2)
event(0) {
eventName "item.read.before"
}
event(1) {
eventName "item.read.after"
}
}
// second read that returns null and signifies end of stream
span(4) {
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemRead"
kind INTERNAL
childOf span(2)
// spring batch does not call ItemReadListener after() methods when read() returns end-of-stream
events(1)
event(0) {
eventName "item.read.before"
}
}
span(5) {
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemProcess"
kind INTERNAL
childOf span(2)
events(2)
event(0) {
eventName "item.process.before"
}
event(1) {
eventName "item.process.after"
}
}
span(6) {
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemWrite"
kind INTERNAL
childOf span(2)
events(2)
event(0) {
eventName "item.write.before"
}
event(1) {
eventName "item.write.after"
}
}
}
}
}
class JavaConfigCustomSpanEventTest extends CustomSpanEventTest implements ApplicationConfigTrait {
@Override
ConfigurableApplicationContext createApplicationContext() {
new AnnotationConfigApplicationContext(SpringBatchApplication)
}
}
class XmlConfigCustomSpanEventTest extends CustomSpanEventTest implements ApplicationConfigTrait {
@Override
ConfigurableApplicationContext createApplicationContext() {
new ClassPathXmlApplicationContext("spring-batch.xml")
}
}
class JsrConfigCustomSpanEventTest extends CustomSpanEventTest implements JavaxBatchConfigTrait {
// JSR config has different item span ordering
protected void itemSpans(TraceAssert trace) {
trace.with {
span(3) {
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemRead"
kind INTERNAL
childOf span(2)
events(2)
event(0) {
eventName "item.read.before"
}
event(1) {
eventName "item.read.after"
}
}
span(4) {
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemProcess"
kind INTERNAL
childOf span(2)
events(2)
event(0) {
eventName "item.process.before"
}
event(1) {
eventName "item.process.after"
}
}
// second read that returns null and signifies end of stream
span(5) {
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemRead"
kind INTERNAL
childOf span(2)
// spring batch does not call ItemReadListener after() methods when read() returns end-of-stream
events(1)
event(0) {
eventName "item.read.before"
}
}
span(6) {
name "BatchJob customSpanEventsItemsJob.customSpanEventsItemStep.ItemWrite"
kind INTERNAL
childOf span(2)
events(2)
event(0) {
eventName "item.write.before"
}
event(1) {
eventName "item.write.after"
}
}
}
}
}

View File

@ -23,6 +23,13 @@ import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.task.AsyncTaskExecutor
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import springbatch.CustomEventChunkListener
import springbatch.CustomEventItemProcessListener
import springbatch.CustomEventItemReadListener
import springbatch.CustomEventItemWriteListener
import springbatch.CustomEventJobListener
import springbatch.CustomEventStepListener
import springbatch.SingleItemReader
import springbatch.TestDecider
import springbatch.TestItemProcessor
import springbatch.TestItemReader
@ -270,4 +277,28 @@ class SpringBatchApplication {
ItemReader<String> partitionedItemReader() {
new TestPartitionedItemReader()
}
// custom span events items job
@Bean
Job customSpanEventsItemsJob() {
jobs.get("customSpanEventsItemsJob")
.start(customSpanEventsItemStep())
.listener(new CustomEventJobListener())
.build()
}
@Bean
Step customSpanEventsItemStep() {
steps.get("customSpanEventsItemStep")
.chunk(5)
.reader(new SingleItemReader())
.processor(itemProcessor())
.writer(itemWriter())
.listener(new CustomEventStepListener())
.listener(new CustomEventChunkListener())
.listener(new CustomEventItemReadListener())
.listener(new CustomEventItemProcessListener())
.listener(new CustomEventItemWriteListener())
.build()
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package jsr
import io.opentelemetry.api.trace.Span
import javax.batch.api.chunk.listener.ChunkListener
class CustomEventChunkListener implements ChunkListener {
@Override
void beforeChunk() throws Exception {
Span.current().addEvent("chunk.before")
}
@Override
void onError(Exception e) throws Exception {
Span.current().addEvent("chunk.error")
}
@Override
void afterChunk() throws Exception {
Span.current().addEvent("chunk.after")
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package jsr
import io.opentelemetry.api.trace.Span
import javax.batch.api.chunk.listener.ItemProcessListener
class CustomEventItemProcessListener implements ItemProcessListener {
@Override
void beforeProcess(Object o) throws Exception {
Span.current().addEvent("item.process.before")
}
@Override
void afterProcess(Object o, Object o1) throws Exception {
Span.current().addEvent("item.process.after")
}
@Override
void onProcessError(Object o, Exception e) throws Exception {
Span.current().addEvent("item.process.error")
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package jsr
import io.opentelemetry.api.trace.Span
import javax.batch.api.chunk.listener.ItemReadListener
class CustomEventItemReadListener implements ItemReadListener {
@Override
void beforeRead() throws Exception {
Span.current().addEvent("item.read.before")
}
@Override
void afterRead(Object o) throws Exception {
Span.current().addEvent("item.read.after")
}
@Override
void onReadError(Exception e) throws Exception {
Span.current().addEvent("item.read.error")
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package jsr
import io.opentelemetry.api.trace.Span
import javax.batch.api.chunk.listener.ItemWriteListener
class CustomEventItemWriteListener implements ItemWriteListener {
@Override
void beforeWrite(List<Object> list) throws Exception {
Span.current().addEvent("item.write.before")
}
@Override
void afterWrite(List<Object> list) throws Exception {
Span.current().addEvent("item.write.after")
}
@Override
void onWriteError(List<Object> list, Exception e) throws Exception {
Span.current().addEvent("item.write.error")
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package jsr
import io.opentelemetry.api.trace.Span
import javax.batch.api.listener.JobListener
class CustomEventJobListener implements JobListener {
@Override
void beforeJob() throws Exception {
Span.current().addEvent("job.before")
}
@Override
void afterJob() throws Exception {
Span.current().addEvent("job.after")
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package jsr
import io.opentelemetry.api.trace.Span
import javax.batch.api.listener.StepListener
class CustomEventStepListener implements StepListener {
@Override
void beforeStep() throws Exception {
Span.current().addEvent("step.before")
}
@Override
void afterStep() throws Exception {
Span.current().addEvent("step.after")
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package jsr
import java.util.concurrent.atomic.AtomicReference
import javax.batch.api.chunk.ItemReader
class SingleItemReader implements ItemReader {
final AtomicReference<String> item = new AtomicReference<>("42")
@Override
void open(Serializable serializable) throws Exception {
}
@Override
void close() throws Exception {
}
@Override
Object readItem() throws Exception {
return item.getAndSet(null)
}
@Override
Serializable checkpointInfo() throws Exception {
return null
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package springbatch
import io.opentelemetry.api.trace.Span
import org.springframework.batch.core.ChunkListener
import org.springframework.batch.core.scope.context.ChunkContext
class CustomEventChunkListener implements ChunkListener {
@Override
void beforeChunk(ChunkContext context) {
Span.current().addEvent("chunk.before")
}
@Override
void afterChunk(ChunkContext context) {
Span.current().addEvent("chunk.after")
}
@Override
void afterChunkError(ChunkContext context) {
Span.current().addEvent("chunk.error")
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package springbatch
import io.opentelemetry.api.trace.Span
import org.springframework.batch.core.ItemProcessListener
class CustomEventItemProcessListener implements ItemProcessListener<String, Integer> {
@Override
void beforeProcess(String item) {
Span.current().addEvent("item.process.before")
}
@Override
void afterProcess(String item, Integer result) {
Span.current().addEvent("item.process.after")
}
@Override
void onProcessError(String item, Exception e) {
Span.current().addEvent("item.process.error")
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package springbatch
import io.opentelemetry.api.trace.Span
import org.springframework.batch.core.ItemReadListener
class CustomEventItemReadListener implements ItemReadListener<String> {
@Override
void beforeRead() {
Span.current().addEvent("item.read.before")
}
@Override
void afterRead(String item) {
Span.current().addEvent("item.read.after")
}
@Override
void onReadError(Exception ex) {
Span.current().addEvent("item.read.error")
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package springbatch
import io.opentelemetry.api.trace.Span
import org.springframework.batch.core.ItemWriteListener
class CustomEventItemWriteListener implements ItemWriteListener<Integer> {
@Override
void beforeWrite(List<? extends Integer> items) {
Span.current().addEvent("item.write.before")
}
@Override
void afterWrite(List<? extends Integer> items) {
Span.current().addEvent("item.write.after")
}
@Override
void onWriteError(Exception exception, List<? extends Integer> items) {
Span.current().addEvent("item.write.error")
}
}

View File

@ -0,0 +1,22 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package springbatch
import io.opentelemetry.api.trace.Span
import org.springframework.batch.core.JobExecution
import org.springframework.batch.core.JobExecutionListener
class CustomEventJobListener implements JobExecutionListener {
@Override
void beforeJob(JobExecution jobExecution) {
Span.current().addEvent("job.before")
}
@Override
void afterJob(JobExecution jobExecution) {
Span.current().addEvent("job.after")
}
}

View File

@ -0,0 +1,24 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package springbatch
import io.opentelemetry.api.trace.Span
import org.springframework.batch.core.ExitStatus
import org.springframework.batch.core.StepExecution
import org.springframework.batch.core.StepExecutionListener
class CustomEventStepListener implements StepExecutionListener {
@Override
void beforeStep(StepExecution stepExecution) {
Span.current().addEvent("step.before")
}
@Override
ExitStatus afterStep(StepExecution stepExecution) {
Span.current().addEvent("step.after")
return null
}
}

View File

@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package springbatch
import java.util.concurrent.atomic.AtomicReference
import org.springframework.batch.item.ItemReader
class SingleItemReader implements ItemReader<String> {
final AtomicReference<String> item = new AtomicReference<>("42")
@Override
String read() {
return item.getAndSet(null)
}
}

View File

@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<job id="customSpanEventsItemsJob" xmlns="http://xmlns.jcp.org/xml/ns/javaee" version="1.0">
<listeners>
<listener ref="jsr.CustomEventJobListener"/>
</listeners>
<step id="customSpanEventsItemStep">
<listeners>
<listener ref="jsr.CustomEventStepListener"/>
<listener ref="jsr.CustomEventChunkListener"/>
<listener ref="jsr.CustomEventItemReadListener"/>
<listener ref="jsr.CustomEventItemProcessListener"/>
<listener ref="jsr.CustomEventItemWriteListener"/>
</listeners>
<chunk item-count="5">
<reader ref="jsr.SingleItemReader"/>
<processor ref="jsr.TestItemProcessor"/>
<writer ref="jsr.TestItemWriter"/>
</chunk>
</step>
</job>

View File

@ -93,6 +93,40 @@
<bean id="testPartitioner" class="springbatch.TestPartitioner"/>
<bean id="testPartitionedItemReader" class="springbatch.TestPartitionedItemReader"/>
<b:job id="customSpanEventsItemsJob">
<b:step id="customSpanEventsItemStep">
<b:tasklet>
<b:chunk commit-interval="5" processor="itemProcessor" writer="itemWriter">
<b:reader>
<bean class="springbatch.SingleItemReader"/>
</b:reader>
<b:listeners>
<b:listener>
<bean class="springbatch.CustomEventStepListener"/>
</b:listener>
<b:listener>
<bean class="springbatch.CustomEventChunkListener"/>
</b:listener>
<b:listener>
<bean class="springbatch.CustomEventItemReadListener"/>
</b:listener>
<b:listener>
<bean class="springbatch.CustomEventItemProcessListener"/>
</b:listener>
<b:listener>
<bean class="springbatch.CustomEventItemWriteListener"/>
</b:listener>
</b:listeners>
</b:chunk>
</b:tasklet>
</b:step>
<b:listeners>
<b:listener>
<bean class="springbatch.CustomEventJobListener"/>
</b:listener>
</b:listeners>
</b:job>
<bean id="itemReader" class="springbatch.TestItemReader"/>
<bean id="syncItemReader" class="springbatch.TestSyncItemReader">
<constructor-arg name="max" value="5"/>

View File

@ -46,6 +46,11 @@ class SpanAssert {
assertDefaults()
}
void events(int expectedCount) {
assert span.totalRecordedEvents == expectedCount
assert span.events.size() == expectedCount
}
void event(int index, @ClosureParams(value = SimpleType, options = ['io.opentelemetry.instrumentation.test.asserts.EventAssert']) @DelegatesTo(value = EventAssert, strategy = Closure.DELEGATE_FIRST) Closure spec) {
if (index >= span.events.size()) {
throw new ArrayIndexOutOfBoundsException(index)