From 7e88009bc43259d09dc506bcc49b9964ba58558c Mon Sep 17 00:00:00 2001 From: Dan Rusei <31576500+danrusei@users.noreply.github.com> Date: Tue, 20 Apr 2021 21:54:20 +0300 Subject: [PATCH] adding otlp http transport, using proto binary (#516) --- Cargo.toml | 1 + examples/basic-otlp-http/Cargo.toml | 12 ++ examples/basic-otlp-http/Dockerfile | 6 + examples/basic-otlp-http/README.md | 24 ++++ examples/basic-otlp-http/docker-compose.yaml | 37 +++++ .../otel-collector-config.yaml | 35 +++++ examples/basic-otlp-http/src/main.rs | 48 +++++++ opentelemetry-otlp/Cargo.toml | 11 ++ opentelemetry-otlp/build.rs | 35 +++++ opentelemetry-otlp/src/lib.rs | 114 ++++++++++++++- opentelemetry-otlp/src/proto.rs | 53 ++++++- opentelemetry-otlp/src/span.rs | 135 +++++++++++++++++- opentelemetry-otlp/src/transform/common.rs | 79 ++++++++++ opentelemetry-otlp/src/transform/traces.rs | 129 +++++++++++++++++ scripts/lint.sh | 5 + 15 files changed, 713 insertions(+), 11 deletions(-) create mode 100644 examples/basic-otlp-http/Cargo.toml create mode 100644 examples/basic-otlp-http/Dockerfile create mode 100644 examples/basic-otlp-http/README.md create mode 100644 examples/basic-otlp-http/docker-compose.yaml create mode 100644 examples/basic-otlp-http/otel-collector-config.yaml create mode 100644 examples/basic-otlp-http/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index 22010cdf3..c2b8907e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ members = [ "examples/basic", "examples/basic-otlp", "examples/basic-otlp-with-selector", + "examples/basic-otlp-http", "examples/datadog", "examples/external-otlp-tonic-tokio", "examples/grpc", diff --git a/examples/basic-otlp-http/Cargo.toml b/examples/basic-otlp-http/Cargo.toml new file mode 100644 index 000000000..eaa104de1 --- /dev/null +++ b/examples/basic-otlp-http/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "basic-otlp-http" +version = "0.1.0" +authors = ["rdan "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +opentelemetry = { path = "../../opentelemetry", features = ["rt-tokio", "metrics", "serialize"] } +opentelemetry-otlp = { path = "../../opentelemetry-otlp", features = ["http-proto", "reqwest-client"] } +tokio = { version = "1.0", features = ["full"] } diff --git a/examples/basic-otlp-http/Dockerfile b/examples/basic-otlp-http/Dockerfile new file mode 100644 index 000000000..f88c276a5 --- /dev/null +++ b/examples/basic-otlp-http/Dockerfile @@ -0,0 +1,6 @@ +FROM rust:1.51 +COPY . /usr/src/basic-otlp-http/ +WORKDIR /usr/src/basic-otlp-http/ +RUN cargo build --release +RUN cargo install --path . +CMD ["/usr/local/cargo/bin/basic-otlp-http"] diff --git a/examples/basic-otlp-http/README.md b/examples/basic-otlp-http/README.md new file mode 100644 index 000000000..a9d335326 --- /dev/null +++ b/examples/basic-otlp-http/README.md @@ -0,0 +1,24 @@ +* The application send data directly to a Collector (port 55681) +* Run the application locally, to run as a docker container you have to change the relative paths from the `Cargo.toml` +* The Collector then sends the data to the appropriate backend, in this case JAEGER + +This demo uses `docker-compose` and by default runs against the `otel/opentelemetry-collector-dev:latest` image. + +```shell +docker-compose up +or +docker-compose up -d +``` + +In another terminal run the application `cargo run` + +Use the browser to see the trace: +- Jaeger at http://0.0.0.0:16686 + +Tear it down: + +```shell +docker-compose down +``` + + diff --git a/examples/basic-otlp-http/docker-compose.yaml b/examples/basic-otlp-http/docker-compose.yaml new file mode 100644 index 000000000..058a437b5 --- /dev/null +++ b/examples/basic-otlp-http/docker-compose.yaml @@ -0,0 +1,37 @@ +version: "2" +services: + + # Jaeger + jaeger-all-in-one: + image: jaegertracing/all-in-one:latest + ports: + - "16686:16686" + - "14268" + - "14250" + + # Collector + otel-collector: + image: otel/opentelemetry-collector-dev:latest + command: ["--config=/etc/otel-collector-config.yaml", "${OTELCOL_ARGS}"] + volumes: + - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml + ports: + - "1888:1888" # pprof extension + - "13133:13133" # health_check extension + - "4317" # OTLP gRPC receiver + - "55681:55681" # OTLP HTTP receiver + - "55670:55679" # zpages extension + depends_on: + - jaeger-all-in-one + + + # metrics-rust: + # build: + # dockerfile: $PWD/Dockerfile + # context: ./basic-otlp-http + # environment: + # - OTLP_TONIC_ENDPOINT=otel-collector:4317 + # depends_on: + # - otel-collector + + diff --git a/examples/basic-otlp-http/otel-collector-config.yaml b/examples/basic-otlp-http/otel-collector-config.yaml new file mode 100644 index 000000000..1a822eee7 --- /dev/null +++ b/examples/basic-otlp-http/otel-collector-config.yaml @@ -0,0 +1,35 @@ +receivers: + otlp: + protocols: + http: + grpc: + +exporters: + logging: + loglevel: debug + + jaeger: + endpoint: jaeger-all-in-one:14250 + insecure: true + +processors: + batch: + +extensions: + health_check: + pprof: + endpoint: :1888 + zpages: + endpoint: :55679 + +service: + extensions: [pprof, zpages, health_check] + pipelines: + traces: + receivers: [otlp] + processors: [batch] + exporters: [logging, jaeger] + metrics: + receivers: [otlp] + processors: [batch] + exporters: [logging] diff --git a/examples/basic-otlp-http/src/main.rs b/examples/basic-otlp-http/src/main.rs new file mode 100644 index 000000000..7844c7bf4 --- /dev/null +++ b/examples/basic-otlp-http/src/main.rs @@ -0,0 +1,48 @@ +use opentelemetry::trace::TraceError; +use opentelemetry::{global, sdk::trace as sdktrace}; +use opentelemetry::{ + trace::{TraceContextExt, Tracer}, + Key, +}; +use std::error::Error; +use std::time::Duration; + +fn init_tracer() -> Result { + opentelemetry_otlp::new_pipeline() + .with_endpoint("http://localhost:55681/v1/traces") + .with_http() + .install_batch(opentelemetry::runtime::Tokio) +} + +const LEMONS_KEY: Key = Key::from_static_str("ex.com/lemons"); +const ANOTHER_KEY: Key = Key::from_static_str("ex.com/another"); + +#[tokio::main] +async fn main() -> Result<(), Box> { + let _ = init_tracer()?; + + let tracer = global::tracer("ex.com/basic"); + + tracer.in_span("operation", |cx| { + let span = cx.span(); + span.add_event( + "Nice operation!".to_string(), + vec![Key::new("bogons").i64(100)], + ); + span.set_attribute(ANOTHER_KEY.string("yes")); + + tracer.in_span("Sub operation...", |cx| { + let span = cx.span(); + span.set_attribute(LEMONS_KEY.string("five")); + + span.add_event("Sub span event".to_string(), vec![]); + }); + }); + + // wait for 1 minutes so that we could see metrics being pushed via OTLP every 10 seconds. + tokio::time::sleep(Duration::from_secs(60)).await; + + global::shutdown_tracer_provider(); + + Ok(()) +} diff --git a/opentelemetry-otlp/Cargo.toml b/opentelemetry-otlp/Cargo.toml index a147fcc63..62db5a354 100644 --- a/opentelemetry-otlp/Cargo.toml +++ b/opentelemetry-otlp/Cargo.toml @@ -44,6 +44,10 @@ protobuf = { version = "2.18", optional = true } thiserror = "1.0" tonic = { version = "0.4", optional = true } tokio = { version = "1.0", features = ["full"], optional = true } +opentelemetry-http = { version = "0.2", path = "../opentelemetry-http", optional = true } +reqwest = { version = "0.11", optional = true, default-features = false } +surf = { version = "2.0", optional = true } +http = "0.2" [dev-dependencies] chrono = "0.4" @@ -64,5 +68,12 @@ openssl = ["grpcio/openssl"] openssl-vendored = ["grpcio/openssl-vendored"] integration-testing = ["tonic", "tonic-build", "prost", "tokio/full", "opentelemetry/trace"] +http-proto = ["prost-build", "prost", "opentelemetry-http"] +reqwest-blocking-client = ["reqwest/blocking", "opentelemetry-http/reqwest"] +reqwest-client = ["reqwest", "opentelemetry-http/reqwest"] +reqwest-rustls = ["reqwest", "reqwest/rustls-tls-native-roots"] +surf-client = ["surf", "opentelemetry-http/surf"] + [build-dependencies] tonic-build = { version = "0.4", optional = true } +prost-build = {version = "0.7", optional = true } diff --git a/opentelemetry-otlp/build.rs b/opentelemetry-otlp/build.rs index 78723a1a1..6364809fd 100644 --- a/opentelemetry-otlp/build.rs +++ b/opentelemetry-otlp/build.rs @@ -3,12 +3,22 @@ // // Grpc related files used by grpcio are maintained at src/proto/grpcio. tests/grpc_build.rs makes // sure they are up to date. +#[cfg(any(feature = "tonic", feature = "http-proto"))] +use std::path::PathBuf; + fn main() { #[cfg(feature = "tonic")] + { + let out_dir = PathBuf::from( + std::env::var("OUT_DIR").expect("OUT_DIR should be set by cargo but can't find"), + ) + .join("tonic"); + std::fs::create_dir_all(out_dir.clone()).expect("cannot create output dir"); tonic_build::configure() .build_server(std::env::var_os("CARGO_FEATURE_INTEGRATION_TESTING").is_some()) .build_client(true) .format(false) + .out_dir(out_dir) .compile( &[ "src/proto/opentelemetry-proto/opentelemetry/proto/common/v1/common.proto", @@ -22,4 +32,29 @@ fn main() { &["src/proto/opentelemetry-proto"], ) .expect("Error generating protobuf"); + } + + #[cfg(feature = "http-proto")] + { + let out_dir = PathBuf::from( + std::env::var("OUT_DIR").expect("OUT_DIR should be set by cargo but can't find"), + ) + .join("prost"); + std::fs::create_dir_all(out_dir.clone()).expect("cannot create output dir"); + prost_build::Config::new() + .out_dir(out_dir) + .compile_protos( + &[ + "src/proto/opentelemetry-proto/opentelemetry/proto/common/v1/common.proto", + "src/proto/opentelemetry-proto/opentelemetry/proto/resource/v1/resource.proto", + "src/proto/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto", + "src/proto/opentelemetry-proto/opentelemetry/proto/trace/v1/trace_config.proto", + "src/proto/opentelemetry-proto/opentelemetry/proto/collector/trace/v1/trace_service.proto", + "src/proto/opentelemetry-proto/opentelemetry/proto/metrics/v1/metrics.proto", + "src/proto/opentelemetry-proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto", + ], + &["src/proto/opentelemetry-proto"], + ) + .expect("Error generating protobuf"); + } } diff --git a/opentelemetry-otlp/src/lib.rs b/opentelemetry-otlp/src/lib.rs index d5d40b1b5..97b136d83 100644 --- a/opentelemetry-otlp/src/lib.rs +++ b/opentelemetry-otlp/src/lib.rs @@ -148,7 +148,7 @@ use opentelemetry::{global, runtime::Runtime, sdk, trace::TracerProvider}; -#[cfg(feature = "grpc-sys")] +#[cfg(any(feature = "grpc-sys", feature = "http-proto"))] use std::collections::HashMap; use std::str::FromStr; @@ -187,12 +187,18 @@ pub use crate::span::TonicConfig; #[cfg(feature = "grpc-sys")] pub use crate::span::GrpcioConfig; +#[cfg(feature = "http-proto")] +pub use crate::span::HttpConfig; + #[cfg(feature = "metrics")] pub use crate::metric::{new_metrics_pipeline, MetricsExporter, OtlpMetricPipelineBuilder}; #[cfg(feature = "grpc-sys")] pub use crate::span::{Compression, Credentials}; +#[cfg(feature = "http-proto")] +use opentelemetry_http::HttpClient; + use opentelemetry::sdk::export::ExportError; use opentelemetry::trace::TraceError; @@ -308,6 +314,17 @@ impl OtlpPipelineBuilder { grpcio_config: GrpcioConfig::default(), } } + + /// Use HTTP as transport layer, return a `HttpPipelineBuilder` to config the http transport + /// and build the exporter + #[cfg(feature = "http-proto")] + pub fn with_http(self) -> HttpPipelineBuilder { + HttpPipelineBuilder { + exporter_config: self.exporter_config, + trace_config: self.trace_config, + http_config: HttpConfig::default(), + } + } } /// Build a trace exporter that uses [tonic] as grpc layer and opentelemetry protocol. @@ -481,6 +498,69 @@ impl GrpcioPipelineBuilder { } } +/// Build a trace exporter that uses HTTP transport and opentelemetry protocol. +/// +/// It provides methods to config http client. The methods can be chained. The exporter can be built by calling +/// [`install_simple`] or [`install_batch`]. +/// +/// `HttpPipelineBuilder` can be constructed by calling [`with_http`] function in [`OtlpPipelineBuilder`] +/// +/// [`with_http`]: crate::OtlpPipelineBuilder::with_http +/// [`OtlpPipelineBuilder`]: crate::OtlpPipelineBuilder +/// [`install_simple`]: crate::HttpPipelineBuilder::install_simple +/// [`install_batch`]: crate::HttpPipelineBuilder::install_batch +#[derive(Default, Debug)] +#[cfg(feature = "http-proto")] +pub struct HttpPipelineBuilder { + exporter_config: ExporterConfig, + http_config: HttpConfig, + trace_config: Option, +} + +#[cfg(feature = "http-proto")] +impl HttpPipelineBuilder { + /// Assign client implementation + pub fn with_http_client(mut self, client: T) -> Self { + self.http_config.client = Some(Box::new(client)); + self + } + + /// Set additional headers to send to the collector. + pub fn with_headers(mut self, headers: HashMap) -> Self { + self.http_config.headers = Some(headers); + self + } + + /// Install a trace exporter using HTTP as transport layer and a simple span processor. + /// + /// Returns a [`Tracer`] with the name `opentelemetry-otlp` and current crate version. + /// + /// `install_simple` will panic if not called within a tokio runtime + /// + /// [`Tracer`]: opentelemetry::trace::Tracer + pub fn install_simple(self) -> Result { + let exporter = TraceExporter::new_http(self.exporter_config, self.http_config)?; + Ok(build_simple_with_exporter(exporter, self.trace_config)) + } + + /// Install a trace exporter using HTTP as transport layer and a batch span processor using the + /// specified runtime. + /// + /// Returns a [`Tracer`] with the name `opentelemetry-otlp` and current crate version. + /// + /// `install_batch` will panic if not called within a tokio runtime + /// + /// [`Tracer`]: opentelemetry::trace::Tracer + pub fn install_batch(self, runtime: R) -> Result { + let exporter = TraceExporter::new_http(self.exporter_config, self.http_config)?; + Ok(build_batch_with_exporter( + exporter, + self.trace_config, + runtime, + )) + } +} + fn build_simple_with_exporter( exporter: TraceExporter, trace_config: Option, @@ -520,9 +600,9 @@ pub enum Error { Transport(#[from] tonic::transport::Error), /// Error from tonic::codegen::http::uri::InvalidUri - #[cfg(feature = "tonic")] + #[cfg(any(feature = "tonic", feature = "http-proto"))] #[error("invalid URI {0}")] - InvalidUri(#[from] tonic::codegen::http::uri::InvalidUri), + InvalidUri(#[from] http::uri::InvalidUri), /// Error from tonic::Status #[cfg(feature = "tonic")] @@ -534,6 +614,31 @@ pub enum Error { #[error("grpcio error {0}")] Grpcio(#[from] grpcio::Error), + /// Http requests failed + #[cfg(feature = "http-proto")] + #[error("No Http Client, you must select one")] + NoHttpClient, + + /// Http requests failed + #[cfg(feature = "http-proto")] + #[error("http request failed with {0}")] + RequestFailed(#[from] http::Error), + + /// Invalid Header Value + #[cfg(feature = "http-proto")] + #[error("http header value error {0}")] + InvalidHeaderValue(#[from] http::header::InvalidHeaderValue), + + /// Invalid Header Name + #[cfg(feature = "http-proto")] + #[error("http header name error {0}")] + InvalidHeaderName(#[from] http::header::InvalidHeaderName), + + /// Prost encode failed + #[cfg(feature = "http-proto")] + #[error("prost encoding error {0}")] + EncodeError(#[from] prost::EncodeError), + /// The lock in exporters has been poisoned. #[cfg(feature = "metrics")] #[error("the lock of the {0} has been poisoned")] @@ -553,7 +658,8 @@ pub enum Protocol { Grpc, // TODO add support for other protocols // HttpJson, - // HttpProto, + /// HTTP protocol with binary protobuf + HttpBinary, } #[cfg(test)] diff --git a/opentelemetry-otlp/src/proto.rs b/opentelemetry-otlp/src/proto.rs index 8bfd1e5d3..1ad70c4ec 100644 --- a/opentelemetry-otlp/src/proto.rs +++ b/opentelemetry-otlp/src/proto.rs @@ -2,13 +2,13 @@ pub mod collector { pub mod metrics { pub mod v1 { - tonic::include_proto!("opentelemetry.proto.collector.metrics.v1"); + include!(concat!(env!("OUT_DIR"), "/tonic", "/opentelemetry.proto.collector.metrics.v1.rs")); } } pub mod trace { pub mod v1 { - tonic::include_proto!("opentelemetry.proto.collector.trace.v1"); + include!(concat!(env!("OUT_DIR"), "/tonic", "/opentelemetry.proto.collector.trace.v1.rs")); } } } @@ -16,28 +16,69 @@ pub mod collector { #[cfg(feature = "tonic")] pub mod common { pub mod v1 { - tonic::include_proto!("opentelemetry.proto.common.v1"); + include!(concat!(env!("OUT_DIR"), "/tonic", "/opentelemetry.proto.common.v1.rs")); } } #[cfg(feature = "tonic")] pub mod metrics { pub mod v1 { - tonic::include_proto!("opentelemetry.proto.metrics.v1"); + include!(concat!(env!("OUT_DIR"), "/tonic", "/opentelemetry.proto.metrics.v1.rs")); } } #[cfg(feature = "tonic")] pub mod resource { pub mod v1 { - tonic::include_proto!("opentelemetry.proto.resource.v1"); + include!(concat!(env!("OUT_DIR"), "/tonic", "/opentelemetry.proto.resource.v1.rs")); } } #[cfg(feature = "tonic")] pub mod trace { pub mod v1 { - tonic::include_proto!("opentelemetry.proto.trace.v1"); + include!(concat!(env!("OUT_DIR"), "/tonic", "/opentelemetry.proto.trace.v1.rs")); + } +} + +#[cfg(feature="http-proto")] +pub mod prost { + pub mod collector { + pub mod metrics { + pub mod v1 { + include!(concat!(env!("OUT_DIR"), "/prost", "/opentelemetry.proto.collector.metrics.v1.rs")); + } + } + + pub mod trace { + pub mod v1 { + include!(concat!(env!("OUT_DIR"), "/prost", "/opentelemetry.proto.collector.trace.v1.rs")); + } + } + } + + pub mod common { + pub mod v1 { + include!(concat!(env!("OUT_DIR"), "/prost", "/opentelemetry.proto.common.v1.rs")); + } + } + + pub mod metrics { + pub mod v1 { + include!(concat!(env!("OUT_DIR"), "/prost", "/opentelemetry.proto.metrics.v1.rs")); + } + } + + pub mod resource { + pub mod v1 { + include!(concat!(env!("OUT_DIR"), "/prost", "/opentelemetry.proto.resource.v1.rs")); + } + } + + pub mod trace { + pub mod v1 { + include!(concat!(env!("OUT_DIR"), "/prost", "/opentelemetry.proto.trace.v1.rs")); + } } } diff --git a/opentelemetry-otlp/src/span.rs b/opentelemetry-otlp/src/span.rs index 0b0355b33..a9a95f96c 100644 --- a/opentelemetry-otlp/src/span.rs +++ b/opentelemetry-otlp/src/span.rs @@ -8,6 +8,9 @@ use crate::proto::collector::trace::v1::{ ExportTraceServiceRequest as TonicRequest, }; +#[cfg(feature = "http-proto")] +use crate::proto::prost::collector::trace::v1::ExportTraceServiceRequest as ProstRequest; + #[cfg(feature = "tonic")] use tonic::{ metadata::{KeyAndValueRef, MetadataMap}, @@ -33,9 +36,14 @@ use grpcio::{ #[cfg(feature = "grpc-sys")] use protobuf::RepeatedField; +#[cfg(feature = "http-proto")] +use prost::Message; +#[cfg(feature = "http-proto")] +use std::convert::TryFrom; + use async_trait::async_trait; -#[cfg(feature = "grpc-sys")] +#[cfg(any(feature = "grpc-sys", feature = "http-proto"))] use std::collections::HashMap; use std::fmt; @@ -48,6 +56,14 @@ use crate::{Protocol, OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT, OTEL_EXPORTER_OTLP_TI use opentelemetry::sdk::export::trace::{ExportResult, SpanData, SpanExporter}; use std::time::Duration; +#[cfg(feature = "http-proto")] +use http::{ + header::{HeaderName, HeaderValue, CONTENT_TYPE}, + Method, Uri, +}; +#[cfg(feature = "http-proto")] +use opentelemetry_http::HttpClient; + /// Exporter that sends data in OTLP format. pub enum TraceExporter { #[cfg(feature = "tonic")] @@ -70,6 +86,18 @@ pub enum TraceExporter { /// The Grpc trace exporter trace_exporter: GrpcioTraceServiceClient, }, + #[cfg(feature = "http-proto")] + /// Trace Exporter using HTTP transport + Http { + /// Duration of timeout when sending spans to backend. + timeout: Duration, + /// Additional headers of the outbound requests. + headers: Option>, + /// The Collector URL + collector_endpoint: Uri, + /// The HTTP trace exporter + trace_exporter: Option>, + }, } /// Configuration for the OTLP exporter. @@ -170,6 +198,46 @@ impl From for grpcio::CompressionAlgorithms { } } +/// Configuration of the http transport +#[cfg(feature = "http-proto")] +#[derive(Debug)] +pub struct HttpConfig { + /// Select the HTTP client + pub client: Option>, + + /// Additional headers to send to the collector. + pub headers: Option>, +} + +#[cfg(feature = "http-proto")] +impl Default for HttpConfig { + fn default() -> Self { + HttpConfig { + #[cfg(feature = "reqwest-blocking-client")] + client: Some(Box::new(reqwest::blocking::Client::new())), + #[cfg(all( + not(feature = "reqwest-blocking-client"), + not(feature = "surf-client"), + feature = "reqwest-client" + ))] + client: Some(Box::new(reqwest::Client::new())), + #[cfg(all( + not(feature = "reqwest-client"), + not(feature = "reqwest-blocking-client"), + feature = "surf-client" + ))] + client: Some(Box::new(surf::Client::new())), + #[cfg(all( + not(feature = "reqwest-client"), + not(feature = "surf-client"), + not(feature = "reqwest-blocking-client") + ))] + client: None, + headers: None, + } + } +} + impl Default for ExporterConfig { fn default() -> Self { ExporterConfig { @@ -201,6 +269,15 @@ impl Debug for TraceExporter { .field("timeout", &timeout) .field("trace_exporter", &"TraceServiceClient") .finish(), + #[cfg(feature = "http-proto")] + TraceExporter::Http { + headers, timeout, .. + } => f + .debug_struct("Exporter") + .field("headers", &headers) + .field("timeout", &timeout) + .field("trace_exporter", &"TraceServiceClient") + .finish(), } } } @@ -299,6 +376,22 @@ impl TraceExporter { headers: grpcio_config.headers, } } + + /// Builds a new span exporter with the given configuration + #[cfg(feature = "http-proto")] + pub fn new_http(config: ExporterConfig, http_config: HttpConfig) -> Result { + let url: Uri = config + .endpoint + .parse() + .map_err::(Into::into)?; + + Ok(TraceExporter::Http { + trace_exporter: http_config.client, + timeout: config.timeout, + collector_endpoint: url, + headers: http_config.headers, + }) + } } #[async_trait] @@ -352,6 +445,46 @@ impl SpanExporter for TraceExporter { Ok(()) } + + #[cfg(feature = "http-proto")] + TraceExporter::Http { + trace_exporter, + collector_endpoint, + headers, + .. + } => { + let req = ProstRequest { + resource_spans: batch.into_iter().map(Into::into).collect(), + }; + + let mut buf = vec![]; + req.encode(&mut buf) + .map_err::(Into::into)?; + + let mut request = http::Request::builder() + .method(Method::POST) + .uri(collector_endpoint.clone()) + .header(CONTENT_TYPE, "application/x-protobuf") + .body(buf) + .map_err::(Into::into)?; + + if let Some(headers) = headers.clone() { + for (k, val) in headers { + let value = HeaderValue::from_str(val.as_ref()) + .map_err::(Into::into)?; + let key = + HeaderName::try_from(&k).map_err::(Into::into)?; + request.headers_mut().insert(key, value); + } + } + + if let Some(client) = trace_exporter { + client.send(request).await?; + Ok(()) + } else { + Err(crate::Error::NoHttpClient.into()) + } + } } } } diff --git a/opentelemetry-otlp/src/transform/common.rs b/opentelemetry-otlp/src/transform/common.rs index 8d3b1cd4c..2dab9a046 100644 --- a/opentelemetry-otlp/src/transform/common.rs +++ b/opentelemetry-otlp/src/transform/common.rs @@ -80,6 +80,85 @@ pub(crate) mod tonic { } } +#[cfg(feature = "http-proto")] +pub(crate) mod prost { + use super::*; + use crate::proto::prost::common::v1::{ + any_value, AnyValue, ArrayValue, InstrumentationLibrary, KeyValue, + }; + + impl From for InstrumentationLibrary { + fn from(library: opentelemetry::sdk::InstrumentationLibrary) -> Self { + InstrumentationLibrary { + name: library.name.to_string(), + version: library.version.unwrap_or("").to_string(), + } + } + } + + pub(crate) struct Attributes( + pub(crate) ::std::vec::Vec, + ); + + impl From for Attributes { + fn from(attributes: EvictedHashMap) -> Self { + Attributes( + attributes + .into_iter() + .map(|(key, value)| KeyValue { + key: key.as_str().to_string(), + value: Some(value.into()), + }) + .collect(), + ) + } + } + + impl From> for Attributes { + fn from(kvs: Vec) -> Self { + Attributes( + kvs.into_iter() + .map(|api_kv| KeyValue { + key: api_kv.key.as_str().to_string(), + value: Some(api_kv.value.into()), + }) + .collect(), + ) + } + } + + impl From for AnyValue { + fn from(value: Value) -> Self { + AnyValue { + value: match value { + Value::Bool(val) => Some(any_value::Value::BoolValue(val)), + Value::I64(val) => Some(any_value::Value::IntValue(val)), + Value::F64(val) => Some(any_value::Value::DoubleValue(val)), + Value::String(val) => Some(any_value::Value::StringValue(val.into_owned())), + Value::Array(array) => Some(any_value::Value::ArrayValue(match array { + Array::Bool(vals) => array_into_proto(vals), + Array::I64(vals) => array_into_proto(vals), + Array::F64(vals) => array_into_proto(vals), + Array::String(vals) => array_into_proto(vals), + })), + }, + } + } + } + + fn array_into_proto(vals: Vec) -> ArrayValue + where + Value: From, + { + let values = vals + .into_iter() + .map(|val| AnyValue::from(Value::from(val))) + .collect(); + + ArrayValue { values } + } +} + #[cfg(feature = "grpc-sys")] pub(crate) mod grpcio { use super::*; diff --git a/opentelemetry-otlp/src/transform/traces.rs b/opentelemetry-otlp/src/transform/traces.rs index 7dfb0ce8c..6f9491f86 100644 --- a/opentelemetry-otlp/src/transform/traces.rs +++ b/opentelemetry-otlp/src/transform/traces.rs @@ -131,6 +131,135 @@ mod tonic { } } +#[cfg(feature = "http-proto")] +mod prost { + use super::*; + use crate::proto::prost::resource::v1::Resource; + use crate::proto::prost::trace::v1::{ + span, status, InstrumentationLibrarySpans, ResourceSpans, Span, Status, + }; + use crate::transform::common::prost::Attributes; + + impl From for span::SpanKind { + fn from(span_kind: SpanKind) -> Self { + match span_kind { + SpanKind::Client => span::SpanKind::Client, + SpanKind::Consumer => span::SpanKind::Consumer, + SpanKind::Internal => span::SpanKind::Internal, + SpanKind::Producer => span::SpanKind::Producer, + SpanKind::Server => span::SpanKind::Server, + } + } + } + + impl From for status::StatusCode { + fn from(status_code: StatusCode) -> Self { + match status_code { + StatusCode::Ok => status::StatusCode::Ok, + StatusCode::Unset => status::StatusCode::Unset, + StatusCode::Error => status::StatusCode::Error, + } + } + } + + impl From for span::Link { + fn from(link: Link) -> Self { + span::Link { + trace_id: link + .span_context() + .trace_id() + .to_u128() + .to_be_bytes() + .to_vec(), + span_id: link + .span_context() + .span_id() + .to_u64() + .to_be_bytes() + .to_vec(), + trace_state: link.span_context().trace_state().header(), + attributes: Attributes::from(link.attributes().clone()).0, + dropped_attributes_count: 0, + } + } + } + + impl From for ResourceSpans { + fn from(source_span: SpanData) -> Self { + let span_kind: span::SpanKind = source_span.span_kind.into(); + ResourceSpans { + resource: Some(Resource { + attributes: resource_attributes( + source_span.resource.as_ref().map(AsRef::as_ref), + ) + .0, + dropped_attributes_count: 0, + }), + instrumentation_library_spans: vec![InstrumentationLibrarySpans { + instrumentation_library: Default::default(), + spans: vec![Span { + trace_id: source_span + .span_context + .trace_id() + .to_u128() + .to_be_bytes() + .to_vec(), + span_id: source_span + .span_context + .span_id() + .to_u64() + .to_be_bytes() + .to_vec(), + trace_state: source_span.span_context.trace_state().header(), + parent_span_id: { + if source_span.parent_span_id.to_u64() > 0 { + source_span.parent_span_id.to_u64().to_be_bytes().to_vec() + } else { + vec![] + } + }, + name: source_span.name.into_owned(), + kind: span_kind as i32, + start_time_unix_nano: to_nanos(source_span.start_time), + end_time_unix_nano: to_nanos(source_span.end_time), + dropped_attributes_count: source_span.attributes.dropped_count(), + attributes: Attributes::from(source_span.attributes).0, + dropped_events_count: source_span.message_events.dropped_count(), + events: source_span + .message_events + .into_iter() + .map(|event| span::Event { + time_unix_nano: to_nanos(event.timestamp), + name: event.name.into(), + attributes: Attributes::from(event.attributes).0, + dropped_attributes_count: 0, + }) + .collect(), + dropped_links_count: source_span.links.dropped_count(), + links: source_span.links.into_iter().map(Into::into).collect(), + status: Some(Status { + code: status::StatusCode::from(source_span.status_code).into(), + message: source_span.status_message.into_owned(), + ..Default::default() + }), + }], + }], + } + } + } + + fn resource_attributes(resource: Option<&sdk::Resource>) -> Attributes { + resource + .map(|res| { + res.iter() + .map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone())) + .collect::>() + }) + .unwrap_or_default() + .into() + } +} + #[cfg(feature = "grpc-sys")] mod grpcio { use super::*; diff --git a/scripts/lint.sh b/scripts/lint.sh index 8c54737ef..e0e9c86e3 100755 --- a/scripts/lint.sh +++ b/scripts/lint.sh @@ -22,6 +22,11 @@ if rustup component add clippy; then cargo_feature opentelemetry-otlp "trace,grpc-sys" cargo_feature opentelemetry-otlp "trace,grpc-sys,openssl" cargo_feature opentelemetry-otlp "trace,grpc-sys,openssl-vendored" + cargo_feature opentelemetry-otlp "http-proto" + cargo_feature opentelemetry-otlp "http-proto, reqwest-blocking-client" + cargo_feature opentelemetry-otlp "http-proto, reqwest-client" + cargo_feature opentelemetry-otlp "http-proto, reqwest-rustls" + cargo_feature opentelemetry-otlp "http-proto, surf-client" cargo_feature opentelemetry-jaeger "surf_collector_client" cargo_feature opentelemetry-jaeger "isahc_collector_client"