Compare commits
40 Commits
Author | SHA1 | Date |
---|---|---|
|
fa0aadb31d | |
|
3590617290 | |
|
5345ee3edc | |
|
2bd15dfe9d | |
|
897cd85c40 | |
|
09661ddaf7 | |
|
9b38aead8d | |
|
f9dde9daae | |
|
0f5f748685 | |
|
bf45f01602 | |
|
1f94433c0a | |
|
2f57c3ce36 | |
|
13c36fdbfe | |
|
a59c3f55a0 | |
|
1978ae16aa | |
|
c4a5443d19 | |
|
48b7e33cc2 | |
|
b719f70cee | |
|
22f54770c9 | |
|
a73743b06a | |
|
721c42c27c | |
|
933edbc883 | |
|
109d02d66a | |
|
957ef1aa6d | |
|
e001b9cd8b | |
|
e19431fc06 | |
|
9e10eaadf0 | |
|
4f265cd142 | |
|
77232941e4 | |
|
1e147eb560 | |
|
4a86973f22 | |
|
38469b245d | |
|
20fd82a651 | |
|
4acd162cb7 | |
|
c8454bce97 | |
|
eee1f82f8d | |
|
6148f2efd3 | |
|
bc1fc8e79c | |
|
ee987c9955 | |
|
e3a86d642a |
|
@ -22,16 +22,18 @@ jobs:
|
|||
- x86_64-unknown-linux-gnu
|
||||
- x86_64-unknown-linux-musl
|
||||
- wasm32-unknown-unknown
|
||||
- wasm32-wasip1
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
|
||||
# setup wasmedge
|
||||
- run: curl -sSf https://raw.githubusercontent.com/WasmEdge/WasmEdge/master/utils/install.sh | sudo bash -s -- -p /usr/local
|
||||
# Setup musl if needed
|
||||
- run: sudo apt-get update
|
||||
if: matrix.target == 'x86_64-unknown-linux-musl'
|
||||
- run: sudo apt-get install -y musl musl-dev musl-tools cmake
|
||||
if: matrix.target == 'x86_64-unknown-linux-musl'
|
||||
|
||||
# Caching stuff
|
||||
# # Caching stuff
|
||||
- uses: actions/cache@v2
|
||||
with:
|
||||
path: |
|
||||
|
@ -96,8 +98,31 @@ jobs:
|
|||
command: build
|
||||
toolchain: ${{ matrix.toolchain }}
|
||||
args: --target wasm32-unknown-unknown --features reqwest
|
||||
|
||||
- uses: actions-rs/cargo@v1
|
||||
name: "Build"
|
||||
if: matrix.target == 'wasm32-wasi'
|
||||
with:
|
||||
command: build
|
||||
toolchain: ${{ matrix.toolchain }}
|
||||
args: --target ${{ matrix.target }} --features "http-0-2-binding hyper-0-14 hyper_wasi"
|
||||
- uses: actions-rs/cargo@v1
|
||||
name: "Test"
|
||||
if: matrix.target == 'wasm32-wasi'
|
||||
with:
|
||||
command: test
|
||||
toolchain: ${{ matrix.toolchain }}
|
||||
args: --target ${{ matrix.target }} --features "http-0-2-binding hyper-0-14 hyper_wasi"
|
||||
env:
|
||||
CARGO_TARGET_WASM32_WASI_RUNNER: wasmedge
|
||||
# Build examples
|
||||
- uses: actions-rs/cargo@v1
|
||||
name: "Build wasi-example"
|
||||
if: matrix.target == 'wasm32-wasi' && matrix.toolchain == 'stable'
|
||||
with:
|
||||
command: build
|
||||
toolchain: ${{ matrix.toolchain }}
|
||||
args: --target ${{ matrix.target }} --manifest-path ./example-projects/wasi-example/Cargo.toml
|
||||
|
||||
- uses: actions-rs/cargo@v1
|
||||
name: "Build reqwest-wasm-example"
|
||||
if: matrix.target == 'wasm32-unknown-unknown' && matrix.toolchain == 'stable'
|
||||
|
@ -153,3 +178,4 @@ jobs:
|
|||
command: build
|
||||
toolchain: ${{ matrix.toolchain }}
|
||||
args: --target ${{ matrix.target }} --manifest-path ./example-projects/nats-example/Cargo.toml
|
||||
|
||||
|
|
79
Cargo.toml
79
Cargo.toml
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "cloudevents-sdk"
|
||||
version = "0.6.0"
|
||||
version = "0.8.0"
|
||||
authors = ["Francesco Guardiani <francescoguard@gmail.com>"]
|
||||
license-file = "LICENSE"
|
||||
edition = "2018"
|
||||
|
@ -12,6 +12,7 @@ exclude = [
|
|||
".github/*"
|
||||
]
|
||||
categories = ["web-programming", "encoding", "data-structures"]
|
||||
resolver = "2"
|
||||
|
||||
# Enable all features when building on docs.rs to show feature gated bindings
|
||||
[package.metadata.docs.rs]
|
||||
|
@ -23,60 +24,82 @@ name = "cloudevents"
|
|||
|
||||
[features]
|
||||
http-binding = ["async-trait", "bytes", "futures", "http"]
|
||||
actix = ["actix-web", "actix-http", "async-trait", "bytes", "futures", "http"]
|
||||
http-0-2-binding = ["async-trait", "bytes", "futures", "http-0-2"]
|
||||
actix = ["actix-web", "actix-http", "async-trait", "bytes", "futures", "http-0-2"]
|
||||
reqwest = ["reqwest-lib", "async-trait", "bytes", "http", "uuid/js"]
|
||||
rdkafka = ["rdkafka-lib", "bytes", "futures"]
|
||||
warp = ["warp-lib", "bytes", "http", "hyper"]
|
||||
axum = ["bytes", "http", "hyper", "axum-lib", "http-body", "async-trait"]
|
||||
poem = ["bytes", "http", "poem-lib", "hyper", "async-trait"]
|
||||
warp = ["warp-lib", "bytes", "http-0-2", "http-body-util", "hyper-0-14"]
|
||||
axum = ["bytes", "http", "hyper", "axum-lib", "http-body-util", "async-trait"]
|
||||
poem = ["bytes", "http", "poem-lib", "hyper", "async-trait", "http-body-util", "futures"]
|
||||
nats = ["nats-lib"]
|
||||
|
||||
[dependencies]
|
||||
serde = { version = "^1.0", features = ["derive"] }
|
||||
serde_json = "^1.0"
|
||||
chrono = { version = "^0.4", features = ["serde"] }
|
||||
delegate-attr = "^0.2"
|
||||
base64 = "^0.12"
|
||||
url = { version = "^2.1", features = ["serde"] }
|
||||
snafu = "^0.6"
|
||||
bitflags = "^1.2"
|
||||
delegate-attr = "^0.3"
|
||||
base64 = "^0.22"
|
||||
url = { version = "^2.5", features = ["serde"] }
|
||||
snafu = "^0.8"
|
||||
bitflags = "^2.6"
|
||||
uuid = { version = "1", features = ["v4"] }
|
||||
|
||||
# runtime optional deps
|
||||
actix-web = { version = "4", optional = true }
|
||||
actix-http = { version = "3", optional = true }
|
||||
reqwest-lib = { version = "^0.11", default-features = false, features = ["rustls-tls"], optional = true, package = "reqwest" }
|
||||
rdkafka-lib = { version = "^0.29", features = ["cmake-build"], optional = true, package = "rdkafka" }
|
||||
reqwest-lib = { version = "^0.12", default-features = false, features = ["rustls-tls"], optional = true, package = "reqwest" }
|
||||
rdkafka-lib = { version = "^0.37", features = ["cmake-build"], optional = true, package = "rdkafka" }
|
||||
warp-lib = { version = "^0.3", optional = true, package = "warp" }
|
||||
async-trait = { version = "^0.1.33", optional = true }
|
||||
async-trait = { version = "^0.1", optional = true }
|
||||
bytes = { version = "^1.0", optional = true }
|
||||
futures = { version = "^0.3", optional = true }
|
||||
http = { version = "0.2", optional = true }
|
||||
hyper = { version = "^0.14", optional = true }
|
||||
axum-lib = { version = "^0.5", optional = true , package="axum"}
|
||||
http-body = { version = "^0.4", optional = true }
|
||||
poem-lib = { version = "=1.2.34", optional = true, package = "poem" }
|
||||
nats-lib = { version = "0.21.0", optional = true, package = "nats" }
|
||||
futures = { version = "^0.3", optional = true, features = ["compat"]}
|
||||
http = { version = "1.2", optional = true}
|
||||
http-0-2 = { version = "0.2", optional = true, package = "http"}
|
||||
axum-lib = { version = "^0.8", optional = true, package="axum"}
|
||||
http-body-util = {version = "^0.1", optional = true}
|
||||
poem-lib = { version = "^3.1", optional = true, package = "poem" }
|
||||
nats-lib = { version = "0.25.0", optional = true, package = "nats" }
|
||||
|
||||
[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
|
||||
hostname = "^0.3"
|
||||
hostname = "^0.4"
|
||||
|
||||
[target.'cfg(all(target_arch = "wasm32", target_os = "unknown"))'.dependencies]
|
||||
web-sys = { version = "^0.3", features = ["Window", "Location"] }
|
||||
|
||||
[target.'cfg(not(target_os = "wasi"))'.dependencies]
|
||||
hyper = { version = "^1.5", optional = true, package="hyper" }
|
||||
hyper-0-14 = { version = "^0.14", optional = true, package = "hyper"}
|
||||
|
||||
[target.'cfg(all(target_arch = "wasm32", target_os = "wasi"))'.dependencies]
|
||||
hyper_wasi = { version = "0.15", features = ["full"], optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
rstest = "0.6"
|
||||
claim = "0.3.1"
|
||||
rstest = "0.23"
|
||||
claims = "0.8"
|
||||
version-sync = "0.9.2"
|
||||
serde_yaml = "0.8"
|
||||
serde_yaml = "^0.9"
|
||||
rmp-serde = "1"
|
||||
|
||||
# runtime dev-deps
|
||||
actix-rt = { version = "^2" }
|
||||
|
||||
url = { version = "^2.1", features = ["serde"] }
|
||||
serde_json = { version = "^1.0" }
|
||||
chrono = { version = "^0.4", features = ["serde"] }
|
||||
mockito = "0.25.1"
|
||||
tokio = { version = "^1.0", features = ["full"] }
|
||||
mockito = "0.31.1"
|
||||
mime = "0.3"
|
||||
tower = { version = "0.4", features = ["util"] }
|
||||
|
||||
|
||||
[target.'cfg(not(target_os = "wasi"))'.dev-dependencies]
|
||||
actix-rt = { version = "^2" }
|
||||
tokio = { version = "^1.0", features = ["full"] }
|
||||
tower = { version = "0.5", features = ["util"] }
|
||||
|
||||
[target.'cfg(all(target_arch = "wasm32", target_os = "wasi"))'.dev-dependencies]
|
||||
tokio_wasi = { version = "1", features = [
|
||||
"io-util",
|
||||
"fs",
|
||||
"net",
|
||||
"time",
|
||||
"rt",
|
||||
"macros",
|
||||
] }
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
# Maintainers
|
||||
|
||||
Current active maintainers of this SDK:
|
||||
|
||||
- [Jim Crossley](https://github.com/jcrossley3)
|
||||
- [Francesco Guardiani](https://github.com/slinkydeveloper)
|
||||
- [Fabrizio Lazzaretti](https://github.com/Lazzaretti)
|
|
@ -0,0 +1,6 @@
|
|||
admins:
|
||||
- jcrossley3
|
||||
- linuxbasic
|
||||
- slinkydeveloper
|
||||
- Lazzaretti
|
||||
approvers:
|
17
README.md
17
README.md
|
@ -42,7 +42,7 @@ enabling your Protocol Binding of choice:
|
|||
|
||||
```toml
|
||||
[dependencies]
|
||||
cloudevents-sdk = { version = "0.6.0" }
|
||||
cloudevents-sdk = { version = "0.8.0" }
|
||||
```
|
||||
|
||||
Now you can start creating events:
|
||||
|
@ -67,10 +67,6 @@ Checkout the examples using our integrations to learn how to send and receive ev
|
|||
* [Warp Example](example-projects/warp-example)
|
||||
* [NATS Example](example-projects/nats-example)
|
||||
|
||||
## Development & Contributing
|
||||
|
||||
If you're interested in contributing to sdk-rust, look at [Contributing documentation](CONTRIBUTING.md)
|
||||
|
||||
## Community
|
||||
|
||||
- There are bi-weekly calls immediately following the
|
||||
|
@ -97,8 +93,19 @@ for how PR reviews and approval, and our
|
|||
[Code of Conduct](https://github.com/cloudevents/spec/blob/master/community/GOVERNANCE.md#additional-information)
|
||||
information.
|
||||
|
||||
If there is a security concern with one of the CloudEvents specifications, or
|
||||
with one of the project's SDKs, please send an email to
|
||||
[cncf-cloudevents-security@lists.cncf.io](mailto:cncf-cloudevents-security@lists.cncf.io).
|
||||
|
||||
[Crates badge]: https://img.shields.io/crates/v/cloudevents-sdk.svg
|
||||
[crates.io]: https://crates.io/crates/cloudevents-sdk
|
||||
[Docs badge]: https://docs.rs/cloudevents-sdk/badge.svg
|
||||
[docs.rs]: https://docs.rs/cloudevents-sdk
|
||||
[feature flag]: https://doc.rust-lang.org/cargo/reference/manifest.html#the-features-section
|
||||
|
||||
## Additional SDK Resources
|
||||
|
||||
- [List of current active maintainers](MAINTAINERS.md)
|
||||
- [How to contribute to the project](CONTRIBUTING.md)
|
||||
- [SDK's License](LICENSE)
|
||||
- [SDK's Release process](RELEASING.md)
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
# How to create a Release
|
||||
|
||||
To create a new release, do the following:
|
||||
- Bump the version in the README, lib.rs and cargo.toml
|
||||
- Try to run `cargo test --all-features`, `cargo doc --all-features --lib` and
|
||||
`cargo publish --dry-run`
|
||||
- If none of the above commands fail, PR the changes and merge it
|
||||
- Checkout `main` on your local machine and run `cargo publish`
|
||||
- Once that is done, create the release in the Github UI (make sure it
|
||||
creates the git tag as well) and that's it!
|
|
@ -7,7 +7,7 @@ edition = "2018"
|
|||
[dependencies]
|
||||
cloudevents-sdk = { path = "../..", features = ["actix"] }
|
||||
actix-web = "4"
|
||||
actix-cors = "0.6.0-beta.8"
|
||||
actix-cors = "^0.7"
|
||||
serde_json = "^1.0"
|
||||
url = { version = "^2.1" }
|
||||
env_logger = "0.7.1"
|
||||
env_logger = "^0.11"
|
||||
|
|
|
@ -6,16 +6,16 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
cloudevents-sdk = { path = "../..", features = ["axum"] }
|
||||
axum = "^0.5"
|
||||
http = "^0.2"
|
||||
axum = "^0.8"
|
||||
http = "^1.1"
|
||||
tokio = { version = "^1", features = ["full"] }
|
||||
tracing = "^0.1"
|
||||
tracing-subscriber = "^0.2"
|
||||
tower-http = { version = "^0.1", features = ["trace"] }
|
||||
tracing-subscriber = "^0.3"
|
||||
tower-http = { version = "^0.6", features = ["trace"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tower = { version = "^0.4", features = ["util"] }
|
||||
tower = { version = "^0.5", features = ["util"] }
|
||||
serde = { version = "^1.0", features = ["derive"] }
|
||||
serde_json = "^1.0"
|
||||
chrono = { version = "^0.4", features = ["serde"] }
|
||||
hyper = { version = "^0.14" }
|
||||
hyper = { version = "^1.4" }
|
||||
|
|
|
@ -4,7 +4,6 @@ use axum::{
|
|||
};
|
||||
use cloudevents::Event;
|
||||
use http::StatusCode;
|
||||
use std::net::SocketAddr;
|
||||
use tower_http::trace::TraceLayer;
|
||||
|
||||
fn echo_app() -> Router {
|
||||
|
@ -27,12 +26,8 @@ async fn main() {
|
|||
}
|
||||
tracing_subscriber::fmt::init();
|
||||
let service = echo_app();
|
||||
let addr = SocketAddr::from(([127, 0, 0, 1], 8080));
|
||||
tracing::debug!("listening on {}", addr);
|
||||
axum::Server::bind(&addr)
|
||||
.serve(service.into_make_service())
|
||||
.await
|
||||
.unwrap();
|
||||
let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await.unwrap();
|
||||
axum::serve(listener, service).await.unwrap();
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -9,4 +9,4 @@ edition = "2021"
|
|||
[dependencies]
|
||||
cloudevents-sdk = { path = "../..", features = ["nats"] }
|
||||
serde_json = "^1.0"
|
||||
nats = "0.21.0"
|
||||
nats = "^0.25"
|
|
@ -7,8 +7,8 @@ edition = "2021"
|
|||
cloudevents-sdk = { path = "../..", features = ["poem"] }
|
||||
tokio = { version = "1.13", features = ["macros", "rt-multi-thread"] }
|
||||
tracing = "0.1"
|
||||
poem = { version = "1" }
|
||||
tracing-subscriber = "0.2"
|
||||
poem = { version = "^3.0" }
|
||||
tracing-subscriber = "0.3"
|
||||
serde_json = "1.0"
|
||||
|
||||
[dev-dependencies]
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "rdkafka-example"
|
||||
version = "0.3.0"
|
||||
version = "0.4.0"
|
||||
authors = ["Pranav Bhatt <adpranavb2000@gmail.com>"]
|
||||
edition = "2018"
|
||||
|
||||
|
@ -16,4 +16,4 @@ serde_json = "^1.0"
|
|||
futures = "^0.3"
|
||||
tokio = { version = "^1.0", features = ["full"] }
|
||||
clap = "2.33.1"
|
||||
rdkafka = { version = "^0.29", features = ["cmake-build"] }
|
||||
rdkafka = { version = "^0.37", features = ["cmake-build"] }
|
||||
|
|
|
@ -3,6 +3,7 @@ name = "reqwest-wasm-example"
|
|||
version = "0.3.0"
|
||||
authors = ["Francesco Guardiani <francescoguard@gmail.com>"]
|
||||
edition = "2018"
|
||||
resolver = "2"
|
||||
|
||||
# Config mostly pulled from: https://github.com/rustwasm/wasm-bindgen/blob/master/examples/fetch/Cargo.toml
|
||||
|
||||
|
@ -10,7 +11,7 @@ edition = "2018"
|
|||
crate-type = ["cdylib"]
|
||||
|
||||
[dependencies]
|
||||
reqwest = "^0.11"
|
||||
reqwest = "^0.12"
|
||||
uuid = "1"
|
||||
cloudevents-sdk = { path = "../..", features = ["reqwest"] }
|
||||
url = { version = "^2.1" }
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -6,12 +6,12 @@
|
|||
"devDependencies": {
|
||||
"@wasm-tool/wasm-pack-plugin": "^1.4.0",
|
||||
"css-loader": "^5.2.6",
|
||||
"html-webpack-plugin": "^4.5.2",
|
||||
"html-webpack-plugin": "^5.5.0",
|
||||
"style-loader": "^2.0.0",
|
||||
"text-encoding": "^0.7.0",
|
||||
"webpack": "^4.46.0",
|
||||
"webpack": "^5.95.0",
|
||||
"webpack-cli": "^4.8.0",
|
||||
"webpack-dev-server": "^3.11.2"
|
||||
"webpack-dev-server": "^5.0.4"
|
||||
},
|
||||
"dependencies": {
|
||||
"bootstrap": "^5.0.2",
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
[package]
|
||||
name = "wasi-example"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
cloudevents-sdk = { path = "../..", features = ["http-0-2-binding", "hyper_wasi", "hyper-0-14" ] }
|
||||
hyper_wasi = { version = "0.15", features = ["full"] }
|
||||
log = "0.4.21"
|
||||
tokio_wasi = { version = "1", features = ["io-util", "fs", "net", "time", "rt", "macros"] }
|
||||
serde_json = "^1.0"
|
||||
|
||||
[dev-dependencies]
|
||||
bytes = "1.6.0"
|
||||
http-body-util = "0.1.1"
|
||||
chrono = "*"
|
|
@ -0,0 +1,26 @@
|
|||
Install WASMEdge:
|
||||
|
||||
https://wasmedge.org/docs/start/install/
|
||||
|
||||
To run the server:
|
||||
```console
|
||||
cargo run --target wasm32-wasi
|
||||
```
|
||||
|
||||
To test a GET:
|
||||
|
||||
```console
|
||||
curl -sw '%{http_code}\n' http://localhost:9000/health/readiness
|
||||
```
|
||||
|
||||
To test a POST:
|
||||
|
||||
```console
|
||||
curl -d '{"name": "wasi-womble"}' \
|
||||
-H'content-type: application/json' \
|
||||
-H'ce-specversion: 1.0' \
|
||||
-H'ce-id: 1' \
|
||||
-H'ce-source: http://cloudevents.io' \
|
||||
-H'ce-type: dev.knative.example' \
|
||||
http://localhost:9000
|
||||
```
|
|
@ -0,0 +1,39 @@
|
|||
use cloudevents::{event::Data, Event, EventBuilder, EventBuilderV10};
|
||||
use log::info;
|
||||
use serde_json::{from_slice, from_str, json};
|
||||
|
||||
pub async fn handle_event(event: Event) -> Result<Event, anyhow::Error> {
|
||||
info!("event: {}", event);
|
||||
|
||||
let input = match event.data() {
|
||||
Some(Data::Binary(v)) => from_slice(v)?,
|
||||
Some(Data::String(v)) => from_str(v)?,
|
||||
Some(Data::Json(v)) => v.to_owned(),
|
||||
None => json!({ "name": "default" }),
|
||||
};
|
||||
|
||||
EventBuilderV10::from(event)
|
||||
.source("func://handler")
|
||||
.ty("func.example")
|
||||
.data("application/json", json!({ "hello": input["name"] }))
|
||||
.build()
|
||||
.map_err(|err| err.into())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
#[tokio::test]
|
||||
async fn post_test() -> Result<(), anyhow::Error> {
|
||||
let reqevt = Event::default();
|
||||
let respevt = handle_event(reqevt).await?;
|
||||
let output = match respevt.data() {
|
||||
Some(Data::Binary(v)) => from_slice(v)?,
|
||||
Some(Data::String(v)) => from_str(v)?,
|
||||
Some(Data::Json(v)) => v.to_owned(),
|
||||
None => json!({ "name": "default" }),
|
||||
};
|
||||
assert_eq!(output, json!({ "hello": "default" }));
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
use cloudevents::binding::http_0_2::builder::adapter::to_response;
|
||||
use cloudevents::binding::http_0_2::to_event;
|
||||
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::Server;
|
||||
use hyper::{Body, Method, Request, Response, StatusCode};
|
||||
use std::convert::Infallible;
|
||||
use std::net::SocketAddr;
|
||||
use std::result::Result;
|
||||
|
||||
mod handler;
|
||||
|
||||
#[allow(clippy::redundant_closure)]
|
||||
#[tokio::main(flavor = "current_thread")]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let addr = SocketAddr::from(([0, 0, 0, 0], 9000));
|
||||
let make_svc = make_service_fn(|_| async move {
|
||||
Ok::<_, Infallible>(service_fn(move |req| handle_request(req)))
|
||||
});
|
||||
let server = Server::bind(&addr).serve(make_svc);
|
||||
if let Err(e) = server.await {
|
||||
eprintln!("server error: {}", e);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
async fn handle_request(
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, anyhow::Error> {
|
||||
match (req.method(), req.uri().path()) {
|
||||
(&Method::POST, "/") => {
|
||||
let headers = req.headers().clone();
|
||||
let body_bytes = hyper::body::to_bytes(req.into_body()).await?;
|
||||
let body = body_bytes.to_vec();
|
||||
let reqevt = to_event(&headers, body)?;
|
||||
let _respevt = handler::handle_event(reqevt).await?;
|
||||
|
||||
to_response(_respevt).map_err(|err| err.into())
|
||||
}
|
||||
(&Method::GET, "/health/readiness") => {
|
||||
Ok(Response::new(Body::from("")))
|
||||
}
|
||||
(&Method::GET, "/health/liveness") => Ok(Response::new(Body::from(""))),
|
||||
_ => {
|
||||
let mut not_found = Response::default();
|
||||
*not_found.status_mut() = StatusCode::NOT_FOUND;
|
||||
Ok(not_found)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
use crate::binding::http::{to_event, Headers};
|
||||
use crate::binding::http_0_2::{to_event, Headers};
|
||||
use crate::Event;
|
||||
use actix_web::dev::Payload;
|
||||
use actix_web::web::BytesMut;
|
||||
|
@ -6,6 +6,7 @@ use actix_web::{web, HttpRequest};
|
|||
use async_trait::async_trait;
|
||||
use futures::{future::LocalBoxFuture, FutureExt, StreamExt};
|
||||
use http::header::{AsHeaderName, HeaderName, HeaderValue};
|
||||
use http_0_2 as http;
|
||||
|
||||
/// Implement Headers for the actix HeaderMap
|
||||
impl<'a> Headers<'a> for actix_http::header::HeaderMap {
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
use crate::binding::http::{Builder, Serializer};
|
||||
use crate::binding::http_0_2::{Builder, Serializer};
|
||||
use crate::message::{BinaryDeserializer, Result};
|
||||
use crate::Event;
|
||||
use actix_web::http::StatusCode;
|
||||
use actix_web::{HttpRequest, HttpResponse, HttpResponseBuilder};
|
||||
use http_0_2 as http;
|
||||
|
||||
impl Builder<HttpResponse> for HttpResponseBuilder {
|
||||
fn header(&mut self, key: &str, value: http::header::HeaderValue) {
|
||||
|
|
|
@ -1,46 +1,44 @@
|
|||
use axum::body::Bytes;
|
||||
use axum::extract::{FromRequest, Request};
|
||||
use axum::response::Response;
|
||||
use axum_lib as axum;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use axum::extract::{FromRequest, RequestParts};
|
||||
use http;
|
||||
use http::StatusCode;
|
||||
use http_body::Body;
|
||||
use hyper::body;
|
||||
|
||||
use crate::binding::http::to_event;
|
||||
use crate::event::Event;
|
||||
|
||||
type BoxError = Box<dyn std::error::Error + Send + Sync>;
|
||||
|
||||
#[async_trait]
|
||||
impl<B> FromRequest<B> for Event
|
||||
impl<S> FromRequest<S> for Event
|
||||
where
|
||||
B: Body + Send,
|
||||
B::Data: Send,
|
||||
B::Error: Into<BoxError>,
|
||||
Bytes: FromRequest<S>,
|
||||
S: Send + Sync,
|
||||
{
|
||||
type Rejection = (StatusCode, String);
|
||||
type Rejection = Response;
|
||||
|
||||
async fn from_request(req: &mut RequestParts<B>) -> Result<Self, Self::Rejection> {
|
||||
let req_body = req
|
||||
.take_body()
|
||||
.ok_or(0)
|
||||
.map_err(|_| (StatusCode::BAD_REQUEST, "unexpected empty body".to_string()))?;
|
||||
let buf = body::to_bytes(req_body)
|
||||
.await
|
||||
.map_err(|e| (StatusCode::BAD_REQUEST, format!("{}", e.into())))?
|
||||
.to_vec();
|
||||
let headers = req.headers();
|
||||
async fn from_request(req: Request, _state: &S) -> Result<Self, Self::Rejection> {
|
||||
let (parts, body) = req.into_parts();
|
||||
|
||||
to_event(headers, buf).map_err(|e| (StatusCode::BAD_REQUEST, format!("{}", e)))
|
||||
let body = axum::body::to_bytes(body, usize::MAX).await.map_err(|e| {
|
||||
Response::builder()
|
||||
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
.body(axum::body::Body::from(e.to_string()))
|
||||
.unwrap()
|
||||
})?;
|
||||
|
||||
to_event(&parts.headers, body.to_vec()).map_err(|e| {
|
||||
Response::builder()
|
||||
.status(StatusCode::BAD_REQUEST)
|
||||
.body(axum::body::Body::from(e.to_string()))
|
||||
.unwrap()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use axum_lib as axum;
|
||||
|
||||
use super::*;
|
||||
use axum::body::Body;
|
||||
use axum::extract::FromRequest;
|
||||
use axum::http::{self, Request, StatusCode};
|
||||
|
||||
use crate::test::fixtures;
|
||||
|
@ -49,8 +47,7 @@ mod tests {
|
|||
async fn axum_test_request() {
|
||||
let expected = fixtures::v10::minimal_string_extension();
|
||||
|
||||
let mut request = RequestParts::new(
|
||||
Request::builder()
|
||||
let request = Request::builder()
|
||||
.method(http::Method::POST)
|
||||
.header("ce-specversion", "1.0")
|
||||
.header("ce-id", "0001")
|
||||
|
@ -58,18 +55,16 @@ mod tests {
|
|||
.header("ce-source", "http://localhost/")
|
||||
.header("ce-someint", "10")
|
||||
.body(Body::empty())
|
||||
.unwrap(),
|
||||
);
|
||||
.unwrap();
|
||||
|
||||
let result = Event::from_request(&mut request).await.unwrap();
|
||||
let result = Event::from_request(request, &()).await.unwrap();
|
||||
|
||||
assert_eq!(expected, result);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn axum_test_bad_request() {
|
||||
let mut request = RequestParts::new(
|
||||
Request::builder()
|
||||
let request = Request::builder()
|
||||
.method(http::Method::POST)
|
||||
.header("ce-specversion", "BAD SPECIFICATION")
|
||||
.header("ce-id", "0001")
|
||||
|
@ -78,14 +73,13 @@ mod tests {
|
|||
.header("ce-someint", "10")
|
||||
.header("ce-time", fixtures::time().to_rfc3339())
|
||||
.body(Body::empty())
|
||||
.unwrap(),
|
||||
);
|
||||
.unwrap();
|
||||
|
||||
let result = Event::from_request(&mut request).await;
|
||||
let result = Event::from_request(request, &()).await;
|
||||
assert!(result.is_err());
|
||||
let rejection = result.unwrap_err();
|
||||
|
||||
let reason = rejection.0;
|
||||
let reason = rejection.status();
|
||||
assert_eq!(reason, StatusCode::BAD_REQUEST)
|
||||
}
|
||||
|
||||
|
@ -93,8 +87,7 @@ mod tests {
|
|||
async fn axum_test_request_with_full_data() {
|
||||
let expected = fixtures::v10::full_binary_json_data_string_extension();
|
||||
|
||||
let mut request = RequestParts::new(
|
||||
Request::builder()
|
||||
let request = Request::builder()
|
||||
.method(http::Method::POST)
|
||||
.header("ce-specversion", "1.0")
|
||||
.header("ce-id", "0001")
|
||||
|
@ -107,10 +100,9 @@ mod tests {
|
|||
.header("ce-bool_ex", "true")
|
||||
.header("ce-time", &fixtures::time().to_rfc3339())
|
||||
.body(Body::from(fixtures::json_data_binary()))
|
||||
.unwrap(),
|
||||
);
|
||||
.unwrap();
|
||||
|
||||
let result = Event::from_request(&mut request).await.unwrap();
|
||||
let result = Event::from_request(request, &()).await.unwrap();
|
||||
|
||||
assert_eq!(expected, result);
|
||||
}
|
||||
|
|
|
@ -155,7 +155,7 @@ mod tests {
|
|||
);
|
||||
|
||||
let (_, body) = resp.into_parts();
|
||||
let body = hyper::body::to_bytes(body).await.unwrap();
|
||||
let body = axum::body::to_bytes(body, usize::MAX).await.unwrap();
|
||||
|
||||
assert_eq!(j.to_string().as_bytes(), body);
|
||||
}
|
||||
|
|
|
@ -1,27 +1,21 @@
|
|||
use axum_lib as axum;
|
||||
|
||||
use axum::{
|
||||
body::{boxed, BoxBody},
|
||||
http::Response,
|
||||
response::IntoResponse,
|
||||
};
|
||||
use http::{header, StatusCode};
|
||||
use hyper::body::Body;
|
||||
|
||||
use crate::binding::http::builder::adapter::to_response;
|
||||
use crate::event::Event;
|
||||
use axum::{body::Body, http::Response, response::IntoResponse};
|
||||
use axum_lib as axum;
|
||||
use http;
|
||||
use http::{header, StatusCode};
|
||||
|
||||
impl IntoResponse for Event {
|
||||
fn into_response(self) -> Response<BoxBody> {
|
||||
fn into_response(self) -> Response<Body> {
|
||||
match to_response(self) {
|
||||
Ok(resp) => {
|
||||
let (parts, body) = resp.into_parts();
|
||||
Response::from_parts(parts, boxed(body))
|
||||
Response::from_parts(parts, Body::new(body))
|
||||
}
|
||||
Err(err) => Response::builder()
|
||||
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
.header(header::CONTENT_TYPE, "text/plain")
|
||||
.body(boxed(Body::from(err.to_string())))
|
||||
.body(Body::from(err.to_string()))
|
||||
.unwrap(),
|
||||
}
|
||||
}
|
||||
|
@ -105,7 +99,7 @@ mod tests {
|
|||
);
|
||||
|
||||
let (_, body) = resp.into_parts();
|
||||
let body = hyper::body::to_bytes(body).await.unwrap();
|
||||
let body = axum::body::to_bytes(body, usize::MAX).await.unwrap();
|
||||
|
||||
assert_eq!(fixtures::json_data_binary(), body);
|
||||
}
|
||||
|
|
|
@ -1,33 +1,38 @@
|
|||
use bytes::Bytes;
|
||||
use http::Response;
|
||||
use hyper::body::Body;
|
||||
use http_body_util::Full;
|
||||
use std::cell::Cell;
|
||||
|
||||
use crate::binding::http::{Builder, Serializer};
|
||||
use crate::message::{BinaryDeserializer, Error, Result};
|
||||
use crate::Event;
|
||||
use std::convert::Infallible;
|
||||
type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Infallible>;
|
||||
|
||||
struct Adapter {
|
||||
builder: Cell<http::response::Builder>,
|
||||
}
|
||||
|
||||
impl Builder<Response<Body>> for Adapter {
|
||||
impl Builder<Response<BoxBody>> for Adapter {
|
||||
fn header(&mut self, key: &str, value: http::header::HeaderValue) {
|
||||
self.builder.set(self.builder.take().header(key, value));
|
||||
}
|
||||
fn body(&mut self, bytes: Vec<u8>) -> Result<Response<Body>> {
|
||||
|
||||
fn body(&mut self, bytes: Vec<u8>) -> Result<Response<BoxBody>> {
|
||||
self.builder
|
||||
.take()
|
||||
.body(Body::from(bytes))
|
||||
.body(BoxBody::new(Full::from(bytes)))
|
||||
.map_err(|e| crate::message::Error::Other {
|
||||
source: Box::new(e),
|
||||
})
|
||||
}
|
||||
fn finish(&mut self) -> Result<Response<Body>> {
|
||||
|
||||
fn finish(&mut self) -> Result<Response<BoxBody>> {
|
||||
self.body(Vec::new())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_response(event: Event) -> std::result::Result<Response<Body>, Error> {
|
||||
pub fn to_response(event: Event) -> std::result::Result<Response<BoxBody>, Error> {
|
||||
BinaryDeserializer::deserialize_binary(
|
||||
event,
|
||||
Serializer::new(Adapter {
|
||||
|
|
|
@ -3,6 +3,8 @@ pub mod adapter;
|
|||
|
||||
use crate::message::Result;
|
||||
|
||||
use http;
|
||||
|
||||
pub trait Builder<R> {
|
||||
fn header(&mut self, key: &str, value: http::header::HeaderValue);
|
||||
fn body(&mut self, bytes: Vec<u8>) -> Result<R>;
|
||||
|
|
|
@ -8,6 +8,8 @@ use crate::{
|
|||
Result, StructuredDeserializer, StructuredSerializer,
|
||||
},
|
||||
};
|
||||
|
||||
use http;
|
||||
use std::convert::TryFrom;
|
||||
|
||||
pub struct Deserializer<'a, T: Headers<'a>> {
|
||||
|
@ -34,10 +36,10 @@ impl<'a, T: Headers<'a>> BinaryDeserializer for Deserializer<'a, T> {
|
|||
.unwrap()?,
|
||||
)?;
|
||||
|
||||
visitor = visitor.set_spec_version(spec_version.clone())?;
|
||||
|
||||
let attributes = spec_version.attribute_names();
|
||||
|
||||
visitor = visitor.set_spec_version(spec_version)?;
|
||||
|
||||
for (hn, hv) in self.headers.iter().filter(|(hn, _)| {
|
||||
let key = hn.as_str();
|
||||
SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-")
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
use http::header::{AsHeaderName, HeaderMap, HeaderName, HeaderValue};
|
||||
|
||||
use http;
|
||||
|
||||
/// Any http library should be able to use the
|
||||
/// [`to_event`](super::to_event) function with an implementation of
|
||||
/// this trait.
|
||||
|
|
|
@ -13,6 +13,8 @@ mod serializer;
|
|||
pub use builder::Builder;
|
||||
use core::convert::TryFrom;
|
||||
use http::Response;
|
||||
|
||||
use http;
|
||||
pub use serializer::Serializer;
|
||||
use std::convert::TryInto;
|
||||
use std::fmt::Debug;
|
||||
|
@ -53,6 +55,8 @@ mod tests {
|
|||
use core::convert::TryFrom;
|
||||
use http::Response;
|
||||
|
||||
use http;
|
||||
|
||||
#[test]
|
||||
fn test_response_to_event() {
|
||||
let event = fixtures::v10::minimal_string_extension();
|
||||
|
|
|
@ -12,6 +12,8 @@ use crate::message::{
|
|||
};
|
||||
use crate::Event;
|
||||
use http::Request;
|
||||
|
||||
use http;
|
||||
use std::convert::TryFrom;
|
||||
use std::fmt::Debug;
|
||||
|
||||
|
@ -131,6 +133,8 @@ mod tests {
|
|||
use crate::test::fixtures;
|
||||
use bytes::Bytes;
|
||||
use http::Request;
|
||||
|
||||
use http;
|
||||
use std::convert::TryFrom;
|
||||
|
||||
#[test]
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
use http::Response;
|
||||
use http_0_2 as http;
|
||||
use hyper::body::Body;
|
||||
use std::cell::Cell;
|
||||
|
||||
#[cfg(not(target_os = "wasi"))]
|
||||
use hyper_0_14 as hyper;
|
||||
|
||||
#[cfg(target_os = "wasi")]
|
||||
use hyper;
|
||||
|
||||
use crate::binding::http_0_2::{Builder, Serializer};
|
||||
use crate::message::{BinaryDeserializer, Error, Result};
|
||||
use crate::Event;
|
||||
|
||||
struct Adapter {
|
||||
builder: Cell<http::response::Builder>,
|
||||
}
|
||||
|
||||
impl Builder<Response<Body>> for Adapter {
|
||||
fn header(&mut self, key: &str, value: http::header::HeaderValue) {
|
||||
self.builder.set(self.builder.take().header(key, value));
|
||||
}
|
||||
fn body(&mut self, bytes: Vec<u8>) -> Result<Response<Body>> {
|
||||
self.builder
|
||||
.take()
|
||||
.body(Body::from(bytes))
|
||||
.map_err(|e| crate::message::Error::Other {
|
||||
source: Box::new(e),
|
||||
})
|
||||
}
|
||||
fn finish(&mut self) -> Result<Response<Body>> {
|
||||
self.body(Vec::new())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_response(event: Event) -> std::result::Result<Response<Body>, Error> {
|
||||
BinaryDeserializer::deserialize_binary(
|
||||
event,
|
||||
Serializer::new(Adapter {
|
||||
builder: Cell::new(http::Response::builder()),
|
||||
}),
|
||||
)
|
||||
}
|
|
@ -0,0 +1,11 @@
|
|||
#[cfg(feature = "hyper-0-14")]
|
||||
pub mod adapter;
|
||||
|
||||
use crate::message::Result;
|
||||
use http_0_2 as http;
|
||||
|
||||
pub trait Builder<R> {
|
||||
fn header(&mut self, key: &str, value: http::header::HeaderValue);
|
||||
fn body(&mut self, bytes: Vec<u8>) -> Result<R>;
|
||||
fn finish(&mut self) -> Result<R>;
|
||||
}
|
|
@ -0,0 +1,101 @@
|
|||
use super::{Headers, SPEC_VERSION_HEADER};
|
||||
use crate::{
|
||||
binding::CLOUDEVENTS_JSON_HEADER,
|
||||
event::SpecVersion,
|
||||
header_value_to_str, message,
|
||||
message::{
|
||||
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
|
||||
Result, StructuredDeserializer, StructuredSerializer,
|
||||
},
|
||||
};
|
||||
use http_0_2 as http;
|
||||
use std::convert::TryFrom;
|
||||
|
||||
pub struct Deserializer<'a, T: Headers<'a>> {
|
||||
headers: &'a T,
|
||||
body: Vec<u8>,
|
||||
}
|
||||
|
||||
impl<'a, T: Headers<'a>> Deserializer<'a, T> {
|
||||
pub fn new(headers: &'a T, body: Vec<u8>) -> Deserializer<'a, T> {
|
||||
Deserializer { headers, body }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: Headers<'a>> BinaryDeserializer for Deserializer<'a, T> {
|
||||
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
|
||||
if self.encoding() != Encoding::BINARY {
|
||||
return Err(message::Error::WrongEncoding {});
|
||||
}
|
||||
|
||||
let spec_version = SpecVersion::try_from(
|
||||
self.headers
|
||||
.get(SPEC_VERSION_HEADER)
|
||||
.map(|a| header_value_to_str!(a))
|
||||
.unwrap()?,
|
||||
)?;
|
||||
|
||||
let attributes = spec_version.attribute_names();
|
||||
|
||||
visitor = visitor.set_spec_version(spec_version)?;
|
||||
|
||||
for (hn, hv) in self.headers.iter().filter(|(hn, _)| {
|
||||
let key = hn.as_str();
|
||||
SPEC_VERSION_HEADER.ne(key) && key.starts_with("ce-")
|
||||
}) {
|
||||
let name = &hn.as_str()["ce-".len()..];
|
||||
|
||||
if attributes.contains(&name) {
|
||||
visitor = visitor.set_attribute(
|
||||
name,
|
||||
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
|
||||
)?
|
||||
} else {
|
||||
visitor = visitor.set_extension(
|
||||
name,
|
||||
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
|
||||
)?
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(hv) = self.headers.get(http::header::CONTENT_TYPE) {
|
||||
visitor = visitor.set_attribute(
|
||||
"datacontenttype",
|
||||
MessageAttributeValue::String(String::from(header_value_to_str!(hv)?)),
|
||||
)?
|
||||
}
|
||||
|
||||
if !self.body.is_empty() {
|
||||
visitor.end_with_data(self.body)
|
||||
} else {
|
||||
visitor.end()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: Headers<'a>> StructuredDeserializer for Deserializer<'a, T> {
|
||||
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
|
||||
if self.encoding() != Encoding::STRUCTURED {
|
||||
return Err(message::Error::WrongEncoding {});
|
||||
}
|
||||
visitor.set_structured_event(self.body)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, T: Headers<'a>> MessageDeserializer for Deserializer<'a, T> {
|
||||
fn encoding(&self) -> Encoding {
|
||||
if self
|
||||
.headers
|
||||
.get(http::header::CONTENT_TYPE)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.filter(|&v| v.starts_with(CLOUDEVENTS_JSON_HEADER))
|
||||
.is_some()
|
||||
{
|
||||
Encoding::STRUCTURED
|
||||
} else if self.headers.get(SPEC_VERSION_HEADER).is_some() {
|
||||
Encoding::BINARY
|
||||
} else {
|
||||
Encoding::UNKNOWN
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
use http::header::{AsHeaderName, HeaderMap, HeaderName, HeaderValue};
|
||||
use http_0_2 as http;
|
||||
|
||||
/// Any http library should be able to use the
|
||||
/// [`to_event`](super::to_event) function with an implementation of
|
||||
/// this trait.
|
||||
pub trait Headers<'a> {
|
||||
type Iterator: Iterator<Item = (&'a HeaderName, &'a HeaderValue)>;
|
||||
fn get<K: AsHeaderName>(&self, name: K) -> Option<&HeaderValue>;
|
||||
fn iter(&'a self) -> Self::Iterator;
|
||||
}
|
||||
|
||||
/// Implemention for the HeaderMap used by warp/reqwest
|
||||
impl<'a> Headers<'a> for HeaderMap<HeaderValue> {
|
||||
type Iterator = http::header::Iter<'a, HeaderValue>;
|
||||
fn get<K: AsHeaderName>(&self, name: K) -> Option<&HeaderValue> {
|
||||
self.get(name)
|
||||
}
|
||||
fn iter(&'a self) -> Self::Iterator {
|
||||
self.iter()
|
||||
}
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
pub mod builder;
|
||||
pub mod deserializer;
|
||||
mod headers;
|
||||
|
||||
use crate::{
|
||||
message::{Error, MessageDeserializer},
|
||||
Event,
|
||||
};
|
||||
use deserializer::Deserializer;
|
||||
pub use headers::Headers;
|
||||
mod serializer;
|
||||
|
||||
pub use builder::Builder;
|
||||
use core::convert::TryFrom;
|
||||
use http::Response;
|
||||
use http_0_2 as http;
|
||||
pub use serializer::Serializer;
|
||||
use std::convert::TryInto;
|
||||
use std::fmt::Debug;
|
||||
|
||||
pub static SPEC_VERSION_HEADER: &str = "ce-specversion";
|
||||
|
||||
/// Turn a pile of HTTP headers and a body into a CloudEvent
|
||||
pub fn to_event<'a, T: Headers<'a>>(
|
||||
headers: &'a T,
|
||||
body: Vec<u8>,
|
||||
) -> std::result::Result<Event, Error> {
|
||||
MessageDeserializer::into_event(Deserializer::new(headers, body))
|
||||
}
|
||||
|
||||
pub fn header_prefix(name: &str) -> String {
|
||||
super::header_prefix("ce-", name)
|
||||
}
|
||||
|
||||
impl<T> TryFrom<Response<T>> for Event
|
||||
where
|
||||
T: TryInto<Vec<u8>>,
|
||||
<T as TryInto<Vec<u8>>>::Error: Debug,
|
||||
{
|
||||
type Error = crate::message::Error;
|
||||
|
||||
fn try_from(response: Response<T>) -> Result<Self, Self::Error> {
|
||||
let headers = response.headers().to_owned();
|
||||
let body = T::try_into(response.into_body()).unwrap();
|
||||
|
||||
to_event(&headers, body)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::test::fixtures;
|
||||
use crate::Event;
|
||||
use core::convert::TryFrom;
|
||||
use http::Response;
|
||||
use http_0_2 as http;
|
||||
|
||||
#[test]
|
||||
fn test_response_to_event() {
|
||||
let event = fixtures::v10::minimal_string_extension();
|
||||
|
||||
let response = Response::builder()
|
||||
.header("ce-id", fixtures::id())
|
||||
.header("ce-source", fixtures::source())
|
||||
.header("ce-type", fixtures::ty())
|
||||
.header("ce-specversion", "1.0")
|
||||
.header("ce-someint", "10")
|
||||
.body(Vec::new())
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(event, Event::try_from(response).unwrap());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,159 @@
|
|||
use std::{cell::RefCell, rc::Rc};
|
||||
|
||||
use crate::binding::http_0_2::builder::Builder;
|
||||
use crate::binding::{
|
||||
http_0_2::{header_prefix, SPEC_VERSION_HEADER},
|
||||
CLOUDEVENTS_JSON_HEADER,
|
||||
};
|
||||
use crate::event::SpecVersion;
|
||||
use crate::message::BinaryDeserializer;
|
||||
use crate::message::{
|
||||
BinarySerializer, Error, MessageAttributeValue, Result, StructuredSerializer,
|
||||
};
|
||||
use crate::Event;
|
||||
use http::Request;
|
||||
use http_0_2 as http;
|
||||
use std::convert::TryFrom;
|
||||
use std::fmt::Debug;
|
||||
|
||||
macro_rules! str_to_header_value {
|
||||
($header_value:expr) => {
|
||||
http::header::HeaderValue::from_str(&$header_value.to_string()).map_err(|e| {
|
||||
crate::message::Error::Other {
|
||||
source: Box::new(e),
|
||||
}
|
||||
})
|
||||
};
|
||||
}
|
||||
|
||||
pub struct Serializer<T> {
|
||||
builder: Rc<RefCell<dyn Builder<T>>>,
|
||||
}
|
||||
|
||||
impl<T> Serializer<T> {
|
||||
pub fn new<B: Builder<T> + 'static>(delegate: B) -> Serializer<T> {
|
||||
let builder = Rc::new(RefCell::new(delegate));
|
||||
Serializer { builder }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> BinarySerializer<T> for Serializer<T> {
|
||||
fn set_spec_version(self, spec_version: SpecVersion) -> Result<Self> {
|
||||
self.builder
|
||||
.borrow_mut()
|
||||
.header(SPEC_VERSION_HEADER, str_to_header_value!(spec_version)?);
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
fn set_attribute(self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
||||
self.builder
|
||||
.borrow_mut()
|
||||
.header(&header_prefix(name), str_to_header_value!(value)?);
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
fn set_extension(self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
||||
self.builder
|
||||
.borrow_mut()
|
||||
.header(&header_prefix(name), str_to_header_value!(value)?);
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
fn end_with_data(self, bytes: Vec<u8>) -> Result<T> {
|
||||
self.builder.borrow_mut().body(bytes)
|
||||
}
|
||||
|
||||
fn end(self) -> Result<T> {
|
||||
self.builder.borrow_mut().finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> StructuredSerializer<T> for Serializer<T> {
|
||||
fn set_structured_event(self, bytes: Vec<u8>) -> Result<T> {
|
||||
let mut builder = self.builder.borrow_mut();
|
||||
builder.header(
|
||||
http::header::CONTENT_TYPE.as_str(),
|
||||
http::HeaderValue::from_static(CLOUDEVENTS_JSON_HEADER),
|
||||
);
|
||||
builder.body(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> BinarySerializer<http::request::Request<Option<T>>> for http::request::Builder
|
||||
where
|
||||
T: TryFrom<Vec<u8>>,
|
||||
<T as TryFrom<Vec<u8>>>::Error: Debug,
|
||||
{
|
||||
fn set_spec_version(mut self, sv: SpecVersion) -> Result<Self> {
|
||||
self = self.header(SPEC_VERSION_HEADER, &sv.to_string());
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
||||
let key = &header_prefix(name);
|
||||
self = self.header(key, &value.to_string());
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result<Self> {
|
||||
let key = &header_prefix(name);
|
||||
self = self.header(key, &value.to_string());
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
fn end_with_data(self, bytes: Vec<u8>) -> Result<http::request::Request<Option<T>>> {
|
||||
let body = T::try_from(bytes).unwrap();
|
||||
self.body(Some(body)).map_err(|e| Error::Other {
|
||||
source: Box::new(e),
|
||||
})
|
||||
}
|
||||
|
||||
fn end(self) -> Result<http::request::Request<Option<T>>> {
|
||||
self.body(None).map_err(|e| Error::Other {
|
||||
source: Box::new(e),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> TryFrom<Event> for Request<Option<T>>
|
||||
where
|
||||
T: TryFrom<Vec<u8>>,
|
||||
<T as TryFrom<Vec<u8>>>::Error: Debug,
|
||||
{
|
||||
type Error = crate::message::Error;
|
||||
|
||||
fn try_from(event: Event) -> Result<Self> {
|
||||
BinaryDeserializer::deserialize_binary(event, http::request::Builder::new())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::test::fixtures;
|
||||
use bytes::Bytes;
|
||||
use http::Request;
|
||||
use http_0_2 as http;
|
||||
use std::convert::TryFrom;
|
||||
|
||||
#[test]
|
||||
fn test_event_to_http_request() {
|
||||
let event = fixtures::v10::minimal_string_extension();
|
||||
let request: Request<Option<Vec<u8>>> = Request::try_from(event).unwrap();
|
||||
|
||||
assert_eq!(request.headers()["ce-id"], "0001");
|
||||
assert_eq!(request.headers()["ce-type"], "test_event.test_application");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_event_to_bytes_body() {
|
||||
let event = fixtures::v10::full_binary_json_data_string_extension();
|
||||
let request: Request<Option<Vec<u8>>> = Request::try_from(event).unwrap();
|
||||
|
||||
assert_eq!(request.headers()["ce-id"], "0001");
|
||||
assert_eq!(request.headers()["ce-type"], "test_event.test_application");
|
||||
assert_eq!(
|
||||
request.body().as_ref().unwrap(),
|
||||
&Bytes::from(fixtures::json_data().to_string())
|
||||
);
|
||||
}
|
||||
}
|
|
@ -11,8 +11,6 @@ pub mod axum;
|
|||
docsrs,
|
||||
doc(cfg(any(
|
||||
feature = "http-binding",
|
||||
feature = "actix",
|
||||
feature = "warp",
|
||||
feature = "reqwest",
|
||||
feature = "axum",
|
||||
feature = "poem"
|
||||
|
@ -20,13 +18,19 @@ pub mod axum;
|
|||
)]
|
||||
#[cfg(any(
|
||||
feature = "http-binding",
|
||||
feature = "actix",
|
||||
feature = "warp",
|
||||
feature = "reqwest",
|
||||
feature = "axum",
|
||||
feature = "poem"
|
||||
))]
|
||||
pub mod http;
|
||||
|
||||
#[cfg_attr(
|
||||
docsrs,
|
||||
doc(cfg(any(feature = "http-0-2-binding", feature = "actix", feature = "warp",)))
|
||||
)]
|
||||
#[cfg(any(feature = "http-0-2-binding", feature = "actix", feature = "warp",))]
|
||||
pub mod http_0_2;
|
||||
|
||||
#[cfg_attr(docsrs, doc(cfg(feature = "nats")))]
|
||||
#[cfg(feature = "nats")]
|
||||
pub mod nats;
|
||||
|
@ -52,6 +56,7 @@ pub(crate) mod kafka {
|
|||
}
|
||||
|
||||
pub(crate) static CLOUDEVENTS_JSON_HEADER: &str = "application/cloudevents+json";
|
||||
pub(crate) static CLOUDEVENTS_BATCH_JSON_HEADER: &str = "application/cloudevents-batch+json";
|
||||
pub(crate) static CONTENT_TYPE: &str = "content-type";
|
||||
|
||||
fn header_prefix(prefix: &str, name: &str) -> String {
|
||||
|
|
|
@ -1,18 +1,16 @@
|
|||
use async_trait::async_trait;
|
||||
use crate::binding::http::to_event;
|
||||
use crate::Event;
|
||||
|
||||
use poem_lib::error::ResponseError;
|
||||
use poem_lib::http::StatusCode;
|
||||
use poem_lib::{FromRequest, Request, RequestBody, Result};
|
||||
|
||||
use crate::binding::http::to_event;
|
||||
use crate::Event;
|
||||
|
||||
impl ResponseError for crate::message::Error {
|
||||
fn status(&self) -> StatusCode {
|
||||
StatusCode::BAD_REQUEST
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<'a> FromRequest<'a> for Event {
|
||||
async fn from_request(req: &'a Request, body: &mut RequestBody) -> Result<Self> {
|
||||
Ok(to_event(req.headers(), body.take()?.into_vec().await?)?)
|
||||
|
@ -57,7 +55,7 @@ mod tests {
|
|||
|
||||
let (req, mut body) = req.split();
|
||||
let resp = Event::from_request(&req, &mut body).await.err().unwrap();
|
||||
assert_eq!(resp.as_response().status(), StatusCode::BAD_REQUEST);
|
||||
assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
|
||||
assert_eq!(resp.to_string(), "Invalid specversion BAD SPECIFICATION");
|
||||
}
|
||||
|
||||
|
|
|
@ -1,14 +1,33 @@
|
|||
use crate::{AttributesReader, Data, Event};
|
||||
|
||||
use bytes::Bytes;
|
||||
use poem_lib::http::StatusCode;
|
||||
use poem_lib::{IntoResponse, Response};
|
||||
|
||||
use crate::binding::http::builder::adapter::to_response;
|
||||
use crate::Event;
|
||||
|
||||
impl IntoResponse for Event {
|
||||
fn into_response(self) -> Response {
|
||||
match to_response(self) {
|
||||
Ok(resp) => resp.into(),
|
||||
Err(err) => (StatusCode::INTERNAL_SERVER_ERROR, err.to_string()).into(),
|
||||
let mut builder = Response::builder().status(StatusCode::OK);
|
||||
|
||||
if let Some(dct) = self.datacontenttype() {
|
||||
builder = builder.content_type(dct);
|
||||
}
|
||||
|
||||
for (key, value) in self.iter() {
|
||||
builder = builder.header(format!("ce-{key}").as_str(), value.to_string());
|
||||
}
|
||||
|
||||
match self.data {
|
||||
Some(data) => match data {
|
||||
Data::Binary(v) => builder.body(Bytes::copy_from_slice(v.as_slice())),
|
||||
Data::String(s) => builder.body(s.clone()),
|
||||
Data::Json(j) => match serde_json::to_string(&j) {
|
||||
Ok(s) => builder.body(s),
|
||||
Err(e) => Response::builder()
|
||||
.status(StatusCode::INTERNAL_SERVER_ERROR)
|
||||
.body(e.to_string()),
|
||||
},
|
||||
},
|
||||
None => builder.finish(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,10 +51,10 @@ impl BinaryDeserializer for ConsumerRecordDeserializer {
|
|||
})?,
|
||||
)?;
|
||||
|
||||
visitor = visitor.set_spec_version(spec_version.clone())?;
|
||||
|
||||
let attributes = spec_version.attribute_names();
|
||||
|
||||
visitor = visitor.set_spec_version(spec_version)?;
|
||||
|
||||
if let Some(hv) = self.headers.remove(CONTENT_TYPE) {
|
||||
visitor = visitor.set_attribute(
|
||||
"datacontenttype",
|
||||
|
|
|
@ -2,7 +2,7 @@ use reqwest_lib as reqwest;
|
|||
|
||||
use crate::binding::{
|
||||
http::{header_prefix, SPEC_VERSION_HEADER},
|
||||
CLOUDEVENTS_JSON_HEADER,
|
||||
CLOUDEVENTS_BATCH_JSON_HEADER, CLOUDEVENTS_JSON_HEADER,
|
||||
};
|
||||
use crate::event::SpecVersion;
|
||||
use crate::message::{
|
||||
|
@ -72,18 +72,35 @@ pub fn event_to_request(event: Event, request_builder: RequestBuilder) -> Result
|
|||
BinaryDeserializer::deserialize_binary(event, RequestSerializer::new(request_builder))
|
||||
}
|
||||
|
||||
/// Method to fill a [`RequestBuilder`] with a batched [`Vec<Event>`].
|
||||
pub fn events_to_request(
|
||||
events: Vec<Event>,
|
||||
request_builder: RequestBuilder,
|
||||
) -> Result<RequestBuilder> {
|
||||
let bytes = serde_json::to_vec(&events)?;
|
||||
Ok(request_builder
|
||||
.header(reqwest::header::CONTENT_TYPE, CLOUDEVENTS_BATCH_JSON_HEADER)
|
||||
.body(bytes))
|
||||
}
|
||||
|
||||
/// Extension Trait for [`RequestBuilder`] which acts as a wrapper for the function [`event_to_request()`].
|
||||
///
|
||||
/// This trait is sealed and cannot be implemented for types outside of this crate.
|
||||
pub trait RequestBuilderExt: private::Sealed {
|
||||
/// Write in this [`RequestBuilder`] the provided [`Event`]. Similar to invoking [`Event`].
|
||||
fn event(self, event: Event) -> Result<RequestBuilder>;
|
||||
/// Write in this [`RequestBuilder`] the provided batched [`Vec<Event>`].
|
||||
fn events(self, events: Vec<Event>) -> Result<RequestBuilder>;
|
||||
}
|
||||
|
||||
impl RequestBuilderExt for RequestBuilder {
|
||||
fn event(self, event: Event) -> Result<RequestBuilder> {
|
||||
event_to_request(event, self)
|
||||
}
|
||||
|
||||
fn events(self, events: Vec<Event>) -> Result<RequestBuilder> {
|
||||
events_to_request(events, self)
|
||||
}
|
||||
}
|
||||
|
||||
// Sealing the RequestBuilderExt
|
||||
|
@ -97,7 +114,7 @@ mod private {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use mockito::{mock, Matcher};
|
||||
use mockito::Matcher;
|
||||
use reqwest_lib as reqwest;
|
||||
|
||||
use crate::message::StructuredDeserializer;
|
||||
|
@ -106,7 +123,7 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn test_request() {
|
||||
let url = mockito::server_url();
|
||||
let m = mock("POST", "/")
|
||||
let m = mockito::mock("POST", "/")
|
||||
.match_header("ce-specversion", "1.0")
|
||||
.match_header("ce-id", "0001")
|
||||
.match_header("ce-type", "test_event.test_application")
|
||||
|
@ -132,7 +149,7 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn test_request_with_full_data() {
|
||||
let url = mockito::server_url();
|
||||
let m = mock("POST", "/")
|
||||
let m = mockito::mock("POST", "/")
|
||||
.match_header("ce-specversion", "1.0")
|
||||
.match_header("ce-id", "0001")
|
||||
.with_header("ce-type", "test_event.test_application")
|
||||
|
@ -166,7 +183,7 @@ mod tests {
|
|||
let input = fixtures::v10::full_json_data_string_extension();
|
||||
|
||||
let url = mockito::server_url();
|
||||
let m = mock("POST", "/")
|
||||
let m = mockito::mock("POST", "/")
|
||||
.match_header("content-type", "application/cloudevents+json")
|
||||
.match_body(Matcher::Exact(serde_json::to_string(&input).unwrap()))
|
||||
.create();
|
||||
|
@ -183,4 +200,26 @@ mod tests {
|
|||
|
||||
m.assert();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_batched_request() {
|
||||
let input = vec![fixtures::v10::full_json_data_string_extension()];
|
||||
|
||||
let url = mockito::server_url();
|
||||
let m = mockito::mock("POST", "/")
|
||||
.match_header("content-type", "application/cloudevents-batch+json")
|
||||
.match_body(Matcher::Exact(serde_json::to_string(&input).unwrap()))
|
||||
.create();
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
client
|
||||
.post(&url)
|
||||
.events(input)
|
||||
.unwrap()
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
m.assert();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
use reqwest_lib as reqwest;
|
||||
|
||||
use crate::binding::http;
|
||||
use crate::binding;
|
||||
use crate::message::{Error, Result};
|
||||
use crate::Event;
|
||||
use async_trait::async_trait;
|
||||
use http;
|
||||
use http::header;
|
||||
use reqwest::Response;
|
||||
|
||||
/// Method to transform an incoming [`Response`] to [`Event`].
|
||||
|
@ -12,7 +14,26 @@ pub async fn response_to_event(res: Response) -> Result<Event> {
|
|||
let b = res.bytes().await.map_err(|e| Error::Other {
|
||||
source: Box::new(e),
|
||||
})?;
|
||||
http::to_event(&h, b.to_vec())
|
||||
binding::http::to_event(&h, b.to_vec())
|
||||
}
|
||||
|
||||
/// Method to transform an incoming [`Response`] to a batched [`Vec<Event>`]
|
||||
pub async fn response_to_events(res: Response) -> Result<Vec<Event>> {
|
||||
if res
|
||||
.headers()
|
||||
.get(header::CONTENT_TYPE)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.filter(|&v| v.starts_with(binding::CLOUDEVENTS_BATCH_JSON_HEADER))
|
||||
.is_none()
|
||||
{
|
||||
return Err(Error::WrongEncoding {});
|
||||
}
|
||||
|
||||
let bytes = res.bytes().await.map_err(|e| Error::Other {
|
||||
source: Box::new(e),
|
||||
})?;
|
||||
|
||||
Ok(serde_json::from_slice(&bytes)?)
|
||||
}
|
||||
|
||||
/// Extension Trait for [`Response`] which acts as a wrapper for the function [`response_to_event()`].
|
||||
|
@ -22,6 +43,8 @@ pub async fn response_to_event(res: Response) -> Result<Event> {
|
|||
pub trait ResponseExt: private::Sealed {
|
||||
/// Convert this [`Response`] to [`Event`].
|
||||
async fn into_event(self) -> Result<Event>;
|
||||
/// Convert this [`Response`] to a batched [`Vec<Event>`].
|
||||
async fn into_events(self) -> Result<Vec<Event>>;
|
||||
}
|
||||
|
||||
#[async_trait(?Send)]
|
||||
|
@ -29,6 +52,10 @@ impl ResponseExt for Response {
|
|||
async fn into_event(self) -> Result<Event> {
|
||||
response_to_event(self).await
|
||||
}
|
||||
|
||||
async fn into_events(self) -> Result<Vec<Event>> {
|
||||
response_to_events(self).await
|
||||
}
|
||||
}
|
||||
|
||||
// Sealing the ResponseExt
|
||||
|
@ -42,15 +69,15 @@ mod private {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use mockito::mock;
|
||||
use reqwest_lib as reqwest;
|
||||
use std::vec;
|
||||
|
||||
use crate::test::fixtures;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_response() {
|
||||
let url = mockito::server_url();
|
||||
let _m = mock("GET", "/")
|
||||
let _m = mockito::mock("GET", "/")
|
||||
.with_status(200)
|
||||
.with_header("ce-specversion", "1.0")
|
||||
.with_header("ce-id", "0001")
|
||||
|
@ -77,7 +104,7 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn test_response_with_full_data() {
|
||||
let url = mockito::server_url();
|
||||
let _m = mock("GET", "/")
|
||||
let _m = mockito::mock("GET", "/")
|
||||
.with_status(200)
|
||||
.with_header("ce-specversion", "1.0")
|
||||
.with_header("ce-id", "0001")
|
||||
|
@ -112,7 +139,7 @@ mod tests {
|
|||
let expected = fixtures::v10::full_json_data_string_extension();
|
||||
|
||||
let url = mockito::server_url();
|
||||
let _m = mock("GET", "/")
|
||||
let _m = mockito::mock("GET", "/")
|
||||
.with_status(200)
|
||||
.with_header(
|
||||
"content-type",
|
||||
|
@ -133,4 +160,31 @@ mod tests {
|
|||
|
||||
assert_eq!(expected, res);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_batched_response() {
|
||||
let expected = vec![fixtures::v10::full_json_data_string_extension()];
|
||||
|
||||
let url = mockito::server_url();
|
||||
let _m = mockito::mock("GET", "/")
|
||||
.with_status(200)
|
||||
.with_header(
|
||||
"content-type",
|
||||
"application/cloudevents-batch+json; charset=utf-8",
|
||||
)
|
||||
.with_body(serde_json::to_string(&expected).unwrap())
|
||||
.create();
|
||||
|
||||
let client = reqwest::Client::new();
|
||||
let res = client
|
||||
.get(&url)
|
||||
.send()
|
||||
.await
|
||||
.unwrap()
|
||||
.into_events()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(expected, res);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use warp_lib as warp;
|
||||
|
||||
use crate::binding::http;
|
||||
use crate::binding::http_0_2 as http;
|
||||
|
||||
use crate::Event;
|
||||
use warp::http::HeaderMap;
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
//!
|
||||
//! ```
|
||||
//! # use warp_lib as warp;
|
||||
//! # use http_0_2 as http;
|
||||
//! use cloudevents::{Event, EventBuilder, EventBuilderV10};
|
||||
//! use http::StatusCode;
|
||||
//! use serde_json::json;
|
||||
|
@ -49,12 +50,11 @@
|
|||
//! .build();
|
||||
//!
|
||||
//! match event {
|
||||
//! Ok(event) => Ok(from_event(event)),
|
||||
//! Err(e) => Ok(warp::reply::with_status(
|
||||
//! Ok(event) => from_event(event),
|
||||
//! Err(e) => warp::reply::with_status(
|
||||
//! e.to_string(),
|
||||
//! StatusCode::INTERNAL_SERVER_ERROR,
|
||||
//! )
|
||||
//! .into_response()),
|
||||
//! ).into_response(),
|
||||
//! }
|
||||
//! });
|
||||
//!
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
use warp_lib as warp;
|
||||
|
||||
use crate::binding::http::builder::adapter::to_response;
|
||||
use crate::binding::http_0_2::builder::adapter::to_response;
|
||||
|
||||
use crate::Event;
|
||||
use http::StatusCode;
|
||||
use http_0_2 as http;
|
||||
use hyper_0_14 as hyper;
|
||||
use warp::reply::Response;
|
||||
|
||||
///
|
||||
/// # Serializes [`crate::Event`] as a http response
|
||||
///
|
||||
|
@ -32,6 +33,7 @@ pub fn from_event(event: Event) -> Response {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::test::fixtures;
|
||||
use hyper_0_14 as hyper;
|
||||
|
||||
#[test]
|
||||
fn test_response() {
|
||||
|
|
|
@ -2,6 +2,7 @@ use super::{
|
|||
AttributesIntoIteratorV03, AttributesIntoIteratorV10, AttributesV03, AttributesV10,
|
||||
ExtensionValue, SpecVersion, UriReference,
|
||||
};
|
||||
use base64::prelude::*;
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::Serializer;
|
||||
use std::fmt;
|
||||
|
@ -11,13 +12,14 @@ use url::Url;
|
|||
/// This represents the types defined in the [CloudEvent spec type system](https://github.com/cloudevents/spec/blob/v1.0/spec.md#type-system)
|
||||
#[derive(Debug, PartialEq, Eq)]
|
||||
pub enum AttributeValue<'a> {
|
||||
SpecVersion(SpecVersion),
|
||||
String(&'a str),
|
||||
URI(&'a Url),
|
||||
URIRef(&'a UriReference),
|
||||
Boolean(&'a bool),
|
||||
Integer(&'a i64),
|
||||
String(&'a str),
|
||||
Binary(&'a [u8]),
|
||||
URI(&'a Url),
|
||||
URIRef(&'a UriReference),
|
||||
Time(&'a DateTime<Utc>),
|
||||
SpecVersion(SpecVersion),
|
||||
}
|
||||
|
||||
impl<'a> From<&'a ExtensionValue> for AttributeValue<'a> {
|
||||
|
@ -33,13 +35,14 @@ impl<'a> From<&'a ExtensionValue> for AttributeValue<'a> {
|
|||
impl fmt::Display for AttributeValue<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
AttributeValue::SpecVersion(s) => s.fmt(f),
|
||||
AttributeValue::Boolean(b) => f.serialize_bool(**b),
|
||||
AttributeValue::Integer(i) => f.serialize_i64(**i),
|
||||
AttributeValue::String(s) => f.write_str(s),
|
||||
AttributeValue::Binary(b) => f.write_str(&BASE64_STANDARD.encode(b)),
|
||||
AttributeValue::URI(s) => f.write_str(s.as_str()),
|
||||
AttributeValue::URIRef(s) => f.write_str(s.as_str()),
|
||||
AttributeValue::Time(s) => f.write_str(&s.to_rfc3339()),
|
||||
AttributeValue::Boolean(b) => f.serialize_bool(**b),
|
||||
AttributeValue::Integer(i) => f.serialize_i64(**i),
|
||||
AttributeValue::SpecVersion(s) => s.fmt(f),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -66,7 +66,7 @@ mod tests {
|
|||
use crate::EventBuilder;
|
||||
use crate::EventBuilderV03;
|
||||
use crate::EventBuilderV10;
|
||||
use claim::*;
|
||||
use claims::*;
|
||||
use rstest::rstest;
|
||||
use serde_json::{json, Value};
|
||||
use serde_yaml;
|
||||
|
|
|
@ -3,6 +3,7 @@ use super::{
|
|||
EventFormatSerializerV03, EventFormatSerializerV10,
|
||||
};
|
||||
use crate::event::{AttributesReader, ExtensionValue};
|
||||
use base64::prelude::*;
|
||||
use serde::de::{Error, IntoDeserializer};
|
||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||
use serde_json::{Map, Value};
|
||||
|
@ -58,7 +59,9 @@ pub fn parse_data_string<E: serde::de::Error>(v: Value) -> Result<String, E> {
|
|||
|
||||
pub fn parse_data_base64<E: serde::de::Error>(v: Value) -> Result<Vec<u8>, E> {
|
||||
parse_field!(v, String, E).and_then(|s| {
|
||||
base64::decode(s).map_err(|e| E::custom(format_args!("decode error `{}`", e)))
|
||||
BASE64_STANDARD
|
||||
.decode(s)
|
||||
.map_err(|e| E::custom(format_args!("decode error `{}`", e)))
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -84,26 +84,29 @@ pub struct Event {
|
|||
|
||||
#[delegate(self.attributes)]
|
||||
impl AttributesReader for Event {
|
||||
fn id(&self) -> &str;
|
||||
fn source(&self) -> &UriReference;
|
||||
fn specversion(&self) -> SpecVersion;
|
||||
fn ty(&self) -> &str;
|
||||
fn datacontenttype(&self) -> Option<&str>;
|
||||
fn dataschema(&self) -> Option<&Url>;
|
||||
fn subject(&self) -> Option<&str>;
|
||||
fn time(&self) -> Option<&DateTime<Utc>>;
|
||||
fn id(&self) -> &str {}
|
||||
fn source(&self) -> &UriReference {}
|
||||
fn specversion(&self) -> SpecVersion {}
|
||||
fn ty(&self) -> &str {}
|
||||
fn datacontenttype(&self) -> Option<&str> {}
|
||||
fn dataschema(&self) -> Option<&Url> {}
|
||||
fn subject(&self) -> Option<&str> {}
|
||||
fn time(&self) -> Option<&DateTime<Utc>> {}
|
||||
}
|
||||
|
||||
#[delegate(self.attributes)]
|
||||
impl AttributesWriter for Event {
|
||||
fn set_id(&mut self, id: impl Into<String>) -> String;
|
||||
fn set_source(&mut self, source: impl Into<UriReference>) -> UriReference;
|
||||
fn set_type(&mut self, ty: impl Into<String>) -> String;
|
||||
fn set_subject(&mut self, subject: Option<impl Into<String>>) -> Option<String>;
|
||||
fn set_time(&mut self, time: Option<impl Into<DateTime<Utc>>>) -> Option<DateTime<Utc>>;
|
||||
fn set_datacontenttype(&mut self, datacontenttype: Option<impl Into<String>>)
|
||||
-> Option<String>;
|
||||
fn set_dataschema(&mut self, dataschema: Option<impl Into<Url>>) -> Option<Url>;
|
||||
fn set_id(&mut self, id: impl Into<String>) -> String {}
|
||||
fn set_source(&mut self, source: impl Into<UriReference>) -> UriReference {}
|
||||
fn set_type(&mut self, ty: impl Into<String>) -> String {}
|
||||
fn set_subject(&mut self, subject: Option<impl Into<String>>) -> Option<String> {}
|
||||
fn set_time(&mut self, time: Option<impl Into<DateTime<Utc>>>) -> Option<DateTime<Utc>> {}
|
||||
fn set_datacontenttype(
|
||||
&mut self,
|
||||
datacontenttype: Option<impl Into<String>>,
|
||||
) -> Option<String> {
|
||||
}
|
||||
fn set_dataschema(&mut self, dataschema: Option<impl Into<Url>>) -> Option<Url> {}
|
||||
}
|
||||
|
||||
impl Default for Event {
|
||||
|
|
|
@ -221,7 +221,7 @@ impl crate::event::message::AttributesDeserializer for super::Attributes {
|
|||
mod tests {
|
||||
use super::*;
|
||||
use crate::test::fixtures;
|
||||
use chrono::NaiveDateTime;
|
||||
use chrono::DateTime;
|
||||
|
||||
#[test]
|
||||
fn iter_v03_test() {
|
||||
|
@ -243,13 +243,10 @@ mod tests {
|
|||
datacontenttype: None,
|
||||
schemaurl: None,
|
||||
subject: None,
|
||||
time: Some(DateTime::<Utc>::from_utc(
|
||||
NaiveDateTime::from_timestamp(61, 0),
|
||||
Utc,
|
||||
)),
|
||||
time: DateTime::from_timestamp(61, 0),
|
||||
};
|
||||
let b = &mut a.into_iter();
|
||||
let time = DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(61, 0), Utc);
|
||||
let time = DateTime::from_timestamp(61, 0).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
("specversion", AttributeValue::SpecVersion(SpecVersion::V03)),
|
||||
|
|
|
@ -4,6 +4,7 @@ use crate::event::format::{
|
|||
parse_data_base64, parse_data_base64_json, parse_data_json, parse_data_string,
|
||||
};
|
||||
use crate::event::{Data, ExtensionValue};
|
||||
use base64::prelude::*;
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::de::IntoDeserializer;
|
||||
use serde::ser::SerializeMap;
|
||||
|
@ -102,7 +103,7 @@ impl<S: serde::Serializer> crate::event::format::EventFormatSerializer<S, Attrib
|
|||
Some(Data::Json(j)) => state.serialize_entry("data", j)?,
|
||||
Some(Data::String(s)) => state.serialize_entry("data", s)?,
|
||||
Some(Data::Binary(v)) => {
|
||||
state.serialize_entry("data", &base64::encode(v))?;
|
||||
state.serialize_entry("data", &BASE64_STANDARD.encode(v))?;
|
||||
state.serialize_entry("datacontentencoding", "base64")?;
|
||||
}
|
||||
_ => (),
|
||||
|
|
|
@ -222,7 +222,6 @@ impl AttributesConverter for Attributes {
|
|||
mod tests {
|
||||
use super::*;
|
||||
use crate::test::fixtures;
|
||||
use chrono::NaiveDateTime;
|
||||
|
||||
#[test]
|
||||
fn iter_v10_test() {
|
||||
|
@ -244,13 +243,10 @@ mod tests {
|
|||
datacontenttype: None,
|
||||
dataschema: None,
|
||||
subject: None,
|
||||
time: Some(DateTime::<Utc>::from_utc(
|
||||
NaiveDateTime::from_timestamp(61, 0),
|
||||
Utc,
|
||||
)),
|
||||
time: DateTime::from_timestamp(61, 0),
|
||||
};
|
||||
let b = &mut a.into_iter();
|
||||
let time = DateTime::<Utc>::from_utc(NaiveDateTime::from_timestamp(61, 0), Utc);
|
||||
let time = DateTime::from_timestamp(61, 0).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
("specversion", AttributeValue::SpecVersion(SpecVersion::V10)),
|
||||
|
|
|
@ -4,6 +4,7 @@ use crate::event::format::{
|
|||
parse_data_base64, parse_data_base64_json, parse_data_json, parse_data_string,
|
||||
};
|
||||
use crate::event::{Data, ExtensionValue};
|
||||
use base64::prelude::*;
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::de::IntoDeserializer;
|
||||
use serde::ser::SerializeMap;
|
||||
|
@ -102,7 +103,9 @@ impl<S: serde::Serializer> crate::event::format::EventFormatSerializer<S, Attrib
|
|||
match data {
|
||||
Some(Data::Json(j)) => state.serialize_entry("data", j)?,
|
||||
Some(Data::String(s)) => state.serialize_entry("data", s)?,
|
||||
Some(Data::Binary(v)) => state.serialize_entry("data_base64", &base64::encode(v))?,
|
||||
Some(Data::Binary(v)) => {
|
||||
state.serialize_entry("data_base64", &BASE64_STANDARD.encode(v))?
|
||||
}
|
||||
_ => (),
|
||||
};
|
||||
for (k, v) in extensions {
|
||||
|
|
|
@ -54,7 +54,7 @@
|
|||
//! [Extractors]: https://actix.rs/docs/extractors/
|
||||
//! [Responders]: https://actix.rs/docs/handlers/
|
||||
|
||||
#![doc(html_root_url = "https://docs.rs/cloudevents-sdk/0.6.0")]
|
||||
#![doc(html_root_url = "https://docs.rs/cloudevents-sdk/0.8.0")]
|
||||
#![deny(rustdoc::broken_intra_doc_links)]
|
||||
#![cfg_attr(docsrs, feature(doc_cfg))] // Show feature gate in doc
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use crate::event::{ExtensionValue, UriReference};
|
||||
use base64::prelude::*;
|
||||
use chrono::{DateTime, Utc};
|
||||
use std::convert::TryInto;
|
||||
use std::fmt;
|
||||
|
@ -46,10 +47,14 @@ impl fmt::Display for MessageAttributeValue {
|
|||
MessageAttributeValue::Boolean(b) => write!(f, "{}", b),
|
||||
MessageAttributeValue::Integer(i) => write!(f, "{}", i),
|
||||
MessageAttributeValue::String(s) => write!(f, "{}", s),
|
||||
MessageAttributeValue::Binary(v) => write!(f, "{}", base64::encode(v)),
|
||||
MessageAttributeValue::Binary(v) => {
|
||||
write!(f, "{}", BASE64_STANDARD.encode(v))
|
||||
}
|
||||
MessageAttributeValue::Uri(u) => write!(f, "{}", u),
|
||||
MessageAttributeValue::UriRef(u) => write!(f, "{}", u),
|
||||
MessageAttributeValue::DateTime(d) => write!(f, "{}", d.to_rfc3339()),
|
||||
MessageAttributeValue::DateTime(d) => {
|
||||
write!(f, "{}", d.to_rfc3339())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ pub fn subject() -> String {
|
|||
}
|
||||
|
||||
pub fn time() -> DateTime<Utc> {
|
||||
Utc.ymd(2020, 3, 16).and_hms(11, 50, 00)
|
||||
Utc.with_ymd_and_hms(2020, 3, 16, 11, 50, 00).unwrap()
|
||||
}
|
||||
|
||||
pub fn string_extension() -> (String, String) {
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use crate::test::fixtures::*;
|
||||
use crate::{Event, EventBuilder, EventBuilderV03};
|
||||
use base64::prelude::*;
|
||||
use serde_json::{json, Value};
|
||||
|
||||
use url::Url;
|
||||
|
@ -120,7 +121,7 @@ pub fn full_json_base64_data_json() -> Value {
|
|||
"datacontenttype": json_datacontenttype(),
|
||||
"schemaurl": dataschema(),
|
||||
"datacontentencoding": "base64",
|
||||
"data": base64::encode(json_data_binary())
|
||||
"data": BASE64_STANDARD.encode(json_data_binary())
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -199,6 +200,6 @@ pub fn full_xml_base64_data_json() -> Value {
|
|||
int_ext_name: int_ext_value,
|
||||
"datacontenttype": xml_datacontenttype(),
|
||||
"datacontentencoding": "base64",
|
||||
"data": base64::encode(Vec::from(xml_data()))
|
||||
"data": BASE64_STANDARD.encode(Vec::from(xml_data()))
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
use crate::test::fixtures::*;
|
||||
use crate::{Event, EventBuilder, EventBuilderV10};
|
||||
use base64::prelude::*;
|
||||
use serde_json::{json, Value};
|
||||
use url::Url;
|
||||
|
||||
|
@ -166,7 +167,7 @@ pub fn full_json_base64_data_json() -> Value {
|
|||
int_ext_name: int_ext_value,
|
||||
"datacontenttype": json_datacontenttype(),
|
||||
"dataschema": dataschema(),
|
||||
"data_base64": base64::encode(json_data_binary())
|
||||
"data_base64": BASE64_STANDARD.encode(json_data_binary())
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -175,7 +176,7 @@ pub fn full_non_json_base64_data() -> Value {
|
|||
Value::Object(mut m) => {
|
||||
m.insert(
|
||||
"data_base64".to_string(),
|
||||
Value::String(base64::encode(b"hello world")),
|
||||
Value::String(BASE64_STANDARD.encode(b"hello world")),
|
||||
);
|
||||
Value::Object(m)
|
||||
}
|
||||
|
@ -187,7 +188,11 @@ pub fn full_non_json_data() -> Event {
|
|||
let mut event = full_json_data();
|
||||
let value = full_non_json_base64_data();
|
||||
if let Value::Object(m) = value {
|
||||
event.set_data_unchecked(base64::decode(m["data_base64"].as_str().unwrap()).unwrap());
|
||||
event.set_data_unchecked(
|
||||
BASE64_STANDARD
|
||||
.decode(m["data_base64"].as_str().unwrap())
|
||||
.unwrap(),
|
||||
);
|
||||
}
|
||||
event
|
||||
}
|
||||
|
@ -266,6 +271,6 @@ pub fn full_xml_base64_data_json() -> Value {
|
|||
bool_ext_name: bool_ext_value,
|
||||
int_ext_name: int_ext_value,
|
||||
"datacontenttype": xml_datacontenttype(),
|
||||
"data_base64": base64::encode(Vec::from(xml_data()))
|
||||
"data_base64": BASE64_STANDARD.encode(Vec::from(xml_data()))
|
||||
})
|
||||
}
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
#[test]
|
||||
#[cfg_attr(target_os = "wasi", ignore)]
|
||||
fn test_readme_deps() {
|
||||
version_sync::assert_markdown_deps_updated!("README.md");
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_os = "wasi", ignore)]
|
||||
fn test_html_root_url() {
|
||||
version_sync::assert_html_root_url_updated!("src/lib.rs");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue