Compare commits
2 Commits
Author | SHA1 | Date |
---|---|---|
|
f7d513844d | |
|
29dc8ec5c8 |
|
@ -74,12 +74,12 @@ pub enum NydusError {
|
|||
## Development Guidelines
|
||||
|
||||
### Storage Backend Development
|
||||
When implementing new storage backends:
|
||||
- Implement the `BlobBackend` trait
|
||||
- Support timeout, retry, and connection management
|
||||
- Add configuration in the backend config structure
|
||||
- Consider mirror/proxy support for high availability
|
||||
- Implement proper error handling and logging
|
||||
- When implementing new storage backends:
|
||||
- - Implement the `BlobBackend` trait
|
||||
- - Support timeout, retry, and connection management
|
||||
- - Add configuration in the backend config structure
|
||||
- - Consider proxy support for high availability
|
||||
- - Implement proper error handling and logging
|
||||
|
||||
### Daemon Service Development
|
||||
- Use the `NydusDaemon` trait for service implementations
|
||||
|
|
|
@ -519,9 +519,6 @@ pub struct OssConfig {
|
|||
/// Enable HTTP proxy for the read request.
|
||||
#[serde(default)]
|
||||
pub proxy: ProxyConfig,
|
||||
/// Enable mirrors for the read request.
|
||||
#[serde(default)]
|
||||
pub mirrors: Vec<MirrorConfig>,
|
||||
}
|
||||
|
||||
/// S3 configuration information to access blobs.
|
||||
|
@ -563,9 +560,6 @@ pub struct S3Config {
|
|||
/// Enable HTTP proxy for the read request.
|
||||
#[serde(default)]
|
||||
pub proxy: ProxyConfig,
|
||||
/// Enable mirrors for the read request.
|
||||
#[serde(default)]
|
||||
pub mirrors: Vec<MirrorConfig>,
|
||||
}
|
||||
|
||||
/// Http proxy configuration information to access blobs.
|
||||
|
@ -592,9 +586,6 @@ pub struct HttpProxyConfig {
|
|||
/// Enable HTTP proxy for the read request.
|
||||
#[serde(default)]
|
||||
pub proxy: ProxyConfig,
|
||||
/// Enable mirrors for the read request.
|
||||
#[serde(default)]
|
||||
pub mirrors: Vec<MirrorConfig>,
|
||||
}
|
||||
|
||||
/// Container registry configuration information to access blobs.
|
||||
|
@ -635,9 +626,6 @@ pub struct RegistryConfig {
|
|||
/// Enable HTTP proxy for the read request.
|
||||
#[serde(default)]
|
||||
pub proxy: ProxyConfig,
|
||||
/// Enable mirrors for the read request.
|
||||
#[serde(default)]
|
||||
pub mirrors: Vec<MirrorConfig>,
|
||||
}
|
||||
|
||||
/// Configuration information for blob cache manager.
|
||||
|
@ -930,41 +918,6 @@ impl Default for ProxyConfig {
|
|||
}
|
||||
}
|
||||
|
||||
/// Configuration for registry mirror.
|
||||
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
|
||||
pub struct MirrorConfig {
|
||||
/// Mirror server URL, for example http://127.0.0.1:65001.
|
||||
pub host: String,
|
||||
/// Ping URL to check mirror server health.
|
||||
#[serde(default)]
|
||||
pub ping_url: String,
|
||||
/// HTTP request headers to be passed to mirror server.
|
||||
#[serde(default)]
|
||||
pub headers: HashMap<String, String>,
|
||||
/// Interval for mirror health checking, in seconds.
|
||||
#[serde(default = "default_check_interval")]
|
||||
pub health_check_interval: u64,
|
||||
/// Maximum number of failures before marking a mirror as unusable.
|
||||
#[serde(default = "default_failure_limit")]
|
||||
pub failure_limit: u8,
|
||||
/// Elapsed time to pause mirror health check when the request is inactive, in seconds.
|
||||
#[serde(default = "default_check_pause_elapsed")]
|
||||
pub health_check_pause_elapsed: u64,
|
||||
}
|
||||
|
||||
impl Default for MirrorConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
host: String::new(),
|
||||
headers: HashMap::new(),
|
||||
health_check_interval: 5,
|
||||
failure_limit: 5,
|
||||
ping_url: String::new(),
|
||||
health_check_pause_elapsed: 300,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Configuration information for a cached blob`.
|
||||
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
|
||||
pub struct BlobCacheEntryConfigV2 {
|
||||
|
@ -1212,10 +1165,6 @@ fn default_check_pause_elapsed() -> u64 {
|
|||
300
|
||||
}
|
||||
|
||||
fn default_failure_limit() -> u8 {
|
||||
5
|
||||
}
|
||||
|
||||
fn default_work_dir() -> String {
|
||||
".".to_string()
|
||||
}
|
||||
|
@ -1886,11 +1835,6 @@ mod tests {
|
|||
fallback = true
|
||||
check_interval = 10
|
||||
use_http = true
|
||||
[[backend.oss.mirrors]]
|
||||
host = "http://127.0.0.1:65001"
|
||||
ping_url = "http://127.0.0.1:65001/ping"
|
||||
health_check_interval = 10
|
||||
failure_limit = 10
|
||||
"#;
|
||||
let config: ConfigV2 = toml::from_str(content).unwrap();
|
||||
assert_eq!(config.version, 2);
|
||||
|
@ -1917,14 +1861,6 @@ mod tests {
|
|||
assert_eq!(oss.proxy.check_interval, 10);
|
||||
assert!(oss.proxy.fallback);
|
||||
assert!(oss.proxy.use_http);
|
||||
|
||||
assert_eq!(oss.mirrors.len(), 1);
|
||||
let mirror = &oss.mirrors[0];
|
||||
assert_eq!(mirror.host, "http://127.0.0.1:65001");
|
||||
assert_eq!(mirror.ping_url, "http://127.0.0.1:65001/ping");
|
||||
assert!(mirror.headers.is_empty());
|
||||
assert_eq!(mirror.health_check_interval, 10);
|
||||
assert_eq!(mirror.failure_limit, 10);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -1950,11 +1886,6 @@ mod tests {
|
|||
fallback = true
|
||||
check_interval = 10
|
||||
use_http = true
|
||||
[[backend.registry.mirrors]]
|
||||
host = "http://127.0.0.1:65001"
|
||||
ping_url = "http://127.0.0.1:65001/ping"
|
||||
health_check_interval = 10
|
||||
failure_limit = 10
|
||||
"#;
|
||||
let config: ConfigV2 = toml::from_str(content).unwrap();
|
||||
assert_eq!(config.version, 2);
|
||||
|
@ -1983,14 +1914,6 @@ mod tests {
|
|||
assert_eq!(registry.proxy.check_interval, 10);
|
||||
assert!(registry.proxy.fallback);
|
||||
assert!(registry.proxy.use_http);
|
||||
|
||||
assert_eq!(registry.mirrors.len(), 1);
|
||||
let mirror = ®istry.mirrors[0];
|
||||
assert_eq!(mirror.host, "http://127.0.0.1:65001");
|
||||
assert_eq!(mirror.ping_url, "http://127.0.0.1:65001/ping");
|
||||
assert!(mirror.headers.is_empty());
|
||||
assert_eq!(mirror.health_check_interval, 10);
|
||||
assert_eq!(mirror.failure_limit, 10);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -2397,15 +2320,6 @@ mod tests {
|
|||
assert!(res);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_default_mirror_config() {
|
||||
let cfg = MirrorConfig::default();
|
||||
assert_eq!(cfg.host, "");
|
||||
assert_eq!(cfg.health_check_interval, 5);
|
||||
assert_eq!(cfg.failure_limit, 5);
|
||||
assert_eq!(cfg.ping_url, "");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_config_v2_from_file() {
|
||||
let content = r#"version=2
|
||||
|
@ -2615,7 +2529,6 @@ mod tests {
|
|||
#[test]
|
||||
fn test_default_value() {
|
||||
assert!(default_true());
|
||||
assert_eq!(default_failure_limit(), 5);
|
||||
assert_eq!(default_prefetch_batch_size(), 1024 * 1024);
|
||||
assert_eq!(default_prefetch_threads_count(), 8);
|
||||
}
|
||||
|
|
|
@ -319,64 +319,8 @@ or
|
|||
}
|
||||
}
|
||||
```
|
||||
|
||||
The `HttpProxy` backend also supports the `Proxy` and `Mirrors` configurations for remote usage like the `Registry backend` described above.
|
||||
|
||||
##### Enable Mirrors for Storage Backend (Recommend)
|
||||
|
||||
Nydus is deeply integrated with [Dragonfly](https://d7y.io/) P2P mirror mode, please refer the [doc](https://d7y.io/docs/next/operations/integrations/container-runtime/nydus/) to learn how configuring Nydus to use Dragonfly.
|
||||
|
||||
Add `device.backend.config.mirrors` field to enable mirrors for storage backend. The mirror can be a P2P distribution server or registry. If the request to mirror server failed, it will fall back to the original registry.
|
||||
Currently, the mirror mode is only tested in the registry backend, and in theory, the OSS backend also supports it.
|
||||
|
||||
<font color='red'>!!</font> The `mirrors` field conflicts with `proxy` field.
|
||||
|
||||
```
|
||||
{
|
||||
"device": {
|
||||
"backend": {
|
||||
"type": "registry",
|
||||
"config": {
|
||||
"mirrors": [
|
||||
{
|
||||
// Mirror server URL (including scheme), e.g. Dragonfly dfdaemon server URL
|
||||
"host": "http://dragonfly1.io:65001",
|
||||
// Headers for mirror server
|
||||
"headers": {
|
||||
// For Dragonfly dfdaemon server URL, we need to specify "X-Dragonfly-Registry" (including scheme).
|
||||
// When Dragonfly does not cache data, nydusd will pull it from "X-Dragonfly-Registry".
|
||||
// If not set "X-Dragonfly-Registry", Dragonfly will pull data from proxy.registryMirror.url.
|
||||
"X-Dragonfly-Registry": "https://index.docker.io"
|
||||
},
|
||||
// This URL endpoint is used to check the health of mirror server, and if the mirror is unhealthy,
|
||||
// the request will fallback to the next mirror or the original registry server.
|
||||
// Use $host/v2 as default if left empty.
|
||||
"ping_url": "http://127.0.0.1:40901/server/ping",
|
||||
// Interval time (s) to check and recover unavailable mirror. Use 5 as default if left empty.
|
||||
"health_check_interval": 5,
|
||||
// Failure counts before disabling this mirror. Use 5 as default if left empty.
|
||||
"failure_limit": 5,
|
||||
// Elapsed time to pause mirror health check when the request is inactive, in seconds.
|
||||
// Use 300 as default if left empty.
|
||||
"health_check_pause_elapsed": 300,
|
||||
},
|
||||
{
|
||||
"host": "http://dragonfly2.io:65001",
|
||||
"headers": {
|
||||
"X-Dragonfly-Registry": "https://index.docker.io"
|
||||
},
|
||||
}
|
||||
],
|
||||
...
|
||||
}
|
||||
},
|
||||
...
|
||||
},
|
||||
...
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
-
|
||||
- The `HttpProxy` backend also supports the `Proxy` configuration for remote usage like the `Registry backend` described above.
|
||||
##### Enable P2P Proxy for Storage Backend
|
||||
|
||||
Add `device.backend.config.proxy` field to enable HTTP proxy for storage backend. For example, use P2P distribution service to reduce network workload and latency in large scale container cluster using [Dragonfly](https://d7y.io/) (enable centralized dfdaemon mode).
|
||||
|
|
|
@ -50,18 +50,6 @@ check_interval = 5
|
|||
# Replace URL to http to request source registry with proxy, and allow fallback to https if the proxy is unhealthy.
|
||||
use_http = false
|
||||
|
||||
[[backend.oss.mirrors]]
|
||||
# Mirror server URL, for example http://127.0.0.1:65001.
|
||||
host = "http://127.0.0.1:65001"
|
||||
# Ping URL to check mirror server health.
|
||||
ping_url = "http://127.0.0.1:65001/ping"
|
||||
# HTTP request headers to be passed to mirror server.
|
||||
# headers =
|
||||
# Interval for mirror health checking, in seconds.
|
||||
health_check_interval = 5
|
||||
# Maximum number of failures before marking a mirror as unusable.
|
||||
failure_limit = 5
|
||||
|
||||
[backend.registy]
|
||||
# Registry http scheme, either 'http' or 'https'
|
||||
scheme = "https"
|
||||
|
@ -99,18 +87,6 @@ check_interval = 5
|
|||
# Replace URL to http to request source registry with proxy, and allow fallback to https if the proxy is unhealthy.
|
||||
use_http = false
|
||||
|
||||
[[backend.registry.mirrors]]
|
||||
# Mirror server URL, for example http://127.0.0.1:65001.
|
||||
host = "http://127.0.0.1:65001"
|
||||
# Ping URL to check mirror server health.
|
||||
ping_url = "http://127.0.0.1:65001/ping"
|
||||
# HTTP request headers to be passed to mirror server.
|
||||
# headers =
|
||||
# Interval for mirror health checking, in seconds.
|
||||
health_check_interval = 5
|
||||
# Maximum number of failures before marking a mirror as unusable.
|
||||
failure_limit = 5
|
||||
|
||||
[cache]
|
||||
# Type of blob cache: "blobcache", "filecache", "fscache", "dummycache" or ""
|
||||
type = "filecache"
|
||||
|
|
|
@ -55,18 +55,6 @@ check_interval = 5
|
|||
# Replace URL to http to request source registry with proxy, and allow fallback to https if the proxy is unhealthy.
|
||||
use_http = false
|
||||
|
||||
[[config_v2.backend.oss.mirrors]]
|
||||
# Mirror server URL, for example http://127.0.0.1:65001.
|
||||
host = "http://127.0.0.1:65001"
|
||||
# Ping URL to check mirror server health.
|
||||
ping_url = "http://127.0.0.1:65001/ping"
|
||||
# HTTP request headers to be passed to mirror server.
|
||||
# headers =
|
||||
# Interval for mirror health checking, in seconds.
|
||||
health_check_interval = 5
|
||||
# Maximum number of failures before marking a mirror as unusable.
|
||||
failure_limit = 5
|
||||
|
||||
[config_v2.backend.registry]
|
||||
# Registry http scheme, either 'http' or 'https'
|
||||
scheme = "https"
|
||||
|
@ -104,18 +92,6 @@ check_interval = 5
|
|||
# Replace URL to http to request source registry with proxy, and allow fallback to https if the proxy is unhealthy.
|
||||
use_http = false
|
||||
|
||||
[[config_v2.backend.registry.mirrors]]
|
||||
# Mirror server URL, for example http://127.0.0.1:65001.
|
||||
host = "http://127.0.0.1:65001"
|
||||
# Ping URL to check mirror server health.
|
||||
ping_url = "http://127.0.0.1:65001/ping"
|
||||
# HTTP request headers to be passed to mirror server.
|
||||
# headers =
|
||||
# Interval for mirror health checking, in seconds.
|
||||
health_check_interval = 5
|
||||
# Maximum number of failures before marking a mirror as unusable.
|
||||
failure_limit = 5
|
||||
|
||||
[config_v2.cache]
|
||||
# Type of blob cache: "blobcache", "filecache", "fscache", "dummycache" or ""
|
||||
type = "filecache"
|
||||
|
|
|
@ -48,18 +48,6 @@ check_interval = 5
|
|||
# Replace URL to http to request source registry with proxy, and allow fallback to https if the proxy is unhealthy.
|
||||
use_http = false
|
||||
|
||||
[[backend.oss.mirrors]]
|
||||
# Mirror server URL, for example http://127.0.0.1:65001.
|
||||
host = "http://127.0.0.1:65001"
|
||||
# Ping URL to check mirror server health.
|
||||
ping_url = "http://127.0.0.1:65001/ping"
|
||||
# HTTP request headers to be passed to mirror server.
|
||||
# headers =
|
||||
# Interval for mirror health checking, in seconds.
|
||||
health_check_interval = 5
|
||||
# Maximum number of failures before marking a mirror as unusable.
|
||||
failure_limit = 5
|
||||
|
||||
[backend.registy]
|
||||
# Registry http scheme, either 'http' or 'https'
|
||||
scheme = "https"
|
||||
|
@ -97,17 +85,6 @@ check_interval = 5
|
|||
# Replace URL to http to request source registry with proxy, and allow fallback to https if the proxy is unhealthy.
|
||||
use_http = false
|
||||
|
||||
[[backend.registry.mirrors]]
|
||||
# Mirror server URL, for example http://127.0.0.1:65001.
|
||||
host = "http://127.0.0.1:65001"
|
||||
# Ping URL to check mirror server health.
|
||||
ping_url = "http://127.0.0.1:65001/ping"
|
||||
# HTTP request headers to be passed to mirror server.
|
||||
# headers =
|
||||
# Interval for mirror health checking, in seconds.
|
||||
health_check_interval = 5
|
||||
# Maximum number of failures before marking a mirror as unusable.
|
||||
failure_limit = 5
|
||||
|
||||
[cache]
|
||||
# Type of blob cache: "blobcache", "filecache", "fscache", "dummycache" or ""
|
||||
|
|
|
@ -63,12 +63,6 @@ address = ":9110"
|
|||
[remote]
|
||||
convert_vpc_registry = false
|
||||
|
||||
[remote.mirrors_config]
|
||||
# Snapshotter will overwrite daemon's mirrors configuration
|
||||
# if the values loaded from this driectory are not null before starting a daemon.
|
||||
# Set to "" or an empty directory to disable it.
|
||||
#dir = "/etc/nydus/certs.d"
|
||||
|
||||
[remote.auth]
|
||||
# Fetch the private registry auth by listening to K8s API server
|
||||
enable_kubeconfig_keychain = false
|
||||
|
|
|
@ -7,14 +7,13 @@ use std::cell::RefCell;
|
|||
use std::collections::HashMap;
|
||||
use std::io::{Read, Result};
|
||||
use std::str::FromStr;
|
||||
use std::sync::atomic::{AtomicBool, AtomicI16, AtomicU64, AtomicU8, Ordering};
|
||||
use std::sync::atomic::{AtomicBool, AtomicI16, AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||
use std::{fmt, thread};
|
||||
|
||||
use log::{max_level, Level};
|
||||
|
||||
use reqwest::header::{HeaderName, HeaderValue};
|
||||
use reqwest::{
|
||||
self,
|
||||
blocking::{Body, Client, Response},
|
||||
|
@ -23,7 +22,7 @@ use reqwest::{
|
|||
Method, StatusCode, Url,
|
||||
};
|
||||
|
||||
use nydus_api::{HttpProxyConfig, MirrorConfig, OssConfig, ProxyConfig, RegistryConfig, S3Config};
|
||||
use nydus_api::{HttpProxyConfig, OssConfig, ProxyConfig, RegistryConfig, S3Config};
|
||||
use url::ParseError;
|
||||
|
||||
const HEADER_AUTHORIZATION: &str = "Authorization";
|
||||
|
@ -43,8 +42,6 @@ pub enum ConnectionError {
|
|||
Format(reqwest::Error),
|
||||
Url(String, ParseError),
|
||||
Scheme(String),
|
||||
MirrorHost,
|
||||
MirrorPort,
|
||||
}
|
||||
|
||||
impl fmt::Display for ConnectionError {
|
||||
|
@ -56,8 +53,6 @@ impl fmt::Display for ConnectionError {
|
|||
ConnectionError::Format(e) => write!(f, "{}", e),
|
||||
ConnectionError::Url(s, e) => write!(f, "failed to parse URL {}, {}", s, e),
|
||||
ConnectionError::Scheme(s) => write!(f, "invalid scheme {}", s),
|
||||
ConnectionError::MirrorHost => write!(f, "invalid mirror host"),
|
||||
ConnectionError::MirrorPort => write!(f, "invalid mirror port"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -69,7 +64,6 @@ type ConnectionResult<T> = std::result::Result<T, ConnectionError>;
|
|||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct ConnectionConfig {
|
||||
pub proxy: ProxyConfig,
|
||||
pub mirrors: Vec<MirrorConfig>,
|
||||
pub skip_verify: bool,
|
||||
pub timeout: u32,
|
||||
pub connect_timeout: u32,
|
||||
|
@ -80,7 +74,6 @@ impl Default for ConnectionConfig {
|
|||
fn default() -> Self {
|
||||
Self {
|
||||
proxy: ProxyConfig::default(),
|
||||
mirrors: Vec::<MirrorConfig>::new(),
|
||||
skip_verify: false,
|
||||
timeout: 5,
|
||||
connect_timeout: 5,
|
||||
|
@ -93,7 +86,6 @@ impl From<OssConfig> for ConnectionConfig {
|
|||
fn from(c: OssConfig) -> ConnectionConfig {
|
||||
ConnectionConfig {
|
||||
proxy: c.proxy,
|
||||
mirrors: c.mirrors,
|
||||
skip_verify: c.skip_verify,
|
||||
timeout: c.timeout,
|
||||
connect_timeout: c.connect_timeout,
|
||||
|
@ -106,7 +98,6 @@ impl From<S3Config> for ConnectionConfig {
|
|||
fn from(c: S3Config) -> ConnectionConfig {
|
||||
ConnectionConfig {
|
||||
proxy: c.proxy,
|
||||
mirrors: c.mirrors,
|
||||
skip_verify: c.skip_verify,
|
||||
timeout: c.timeout,
|
||||
connect_timeout: c.connect_timeout,
|
||||
|
@ -119,7 +110,6 @@ impl From<RegistryConfig> for ConnectionConfig {
|
|||
fn from(c: RegistryConfig) -> ConnectionConfig {
|
||||
ConnectionConfig {
|
||||
proxy: c.proxy,
|
||||
mirrors: c.mirrors,
|
||||
skip_verify: c.skip_verify,
|
||||
timeout: c.timeout,
|
||||
connect_timeout: c.connect_timeout,
|
||||
|
@ -132,7 +122,6 @@ impl From<HttpProxyConfig> for ConnectionConfig {
|
|||
fn from(c: HttpProxyConfig) -> ConnectionConfig {
|
||||
ConnectionConfig {
|
||||
proxy: c.proxy,
|
||||
mirrors: c.mirrors,
|
||||
skip_verify: c.skip_verify,
|
||||
timeout: c.timeout,
|
||||
connect_timeout: c.connect_timeout,
|
||||
|
@ -264,45 +253,11 @@ pub(crate) fn respond(resp: Response, catch_status: bool) -> ConnectionResult<Re
|
|||
pub(crate) struct Connection {
|
||||
client: Client,
|
||||
proxy: Option<Arc<Proxy>>,
|
||||
pub mirrors: Vec<Arc<Mirror>>,
|
||||
pub shutdown: AtomicBool,
|
||||
/// Timestamp of connection's last active request, represents as duration since UNIX_EPOCH in seconds.
|
||||
last_active: Arc<AtomicU64>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Mirror {
|
||||
/// Information for mirror from configuration file.
|
||||
pub config: MirrorConfig,
|
||||
/// Mirror status, it will be set to false by atomic operation when mirror is not work.
|
||||
status: AtomicBool,
|
||||
/// Failed times requesting mirror, the status will be marked as false when failed_times = failure_limit.
|
||||
failed_times: AtomicU8,
|
||||
/// Failure count for which mirror is considered unavailable.
|
||||
failure_limit: u8,
|
||||
}
|
||||
|
||||
impl Mirror {
|
||||
/// Convert original URL to mirror URL.
|
||||
fn mirror_url(&self, url: &str) -> ConnectionResult<Url> {
|
||||
let mirror_host = Url::parse(&self.config.host)
|
||||
.map_err(|e| ConnectionError::Url(self.config.host.clone(), e))?;
|
||||
let mut current_url =
|
||||
Url::parse(url).map_err(|e| ConnectionError::Url(url.to_string(), e))?;
|
||||
|
||||
current_url
|
||||
.set_scheme(mirror_host.scheme())
|
||||
.map_err(|_| ConnectionError::Scheme(mirror_host.scheme().to_string()))?;
|
||||
current_url
|
||||
.set_host(mirror_host.host_str())
|
||||
.map_err(|_| ConnectionError::MirrorHost)?;
|
||||
current_url
|
||||
.set_port(mirror_host.port())
|
||||
.map_err(|_| ConnectionError::MirrorPort)?;
|
||||
Ok(current_url)
|
||||
}
|
||||
}
|
||||
|
||||
impl Connection {
|
||||
/// Create a new connection according to the configuration.
|
||||
pub fn new(config: &ConnectionConfig) -> Result<Arc<Connection>> {
|
||||
|
@ -330,22 +285,9 @@ impl Connection {
|
|||
None
|
||||
};
|
||||
|
||||
let mut mirrors = Vec::new();
|
||||
for mirror_config in config.mirrors.iter() {
|
||||
if !mirror_config.host.is_empty() {
|
||||
mirrors.push(Arc::new(Mirror {
|
||||
config: mirror_config.clone(),
|
||||
status: AtomicBool::from(true),
|
||||
failed_times: AtomicU8::from(0),
|
||||
failure_limit: mirror_config.failure_limit,
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
let connection = Arc::new(Connection {
|
||||
client,
|
||||
proxy,
|
||||
mirrors,
|
||||
shutdown: AtomicBool::new(false),
|
||||
last_active: Arc::new(AtomicU64::new(
|
||||
SystemTime::now()
|
||||
|
@ -358,9 +300,6 @@ impl Connection {
|
|||
// Start proxy's health checking thread.
|
||||
connection.start_proxy_health_thread(config.connect_timeout as u64);
|
||||
|
||||
// Start mirrors' health checking thread.
|
||||
connection.start_mirrors_health_thread(config.timeout as u64);
|
||||
|
||||
Ok(connection)
|
||||
}
|
||||
|
||||
|
@ -417,72 +356,6 @@ impl Connection {
|
|||
}
|
||||
}
|
||||
|
||||
fn start_mirrors_health_thread(&self, timeout: u64) {
|
||||
for mirror in self.mirrors.iter() {
|
||||
let mirror_cloned = mirror.clone();
|
||||
let last_active = Arc::clone(&self.last_active);
|
||||
|
||||
// Spawn thread to update the health status of mirror server.
|
||||
thread::spawn(move || {
|
||||
let mirror_health_url = if mirror_cloned.config.ping_url.is_empty() {
|
||||
format!("{}/v2", mirror_cloned.config.host)
|
||||
} else {
|
||||
mirror_cloned.config.ping_url.clone()
|
||||
};
|
||||
info!(
|
||||
"[mirror] start health check, ping url: {}",
|
||||
mirror_health_url
|
||||
);
|
||||
|
||||
let client = Client::new();
|
||||
loop {
|
||||
let elapsed = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_secs()
|
||||
- last_active.load(Ordering::Relaxed);
|
||||
// If the connection is not active for a set time, skip mirror health check.
|
||||
if elapsed <= mirror_cloned.config.health_check_pause_elapsed {
|
||||
// Try to recover the mirror server when it is unavailable.
|
||||
if !mirror_cloned.status.load(Ordering::Relaxed) {
|
||||
info!(
|
||||
"[mirror] server unhealthy, try to recover: {}",
|
||||
mirror_cloned.config.host
|
||||
);
|
||||
|
||||
let _ = client
|
||||
.get(mirror_health_url.as_str())
|
||||
.timeout(Duration::from_secs(timeout as u64))
|
||||
.send()
|
||||
.map(|resp| {
|
||||
// If the response status is less than StatusCode::INTERNAL_SERVER_ERROR,
|
||||
// the mirror server is recovered.
|
||||
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
|
||||
info!(
|
||||
"[mirror] server recovered: {}",
|
||||
mirror_cloned.config.host
|
||||
);
|
||||
mirror_cloned.failed_times.store(0, Ordering::Relaxed);
|
||||
mirror_cloned.status.store(true, Ordering::Relaxed);
|
||||
}
|
||||
})
|
||||
.map_err(|e| {
|
||||
warn!(
|
||||
"[mirror] failed to recover server: {}, {}",
|
||||
mirror_cloned.config.host, e
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
thread::sleep(Duration::from_secs(
|
||||
mirror_cloned.config.health_check_interval,
|
||||
));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Shutdown the connection.
|
||||
pub fn shutdown(&self) {
|
||||
self.shutdown.store(true, Ordering::Release);
|
||||
|
@ -562,69 +435,6 @@ impl Connection {
|
|||
}
|
||||
}
|
||||
|
||||
let mut mirror_enabled = false;
|
||||
if !self.mirrors.is_empty() {
|
||||
mirror_enabled = true;
|
||||
for mirror in self.mirrors.iter() {
|
||||
if mirror.status.load(Ordering::Relaxed) {
|
||||
let data_cloned = data.as_ref().cloned();
|
||||
|
||||
for (key, value) in mirror.config.headers.iter() {
|
||||
headers.insert(
|
||||
HeaderName::from_str(key).unwrap(),
|
||||
HeaderValue::from_str(value).unwrap(),
|
||||
);
|
||||
}
|
||||
|
||||
let current_url = mirror.mirror_url(url)?;
|
||||
debug!("[mirror] replace to: {}", current_url);
|
||||
|
||||
let result = self.call_inner(
|
||||
&self.client,
|
||||
method.clone(),
|
||||
current_url.as_str(),
|
||||
&query,
|
||||
data_cloned,
|
||||
headers,
|
||||
catch_status,
|
||||
false,
|
||||
);
|
||||
|
||||
match result {
|
||||
Ok(resp) => {
|
||||
// If the response status >= INTERNAL_SERVER_ERROR, move to the next mirror server.
|
||||
if resp.status() < StatusCode::INTERNAL_SERVER_ERROR {
|
||||
return Ok(resp);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"[mirror] request failed, server: {:?}, {:?}",
|
||||
mirror.config.host, err
|
||||
);
|
||||
mirror.failed_times.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
if mirror.failed_times.load(Ordering::Relaxed) >= mirror.failure_limit {
|
||||
warn!(
|
||||
"[mirror] exceed failure limit {}, server disabled: {:?}",
|
||||
mirror.failure_limit, mirror
|
||||
);
|
||||
mirror.status.store(false, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Remove mirror-related headers to avoid sending them to the next mirror server and original registry.
|
||||
for (key, _) in mirror.config.headers.iter() {
|
||||
headers.remove(HeaderName::from_str(key).unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if mirror_enabled {
|
||||
warn!("[mirror] request all servers failed, fallback to original server.");
|
||||
}
|
||||
|
||||
self.call_inner(
|
||||
&self.client,
|
||||
method,
|
||||
|
@ -788,6 +598,5 @@ mod tests {
|
|||
assert!(config.proxy.fallback);
|
||||
assert_eq!(config.proxy.ping_url, "");
|
||||
assert_eq!(config.proxy.url, "");
|
||||
assert!(config.mirrors.is_empty());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -432,17 +432,21 @@ impl RegistryState {
|
|||
Some(Auth::Basic(BasicAuth { realm }))
|
||||
}
|
||||
"Bearer" => {
|
||||
if !paras.contains_key("realm")
|
||||
|| !paras.contains_key("service")
|
||||
|| !paras.contains_key("scope")
|
||||
{
|
||||
if !paras.contains_key("realm") || !paras.contains_key("service") {
|
||||
return None;
|
||||
}
|
||||
|
||||
let scope = if let Some(scope) = paras.get("scope") {
|
||||
(*scope).to_string()
|
||||
} else {
|
||||
debug!("no scope specified for token auth challenge");
|
||||
String::new()
|
||||
};
|
||||
|
||||
Some(Auth::Bearer(BearerAuth {
|
||||
realm: (*paras.get("realm").unwrap()).to_string(),
|
||||
service: (*paras.get("service").unwrap()).to_string(),
|
||||
scope: (*paras.get("scope").unwrap()).to_string(),
|
||||
scope,
|
||||
}))
|
||||
}
|
||||
_ => None,
|
||||
|
@ -861,12 +865,6 @@ impl Registry {
|
|||
let id = id.ok_or_else(|| einval!("Registry backend requires blob_id"))?;
|
||||
let con_config: ConnectionConfig = config.clone().into();
|
||||
|
||||
if !config.proxy.url.is_empty() && !config.mirrors.is_empty() {
|
||||
return Err(einval!(
|
||||
"connection: proxy and mirrors cannot be configured at the same time."
|
||||
));
|
||||
}
|
||||
|
||||
let retry_limit = con_config.retry_limit;
|
||||
let connection = Connection::new(&con_config)?;
|
||||
let auth = trim(config.auth.clone());
|
||||
|
@ -1112,12 +1110,25 @@ mod tests {
|
|||
_ => panic!("failed to parse `Bearer` authentication header"),
|
||||
}
|
||||
|
||||
// No scope is accetpable
|
||||
let str = "Bearer realm=\"https://auth.my-registry.com/token\",service=\"my-registry.com\"";
|
||||
let header = HeaderValue::from_str(str).unwrap();
|
||||
let auth = RegistryState::parse_auth(&header).unwrap();
|
||||
match auth {
|
||||
Auth::Bearer(auth) => {
|
||||
assert_eq!(&auth.realm, "https://auth.my-registry.com/token");
|
||||
assert_eq!(&auth.service, "my-registry.com");
|
||||
assert_eq!(&auth.scope, "");
|
||||
}
|
||||
_ => panic!("failed to parse `Bearer` authentication header without scope"),
|
||||
}
|
||||
|
||||
let str = "Basic realm=\"https://auth.my-registry.com/token\"";
|
||||
let header = HeaderValue::from_str(str).unwrap();
|
||||
let auth = RegistryState::parse_auth(&header).unwrap();
|
||||
match auth {
|
||||
Auth::Basic(auth) => assert_eq!(&auth.realm, "https://auth.my-registry.com/token"),
|
||||
_ => panic!("failed to parse `Bearer` authentication header"),
|
||||
_ => panic!("failed to parse `Basic` authentication header"),
|
||||
}
|
||||
|
||||
let str = "Base realm=\"https://auth.my-registry.com/token\"";
|
||||
|
|
Loading…
Reference in New Issue