Compare commits

..

No commits in common. "main" and "v0.6.0" have entirely different histories.
main ... v0.6.0

59 changed files with 6738 additions and 4568 deletions

View File

@ -22,18 +22,16 @@ jobs:
- x86_64-unknown-linux-gnu
- x86_64-unknown-linux-musl
- wasm32-unknown-unknown
- wasm32-wasip1
steps:
- uses: actions/checkout@v2
# setup wasmedge
- run: curl -sSf https://raw.githubusercontent.com/WasmEdge/WasmEdge/master/utils/install.sh | sudo bash -s -- -p /usr/local
# Setup musl if needed
- run: sudo apt-get update
if: matrix.target == 'x86_64-unknown-linux-musl'
- run: sudo apt-get install -y musl musl-dev musl-tools cmake
if: matrix.target == 'x86_64-unknown-linux-musl'
# # Caching stuff
# Caching stuff
- uses: actions/cache@v2
with:
path: |
@ -98,31 +96,8 @@ jobs:
command: build
toolchain: ${{ matrix.toolchain }}
args: --target wasm32-unknown-unknown --features reqwest
- uses: actions-rs/cargo@v1
name: "Build"
if: matrix.target == 'wasm32-wasi'
with:
command: build
toolchain: ${{ matrix.toolchain }}
args: --target ${{ matrix.target }} --features "http-0-2-binding hyper-0-14 hyper_wasi"
- uses: actions-rs/cargo@v1
name: "Test"
if: matrix.target == 'wasm32-wasi'
with:
command: test
toolchain: ${{ matrix.toolchain }}
args: --target ${{ matrix.target }} --features "http-0-2-binding hyper-0-14 hyper_wasi"
env:
CARGO_TARGET_WASM32_WASI_RUNNER: wasmedge
# Build examples
- uses: actions-rs/cargo@v1
name: "Build wasi-example"
if: matrix.target == 'wasm32-wasi' && matrix.toolchain == 'stable'
with:
command: build
toolchain: ${{ matrix.toolchain }}
args: --target ${{ matrix.target }} --manifest-path ./example-projects/wasi-example/Cargo.toml
# Build examples
- uses: actions-rs/cargo@v1
name: "Build reqwest-wasm-example"
if: matrix.target == 'wasm32-unknown-unknown' && matrix.toolchain == 'stable'
@ -178,4 +153,3 @@ jobs:
command: build
toolchain: ${{ matrix.toolchain }}
args: --target ${{ matrix.target }} --manifest-path ./example-projects/nats-example/Cargo.toml

View File

@ -1,6 +1,6 @@
[package]
name = "cloudevents-sdk"
version = "0.8.0"
version = "0.6.0"
authors = ["Francesco Guardiani <francescoguard@gmail.com>"]
license-file = "LICENSE"
edition = "2018"
@ -12,7 +12,6 @@ exclude = [
".github/*"
]
categories = ["web-programming", "encoding", "data-structures"]
resolver = "2"
# Enable all features when building on docs.rs to show feature gated bindings
[package.metadata.docs.rs]
@ -24,82 +23,60 @@ name = "cloudevents"
[features]
http-binding = ["async-trait", "bytes", "futures", "http"]
http-0-2-binding = ["async-trait", "bytes", "futures", "http-0-2"]
actix = ["actix-web", "actix-http", "async-trait", "bytes", "futures", "http-0-2"]
actix = ["actix-web", "actix-http", "async-trait", "bytes", "futures", "http"]
reqwest = ["reqwest-lib", "async-trait", "bytes", "http", "uuid/js"]
rdkafka = ["rdkafka-lib", "bytes", "futures"]
warp = ["warp-lib", "bytes", "http-0-2", "http-body-util", "hyper-0-14"]
axum = ["bytes", "http", "hyper", "axum-lib", "http-body-util", "async-trait"]
poem = ["bytes", "http", "poem-lib", "hyper", "async-trait", "http-body-util", "futures"]
warp = ["warp-lib", "bytes", "http", "hyper"]
axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"]
poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"]
nats = ["nats-lib"]
[dependencies]
serde = { version = "^1.0", features = ["derive"] }
serde_json = "^1.0"
chrono = { version = "^0.4", features = ["serde"] }
delegate-attr = "^0.3"
base64 = "^0.22"
url = { version = "^2.5", features = ["serde"] }
snafu = "^0.8"
bitflags = "^2.6"
delegate-attr = "^0.2"
base64 = "^0.12"
url = { version = "^2.1", features = ["serde"] }
snafu = "^0.6"
bitflags = "^1.2"
uuid = { version = "1", features = ["v4"] }
# runtime optional deps
actix-web = { version = "4", optional = true }
actix-http = { version = "3", optional = true }
reqwest-lib = { version = "^0.12", default-features = false, features = ["rustls-tls"], optional = true, package = "reqwest" }
rdkafka-lib = { version = "^0.37", features = ["cmake-build"], optional = true, package = "rdkafka" }
reqwest-lib = { version = "^0.11", default-features = false, features = ["rustls-tls"], optional = true, package = "reqwest" }
rdkafka-lib = { version = "^0.29", features = ["cmake-build"], optional = true, package = "rdkafka" }
warp-lib = { version = "^0.3", optional = true, package = "warp" }
async-trait = { version = "^0.1", optional = true }
async-trait = { version = "^0.1.33", optional = true }
bytes = { version = "^1.0", optional = true }
futures = { version = "^0.3", optional = true, features = ["compat"]}
http = { version = "1.2", optional = true}
http-0-2 = { version = "0.2", optional = true, package = "http"}
axum-lib = { version = "^0.8", optional = true, package="axum"}
http-body-util = {version = "^0.1", optional = true}
poem-lib = { version = "^3.1", optional = true, package = "poem" }
nats-lib = { version = "0.25.0", optional = true, package = "nats" }
futures = { version = "^0.3", optional = true }
http = { version = "0.2", optional = true }
hyper = { version = "^0.14", optional = true }
axum-lib = { version = "^0.5", optional = true , package="axum"}
http-body = { version = "^0.4", optional = true }
poem-lib = { version = "=1.2.34", optional = true, package = "poem" }
nats-lib = { version = "0.21.0", optional = true, package = "nats" }
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
hostname = "^0.4"
hostname = "^0.3"
[target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies]
web-sys = { version = "^0.3", features = ["Window", "Location"] }
[target.'cfg(not(target_os = "wasi"))'.dependencies]
hyper = { version = "^1.5", optional = true, package="hyper" }
hyper-0-14 = { version = "^0.14", optional = true, package = "hyper"}
[target.'cfg(all(target_arch = "wasm32", target_os = "wasi"))'.dependencies]
hyper_wasi = { version = "0.15", features = ["full"], optional = true }
[dev-dependencies]
rstest = "0.23"
claims = "0.8"
rstest = "0.6"
claim = "0.3.1"
version-sync = "0.9.2"
serde_yaml = "^0.9"
serde_yaml = "0.8"
rmp-serde = "1"
# runtime dev-deps
actix-rt = { version = "^2" }
url = { version = "^2.1", features = ["serde"] }
serde_json = { version = "^1.0" }
chrono = { version = "^0.4", features = ["serde"] }
mockito = "0.31.1"
mime = "0.3"
[target.'cfg(not(target_os = "wasi"))'.dev-dependencies]
actix-rt = { version = "^2" }
mockito = "0.25.1"
tokio = { version = "^1.0", features = ["full"] }
tower = { version = "0.5", features = ["util"] }
[target.'cfg(all(target_arch = "wasm32", target_os = "wasi"))'.dev-dependencies]
tokio_wasi = { version = "1", features = [
"io-util",
"fs",
"net",
"time",
"rt",
"macros",
] }
mime = "0.3"
tower = { version = "0.4", features = ["util"] }

View File

