adding otlp http transport, using proto binary (#516)

This commit is contained in:
Dan Rusei 2021-04-20 21:54:20 +03:00 committed by GitHub
parent 241dff9d3b
commit 7e88009bc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 713 additions and 11 deletions

View File

@ -19,6 +19,7 @@ members = [
"examples/basic",
"examples/basic-otlp",
"examples/basic-otlp-with-selector",
"examples/basic-otlp-http",
"examples/datadog",
"examples/external-otlp-tonic-tokio",
"examples/grpc",

View File

@ -0,0 +1,12 @@
[package]
name = "basic-otlp-http"
version = "0.1.0"
authors = ["rdan <dan.rusei@gmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
opentelemetry = { path = "../../opentelemetry", features = ["rt-tokio", "metrics", "serialize"] }
opentelemetry-otlp = { path = "../../opentelemetry-otlp", features = ["http-proto", "reqwest-client"] }
tokio = { version = "1.0", features = ["full"] }

View File

@ -0,0 +1,6 @@
FROM rust:1.51
COPY . /usr/src/basic-otlp-http/
WORKDIR /usr/src/basic-otlp-http/
RUN cargo build --release
RUN cargo install --path .
CMD ["/usr/local/cargo/bin/basic-otlp-http"]

View File

@ -0,0 +1,24 @@
* The application send data directly to a Collector (port 55681)
* Run the application locally, to run as a docker container you have to change the relative paths from the `Cargo.toml`
* The Collector then sends the data to the appropriate backend, in this case JAEGER
This demo uses `docker-compose` and by default runs against the `otel/opentelemetry-collector-dev:latest` image.
```shell
docker-compose up
or
docker-compose up -d
```
In another terminal run the application `cargo run`
Use the browser to see the trace:
- Jaeger at http://0.0.0.0:16686
Tear it down:
```shell
docker-compose down
```

View File

@ -0,0 +1,37 @@
version: "2"
services:
# Jaeger
jaeger-all-in-one:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686"
- "14268"
- "14250"
# Collector
otel-collector:
image: otel/opentelemetry-collector-dev:latest
command: ["--config=/etc/otel-collector-config.yaml", "${OTELCOL_ARGS}"]
volumes:
- ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
ports:
- "1888:1888" # pprof extension
- "13133:13133" # health_check extension
- "4317" # OTLP gRPC receiver
- "55681:55681" # OTLP HTTP receiver
- "55670:55679" # zpages extension
depends_on:
- jaeger-all-in-one
# metrics-rust:
# build:
# dockerfile: $PWD/Dockerfile
# context: ./basic-otlp-http
# environment:
# - OTLP_TONIC_ENDPOINT=otel-collector:4317
# depends_on:
# - otel-collector

View File

@ -0,0 +1,35 @@
receivers:
otlp:
protocols:
http:
grpc:
exporters:
logging:
loglevel: debug
jaeger:
endpoint: jaeger-all-in-one:14250
insecure: true
processors:
batch:
extensions:
health_check:
pprof:
endpoint: :1888
zpages:
endpoint: :55679
service:
extensions: [pprof, zpages, health_check]
pipelines:
traces:
receivers: [otlp]
processors: [batch]
exporters: [logging, jaeger]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [logging]

View File

@ -0,0 +1,48 @@
use opentelemetry::trace::TraceError;
use opentelemetry::{global, sdk::trace as sdktrace};
use opentelemetry::{
trace::{TraceContextExt, Tracer},
Key,
};
use std::error::Error;
use std::time::Duration;
fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
opentelemetry_otlp::new_pipeline()
.with_endpoint("http://localhost:55681/v1/traces")
.with_http()
.install_batch(opentelemetry::runtime::Tokio)
}
const LEMONS_KEY: Key = Key::from_static_str("ex.com/lemons");
const ANOTHER_KEY: Key = Key::from_static_str("ex.com/another");
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let _ = init_tracer()?;
let tracer = global::tracer("ex.com/basic");
tracer.in_span("operation", |cx| {
let span = cx.span();
span.add_event(
"Nice operation!".to_string(),
vec![Key::new("bogons").i64(100)],
);
span.set_attribute(ANOTHER_KEY.string("yes"));
tracer.in_span("Sub operation...", |cx| {
let span = cx.span();
span.set_attribute(LEMONS_KEY.string("five"));
span.add_event("Sub span event".to_string(), vec![]);
});
});
// wait for 1 minutes so that we could see metrics being pushed via OTLP every 10 seconds.
tokio::time::sleep(Duration::from_secs(60)).await;
global::shutdown_tracer_provider();
Ok(())
}

View File

@ -44,6 +44,10 @@ protobuf = { version = "2.18", optional = true }
thiserror = "1.0"
tonic = { version = "0.4", optional = true }
tokio = { version = "1.0", features = ["full"], optional = true }
opentelemetry-http = { version = "0.2", path = "../opentelemetry-http", optional = true }
reqwest = { version = "0.11", optional = true, default-features = false }
surf = { version = "2.0", optional = true }
http = "0.2"
[dev-dependencies]
chrono = "0.4"
@ -64,5 +68,12 @@ openssl = ["grpcio/openssl"]
openssl-vendored = ["grpcio/openssl-vendored"]
integration-testing = ["tonic", "tonic-build", "prost", "tokio/full", "opentelemetry/trace"]
http-proto = ["prost-build", "prost", "opentelemetry-http"]
reqwest-blocking-client = ["reqwest/blocking", "opentelemetry-http/reqwest"]
reqwest-client = ["reqwest", "opentelemetry-http/reqwest"]
reqwest-rustls = ["reqwest", "reqwest/rustls-tls-native-roots"]
surf-client = ["surf", "opentelemetry-http/surf"]
[build-dependencies]
tonic-build = { version = "0.4", optional = true }
prost-build = {version = "0.7", optional = true }

View File

@ -3,12 +3,22 @@
//
// Grpc related files used by grpcio are maintained at src/proto/grpcio. tests/grpc_build.rs makes
// sure they are up to date.
#[cfg(any(feature = "tonic", feature = "http-proto"))]
use std::path::PathBuf;
fn main() {
#[cfg(feature = "tonic")]
{
let out_dir = PathBuf::from(
std::env::var("OUT_DIR").expect("OUT_DIR should be set by cargo but can't find"),
)
.join("tonic");
std::fs::create_dir_all(out_dir.clone()).expect("cannot create output dir");
tonic_build::configure()
.build_server(std::env::var_os("CARGO_FEATURE_INTEGRATION_TESTING").is_some())
.build_client(true)
.format(false)
.out_dir(out_dir)
.compile(
&[
"src/proto/opentelemetry-proto/opentelemetry/proto/common/v1/common.proto",
@ -23,3 +33,28 @@ fn main() {
)
.expect("Error generating protobuf");
}
#[cfg(feature = "http-proto")]
{
let out_dir = PathBuf::from(
std::env::var("OUT_DIR").expect("OUT_DIR should be set by cargo but can't find"),
)
.join("prost");
std::fs::create_dir_all(out_dir.clone()).expect("cannot create output dir");
prost_build::Config::new()
.out_dir(out_dir)
.compile_protos(
&[
"src/proto/opentelemetry-proto/opentelemetry/proto/common/v1/common.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/resource/v1/resource.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/trace/v1/trace_config.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/collector/trace/v1/trace_service.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/metrics/v1/metrics.proto",
"src/proto/opentelemetry-proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto",
],
&["src/proto/opentelemetry-proto"],
)
.expect("Error generating protobuf");
}
}

View File

@ -148,7 +148,7 @@
use opentelemetry::{global, runtime::Runtime, sdk, trace::TracerProvider};
#[cfg(feature = "grpc-sys")]
#[cfg(any(feature = "grpc-sys", feature = "http-proto"))]
use std::collections::HashMap;
use std::str::FromStr;
@ -187,12 +187,18 @@ pub use crate::span::TonicConfig;
#[cfg(feature = "grpc-sys")]
pub use crate::span::GrpcioConfig;
#[cfg(feature = "http-proto")]
pub use crate::span::HttpConfig;
#[cfg(feature = "metrics")]
pub use crate::metric::{new_metrics_pipeline, MetricsExporter, OtlpMetricPipelineBuilder};
#[cfg(feature = "grpc-sys")]
pub use crate::span::{Compression, Credentials};
#[cfg(feature = "http-proto")]
use opentelemetry_http::HttpClient;
use opentelemetry::sdk::export::ExportError;
use opentelemetry::trace::TraceError;
@ -308,6 +314,17 @@ impl OtlpPipelineBuilder {
grpcio_config: GrpcioConfig::default(),
}
}
/// Use HTTP as transport layer, return a `HttpPipelineBuilder` to config the http transport
/// and build the exporter
#[cfg(feature = "http-proto")]
pub fn with_http(self) -> HttpPipelineBuilder {
HttpPipelineBuilder {
exporter_config: self.exporter_config,
trace_config: self.trace_config,
http_config: HttpConfig::default(),
}
}
}
/// Build a trace exporter that uses [tonic] as grpc layer and opentelemetry protocol.
@ -481,6 +498,69 @@ impl GrpcioPipelineBuilder {
}
}
/// Build a trace exporter that uses HTTP transport and opentelemetry protocol.
///
/// It provides methods to config http client. The methods can be chained. The exporter can be built by calling
/// [`install_simple`] or [`install_batch`].
///
/// `HttpPipelineBuilder` can be constructed by calling [`with_http`] function in [`OtlpPipelineBuilder`]
///
/// [`with_http`]: crate::OtlpPipelineBuilder::with_http
/// [`OtlpPipelineBuilder`]: crate::OtlpPipelineBuilder
/// [`install_simple`]: crate::HttpPipelineBuilder::install_simple
/// [`install_batch`]: crate::HttpPipelineBuilder::install_batch
#[derive(Default, Debug)]
#[cfg(feature = "http-proto")]
pub struct HttpPipelineBuilder {
exporter_config: ExporterConfig,
http_config: HttpConfig,
trace_config: Option<sdk::trace::Config>,
}
#[cfg(feature = "http-proto")]
impl HttpPipelineBuilder {
/// Assign client implementation
pub fn with_http_client<T: HttpClient + 'static>(mut self, client: T) -> Self {
self.http_config.client = Some(Box::new(client));
self
}
/// Set additional headers to send to the collector.
pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
self.http_config.headers = Some(headers);
self
}
/// Install a trace exporter using HTTP as transport layer and a simple span processor.
///
/// Returns a [`Tracer`] with the name `opentelemetry-otlp` and current crate version.
///
/// `install_simple` will panic if not called within a tokio runtime
///
/// [`Tracer`]: opentelemetry::trace::Tracer
pub fn install_simple(self) -> Result<sdk::trace::Tracer, TraceError> {
let exporter = TraceExporter::new_http(self.exporter_config, self.http_config)?;
Ok(build_simple_with_exporter(exporter, self.trace_config))
}
/// Install a trace exporter using HTTP as transport layer and a batch span processor using the
/// specified runtime.
///
/// Returns a [`Tracer`] with the name `opentelemetry-otlp` and current crate version.
///
/// `install_batch` will panic if not called within a tokio runtime
///
/// [`Tracer`]: opentelemetry::trace::Tracer
pub fn install_batch<R: Runtime>(self, runtime: R) -> Result<sdk::trace::Tracer, TraceError> {
let exporter = TraceExporter::new_http(self.exporter_config, self.http_config)?;
Ok(build_batch_with_exporter(
exporter,
self.trace_config,
runtime,
))
}
}
fn build_simple_with_exporter(
exporter: TraceExporter,
trace_config: Option<sdk::trace::Config>,
@ -520,9 +600,9 @@ pub enum Error {
Transport(#[from] tonic::transport::Error),
/// Error from tonic::codegen::http::uri::InvalidUri
#[cfg(feature = "tonic")]
#[cfg(any(feature = "tonic", feature = "http-proto"))]
#[error("invalid URI {0}")]
InvalidUri(#[from] tonic::codegen::http::uri::InvalidUri),
InvalidUri(#[from] http::uri::InvalidUri),
/// Error from tonic::Status
#[cfg(feature = "tonic")]
@ -534,6 +614,31 @@ pub enum Error {
#[error("grpcio error {0}")]
Grpcio(#[from] grpcio::Error),
/// Http requests failed
#[cfg(feature = "http-proto")]
#[error("No Http Client, you must select one")]
NoHttpClient,
/// Http requests failed
#[cfg(feature = "http-proto")]
#[error("http request failed with {0}")]
RequestFailed(#[from] http::Error),
/// Invalid Header Value
#[cfg(feature = "http-proto")]
#[error("http header value error {0}")]
InvalidHeaderValue(#[from] http::header::InvalidHeaderValue),
/// Invalid Header Name
#[cfg(feature = "http-proto")]
#[error("http header name error {0}")]
InvalidHeaderName(#[from] http::header::InvalidHeaderName),
/// Prost encode failed
#[cfg(feature = "http-proto")]
#[error("prost encoding error {0}")]
EncodeError(#[from] prost::EncodeError),
/// The lock in exporters has been poisoned.
#[cfg(feature = "metrics")]
#[error("the lock of the {0} has been poisoned")]
@ -553,7 +658,8 @@ pub enum Protocol {
Grpc,
// TODO add support for other protocols
// HttpJson,
// HttpProto,
/// HTTP protocol with binary protobuf
HttpBinary,
}
#[cfg(test)]

View File

@ -2,13 +2,13 @@
pub mod collector {
pub mod metrics {
pub mod v1 {
tonic::include_proto!("opentelemetry.proto.collector.metrics.v1");
include!(concat!(env!("OUT_DIR"), "/tonic", "/opentelemetry.proto.collector.metrics.v1.rs"));
}
}
pub mod trace {
pub mod v1 {
tonic::include_proto!("opentelemetry.proto.collector.trace.v1");
include!(concat!(env!("OUT_DIR"), "/tonic", "/opentelemetry.proto.collector.trace.v1.rs"));
}
}
}
@ -16,28 +16,69 @@ pub mod collector {
#[cfg(feature = "tonic")]
pub mod common {
pub mod v1 {
tonic::include_proto!("opentelemetry.proto.common.v1");
include!(concat!(env!("OUT_DIR"), "/tonic", "/opentelemetry.proto.common.v1.rs"));
}
}
#[cfg(feature = "tonic")]
pub mod metrics {
pub mod v1 {
tonic::include_proto!("opentelemetry.proto.metrics.v1");
include!(concat!(env!("OUT_DIR"), "/tonic", "/opentelemetry.proto.metrics.v1.rs"));
}
}
#[cfg(feature = "tonic")]
pub mod resource {
pub mod v1 {
tonic::include_proto!("opentelemetry.proto.resource.v1");
include!(concat!(env!("OUT_DIR"), "/tonic", "/opentelemetry.proto.resource.v1.rs"));
}
}
#[cfg(feature = "tonic")]
pub mod trace {
pub mod v1 {
tonic::include_proto!("opentelemetry.proto.trace.v1");
include!(concat!(env!("OUT_DIR"), "/tonic", "/opentelemetry.proto.trace.v1.rs"));
}
}
#[cfg(feature="http-proto")]
pub mod prost {
pub mod collector {
pub mod metrics {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/prost", "/opentelemetry.proto.collector.metrics.v1.rs"));
}
}
pub mod trace {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/prost", "/opentelemetry.proto.collector.trace.v1.rs"));
}
}
}
pub mod common {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/prost", "/opentelemetry.proto.common.v1.rs"));
}
}
pub mod metrics {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/prost", "/opentelemetry.proto.metrics.v1.rs"));
}
}
pub mod resource {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/prost", "/opentelemetry.proto.resource.v1.rs"));
}
}
pub mod trace {
pub mod v1 {
include!(concat!(env!("OUT_DIR"), "/prost", "/opentelemetry.proto.trace.v1.rs"));
}
}
}

View File

@ -8,6 +8,9 @@ use crate::proto::collector::trace::v1::{
ExportTraceServiceRequest as TonicRequest,
};
#[cfg(feature = "http-proto")]
use crate::proto::prost::collector::trace::v1::ExportTraceServiceRequest as ProstRequest;
#[cfg(feature = "tonic")]
use tonic::{
metadata::{KeyAndValueRef, MetadataMap},
@ -33,9 +36,14 @@ use grpcio::{
#[cfg(feature = "grpc-sys")]
use protobuf::RepeatedField;
#[cfg(feature = "http-proto")]
use prost::Message;
#[cfg(feature = "http-proto")]
use std::convert::TryFrom;
use async_trait::async_trait;
#[cfg(feature = "grpc-sys")]
#[cfg(any(feature = "grpc-sys", feature = "http-proto"))]
use std::collections::HashMap;
use std::fmt;
@ -48,6 +56,14 @@ use crate::{Protocol, OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT, OTEL_EXPORTER_OTLP_TI
use opentelemetry::sdk::export::trace::{ExportResult, SpanData, SpanExporter};
use std::time::Duration;
#[cfg(feature = "http-proto")]
use http::{
header::{HeaderName, HeaderValue, CONTENT_TYPE},
Method, Uri,
};
#[cfg(feature = "http-proto")]
use opentelemetry_http::HttpClient;
/// Exporter that sends data in OTLP format.
pub enum TraceExporter {
#[cfg(feature = "tonic")]
@ -70,6 +86,18 @@ pub enum TraceExporter {
/// The Grpc trace exporter
trace_exporter: GrpcioTraceServiceClient,
},
#[cfg(feature = "http-proto")]
/// Trace Exporter using HTTP transport
Http {
/// Duration of timeout when sending spans to backend.
timeout: Duration,
/// Additional headers of the outbound requests.
headers: Option<HashMap<String, String>>,
/// The Collector URL
collector_endpoint: Uri,
/// The HTTP trace exporter
trace_exporter: Option<Box<dyn HttpClient>>,
},
}
/// Configuration for the OTLP exporter.
@ -170,6 +198,46 @@ impl From<Compression> for grpcio::CompressionAlgorithms {
}
}
/// Configuration of the http transport
#[cfg(feature = "http-proto")]
#[derive(Debug)]
pub struct HttpConfig {
/// Select the HTTP client
pub client: Option<Box<dyn HttpClient>>,
/// Additional headers to send to the collector.
pub headers: Option<HashMap<String, String>>,
}
#[cfg(feature = "http-proto")]
impl Default for HttpConfig {
fn default() -> Self {
HttpConfig {
#[cfg(feature = "reqwest-blocking-client")]
client: Some(Box::new(reqwest::blocking::Client::new())),
#[cfg(all(
not(feature = "reqwest-blocking-client"),
not(feature = "surf-client"),
feature = "reqwest-client"
))]
client: Some(Box::new(reqwest::Client::new())),
#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "reqwest-blocking-client"),
feature = "surf-client"
))]
client: Some(Box::new(surf::Client::new())),
#[cfg(all(
not(feature = "reqwest-client"),
not(feature = "surf-client"),
not(feature = "reqwest-blocking-client")
))]
client: None,
headers: None,
}
}
}
impl Default for ExporterConfig {
fn default() -> Self {
ExporterConfig {
@ -201,6 +269,15 @@ impl Debug for TraceExporter {
.field("timeout", &timeout)
.field("trace_exporter", &"TraceServiceClient")
.finish(),
#[cfg(feature = "http-proto")]
TraceExporter::Http {
headers, timeout, ..
} => f
.debug_struct("Exporter")
.field("headers", &headers)
.field("timeout", &timeout)
.field("trace_exporter", &"TraceServiceClient")
.finish(),
}
}
}
@ -299,6 +376,22 @@ impl TraceExporter {
headers: grpcio_config.headers,
}
}
/// Builds a new span exporter with the given configuration
#[cfg(feature = "http-proto")]
pub fn new_http(config: ExporterConfig, http_config: HttpConfig) -> Result<Self, crate::Error> {
let url: Uri = config
.endpoint
.parse()
.map_err::<crate::Error, _>(Into::into)?;
Ok(TraceExporter::Http {
trace_exporter: http_config.client,
timeout: config.timeout,
collector_endpoint: url,
headers: http_config.headers,
})
}
}
#[async_trait]
@ -352,6 +445,46 @@ impl SpanExporter for TraceExporter {
Ok(())
}
#[cfg(feature = "http-proto")]
TraceExporter::Http {
trace_exporter,
collector_endpoint,
headers,
..
} => {
let req = ProstRequest {
resource_spans: batch.into_iter().map(Into::into).collect(),
};
let mut buf = vec![];
req.encode(&mut buf)
.map_err::<crate::Error, _>(Into::into)?;
let mut request = http::Request::builder()
.method(Method::POST)
.uri(collector_endpoint.clone())
.header(CONTENT_TYPE, "application/x-protobuf")
.body(buf)
.map_err::<crate::Error, _>(Into::into)?;
if let Some(headers) = headers.clone() {
for (k, val) in headers {
let value = HeaderValue::from_str(val.as_ref())
.map_err::<crate::Error, _>(Into::into)?;
let key =
HeaderName::try_from(&k).map_err::<crate::Error, _>(Into::into)?;
request.headers_mut().insert(key, value);
}
}
if let Some(client) = trace_exporter {
client.send(request).await?;
Ok(())
} else {
Err(crate::Error::NoHttpClient.into())
}
}
}
}
}

