Compare commits

...

28 Commits

Author SHA1 Message Date
imeoer 2fd7070bf7
Merge pull request #839 from imeoer/stable/v2.1-cherry-pick
[backport to stable/v2.1] storage: add mirror health checking support
2022-11-04 21:35:35 +08:00
Bin Tang a514f66851 storage: fix syntax for mirror health checking
Signed-off-by: Bin Tang <tangbin.bin@bytedance.com>
2022-11-04 13:27:47 +00:00
Bin Tang 34d84cac91 storage: refresh token to avoid forwarding to P2P/dragonfly
Forward 401 response to P2P/dragonfly will affect performance.
When there is a mirror that auth_through false, we refresh the token regularly
to avoid forwarding the 401 response to mirror.

Signed-off-by: Bin Tang <tangbin.bin@bytedance.com>
2022-11-04 13:27:42 +00:00
Bin Tang 41fefcdbae storage: add mirror health checking support
Currently, the mirror is set to unavailable if the failed times reach failure_limit.
We added mirror health checking, which will recover unavailable mirror server.
The failure_limit indicates the failed time at which the mirror is set to unavailable.
The health_check_interval indicates the time interval to recover the unavailable mirror.
The ping_url is the endpoint to check mirror server health.

Signed-off-by: Bin Tang <tangbin.bin@bytedance.com>
2022-11-04 13:27:35 +00:00
imeoer 29a9af49a4
Merge pull request #838 from jiangliu/v2.1.1-pub2
prepare for publishing to crates.io
2022-11-04 21:25:17 +08:00
Jiang Liu 2496bc98f3 release: prepare for publishing to crates.io
Prepare for publishing to crates.io.

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
2022-11-04 21:03:40 +08:00
imeoer 36b4edb638
Merge pull request #834 from imeoer/stable-update-release
action: update release notes for download mirror
2022-11-04 10:26:49 +08:00
Yan Song dd0a0d8522 action: update release notes for download mirror
Signed-off-by: Yan Song <imeoer@linux.alibaba.com>
2022-11-04 02:25:59 +00:00
imeoer 29af7a1267
Merge pull request #830 from changweige/backport-nydusify-drop-label
nydusify: drop label "nydus-blob-ids" from meta layer
2022-11-03 14:04:16 +08:00
Changwei Ge 07788809a2 nydusify: drop label "nydus-blob-ids" from meta layer
Image with layers more than 64 can't be pulled by containerd
since the label is exceeding the label size limitation 4096bytes.

We should figure out another way to do GC in nydus-snapshotter

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-11-03 13:55:35 +08:00
imeoer 405c79de17
Merge pull request #828 from changweige/nydusify-backport-v2.1
Backport 3 patches for stable/v2.1
2022-11-03 09:54:25 +08:00
泰友 c8b21e3529 fix: miss oss file of nydusify packer
Reproduction:

Prepare configuration file used for pack command. {
"bucket_name": "XXX",
"endpoint": "XXX",
"access_key_id": "XXX",
"access_key_secret": "XXX",
"meta_prefix": "nydus_rund_sidecar_meta",
"blob_prefix": "blobs"
}

Pack by nydusify sudo contrib/nydusify/cmd/nydusify pack
--source-dir test
--output-dir tmp
--name ccx-test
--backend-push
--backend-config-file backend-config.json
--backend-type oss
--nydus-image target/debug/nydus-image

Miss blob file and meta file in oss

Problem:

Forgot to CompleteMultipartUpload after chunk uploading.

Fix:

CompleteMultipartUpload to complete uploading.

Signed-off-by: 泰友 <cuichengxu.ccx@antgroup.com>
2022-11-02 19:01:22 +08:00
泰友 67e0cc6f32 refact: use specified object prefix and meta prefix directly
issue: https://github.com/dragonflyoss/image-service/issues/608

Signed-off-by: 泰友 <cuichengxu.ccx@antgroup.com>
2022-11-02 19:01:17 +08:00
泰友 121a108ac9 fix: nydusify pack fail
Reproduction
1. Prepare configuration file used for pack command.
{
    "bucket_name": "XXX",
    "endpoint": "XXX",
    "access_key_id": "XXX",
    "access_key_secret": "XXX",
    "meta_prefix": "nydus_rund_sidecar_meta",
    "blob_prefix": "blobs"
}

2. Pack by nydusify
sudo contrib/nydusify/cmd/nydusify pack \
--source-dir test \
--output-dir tmp \
--name ccx-test \
--backend-push \
--backend-config-file backend-config.json \
--backend-type oss \
--nydus-image target/debug/nydus-image

3. Got error
FATA[2022-10-08T18:06:46+08:00] failed to push pack result to remote: failed to put metafile to remote: split file by part size: open tmp/tmp/ccx-test.meta: no such file or directory

Problem
The path of bootstrap file, which is to upload, is wrong.

Fix
Use imageName as req.Meta, which is bootstrap file to upload.

Signed-off-by: 泰友 <cuichengxu.ccx@antgroup.com>
2022-11-02 18:58:03 +08:00
Jiang Liu e9a774c2ee
Merge pull request #805 from changweige/pytest-stop-v2
action/nydus-test: stop on the first test failure
2022-10-20 14:17:21 +08:00
Changwei Ge 7975d09dc3 action/nydus-test: stop on the first test failure
By default, pytest will continue executing test even
current test fails. It's hard to tell what to happen
on such a environment. And it makes it hard to investigate
the first failed case.

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-10-20 13:35:01 +08:00
Jiang Liu 8c9c73b5b7
Merge pull request #804 from imeoer/bring-auto-version
release: update version on build automatically
2022-10-19 20:18:47 +08:00
Yan Song c7eaa2e858 release: update version on build automatically
We only need to git tag to release a version without modifying
the version field in Cargo.toml and Cargo.lock.

Signed-off-by: Yan Song <imeoer@linux.alibaba.com>
2022-10-19 12:14:28 +00:00
Changwei Ge 6a0bef4ce6
Merge pull request #799 from changweige/backport-patches
Backport some patches for stable/v2.1
2022-10-18 15:18:25 +08:00
Yan Song 8a32d5b61e nydusify: fix overlay error for image with single layer
Nydusify check subcommand will check the consistency of
OCI image and nydus image by mounting (overlayfs or nydusd).

For the OCI image with a single layer, we should use bind
mount instead of overlay to mount rootfs, otherwise an error
will be thrown like:

```
wrong fs type, bad option, bad superblock on overlay, missing
codepage or helper program, or other error.
```

This commit also refine the codes for image.Mount/image.Umount.

Signed-off-by: Yan Song <imeoer@linux.alibaba.com>
2022-10-18 14:48:00 +08:00
Bin Tang 5ac9831130 fix mirror's performance issue
In some scenarios(e.g. P2P/Dragonfly), sending an authorization request
to the mirror will cause performance loss. We add parameter
auth_through. When auth_through is false, nydusd will directly send
non-authorization request to original registry.

Signed-off-by: Bin Tang <tangbin.bin@bytedance.com>
2022-10-18 14:47:39 +08:00
Xin Yin c75d3fbfcf storage: retry timeout chunks for fscache ondemand path
for fscache ondemand path, if some requested chunks are set to pending by
prefetch threads, and wait them timeout, will casue EIO to container side.

retry the timeout chunks on ondemand path, minimize EIOs.

Signed-off-by: Xin Yin <yinxin.x@bytedance.com>
2022-10-18 14:47:14 +08:00
Jiang Liu 91d26745e2
Merge pull request #785 from sctb512/rafsv6-file-parent
nydus-image: fix inspect to get correct path of rafs v6 file
2022-10-17 10:05:32 +08:00
Jiang Liu a51a7185f1
Merge pull request #793 from changweige/fix-v5-prefetch-table
nydus-image/v5: prefetch table should contain inode numbers rather its index
2022-10-14 21:49:40 +08:00
Changwei Ge afaf75cfff nydus-image/v5: prefetch table should contain inode numbers rather its
index

Nydusd is performing prefetch by finding all inodes matching its
inode number.

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-10-14 16:18:56 +08:00
Bin Tang c28585f06f fix inspect to get correct path of rafs v6 file
Because rafs v6 doesn't support get_parent, the prefetch and icheck
command of inspect will cause error. We fixed it by handling
get_file_name get_file_name and path_from_ino for rafs v6 files
separately. This commit does not affect the rafs core code.

Signed-off-by: Bin Tang <tangbin.bin@bytedance.com>
2022-10-14 15:29:19 +08:00
Changwei Ge fd588c918f
Merge pull request #789 from changweige/port-2.1-enlarge-fuse-threads-num
nydusd: enlarge default fuse server threads
2022-10-11 10:37:39 +08:00
Changwei Ge 3b15cf50a5 nydusd: enlarge default fuse server threads
Now the default value is only 1, it affacts performance.

Signed-off-by: Changwei Ge <gechangwei@bytedance.com>
2022-10-11 10:36:18 +08:00
36 changed files with 705 additions and 296 deletions