@ -1,7 +0,0 @@
# Maintainers
Current active maintainers of this SDK:
- [Jim Crossley](https://github.com/jcrossley3)
- [Francesco Guardiani](https://github.com/slinkydeveloper)
- [Fabrizio Lazzaretti](https://github.com/Lazzaretti)

6
OWNERS
View File

@ -1,6 +0,0 @@
admins:
- jcrossley3
- linuxbasic
- slinkydeveloper
- Lazzaretti
approvers:

View File

@ -42,7 +42,7 @@ enabling your Protocol Binding of choice:
```toml
[dependencies]
cloudevents-sdk = { version = "0.8.0" }
cloudevents-sdk = { version = "0.6.0" }
```
Now you can start creating events:
@ -67,6 +67,10 @@ Checkout the examples using our integrations to learn how to send and receive ev
* [Warp Example](example-projects/warp-example)
* [NATS Example](example-projects/nats-example)
## Development & Contributing
If you're interested in contributing to sdk-rust, look at [Contributing documentation](CONTRIBUTING.md)
## Community
- There are bi-weekly calls immediately following the
@ -93,19 +97,8 @@ for how PR reviews and approval, and our
[Code of Conduct](https://github.com/cloudevents/spec/blob/master/community/GOVERNANCE.md#additional-information)
information.
If there is a security concern with one of the CloudEvents specifications, or
with one of the project's SDKs, please send an email to
[cncf-cloudevents-security@lists.cncf.io](mailto:cncf-cloudevents-security@lists.cncf.io).
[Crates badge]: https://img.shields.io/crates/v/cloudevents-sdk.svg
[crates.io]: https://crates.io/crates/cloudevents-sdk
[Docs badge]: https://docs.rs/cloudevents-sdk/badge.svg
[docs.rs]: https://docs.rs/cloudevents-sdk
[feature flag]: https://doc.rust-lang.org/cargo/reference/manifest.html#the-features-section
## Additional SDK Resources
- [List of current active maintainers](MAINTAINERS.md)
- [How to contribute to the project](CONTRIBUTING.md)
- [SDK's License](LICENSE)
- [SDK's Release process](RELEASING.md)

View File

@ -1,10 +0,0 @@
# How to create a Release
To create a new release, do the following:
- Bump the version in the README, lib.rs and cargo.toml
- Try to run `cargo test --all-features`, `cargo doc --all-features --lib` and
`cargo publish --dry-run`
- If none of the above commands fail, PR the changes and merge it
- Checkout `main` on your local machine and run `cargo publish`
- Once that is done, create the release in the Github UI (make sure it
creates the git tag as well) and that's it!

View File

@ -7,7 +7,7 @@ edition = "2018"
[dependencies]
cloudevents-sdk = { path = "../..", features = ["actix"] }
actix-web = "4"
actix-cors = "^0.7"
actix-cors = "0.6.0-beta.8"
serde_json = "^1.0"
url = { version = "^2.1" }
env_logger = "^0.11"
env_logger = "0.7.1"

View File

@ -6,16 +6,16 @@ edition = "2021"
[dependencies]
cloudevents-sdk = { path = "../..", features = ["axum"] }
axum = "^0.8"
http = "^1.1"
axum = "^0.5"
http = "^0.2"
tokio = { version = "^1", features = ["full"] }
tracing = "^0.1"
tracing-subscriber = "^0.3"
tower-http = { version = "^0.6", features = ["trace"] }
tracing-subscriber = "^0.2"
tower-http = { version = "^0.1", features = ["trace"] }
[dev-dependencies]
tower = { version = "^0.5", features = ["util"] }
tower = { version = "^0.4", features = ["util"] }
serde = { version = "^1.0", features = ["derive"] }
serde_json = "^1.0"
chrono = { version = "^0.4", features = ["serde"] }
hyper = { version = "^1.4" }
hyper = { version = "^0.14" }

View File

@ -4,6 +4,7 @@ use axum::{
};
use cloudevents::Event;
use http::StatusCode;
use std::net::SocketAddr;
use tower_http::trace::TraceLayer;
fn echo_app() -> Router {
@ -26,8 +27,12 @@ async fn main() {
}
tracing_subscriber::fmt::init();
let service = echo_app();
let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await.unwrap();
axum::serve(listener, service).await.unwrap();
let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
tracing::debug!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(service.into_make_service())
.await
.unwrap();
}
#[cfg(test)]

View File

@ -9,4 +9,4 @@ edition = "2021"
[dependencies]
cloudevents-sdk = { path = "../..", features = ["nats"] }
serde_json = "^1.0"
nats = "^0.25"
nats = "0.21.0"

View File

@ -7,8 +7,8 @@ edition = "2021"
cloudevents-sdk = { path = "../..", features = ["poem"] }
tokio = { version = "1.13", features = ["macros", "rt-multi-thread"] }
tracing = "0.1"
poem = { version = "^3.0" }
tracing-subscriber = "0.3"
poem = { version = "1" }
tracing-subscriber = "0.2"
serde_json = "1.0"
[dev-dependencies]

View File

@ -1,6 +1,6 @@
[package]
name = "rdkafka-example"
version = "0.4.0"
version = "0.3.0"
authors = ["Pranav Bhatt <adpranavb2000@gmail.com>"]
edition = "2018"
@ -16,4 +16,4 @@ serde_json = "^1.0"
futures = "^0.3"
tokio = { version = "^1.0", features = ["full"] }
clap = "2.33.1"
rdkafka = { version = "^0.37", features = ["cmake-build"] }
rdkafka = { version = "^0.29", features = ["cmake-build"] }

View File

@ -3,7 +3,6 @@ name = "reqwest-wasm-example"
version = "0.3.0"
authors = ["Francesco Guardiani <francescoguard@gmail.com>"]
edition = "2018"
resolver = "2"
# Config mostly pulled from: https://github.com/rustwasm/wasm-bindgen/blob/master/examples/fetch/Cargo.toml
@ -11,7 +10,7 @@ resolver = "2"
crate-type = ["cdylib"]
[dependencies]
reqwest = "^0.12"
reqwest = "^0.11"
uuid = "1"
cloudevents-sdk = { path = "../..", features = ["reqwest"] }
url = { version = "^2.1" }

File diff suppressed because it is too large Load Diff

View File

@ -6,12 +6,12 @@
"devDependencies": {
"@wasm-tool/wasm-pack-plugin": "^1.4.0",
"css-loader": "^5.2.6",
"html-webpack-plugin": "^5.5.0",
"html-webpack-plugin": "^4.5.2",
"style-loader": "^2.0.0",
"text-encoding": "^0.7.0",
"webpack": "^5.95.0",
"webpack": "^4.46.0",
"webpack-cli": "^4.8.0",
"webpack-dev-server": "^5.0.4"
"webpack-dev-server": "^3.11.2"
},
"dependencies": {
"bootstrap": "^5.0.2",

View File

@ -1,17 +0,0 @@
[package]
name = "wasi-example"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0"
cloudevents-sdk = { path = "../..", features = ["http-0-2-binding", "hyper_wasi", "hyper-0-14" ] }
hyper_wasi = { version = "0.15", features = ["full"] }
log = "0.4.21"
tokio_wasi = { version = "1", features = ["io-util", "fs", "net", "time", "rt", "macros"] }
serde_json = "^1.0"
[dev-dependencies]
bytes = "1.6.0"
http-body-util = "0.1.1"
chrono = "*"

View File

@ -1,26 +0,0 @@
Install WASMEdge:
https://wasmedge.org/docs/start/install/
To run the server:
```console
cargo run --target wasm32-wasi
```
To test a GET:
```console
curl -sw '%{http_code}\n' http://localhost:9000/health/readiness
```
To test a POST:
```console
curl -d '{"name": "wasi-womble"}' \
-H'content-type: application/json' \
-H'ce-specversion: 1.0' \
-H'ce-id: 1' \
-H'ce-source: http://cloudevents.io' \
-H'ce-type: dev.knative.example' \
http://localhost:9000
```

View File

@ -1,39 +0,0 @@
use cloudevents::{event::Data, Event, EventBuilder, EventBuilderV10};
use log::info;
use serde_json::{from_slice, from_str, json};
pub async fn handle_event(event: Event) -> Result<Event, anyhow::Error> {
info!("event: {}", event);
let input = match event.data() {
Some(Data::Binary(v)) => from_slice(v)?,
Some(Data::String(v)) => from_str(v)?,
Some(Data::Json(v)) => v.to_owned(),
None => json!({ "name": "default" }),
};
EventBuilderV10::from(event)
.source("func://handler")
.ty("func.example")
.data("application/json", json!({ "hello": input["name"] }))
.build()
.map_err(|err| err.into())
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn post_test() -> Result<(), anyhow::Error> {
let reqevt = Event::default();
let respevt = handle_event(reqevt).await?;
let output = match respevt.data() {
Some(Data::Binary(v)) => from_slice(v)?,
Some(Data::String(v)) => from_str(v)?,
Some(Data::Json(v)) => v.to_owned(),
None => json!({ "name": "default" }),
};
assert_eq!(output, json!({ "hello": "default" }));
Ok(())
}
}

View File

@ -1,49 +0,0 @@
use cloudevents::binding::http_0_2::builder::adapter::to_response;
use cloudevents::binding::http_0_2::to_event;
use hyper::service::{make_service_fn, service_fn};
use hyper::Server;
use hyper::{Body, Method, Request, Response, StatusCode};
use std::convert::Infallible;
use std::net::SocketAddr;
use std::result::Result;
mod handler;
#[allow(clippy::redundant_closure)]
#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let addr = SocketAddr::from(([0, 0, 0, 0], 9000));
let make_svc = make_service_fn(|_| async move {
Ok::<_, Infallible>(service_fn(move |req| handle_request(req)))
});
let server = Server::bind(&addr).serve(make_svc);
if let Err(e) = server.await {
eprintln!("server error: {}", e);
}
Ok(())
}
async fn handle_request(
req: Request<Body>,
) -> Result<Response<Body>, anyhow::Error> {
match (req.method(), req.uri().path()) {
(&Method::POST, "/") => {
let headers = req.headers().clone();
let body_bytes = hyper::body::to_bytes(req.into_body()).await?;
let body = body_bytes.to_vec();
let reqevt = to_event(&headers, body)?;
let _respevt = handler::handle_event(reqevt).await?;
to_response(_respevt).map_err(|err| err.into())
}
(&Method::GET, "/health/readiness") => {
Ok(Response::new(Body::from("")))
}
(&Method::GET, "/health/liveness") => Ok(Response::new(Body::from(""))),
_ => {
let mut not_found = Response::default();
*not_found.status_mut() = StatusCode::NOT_FOUND;
Ok(not_found)
}
}
}

View File

@ -1,4 +1,4 @@
use crate::binding::http_0_2::{to_event, Headers};
use crate::binding::http::{to_event, Headers};
use crate::Event;
use actix_web::dev::Payload;
use actix_web::web::BytesMut;
@ -6,7 +6,6 @@ use actix_web::{web, HttpRequest};
use async_trait::async_trait;
use futures::{future::LocalBoxFuture, FutureExt, StreamExt};
use http::header::{AsHeaderName, HeaderName, HeaderValue};
use http_0_2 as http;
/// Implement Headers for the actix HeaderMap
impl<'a> Headers<'a> for actix_http::header::HeaderMap {

View File

@ -1,9 +1,8 @@
use crate::binding::http_0_2::{Builder, Serializer};
use crate::binding::http::{Builder, Serializer};
use crate::message::{BinaryDeserializer, Result};
use crate::Event;
use actix_web::http::StatusCode;
use actix_web::{HttpRequest, HttpResponse, HttpResponseBuilder};
use http_0_2 as http;
impl Builder<HttpResponse> for HttpResponseBuilder {
fn header(&mut self, key: &str, value: http::header::HeaderValue) {

View File

@ -1,44 +1,46 @@
use axum::body::Bytes;
use axum::extract::{FromRequest, Request};
use axum::response::Response;
use axum_lib as axum;
use http;
use async_trait::async_trait;
use axum::extract::{FromRequest, RequestParts};
use http::StatusCode;
use http_body::Body;
use hyper::body;
use crate::binding::http::to_event;
use crate::event::Event;
impl<S> FromRequest<S> for Event
type BoxError = Box<dyn std::error::Error + Send + Sync>;
#[async_trait]
impl<B> FromRequest<B> for Event
where
Bytes: FromRequest<S>,
S: Send + Sync,
B: Body + Send,
B::Data: Send,
B::Error: Into<BoxError>,
{
type Rejection = Response;
type Rejection = (StatusCode, String);
async fn from_request(req: Request, _state: &S) -> Result<Self, Self::Rejection> {
let (parts, body) = req.into_parts();
async fn from_request(req: &mut RequestParts<B>) -> Result<Self, Self::Rejection> {
let req_body = req
.take_body()
.ok_or(0)
.map_err(|_| (StatusCode::BAD_REQUEST, "unexpected empty body".to_string()))?;
let buf = body::to_bytes(req_body)
.await
.map_err(|e| (StatusCode::BAD_REQUEST, format!("{}", e.into())))?
.to_vec();
let headers = req.headers();
let body = axum::body::to_bytes(body, usize::MAX).await.map_err(|e| {
Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(axum::body::Body::from(e.to_string()))
.unwrap()
})?;
to_event(&parts.headers, body.to_vec()).map_err(|e| {
Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(axum::body::Body::from(e.to_string()))
.unwrap()
})
to_event(headers, buf).map_err(|e| (StatusCode::BAD_REQUEST, format!("{}", e)))
}
}
#[cfg(test)]
mod tests {
use axum_lib as axum;
use super::*;
use axum::body::Body;
use axum::extract::FromRequest;
use axum::http::{self, Request, StatusCode};
use crate::test::fixtures;
@ -47,39 +49,43 @@ mod tests {
async fn axum_test_request() {
let expected = fixtures::v10::minimal_string_extension();
let request = Request::builder()
.method(http::Method::POST)
.header("ce-specversion", "1.0")
.header("ce-id", "0001")
.header("ce-type", "test_event.test_application")
.header("ce-source", "http://localhost/")
.header("ce-someint", "10")
.body(Body::empty())
.unwrap();
let mut request = RequestParts::new(
Request::builder()
.method(http::Method::POST)
.header("ce-specversion", "1.0")
.header("ce-id", "0001")
.header("ce-type", "test_event.test_application")
.header("ce-source", "http://localhost/")
.header("ce-someint", "10")
.body(Body::empty())
.unwrap(),
);
let result = Event::from_request(request, &()).await.unwrap();
let result = Event::from_request(&mut request).await.unwrap();
assert_eq!(expected, result);
}
#[tokio::test]
async fn axum_test_bad_request() {
let request = Request::builder()
.method(http::Method::POST)
.header("ce-specversion", "BAD SPECIFICATION")
.header("ce-id", "0001")
.header("ce-type", "example.test")
.header("ce-source", "http://localhost/")
.header("ce-someint", "10")
.header("ce-time", fixtures::time().to_rfc3339())
.body(Body::empty())
.unwrap();
let mut request = RequestParts::new(
Request::builder()
.method(http::Method::POST)
.header("ce-specversion", "BAD SPECIFICATION")
.header("ce-id", "0001")
.header("ce-type", "example.test")
.header("ce-source", "http://localhost/")
.header("ce-someint", "10")
.header("ce-time", fixtures::time().to_rfc3339())
.body(Body::empty())
.unwrap(),
);
let result = Event::from_request(request, &()).await;
let result = Event::from_request(&mut request).await;
assert!(result.is_err());
let rejection = result.unwrap_err();
let reason = rejection.status();
let reason = rejection.0;
assert_eq!(reason, StatusCode::BAD_REQUEST)
}
@ -87,22 +93,24 @@ mod tests {
async fn axum_test_request_with_full_data() {
let expected = fixtures::v10::full_binary_json_data_string_extension();
let request = Request::builder()
.method(http::Method::POST)
.header("ce-specversion", "1.0")
.header("ce-id", "0001")
.header("ce-type", "test_event.test_application")
.header("ce-source", "http://localhost/")
.header("ce-subject", "cloudevents-sdk")
.header("content-type", "application/json")
.header("ce-string_ex", "val")
.header("ce-int_ex", "10")
.header("ce-bool_ex", "true")
.header("ce-time", &fixtures::time().to_rfc3339())
.body(Body::from(fixtures::json_data_binary()))
.unwrap();
let mut request = RequestParts::new(
Request::builder()
.method(http::Method::POST)
.header("ce-specversion", "1.0")
.header("ce-id", "0001")
.header("ce-type", "test_event.test_application")
.header("ce-source", "http://localhost/")
.header("ce-subject", "cloudevents-sdk")
.header("content-type", "application/json")
.header("ce-string_ex", "val")
.header("ce-int_ex", "10")
.header("ce-bool_ex", "true")
.header("ce-time", &fixtures::time().to_rfc3339())
.body(Body::from(fixtures::json_data_binary()))
.unwrap(),
);
let result = Event::from_request(request, &()).await.unwrap();
let result = Event::from_request(&mut request).await.unwrap();
assert_eq!(expected, result);
}

View File

@ -155,7 +155,7 @@ mod tests {
);
let (_, body) = resp.into_parts();
let body = axum::body::to_bytes(body, usize::MAX).await.unwrap();
let body = hyper::body::to_bytes(body).await.unwrap();
assert_eq!(j.to_string().as_bytes(), body);
}

View File

@ -1,21 +1,27 @@
use axum_lib as axum;
use axum::{
body::{boxed, BoxBody},
http::Response,
response::IntoResponse,
};
use http::{header, StatusCode};
use hyper::body::Body;
use crate::binding::http::builder::adapter::to_response;
use crate::event::Event;
use axum::{body::Body, http::Response, response::IntoResponse};
use axum_lib as axum;
use http;
use http::{header, StatusCode};
impl IntoResponse for Event {
fn into_response(self) -> Response<Body> {
fn into_response(self) -> Response<BoxBody> {
match to_response(self) {
Ok(resp) => {
let (parts, body) = resp.into_parts();
Response::from_parts(parts, Body::new(body))
Response::from_parts(parts, boxed(body))
}
Err(err) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.header(header::CONTENT_TYPE, "text/plain")
.body(Body::from(err.to_string()))
.body(boxed(Body::from(err.to_string())))
.unwrap(),
}
}
@ -99,7 +105,7 @@ mod tests {
);
let (_, body) = resp.into_parts();
let body = axum::body::to_bytes(body, usize::MAX).await.unwrap();
let body = hyper::body::to_bytes(body).await.unwrap();
assert_eq!(fixtures::json_data_binary(), body);
}

View File

@ -1,38 +1,33 @@
use bytes::Bytes;
use http::Response;
use http_body_util::Full;
use hyper::body::Body;
use std::cell::Cell;
use crate::binding::http::{Builder, Serializer};
use crate::message::{BinaryDeserializer, Error, Result};
use crate::Event;
use std::convert::Infallible;
type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Infallible>;
struct Adapter {
builder: Cell<http::response::Builder>,
}
impl Builder<Response<BoxBody>> for Adapter {
impl Builder<Response<Body>> for Adapter {
fn header(&mut self, key: &str, value: http::header::HeaderValue) {
self.builder.set(self.builder.take().header(key, value));
}
fn body(&mut self, bytes: Vec<u8>) -> Result<Response<BoxBody>> {
fn body(&mut self, bytes: Vec<u8>) -> Result<Response<Body>> {
self.builder
.take()
.body(BoxBody::new(Full::from(bytes)))
.body(Body::from(bytes))
.map_err(|e| crate::message::Error::Other {
source: Box::new(e),
})
}
fn finish(&mut self) -> Result<Response<BoxBody>> {
fn finish(&mut self) -> Result<Response<Body>> {
self.body(Vec::new())
}
}
pub fn to_response(event: Event) -> std::result::Result<Response<BoxBody>, Error> {
pub fn to_response(event: Event) -> std::result::Result<Response<Body>, Error> {
BinaryDeserializer::deserialize_binary(
event,
Serializer::new(Adapter {

View File

@ -3,8 +3,6 @@ pub mod adapter;
use crate::message::Result;
use http;
pub trait Builder<R> {
fn header(&mut self, key: &str, value: http::header::HeaderValue);
fn body(&mut self, bytes: Vec<u8>) -> Result<R>;

View File

@ -8,8 +8,6 @@ use crate::{
Result, StructuredDeserializer, StructuredSerializer,
},
};
use http;
use std::convert::TryFrom;
pub struct Deserializer<'a, T: Headers<'a>> {
@ -36,9 +34,9 @@ impl<'a, T: Headers<'a>> BinaryDeserializer for Deserializer<'a, T> {
.unwrap()?,
)?;
let attributes = spec_version.attribute_names();
visitor = visitor.set_spec_version(spec_version.clone())?;
visitor = visitor.set_spec_version(spec_version)?;
let attributes = spec_version.attribute_names();
for (hn, hv) in self.headers.iter().filter(|(hn, _)| {
let key = hn.as_str();

View File

@ -1,7 +1,5 @@
use http::header::{AsHeaderName, HeaderMap, HeaderName, HeaderValue};
use http;
/// Any http library should be able to use the
/// [`to_event`](super::to_event) function with an implementation of
/// this trait.

View File

@ -13,8 +13,6 @@ mod serializer;
pub use builder::Builder;
use core::convert::TryFrom;
use http::Response;
use http;
pub use serializer::Serializer;
use std::convert::TryInto;
use std::fmt::Debug;
@ -55,8 +53,6 @@ mod tests {
use core::convert::TryFrom;
use http::Response;
use http;
#[test]
fn test_response_to_event() {
let event = fixtures::v10::minimal_string_extension();

View File

@ -12,8 +12,6 @@ use crate::message::{
};
use crate::Event;
use http::Request;
use http;
use std::convert::TryFrom;
use std::fmt::Debug;
@ -133,8 +131,6 @@ mod tests {
use crate::test::fixtures;
use bytes::Bytes;
use http::Request;
use http;
use std::convert::TryFrom;
#[test]

View File

@ -1,44 +0,0 @@
use http::Response;
use http_0_2 as http;
use hyper::body::Body;
use std::cell::Cell;
#[cfg(not(target_os = "wasi"))]
use hyper_0_14 as hyper;
#[cfg(target_os = "wasi")]
use hyper;
use crate::binding::http_0_2::{Builder, Serializer};
use crate::message::{BinaryDeserializer, Error, Result};
use crate::Event;
struct Adapter {
builder: Cell<http::response::Builder>,
}
impl Builder<Response<Body>> for Adapter {
fn header(&mut self, key: &str, value: http::header::HeaderValue) {
self.builder.set(self.builder.take().header(key, value));
}
fn body(&mut self, bytes: Vec<u8>) -> Result<Response<Body>> {
self.builder
.take()
.body(Body::from(bytes))
.map_err(|e| crate::message::Error::Other {
source: Box::new(e),
})
}
fn finish(&mut self) -> Result<Response<Body>> {
self.body(Vec::new())
}
}
pub fn to_response(event: Event) -> std::result::Result<Response<Body>, Error> {
BinaryDeserializer::deserialize_binary(
event,
Serializer::new(Adapter {
builder: Cell::new(http::Response::builder()),
}),
)
}

View File

@ -1,11 +0,0 @@
#[cfg(feature = "hyper-0-14")]
pub mod adapter;
use crate::message::Result;
use http_0_2 as http;
pub trait Builder<R> {
fn header(&mut self, key: &str, value: http::header::HeaderValue);
fn body(&mut self, bytes: Vec<u8>) -> Result<R>;
fn finish(&mut self) -> Result<R>;
}

View File

@ -1,101 +0,0 @@
use super::{Headers, SPEC_VERSION_HEADER};
use crate::{
binding::CLOUDEVENTS_JSON_HEADER,
event::SpecVersion,
header_value_to_str, message,
message::{
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
Result, StructuredDeserializer, StructuredSerializer,
},
};
use http_0_2 as http;
use std::convert::TryFrom;
pub struct Deserializer<'a, T: Headers<'a>> {
headers: &'a T,
body: Vec<u8>,
}
impl<'a, T: Headers<'a>> Deserializer<'a, T> {
pub fn new(headers: &'a T, body: Vec<u8>) -> Deserializer<'a, T> {
Deserializer { headers, body }
}
}
impl<'a, T: Headers<'a>> BinaryDeserializer for Deserializer<'a, T> {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
if self.encoding() != Encoding::BINARY {
return Err(message::Error::WrongEncoding {});
}
let spec_version = SpecVersion::try_from(
self.headers
.get(SPEC_VERSION_HEADER)
.map(|a| header_value_to_str!(a))
.unwrap()?,
)?;
let attributes = spec_version.attribute_names();
visitor = visitor.set_spec_version(spec_version)?;
for (hn, hv) in self.headers.iter().filter(|(hn, _)| {
let key = hn.as_str();
SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-")
}) {
let name = &hn.as_str()["ce-".len()..];
if attributes.contains(&name) {
visitor = visitor.set_attribute(
name,
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
} else {
visitor = visitor.set_extension(
name,
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
}
}
if let Some(hv) = self.headers.get(http::header::CONTENT_TYPE) {
visitor = visitor.set_attribute(
"datacontenttype",
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
)?
}
if !self.body.is_empty() {
visitor.end_with_data(self.body)
} else {
visitor.end()
}
}
}
impl<'a, T: Headers<'a>> StructuredDeserializer for Deserializer<'a, T> {
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
if self.encoding() != Encoding::STRUCTURED {
return Err(message::Error::WrongEncoding {});
}
visitor.set_structured_event(self.body)
}
}
impl<'a, T: Headers<'a>> MessageDeserializer for Deserializer<'a, T> {
fn encoding(&self) -> Encoding {
if self
.headers
.get(http::header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.filter(|&v| v.starts_with(CLOUDEVENTS_JSON_HEADER))
.is_some()
{
Encoding::STRUCTURED
} else if self.headers.get(SPEC_VERSION_HEADER).is_some() {
Encoding::BINARY
} else {
Encoding::UNKNOWN
}
}
}

View File

@ -1,22 +0,0 @@
use http::header::{AsHeaderName, HeaderMap, HeaderName, HeaderValue};
use http_0_2 as http;
/// Any http library should be able to use the
/// [`to_event`](super::to_event) function with an implementation of
/// this trait.
pub trait Headers<'a> {
type Iterator: Iterator<Item = (&'a HeaderName, &'a HeaderValue)>;
fn get<K: AsHeaderName>(&self, name: K) -> Option<&HeaderValue>;
fn iter(&'a self) -> Self::Iterator;
}
/// Implemention for the HeaderMap used by warp/reqwest
impl<'a> Headers<'a> for HeaderMap<HeaderValue> {
type Iterator = http::header::Iter<'a, HeaderValue>;
fn get<K: AsHeaderName>(&self, name: K) -> Option<&HeaderValue> {
self.get(name)
}
fn iter(&'a self) -> Self::Iterator {
self.iter()
}
}

View File

@ -1,73 +0,0 @@
pub mod builder;
pub mod deserializer;
mod headers;
use crate::{
message::{Error, MessageDeserializer},
Event,
};
use deserializer::Deserializer;
pub use headers::Headers;
mod serializer;
pub use builder::Builder;
use core::convert::TryFrom;
use http::Response;
use http_0_2 as http;
pub use serializer::Serializer;
use std::convert::TryInto;
use std::fmt::Debug;
pub static SPEC_VERSION_HEADER: &str = "ce-specversion";
/// Turn a pile of HTTP headers and a body into a CloudEvent
pub fn to_event<'a, T: Headers<'a>>(
headers: &'a T,
body: Vec<u8>,
) -> std::result::Result<Event, Error> {
MessageDeserializer::into_event(Deserializer::new(headers, body))
}
pub fn header_prefix(name: &str) -> String {
super::header_prefix("ce-", name)
}
impl<T> TryFrom<Response<T>> for Event
where
T: TryInto<Vec<u8>>,
<T as TryInto<Vec<u8>>>::Error: Debug,
{
type Error = crate::message::Error;
fn try_from(response: Response<T>) -> Result<Self, Self::Error> {
let headers = response.headers().to_owned();
let body = T::try_into(response.into_body()).unwrap();
to_event(&headers, body)
}
}
#[cfg(test)]
mod tests {
use crate::test::fixtures;
use crate::Event;
use core::convert::TryFrom;
use http::Response;
use http_0_2 as http;
#[test]
fn test_response_to_event() {
let event = fixtures::v10::minimal_string_extension();
let response = Response::builder()
.header("ce-id", fixtures::id())
.header("ce-source", fixtures::source())
.header("ce-type", fixtures::ty())
.header("ce-specversion", "1.0")
.header("ce-someint", "10")
.body(Vec::new())
.unwrap();
assert_eq!(event, Event::try_from(response).unwrap());
}
}

View File

@ -1,159 +0,0 @@
use std::{cell::RefCell, rc::Rc};
use crate::binding::http_0_2::builder::Builder;
use crate::binding::{
http_0_2::{header_prefix, SPEC_VERSION_HEADER},
CLOUDEVENTS_JSON_HEADER,
};
use crate::event::SpecVersion;
use crate::message::BinaryDeserializer;
use crate::message::{
BinarySerializer, Error, MessageAttributeValue, Result, StructuredSerializer,
};
use crate::Event;
use http::Request;
use http_0_2 as http;
use std::convert::TryFrom;
use std::fmt::Debug;
macro_rules! str_to_header_value {
($header_value:expr) => {
http::header::HeaderValue::from_str(&$header_value.to_string()).map_err(|e| {
crate::message::Error::Other {
source: Box::new(e),
}
})
};
}
pub struct Serializer<T> {
builder: Rc<RefCell<dyn Builder<T>>>,
}
impl<T> Serializer<T> {
pub fn new<B: Builder<T> + 'static>(delegate: B) -> Serializer<T> {
let builder = Rc::new(RefCell::new(delegate));
Serializer { builder }
}
}
impl<T> BinarySerializer<T> for Serializer<T> {
fn set_spec_version(self, spec_version: SpecVersion) -> Result<Self> {
self.builder
.borrow_mut()
.header(SPEC_VERSION_HEADER, str_to_header_value!(spec_version)?);
Ok(self)
}
fn set_attribute(self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.builder
.borrow_mut()
.header(&header_prefix(name), str_to_header_value!(value)?);
Ok(self)
}
fn set_extension(self, name: &str, value: MessageAttributeValue) -> Result<Self> {
self.builder
.borrow_mut()
.header(&header_prefix(name), str_to_header_value!(value)?);
Ok(self)
}
fn end_with_data(self, bytes: Vec<u8>) -> Result<T> {
self.builder.borrow_mut().body(bytes)
}
fn end(self) -> Result<T> {
self.builder.borrow_mut().finish()
}
}
impl<T> StructuredSerializer<T> for Serializer<T> {
fn set_structured_event(self, bytes: Vec<u8>) -> Result<T> {
let mut builder = self.builder.borrow_mut();
builder.header(
http::header::CONTENT_TYPE.as_str(),
http::HeaderValue::from_static(CLOUDEVENTS_JSON_HEADER),
);
builder.body(bytes)
}
}
impl<T> BinarySerializer<http::request::Request<Option<T>>> for http::request::Builder
where
T: TryFrom<Vec<u8>>,
<T as TryFrom<Vec<u8>>>::Error: Debug,
{
fn set_spec_version(mut self, sv: SpecVersion) -> Result<Self> {
self = self.header(SPEC_VERSION_HEADER, &sv.to_string());
Ok(self)
}
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
let key = &header_prefix(name);
self = self.header(key, &value.to_string());
Ok(self)
}
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
let key = &header_prefix(name);
self = self.header(key, &value.to_string());
Ok(self)
}
fn end_with_data(self, bytes: Vec<u8>) -> Result<http::request::Request<Option<T>>> {
let body = T::try_from(bytes).unwrap();
self.body(Some(body)).map_err(|e| Error::Other {
source: Box::new(e),
})
}
fn end(self) -> Result<http::request::Request<Option<T>>> {
self.body(None).map_err(|e| Error::Other {
source: Box::new(e),
})
}
}
impl<T> TryFrom<Event> for Request<Option<T>>
where
T: TryFrom<Vec<u8>>,
<T as TryFrom<Vec<u8>>>::Error: Debug,
{
type Error = crate::message::Error;
fn try_from(event: Event) -> Result<Self> {
BinaryDeserializer::deserialize_binary(event, http::request::Builder::new())
}
}
#[cfg(test)]
mod tests {
use crate::test::fixtures;
use bytes::Bytes;
use http::Request;
use http_0_2 as http;
use std::convert::TryFrom;
#[test]
fn test_event_to_http_request() {
let event = fixtures::v10::minimal_string_extension();
let request: Request<Option<Vec<u8>>> = Request::try_from(event).unwrap();
assert_eq!(request.headers()["ce-id"], "0001");
assert_eq!(request.headers()["ce-type"], "test_event.test_application");
}
#[test]
fn test_event_to_bytes_body() {
let event = fixtures::v10::full_binary_json_data_string_extension();
let request: Request<Option<Vec<u8>>> = Request::try_from(event).unwrap();
assert_eq!(request.headers()["ce-id"], "0001");
assert_eq!(request.headers()["ce-type"], "test_event.test_application");
assert_eq!(
request.body().as_ref().unwrap(),
&Bytes::from(fixtures::json_data().to_string())
);
}
}

View File

@ -11,6 +11,8 @@ pub mod axum;
docsrs,
doc(cfg(any(
feature = "http-binding",
feature = "actix",
feature = "warp",
feature = "reqwest",
feature = "axum",
feature = "poem"
@ -18,19 +20,13 @@ pub mod axum;
)]
#[cfg(any(
feature = "http-binding",
feature = "actix",
feature = "warp",
feature = "reqwest",
feature = "axum",
feature = "poem"
))]
pub mod http;
#[cfg_attr(
docsrs,
doc(cfg(any(feature = "http-0-2-binding", feature = "actix", feature = "warp",)))
)]
#[cfg(any(feature = "http-0-2-binding", feature = "actix", feature = "warp",))]
pub mod http_0_2;
#[cfg_attr(docsrs, doc(cfg(feature = "nats")))]
#[cfg(feature = "nats")]
pub mod nats;
@ -56,7 +52,6 @@ pub(crate) mod kafka {
}
pub(crate) static CLOUDEVENTS_JSON_HEADER: &str = "application/cloudevents+json";
pub(crate) static CLOUDEVENTS_BATCH_JSON_HEADER: &str = "application/cloudevents-batch+json";
pub(crate) static CONTENT_TYPE: &str = "content-type";
fn header_prefix(prefix: &str, name: &str) -> String {

View File

@ -1,16 +1,18 @@
use crate::binding::http::to_event;
use crate::Event;
use async_trait::async_trait;
use poem_lib::error::ResponseError;
use poem_lib::http::StatusCode;
use poem_lib::{FromRequest, Request, RequestBody, Result};
use crate::binding::http::to_event;
use crate::Event;
impl ResponseError for crate::message::Error {
fn status(&self) -> StatusCode {
StatusCode::BAD_REQUEST
}
}
#[async_trait]
impl<'a> FromRequest<'a> for Event {
async fn from_request(req: &'a Request, body: &mut RequestBody) -> Result<Self> {
Ok(to_event(req.headers(), body.take()?.into_vec().await?)?)
@ -55,7 +57,7 @@ mod tests {
let (req, mut body) = req.split();
let resp = Event::from_request(&req, &mut body).await.err().unwrap();
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
assert_eq!(resp.as_response().status(), StatusCode::BAD_REQUEST);
assert_eq!(resp.to_string(), "Invalid specversion BAD SPECIFICATION");
}

View File

@ -1,33 +1,14 @@
use crate::{AttributesReader, Data, Event};
use bytes::Bytes;
use poem_lib::http::StatusCode;
use poem_lib::{IntoResponse, Response};
use crate::binding::http::builder::adapter::to_response;
use crate::Event;
impl IntoResponse for Event {
fn into_response(self) -> Response {
let mut builder = Response::builder().status(StatusCode::OK);
if let Some(dct) = self.datacontenttype() {
builder = builder.content_type(dct);
}
for (key, value) in self.iter() {
builder = builder.header(format!("ce-{key}").as_str(), value.to_string());
}
match self.data {
Some(data) => match data {
Data::Binary(v) => builder.body(Bytes::copy_from_slice(v.as_slice())),
Data::String(s) => builder.body(s.clone()),
Data::Json(j) => match serde_json::to_string(&j) {
Ok(s) => builder.body(s),
Err(e) => Response::builder()
.status(StatusCode::INTERNAL_SERVER_ERROR)
.body(e.to_string()),
},
},
None => builder.finish(),
match to_response(self) {
Ok(resp) => resp.into(),
Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into(),
}
}
}

View File

@ -51,9 +51,9 @@ impl BinaryDeserializer for ConsumerRecordDeserializer {
})?,
)?;
let attributes = spec_version.attribute_names();
visitor = visitor.set_spec_version(spec_version.clone())?;
visitor = visitor.set_spec_version(spec_version)?;
let attributes = spec_version.attribute_names();
if let Some(hv) = self.headers.remove(CONTENT_TYPE) {
visitor = visitor.set_attribute(

View File

@ -2,7 +2,7 @@ use reqwest_lib as reqwest;
use crate::binding::{
http::{header_prefix, SPEC_VERSION_HEADER},
CLOUDEVENTS_BATCH_JSON_HEADER, CLOUDEVENTS_JSON_HEADER,
CLOUDEVENTS_JSON_HEADER,
};
use crate::event::SpecVersion;
use crate::message::{
@ -72,35 +72,18 @@ pub fn event_to_request(event: Event, request_builder: RequestBuilder) -> Result
BinaryDeserializer::deserialize_binary(event, RequestSerializer::new(request_builder))
}
/// Method to fill a [`RequestBuilder`] with a batched [`Vec<Event>`].
pub fn events_to_request(
events: Vec<Event>,
request_builder: RequestBuilder,
) -> Result<RequestBuilder> {
let bytes = serde_json::to_vec(&events)?;
Ok(request_builder
.header(reqwest::header::CONTENT_TYPE, CLOUDEVENTS_BATCH_JSON_HEADER)
.body(bytes))
}
/// Extension Trait for [`RequestBuilder`] which acts as a wrapper for the function [`event_to_request()`].
///
/// This trait is sealed and cannot be implemented for types outside of this crate.
pub trait RequestBuilderExt: private::Sealed {
/// Write in this [`RequestBuilder`] the provided [`Event`]. Similar to invoking [`Event`].
fn event(self, event: Event) -> Result<RequestBuilder>;
/// Write in this [`RequestBuilder`] the provided batched [`Vec<Event>`].
fn events(self, events: Vec<Event>) -> Result<RequestBuilder>;
}
impl RequestBuilderExt for RequestBuilder {
fn event(self, event: Event) -> Result<RequestBuilder> {
event_to_request(event, self)
}
fn events(self, events: Vec<Event>) -> Result<RequestBuilder> {
events_to_request(events, self)
}
}
// Sealing the RequestBuilderExt
@ -114,7 +97,7 @@ mod private {
#[cfg(test)]
mod tests {
use super::*;
use mockito::Matcher;
use mockito::{mock, Matcher};
use reqwest_lib as reqwest;
use crate::message::StructuredDeserializer;
@ -123,7 +106,7 @@ mod tests {
#[tokio::test]
async fn test_request() {
let url = mockito::server_url();
let m = mockito::mock("POST", "/")
let m = mock("POST", "/")
.match_header("ce-specversion", "1.0")
.match_header("ce-id", "0001")
.match_header("ce-type", "test_event.test_application")
@ -149,7 +132,7 @@ mod tests {
#[tokio::test]
async fn test_request_with_full_data() {
let url = mockito::server_url();
let m = mockito::mock("POST", "/")
let m = mock("POST", "/")
.match_header("ce-specversion", "1.0")
.match_header("ce-id", "0001")
.with_header("ce-type", "test_event.test_application")
@ -183,7 +166,7 @@ mod tests {
let input = fixtures::v10::full_json_data_string_extension();
let url = mockito::server_url();
let m = mockito::mock("POST", "/")
let m = mock("POST", "/")
.match_header("content-type", "application/cloudevents+json")
.match_body(Matcher::Exact(serde_json::to_string(&input).unwrap()))
.create();
@ -200,26 +183,4 @@ mod tests {
m.assert();
}
#[tokio::test]
async fn test_batched_request() {
let input = vec![fixtures::v10::full_json_data_string_extension()];
let url = mockito::server_url();
let m = mockito::mock("POST", "/")
.match_header("content-type", "application/cloudevents-batch+json")
.match_body(Matcher::Exact(serde_json::to_string(&input).unwrap()))
.create();
let client = reqwest::Client::new();
client
.post(&url)
.events(input)
.unwrap()
.send()
.await
.unwrap();
m.assert();
}
}

View File

@ -1,11 +1,9 @@
use reqwest_lib as reqwest;
use crate::binding;
use crate::binding::http;
use crate::message::{Error, Result};
use crate::Event;
use async_trait::async_trait;
use http;
use http::header;
use reqwest::Response;
/// Method to transform an incoming [`Response`] to [`Event`].
@ -14,26 +12,7 @@ pub async fn response_to_event(res: Response) -> Result<Event> {
let b = res.bytes().await.map_err(|e| Error::Other {
source: Box::new(e),
})?;
binding::http::to_event(&h, b.to_vec())
}
/// Method to transform an incoming [`Response`] to a batched [`Vec<Event>`]
pub async fn response_to_events(res: Response) -> Result<Vec<Event>> {
if res
.headers()
.get(header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.filter(|&v| v.starts_with(binding::CLOUDEVENTS_BATCH_JSON_HEADER))
.is_none()
{
return Err(Error::WrongEncoding {});
}
let bytes = res.bytes().await.map_err(|e| Error::Other {
source: Box::new(e),
})?;
Ok(serde_json::from_slice(&bytes)?)
http::to_event(&h, b.to_vec())
}
/// Extension Trait for [`Response`] which acts as a wrapper for the function [`response_to_event()`].
@ -43,8 +22,6 @@ pub async fn response_to_events(res: Response) -> Result<Vec<Event>> {
pub trait ResponseExt: private::Sealed {
/// Convert this [`Response`] to [`Event`].
async fn into_event(self) -> Result<Event>;
/// Convert this [`Response`] to a batched [`Vec<Event>`].
async fn into_events(self) -> Result<Vec<Event>>;
}
#[async_trait(?Send)]
@ -52,10 +29,6 @@ impl ResponseExt for Response {
async fn into_event(self) -> Result<Event> {
response_to_event(self).await
}
async fn into_events(self) -> Result<Vec<Event>> {
response_to_events(self).await
}
}
// Sealing the ResponseExt
@ -69,15 +42,15 @@ mod private {
#[cfg(test)]
mod tests {
use super::*;
use mockito::mock;
use reqwest_lib as reqwest;
use std::vec;
use crate::test::fixtures;
#[tokio::test]
async fn test_response() {
let url = mockito::server_url();
let _m = mockito::mock("GET", "/")
let _m = mock("GET", "/")
.with_status(200)
.with_header("ce-specversion", "1.0")
.with_header("ce-id", "0001")
@ -104,7 +77,7 @@ mod tests {
#[tokio::test]
async fn test_response_with_full_data() {
let url = mockito::server_url();
let _m = mockito::mock("GET", "/")
let _m = mock("GET", "/")
.with_status(200)
.with_header("ce-specversion", "1.0")
.with_header("ce-id", "0001")
@ -139,7 +112,7 @@ mod tests {
let expected = fixtures::v10::full_json_data_string_extension();
let url = mockito::server_url();
let _m = mockito::mock("GET", "/")
let _m = mock("GET", "/")
.with_status(200)
.with_header(
"content-type",
@ -160,31 +133,4 @@ mod tests {
assert_eq!(expected, res);
}
#[tokio::test]
async fn test_batched_response() {
let expected = vec![fixtures::v10::full_json_data_string_extension()];
let url = mockito::server_url();
let _m = mockito::mock("GET", "/")
.with_status(200)
.with_header(
"content-type",
"application/cloudevents-batch+json; charset=utf-8",
)
.with_body(serde_json::to_string(&expected).unwrap())
.create();
let client = reqwest::Client::new();
let res = client
.get(&url)
.send()
.await
.unwrap()
.into_events()
.await
.unwrap();
assert_eq!(expected, res);
}
}

View File

@ -1,6 +1,6 @@
use warp_lib as warp;
use crate::binding::http_0_2 as http;
use crate::binding::http;
use crate::Event;
use warp::http::HeaderMap;

View File

@ -24,7 +24,6 @@
//!
//! ```
//! # use warp_lib as warp;
//! # use http_0_2 as http;
//! use cloudevents::{Event, EventBuilder, EventBuilderV10};
//! use http::StatusCode;
//! use serde_json::json;
@ -50,11 +49,12 @@
//! .build();
//!
//! match event {
//! Ok(event) => from_event(event),
//! Err(e) => warp::reply::with_status(
//! Ok(event) => Ok(from_event(event)),
//! Err(e) => Ok(warp::reply::with_status(
//! e.to_string(),
//! StatusCode::INTERNAL_SERVER_ERROR,
//! ).into_response(),
//! )
//! .into_response()),
//! }
//! });
//!

View File

@ -1,12 +1,11 @@
use warp_lib as warp;
use crate::binding::http_0_2::builder::adapter::to_response;
use crate::binding::http::builder::adapter::to_response;
use crate::Event;
use http::StatusCode;
use http_0_2 as http;
use hyper_0_14 as hyper;
use warp::reply::Response;
///
/// # Serializes [`crate::Event`] as a http response
///
@ -33,7 +32,6 @@ pub fn from_event(event: Event) -> Response {
#[cfg(test)]
mod tests {
use crate::test::fixtures;
use hyper_0_14 as hyper;
#[test]
fn test_response() {

View File

@ -2,7 +2,6 @@ use super::{
AttributesIntoIteratorV03, AttributesIntoIteratorV10, AttributesV03, AttributesV10,
ExtensionValue, SpecVersion, UriReference,
};
use base64::prelude::*;
use chrono::{DateTime, Utc};
use serde::Serializer;
use std::fmt;
@ -12,14 +11,13 @@ use url::Url;
/// This represents the types defined in the [CloudEvent spec type system](https://github.com/cloudevents/spec/blob/v1.0/spec.md#type-system)
#[derive(Debug, PartialEq, Eq)]
pub enum AttributeValue<'a> {
Boolean(&'a bool),
Integer(&'a i64),
SpecVersion(SpecVersion),
String(&'a str),
Binary(&'a [u8]),
URI(&'a Url),
URIRef(&'a UriReference),
Boolean(&'a bool),
Integer(&'a i64),
Time(&'a DateTime<Utc>),
SpecVersion(SpecVersion),
}
impl<'a> From<&'a ExtensionValue> for AttributeValue<'a> {
@ -35,14 +33,13 @@ impl<'a> From<&'a ExtensionValue> for AttributeValue<'a> {
impl fmt::Display for AttributeValue<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
AttributeValue::Boolean(b) => f.serialize_bool(**b),
AttributeValue::Integer(i) => f.serialize_i64(**i),
AttributeValue::SpecVersion(s) => s.fmt(f),
AttributeValue::String(s) => f.write_str(s),
AttributeValue::Binary(b) => f.write_str(&BASE64_STANDARD.encode(b)),
AttributeValue::URI(s) => f.write_str(s.as_str()),
AttributeValue::URIRef(s) => f.write_str(s.as_str()),
AttributeValue::Time(s) => f.write_str(&s.to_rfc3339()),
AttributeValue::SpecVersion(s) => s.fmt(f),
AttributeValue::Boolean(b) => f.serialize_bool(**b),
AttributeValue::Integer(i) => f.serialize_i64(**i),
}
}
}

View File

@ -66,7 +66,7 @@ mod tests {
use crate::EventBuilder;
use crate::EventBuilderV03;
use crate::EventBuilderV10;
use claims::*;
use claim::*;
use rstest::rstest;
use serde_json::{json, Value};
use serde_yaml;

View File

@ -3,7 +3,6 @@ use super::{
EventFormatSerializerV03, EventFormatSerializerV10,
};
use crate::event::{AttributesReader, ExtensionValue};
use base64::prelude::*;
use serde::de::{Error, IntoDeserializer};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use serde_json::{Map, Value};
@ -59,9 +58,7 @@ pub fn parse_data_string<E: serde::de::Error>(v: Value) -> Result<String, E> {
pub fn parse_data_base64<E: serde::de::Error>(v: Value) -> Result<Vec<u8>, E> {
parse_field!(v, String, E).and_then(|s| {
BASE64_STANDARD
.decode(s)
.map_err(|e| E::custom(format_args!("decode error `{}`", e)))
base64::decode(s).map_err(|e| E::custom(format_args!("decode error `{}`", e)))
})
}

View File

@ -84,29 +84,26 @@ pub struct Event {
#[delegate(self.attributes)]
impl AttributesReader for Event {
fn id(&self) -> &str {}
fn source(&self) -> &UriReference {}
fn specversion(&self) -> SpecVersion {}
fn ty(&self) -> &str {}
fn datacontenttype(&self) -> Option<&str> {}
fn dataschema(&self) -> Option<&Url> {}
fn subject(&self) -> Option<&str> {}
fn time(&self) -> Option<&DateTime<Utc>> {}
fn id(&self) -> &str;
fn source(&self) -> &UriReference;
fn specversion(&self) -> SpecVersion;
fn ty(&self) -> &str;
fn datacontenttype(&self) -> Option<&str>;
fn dataschema(&self) -> Option<&Url>;
fn subject(&self) -> Option<&str>;
fn time(&self) -> Option<&DateTime<Utc>>;
}
#[delegate(self.attributes)]
impl AttributesWriter for Event {
fn set_id(&mut self, id: impl Into<String>) -> String {}
fn set_source(&mut self, source: impl Into<UriReference>) -> UriReference {}
fn set_type(&mut self, ty: impl Into<String>) -> String {}
fn set_subject(&mut self, subject: Option<impl Into<String>>) -> Option<String> {}
fn set_time(&mut self, time: Option<impl Into<DateTime<Utc>>>) -> Option<DateTime<Utc>> {}
fn set_datacontenttype(
&mut self,
datacontenttype: Option<impl Into<String>>,
) -> Option<String> {
}
fn set_dataschema(&mut self, dataschema: Option<impl Into<Url>>) -> Option<Url> {}
fn set_id(&mut self, id: impl Into<String>) -> String;
fn set_source(&mut self, source: impl Into<UriReference>) -> UriReference;
fn set_type(&mut self, ty: impl Into<String>) -> String;
fn set_subject(&mut self, subject: Option<impl Into<String>>) -> Option<String>;
fn set_time(&mut self, time: Option<impl Into<DateTime<Utc>>>) -> Option<DateTime<Utc>>;
fn set_datacontenttype(&mut self, datacontenttype: Option<impl Into<String>>)
-> Option<String>;
fn set_dataschema(&mut self, dataschema: Option<impl Into<Url>>) -> Option<Url>;
}
impl Default for Event {

View File

@ -221,7 +221,7 @@ impl crate::event::message::AttributesDeserializer for super::Attributes {
mod tests {
use super::*;
use crate::test::fixtures;
use chrono::DateTime;
use chrono::NaiveDateTime;
#[test]
fn iter_v03_test() {
@ -243,10 +243,13 @@ mod tests {
datacontenttype: None,
schemaurl: None,
subject: None,
time: DateTime::from_timestamp(61, 0),
time: Some(DateTime::<Utc>::from_utc(
NaiveDateTime::from_timestamp(61, 0),
Utc,
)),
};
let b = &mut a.into_iter();
let time = DateTime::from_timestamp(61, 0).unwrap();
let time = DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(61, 0), Utc);
assert_eq!(
("specversion", AttributeValue::SpecVersion(SpecVersion::V03)),

View File

@ -4,7 +4,6 @@ use crate::event::format::{
parse_data_base64, parse_data_base64_json, parse_data_json, parse_data_string,
};
use crate::event::{Data, ExtensionValue};
use base64::prelude::*;
use chrono::{DateTime, Utc};
use serde::de::IntoDeserializer;
use serde::ser::SerializeMap;
@ -103,7 +102,7 @@ impl<S: serde::Serializer> crate::event::format::EventFormatSerializer<S, Attrib
Some(Data::Json(j)) => state.serialize_entry("data", j)?,
Some(Data::String(s)) => state.serialize_entry("data", s)?,
Some(Data::Binary(v)) => {
state.serialize_entry("data", &BASE64_STANDARD.encode(v))?;
state.serialize_entry("data", &base64::encode(v))?;
state.serialize_entry("datacontentencoding", "base64")?;
}
_ => (),

View File

@ -222,6 +222,7 @@ impl AttributesConverter for Attributes {
mod tests {
use super::*;
use crate::test::fixtures;
use chrono::NaiveDateTime;
#[test]
fn iter_v10_test() {
@ -243,10 +244,13 @@ mod tests {
datacontenttype: None,
dataschema: None,
subject: None,
time: DateTime::from_timestamp(61, 0),
time: Some(DateTime::<Utc>::from_utc(
NaiveDateTime::from_timestamp(61, 0),
Utc,
)),
};
let b = &mut a.into_iter();
let time = DateTime::from_timestamp(61, 0).unwrap();
let time = DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(61, 0), Utc);
assert_eq!(
("specversion", AttributeValue::SpecVersion(SpecVersion::V10)),

View File

@ -4,7 +4,6 @@ use crate::event::format::{
parse_data_base64, parse_data_base64_json, parse_data_json, parse_data_string,
};
use crate::event::{Data, ExtensionValue};
use base64::prelude::*;
use chrono::{DateTime, Utc};
use serde::de::IntoDeserializer;
use serde::ser::SerializeMap;
@ -103,9 +102,7 @@ impl<S: serde::Serializer> crate::event::format::EventFormatSerializer<S, Attrib
match data {
Some(Data::Json(j)) => state.serialize_entry("data", j)?,
Some(Data::String(s)) => state.serialize_entry("data", s)?,
Some(Data::Binary(v)) => {
state.serialize_entry("data_base64", &BASE64_STANDARD.encode(v))?
}
Some(Data::Binary(v)) => state.serialize_entry("data_base64", &base64::encode(v))?,
_ => (),
};
for (k, v) in extensions {

View File

@ -54,7 +54,7 @@
//! [Extractors]: https://actix.rs/docs/extractors/
//! [Responders]: https://actix.rs/docs/handlers/
#![doc(html_root_url = "https://docs.rs/cloudevents-sdk/0.8.0")]
#![doc(html_root_url = "https://docs.rs/cloudevents-sdk/0.6.0")]
#![deny(rustdoc::broken_intra_doc_links)]
#![cfg_attr(docsrs, feature(doc_cfg))] // Show feature gate in doc

View File

@ -1,5 +1,4 @@
use crate::event::{ExtensionValue, UriReference};
use base64::prelude::*;
use chrono::{DateTime, Utc};
use std::convert::TryInto;
use std::fmt;
@ -47,14 +46,10 @@ impl fmt::Display for MessageAttributeValue {
MessageAttributeValue::Boolean(b) => write!(f, "{}", b),
MessageAttributeValue::Integer(i) => write!(f, "{}", i),
MessageAttributeValue::String(s) => write!(f, "{}", s),
MessageAttributeValue::Binary(v) => {
write!(f, "{}", BASE64_STANDARD.encode(v))
}
MessageAttributeValue::Binary(v) => write!(f, "{}", base64::encode(v)),
MessageAttributeValue::Uri(u) => write!(f, "{}", u),
MessageAttributeValue::UriRef(u) => write!(f, "{}", u),
MessageAttributeValue::DateTime(d) => {
write!(f, "{}", d.to_rfc3339())
}
MessageAttributeValue::DateTime(d) => write!(f, "{}", d.to_rfc3339()),
}
}
}

View File

@ -45,7 +45,7 @@ pub fn subject() -> String {
}
pub fn time() -> DateTime<Utc> {
Utc.with_ymd_and_hms(2020, 3, 16, 11, 50, 00).unwrap()
Utc.ymd(2020, 3, 16).and_hms(11, 50, 00)
}
pub fn string_extension() -> (String, String) {

View File

@ -1,6 +1,5 @@
use crate::test::fixtures::*;
use crate::{Event, EventBuilder, EventBuilderV03};
use base64::prelude::*;
use serde_json::{json, Value};
use url::Url;
@ -121,7 +120,7 @@ pub fn full_json_base64_data_json() -> Value {
"datacontenttype": json_datacontenttype(),
"schemaurl": dataschema(),
"datacontentencoding": "base64",
"data": BASE64_STANDARD.encode(json_data_binary())
"data": base64::encode(json_data_binary())
})
}
@ -200,6 +199,6 @@ pub fn full_xml_base64_data_json() -> Value {
int_ext_name: int_ext_value,
"datacontenttype": xml_datacontenttype(),
"datacontentencoding": "base64",
"data": BASE64_STANDARD.encode(Vec::from(xml_data()))
"data": base64::encode(Vec::from(xml_data()))
})
}

View File

@ -1,6 +1,5 @@
use crate::test::fixtures::*;
use crate::{Event, EventBuilder, EventBuilderV10};
use base64::prelude::*;
use serde_json::{json, Value};
use url::Url;
@ -167,7 +166,7 @@ pub fn full_json_base64_data_json() -> Value {
int_ext_name: int_ext_value,
"datacontenttype": json_datacontenttype(),
"dataschema": dataschema(),
"data_base64": BASE64_STANDARD.encode(json_data_binary())
"data_base64": base64::encode(json_data_binary())
})
}
@ -176,7 +175,7 @@ pub fn full_non_json_base64_data() -> Value {
Value::Object(mut m) => {
m.insert(
"data_base64".to_string(),
Value::String(BASE64_STANDARD.encode(b"hello world")),
Value::String(base64::encode(b"hello world")),
);
Value::Object(m)
}
@ -188,11 +187,7 @@ pub fn full_non_json_data() -> Event {
let mut event = full_json_data();
let value = full_non_json_base64_data();
if let Value::Object(m) = value {
event.set_data_unchecked(
BASE64_STANDARD
.decode(m["data_base64"].as_str().unwrap())
.unwrap(),
);
event.set_data_unchecked(base64::decode(m["data_base64"].as_str().unwrap()).unwrap());
}
event
}
@ -271,6 +266,6 @@ pub fn full_xml_base64_data_json() -> Value {
bool_ext_name: bool_ext_value,
int_ext_name: int_ext_value,
"datacontenttype": xml_datacontenttype(),
"data_base64": BASE64_STANDARD.encode(Vec::from(xml_data()))
"data_base64": base64::encode(Vec::from(xml_data()))
})
}

View File

@ -1,11 +1,9 @@
#[test]
#[cfg_attr(target_os = "wasi", ignore)]
fn test_readme_deps() {
version_sync::assert_markdown_deps_updated!("README.md");
}
#[test]
#[cfg_attr(target_os = "wasi", ignore)]
fn test_html_root_url() {
version_sync::assert_html_root_url_updated!("src/lib.rs");
}