View File

@ -80,6 +80,85 @@ pub(crate) mod tonic {
}
}
#[cfg(feature = "http-proto")]
pub(crate) mod prost {
use super::*;
use crate::proto::prost::common::v1::{
any_value, AnyValue, ArrayValue, InstrumentationLibrary, KeyValue,
};
impl From<opentelemetry::sdk::InstrumentationLibrary> for InstrumentationLibrary {
fn from(library: opentelemetry::sdk::InstrumentationLibrary) -> Self {
InstrumentationLibrary {
name: library.name.to_string(),
version: library.version.unwrap_or("").to_string(),
}
}
}
pub(crate) struct Attributes(
pub(crate) ::std::vec::Vec<crate::proto::prost::common::v1::KeyValue>,
);
impl From<EvictedHashMap> for Attributes {
fn from(attributes: EvictedHashMap) -> Self {
Attributes(
attributes
.into_iter()
.map(|(key, value)| KeyValue {
key: key.as_str().to_string(),
value: Some(value.into()),
})
.collect(),
)
}
}
impl From<Vec<opentelemetry::KeyValue>> for Attributes {
fn from(kvs: Vec<opentelemetry::KeyValue>) -> Self {
Attributes(
kvs.into_iter()
.map(|api_kv| KeyValue {
key: api_kv.key.as_str().to_string(),
value: Some(api_kv.value.into()),
})
.collect(),
)
}
}
impl From<Value> for AnyValue {
fn from(value: Value) -> Self {
AnyValue {
value: match value {
Value::Bool(val) => Some(any_value::Value::BoolValue(val)),
Value::I64(val) => Some(any_value::Value::IntValue(val)),
Value::F64(val) => Some(any_value::Value::DoubleValue(val)),
Value::String(val) => Some(any_value::Value::StringValue(val.into_owned())),
Value::Array(array) => Some(any_value::Value::ArrayValue(match array {
Array::Bool(vals) => array_into_proto(vals),
Array::I64(vals) => array_into_proto(vals),
Array::F64(vals) => array_into_proto(vals),
Array::String(vals) => array_into_proto(vals),
})),
},
}
}
}
fn array_into_proto<T>(vals: Vec<T>) -> ArrayValue
where
Value: From<T>,
{
let values = vals
.into_iter()
.map(|val| AnyValue::from(Value::from(val)))
.collect();
ArrayValue { values }
}
}
#[cfg(feature = "grpc-sys")]
pub(crate) mod grpcio {
use super::*;

View File

@ -131,6 +131,135 @@ mod tonic {
}
}
#[cfg(feature = "http-proto")]
mod prost {
use super::*;
use crate::proto::prost::resource::v1::Resource;
use crate::proto::prost::trace::v1::{
span, status, InstrumentationLibrarySpans, ResourceSpans, Span, Status,
};
use crate::transform::common::prost::Attributes;
impl From<SpanKind> for span::SpanKind {
fn from(span_kind: SpanKind) -> Self {
match span_kind {
SpanKind::Client => span::SpanKind::Client,
SpanKind::Consumer => span::SpanKind::Consumer,
SpanKind::Internal => span::SpanKind::Internal,
SpanKind::Producer => span::SpanKind::Producer,
SpanKind::Server => span::SpanKind::Server,
}
}
}
impl From<StatusCode> for status::StatusCode {
fn from(status_code: StatusCode) -> Self {
match status_code {
StatusCode::Ok => status::StatusCode::Ok,
StatusCode::Unset => status::StatusCode::Unset,
StatusCode::Error => status::StatusCode::Error,
}
}
}
impl From<Link> for span::Link {
fn from(link: Link) -> Self {
span::Link {
trace_id: link
.span_context()
.trace_id()
.to_u128()
.to_be_bytes()
.to_vec(),
span_id: link
.span_context()
.span_id()
.to_u64()
.to_be_bytes()
.to_vec(),
trace_state: link.span_context().trace_state().header(),
attributes: Attributes::from(link.attributes().clone()).0,
dropped_attributes_count: 0,
}
}
}
impl From<SpanData> for ResourceSpans {
fn from(source_span: SpanData) -> Self {
let span_kind: span::SpanKind = source_span.span_kind.into();
ResourceSpans {
resource: Some(Resource {
attributes: resource_attributes(
source_span.resource.as_ref().map(AsRef::as_ref),
)
.0,
dropped_attributes_count: 0,
}),
instrumentation_library_spans: vec![InstrumentationLibrarySpans {
instrumentation_library: Default::default(),
spans: vec![Span {
trace_id: source_span
.span_context
.trace_id()
.to_u128()
.to_be_bytes()
.to_vec(),
span_id: source_span
.span_context
.span_id()
.to_u64()
.to_be_bytes()
.to_vec(),
trace_state: source_span.span_context.trace_state().header(),
parent_span_id: {
if source_span.parent_span_id.to_u64() > 0 {
source_span.parent_span_id.to_u64().to_be_bytes().to_vec()
} else {
vec![]
}
},
name: source_span.name.into_owned(),
kind: span_kind as i32,
start_time_unix_nano: to_nanos(source_span.start_time),
end_time_unix_nano: to_nanos(source_span.end_time),
dropped_attributes_count: source_span.attributes.dropped_count(),
attributes: Attributes::from(source_span.attributes).0,
dropped_events_count: source_span.message_events.dropped_count(),
events: source_span
.message_events
.into_iter()
.map(|event| span::Event {
time_unix_nano: to_nanos(event.timestamp),
name: event.name.into(),
attributes: Attributes::from(event.attributes).0,
dropped_attributes_count: 0,
})
.collect(),
dropped_links_count: source_span.links.dropped_count(),
links: source_span.links.into_iter().map(Into::into).collect(),
status: Some(Status {
code: status::StatusCode::from(source_span.status_code).into(),
message: source_span.status_message.into_owned(),
..Default::default()
}),
}],
}],
}
}
}
fn resource_attributes(resource: Option<&sdk::Resource>) -> Attributes {
resource
.map(|res| {
res.iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone()))
.collect::<Vec<_>>()
})
.unwrap_or_default()
.into()
}
}
#[cfg(feature = "grpc-sys")]
mod grpcio {
use super::*;

View File

@ -22,6 +22,11 @@ if rustup component add clippy; then
cargo_feature opentelemetry-otlp "trace,grpc-sys"
cargo_feature opentelemetry-otlp "trace,grpc-sys,openssl"
cargo_feature opentelemetry-otlp "trace,grpc-sys,openssl-vendored"
cargo_feature opentelemetry-otlp "http-proto"
cargo_feature opentelemetry-otlp "http-proto, reqwest-blocking-client"
cargo_feature opentelemetry-otlp "http-proto, reqwest-client"
cargo_feature opentelemetry-otlp "http-proto, reqwest-rustls"
cargo_feature opentelemetry-otlp "http-proto, surf-client"
cargo_feature opentelemetry-jaeger "surf_collector_client"
cargo_feature opentelemetry-jaeger "isahc_collector_client"