Move reactor library instrumentation from instrumentation-core to instrumentation (#2359)

This commit is contained in:
Anuraag Agrawal 2021-02-21 09:37:21 +09:00 committed by GitHub
parent 79fa9e0816
commit 87e1ae6724
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 163 additions and 446 deletions

View File

@ -1,17 +0,0 @@
This product contains a modified part of OpenTracing:
* License:
Copyright 2018 The OpenTracing Authors
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied. See the License for the specific language governing permissions and limitations under
the License.
* Homepage: https://github.com/opentracing-contrib/java-reactor

View File

@ -1,28 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
apply from: "$rootDir/gradle/instrumentation-library.gradle"
// need to override archivesBaseName set in instrumentation-library.gradle
archivesBaseName = "reactor-3.1"
dependencies {
library group: 'io.projectreactor', name: 'reactor-core', version: '3.1.0.RELEASE'
latestDepTestLibrary group: 'io.projectreactor', name: 'reactor-core', version: '3.+'
// Looks like later versions on reactor need this dependency for some reason even though it is marked as optional.
latestDepTestLibrary group: 'io.micrometer', name: 'micrometer-core', version: '1.+'
}

View File

@ -10,10 +10,12 @@ muzzle {
}
dependencies {
implementation project(':instrumentation-core:reactor-3.1')
implementation project(':instrumentation:reactor-3.1:library')
testLibrary group: 'io.projectreactor', name: 'reactor-core', version: '3.1.0.RELEASE'
testImplementation project(':instrumentation:reactor-3.1:testing')
latestDepTestLibrary group: 'io.projectreactor', name: 'reactor-core', version: '3.+'
// Looks like later versions on reactor need this dependency for some reason even though it is marked as optional.
latestDepTestLibrary group: 'io.micrometer', name: 'micrometer-core', version: '1.+'

View File

@ -3,354 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runInternalSpan
import io.opentelemetry.instrumentation.reactor.AbstractReactorCoreTest
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.context.Context
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.utils.TraceUtils
import java.time.Duration
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import spock.lang.Shared
class ReactorCoreTest extends AgentInstrumentationSpecification {
public static final String EXCEPTION_MESSAGE = "test exception"
@Shared
def addOne = { i ->
addOneFunc(i)
}
@Shared
def addTwo = { i ->
addTwoFunc(i)
}
@Shared
def throwException = {
throw new RuntimeException(EXCEPTION_MESSAGE)
}
def "Publisher '#name' test"() {
when:
def result = runUnderTrace(publisherSupplier)
then:
result == expected
and:
assertTraces(1) {
trace(0, workSpans + 2) {
span(0) {
name "trace-parent"
hasNoParent()
attributes {
}
}
span(1) {
name "publisher-parent"
childOf span(0)
attributes {
}
}
basicSpan(it, 1, "publisher-parent", span(0))
for (int i = 0; i < workSpans; i++) {
span(i + 2) {
name "add one"
childOf span(1)
attributes {
}
}
}
}
}
where:
paramName | expected | workSpans | publisherSupplier
"basic mono" | 2 | 1 | { -> Mono.just(1).map(addOne) }
"two operations mono" | 4 | 2 | { -> Mono.just(2).map(addOne).map(addOne) }
"delayed mono" | 4 | 1 | { ->
Mono.just(3).delayElement(Duration.ofMillis(100)).map(addOne)
}
"delayed twice mono" | 6 | 2 | { ->
Mono.just(4).delayElement(Duration.ofMillis(100)).map(addOne).delayElement(Duration.ofMillis(100)).map(addOne)
}
"basic flux" | [6, 7] | 2 | { -> Flux.fromIterable([5, 6]).map(addOne) }
"two operations flux" | [8, 9] | 4 | { ->
Flux.fromIterable([6, 7]).map(addOne).map(addOne)
}
"delayed flux" | [8, 9] | 2 | { ->
Flux.fromIterable([7, 8]).delayElements(Duration.ofMillis(100)).map(addOne)
}
"delayed twice flux" | [10, 11] | 4 | { ->
Flux.fromIterable([8, 9]).delayElements(Duration.ofMillis(100)).map(addOne).delayElements(Duration.ofMillis(100)).map(addOne)
}
"mono from callable" | 12 | 2 | { ->
Mono.fromCallable({ addOneFunc(10) }).map(addOne)
}
}
def "Publisher error '#name' test"() {
when:
runUnderTrace(publisherSupplier)
then:
def exception = thrown RuntimeException
exception.message == EXCEPTION_MESSAGE
and:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "trace-parent"
errored true
errorEvent(RuntimeException, EXCEPTION_MESSAGE)
hasNoParent()
}
// It's important that we don't attach errors at the Reactor level so that we don't
// impact the spans on reactor instrumentations such as netty and lettuce, as reactor is
// more of a context propagation mechanism than something we would be tracking for
// errors this is ok.
basicSpan(it, 1, "publisher-parent", span(0))
}
}
where:
paramName | publisherSupplier
"mono" | { -> Mono.error(new RuntimeException(EXCEPTION_MESSAGE)) }
"flux" | { -> Flux.error(new RuntimeException(EXCEPTION_MESSAGE)) }
}
def "Publisher step '#name' test"() {
when:
runUnderTrace(publisherSupplier)
then:
def exception = thrown RuntimeException
exception.message == EXCEPTION_MESSAGE
and:
assertTraces(1) {
trace(0, workSpans + 2) {
span(0) {
name "trace-parent"
errored true
errorEvent(RuntimeException, EXCEPTION_MESSAGE)
hasNoParent()
}
// It's important that we don't attach errors at the Reactor level so that we don't
// impact the spans on reactor instrumentations such as netty and lettuce, as reactor is
// more of a context propagation mechanism than something we would be tracking for
// errors this is ok.
basicSpan(it, 1, "publisher-parent", span(0))
for (int i = 0; i < workSpans; i++) {
span(i + 2) {
name "add one"
childOf span(1)
attributes {
}
}
}
}
}
where:
paramName | workSpans | publisherSupplier
"basic mono failure" | 1 | { -> Mono.just(1).map(addOne).map({ throwException() }) }
"basic flux failure" | 1 | { ->
Flux.fromIterable([5, 6]).map(addOne).map({ throwException() })
}
}
def "Publisher '#name' cancel"() {
when:
cancelUnderTrace(publisherSupplier)
then:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "trace-parent"
hasNoParent()
attributes {
}
}
basicSpan(it, 1, "publisher-parent", span(0))
}
}
where:
paramName | publisherSupplier
"basic mono" | { -> Mono.just(1) }
"basic flux" | { -> Flux.fromIterable([5, 6]) }
}
def "Publisher chain spans have the correct parent for '#name'"() {
when:
runUnderTrace(publisherSupplier)
then:
assertTraces(1) {
trace(0, workSpans + 2) {
span(0) {
name "trace-parent"
hasNoParent()
attributes {
}
}
basicSpan(it, 1, "publisher-parent", span(0))
for (int i = 0; i < workSpans; i++) {
span(i + 2) {
name "add one"
childOf span(1)
attributes {
}
}
}
}
}
where:
paramName | workSpans | publisherSupplier
"basic mono" | 3 | { ->
Mono.just(1).map(addOne).map(addOne).then(Mono.just(1).map(addOne))
}
"basic flux" | 5 | { ->
Flux.fromIterable([5, 6]).map(addOne).map(addOne).then(Mono.just(1).map(addOne))
}
}
def "Publisher chain spans have the correct parents from subscription time"() {
when:
def mono = Mono.just(42)
.map(addOne)
.map(addTwo)
TraceUtils.runUnderTrace("trace-parent") {
mono.block()
}
then:
assertTraces(1) {
trace(0, 3) {
basicSpan(it, 0, "trace-parent")
basicSpan(it, 1, "add one", span(0))
basicSpan(it, 2, "add two", span(0))
}
}
}
def "Publisher chain spans have the correct parents from subscription time '#name'"() {
when:
runUnderTrace {
// The "add one" operations in the publisher created here should be children of the publisher-parent
Publisher<Integer> publisher = publisherSupplier()
def tracer = GlobalOpenTelemetry.getTracer("test")
def intermediate = tracer.spanBuilder("intermediate").startSpan()
// After this activation, the "add two" operations below should be children of this span
def scope = Context.current().with(intermediate).makeCurrent()
try {
if (publisher instanceof Mono) {
return ((Mono) publisher).map(addTwo)
} else if (publisher instanceof Flux) {
return ((Flux) publisher).map(addTwo)
}
throw new IllegalStateException("Unknown publisher type")
} finally {
intermediate.end()
scope.close()
}
}
then:
assertTraces(1) {
trace(0, (workItems * 2) + 3) {
basicSpan(it, 0, "trace-parent")
basicSpan(it, 1, "publisher-parent", span(0))
basicSpan(it, 2, "intermediate", span(1))
for (int i = 0; i < 2 * workItems; i = i + 2) {
basicSpan(it, 3 + i, "add one", span(1))
basicSpan(it, 3 + i + 1, "add two", span(1))
}
}
}
where:
paramName | workItems | publisherSupplier
"basic mono" | 1 | { -> Mono.just(1).map(addOne) }
"basic flux" | 2 | { -> Flux.fromIterable([1, 2]).map(addOne) }
}
def runUnderTrace(def publisherSupplier) {
TraceUtils.runUnderTrace("trace-parent") {
def tracer = GlobalOpenTelemetry.getTracer("test")
def span = tracer.spanBuilder("publisher-parent").startSpan()
def scope = Context.current().with(span).makeCurrent()
try {
def publisher = publisherSupplier()
// Read all data from publisher
if (publisher instanceof Mono) {
return publisher.block()
} else if (publisher instanceof Flux) {
return publisher.toStream().toArray({ size -> new Integer[size] })
}
throw new RuntimeException("Unknown publisher: " + publisher)
} finally {
span.end()
scope.close()
}
}
}
def cancelUnderTrace(def publisherSupplier) {
TraceUtils.runUnderTrace("trace-parent") {
def tracer = GlobalOpenTelemetry.getTracer("test")
def span = tracer.spanBuilder("publisher-parent").startSpan()
def scope = Context.current().with(span).makeCurrent()
def publisher = publisherSupplier()
publisher.subscribe(new Subscriber<Integer>() {
void onSubscribe(Subscription subscription) {
subscription.cancel()
}
void onNext(Integer t) {
}
void onError(Throwable error) {
}
void onComplete() {
}
})
span.end()
scope.close()
}
}
static addOneFunc(int i) {
runInternalSpan("add one")
return i + 1
}
static addTwoFunc(int i) {
runInternalSpan("add two")
return i + 2
}
class ReactorCoreTest extends AbstractReactorCoreTest implements AgentTestTrait {
}

