Compare commits

...

32 Commits
v1.0.4 ... main

Author SHA1 Message Date
dependabot[bot] d4aaa03301
chore(deps): Bump github/codeql-action from 3.29.7 to 3.29.8 (#1282)
Bumps [github/codeql-action](https://github.com/github/codeql-action) from 3.29.7 to 3.29.8.
- [Release notes](https://github.com/github/codeql-action/releases)
- [Changelog](https://github.com/github/codeql-action/blob/main/CHANGELOG.md)
- [Commits](51f77329af...76621b61de)

---
updated-dependencies:
- dependency-name: github/codeql-action
  dependency-version: 3.29.8
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-13 11:39:56 +08:00
dependabot[bot] d68dba20c1
chore(deps): Bump clap from 4.5.41 to 4.5.43 (#1283)
Bumps [clap](https://github.com/clap-rs/clap) from 4.5.41 to 4.5.43.
- [Release notes](https://github.com/clap-rs/clap/releases)
- [Changelog](https://github.com/clap-rs/clap/blob/master/CHANGELOG.md)
- [Commits](https://github.com/clap-rs/clap/compare/clap_complete-v4.5.41...clap_complete-v4.5.43)

---
updated-dependencies:
- dependency-name: clap
  dependency-version: 4.5.43
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-13 11:39:31 +08:00
dependabot[bot] 8116538556
chore(deps): Bump glob from 0.3.2 to 0.3.3 (#1285)
Bumps [glob](https://github.com/rust-lang/glob) from 0.3.2 to 0.3.3.
- [Release notes](https://github.com/rust-lang/glob/releases)
- [Changelog](https://github.com/rust-lang/glob/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rust-lang/glob/compare/v0.3.2...v0.3.3)

---
updated-dependencies:
- dependency-name: glob
  dependency-version: 0.3.3
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-13 11:39:00 +08:00
dependabot[bot] 3822048e6b
chore(deps): Bump actions/download-artifact from 4 to 5 (#1286)
Bumps [actions/download-artifact](https://github.com/actions/download-artifact) from 4 to 5.
- [Release notes](https://github.com/actions/download-artifact/releases)
- [Commits](https://github.com/actions/download-artifact/compare/v4...v5)

---
updated-dependencies:
- dependency-name: actions/download-artifact
  dependency-version: '5'
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-13 11:38:35 +08:00
Gaius e05270e598
feat: get network speed for scheduler (#1279)
Signed-off-by: Gaius <gaius.qi@gmail.com>
2025-08-11 22:24:16 +08:00
Chlins Zhang 2cca5c7b9b
refactor: remove the seed peer announcement to manager (#1261)
Signed-off-by: chlins <chlins.zhang@gmail.com>
2025-08-08 12:25:08 +08:00
this is my name 1f8a323665
docs:Modify Cache storage comments (#1277)
Modify Cache storage comments to clarify its usage scenarios.

Signed-off-by: fu220 <2863318196@qq.com>
2025-08-06 17:04:20 +08:00
Gaius e1ae65a48d
chore(ci/Dockerfile): add grpcurl for dfdaemon container (#1276)
Signed-off-by: Gaius <gaius.qi@gmail.com>
2025-08-06 08:26:15 +00:00
Gaius e415df936d
feat: enable console subscriber layer for tracing spawn tasks on `127.0.0.1:6669` when log level is TRACE (#1275)
Signed-off-by: Gaius <gaius.qi@gmail.com>
2025-08-06 06:58:47 +00:00
this is my name 848737e327
feat:removes the load_to_cache field (#1264)
This pull request removes the load_to_cache field and adds trait method definitions to dfdaemon_download.rs and dfdaemon_upload.rs.

- Removed cache-related processing from Task handling.
- Added trait method definitions such as download_cache_task to dfdaemon_download.rs and dfdaemon_upload.rs to comply with the API format of version 2.1.55.

- Aim to allow Task to focus on disk interactions while delegating memory cache operations to CacheTask.

Signed-off-by: fu220 <2863318196@qq.com>
2025-08-06 14:40:39 +08:00
Gaius 7796ee7342
chore(deps): remove unused dependencies (#1274)
Signed-off-by: Gaius <gaius.qi@gmail.com>
2025-08-06 11:11:15 +08:00
dependabot[bot] 60ebb33b50
chore(deps): Bump hyper-util from 0.1.15 to 0.1.16 (#1267)
Bumps [hyper-util](https://github.com/hyperium/hyper-util) from 0.1.15 to 0.1.16.
- [Release notes](https://github.com/hyperium/hyper-util/releases)
- [Changelog](https://github.com/hyperium/hyper-util/blob/master/CHANGELOG.md)
- [Commits](https://github.com/hyperium/hyper-util/compare/v0.1.15...v0.1.16)

---
updated-dependencies:
- dependency-name: hyper-util
  dependency-version: 0.1.16
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-05 17:12:52 +08:00
dependabot[bot] b2ac70f5f6
chore(deps): Bump tokio-util from 0.7.15 to 0.7.16 (#1270)
Bumps [tokio-util](https://github.com/tokio-rs/tokio) from 0.7.15 to 0.7.16.
- [Release notes](https://github.com/tokio-rs/tokio/releases)
- [Commits](https://github.com/tokio-rs/tokio/compare/tokio-util-0.7.15...tokio-util-0.7.16)

---
updated-dependencies:
- dependency-name: tokio-util
  dependency-version: 0.7.16
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-05 17:11:55 +08:00
Gaius 52b263ac66
feat: Disable compression in HTTP client configuration (#1273)
Signed-off-by: Gaius <gaius.qi@gmail.com>
2025-08-05 16:58:43 +08:00
dependabot[bot] 15dea31154
chore(deps): Bump tokio from 1.46.1 to 1.47.1 (#1266)
Bumps [tokio](https://github.com/tokio-rs/tokio) from 1.46.1 to 1.47.1.
- [Release notes](https://github.com/tokio-rs/tokio/releases)
- [Commits](https://github.com/tokio-rs/tokio/compare/tokio-1.46.1...tokio-1.47.1)

---
updated-dependencies:
- dependency-name: tokio
  dependency-version: 1.47.1
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-05 11:37:48 +08:00
dependabot[bot] ab616f9498
chore(deps): Bump github/codeql-action from 3.29.3 to 3.29.5 (#1271)
Bumps [github/codeql-action](https://github.com/github/codeql-action) from 3.29.3 to 3.29.5.
- [Release notes](https://github.com/github/codeql-action/releases)
- [Changelog](https://github.com/github/codeql-action/blob/main/CHANGELOG.md)
- [Commits](d6bbdef45e...51f77329af)

---
updated-dependencies:
- dependency-name: github/codeql-action
  dependency-version: 3.29.5
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-05 03:26:36 +00:00
dependabot[bot] f0b406c37a
chore(deps): Bump kentaro-m/auto-assign-action from 586b61c136c65d09c1775da39cc4a80e026834f4 to 9f6dbe84a80c6e7639d1b9698048b201052a2a94 (#1272)
chore(deps): Bump kentaro-m/auto-assign-action

Bumps [kentaro-m/auto-assign-action](https://github.com/kentaro-m/auto-assign-action) from 586b61c136c65d09c1775da39cc4a80e026834f4 to 9f6dbe84a80c6e7639d1b9698048b201052a2a94.
- [Release notes](https://github.com/kentaro-m/auto-assign-action/releases)
- [Commits](586b61c136...9f6dbe84a8)

---
updated-dependencies:
- dependency-name: kentaro-m/auto-assign-action
  dependency-version: 9f6dbe84a80c6e7639d1b9698048b201052a2a94
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-05 11:23:13 +08:00
dependabot[bot] 3d4b07e86e
chore(deps): Bump taiki-e/cache-cargo-install-action from 2.2.0 to 2.3.0 (#1269)
Bumps [taiki-e/cache-cargo-install-action](https://github.com/taiki-e/cache-cargo-install-action) from 2.2.0 to 2.3.0.
- [Release notes](https://github.com/taiki-e/cache-cargo-install-action/releases)
- [Changelog](https://github.com/taiki-e/cache-cargo-install-action/blob/main/CHANGELOG.md)
- [Commits](1bb5728d79...b33c63d3b3)

---
updated-dependencies:
- dependency-name: taiki-e/cache-cargo-install-action
  dependency-version: 2.3.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-05 11:22:51 +08:00
dependabot[bot] 02690b8365
chore(deps): Bump serde_json from 1.0.141 to 1.0.142 (#1268)
Bumps [serde_json](https://github.com/serde-rs/json) from 1.0.141 to 1.0.142.
- [Release notes](https://github.com/serde-rs/json/releases)
- [Commits](https://github.com/serde-rs/json/compare/v1.0.141...v1.0.142)

---
updated-dependencies:
- dependency-name: serde_json
  dependency-version: 1.0.142
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-08-05 11:22:26 +08:00
Gaius f0c983093a
feat: add Range header to ensure Content-Length is returned in response headers (#1263)
Signed-off-by: Gaius <gaius.qi@gmail.com>
2025-08-04 06:25:58 +00:00
Gaius cd6ca368d5
feat: rename tracing field from `uri` to `url` in proxy handler functions (#1260)
Signed-off-by: Gaius <gaius.qi@gmail.com>
2025-07-30 16:55:56 +08:00
Cyrus 1aefde8ed4
refactor: improve filter_entries test function (#1259)
* refactor filter_entries test

Signed-off-by: LunaWhispers <yangmuyucs@gmail.com>

* fix a variable name

Signed-off-by: LunaWhispers <yangmuyucs@gmail.com>

* merge all filter_entries test

Signed-off-by: LunaWhispers <yangmuyucs@gmail.com>

* add a assert

Signed-off-by: LunaWhispers <yangmuyucs@gmail.com>

---------

Signed-off-by: LunaWhispers <yangmuyucs@gmail.com>
2025-07-29 11:18:49 +08:00
Gaius b30993eef0
refactor(dfget): Improve logic and error handling in `filter_entries` (#1258)
- Updated function signature to use references (`&Url`, `&[String]`) for efficiency.
- Improved error handling with detailed `ValidationError` messages instead of generic `UnexpectedResponse`.
- Renamed `rel_path_to_entry` to `entries_by_relative_path` for better clarity.
- Replaced `Vec` with `HashSet` for filtered entries to avoid duplicates.
- Simplified parent directory path construction using `join("")`.
- Enhanced doc comments to clearly describe functionality and behavior.
- Streamlined pattern compilation and iteration using `iter()`.

Signed-off-by: Gaius <gaius.qi@gmail.com>
2025-07-25 14:09:34 +00:00
Cyrus cca88b3eea
feat: add include-files argrument in the downloaded directory (#1247)
* add include-files arg

Signed-off-by: LunaWhispers <yangmuyucs@gmail.com>

* fix regular expression matching

Signed-off-by: LunaWhispers <yangmuyucs@gmail.com>

* fix lint

Signed-off-by: LunaWhispers <yangmuyucs@gmail.com>

---------

Signed-off-by: LunaWhispers <yangmuyucs@gmail.com>
2025-07-25 17:13:57 +08:00
Gaius bf6f49e0e9
feat: add task ID response header in Dragonfly client proxy (#1256)
Signed-off-by: Gaius <gaius.qi@gmail.com>
2025-07-24 18:16:34 +08:00
Gaius 777c131fbe
feat: use piece_timeout for list task entries (#1255)
Signed-off-by: Gaius <gaius.qi@gmail.com>
2025-07-24 09:33:47 +00:00
Gaius 45f86226cf
feat: use piece_timeout for list task entries (#1254)
Signed-off-by: Gaius <gaius.qi@gmail.com>
2025-07-24 07:47:25 +00:00
Gaius a340f0c2f1
chore: update crate version to 1.0.6 (#1253)
Signed-off-by: Gaius <gaius.qi@gmail.com>
2025-07-24 14:01:55 +08:00
Gaius 5c87849f67
feat: update gRPC server to use non-cloned reflection service and fix task piece filtering logic (#1252)
Signed-off-by: Gaius <gaius.qi@gmail.com>
2025-07-24 03:29:11 +00:00
Gaius 74ab386d87
feat: Enhance stat_task error handling in dfdaemon_download and dfdaemon_upload (#1251)
feat: Bump dragonfly-api to 2.1.49 and add local_only support for task stat

Signed-off-by: Gaius <gaius.qi@gmail.com>
2025-07-23 10:06:11 +00:00
Gaius 10c73119cb
feat: Bump dragonfly-api to 2.1.49 and add local_only support for task stat (#1250)
Signed-off-by: Gaius <gaius.qi@gmail.com>
2025-07-23 09:43:21 +00:00
Gaius 7190529693
feat(dragonfly-client): change permissions of download grpc uds (#1249)
Signed-off-by: Gaius <gaius.qi@gmail.com>
2025-07-23 02:58:01 +00:00
33 changed files with 1360 additions and 586 deletions

2
.cargo/config.toml Normal file
View File

@ -0,0 +1,2 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]

View File

@ -8,4 +8,4 @@ jobs:
add-assignee:
runs-on: ubuntu-latest
steps:
- uses: kentaro-m/auto-assign-action@586b61c136c65d09c1775da39cc4a80e026834f4
- uses: kentaro-m/auto-assign-action@9f6dbe84a80c6e7639d1b9698048b201052a2a94

View File

@ -94,7 +94,7 @@ jobs:
output: 'trivy-results.sarif'
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@d6bbdef45e766d081b84a2def353b0055f728d3e
uses: github/codeql-action/upload-sarif@76621b61decf072c1cee8dd1ce2d2a82d33c17ed
with:
sarif_file: 'trivy-results.sarif'
@ -189,7 +189,7 @@ jobs:
output: 'trivy-results.sarif'
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@d6bbdef45e766d081b84a2def353b0055f728d3e
uses: github/codeql-action/upload-sarif@76621b61decf072c1cee8dd1ce2d2a82d33c17ed
with:
sarif_file: 'trivy-results.sarif'
@ -284,7 +284,7 @@ jobs:
output: 'trivy-results.sarif'
- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@d6bbdef45e766d081b84a2def353b0055f728d3e
uses: github/codeql-action/upload-sarif@76621b61decf072c1cee8dd1ce2d2a82d33c17ed
with:
sarif_file: 'trivy-results.sarif'

View File

@ -52,7 +52,7 @@ jobs:
target: ${{ matrix.target }}
- name: Install cargo-deb
uses: taiki-e/cache-cargo-install-action@1bb5728d7988b14bfdd9690a8e5399fc8a3f75ab
uses: taiki-e/cache-cargo-install-action@b33c63d3b3c85540f4eba8a4f71a5cc0ce030855
with:
# Don't upgrade cargo-deb, refer to https://github.com/kornelski/cargo-deb/issues/169.
tool: cargo-deb@2.10.0
@ -119,7 +119,7 @@ jobs:
contents: write
steps:
- name: Download Release Artifacts
uses: actions/download-artifact@v4
uses: actions/download-artifact@v5
with:
path: releases
pattern: release-*

175
Cargo.lock generated
View File

@ -603,9 +603,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.5.41"
version = "4.5.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be92d32e80243a54711e5d7ce823c35c41c9d929dc4ab58e1276f625841aadf9"
checksum = "1fc0e74a703892159f5ae7d3aac52c8e6c392f5ae5f359c70b5881d60aaac318"
dependencies = [
"clap_builder",
"clap_derive",
@ -613,9 +613,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.5.41"
version = "4.5.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "707eab41e9622f9139419d573eca0900137718000c517d47da73045f54331c3d"
checksum = "b3e7f4214277f3c7aa526a59dd3fbe306a370daee1f8b7b8c987069cd8e888a8"
dependencies = [
"anstream",
"anstyle",
@ -625,9 +625,9 @@ dependencies = [
[[package]]
name = "clap_derive"
version = "4.5.41"
version = "4.5.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef4f52386a59ca4c860f7393bcf8abd8dfd91ecccc0f774635ff68e92eeef491"
checksum = "14cb31bb0a7d536caef2639baa7fad459e15c3144efefa6dbd1c84562c4739f6"
dependencies = [
"heck",
"proc-macro2",
@ -660,6 +660,45 @@ dependencies = [
"windows-sys 0.60.2",
]
[[package]]
name = "console-api"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8030735ecb0d128428b64cd379809817e620a40e5001c54465b99ec5feec2857"
dependencies = [
"futures-core",
"prost 0.13.5",
"prost-types 0.13.5",
"tonic",
"tracing-core",
]
[[package]]
name = "console-subscriber"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6539aa9c6a4cd31f4b1c040f860a1eac9aa80e7df6b05d506a6e7179936d6a01"
dependencies = [
"console-api",
"crossbeam-channel",
"crossbeam-utils",
"futures-task",
"hdrhistogram",
"humantime",
"hyper-util",
"prost 0.13.5",
"prost-types 0.13.5",
"serde",
"serde_json",
"thread_local",
"tokio",
"tokio-stream",
"tonic",
"tracing",
"tracing-core",
"tracing-subscriber",
]
[[package]]
name = "const-oid"
version = "0.9.6"
@ -939,9 +978,9 @@ dependencies = [
[[package]]
name = "dragonfly-api"
version = "2.1.47"
version = "2.1.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "490d6bcc647f10a00afbf64f9180f5e705aeb939aa4d476463c336ab8590cb76"
checksum = "8d07e740a105d6dd2ce968318897beaf37ef8b8f581fbae3d0e227722857786b"
dependencies = [
"prost 0.13.5",
"prost-types 0.14.1",
@ -954,13 +993,14 @@ dependencies = [
[[package]]
name = "dragonfly-client"
version = "1.0.4"
version = "1.0.10"
dependencies = [
"anyhow",
"bytes",
"bytesize",
"chrono",
"clap",
"console-subscriber",
"dashmap",
"dragonfly-api",
"dragonfly-client-backend",
@ -970,8 +1010,8 @@ dependencies = [
"dragonfly-client-util",
"fastrand",
"fs2",
"fslock",
"futures",
"glob",
"hashring",
"http 1.3.1",
"http-body-util",
@ -984,7 +1024,6 @@ dependencies = [
"lazy_static",
"leaky-bucket",
"local-ip-address",
"lru",
"openssl",
"opentelemetry",
"opentelemetry-otlp",
@ -1022,13 +1061,12 @@ dependencies = [
"tracing-subscriber",
"url",
"uuid",
"validator",
"warp",
]
[[package]]
name = "dragonfly-client-backend"
version = "1.0.4"
version = "1.0.10"
dependencies = [
"dragonfly-api",
"dragonfly-client-core",
@ -1059,7 +1097,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-config"
version = "1.0.4"
version = "1.0.10"
dependencies = [
"bytesize",
"bytesize-serde",
@ -1089,7 +1127,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-core"
version = "1.0.4"
version = "1.0.10"
dependencies = [
"headers 0.4.1",
"hyper 1.6.0",
@ -1107,7 +1145,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-init"
version = "1.0.4"
version = "1.0.10"
dependencies = [
"anyhow",
"clap",
@ -1117,7 +1155,6 @@ dependencies = [
"serde_json",
"tempfile",
"tokio",
"toml",
"toml_edit",
"tracing",
"url",
@ -1125,7 +1162,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-storage"
version = "1.0.4"
version = "1.0.10"
dependencies = [
"bincode",
"bytes",
@ -1138,13 +1175,11 @@ dependencies = [
"dragonfly-client-core",
"dragonfly-client-util",
"fs2",
"lru",
"num_cpus",
"prost-wkt-types",
"reqwest",
"rocksdb",
"serde",
"sha2",
"tempfile",
"tokio",
"tokio-util",
@ -1154,7 +1189,7 @@ dependencies = [
[[package]]
name = "dragonfly-client-util"
version = "1.0.4"
version = "1.0.10"
dependencies = [
"base64 0.22.1",
"bytesize",
@ -1164,7 +1199,6 @@ dependencies = [
"hex",
"http 1.3.1",
"http-range-header",
"hyper 1.6.0",
"lazy_static",
"lru",
"openssl",
@ -1176,6 +1210,7 @@ dependencies = [
"rustls-pemfile 2.2.0",
"rustls-pki-types",
"sha2",
"sysinfo",
"tempfile",
"tokio",
"tracing",
@ -1335,16 +1370,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "fslock"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04412b8935272e3a9bae6f48c7bfff74c2911f60525404edfdd28e49884c3bfb"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "futures"
version = "0.3.31"
@ -1477,9 +1502,9 @@ checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253"
[[package]]
name = "glob"
version = "0.3.1"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
checksum = "0cc23270f6e1808e30a928bdc84dea0b9b4136a8bc82338574f23baf47bbd280"
[[package]]
name = "h2"
@ -1563,13 +1588,26 @@ dependencies = [
[[package]]
name = "hdfs"
version = "1.0.4"
version = "1.0.10"
dependencies = [
"dragonfly-client-backend",
"dragonfly-client-core",
"tonic",
]
[[package]]
name = "hdrhistogram"
version = "7.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d"
dependencies = [
"base64 0.21.7",
"byteorder",
"flate2",
"nom",
"num-traits",
]
[[package]]
name = "headers"
version = "0.3.9"
@ -1788,7 +1826,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2",
"socket2 0.5.9",
"tokio",
"tower-service",
"tracing",
@ -1866,9 +1904,9 @@ dependencies = [
[[package]]
name = "hyper-util"
version = "0.1.15"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f66d5bd4c6f02bf0542fad85d626775bab9258cf795a4256dcaf3161114d1df"
checksum = "8d9b05277c7e8da2c93a568989bb6207bef0112e8d17df7a6eda4a3cf143bc5e"
dependencies = [
"bytes",
"futures-channel",
@ -1879,7 +1917,7 @@ dependencies = [
"hyper 1.6.0",
"libc",
"pin-project-lite",
"socket2",
"socket2 0.6.0",
"tokio",
"tower-service",
"tracing",
@ -2830,9 +2868,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]]
name = "openssl-src"
version = "300.2.1+3.2.0"
version = "300.5.1+3.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fe476c29791a5ca0d1273c697e96085bbabbbea2ef7afd5617e78a4b40332d3"
checksum = "735230c832b28c000e3bc117119e6466a663ec73506bc0a9907ea4187508e42a"
dependencies = [
"cc",
]
@ -4332,9 +4370,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.141"
version = "1.0.142"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30b9eff21ebe718216c6ec64e1d9ac57087aad11efc64e32002bce4a0d4c03d3"
checksum = "030fedb782600dcbd6f02d479bf0d817ac3bb40d644745b769d6a96bc3afc5a7"
dependencies = [
"itoa",
"memchr",
@ -4352,15 +4390,6 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_spanned"
version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf41e0cfaf7226dca15e8197172c295a782857fcb97fad1808a166870dee75a3"
dependencies = [
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
@ -4485,6 +4514,16 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "socket2"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807"
dependencies = [
"libc",
"windows-sys 0.59.0",
]
[[package]]
name = "spin"
version = "0.5.2"
@ -4870,9 +4909,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.46.1"
version = "1.47.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0cc3a2344dafbe23a245241fe8b09735b521110d30fcefbbd5feb1797ca35d17"
checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038"
dependencies = [
"backtrace",
"bytes",
@ -4883,9 +4922,10 @@ dependencies = [
"pin-project-lite",
"signal-hook-registry",
"slab",
"socket2",
"socket2 0.6.0",
"tokio-macros",
"windows-sys 0.52.0",
"tracing",
"windows-sys 0.59.0",
]
[[package]]
@ -4956,9 +4996,9 @@ dependencies = [
[[package]]
name = "tokio-util"
version = "0.7.15"
version = "0.7.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df"
checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5"
dependencies = [
"bytes",
"futures-core",
@ -4971,26 +5011,11 @@ dependencies = [
"tokio",
]
[[package]]
name = "toml"
version = "0.8.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc1beb996b9d83529a9e75c17a1686767d148d70663143c7854d8b4a09ced362"
dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit",
]
[[package]]
name = "toml_datetime"
version = "0.6.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22cddaf88f4fbc13c51aebbf5f8eceb5c7c5a9da2ac40a13519eb5b0a0e8f11c"
dependencies = [
"serde",
]
[[package]]
name = "toml_edit"
@ -4999,8 +5024,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a"
dependencies = [
"indexmap 2.5.0",
"serde",
"serde_spanned",
"toml_datetime",
"toml_write",
"winnow",
@ -5034,7 +5057,7 @@ dependencies = [
"pin-project",
"prost 0.13.5",
"rustls-pemfile 2.2.0",
"socket2",
"socket2 0.5.9",
"tokio",
"tokio-rustls 0.26.0",
"tokio-stream",

View File

@ -12,7 +12,7 @@ members = [
]
[workspace.package]
version = "1.0.4"
version = "1.0.10"
authors = ["The Dragonfly Developers"]
homepage = "https://d7y.io/"
repository = "https://github.com/dragonflyoss/client.git"
@ -22,14 +22,14 @@ readme = "README.md"
edition = "2021"
[workspace.dependencies]
dragonfly-client = { path = "dragonfly-client", version = "1.0.4" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "1.0.4" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "1.0.4" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.4" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.4" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.4" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.4" }
dragonfly-api = "=2.1.47"
dragonfly-client = { path = "dragonfly-client", version = "1.0.10" }
dragonfly-client-core = { path = "dragonfly-client-core", version = "1.0.10" }
dragonfly-client-config = { path = "dragonfly-client-config", version = "1.0.10" }
dragonfly-client-storage = { path = "dragonfly-client-storage", version = "1.0.10" }
dragonfly-client-backend = { path = "dragonfly-client-backend", version = "1.0.10" }
dragonfly-client-util = { path = "dragonfly-client-util", version = "1.0.10" }
dragonfly-client-init = { path = "dragonfly-client-init", version = "1.0.10" }
dragonfly-api = "2.1.57"
thiserror = "2.0"
futures = "0.3.31"
reqwest = { version = "0.12.4", features = [
@ -46,7 +46,7 @@ reqwest = { version = "0.12.4", features = [
reqwest-middleware = "0.4"
rcgen = { version = "0.12.1", features = ["x509-parser"] }
hyper = { version = "1.6", features = ["full"] }
hyper-util = { version = "0.1.15", features = [
hyper-util = { version = "0.1.16", features = [
"client",
"client-legacy",
"tokio",
@ -71,8 +71,8 @@ serde_yaml = "0.9"
http = "1"
tonic = { version = "0.12.2", features = ["tls"] }
tonic-reflection = "0.12.3"
tokio = { version = "1.46.1", features = ["full"] }
tokio-util = { version = "0.7.15", features = ["full"] }
tokio = { version = "1.47.1", features = ["full", "tracing"] }
tokio-util = { version = "0.7.16", features = ["full"] }
tokio-stream = "0.1.17"
validator = { version = "0.16", features = ["derive"] }
warp = "0.3.5"
@ -91,7 +91,7 @@ opendal = { version = "0.48.0", features = [
"services-cos",
"services-webhdfs",
] }
clap = { version = "4.5.41", features = ["derive"] }
clap = { version = "4.5.45", features = ["derive"] }
anyhow = "1.0.98"
toml_edit = "0.22.26"
toml = "0.8.23"
@ -100,12 +100,13 @@ bytesize-serde = "0.2.1"
percent-encoding = "2.3.1"
tempfile = "3.20.0"
tokio-rustls = "0.25.0-alpha.4"
serde_json = "1.0.141"
serde_json = "1.0.142"
lru = "0.12.5"
fs2 = "0.4.3"
lazy_static = "1.5"
bytes = "1.10"
local-ip-address = "0.6.5"
sysinfo = { version = "0.32.1", default-features = false, features = ["component", "disk", "network", "system", "user"] }
[profile.release]
opt-level = 3

View File

@ -7,6 +7,7 @@ RUN apt-get update && apt-get install -y \
&& rm -rf /var/lib/apt/lists/*
COPY Cargo.toml Cargo.lock ./
COPY .cargo ./cargo
COPY dragonfly-client/Cargo.toml ./dragonfly-client/Cargo.toml
COPY dragonfly-client/src ./dragonfly-client/src
@ -40,6 +41,8 @@ RUN case "${TARGETPLATFORM}" in \
esac && \
cargo build --release --verbose --bin dfget --bin dfdaemon --bin dfcache
RUN cargo install tokio-console --locked --root /usr/local
FROM public.ecr.aws/docker/library/alpine:3.20 AS health
ENV GRPC_HEALTH_PROBE_VERSION=v0.4.24
@ -56,6 +59,7 @@ RUN if [ "$(uname -m)" = "ppc64le" ]; then \
FROM public.ecr.aws/docker/library/golang:1.23.0-alpine3.20 AS pprof
RUN go install github.com/google/pprof@latest
RUN go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest
FROM public.ecr.aws/debian/debian:bookworm-slim
@ -67,7 +71,9 @@ RUN apt-get update && apt-get install -y --no-install-recommends iperf3 fio curl
COPY --from=builder /app/client/target/release/dfget /usr/local/bin/dfget
COPY --from=builder /app/client/target/release/dfdaemon /usr/local/bin/dfdaemon
COPY --from=builder /app/client/target/release/dfcache /usr/local/bin/dfcache
COPY --from=builder /usr/local/bin/tokio-console /usr/local/bin/
COPY --from=pprof /go/bin/pprof /bin/pprof
COPY --from=pprof /go/bin/grpcurl /bin/grpcurl
COPY --from=health /bin/grpc_health_probe /bin/grpc_health_probe
ENTRYPOINT ["/usr/local/bin/dfdaemon"]

View File

@ -7,6 +7,7 @@ RUN apt-get update && apt-get install -y \
&& rm -rf /var/lib/apt/lists/*
COPY Cargo.toml Cargo.lock ./
COPY .cargo ./cargo
COPY dragonfly-client/Cargo.toml ./dragonfly-client/Cargo.toml
COPY dragonfly-client/src ./dragonfly-client/src
@ -42,6 +43,7 @@ RUN case "${TARGETPLATFORM}" in \
RUN cargo install flamegraph --root /usr/local
RUN cargo install bottom --locked --root /usr/local
RUN cargo install tokio-console --locked --root /usr/local
FROM public.ecr.aws/docker/library/alpine:3.20 AS health
@ -59,6 +61,7 @@ RUN if [ "$(uname -m)" = "ppc64le" ]; then \
FROM public.ecr.aws/docker/library/golang:1.23.0-alpine3.20 AS pprof
RUN go install github.com/google/pprof@latest
RUN go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest
FROM public.ecr.aws/debian/debian:bookworm-slim
@ -72,7 +75,9 @@ COPY --from=builder /app/client/target/debug/dfdaemon /usr/local/bin/dfdaemon
COPY --from=builder /app/client/target/debug/dfcache /usr/local/bin/dfcache
COPY --from=builder /usr/local/bin/flamegraph /usr/local/bin/
COPY --from=builder /usr/local/bin/btm /usr/local/bin/
COPY --from=builder /usr/local/bin/tokio-console /usr/local/bin/
COPY --from=pprof /go/bin/pprof /bin/pprof
COPY --from=pprof /go/bin/grpcurl /bin/grpcurl
COPY --from=health /bin/grpc_health_probe /bin/grpc_health_probe
ENTRYPOINT ["/usr/local/bin/dfdaemon"]

View File

@ -7,6 +7,7 @@ RUN apt-get update && apt-get install -y \
WORKDIR /app/client
COPY Cargo.toml Cargo.lock ./
COPY .cargo ./cargo
COPY dragonfly-client/Cargo.toml ./dragonfly-client/Cargo.toml
COPY dragonfly-client/src ./dragonfly-client/src

View File

@ -50,11 +50,22 @@ impl HTTP {
.with_custom_certificate_verifier(NoVerifier::new())
.with_no_client_auth();
// Disable automatic compression to prevent double-decompression issues.
//
// Problem scenario:
// 1. Origin server supports gzip and returns "content-encoding: gzip" header.
// 2. Backend decompresses the response and stores uncompressed content to disk.
// 3. When user's client downloads via dfdaemon proxy, the original "content-encoding: gzip".
// header is forwarded to it.
// 4. User's client attempts to decompress the already-decompressed content, causing errors.
//
// Solution: Disable all compression formats (gzip, brotli, zstd, deflate) to ensure
// we receive and store uncompressed content, eliminating the double-decompression issue.
let client = reqwest::Client::builder()
.gzip(true)
.brotli(true)
.zstd(true)
.deflate(true)
.no_gzip()
.no_brotli()
.no_zstd()
.no_deflate()
.use_preconfigured_tls(client_config_builder)
.pool_max_idle_per_host(super::POOL_MAX_IDLE_PER_HOST)
.tcp_keepalive(super::KEEP_ALIVE_INTERVAL)
@ -88,11 +99,22 @@ impl HTTP {
.with_root_certificates(root_cert_store)
.with_no_client_auth();
// Disable automatic compression to prevent double-decompression issues.
//
// Problem scenario:
// 1. Origin server supports gzip and returns "content-encoding: gzip" header.
// 2. Backend decompresses the response and stores uncompressed content to disk.
// 3. When user's client downloads via dfdaemon proxy, the original "content-encoding: gzip".
// header is forwarded to it.
// 4. User's client attempts to decompress the already-decompressed content, causing errors.
//
// Solution: Disable all compression formats (gzip, brotli, zstd, deflate) to ensure
// we receive and store uncompressed content, eliminating the double-decompression issue.
let client = reqwest::Client::builder()
.gzip(true)
.brotli(true)
.zstd(true)
.deflate(true)
.no_gzip()
.no_brotli()
.no_zstd()
.no_deflate()
.use_preconfigured_tls(client_config_builder)
.build()?;
@ -138,6 +160,13 @@ impl super::Backend for HTTP {
.client(request.client_cert)?
.get(&request.url)
.headers(header)
// Add Range header to ensure Content-Length is returned in response headers.
// Some servers (especially when using Transfer-Encoding: chunked,
// refer to https://developer.mozilla.org/en-US/docs/Web/HTTP/Reference/Headers/Transfer-Encoding.) may not
// include Content-Length in HEAD requests. Using "bytes=0-" requests the
// entire file starting from byte 0, forcing the server to include file size
// information in the response headers.
.header(reqwest::header::RANGE, "bytes=0-")
.timeout(request.timeout)
.send()
.await

View File

@ -166,7 +166,7 @@ where
}
/// The File Entry of a directory, including some relevant file metadata.
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct DirEntry {
/// url is the url of the entry.
pub url: String,

View File

@ -226,18 +226,6 @@ fn default_storage_cache_capacity() -> ByteSize {
ByteSize::mib(64)
}
/// default_seed_peer_cluster_id is the default cluster id of seed peer.
#[inline]
fn default_seed_peer_cluster_id() -> u64 {
1
}
/// default_seed_peer_keepalive_interval is the default interval to keepalive with manager.
#[inline]
fn default_seed_peer_keepalive_interval() -> Duration {
Duration::from_secs(15)
}
/// default_gc_interval is the default interval to do gc.
#[inline]
fn default_gc_interval() -> Duration {
@ -924,18 +912,6 @@ pub struct SeedPeer {
/// kind is the type of seed peer.
#[serde(default, rename = "type")]
pub kind: HostType,
/// cluster_id is the cluster id of the seed peer cluster.
#[serde(default = "default_seed_peer_cluster_id", rename = "clusterID")]
#[validate(range(min = 1))]
pub cluster_id: u64,
/// keepalive_interval is the interval to keep alive with manager.
#[serde(
default = "default_seed_peer_keepalive_interval",
with = "humantime_serde"
)]
pub keepalive_interval: Duration,
}
/// SeedPeer implements Default.
@ -944,8 +920,6 @@ impl Default for SeedPeer {
SeedPeer {
enable: false,
kind: HostType::Normal,
cluster_id: default_seed_peer_cluster_id(),
keepalive_interval: default_seed_peer_keepalive_interval(),
}
}
}
@ -1027,31 +1001,33 @@ pub struct Storage {
/// cache_capacity is the cache capacity for downloading, default is 100.
///
/// Cache storage:
/// 1. Users can create preheating jobs and preheat tasks to memory and disk by setting `load_to_cache` to `true`.
/// For more details, refer to https://github.com/dragonflyoss/api/blob/main/proto/common.proto#L443.
/// 1. Users can preheat task by caching to memory (via CacheTask) or to disk (via Task).
/// For more details, refer to https://github.com/dragonflyoss/api/blob/main/proto/dfdaemon.proto#L174.
/// 2. If the download hits the memory cache, it will be faster than reading from the disk, because there is no
/// page cache for the first read.
///
/// ```text
/// 1.Preheat
/// |
/// |
/// +--------------------------------------------------+
/// | | Peer |
/// | | +-----------+ |
/// | | -- Partial -->| Cache | |
/// | | | +-----------+ |
/// | v | | | |
/// | Download | Miss | |
/// | Task -->| | --- Hit ------>|<-- 2.Download
/// | | | ^ |
/// | | v | |
/// | | +-----------+ | |
/// | -- Full -->| Disk |---------- |
/// | +-----------+ |
/// | |
/// +--------------------------------------------------+
/// ```
///```text
/// +--------+
/// │ Source │
/// +--------+
/// ^ ^ Preheat
/// │ │ |
/// +-----------------+ │ │ +----------------------------+
/// │ Other Peers │ │ │ │ Peer | │
/// │ │ │ │ │ v │
/// │ +----------+ │ │ │ │ +----------+ │
/// │ │ Cache |<--|----------|<-Miss--| Cache |--Hit-->|<----Download CacheTask
/// │ +----------+ │ │ │ +----------+ │
/// │ │ │ │ │
/// │ +----------+ │ │ │ +----------+ │
/// │ │ Disk |<--|----------|<-Miss--| Disk |--Hit-->|<----Download Task
/// │ +----------+ │ │ +----------+ │
/// │ │ │ ^ │
/// │ │ │ | │
/// +-----------------+ +----------------------------+
/// |
/// Preheat
///```
#[serde(with = "bytesize_serde", default = "default_storage_cache_capacity")]
pub cache_capacity: ByteSize,
}
@ -2013,11 +1989,6 @@ key: /etc/ssl/private/client.pem
let default_seed_peer = SeedPeer::default();
assert!(!default_seed_peer.enable);
assert_eq!(default_seed_peer.kind, HostType::Normal);
assert_eq!(default_seed_peer.cluster_id, 1);
assert_eq!(
default_seed_peer.keepalive_interval,
default_seed_peer_keepalive_interval()
);
}
#[test]
@ -2025,20 +1996,9 @@ key: /etc/ssl/private/client.pem
let valid_seed_peer = SeedPeer {
enable: true,
kind: HostType::Weak,
cluster_id: 5,
keepalive_interval: Duration::from_secs(90),
};
assert!(valid_seed_peer.validate().is_ok());
let invalid_seed_peer = SeedPeer {
enable: true,
kind: HostType::Weak,
cluster_id: 0,
keepalive_interval: Duration::from_secs(90),
};
assert!(invalid_seed_peer.validate().is_err());
}
#[test]
@ -2055,8 +2015,6 @@ key: /etc/ssl/private/client.pem
assert!(seed_peer.enable);
assert_eq!(seed_peer.kind, HostType::Super);
assert_eq!(seed_peer.cluster_id, 2);
assert_eq!(seed_peer.keepalive_interval, Duration::from_secs(60));
}
#[test]

View File

@ -23,7 +23,6 @@ tokio.workspace = true
anyhow.workspace = true
tracing.workspace = true
toml_edit.workspace = true
toml.workspace = true
url.workspace = true
tempfile.workspace = true
serde_json.workspace = true

View File

@ -22,10 +22,8 @@ tracing.workspace = true
prost-wkt-types.workspace = true
tokio.workspace = true
tokio-util.workspace = true
sha2.workspace = true
crc32fast.workspace = true
fs2.workspace = true
lru.workspace = true
bytes.workspace = true
bytesize.workspace = true
num_cpus = "1.17"

View File

@ -76,31 +76,33 @@ impl Task {
/// Cache is the cache for storing piece content by LRU algorithm.
///
/// Cache storage:
/// 1. Users can create preheating jobs and preheat tasks to memory and disk by setting `load_to_cache` to `true`.
/// For more details, refer to https://github.com/dragonflyoss/api/blob/main/proto/common.proto#L443.
/// 1. Users can preheat task by caching to memory (via CacheTask) or to disk (via Task).
/// For more details, refer to https://github.com/dragonflyoss/api/blob/main/proto/dfdaemon.proto#L174.
/// 2. If the download hits the memory cache, it will be faster than reading from the disk, because there is no
/// page cache for the first read.
///
/// ```text
/// 1.Preheat
/// |
/// |
/// +--------------------------------------------------+
/// | | Peer |
/// | | +-----------+ |
/// | | -- Partial -->| Cache | |
/// | | | +-----------+ |
/// | v | | | |
/// | Download | Miss | |
/// | Task -->| | --- Hit ------>|<-- 2.Download
/// | | | ^ |
/// | | v | |
/// | | +-----------+ | |
/// | -- Full -->| Disk |---------- |
/// | +-----------+ |
/// | |
/// +--------------------------------------------------+
/// ```
///```text
/// +--------+
/// │ Source │
/// +--------+
/// ^ ^ Preheat
/// │ │ |
/// +-----------------+ │ │ +----------------------------+
/// │ Other Peers │ │ │ │ Peer | │
/// │ │ │ │ │ v │
/// │ +----------+ │ │ │ │ +----------+ │
/// │ │ Cache |<--|----------|<-Miss--| Cache |--Hit-->|<----Download CacheTask
/// │ +----------+ │ │ │ +----------+ │
/// │ │ │ │ │
/// │ +----------+ │ │ │ +----------+ │
/// │ │ Disk |<--|----------|<-Miss--| Disk |--Hit-->|<----Download Task
/// │ +----------+ │ │ +----------+ │
/// │ │ │ ^ │
/// │ │ │ | │
/// +-----------------+ +----------------------------+
/// |
/// Preheat
///```
/// Task is the metadata of the task.
#[derive(Clone)]
pub struct Cache {

View File

@ -27,7 +27,6 @@ use std::time::Duration;
use tokio::io::AsyncRead;
use tokio::time::sleep;
use tokio_util::either::Either;
use tokio_util::io::InspectReader;
use tracing::{debug, error, info, instrument, warn};
pub mod cache;
@ -117,14 +116,8 @@ impl Storage {
piece_length: u64,
content_length: u64,
response_header: Option<HeaderMap>,
load_to_cache: bool,
) -> Result<metadata::Task> {
self.content.create_task(id, content_length).await?;
if load_to_cache {
let mut cache = self.cache.clone();
cache.put_task(id, content_length).await;
debug!("put task to cache: {}", id);
}
self.metadata.download_task_started(
id,
@ -422,11 +415,10 @@ impl Storage {
offset: u64,
length: u64,
reader: &mut R,
load_to_cache: bool,
timeout: Duration,
) -> Result<metadata::Piece> {
tokio::select! {
piece = self.handle_downloaded_from_source_finished(piece_id, task_id, offset, length, reader, load_to_cache) => {
piece = self.handle_downloaded_from_source_finished(piece_id, task_id, offset, length, reader) => {
piece
}
_ = sleep(timeout) => {
@ -444,30 +436,11 @@ impl Storage {
offset: u64,
length: u64,
reader: &mut R,
load_to_cache: bool,
) -> Result<metadata::Piece> {
let response = if load_to_cache {
let mut buffer = Vec::with_capacity(length as usize);
let mut tee = InspectReader::new(reader, |bytes| {
buffer.extend_from_slice(bytes);
});
let response = self
.content
.write_piece(task_id, offset, length, &mut tee)
.await?;
self.cache
.write_piece(task_id, piece_id, bytes::Bytes::from(buffer))
.await?;
debug!("put piece to cache: {}", piece_id);
response
} else {
self.content
.write_piece(task_id, offset, length, reader)
.await?
};
let response = self
.content
.write_piece(task_id, offset, length, reader)
.await?;
let digest = Digest::new(Algorithm::Crc32, response.hash);
self.metadata.download_piece_finished(
@ -491,11 +464,10 @@ impl Storage {
expected_digest: &str,
parent_id: &str,
reader: &mut R,
load_to_cache: bool,
timeout: Duration,
) -> Result<metadata::Piece> {
tokio::select! {
piece = self.handle_downloaded_piece_from_parent_finished(piece_id, task_id, offset, length, expected_digest, parent_id, reader, load_to_cache) => {
piece = self.handle_downloaded_piece_from_parent_finished(piece_id, task_id, offset, length, expected_digest, parent_id, reader) => {
piece
}
_ = sleep(timeout) => {
@ -516,30 +488,11 @@ impl Storage {
expected_digest: &str,
parent_id: &str,
reader: &mut R,
load_to_cache: bool,
) -> Result<metadata::Piece> {
let response = if load_to_cache {
let mut buffer = Vec::with_capacity(length as usize);
let mut tee = InspectReader::new(reader, |bytes| {
buffer.extend_from_slice(bytes);
});
let response = self
.content
.write_piece(task_id, offset, length, &mut tee)
.await?;
self.cache
.write_piece(task_id, piece_id, bytes::Bytes::from(buffer))
.await?;
debug!("put piece to cache: {}", piece_id);
response
} else {
self.content
.write_piece(task_id, offset, length, reader)
.await?
};
let response = self
.content
.write_piece(task_id, offset, length, reader)
.await?;
let length = response.length;
let digest = Digest::new(Algorithm::Crc32, response.hash);

View File

@ -13,7 +13,6 @@ edition.workspace = true
dragonfly-client-core.workspace = true
dragonfly-api.workspace = true
reqwest.workspace = true
hyper.workspace = true
http-range-header.workspace = true
http.workspace = true
tracing.workspace = true
@ -24,9 +23,10 @@ rustls-pki-types.workspace = true
rustls-pemfile.workspace = true
sha2.workspace = true
uuid.workspace = true
sysinfo.workspace = true
hex.workspace = true
openssl.workspace = true
crc32fast.workspace = true
openssl.workspace = true
lazy_static.workspace = true
bytesize.workspace = true
lru.workspace = true

View File

@ -14,48 +14,15 @@
* limitations under the License.
*/
use bytesize::{ByteSize, MB};
use bytesize::ByteSize;
use pnet::datalink::{self, NetworkInterface};
use std::cmp::min;
use std::net::IpAddr;
#[cfg(not(target_os = "linux"))]
use tracing::warn;
/// get_interface_by_ip returns the name of the network interface that has the specified IP
/// address.
pub fn get_interface_by_ip(ip: IpAddr) -> Option<NetworkInterface> {
for interface in datalink::interfaces() {
for ip_network in interface.ips.iter() {
if ip_network.ip() == ip {
return Some(interface);
}
}
}
None
}
/// get_interface_speed_by_ip returns the speed of the network interface that has the specified IP
/// address in Mbps.
pub fn get_interface_speed(interface_name: &str) -> Option<u64> {
#[cfg(target_os = "linux")]
{
let speed_path = format!("/sys/class/net/{}/speed", interface_name);
std::fs::read_to_string(&speed_path)
.ok()
.and_then(|speed_str| speed_str.trim().parse::<u64>().ok())
}
#[cfg(not(target_os = "linux"))]
{
warn!(
"can not get interface {} speed on non-linux platform",
interface_name
);
None
}
}
use std::sync::Arc;
use std::time::Duration;
use sysinfo::Networks;
use tokio::sync::Mutex;
use tracing::{info, warn};
/// Interface represents a network interface with its information.
#[derive(Debug, Clone, Default)]
@ -63,23 +30,201 @@ pub struct Interface {
/// name is the name of the network interface.
pub name: String,
// bandwidth is the bandwidth of the network interface in Mbps.
/// bandwidth is the bandwidth of the network interface in bps.
pub bandwidth: u64,
// network_data_mutex is a mutex to protect access to network data.
network_data_mutex: Arc<Mutex<()>>,
}
/// get_interface_info returns the network interface information for the specified IP address.
pub fn get_interface_info(ip: IpAddr, rate_limit: ByteSize) -> Option<Interface> {
let rate_limit = rate_limit.as_u64() / MB * 8; // convert to Mbps
/// NetworkData represents the network data for a specific interface,
#[derive(Debug, Clone, Default)]
pub struct NetworkData {
/// max_rx_bandwidth is the maximum receive bandwidth of the interface in bps.
pub max_rx_bandwidth: u64,
let interface = get_interface_by_ip(ip)?;
match get_interface_speed(&interface.name) {
Some(speed) => Some(Interface {
name: interface.name,
bandwidth: min(speed, rate_limit),
}),
None => Some(Interface {
name: interface.name,
bandwidth: rate_limit,
}),
/// rx_bandwidth is the current receive bandwidth of the interface in bps.
pub rx_bandwidth: Option<u64>,
/// max_tx_bandwidth is the maximum transmit bandwidth of the interface in bps.
pub max_tx_bandwidth: u64,
/// tx_bandwidth is the current transmit bandwidth of the interface in bps.
pub tx_bandwidth: Option<u64>,
}
/// Interface methods provide functionality to get network interface information.
impl Interface {
/// DEFAULT_NETWORKS_REFRESH_INTERVAL is the default interval for refreshing network data.
const DEFAULT_NETWORKS_REFRESH_INTERVAL: Duration = Duration::from_secs(2);
/// new creates a new Interface instance based on the provided IP address and rate limit.
pub fn new(ip: IpAddr, rate_limit: ByteSize) -> Interface {
let rate_limit = Self::byte_size_to_bits(rate_limit); // convert to bps
let Some(interface) = Self::get_network_interface_by_ip(ip) else {
warn!(
"can not find interface for IP address {}, network interface unknown with bandwidth {} bps",
ip, rate_limit
);
return Interface {
name: "unknown".to_string(),
bandwidth: rate_limit,
network_data_mutex: Arc::new(Mutex::new(())),
};
};
match Self::get_speed(&interface.name) {
Some(speed) => {
let bandwidth = min(Self::megabits_to_bits(speed), rate_limit);
info!(
"network interface {} with bandwidth {} bps",
interface.name, bandwidth
);
Interface {
name: interface.name,
bandwidth,
network_data_mutex: Arc::new(Mutex::new(())),
}
}
None => {
warn!(
"can not get speed, network interface {} with bandwidth {} bps",
interface.name, rate_limit
);
Interface {
name: interface.name,
bandwidth: rate_limit,
network_data_mutex: Arc::new(Mutex::new(())),
}
}
}
}
/// get_network_data retrieves the network data for the interface.
pub async fn get_network_data(&self) -> NetworkData {
// Lock the mutex to ensure exclusive access to network data.
let _guard = self.network_data_mutex.lock().await;
// Initialize sysinfo network.
let mut networks = Networks::new_with_refreshed_list();
// Sleep to calculate the network traffic difference over
// the DEFAULT_NETWORKS_REFRESH_INTERVAL.
tokio::time::sleep(Self::DEFAULT_NETWORKS_REFRESH_INTERVAL).await;
// Refresh network information.
networks.refresh();
let Some(network_data) = networks.get(self.name.as_str()) else {
warn!("can not find network data for interface {}", self.name);
return NetworkData {
max_rx_bandwidth: self.bandwidth,
max_tx_bandwidth: self.bandwidth,
..Default::default()
};
};
// Calculate the receive and transmit bandwidth in bits per second.
let rx_bandwidth = (Self::bytes_to_bits(network_data.received()) as f64
/ Self::DEFAULT_NETWORKS_REFRESH_INTERVAL.as_secs_f64())
.round() as u64;
// Calculate the transmit bandwidth in bits per second.
let tx_bandwidth = (Self::bytes_to_bits(network_data.transmitted()) as f64
/ Self::DEFAULT_NETWORKS_REFRESH_INTERVAL.as_secs_f64())
.round() as u64;
NetworkData {
max_rx_bandwidth: self.bandwidth,
rx_bandwidth: Some(rx_bandwidth),
max_tx_bandwidth: self.bandwidth,
tx_bandwidth: Some(tx_bandwidth),
}
}
/// get_speed returns the speed of the network interface in Mbps.
pub fn get_speed(name: &str) -> Option<u64> {
#[cfg(target_os = "linux")]
{
let speed_path = format!("/sys/class/net/{}/speed", name);
std::fs::read_to_string(&speed_path)
.ok()
.and_then(|speed_str| speed_str.trim().parse::<u64>().ok())
}
#[cfg(not(target_os = "linux"))]
{
warn!("can not get interface {} speed on non-linux platform", name);
None
}
}
/// get_network_interface_by_ip returns the network interface that has the specified
/// IP address.
pub fn get_network_interface_by_ip(ip: IpAddr) -> Option<NetworkInterface> {
datalink::interfaces()
.into_iter()
.find(|interface| interface.ips.iter().any(|ip_net| ip_net.ip() == ip))
}
/// byte_size_to_bits converts a ByteSize to bits.
pub fn byte_size_to_bits(size: ByteSize) -> u64 {
size.as_u64() * 8
}
/// megabits_to_bit converts megabits to bits.
pub fn megabits_to_bits(size: u64) -> u64 {
size * 1_000_000 // 1 Mbit = 1,000,000 bits
}
/// bytes_to_bits converts bytes to bits.
pub fn bytes_to_bits(size: u64) -> u64 {
size * 8 // 1 byte = 8 bits
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytesize::ByteSize;
#[test]
fn test_byte_size_to_bits() {
let test_cases = vec![
(ByteSize::kb(1), 8_000u64),
(ByteSize::mb(1), 8_000_000u64),
(ByteSize::gb(1), 8_000_000_000u64),
(ByteSize::b(0), 0u64),
];
for (input, expected) in test_cases {
let result = Interface::byte_size_to_bits(input);
assert_eq!(result, expected);
}
}
#[test]
fn test_megabits_to_bits() {
let test_cases = vec![
(1u64, 1_000_000u64),
(1000u64, 1_000_000_000u64),
(0u64, 0u64),
];
for (input, expected) in test_cases {
let result = Interface::megabits_to_bits(input);
assert_eq!(result, expected);
}
}
#[test]
fn test_bytes_to_bits() {
let test_cases = vec![(1u64, 8u64), (1000u64, 8_000u64), (0u64, 0u64)];
for (input, expected) in test_cases {
let result = Interface::bytes_to_bits(input);
assert_eq!(result, expected);
}
}
}

View File

@ -34,8 +34,6 @@ hyper.workspace = true
hyper-util.workspace = true
hyper-rustls.workspace = true
tracing.workspace = true
validator.workspace = true
humantime.workspace = true
serde.workspace = true
chrono.workspace = true
prost-wkt-types.workspace = true
@ -55,15 +53,16 @@ clap.workspace = true
anyhow.workspace = true
bytes.workspace = true
bytesize.workspace = true
humantime.workspace = true
uuid.workspace = true
percent-encoding.workspace = true
tokio-rustls.workspace = true
serde_json.workspace = true
lru.workspace = true
fs2.workspace = true
lazy_static.workspace = true
futures.workspace = true
local-ip-address.workspace = true
sysinfo.workspace = true
tracing-appender = "0.2.3"
tracing-subscriber = { version = "0.3", features = ["env-filter", "time", "chrono"] }
tracing-panic = "0.1.2"
@ -76,11 +75,9 @@ rolling-file = "0.2.0"
pprof = { version = "0.15", features = ["flamegraph", "protobuf-codec"] }
prometheus = { version = "0.13", features = ["process"] }
tonic-health = "0.12.3"
sysinfo = { version = "0.32.1", default-features = false, features = ["component", "disk", "network", "system", "user"] }
tower = { version = "0.4.13", features = ["limit", "load-shed", "buffer"] }
indicatif = "0.18.0"
hashring = "0.3.6"
fslock = "0.2.1"
leaky-bucket = "1.1.2"
http-body-util = "0.1.3"
termion = "4.0.5"
@ -88,6 +85,8 @@ tabled = "0.20.0"
path-absolutize = "3.1.1"
dashmap = "6.1.0"
fastrand = "2.3.0"
glob = "0.3.3"
console-subscriber = "0.4.1"
[dev-dependencies]
tempfile.workspace = true

View File

@ -14,10 +14,9 @@
* limitations under the License.
*/
use crate::grpc::{manager::ManagerClient, scheduler::SchedulerClient};
use crate::grpc::scheduler::SchedulerClient;
use crate::shutdown;
use dragonfly_api::common::v2::{Build, Cpu, Disk, Host, Memory, Network};
use dragonfly_api::manager::v2::{DeleteSeedPeerRequest, SourceType, UpdateSeedPeerRequest};
use dragonfly_api::scheduler::v2::{AnnounceHostRequest, DeleteHostRequest};
use dragonfly_client_config::{
dfdaemon::{Config, HostType},
@ -25,89 +24,13 @@ use dragonfly_client_config::{
};
use dragonfly_client_core::error::{ErrorType, OrErr};
use dragonfly_client_core::Result;
use dragonfly_client_util::net::Interface;
use std::env;
use std::sync::Arc;
use std::time::Duration;
use sysinfo::System;
use tokio::sync::mpsc;
use tracing::{error, info, instrument};
/// ManagerAnnouncer is used to announce the dfdaemon information to the manager.
pub struct ManagerAnnouncer {
/// config is the configuration of the dfdaemon.
config: Arc<Config>,
/// manager_client is the grpc client of the manager.
manager_client: Arc<ManagerClient>,
/// shutdown is used to shutdown the announcer.
shutdown: shutdown::Shutdown,
/// _shutdown_complete is used to notify the announcer is shutdown.
_shutdown_complete: mpsc::UnboundedSender<()>,
}
/// ManagerAnnouncer implements the manager announcer of the dfdaemon.
impl ManagerAnnouncer {
/// new creates a new manager announcer.
pub fn new(
config: Arc<Config>,
manager_client: Arc<ManagerClient>,
shutdown: shutdown::Shutdown,
shutdown_complete_tx: mpsc::UnboundedSender<()>,
) -> Self {
Self {
config,
manager_client,
shutdown,
_shutdown_complete: shutdown_complete_tx,
}
}
/// run announces the dfdaemon information to the manager.
pub async fn run(&self) -> Result<()> {
// Clone the shutdown channel.
let mut shutdown = self.shutdown.clone();
// If the seed peer is enabled, we should announce the seed peer to the manager.
if self.config.seed_peer.enable {
// Register the seed peer to the manager.
self.manager_client
.update_seed_peer(UpdateSeedPeerRequest {
source_type: SourceType::SeedPeerSource.into(),
hostname: self.config.host.hostname.clone(),
r#type: self.config.seed_peer.kind.to_string(),
idc: self.config.host.idc.clone(),
location: self.config.host.location.clone(),
ip: self.config.host.ip.unwrap().to_string(),
port: self.config.upload.server.port as i32,
download_port: self.config.upload.server.port as i32,
seed_peer_cluster_id: self.config.seed_peer.cluster_id,
})
.await?;
// Announce to scheduler shutting down with signals.
shutdown.recv().await;
// Delete the seed peer from the manager.
self.manager_client
.delete_seed_peer(DeleteSeedPeerRequest {
source_type: SourceType::SeedPeerSource.into(),
hostname: self.config.host.hostname.clone(),
ip: self.config.host.ip.unwrap().to_string(),
seed_peer_cluster_id: self.config.seed_peer.cluster_id,
})
.await?;
info!("announce to manager shutting down");
} else {
shutdown.recv().await;
info!("announce to manager shutting down");
}
Ok(())
}
}
use tracing::{debug, error, info, instrument};
/// Announcer is used to announce the dfdaemon information to the manager and scheduler.
pub struct SchedulerAnnouncer {
@ -120,6 +43,9 @@ pub struct SchedulerAnnouncer {
/// scheduler_client is the grpc client of the scheduler.
scheduler_client: Arc<SchedulerClient>,
/// interface is the network interface.
interface: Arc<Interface>,
/// shutdown is used to shutdown the announcer.
shutdown: shutdown::Shutdown,
@ -134,6 +60,7 @@ impl SchedulerAnnouncer {
config: Arc<Config>,
host_id: String,
scheduler_client: Arc<SchedulerClient>,
interface: Arc<Interface>,
shutdown: shutdown::Shutdown,
shutdown_complete_tx: mpsc::UnboundedSender<()>,
) -> Result<Self> {
@ -141,6 +68,7 @@ impl SchedulerAnnouncer {
config,
host_id,
scheduler_client,
interface,
shutdown,
_shutdown_complete: shutdown_complete_tx,
};
@ -148,7 +76,7 @@ impl SchedulerAnnouncer {
// Initialize the scheduler announcer.
announcer
.scheduler_client
.init_announce_host(announcer.make_announce_host_request(Duration::ZERO)?)
.init_announce_host(announcer.make_announce_host_request(Duration::ZERO).await?)
.await?;
Ok(announcer)
}
@ -163,7 +91,7 @@ impl SchedulerAnnouncer {
loop {
tokio::select! {
_ = interval.tick() => {
let request = match self.make_announce_host_request(interval.period()) {
let request = match self.make_announce_host_request(interval.period()).await {
Ok(request) => request,
Err(err) => {
error!("make announce host request failed: {}", err);
@ -192,7 +120,7 @@ impl SchedulerAnnouncer {
/// make_announce_host_request makes the announce host request.
#[instrument(skip_all)]
fn make_announce_host_request(&self, interval: Duration) -> Result<AnnounceHostRequest> {
async fn make_announce_host_request(&self, interval: Duration) -> Result<AnnounceHostRequest> {
// If the seed peer is enabled, we should announce the seed peer to the scheduler.
let host_type = if self.config.seed_peer.enable {
self.config.seed_peer.kind
@ -228,25 +156,25 @@ impl SchedulerAnnouncer {
free: sys.free_memory(),
};
// Wait for getting the network data.
let network_data = self.interface.get_network_data().await;
debug!(
"network data: rx bandwidth {}/{} bps, tx bandwidth {}/{} bps",
network_data.rx_bandwidth.unwrap_or(0),
network_data.max_rx_bandwidth,
network_data.tx_bandwidth.unwrap_or(0),
network_data.max_tx_bandwidth
);
// Get the network information.
let network = Network {
// TODO: Get the count of the tcp connection.
tcp_connection_count: 0,
// TODO: Get the count of the upload tcp connection.
upload_tcp_connection_count: 0,
idc: self.config.host.idc.clone(),
location: self.config.host.location.clone(),
// TODO: Get the network download rate, refer to
// https://docs.rs/sysinfo/latest/sysinfo/struct.NetworkData.html#method.received.
download_rate: 0,
download_rate_limit: self.config.download.rate_limit.as_u64(),
// TODO: Get the network download rate, refer to
// https://docs.rs/sysinfo/latest/sysinfo/struct.NetworkData.html#method.transmitted
upload_rate: 0,
upload_rate_limit: self.config.upload.rate_limit.as_u64(),
max_rx_bandwidth: network_data.max_rx_bandwidth,
rx_bandwidth: network_data.rx_bandwidth,
max_tx_bandwidth: network_data.max_tx_bandwidth,
tx_bandwidth: network_data.tx_bandwidth,
..Default::default()
};
// Get the disk information.

View File

@ -129,7 +129,13 @@ pub struct ExportCommand {
/// Implement the execute for ExportCommand.
impl ExportCommand {
/// execute executes the export command.
/// Executes the export command with comprehensive validation and advanced error handling.
///
/// This function serves as the main entry point for the dfcache export command execution.
/// It handles the complete workflow including argument parsing, validation, logging setup,
/// dfdaemon client connection, and export operation execution. The function provides
/// sophisticated error reporting with colored terminal output, including specialized
/// handling for backend errors with HTTP status codes and headers.
pub async fn execute(&self) -> Result<()> {
// Parse command line arguments.
Args::parse();
@ -436,7 +442,13 @@ impl ExportCommand {
Ok(())
}
/// run runs the export command.
/// Executes the export operation to retrieve cached files from the persistent cache system.
///
/// This function handles the core export functionality by downloading a cached file from the
/// dfdaemon persistent cache system. It supports two transfer modes: direct file transfer
/// by dfdaemon (hardlink/copy) or streaming piece content through the client for manual
/// file assembly. The operation provides real-time progress feedback and handles file
/// creation, directory setup, and efficient piece-by-piece writing with sparse file allocation.
async fn run(&self, dfdaemon_download_client: DfdaemonDownloadClient) -> Result<()> {
// Dfcache needs to notify dfdaemon to transfer the piece content of downloading file via unix domain socket
// when the `transfer_from_dfdaemon` is true. Otherwise, dfdaemon will download the file and hardlink or
@ -565,7 +577,12 @@ impl ExportCommand {
Ok(())
}
/// validate_args validates the command line arguments.
/// Validates command line arguments for the export operation to ensure safe file output.
///
/// This function performs essential validation of the output path to prevent file conflicts
/// and ensure the target location is suitable for export operations. It checks parent
/// directory existence, prevents accidental file overwrites, and validates path accessibility
/// before allowing the export operation to proceed.
fn validate_args(&self) -> Result<()> {
let absolute_path = Path::new(&self.output).absolutize()?;
match absolute_path.parent() {

View File

@ -128,7 +128,13 @@ pub struct ImportCommand {
/// Implement the execute for ImportCommand.
impl ImportCommand {
/// execute executes the import sub command.
/// Executes the import sub command with comprehensive validation and error handling.
///
/// This function serves as the main entry point for the dfcache import command execution.
/// It handles the complete workflow including argument parsing, validation, logging setup,
/// dfdaemon client connection, and import operation execution. The function provides
/// detailed error reporting with colored terminal output and follows a fail-fast approach
/// with immediate process termination on any critical failures.
pub async fn execute(&self) -> Result<()> {
// Parse command line arguments.
Args::parse();
@ -326,7 +332,13 @@ impl ImportCommand {
Ok(())
}
/// run runs the import sub command.
/// Executes the cache import operation by uploading a file to the persistent cache system.
///
/// This function handles the core import functionality by uploading a local file to the
/// dfdaemon persistent cache system. It provides visual feedback through a progress spinner,
/// converts the file path to absolute format, and configures the cache task with specified
/// parameters including TTL, replica count, and piece length. The operation is asynchronous
/// and provides completion feedback with the generated task ID.
async fn run(&self, dfdaemon_download_client: DfdaemonDownloadClient) -> Result<()> {
let absolute_path = Path::new(&self.path).absolutize()?;
info!("import file: {}", absolute_path.to_string_lossy());
@ -363,7 +375,12 @@ impl ImportCommand {
Ok(())
}
/// validate_args validates the command line arguments.
/// Validates command line arguments for the import operation to ensure safe and correct execution.
///
/// This function performs comprehensive validation of import-specific parameters to prevent
/// invalid operations and ensure the import request meets all system requirements. It validates
/// TTL boundaries, file existence and type, and piece length constraints before allowing the
/// import operation to proceed.
fn validate_args(&self) -> Result<()> {
if self.ttl < Duration::from_secs(5 * 60)
|| self.ttl > Duration::from_secs(7 * 24 * 60 * 60)

View File

@ -106,7 +106,12 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}
/// get_and_check_dfdaemon_download_client gets a dfdaemon download client and checks its health.
/// Creates and validates a dfdaemon download client with health checking.
///
/// This function establishes a connection to the dfdaemon service via Unix domain socket
/// and performs a health check to ensure the service is running and ready to handle
/// download requests. Only after successful health verification does it return the
/// download client for actual use.
pub async fn get_dfdaemon_download_client(endpoint: PathBuf) -> Result<DfdaemonDownloadClient> {
// Check dfdaemon's health.
let health_client = HealthClient::new_unix(endpoint.clone()).await?;

View File

@ -74,7 +74,13 @@ pub struct StatCommand {
/// Implement the execute for StatCommand.
impl StatCommand {
/// execute executes the stat command.
/// Executes the stat command with comprehensive error handling and user feedback.
///
/// This function serves as the main entry point for the dfcache stat command execution.
/// It handles the complete lifecycle including argument parsing, logging initialization,
/// dfdaemon client setup, and command execution with detailed error reporting. The
/// function provides colored terminal output for better user experience and exits
/// with appropriate status codes on failure.
pub async fn execute(&self) -> Result<()> {
// Parse command line arguments.
Args::parse();
@ -234,7 +240,12 @@ impl StatCommand {
Ok(())
}
/// run runs the stat command.
/// Executes the stat command to retrieve and display persistent cache task information.
///
/// This function queries the dfdaemon service for detailed information about a specific
/// persistent cache task and presents it in a formatted table for user consumption.
/// It handles data conversion from raw protocol buffer values to human-readable formats
/// including byte sizes, durations, and timestamps with proper timezone conversion.
async fn run(&self, dfdaemon_download_client: DfdaemonDownloadClient) -> Result<()> {
let task = dfdaemon_download_client
.stat_persistent_cache_task(StatPersistentCacheTaskRequest {

View File

@ -15,7 +15,7 @@
*/
use clap::Parser;
use dragonfly_client::announcer::{ManagerAnnouncer, SchedulerAnnouncer};
use dragonfly_client::announcer::SchedulerAnnouncer;
use dragonfly_client::dynconfig::Dynconfig;
use dragonfly_client::gc::GC;
use dragonfly_client::grpc::{
@ -32,7 +32,7 @@ use dragonfly_client::tracing::init_tracing;
use dragonfly_client_backend::BackendFactory;
use dragonfly_client_config::{dfdaemon, VersionValueParser};
use dragonfly_client_storage::Storage;
use dragonfly_client_util::id_generator::IDGenerator;
use dragonfly_client_util::{id_generator::IDGenerator, net::Interface};
use std::net::SocketAddr;
use std::path::PathBuf;
use std::sync::Arc;
@ -229,6 +229,9 @@ async fn main() -> Result<(), anyhow::Error> {
)?;
let persistent_cache_task = Arc::new(persistent_cache_task);
let interface = Interface::new(config.host.ip.unwrap(), config.upload.rate_limit);
let interface = Arc::new(interface);
// Initialize health server.
let health = Health::new(
SocketAddr::new(config.health.server.ip.unwrap(), config.health.server.port),
@ -258,19 +261,12 @@ async fn main() -> Result<(), anyhow::Error> {
shutdown_complete_tx.clone(),
);
// Initialize manager announcer.
let manager_announcer = ManagerAnnouncer::new(
config.clone(),
manager_client.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
);
// Initialize scheduler announcer.
let scheduler_announcer = SchedulerAnnouncer::new(
config.clone(),
id_generator.host_id(),
scheduler_client.clone(),
interface.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
)
@ -285,6 +281,7 @@ async fn main() -> Result<(), anyhow::Error> {
SocketAddr::new(config.upload.server.ip.unwrap(), config.upload.server.port),
task.clone(),
persistent_cache_task.clone(),
interface.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
);
@ -333,10 +330,6 @@ async fn main() -> Result<(), anyhow::Error> {
info!("stats server exited");
},
_ = tokio::spawn(async move { manager_announcer.run().await.unwrap_or_else(|err| error!("announcer manager failed: {}", err))} ) => {
info!("announcer manager exited");
},
_ = tokio::spawn(async move { scheduler_announcer.run().await }) => {
info!("announcer scheduler exited");
},

View File

@ -31,11 +31,13 @@ use dragonfly_client_config::{self, dfdaemon, dfget};
use dragonfly_client_core::error::{ErrorType, OrErr};
use dragonfly_client_core::{Error, Result};
use dragonfly_client_util::{fs::fallocate, http::header_vec_to_hashmap};
use glob::Pattern;
use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle};
use local_ip_address::local_ip;
use path_absolutize::*;
use percent_encoding::percent_decode_str;
use std::path::{Path, PathBuf};
use std::collections::{HashMap, HashSet};
use std::path::{Component, Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
@ -109,7 +111,7 @@ struct Args {
#[arg(
long = "content-for-calculating-task-id",
help = "Specify the content used to calculate the task ID. If it is set, use its value to calculate the task ID, Otherwise, calculate the task ID based on url, piece-length, tag, application, and filtered-query-params."
help = "Specify the content used to calculate the task ID. If it is set, use its value to calculate the task ID, Otherwise, calculate the task ID based on URL, piece-length, tag, application, and filtered-query-params."
)]
content_for_calculating_task_id: Option<String>,
@ -139,7 +141,7 @@ struct Args {
#[arg(
long = "digest",
required = false,
help = "Verify the integrity of the downloaded file using the specified digest, support sha256, sha512, crc32. If the digest is not specified, the downloaded file will not be verified. Format: <algorithm>:<digest>, e.g. sha256:1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef, crc32:12345678"
help = "Verify the integrity of the downloaded file using the specified digest, support sha256, sha512, crc32. If the digest is not specified, the downloaded file will not be verified. Format: <algorithm>:<digest>. Examples: sha256:1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef, crc32:12345678"
)]
digest: Option<String>,
@ -161,14 +163,14 @@ struct Args {
#[arg(
long = "application",
default_value = "",
help = "Different applications for the same url will be divided into different tasks"
help = "Different applications for the same URL will be divided into different tasks"
)]
application: String,
#[arg(
long = "tag",
default_value = "",
help = "Different tags for the same url will be divided into different tasks"
help = "Different tags for the same URL will be divided into different tasks"
)]
tag: String,
@ -176,17 +178,24 @@ struct Args {
short = 'H',
long = "header",
required = false,
help = "Specify the header for downloading file, e.g. --header='Content-Type: application/json' --header='Accept: application/json'"
help = "Specify the header for downloading file. Examples: --header='Content-Type: application/json' --header='Accept: application/json'"
)]
header: Option<Vec<String>>,
#[arg(
long = "filtered-query-param",
required = false,
help = "Filter the query parameters of the downloaded URL. If the download URL is the same, it will be scheduled as the same task, e.g. --filtered-query-param='signature' --filtered-query-param='timeout'"
help = "Filter the query parameters of the downloaded URL. If the download URL is the same, it will be scheduled as the same task. Examples: --filtered-query-param='signature' --filtered-query-param='timeout'"
)]
filtered_query_params: Option<Vec<String>>,
#[arg(
long = "include-files",
required = false,
help = "Filter files to download in a directory using glob patterns relative to the root URL's path. Examples: --include-files='*.txt' --include-files='subdir/file.txt'"
)]
include_files: Option<Vec<String>>,
#[arg(
long = "disable-back-to-source",
default_value_t = false,
@ -594,7 +603,12 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}
/// run runs the dfget command.
/// Runs the dfget command to download files or directories from a given URL.
///
/// This function serves as the main entry point for the dfget download operation.
/// It handles both single file downloads and directory downloads based on the URL format.
/// The function performs path normalization, validates the URL scheme's capabilities,
/// and delegates to the appropriate download handler.
async fn run(mut args: Args, dfdaemon_download_client: DfdaemonDownloadClient) -> Result<()> {
// Get the absolute path of the output file.
args.output = Path::new(&args.output).absolutize()?.into();
@ -614,7 +628,13 @@ async fn run(mut args: Args, dfdaemon_download_client: DfdaemonDownloadClient) -
download(args, ProgressBar::new(0), dfdaemon_download_client).await
}
/// download_dir downloads all files in the directory.
/// Downloads all files in a directory from various storage backends (object storage, HDFS, etc.).
///
/// This function handles directory-based downloads by recursively fetching all entries
/// in the specified directory. It supports filtering files based on include patterns,
/// enforces download limits, and performs concurrent downloads with configurable
/// concurrency control. The function creates the necessary directory structure
/// locally and downloads files while preserving the remote directory hierarchy.
async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Result<()> {
// Initialize the object storage config and the hdfs config.
let object_storage = Some(ObjectStorage {
@ -631,12 +651,17 @@ async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Re
delegation_token: args.hdfs_delegation_token.clone(),
});
// Get all entries in the directory. If the directory is empty, then return directly.
let entries = get_entries(args.clone(), object_storage, hdfs, download_client.clone()).await?;
// Get all entries in the directory.
let mut entries = get_entries(&args, object_storage, hdfs, download_client.clone()).await?;
if let Some(ref include_files) = args.include_files {
entries = filter_entries(&args.url, entries, include_files)?;
}
// If the entries is empty, then return directly.
if entries.is_empty() {
warn!("directory {} is empty", args.url);
warn!("no entries found in directory {}", args.url);
return Ok(());
};
}
// If the actual file count is greater than the max_files, then reject the downloading.
let count = entries.iter().filter(|entry| !entry.is_dir).count();
@ -707,7 +732,13 @@ async fn download_dir(args: Args, download_client: DfdaemonDownloadClient) -> Re
Ok(())
}
/// download downloads the single file.
/// Downloads a single file from various storage backends using the dfdaemon service.
///
/// This function handles single file downloads by communicating with a dfdaemon client.
/// It supports multiple storage protocols (object storage, HDFS, HTTP/HTTPS) and provides
/// two transfer modes: direct download by dfdaemon or streaming piece content through
/// the client. The function includes progress tracking, file creation, and proper error
/// handling throughout the download process.
async fn download(
args: Args,
progress_bar: ProgressBar,
@ -777,7 +808,6 @@ async fn download(
need_piece_content,
object_storage,
hdfs,
load_to_cache: false,
force_hard_link: args.force_hard_link,
content_for_calculating_task_id: args.content_for_calculating_task_id,
remote_ip: Some(local_ip().unwrap().to_string()),
@ -880,9 +910,15 @@ async fn download(
Ok(())
}
/// get_entries gets all entries in the directory.
/// Retrieves all directory entries from a remote storage location.
///
/// This function communicates with the dfdaemon service to list all entries
/// (files and subdirectories) in the specified directory URL. It supports
/// various storage backends including object storage and HDFS by passing
/// the appropriate credentials and configuration. The function converts
/// the gRPC response into a local `DirEntry` format for further processing.
async fn get_entries(
args: Args,
args: &Args,
object_storage: Option<ObjectStorage>,
hdfs: Option<Hdfs>,
download_client: DfdaemonDownloadClient,
@ -893,10 +929,8 @@ async fn get_entries(
.list_task_entries(ListTaskEntriesRequest {
task_id: Uuid::new_v4().to_string(),
url: args.url.to_string(),
request_header: header_vec_to_hashmap(args.header.unwrap_or_default())?,
timeout: Some(
prost_wkt_types::Duration::try_from(args.timeout).or_err(ErrorType::ParseError)?,
),
request_header: header_vec_to_hashmap(args.header.clone().unwrap_or_default())?,
timeout: None,
certificate_chain: Vec::new(),
object_storage,
hdfs,
@ -918,7 +952,74 @@ async fn get_entries(
.collect())
}
/// make_output_by_entry makes the output path by the entry information.
/// Filters directory entries based on include patterns and validates their URLs.
///
/// This function takes a collection of directory entries and filters them based on
/// glob patterns specified in `include_files`. It performs URL validation to ensure
/// all entries have valid URLs and that their paths fall within the scope of the
/// root URL. When an entry matches a pattern, both the entry and its parent
/// directory (if it exists) are included in the result.
fn filter_entries(
url: &Url,
entries: Vec<DirEntry>,
include_files: &[String],
) -> Result<Vec<DirEntry>> {
let patterns: Vec<Pattern> = include_files
.iter()
.filter_map(|include_file| Pattern::new(include_file).ok())
.collect();
// Build a HashMap of DirEntry objects keyed by relative paths for filtering and
// validates URLs and ensures paths are within the root URL's scope.
let mut entries_by_relative_path = HashMap::with_capacity(entries.len());
for entry in entries {
let entry_url = Url::parse(&entry.url).map_err(|err| {
error!("failed to parse entry URL '{}': {}", entry.url, err);
Error::ValidationError(format!("invalid URL: {}", entry.url))
})?;
let entry_path = entry_url.path();
match entry_path.strip_prefix(url.path()) {
Some(relative_path) => entries_by_relative_path
.insert(relative_path.trim_start_matches('/').to_string(), entry),
None => {
error!(
"entry path '{}' does not belong to the root path",
entry_path
);
return Err(Error::ValidationError(format!(
"path '{}' is outside the expected scope",
entry_path
)));
}
};
}
// Filter entries by matching relative paths against patterns, including
// parent directories for matches.
let mut filtered_entries = HashSet::new();
for (relative_path, entry) in &entries_by_relative_path {
if patterns.iter().any(|pat| pat.matches(relative_path)) {
filtered_entries.insert(entry.clone());
if let Some(parent) = std::path::Path::new(relative_path).parent() {
if let Some(parent_entry) =
entries_by_relative_path.get(&parent.join("").to_string_lossy().to_string())
{
filtered_entries.insert(parent_entry.clone());
}
}
}
}
Ok(filtered_entries.into_iter().collect())
}
/// Constructs the local output path for a directory entry based on its remote URL.
///
/// This function maps a remote directory entry to its corresponding local file system
/// path by replacing the remote root directory with the local output directory.
/// It handles URL percent-decoding to ensure proper path construction and maintains
/// the relative directory structure from the remote source.
fn make_output_by_entry(url: Url, output: &Path, entry: DirEntry) -> Result<PathBuf> {
// Get the root directory of the download directory and the output root directory.
let root_dir = url.path().to_string();
@ -936,7 +1037,12 @@ fn make_output_by_entry(url: Url, output: &Path, entry: DirEntry) -> Result<Path
.into())
}
/// get_and_check_dfdaemon_download_client gets a dfdaemon download client and checks its health.
/// Creates and validates a dfdaemon download client with health checking.
///
/// This function establishes a connection to the dfdaemon service via Unix domain socket
/// and performs a health check to ensure the service is running and ready to handle
/// download requests. Only after successful health verification does it return the
/// download client for actual use.
async fn get_dfdaemon_download_client(endpoint: PathBuf) -> Result<DfdaemonDownloadClient> {
// Check dfdaemon's health.
let health_client = HealthClient::new_unix(endpoint.clone()).await?;
@ -947,7 +1053,13 @@ async fn get_dfdaemon_download_client(endpoint: PathBuf) -> Result<DfdaemonDownl
Ok(dfdaemon_download_client)
}
/// validate_args validates the command line arguments.
/// Validates command line arguments for consistency and safety requirements.
///
/// This function performs comprehensive validation of the download arguments to ensure
/// they are logically consistent and safe to execute. It checks URL-output path matching,
/// directory existence, file conflicts, piece length constraints, and glob pattern validity.
/// The validation prevents common user errors and potential security issues before
/// starting the download process.
fn validate_args(args: &Args) -> Result<()> {
// If the URL is a directory, the output path should be a directory.
if args.url.path().ends_with('/') && !args.output.is_dir() {
@ -996,9 +1108,42 @@ fn validate_args(args: &Args) -> Result<()> {
}
}
if let Some(ref include_files) = args.include_files {
for include_file in include_files {
if Pattern::new(include_file).is_err() {
return Err(Error::ValidationError(format!(
"invalid glob pattern in include_files: '{}'",
include_file
)));
}
if !is_normal_relative_path(include_file) {
return Err(Error::ValidationError(format!(
"path is not a normal relative path in include_files: '{}'. It must not contain '..', '.', or start with '/'.",
include_file
)));
}
}
}
Ok(())
}
/// Validates that a path string is a normal relative path without unsafe components.
///
/// This function ensures that a given path is both relative (doesn't start with '/')
/// and contains only normal path components. It rejects paths with parent directory
/// references ('..'), current directory references ('.'), or any other special
/// path components that could be used for directory traversal attacks or unexpected
/// file system navigation.
fn is_normal_relative_path(path: &str) -> bool {
let path = Path::new(path);
path.is_relative()
&& path
.components()
.all(|comp| matches!(comp, Component::Normal(_)))
}
#[cfg(test)]
mod tests {
use super::*;
@ -1162,4 +1307,346 @@ mod tests {
let result = make_output_by_entry(url, output, entry);
assert!(result.is_err());
}
#[test]
fn should_filter_entries() {
let test_cases = vec![
(
Url::parse("http://example.com/root/").unwrap(),
vec![
DirEntry {
url: "http://example.com/root/dir/".to_string(),
content_length: 10,
is_dir: true,
},
DirEntry {
url: "http://example.com/root/dir/file.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/file2.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/subdir/".to_string(),
content_length: 10,
is_dir: true,
},
DirEntry {
url: "http://example.com/root/dir/subdir/file3.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/subdir/file4.png".to_string(),
content_length: 100,
is_dir: false,
},
],
vec!["dir/file.txt".to_string()],
vec![
DirEntry {
url: "http://example.com/root/dir/".to_string(),
content_length: 10,
is_dir: true,
},
DirEntry {
url: "http://example.com/root/dir/file.txt".to_string(),
content_length: 100,
is_dir: false,
},
],
),
(
Url::parse("http://example.com/root/").unwrap(),
vec![
DirEntry {
url: "http://example.com/root/dir/".to_string(),
content_length: 10,
is_dir: true,
},
DirEntry {
url: "http://example.com/root/dir/file.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/file2.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/subdir/".to_string(),
content_length: 10,
is_dir: true,
},
DirEntry {
url: "http://example.com/root/dir/subdir/file3.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/subdir/file4.png".to_string(),
content_length: 100,
is_dir: false,
},
],
vec![
"dir/file.txt".to_string(),
"dir/subdir/file4.png".to_string(),
],
vec![
DirEntry {
url: "http://example.com/root/dir/".to_string(),
content_length: 10,
is_dir: true,
},
DirEntry {
url: "http://example.com/root/dir/file.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/subdir/".to_string(),
content_length: 10,
is_dir: true,
},
DirEntry {
url: "http://example.com/root/dir/subdir/file4.png".to_string(),
content_length: 100,
is_dir: false,
},
],
),
(
Url::parse("http://example.com/root/").unwrap(),
vec![
DirEntry {
url: "http://example.com/root/dir/".to_string(),
content_length: 10,
is_dir: true,
},
DirEntry {
url: "http://example.com/root/dir/file.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/file2.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/subdir/".to_string(),
content_length: 10,
is_dir: true,
},
DirEntry {
url: "http://example.com/root/dir/subdir/file3.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/subdir/file4.png".to_string(),
content_length: 100,
is_dir: false,
},
],
vec!["dir/subdir/*.png".to_string()],
vec![
DirEntry {
url: "http://example.com/root/dir/subdir/".to_string(),
content_length: 10,
is_dir: true,
},
DirEntry {
url: "http://example.com/root/dir/subdir/file4.png".to_string(),
content_length: 100,
is_dir: false,
},
],
),
(
Url::parse("http://example.com/root/").unwrap(),
vec![
DirEntry {
url: "http://example.com/root/dir/".to_string(),
content_length: 10,
is_dir: true,
},
DirEntry {
url: "http://example.com/root/dir/file.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/file2.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/subdir/".to_string(),
content_length: 10,
is_dir: true,
},
DirEntry {
url: "http://example.com/root/dir/subdir/file3.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/subdir/file4.png".to_string(),
content_length: 100,
is_dir: false,
},
],
vec!["dir/*".to_string()],
vec![
DirEntry {
url: "http://example.com/root/dir/".to_string(),
content_length: 10,
is_dir: true,
},
DirEntry {
url: "http://example.com/root/dir/file.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/file2.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/subdir/".to_string(),
content_length: 10,
is_dir: true,
},
DirEntry {
url: "http://example.com/root/dir/subdir/file3.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/subdir/file4.png".to_string(),
content_length: 100,
is_dir: false,
},
],
),
(
Url::parse("http://example.com/root/").unwrap(),
vec![
DirEntry {
url: "http://example.com/root/dir/".to_string(),
content_length: 10,
is_dir: true,
},
DirEntry {
url: "http://example.com/root/dir/file.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/file2.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/subdir/".to_string(),
content_length: 10,
is_dir: true,
},
DirEntry {
url: "http://example.com/root/dir/subdir/file3.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/subdir/file4.png".to_string(),
content_length: 100,
is_dir: false,
},
],
vec!["dir/".to_string()],
vec![DirEntry {
url: "http://example.com/root/dir/".to_string(),
content_length: 10,
is_dir: true,
}],
),
(
Url::parse("http://example.com/root/").unwrap(),
vec![
DirEntry {
url: "http://example.com/root/dir/".to_string(),
content_length: 10,
is_dir: true,
},
DirEntry {
url: "http://example.com/root/dir/file.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/file2.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/subdir/".to_string(),
content_length: 10,
is_dir: true,
},
DirEntry {
url: "http://example.com/root/dir/subdir/file3.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: "http://example.com/root/dir/subdir/file4.png".to_string(),
content_length: 100,
is_dir: false,
},
],
vec!["test".to_string()],
vec![],
),
(
Url::parse("http://example.com/root/").unwrap(),
vec![
DirEntry {
url: "http://example.com/root/dir/file.txt".to_string(),
content_length: 100,
is_dir: false,
},
DirEntry {
url: " ".to_string(),
content_length: 100,
is_dir: false,
},
],
vec!["dir/file.txt".to_string()],
vec![],
),
];
for (url, entries, include_files, expected_entries) in test_cases {
let result = filter_entries(&url, entries, &include_files);
if result.is_err() {
assert!(matches!(result, Err(Error::ValidationError(_))));
} else {
let filtered_entries = result.unwrap();
assert_eq!(filtered_entries.len(), expected_entries.len());
for filtered_entry in &filtered_entries {
assert!(expected_entries
.iter()
.any(|expected_entry| { expected_entry.url == filtered_entry.url }));
}
}
}
}
}

View File

@ -25,15 +25,16 @@ use crate::metrics::{
};
use crate::resource::{persistent_cache_task, task};
use crate::shutdown;
use dragonfly_api::common::v2::{PersistentCacheTask, Priority, Task, TaskType};
use dragonfly_api::common::v2::{CacheTask, PersistentCacheTask, Priority, Task, TaskType};
use dragonfly_api::dfdaemon::v2::{
dfdaemon_download_client::DfdaemonDownloadClient as DfdaemonDownloadGRPCClient,
dfdaemon_download_server::{
DfdaemonDownload, DfdaemonDownloadServer as DfdaemonDownloadGRPCServer,
},
DeleteTaskRequest, DownloadPersistentCacheTaskRequest, DownloadPersistentCacheTaskResponse,
DownloadTaskRequest, DownloadTaskResponse, Entry, ListTaskEntriesRequest,
ListTaskEntriesResponse, StatPersistentCacheTaskRequest,
DeleteCacheTaskRequest, DeleteTaskRequest, DownloadCacheTaskRequest, DownloadCacheTaskResponse,
DownloadPersistentCacheTaskRequest, DownloadPersistentCacheTaskResponse, DownloadTaskRequest,
DownloadTaskResponse, Entry, ListTaskEntriesRequest, ListTaskEntriesResponse,
StatCacheTaskRequest as DfdaemonStatCacheTaskRequest, StatPersistentCacheTaskRequest,
StatTaskRequest as DfdaemonStatTaskRequest, UploadPersistentCacheTaskRequest,
};
use dragonfly_api::errordetails::v2::Backend;
@ -148,7 +149,7 @@ impl DfdaemonDownloadServer {
// Bind the unix domain socket and set the permissions for the socket.
let uds = UnixListener::bind(&self.socket_path)?;
let perms = std::fs::Permissions::from_mode(0o660);
let perms = std::fs::Permissions::from_mode(0o777);
fs::set_permissions(&self.socket_path, perms).await?;
// TODO(Gaius): RateLimitLayer is not implemented Clone, so we can't use it here.
@ -165,7 +166,7 @@ impl DfdaemonDownloadServer {
.http2_keepalive_interval(Some(super::HTTP2_KEEP_ALIVE_INTERVAL))
.http2_keepalive_timeout(Some(super::HTTP2_KEEP_ALIVE_TIMEOUT))
.layer(rate_limit_layer)
.add_service(reflection.clone())
.add_service(reflection)
.add_service(health_service)
.add_service(service)
.serve_with_incoming_shutdown(uds_stream, async move {
@ -669,7 +670,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
}
/// stat_task gets the status of the task.
#[instrument(skip_all, fields(host_id, task_id, remote_ip))]
#[instrument(skip_all, fields(host_id, task_id, remote_ip, local_only))]
async fn stat_task(
&self,
request: Request<DfdaemonStatTaskRequest>,
@ -688,6 +689,9 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
// Get the task id from the request.
let task_id = request.task_id;
// Get the local_only flag from the request, default to false.
let local_only = request.local_only;
// Span record the host id and task id.
Span::current().record("host_id", host_id.as_str());
Span::current().record("task_id", task_id.as_str());
@ -695,25 +699,34 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
"remote_ip",
request.remote_ip.clone().unwrap_or_default().as_str(),
);
Span::current().record("local_only", local_only.to_string().as_str());
info!("stat task in download server");
// Collect the stat task metrics.
collect_stat_task_started_metrics(TaskType::Standard as i32);
// Get the task from the scheduler.
let task = self
match self
.task
.stat(task_id.as_str(), host_id.as_str())
.stat(task_id.as_str(), host_id.as_str(), local_only)
.await
.map_err(|err| {
{
Ok(task) => Ok(Response::new(task)),
Err(err) => {
// Collect the stat task failure metrics.
collect_stat_task_failure_metrics(TaskType::Standard as i32);
error!("stat task: {}", err);
Status::internal(err.to_string())
})?;
// Log the error with detailed context.
error!("stat task failed: {}", err);
Ok(Response::new(task))
// Map the error to an appropriate gRPC status.
Err(match err {
ClientError::TaskNotFound(id) => {
Status::not_found(format!("task not found: {}", id))
}
_ => Status::internal(err.to_string()),
})
}
}
}
/// list_tasks lists the tasks.
@ -755,17 +768,6 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
Status::internal(err.to_string())
})?;
let timeout = match request.timeout {
Some(timeout) => Duration::try_from(timeout).map_err(|err| {
// Collect the list tasks failure metrics.
collect_list_task_entries_failure_metrics(TaskType::Standard as i32);
error!("parse timeout: {}", err);
Status::invalid_argument(err.to_string())
})?,
None => self.config.download.piece_timeout,
};
// Head the task entries.
let response = backend
.head(HeadRequest {
@ -777,7 +779,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
Status::internal(err.to_string())
},
)?),
timeout,
timeout: self.config.download.piece_timeout,
client_cert: None,
object_storage: request.object_storage.clone(),
hdfs: request.hdfs.clone(),
@ -1296,6 +1298,39 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
Ok(Response::new(task))
}
/// DownloadCacheTaskStream is the stream of the download cache task response.
type DownloadCacheTaskStream = ReceiverStream<Result<DownloadCacheTaskResponse, Status>>;
/// download_cache_task tells the dfdaemon to download the cache task.
#[instrument(
skip_all,
fields(host_id, task_id, peer_id, url, remote_ip, content_length)
)]
async fn download_cache_task(
&self,
_request: Request<DownloadCacheTaskRequest>,
) -> Result<Response<Self::DownloadCacheTaskStream>, Status> {
todo!();
}
/// stat_cache_task gets the status of the cache task.
#[instrument(skip_all, fields(host_id, task_id, remote_pi, local_only))]
async fn stat_cache_task(
&self,
_request: Request<DfdaemonStatCacheTaskRequest>,
) -> Result<Response<CacheTask>, Status> {
todo!();
}
/// delete_cache_task calls the dfdaemon to delete the cache task.
#[instrument(skip_all, fields(host_id, task_id, remote_ip))]
async fn delete_cache_task(
&self,
_request: Request<DeleteCacheTaskRequest>,
) -> Result<Response<()>, Status> {
todo!();
}
}
/// DfdaemonDownloadClient is a wrapper of DfdaemonDownloadGRPCClient.

View File

@ -24,20 +24,22 @@ use crate::metrics::{
};
use crate::resource::{persistent_cache_task, task};
use crate::shutdown;
use bytesize::MB;
use dragonfly_api::common::v2::{
Host, Network, PersistentCacheTask, Piece, Priority, Task, TaskType,
CacheTask, Host, Network, PersistentCacheTask, Piece, Priority, Task, TaskType,
};
use dragonfly_api::dfdaemon::v2::{
dfdaemon_upload_client::DfdaemonUploadClient as DfdaemonUploadGRPCClient,
dfdaemon_upload_server::{DfdaemonUpload, DfdaemonUploadServer as DfdaemonUploadGRPCServer},
DeletePersistentCacheTaskRequest, DeleteTaskRequest, DownloadPersistentCachePieceRequest,
DeleteCacheTaskRequest, DeletePersistentCacheTaskRequest, DeleteTaskRequest,
DownloadCachePieceRequest, DownloadCachePieceResponse, DownloadCacheTaskRequest,
DownloadCacheTaskResponse, DownloadPersistentCachePieceRequest,
DownloadPersistentCachePieceResponse, DownloadPersistentCacheTaskRequest,
DownloadPersistentCacheTaskResponse, DownloadPieceRequest, DownloadPieceResponse,
DownloadTaskRequest, DownloadTaskResponse, ExchangeIbVerbsQueuePairEndpointRequest,
ExchangeIbVerbsQueuePairEndpointResponse, StatPersistentCacheTaskRequest, StatTaskRequest,
SyncHostRequest, SyncPersistentCachePiecesRequest, SyncPersistentCachePiecesResponse,
SyncPiecesRequest, SyncPiecesResponse, UpdatePersistentCacheTaskRequest,
ExchangeIbVerbsQueuePairEndpointResponse, StatCacheTaskRequest, StatPersistentCacheTaskRequest,
StatTaskRequest, SyncCachePiecesRequest, SyncCachePiecesResponse, SyncHostRequest,
SyncPersistentCachePiecesRequest, SyncPersistentCachePiecesResponse, SyncPiecesRequest,
SyncPiecesResponse, UpdatePersistentCacheTaskRequest,
};
use dragonfly_api::errordetails::v2::Backend;
use dragonfly_client_config::dfdaemon::Config;
@ -48,14 +50,13 @@ use dragonfly_client_core::{
use dragonfly_client_util::{
http::{get_range, hashmap_to_headermap, headermap_to_hashmap},
id_generator::TaskIDParameter,
net::{get_interface_info, Interface},
net::Interface,
};
use opentelemetry::Context;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use sysinfo::Networks;
use tokio::io::AsyncReadExt;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
@ -67,7 +68,7 @@ use tonic::{
Code, Request, Response, Status,
};
use tower::ServiceBuilder;
use tracing::{error, info, instrument, Instrument, Span};
use tracing::{debug, error, info, instrument, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use url::Url;
@ -87,6 +88,9 @@ pub struct DfdaemonUploadServer {
/// persistent_cache_task is the persistent cache task manager.
persistent_cache_task: Arc<persistent_cache_task::PersistentCacheTask>,
/// interface is the network interface.
interface: Arc<Interface>,
/// shutdown is used to shutdown the grpc server.
shutdown: shutdown::Shutdown,
@ -102,6 +106,7 @@ impl DfdaemonUploadServer {
addr: SocketAddr,
task: Arc<task::Task>,
persistent_cache_task: Arc<persistent_cache_task::PersistentCacheTask>,
interface: Arc<Interface>,
shutdown: shutdown::Shutdown,
shutdown_complete_tx: mpsc::UnboundedSender<()>,
) -> Self {
@ -109,6 +114,7 @@ impl DfdaemonUploadServer {
config,
addr,
task,
interface,
persistent_cache_task,
shutdown,
_shutdown_complete: shutdown_complete_tx,
@ -117,16 +123,12 @@ impl DfdaemonUploadServer {
/// run starts the upload server.
pub async fn run(&mut self, grpc_server_started_barrier: Arc<Barrier>) -> ClientResult<()> {
// Initialize the grpc service.
let interface =
get_interface_info(self.config.host.ip.unwrap(), self.config.upload.rate_limit);
let service = DfdaemonUploadGRPCServer::with_interceptor(
DfdaemonUploadServerHandler {
interface,
socket_path: self.config.download.server.socket_path.clone(),
task: self.task.clone(),
persistent_cache_task: self.persistent_cache_task.clone(),
interface: self.interface.clone(),
},
ExtractTracingInterceptor,
);
@ -165,7 +167,7 @@ impl DfdaemonUploadServer {
.http2_keepalive_interval(Some(super::HTTP2_KEEP_ALIVE_INTERVAL))
.http2_keepalive_timeout(Some(super::HTTP2_KEEP_ALIVE_TIMEOUT))
.layer(rate_limit_layer)
.add_service(reflection.clone())
.add_service(reflection)
.add_service(health_service)
.add_service(service)
.serve_with_shutdown(self.addr, async move {
@ -202,9 +204,6 @@ impl DfdaemonUploadServer {
/// DfdaemonUploadServerHandler is the handler of the dfdaemon upload grpc service.
pub struct DfdaemonUploadServerHandler {
/// interface is the network interface.
interface: Option<Interface>,
/// socket_path is the path of the unix domain socket.
socket_path: PathBuf,
@ -213,6 +212,9 @@ pub struct DfdaemonUploadServerHandler {
/// persistent_cache_task is the persistent cache task manager.
persistent_cache_task: Arc<persistent_cache_task::PersistentCacheTask>,
/// interface is the network interface.
interface: Arc<Interface>,
}
/// DfdaemonUploadServerHandler implements the dfdaemon upload grpc service.
@ -637,7 +639,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
}
/// stat_task stats the task.
#[instrument(skip_all, fields(host_id, task_id, remote_ip))]
#[instrument(skip_all, fields(host_id, task_id, remote_ip, local_only))]
async fn stat_task(&self, request: Request<StatTaskRequest>) -> Result<Response<Task>, Status> {
// If the parent context is set, use it as the parent context for the span.
if let Some(parent_ctx) = request.extensions().get::<Context>() {
@ -653,6 +655,9 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
// Get the task id from the request.
let task_id = request.task_id;
// Get the local_only flag from the request, default to false.
let local_only = request.local_only;
// Span record the host id and task id.
Span::current().record("host_id", host_id.as_str());
Span::current().record("task_id", task_id.as_str());
@ -660,25 +665,34 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
"remote_ip",
request.remote_ip.clone().unwrap_or_default().as_str(),
);
Span::current().record("local_only", local_only.to_string().as_str());
info!("stat task in upload server");
// Collect the stat task metrics.
collect_stat_task_started_metrics(TaskType::Standard as i32);
// Get the task from the scheduler.
let task = self
match self
.task
.stat(task_id.as_str(), host_id.as_str())
.stat(task_id.as_str(), host_id.as_str(), local_only)
.await
.map_err(|err| {
{
Ok(task) => Ok(Response::new(task)),
Err(err) => {
// Collect the stat task failure metrics.
collect_stat_task_failure_metrics(TaskType::Standard as i32);
error!("stat task: {}", err);
Status::internal(err.to_string())
})?;
// Log the error with detailed context.
error!("stat task failed: {}", err);
Ok(Response::new(task))
// Map the error to an appropriate gRPC status.
Err(match err {
ClientError::TaskNotFound(id) => {
Status::not_found(format!("task not found: {}", id))
}
_ => Status::internal(err.to_string()),
})
}
}
}
/// delete_task deletes the task.
@ -988,9 +1002,6 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
Span::current().set_parent(parent_ctx.clone());
};
// DEFAULT_HOST_INFO_REFRESH_INTERVAL is the default interval for refreshing the host info.
const DEFAULT_HOST_INFO_REFRESH_INTERVAL: Duration = Duration::from_millis(500);
// Clone the request.
let request = request.into_inner();
@ -1012,42 +1023,42 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
// Get local interface.
let interface = self.interface.clone();
// DEFAULT_HOST_INFO_REFRESH_INTERVAL is the default interval for refreshing the host info.
const DEFAULT_HOST_INFO_REFRESH_INTERVAL: Duration = Duration::from_millis(500);
// Initialize stream channel.
let (out_stream_tx, out_stream_rx) = mpsc::channel(10 * 1024);
tokio::spawn(
async move {
// Initialize sysinfo network.
let mut networks = Networks::new_with_refreshed_list();
// Start the host info update loop.
loop {
// Sleep to calculate the network traffic difference over
// the DEFAULT_HOST_INFO_REFRESH_INTERVAL.
// Wait for the host info refresh interval.
tokio::time::sleep(DEFAULT_HOST_INFO_REFRESH_INTERVAL).await;
// Refresh network information.
networks.refresh();
// Init response.
let mut host = Host::default();
if let Some(interface) = &interface {
if let Some(network_data) = networks.get(&interface.name) {
host.network = Some(Network {
download_rate: network_data.received()
/ DEFAULT_HOST_INFO_REFRESH_INTERVAL.as_secs(),
// Convert bandwidth to bytes per second.
download_rate_limit: interface.bandwidth / 8 * MB,
upload_rate: network_data.transmitted()
/ DEFAULT_HOST_INFO_REFRESH_INTERVAL.as_secs(),
// Convert bandwidth to bytes per second.
upload_rate_limit: interface.bandwidth / 8 * MB,
..Default::default()
});
};
}
// Wait for getting the network data.
let network_data = interface.get_network_data().await;
debug!(
"network data: rx bandwidth {}/{} bps, tx bandwidth {}/{} bps",
network_data.rx_bandwidth.unwrap_or(0),
network_data.max_rx_bandwidth,
network_data.tx_bandwidth.unwrap_or(0),
network_data.max_tx_bandwidth
);
// Send host info.
match out_stream_tx.send(Ok(host.clone())).await {
match out_stream_tx
.send(Ok(Host {
network: Some(Network {
max_rx_bandwidth: network_data.max_rx_bandwidth,
rx_bandwidth: network_data.rx_bandwidth,
max_tx_bandwidth: network_data.max_tx_bandwidth,
tx_bandwidth: network_data.tx_bandwidth,
..Default::default()
}),
..Default::default()
}))
.await
{
Ok(_) => {}
Err(err) => {
error!(
@ -1055,7 +1066,7 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
remote_host_id, err
);
break;
return;
}
};
}
@ -1680,6 +1691,63 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
) -> Result<Response<ExchangeIbVerbsQueuePairEndpointResponse>, Status> {
unimplemented!()
}
/// DownloadCacheTaskStream is the stream of the download cache task response.
type DownloadCacheTaskStream = ReceiverStream<Result<DownloadCacheTaskResponse, Status>>;
/// download_cache_task downloads the cache task.
#[instrument(
skip_all,
fields(host_id, task_id, peer_id, url, remote_ip, content_length)
)]
async fn download_cache_task(
&self,
_request: Request<DownloadCacheTaskRequest>,
) -> Result<Response<Self::DownloadCacheTaskStream>, Status> {
todo!();
}
/// stat_cache_task stats the cache task.
#[instrument(skip_all, fields(host_id, task_id, remote_ip, local_only))]
async fn stat_cache_task(
&self,
_request: Request<StatCacheTaskRequest>,
) -> Result<Response<CacheTask>, Status> {
todo!();
}
/// delete_cache_task deletes the cache task.
#[instrument(skip_all, fields(host_id, task_id, remote_ip))]
async fn delete_cache_task(
&self,
_request: Request<DeleteCacheTaskRequest>,
) -> Result<Response<()>, Status> {
todo!();
}
/// SyncCachePiecesStream is the stream of the sync cache pieces response.
type SyncCachePiecesStream = ReceiverStream<Result<SyncCachePiecesResponse, Status>>;
/// sync_cache_pieces provides the cache piece metadata for parent.
#[instrument(skip_all, fields(host_id, remote_host_id, task_id))]
async fn sync_cache_pieces(
&self,
_request: Request<SyncCachePiecesRequest>,
) -> Result<Response<Self::SyncCachePiecesStream>, Status> {
todo!();
}
/// download_cache_piece provides the cache piece content for parent.
#[instrument(
skip_all,
fields(host_id, remote_host_id, task_id, piece_id, piece_length)
)]
async fn download_cache_piece(
&self,
_request: Request<DownloadCachePieceRequest>,
) -> Result<Response<DownloadCachePieceResponse>, Status> {
todo!();
}
}
/// DfdaemonUploadClient is a wrapper of DfdaemonUploadGRPCClient.

View File

@ -66,19 +66,28 @@ pub const DRAGONFLY_OUTPUT_PATH_HEADER: &str = "X-Dragonfly-Output-Path";
/// For more details refer to https://github.com/dragonflyoss/design/blob/main/systems-analysis/file-download-workflow-with-hard-link/README.md.
pub const DRAGONFLY_FORCE_HARD_LINK_HEADER: &str = "X-Dragonfly-Force-Hard-Link";
/// DRAGONFLY_PIECE_LENGTH is the header key of piece length in http request.
/// DRAGONFLY_PIECE_LENGTH_HEADER is the header key of piece length in http request.
/// If the value is set, the piece length will be used to download the file.
/// Different piece length will generate different task id. The value needs to
/// be set with human readable format and needs to be greater than or equal
/// to 4mib, for example: 4mib, 1gib
pub const DRAGONFLY_PIECE_LENGTH: &str = "X-Dragonfly-Piece-Length";
pub const DRAGONFLY_PIECE_LENGTH_HEADER: &str = "X-Dragonfly-Piece-Length";
/// DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID is the header key of content for calculating task id.
/// If DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID is set, use its value to calculate the task ID.
/// DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID_HEADER is the header key of content for calculating task id.
/// If DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID_HEADER is set, use its value to calculate the task ID.
/// Otherwise, calculate the task ID based on `url`, `piece_length`, `tag`, `application`, and `filtered_query_params`.
pub const DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID: &str =
pub const DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID_HEADER: &str =
"X-Dragonfly-Content-For-Calculating-Task-ID";
/// DRAGONFLY_TASK_DOWNLOAD_FINISHED_HEADER is the response header key to indicate whether the task download finished.
/// When the task download is finished, the response will include this header with the value `"true"`,
/// indicating that the download hit the local cache.
pub const DRAGONFLY_TASK_DOWNLOAD_FINISHED_HEADER: &str = "X-Dragonfly-Task-Download-Finished";
/// DRAGONFLY_TASK_ID_HEADER is the response header key of task id. Client will calculate the task ID
/// based on `url`, `piece_length`, `tag`, `application`, and `filtered_query_params`.
pub const DRAGONFLY_TASK_ID_HEADER: &str = "X-Dragonfly-Task-ID";
/// get_tag gets the tag from http header.
pub fn get_tag(header: &HeaderMap) -> Option<String> {
header
@ -193,7 +202,7 @@ pub fn get_force_hard_link(header: &HeaderMap) -> bool {
/// get_piece_length gets the piece length from http header.
pub fn get_piece_length(header: &HeaderMap) -> Option<ByteSize> {
match header.get(DRAGONFLY_PIECE_LENGTH) {
match header.get(DRAGONFLY_PIECE_LENGTH_HEADER) {
Some(piece_length) => match piece_length.to_str() {
Ok(piece_length) => match piece_length.parse::<ByteSize>() {
Ok(piece_length) => Some(piece_length),
@ -214,7 +223,7 @@ pub fn get_piece_length(header: &HeaderMap) -> Option<ByteSize> {
/// get_content_for_calculating_task_id gets the content for calculating task id from http header.
pub fn get_content_for_calculating_task_id(header: &HeaderMap) -> Option<String> {
header
.get(DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID)
.get(DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID_HEADER)
.and_then(|content| content.to_str().ok())
.map(|content| content.to_string())
}
@ -359,16 +368,22 @@ mod tests {
#[test]
fn test_get_piece_length() {
let mut headers = HeaderMap::new();
headers.insert(DRAGONFLY_PIECE_LENGTH, HeaderValue::from_static("4mib"));
headers.insert(
DRAGONFLY_PIECE_LENGTH_HEADER,
HeaderValue::from_static("4mib"),
);
assert_eq!(get_piece_length(&headers), Some(ByteSize::mib(4)));
let empty_headers = HeaderMap::new();
assert_eq!(get_piece_length(&empty_headers), None);
headers.insert(DRAGONFLY_PIECE_LENGTH, HeaderValue::from_static("invalid"));
headers.insert(
DRAGONFLY_PIECE_LENGTH_HEADER,
HeaderValue::from_static("invalid"),
);
assert_eq!(get_piece_length(&headers), None);
headers.insert(DRAGONFLY_PIECE_LENGTH, HeaderValue::from_static("0"));
headers.insert(DRAGONFLY_PIECE_LENGTH_HEADER, HeaderValue::from_static("0"));
assert_eq!(get_piece_length(&headers), Some(ByteSize::b(0)));
}
@ -376,7 +391,7 @@ mod tests {
fn test_get_content_for_calculating_task_id() {
let mut headers = HeaderMap::new();
headers.insert(
DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID,
DRAGONFLY_CONTENT_FOR_CALCULATING_TASK_ID_HEADER,
HeaderValue::from_static("test-content"),
);
assert_eq!(

View File

@ -231,7 +231,7 @@ impl Proxy {
}
/// handler handles the request from the client.
#[instrument(skip_all, fields(uri, method, remote_ip))]
#[instrument(skip_all, fields(url, method, remote_ip))]
pub async fn handler(
config: Arc<Config>,
task: Arc<Task>,
@ -241,8 +241,8 @@ pub async fn handler(
server_ca_cert: Arc<Option<Certificate>>,
remote_ip: std::net::IpAddr,
) -> ClientResult<Response> {
// Span record the uri and method.
Span::current().record("uri", request.uri().to_string().as_str());
// Span record the url and method.
Span::current().record("url", request.uri().to_string().as_str());
Span::current().record("method", request.method().as_str());
Span::current().record("remote_ip", remote_ip.to_string().as_str());
@ -556,7 +556,7 @@ async fn upgraded_tunnel(
/// upgraded_handler handles the upgraded https request from the client.
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(uri, method))]
#[instrument(skip_all, fields(url, method))]
pub async fn upgraded_handler(
config: Arc<Config>,
task: Arc<Task>,
@ -567,8 +567,8 @@ pub async fn upgraded_handler(
dfdaemon_download_client: DfdaemonDownloadClient,
registry_cert: Arc<Option<Vec<CertificateDer<'static>>>>,
) -> ClientResult<Response> {
// Span record the uri and method.
Span::current().record("uri", request.uri().to_string().as_str());
// Span record the url and method.
Span::current().record("url", request.uri().to_string().as_str());
Span::current().record("method", request.method().as_str());
// Authenticate the request with the basic auth.
@ -768,7 +768,10 @@ async fn proxy_via_dfdaemon(
// Construct the response.
let mut response = Response::new(boxed_body);
*response.headers_mut() = make_response_headers(download_task_started_response.clone())?;
*response.headers_mut() = make_response_headers(
message.task_id.as_str(),
download_task_started_response.clone(),
)?;
*response.status_mut() = http::StatusCode::OK;
// Return the response if the client return the first piece.
@ -1099,7 +1102,6 @@ fn make_download_task_request(
hdfs: None,
is_prefetch: false,
need_piece_content: false,
load_to_cache: false,
force_hard_link: header::get_force_hard_link(&header),
content_for_calculating_task_id: header::get_content_for_calculating_task_id(&header),
remote_ip: Some(remote_ip.to_string()),
@ -1151,6 +1153,7 @@ fn make_download_url(
/// make_response_headers makes the response headers.
fn make_response_headers(
task_id: &str,
mut download_task_started_response: DownloadTaskStartedResponse,
) -> ClientResult<hyper::header::HeaderMap> {
// Insert the content range header to the response header.
@ -1171,6 +1174,18 @@ fn make_response_headers(
);
};
if download_task_started_response.is_finished {
download_task_started_response.response_header.insert(
header::DRAGONFLY_TASK_DOWNLOAD_FINISHED_HEADER.to_string(),
"true".to_string(),
);
}
download_task_started_response.response_header.insert(
header::DRAGONFLY_TASK_ID_HEADER.to_string(),
task_id.to_string(),
);
hashmap_to_headermap(&download_task_started_response.response_header)
}

View File

@ -144,6 +144,11 @@ impl Piece {
self.storage.get_piece(piece_id)
}
/// get_all gets all pieces of a task from the local storage.
pub fn get_all(&self, task_id: &str) -> Result<Vec<metadata::Piece>> {
self.storage.get_pieces(task_id)
}
/// calculate_interested calculates the interested pieces by content_length and range.
pub fn calculate_interested(
&self,
@ -407,7 +412,6 @@ impl Piece {
length: u64,
parent: piece_collector::CollectedParent,
is_prefetch: bool,
load_to_cache: bool,
) -> Result<metadata::Piece> {
// Span record the piece_id.
Span::current().record("piece_id", piece_id);
@ -472,7 +476,6 @@ impl Piece {
digest.as_str(),
parent.id.as_str(),
&mut reader,
load_to_cache,
self.config.storage.write_piece_timeout,
)
.await
@ -510,7 +513,6 @@ impl Piece {
length: u64,
request_header: HeaderMap,
is_prefetch: bool,
load_to_cache: bool,
object_storage: Option<ObjectStorage>,
hdfs: Option<Hdfs>,
) -> Result<metadata::Piece> {
@ -636,7 +638,6 @@ impl Piece {
offset,
length,
&mut response.reader,
load_to_cache,
self.config.storage.write_piece_timeout,
)
.await

View File

@ -20,7 +20,8 @@ use crate::metrics::{
collect_backend_request_started_metrics,
};
use dragonfly_api::common::v2::{
Download, Hdfs, ObjectStorage, Peer, Piece, Task as CommonTask, TrafficType,
Download, Hdfs, ObjectStorage, Peer, Piece, SizeScope, Task as CommonTask, TaskType,
TrafficType,
};
use dragonfly_api::dfdaemon::{
self,
@ -48,6 +49,7 @@ use dragonfly_client_util::{
id_generator::IDGenerator,
};
use reqwest::header::HeaderMap;
use std::collections::HashMap;
use std::path::Path;
use std::sync::{
atomic::{AtomicBool, Ordering},
@ -246,13 +248,7 @@ impl Task {
let task = self
.storage
.download_task_started(
id,
piece_length,
content_length,
response.http_header,
request.load_to_cache,
)
.download_task_started(id, piece_length, content_length, response.http_header)
.await;
// Attempt to create a hard link from the task file to the output path.
@ -392,6 +388,7 @@ impl Task {
range: request.range,
response_header: task.response_header.clone(),
pieces,
is_finished: task.is_finished(),
},
),
),
@ -737,7 +734,6 @@ impl Task {
remaining_interested_pieces.clone(),
request.is_prefetch,
request.need_piece_content,
request.load_to_cache,
download_progress_tx.clone(),
in_stream_tx.clone(),
)
@ -981,7 +977,6 @@ impl Task {
interested_pieces: Vec<metadata::Piece>,
is_prefetch: bool,
need_piece_content: bool,
load_to_cache: bool,
download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
in_stream_tx: Sender<AnnouncePeerRequest>,
) -> ClientResult<Vec<metadata::Piece>> {
@ -1042,7 +1037,6 @@ impl Task {
finished_pieces: Arc<Mutex<Vec<metadata::Piece>>>,
is_prefetch: bool,
need_piece_content: bool,
load_to_cache: bool,
) -> ClientResult<metadata::Piece> {
// Limit the concurrent piece count.
let _permit = semaphore.acquire().await.unwrap();
@ -1063,7 +1057,6 @@ impl Task {
length,
parent.clone(),
is_prefetch,
load_to_cache,
)
.await
.map_err(|err| {
@ -1197,7 +1190,6 @@ impl Task {
finished_pieces.clone(),
is_prefetch,
need_piece_content,
load_to_cache,
)
.in_current_span(),
);
@ -1311,7 +1303,6 @@ impl Task {
request_header: HeaderMap,
is_prefetch: bool,
need_piece_content: bool,
load_to_cache: bool,
piece_manager: Arc<piece::Piece>,
semaphore: Arc<Semaphore>,
download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
@ -1335,7 +1326,6 @@ impl Task {
length,
request_header,
is_prefetch,
load_to_cache,
object_storage,
hdfs,
)
@ -1440,7 +1430,6 @@ impl Task {
request_header.clone(),
request.is_prefetch,
request.need_piece_content,
request.load_to_cache,
self.piece.clone(),
semaphore.clone(),
download_progress_tx.clone(),
@ -1707,7 +1696,6 @@ impl Task {
length: u64,
request_header: HeaderMap,
is_prefetch: bool,
load_to_cache: bool,
piece_manager: Arc<piece::Piece>,
semaphore: Arc<Semaphore>,
download_progress_tx: Sender<Result<DownloadTaskResponse, Status>>,
@ -1730,7 +1718,6 @@ impl Task {
length,
request_header,
is_prefetch,
load_to_cache,
object_storage,
hdfs,
)
@ -1789,7 +1776,6 @@ impl Task {
interested_piece.length,
request_header.clone(),
request.is_prefetch,
request.load_to_cache,
self.piece.clone(),
semaphore.clone(),
download_progress_tx.clone(),
@ -1835,7 +1821,74 @@ impl Task {
/// stat_task returns the task metadata.
#[instrument(skip_all)]
pub async fn stat(&self, task_id: &str, host_id: &str) -> ClientResult<CommonTask> {
pub async fn stat(
&self,
task_id: &str,
host_id: &str,
local_only: bool,
) -> ClientResult<CommonTask> {
if local_only {
let Some(task_metadata) = self.storage.get_task(task_id).inspect_err(|err| {
error!("get task {} from local storage error: {:?}", task_id, err);
})?
else {
return Err(Error::TaskNotFound(task_id.to_owned()));
};
let piece_metadatas = self.piece.get_all(task_id).inspect_err(|err| {
error!(
"get pieces for task {} from local storage error: {:?}",
task_id, err
);
})?;
let pieces = piece_metadatas
.into_iter()
.filter(|piece| piece.is_finished())
.map(|piece| {
// The traffic_type indicates whether the first download was from the source or hit the remote peer cache.
// If the parent_id exists, the piece was downloaded from a remote peer. Otherwise, it was
// downloaded from the source.
let traffic_type = match piece.parent_id {
None => TrafficType::BackToSource,
Some(_) => TrafficType::RemotePeer,
};
Piece {
number: piece.number,
parent_id: piece.parent_id.clone(),
offset: piece.offset,
length: piece.length,
digest: piece.digest.clone(),
content: None,
traffic_type: Some(traffic_type as i32),
cost: piece.prost_cost(),
created_at: Some(prost_wkt_types::Timestamp::from(piece.created_at)),
}
})
.collect::<Vec<Piece>>();
return Ok(CommonTask {
id: task_metadata.id,
r#type: TaskType::Standard as i32,
url: String::new(),
digest: None,
tag: None,
application: None,
filtered_query_params: Vec::new(),
request_header: HashMap::new(),
content_length: task_metadata.content_length.unwrap_or(0),
piece_count: pieces.len() as u32,
size_scope: SizeScope::Normal as i32,
pieces,
state: String::new(),
peer_count: 0,
has_available_peer: false,
created_at: Some(prost_wkt_types::Timestamp::from(task_metadata.created_at)),
updated_at: Some(prost_wkt_types::Timestamp::from(task_metadata.updated_at)),
});
}
let task = self
.scheduler_client
.stat_task(StatTaskRequest {
@ -1916,7 +1969,7 @@ mod tests {
// Create a task and save it to storage.
let task_id = "test-task-id";
storage
.download_task_started(task_id, 1024, 4096, None, false)
.download_task_started(task_id, 1024, 4096, None)
.await
.unwrap();

View File

@ -102,8 +102,16 @@ pub fn init_tracing(
let env_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::default().add_directive(log_level.into()));
// Enable console subscriber layer for tracing spawn tasks on `127.0.0.1:6669` when log level is TRACE.
let console_subscriber_layer = if log_level == Level::TRACE {
Some(console_subscriber::spawn())
} else {
None
};
let subscriber = Registry::default()
.with(env_filter)
.with(console_subscriber_layer)
.with(file_logging_layer)
.with(stdout_logging_layer);