Separate out core reactor instrumentation (#650)

* Separate out core reactor instrumentation
This commit is contained in:
Anuraag Agrawal 2020-07-09 14:48:15 +09:00 committed by GitHub
parent c79be44de2
commit 2ef6aac61c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 535 additions and 29 deletions

View File

@ -0,0 +1,56 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
plugins {
id "com.github.johnrengelman.shadow"
}
ext {
minJavaVersionForTests = JavaVersion.VERSION_1_8
}
group = 'io.opentelemetry.instrumentation'
apply from: "$rootDir/gradle/java.gradle"
apply from: "$rootDir/gradle/publish.gradle"
apply plugin: 'org.unbroken-dome.test-sets'
testSets {
latestDepTest {
dirName = 'test'
}
}
dependencies {
api group: 'io.projectreactor', name: 'reactor-core', version: '3.1.0.RELEASE'
implementation deps.opentelemetryApi
implementation deps.slf4j
testImplementation project(':testing')
latestDepTestImplementation group: 'io.projectreactor', name: 'reactor-core', version: '3.+'
// Looks like later versions on reactor need this dependency for some reason even though it is marked as optional.
latestDepTestImplementation group: 'io.micrometer', name: 'micrometer-core', version: '1.+'
}
shadowJar {
archiveClassifier = 'agent'
configurations = []
relocate 'io.opentelemetry.instrumentation.reactor', 'io.opentelemetry.auto.instrumentation.reactor.shaded'
}

View File

@ -14,31 +14,39 @@
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.reactor;
package io.opentelemetry.instrumentation.reactor;
import io.grpc.Context;
import io.opentelemetry.OpenTelemetry;
import io.opentelemetry.context.ContextUtils;
import io.opentelemetry.context.Scope;
import io.opentelemetry.trace.Tracer;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Fuseable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.GroupedFlux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ParallelFlux;
public class TracingPublishers {
private static final Logger log = LoggerFactory.getLogger(TracingPublishers.class);
private static final Tracer TRACER =
OpenTelemetry.getTracerProvider().get("io.opentelemetry.auto.reactor");
/**
* Registers a hook that applies to every operator, propagating {@link Context} to downstream
* callbacks to ensure spans in the {@link Context} are available throughout the lifetime of a
* reactive stream. This should generally be called in a static initializer block in your
* application.
*/
public static void registerOnEachOperator() {
Hooks.onEachOperator(TracingPublishers.class.getName(), TracingPublishers::wrap);
}
/** Unregisters the hook registered by {@link #registerOnEachOperator()}. */
public static void resetOnEachOperator() {
Hooks.resetOnEachOperator(TracingPublishers.class.getName());
}
/**
* Instead of using {@link reactor.core.publisher.Operators#lift} (available in reactor 3.1) or

View File

@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.opentelemetry.auto.instrumentation.reactor;
package io.opentelemetry.instrumentation.reactor;
import io.opentelemetry.context.ContextUtils;
import io.opentelemetry.context.Scope;

View File

@ -0,0 +1,443 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.instrumentation.reactor
import io.opentelemetry.OpenTelemetry
import io.opentelemetry.auto.test.InstrumentationTestRunner
import io.opentelemetry.auto.test.utils.TraceUtils
import io.opentelemetry.trace.DefaultSpan
import io.opentelemetry.trace.Tracer
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import spock.lang.Shared
import java.time.Duration
import static io.opentelemetry.auto.test.utils.TraceUtils.basicSpan
class ReactorCoreTest extends InstrumentationTestRunner {
public static final String EXCEPTION_MESSAGE = "test exception"
private static final Tracer TEST_TRACER =
OpenTelemetry.getTracerProvider().get("io.opentelemetry.auto.reactor")
def setupSpec() {
TracingPublishers.registerOnEachOperator()
}
def cleanupSpec() {
TracingPublishers.resetOnEachOperator()
}
@Shared
def addOne = { i ->
addOneFunc(i)
}
@Shared
def addTwo = { i ->
addTwoFunc(i)
}
@Shared
def throwException = {
throw new RuntimeException(EXCEPTION_MESSAGE)
}
def "Publisher '#name' test"() {
when:
def result = runUnderTrace(publisherSupplier)
then:
result == expected
and:
assertTraces(1) {
trace(0, workSpans + 2) {
span(0) {
operationName "trace-parent"
parent()
attributes {
}
}
span(1) {
operationName "publisher-parent"
childOf span(0)
attributes {
}
}
basicSpan(it, 1, "publisher-parent", span(0))
for (int i = 0; i < workSpans; i++) {
span(i + 2) {
operationName "add one"
childOf span(1)
attributes {
}
}
}
}
}
where:
name | expected | workSpans | publisherSupplier
"basic mono" | 2 | 1 | { -> Mono.just(1).map(addOne) }
"two operations mono" | 4 | 2 | { -> Mono.just(2).map(addOne).map(addOne) }
"delayed mono" | 4 | 1 | { ->
Mono.just(3).delayElement(Duration.ofMillis(100)).map(addOne)
}
"delayed twice mono" | 6 | 2 | { ->
Mono.just(4).delayElement(Duration.ofMillis(100)).map(addOne).delayElement(Duration.ofMillis(100)).map(addOne)
}
"basic flux" | [6, 7] | 2 | { -> Flux.fromIterable([5, 6]).map(addOne) }
"two operations flux" | [8, 9] | 4 | { ->
Flux.fromIterable([6, 7]).map(addOne).map(addOne)
}
"delayed flux" | [8, 9] | 2 | { ->
Flux.fromIterable([7, 8]).delayElements(Duration.ofMillis(100)).map(addOne)
}
"delayed twice flux" | [10, 11] | 4 | { ->
Flux.fromIterable([8, 9]).delayElements(Duration.ofMillis(100)).map(addOne).delayElements(Duration.ofMillis(100)).map(addOne)
}
"mono from callable" | 12 | 2 | { ->
Mono.fromCallable({ addOneFunc(10) }).map(addOne)
}
}
def "Publisher error '#name' test"() {
when:
runUnderTrace(publisherSupplier)
then:
def exception = thrown RuntimeException
exception.message == EXCEPTION_MESSAGE
and:
assertTraces(1) {
trace(0, 2) {
span(0) {
operationName "trace-parent"
errored true
parent()
attributes {
errorAttributes(RuntimeException, EXCEPTION_MESSAGE)
}
}
// It's important that we don't attach errors at the Reactor level so that we don't
// impact the spans on reactor integrations such as netty and lettuce, as reactor is
// more of a context propagation mechanism than something we would be tracking for
// errors this is ok.
basicSpan(it, 1, "publisher-parent", span(0))
}
}
where:
name | publisherSupplier
"mono" | { -> Mono.error(new RuntimeException(EXCEPTION_MESSAGE)) }
"flux" | { -> Flux.error(new RuntimeException(EXCEPTION_MESSAGE)) }
}
def "Publisher step '#name' test"() {
when:
runUnderTrace(publisherSupplier)
then:
def exception = thrown RuntimeException
exception.message == EXCEPTION_MESSAGE
and:
assertTraces(1) {
trace(0, workSpans + 2) {
span(0) {
operationName "trace-parent"
errored true
parent()
attributes {
errorAttributes(RuntimeException, EXCEPTION_MESSAGE)
}
}
// It's important that we don't attach errors at the Reactor level so that we don't
// impact the spans on reactor integrations such as netty and lettuce, as reactor is
// more of a context propagation mechanism than something we would be tracking for
// errors this is ok.
basicSpan(it, 1, "publisher-parent", span(0))
for (int i = 0; i < workSpans; i++) {
span(i + 2) {
operationName "add one"
childOf span(1)
attributes {
}
}
}
}
}
where:
name | workSpans | publisherSupplier
"basic mono failure" | 1 | { -> Mono.just(1).map(addOne).map({ throwException() }) }
"basic flux failure" | 1 | { ->
Flux.fromIterable([5, 6]).map(addOne).map({ throwException() })
}
}
def "Publisher '#name' cancel"() {
when:
cancelUnderTrace(publisherSupplier)
then:
assertTraces(1) {
trace(0, 2) {
span(0) {
operationName "trace-parent"
parent()
attributes {
}
}
basicSpan(it, 1, "publisher-parent", span(0))
}
}
where:
name | publisherSupplier
"basic mono" | { -> Mono.just(1) }
"basic flux" | { -> Flux.fromIterable([5, 6]) }
}
def "Publisher chain spans have the correct parent for '#name'"() {
when:
runUnderTrace(publisherSupplier)
then:
assertTraces(1) {
trace(0, workSpans + 2) {
span(0) {
operationName "trace-parent"
parent()
attributes {
}
}
basicSpan(it, 1, "publisher-parent", span(0))
for (int i = 0; i < workSpans; i++) {
span(i + 2) {
operationName "add one"
childOf span(1)
attributes {
}
}
}
}
}
where:
name | workSpans | publisherSupplier
"basic mono" | 3 | { ->
Mono.just(1).map(addOne).map(addOne).then(Mono.just(1).map(addOne))
}
"basic flux" | 5 | { ->
Flux.fromIterable([5, 6]).map(addOne).map(addOne).then(Mono.just(1).map(addOne))
}
}
def "Publisher chain spans have the correct parents from assembly time '#name'"() {
when:
runUnderTrace {
// The "add one" operations in the publisher created here should be children of the publisher-parent
Publisher<Integer> publisher = publisherSupplier()
def tracer = OpenTelemetry.getTracerProvider().get("test")
def intermediate = tracer.spanBuilder("intermediate").startSpan()
// After this activation, the "add two" operations below should be children of this span
def scope = tracer.withSpan(intermediate)
try {
if (publisher instanceof Mono) {
return ((Mono) publisher).map(addTwo)
} else if (publisher instanceof Flux) {
return ((Flux) publisher).map(addTwo)
}
throw new IllegalStateException("Unknown publisher type")
} finally {
intermediate.end()
scope.close()
}
}
then:
assertTraces(1) {
trace(0, (workItems * 2) + 3) {
span(0) {
operationName "trace-parent"
parent()
attributes {
}
}
basicSpan(it, 1, "publisher-parent", span(0))
basicSpan(it, 2, "intermediate", span(1))
for (int i = 0; i < workItems; i++) {
span(3 + i) {
operationName "add two"
childOf span(2)
attributes {
}
}
}
for (int i = 0; i < workItems; i++) {
span(3 + workItems + i) {
operationName "add one"
childOf span(1)
attributes {
}
}
}
}
}
where:
name | workItems | publisherSupplier
"basic mono" | 1 | { -> Mono.just(1).map(addOne) }
"basic flux" | 2 | { -> Flux.fromIterable([1, 2]).map(addOne) }
}
def "Publisher chain spans can have the parent removed at assembly time '#name'"() {
when:
runUnderTrace {
// The operations in the publisher created here all end up children of the publisher-parent
Publisher<Integer> publisher = publisherSupplier()
// After this activation, all additions to the assembly will create new traces
def tracer = OpenTelemetry.getTracerProvider().get("test")
def scope = tracer.withSpan(DefaultSpan.getInvalid())
try {
if (publisher instanceof Mono) {
return ((Mono) publisher).map(addOne)
} else if (publisher instanceof Flux) {
return ((Flux) publisher).map(addOne)
}
throw new IllegalStateException("Unknown publisher type")
} finally {
scope.close()
}
}
then:
assertTraces(1 + workItems) {
trace(0, 2 + workItems) {
span(0) {
operationName "trace-parent"
parent()
attributes {
}
}
basicSpan(it, 1, "publisher-parent", span(0))
for (int i = 0; i < workItems; i++) {
span(2 + i) {
operationName "add one"
childOf span(1)
attributes {
}
}
}
}
for (int i = 0; i < workItems; i++) {
trace(i + 1, 1) {
span(0) {
operationName "add one"
parent()
attributes {
}
}
}
}
}
where:
name | workItems | publisherSupplier
"basic mono" | 1 | { -> Mono.just(1).map(addOne) }
"basic flux" | 2 | { -> Flux.fromIterable([1, 2]).map(addOne) }
}
def runUnderTrace(def publisherSupplier) {
TraceUtils.runUnderTrace("trace-parent") {
def tracer = OpenTelemetry.getTracerProvider().get("test")
def span = tracer.spanBuilder("publisher-parent").startSpan()
def scope = tracer.withSpan(span)
try {
def publisher = publisherSupplier()
// Read all data from publisher
if (publisher instanceof Mono) {
return publisher.block()
} else if (publisher instanceof Flux) {
return publisher.toStream().toArray({ size -> new Integer[size] })
}
throw new RuntimeException("Unknown publisher: " + publisher)
} finally {
span.end()
scope.close()
}
}
}
def cancelUnderTrace(def publisherSupplier) {
TraceUtils.runUnderTrace("trace-parent") {
def tracer = OpenTelemetry.getTracerProvider().get("test")
def span = tracer.spanBuilder("publisher-parent").startSpan()
def scope = tracer.withSpan(span)
def publisher = publisherSupplier()
publisher.subscribe(new Subscriber<Integer>() {
void onSubscribe(Subscription subscription) {
subscription.cancel()
}
void onNext(Integer t) {
}
void onError(Throwable error) {
}
void onComplete() {
}
})
span.end()
scope.close()
}
}
static addOneFunc(int i) {
TEST_TRACER.spanBuilder("add one").startSpan().end()
return i + 1
}
static addTwoFunc(int i) {
TEST_TRACER.spanBuilder("add two").startSpan().end()
return i + 2
}
}

View File

@ -1,7 +1,6 @@
// Set properties before any plugins get loaded
ext {
minJavaVersionForTests = JavaVersion.VERSION_1_8
maxJavaVersionForTests = JavaVersion.VERSION_1_8
}
apply from: "$rootDir/gradle/instrumentation.gradle"
@ -23,9 +22,7 @@ testSets {
}
dependencies {
compileOnly group: 'io.projectreactor', name: 'reactor-core', version: '3.1.0.RELEASE'
testImplementation project(':instrumentation:annotations')
implementation project(path: ':instrumentation-core:reactor-3.1', configuration: 'shadow')
testImplementation group: 'io.projectreactor', name: 'reactor-core', version: '3.1.0.RELEASE'

View File

@ -16,12 +16,12 @@
package io.opentelemetry.auto.instrumentation.reactor;
import io.opentelemetry.auto.instrumentation.reactor.shaded.TracingPublishers;
import net.bytebuddy.asm.Advice;
import reactor.core.publisher.Hooks;
public class ReactorHooksAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void postStaticInitializer() {
Hooks.onEachOperator(TracingPublishers.class.getName(), TracingPublishers::wrap);
TracingPublishers.registerOnEachOperator();
}
}

View File

@ -42,18 +42,18 @@ public final class ReactorHooksInstrumentation extends Instrumenter.Default {
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".TracingPublishers",
packageName + ".TracingPublishers$MonoTracingPublisher",
packageName + ".TracingPublishers$ParallelFluxTracingPublisher",
packageName + ".TracingPublishers$ConnectableFluxTracingPublisher",
packageName + ".TracingPublishers$GroupedFluxTracingPublisher",
packageName + ".TracingPublishers$FluxTracingPublisher",
packageName + ".TracingPublishers$FuseableMonoTracingPublisher",
packageName + ".TracingPublishers$FuseableParallelFluxTracingPublisher",
packageName + ".TracingPublishers$FuseableConnectableFluxTracingPublisher",
packageName + ".TracingPublishers$FuseableGroupedFluxTracingPublisher",
packageName + ".TracingPublishers$FuseableFluxTracingPublisher",
packageName + ".TracingSubscriber"
packageName + ".shaded.TracingPublishers",
packageName + ".shaded.TracingPublishers$MonoTracingPublisher",
packageName + ".shaded.TracingPublishers$ParallelFluxTracingPublisher",
packageName + ".shaded.TracingPublishers$ConnectableFluxTracingPublisher",
packageName + ".shaded.TracingPublishers$GroupedFluxTracingPublisher",
packageName + ".shaded.TracingPublishers$FluxTracingPublisher",
packageName + ".shaded.TracingPublishers$FuseableMonoTracingPublisher",
packageName + ".shaded.TracingPublishers$FuseableParallelFluxTracingPublisher",
packageName + ".shaded.TracingPublishers$FuseableConnectableFluxTracingPublisher",
packageName + ".shaded.TracingPublishers$FuseableGroupedFluxTracingPublisher",
packageName + ".shaded.TracingPublishers$FuseableFluxTracingPublisher",
packageName + ".shaded.TracingSubscriber"
};
}

View File

@ -150,6 +150,7 @@ include ':instrumentation:vertx-3.0'
include ':instrumentation:vertx-reactive-3.5'
include ':instrumentation-core:aws-sdk:aws-sdk-2.2'
include ':instrumentation-core:reactor-3.1'
include ':instrumentation-core:servlet'
include ':instrumentation-core:spring:spring-web-3.1'

View File

@ -17,13 +17,14 @@ excludedClassesCoverage += [
dependencies {
api(project(path: ':opentelemetry-sdk-shaded-for-testing', configuration: 'shadow'))
api deps.guava
api deps.spock
implementation deps.opentelemetryApi
implementation deps.bytebuddy
implementation deps.bytebuddyagent
implementation deps.slf4j
implementation deps.spock
implementation deps.testLogging
implementation deps.guava
// okhttp 3.12.x is the last version to support Java7
api group: 'com.squareup.okhttp3', name: 'okhttp', version: '3.12.12'