Compare commits

...

77 Commits

Author SHA1 Message Date
Yan Song ef2033c2e2 builder: support `--parent-bootstrap` for merge
This option allows merging multiple bootstraps of upper layer with
the bootstrap of a parent image, so that we can implement container
commit operation for nydus image.

Signed-off-by: Yan Song <imeoer@linux.alibaba.com>
2023-03-09 10:06:56 +00:00
Yan Song 2a83e06e50 nydusify: enable pigz by default
We should use pigz for supporting parallel gzip decompression, so that
improve the conversion speed when unpack gzip layer for source image.

We still allow users to specify the env `CONTAINERD_DISABLE_PIGZ=1` to
disable the feature when encounter any decompression error.

See 33c0eafb17/archive/compression/compression.go (L261)

Signed-off-by: Yan Song <imeoer@linux.alibaba.com>
2023-02-28 07:02:07 +00:00
imeoer c69a43eeeb
Merge pull request #1106 from jiangliu/rafs-entry-name-v2.1
rafs: fix a bug in calculate offset for dirent name
2023-02-24 15:35:40 +08:00
Jiang Liu a8ef7ea415 rafs: fix a bug in calculate offset for dirent name
There's a bug in calculate offset and size for RAFS v6 Dirent name,
it will be treat as 0 instead of 4096 if the last block is 4096 bytes.

Fixes: https://github.com/dragonflyoss/image-service/issues/1098

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
2023-02-24 15:29:01 +08:00
imeoer 9eccf9b5fe
Merge pull request #1095 from jiangliu/v2.1-compat
rafs: reserve bits in RafsSuperFlags to be compatible with v2.2
2023-02-19 23:23:37 +08:00
Jiang Liu f6bd98fcdd rafs: reserve bits in RafsSuperFlags to be compatible with v2.2
Reserve bits in RafsSuperFlags so images generated by nydus 2.2 can be
mounted by v2.1.4 and later.

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
2023-02-19 23:06:47 +08:00
Jiang Liu 876fb68f15
Merge pull request #1058 from yqleng1987/add-bats-test-for-stable-branch
add e2e testcases for ci for stable branch
2023-02-06 23:18:55 +08:00
Yiqun Leng 99322a0d5a add e2e testcases for ci for stable branch
These cases have been merged into master branch and the difference is
excluding zran format case since stable branch doesn't support zran for now.

Signed-off-by: Yiqun Leng <yqleng@linux.alibaba.com>
2023-02-06 18:42:02 +08:00
Jiang Liu 24c3bb9ab2
Merge pull request #1017 from jiangliu/v6-underflow-2.1
rafs: fix a underflow bug in rafs v6 implementation
2023-01-18 18:27:10 +08:00
Jiang Liu e3621f5397 rafs: fix a underflow bug in rafs v6 implementation
Fix a underflow bug in find_target_block() of RAFS v6. `last` is usize
instead of isize so it may underflow.

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
2023-01-18 17:42:54 +08:00
Jiang Liu 1916c76767
Merge pull request #967 from imeoer/stable/v2.1-cherry-pick
[backport] nydusify: some minor fixups
2022-12-19 21:53:01 +08:00
Yan Song 876571ba4e nydusify: fix a http fallback case for build cache
When the `--source/--target` options specified by the users
is targeting the https registry, but `--build-cache` option
is targeting the http registry, nydusify can't fallback to
plain http for build cache registry, it causing a pull/push
failure for the build cache image.

This patch fixed the failure case.

Signed-off-by: Yan Song <imeoer@linux.alibaba.com>
2022-12-19 09:53:39 +00:00
Yan Song c433c44c93 nydusify: fix panic if only --target be specified
nydusify panics when use `nydusify check --target localhost:5000/library/test:nydus`:

```
INFO[2022-12-19T07:24:02Z] Parsing image localhost:5000/library/test:nydus
INFO[2022-12-19T07:24:02Z] trying next host                              error="failed to do request: Head \"https://localhost:5000/v2/library/test/manifests/nydus\": http: server gave HTTP response to HTTPS client" host="localhost:5000"
INFO[2022-12-19T07:24:02Z] Parsing image localhost:5000/library/test:nydus
INFO[2022-12-19T07:24:02Z] Dumping OCI and Nydus manifests to ./output
INFO[2022-12-19T07:24:02Z] Pulling Nydus bootstrap to output/nydus_bootstrap
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0xb363bd]

goroutine 1 [running]:
github.com/dragonflyoss/image-service/contrib/nydusify/pkg/checker.(*Checker).check(0xc000222500, {0x14544f8, 0xc0000a8000})
	/nydus-rs/contrib/nydusify/pkg/checker/checker.go:160 +0xedd
github.com/dragonflyoss/image-service/contrib/nydusify/pkg/checker.(*Checker).Check(0xc000222500, {0x14544f8, 0xc0000a8000})
	/nydus-rs/contrib/nydusify/pkg/checker/checker.go:88 +0xee
main.main.func2(0xc0000bbe40)
	/nydus-rs/contrib/nydusify/cmd/nydusify.go:608 +0x5b1
github.com/urfave/cli/v2.(*Command).Run(0xc000540fc0, 0xc0000bba80)
	/go/pkg/mod/github.com/urfave/cli/v2@v2.3.0/command.go:163 +0x8b8
github.com/urfave/cli/v2.(*App).RunContext(0xc0004e91e0, {0x14544f8, 0xc0000a8000}, {0xc0000ba040, 0x4, 0x4})
	/go/pkg/mod/github.com/urfave/cli/v2@v2.3.0/app.go:313 +0xb2a
github.com/urfave/cli/v2.(*App).Run(0xc0004e91e0, {0xc0000ba040, 0x4, 0x4})
	/go/pkg/mod/github.com/urfave/cli/v2@v2.3.0/app.go:224 +0x75
main.main()
	/nydus-rs/contrib/nydusify/cmd/nydusify.go:885 +0x8d5d
```

This patch checks the target is nil or not first.

Signed-off-by: Yan Song <imeoer@linux.alibaba.com>
2022-12-19 09:53:31 +00:00
Changwei Ge 3c649123cc
Merge pull request #961 from mofishzz/stable/v2.1_fix_v6_lookup
rafs: fix overflow panic of rafs v6 lookup
2022-12-15 11:27:09 +08:00
Huang Jianan d9d82e54bd rafs: fix overflow panic of rafs v6 lookup
The directory for v6 is stored in the following way:
...name1name2
The first subdirectory we got here is ".".

When looking for some file whose ascii value is less than "." like
"*", we need to move forward to the block index -1. This caused
uszie's index "pirvot" to attempt to subtract with overflow and then
panic.

Fixes: 50ca1a1 ("rafs: optimize entry search in rafs v6")
Signed-off-by: Huang Jianan <jnhuang@linux.alibaba.com>
2022-12-15 10:55:27 +08:00
Jiang Liu 820b3782e2
Merge pull request #940 from changweige/2.1-fix-graceful-exit
nydusd: register signal hanlder earlier
2022-12-09 10:27:30 +08:00
Jiang Liu e02fd274d3
Merge pull request #939 from changweige/2.1-pick-prefetch-fix
rafs: fix a bug in fs prefetch
2022-12-09 10:26:30 +08:00
Changwei Ge df8a6f59e7 nydusd: register signal hanlder earlier
Otherwise, it loses a window to gracefully exit
by umounting FUSE

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-12-09 10:12:56 +08:00
Jiang Liu 92c536a214 rafs: fix a bug in fs prefetch
There's a bug which skips data chunks at blob tail.

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
2022-12-09 09:33:08 +08:00
imeoer 0b286cb3f5
Merge pull request #936 from changweige/2.1-backoff-retry
2.1 backoff retry
2022-12-08 16:56:03 +08:00
Changwei Ge db8760aa1a nydusctl: refine nydusctl general information print messages
Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-12-08 16:38:21 +08:00
Changwei Ge 10accd6284 storage: introduce BackOff delayer to mitigate backend presure
Don't retry immediately since registry can return "too many requests"
error. We'd better be slow to retry.

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-12-08 16:34:51 +08:00
Jiang Liu fad6d17130
Merge pull request #914 from imeoer/backport-auto-scheme
[backport] nydusd: automatically retry registry http scheme
2022-12-02 15:51:56 +08:00
Yan Song 6f7c8e5a20 storage: enhance retryable registry http scheme
Check the tls connection error with `wrong version number` keywords,
it's more reliable than the specific error code.

Signed-off-by: Yan Song <imeoer@linux.alibaba.com>
2022-12-02 06:59:53 +00:00
Wenyu Huang 7e4e33becf nydusd: automatically retry registry http scheme
Signed-off-by: Wenyu Huang <huangwenyuu@outlook.com>
2022-12-02 06:59:45 +00:00
imeoer f3580b581f
Merge pull request #899 from changweige/2.1-enrich-nydusctl
Let nydusd show prefetch bandwidth and latency
2022-11-30 14:04:13 +08:00
Changwei Ge b2fa20d5f8 nydusctl: show prefetch latency and bandwidth
Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-11-30 13:46:34 +08:00
Changwei Ge 5ced20e5ce nydusctl: adapt renaming prefetch_mr_count to prefetch_requests_count
Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-11-30 13:46:34 +08:00
Jiang Liu 2eb90d9eb9
Merge pull request #892 from changweige/2.1-prefetch-metrcis
2.1 prefetch metrics
2022-11-30 12:17:03 +08:00
Jiang Liu 4119e1c34f
Merge pull request #887 from changweige/2.1-v6-blob-prefetch
rafs: prefetch based on blob chunks rather than files
2022-11-29 10:44:16 +08:00
imeoer f2e8a9b5e2
Merge pull request #898 from changweige/2.1-port-nydusify
2.1 port nydusify
2022-11-28 11:53:26 +08:00
Changwei Ge 852bdc2aab nydusify: add a parameter to change chunk size
Public the parameter to end users.

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-11-28 11:34:19 +08:00
Changwei Ge fb0e8d13d8 nydusify: add CLI parameter --compressor to control nydus-image
It has been proved that zstd has a smaller image size. We
should provide user a option to use zstd as nydus image compressor
to reduce image size.

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-11-28 11:34:15 +08:00
Changwei Ge 9b0a538d83 rafs: prefetch based on blob chunks rather than files
Perform different policy for v5 format and v6 format as rafs v6's blobs are capable to
to download chunks and decompress them all by themselves. For rafs v6, directly perform
chunk based full prefetch to reduce requests to container registry and
P2P cluster.

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-11-28 11:19:43 +08:00
Changwei Ge c7b3f89b2e metrics: rename prefetch_mr_count to prefetch_requests_count
Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-11-28 10:41:00 +08:00
Changwei Ge fc69c331ac metrics/cache: add more prefetch related metrics
Record prefech request average latency.
Calculate prefetch average bandwidth.

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-11-28 10:29:39 +08:00
Changwei Ge bbcb0bffa3 metrics: add method set() to initialize the metric
Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-11-24 12:11:19 +08:00
Jiang Liu 3efd75ae6a
Merge pull request #880 from changweige/2.1-fix-frequent-retry
fix too frequent retry
2022-11-22 18:48:19 +08:00
Changwei Ge 8684dac117 storage: fix too frequent retry when blob prefetch fails
tick() will complete when the next instant reaches which is
a very short time rather than 1 second.

In addition limit the total retry times

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-11-22 14:30:02 +08:00
Changwei Ge aa989d0264 storage: change error type if meta file is not found
ENOENT would be more suggestive.

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-11-22 14:15:57 +08:00
Jiang Liu b8460d14d4
Merge pull request #877 from changweige/port-update-uhttp
cargo: update version of dbs-uhttp
2022-11-21 15:34:15 +08:00
imeoer 48a7a74143
Merge pull request #872 from changweige/port-nydusify-version
nydusify: beautify version print message of nydusify
2022-11-21 14:50:54 +08:00
Changwei Ge 8d05ba289a cargo: update version of dbs-uhttp
dbs-uhttp has fixed the problem that http client
get EBUSY error when fetching body data from API server.

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-11-21 14:49:33 +08:00
Changwei Ge f47355d376 nydusify: fix a typo in its version message
It should be Revision

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-11-21 14:48:24 +08:00
Changwei Ge b8d57bda3d nydusify: beautify version print message of nydusify
Print more infomations on git version, reversion and golang version

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-11-18 09:27:18 +08:00
Peng Tao 4d2c95793b
Merge pull request #870 from sctb512/backport-fix-print-auth
storage: fix registry to avoid print bearer auth
2022-11-17 18:02:37 +08:00
Bin Tang 4f3da68db0 storage: fix registry to avoid print bearer auth
Signed-off-by: Bin Tang <tangbin.bin@bytedance.com>
2022-11-17 17:56:09 +08:00
imeoer 522791b4b3
Merge pull request #842 from sctb512/backport-fix-mirror-health-check
storage: remove unused code for refreshing registry tokens
2022-11-07 10:04:02 +08:00
Bin Tang ad8b9a7f96 storage: remove unused code for refreshing registry tokens
There is no need to change 'grant_type' for refreshing registry tokens.
Because we use the URL with cached 'grant_type' can get token as well.

Signed-off-by: Bin Tang <tangbin.bin@bytedance.com>
2022-11-06 22:12:08 +08:00
imeoer 2fd7070bf7
Merge pull request #839 from imeoer/stable/v2.1-cherry-pick
[backport to stable/v2.1] storage: add mirror health checking support
2022-11-04 21:35:35 +08:00
Bin Tang a514f66851 storage: fix syntax for mirror health checking
Signed-off-by: Bin Tang <tangbin.bin@bytedance.com>
2022-11-04 13:27:47 +00:00
Bin Tang 34d84cac91 storage: refresh token to avoid forwarding to P2P/dragonfly
Forward 401 response to P2P/dragonfly will affect performance.
When there is a mirror that auth_through false, we refresh the token regularly
to avoid forwarding the 401 response to mirror.

Signed-off-by: Bin Tang <tangbin.bin@bytedance.com>
2022-11-04 13:27:42 +00:00
Bin Tang 41fefcdbae storage: add mirror health checking support
Currently, the mirror is set to unavailable if the failed times reach failure_limit.
We added mirror health checking, which will recover unavailable mirror server.
The failure_limit indicates the failed time at which the mirror is set to unavailable.
The health_check_interval indicates the time interval to recover the unavailable mirror.
The ping_url is the endpoint to check mirror server health.

Signed-off-by: Bin Tang <tangbin.bin@bytedance.com>
2022-11-04 13:27:35 +00:00
imeoer 29a9af49a4
Merge pull request #838 from jiangliu/v2.1.1-pub2
prepare for publishing to crates.io
2022-11-04 21:25:17 +08:00
Jiang Liu 2496bc98f3 release: prepare for publishing to crates.io
Prepare for publishing to crates.io.

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
2022-11-04 21:03:40 +08:00
imeoer 36b4edb638
Merge pull request #834 from imeoer/stable-update-release
action: update release notes for download mirror
2022-11-04 10:26:49 +08:00
Yan Song dd0a0d8522 action: update release notes for download mirror
Signed-off-by: Yan Song <imeoer@linux.alibaba.com>
2022-11-04 02:25:59 +00:00
imeoer 29af7a1267
Merge pull request #830 from changweige/backport-nydusify-drop-label
nydusify: drop label "nydus-blob-ids" from meta layer
2022-11-03 14:04:16 +08:00
Changwei Ge 07788809a2 nydusify: drop label "nydus-blob-ids" from meta layer
Image with layers more than 64 can't be pulled by containerd
since the label is exceeding the label size limitation 4096bytes.

We should figure out another way to do GC in nydus-snapshotter

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-11-03 13:55:35 +08:00
imeoer 405c79de17
Merge pull request #828 from changweige/nydusify-backport-v2.1
Backport 3 patches for stable/v2.1
2022-11-03 09:54:25 +08:00
泰友 c8b21e3529 fix: miss oss file of nydusify packer
Reproduction:

Prepare configuration file used for pack command. {
"bucket_name": "XXX",
"endpoint": "XXX",
"access_key_id": "XXX",
"access_key_secret": "XXX",
"meta_prefix": "nydus_rund_sidecar_meta",
"blob_prefix": "blobs"
}

Pack by nydusify sudo contrib/nydusify/cmd/nydusify pack
--source-dir test
--output-dir tmp
--name ccx-test
--backend-push
--backend-config-file backend-config.json
--backend-type oss
--nydus-image target/debug/nydus-image

Miss blob file and meta file in oss

Problem:

Forgot to CompleteMultipartUpload after chunk uploading.

Fix:

CompleteMultipartUpload to complete uploading.

Signed-off-by: 泰友 <cuichengxu.ccx@antgroup.com>
2022-11-02 19:01:22 +08:00
泰友 67e0cc6f32 refact: use specified object prefix and meta prefix directly
issue: https://github.com/dragonflyoss/image-service/issues/608

Signed-off-by: 泰友 <cuichengxu.ccx@antgroup.com>
2022-11-02 19:01:17 +08:00
泰友 121a108ac9 fix: nydusify pack fail
Reproduction
1. Prepare configuration file used for pack command.
{
    "bucket_name": "XXX",
    "endpoint": "XXX",
    "access_key_id": "XXX",
    "access_key_secret": "XXX",
    "meta_prefix": "nydus_rund_sidecar_meta",
    "blob_prefix": "blobs"
}

2. Pack by nydusify
sudo contrib/nydusify/cmd/nydusify pack \
--source-dir test \
--output-dir tmp \
--name ccx-test \
--backend-push \
--backend-config-file backend-config.json \
--backend-type oss \
--nydus-image target/debug/nydus-image

3. Got error
FATA[2022-10-08T18:06:46+08:00] failed to push pack result to remote: failed to put metafile to remote: split file by part size: open tmp/tmp/ccx-test.meta: no such file or directory

Problem
The path of bootstrap file, which is to upload, is wrong.

Fix
Use imageName as req.Meta, which is bootstrap file to upload.

