diff --git a/instrumentation/hystrix-1.4/javaagent/src/test/groovy/HystrixObservableChainTest.groovy b/instrumentation/hystrix-1.4/javaagent/src/test/groovy/HystrixObservableChainTest.groovy deleted file mode 100644 index 02a2957c2f..0000000000 --- a/instrumentation/hystrix-1.4/javaagent/src/test/groovy/HystrixObservableChainTest.groovy +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import com.netflix.hystrix.HystrixCommandProperties -import com.netflix.hystrix.HystrixObservableCommand -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import rx.Observable -import rx.schedulers.Schedulers - -import static com.netflix.hystrix.HystrixCommandGroupKey.Factory.asKey - -class HystrixObservableChainTest extends AgentInstrumentationSpecification { - - def "test command #action"() { - setup: - - def result = runWithSpan("parent") { - def val = new HystrixObservableCommand(setter("ExampleGroup")) { - private String tracedMethod() { - runWithSpan("tracedMethod") {} - return "Hello" - } - - @Override - protected Observable construct() { - Observable.defer { - Observable.just(tracedMethod()) - } - .subscribeOn(Schedulers.immediate()) - } - }.toObservable() - .subscribeOn(Schedulers.io()) - .map { - it.toUpperCase() - }.flatMap { str -> - new HystrixObservableCommand(setter("OtherGroup")) { - private String anotherTracedMethod() { - runWithSpan("anotherTracedMethod") {} - return "$str!" - } - - @Override - protected Observable construct() { - Observable.defer { - Observable.just(anotherTracedMethod()) - } - .subscribeOn(Schedulers.computation()) - } - }.toObservable() - .subscribeOn(Schedulers.trampoline()) - }.toBlocking().first() - return val - } - - expect: - result == "HELLO!" - - assertTraces(1) { - trace(0, 5) { - span(0) { - name "parent" - hasNoParent() - attributes { - } - } - span(1) { - name "ExampleGroup.HystrixObservableChainTest\$1.execute" - childOf span(0) - attributes { - "hystrix.command" "HystrixObservableChainTest\$1" - "hystrix.group" "ExampleGroup" - "hystrix.circuit_open" false - } - } - span(2) { - name "tracedMethod" - childOf span(1) - attributes { - } - } - span(3) { - name "OtherGroup.HystrixObservableChainTest\$2.execute" - childOf span(1) - attributes { - "hystrix.command" "HystrixObservableChainTest\$2" - "hystrix.group" "OtherGroup" - "hystrix.circuit_open" false - } - } - span(4) { - name "anotherTracedMethod" - childOf span(3) - attributes { - } - } - } - } - } - - def setter(String key) { - def setter = new HystrixObservableCommand.Setter(asKey(key)) - setter.andCommandPropertiesDefaults(new HystrixCommandProperties.Setter() - .withExecutionTimeoutInMilliseconds(10_000)) - return setter - } -} diff --git a/instrumentation/hystrix-1.4/javaagent/src/test/groovy/HystrixObservableTest.groovy b/instrumentation/hystrix-1.4/javaagent/src/test/groovy/HystrixObservableTest.groovy deleted file mode 100644 index aa6506ce72..0000000000 --- a/instrumentation/hystrix-1.4/javaagent/src/test/groovy/HystrixObservableTest.groovy +++ /dev/null @@ -1,315 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import com.netflix.hystrix.HystrixCommandProperties -import com.netflix.hystrix.HystrixObservable -import com.netflix.hystrix.HystrixObservableCommand -import com.netflix.hystrix.exception.HystrixRuntimeException -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import rx.Observable -import rx.schedulers.Schedulers - -import java.util.concurrent.BlockingQueue -import java.util.concurrent.LinkedBlockingQueue - -import static com.netflix.hystrix.HystrixCommandGroupKey.Factory.asKey -import static io.opentelemetry.api.trace.StatusCode.ERROR - -class HystrixObservableTest extends AgentInstrumentationSpecification { - - def "test command #action"() { - setup: - def observeOnFn = observeOn - def subscribeOnFn = subscribeOn - def result = runWithSpan("parent") { - def val = operation new HystrixObservableCommand(setter("ExampleGroup")) { - private String tracedMethod() { - runWithSpan("tracedMethod") {} - return "Hello!" - } - - @Override - protected Observable construct() { - def obs = Observable.defer { - Observable.just(tracedMethod()).repeat(1) - } - if (observeOnFn) { - obs = obs.observeOn(observeOnFn) - } - if (subscribeOnFn) { - obs = obs.subscribeOn(subscribeOnFn) - } - return obs - } - } - return val - } - - expect: - result == "Hello!" - - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - hasNoParent() - attributes { - } - } - span(1) { - name "ExampleGroup.HystrixObservableTest\$1.execute" - childOf span(0) - attributes { - "hystrix.command" "HystrixObservableTest\$1" - "hystrix.group" "ExampleGroup" - "hystrix.circuit_open" false - } - } - span(2) { - name "tracedMethod" - childOf span(1) - attributes { - } - } - } - } - - where: - action | observeOn | subscribeOn | operation - "toObservable" | null | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable-I" | Schedulers.immediate() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable-T" | Schedulers.trampoline() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable-C" | Schedulers.computation() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable-IO" | Schedulers.io() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable-NT" | Schedulers.newThread() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable+I" | null | Schedulers.immediate() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable+T" | null | Schedulers.trampoline() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable+C" | null | Schedulers.computation() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable+IO" | null | Schedulers.io() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable+NT" | null | Schedulers.newThread() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "observe" | null | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe-I" | Schedulers.immediate() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe-T" | Schedulers.trampoline() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe-C" | Schedulers.computation() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe-IO" | Schedulers.io() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe-NT" | Schedulers.newThread() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe+I" | null | Schedulers.immediate() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe+T" | null | Schedulers.trampoline() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe+C" | null | Schedulers.computation() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe+IO" | null | Schedulers.io() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe+NT" | null | Schedulers.newThread() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "toObservable block" | Schedulers.computation() | Schedulers.newThread() | { HystrixObservable cmd -> - BlockingQueue queue = new LinkedBlockingQueue() - def subscription = cmd.toObservable().subscribe { next -> - queue.put(next) - } - def val = queue.take() - subscription.unsubscribe() - return val - } - } - - def "test command #action fallback"() { - setup: - def observeOnFn = observeOn - def subscribeOnFn = subscribeOn - def result = runWithSpan("parent") { - def val = operation new HystrixObservableCommand(setter("ExampleGroup")) { - @Override - protected Observable construct() { - def err = Observable.defer { - Observable.error(new IllegalArgumentException()).repeat(1) - } - if (observeOnFn) { - err = err.observeOn(observeOnFn) - } - if (subscribeOnFn) { - err = err.subscribeOn(subscribeOnFn) - } - return err - } - - protected Observable resumeWithFallback() { - return Observable.just("Fallback!").repeat(1) - } - } - return val - } - - expect: - result == "Fallback!" - - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - hasNoParent() - attributes { - } - } - span(1) { - name "ExampleGroup.HystrixObservableTest\$2.execute" - childOf span(0) - status ERROR - errorEvent(IllegalArgumentException) - attributes { - "hystrix.command" "HystrixObservableTest\$2" - "hystrix.group" "ExampleGroup" - "hystrix.circuit_open" false - } - } - span(2) { - name "ExampleGroup.HystrixObservableTest\$2.fallback" - childOf span(1) - attributes { - "hystrix.command" "HystrixObservableTest\$2" - "hystrix.group" "ExampleGroup" - "hystrix.circuit_open" false - } - } - } - } - - where: - action | observeOn | subscribeOn | operation - "toObservable" | null | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable-I" | Schedulers.immediate() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable-T" | Schedulers.trampoline() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable-C" | Schedulers.computation() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable-IO" | Schedulers.io() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable-NT" | Schedulers.newThread() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable+I" | null | Schedulers.immediate() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable+T" | null | Schedulers.trampoline() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable+C" | null | Schedulers.computation() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable+IO" | null | Schedulers.io() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable+NT" | null | Schedulers.newThread() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "observe" | null | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe-I" | Schedulers.immediate() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe-T" | Schedulers.trampoline() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe-C" | Schedulers.computation() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe-IO" | Schedulers.io() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe-NT" | Schedulers.newThread() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe+I" | null | Schedulers.immediate() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe+T" | null | Schedulers.trampoline() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe+C" | null | Schedulers.computation() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe+IO" | null | Schedulers.io() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe+NT" | null | Schedulers.newThread() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "toObservable block" | null | null | { HystrixObservable cmd -> - BlockingQueue queue = new LinkedBlockingQueue() - def subscription = cmd.toObservable().subscribe { next -> - queue.put(next) - } - def val = queue.take() - subscription.unsubscribe() - return val - } - } - - def "test no fallback results in error for #action"() { - setup: - def observeOnFn = observeOn - def subscribeOnFn = subscribeOn - - when: - runWithSpan("parent") { - operation new HystrixObservableCommand(setter("FailingGroup")) { - - @Override - protected Observable construct() { - def err = Observable.defer { - Observable.error(new IllegalArgumentException()) - } - if (observeOnFn) { - err = err.observeOn(observeOnFn) - } - if (subscribeOnFn) { - err = err.subscribeOn(subscribeOnFn) - } - return err - } - } - } - - then: - def err = thrown HystrixRuntimeException - err.cause instanceof IllegalArgumentException - - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - hasNoParent() - status ERROR - errorEvent(HystrixRuntimeException, "HystrixObservableTest\$3 failed and no fallback available.") - } - span(1) { - name "FailingGroup.HystrixObservableTest\$3.execute" - childOf span(0) - status ERROR - errorEvent(IllegalArgumentException) - attributes { - "hystrix.command" "HystrixObservableTest\$3" - "hystrix.group" "FailingGroup" - "hystrix.circuit_open" false - } - } - span(2) { - name "FailingGroup.HystrixObservableTest\$3.fallback" - childOf span(1) - status ERROR - errorEvent(UnsupportedOperationException, "No fallback available.") - attributes { - "hystrix.command" "HystrixObservableTest\$3" - "hystrix.group" "FailingGroup" - "hystrix.circuit_open" false - } - } - } - } - - where: - action | observeOn | subscribeOn | operation - "toObservable" | null | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable-I" | Schedulers.immediate() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable-T" | Schedulers.trampoline() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable-C" | Schedulers.computation() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable-IO" | Schedulers.io() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable-NT" | Schedulers.newThread() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable+I" | null | Schedulers.immediate() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable+T" | null | Schedulers.trampoline() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable+C" | null | Schedulers.computation() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable+IO" | null | Schedulers.io() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "toObservable+NT" | null | Schedulers.newThread() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() } - "observe" | null | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe-I" | Schedulers.immediate() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe-T" | Schedulers.trampoline() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe-C" | Schedulers.computation() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe-IO" | Schedulers.io() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe-NT" | Schedulers.newThread() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe+I" | null | Schedulers.immediate() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe+T" | null | Schedulers.trampoline() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe+C" | null | Schedulers.computation() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe+IO" | null | Schedulers.io() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "observe+NT" | null | Schedulers.newThread() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() } - "toObservable block" | Schedulers.computation() | Schedulers.newThread() | { HystrixObservable cmd -> - def queue = new LinkedBlockingQueue() - def subscription = cmd.toObservable().subscribe({ next -> - queue.put(new Exception("Unexpectedly got a next")) - }, { next -> - queue.put(next) - }) - Throwable ex = queue.take() - subscription.unsubscribe() - throw ex - } - } - - def setter(String key) { - def setter = new HystrixObservableCommand.Setter(asKey(key)) - setter.andCommandPropertiesDefaults(new HystrixCommandProperties.Setter() - .withExecutionTimeoutInMilliseconds(10_000)) - return setter - } -} diff --git a/instrumentation/hystrix-1.4/javaagent/src/test/groovy/HystrixTest.groovy b/instrumentation/hystrix-1.4/javaagent/src/test/groovy/HystrixTest.groovy deleted file mode 100644 index f4451da267..0000000000 --- a/instrumentation/hystrix-1.4/javaagent/src/test/groovy/HystrixTest.groovy +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import com.netflix.hystrix.HystrixCommand -import com.netflix.hystrix.HystrixCommandProperties -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification - -import java.util.concurrent.BlockingQueue -import java.util.concurrent.LinkedBlockingQueue - -import static com.netflix.hystrix.HystrixCommandGroupKey.Factory.asKey -import static io.opentelemetry.api.trace.StatusCode.ERROR - -class HystrixTest extends AgentInstrumentationSpecification { - - def "test command #action"() { - setup: - def command = new HystrixCommand(setter("ExampleGroup")) { - @Override - protected String run() throws Exception { - return tracedMethod() - } - - private String tracedMethod() { - runWithSpan("tracedMethod") {} - return "Hello!" - } - } - def result = runWithSpan("parent") { - operation(command) - } - expect: - result == "Hello!" - - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - hasNoParent() - attributes { - } - } - span(1) { - name "ExampleGroup.HystrixTest\$1.execute" - childOf span(0) - attributes { - "hystrix.command" "HystrixTest\$1" - "hystrix.group" "ExampleGroup" - "hystrix.circuit_open" false - } - } - span(2) { - name "tracedMethod" - childOf span(1) - attributes { - } - } - } - } - - where: - action | operation - "execute" | { HystrixCommand cmd -> cmd.execute() } - "queue" | { HystrixCommand cmd -> cmd.queue().get() } - "toObservable" | { HystrixCommand cmd -> cmd.toObservable().toBlocking().first() } - "observe" | { HystrixCommand cmd -> cmd.observe().toBlocking().first() } - "observe block" | { HystrixCommand cmd -> - BlockingQueue queue = new LinkedBlockingQueue() - cmd.observe().subscribe { next -> - queue.put(next) - } - queue.take() - } - } - - def "test command #action fallback"() { - setup: - def command = new HystrixCommand(setter("ExampleGroup")) { - @Override - protected String run() throws Exception { - throw new IllegalArgumentException() - } - - protected String getFallback() { - return "Fallback!" - } - } - def result = runWithSpan("parent") { - operation(command) - } - expect: - result == "Fallback!" - - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - hasNoParent() - attributes { - } - } - span(1) { - name "ExampleGroup.HystrixTest\$2.execute" - childOf span(0) - status ERROR - errorEvent(IllegalArgumentException) - attributes { - "hystrix.command" "HystrixTest\$2" - "hystrix.group" "ExampleGroup" - "hystrix.circuit_open" false - } - } - span(2) { - name "ExampleGroup.HystrixTest\$2.fallback" - childOf span(1) - attributes { - "hystrix.command" "HystrixTest\$2" - "hystrix.group" "ExampleGroup" - "hystrix.circuit_open" false - } - } - } - } - - where: - action | operation - "execute" | { HystrixCommand cmd -> cmd.execute() } - "queue" | { HystrixCommand cmd -> cmd.queue().get() } - "toObservable" | { HystrixCommand cmd -> cmd.toObservable().toBlocking().first() } - "observe" | { HystrixCommand cmd -> cmd.observe().toBlocking().first() } - "observe block" | { HystrixCommand cmd -> - BlockingQueue queue = new LinkedBlockingQueue() - cmd.observe().subscribe { next -> - queue.put(next) - } - queue.take() - } - } - - def setter(String key) { - def setter = new HystrixCommand.Setter(asKey(key)) - setter.andCommandPropertiesDefaults(new HystrixCommandProperties.Setter() - .withExecutionTimeoutInMilliseconds(10_000)) - return setter - } -} diff --git a/instrumentation/hystrix-1.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/hystrix/HystrixObservableChainTest.java b/instrumentation/hystrix-1.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/hystrix/HystrixObservableChainTest.java new file mode 100644 index 0000000000..89eef213fb --- /dev/null +++ b/instrumentation/hystrix-1.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/hystrix/HystrixObservableChainTest.java @@ -0,0 +1,123 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.hystrix; + +import static io.opentelemetry.api.common.AttributeKey.booleanKey; +import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static org.assertj.core.api.Assertions.assertThat; + +import com.netflix.hystrix.HystrixCommandGroupKey; +import com.netflix.hystrix.HystrixCommandProperties; +import com.netflix.hystrix.HystrixObservableCommand; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import rx.Observable; +import rx.schedulers.Schedulers; + +class HystrixObservableChainTest { + + @RegisterExtension + protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @Test + @SuppressWarnings("RxReturnValueIgnored") + void testCommand() { + + class TestCommand extends HystrixObservableCommand { + protected TestCommand(Setter setter) { + super(setter); + } + + private String tracedMethod() { + testing.runWithSpan("tracedMethod", () -> {}); + return "Hello"; + } + + @Override + protected Observable construct() { + return Observable.defer(() -> Observable.just(tracedMethod())) + .subscribeOn(Schedulers.immediate()); + } + } + + class AnotherTestCommand extends HystrixObservableCommand { + private final String str; + + protected AnotherTestCommand(Setter setter, String str) { + super(setter); + this.str = str; + } + + private String anotherTracedMethod() { + testing.runWithSpan("anotherTracedMethod", () -> {}); + return str + "!"; + } + + @Override + protected Observable construct() { + return Observable.defer(() -> Observable.just(anotherTracedMethod())) + .subscribeOn(Schedulers.computation()); + } + } + + String result = + testing.runWithSpan( + "parent", + () -> + new TestCommand(setter("ExampleGroup")) + .toObservable() + .subscribeOn(Schedulers.io()) + .map(String::toUpperCase) + .flatMap( + str -> + new AnotherTestCommand(setter("OtherGroup"), str) + .toObservable() + .subscribeOn(Schedulers.trampoline())) + .toBlocking() + .first()); + + assertThat(result).isEqualTo("HELLO!"); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent().hasAttributes(Attributes.empty()), + span -> + span.hasName("ExampleGroup.TestCommand.execute") + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("hystrix.command"), "TestCommand"), + equalTo(stringKey("hystrix.group"), "ExampleGroup"), + equalTo(booleanKey("hystrix.circuit_open"), false)), + span -> + span.hasName("tracedMethod") + .hasParent(trace.getSpan(1)) + .hasAttributes(Attributes.empty()), + span -> + span.hasName("OtherGroup.AnotherTestCommand.execute") + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("hystrix.command"), "AnotherTestCommand"), + equalTo(stringKey("hystrix.group"), "OtherGroup"), + equalTo(booleanKey("hystrix.circuit_open"), false)), + span -> + span.hasName("anotherTracedMethod") + .hasParent(trace.getSpan(3)) + .hasAttributes(Attributes.empty()))); + } + + private static HystrixObservableCommand.Setter setter(String key) { + HystrixObservableCommand.Setter setter = + HystrixObservableCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(key)); + setter.andCommandPropertiesDefaults( + HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(10_000)); + return setter; + } +} diff --git a/instrumentation/hystrix-1.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/hystrix/HystrixObservableTest.java b/instrumentation/hystrix-1.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/hystrix/HystrixObservableTest.java new file mode 100644 index 0000000000..f5d4299ada --- /dev/null +++ b/instrumentation/hystrix-1.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/hystrix/HystrixObservableTest.java @@ -0,0 +1,488 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.hystrix; + +import static io.opentelemetry.api.common.AttributeKey.booleanKey; +import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.EXCEPTION_MESSAGE; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.EXCEPTION_STACKTRACE; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.EXCEPTION_TYPE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.catchException; +import static org.junit.jupiter.api.Named.named; + +import com.netflix.hystrix.HystrixCommandGroupKey; +import com.netflix.hystrix.HystrixCommandProperties; +import com.netflix.hystrix.HystrixObservableCommand; +import com.netflix.hystrix.exception.HystrixRuntimeException; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.trace.data.StatusData; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Function; +import java.util.stream.Stream; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import rx.Observable; +import rx.Scheduler; +import rx.Subscription; +import rx.schedulers.Schedulers; + +class HystrixObservableTest { + + @RegisterExtension + protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @ParameterizedTest + @MethodSource("provideCommandActionArguments") + void testCommands(Parameter parameter) { + + class TestCommand extends HystrixObservableCommand { + protected TestCommand(Setter setter) { + super(setter); + } + + private String tracedMethod() { + testing.runWithSpan("tracedMethod", () -> {}); + return "Hello!"; + } + + @Override + protected Observable construct() { + Observable obs = Observable.defer(() -> Observable.just(tracedMethod()).repeat(1)); + if (parameter.observeOn != null) { + obs = obs.observeOn(parameter.observeOn); + } + if (parameter.subscribeOn != null) { + obs = obs.subscribeOn(parameter.subscribeOn); + } + return obs; + } + } + + String result = + testing.runWithSpan( + "parent", + () -> { + HystrixObservableCommand val = new TestCommand(setter("ExampleGroup")); + return parameter.operation.apply(val); + }); + + assertThat(result).isEqualTo("Hello!"); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent().hasAttributes(Attributes.empty()), + span -> + span.hasName("ExampleGroup.TestCommand.execute") + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("hystrix.command"), "TestCommand"), + equalTo(stringKey("hystrix.group"), "ExampleGroup"), + equalTo(booleanKey("hystrix.circuit_open"), false)), + span -> + span.hasName("tracedMethod") + .hasParent(trace.getSpan(1)) + .hasAttributes(Attributes.empty()))); + } + + private static Stream baseArguments() { + return Stream.of( + Arguments.of( + named( + "toObservable", + new Parameter(null, null, cmd -> cmd.toObservable().toBlocking().first()))), + Arguments.of( + named( + "toObservable-I", + new Parameter( + Schedulers.immediate(), null, cmd -> cmd.toObservable().toBlocking().first()))), + Arguments.of( + named( + "toObservable-T", + new Parameter( + Schedulers.trampoline(), + null, + cmd -> cmd.toObservable().toBlocking().first()))), + Arguments.of( + named( + "toObservable-C", + new Parameter( + Schedulers.computation(), + null, + cmd -> cmd.toObservable().toBlocking().first()))), + Arguments.of( + named( + "toObservable-IO", + new Parameter( + Schedulers.io(), null, cmd -> cmd.toObservable().toBlocking().first()))), + Arguments.of( + named( + "toObservable-NT", + new Parameter( + Schedulers.newThread(), null, cmd -> cmd.toObservable().toBlocking().first()))), + Arguments.of( + named( + "toObservable+I", + new Parameter( + null, Schedulers.immediate(), cmd -> cmd.toObservable().toBlocking().first()))), + Arguments.of( + named( + "toObservable+T", + new Parameter( + null, + Schedulers.trampoline(), + cmd -> cmd.toObservable().toBlocking().first()))), + Arguments.of( + named( + "toObservable+C", + new Parameter( + null, + Schedulers.computation(), + cmd -> cmd.toObservable().toBlocking().first()))), + Arguments.of( + named( + "toObservable+IO", + new Parameter( + null, Schedulers.io(), cmd -> cmd.toObservable().toBlocking().first()))), + Arguments.of( + named( + "toObservable+NT", + new Parameter( + null, Schedulers.newThread(), cmd -> cmd.toObservable().toBlocking().first()))), + Arguments.of( + named("observe", new Parameter(null, null, cmd -> cmd.observe().toBlocking().first()))), + Arguments.of( + named( + "observe-I", + new Parameter( + Schedulers.immediate(), null, cmd -> cmd.observe().toBlocking().first()))), + Arguments.of( + named( + "observe-T", + new Parameter( + Schedulers.trampoline(), null, cmd -> cmd.observe().toBlocking().first()))), + Arguments.of( + named( + "observe-C", + new Parameter( + Schedulers.computation(), null, cmd -> cmd.observe().toBlocking().first()))), + Arguments.of( + named( + "observe-IO", + new Parameter(Schedulers.io(), null, cmd -> cmd.observe().toBlocking().first()))), + Arguments.of( + named( + "observe-NT", + new Parameter( + Schedulers.newThread(), null, cmd -> cmd.observe().toBlocking().first()))), + Arguments.of( + named( + "observe+I", + new Parameter( + null, Schedulers.immediate(), cmd -> cmd.observe().toBlocking().first()))), + Arguments.of( + named( + "observe+T", + new Parameter( + null, Schedulers.trampoline(), cmd -> cmd.observe().toBlocking().first()))), + Arguments.of( + named( + "observe+C", + new Parameter( + null, Schedulers.computation(), cmd -> cmd.observe().toBlocking().first()))), + Arguments.of( + named( + "observe+IO", + new Parameter(null, Schedulers.io(), cmd -> cmd.observe().toBlocking().first()))), + Arguments.of( + named( + "observe+NT", + new Parameter( + null, Schedulers.newThread(), cmd -> cmd.observe().toBlocking().first())))); + } + + private static Stream provideCommandActionArguments() { + return Stream.concat( + baseArguments(), + Stream.of( + Arguments.of( + named( + "toObservable block", + new Parameter( + Schedulers.computation(), + Schedulers.newThread(), + cmd -> { + BlockingQueue queue = new LinkedBlockingQueue<>(); + Subscription subscription = + cmd.toObservable() + .subscribe( + next -> { + try { + queue.put(next); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + String returnValue; + try { + returnValue = queue.take(); + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + subscription.unsubscribe(); + return returnValue; + }))))); + } + + @ParameterizedTest + @MethodSource("provideCommandFallbackArguments") + void testCommandFallbacks(Parameter parameter) { + + class TestCommand extends HystrixObservableCommand { + protected TestCommand(Setter setter) { + super(setter); + } + + @Override + protected Observable construct() { + Observable err = + Observable.defer(() -> Observable.error(new IllegalArgumentException())); + if (parameter.observeOn != null) { + err = err.observeOn(parameter.observeOn); + } + if (parameter.subscribeOn != null) { + err = err.subscribeOn(parameter.subscribeOn); + } + return err; + } + + @Override + protected Observable resumeWithFallback() { + return Observable.just("Fallback!").repeat(1); + } + } + + String result = + testing.runWithSpan( + "parent", + () -> { + HystrixObservableCommand val = new TestCommand(setter("ExampleGroup")); + return parameter.operation.apply(val); + }); + + assertThat(result).isEqualTo("Fallback!"); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent().hasAttributes(Attributes.empty()), + span -> + span.hasName("ExampleGroup.TestCommand.execute") + .hasParent(trace.getSpan(0)) + .hasStatus(StatusData.error()) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("exception") + .hasAttributesSatisfyingExactly( + equalTo( + EXCEPTION_TYPE, "java.lang.IllegalArgumentException"), + satisfies( + EXCEPTION_STACKTRACE, + val -> val.isInstanceOf(String.class)))), + span -> + span.hasName("ExampleGroup.TestCommand.fallback") + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("hystrix.command"), "TestCommand"), + equalTo(stringKey("hystrix.group"), "ExampleGroup"), + equalTo(booleanKey("hystrix.circuit_open"), false)))); + } + + private static Stream provideCommandFallbackArguments() { + return Stream.concat( + baseArguments(), + Stream.of( + Arguments.of( + named( + "toObservable block", + new Parameter( + null, + null, + cmd -> { + BlockingQueue queue = new LinkedBlockingQueue<>(); + Subscription subscription = + cmd.toObservable() + .subscribe( + next -> { + try { + queue.put(next); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + String returnValue; + try { + returnValue = queue.take(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + subscription.unsubscribe(); + return returnValue; + }))))); + } + + @ParameterizedTest + @MethodSource("provideCommandNoFallbackResultsInErrorArguments") + void testNoFallbackResultsInErrorForAction(Parameter parameter) { + + class TestCommand extends HystrixObservableCommand { + protected TestCommand(Setter setter) { + super(setter); + } + + @Override + protected Observable construct() { + Observable err = + Observable.defer(() -> Observable.error(new IllegalArgumentException())); + if (parameter.observeOn != null) { + err = err.observeOn(parameter.observeOn); + } + if (parameter.subscribeOn != null) { + err = err.subscribeOn(parameter.subscribeOn); + } + return err; + } + } + + Throwable exception = + catchException( + () -> + testing.runWithSpan( + "parent", + () -> { + HystrixObservableCommand val = + new TestCommand(setter("FailingGroup")); + return parameter.operation.apply(val); + })); + + assertThat(exception).hasCauseInstanceOf(IllegalArgumentException.class); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("parent") + .hasNoParent() + .hasStatus(StatusData.error()) + .hasException(exception), + span -> + span.hasName("FailingGroup.TestCommand.execute") + .hasParent(trace.getSpan(0)) + .hasStatus(StatusData.error()) + .hasException(exception.getCause()) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("hystrix.command"), "TestCommand"), + equalTo(stringKey("hystrix.group"), "FailingGroup"), + equalTo(booleanKey("hystrix.circuit_open"), false)), + span -> + span.hasName("FailingGroup.TestCommand.fallback") + .hasParent(trace.getSpan(1)) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("exception") + .hasAttributesSatisfyingExactly( + equalTo( + EXCEPTION_TYPE, + "java.lang.UnsupportedOperationException"), + satisfies( + EXCEPTION_STACKTRACE, + val -> val.isInstanceOf(String.class)), + equalTo(EXCEPTION_MESSAGE, "No fallback available."))) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("hystrix.command"), "TestCommand"), + equalTo(stringKey("hystrix.group"), "FailingGroup"), + equalTo(booleanKey("hystrix.circuit_open"), false)))); + } + + private static Stream provideCommandNoFallbackResultsInErrorArguments() { + return Stream.concat( + baseArguments(), + Stream.of( + Arguments.of( + named( + "toObservable block", + new Parameter( + Schedulers.computation(), + Schedulers.newThread(), + cmd -> { + BlockingQueue queue = new LinkedBlockingQueue<>(); + Subscription subscription = + cmd.toObservable() + .subscribe( + next -> { + try { + queue.put(new Exception("Unexpectedly got a next")); + } catch (InterruptedException e) { + throw new IllegalArgumentException(e); + } + }, + next -> { + try { + queue.put(next); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + Throwable returnValue; + try { + returnValue = queue.take(); + + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + subscription.unsubscribe(); + try { + throw returnValue; + } catch (Throwable e) { + throw (HystrixRuntimeException) e; + } + }))))); + } + + private static HystrixObservableCommand.Setter setter(String key) { + HystrixObservableCommand.Setter setter = + HystrixObservableCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(key)); + setter.andCommandPropertiesDefaults( + HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(10_000)); + return setter; + } + + private static class Parameter { + public final Scheduler observeOn; + public final Scheduler subscribeOn; + public final Function, String> operation; + + public Parameter( + Scheduler observeOn, + Scheduler subscribeOn, + Function, String> operation) { + this.observeOn = observeOn; + this.subscribeOn = subscribeOn; + this.operation = operation; + } + } +} diff --git a/instrumentation/hystrix-1.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/hystrix/HystrixTest.java b/instrumentation/hystrix-1.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/hystrix/HystrixTest.java new file mode 100644 index 0000000000..77cb6c0025 --- /dev/null +++ b/instrumentation/hystrix-1.4/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/hystrix/HystrixTest.java @@ -0,0 +1,192 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.hystrix; + +import static io.opentelemetry.api.common.AttributeKey.booleanKey; +import static io.opentelemetry.api.common.AttributeKey.stringKey; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.EXCEPTION_STACKTRACE; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.EXCEPTION_TYPE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Named.named; + +import com.netflix.hystrix.HystrixCommand; +import com.netflix.hystrix.HystrixCommandGroupKey; +import com.netflix.hystrix.HystrixCommandProperties; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.trace.data.StatusData; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Function; +import java.util.stream.Stream; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +class HystrixTest { + + @RegisterExtension + protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + @ParameterizedTest + @MethodSource("provideCommandActionArguments") + void testCommands(Function, String> operation) { + class TestCommand extends HystrixCommand { + protected TestCommand(Setter setter) { + super(setter); + } + + @Override + protected String run() throws Exception { + return tracedMethod(); + } + + private String tracedMethod() { + testing.runWithSpan("tracedMethod", () -> {}); + return "Hello!"; + } + } + + HystrixCommand command = new TestCommand(setter()); + + String result = testing.runWithSpan("parent", () -> operation.apply(command)); + assertThat(result).isEqualTo("Hello!"); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent().hasAttributes(Attributes.empty()), + span -> + span.hasName("ExampleGroup.TestCommand.execute") + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("hystrix.command"), "TestCommand"), + equalTo(stringKey("hystrix.group"), "ExampleGroup"), + equalTo(booleanKey("hystrix.circuit_open"), false)), + span -> + span.hasName("tracedMethod") + .hasParent(trace.getSpan(1)) + .hasAttributes(Attributes.empty()))); + } + + @ParameterizedTest + @MethodSource("provideCommandActionArguments") + void testCommandFallbacks(Function, String> operation) { + class TestCommand extends HystrixCommand { + protected TestCommand(Setter setter) { + super(setter); + } + + @Override + protected String run() throws Exception { + throw new IllegalArgumentException(); + } + + @Override + protected String getFallback() { + return "Fallback!"; + } + } + + HystrixCommand command = new TestCommand(setter()); + + String result = testing.runWithSpan("parent", () -> operation.apply(command)); + assertThat(result).isEqualTo("Fallback!"); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasNoParent().hasAttributes(Attributes.empty()), + span -> + span.hasName("ExampleGroup.TestCommand.execute") + .hasParent(trace.getSpan(0)) + .hasStatus(StatusData.error()) + .hasEventsSatisfyingExactly( + event -> + event + .hasName("exception") + .hasAttributesSatisfyingExactly( + equalTo( + EXCEPTION_TYPE, "java.lang.IllegalArgumentException"), + satisfies( + EXCEPTION_STACKTRACE, + val -> val.isInstanceOf(String.class)))) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("hystrix.command"), "TestCommand"), + equalTo(stringKey("hystrix.group"), "ExampleGroup"), + equalTo(booleanKey("hystrix.circuit_open"), false)), + span -> + span.hasName("ExampleGroup.TestCommand.fallback") + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + equalTo(stringKey("hystrix.command"), "TestCommand"), + equalTo(stringKey("hystrix.group"), "ExampleGroup"), + equalTo(booleanKey("hystrix.circuit_open"), false)))); + } + + private static Stream provideCommandActionArguments() { + return Stream.of( + Arguments.of( + named("execute", (Function, String>) HystrixCommand::execute)), + Arguments.of( + named( + "queue", + (Function, String>) + cmd -> { + try { + return cmd.queue().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + })), + Arguments.of( + named( + "toObservable", + (Function, String>) + cmd -> cmd.toObservable().toBlocking().first())), + Arguments.of( + named( + "observe", + (Function, String>) + cmd -> cmd.observe().toBlocking().first())), + Arguments.of( + named( + "observe block", + (Function, String>) + cmd -> { + BlockingQueue queue = new LinkedBlockingQueue<>(); + cmd.observe() + .subscribe( + next -> { + try { + queue.put(next); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + String returnValue; + try { + returnValue = queue.take(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return returnValue; + }))); + } + + private static HystrixCommand.Setter setter() { + HystrixCommand.Setter setter = + HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); + setter.andCommandPropertiesDefaults( + HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(10_000)); + return setter; + } +}