Compare commits

28 commits (SHA1):

2fd7070bf7
a514f66851
34d84cac91
41fefcdbae
29a9af49a4
2496bc98f3
36b4edb638
dd0a0d8522
29af7a1267
07788809a2
405c79de17
c8b21e3529
67e0cc6f32
121a108ac9
e9a774c2ee
7975d09dc3
8c9c73b5b7
c7eaa2e858
6a0bef4ce6
8a32d5b61e
5ac9831130
c75d3fbfcf
91d26745e2
a51a7185f1
afaf75cfff
c28585f06f
fd588c918f
3b15cf50a5

@@ -102,11 +102,9 @@ jobs:
"target": "musl"
}
EOF
- name: run test_api
- name: run e2e tests
run: |
cd /home/runner/work/image-service/image-service/contrib/nydus-test
sudo mkdir -p /blobdir
sudo python3 nydus_test_config.py --dist fs_structure.yaml
sudo pytest -vs --durations=0 functional-test/test_api.py \
functional-test/test_nydus.py \
functional-test/test_layered_image.py
sudo pytest -vs -x --durations=0 functional-test/test_api.py functional-test/test_nydus.py functional-test/test_layered_image.py

@@ -205,7 +205,7 @@ jobs:
with:
name: "Nydus Image Service ${{ env.tag }}"
body: |
Mirror (update in 10 min): https://registry.npmmirror.com/binary.html?path=nydus/${{ env.tag }}/
Binaries download mirror (sync within a few hours): https://registry.npmmirror.com/binary.html?path=nydus/${{ env.tag }}/
generate_release_notes: true
files: |
${{ env.tarballs }}

@@ -928,7 +928,7 @@ dependencies = [

[[package]]
name = "nydus-api"
version = "0.1.1"
version = "0.1.2"
dependencies = [
"dbs-uhttp",
"http",

@@ -947,7 +947,7 @@ dependencies = [

[[package]]
name = "nydus-app"
version = "0.3.0"
version = "0.3.1"
dependencies = [
"flexi_logger",
"libc",

@@ -962,7 +962,7 @@ dependencies = [

[[package]]
name = "nydus-blobfs"
version = "0.1.0"
version = "0.1.1"
dependencies = [
"fuse-backend-rs",
"libc",

@@ -978,7 +978,7 @@ dependencies = [

[[package]]
name = "nydus-error"
version = "0.2.1"
version = "0.2.2"
dependencies = [
"backtrace",
"httpdate",

@@ -990,7 +990,7 @@ dependencies = [

[[package]]
name = "nydus-rafs"
version = "0.1.0"
version = "0.1.1"
dependencies = [
"anyhow",
"arc-swap",

@@ -1022,7 +1022,7 @@ dependencies = [

[[package]]
name = "nydus-rs"
version = "2.1.0-rc.3.1"
version = "0.0.0-git"
dependencies = [
"anyhow",
"base64",

@@ -1068,7 +1068,7 @@ dependencies = [

[[package]]
name = "nydus-storage"
version = "0.5.0"
version = "0.5.1"
dependencies = [
"anyhow",
"arc-swap",

@@ -1100,7 +1100,7 @@ dependencies = [

[[package]]
name = "nydus-utils"
version = "0.3.1"
version = "0.3.2"
dependencies = [
"blake3",
"flate2",

@@ -1,6 +1,7 @@
[package]
name = "nydus-rs"
version = "2.1.0-rc.3.1"
# will be overridden by real git tag during cargo build
version = "0.0.0-git"
description = "Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

@@ -1,6 +1,6 @@
[package]
name = "nydus-api"
version = "0.1.1"
version = "0.1.2"
description = "APIs for Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

@@ -423,11 +423,41 @@ impl Default for ProxyConfig {

/// Configuration for mirror.
#[derive(Clone, Deserialize, Serialize, Debug)]
#[serde(default)]
pub struct MirrorConfig {
/// Mirror server URL, for example http://127.0.0.1:65001
/// Mirror server URL, for example http://127.0.0.1:65001.
pub host: String,
/// HTTP request headers to be passed to mirror server
pub headers: Option<HashMap<String, String>>,
/// HTTP request headers to be passed to mirror server.
#[serde(default)]
pub headers: HashMap<String, String>,
/// Whether the authorization process is through mirror? default false.
/// true: authorization through mirror, e.g. Using normal registry as mirror.
/// false: authorization through original registry,
/// e.g. when using Dragonfly server as mirror, authorization through it will affect performance.
#[serde(default)]
pub auth_through: bool,
/// Interval of mirror health checking, in seconds.
#[serde(default = "default_check_interval")]
pub health_check_interval: u64,
/// Failure count for which mirror is considered unavailable.
#[serde(default = "default_failure_limit")]
pub failure_limit: u8,
/// Ping URL to check mirror server health.
#[serde(default)]
pub ping_url: String,
}

impl Default for MirrorConfig {
fn default() -> Self {
Self {
host: String::new(),
headers: HashMap::new(),
auth_through: false,
health_check_interval: 5,
failure_limit: 5,
ping_url: String::new(),
}
}
}

#[derive(Debug)]

@@ -944,6 +974,14 @@ fn default_http_timeout() -> u32 {
5
}

fn default_check_interval() -> u64 {
5
}

fn default_failure_limit() -> u8 {
5
}

fn default_work_dir() -> String {
".".to_string()
}

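A minimal sketch of how the serde defaults introduced above behave, assuming the `serde` (with the derive feature) and `serde_json` crates; the free-standing struct below only mirrors the `MirrorConfig` fields from the hunk and is not the crate's actual module layout:

```rust
// Illustrative only: a stand-alone copy of the MirrorConfig fields from the hunk
// above, used to show how missing JSON fields are filled from the serde defaults.
use std::collections::HashMap;
use serde::Deserialize;

fn default_check_interval() -> u64 {
    5
}

fn default_failure_limit() -> u8 {
    5
}

#[derive(Debug, Deserialize)]
#[serde(default)]
struct MirrorConfig {
    host: String,
    headers: HashMap<String, String>,
    auth_through: bool,
    #[serde(default = "default_check_interval")]
    health_check_interval: u64,
    #[serde(default = "default_failure_limit")]
    failure_limit: u8,
    ping_url: String,
}

impl Default for MirrorConfig {
    fn default() -> Self {
        Self {
            host: String::new(),
            headers: HashMap::new(),
            auth_through: false,
            health_check_interval: 5,
            failure_limit: 5,
            ping_url: String::new(),
        }
    }
}

fn main() {
    // Only "host" is present; every other field falls back to its default.
    let cfg: MirrorConfig =
        serde_json::from_str(r#"{ "host": "http://dragonfly1.io:65001" }"#).unwrap();
    assert_eq!(cfg.health_check_interval, 5);
    assert_eq!(cfg.failure_limit, 5);
    assert!(!cfg.auth_through);
}
```

With only `host` supplied, the remaining fields fall back to the values shown in `impl Default for MirrorConfig`, which is why the docs below can describe `ping_url`, `health_check_interval`, and `failure_limit` as optional.
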
@@ -1,6 +1,6 @@
[package]
name = "nydus-app"
version = "0.3.0"
version = "0.3.1"
authors = ["The Nydus Developers"]
description = "Application framework for Nydus Image Service"
readme = "README.md"

@@ -39,7 +39,7 @@ use nydus_app::{BuildTimeInfo, setup_logging};

fn main() -> Result<()> {
let level = cmd.value_of("log-level").unwrap().parse().unwrap();
let (bti_string, build_info) = BuildTimeInfo::dump(crate_version!());
let (bti_string, build_info) = BuildTimeInfo::dump();
let _cmd = App::new("")
.version(bti_string.as_str())
.author(crate_authors!())

14  app/build.rs

@@ -28,7 +28,17 @@ fn get_git_commit_hash() -> String {
return commit.to_string();
}
}
"Unknown".to_string()
"unknown".to_string()
}

fn get_git_commit_version() -> String {
let tag = Command::new("git").args(&["describe", "--tags"]).output();
if let Ok(tag) = tag {
if let Some(tag) = String::from_utf8_lossy(&tag.stdout).lines().next() {
return tag.to_string();
}
}
"unknown".to_string()
}

fn main() {

@@ -43,10 +53,12 @@ fn main() {
.format(&time::format_description::well_known::Iso8601::DEFAULT)
.unwrap();
let git_commit_hash = get_git_commit_hash();
let git_commit_version = get_git_commit_version();

println!("cargo:rerun-if-changed=../git/HEAD");
println!("cargo:rustc-env=RUSTC_VERSION={}", rustc_ver);
println!("cargo:rustc-env=PROFILE={}", profile);
println!("cargo:rustc-env=BUILT_TIME_UTC={}", build_time);
println!("cargo:rustc-env=GIT_COMMIT_HASH={}", git_commit_hash);
println!("cargo:rustc-env=GIT_COMMIT_VERSION={}", git_commit_version);
}

@@ -22,7 +22,7 @@
//!
//! fn main() -> Result<()> {
//! let level = cmd.value_of("log-level").unwrap().parse().unwrap();
//! let (bti_string, build_info) = BuildTimeInfo::dump(crate_version!());
//! let (bti_string, build_info) = BuildTimeInfo::dump();
//! let _cmd = App::new("")
//! .version(bti_string.as_str())
//! .author(crate_authors!())

@@ -65,14 +65,15 @@ pub mod built_info {
pub const PROFILE: &str = env!("PROFILE");
pub const RUSTC_VERSION: &str = env!("RUSTC_VERSION");
pub const BUILT_TIME_UTC: &str = env!("BUILT_TIME_UTC");
pub const GIT_COMMIT_VERSION: &str = env!("GIT_COMMIT_VERSION");
pub const GIT_COMMIT_HASH: &str = env!("GIT_COMMIT_HASH");
}

/// Dump program build and version information.
pub fn dump_program_info(prog_version: &str) {
pub fn dump_program_info() {
info!(
"Program Version: {}, Git Commit: {:?}, Build Time: {:?}, Profile: {:?}, Rustc Version: {:?}",
prog_version,
built_info::GIT_COMMIT_VERSION,
built_info::GIT_COMMIT_HASH,
built_info::BUILT_TIME_UTC,
built_info::PROFILE,

@@ -91,10 +92,10 @@ pub struct BuildTimeInfo {
}

impl BuildTimeInfo {
pub fn dump(package_ver: &str) -> (String, Self) {
pub fn dump() -> (String, Self) {
let info_string = format!(
"\rVersion: \t{}\nGit Commit: \t{}\nBuild Time: \t{}\nProfile: \t{}\nRustc: \t\t{}\n",
package_ver,
built_info::GIT_COMMIT_VERSION,
built_info::GIT_COMMIT_HASH,
built_info::BUILT_TIME_UTC,
built_info::PROFILE,

@@ -102,7 +103,7 @@ impl BuildTimeInfo {
);

let info = Self {
package_ver: package_ver.to_string(),
package_ver: built_info::GIT_COMMIT_VERSION.to_string(),
git_commit: built_info::GIT_COMMIT_HASH.to_string(),
build_time: built_info::BUILT_TIME_UTC.to_string(),
profile: built_info::PROFILE.to_string(),

@@ -1,6 +1,6 @@
[package]
name = "nydus-blobfs"
version = "0.1.0"
version = "0.1.1"
description = "Blob object file system for Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

@@ -43,10 +43,9 @@
"digest": "sha256:aec98c9e3dce739877b8f5fe1cddd339de1db2b36c20995d76f6265056dbdb08",
"size": 273320,
"annotations": {
"containerd.io/snapshot/nydus-blob-ids": "[\"09845cce1d983b158d4865fc37c23bbfb892d4775c786e8114d3cf868975c059\",\"b413839e4ee5248697ef30fe9a84b659fa744d69bbc9b7754113adc2b2b6bc90\",\"b6a85be8248b0d3c2f0565ef71d549f404f8edcee1ab666c9871a8e6d9384860\",\"00d151e7d392e68e2c756a6fc42640006ddc0a98d37dba3f90a7b73f63188bbd\"]",
"containerd.io/snapshot/nydus-bootstrap": "true",
"containerd.io/snapshot/nydus-reference-blob-ids": "[\"09845cce1d983b158d4865fc37c23bbfb892d4775c786e8114d3cf868975c059\"]"
}
}
]
}
}

@@ -113,6 +113,7 @@ func (b *OSSBackend) Upload(ctx context.Context, blobID, blobPath string, size i
blobObjectKey := b.objectPrefix + blobID

desc := blobDesc(size, blobID)
desc.URLs = append(desc.URLs, b.remoteID(blobID))

if !forcePush {
if exist, err := b.bucket.IsObjectExist(blobObjectKey); err != nil {

@@ -254,3 +255,7 @@ func (b *OSSBackend) Check(blobID string) (bool, error) {
func (b *OSSBackend) Type() Type {
return OssBackend
}

func (b *OSSBackend) remoteID(blobID string) string {
return fmt.Sprintf("oss://%s/%s%s", b.bucket.BucketName, b.objectPrefix, blobID)
}

@@ -145,8 +145,8 @@ func (cache *Cache) recordToLayer(record *Record) (*ocispec.Descriptor, *ocispec
utils.LayerAnnotationUncompressed: record.NydusBootstrapDiffID.String(),
},
}
if refenceBlobsStr, ok := record.NydusBootstrapDesc.Annotations[utils.LayerAnnotationNydusReferenceBlobIDs]; ok {
bootstrapCacheDesc.Annotations[utils.LayerAnnotationNydusReferenceBlobIDs] = refenceBlobsStr
if referenceBlobsStr, ok := record.NydusBootstrapDesc.Annotations[utils.LayerAnnotationNydusReferenceBlobIDs]; ok {
bootstrapCacheDesc.Annotations[utils.LayerAnnotationNydusReferenceBlobIDs] = referenceBlobsStr
}

var blobCacheDesc *ocispec.Descriptor

@@ -205,14 +205,15 @@ func (rule *FilesystemRule) pullSourceImage() (*tool.Image, error) {
func (rule *FilesystemRule) mountSourceImage() (*tool.Image, error) {
logrus.Infof("Mounting source image to %s", rule.SourceMountPath)

if err := os.MkdirAll(rule.SourceMountPath, 0755); err != nil {
return nil, errors.Wrap(err, "create mountpoint directory of source image")
}

image, err := rule.pullSourceImage()
if err != nil {
return nil, errors.Wrap(err, "pull source image")
}

if err := image.Umount(); err != nil {
return nil, errors.Wrap(err, "umount previous rootfs")
}

if err := image.Mount(); err != nil {
return nil, errors.Wrap(err, "mount source image")
}

@@ -7,18 +7,42 @@ package tool
import (
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"

"github.com/containerd/containerd/mount"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)

func run(cmd string, args ...string) error {
_cmd := exec.Command("sh", "-c", cmd)
_cmd.Stdout = os.Stdout
_cmd.Stderr = os.Stderr
return _cmd.Run()
func mkMounts(dirs []string) []mount.Mount {
var options []string

if len(dirs) == 0 {
return nil
}

if len(dirs) == 1 {
return []mount.Mount{
{
Source: dirs[0],
Type: "bind",
Options: []string{
"ro",
"rbind",
},
},
}
}

options = append(options, fmt.Sprintf("lowerdir=%s", strings.Join(dirs, ":")))
return []mount.Mount{
{
Type: "overlay",
Source: "overlay",
Options: options,
},
}
}

type Image struct {

@@ -30,41 +54,20 @@ type Image struct {

// Mount mounts rootfs of OCI image.
func (image *Image) Mount() error {
image.Umount()
if err := os.MkdirAll(image.Rootfs, 0750); err != nil {
return errors.Wrap(err, "create rootfs dir")
}

var dirs []string
layerLen := len(image.Layers)
count := len(image.Layers)
for i := range image.Layers {
layerDir := filepath.Join(image.SourcePath, image.Layers[layerLen-i-1].Digest.Encoded())
layerDir := filepath.Join(image.SourcePath, image.Layers[count-i-1].Digest.Encoded())
dirs = append(dirs, strings.ReplaceAll(layerDir, ":", "\\:"))
}

lowerOption := strings.Join(dirs, ":")
// Handle long options string overed 4096 chars, split them to
// two overlay mounts.
if len(lowerOption) >= 4096 {
half := (len(dirs) - 1) / 2
upperDirs := dirs[half+1:]
lowerDirs := dirs[:half+1]
lowerOverlay := image.Rootfs + "_lower"
if err := os.MkdirAll(lowerOverlay, 0755); err != nil {
return err
}
if err := run(fmt.Sprintf(
"mount -t overlay overlay -o lowerdir='%s' %s",
strings.Join(upperDirs, ":"), lowerOverlay,
)); err != nil {
return err
}
lowerDirs = append(lowerDirs, lowerOverlay)
lowerOption = strings.Join(lowerDirs, ":")
}

if err := run(fmt.Sprintf(
"mount -t overlay overlay -o lowerdir='%s' %s",
lowerOption, image.Rootfs,
)); err != nil {
return err
mounts := mkMounts(dirs)
if err := mount.All(mounts, image.Rootfs); err != nil {
return errors.Wrap(err, "mount source layer")
}

return nil

@@ -72,21 +75,19 @@ func (image *Image) Mount() error {

// Umount umounts rootfs mountpoint of OCI image.
func (image *Image) Umount() error {
lowerOverlay := image.Rootfs + "_lower"
if _, err := os.Stat(lowerOverlay); err == nil {
if err := run(fmt.Sprintf("umount %s", lowerOverlay)); err != nil {
return err
if _, err := os.Stat(image.Rootfs); err != nil {
if os.IsNotExist(err) {
return nil
}
return errors.Wrap(err, "stat rootfs")
}

if _, err := os.Stat(image.Rootfs); err == nil {
if err := run(fmt.Sprintf("umount %s", image.Rootfs)); err != nil {
return err
}
if err := mount.Unmount(image.Rootfs, 0); err != nil {
return errors.Wrap(err, "umount rootfs")
}

if err := os.RemoveAll(image.SourcePath); err != nil {
return err
if err := os.RemoveAll(image.Rootfs); err != nil {
return errors.Wrap(err, "remove rootfs")
}

return nil

@@ -194,19 +194,15 @@ func appendBlobs(oldBlobs []string, newBlobs []string) []string {
return oldBlobs
}

func (mm *manifestManager) Push(ctx context.Context, buildLayers []*buildLayer) error {
func (mm *manifestManager) Push(ctx context.Context, builtLayers []*buildLayer) error {
var (
blobListInAnnotation []string
referenceBlobs []string
layers []ocispec.Descriptor
referenceBlobs []string
layers []ocispec.Descriptor
)
for idx, _layer := range buildLayers {
for idx, _layer := range builtLayers {
record := _layer.GetCacheRecord()
referenceBlobs = appendBlobs(referenceBlobs, layersHex(_layer.referenceBlobs))
blobListInAnnotation = appendBlobs(blobListInAnnotation, layersHex(_layer.referenceBlobs))
if record.NydusBlobDesc != nil {
// Write blob digest list in JSON format to layer annotation of bootstrap.
blobListInAnnotation = append(blobListInAnnotation, record.NydusBlobDesc.Digest.Hex())
// For registry backend, we need to write the blob layer to
// manifest to prevent them from being deleted by registry GC.
if mm.backend.Type() == backend.RegistryBackend {

@@ -222,15 +218,10 @@ func (mm *manifestManager) Push(ctx context.Context, buildLayers []*buildLayer)
}
}

// Only need to write lastest bootstrap layer in nydus manifest
if idx == len(buildLayers)-1 {
blobListBytes, err := json.Marshal(blobListInAnnotation)
if err != nil {
return errors.Wrap(err, "Marshal blob list")
}
record.NydusBootstrapDesc.Annotations[utils.LayerAnnotationNydusBlobIDs] = string(blobListBytes)
// Only need to write latest bootstrap layer in nydus manifest
if idx == len(builtLayers)-1 {
if len(referenceBlobs) > 0 {
blobListBytes, err = json.Marshal(referenceBlobs)
blobListBytes, err := json.Marshal(referenceBlobs)
if err != nil {
return errors.Wrap(err, "Marshal blob list")
}

@@ -250,7 +241,6 @@ func (mm *manifestManager) Push(ctx context.Context, buildLayers []*buildLayer)
// Remove useless annotations from layer
validAnnotationKeys := map[string]bool{
utils.LayerAnnotationNydusBlob: true,
utils.LayerAnnotationNydusBlobIDs: true,
utils.LayerAnnotationNydusReferenceBlobIDs: true,
utils.LayerAnnotationNydusBootstrap: true,
utils.LayerAnnotationNydusFsVersion: true,

@@ -67,7 +67,7 @@ func (cfg *BackendConfig) rawMetaBackendCfg() []byte {
"access_key_id": cfg.AccessKeyID,
"access_key_secret": cfg.AccessKeySecret,
"bucket_name": cfg.BucketName,
"object_prefix": cfg.MetaPrefix + "/",
"object_prefix": cfg.MetaPrefix,
}
b, _ := json.Marshal(configMap)
return b

@@ -79,7 +79,7 @@ func (cfg *BackendConfig) rawBlobBackendCfg() []byte {
"access_key_id": cfg.AccessKeyID,
"access_key_secret": cfg.AccessKeySecret,
"bucket_name": cfg.BucketName,
"object_prefix": cfg.BlobPrefix + "/",
"object_prefix": cfg.BlobPrefix,
}
b, _ := json.Marshal(configMap)
return b

@@ -316,7 +316,7 @@ func (p *Packer) Pack(_ context.Context, req PackRequest) (PackResult, error) {
return PackResult{}, errors.New("can not push image to remote due to lack of backend configuration")
}
pushResult, err := p.pusher.Push(PushRequest{
Meta: bootstrapPath,
Meta: req.ImageName,
Blob: newBlobHash,
ParentBlobs: parentBlobs,
})

@@ -3,7 +3,6 @@ package packer
import (
"context"
"encoding/json"
"fmt"
"os"
"strings"

@@ -69,12 +68,23 @@ func NewPusher(opt NewPusherOpt) (*Pusher, error) {
// Push will push the meta and blob file to remote backend
// at this moment, oss is the only possible backend, the meta file name is user defined
// and blob file name is the hash of the blobfile that is extracted from output.json
func (p *Pusher) Push(req PushRequest) (PushResult, error) {
func (p *Pusher) Push(req PushRequest) (pushResult PushResult, retErr error) {
p.logger.Info("start to push meta and blob to remote backend")
// todo: add a suitable timeout
ctx := context.Background()
// todo: use blob desc to build manifest

defer func() {
if retErr != nil {
if err := p.blobBackend.Finalize(true); err != nil {
logrus.WithError(err).Warnf("Cancel blob backend upload")
}
if err := p.metaBackend.Finalize(true); err != nil {
logrus.WithError(err).Warnf("Cancel meta backend upload")
}
}
}()

for _, blob := range req.ParentBlobs {
// try push parent blobs
if _, err := p.blobBackend.Upload(ctx, blob, p.blobFilePath(blob, true), 0, false); err != nil {

@@ -84,18 +94,30 @@ func (p *Pusher) Push(req PushRequest) (PushResult, error) {

p.logger.Infof("push blob %s", req.Blob)
if req.Blob != "" {
if _, err := p.blobBackend.Upload(ctx, req.Blob, p.blobFilePath(req.Blob, true), 0, false); err != nil {
desc, err := p.blobBackend.Upload(ctx, req.Blob, p.blobFilePath(req.Blob, true), 0, false)
if err != nil {
return PushResult{}, errors.Wrap(err, "failed to put blobfile to remote")
}
if len(desc.URLs) > 0 {
pushResult.RemoteBlob = desc.URLs[0]
}
}
if _, err := p.metaBackend.Upload(ctx, req.Meta, p.bootstrapPath(req.Meta), 0, true); err != nil {
return PushResult{}, errors.Wrapf(err, "failed to put metafile to remote")
if retErr = p.blobBackend.Finalize(false); retErr != nil {
return PushResult{}, errors.Wrap(retErr, "Finalize blob backend upload")
}

return PushResult{
RemoteMeta: fmt.Sprintf("oss://%s/%s/%s", p.cfg.BucketName, p.cfg.MetaPrefix, req.Meta),
RemoteBlob: fmt.Sprintf("oss://%s/%s/%s", p.cfg.BucketName, p.cfg.BlobPrefix, req.Blob),
}, nil
desc, retErr := p.metaBackend.Upload(ctx, req.Meta, p.bootstrapPath(req.Meta), 0, true)
if retErr != nil {
return PushResult{}, errors.Wrapf(retErr, "failed to put metafile to remote")
}
if len(desc.URLs) != 0 {
pushResult.RemoteMeta = desc.URLs[0]
}
if retErr = p.metaBackend.Finalize(false); retErr != nil {
return PushResult{}, errors.Wrap(retErr, "Finalize meta backend upload")
}

return
}

func ParseBackendConfig(backendConfigFile string) (BackendConfig, error) {

@@ -20,7 +20,8 @@ type mockBackend struct {

func (m *mockBackend) Upload(ctx context.Context, blobID, blobPath string, blobSize int64, forcePush bool) (*ocispec.Descriptor, error) {
args := m.Called(ctx, blobID, blobPath, blobSize, forcePush)
return nil, args.Error(0)
desc := args.Get(0)
return desc.(*ocispec.Descriptor), nil
}

func (m *mockBackend) Finalize(cancel bool) error {

@@ -94,8 +95,12 @@ func TestPusher_Push(t *testing.T) {

hash := "3093776c78a21e47f0a8b4c80a1f019b1e838fc1ade274209332af1ca5f57090"
assert.Nil(t, err)
mp.On("Upload", mock.Anything, "mock.meta", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
mp.On("Upload", mock.Anything, hash, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
mp.On("Upload", mock.Anything, "mock.meta", mock.Anything, mock.Anything, mock.Anything).Return(&ocispec.Descriptor{
URLs: []string{"oss://testbucket/testmetaprefix/mock.meta"},
}, nil)
mp.On("Upload", mock.Anything, hash, mock.Anything, mock.Anything, mock.Anything).Return(&ocispec.Descriptor{
URLs: []string{"oss://testbucket/testblobprefix/3093776c78a21e47f0a8b4c80a1f019b1e838fc1ade274209332af1ca5f57090"},
}, nil)
res, err := pusher.Push(PushRequest{
Meta: "mock.meta",
Blob: hash,

@@ -14,7 +14,6 @@ const (
LayerAnnotationNydusBlob = "containerd.io/snapshot/nydus-blob"
LayerAnnotationNydusBlobDigest = "containerd.io/snapshot/nydus-blob-digest"
LayerAnnotationNydusBlobSize = "containerd.io/snapshot/nydus-blob-size"
LayerAnnotationNydusBlobIDs = "containerd.io/snapshot/nydus-blob-ids"
LayerAnnotationNydusBootstrap = "containerd.io/snapshot/nydus-bootstrap"
LayerAnnotationNydusFsVersion = "containerd.io/snapshot/nydus-fs-version"
LayerAnnotationNydusSourceChainID = "containerd.io/snapshot/nydus-source-chainid"

@@ -256,19 +256,30 @@ Currently, the mirror mode is only tested in the registry backend, and in theory
{
// Mirror server URL (include scheme), e.g. Dragonfly dfdaemon server URL
"host": "http://dragonfly1.io:65001",
// true: Send the authorization request to the mirror e.g. another docker registry.
// false: Authorization request won't be relayed by the mirror e.g. Dragonfly.
"auth_through": false,
// Headers for mirror server
"headers": {
// For Dragonfly dfdaemon server URL, we need to specify "X-Dragonfly-Registry" (include scheme).
// When Dragonfly does not cache data, it will pull them from "X-Dragonfly-Registry".
// If not set "X-Dragonfly-Registry", Dragonfly will pull data from proxy.registryMirror.url.
"X-Dragonfly-Registry": "https://index.docker.io"
}
},
// This URL endpoint is used to check the health of mirror server, and if the mirror is unhealthy,
// the request will fallback to the next mirror or the original registry server.
// Use $host/v2 as default if left empty.
"ping_url": "http://127.0.0.1:40901/server/ping",
// Interval time (s) to check and recover unavailable mirror. Use 5 as default if left empty.
"health_check_interval": 5,
// Failure counts before disabling this mirror. Use 5 as default if left empty.
"failure_limit": 5,
},
{
"host": "http://dragonfly2.io:65001",
"headers": {
"X-Dragonfly-Registry": "https://index.docker.io"
}
},
}
],
...

@@ -1,6 +1,6 @@
[package]
name = "nydus-error"
version = "0.2.1"
version = "0.2.2"
description = "Error handling utilities for Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

@@ -1,6 +1,6 @@
[package]
name = "nydus-rafs"
version = "0.1.0"
version = "0.1.1"
description = "The RAFS filesystem format for Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

@@ -405,12 +405,13 @@ impl Bootstrap {
let inode_table_size = inode_table.size();

// Set prefetch table
let (prefetch_table_size, prefetch_table_entries) =
if let Some(prefetch_table) = ctx.prefetch.get_rafsv5_prefetch_table() {
(prefetch_table.size(), prefetch_table.len() as u32)
} else {
(0, 0u32)
};
let (prefetch_table_size, prefetch_table_entries) = if let Some(prefetch_table) =
ctx.prefetch.get_rafsv5_prefetch_table(&bootstrap_ctx.nodes)
{
(prefetch_table.size(), prefetch_table.len() as u32)
} else {
(0, 0u32)
};

// Set blob table, use sha256 string (length 64) as blob id if not specified
let prefetch_table_offset = super_block_size + inode_table_size;

@@ -481,7 +482,9 @@ impl Bootstrap {
.context("failed to store inode table")?;

// Dump prefetch table
if let Some(mut prefetch_table) = ctx.prefetch.get_rafsv5_prefetch_table() {
if let Some(mut prefetch_table) =
ctx.prefetch.get_rafsv5_prefetch_table(&bootstrap_ctx.nodes)
{
prefetch_table
.store(bootstrap_ctx.writer.as_mut())
.context("failed to store prefetch table")?;

@@ -165,12 +165,12 @@ impl Prefetch {
indexes
}

pub fn get_rafsv5_prefetch_table(&mut self) -> Option<RafsV5PrefetchTable> {
pub fn get_rafsv5_prefetch_table(&mut self, nodes: &[Node]) -> Option<RafsV5PrefetchTable> {
if self.policy == PrefetchPolicy::Fs {
let mut prefetch_table = RafsV5PrefetchTable::new();
for i in self.readahead_patterns.values().filter_map(|v| *v) {
// Rafs v5 has inode number equal to index.
prefetch_table.add_entry(i as u32);
prefetch_table.add_entry(nodes[i as usize - 1].inode.ino() as u32);
}
Some(prefetch_table)
} else {

@@ -3,11 +3,12 @@
// SPDX-License-Identifier: Apache-2.0

use std::{
ffi::OsString,
fs::Permissions,
io::{Error, ErrorKind, Write},
ops::DerefMut,
os::unix::prelude::PermissionsExt,
path::Path,
path::{Path, PathBuf},
sync::{Arc, Mutex},
};

@@ -164,7 +165,8 @@ impl RafsInspector {
fn cmd_stat_file(&self, file_name: &str) -> Result<Option<Value>, anyhow::Error> {
// Stat current directory
if file_name == "." {
return self.stat_single_file(self.cur_dir_ino);
let inode = self.rafs_meta.get_inode(self.cur_dir_ino, false)?;
return self.stat_single_file(Some(inode.parent()), self.cur_dir_ino);
}

// Walk through children inodes to find the file

@@ -173,7 +175,7 @@ impl RafsInspector {
dir_inode.walk_children_inodes(0, &mut |_inode, child_name, child_ino, _offset| {
if child_name == file_name {
// Print file information
if let Err(e) = self.stat_single_file(child_ino) {
if let Err(e) = self.stat_single_file(Some(dir_inode.ino()), child_ino) {
return Err(Error::new(ErrorKind::Other, e));
}

@@ -273,6 +275,36 @@ Compressed Size: {compressed_size}
Ok(None)
}

// Convert an inode number to a file path, the rafs v6 file is handled separately.
fn path_from_ino(&self, ino: u64) -> Result<PathBuf, anyhow::Error> {
let inode = self.rafs_meta.superblock.get_inode(ino, false)?;
if ino == self.rafs_meta.superblock.root_ino() {
return Ok(self
.rafs_meta
.superblock
.get_inode(ino, false)?
.name()
.into());
}

let mut file_path = PathBuf::from("");
if self.rafs_meta.meta.is_v6() && !inode.is_dir() {
self.rafs_meta.walk_dir(
self.rafs_meta.superblock.root_ino(),
None,
&mut |inode, path| {
if inode.ino() == ino {
file_path = PathBuf::from(path);
}
Ok(())
},
)?;
} else {
file_path = self.rafs_meta.path_from_ino(ino as u64)?;
};
Ok(file_path)
}

// Implement command "prefetch"
fn cmd_list_prefetch(&self) -> Result<Option<Value>, anyhow::Error> {
let mut guard = self.bootstrap.lock().unwrap();

@@ -283,7 +315,7 @@ Compressed Size: {compressed_size}
let o = if self.request_mode {
let mut value = json!([]);
for ino in prefetch_inos {
let path = self.rafs_meta.path_from_ino(ino as u64)?;
let path = self.path_from_ino(ino as u64)?;
let v = json!({"inode": ino, "path": path});
value.as_array_mut().unwrap().push(v);
}

@@ -294,7 +326,7 @@ Compressed Size: {compressed_size}
self.rafs_meta.meta.prefetch_table_entries
);
for ino in prefetch_inos {
let path = self.rafs_meta.path_from_ino(ino as u64)?;
let path = self.path_from_ino(ino as u64)?;
println!(
r#"Inode Number:{inode_number:10} | Path: {path:?} "#,
path = path,

@@ -362,15 +394,56 @@ Blob ID: {}
Ok(None)
}

/// Walkthrough the file tree rooted at ino, calling cb for each file or directory
/// in the tree by DFS order, including ino, please ensure ino is a directory.
fn walk_dir(
&self,
ino: u64,
parent: Option<&PathBuf>,
parent_ino: Option<u64>,
cb: &mut dyn FnMut(Option<u64>, &dyn RafsInode, &Path) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
let inode = self.rafs_meta.superblock.get_inode(ino, false)?;
if !inode.is_dir() {
bail!("inode {} is not a directory", ino);
}
self.walk_dir_inner(inode.as_ref(), parent, parent_ino, cb)
}

fn walk_dir_inner(
&self,
inode: &dyn RafsInode,
parent: Option<&PathBuf>,
parent_ino: Option<u64>,
cb: &mut dyn FnMut(Option<u64>, &dyn RafsInode, &Path) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
let path = if let Some(parent) = parent {
parent.join(inode.name())
} else {
PathBuf::from("/")
};
cb(parent_ino, inode, &path)?;
if !inode.is_dir() {
return Ok(());
}
let child_count = inode.get_child_count();
for idx in 0..child_count {
let child = inode.get_child_by_index(idx)?;
self.walk_dir_inner(child.as_ref(), Some(&path), Some(inode.ino()), cb)?;
}
Ok(())
}

// Implement command "icheck"
fn cmd_check_inode(&self, ino: u64) -> Result<Option<Value>, anyhow::Error> {
self.rafs_meta.walk_dir(
self.walk_dir(
self.rafs_meta.superblock.root_ino(),
None,
&mut |inode, path| {
None,
&mut |parent, inode, path| {
if inode.ino() == ino {
println!(r#"{}"#, path.to_string_lossy(),);
self.stat_single_file(ino)?;
self.stat_single_file(parent, ino)?;
}
Ok(())
},

@@ -380,14 +453,41 @@ Blob ID: {}
}

impl RafsInspector {
/// Get file name of the inode, the rafs v6 file is handled separately.
fn get_file_name(&self, parent_inode: &dyn RafsInode, inode: &dyn RafsInode) -> OsString {
let mut filename = OsString::from("");
if self.rafs_meta.meta.is_v6() && !inode.is_dir() {
parent_inode
.walk_children_inodes(
0,
&mut |_inode: Option<Arc<dyn RafsInode>>, name: OsString, cur_ino, _offset| {
if cur_ino == inode.ino() {
filename = name;
}
Ok(PostWalkAction::Continue)
},
)
.unwrap();
} else {
filename = inode.name();
}
filename
}

// print information of single file
fn stat_single_file(&self, ino: u64) -> Result<Option<Value>, anyhow::Error> {
fn stat_single_file(
&self,
parent_ino: Option<u64>,
ino: u64,
) -> Result<Option<Value>, anyhow::Error> {
// get RafsInode of current ino
let inode = self.rafs_meta.get_inode(ino, false)?;
let inode_attr = inode.get_attr();

println!(
r#"
if let Some(parent_ino) = parent_ino {
let parent = self.rafs_meta.superblock.get_inode(parent_ino, false)?;
println!(
r#"
Inode Number: {inode_number}
Name: {name:?}
Size: {size}

@@ -400,19 +500,20 @@ GID: {gid}
Mtime: {mtime}
MtimeNsec: {mtime_nsec}
Blocks: {blocks}"#,
inode_number = inode.ino(),
name = inode.name(),
size = inode.size(),
parent = inode.parent(),
mode = inode_attr.mode,
permissions = Permissions::from_mode(inode_attr.mode).mode(),
nlink = inode_attr.nlink,
uid = inode_attr.uid,
gid = inode_attr.gid,
mtime = inode_attr.mtime,
mtime_nsec = inode_attr.mtimensec,
blocks = inode_attr.blocks,
);
inode_number = inode.ino(),
name = self.get_file_name(parent.as_ref(), inode.as_ref()),
size = inode.size(),
parent = parent.ino(),
mode = inode_attr.mode,
permissions = Permissions::from_mode(inode_attr.mode).mode(),
nlink = inode_attr.nlink,
uid = inode_attr.uid,
gid = inode_attr.gid,
mtime = inode_attr.mtime,
mtime_nsec = inode_attr.mtimensec,
blocks = inode_attr.blocks,
);
}

Ok(None)
}

@@ -3,7 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

#![deny(warnings)]
#[macro_use(crate_authors, crate_version)]
#[macro_use(crate_authors)]
extern crate clap;
#[macro_use]
extern crate anyhow;

@@ -542,7 +542,7 @@ fn init_log(matches: &ArgMatches) -> Result<()> {
}

fn main() -> Result<()> {
let (bti_string, build_info) = BuildTimeInfo::dump(crate_version!());
let (bti_string, build_info) = BuildTimeInfo::dump();

let cmd = prepare_cmd_args(bti_string);

@@ -4,7 +4,7 @@

#![deny(warnings)]

#[macro_use(crate_authors, crate_version)]
#[macro_use(crate_authors)]
extern crate clap;
#[macro_use]
extern crate anyhow;

@@ -25,11 +25,13 @@ mod commands;
use commands::{
CommandBackend, CommandCache, CommandDaemon, CommandFsStats, CommandMount, CommandUmount,
};
use nydus_app::BuildTimeInfo;

#[tokio::main]
async fn main() -> Result<()> {
let (_, build_info) = BuildTimeInfo::dump();
let app = App::new("A client to query and configure the nydusd daemon\n")
.version(crate_version!())
.version(build_info.package_ver.as_str())
.author(crate_authors!())
.arg(
Arg::with_name("sock")

@@ -5,7 +5,6 @@
// SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)
#![deny(warnings)]
#![allow(dead_code)]
#[macro_use(crate_version)]
extern crate clap;
#[macro_use]
extern crate log;

@@ -263,7 +262,7 @@ fn append_fuse_options(app: App<'static, 'static>) -> App<'static, 'static> {
Arg::with_name("threads")
.long("thread-num")
.short("T")
.default_value("1")
.default_value("4")
.help("Number of worker threads to serve IO requests")
.takes_value(true)
.required(false)

@@ -707,7 +706,7 @@ fn process_singleton_arguments(
}

fn main() -> Result<()> {
let (bti_string, bti) = BuildTimeInfo::dump(crate_version!());
let (bti_string, bti) = BuildTimeInfo::dump();
let cmd_options = prepare_commandline_options().version(bti_string.as_str());
let args = cmd_options.clone().get_matches();
let logging_file = args.value_of("log-file").map(|l| l.into());

@@ -722,7 +721,7 @@ fn main() -> Result<()> {

setup_logging(logging_file, level, rotation_size)?;

dump_program_info(crate_version!());
dump_program_info();
handle_rlimit_nofile_option(&args, "rlimit-nofile")?;

match args.subcommand_name() {

@@ -1,6 +1,6 @@
[package]
name = "nydus-storage"
version = "0.5.0"
version = "0.5.1"
description = "Storage subsystem for Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"

@@ -12,7 +12,6 @@ use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use arc_swap::ArcSwapOption;
use log::{max_level, Level};

use reqwest::header::{HeaderName, HeaderValue};

@@ -134,7 +133,7 @@ impl<R: Read + Send + 'static> Read for Progress<R> {

/// HTTP request data to send to server.
#[derive(Clone)]
pub enum ReqBody<R> {
pub enum ReqBody<R: Clone> {
Read(Progress<R>, usize),
Buf(Vec<u8>),
Form(HashMap<String, String>),

@@ -222,30 +221,42 @@ pub(crate) fn respond(resp: Response, catch_status: bool) -> ConnectionResult<Re
#[derive(Debug)]
pub(crate) struct Connection {
client: Client,
proxy: Option<Proxy>,
mirror_state: MirrorState,
shutdown: AtomicBool,
}

#[derive(Debug)]
pub(crate) struct MirrorState {
mirrors: Vec<Arc<Mirror>>,
/// Current mirror, None if there is no mirror available.
current: ArcSwapOption<Mirror>,
proxy: Option<Arc<Proxy>>,
pub mirrors: Vec<Arc<Mirror>>,
pub shutdown: AtomicBool,
}

#[derive(Debug)]
pub(crate) struct Mirror {
/// Information for mirror from configuration file.
config: MirrorConfig,
pub config: MirrorConfig,
/// Mirror status, it will be set to false by atomic operation when mirror is not work.
status: AtomicBool,
/// Falied times for mirror, the status will be marked as false when failed_times = failure_limit.
/// Failed times requesting mirror, the status will be marked as false when failed_times = failure_limit.
failed_times: AtomicU8,
/// Failed limit for mirror.
/// Failure count for which mirror is considered unavailable.
failure_limit: u8,
}

impl Mirror {
/// Convert original URL to mirror URL.
fn mirror_url(&self, url: &str) -> ConnectionResult<Url> {
let mirror_host = Url::parse(self.config.host.as_ref()).map_err(ConnectionError::Url)?;
let mut current_url = Url::parse(url).map_err(ConnectionError::Url)?;

current_url
.set_scheme(mirror_host.scheme())
.map_err(|_| ConnectionError::Scheme)?;
current_url
.set_host(mirror_host.host_str())
.map_err(|_| ConnectionError::Host)?;
current_url
.set_port(mirror_host.port())
.map_err(|_| ConnectionError::Port)?;
Ok(current_url)
}
}

impl Connection {
/// Create a new connection according to the configuration.
pub fn new(config: &ConnectionConfig) -> Result<Arc<Connection>> {

@@ -258,13 +269,13 @@ impl Connection {
} else {
None
};
Some(Proxy {
Some(Arc::new(Proxy {
client: Self::build_connection(&config.proxy.url, config)?,
health: ProxyHealth::new(config.proxy.check_interval, ping_url),
fallback: config.proxy.fallback,
use_http: config.proxy.use_http,
replace_scheme: AtomicI16::new(SCHEME_REVERSION_CACHE_UNSET),
})
}))
} else {
None
};

@@ -275,34 +286,34 @@ impl Connection {
mirrors.push(Arc::new(Mirror {
config: mirror_config.clone(),
status: AtomicBool::from(true),
// Maybe read from configuration file
failure_limit: 5,
failed_times: AtomicU8::from(0),
failure_limit: mirror_config.failure_limit,
}));
}
}

let current = if let Some(first_mirror) = mirrors.first() {
ArcSwapOption::from(Some(first_mirror.clone()))
} else {
ArcSwapOption::from(None)
};

let connection = Arc::new(Connection {
client,
proxy,
mirror_state: MirrorState { mirrors, current },
mirrors,
shutdown: AtomicBool::new(false),
});

if let Some(proxy) = &connection.proxy {
if proxy.health.ping_url.is_some() {
let conn = connection.clone();
let connect_timeout = config.connect_timeout;
// Start proxy's health checking thread.
connection.start_proxy_health_thread(config.connect_timeout as u64);

// Start mirrors' health checking thread.
connection.start_mirrors_health_thread(config.timeout as u64);

Ok(connection)
}

fn start_proxy_health_thread(&self, connect_timeout: u64) {
if let Some(proxy) = self.proxy.as_ref() {
if proxy.health.ping_url.is_some() {
let proxy = proxy.clone();
// Spawn thread to update the health status of proxy server
thread::spawn(move || {
let proxy = conn.proxy.as_ref().unwrap();
let ping_url = proxy.health.ping_url.as_ref().unwrap();
let mut last_success = true;

@@ -333,20 +344,60 @@ impl Connection {
proxy.health.set(false)
});

if conn.shutdown.load(Ordering::Acquire) {
break;
}
thread::sleep(proxy.health.check_interval);
if conn.shutdown.load(Ordering::Acquire) {
break;
}
}
});
}
}
// TODO: check mirrors' health
}

Ok(connection)
fn start_mirrors_health_thread(&self, timeout: u64) {
for mirror in self.mirrors.iter() {
let mirror_cloned = mirror.clone();
thread::spawn(move || {
let mirror_health_url = if mirror_cloned.config.ping_url.is_empty() {
format!("{}/v2", mirror_cloned.config.host)
} else {
mirror_cloned.config.ping_url.clone()
};
info!("Mirror health checking url: {}", mirror_health_url);

let client = Client::new();
loop {
// Try to recover the mirror server when it is unavailable.
if !mirror_cloned.status.load(Ordering::Relaxed) {
info!(
"Mirror server {} unhealthy, try to recover",
mirror_cloned.config.host
);

let _ = client
.get(mirror_health_url.as_str())
.timeout(Duration::from_secs(timeout as u64))
.send()
.map(|resp| {
// If the response status is less than StatusCode::INTERNAL_SERVER_ERROR,
// the mirror server is recovered.
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
info!("Mirror server {} recovered", mirror_cloned.config.host);
mirror_cloned.failed_times.store(0, Ordering::Relaxed);
mirror_cloned.status.store(true, Ordering::Relaxed);
}
})
.map_err(|e| {
warn!(
"Mirror server {} is not recovered: {}",
mirror_cloned.config.host, e
);
});
}

thread::sleep(Duration::from_secs(
mirror_cloned.config.health_check_interval,
));
}
});
}
}

/// Shutdown the connection.

@@ -354,8 +405,15 @@ impl Connection {
self.shutdown.store(true, Ordering::Release);
}

/// Send a request to server and wait for response.
pub fn call<R: Read + Send + 'static>(
/// If the auth_through is enable, all requests are send to the mirror server.
/// If the auth_through disabled, e.g. P2P/Dragonfly, we try to avoid sending
/// non-authorization request to the mirror server, which causes performance loss.
/// requesting_auth means this request is to get authorization from a server,
/// which must be a non-authorization request.
/// IOW, only the requesting_auth is false and the headers contain authorization token,
/// we send this request to mirror.
#[allow(clippy::too_many_arguments)]
pub fn call<R: Read + Clone + Send + 'static>(
&self,
method: Method,
url: &str,

@@ -363,6 +421,8 @@ impl Connection {
data: Option<ReqBody<R>>,
headers: &mut HeaderMap,
catch_status: bool,
// This means the request is dedicated to authorization.
requesting_auth: bool,
) -> ConnectionResult<Response> {
if self.shutdown.load(Ordering::Acquire) {
return Err(ConnectionError::Disconnected);

@@ -370,11 +430,7 @@ impl Connection {

if let Some(proxy) = &self.proxy {
if proxy.health.ok() {
let data_cloned: Option<ReqBody<R>> = match data.as_ref() {
Some(ReqBody::Form(form)) => Some(ReqBody::Form(form.clone())),
Some(ReqBody::Buf(buf)) => Some(ReqBody::Buf(buf.clone())),
_ => None,
};
let data_cloned = data.as_ref().cloned();

let http_url: Option<String>;
let mut replaced_url = url;

@@ -425,93 +481,83 @@ impl Connection {
}
}

let current_mirror = self.mirror_state.current.load();

if let Some(mirror) = current_mirror.as_ref() {
let data_cloned: Option<ReqBody<R>> = match data.as_ref() {
Some(ReqBody::Form(form)) => Some(ReqBody::Form(form.clone())),
Some(ReqBody::Buf(buf)) => Some(ReqBody::Buf(buf.clone())),
_ => None,
};

let mirror_host = Url::parse(&mirror.config.host).map_err(ConnectionError::Url)?;

let mut current_url = Url::parse(url).map_err(ConnectionError::Url)?;

current_url
.set_scheme(mirror_host.scheme())
.map_err(|_| ConnectionError::Scheme)?;
current_url
.set_host(mirror_host.host_str())
.map_err(|_| ConnectionError::Host)?;
current_url
.set_port(mirror_host.port())
.map_err(|_| ConnectionError::Port)?;

if let Some(working_headers) = &mirror.config.headers {
for (key, value) in working_headers.iter() {
headers.insert(
HeaderName::from_str(key).unwrap(),
HeaderValue::from_str(value).unwrap(),
);
if !self.mirrors.is_empty() {
let mut fallback_due_auth = false;
for mirror in self.mirrors.iter() {
// With configuration `auth_through` disabled, we should not intend to send authentication
// request to mirror. Mainly because mirrors like P2P/Dragonfly has a poor performance when
// relaying non-data requests. But it's still possible that ever returned token is expired.
// So mirror might still respond us with status code UNAUTHORIZED, which should be handle
// by sending authentication request to the original registry.
//
// - For non-authentication request with token in request header, handle is as usual requests to registry.
// This request should already take token in header.
// - For authentication request
// 1. auth_through is disabled(false): directly pass below mirror translations and jump to original registry handler.
// 2. auth_through is enabled(true): try to get authenticated from mirror and should also handle status code UNAUTHORIZED.
if !mirror.config.auth_through
&& (!headers.contains_key(HEADER_AUTHORIZATION) || requesting_auth)
{
fallback_due_auth = true;
break;
}
}

let result = self.call_inner(
&self.client,
method.clone(),
current_url.to_string().as_str(),
&query,
data_cloned,
headers,
catch_status,
false,
);
if mirror.status.load(Ordering::Relaxed) {
let data_cloned = data.as_ref().cloned();

match result {
Ok(resp) => {
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
return Ok(resp);
}
}
Err(err) => {
warn!(
"request mirror server failed, mirror: {}, error: {:?}",
format!("{:?}", mirror).to_lowercase(),
err
);
mirror.failed_times.fetch_add(1, Ordering::Relaxed);

if mirror.failed_times.load(Ordering::Relaxed) >= mirror.failure_limit {
warn!(
"reach to fail limit {}, disable mirror: {}",
mirror.failure_limit,
format!("{:?}", mirror).to_lowercase()
for (key, value) in mirror.config.headers.iter() {
headers.insert(
HeaderName::from_str(key).unwrap(),
HeaderValue::from_str(value).unwrap(),
);
mirror.status.store(false, Ordering::Relaxed);
}

let mut idx = 0;
loop {
if idx == self.mirror_state.mirrors.len() {
break None;
}
let m = &self.mirror_state.mirrors[idx];
if m.status.load(Ordering::Relaxed) {
warn!(
"mirror server has been changed to {}",
format!("{:?}", m).to_lowercase()
);
break Some(m);
}
let current_url = mirror.mirror_url(url)?;
debug!("mirror server url {}", current_url);

idx += 1;
let result = self.call_inner(
&self.client,
method.clone(),
current_url.as_str(),
&query,
data_cloned,
headers,
catch_status,
false,
);

match result {
Ok(resp) => {
// If the response status >= INTERNAL_SERVER_ERROR, move to the next mirror server.
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
return Ok(resp);
}
}
Err(err) => {
warn!(
"request mirror server failed, mirror: {:?}, error: {:?}",
mirror, err
);
mirror.failed_times.fetch_add(1, Ordering::Relaxed);

if mirror.failed_times.load(Ordering::Relaxed) >= mirror.failure_limit {
warn!(
"reach to failure limit {}, disable mirror: {:?}",
mirror.failure_limit, mirror
);
mirror.status.store(false, Ordering::Relaxed);
}
}
.map(|m| self.mirror_state.current.store(Some(m.clone())))
.unwrap_or_else(|| self.mirror_state.current.store(None));
}
}
// Remove mirror-related headers to avoid sending them to the next mirror server and original registry.
for (key, _) in mirror.config.headers.iter() {
headers.remove(HeaderName::from_str(key).unwrap());
}
}
if !fallback_due_auth {
warn!("Request to all mirror server failed, fallback to original server.");
}
warn!("Failed to request mirror server, fallback to original server.");
}

self.call_inner(

@@ -555,7 +601,7 @@ impl Connection {
}

#[allow(clippy::too_many_arguments)]
fn call_inner<R: Read + Send + 'static>(
fn call_inner<R: Read + Clone + Send + 'static>(
&self,
client: &Client,
method: Method,

@@ -143,7 +143,15 @@ impl BlobReader for OssReader {

let resp = self
.connection
.call::<&[u8]>(Method::HEAD, url.as_str(), None, None, &mut headers, true)
.call::<&[u8]>(
Method::HEAD,
url.as_str(),
None,
None,
&mut headers,
true,
false,
)
.map_err(OssError::Request)?;
let content_length = resp
.headers()

@@ -178,7 +186,15 @@ impl BlobReader for OssReader {
// Safe because the the call() is a synchronous operation.
let mut resp = self
.connection
.call::<&[u8]>(Method::GET, url.as_str(), None, None, &mut headers, true)
.call::<&[u8]>(
Method::GET,
url.as_str(),
None,
None,
&mut headers,
true,
false,
)
.map_err(OssError::Request)?;
Ok(resp
.copy_to(&mut buf)

@ -5,8 +5,12 @@
|
|||
//! Storage backend driver to access blobs on container image registry.
|
||||
use std::collections::HashMap;
|
||||
use std::io::{Error, Read, Result};
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::thread;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use arc_swap::ArcSwapOption;
|
||||
use reqwest::blocking::Response;
|
||||
pub use reqwest::header::HeaderMap;
|
||||
use reqwest::header::{HeaderValue, CONTENT_LENGTH};
|
||||
|
@ -102,6 +106,12 @@ impl HashCache {
|
|||
#[derive(Clone, serde::Deserialize)]
|
||||
struct TokenResponse {
|
||||
token: String,
|
||||
#[serde(default = "default_expires_in")]
|
||||
expires_in: u64,
|
||||
}
|
||||
|
||||
fn default_expires_in() -> u64 {
|
||||
10 * 60
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
|

@ -110,7 +120,7 @@ struct BasicAuth {
realm: String,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct BearerAuth {
realm: String,
service: String,

@ -149,6 +159,11 @@ struct RegistryState {
// Cache 30X redirect url
// Example: RwLock<HashMap<"<blob_id>", "<redirected_url>">>
cached_redirect: HashCache,

// The expiration time of the token, which is obtained from the registry server.
refresh_token_time: ArcSwapOption<u64>,
// Cache bearer auth for refreshing token.
cached_bearer_auth: ArcSwapOption<BearerAuth>,
}

impl RegistryState {

@ -198,6 +213,7 @@ impl RegistryState {
Some(ReqBody::Form(form)),
&mut headers,
true,
true,
)
.map_err(|e| einval!(format!("registry auth server request failed {:?}", e)))?;
let ret: TokenResponse = token_resp.json().map_err(|e| {

@ -206,6 +222,19 @@ impl RegistryState {
e
))
})?;
if let Ok(now_timestamp) = SystemTime::now().duration_since(UNIX_EPOCH) {
self.refresh_token_time
.store(Some(Arc::new(now_timestamp.as_secs() + ret.expires_in)));
info!(
"cached_bearer_auth: {:?}, next time: {}",
auth,
now_timestamp.as_secs() + ret.expires_in
);
}

// Cache bearer auth for refreshing token.
self.cached_bearer_auth.store(Some(Arc::new(auth)));

Ok(ret.token)
}

@ -312,7 +341,7 @@ impl RegistryReader {
/// Request: POST https://my-registry.com/test/repo/blobs/uploads
/// header: authorization: Basic base64(<username:password>)
/// Response: status: 200 Ok
fn request<R: Read + Send + 'static>(
fn request<R: Read + Clone + Send + 'static>(
&self,
method: Method,
url: &str,

@ -336,16 +365,40 @@ impl RegistryReader {
if let Some(data) = data {
return self
.connection
.call(method, url, None, Some(data), &mut headers, catch_status)
.call(
method,
url,
None,
Some(data),
&mut headers,
catch_status,
false,
)
.map_err(RegistryError::Request);
}

// Try to request registry server with `authorization` header
let resp = self
let mut resp = self
.connection
.call::<&[u8]>(method.clone(), url, None, None, &mut headers, false)
.call::<&[u8]>(method.clone(), url, None, None, &mut headers, false, false)
.map_err(RegistryError::Request)?;
if resp.status() == StatusCode::UNAUTHORIZED {
if headers.contains_key(HEADER_AUTHORIZATION) {
// If we request registry (harbor server) with expired authorization token,
// the `www-authenticate: Basic realm="harbor"` in response headers is not expected.
// Related code in harbor:
// https://github.com/goharbor/harbor/blob/v2.5.3/src/server/middleware/v2auth/auth.go#L98
//
// We can remove the expired authorization token and
// resend the request to get the correct "www-authenticate" value.
headers.remove(HEADER_AUTHORIZATION);

resp = self
.connection
.call::<&[u8]>(method.clone(), url, None, None, &mut headers, false, false)
.map_err(RegistryError::Request)?;
};

if let Some(resp_auth_header) = resp.headers().get(HEADER_WWW_AUTHENTICATE) {
// Get token from registry authorization server
if let Some(auth) = RegistryState::parse_auth(resp_auth_header, &self.state.auth) {
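
The Harbor workaround above boils down to: if a request that carried a cached `Authorization` header comes back 401, drop the header and retry once so the response carries the expected `www-authenticate` challenge rather than a Basic realm. A standalone sketch of that pattern using plain blocking `reqwest` (requires the `blocking` feature); the endpoint URL and the `probe_challenge` helper are illustrative and not part of the patch:

```rust
use reqwest::blocking::Client;
use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION, WWW_AUTHENTICATE};
use reqwest::StatusCode;

// If a request carrying a cached (possibly expired) token comes back 401,
// drop the Authorization header and retry once so the server returns a
// usable `www-authenticate` challenge.
fn probe_challenge(client: &Client, url: &str, mut headers: HeaderMap) -> reqwest::Result<Option<String>> {
    let mut resp = client.get(url).headers(headers.clone()).send()?;
    if resp.status() == StatusCode::UNAUTHORIZED && headers.contains_key(AUTHORIZATION) {
        headers.remove(AUTHORIZATION);
        resp = client.get(url).headers(headers).send()?;
    }
    Ok(resp
        .headers()
        .get(WWW_AUTHENTICATE)
        .and_then(|v| v.to_str().ok())
        .map(|s| s.to_string()))
}

fn main() -> reqwest::Result<()> {
    let client = Client::new();
    let mut headers = HeaderMap::new();
    headers.insert(AUTHORIZATION, HeaderValue::from_static("Bearer expired-token"));
    // Hypothetical registry endpoint, used only for illustration.
    if let Some(challenge) = probe_challenge(&client, "https://registry.example/v2/", headers)? {
        println!("challenge: {}", challenge);
    }
    Ok(())
}
```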

@ -353,6 +406,7 @@ impl RegistryReader {
.state
.get_auth_header(auth, &self.connection)
.map_err(|e| RegistryError::Common(e.to_string()))?;

headers.insert(
HEADER_AUTHORIZATION,
HeaderValue::from_str(auth_header.as_str()).unwrap(),

@ -361,7 +415,7 @@ impl RegistryReader {
// Try to request registry server with `authorization` header again
let resp = self
.connection
.call(method, url, None, data, &mut headers, catch_status)
.call(method, url, None, data, &mut headers, catch_status, false)
.map_err(RegistryError::Request)?;

let status = resp.status();

@ -417,6 +471,7 @@ impl RegistryReader {
None,
&mut headers,
false,
false,
)
.map_err(RegistryError::Request)?;

@ -473,6 +528,7 @@ impl RegistryReader {
None,
&mut headers,
true,
false,
)
.map_err(RegistryError::Request);
match resp_ret {

@ -577,13 +633,27 @@ impl Registry {
blob_url_scheme: config.blob_url_scheme,
blob_redirected_host: config.blob_redirected_host,
cached_redirect: HashCache::new(),
refresh_token_time: ArcSwapOption::new(None),
cached_bearer_auth: ArcSwapOption::new(None),
});

Ok(Registry {
let mirrors = connection.mirrors.clone();

let registry = Registry {
connection,
state,
metrics: BackendMetrics::new(id, "registry"),
})
};

for mirror in mirrors.iter() {
if !mirror.config.auth_through {
registry.start_refresh_token_thread();
info!("Refresh token thread started.");
break;
}
}

Ok(registry)
}

fn get_authorization_info(auth: &Option<String>) -> Result<(String, String)> {

@ -610,6 +680,72 @@ impl Registry {
Ok((String::new(), String::new()))
}
}

fn start_refresh_token_thread(&self) {
let conn = self.connection.clone();
let state = self.state.clone();
// The default refresh token internal is 10 minutes.
let refresh_check_internal = 10 * 60;
thread::spawn(move || {
loop {
if let Ok(now_timestamp) = SystemTime::now().duration_since(UNIX_EPOCH) {
if let Some(next_refresh_timestamp) = state.refresh_token_time.load().as_deref()
{
// If the token will expire in next refresh check internal, get new token now.
// Add 20 seconds to handle critical cases.
if now_timestamp.as_secs() + refresh_check_internal + 20
>= *next_refresh_timestamp
{
if let Some(cached_bearer_auth) =
state.cached_bearer_auth.load().as_deref()
{
let mut cached_bearer_auth_clone = cached_bearer_auth.clone();
if let Ok(url) = Url::parse(&cached_bearer_auth_clone.realm) {
let last_cached_auth = state.cached_auth.get();

let query: Vec<(_, _)> =
url.query_pairs().filter(|p| p.0 != "grant_type").collect();

let mut refresh_url = url.clone();
refresh_url.set_query(None);

for pair in query {
refresh_url.query_pairs_mut().append_pair(
&pair.0.to_string()[..],
&pair.1.to_string()[..],
);
}
refresh_url
.query_pairs_mut()
.append_pair("grant_type", "refresh_token");
cached_bearer_auth_clone.realm = refresh_url.to_string();

let token = state.get_token(cached_bearer_auth_clone, &conn);

if let Ok(token) = token {
let new_cached_auth = format!("Bearer {}", token);
info!(
"Authorization token for registry has been refreshed."
);
// Refresh authorization token
state.cached_auth.set(&last_cached_auth, new_cached_auth);
}
}
}
}
}
}

if conn.shutdown.load(Ordering::Acquire) {
break;
}
thread::sleep(Duration::from_secs(refresh_check_internal));
if conn.shutdown.load(Ordering::Acquire) {
break;
}
}
});
}
}

impl BlobBackend for Registry {
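
To make the timing in `start_refresh_token_thread` above easier to follow: the thread wakes every 10 minutes and renews the token whenever the cached expiry falls inside the next wake-up window plus a 20-second safety margin. A minimal model of that predicate; `needs_refresh` and the timestamps are illustrative, not names from the patch:

```rust
// The refresh decision made by the background thread: renew the token one
// check interval (plus a 20s margin) before its recorded expiry.
fn needs_refresh(now_secs: u64, next_refresh_at: u64, check_interval_secs: u64) -> bool {
    now_secs + check_interval_secs + 20 >= next_refresh_at
}

fn main() {
    let interval = 10 * 60; // the thread wakes up every 10 minutes
    let expires_at = 1_000_000 + 600; // token issued at t=1_000_000 with expires_in=600

    // A token valid for only one interval already falls inside the next
    // wake-up window, so the very next check refreshes it.
    assert!(needs_refresh(1_000_000, expires_at, interval));

    // A token valid well beyond one interval is left alone for now.
    assert!(!needs_refresh(1_000_000, 1_000_000 + 2 * interval + 60, interval));
}
```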

@ -695,6 +831,8 @@ mod tests {
blob_redirected_host: "oss.alibaba-inc.com".to_string(),
cached_auth: Default::default(),
cached_redirect: Default::default(),
refresh_token_time: ArcSwapOption::new(None),
cached_bearer_auth: ArcSwapOption::new(None),
};

assert_eq!(

@ -579,7 +579,28 @@ impl FileCacheEntry {
}

if !bitmap.wait_for_range_ready(chunk_index, count)? {
Err(eio!("failed to read data from storage backend"))
if prefetch {
return Err(eio!("failed to read data from storage backend"));
}
// if we are in ondemand path, retry for the timeout chunks
for chunk in chunks {
if self.chunk_map.is_ready(chunk.as_ref())? {
continue;
}
info!("retry for timeout chunk, {}", chunk.id());
let mut buf = alloc_buf(chunk.uncompressed_size() as usize);
self.read_raw_chunk(chunk.as_ref(), &mut buf, false, None)
.map_err(|e| eio!(format!("read_raw_chunk failed, {:?}", e)))?;
if self.dio_enabled {
self.adjust_buffer_for_dio(&mut buf)
}
Self::persist_chunk(&self.file, chunk.uncompressed_offset(), &buf)
.map_err(|e| eio!(format!("do_fetch_chunk failed to persist data, {:?}", e)))?;
self.chunk_map
.set_ready_and_clear_pending(chunk.as_ref())
.unwrap_or_else(|e| error!("set chunk ready failed, {}", e));
}
Ok(total_size)
} else {
Ok(total_size)
}
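
The cache change above distinguishes the two read paths once a readiness wait times out: prefetch fails fast, while an on-demand read retries only the chunks that never became ready. A simplified, self-contained model of that control flow; the `Chunk` and `finish_range` names are made up for illustration and do not appear in the patch:

```rust
struct Chunk {
    index: u32,
    ready: bool,
}

// Stand-in for read_raw_chunk + persist_chunk in the real cache entry.
fn fetch(chunk: &Chunk) -> Result<(), String> {
    println!("re-fetching chunk {}", chunk.index);
    Ok(())
}

fn finish_range(prefetch: bool, chunks: &mut [Chunk], wait_ok: bool) -> Result<(), String> {
    if wait_ok {
        return Ok(());
    }
    if prefetch {
        // Prefetch is best-effort; report the timeout to the caller.
        return Err("failed to read data from storage backend".to_string());
    }
    // On-demand path: retry only the chunks that are still pending.
    for chunk in chunks.iter_mut().filter(|c| !c.ready) {
        fetch(chunk)?;
        chunk.ready = true;
    }
    Ok(())
}

fn main() {
    let mut chunks = vec![Chunk { index: 0, ready: true }, Chunk { index: 1, ready: false }];
    assert!(finish_range(false, &mut chunks, false).is_ok());
    assert!(chunks.iter().all(|c| c.ready));
}
```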

@ -1,6 +1,6 @@
[package]
name = "nydus-utils"
version = "0.3.1"
version = "0.3.2"
description = "Utilities and helpers for Nydus Image Service"
authors = ["The Nydus Developers"]
license = "Apache-2.0 OR BSD-3-Clause"