Signed-off-by: 泰友 <cuichengxu.ccx@antgroup.com>
2022-11-02 18:58:03 +08:00
Jiang Liu e9a774c2ee
Merge pull request #805 from changweige/pytest-stop-v2
action/nydus-test: stop on the first test failure
2022-10-20 14:17:21 +08:00
Changwei Ge 7975d09dc3 action/nydus-test: stop on the first test failure
By default, pytest will continue executing test even
current test fails. It's hard to tell what to happen
on such a environment. And it makes it hard to investigate
the first failed case.

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-10-20 13:35:01 +08:00
Jiang Liu 8c9c73b5b7
Merge pull request #804 from imeoer/bring-auto-version
release: update version on build automatically
2022-10-19 20:18:47 +08:00
Yan Song c7eaa2e858 release: update version on build automatically
We only need to git tag to release a version without modifying
the version field in Cargo.toml and Cargo.lock.

Signed-off-by: Yan Song <imeoer@linux.alibaba.com>
2022-10-19 12:14:28 +00:00
Changwei Ge 6a0bef4ce6
Merge pull request #799 from changweige/backport-patches
Backport some patches for stable/v2.1
2022-10-18 15:18:25 +08:00
Yan Song 8a32d5b61e nydusify: fix overlay error for image with single layer
Nydusify check subcommand will check the consistency of
OCI image and nydus image by mounting (overlayfs or nydusd).

For the OCI image with a single layer, we should use bind
mount instead of overlay to mount rootfs, otherwise an error
will be thrown like:

```
wrong fs type, bad option, bad superblock on overlay, missing
codepage or helper program, or other error.
```

This commit also refine the codes for image.Mount/image.Umount.

Signed-off-by: Yan Song <imeoer@linux.alibaba.com>
2022-10-18 14:48:00 +08:00
Bin Tang 5ac9831130 fix mirror's performance issue
In some scenarios(e.g. P2P/Dragonfly), sending an authorization request
to the mirror will cause performance loss. We add parameter
auth_through. When auth_through is false, nydusd will directly send
non-authorization request to original registry.

Signed-off-by: Bin Tang <tangbin.bin@bytedance.com>
2022-10-18 14:47:39 +08:00
Xin Yin c75d3fbfcf storage: retry timeout chunks for fscache ondemand path
for fscache ondemand path, if some requested chunks are set to pending by
prefetch threads, and wait them timeout, will casue EIO to container side.

retry the timeout chunks on ondemand path, minimize EIOs.

Signed-off-by: Xin Yin <yinxin.x@bytedance.com>
2022-10-18 14:47:14 +08:00
Jiang Liu 91d26745e2
Merge pull request #785 from sctb512/rafsv6-file-parent
nydus-image: fix inspect to get correct path of rafs v6 file
2022-10-17 10:05:32 +08:00
Jiang Liu a51a7185f1
Merge pull request #793 from changweige/fix-v5-prefetch-table
nydus-image/v5: prefetch table should contain inode numbers rather its index
2022-10-14 21:49:40 +08:00
Changwei Ge afaf75cfff nydus-image/v5: prefetch table should contain inode numbers rather its
index

Nydusd is performing prefetch by finding all inodes matching its
inode number.

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-10-14 16:18:56 +08:00
Bin Tang c28585f06f fix inspect to get correct path of rafs v6 file
Because rafs v6 doesn't support get_parent, the prefetch and icheck
command of inspect will cause error. We fixed it by handling
get_file_name get_file_name and path_from_ino for rafs v6 files
separately. This commit does not affect the rafs core code.

Signed-off-by: Bin Tang <tangbin.bin@bytedance.com>
2022-10-14 15:29:19 +08:00
Changwei Ge fd588c918f
Merge pull request #789 from changweige/port-2.1-enlarge-fuse-threads-num
nydusd: enlarge default fuse server threads
2022-10-11 10:37:39 +08:00
Changwei Ge 3b15cf50a5 nydusd: enlarge default fuse server threads
Now the default value is only 1, it affacts performance.

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-10-11 10:36:18 +08:00
63 changed files with 1471 additions and 459 deletions

View File

@ -102,11 +102,9 @@ jobs:
"target": "musl"
}
EOF
- name: run test_api
- name: run e2e tests
run: |
cd /home/runner/work/image-service/image-service/contrib/nydus-test
sudo mkdir -p /blobdir
sudo python3 nydus_test_config.py --dist fs_structure.yaml
sudo pytest -vs --durations=0 functional-test/test_api.py \
functional-test/test_nydus.py \
functional-test/test_layered_image.py
sudo pytest -vs -x --durations=0 functional-test/test_api.py functional-test/test_nydus.py functional-test/test_layered_image.py

View File

@ -205,7 +205,7 @@ jobs:
with:
name: "Nydus Image Service ${{ env.tag }}"
body: |
Mirror (update in 10 min): https://registry.npmmirror.com/binary.html?path=nydus/${{ env.tag }}/
Binaries download mirror (sync within a few hours): https://registry.npmmirror.com/binary.html?path=nydus/${{ env.tag }}/
generate_release_notes: true
files: |
${{ env.tarballs }}

20
Cargo.lock generated
View File