View File

@ -3,45 +3,8 @@
* SPDX-License-Identifier: Apache-2.0
*/
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
import io.opentelemetry.instrumentation.reactor.AbstractSubscriptionTest
import io.opentelemetry.instrumentation.test.AgentTestTrait
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import java.util.concurrent.CountDownLatch
import reactor.core.publisher.Mono
class SubscriptionTest extends AgentInstrumentationSpecification {
def "subscription test"() {
when:
CountDownLatch latch = new CountDownLatch(1)
runUnderTrace("parent") {
Mono<Connection> connection = Mono.create {
it.success(new Connection())
}
connection.subscribe {
it.query()
latch.countDown()
}
}
latch.await()
then:
assertTraces(1) {
trace(0, 2) {
basicSpan(it, 0, "parent")
basicSpan(it, 1, "Connection.query", span(0))
}
}
}
static class Connection {
static int query() {
def span = GlobalOpenTelemetry.getTracer("test").spanBuilder("Connection.query").startSpan()
span.end()
return new Random().nextInt()
}
}
class SubscriptionTest extends AbstractSubscriptionTest implements AgentTestTrait {
}

View File

@ -0,0 +1,11 @@
apply from: "$rootDir/gradle/instrumentation-library.gradle"
dependencies {
library group: 'io.projectreactor', name: 'reactor-core', version: '3.1.0.RELEASE'
testImplementation project(':instrumentation:reactor-3.1:testing')
latestDepTestLibrary group: 'io.projectreactor', name: 'reactor-core', version: '3.+'
// Looks like later versions on reactor need this dependency for some reason even though it is marked as optional.
latestDepTestLibrary group: 'io.micrometer', name: 'micrometer-core', version: '1.+'
}

View File

@ -3,6 +3,23 @@
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2013-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.instrumentation.reactor;
import io.opentelemetry.context.Context;

View File

@ -3,6 +3,21 @@
* SPDX-License-Identifier: Apache-2.0
*/
// Includes work from:
/*
* Copyright 2018 The OpenTracing Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package io.opentelemetry.instrumentation.reactor;
import io.opentelemetry.context.Scope;

View File

@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.reactor
import io.opentelemetry.instrumentation.test.LibraryTestTrait
class ReactorCoreTest extends AbstractReactorCoreTest implements LibraryTestTrait {
def setupSpec() {
TracingOperator.registerOnEachOperator()
}
def cleanupSpec() {
TracingOperator.resetOnEachOperator()
}
}

View File

@ -0,0 +1,18 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.reactor
import io.opentelemetry.instrumentation.test.LibraryTestTrait
class SubscriptionTest extends AbstractSubscriptionTest implements LibraryTestTrait {
def setupSpec() {
TracingOperator.registerOnEachOperator()
}
def cleanupSpec() {
TracingOperator.resetOnEachOperator()
}
}

View File

@ -0,0 +1,11 @@
apply from: "$rootDir/gradle/java.gradle"
dependencies {
api project(':testing-common')
api group: 'io.projectreactor', name: 'reactor-core', version: '3.1.0.RELEASE'
implementation deps.groovy
implementation deps.opentelemetryApi
implementation deps.spock
}

View File

@ -4,13 +4,18 @@
*/
package io.opentelemetry.instrumentation.reactor
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runInternalSpan
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.context.Context
import io.opentelemetry.instrumentation.test.LibraryInstrumentationSpecification
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.instrumentation.test.utils.TraceUtils
import java.time.Duration
import org.reactivestreams.Publisher
@ -22,18 +27,10 @@ import spock.lang.Shared
import spock.lang.Unroll
@Unroll
class ReactorCoreTest extends LibraryInstrumentationSpecification {
abstract class AbstractReactorCoreTest extends InstrumentationSpecification {
public static final String EXCEPTION_MESSAGE = "test exception"
def setupSpec() {
TracingOperator.registerOnEachOperator()
}
def cleanupSpec() {
TracingOperator.resetOnEachOperator()
}
@Shared
def addOne = { i ->
addOneFunc(i)

View File

@ -0,0 +1,55 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.reactor
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import java.util.concurrent.CountDownLatch
import reactor.core.publisher.Mono
abstract class AbstractSubscriptionTest extends InstrumentationSpecification {
def "subscription test"() {
when:
CountDownLatch latch = new CountDownLatch(1)
runUnderTrace("parent") {
Mono<Connection> connection = Mono.create {
it.success(new Connection())
}
connection.subscribe {
it.query()
latch.countDown()
}
}
latch.await()
then:
assertTraces(1) {
trace(0, 2) {
basicSpan(it, 0, "parent")
basicSpan(it, 1, "Connection.query", span(0))
}
}
}
static class Connection {
static int query() {
def span = GlobalOpenTelemetry.getTracer("test").spanBuilder("Connection.query").startSpan()
span.end()
return new Random().nextInt()
}
}
}

View File

@ -190,6 +190,8 @@ include ':instrumentation:play-ws:play-ws-testing'
include ':instrumentation:rabbitmq-2.7:javaagent'
include ':instrumentation:ratpack-1.4:javaagent'
include ':instrumentation:reactor-3.1:javaagent'
include ':instrumentation:reactor-3.1:library'
include ':instrumentation:reactor-3.1:testing'
include ':instrumentation:reactor-netty-0.9:javaagent'
include ':instrumentation:reactor-netty-1.0:javaagent'
include ':instrumentation:rediscala-1.8:javaagent'
@ -230,7 +232,6 @@ include ':instrumentation:vertx-web-3.0'
include ':instrumentation:vertx-reactive-3.5:javaagent'
include ':instrumentation:wicket-8.0:javaagent'
include ':instrumentation-core:reactor-3.1'
include ':instrumentation-core:servlet-2.2'
// benchmark