View File

@ -102,11 +102,9 @@ jobs:
"target": "musl"
}
EOF
- name: run test_api
- name: run e2e tests
run: |
cd /home/runner/work/image-service/image-service/contrib/nydus-test
sudo mkdir -p /blobdir
sudo python3 nydus_test_config.py --dist fs_structure.yaml
sudo pytest -vs --durations=0 functional-test/test_api.py \
functional-test/test_nydus.py \
functional-test/test_layered_image.py
sudo pytest -vs -x --durations=0 functional-test/test_api.py functional-test/test_nydus.py functional-test/test_layered_image.py

View File

@ -205,7 +205,7 @@ jobs:
with:
name: "Nydus Image Service ${{ env.tag }}"
body: |
Mirror (update in 10 min): https://registry.npmmirror.com/binary.html?path=nydus/${{ env.tag }}/
Binaries download mirror (sync within a few hours): https://registry.npmmirror.com/binary.html?path=nydus/${{ env.tag }}/
generate_release_notes: true
files: |
${{ env.tarballs }}

16
Cargo.lock generated
View File

@ -928,7 +928,7 @@ dependencies = [
[[package]]
name = "nydus-api"
version = "0.1.1"
version = "0.1.2"
dependencies = [
"dbs-uhttp",
"http",
@ -947,7 +947,7 @@ dependencies = [
[[package]]
name = "nydus-app"
version = "0.3.0"
version = "0.3.1"
dependencies = [
"flexi_logger",
"libc",
@ -962,7 +962,7 @@ dependencies = [
[[package]]
name = "nydus-blobfs"
version = "0.1.0"
version = "0.1.1"
dependencies = [
"fuse-backend-rs",
"libc",
@ -978,7 +978,7 @@ dependencies = [
[[package]]
name = "nydus-error"
version = "0.2.1"
version = "0.2.2"
dependencies = [
"backtrace",
"httpdate",
@ -990,7 +990,7 @@ dependencies = [
[[package]]
name = "nydus-rafs"
version = "0.1.0"
version = "0.1.1"
dependencies = [
"anyhow",
"arc-swap",
@ -1022,7 +1022,7 @@ dependencies = [
[[package]]
name = "nydus-rs"
version = "2.1.0-rc.3.1"
version = "0.0.0-git"
dependencies = [
"anyhow",
"base64",
@ -1068,7 +1068,7 @@ dependencies = [
[[package]]
name = "nydus-storage"
version = "0.5.0"
version = "0.5.1"
dependencies = [
"anyhow",
"arc-swap",
@ -1100,7 +1100,7 @@ dependencies = [
[[package]]
name = "nydus-utils"
version = "0.3.1"
version = "0.3.2"
dependencies = [
"blake3",
"flate2",

View File

@ -1,6 +1,7 @@
[package]
name = "nydus-rs"
version = "2.1.0-rc.3.1"
# will be overridden by real git tag during cargo build
version = "0.0.0-git"
description = "Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

View File

@ -1,6 +1,6 @@
[package]
name = "nydus-api"
version = "0.1.1"
version = "0.1.2"
description = "APIs for Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

View File

@ -423,11 +423,41 @@ impl Default for ProxyConfig {
/// Configuration for mirror.
#[derive(Clone, Deserialize, Serialize, Debug)]
#[serde(default)]
pub struct MirrorConfig {
/// Mirror server URL, for example http://127.0.0.1:65001
/// Mirror server URL, for example http://127.0.0.1:65001.
pub host: String,
/// HTTP request headers to be passed to mirror server
pub headers: Option<HashMap<String, String>>,
/// HTTP request headers to be passed to mirror server.
#[serde(default)]
pub headers: HashMap<String, String>,
/// Whether the authorization process is through mirror? default false.
/// true: authorization through mirror, e.g. Using normal registry as mirror.
/// false: authorization through original registry,
/// e.g. when using Dragonfly server as mirror, authorization through it will affect performance.
#[serde(default)]
pub auth_through: bool,
/// Interval of mirror health checking, in seconds.
#[serde(default = "default_check_interval")]
pub health_check_interval: u64,
/// Failure count for which mirror is considered unavailable.
#[serde(default = "default_failure_limit")]
pub failure_limit: u8,
/// Ping URL to check mirror server health.
#[serde(default)]
pub ping_url: String,
}
impl Default for MirrorConfig {
fn default() -> Self {
Self {
host: String::new(),
headers: HashMap::new(),
auth_through: false,
health_check_interval: 5,
failure_limit: 5,
ping_url: String::new(),
}
}
}
#[derive(Debug)]
@ -944,6 +974,14 @@ fn default_http_timeout() -> u32 {
5
}
fn default_check_interval() -> u64 {
5
}
fn default_failure_limit() -> u8 {
5
}
fn default_work_dir() -> String {
".".to_string()
}

View File

@ -1,6 +1,6 @@
[package]
name = "nydus-app"
version = "0.3.0"
version = "0.3.1"
authors = ["The Nydus Developers"]
description = "Application framework for Nydus Image Service"
readme = "README.md"

View File

@ -39,7 +39,7 @@ use nydus_app::{BuildTimeInfo, setup_logging};
fn main() -> Result<()> {
let level = cmd.value_of("log-level").unwrap().parse().unwrap();
let (bti_string, build_info) = BuildTimeInfo::dump(crate_version!());
let (bti_string, build_info) = BuildTimeInfo::dump();
let _cmd = App::new("")
.version(bti_string.as_str())
.author(crate_authors!())

View File

@ -28,7 +28,17 @@ fn get_git_commit_hash() -> String {
return commit.to_string();
}
}
"Unknown".to_string()
"unknown".to_string()
}
fn get_git_commit_version() -> String {
let tag = Command::new("git").args(&["describe", "--tags"]).output();
if let Ok(tag) = tag {
if let Some(tag) = String::from_utf8_lossy(&tag.stdout).lines().next() {
return tag.to_string();
}
}
"unknown".to_string()
}
fn main() {
@ -43,10 +53,12 @@ fn main() {
.format(&time::format_description::well_known::Iso8601::DEFAULT)
.unwrap();
let git_commit_hash = get_git_commit_hash();
let git_commit_version = get_git_commit_version();
println!("cargo:rerun-if-changed=../git/HEAD");
println!("cargo:rustc-env=RUSTC_VERSION={}", rustc_ver);
println!("cargo:rustc-env=PROFILE={}", profile);
println!("cargo:rustc-env=BUILT_TIME_UTC={}", build_time);
println!("cargo:rustc-env=GIT_COMMIT_HASH={}", git_commit_hash);
println!("cargo:rustc-env=GIT_COMMIT_VERSION={}", git_commit_version);
}

View File

@ -22,7 +22,7 @@
//!
//! fn main() -> Result<()> {
//! let level = cmd.value_of("log-level").unwrap().parse().unwrap();
//! let (bti_string, build_info) = BuildTimeInfo::dump(crate_version!());
//! let (bti_string, build_info) = BuildTimeInfo::dump();
//! let _cmd = App::new("")
//! .version(bti_string.as_str())
//! .author(crate_authors!())
@ -65,14 +65,15 @@ pub mod built_info {
pub const PROFILE: &str = env!("PROFILE");
pub const RUSTC_VERSION: &str = env!("RUSTC_VERSION");
pub const BUILT_TIME_UTC: &str = env!("BUILT_TIME_UTC");
pub const GIT_COMMIT_VERSION: &str = env!("GIT_COMMIT_VERSION");
pub const GIT_COMMIT_HASH: &str = env!("GIT_COMMIT_HASH");
}
/// Dump program build and version information.
pub fn dump_program_info(prog_version: &str) {
pub fn dump_program_info() {
info!(
"Program Version: {}, Git Commit: {:?}, Build Time: {:?}, Profile: {:?}, Rustc Version: {:?}",
prog_version,
built_info::GIT_COMMIT_VERSION,
built_info::GIT_COMMIT_HASH,
built_info::BUILT_TIME_UTC,
built_info::PROFILE,
@ -91,10 +92,10 @@ pub struct BuildTimeInfo {
}
impl BuildTimeInfo {
pub fn dump(package_ver: &str) -> (String, Self) {
pub fn dump() -> (String, Self) {
let info_string = format!(
"\rVersion: \t{}\nGit Commit: \t{}\nBuild Time: \t{}\nProfile: \t{}\nRustc: \t\t{}\n",
package_ver,
built_info::GIT_COMMIT_VERSION,
built_info::GIT_COMMIT_HASH,
built_info::BUILT_TIME_UTC,
built_info::PROFILE,
@ -102,7 +103,7 @@ impl BuildTimeInfo {
);
let info = Self {
package_ver: package_ver.to_string(),
package_ver: built_info::GIT_COMMIT_VERSION.to_string(),
git_commit: built_info::GIT_COMMIT_HASH.to_string(),
build_time: built_info::BUILT_TIME_UTC.to_string(),
profile: built_info::PROFILE.to_string(),

View File

@ -1,6 +1,6 @@
[package]
name = "nydus-blobfs"
version = "0.1.0"
version = "0.1.1"
description = "Blob object file system for Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

View File

@ -43,10 +43,9 @@
"digest": "sha256:aec98c9e3dce739877b8f5fe1cddd339de1db2b36c20995d76f6265056dbdb08",
"size": 273320,
"annotations": {
"containerd.io/snapshot/nydus-blob-ids": "[\"09845cce1d983b158d4865fc37c23bbfb892d4775c786e8114d3cf868975c059\",\"b413839e4ee5248697ef30fe9a84b659fa744d69bbc9b7754113adc2b2b6bc90\",\"b6a85be8248b0d3c2f0565ef71d549f404f8edcee1ab666c9871a8e6d9384860\",\"00d151e7d392e68e2c756a6fc42640006ddc0a98d37dba3f90a7b73f63188bbd\"]",
"containerd.io/snapshot/nydus-bootstrap": "true",
"containerd.io/snapshot/nydus-reference-blob-ids": "[\"09845cce1d983b158d4865fc37c23bbfb892d4775c786e8114d3cf868975c059\"]"
}
}
]
}
}

View File

@ -113,6 +113,7 @@ func (b *OSSBackend) Upload(ctx context.Context, blobID, blobPath string, size i
blobObjectKey := b.objectPrefix + blobID
desc := blobDesc(size, blobID)
desc.URLs = append(desc.URLs, b.remoteID(blobID))
if !forcePush {
if exist, err := b.bucket.IsObjectExist(blobObjectKey); err != nil {
@ -254,3 +255,7 @@ func (b *OSSBackend) Check(blobID string) (bool, error) {
func (b *OSSBackend) Type() Type {
return OssBackend
}
func (b *OSSBackend) remoteID(blobID string) string {
return fmt.Sprintf("oss://%s/%s%s", b.bucket.BucketName, b.objectPrefix, blobID)
}

View File

@ -145,8 +145,8 @@ func (cache *Cache) recordToLayer(record *Record) (*ocispec.Descriptor, *ocispec
utils.LayerAnnotationUncompressed: record.NydusBootstrapDiffID.String(),
},
}
if refenceBlobsStr, ok := record.NydusBootstrapDesc.Annotations[utils.LayerAnnotationNydusReferenceBlobIDs]; ok {
bootstrapCacheDesc.Annotations[utils.LayerAnnotationNydusReferenceBlobIDs] = refenceBlobsStr
if referenceBlobsStr, ok := record.NydusBootstrapDesc.Annotations[utils.LayerAnnotationNydusReferenceBlobIDs]; ok {
bootstrapCacheDesc.Annotations[utils.LayerAnnotationNydusReferenceBlobIDs] = referenceBlobsStr
}
var blobCacheDesc *ocispec.Descriptor

View File

@ -205,14 +205,15 @@ func (rule *FilesystemRule) pullSourceImage() (*tool.Image, error) {
func (rule *FilesystemRule) mountSourceImage() (*tool.Image, error) {
logrus.Infof("Mounting source image to %s", rule.SourceMountPath)
if err := os.MkdirAll(rule.SourceMountPath, 0755); err != nil {
return nil, errors.Wrap(err, "create mountpoint directory of source image")
}
image, err := rule.pullSourceImage()
if err != nil {
return nil, errors.Wrap(err, "pull source image")
}
if err := image.Umount(); err != nil {
return nil, errors.Wrap(err, "umount previous rootfs")
}
if err := image.Mount(); err != nil {
return nil, errors.Wrap(err, "mount source image")
}

View File

@ -7,18 +7,42 @@ package tool
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"github.com/containerd/containerd/mount"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
func run(cmd string, args ...string) error {
_cmd := exec.Command("sh", "-c", cmd)
_cmd.Stdout = os.Stdout
_cmd.Stderr = os.Stderr
return _cmd.Run()
func mkMounts(dirs []string) []mount.Mount {
var options []string
if len(dirs) == 0 {
return nil
}
if len(dirs) == 1 {
return []mount.Mount{
{
Source: dirs[0],
Type: "bind",
Options: []string{
"ro",
"rbind",
},
},
}
}
options = append(options, fmt.Sprintf("lowerdir=%s", strings.Join(dirs, ":")))
return []mount.Mount{
{
Type: "overlay",
Source: "overlay",
Options: options,
},
}
}
type Image struct {
@ -30,41 +54,20 @@ type Image struct {
// Mount mounts rootfs of OCI image.
func (image *Image) Mount() error {
image.Umount()
if err := os.MkdirAll(image.Rootfs, 0750); err != nil {
return errors.Wrap(err, "create rootfs dir")
}
var dirs []string
layerLen := len(image.Layers)
count := len(image.Layers)
for i := range image.Layers {
layerDir := filepath.Join(image.SourcePath, image.Layers[layerLen-i-1].Digest.Encoded())
layerDir := filepath.Join(image.SourcePath, image.Layers[count-i-1].Digest.Encoded())
dirs = append(dirs, strings.ReplaceAll(layerDir, ":", "\\:"))
}
lowerOption := strings.Join(dirs, ":")
// Handle long options string overed 4096 chars, split them to
// two overlay mounts.
if len(lowerOption) >= 4096 {
half := (len(dirs) - 1) / 2
upperDirs := dirs[half+1:]
lowerDirs := dirs[:half+1]
lowerOverlay := image.Rootfs + "_lower"
if err := os.MkdirAll(lowerOverlay, 0755); err != nil {
return err
}
if err := run(fmt.Sprintf(
"mount -t overlay overlay -o lowerdir='%s' %s",
strings.Join(upperDirs, ":"), lowerOverlay,
)); err != nil {
return err
}
lowerDirs = append(lowerDirs, lowerOverlay)
lowerOption = strings.Join(lowerDirs, ":")
}
if err := run(fmt.Sprintf(
"mount -t overlay overlay -o lowerdir='%s' %s",
lowerOption, image.Rootfs,
)); err != nil {
return err
mounts := mkMounts(dirs)
if err := mount.All(mounts, image.Rootfs); err != nil {
return errors.Wrap(err, "mount source layer")
}
return nil
@ -72,21 +75,19 @@ func (image *Image) Mount() error {
// Umount umounts rootfs mountpoint of OCI image.
func (image *Image) Umount() error {
lowerOverlay := image.Rootfs + "_lower"
if _, err := os.Stat(lowerOverlay); err == nil {
if err := run(fmt.Sprintf("umount %s", lowerOverlay)); err != nil {
return err
if _, err := os.Stat(image.Rootfs); err != nil {
if os.IsNotExist(err) {
return nil
}
return errors.Wrap(err, "stat rootfs")
}
if _, err := os.Stat(image.Rootfs); err == nil {
if err := run(fmt.Sprintf("umount %s", image.Rootfs)); err != nil {
return err
}
if err := mount.Unmount(image.Rootfs, 0); err != nil {
return errors.Wrap(err, "umount rootfs")
}
if err := os.RemoveAll(image.SourcePath); err != nil {
return err
if err := os.RemoveAll(image.Rootfs); err != nil {
return errors.Wrap(err, "remove rootfs")
}
return nil

View File

@ -194,19 +194,15 @@ func appendBlobs(oldBlobs []string, newBlobs []string) []string {
return oldBlobs
}
func (mm *manifestManager) Push(ctx context.Context, buildLayers []*buildLayer) error {
func (mm *manifestManager) Push(ctx context.Context, builtLayers []*buildLayer) error {
var (
blobListInAnnotation []string
referenceBlobs []string
layers []ocispec.Descriptor
referenceBlobs []string
layers []ocispec.Descriptor
)
for idx, _layer := range buildLayers {
for idx, _layer := range builtLayers {
record := _layer.GetCacheRecord()
referenceBlobs = appendBlobs(referenceBlobs, layersHex(_layer.referenceBlobs))
blobListInAnnotation = appendBlobs(blobListInAnnotation, layersHex(_layer.referenceBlobs))
if record.NydusBlobDesc != nil {
// Write blob digest list in JSON format to layer annotation of bootstrap.
blobListInAnnotation = append(blobListInAnnotation, record.NydusBlobDesc.Digest.Hex())
// For registry backend, we need to write the blob layer to
// manifest to prevent them from being deleted by registry GC.
if mm.backend.Type() == backend.RegistryBackend {
@ -222,15 +218,10 @@ func (mm *manifestManager) Push(ctx context.Context, buildLayers []*buildLayer)
}
}
// Only need to write lastest bootstrap layer in nydus manifest
if idx == len(buildLayers)-1 {
blobListBytes, err := json.Marshal(blobListInAnnotation)
if err != nil {
return errors.Wrap(err, "Marshal blob list")
}
record.NydusBootstrapDesc.Annotations[utils.LayerAnnotationNydusBlobIDs] = string(blobListBytes)
// Only need to write latest bootstrap layer in nydus manifest
if idx == len(builtLayers)-1 {
if len(referenceBlobs) > 0 {
blobListBytes, err = json.Marshal(referenceBlobs)
blobListBytes, err := json.Marshal(referenceBlobs)
if err != nil {
return errors.Wrap(err, "Marshal blob list")
}
@ -250,7 +241,6 @@ func (mm *manifestManager) Push(ctx context.Context, buildLayers []*buildLayer)
// Remove useless annotations from layer
validAnnotationKeys := map[string]bool{
utils.LayerAnnotationNydusBlob: true,
utils.LayerAnnotationNydusBlobIDs: true,
utils.LayerAnnotationNydusReferenceBlobIDs: true,
utils.LayerAnnotationNydusBootstrap: true,
utils.LayerAnnotationNydusFsVersion: true,

View File

@ -67,7 +67,7 @@ func (cfg *BackendConfig) rawMetaBackendCfg() []byte {
"access_key_id": cfg.AccessKeyID,
"access_key_secret": cfg.AccessKeySecret,
"bucket_name": cfg.BucketName,
"object_prefix": cfg.MetaPrefix + "/",
"object_prefix": cfg.MetaPrefix,
}
b, _ := json.Marshal(configMap)
return b
@ -79,7 +79,7 @@ func (cfg *BackendConfig) rawBlobBackendCfg() []byte {
"access_key_id": cfg.AccessKeyID,
"access_key_secret": cfg.AccessKeySecret,
"bucket_name": cfg.BucketName,
"object_prefix": cfg.BlobPrefix + "/",
"object_prefix": cfg.BlobPrefix,
}
b, _ := json.Marshal(configMap)
return b
@ -316,7 +316,7 @@ func (p *Packer) Pack(_ context.Context, req PackRequest) (PackResult, error) {
return PackResult{}, errors.New("can not push image to remote due to lack of backend configuration")
}
pushResult, err := p.pusher.Push(PushRequest{
Meta: bootstrapPath,
Meta: req.ImageName,
Blob: newBlobHash,
ParentBlobs: parentBlobs,
})

View File

@ -3,7 +3,6 @@ package packer
import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
@ -69,12 +68,23 @@ func NewPusher(opt NewPusherOpt) (*Pusher, error) {
// Push will push the meta and blob file to remote backend
// at this moment, oss is the only possible backend, the meta file name is user defined
// and blob file name is the hash of the blobfile that is extracted from output.json
func (p *Pusher) Push(req PushRequest) (PushResult, error) {
func (p *Pusher) Push(req PushRequest) (pushResult PushResult, retErr error) {
p.logger.Info("start to push meta and blob to remote backend")
// todo: add a suitable timeout
ctx := context.Background()
// todo: use blob desc to build manifest
defer func() {
if retErr != nil {
if err := p.blobBackend.Finalize(true); err != nil {
logrus.WithError(err).Warnf("Cancel blob backend upload")
}
if err := p.metaBackend.Finalize(true); err != nil {
logrus.WithError(err).Warnf("Cancel meta backend upload")
}
}
}()
for _, blob := range req.ParentBlobs {
// try push parent blobs
if _, err := p.blobBackend.Upload(ctx, blob, p.blobFilePath(blob, true), 0, false); err != nil {
@ -84,18 +94,30 @@ func (p *Pusher) Push(req PushRequest) (PushResult, error) {
p.logger.Infof("push blob %s", req.Blob)
if req.Blob != "" {
if _, err := p.blobBackend.Upload(ctx, req.Blob, p.blobFilePath(req.Blob, true), 0, false); err != nil {
desc, err := p.blobBackend.Upload(ctx, req.Blob, p.blobFilePath(req.Blob, true), 0, false)
if err != nil {
return PushResult{}, errors.Wrap(err, "failed to put blobfile to remote")
}
if len(desc.URLs) > 0 {
pushResult.RemoteBlob = desc.URLs[0]
}
}
if _, err := p.metaBackend.Upload(ctx, req.Meta, p.bootstrapPath(req.Meta), 0, true); err != nil {
return PushResult{}, errors.Wrapf(err, "failed to put metafile to remote")
if retErr = p.blobBackend.Finalize(false); retErr != nil {
return PushResult{}, errors.Wrap(retErr, "Finalize blob backend upload")
}
return PushResult{
RemoteMeta: fmt.Sprintf("oss://%s/%s/%s", p.cfg.BucketName, p.cfg.MetaPrefix, req.Meta),
RemoteBlob: fmt.Sprintf("oss://%s/%s/%s", p.cfg.BucketName, p.cfg.BlobPrefix, req.Blob),
}, nil
desc, retErr := p.metaBackend.Upload(ctx, req.Meta, p.bootstrapPath(req.Meta), 0, true)
if retErr != nil {
return PushResult{}, errors.Wrapf(retErr, "failed to put metafile to remote")
}
if len(desc.URLs) != 0 {
pushResult.RemoteMeta = desc.URLs[0]
}
if retErr = p.metaBackend.Finalize(false); retErr != nil {
return PushResult{}, errors.Wrap(retErr, "Finalize meta backend upload")
}
return
}
func ParseBackendConfig(backendConfigFile string) (BackendConfig, error) {

View File

@ -20,7 +20,8 @@ type mockBackend struct {
func (m *mockBackend) Upload(ctx context.Context, blobID, blobPath string, blobSize int64, forcePush bool) (*ocispec.Descriptor, error) {
args := m.Called(ctx, blobID, blobPath, blobSize, forcePush)
return nil, args.Error(0)
desc := args.Get(0)
return desc.(*ocispec.Descriptor), nil
}
func (m *mockBackend) Finalize(cancel bool) error {
@ -94,8 +95,12 @@ func TestPusher_Push(t *testing.T) {
hash := "3093776c78a21e47f0a8b4c80a1f019b1e838fc1ade274209332af1ca5f57090"
assert.Nil(t, err)
mp.On("Upload", mock.Anything, "mock.meta", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
mp.On("Upload", mock.Anything, hash, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
mp.On("Upload", mock.Anything, "mock.meta", mock.Anything, mock.Anything, mock.Anything).Return(&ocispec.Descriptor{
URLs: []string{"oss://testbucket/testmetaprefix/mock.meta"},
}, nil)
mp.On("Upload", mock.Anything, hash, mock.Anything, mock.Anything, mock.Anything).Return(&ocispec.Descriptor{
URLs: []string{"oss://testbucket/testblobprefix/3093776c78a21e47f0a8b4c80a1f019b1e838fc1ade274209332af1ca5f57090"},
}, nil)
res, err := pusher.Push(PushRequest{
Meta: "mock.meta",
Blob: hash,

View File

@ -14,7 +14,6 @@ const (
LayerAnnotationNydusBlob = "containerd.io/snapshot/nydus-blob"
LayerAnnotationNydusBlobDigest = "containerd.io/snapshot/nydus-blob-digest"
LayerAnnotationNydusBlobSize = "containerd.io/snapshot/nydus-blob-size"
LayerAnnotationNydusBlobIDs = "containerd.io/snapshot/nydus-blob-ids"
LayerAnnotationNydusBootstrap = "containerd.io/snapshot/nydus-bootstrap"
LayerAnnotationNydusFsVersion = "containerd.io/snapshot/nydus-fs-version"
LayerAnnotationNydusSourceChainID = "containerd.io/snapshot/nydus-source-chainid"

View File

@ -256,19 +256,30 @@ Currently, the mirror mode is only tested in the registry backend, and in theory
{
// Mirror server URL (include scheme), e.g. Dragonfly dfdaemon server URL
"host": "http://dragonfly1.io:65001",
// true: Send the authorization request to the mirror e.g. another docker registry.
// false: Authorization request won't be relayed by the mirror e.g. Dragonfly.
"auth_through": false,
// Headers for mirror server
"headers": {
// For Dragonfly dfdaemon server URL, we need to specify "X-Dragonfly-Registry" (include scheme).
// When Dragonfly does not cache data, it will pull them from "X-Dragonfly-Registry".
// If not set "X-Dragonfly-Registry", Dragonfly will pull data from proxy.registryMirror.url.
"X-Dragonfly-Registry": "https://index.docker.io"
}
},
// This URL endpoint is used to check the health of mirror server, and if the mirror is unhealthy,
// the request will fallback to the next mirror or the original registry server.
// Use $host/v2 as default if left empty.
"ping_url": "http://127.0.0.1:40901/server/ping",
// Interval time (s) to check and recover unavailable mirror. Use 5 as default if left empty.
"health_check_interval": 5,
// Failure counts before disabling this mirror. Use 5 as default if left empty.
"failure_limit": 5,
},
{
"host": "http://dragonfly2.io:65001",
"headers": {
"X-Dragonfly-Registry": "https://index.docker.io"
}
},
}
],
...

View File

@ -1,6 +1,6 @@
[package]
name = "nydus-error"
version = "0.2.1"
version = "0.2.2"
description = "Error handling utilities for Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

View File

@ -1,6 +1,6 @@
[package]
name = "nydus-rafs"
version = "0.1.0"
version = "0.1.1"
description = "The RAFS filesystem format for Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

View File

@ -405,12 +405,13 @@ impl Bootstrap {
let inode_table_size = inode_table.size();
// Set prefetch table
let (prefetch_table_size, prefetch_table_entries) =
if let Some(prefetch_table) = ctx.prefetch.get_rafsv5_prefetch_table() {
(prefetch_table.size(), prefetch_table.len() as u32)
} else {
(0, 0u32)
};
let (prefetch_table_size, prefetch_table_entries) = if let Some(prefetch_table) =
ctx.prefetch.get_rafsv5_prefetch_table(&bootstrap_ctx.nodes)
{
(prefetch_table.size(), prefetch_table.len() as u32)
} else {
(0, 0u32)
};
// Set blob table, use sha256 string (length 64) as blob id if not specified
let prefetch_table_offset = super_block_size + inode_table_size;
@ -481,7 +482,9 @@ impl Bootstrap {
.context("failed to store inode table")?;
// Dump prefetch table
if let Some(mut prefetch_table) = ctx.prefetch.get_rafsv5_prefetch_table() {
if let Some(mut prefetch_table) =
ctx.prefetch.get_rafsv5_prefetch_table(&bootstrap_ctx.nodes)
{
prefetch_table
.store(bootstrap_ctx.writer.as_mut())
.context("failed to store prefetch table")?;

View File

@ -165,12 +165,12 @@ impl Prefetch {
indexes
}
pub fn get_rafsv5_prefetch_table(&mut self) -> Option<RafsV5PrefetchTable> {
pub fn get_rafsv5_prefetch_table(&mut self, nodes: &[Node]) -> Option<RafsV5PrefetchTable> {
if self.policy == PrefetchPolicy::Fs {
let mut prefetch_table = RafsV5PrefetchTable::new();
for i in self.readahead_patterns.values().filter_map(|v| *v) {
// Rafs v5 has inode number equal to index.
prefetch_table.add_entry(i as u32);
prefetch_table.add_entry(nodes[i as usize - 1].inode.ino() as u32);
}
Some(prefetch_table)
} else {

View File

@ -3,11 +3,12 @@
// SPDX-License-Identifier: Apache-2.0
use std::{
ffi::OsString,
fs::Permissions,
io::{Error, ErrorKind, Write},
ops::DerefMut,
os::unix::prelude::PermissionsExt,
path::Path,
path::{Path, PathBuf},
sync::{Arc, Mutex},
};
@ -164,7 +165,8 @@ impl RafsInspector {
fn cmd_stat_file(&self, file_name: &str) -> Result<Option<Value>, anyhow::Error> {
// Stat current directory
if file_name == "." {
return self.stat_single_file(self.cur_dir_ino);
let inode = self.rafs_meta.get_inode(self.cur_dir_ino, false)?;
return self.stat_single_file(Some(inode.parent()), self.cur_dir_ino);
}
// Walk through children inodes to find the file
@ -173,7 +175,7 @@ impl RafsInspector {
dir_inode.walk_children_inodes(0, &mut |_inode, child_name, child_ino, _offset| {
if child_name == file_name {
// Print file information
if let Err(e) = self.stat_single_file(child_ino) {
if let Err(e) = self.stat_single_file(Some(dir_inode.ino()), child_ino) {
return Err(Error::new(ErrorKind::Other, e));
}
@ -273,6 +275,36 @@ Compressed Size: {compressed_size}
Ok(None)
}
// Convert an inode number to a file path, the rafs v6 file is handled separately.
fn path_from_ino(&self, ino: u64) -> Result<PathBuf, anyhow::Error> {
let inode = self.rafs_meta.superblock.get_inode(ino, false)?;
if ino == self.rafs_meta.superblock.root_ino() {
return Ok(self
.rafs_meta
.superblock
.get_inode(ino, false)?
.name()
.into());
}
let mut file_path = PathBuf::from("");
if self.rafs_meta.meta.is_v6() && !inode.is_dir() {
self.rafs_meta.walk_dir(
self.rafs_meta.superblock.root_ino(),
None,
&mut |inode, path| {
if inode.ino() == ino {
file_path = PathBuf::from(path);
}
Ok(())
},
)?;
} else {
file_path = self.rafs_meta.path_from_ino(ino as u64)?;
};
Ok(file_path)
}
// Implement command "prefetch"
fn cmd_list_prefetch(&self) -> Result<Option<Value>, anyhow::Error> {
let mut guard = self.bootstrap.lock().unwrap();
@ -283,7 +315,7 @@ Compressed Size: {compressed_size}
let o = if self.request_mode {
let mut value = json!([]);
for ino in prefetch_inos {
let path = self.rafs_meta.path_from_ino(ino as u64)?;
let path = self.path_from_ino(ino as u64)?;
let v = json!({"inode": ino, "path": path});
value.as_array_mut().unwrap().push(v);
}
@ -294,7 +326,7 @@ Compressed Size: {compressed_size}
self.rafs_meta.meta.prefetch_table_entries
);
for ino in prefetch_inos {
let path = self.rafs_meta.path_from_ino(ino as u64)?;
let path = self.path_from_ino(ino as u64)?;
println!(
r#"Inode Number:{inode_number:10} | Path: {path:?} "#,
path = path,
@ -362,15 +394,56 @@ Blob ID: {}
Ok(None)
}
/// Walkthrough the file tree rooted at ino, calling cb for each file or directory
/// in the tree by DFS order, including ino, please ensure ino is a directory.
fn walk_dir(
&self,
ino: u64,
parent: Option<&PathBuf>,
parent_ino: Option<u64>,
cb: &mut dyn FnMut(Option<u64>, &dyn RafsInode, &Path) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
let inode = self.rafs_meta.superblock.get_inode(ino, false)?;
if !inode.is_dir() {
bail!("inode {} is not a directory", ino);
}
self.walk_dir_inner(inode.as_ref(), parent, parent_ino, cb)
}
fn walk_dir_inner(
&self,
inode: &dyn RafsInode,
parent: Option<&PathBuf>,
parent_ino: Option<u64>,
cb: &mut dyn FnMut(Option<u64>, &dyn RafsInode, &Path) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
let path = if let Some(parent) = parent {
parent.join(inode.name())
} else {
PathBuf::from("/")
};
cb(parent_ino, inode, &path)?;
if !inode.is_dir() {
return Ok(());
}
let child_count = inode.get_child_count();
for idx in 0..child_count {
let child = inode.get_child_by_index(idx)?;
self.walk_dir_inner(child.as_ref(), Some(&path), Some(inode.ino()), cb)?;
}
Ok(())
}
// Implement command "icheck"
fn cmd_check_inode(&self, ino: u64) -> Result<Option<Value>, anyhow::Error> {
self.rafs_meta.walk_dir(
self.walk_dir(
self.rafs_meta.superblock.root_ino(),
None,
&mut |inode, path| {
None,
&mut |parent, inode, path| {
if inode.ino() == ino {
println!(r#"{}"#, path.to_string_lossy(),);
self.stat_single_file(ino)?;
self.stat_single_file(parent, ino)?;
}
Ok(())
},
@ -380,14 +453,41 @@ Blob ID: {}
}
impl RafsInspector {
/// Get file name of the inode, the rafs v6 file is handled separately.
fn get_file_name(&self, parent_inode: &dyn RafsInode, inode: &dyn RafsInode) -> OsString {
let mut filename = OsString::from("");
if self.rafs_meta.meta.is_v6() && !inode.is_dir() {
parent_inode
.walk_children_inodes(
0,
&mut |_inode: Option<Arc<dyn RafsInode>>, name: OsString, cur_ino, _offset| {
if cur_ino == inode.ino() {
filename = name;
}
Ok(PostWalkAction::Continue)
},
)
.unwrap();
} else {
filename = inode.name();
}
filename
}
// print information of single file
fn stat_single_file(&self, ino: u64) -> Result<Option<Value>, anyhow::Error> {
fn stat_single_file(
&self,
parent_ino: Option<u64>,
ino: u64,
) -> Result<Option<Value>, anyhow::Error> {
// get RafsInode of current ino
let inode = self.rafs_meta.get_inode(ino, false)?;
let inode_attr = inode.get_attr();
println!(
r#"
if let Some(parent_ino) = parent_ino {
let parent = self.rafs_meta.superblock.get_inode(parent_ino, false)?;
println!(
r#"
Inode Number: {inode_number}
Name: {name:?}
Size: {size}
@ -400,19 +500,20 @@ GID: {gid}
Mtime: {mtime}
MtimeNsec: {mtime_nsec}
Blocks: {blocks}"#,
inode_number = inode.ino(),
name = inode.name(),
size = inode.size(),
parent = inode.parent(),
mode = inode_attr.mode,
permissions = Permissions::from_mode(inode_attr.mode).mode(),
nlink = inode_attr.nlink,
uid = inode_attr.uid,
gid = inode_attr.gid,
mtime = inode_attr.mtime,
mtime_nsec = inode_attr.mtimensec,
blocks = inode_attr.blocks,
);
inode_number = inode.ino(),
name = self.get_file_name(parent.as_ref(), inode.as_ref()),
size = inode.size(),
parent = parent.ino(),
mode = inode_attr.mode,
permissions = Permissions::from_mode(inode_attr.mode).mode(),
nlink = inode_attr.nlink,
uid = inode_attr.uid,
gid = inode_attr.gid,
mtime = inode_attr.mtime,
mtime_nsec = inode_attr.mtimensec,
blocks = inode_attr.blocks,
);
}
Ok(None)
}

View File

@ -3,7 +3,7 @@
// SPDX-License-Identifier: Apache-2.0
#![deny(warnings)]
#[macro_use(crate_authors, crate_version)]
#[macro_use(crate_authors)]
extern crate clap;
#[macro_use]
extern crate anyhow;
@ -542,7 +542,7 @@ fn init_log(matches: &ArgMatches) -> Result<()> {
}
fn main() -> Result<()> {
let (bti_string, build_info) = BuildTimeInfo::dump(crate_version!());
let (bti_string, build_info) = BuildTimeInfo::dump();
let cmd = prepare_cmd_args(bti_string);

View File

@ -4,7 +4,7 @@
#![deny(warnings)]
#[macro_use(crate_authors, crate_version)]
#[macro_use(crate_authors)]
extern crate clap;
#[macro_use]
extern crate anyhow;
@ -25,11 +25,13 @@ mod commands;
use commands::{
CommandBackend, CommandCache, CommandDaemon, CommandFsStats, CommandMount, CommandUmount,
};
use nydus_app::BuildTimeInfo;
#[tokio::main]
async fn main() -> Result<()> {
let (_, build_info) = BuildTimeInfo::dump();
let app = App::new("A client to query and configure the nydusd daemon\n")
.version(crate_version!())
.version(build_info.package_ver.as_str())
.author(crate_authors!())
.arg(
Arg::with_name("sock")

View File

@ -5,7 +5,6 @@
// SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)
#![deny(warnings)]
#![allow(dead_code)]
#[macro_use(crate_version)]
extern crate clap;
#[macro_use]
extern crate log;
@ -263,7 +262,7 @@ fn append_fuse_options(app: App<'static, 'static>) -> App<'static, 'static> {
Arg::with_name("threads")
.long("thread-num")
.short("T")
.default_value("1")
.default_value("4")
.help("Number of worker threads to serve IO requests")
.takes_value(true)
.required(false)
@ -707,7 +706,7 @@ fn process_singleton_arguments(
}
fn main() -> Result<()> {
let (bti_string, bti) = BuildTimeInfo::dump(crate_version!());
let (bti_string, bti) = BuildTimeInfo::dump();
let cmd_options = prepare_commandline_options().version(bti_string.as_str());
let args = cmd_options.clone().get_matches();
let logging_file = args.value_of("log-file").map(|l| l.into());
@ -722,7 +721,7 @@ fn main() -> Result<()> {
setup_logging(logging_file, level, rotation_size)?;
dump_program_info(crate_version!());
dump_program_info();
handle_rlimit_nofile_option(&args, "rlimit-nofile")?;
match args.subcommand_name() {

View File

@ -1,6 +1,6 @@
[package]
name = "nydus-storage"
version = "0.5.0"
version = "0.5.1"
description = "Storage subsystem for Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

View File

@ -12,7 +12,6 @@ use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use arc_swap::ArcSwapOption;
use log::{max_level, Level};
use reqwest::header::{HeaderName, HeaderValue};
@ -134,7 +133,7 @@ impl<R: Read + Send + 'static> Read for Progress<R> {
/// HTTP request data to send to server.
#[derive(Clone)]
pub enum ReqBody<R> {
pub enum ReqBody<R: Clone> {
Read(Progress<R>, usize),
Buf(Vec<u8>),
Form(HashMap<String, String>),
@ -222,30 +221,42 @@ pub(crate) fn respond(resp: Response, catch_status: bool) -> ConnectionResult<Re
#[derive(Debug)]
pub(crate) struct Connection {
client: Client,
proxy: Option<Proxy>,
mirror_state: MirrorState,
shutdown: AtomicBool,
}
#[derive(Debug)]
pub(crate) struct MirrorState {
mirrors: Vec<Arc<Mirror>>,
/// Current mirror, None if there is no mirror available.
current: ArcSwapOption<Mirror>,
proxy: Option<Arc<Proxy>>,
pub mirrors: Vec<Arc<Mirror>>,
pub shutdown: AtomicBool,
}
#[derive(Debug)]
pub(crate) struct Mirror {
/// Information for mirror from configuration file.
config: MirrorConfig,
pub config: MirrorConfig,
/// Mirror status, it will be set to false by atomic operation when mirror is not work.
status: AtomicBool,
/// Falied times for mirror, the status will be marked as false when failed_times = failure_limit.
/// Failed times requesting mirror, the status will be marked as false when failed_times = failure_limit.
failed_times: AtomicU8,
/// Failed limit for mirror.
/// Failure count for which mirror is considered unavailable.
failure_limit: u8,
}
impl Mirror {
/// Convert original URL to mirror URL.
fn mirror_url(&self, url: &str) -> ConnectionResult<Url> {
let mirror_host = Url::parse(self.config.host.as_ref()).map_err(ConnectionError::Url)?;
let mut current_url = Url::parse(url).map_err(ConnectionError::Url)?;
current_url
.set_scheme(mirror_host.scheme())
.map_err(|_| ConnectionError::Scheme)?;
current_url
.set_host(mirror_host.host_str())
.map_err(|_| ConnectionError::Host)?;
current_url
.set_port(mirror_host.port())
.map_err(|_| ConnectionError::Port)?;
Ok(current_url)
}
}
impl Connection {
/// Create a new connection according to the configuration.
pub fn new(config: &ConnectionConfig) -> Result<Arc<Connection>> {
@ -258,13 +269,13 @@ impl Connection {
} else {
None
};
Some(Proxy {
Some(Arc::new(Proxy {
client: Self::build_connection(&config.proxy.url, config)?,
health: ProxyHealth::new(config.proxy.check_interval, ping_url),
fallback: config.proxy.fallback,
use_http: config.proxy.use_http,
replace_scheme: AtomicI16::new(SCHEME_REVERSION_CACHE_UNSET),
})
}))
} else {
None
};
@ -275,34 +286,34 @@ impl Connection {
mirrors.push(Arc::new(Mirror {
config: mirror_config.clone(),
status: AtomicBool::from(true),
// Maybe read from configuration file
failure_limit: 5,
failed_times: AtomicU8::from(0),
failure_limit: mirror_config.failure_limit,
}));
}
}
let current = if let Some(first_mirror) = mirrors.first() {
ArcSwapOption::from(Some(first_mirror.clone()))
} else {
ArcSwapOption::from(None)
};
let connection = Arc::new(Connection {
client,
proxy,
mirror_state: MirrorState { mirrors, current },
mirrors,
shutdown: AtomicBool::new(false),
});
if let Some(proxy) = &connection.proxy {
if proxy.health.ping_url.is_some() {
let conn = connection.clone();
let connect_timeout = config.connect_timeout;
// Start proxy's health checking thread.
connection.start_proxy_health_thread(config.connect_timeout as u64);
// Start mirrors' health checking thread.
connection.start_mirrors_health_thread(config.timeout as u64);
Ok(connection)
}
fn start_proxy_health_thread(&self, connect_timeout: u64) {
if let Some(proxy) = self.proxy.as_ref() {
if proxy.health.ping_url.is_some() {
let proxy = proxy.clone();
// Spawn thread to update the health status of proxy server
thread::spawn(move || {
let proxy = conn.proxy.as_ref().unwrap();
let ping_url = proxy.health.ping_url.as_ref().unwrap();
let mut last_success = true;
@ -333,20 +344,60 @@ impl Connection {
proxy.health.set(false)
});
if conn.shutdown.load(Ordering::Acquire) {
break;
}
thread::sleep(proxy.health.check_interval);
if conn.shutdown.load(Ordering::Acquire) {
break;
}
}
});
}
}
// TODO: check mirrors' health
}
Ok(connection)
fn start_mirrors_health_thread(&self, timeout: u64) {
for mirror in self.mirrors.iter() {
let mirror_cloned = mirror.clone();
thread::spawn(move || {
let mirror_health_url = if mirror_cloned.config.ping_url.is_empty() {
format!("{}/v2", mirror_cloned.config.host)
} else {
mirror_cloned.config.ping_url.clone()
};
info!("Mirror health checking url: {}", mirror_health_url);
let client = Client::new();
loop {
// Try to recover the mirror server when it is unavailable.
if !mirror_cloned.status.load(Ordering::Relaxed) {
info!(
"Mirror server {} unhealthy, try to recover",
mirror_cloned.config.host
);
let _ = client
.get(mirror_health_url.as_str())
.timeout(Duration::from_secs(timeout as u64))
.send()
.map(|resp| {
// If the response status is less than StatusCode::INTERNAL_SERVER_ERROR,
// the mirror server is recovered.
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
info!("Mirror server {} recovered", mirror_cloned.config.host);
mirror_cloned.failed_times.store(0, Ordering::Relaxed);
mirror_cloned.status.store(true, Ordering::Relaxed);
}
})
.map_err(|e| {
warn!(
"Mirror server {} is not recovered: {}",
mirror_cloned.config.host, e
);
});
}
thread::sleep(Duration::from_secs(
mirror_cloned.config.health_check_interval,
));
}
});
}
}
/// Shutdown the connection.
@ -354,8 +405,15 @@ impl Connection {
self.shutdown.store(true, Ordering::Release);
}
/// Send a request to server and wait for response.
pub fn call<R: Read + Send + 'static>(
/// If the auth_through is enable, all requests are send to the mirror server.
/// If the auth_through disabled, e.g. P2P/Dragonfly, we try to avoid sending
/// non-authorization request to the mirror server, which causes performance loss.
/// requesting_auth means this request is to get authorization from a server,
/// which must be a non-authorization request.
/// IOW, only the requesting_auth is false and the headers contain authorization token,
/// we send this request to mirror.
#[allow(clippy::too_many_arguments)]
pub fn call<R: Read + Clone + Send + 'static>(
&self,
method: Method,
url: &str,
@ -363,6 +421,8 @@ impl Connection {
data: Option<ReqBody<R>>,
headers: &mut HeaderMap,
catch_status: bool,
// This means the request is dedicated to authorization.
requesting_auth: bool,
) -> ConnectionResult<Response> {
if self.shutdown.load(Ordering::Acquire) {
return Err(ConnectionError::Disconnected);
@ -370,11 +430,7 @@ impl Connection {
if let Some(proxy) = &self.proxy {
if proxy.health.ok() {
let data_cloned: Option<ReqBody<R>> = match data.as_ref() {
Some(ReqBody::Form(form)) => Some(ReqBody::Form(form.clone())),
Some(ReqBody::Buf(buf)) => Some(ReqBody::Buf(buf.clone())),
_ => None,
};
let data_cloned = data.as_ref().cloned();
let http_url: Option<String>;
let mut replaced_url = url;
@ -425,93 +481,83 @@ impl Connection {
}
}
let current_mirror = self.mirror_state.current.load();
if let Some(mirror) = current_mirror.as_ref() {
let data_cloned: Option<ReqBody<R>> = match data.as_ref() {
Some(ReqBody::Form(form)) => Some(ReqBody::Form(form.clone())),
Some(ReqBody::Buf(buf)) => Some(ReqBody::Buf(buf.clone())),
_ => None,
};
let mirror_host = Url::parse(&mirror.config.host).map_err(ConnectionError::Url)?;
let mut current_url = Url::parse(url).map_err(ConnectionError::Url)?;
current_url
.set_scheme(mirror_host.scheme())
.map_err(|_| ConnectionError::Scheme)?;
current_url
.set_host(mirror_host.host_str())
.map_err(|_| ConnectionError::Host)?;
current_url
.set_port(mirror_host.port())
.map_err(|_| ConnectionError::Port)?;
if let Some(working_headers) = &mirror.config.headers {
for (key, value) in working_headers.iter() {
headers.insert(
HeaderName::from_str(key).unwrap(),
HeaderValue::from_str(value).unwrap(),
);
if !self.mirrors.is_empty() {
let mut fallback_due_auth = false;
for mirror in self.mirrors.iter() {
// With configuration `auth_through` disabled, we should not intend to send authentication
// request to mirror. Mainly because mirrors like P2P/Dragonfly has a poor performance when
// relaying non-data requests. But it's still possible that ever returned token is expired.
// So mirror might still respond us with status code UNAUTHORIZED, which should be handle
// by sending authentication request to the original registry.
//
// - For non-authentication request with token in request header, handle is as usual requests to registry.
// This request should already take token in header.
// - For authentication request
// 1. auth_through is disabled(false): directly pass below mirror translations and jump to original registry handler.
// 2. auth_through is enabled(true): try to get authenticated from mirror and should also handle status code UNAUTHORIZED.
if !mirror.config.auth_through
&& (!headers.contains_key(HEADER_AUTHORIZATION) || requesting_auth)
{
fallback_due_auth = true;
break;
}
}
let result = self.call_inner(
&self.client,
method.clone(),
current_url.to_string().as_str(),
&query,
data_cloned,
headers,
catch_status,
false,
);
if mirror.status.load(Ordering::Relaxed) {
let data_cloned = data.as_ref().cloned();
match result {
Ok(resp) => {
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
return Ok(resp);
}
}
Err(err) => {
warn!(
"request mirror server failed, mirror: {}, error: {:?}",
format!("{:?}", mirror).to_lowercase(),
err
);
mirror.failed_times.fetch_add(1, Ordering::Relaxed);
if mirror.failed_times.load(Ordering::Relaxed) >= mirror.failure_limit {
warn!(
"reach to fail limit {}, disable mirror: {}",
mirror.failure_limit,
format!("{:?}", mirror).to_lowercase()
for (key, value) in mirror.config.headers.iter() {
headers.insert(
HeaderName::from_str(key).unwrap(),
HeaderValue::from_str(value).unwrap(),
);
mirror.status.store(false, Ordering::Relaxed);
}
let mut idx = 0;
loop {
if idx == self.mirror_state.mirrors.len() {
break None;
}
let m = &self.mirror_state.mirrors[idx];
if m.status.load(Ordering::Relaxed) {
warn!(
"mirror server has been changed to {}",
format!("{:?}", m).to_lowercase()
);
break Some(m);
}
let current_url = mirror.mirror_url(url)?;
debug!("mirror server url {}", current_url);
idx += 1;
let result = self.call_inner(
&self.client,
method.clone(),
current_url.as_str(),
&query,
data_cloned,
headers,
catch_status,
false,
);
match result {
Ok(resp) => {
// If the response status >= INTERNAL_SERVER_ERROR, move to the next mirror server.
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
return Ok(resp);
}
}
Err(err) => {
warn!(
"request mirror server failed, mirror: {:?}, error: {:?}",
mirror, err
);
mirror.failed_times.fetch_add(1, Ordering::Relaxed);
if mirror.failed_times.load(Ordering::Relaxed) >= mirror.failure_limit {
warn!(
"reach to failure limit {}, disable mirror: {:?}",
mirror.failure_limit, mirror
);
mirror.status.store(false, Ordering::Relaxed);
}
}
.map(|m| self.mirror_state.current.store(Some(m.clone())))
.unwrap_or_else(|| self.mirror_state.current.store(None));
}
}
// Remove mirror-related headers to avoid sending them to the next mirror server and original registry.
for (key, _) in mirror.config.headers.iter() {
headers.remove(HeaderName::from_str(key).unwrap());
}
}
if !fallback_due_auth {
warn!("Request to all mirror server failed, fallback to original server.");
}
warn!("Failed to request mirror server, fallback to original server.");
}
self.call_inner(
@ -555,7 +601,7 @@ impl Connection {
}
#[allow(clippy::too_many_arguments)]
fn call_inner<R: Read + Send + 'static>(
fn call_inner<R: Read + Clone + Send + 'static>(
&self,
client: &Client,
method: Method,

View File

@ -143,7 +143,15 @@ impl BlobReader for OssReader {
let resp = self
.connection
.call::<&[u8]>(Method::HEAD, url.as_str(), None, None, &mut headers, true)
.call::<&[u8]>(
Method::HEAD,
url.as_str(),
None,
None,
&mut headers,
true,
false,
)
.map_err(OssError::Request)?;
let content_length = resp
.headers()
@ -178,7 +186,15 @@ impl BlobReader for OssReader {
// Safe because the the call() is a synchronous operation.
let mut resp = self
.connection
.call::<&[u8]>(Method::GET, url.as_str(), None, None, &mut headers, true)
.call::<&[u8]>(
Method::GET,
url.as_str(),
None,
None,
&mut headers,
true,
false,
)
.map_err(OssError::Request)?;
Ok(resp
.copy_to(&mut buf)

View File

@ -5,8 +5,12 @@
//! Storage backend driver to access blobs on container image registry.
use std::collections::HashMap;
use std::io::{Error, Read, Result};
use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use arc_swap::ArcSwapOption;
use reqwest::blocking::Response;
pub use reqwest::header::HeaderMap;
use reqwest::header::{HeaderValue, CONTENT_LENGTH};
@ -102,6 +106,12 @@ impl HashCache {
#[derive(Clone, serde::Deserialize)]
struct TokenResponse {
token: String,
#[serde(default = "default_expires_in")]
expires_in: u64,
}
fn default_expires_in() -> u64 {
10 * 60
}
#[derive(Debug)]
@ -110,7 +120,7 @@ struct BasicAuth {
realm: String,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
struct BearerAuth {
realm: String,
service: String,
@ -149,6 +159,11 @@ struct RegistryState {
// Cache 30X redirect url
// Example: RwLock<HashMap<"<blob_id>", "<redirected_url>">>
cached_redirect: HashCache,
// The expiration time of the token, which is obtained from the registry server.
refresh_token_time: ArcSwapOption<u64>,
// Cache bearer auth for refreshing token.
cached_bearer_auth: ArcSwapOption<BearerAuth>,
}
impl RegistryState {
@ -198,6 +213,7 @@ impl RegistryState {
Some(ReqBody::Form(form)),
&mut headers,
true,
true,
)
.map_err(|e| einval!(format!("registry auth server request failed {:?}", e)))?;
let ret: TokenResponse = token_resp.json().map_err(|e| {
@ -206,6 +222,19 @@ impl RegistryState {
e
))
})?;
if let Ok(now_timestamp) = SystemTime::now().duration_since(UNIX_EPOCH) {
self.refresh_token_time
.store(Some(Arc::new(now_timestamp.as_secs() + ret.expires_in)));
info!(
"cached_bearer_auth: {:?}, next time: {}",
auth,
now_timestamp.as_secs() + ret.expires_in
);
}
// Cache bearer auth for refreshing token.
self.cached_bearer_auth.store(Some(Arc::new(auth)));
Ok(ret.token)
}
@ -312,7 +341,7 @@ impl RegistryReader {
/// Request: POST https://my-registry.com/test/repo/blobs/uploads
/// header: authorization: Basic base64(<username:password>)
/// Response: status: 200 Ok
fn request<R: Read + Send + 'static>(
fn request<R: Read + Clone + Send + 'static>(
&self,
method: Method,
url: &str,
@ -336,16 +365,40 @@ impl RegistryReader {
if let Some(data) = data {
return self
.connection
.call(method, url, None, Some(data), &mut headers, catch_status)
.call(
method,
url,
None,
Some(data),
&mut headers,
catch_status,
false,
)
.map_err(RegistryError::Request);
}
// Try to request registry server with `authorization` header
let resp = self
let mut resp = self
.connection
.call::<&[u8]>(method.clone(), url, None, None, &mut headers, false)
.call::<&[u8]>(method.clone(), url, None, None, &mut headers, false, false)
.map_err(RegistryError::Request)?;
if resp.status() == StatusCode::UNAUTHORIZED {
if headers.contains_key(HEADER_AUTHORIZATION) {
// If we request registry (harbor server) with expired authorization token,
// the `www-authenticate: Basic realm="harbor"` in response headers is not expected.
// Related code in harbor:
// https://github.com/goharbor/harbor/blob/v2.5.3/src/server/middleware/v2auth/auth.go#L98
//
// We can remove the expired authorization token and
// resend the request to get the correct "www-authenticate" value.
headers.remove(HEADER_AUTHORIZATION);
resp = self
.connection
.call::<&[u8]>(method.clone(), url, None, None, &mut headers, false, false)
.map_err(RegistryError::Request)?;
};
if let Some(resp_auth_header) = resp.headers().get(HEADER_WWW_AUTHENTICATE) {
// Get token from registry authorization server
if let Some(auth) = RegistryState::parse_auth(resp_auth_header, &self.state.auth) {
@ -353,6 +406,7 @@ impl RegistryReader {
.state
.get_auth_header(auth, &self.connection)
.map_err(|e| RegistryError::Common(e.to_string()))?;
headers.insert(
HEADER_AUTHORIZATION,
HeaderValue::from_str(auth_header.as_str()).unwrap(),
@ -361,7 +415,7 @@ impl RegistryReader {
// Try to request registry server with `authorization` header again
let resp = self
.connection
.call(method, url, None, data, &mut headers, catch_status)
.call(method, url, None, data, &mut headers, catch_status, false)
.map_err(RegistryError::Request)?;
let status = resp.status();
@ -417,6 +471,7 @@ impl RegistryReader {
None,
&mut headers,
false,
false,
)
.map_err(RegistryError::Request)?;
@ -473,6 +528,7 @@ impl RegistryReader {
None,
&mut headers,
true,
false,
)
.map_err(RegistryError::Request);
match resp_ret {
@ -577,13 +633,27 @@ impl Registry {
blob_url_scheme: config.blob_url_scheme,
blob_redirected_host: config.blob_redirected_host,
cached_redirect: HashCache::new(),
refresh_token_time: ArcSwapOption::new(None),
cached_bearer_auth: ArcSwapOption::new(None),
});
Ok(Registry {
let mirrors = connection.mirrors.clone();
let registry = Registry {
connection,
state,
metrics: BackendMetrics::new(id, "registry"),
})
};
for mirror in mirrors.iter() {
if !mirror.config.auth_through {
registry.start_refresh_token_thread();
info!("Refresh token thread started.");
break;
}
}
Ok(registry)
}
fn get_authorization_info(auth: &Option<String>) -> Result<(String, String)> {
@ -610,6 +680,72 @@ impl Registry {
Ok((String::new(), String::new()))
}
}
fn start_refresh_token_thread(&self) {
let conn = self.connection.clone();
let state = self.state.clone();
// The default refresh token internal is 10 minutes.
let refresh_check_internal = 10 * 60;
thread::spawn(move || {
loop {
if let Ok(now_timestamp) = SystemTime::now().duration_since(UNIX_EPOCH) {
if let Some(next_refresh_timestamp) = state.refresh_token_time.load().as_deref()
{
// If the token will expire in next refresh check internal, get new token now.
// Add 20 seconds to handle critical cases.
if now_timestamp.as_secs() + refresh_check_internal + 20
>= *next_refresh_timestamp
{
if let Some(cached_bearer_auth) =
state.cached_bearer_auth.load().as_deref()
{
let mut cached_bearer_auth_clone = cached_bearer_auth.clone();
if let Ok(url) = Url::parse(&cached_bearer_auth_clone.realm) {
let last_cached_auth = state.cached_auth.get();
let query: Vec<(_, _)> =
url.query_pairs().filter(|p| p.0 != "grant_type").collect();
let mut refresh_url = url.clone();
refresh_url.set_query(None);
for pair in query {
refresh_url.query_pairs_mut().append_pair(
&pair.0.to_string()[..],
&pair.1.to_string()[..],
);
}
refresh_url
.query_pairs_mut()
.append_pair("grant_type", "refresh_token");
cached_bearer_auth_clone.realm = refresh_url.to_string();
let token = state.get_token(cached_bearer_auth_clone, &conn);
if let Ok(token) = token {
let new_cached_auth = format!("Bearer {}", token);
info!(
"Authorization token for registry has been refreshed."
);
// Refresh authorization token
state.cached_auth.set(&last_cached_auth, new_cached_auth);
}
}
}
}
}
}
if conn.shutdown.load(Ordering::Acquire) {
break;
}
thread::sleep(Duration::from_secs(refresh_check_internal));
if conn.shutdown.load(Ordering::Acquire) {
break;
}
}
});
}
}
impl BlobBackend for Registry {
@ -695,6 +831,8 @@ mod tests {
blob_redirected_host: "oss.alibaba-inc.com".to_string(),
cached_auth: Default::default(),
cached_redirect: Default::default(),
refresh_token_time: ArcSwapOption::new(None),
cached_bearer_auth: ArcSwapOption::new(None),
};
assert_eq!(

View File

@ -579,7 +579,28 @@ impl FileCacheEntry {
}
if !bitmap.wait_for_range_ready(chunk_index, count)? {
Err(eio!("failed to read data from storage backend"))
if prefetch {
return Err(eio!("failed to read data from storage backend"));
}
// if we are in ondemand path, retry for the timeout chunks
for chunk in chunks {
if self.chunk_map.is_ready(chunk.as_ref())? {
continue;
}
info!("retry for timeout chunk, {}", chunk.id());
let mut buf = alloc_buf(chunk.uncompressed_size() as usize);
self.read_raw_chunk(chunk.as_ref(), &mut buf, false, None)
.map_err(|e| eio!(format!("read_raw_chunk failed, {:?}", e)))?;
if self.dio_enabled {
self.adjust_buffer_for_dio(&mut buf)
}
Self::persist_chunk(&self.file, chunk.uncompressed_offset(), &buf)
.map_err(|e| eio!(format!("do_fetch_chunk failed to persist data, {:?}", e)))?;
self.chunk_map
.set_ready_and_clear_pending(chunk.as_ref())
.unwrap_or_else(|e| error!("set chunk ready failed, {}", e));
}
Ok(total_size)
} else {
Ok(total_size)
}

View File

@ -1,6 +1,6 @@
[package]
name = "nydus-utils"
version = "0.3.1"
version = "0.3.2"
description = "Utilities and helpers for Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"