@ -256,9 +256,9 @@ dependencies = [
[[package]]
name = "dbs-uhttp"
version = "0.3.1"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fd0544fe7ba81fa8deb8800843836d279a81b051e2e8ab046fe1b0cb096c1cc"
checksum = "bcab9b457bf9cac784c38ad87a37eb15dad06e72751acdd556e442b3aa4b7248"
dependencies = [
"libc",
"mio",
@ -928,7 +928,7 @@ dependencies = [
[[package]]
name = "nydus-api"
version = "0.1.1"
version = "0.1.2"
dependencies = [
"dbs-uhttp",
"http",
@ -947,7 +947,7 @@ dependencies = [
[[package]]
name = "nydus-app"
version = "0.3.0"
version = "0.3.1"
dependencies = [
"flexi_logger",
"libc",
@ -962,7 +962,7 @@ dependencies = [
[[package]]
name = "nydus-blobfs"
version = "0.1.0"
version = "0.1.1"
dependencies = [
"fuse-backend-rs",
"libc",
@ -978,7 +978,7 @@ dependencies = [
[[package]]
name = "nydus-error"
version = "0.2.1"
version = "0.2.2"
dependencies = [
"backtrace",
"httpdate",
@ -990,7 +990,7 @@ dependencies = [
[[package]]
name = "nydus-rafs"
version = "0.1.0"
version = "0.1.1"
dependencies = [
"anyhow",
"arc-swap",
@ -1022,7 +1022,7 @@ dependencies = [
[[package]]
name = "nydus-rs"
version = "2.1.0-rc.3.1"
version = "0.0.0-git"
dependencies = [
"anyhow",
"base64",
@ -1068,7 +1068,7 @@ dependencies = [
[[package]]
name = "nydus-storage"
version = "0.5.0"
version = "0.5.1"
dependencies = [
"anyhow",
"arc-swap",
@ -1100,7 +1100,7 @@ dependencies = [
[[package]]
name = "nydus-utils"
version = "0.3.1"
version = "0.3.2"
dependencies = [
"blake3",
"flate2",

View File

@ -1,6 +1,7 @@
[package]
name = "nydus-rs"
version = "2.1.0-rc.3.1"
# will be overridden by real git tag during cargo build
version = "0.0.0-git"
description = "Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

View File

@ -1,6 +1,6 @@
[package]
name = "nydus-api"
version = "0.1.1"
version = "0.1.2"
description = "APIs for Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

View File

@ -423,11 +423,41 @@ impl Default for ProxyConfig {
/// Configuration for mirror.
#[derive(Clone, Deserialize, Serialize, Debug)]
#[serde(default)]
pub struct MirrorConfig {
/// Mirror server URL, for example http://127.0.0.1:65001
/// Mirror server URL, for example http://127.0.0.1:65001.
pub host: String,
/// HTTP request headers to be passed to mirror server
pub headers: Option<HashMap<String, String>>,
/// HTTP request headers to be passed to mirror server.
#[serde(default)]
pub headers: HashMap<String, String>,
/// Whether the authorization process is through mirror? default false.
/// true: authorization through mirror, e.g. Using normal registry as mirror.
/// false: authorization through original registry,
/// e.g. when using Dragonfly server as mirror, authorization through it will affect performance.
#[serde(default)]
pub auth_through: bool,
/// Interval of mirror health checking, in seconds.
#[serde(default = "default_check_interval")]
pub health_check_interval: u64,
/// Failure count for which mirror is considered unavailable.
#[serde(default = "default_failure_limit")]
pub failure_limit: u8,
/// Ping URL to check mirror server health.
#[serde(default)]
pub ping_url: String,
}
impl Default for MirrorConfig {
fn default() -> Self {
Self {
host: String::new(),
headers: HashMap::new(),
auth_through: false,
health_check_interval: 5,
failure_limit: 5,
ping_url: String::new(),
}
}
}
#[derive(Debug)]
@ -944,6 +974,14 @@ fn default_http_timeout() -> u32 {
5
}
fn default_check_interval() -> u64 {
5
}
fn default_failure_limit() -> u8 {
5
}
fn default_work_dir() -> String {
".".to_string()
}

View File

@ -1,6 +1,6 @@
[package]
name = "nydus-app"
version = "0.3.0"
version = "0.3.1"
authors = ["The Nydus Developers"]
description = "Application framework for Nydus Image Service"
readme = "README.md"

View File

@ -39,7 +39,7 @@ use nydus_app::{BuildTimeInfo, setup_logging};
fn main() -> Result<()> {
let level = cmd.value_of("log-level").unwrap().parse().unwrap();
let (bti_string, build_info) = BuildTimeInfo::dump(crate_version!());
let (bti_string, build_info) = BuildTimeInfo::dump();
let _cmd = App::new("")
.version(bti_string.as_str())
.author(crate_authors!())

View File

@ -28,7 +28,17 @@ fn get_git_commit_hash() -> String {
return commit.to_string();
}
}
"Unknown".to_string()
"unknown".to_string()
}
fn get_git_commit_version() -> String {
let tag = Command::new("git").args(&["describe", "--tags"]).output();
if let Ok(tag) = tag {
if let Some(tag) = String::from_utf8_lossy(&tag.stdout).lines().next() {
return tag.to_string();
}
}
"unknown".to_string()
}
fn main() {
@ -43,10 +53,12 @@ fn main() {
.format(&time::format_description::well_known::Iso8601::DEFAULT)
.unwrap();
let git_commit_hash = get_git_commit_hash();
let git_commit_version = get_git_commit_version();
println!("cargo:rerun-if-changed=../git/HEAD");
println!("cargo:rustc-env=RUSTC_VERSION={}", rustc_ver);
println!("cargo:rustc-env=PROFILE={}", profile);
println!("cargo:rustc-env=BUILT_TIME_UTC={}", build_time);
println!("cargo:rustc-env=GIT_COMMIT_HASH={}", git_commit_hash);
println!("cargo:rustc-env=GIT_COMMIT_VERSION={}", git_commit_version);
}

View File

@ -22,7 +22,7 @@
//!
//! fn main() -> Result<()> {
//! let level = cmd.value_of("log-level").unwrap().parse().unwrap();
//! let (bti_string, build_info) = BuildTimeInfo::dump(crate_version!());
//! let (bti_string, build_info) = BuildTimeInfo::dump();
//! let _cmd = App::new("")
//! .version(bti_string.as_str())
//! .author(crate_authors!())
@ -65,14 +65,15 @@ pub mod built_info {
pub const PROFILE: &str = env!("PROFILE");
pub const RUSTC_VERSION: &str = env!("RUSTC_VERSION");
pub const BUILT_TIME_UTC: &str = env!("BUILT_TIME_UTC");
pub const GIT_COMMIT_VERSION: &str = env!("GIT_COMMIT_VERSION");
pub const GIT_COMMIT_HASH: &str = env!("GIT_COMMIT_HASH");
}
/// Dump program build and version information.
pub fn dump_program_info(prog_version: &str) {
pub fn dump_program_info() {
info!(
"Program Version: {}, Git Commit: {:?}, Build Time: {:?}, Profile: {:?}, Rustc Version: {:?}",
prog_version,
built_info::GIT_COMMIT_VERSION,
built_info::GIT_COMMIT_HASH,
built_info::BUILT_TIME_UTC,
built_info::PROFILE,
@ -91,10 +92,10 @@ pub struct BuildTimeInfo {
}
impl BuildTimeInfo {
pub fn dump(package_ver: &str) -> (String, Self) {
pub fn dump() -> (String, Self) {
let info_string = format!(
"\rVersion: \t{}\nGit Commit: \t{}\nBuild Time: \t{}\nProfile: \t{}\nRustc: \t\t{}\n",
package_ver,
built_info::GIT_COMMIT_VERSION,
built_info::GIT_COMMIT_HASH,
built_info::BUILT_TIME_UTC,
built_info::PROFILE,
@ -102,7 +103,7 @@ impl BuildTimeInfo {
);
let info = Self {
package_ver: package_ver.to_string(),
package_ver: built_info::GIT_COMMIT_VERSION.to_string(),
git_commit: built_info::GIT_COMMIT_HASH.to_string(),
build_time: built_info::BUILT_TIME_UTC.to_string(),
profile: built_info::PROFILE.to_string(),

View File

@ -1,6 +1,6 @@
[package]
name = "nydus-blobfs"
version = "0.1.0"
version = "0.1.1"
description = "Blob object file system for Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

View File

@ -1,22 +1,27 @@
GIT_COMMIT := $(shell git rev-list -1 HEAD)
BUILD_TIME := $(shell date -u +%Y%m%d.%H%M)
PACKAGES ?= $(shell go list ./... | grep -v /vendor/)
GOARCH ?= amd64
GOARCH ?= $(shell go env GOARCH)
GOPROXY ?= https://goproxy.io
ifdef GOPROXY
PROXY := GOPROXY=${GOPROXY}
endif
# Used to populate variables in version package.
BUILD_TIMESTAMP=$(shell date '+%Y-%m-%dT%H:%M:%S')
VERSION=$(shell git describe --match 'v[0-9]*' --dirty='.m' --always --tags)
REVISION=$(shell git rev-parse HEAD)$(shell if ! git diff --no-ext-diff --quiet --exit-code; then echo .m; fi)
RELEASE_INFO = -X main.revision=${REVISION} -X main.gitVersion=${VERSION} -X main.buildTime=${BUILD_TIMESTAMP}
.PHONY: all build release plugin test clean build-smoke
all: build
build:
@CGO_ENABLED=0 ${PROXY} GOOS=linux GOARCH=${GOARCH} go build -ldflags '-X main.versionGitCommit=${GIT_COMMIT} -X main.versionBuildTime=${BUILD_TIME}' -gcflags=all="-N -l" -o ./cmd ./cmd/nydusify.go
@CGO_ENABLED=0 ${PROXY} GOOS=linux GOARCH=${GOARCH} go build -ldflags '${RELEASE_INFO}' -gcflags=all="-N -l" -o ./cmd ./cmd/nydusify.go
release:
@CGO_ENABLED=0 ${PROXY} GOOS=linux GOARCH=${GOARCH} go build -ldflags '-X main.versionGitCommit=${GIT_COMMIT} -X main.versionBuildTime=${BUILD_TIME} -s -w -extldflags "-static"' -o ./cmd ./cmd/nydusify.go
@CGO_ENABLED=0 ${PROXY} GOOS=linux GOARCH=${GOARCH} go build -ldflags '${RELEASE_INFO} -s -w -extldflags "-static"' -o ./cmd ./cmd/nydusify.go
plugin:
@CGO_ENABLED=0 ${PROXY} GOOS=linux GOARCH=${GOARCH} go build -ldflags '-s -w -extldflags "-static"' -o nydus-hook-plugin ./plugin

View File

@ -32,8 +32,12 @@ import (
"github.com/dragonflyoss/image-service/contrib/nydusify/pkg/viewer"
)
var versionGitCommit string
var versionBuildTime string
var (
revision string
buildTime string
gitVersion string
)
var maxCacheMaxRecords uint = 50
const defaultLogLevel = logrus.InfoLevel
@ -47,7 +51,7 @@ func isPossibleValue(excepted []string, value string) bool {
return false
}
// This only works for OSS backend rightnow
// This only works for OSS backend right now
func parseBackendConfig(backendConfigJSON, backendConfigFile string) (string, error) {
if backendConfigJSON != "" && backendConfigFile != "" {
return "", fmt.Errorf("--backend-config conflicts with --backend-config-file")
@ -163,7 +167,7 @@ func getPrefetchPatterns(c *cli.Context) (string, error) {
patterns = prefetchedDir
}
if len(patterns) <= 0 {
if len(patterns) == 0 {
patterns = "/"
}
@ -175,7 +179,7 @@ func main() {
FullTimestamp: true,
})
version := fmt.Sprintf("%s.%s", versionGitCommit, versionBuildTime)
version := fmt.Sprintf("\nVersion : %s\nRevision : %s\nGo version : %s\nBuild time : %s", gitVersion, revision, runtime.Version(), buildTime)
app := &cli.App{
Name: "Nydusify",
@ -360,7 +364,18 @@ func main() {
Usage: "Read prefetch list from STDIN, please input absolute paths line by line",
EnvVars: []string{"PREFETCH_PATTERNS"},
},
&cli.StringFlag{
Name: "compressor",
Value: "zstd",
Usage: "Algorithm to compress image data blob, possible values: none, lz4_block, zstd",
EnvVars: []string{"COMPRESSOR"},
},
&cli.StringFlag{
Name: "chunk-size",
Value: "0x100000",
Usage: "size of nydus image data chunk, must be power of two and between 0x1000-0x100000, [default: 0x100000]",
EnvVars: []string{"CHUNK_SIZE"},
},
&cli.StringFlag{
Name: "work-dir",
Value: "./tmp",
@ -462,6 +477,8 @@ func main() {
NydusifyVersion: version,
FsVersion: fsVersion,
FsAlignChunk: c.Bool("backend-aligned-chunk") || c.Bool("fs-align-chunk"),
Compressor: c.String("compressor"),
ChunkSize: c.String("chunk-size"),
ChunkDict: converter.ChunkDictOpt{
Args: c.String("chunk-dict"),
@ -773,6 +790,18 @@ func main() {
Value: "5",
DefaultText: "V5 format",
},
&cli.StringFlag{
Name: "compressor",
Value: "zstd",
Usage: "Algorithm to compress image data blob, possible values: none, lz4_block, zstd",
EnvVars: []string{"COMPRESSOR"},
},
&cli.StringFlag{
Name: "chunk-size",
Value: "0x100000",
Usage: "size of nydus image data chunk, must be power of two and between 0x1000-0x100000, [default: 0x100000]",
EnvVars: []string{"CHUNK_SIZE"},
},
&cli.StringFlag{
Name: "nydus-image",
@ -845,11 +874,6 @@ func main() {
},
}
// With linux/arm64 platform, containerd/compression prioritizes `unpigz`
// to decompress tar.gz file, which may generate corrupted data somehow.
// Keep the same behavior with x86_64 platform by disabling pigz.
os.Setenv("CONTAINERD_DISABLE_PIGZ", "1")
if !utils.IsSupportedArch(runtime.GOARCH) {
logrus.Fatal("Nydusify can only work under architecture 'amd64' and 'arm64'")
}

View File

@ -43,10 +43,9 @@
"digest": "sha256:aec98c9e3dce739877b8f5fe1cddd339de1db2b36c20995d76f6265056dbdb08",
"size": 273320,
"annotations": {
"containerd.io/snapshot/nydus-blob-ids": "[\"09845cce1d983b158d4865fc37c23bbfb892d4775c786e8114d3cf868975c059\",\"b413839e4ee5248697ef30fe9a84b659fa744d69bbc9b7754113adc2b2b6bc90\",\"b6a85be8248b0d3c2f0565ef71d549f404f8edcee1ab666c9871a8e6d9384860\",\"00d151e7d392e68e2c756a6fc42640006ddc0a98d37dba3f90a7b73f63188bbd\"]",
"containerd.io/snapshot/nydus-bootstrap": "true",
"containerd.io/snapshot/nydus-reference-blob-ids": "[\"09845cce1d983b158d4865fc37c23bbfb892d4775c786e8114d3cf868975c059\"]"
}
}
]
}
}

View File

@ -113,6 +113,7 @@ func (b *OSSBackend) Upload(ctx context.Context, blobID, blobPath string, size i
blobObjectKey := b.objectPrefix + blobID
desc := blobDesc(size, blobID)
desc.URLs = append(desc.URLs, b.remoteID(blobID))
if !forcePush {
if exist, err := b.bucket.IsObjectExist(blobObjectKey); err != nil {
@ -254,3 +255,7 @@ func (b *OSSBackend) Check(blobID string) (bool, error) {
func (b *OSSBackend) Type() Type {
return OssBackend
}
func (b *OSSBackend) remoteID(blobID string) string {
return fmt.Sprintf("oss://%s/%s%s", b.bucket.BucketName, b.objectPrefix, blobID)
}

View File

@ -26,6 +26,8 @@ type BuilderOption struct {
// A regular file or fifo into which commands nydus-image to dump contents.
BlobPath string
AlignedChunk bool
Compressor string
ChunkSize string
FsVersion string
}
@ -125,10 +127,18 @@ func (builder *Builder) Run(option BuilderOption) error {
option.FsVersion,
)
if option.Compressor != "" {
args = append(args, "--compressor", option.Compressor)
}
if len(option.PrefetchPatterns) > 0 {
args = append(args, "--prefetch-policy", "fs")
}
if option.ChunkSize != "" {
args = append(args, "--chunk-size", option.ChunkSize)
}
args = append(args, option.RootfsPath)
return builder.run(args, option.PrefetchPatterns)

View File

@ -22,6 +22,8 @@ type WorkflowOption struct {
NydusImagePath string
PrefetchPatterns string
FsVersion string
Compressor string
ChunkSize string
}
type Workflow struct {
@ -120,6 +122,8 @@ func (workflow *Workflow) Build(
AlignedChunk: alignedChunk,
ChunkDict: workflow.ChunkDict,
FsVersion: workflow.FsVersion,
Compressor: workflow.Compressor,
ChunkSize: workflow.ChunkSize,
}); err != nil {
return "", errors.Wrapf(err, "build layer %s", layerDir)
}

View File

@ -145,8 +145,8 @@ func (cache *Cache) recordToLayer(record *Record) (*ocispec.Descriptor, *ocispec
utils.LayerAnnotationUncompressed: record.NydusBootstrapDiffID.String(),
},
}
if refenceBlobsStr, ok := record.NydusBootstrapDesc.Annotations[utils.LayerAnnotationNydusReferenceBlobIDs]; ok {
bootstrapCacheDesc.Annotations[utils.LayerAnnotationNydusReferenceBlobIDs] = refenceBlobsStr
if referenceBlobsStr, ok := record.NydusBootstrapDesc.Annotations[utils.LayerAnnotationNydusReferenceBlobIDs]; ok {
bootstrapCacheDesc.Annotations[utils.LayerAnnotationNydusReferenceBlobIDs] = referenceBlobsStr
}
var blobCacheDesc *ocispec.Descriptor

View File

@ -16,6 +16,7 @@ import (
"github.com/dragonflyoss/image-service/contrib/nydusify/pkg/checker/tool"
"github.com/dragonflyoss/image-service/contrib/nydusify/pkg/converter/provider"
"github.com/dragonflyoss/image-service/contrib/nydusify/pkg/parser"
"github.com/dragonflyoss/image-service/contrib/nydusify/pkg/remote"
"github.com/dragonflyoss/image-service/contrib/nydusify/pkg/utils"
)
@ -137,6 +138,11 @@ func (checker *Checker) check(ctx context.Context) error {
}
}
var sourceRemote *remote.Remote
if checker.sourceParser != nil {
sourceRemote = checker.sourceParser.Remote
}
rules := []rule.Rule{
&rule.ManifestRule{
SourceParsed: sourceParsed,
@ -157,7 +163,7 @@ func (checker *Checker) check(ctx context.Context) error {
SourceMountPath: filepath.Join(checker.WorkDir, "fs/source_mounted"),
SourceParsed: sourceParsed,
SourcePath: filepath.Join(checker.WorkDir, "fs/source"),
SourceRemote: checker.sourceParser.Remote,
SourceRemote: sourceRemote,
Target: checker.Target,
TargetInsecure: checker.TargetInsecure,
PlainHTTP: checker.targetParser.Remote.IsWithHTTP(),

View File

@ -205,14 +205,15 @@ func (rule *FilesystemRule) pullSourceImage() (*tool.Image, error) {
func (rule *FilesystemRule) mountSourceImage() (*tool.Image, error) {
logrus.Infof("Mounting source image to %s", rule.SourceMountPath)
if err := os.MkdirAll(rule.SourceMountPath, 0755); err != nil {
return nil, errors.Wrap(err, "create mountpoint directory of source image")
}
image, err := rule.pullSourceImage()
if err != nil {
return nil, errors.Wrap(err, "pull source image")
}
if err := image.Umount(); err != nil {
return nil, errors.Wrap(err, "umount previous rootfs")
}
if err := image.Mount(); err != nil {
return nil, errors.Wrap(err, "mount source image")
}

View File

@ -7,18 +7,42 @@ package tool
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"github.com/containerd/containerd/mount"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
func run(cmd string, args ...string) error {
_cmd := exec.Command("sh", "-c", cmd)
_cmd.Stdout = os.Stdout
_cmd.Stderr = os.Stderr
return _cmd.Run()
func mkMounts(dirs []string) []mount.Mount {
var options []string
if len(dirs) == 0 {
return nil
}
if len(dirs) == 1 {
return []mount.Mount{
{
Source: dirs[0],
Type: "bind",
Options: []string{
"ro",
"rbind",
},
},
}
}
options = append(options, fmt.Sprintf("lowerdir=%s", strings.Join(dirs, ":")))
return []mount.Mount{
{
Type: "overlay",
Source: "overlay",
Options: options,
},
}
}
type Image struct {
@ -30,41 +54,20 @@ type Image struct {
// Mount mounts rootfs of OCI image.
func (image *Image) Mount() error {
image.Umount()
if err := os.MkdirAll(image.Rootfs, 0750); err != nil {
return errors.Wrap(err, "create rootfs dir")
}
var dirs []string
layerLen := len(image.Layers)
count := len(image.Layers)
for i := range image.Layers {
layerDir := filepath.Join(image.SourcePath, image.Layers[layerLen-i-1].Digest.Encoded())
layerDir := filepath.Join(image.SourcePath, image.Layers[count-i-1].Digest.Encoded())
dirs = append(dirs, strings.ReplaceAll(layerDir, ":", "\\:"))
}
lowerOption := strings.Join(dirs, ":")
// Handle long options string overed 4096 chars, split them to
// two overlay mounts.
if len(lowerOption) >= 4096 {
half := (len(dirs) - 1) / 2
upperDirs := dirs[half+1:]
lowerDirs := dirs[:half+1]
lowerOverlay := image.Rootfs + "_lower"
if err := os.MkdirAll(lowerOverlay, 0755); err != nil {
return err
}
if err := run(fmt.Sprintf(
"mount -t overlay overlay -o lowerdir='%s' %s",
strings.Join(upperDirs, ":"), lowerOverlay,
)); err != nil {
return err
}
lowerDirs = append(lowerDirs, lowerOverlay)
lowerOption = strings.Join(lowerDirs, ":")
}
if err := run(fmt.Sprintf(
"mount -t overlay overlay -o lowerdir='%s' %s",
lowerOption, image.Rootfs,
)); err != nil {
return err
mounts := mkMounts(dirs)
if err := mount.All(mounts, image.Rootfs); err != nil {
return errors.Wrap(err, "mount source layer")
}
return nil
@ -72,21 +75,19 @@ func (image *Image) Mount() error {
// Umount umounts rootfs mountpoint of OCI image.
func (image *Image) Umount() error {
lowerOverlay := image.Rootfs + "_lower"
if _, err := os.Stat(lowerOverlay); err == nil {
if err := run(fmt.Sprintf("umount %s", lowerOverlay)); err != nil {
return err
if _, err := os.Stat(image.Rootfs); err != nil {
if os.IsNotExist(err) {
return nil
}
return errors.Wrap(err, "stat rootfs")
}
if _, err := os.Stat(image.Rootfs); err == nil {
if err := run(fmt.Sprintf("umount %s", image.Rootfs)); err != nil {
return err
}
if err := mount.Unmount(image.Rootfs, 0); err != nil {
return errors.Wrap(err, "umount rootfs")
}
if err := os.RemoveAll(image.SourcePath); err != nil {
return err
if err := os.RemoveAll(image.Rootfs); err != nil {
return errors.Wrap(err, "remove rootfs")
}
return nil

View File

@ -49,10 +49,15 @@ func newCacheGlue(
return nil, errors.Wrap(err, "Import cache image")
}
// Ingore the error of importing cache image, it doesn't affect
// Ignore the error of importing cache image, it doesn't affect
// the build workflow.
if err := cache.Import(ctx); err != nil {
logrus.Warnf("Failed to import cache: %s", err)
if utils.RetryWithHTTP(err) {
cacheRemote.MaybeWithHTTP(err)
if err := cache.Import(ctx); err != nil {
logrus.Warnf("Failed to import cache: %s", err)
}
}
}
return &cacheGlue{

View File

@ -84,6 +84,8 @@ type Opt struct {
DockerV2Format bool
FsVersion string
FsAlignChunk bool
Compressor string
ChunkSize string
PrefetchPatterns string
}
@ -110,6 +112,8 @@ type Converter struct {
DockerV2Format bool
FsVersion string
FsAlignChunk bool
Compressor string
ChunkSize string
PrefetchPatterns string
}
@ -154,6 +158,8 @@ func New(opt Opt) (*Converter, error) {
DockerV2Format: opt.DockerV2Format,
BackendForcePush: opt.BackendForcePush,
FsAlignChunk: opt.FsAlignChunk,
Compressor: opt.Compressor,
ChunkSize: opt.ChunkSize,
NydusifyVersion: opt.NydusifyVersion,
storageBackend: backend,
@ -218,6 +224,8 @@ func (cvt *Converter) convert(ctx context.Context) (retErr error) {
PrefetchPatterns: cvt.PrefetchPatterns,
TargetDir: cvt.WorkDir,
FsVersion: cvt.FsVersion,
Compressor: cvt.Compressor,
ChunkSize: cvt.ChunkSize,
})
if err != nil {
return errors.Wrap(err, "Create build flow")
@ -424,9 +432,6 @@ func (cvt *Converter) Convert(ctx context.Context) error {
if utils.RetryWithHTTP(err) {
cvt.SourceRemote.MaybeWithHTTP(err)
cvt.TargetRemote.MaybeWithHTTP(err)
if cvt.CacheRemote != nil {
cvt.CacheRemote.MaybeWithHTTP(err)
}
return cvt.convert(ctx)
}
return errors.Wrap(err, "Failed to convert")

View File

@ -194,19 +194,15 @@ func appendBlobs(oldBlobs []string, newBlobs []string) []string {
return oldBlobs
}
func (mm *manifestManager) Push(ctx context.Context, buildLayers []*buildLayer) error {
func (mm *manifestManager) Push(ctx context.Context, builtLayers []*buildLayer) error {
var (
blobListInAnnotation []string
referenceBlobs []string
layers []ocispec.Descriptor
referenceBlobs []string
layers []ocispec.Descriptor
)
for idx, _layer := range buildLayers {
for idx, _layer := range builtLayers {
record := _layer.GetCacheRecord()
referenceBlobs = appendBlobs(referenceBlobs, layersHex(_layer.referenceBlobs))
blobListInAnnotation = appendBlobs(blobListInAnnotation, layersHex(_layer.referenceBlobs))
if record.NydusBlobDesc != nil {
// Write blob digest list in JSON format to layer annotation of bootstrap.
blobListInAnnotation = append(blobListInAnnotation, record.NydusBlobDesc.Digest.Hex())
// For registry backend, we need to write the blob layer to
// manifest to prevent them from being deleted by registry GC.
if mm.backend.Type() == backend.RegistryBackend {
@ -222,15 +218,10 @@ func (mm *manifestManager) Push(ctx context.Context, buildLayers []*buildLayer)
}
}
// Only need to write lastest bootstrap layer in nydus manifest
if idx == len(buildLayers)-1 {
blobListBytes, err := json.Marshal(blobListInAnnotation)
if err != nil {
return errors.Wrap(err, "Marshal blob list")
}
record.NydusBootstrapDesc.Annotations[utils.LayerAnnotationNydusBlobIDs] = string(blobListBytes)
// Only need to write latest bootstrap layer in nydus manifest
if idx == len(builtLayers)-1 {
if len(referenceBlobs) > 0 {
blobListBytes, err = json.Marshal(referenceBlobs)
blobListBytes, err := json.Marshal(referenceBlobs)
if err != nil {
return errors.Wrap(err, "Marshal blob list")
}
@ -250,7 +241,6 @@ func (mm *manifestManager) Push(ctx context.Context, buildLayers []*buildLayer)
// Remove useless annotations from layer
validAnnotationKeys := map[string]bool{
utils.LayerAnnotationNydusBlob: true,
utils.LayerAnnotationNydusBlobIDs: true,
utils.LayerAnnotationNydusReferenceBlobIDs: true,
utils.LayerAnnotationNydusBootstrap: true,
utils.LayerAnnotationNydusFsVersion: true,

View File

@ -67,7 +67,7 @@ func (cfg *BackendConfig) rawMetaBackendCfg() []byte {
"access_key_id": cfg.AccessKeyID,
"access_key_secret": cfg.AccessKeySecret,
"bucket_name": cfg.BucketName,
"object_prefix": cfg.MetaPrefix + "/",
"object_prefix": cfg.MetaPrefix,
}
b, _ := json.Marshal(configMap)
return b
@ -79,7 +79,7 @@ func (cfg *BackendConfig) rawBlobBackendCfg() []byte {
"access_key_id": cfg.AccessKeyID,
"access_key_secret": cfg.AccessKeySecret,
"bucket_name": cfg.BucketName,
"object_prefix": cfg.BlobPrefix + "/",
"object_prefix": cfg.BlobPrefix,
}
b, _ := json.Marshal(configMap)
return b
@ -316,7 +316,7 @@ func (p *Packer) Pack(_ context.Context, req PackRequest) (PackResult, error) {
return PackResult{}, errors.New("can not push image to remote due to lack of backend configuration")
}
pushResult, err := p.pusher.Push(PushRequest{
Meta: bootstrapPath,
Meta: req.ImageName,
Blob: newBlobHash,
ParentBlobs: parentBlobs,
})

View File

@ -3,7 +3,6 @@ package packer
import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
@ -69,12 +68,23 @@ func NewPusher(opt NewPusherOpt) (*Pusher, error) {
// Push will push the meta and blob file to remote backend
// at this moment, oss is the only possible backend, the meta file name is user defined
// and blob file name is the hash of the blobfile that is extracted from output.json
func (p *Pusher) Push(req PushRequest) (PushResult, error) {
func (p *Pusher) Push(req PushRequest) (pushResult PushResult, retErr error) {
p.logger.Info("start to push meta and blob to remote backend")
// todo: add a suitable timeout
ctx := context.Background()
// todo: use blob desc to build manifest
defer func() {
if retErr != nil {
if err := p.blobBackend.Finalize(true); err != nil {
logrus.WithError(err).Warnf("Cancel blob backend upload")
}
if err := p.metaBackend.Finalize(true); err != nil {
logrus.WithError(err).Warnf("Cancel meta backend upload")
}
}
}()
for _, blob := range req.ParentBlobs {
// try push parent blobs
if _, err := p.blobBackend.Upload(ctx, blob, p.blobFilePath(blob, true), 0, false); err != nil {
@ -84,18 +94,30 @@ func (p *Pusher) Push(req PushRequest) (PushResult, error) {
p.logger.Infof("push blob %s", req.Blob)
if req.Blob != "" {
if _, err := p.blobBackend.Upload(ctx, req.Blob, p.blobFilePath(req.Blob, true), 0, false); err != nil {
desc, err := p.blobBackend.Upload(ctx, req.Blob, p.blobFilePath(req.Blob, true), 0, false)
if err != nil {
return PushResult{}, errors.Wrap(err, "failed to put blobfile to remote")
}
if len(desc.URLs) > 0 {
pushResult.RemoteBlob = desc.URLs[0]
}
}
if _, err := p.metaBackend.Upload(ctx, req.Meta, p.bootstrapPath(req.Meta), 0, true); err != nil {
return PushResult{}, errors.Wrapf(err, "failed to put metafile to remote")
if retErr = p.blobBackend.Finalize(false); retErr != nil {
return PushResult{}, errors.Wrap(retErr, "Finalize blob backend upload")
}
return PushResult{
RemoteMeta: fmt.Sprintf("oss://%s/%s/%s", p.cfg.BucketName, p.cfg.MetaPrefix, req.Meta),
RemoteBlob: fmt.Sprintf("oss://%s/%s/%s", p.cfg.BucketName, p.cfg.BlobPrefix, req.Blob),
}, nil
desc, retErr := p.metaBackend.Upload(ctx, req.Meta, p.bootstrapPath(req.Meta), 0, true)
if retErr != nil {
return PushResult{}, errors.Wrapf(retErr, "failed to put metafile to remote")
}
if len(desc.URLs) != 0 {
pushResult.RemoteMeta = desc.URLs[0]
}
if retErr = p.metaBackend.Finalize(false); retErr != nil {
return PushResult{}, errors.Wrap(retErr, "Finalize meta backend upload")
}
return
}
func ParseBackendConfig(backendConfigFile string) (BackendConfig, error) {

View File

@ -20,7 +20,8 @@ type mockBackend struct {
func (m *mockBackend) Upload(ctx context.Context, blobID, blobPath string, blobSize int64, forcePush bool) (*ocispec.Descriptor, error) {
args := m.Called(ctx, blobID, blobPath, blobSize, forcePush)
return nil, args.Error(0)
desc := args.Get(0)
return desc.(*ocispec.Descriptor), nil
}
func (m *mockBackend) Finalize(cancel bool) error {
@ -94,8 +95,12 @@ func TestPusher_Push(t *testing.T) {
hash := "3093776c78a21e47f0a8b4c80a1f019b1e838fc1ade274209332af1ca5f57090"
assert.Nil(t, err)
mp.On("Upload", mock.Anything, "mock.meta", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
mp.On("Upload", mock.Anything, hash, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
mp.On("Upload", mock.Anything, "mock.meta", mock.Anything, mock.Anything, mock.Anything).Return(&ocispec.Descriptor{
URLs: []string{"oss://testbucket/testmetaprefix/mock.meta"},
}, nil)
mp.On("Upload", mock.Anything, hash, mock.Anything, mock.Anything, mock.Anything).Return(&ocispec.Descriptor{
URLs: []string{"oss://testbucket/testblobprefix/3093776c78a21e47f0a8b4c80a1f019b1e838fc1ade274209332af1ca5f57090"},
}, nil)
res, err := pusher.Push(PushRequest{
Meta: "mock.meta",
Blob: hash,

View File

@ -14,7 +14,6 @@ const (
LayerAnnotationNydusBlob = "containerd.io/snapshot/nydus-blob"
LayerAnnotationNydusBlobDigest = "containerd.io/snapshot/nydus-blob-digest"
LayerAnnotationNydusBlobSize = "containerd.io/snapshot/nydus-blob-size"
LayerAnnotationNydusBlobIDs = "containerd.io/snapshot/nydus-blob-ids"
LayerAnnotationNydusBootstrap = "containerd.io/snapshot/nydus-bootstrap"
LayerAnnotationNydusFsVersion = "containerd.io/snapshot/nydus-fs-version"
LayerAnnotationNydusSourceChainID = "containerd.io/snapshot/nydus-source-chainid"

View File

@ -29,7 +29,7 @@ $ sudo tee /etc/nydusd-config.json > /dev/null << EOF
"backend": {
"type": "registry",
"config": {
"scheme": "http",
"scheme": "",
"skip_verify": false,
"timeout": 5,
"connect_timeout": 5,
@ -58,7 +58,7 @@ EOF
Note:
- You might have to change the scheme from `http` to `https` according to you registry configuration.
- The `scheme` is registry url scheme, leave empty to automatically detect, otherwise specify to `https` or `http` according to your registry server configuration.
- The `auth` is base64 encoded `username:password`. It is required by `nydusd` to lazily pull image data from registry which is authentication enabled.
- `containerd-nydus-grpc` will automatically read docker login auth from the configuration `$HOME/.docker/config.json`, otherwise please copy it to replace `YOUR_LOGIN_AUTH=`.

View File

@ -181,8 +181,8 @@ We are working on enabling cloud-hypervisor support for nydus.
"type": "registry",
"config": {
...
// Registry url scheme, https or http
"scheme": "http",
// Registry url scheme, leave empty to automatically detect, otherwise specify to https or http.
"scheme": "",
// Registry hostname with format `$host:$port`
"host": "my-registry:5000",
// Skip SSL certificate validation for HTTPS scheme
@ -256,19 +256,30 @@ Currently, the mirror mode is only tested in the registry backend, and in theory
{
// Mirror server URL (include scheme), e.g. Dragonfly dfdaemon server URL
"host": "http://dragonfly1.io:65001",
// true: Send the authorization request to the mirror e.g. another docker registry.
// false: Authorization request won't be relayed by the mirror e.g. Dragonfly.
"auth_through": false,
// Headers for mirror server
"headers": {
// For Dragonfly dfdaemon server URL, we need to specify "X-Dragonfly-Registry" (include scheme).
// When Dragonfly does not cache data, it will pull them from "X-Dragonfly-Registry".
// If not set "X-Dragonfly-Registry", Dragonfly will pull data from proxy.registryMirror.url.
"X-Dragonfly-Registry": "https://index.docker.io"
}
},
// This URL endpoint is used to check the health of mirror server, and if the mirror is unhealthy,
// the request will fallback to the next mirror or the original registry server.
// Use $host/v2 as default if left empty.
"ping_url": "http://127.0.0.1:40901/server/ping",
// Interval time (s) to check and recover unavailable mirror. Use 5 as default if left empty.
"health_check_interval": 5,
// Failure counts before disabling this mirror. Use 5 as default if left empty.
"failure_limit": 5,
},
{
"host": "http://dragonfly2.io:65001",
"headers": {
"X-Dragonfly-Registry": "https://index.docker.io"
}
},
}
],
...

View File

@ -1,6 +1,6 @@
[package]
name = "nydus-error"
version = "0.2.1"
version = "0.2.2"
description = "Error handling utilities for Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

View File

@ -1,6 +1,6 @@
[package]
name = "nydus-rafs"
version = "0.1.0"
version = "0.1.1"
description = "The RAFS filesystem format for Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

View File

@ -36,7 +36,10 @@ use serde::Deserialize;
use nydus_api::http::{BlobPrefetchConfig, FactoryConfig};
use nydus_storage::device::{BlobDevice, BlobPrefetchRequest};
use nydus_utils::metrics::{self, FopRecorder, StatsFop::*};
use nydus_utils::{
div_round_up,
metrics::{self, FopRecorder, StatsFop::*},
};
use storage::RAFS_DEFAULT_CHUNK_SIZE;
use crate::metadata::{
@ -505,10 +508,10 @@ impl Rafs {
device: BlobDevice,
) {
// First do range based prefetch for rafs v6.
if sb.meta.is_v6() {
let blob_infos = sb.superblock.get_blob_infos();
if sb.meta.is_v6() && !blob_infos.is_empty() {
let mut prefetches = Vec::new();
for blob in sb.superblock.get_blob_infos() {
for blob in &blob_infos {
let sz = blob.readahead_size();
if sz > 0 {
let mut offset = 0;
@ -530,40 +533,86 @@ impl Rafs {
}
}
let mut ignore_prefetch_all = prefetch_files
// Bootstrap has non-empty prefetch table indicating a full prefetch
let inlay_prefetch_all = sb
.is_inlay_prefetch_all(&mut reader)
.map_err(|e| error!("Detect prefetch table error {}", e))
.unwrap_or_default();
// Nydusd has a CLI option indicating a full prefetch
let startup_prefetch_all = prefetch_files
.as_ref()
.map(|f| f.len() == 1 && f[0].as_os_str() == "/")
.unwrap_or(false);
// Then do file based prefetch based on:
// - prefetch listed passed in by user
// - or file prefetch list in metadata
let inodes = prefetch_files.map(|files| Self::convert_file_list(&files, &sb));
let res = sb.prefetch_files(&mut reader, root_ino, inodes, &|desc| {
if desc.bi_size > 0 {
device.prefetch(&[desc], &[]).unwrap_or_else(|e| {
warn!("Prefetch error, {:?}", e);
});
}
});
match res {
Ok(true) => ignore_prefetch_all = true,
Ok(false) => {}
Err(e) => info!("No file to be prefetched {:?}", e),
}
let mut ignore_prefetch_all = false;
// Last optionally prefetch all data
if prefetch_all && !ignore_prefetch_all {
let root = vec![root_ino];
let res = sb.prefetch_files(&mut reader, root_ino, Some(root), &|desc| {
// User specified prefetch files have high priority to be prefetched.
// Moreover, user specified prefetch files list will override those on-disk prefetch table.
if !startup_prefetch_all && !inlay_prefetch_all {
// Then do file based prefetch based on:
// - prefetch listed passed in by user
// - or file prefetch list in metadata
let inodes = prefetch_files.map(|files| Self::convert_file_list(&files, &sb));
let res = sb.prefetch_files(&mut reader, root_ino, inodes, &|desc| {
if desc.bi_size > 0 {
device.prefetch(&[desc], &[]).unwrap_or_else(|e| {
warn!("Prefetch error, {:?}", e);
});
}
});
if let Err(e) = res {
info!("No file to be prefetched {:?}", e);
match res {
Ok(true) => {
ignore_prefetch_all = true;
info!("Root inode was found, but it should not prefetch all files!")
}
Ok(false) => {}
Err(e) => info!("No file to be prefetched {:?}", e),
}
}
// Perform different policy for v5 format and v6 format as rafs v6's blobs are capable to
// to download chunks and decompress them all by themselves. For rafs v6, directly perform
// chunk based full prefetch
if !ignore_prefetch_all && (inlay_prefetch_all || prefetch_all || startup_prefetch_all) {
if sb.meta.is_v6() {
// The larger batch size, the fewer requests to registry
let batch_size = 1024 * 1024 * 2;
for blob in &blob_infos {
let blob_size = blob.compressed_size();
let count = div_round_up(blob_size, batch_size);
let mut pre_offset = 0u64;
for _i in 0..count {
let req = BlobPrefetchRequest {
blob_id: blob.blob_id().to_owned(),
offset: pre_offset,
len: cmp::min(batch_size, blob_size - pre_offset),
};
device
.prefetch(&[], &[req])
.map_err(|e| warn!("failed to prefetch blob data, {}", e))
.unwrap_or_default();
pre_offset += batch_size;
if pre_offset > blob_size {
break;
}
}
}
} else {
let root = vec![root_ino];
let res = sb.prefetch_files(&mut reader, root_ino, Some(root), &|desc| {
if desc.bi_size > 0 {
device.prefetch(&[desc], &[]).unwrap_or_else(|e| {
warn!("Prefetch error, {:?}", e);
});
}
});
if let Err(e) = res {
info!("No file to be prefetched {:?}", e);
}
}
}
}

View File

@ -534,7 +534,11 @@ impl OndiskInodeWrapper {
// Because the other blocks should be fully used, while the last may not.
let len = if div_round_up(self.size(), EROFS_BLOCK_SIZE) as usize == block_index + 1
{
(self.size() % EROFS_BLOCK_SIZE - s) as usize
if self.size() % EROFS_BLOCK_SIZE == 0 {
EROFS_BLOCK_SIZE as usize
} else {
(self.size() % EROFS_BLOCK_SIZE - s) as usize
}
} else {
(EROFS_BLOCK_SIZE - s) as usize
};
@ -640,37 +644,40 @@ impl OndiskInodeWrapper {
Ok(chunks)
}
fn find_target_block(&self, name: &OsStr) -> Result<usize> {
fn find_target_block(&self, name: &OsStr) -> Result<Option<usize>> {
let inode = self.disk_inode();
if inode.size() == 0 {
return Err(enoent!());
return Ok(None);
}
let blocks_count = div_round_up(inode.size(), EROFS_BLOCK_SIZE);
// find target block
let mut first = 0usize;
let mut last = (blocks_count - 1) as usize;
let mut target_block = 0usize;
let mut first = 0;
let mut last = (blocks_count - 1) as i64;
while first <= last {
let pivot = first + ((last - first) >> 1);
let head_entry = self.get_entry(pivot, 0).map_err(err_invalidate_data)?;
let head_entry = self
.get_entry(pivot as usize, 0)
.map_err(err_invalidate_data)?;
let head_name_offset = head_entry.e_nameoff as usize;
let entries_count = head_name_offset / size_of::<RafsV6Dirent>();
let h_name = self
.entry_name(pivot, 0, entries_count)
.entry_name(pivot as usize, 0, entries_count)
.map_err(err_invalidate_data)?;
let t_name = self
.entry_name(pivot, entries_count - 1, entries_count)
.entry_name(pivot as usize, entries_count - 1, entries_count)
.map_err(err_invalidate_data)?;
if h_name <= name && t_name >= name {
target_block = pivot;
break;
return Ok(Some(pivot as usize));
} else if h_name > name {
if pivot == 0 {
break;
}
last = pivot - 1;
} else {
first = pivot + 1;
}
}
Ok(target_block)
Ok(None)
}
}
@ -783,7 +790,7 @@ impl RafsInode for OndiskInodeWrapper {
fn get_child_by_name(&self, name: &OsStr) -> Result<Arc<dyn RafsInode>> {
let mut target: Option<u64> = None;
// find target dirent
if let Ok(target_block) = self.find_target_block(name) {
if let Some(target_block) = self.find_target_block(name)? {
let head_entry = self
.get_entry(target_block, 0)
.map_err(err_invalidate_data)?;
@ -791,14 +798,14 @@ impl RafsInode for OndiskInodeWrapper {
let entries_count = head_name_offset / size_of::<RafsV6Dirent>();
let mut first = 0;
let mut last = entries_count - 1;
let mut last = (entries_count - 1) as i64;
while first <= last {
let pivot = first + ((last - first) >> 1);
let de = self
.get_entry(target_block, pivot)
.get_entry(target_block, pivot as usize)
.map_err(err_invalidate_data)?;
let d_name = self
.entry_name(target_block, pivot, entries_count)
.entry_name(target_block, pivot as usize, entries_count)
.map_err(err_invalidate_data)?;
match d_name.cmp(name) {
Ordering::Equal => {

View File

@ -68,6 +68,38 @@ impl RafsSuper {
}
}
pub(crate) fn is_inlay_prefetch_all(&self, r: &mut RafsIoReader) -> RafsResult<bool> {
let hint_entries = self.meta.prefetch_table_entries as usize;
if hint_entries != 1 {
return Ok(false);
}
let unique = if self.meta.is_v6() {
let mut prefetch_table = RafsV6PrefetchTable::new();
prefetch_table
.load_prefetch_table_from(r, self.meta.prefetch_table_offset, hint_entries)
.map_err(|e| {
RafsError::Prefetch(format!(
"Failed in loading hint prefetch table at offset {}. {:?}",
self.meta.prefetch_table_offset, e
))
})?;
prefetch_table.inodes[0] as u64
} else {
let mut prefetch_table = RafsV5PrefetchTable::new();
prefetch_table
.load_prefetch_table_from(r, self.meta.prefetch_table_offset, hint_entries)
.map_err(|e| {
RafsError::Prefetch(format!(
"Failed in loading hint prefetch table at offset {}. {:?}",
self.meta.prefetch_table_offset, e
))
})?;
prefetch_table.inodes[0] as u64
};
Ok(unique == self.superblock.root_ino())
}
pub(crate) fn prefetch_data_v6<F>(
&self,
r: &mut RafsIoReader,

View File

@ -255,6 +255,19 @@ bitflags! {
const COMPRESS_GZIP = 0x0000_0040;
// V5: Data chunks are compressed with zstd
const COMPRESS_ZSTD = 0x0000_0080;
// Reserved for v2.2: chunk digests are inlined in RAFS v6 data blob.
const PRESERVED_INLINED_CHUNK_DIGEST = 0x0000_0100;
// Reserved for future compatible changes.
const PRESERVED_COMPAT_7 = 0x0100_0000;
const PRESERVED_COMPAT_6 = 0x0200_0000;
const PRESERVED_COMPAT_5 = 0x0400_0000;
const PRESERVED_COMPAT_4 = 0x0800_0000;
const PRESERVED_COMPAT_3 = 0x1000_0000;
const PRESERVED_COMPAT_2 = 0x2000_0000;
const PRESERVED_COMPAT_1 = 0x4000_0000;
const PRESERVED_COMPAT_0 = 0x8000_0000;
}
}
@ -635,6 +648,7 @@ impl RafsSuper {
///
/// Each inode passed into should correspond to directory. And it already does the file type
/// check inside.
/// Return Ok(true) means root inode is found during performing prefetching and all files should be prefetched.
pub fn prefetch_files(
&self,
r: &mut RafsIoReader,

View File

@ -405,12 +405,13 @@ impl Bootstrap {
let inode_table_size = inode_table.size();
// Set prefetch table
let (prefetch_table_size, prefetch_table_entries) =
if let Some(prefetch_table) = ctx.prefetch.get_rafsv5_prefetch_table() {
(prefetch_table.size(), prefetch_table.len() as u32)
} else {
(0, 0u32)
};
let (prefetch_table_size, prefetch_table_entries) = if let Some(prefetch_table) =
ctx.prefetch.get_rafsv5_prefetch_table(&bootstrap_ctx.nodes)
{
(prefetch_table.size(), prefetch_table.len() as u32)
} else {
(0, 0u32)
};
// Set blob table, use sha256 string (length 64) as blob id if not specified
let prefetch_table_offset = super_block_size + inode_table_size;
@ -481,7 +482,9 @@ impl Bootstrap {
.context("failed to store inode table")?;
// Dump prefetch table
if let Some(mut prefetch_table) = ctx.prefetch.get_rafsv5_prefetch_table() {
if let Some(mut prefetch_table) =
ctx.prefetch.get_rafsv5_prefetch_table(&bootstrap_ctx.nodes)
{
prefetch_table
.store(bootstrap_ctx.writer.as_mut())
.context("failed to store prefetch table")?;

View File

@ -165,12 +165,12 @@ impl Prefetch {
indexes
}
pub fn get_rafsv5_prefetch_table(&mut self) -> Option<RafsV5PrefetchTable> {
pub fn get_rafsv5_prefetch_table(&mut self, nodes: &[Node]) -> Option<RafsV5PrefetchTable> {
if self.policy == PrefetchPolicy::Fs {
let mut prefetch_table = RafsV5PrefetchTable::new();
for i in self.readahead_patterns.values().filter_map(|v| *v) {
// Rafs v5 has inode number equal to index.
prefetch_table.add_entry(i as u32);
prefetch_table.add_entry(nodes[i as usize - 1].inode.ino() as u32);
}
Some(prefetch_table)
} else {

View File

@ -3,11 +3,12 @@
// SPDX-License-Identifier: Apache-2.0
use std::{
ffi::OsString,
fs::Permissions,
io::{Error, ErrorKind, Write},
ops::DerefMut,
os::unix::prelude::PermissionsExt,
path::Path,
path::{Path, PathBuf},
sync::{Arc, Mutex},
};
@ -164,7 +165,8 @@ impl RafsInspector {
fn cmd_stat_file(&self, file_name: &str) -> Result<Option<Value>, anyhow::Error> {
// Stat current directory
if file_name == "." {
return self.stat_single_file(self.cur_dir_ino);
let inode = self.rafs_meta.get_inode(self.cur_dir_ino, false)?;
return self.stat_single_file(Some(inode.parent()), self.cur_dir_ino);
}
// Walk through children inodes to find the file
@ -173,7 +175,7 @@ impl RafsInspector {
dir_inode.walk_children_inodes(0, &mut |_inode, child_name, child_ino, _offset| {
if child_name == file_name {
// Print file information
if let Err(e) = self.stat_single_file(child_ino) {
if let Err(e) = self.stat_single_file(Some(dir_inode.ino()), child_ino) {
return Err(Error::new(ErrorKind::Other, e));
}
@ -273,6 +275,36 @@ Compressed Size: {compressed_size}
Ok(None)
}
// Convert an inode number to a file path, the rafs v6 file is handled separately.
fn path_from_ino(&self, ino: u64) -> Result<PathBuf, anyhow::Error> {
let inode = self.rafs_meta.superblock.get_inode(ino, false)?;
if ino == self.rafs_meta.superblock.root_ino() {
return Ok(self
.rafs_meta
.superblock
.get_inode(ino, false)?
.name()
.into());
}
let mut file_path = PathBuf::from("");
if self.rafs_meta.meta.is_v6() && !inode.is_dir() {
self.rafs_meta.walk_dir(
self.rafs_meta.superblock.root_ino(),
None,
&mut |inode, path| {
if inode.ino() == ino {
file_path = PathBuf::from(path);
}
Ok(())
},
)?;
} else {
file_path = self.rafs_meta.path_from_ino(ino as u64)?;
};
Ok(file_path)
}
// Implement command "prefetch"
fn cmd_list_prefetch(&self) -> Result<Option<Value>, anyhow::Error> {
let mut guard = self.bootstrap.lock().unwrap();
@ -283,7 +315,7 @@ Compressed Size: {compressed_size}
let o = if self.request_mode {
let mut value = json!([]);
for ino in prefetch_inos {
let path = self.rafs_meta.path_from_ino(ino as u64)?;
let path = self.path_from_ino(ino as u64)?;
let v = json!({"inode": ino, "path": path});
value.as_array_mut().unwrap().push(v);
}
@ -294,7 +326,7 @@ Compressed Size: {compressed_size}
self.rafs_meta.meta.prefetch_table_entries
);
for ino in prefetch_inos {
let path = self.rafs_meta.path_from_ino(ino as u64)?;
let path = self.path_from_ino(ino as u64)?;
println!(
r#"Inode Number:{inode_number:10} | Path: {path:?} "#,
path = path,
@ -362,15 +394,56 @@ Blob ID: {}
Ok(None)
}
/// Walkthrough the file tree rooted at ino, calling cb for each file or directory
/// in the tree by DFS order, including ino, please ensure ino is a directory.
fn walk_dir(
&self,
ino: u64,
parent: Option<&PathBuf>,
parent_ino: Option<u64>,
cb: &mut dyn FnMut(Option<u64>, &dyn RafsInode, &Path) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
let inode = self.rafs_meta.superblock.get_inode(ino, false)?;
if !inode.is_dir() {
bail!("inode {} is not a directory", ino);
}
self.walk_dir_inner(inode.as_ref(), parent, parent_ino, cb)
}
fn walk_dir_inner(
&self,
inode: &dyn RafsInode,
parent: Option<&PathBuf>,
parent_ino: Option<u64>,
cb: &mut dyn FnMut(Option<u64>, &dyn RafsInode, &Path) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
let path = if let Some(parent) = parent {
parent.join(inode.name())
} else {
PathBuf::from("/")
};
cb(parent_ino, inode, &path)?;
if !inode.is_dir() {
return Ok(());
}
let child_count = inode.get_child_count();
for idx in 0..child_count {
let child = inode.get_child_by_index(idx)?;
self.walk_dir_inner(child.as_ref(), Some(&path), Some(inode.ino()), cb)?;
}
Ok(())
}
// Implement command "icheck"
fn cmd_check_inode(&self, ino: u64) -> Result<Option<Value>, anyhow::Error> {
self.rafs_meta.walk_dir(
self.walk_dir(
self.rafs_meta.superblock.root_ino(),
None,
&mut |inode, path| {
None,
&mut |parent, inode, path| {
if inode.ino() == ino {
println!(r#"{}"#, path.to_string_lossy(),);
self.stat_single_file(ino)?;
self.stat_single_file(parent, ino)?;
}
Ok(())
},
@ -380,14 +453,41 @@ Blob ID: {}
}
impl RafsInspector {
/// Get file name of the inode, the rafs v6 file is handled separately.
fn get_file_name(&self, parent_inode: &dyn RafsInode, inode: &dyn RafsInode) -> OsString {
let mut filename = OsString::from("");
if self.rafs_meta.meta.is_v6() && !inode.is_dir() {
parent_inode
.walk_children_inodes(
0,
&mut |_inode: Option<Arc<dyn RafsInode>>, name: OsString, cur_ino, _offset| {
if cur_ino == inode.ino() {
filename = name;
}
Ok(PostWalkAction::Continue)
},
)
.unwrap();
} else {
filename = inode.name();
}
filename
}
// print information of single file
fn stat_single_file(&self, ino: u64) -> Result<Option<Value>, anyhow::Error> {
fn stat_single_file(
&self,
parent_ino: Option<u64>,
ino: u64,
) -> Result<Option<Value>, anyhow::Error> {
// get RafsInode of current ino
let inode = self.rafs_meta.get_inode(ino, false)?;
let inode_attr = inode.get_attr();
println!(
r#"
if let Some(parent_ino) = parent_ino {
let parent = self.rafs_meta.superblock.get_inode(parent_ino, false)?;
println!(
r#"
Inode Number: {inode_number}
Name: {name:?}
Size: {size}
@ -400,19 +500,20 @@ GID: {gid}
Mtime: {mtime}
MtimeNsec: {mtime_nsec}
Blocks: {blocks}"#,
inode_number = inode.ino(),
name = inode.name(),
size = inode.size(),
parent = inode.parent(),
mode = inode_attr.mode,
permissions = Permissions::from_mode(inode_attr.mode).mode(),
nlink = inode_attr.nlink,
uid = inode_attr.uid,
gid = inode_attr.gid,
mtime = inode_attr.mtime,
mtime_nsec = inode_attr.mtimensec,
blocks = inode_attr.blocks,
);
inode_number = inode.ino(),
name = self.get_file_name(parent.as_ref(), inode.as_ref()),
size = inode.size(),
parent = parent.ino(),
mode = inode_attr.mode,
permissions = Permissions::from_mode(inode_attr.mode).mode(),
nlink = inode_attr.nlink,
uid = inode_attr.uid,
gid = inode_attr.gid,
mtime = inode_attr.mtime,
mtime_nsec = inode_attr.mtimensec,
blocks = inode_attr.blocks,
);
}
Ok(None)
}

View File

@ -3,7 +3,7 @@
// SPDX-License-Identifier: Apache-2.0
#![deny(warnings)]
#[macro_use(crate_authors, crate_version)]
#[macro_use(crate_authors)]
extern crate clap;
#[macro_use]
extern crate anyhow;
@ -331,11 +331,18 @@ fn prepare_cmd_args(bti_string: String) -> ArgMatches<'static> {
.subcommand(
SubCommand::with_name("merge")
.about("Merge multiple bootstraps into a overlaid bootstrap")
.arg(
Arg::with_name("parent-bootstrap")
.long("parent-bootstrap")
.help("File path of the parent/referenced RAFS metadata blob (optional)")
.required(false)
.takes_value(true),
)
.arg(
Arg::with_name("bootstrap")
.long("bootstrap")
.short("B")
.help("output path of nydus overlaid bootstrap")
.help("Output path of nydus overlaid bootstrap")
.required(true)
.takes_value(true),
)
@ -542,7 +549,7 @@ fn init_log(matches: &ArgMatches) -> Result<()> {
}
fn main() -> Result<()> {
let (bti_string, build_info) = BuildTimeInfo::dump(crate_version!());
let (bti_string, build_info) = BuildTimeInfo::dump();
let cmd = prepare_cmd_args(bti_string);
@ -704,8 +711,12 @@ impl Command {
prefetch: Self::get_prefetch(matches)?,
..Default::default()
};
let parent_bootstrap_path = matches.value_of("parent-bootstrap");
let output = Merger::merge(
&mut ctx,
parent_bootstrap_path,
source_bootstrap_paths,
target_bootstrap_path.to_path_buf(),
chunk_dict_path,

View File

@ -2,7 +2,7 @@
//
// SPDX-License-Identifier: Apache-2.0
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::path::{Path, PathBuf};
@ -61,6 +61,7 @@ impl Merger {
pub fn merge(
ctx: &mut BuildContext,
parent_bootstrap_path: Option<&str>,
sources: Vec<PathBuf>,
target: PathBuf,
chunk_dict: Option<PathBuf>,
@ -73,6 +74,22 @@ impl Merger {
let mut blob_mgr = BlobManager::new();
let mut flags: Option<Flags> = None;
// Load parent bootstrap
let mut blob_idx_map = HashMap::new();
let mut parent_layers = 0;
if let Some(parent_bootstrap_path) = &parent_bootstrap_path {
let rs = RafsSuper::load_from_metadata(parent_bootstrap_path, RafsMode::Direct, true)
.context(format!("load parent bootstrap {:?}", parent_bootstrap_path))?;
tree = Some(Tree::from_bootstrap(&rs, &mut ())?);
let blobs = rs.superblock.get_blob_infos();
for blob in &blobs {
let blob_ctx = BlobContext::from(ctx, blob, ChunkSource::Parent);
blob_idx_map.insert(blob_ctx.blob_id.clone(), blob_mgr.len());
blob_mgr.add(blob_ctx);
}
parent_layers = blobs.len();
}
// Get the blobs come from chunk dict bootstrap.
let mut chunk_dict_blobs = HashSet::new();
if let Some(chunk_dict_path) = &chunk_dict {
@ -117,7 +134,6 @@ impl Merger {
let parent_blobs = rs.superblock.get_blob_infos();
let blob_hash = Self::get_blob_hash(bootstrap_path)?;
let mut blob_idx_map = Vec::new();
let mut parent_blob_added = false;
for blob in &parent_blobs {
@ -135,8 +151,10 @@ impl Merger {
chunk_size = Some(blob_ctx.chunk_size);
parent_blob_added = true;
}
blob_idx_map.push(blob_mgr.len() as u32);
blob_mgr.add(blob_ctx);
if !blob_idx_map.contains_key(blob.blob_id()) {
blob_idx_map.insert(blob.blob_id().to_string(), blob_mgr.len());
blob_mgr.add(blob_ctx);
}
}
if let Some(tree) = &mut tree {
@ -153,8 +171,11 @@ impl Merger {
))?;
for chunk in &mut node.chunks {
let origin_blob_index = chunk.inner.blob_index() as usize;
// Set the blob index of chunk to real index in blob table of final bootstrap.
chunk.inner.set_blob_index(blob_idx_map[origin_blob_index]);
let blob_ctx = parent_blobs[origin_blob_index].as_ref();
if let Some(blob_index) = blob_idx_map.get(blob_ctx.blob_id()) {
// Set the blob index of chunk to real index in blob table of final bootstrap.
chunk.inner.set_blob_index(*blob_index as u32);
}
}
// Set node's layer index to distinguish same inode number (from bootstrap)
// between different layers.
@ -162,7 +183,7 @@ impl Merger {
"too many layers {}, limited to {}",
layer_idx,
u16::MAX
))?;
))? + parent_layers as u16;
node.overlay = Overlay::UpperAddition;
match node.whiteout_type(WhiteoutSpec::Oci) {
Some(_) => {

View File

@ -58,39 +58,55 @@ impl CommandCache {
let metrics = client.get("v1/metrics/blobcache").await?;
let m = metrics.as_object().unwrap();
let prefetch_duration = m["prefetch_end_time_secs"].as_f64().unwrap()
+ m["prefetch_end_time_millis"].as_f64().unwrap() / 1000.0
- (m["prefetch_begin_time_secs"].as_f64().unwrap()
+ m["prefetch_begin_time_millis"].as_f64().unwrap() / 1000.0);
let prefetch_data_amount = m["prefetch_data_amount"].as_f64().unwrap();
if raw {
println!("{}", metrics);
} else {
print!(
r#"
Partial Hits: {partial_hits}
Whole Hits: {whole_hits}
Total Read: {total_read}
Directory: {directory}
Files: {files}
Prefetch Workers: {workers}
Prefetch Amount: {prefetch_amount} = {prefetch_amount_kb} KB
Prefetch Requests: {requests}
Prefetch Average Size: {avg_prefetch_size} Bytes
Prefetch Unmerged: {unmerged_blocks}
Persister Buffer: {buffered}
Partial Hits: {partial_hits}
Whole Hits: {whole_hits}
Total Read: {total_read}
Directory: {directory}
Files: {files}
Persister Buffer: {buffered}
Prefetch Workers: {workers}
Prefetch Amount: {prefetch_amount} = {prefetch_amount_kb} KB
Prefetch Requests: {requests}
Prefetch Average Size: {avg_prefetch_size} Bytes
Prefetch Duration: {prefetch_duration} Seconds
Prefetch Bandwidth: {prefetch_bandwidth} MB/S
Prefetch Request Latency: {prefetch_request_latency} Seconds
Prefetch Unmerged: {unmerged_blocks}
"#,
partial_hits = m["partial_hits"],
whole_hits = m["whole_hits"],
total_read = m["total"],
prefetch_amount = m["prefetch_data_amount"],
prefetch_amount_kb = m["prefetch_data_amount"].as_u64().unwrap() / 1024,
prefetch_amount = prefetch_data_amount,
prefetch_amount_kb = prefetch_data_amount / 1024.0,
files = m["underlying_files"],
directory = m["store_path"],
requests = m["prefetch_mr_count"],
requests = m["prefetch_requests_count"],
avg_prefetch_size = m["prefetch_data_amount"]
.as_u64()
.unwrap()
.checked_div(m["prefetch_mr_count"].as_u64().unwrap())
.checked_div(m["prefetch_requests_count"].as_u64().unwrap())
.unwrap_or_default(),
workers = m["prefetch_workers"],
unmerged_blocks = m["prefetch_unmerged_chunks"],
buffered = m["buffered_backend_size"],
prefetch_duration = prefetch_duration,
prefetch_bandwidth = prefetch_data_amount / 1024.0 / 1024.0 / prefetch_duration,
prefetch_request_latency = m["prefetch_cumulative_time_millis"].as_f64().unwrap()
/ m["prefetch_requests_count"].as_f64().unwrap()
/ 1000.0
);
}
@ -386,31 +402,29 @@ Commit: {git_commit}
if let Some(b) = i.get("backend_collection") {
if let Some(fs_backends) = b.as_object() {
if !fs_backends.is_empty() {
println!("Backend list:")
println!("Instances:")
}
for (mount_point, backend_obj) in fs_backends {
let backend: FsBackendDesc =
serde_json::from_value(backend_obj.clone()).unwrap();
println!(" {}", mount_point);
println!(" type: {}", backend.backend_type);
println!(" mountpoint: {}", backend.mountpoint);
println!(" mounted_time: {}", backend.mounted_time);
println!("\tInstance Mountpoint: {}", mount_point);
println!("\tType: {}", backend.backend_type);
println!("\tMounted Time: {}", backend.mounted_time);
match backend.backend_type {
FsBackendType::PassthroughFs => {}
FsBackendType::Rafs => {
let fs: RafsConfig =
let cfg: RafsConfig =
serde_json::from_value(backend.config.unwrap().clone())
.unwrap();
print!(
r#" Mode: {meta_mode}
Prefetch: {enabled}
Prefetch Merging Size: {merging_size}
"#,
meta_mode = fs.mode,
enabled = fs.fs_prefetch.enable,
merging_size = fs.fs_prefetch.merging_size,
println!("\tMode: {}", cfg.mode);
println!("\tPrefetch: {}", cfg.fs_prefetch.enable);
println!(
"\tPrefetch Merging Size: {}",
cfg.fs_prefetch.merging_size
);
println!();
}
}
}

View File

@ -4,7 +4,7 @@
#![deny(warnings)]
#[macro_use(crate_authors, crate_version)]
#[macro_use(crate_authors)]
extern crate clap;
#[macro_use]
extern crate anyhow;
@ -25,11 +25,13 @@ mod commands;
use commands::{
CommandBackend, CommandCache, CommandDaemon, CommandFsStats, CommandMount, CommandUmount,
};
use nydus_app::BuildTimeInfo;
#[tokio::main]
async fn main() -> Result<()> {
let (_, build_info) = BuildTimeInfo::dump();
let app = App::new("A client to query and configure the nydusd daemon\n")
.version(crate_version!())
.version(build_info.package_ver.as_str())
.author(crate_authors!())
.arg(
Arg::with_name("sock")

View File

@ -5,7 +5,6 @@
// SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)
#![deny(warnings)]
#![allow(dead_code)]
#[macro_use(crate_version)]
extern crate clap;
#[macro_use]
extern crate log;
@ -146,14 +145,6 @@ impl DaemonController {
let daemon = self.daemon.lock().unwrap().take();
if let Some(d) = daemon {
/*
// TODO: fix the behavior
if cfg!(feature = "virtiofs") {
// In case of virtiofs, mechanism to unblock recvmsg() from VMM is lacked.
// Given the fact that we have nothing to clean up, directly exit seems fine.
process::exit(0);
}
*/
if let Err(e) = d.stop() {
error!("failed to stop daemon: {}", e);
}
@ -263,7 +254,7 @@ fn append_fuse_options(app: App<'static, 'static>) -> App<'static, 'static> {
Arg::with_name("threads")
.long("thread-num")
.short("T")
.default_value("1")
.default_value("4")
.help("Number of worker threads to serve IO requests")
.takes_value(true)
.required(false)
@ -707,7 +698,7 @@ fn process_singleton_arguments(
}
fn main() -> Result<()> {
let (bti_string, bti) = BuildTimeInfo::dump(crate_version!());
let (bti_string, bti) = BuildTimeInfo::dump();
let cmd_options = prepare_commandline_options().version(bti_string.as_str());
let args = cmd_options.clone().get_matches();
let logging_file = args.value_of("log-file").map(|l| l.into());
@ -722,7 +713,11 @@ fn main() -> Result<()> {
setup_logging(logging_file, level, rotation_size)?;
dump_program_info(crate_version!());
// Initialize and run the daemon controller event loop.
nydus_app::signal::register_signal_handler(signal::SIGINT, sig_exit);
nydus_app::signal::register_signal_handler(signal::SIGTERM, sig_exit);
dump_program_info();
handle_rlimit_nofile_option(&args, "rlimit-nofile")?;
match args.subcommand_name() {
@ -759,10 +754,6 @@ fn main() -> Result<()> {
let mut api_controller = ApiServerController::new(apisock);
api_controller.start()?;
// Initialize and run the daemon controller event loop.
nydus_app::signal::register_signal_handler(signal::SIGINT, sig_exit);
nydus_app::signal::register_signal_handler(signal::SIGTERM, sig_exit);
// Run the main event loop
if DAEMON_CONTROLLER.is_active() {
DAEMON_CONTROLLER.run_loop();

View File

@ -1,6 +1,6 @@
[package]
name = "nydus-storage"
version = "0.5.0"
version = "0.5.1"
description = "Storage subsystem for Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

View File

@ -12,7 +12,6 @@ use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use arc_swap::ArcSwapOption;
use log::{max_level, Level};
use reqwest::header::{HeaderName, HeaderValue};
@ -134,7 +133,7 @@ impl<R: Read + Send + 'static> Read for Progress<R> {
/// HTTP request data to send to server.
#[derive(Clone)]
pub enum ReqBody<R> {
pub enum ReqBody<R: Clone> {
Read(Progress<R>, usize),
Buf(Vec<u8>),
Form(HashMap<String, String>),
@ -222,30 +221,42 @@ pub(crate) fn respond(resp: Response, catch_status: bool) -> ConnectionResult<Re
#[derive(Debug)]
pub(crate) struct Connection {
client: Client,
proxy: Option<Proxy>,
mirror_state: MirrorState,
shutdown: AtomicBool,
}
#[derive(Debug)]
pub(crate) struct MirrorState {
mirrors: Vec<Arc<Mirror>>,
/// Current mirror, None if there is no mirror available.
current: ArcSwapOption<Mirror>,
proxy: Option<Arc<Proxy>>,
pub mirrors: Vec<Arc<Mirror>>,
pub shutdown: AtomicBool,
}
#[derive(Debug)]
pub(crate) struct Mirror {
/// Information for mirror from configuration file.
config: MirrorConfig,
pub config: MirrorConfig,
/// Mirror status, it will be set to false by atomic operation when mirror is not work.
status: AtomicBool,
/// Falied times for mirror, the status will be marked as false when failed_times = failure_limit.
/// Failed times requesting mirror, the status will be marked as false when failed_times = failure_limit.
failed_times: AtomicU8,
/// Failed limit for mirror.
/// Failure count for which mirror is considered unavailable.
failure_limit: u8,
}
impl Mirror {
/// Convert original URL to mirror URL.
fn mirror_url(&self, url: &str) -> ConnectionResult<Url> {
let mirror_host = Url::parse(self.config.host.as_ref()).map_err(ConnectionError::Url)?;
let mut current_url = Url::parse(url).map_err(ConnectionError::Url)?;
current_url
.set_scheme(mirror_host.scheme())
.map_err(|_| ConnectionError::Scheme)?;
current_url
.set_host(mirror_host.host_str())
.map_err(|_| ConnectionError::Host)?;
current_url
.set_port(mirror_host.port())
.map_err(|_| ConnectionError::Port)?;
Ok(current_url)
}
}
impl Connection {
/// Create a new connection according to the configuration.
pub fn new(config: &ConnectionConfig) -> Result<Arc<Connection>> {
@ -258,13 +269,13 @@ impl Connection {
} else {
None
};
Some(Proxy {
Some(Arc::new(Proxy {
client: Self::build_connection(&config.proxy.url, config)?,
health: ProxyHealth::new(config.proxy.check_interval, ping_url),
fallback: config.proxy.fallback,
use_http: config.proxy.use_http,
replace_scheme: AtomicI16::new(SCHEME_REVERSION_CACHE_UNSET),
})
}))
} else {
None
};
@ -275,34 +286,34 @@ impl Connection {
mirrors.push(Arc::new(Mirror {
config: mirror_config.clone(),
status: AtomicBool::from(true),
// Maybe read from configuration file
failure_limit: 5,
failed_times: AtomicU8::from(0),
failure_limit: mirror_config.failure_limit,
}));
}
}
let current = if let Some(first_mirror) = mirrors.first() {
ArcSwapOption::from(Some(first_mirror.clone()))
} else {
ArcSwapOption::from(None)
};
let connection = Arc::new(Connection {
client,
proxy,
mirror_state: MirrorState { mirrors, current },
mirrors,
shutdown: AtomicBool::new(false),
});
if let Some(proxy) = &connection.proxy {
if proxy.health.ping_url.is_some() {
let conn = connection.clone();
let connect_timeout = config.connect_timeout;
// Start proxy's health checking thread.
connection.start_proxy_health_thread(config.connect_timeout as u64);
// Start mirrors' health checking thread.
connection.start_mirrors_health_thread(config.timeout as u64);
Ok(connection)
}
fn start_proxy_health_thread(&self, connect_timeout: u64) {
if let Some(proxy) = self.proxy.as_ref() {
if proxy.health.ping_url.is_some() {
let proxy = proxy.clone();
// Spawn thread to update the health status of proxy server
thread::spawn(move || {
let proxy = conn.proxy.as_ref().unwrap();
let ping_url = proxy.health.ping_url.as_ref().unwrap();
let mut last_success = true;
@ -333,20 +344,60 @@ impl Connection {
proxy.health.set(false)
});
if conn.shutdown.load(Ordering::Acquire) {
break;
}
thread::sleep(proxy.health.check_interval);
if conn.shutdown.load(Ordering::Acquire) {
break;
}
}
});
}
}
// TODO: check mirrors' health
}
Ok(connection)
fn start_mirrors_health_thread(&self, timeout: u64) {
for mirror in self.mirrors.iter() {
let mirror_cloned = mirror.clone();
thread::spawn(move || {
let mirror_health_url = if mirror_cloned.config.ping_url.is_empty() {
format!("{}/v2", mirror_cloned.config.host)
} else {
mirror_cloned.config.ping_url.clone()
};
info!("Mirror health checking url: {}", mirror_health_url);
let client = Client::new();
loop {
// Try to recover the mirror server when it is unavailable.
if !mirror_cloned.status.load(Ordering::Relaxed) {
info!(
"Mirror server {} unhealthy, try to recover",
mirror_cloned.config.host
);
let _ = client
.get(mirror_health_url.as_str())
.timeout(Duration::from_secs(timeout as u64))
.send()
.map(|resp| {
// If the response status is less than StatusCode::INTERNAL_SERVER_ERROR,
// the mirror server is recovered.
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
info!("Mirror server {} recovered", mirror_cloned.config.host);
mirror_cloned.failed_times.store(0, Ordering::Relaxed);
mirror_cloned.status.store(true, Ordering::Relaxed);
}
})
.map_err(|e| {
warn!(
"Mirror server {} is not recovered: {}",
mirror_cloned.config.host, e
);
});
}
thread::sleep(Duration::from_secs(
mirror_cloned.config.health_check_interval,
));
}
});
}
}
/// Shutdown the connection.
@ -354,8 +405,15 @@ impl Connection {
self.shutdown.store(true, Ordering::Release);
}
/// Send a request to server and wait for response.
pub fn call<R: Read + Send + 'static>(
/// If the auth_through is enable, all requests are send to the mirror server.
/// If the auth_through disabled, e.g. P2P/Dragonfly, we try to avoid sending
/// non-authorization request to the mirror server, which causes performance loss.
/// requesting_auth means this request is to get authorization from a server,
/// which must be a non-authorization request.
/// IOW, only the requesting_auth is false and the headers contain authorization token,
/// we send this request to mirror.
#[allow(clippy::too_many_arguments)]
pub fn call<R: Read + Clone + Send + 'static>(
&self,
method: Method,
url: &str,
@ -363,6 +421,8 @@ impl Connection {
data: Option<ReqBody<R>>,
headers: &mut HeaderMap,
catch_status: bool,
// This means the request is dedicated to authorization.
requesting_auth: bool,
) -> ConnectionResult<Response> {
if self.shutdown.load(Ordering::Acquire) {
return Err(ConnectionError::Disconnected);
@ -370,11 +430,7 @@ impl Connection {
if let Some(proxy) = &self.proxy {
if proxy.health.ok() {
let data_cloned: Option<ReqBody<R>> = match data.as_ref() {
Some(ReqBody::Form(form)) => Some(ReqBody::Form(form.clone())),
Some(ReqBody::Buf(buf)) => Some(ReqBody::Buf(buf.clone())),
_ => None,
};
let data_cloned = data.as_ref().cloned();
let http_url: Option<String>;
let mut replaced_url = url;
@ -425,93 +481,83 @@ impl Connection {
}
}
let current_mirror = self.mirror_state.current.load();
if let Some(mirror) = current_mirror.as_ref() {
let data_cloned: Option<ReqBody<R>> = match data.as_ref() {
Some(ReqBody::Form(form)) => Some(ReqBody::Form(form.clone())),
Some(ReqBody::Buf(buf)) => Some(ReqBody::Buf(buf.clone())),
_ => None,
};
let mirror_host = Url::parse(&mirror.config.host).map_err(ConnectionError::Url)?;
let mut current_url = Url::parse(url).map_err(ConnectionError::Url)?;
current_url
.set_scheme(mirror_host.scheme())
.map_err(|_| ConnectionError::Scheme)?;
current_url
.set_host(mirror_host.host_str())
.map_err(|_| ConnectionError::Host)?;
current_url
.set_port(mirror_host.port())
.map_err(|_| ConnectionError::Port)?;
if let Some(working_headers) = &mirror.config.headers {
for (key, value) in working_headers.iter() {
headers.insert(
HeaderName::from_str(key).unwrap(),
HeaderValue::from_str(value).unwrap(),
);
if !self.mirrors.is_empty() {
let mut fallback_due_auth = false;
for mirror in self.mirrors.iter() {
// With configuration `auth_through` disabled, we should not intend to send authentication
// request to mirror. Mainly because mirrors like P2P/Dragonfly has a poor performance when
// relaying non-data requests. But it's still possible that ever returned token is expired.
// So mirror might still respond us with status code UNAUTHORIZED, which should be handle
// by sending authentication request to the original registry.
//
// - For non-authentication request with token in request header, handle is as usual requests to registry.
// This request should already take token in header.
// - For authentication request
// 1. auth_through is disabled(false): directly pass below mirror translations and jump to original registry handler.
// 2. auth_through is enabled(true): try to get authenticated from mirror and should also handle status code UNAUTHORIZED.
if !mirror.config.auth_through
&& (!headers.contains_key(HEADER_AUTHORIZATION) || requesting_auth)
{
fallback_due_auth = true;
break;
}
}
let result = self.call_inner(
&self.client,
method.clone(),
current_url.to_string().as_str(),
&query,
data_cloned,
headers,
catch_status,
false,
);
if mirror.status.load(Ordering::Relaxed) {
let data_cloned = data.as_ref().cloned();
match result {
Ok(resp) => {
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
return Ok(resp);
}
}
Err(err) => {
warn!(
"request mirror server failed, mirror: {}, error: {:?}",
format!("{:?}", mirror).to_lowercase(),
err
);
mirror.failed_times.fetch_add(1, Ordering::Relaxed);
if mirror.failed_times.load(Ordering::Relaxed) >= mirror.failure_limit {
warn!(
"reach to fail limit {}, disable mirror: {}",
mirror.failure_limit,
format!("{:?}", mirror).to_lowercase()
for (key, value) in mirror.config.headers.iter() {
headers.insert(
HeaderName::from_str(key).unwrap(),
HeaderValue::from_str(value).unwrap(),
);
mirror.status.store(false, Ordering::Relaxed);
}
let mut idx = 0;
loop {
if idx == self.mirror_state.mirrors.len() {
break None;
}
let m = &self.mirror_state.mirrors[idx];
if m.status.load(Ordering::Relaxed) {
warn!(
"mirror server has been changed to {}",
format!("{:?}", m).to_lowercase()
);
break Some(m);
}
let current_url = mirror.mirror_url(url)?;
debug!("mirror server url {}", current_url);
idx += 1;
let result = self.call_inner(
&self.client,
method.clone(),
current_url.as_str(),
&query,
data_cloned,
headers,
catch_status,
false,
);
match result {
Ok(resp) => {
// If the response status >= INTERNAL_SERVER_ERROR, move to the next mirror server.
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
return Ok(resp);
}
}
Err(err) => {
warn!(
"request mirror server failed, mirror: {:?}, error: {:?}",
mirror, err
);
mirror.failed_times.fetch_add(1, Ordering::Relaxed);
if mirror.failed_times.load(Ordering::Relaxed) >= mirror.failure_limit {
warn!(
"reach to failure limit {}, disable mirror: {:?}",
mirror.failure_limit, mirror
);
mirror.status.store(false, Ordering::Relaxed);
}
}
.map(|m| self.mirror_state.current.store(Some(m.clone())))
.unwrap_or_else(|| self.mirror_state.current.store(None));
}
}
// Remove mirror-related headers to avoid sending them to the next mirror server and original registry.
for (key, _) in mirror.config.headers.iter() {
headers.remove(HeaderName::from_str(key).unwrap());
}
}
if !fallback_due_auth {
warn!("Request to all mirror server failed, fallback to original server.");
}
warn!("Failed to request mirror server, fallback to original server.");
}
self.call_inner(
@ -555,7 +601,7 @@ impl Connection {
}
#[allow(clippy::too_many_arguments)]
fn call_inner<R: Read + Send + 'static>(
fn call_inner<R: Read + Clone + Send + 'static>(
&self,
client: &Client,
method: Method,

View File

@ -13,10 +13,13 @@
//! The [LocalFs](localfs/struct.LocalFs.html) storage backend supports backend level data
//! prefetching, which is to load data into page cache.
use std::sync::Arc;
use std::{sync::Arc, time::Duration};
use fuse_backend_rs::file_buf::FileVolatileSlice;
use nydus_utils::metrics::{BackendMetrics, ERROR_HOLDER};
use nydus_utils::{
metrics::{BackendMetrics, ERROR_HOLDER},
DelayType, Delayer,
};
use crate::utils::{alloc_buf, copyv};
use crate::StorageError;
@ -75,6 +78,8 @@ pub trait BlobReader: Send + Sync {
let mut retry_count = self.retry_limit();
let begin_time = self.metrics().begin();
let mut delayer = Delayer::new(DelayType::BackOff, Duration::from_millis(500));
loop {
match self.try_read(buf, offset) {
Ok(size) => {
@ -88,6 +93,7 @@ pub trait BlobReader: Send + Sync {
err, retry_count
);
retry_count -= 1;
delayer.delay();
} else {
self.metrics().end(&begin_time, buf.len(), true);
ERROR_HOLDER

View File

@ -143,7 +143,15 @@ impl BlobReader for OssReader {
let resp = self
.connection
.call::<&[u8]>(Method::HEAD, url.as_str(), None, None, &mut headers, true)
.call::<&[u8]>(
Method::HEAD,
url.as_str(),
None,
None,
&mut headers,
true,
false,
)
.map_err(OssError::Request)?;
let content_length = resp
.headers()
@ -178,7 +186,15 @@ impl BlobReader for OssReader {
// Safe because the the call() is a synchronous operation.
let mut resp = self
.connection
.call::<&[u8]>(Method::GET, url.as_str(), None, None, &mut headers, true)
.call::<&[u8]>(
Method::GET,
url.as_str(),
None,
None,
&mut headers,
true,
false,
)
.map_err(OssError::Request)?;
Ok(resp
.copy_to(&mut buf)

View File

@ -4,9 +4,15 @@
//! Storage backend driver to access blobs on container image registry.
use std::collections::HashMap;
use std::io::{Error, Read, Result};
use std::error::Error;
use std::fmt::Display;
use std::io::{Read, Result};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use arc_swap::ArcSwapOption;
use reqwest::blocking::Response;
pub use reqwest::header::HeaderMap;
use reqwest::header::{HeaderValue, CONTENT_LENGTH};
@ -39,7 +45,7 @@ pub enum RegistryError {
Scheme(String),
Auth(String),
ResponseHead(String),
Response(Error),
Response(std::io::Error),
Transport(reqwest::Error),
}
@ -102,6 +108,12 @@ impl HashCache {
#[derive(Clone, serde::Deserialize)]
struct TokenResponse {
token: String,
#[serde(default = "default_expires_in")]
expires_in: u64,
}
fn default_expires_in() -> u64 {
10 * 60
}
#[derive(Debug)]
@ -110,7 +122,7 @@ struct BasicAuth {
realm: String,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
struct BearerAuth {
realm: String,
service: String,
@ -124,9 +136,27 @@ enum Auth {
Bearer(BearerAuth),
}
pub struct Scheme(AtomicBool);
impl Scheme {
fn new(value: bool) -> Self {
Scheme(AtomicBool::new(value))
}
}
impl Display for Scheme {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.0.load(Ordering::Relaxed) {
write!(f, "https")
} else {
write!(f, "http")
}
}
}
struct RegistryState {
// HTTP scheme like: https, http
scheme: String,
scheme: Scheme,
host: String,
// Image repo name like: library/ubuntu
repo: String,
@ -149,6 +179,11 @@ struct RegistryState {
// Cache 30X redirect url
// Example: RwLock<HashMap<"<blob_id>", "<redirected_url>">>
cached_redirect: HashCache,
// The expiration time of the token, which is obtained from the registry server.
refresh_token_time: ArcSwapOption<u64>,
// Cache bearer auth for refreshing token.
cached_bearer_auth: ArcSwapOption<BearerAuth>,
}
impl RegistryState {
@ -165,6 +200,29 @@ impl RegistryState {
Ok(url.to_string())
}
fn needs_fallback_http(&self, e: &dyn Error) -> bool {
match e.source() {
Some(err) => match err.source() {
Some(err) => {
if !self.scheme.0.load(Ordering::Relaxed) {
return false;
}
let msg = err.to_string().to_lowercase();
// As far as we can observe, if we try to establish a tls connection
// with the http registry server, we will encounter this type of error:
// https://github.com/openssl/openssl/blob/6b3d28757620e0781bb1556032bb6961ee39af63/crypto/err/openssl.txt#L1574
let fallback = msg.contains("wrong version number");
if fallback {
warn!("fallback to http due to tls connection error: {}", err);
}
fallback
}
None => false,
},
None => false,
}
}
/// Request registry authentication server to get bearer token
fn get_token(&self, auth: BearerAuth, connection: &Arc<Connection>) -> Result<String> {
// The information needed for getting token needs to be placed both in
@ -198,6 +256,7 @@ impl RegistryState {
Some(ReqBody::Form(form)),
&mut headers,
true,
true,
)
.map_err(|e| einval!(format!("registry auth server request failed {:?}", e)))?;
let ret: TokenResponse = token_resp.json().map_err(|e| {
@ -206,6 +265,18 @@ impl RegistryState {
e
))
})?;
if let Ok(now_timestamp) = SystemTime::now().duration_since(UNIX_EPOCH) {
self.refresh_token_time
.store(Some(Arc::new(now_timestamp.as_secs() + ret.expires_in)));
info!(
"cached bearer auth, next time: {}",
now_timestamp.as_secs() + ret.expires_in
);
}
// Cache bearer auth for refreshing token.
self.cached_bearer_auth.store(Some(Arc::new(auth)));
Ok(ret.token)
}
@ -276,6 +347,10 @@ impl RegistryState {
_ => None,
}
}
fn fallback_http(&self) {
self.scheme.0.store(false, Ordering::Relaxed);
}
}
struct RegistryReader {
@ -312,7 +387,7 @@ impl RegistryReader {
/// Request: POST https://my-registry.com/test/repo/blobs/uploads
/// header: authorization: Basic base64(<username:password>)
/// Response: status: 200 Ok
fn request<R: Read + Send + 'static>(
fn request<R: Read + Clone + Send + 'static>(
&self,
method: Method,
url: &str,
@ -336,16 +411,40 @@ impl RegistryReader {
if let Some(data) = data {
return self
.connection
.call(method, url, None, Some(data), &mut headers, catch_status)
.call(
method,
url,
None,
Some(data),
&mut headers,
catch_status,
false,
)
.map_err(RegistryError::Request);
}
// Try to request registry server with `authorization` header
let resp = self
let mut resp = self
.connection
.call::<&[u8]>(method.clone(), url, None, None, &mut headers, false)
.call::<&[u8]>(method.clone(), url, None, None, &mut headers, false, false)
.map_err(RegistryError::Request)?;
if resp.status() == StatusCode::UNAUTHORIZED {
if headers.contains_key(HEADER_AUTHORIZATION) {
// If we request registry (harbor server) with expired authorization token,
// the `www-authenticate: Basic realm="harbor"` in response headers is not expected.
// Related code in harbor:
// https://github.com/goharbor/harbor/blob/v2.5.3/src/server/middleware/v2auth/auth.go#L98
//
// We can remove the expired authorization token and
// resend the request to get the correct "www-authenticate" value.
headers.remove(HEADER_AUTHORIZATION);
resp = self
.connection
.call::<&[u8]>(method.clone(), url, None, None, &mut headers, false, false)
.map_err(RegistryError::Request)?;
};
if let Some(resp_auth_header) = resp.headers().get(HEADER_WWW_AUTHENTICATE) {
// Get token from registry authorization server
if let Some(auth) = RegistryState::parse_auth(resp_auth_header, &self.state.auth) {
@ -353,6 +452,7 @@ impl RegistryReader {
.state
.get_auth_header(auth, &self.connection)
.map_err(|e| RegistryError::Common(e.to_string()))?;
headers.insert(
HEADER_AUTHORIZATION,
HeaderValue::from_str(auth_header.as_str()).unwrap(),
@ -361,7 +461,7 @@ impl RegistryReader {
// Try to request registry server with `authorization` header again
let resp = self
.connection
.call(method, url, None, data, &mut headers, catch_status)
.call(method, url, None, data, &mut headers, catch_status, false)
.map_err(RegistryError::Request)?;
let status = resp.status();
@ -417,6 +517,7 @@ impl RegistryReader {
None,
&mut headers,
false,
false,
)
.map_err(RegistryError::Request)?;
@ -433,8 +534,28 @@ impl RegistryReader {
return self._try_read(buf, offset, false);
}
} else {
resp =
self.request::<&[u8]>(Method::GET, url.as_str(), None, headers.clone(), false)?;
resp = match self.request::<&[u8]>(
Method::GET,
url.as_str(),
None,
headers.clone(),
false,
) {
Ok(res) => res,
Err(RegistryError::Request(ConnectionError::Common(e)))
if self.state.needs_fallback_http(&e) =>
{
self.state.fallback_http();
let url = self
.state
.url(format!("/blobs/sha256:{}", self.blob_id).as_str(), &[])
.map_err(RegistryError::Url)?;
self.request::<&[u8]>(Method::GET, url.as_str(), None, headers.clone(), false)?
}
Err(e) => {
return Err(e);
}
};
let status = resp.status();
// Handle redirect request and cache redirect url
@ -473,6 +594,7 @@ impl RegistryReader {
None,
&mut headers,
true,
false,
)
.map_err(RegistryError::Request);
match resp_ret {
@ -504,8 +626,24 @@ impl BlobReader for RegistryReader {
.state
.url(&format!("/blobs/sha256:{}", self.blob_id), &[])
.map_err(RegistryError::Url)?;
let resp =
self.request::<&[u8]>(Method::HEAD, url.as_str(), None, HeaderMap::new(), true)?;
match self.request::<&[u8]>(Method::HEAD, url.as_str(), None, HeaderMap::new(), true) {
Ok(res) => res,
Err(RegistryError::Request(ConnectionError::Common(e)))
if self.state.needs_fallback_http(&e) =>
{
self.state.fallback_http();
let url = self
.state
.url(format!("/blobs/sha256:{}", self.blob_id).as_str(), &[])
.map_err(RegistryError::Url)?;
self.request::<&[u8]>(Method::HEAD, url.as_str(), None, HeaderMap::new(), true)?
}
Err(e) => {
return Err(BackendError::Registry(e));
}
};
let content_length = resp
.headers()
.get(CONTENT_LENGTH)
@ -565,8 +703,14 @@ impl Registry {
Cache::new(String::new())
};
let scheme = if !config.scheme.is_empty() && config.scheme == "http" {
Scheme::new(false)
} else {
Scheme::new(true)
};
let state = Arc::new(RegistryState {
scheme: config.scheme,
scheme,
host: config.host,
repo: config.repo,
auth,
@ -577,13 +721,27 @@ impl Registry {
blob_url_scheme: config.blob_url_scheme,
blob_redirected_host: config.blob_redirected_host,
cached_redirect: HashCache::new(),
refresh_token_time: ArcSwapOption::new(None),
cached_bearer_auth: ArcSwapOption::new(None),
});
Ok(Registry {
let mirrors = connection.mirrors.clone();
let registry = Registry {
connection,
state,
metrics: BackendMetrics::new(id, "registry"),
})
};
for mirror in mirrors.iter() {
if !mirror.config.auth_through {
registry.start_refresh_token_thread();
info!("Refresh token thread started.");
break;
}
}
Ok(registry)
}
fn get_authorization_info(auth: &Option<String>) -> Result<(String, String)> {
@ -610,6 +768,50 @@ impl Registry {
Ok((String::new(), String::new()))
}
}
fn start_refresh_token_thread(&self) {
let conn = self.connection.clone();
let state = self.state.clone();
// The default refresh token internal is 10 minutes.
let refresh_check_internal = 10 * 60;
thread::spawn(move || {
loop {
if let Ok(now_timestamp) = SystemTime::now().duration_since(UNIX_EPOCH) {
if let Some(next_refresh_timestamp) = state.refresh_token_time.load().as_deref()
{
// If the token will expire in next refresh check internal, get new token now.
// Add 20 seconds to handle critical cases.
if now_timestamp.as_secs() + refresh_check_internal + 20
>= *next_refresh_timestamp
{
if let Some(cached_bearer_auth) =
state.cached_bearer_auth.load().as_deref()
{
if let Ok(token) =
state.get_token(cached_bearer_auth.to_owned(), &conn)
{
let new_cached_auth = format!("Bearer {}", token);
info!("Authorization token for registry has been refreshed.");
// Refresh authorization token
state
.cached_auth
.set(&state.cached_auth.get(), new_cached_auth);
}
}
}
}
}
if conn.shutdown.load(Ordering::Acquire) {
break;
}
thread::sleep(Duration::from_secs(refresh_check_internal));
if conn.shutdown.load(Ordering::Acquire) {
break;
}
}
});
}
}
impl BlobBackend for Registry {
@ -683,8 +885,8 @@ mod tests {
#[test]
fn test_state_url() {
let mut state = RegistryState {
scheme: "http".to_string(),
let state = RegistryState {
scheme: Scheme::new(false),
host: "alibaba-inc.com".to_string(),
repo: "nydus".to_string(),
auth: None,
@ -695,6 +897,8 @@ mod tests {
blob_redirected_host: "oss.alibaba-inc.com".to_string(),
cached_auth: Default::default(),
cached_redirect: Default::default(),
refresh_token_time: ArcSwapOption::new(None),
cached_bearer_auth: ArcSwapOption::new(None),
};
assert_eq!(
@ -705,9 +909,6 @@ mod tests {
state.url("image", &[]).unwrap(),
"http://alibaba-inc.com/v2/nydusimage".to_owned()
);
state.scheme = "unknown_schema".to_owned();
assert!(state.url("image", &[]).is_err());
}
#[test]

View File

@ -22,7 +22,7 @@ use fuse_backend_rs::file_buf::FileVolatileSlice;
use nix::sys::uio;
use nix::unistd::dup;
use nydus_utils::metrics::{BlobcacheMetrics, Metric};
use nydus_utils::{compress, digest};
use nydus_utils::{compress, digest, DelayType, Delayer};
use tokio::runtime::Runtime;
use crate::backend::BlobReader;
@ -37,8 +37,8 @@ use crate::meta::{BlobMetaChunk, BlobMetaInfo};
use crate::utils::{alloc_buf, copyv, readv, MemSliceCursor};
use crate::{StorageError, StorageResult, RAFS_DEFAULT_CHUNK_SIZE};
const DOWNLOAD_META_RETRY_COUNT: u32 = 20;
const DOWNLOAD_META_RETRY_DELAY: u64 = 500;
const DOWNLOAD_META_RETRY_COUNT: u32 = 5;
const DOWNLOAD_META_RETRY_DELAY: u64 = 400;
#[derive(Default, Clone)]
pub(crate) struct FileCacheMeta {
@ -60,6 +60,10 @@ impl FileCacheMeta {
std::thread::spawn(move || {
let mut retry = 0;
let mut delayer = Delayer::new(
DelayType::BackOff,
Duration::from_millis(DOWNLOAD_META_RETRY_DELAY),
);
while retry < DOWNLOAD_META_RETRY_COUNT {
match BlobMetaInfo::new(&blob_file, &blob_info, reader.as_ref()) {
Ok(m) => {
@ -68,7 +72,7 @@ impl FileCacheMeta {
}
Err(e) => {
info!("temporarily failed to get blob.meta, {}", e);
std::thread::sleep(Duration::from_millis(DOWNLOAD_META_RETRY_DELAY));
delayer.delay();
retry += 1;
}
}
@ -410,7 +414,7 @@ impl BlobObject for FileCacheEntry {
}
fn fetch_range_compressed(&self, offset: u64, size: u64) -> Result<usize> {
let meta = self.meta.as_ref().ok_or_else(|| einval!())?;
let meta = self.meta.as_ref().ok_or_else(|| enoent!())?;
let meta = meta.get_blob_meta().ok_or_else(|| einval!())?;
let chunks = meta.get_chunks_compressed(offset, size, RAFS_DEFAULT_CHUNK_SIZE * 2)?;
debug_assert!(!chunks.is_empty());
@ -579,7 +583,28 @@ impl FileCacheEntry {
}
if !bitmap.wait_for_range_ready(chunk_index, count)? {
Err(eio!("failed to read data from storage backend"))
if prefetch {
return Err(eio!("failed to read data from storage backend"));
}
// if we are in ondemand path, retry for the timeout chunks
for chunk in chunks {
if self.chunk_map.is_ready(chunk.as_ref())? {
continue;
}
info!("retry for timeout chunk, {}", chunk.id());
let mut buf = alloc_buf(chunk.uncompressed_size() as usize);
self.read_raw_chunk(chunk.as_ref(), &mut buf, false, None)
.map_err(|e| eio!(format!("read_raw_chunk failed, {:?}", e)))?;
if self.dio_enabled {
self.adjust_buffer_for_dio(&mut buf)
}
Self::persist_chunk(&self.file, chunk.uncompressed_offset(), &buf)
.map_err(|e| eio!(format!("do_fetch_chunk failed to persist data, {:?}", e)))?;
self.chunk_map
.set_ready_and_clear_pending(chunk.as_ref())
.unwrap_or_else(|e| error!("set chunk ready failed, {}", e));
}
Ok(total_size)
} else {
Ok(total_size)
}

View File

@ -5,11 +5,10 @@
use std::io::Result;
use std::num::NonZeroU32;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU32, AtomicU64, Ordering};
use std::sync::{Arc, Once};
use std::thread;
use std::time::Duration;
use tokio::time::interval;
use std::time::{Duration, SystemTime};
use governor::clock::QuantaClock;
use governor::state::{InMemoryState, NotKeyed};
@ -52,9 +51,9 @@ impl From<BlobPrefetchConfig> for AsyncPrefetchConfig {
/// Asynchronous service request message.
pub(crate) enum AsyncPrefetchMessage {
/// Asynchronous blob layer prefetch request with (offset, size) of blob on storage backend.
BlobPrefetch(Arc<dyn BlobCache>, u64, u64),
BlobPrefetch(Arc<dyn BlobCache>, u64, u64, SystemTime),
/// Asynchronous file-system layer prefetch request.
FsPrefetch(Arc<dyn BlobCache>, BlobIoRange),
FsPrefetch(Arc<dyn BlobCache>, BlobIoRange, SystemTime),
#[cfg_attr(not(test), allow(unused))]
/// Ping for test.
Ping,
@ -65,12 +64,12 @@ pub(crate) enum AsyncPrefetchMessage {
impl AsyncPrefetchMessage {
/// Create a new asynchronous filesystem prefetch request message.
pub fn new_fs_prefetch(blob_cache: Arc<dyn BlobCache>, req: BlobIoRange) -> Self {
AsyncPrefetchMessage::FsPrefetch(blob_cache, req)
AsyncPrefetchMessage::FsPrefetch(blob_cache, req, SystemTime::now())
}
/// Create a new asynchronous blob prefetch request message.
pub fn new_blob_prefetch(blob_cache: Arc<dyn BlobCache>, offset: u64, size: u64) -> Self {
AsyncPrefetchMessage::BlobPrefetch(blob_cache, offset, size)
AsyncPrefetchMessage::BlobPrefetch(blob_cache, offset, size, SystemTime::now())
}
}
@ -80,6 +79,10 @@ pub(crate) struct AsyncWorkerMgr {
ping_requests: AtomicU32,
workers: AtomicU32,
active: AtomicBool,
begin_timing_once: Once,
// Limit the total retry times to avoid unnecessary resource consumption.
retry_times: AtomicI32,
prefetch_sema: Arc<Semaphore>,
prefetch_channel: Arc<Channel<AsyncPrefetchMessage>>,
@ -116,6 +119,9 @@ impl AsyncWorkerMgr {
ping_requests: AtomicU32::new(0),
workers: AtomicU32::new(0),
active: AtomicBool::new(false),
begin_timing_once: Once::new(),
retry_times: AtomicI32::new(32),
prefetch_sema: Arc::new(Semaphore::new(0)),
prefetch_channel: Arc::new(Channel::new()),
@ -169,10 +175,10 @@ impl AsyncWorkerMgr {
pub fn flush_pending_prefetch_requests(&self, blob_id: &str) {
self.prefetch_channel
.flush_pending_prefetch_requests(|t| match t {
AsyncPrefetchMessage::BlobPrefetch(blob, _, _) => {
AsyncPrefetchMessage::BlobPrefetch(blob, _, _, _) => {
blob_id == blob.blob_id() && !blob.is_prefetch_active()
}
AsyncPrefetchMessage::FsPrefetch(blob, _) => {
AsyncPrefetchMessage::FsPrefetch(blob, _, _) => {
blob_id == blob.blob_id() && !blob.is_prefetch_active()
}
_ => false,
@ -228,6 +234,17 @@ impl AsyncWorkerMgr {
}
async fn handle_prefetch_requests(mgr: Arc<AsyncWorkerMgr>, rt: &Runtime) {
mgr.begin_timing_once.call_once(|| {
let now = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap();
mgr.metrics.prefetch_begin_time_secs.set(now.as_secs());
mgr.metrics
.prefetch_begin_time_millis
.set(now.subsec_millis() as u64);
});
// Max 1 active requests per thread.
mgr.prefetch_sema.add_permits(1);
@ -236,7 +253,7 @@ impl AsyncWorkerMgr {
let mgr2 = mgr.clone();
match msg {
AsyncPrefetchMessage::BlobPrefetch(blob_cache, offset, size) => {
AsyncPrefetchMessage::BlobPrefetch(blob_cache, offset, size, begin_time) => {
let token = Semaphore::acquire_owned(mgr2.prefetch_sema.clone())
.await
.unwrap();
@ -247,19 +264,25 @@ impl AsyncWorkerMgr {
blob_cache,
offset,
size,
begin_time,
);
drop(token);
});
}
}
AsyncPrefetchMessage::FsPrefetch(blob_cache, req) => {
AsyncPrefetchMessage::FsPrefetch(blob_cache, req, begin_time) => {
let token = Semaphore::acquire_owned(mgr2.prefetch_sema.clone())
.await
.unwrap();
if blob_cache.is_prefetch_active() {
rt.spawn_blocking(move || {
let _ = Self::handle_fs_prefetch_request(mgr2.clone(), blob_cache, req);
let _ = Self::handle_fs_prefetch_request(
mgr2.clone(),
blob_cache,
req,
begin_time,
);
drop(token)
});
}
@ -278,14 +301,14 @@ impl AsyncWorkerMgr {
// Allocate network bandwidth budget
if let Some(limiter) = &self.prefetch_limiter {
let size = match msg {
AsyncPrefetchMessage::BlobPrefetch(blob_cache, _offset, size) => {
AsyncPrefetchMessage::BlobPrefetch(blob_cache, _offset, size, _) => {
if blob_cache.is_prefetch_active() {
*size
} else {
0
}
}
AsyncPrefetchMessage::FsPrefetch(blob_cache, req) => {
AsyncPrefetchMessage::FsPrefetch(blob_cache, req, _) => {
if blob_cache.is_prefetch_active() {
req.blob_size
} else {
@ -317,6 +340,7 @@ impl AsyncWorkerMgr {
cache: Arc<dyn BlobCache>,
offset: u64,
size: u64,
begin_time: SystemTime,
) -> Result<()> {
trace!(
"storage: prefetch blob {} offset {} size {}",
@ -328,6 +352,13 @@ impl AsyncWorkerMgr {
return Ok(());
}
// Record how much prefetch data is requested from storage backend.
// So the average backend merged request size will be prefetch_data_amount/prefetch_requests_count.
// We can measure merging possibility by this.
let metrics = mgr.metrics.clone();
metrics.prefetch_requests_count.inc();
metrics.prefetch_data_amount.add(size);
if let Some(obj) = cache.get_blob_object() {
if let Err(e) = obj.fetch_range_compressed(offset, size) {
warn!(
@ -338,17 +369,22 @@ impl AsyncWorkerMgr {
e
);
ASYNC_RUNTIME.spawn(async move {
let mut interval = interval(Duration::from_secs(1));
interval.tick().await;
let msg = AsyncPrefetchMessage::new_blob_prefetch(cache.clone(), offset, size);
let _ = mgr.send_prefetch_message(msg);
});
if mgr.retry_times.load(Ordering::Relaxed) > 0 {
mgr.retry_times.fetch_sub(1, Ordering::Relaxed);
ASYNC_RUNTIME.spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
let msg =
AsyncPrefetchMessage::new_blob_prefetch(cache.clone(), offset, size);
let _ = mgr.send_prefetch_message(msg);
});
}
}
} else {
warn!("prefetch blob range is not supported");
}
metrics.calculate_prefetch_metrics(begin_time);
Ok(())
}
@ -361,6 +397,7 @@ impl AsyncWorkerMgr {
mgr: Arc<AsyncWorkerMgr>,
cache: Arc<dyn BlobCache>,
req: BlobIoRange,
begin_time: SystemTime,
) -> Result<()> {
let blob_offset = req.blob_offset;
let blob_size = req.blob_size;
@ -375,9 +412,9 @@ impl AsyncWorkerMgr {
}
// Record how much prefetch data is requested from storage backend.
// So the average backend merged request size will be prefetch_data_amount/prefetch_mr_count.
// So the average backend merged request size will be prefetch_data_amount/prefetch_requests_count.
// We can measure merging possibility by this.
mgr.metrics.prefetch_mr_count.inc();
mgr.metrics.prefetch_requests_count.inc();
mgr.metrics.prefetch_data_amount.add(blob_size);
if let Some(obj) = cache.get_blob_object() {
@ -386,6 +423,8 @@ impl AsyncWorkerMgr {
cache.prefetch_range(&req)?;
}
mgr.metrics.calculate_prefetch_metrics(begin_time);
Ok(())
}

13
tests/bats/Makefile Normal file
View File

@ -0,0 +1,13 @@
ifneq (,$(wildcard /usr/lib/os-release))
include /usr/lib/os-release
else
include /etc/os-release
endif
ci:
bash -f ./install_bats.sh
bats --show-output-of-passing-tests --formatter tap build_docker_image.bats
bats --show-output-of-passing-tests --formatter tap compile_nydusd.bats
bats --show-output-of-passing-tests --formatter tap compile_ctr_remote.bats
bats --show-output-of-passing-tests --formatter tap compile_nydus_snapshotter.bats
bats --show-output-of-passing-tests --formatter tap run_container_with_rafs.bats

View File

@ -0,0 +1,34 @@
#!/usr/bin/bats
load "${BATS_TEST_DIRNAME}/common_tests.sh"
setup() {
dockerfile="/tmp/rust_golang_dockerfile"
cat > $dockerfile <<EOF
FROM rust:${rust_toolchain}
RUN apt-get update -y \
&& apt-get install -y cmake g++ pkg-config jq libcurl4-openssl-dev libelf-dev libdw-dev binutils-dev libiberty-dev musl-tools \
&& rustup component add rustfmt clippy \
&& rm -rf /var/lib/apt/lists/*
# install golang env
Run wget https://go.dev/dl/go1.19.linux-amd64.tar.gz \
&& tar -C /usr/local -xzf go1.19.linux-amd64.tar.gz \
&& rm -rf go1.19.linux-amd64.tar.gz
ENV PATH \$PATH:/usr/local/go/bin
RUN go env -w GO111MODULE=on
RUN go env -w GOPROXY=https://goproxy.io,direct
EOF
}
@test "build rust golang image" {
yum install -y docker
docker build -f $dockerfile -t $compile_image .
}
teardown() {
rm -f $dockerfile
}

View File

@ -0,0 +1,50 @@
repo_base_dir="${BATS_TEST_DIRNAME}/../.."
rust_toolchain=$(cat ${repo_base_dir}/rust-toolchain)
compile_image="localhost/compile-image:${rust_toolchain}"
nydus_snapshotter_repo="https://github.com/containerd/nydus-snapshotter.git"
run_nydus_snapshotter() {
rm -rf /var/lib/containerd/io.containerd.snapshotter.v1.nydus
rm -rf /var/lib/nydus/cache
cat >/tmp/nydus-erofs-config.json <<EOF
{
"type": "bootstrap",
"config": {
"backend_type": "registry",
"backend_config": {
"scheme": "https"
},
"cache_type": "fscache"
}
}
EOF
containerd-nydus-grpc --config-path /tmp/nydus-erofs-config.json --daemon-mode shared \
--fs-driver fscache --root /var/lib/containerd/io.containerd.snapshotter.v1.nydus \
--address /run/containerd/containerd-nydus-grpc.sock --nydusd /usr/local/bin/nydusd \
--log-to-stdout >${BATS_TEST_DIRNAME}/nydus-snapshotter-${BATS_TEST_NAME}.log 2>&1 &
}
config_containerd_for_nydus() {
[ -d "/etc/containerd" ] || mkdir -p /etc/containerd
cat >/etc/containerd/config.toml <<EOF
version = 2
[plugins]
[plugins."io.containerd.grpc.v1.cri"]
[plugins."io.containerd.grpc.v1.cri".cni]
bin_dir = "/usr/lib/cni"
conf_dir = "/etc/cni/net.d"
[plugins."io.containerd.internal.v1.opt"]
path = "/var/lib/containerd/opt"
[proxy_plugins]
[proxy_plugins.nydus]
type = "snapshot"
address = "/run/containerd/containerd-nydus-grpc.sock"
[plugins."io.containerd.grpc.v1.cri".containerd]
snapshotter = "nydus"
disable_snapshot_annotations = false
EOF
systemctl restart containerd
}

View File

@ -0,0 +1,14 @@
#!/usr/bin/bats
load "${BATS_TEST_DIRNAME}/common_tests.sh"
@test "compile ctr remote" {
docker run --rm -v $repo_base_dir:/image-service $compile_image bash -c 'cd /image-service/contrib/ctr-remote && make clean && make'
if [ -f "${repo_base_dir}/contrib/ctr-remote/bin/ctr-remote" ]; then
/usr/bin/cp -f ${repo_base_dir}/contrib/ctr-remote/bin/ctr-remote /usr/local/bin/
else
echo "cannot find ctr-remote binary"
exit 1
fi
}

View File

@ -0,0 +1,25 @@
#!/usr/bin/bats
load "${BATS_TEST_DIRNAME}/common_tests.sh"
setup() {
rm -rf /tmp/nydus-snapshotter
mkdir /tmp/nydus-snapshotter
git clone "${nydus_snapshotter_repo}" /tmp/nydus-snapshotter
}
@test "compile nydus snapshotter" {
docker run --rm -v /tmp/nydus-snapshotter:/nydus-snapshotter $compile_image bash -c 'cd /nydus-snapshotter && make clear && make'
if [ -f "/tmp/nydus-snapshotter/bin/containerd-nydus-grpc" ]; then
/usr/bin/cp -f /tmp/nydus-snapshotter/bin/containerd-nydus-grpc /usr/local/bin/
echo "nydus-snapshotter version"
containerd-nydus-grpc --version
else
echo "cannot find containerd-nydus-grpc binary"
exit 1
fi
}
teardown() {
rm -rf /tmp/nydus-snapshotter
}

View File

@ -0,0 +1,14 @@
#!/usr/bin/bats
load "${BATS_TEST_DIRNAME}/common_tests.sh"
@test "compile nydusd" {
docker run --rm -v $repo_base_dir:/image-service $compile_image bash -c 'cd /image-service && make clean && make release'
if [ -f "${repo_base_dir}/target/release/nydusd" ] && [ -f "${repo_base_dir}/target/release/nydus-image" ]; then
/usr/bin/cp -f ${repo_base_dir}/target/release/nydusd /usr/local/bin/
/usr/bin/cp -f ${repo_base_dir}/target/release/nydus-image /usr/local/bin/
else
echo "cannot find nydusd binary or nydus-image binary"
exit 1
fi
}

17
tests/bats/install_bats.sh Executable file
View File

@ -0,0 +1,17 @@
#!/bin/bash
set -e
which bats && exit
BATS_REPO="https://github.com/bats-core/bats-core.git"
LOCAL_DIR="/tmp/bats"
echo "Install BATS from sources"
rm -rf $LOCAL_DIR
mkdir -p $LOCAL_DIR
pushd "${LOCAL_DIR}"
git clone "${BATS_REPO}" || true
cd bats-core
sh -c "./install.sh /usr"
popd
rm -rf $LOCAL_DIR

View File

@ -0,0 +1,29 @@
#!/usr/bin/bats
load "${BATS_TEST_DIRNAME}/common_tests.sh"
setup() {
nydus_rafs_image="docker.io/hsiangkao/ubuntu:20.04-rafs-v6"
run_nydus_snapshotter
config_containerd_for_nydus
ctr images ls | grep -q "${nydus_rafs_image}" && ctr images rm $nydus_rafs_image
ctr-remote images rpull $nydus_rafs_image
}
@test "run container with rafs" {
ctr run --rm --snapshotter=nydus $nydus_rafs_image test_container tar cvf /tmp/foo.tar --exclude=/sys --exclude=/proc --exclude=/dev /
}
teardown() {
dmesg -T | tail -300 > ${BATS_TEST_DIRNAME}/dmesg-${BATS_TEST_NAME}.log
ctr images ls | grep -q "${nydus_rafs_image}" && ctr images rm $nydus_rafs_image
if ps -ef | grep containerd-nydus-grpc | grep -v grep; then
ps -ef | grep containerd-nydus-grpc | grep -v grep | awk '{print $2}' | xargs kill -9
fi
if ps -ef | grep nydusd | grep fscache; then
ps -ef | grep nydusd | grep fscache | awk '{print $2}' | xargs kill -9
fi
if mount | grep 'erofs on'; then
mount | grep 'erofs on' | awk '{print $3}' | xargs umount
fi
}

View File

@ -0,0 +1,29 @@
#!/usr/bin/bats
load "${BATS_TEST_DIRNAME}/common_tests.sh"
setup() {
nydus_zran_image="docker.io/hsiangkao/node:18-nydus-oci-ref"
run_nydus_snapshotter
config_containerd_for_nydus
ctr images ls | grep -q "${nydus_zran_image}" && ctr images rm $nydus_zran_image
ctr-remote images rpull $nydus_zran_image
}
@test "run container with zran" {
ctr run --rm --snapshotter=nydus $nydus_zran_image test_container tar cvf /tmp/foo.tar --exclude=/sys --exclude=/proc --exclude=/dev /
}
teardown() {
dmesg -T | tail -300 > ${BATS_TEST_DIRNAME}/dmesg-${BATS_TEST_NAME}.log
ctr images ls | grep -q "${nydus_zran_image}" && ctr images rm $nydus_zran_image
if ps -ef | grep containerd-nydus-grpc | grep -v grep; then
ps -ef | grep containerd-nydus-grpc | grep -v grep | awk '{print $2}' | xargs kill -9
fi
if ps -ef | grep nydusd | grep fscache; then
ps -ef | grep nydusd | grep fscache | awk '{print $2}' | xargs kill -9
fi
if mount | grep 'erofs on'; then
mount | grep 'erofs on' | awk '{print $3}' | xargs umount
fi
}

View File

@ -1,6 +1,6 @@
[package]
name = "nydus-utils"
version = "0.3.1"
version = "0.3.2"
description = "Utilities and helpers for Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

View File

@ -12,6 +12,7 @@ extern crate serde;
extern crate lazy_static;
use std::convert::{Into, TryFrom, TryInto};
use std::time::Duration;
pub use self::exec::*;
pub use self::inode_bitmap::InodeBitmap;
@ -56,6 +57,38 @@ pub fn round_down_4k(x: u64) -> u64 {
x & (!4095u64)
}
pub enum DelayType {
Fixed,
// an exponential delay between each attempts
BackOff,
}
pub struct Delayer {
r#type: DelayType,
attempts: u32,
time: Duration,
}
impl Delayer {
pub fn new(t: DelayType, time: Duration) -> Self {
Delayer {
r#type: t,
attempts: 0,
time,
}
}
pub fn delay(&mut self) {
use std::thread::sleep;
match self.r#type {
DelayType::Fixed => sleep(self.time),
DelayType::BackOff => sleep((1 << self.attempts) * self.time),
}
self.attempts += 1;
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -590,6 +590,8 @@ pub trait Metric {
fn dec(&self) {
self.sub(1);
}
fn set(&self, value: u64);
}
/// Basic 64-bit metric counter.
@ -608,6 +610,10 @@ impl Metric for BasicMetric {
fn sub(&self, value: u64) {
self.0.fetch_sub(value, Ordering::Relaxed);
}
fn set(&self, value: u64) {
self.0.store(value, Ordering::Relaxed);
}
}
/// Metrics for storage backends.
@ -733,9 +739,25 @@ pub struct BlobcacheMetrics {
// to estimate the possibility to merge backend IOs.
// In unit of Bytes
pub prefetch_data_amount: BasicMetric,
pub prefetch_mr_count: BasicMetric,
// Total prefetch requests issued from storage/blobs or rafs filesystem layer for each file that needs prefetch
pub prefetch_requests_count: BasicMetric,
pub prefetch_workers: AtomicUsize,
pub prefetch_unmerged_chunks: BasicMetric,
// Cumulative time latencies of each prefetch request which can be handled in parallel.
// It starts when the request is born including nydusd processing and schedule and end when the chunk is downloaded and stored.
// Then the average prefetch latency can be calculated by
// `prefetch_cumulative_time_millis / prefetch_requests_count`
pub prefetch_cumulative_time_millis: BasicMetric,
// The time seconds part when nydusd begins to prefetch
// We can calculate prefetch average bandwidth by
// `prefetch_data_amount / (prefetch_end_time_secs - prefetch_begin_time_secs)`. Note, it does not take milliseconds into account yet.s
pub prefetch_begin_time_secs: BasicMetric,
// The time milliseconds part when nydusd begins to prefetch
pub prefetch_begin_time_millis: BasicMetric,
// The time seconds part when nydusd ends prefetching
pub prefetch_end_time_secs: BasicMetric,
// The time milliseconds part when nydusd ends prefetching
pub prefetch_end_time_millis: BasicMetric,
pub buffered_backend_size: BasicMetric,
pub data_all_ready: AtomicBool,
}
@ -774,6 +796,18 @@ impl BlobcacheMetrics {
pub fn export_metrics(&self) -> IoStatsResult<String> {
serde_json::to_string(self).map_err(IoStatsError::Serialize)
}
pub fn calculate_prefetch_metrics(&self, begin_time: SystemTime) {
let now = SystemTime::now();
if let Ok(ref t) = now.duration_since(SystemTime::UNIX_EPOCH) {
self.prefetch_end_time_secs.set(t.as_secs());
self.prefetch_end_time_millis.set(t.subsec_millis() as u64);
}
if let Ok(ref t) = now.duration_since(begin_time) {
let elapsed = saturating_duration_millis(t);
self.prefetch_cumulative_time_millis.add(elapsed);
}
}
}
#[cfg(test)]