Update CRI image service to pull using transfer service
- adds a transfer service progress reporter to handle timeouts
- falls back to local image pull when the configuration conflicts with the transfer service
- other test fixes

Signed-off-by: Tony Fang <nhfang@amazon.com>
Co-authored-by: Swagat Bora <sbora@amazon.com>

parent d5534c6014
commit b694be29a0
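At a high level, the change teaches the CRI image service to dispatch between the existing client-side pull path and containerd's transfer service, and to fall back to the local path automatically when the CRI image config asks for something the transfer service does not support yet. A condensed sketch of that dispatch, pieced together from the hunks below (an illustration, not the literal code; error handling, tracing and label setup elided):

    // At CRI image plugin init: incompatible options flip the config to local pull.
    if !config.UseLocalImagePull {
        criconfig.CheckLocalImagePullConfigs(ic.Context, &config)
    }

    // In PullImage: choose the pull path based on the (possibly adjusted) flag.
    var image containerd.Image
    if c.config.UseLocalImagePull {
        image, err = c.pullImageWithLocalPull(ctx, ref, credentials, snapshotter, labels, imagePullProgressTimeout)
    } else {
        image, err = c.pullImageWithTransferService(ctx, ref, credentials, snapshotter, labels, imagePullProgressTimeout)
    }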
@@ -37,6 +37,7 @@ import (
    sandboxsapi "github.com/containerd/containerd/api/services/sandbox/v1"
    snapshotsapi "github.com/containerd/containerd/api/services/snapshots/v1"
    "github.com/containerd/containerd/api/services/tasks/v1"
    transferapi "github.com/containerd/containerd/api/services/transfer/v1"
    versionservice "github.com/containerd/containerd/api/services/version/v1"
    apitypes "github.com/containerd/containerd/api/types"
    "github.com/containerd/containerd/v2/core/containers"
@@ -55,6 +56,8 @@ import (
    sandboxproxy "github.com/containerd/containerd/v2/core/sandbox/proxy"
    "github.com/containerd/containerd/v2/core/snapshots"
    snproxy "github.com/containerd/containerd/v2/core/snapshots/proxy"
    "github.com/containerd/containerd/v2/core/transfer"
    transferproxy "github.com/containerd/containerd/v2/core/transfer/proxy"
    "github.com/containerd/containerd/v2/defaults"
    "github.com/containerd/containerd/v2/pkg/dialer"
    "github.com/containerd/containerd/v2/pkg/namespaces"
@@ -752,6 +755,16 @@ func (c *Client) SandboxController(name string) sandbox.Controller {
    return sandboxproxy.NewSandboxController(sandboxsapi.NewControllerClient(c.conn), name)
}

// TransferService returns the underlying transferrer
func (c *Client) TransferService() transfer.Transferrer {
    if c.transferService != nil {
        return c.transferService
    }
    c.connMu.Lock()
    defer c.connMu.Unlock()
    return transferproxy.NewTransferrer(transferapi.NewTransferClient(c.conn), c.streamCreator())
}

// VersionService returns the underlying VersionClient
func (c *Client) VersionService() versionservice.VersionClient {
    c.connMu.Lock()

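For orientation, a minimal sketch of how a standalone caller might drive the new accessor, built only from APIs that appear elsewhere in this diff (the socket address, namespace, image ref and snapshotter name are placeholders; error handling is omitted):

    package main

    import (
        "context"

        containerd "github.com/containerd/containerd/v2/client"
        transferimage "github.com/containerd/containerd/v2/core/transfer/image"
        "github.com/containerd/containerd/v2/core/transfer/registry"
        "github.com/containerd/containerd/v2/pkg/namespaces"
        "github.com/containerd/platforms"
    )

    func main() {
        cli, _ := containerd.New("/run/containerd/containerd.sock") // placeholder address
        defer cli.Close()

        ctx := namespaces.WithNamespace(context.Background(), "k8s.io")
        ref := "docker.io/library/busybox:latest" // placeholder ref

        // Source: a remote OCI registry; destination: the image store, unpacked for the default platform.
        reg, _ := registry.NewOCIRegistry(ctx, ref)
        is := transferimage.NewStore(ref, transferimage.WithUnpack(platforms.DefaultSpec(), "overlayfs"))

        // TransferService returns the in-process transferrer when one was configured on the client,
        // otherwise a proxy over the transfer API on the client's connection.
        _ = cli.TransferService().Transfer(ctx, reg, is)
    }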
@@ -31,6 +31,7 @@ import (
    "github.com/containerd/containerd/v2/core/leases"
    "github.com/containerd/containerd/v2/core/sandbox"
    "github.com/containerd/containerd/v2/core/snapshots"
    "github.com/containerd/containerd/v2/core/transfer"
    "github.com/containerd/containerd/v2/pkg/namespaces"
    "github.com/containerd/containerd/v2/plugins"
    srv "github.com/containerd/containerd/v2/plugins/services"
@@ -50,6 +51,7 @@ type services struct {
    introspectionService introspection.Service
    sandboxStore         sandbox.Store
    sandboxers           map[string]sandbox.Controller
    transferService      transfer.Transferrer
}

// ServicesOpt allows callers to set options on the services
@@ -163,6 +165,13 @@ func WithSandboxStore(client sandbox.Store) ServicesOpt {
    }
}

// WithTransferService sets the transfer service.
func WithTransferService(tr transfer.Transferrer) ServicesOpt {
    return func(s *services) {
        s.transferService = tr
    }
}

// WithInMemoryServices is suitable for cases when there is need to use containerd's client from
// another (in-memory) containerd plugin (such as CRI).
func WithInMemoryServices(ic *plugin.InitContext) Opt {
@@ -178,6 +187,9 @@ func WithInMemoryServices(ic *plugin.InitContext) Opt {
    plugins.SandboxStorePlugin: func(i interface{}) ServicesOpt {
        return WithSandboxStore(i.(sandbox.Store))
    },
    plugins.TransferPlugin: func(i interface{}) ServicesOpt {
        return WithTransferService(i.(transfer.Transferrer))
    },
} {
    i, err := ic.GetSingle(t)
    if err != nil {

@@ -93,6 +93,7 @@ func NewOCIRegistry(ctx context.Context, ref string, opts ...Opt) (*OCIRegistry,
            return nil, err
        }
    }

    hostOptions := config.HostOptions{}
    if ropts.hostDir != "" {
        hostOptions.HostDir = config.HostDirFromRoot(ropts.hostDir)
@@ -111,10 +112,12 @@ func NewOCIRegistry(ctx context.Context, ref string, opts ...Opt) (*OCIRegistry,
    if ropts.defaultScheme != "" {
        hostOptions.DefaultScheme = ropts.defaultScheme
    }

    resolver := docker.NewResolver(docker.ResolverOptions{
        Hosts:   config.ConfigureHosts(ctx, hostOptions),
        Headers: ropts.headers,
    })

    return &OCIRegistry{
        reference: ref,
        headers:   ropts.headers,

@@ -185,6 +185,7 @@ version = 3
    image_pull_progress_timeout = '5m0s'
    image_pull_with_sync_fs = false
    stats_collect_period = 10
    use_local_image_pull = false

    [plugins.'io.containerd.cri.v1.images'.pinned_images]
      sandbox = 'registry.k8s.io/pause:3.10'
@@ -329,8 +330,8 @@ version = 2
  tolerate_missing_hugetlb_controller = true

  # ignore_image_defined_volumes ignores volumes defined by the image. Useful for better resource
  # isolation, security and early detection of issues in the mount configuration when using
  # ReadOnlyRootFilesystem since containers won't silently mount a temporary volume.
  ignore_image_defined_volumes = false

  # netns_mounts_under_state_dir places all mounts for network namespaces under StateDir/netns
@@ -338,14 +339,6 @@ version = 2
  # requires that all containers are deleted.
  netns_mounts_under_state_dir = false

  # 'plugins."io.containerd.grpc.v1.cri".x509_key_pair_streaming' contains a x509 valid key pair to stream with tls.
  [plugins."io.containerd.grpc.v1.cri".x509_key_pair_streaming]
    # tls_cert_file is the filepath to the certificate paired with the "tls_key_file"
    tls_cert_file = ""

    # tls_key_file is the filepath to the private key paired with the "tls_cert_file"
    tls_key_file = ""

  # max_container_log_line_size is the maximum log line size in bytes for a container.
  # Log line longer than the limit will be split into multiple lines. -1 means no
  # limit.
@@ -420,6 +413,14 @@ version = 2
  # For example, the value can be '5h', '2h30m', '10s'.
  drain_exec_sync_io_timeout = "0s"

  # 'plugins."io.containerd.grpc.v1.cri".x509_key_pair_streaming' contains a x509 valid key pair to stream with tls.
  [plugins."io.containerd.grpc.v1.cri".x509_key_pair_streaming]
    # tls_cert_file is the filepath to the certificate paired with the "tls_key_file"
    tls_cert_file = ""

    # tls_key_file is the filepath to the private key paired with the "tls_cert_file"
    tls_key_file = ""

  # 'plugins."io.containerd.grpc.v1.cri".containerd' contains config related to containerd
  [plugins."io.containerd.grpc.v1.cri".containerd]

@@ -55,6 +55,7 @@ import (
    _ "github.com/containerd/containerd/v2/plugins/services/snapshots"
    _ "github.com/containerd/containerd/v2/plugins/services/tasks"
    _ "github.com/containerd/containerd/v2/plugins/services/version"
    _ "github.com/containerd/containerd/v2/plugins/transfer"

    "github.com/stretchr/testify/require"
)

@@ -18,7 +18,9 @@ package integration

import (
    // Register for linux platforms
    _ "github.com/containerd/containerd/v2/plugins/imageverifier" // WithInMemoryServices will fail otherwise
    _ "github.com/containerd/containerd/v2/plugins/sandbox" // WithInMemoryServices will fail otherwise
    _ "github.com/containerd/containerd/v2/plugins/services/sandbox" // WithInMemoryServices will fail otherwise
    _ "github.com/containerd/containerd/v2/plugins/snapshots/overlay/plugin"
    _ "github.com/containerd/containerd/v2/plugins/streaming" // WithInMemoryServices will fail otherwise
)

@@ -125,9 +125,9 @@ func TestRestartMonitor(t *testing.T) {
    )

    configTOML := fmt.Sprintf(`
version = 2
version = 3
[plugins]
  [plugins."io.containerd.internal.v1.restart"]
  [plugins."io.containerd.monitor.container.v1.restart"]
    interval = "%s"
`, interval.String())
    client, _, cleanup := newDaemonWithConfig(t, configTOML)

@ -61,9 +61,12 @@ func TestCRIImagePullTimeout(t *testing.T) {
|
|||
t.Skip()
|
||||
}
|
||||
|
||||
t.Run("HoldingContentOpenWriter", testCRIImagePullTimeoutByHoldingContentOpenWriter)
|
||||
t.Run("NoDataTransferred", testCRIImagePullTimeoutByNoDataTransferred)
|
||||
t.Run("SlowCommitWriter", testCRIImagePullTimeoutBySlowCommitWriter)
|
||||
t.Run("HoldingContentOpenWriterWithLocalPull", testCRIImagePullTimeoutByHoldingContentOpenWriterWithLocal)
|
||||
t.Run("HoldingContentOpenWriterWithTransferService", testCRIImagePullTimeoutByHoldingContentOpenWriterWithTransfer)
|
||||
t.Run("NoDataTransferredWithLocalPull", testCRIImagePullTimeoutByNoDataTransferredWithLocal)
|
||||
t.Run("NoDataTransferredWithTransferService", testCRIImagePullTimeoutByNoDataTransferredWithTransfer)
|
||||
t.Run("SlowCommitWriterWithLocalPull", testCRIImagePullTimeoutBySlowCommitWriterWithLocal)
|
||||
t.Run("SlowCommitWriterWithTransferService", testCRIImagePullTimeoutBySlowCommitWriterWithTransfer)
|
||||
}
|
||||
|
||||
// testCRIImagePullTimeoutBySlowCommitWriter tests that
|
||||
|
@ -79,15 +82,13 @@ func TestCRIImagePullTimeout(t *testing.T) {
|
|||
// ImagePull.
|
||||
//
|
||||
// It's reproducer for #9347.
|
||||
func testCRIImagePullTimeoutBySlowCommitWriter(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
func testCRIImagePullTimeoutBySlowCommitWriter(t *testing.T, useLocal bool) {
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
delayDuration := 2 * defaultImagePullProgressTimeout
|
||||
cli := buildLocalContainerdClient(t, tmpDir, tweakContentInitFnWithDelayer(delayDuration))
|
||||
|
||||
criService, err := initLocalCRIImageService(cli, tmpDir, criconfig.Registry{})
|
||||
criService, err := initLocalCRIImageService(cli, tmpDir, criconfig.Registry{}, useLocal)
|
||||
assert.NoError(t, err)
|
||||
|
||||
ctx := namespaces.WithNamespace(logtest.WithT(context.Background(), t), k8sNamespace)
|
||||
|
@ -96,6 +97,16 @@ func testCRIImagePullTimeoutBySlowCommitWriter(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
func testCRIImagePullTimeoutBySlowCommitWriterWithLocal(t *testing.T) {
|
||||
t.Parallel()
|
||||
testCRIImagePullTimeoutBySlowCommitWriter(t, true)
|
||||
}
|
||||
|
||||
func testCRIImagePullTimeoutBySlowCommitWriterWithTransfer(t *testing.T) {
|
||||
t.Parallel()
|
||||
testCRIImagePullTimeoutBySlowCommitWriter(t, false)
|
||||
}
|
||||
|
||||
// testCRIImagePullTimeoutByHoldingContentOpenWriter tests that
|
||||
//
|
||||
// It should not cancel if there is no active http requests.
|
||||
|
@ -103,14 +114,12 @@ func testCRIImagePullTimeoutBySlowCommitWriter(t *testing.T) {
|
|||
// When there are several pulling requests for the same blob content, there
// will be only one active http request (it is singleflight). For the waiting
// pulling request, we should not cancel.
|
||||
func testCRIImagePullTimeoutByHoldingContentOpenWriter(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
func testCRIImagePullTimeoutByHoldingContentOpenWriter(t *testing.T, useLocal bool) {
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
cli := buildLocalContainerdClient(t, tmpDir, nil)
|
||||
|
||||
criService, err := initLocalCRIImageService(cli, tmpDir, criconfig.Registry{})
|
||||
criService, err := initLocalCRIImageService(cli, tmpDir, criconfig.Registry{}, useLocal)
|
||||
assert.NoError(t, err)
|
||||
|
||||
ctx := namespaces.WithNamespace(logtest.WithT(context.Background(), t), k8sNamespace)
|
||||
|
@ -225,6 +234,16 @@ func testCRIImagePullTimeoutByHoldingContentOpenWriter(t *testing.T) {
|
|||
assert.NoError(t, <-errCh)
|
||||
}
|
||||
|
||||
func testCRIImagePullTimeoutByHoldingContentOpenWriterWithLocal(t *testing.T) {
|
||||
t.Parallel()
|
||||
testCRIImagePullTimeoutByHoldingContentOpenWriter(t, true)
|
||||
}
|
||||
|
||||
func testCRIImagePullTimeoutByHoldingContentOpenWriterWithTransfer(t *testing.T) {
|
||||
t.Parallel()
|
||||
testCRIImagePullTimeoutByHoldingContentOpenWriter(t, false)
|
||||
}
|
||||
|
||||
// testCRIImagePullTimeoutByNoDataTransferred tests that
|
||||
//
|
||||
// It should fail because there is no data transferred in open http request.
|
||||
|
@ -237,9 +256,7 @@ func testCRIImagePullTimeoutByHoldingContentOpenWriter(t *testing.T) {
|
|||
//
|
||||
// This case uses ghcr.io/containerd/volume-ownership:2.1 which has one layer > 3MB.
|
||||
// The circuit breaker will enable after transferred 3MB in one connection.
|
||||
func testCRIImagePullTimeoutByNoDataTransferred(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
func testCRIImagePullTimeoutByNoDataTransferred(t *testing.T, useLocal bool) {
|
||||
tmpDir := t.TempDir()
|
||||
|
||||
cli := buildLocalContainerdClient(t, tmpDir, nil)
|
||||
|
@ -288,7 +305,12 @@ func testCRIImagePullTimeoutByNoDataTransferred(t *testing.T) {
|
|||
},
|
||||
},
|
||||
} {
|
||||
criService, err := initLocalCRIImageService(cli, tmpDir, registryCfg)
|
||||
// Skip Mirrors configuration (idx=1) when using transfer service
|
||||
if idx == 1 && !useLocal {
|
||||
t.Log("Skipping Mirrors configuration with Transfer service as it's not supported")
|
||||
continue
|
||||
}
|
||||
criService, err := initLocalCRIImageService(cli, tmpDir, registryCfg, useLocal)
|
||||
assert.NoError(t, err)
|
||||
|
||||
dctx, _, err := cli.WithLease(ctx)
|
||||
|
@ -307,6 +329,16 @@ func testCRIImagePullTimeoutByNoDataTransferred(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testCRIImagePullTimeoutByNoDataTransferredWithLocal(t *testing.T) {
|
||||
t.Parallel()
|
||||
testCRIImagePullTimeoutByNoDataTransferred(t, true)
|
||||
}
|
||||
|
||||
func testCRIImagePullTimeoutByNoDataTransferredWithTransfer(t *testing.T) {
|
||||
t.Parallel()
|
||||
testCRIImagePullTimeoutByNoDataTransferred(t, false)
|
||||
}
|
||||
|
||||
func setupLocalMirrorRegistry(srv *mirrorRegistryServer) *httptest.Server {
|
||||
return httptest.NewServer(srv)
|
||||
}
|
||||
|
@ -473,7 +505,7 @@ func (l *ioCopyLimiter) limitedCopy(ctx context.Context, dst io.Writer, src io.R
|
|||
//
|
||||
// NOTE: We don't need to start the CRI plugin here because we just need the
|
||||
// ImageService API.
|
||||
func initLocalCRIImageService(client *containerd.Client, tmpDir string, registryCfg criconfig.Registry) (criserver.ImageService, error) {
|
||||
func initLocalCRIImageService(client *containerd.Client, tmpDir string, registryCfg criconfig.Registry, useLocalPull bool) (criserver.ImageService, error) {
|
||||
containerdRootDir := filepath.Join(tmpDir, "root")
|
||||
|
||||
cfg := criconfig.ImageConfig{
|
||||
|
@ -481,6 +513,7 @@ func initLocalCRIImageService(client *containerd.Client, tmpDir string, registry
|
|||
Registry: registryCfg,
|
||||
ImagePullProgressTimeout: defaultImagePullProgressTimeout.String(),
|
||||
StatsCollectPeriod: 10,
|
||||
UseLocalImagePull: useLocalPull,
|
||||
}
|
||||
|
||||
return images.NewService(cfg, &images.CRIImageServiceOptions{
|
||||
|
@ -491,5 +524,6 @@ func initLocalCRIImageService(client *containerd.Client, tmpDir string, registry
|
|||
Content: client.ContentStore(),
|
||||
Images: client.ImageService(),
|
||||
Client: client,
|
||||
Transferrer: client.TransferService(),
|
||||
})
|
||||
}
|
||||
|
|
|
@@ -21,6 +21,7 @@ import (
    "errors"
    "fmt"
    "net/url"
    gruntime "runtime"
    "slices"
    "time"

@@ -335,6 +336,12 @@ type ImageConfig struct {

    // StatsCollectPeriod is the period (in seconds) of snapshots stats collection.
    StatsCollectPeriod int `toml:"stats_collect_period" json:"statsCollectPeriod"`

    // Uses client.Pull to pull images locally, instead of containerd's Transfer Service.
    // By default it is set to false, i.e. use transfer service to pull images.
    // When transfer service is used to pull images, pull related configs, like max_concurrent_downloads
    // and unpack_config are configured under [plugins."io.containerd.transfer.v1.local"]
    UseLocalImagePull bool `toml:"use_local_image_pull" json:"useLocalImagePull"`
}

// RuntimeConfig contains toml config related to CRI plugin,
@@ -402,7 +409,6 @@ type RuntimeConfig struct {
    // For more details about CDI configuration please refer to
    // https://tags.cncf.io/container-device-interface#containerd-configuration
    CDISpecDirs []string `toml:"cdi_spec_dirs" json:"cdiSpecDirs"`

    // DrainExecSyncIOTimeout is the maximum duration to wait for ExecSync
    // API' IO EOF event after exec init process exits. A zero value means
    // there is no timeout.
@@ -520,6 +526,79 @@ func ValidateImageConfig(ctx context.Context, c *ImageConfig) ([]deprecation.War
    return warnings, nil
}

// CheckLocalImagePullConfigs checks if there are CRI Image Config options configured that are not supported
// with transfer service and sets UseLocalImagePull to true. This ensures compatibility with configurations
// that aren't supported or need to be configured differently when using transfer service.
func CheckLocalImagePullConfigs(ctx context.Context, c *ImageConfig) {
    // If already using local image pull, no need to check for conflicts
    if c.UseLocalImagePull {
        return
    }

    // List of Config options that automatically trigger fallback to local image pull
    localPullOnlyConfigs := []struct {
        Name      string
        IsPresent func() bool
        Reason    string
    }{
        {
            Name: "DisableSnapshotAnnotations",
            IsPresent: func() bool {
                if gruntime.GOOS == "windows" {
                    return c.DisableSnapshotAnnotations
                }
                return !c.DisableSnapshotAnnotations
            },
            Reason: "moved to snapshotter plugin when using transfer service",
        },
        {
            Name:      "DiscardUnpackedLayers",
            IsPresent: func() bool { return c.DiscardUnpackedLayers },
            Reason:    "not supported with transfer service",
        },
        {
            Name:      "Registry.Mirrors",
            IsPresent: func() bool { return len(c.Registry.Mirrors) > 0 },
            Reason:    "not supported with transfer service (also deprecated)",
        },
        {
            Name:      "Registry.Configs",
            IsPresent: func() bool { return len(c.Registry.Configs) > 0 },
            Reason:    "not supported with transfer service (also deprecated)",
        },
        {
            Name:      "Registry.Auths",
            IsPresent: func() bool { return len(c.Registry.Auths) > 0 },
            Reason:    "not supported with transfer service (also deprecated)",
        },
        {
            Name:      "MaxConcurrentDownloads",
            IsPresent: func() bool { return c.MaxConcurrentDownloads != 3 },
            Reason:    "must be configured in transfer service plugin: plugins.\"io.containerd.transfer.v1.local\"",
        },
        {
            Name:      "ImagePullWithSyncFs",
            IsPresent: func() bool { return c.ImagePullWithSyncFs },
            Reason:    "not supported with transfer service",
        },
    }

    for _, config := range localPullOnlyConfigs {
        if config.IsPresent() {
            // Fall back to local image pull
            c.UseLocalImagePull = true
            log.G(ctx).Warnf(
                "Found '%s' in CRI config which is incompatible with transfer service (%s). "+
                    "Falling back to local image pull mode.",
                config.Name,
                config.Reason,
            )
            // Break after first conflict is found
            break
        }
    }
}

// ValidateRuntimeConfig validates the given runtime configuration.
func ValidateRuntimeConfig(ctx context.Context, c *RuntimeConfig) ([]deprecation.Warning, error) {
    var warnings []deprecation.Warning

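As a usage note (an illustration, not code from this change): the check is meant to run once against the loaded image config before the pull path is chosen, as the CRI images plugin does further down in this diff. For example, assuming the defaults shown in the documentation above:

    cfg := criconfig.DefaultImageConfig() // defaults leave UseLocalImagePull = false
    cfg.DiscardUnpackedLayers = true      // an option the transfer service does not support

    criconfig.CheckLocalImagePullConfigs(ctx, &cfg)

    // The conflict is logged with its reason, and cfg.UseLocalImagePull is now true,
    // so PullImage will take the client.Pull path.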
@ -18,10 +18,11 @@ package config
|
|||
|
||||
import (
|
||||
"context"
|
||||
"runtime"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
criruntime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
|
||||
"github.com/containerd/containerd/v2/internal/cri/opts"
|
||||
"github.com/containerd/containerd/v2/pkg/deprecation"
|
||||
|
@ -341,35 +342,35 @@ func TestValidateConfig(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestHostAccessingSandbox(t *testing.T) {
|
||||
privilegedContext := &runtime.PodSandboxConfig{
|
||||
Linux: &runtime.LinuxPodSandboxConfig{
|
||||
SecurityContext: &runtime.LinuxSandboxSecurityContext{
|
||||
privilegedContext := &criruntime.PodSandboxConfig{
|
||||
Linux: &criruntime.LinuxPodSandboxConfig{
|
||||
SecurityContext: &criruntime.LinuxSandboxSecurityContext{
|
||||
Privileged: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
nonPrivilegedContext := &runtime.PodSandboxConfig{
|
||||
Linux: &runtime.LinuxPodSandboxConfig{
|
||||
SecurityContext: &runtime.LinuxSandboxSecurityContext{
|
||||
nonPrivilegedContext := &criruntime.PodSandboxConfig{
|
||||
Linux: &criruntime.LinuxPodSandboxConfig{
|
||||
SecurityContext: &criruntime.LinuxSandboxSecurityContext{
|
||||
Privileged: false,
|
||||
},
|
||||
},
|
||||
}
|
||||
hostNamespace := &runtime.PodSandboxConfig{
|
||||
Linux: &runtime.LinuxPodSandboxConfig{
|
||||
SecurityContext: &runtime.LinuxSandboxSecurityContext{
|
||||
hostNamespace := &criruntime.PodSandboxConfig{
|
||||
Linux: &criruntime.LinuxPodSandboxConfig{
|
||||
SecurityContext: &criruntime.LinuxSandboxSecurityContext{
|
||||
Privileged: false,
|
||||
NamespaceOptions: &runtime.NamespaceOption{
|
||||
Network: runtime.NamespaceMode_NODE,
|
||||
Pid: runtime.NamespaceMode_NODE,
|
||||
Ipc: runtime.NamespaceMode_NODE,
|
||||
NamespaceOptions: &criruntime.NamespaceOption{
|
||||
Network: criruntime.NamespaceMode_NODE,
|
||||
Pid: criruntime.NamespaceMode_NODE,
|
||||
Ipc: criruntime.NamespaceMode_NODE,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
config *runtime.PodSandboxConfig
|
||||
config *criruntime.PodSandboxConfig
|
||||
want bool
|
||||
}{
|
||||
{"Security Context is nil", nil, false},
|
||||
|
@ -385,3 +386,125 @@ func TestHostAccessingSandbox(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckLocalImagePullConfigs(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
imageConfigFn func(*ImageConfig)
|
||||
expectLocalPull bool
|
||||
}{
|
||||
{
|
||||
name: "already using local pull",
|
||||
imageConfigFn: func(ic *ImageConfig) {
|
||||
ic.UseLocalImagePull = true
|
||||
},
|
||||
expectLocalPull: true,
|
||||
},
|
||||
{
|
||||
name: "no conflicting configs",
|
||||
imageConfigFn: func(ic *ImageConfig) {
|
||||
ic.Snapshotter = "overlayfs"
|
||||
ic.ImagePullProgressTimeout = "5m"
|
||||
ic.StatsCollectPeriod = 10
|
||||
},
|
||||
expectLocalPull: false,
|
||||
},
|
||||
{
|
||||
name: "DisableSnapshotAnnotations triggers local pull",
|
||||
imageConfigFn: func(ic *ImageConfig) {
|
||||
if runtime.GOOS == "windows" {
|
||||
ic.DisableSnapshotAnnotations = true
|
||||
} else {
|
||||
ic.DisableSnapshotAnnotations = false
|
||||
}
|
||||
},
|
||||
expectLocalPull: true,
|
||||
},
|
||||
{
|
||||
name: "DiscardUnpackedLayers triggers local pull",
|
||||
imageConfigFn: func(ic *ImageConfig) {
|
||||
ic.DiscardUnpackedLayers = true
|
||||
},
|
||||
expectLocalPull: true,
|
||||
},
|
||||
{
|
||||
name: "Registry.Mirrors triggers local pull",
|
||||
imageConfigFn: func(ic *ImageConfig) {
|
||||
ic.Registry.Mirrors = map[string]Mirror{
|
||||
"docker.io": {
|
||||
Endpoints: []string{"https://mirror.example.com"},
|
||||
},
|
||||
}
|
||||
},
|
||||
expectLocalPull: true,
|
||||
},
|
||||
{
|
||||
name: "Registry.Configs triggers local pull",
|
||||
imageConfigFn: func(ic *ImageConfig) {
|
||||
ic.Registry.Configs = map[string]RegistryConfig{
|
||||
"docker.io": {
|
||||
Auth: &AuthConfig{
|
||||
Username: "user",
|
||||
Password: "pass",
|
||||
},
|
||||
},
|
||||
}
|
||||
},
|
||||
expectLocalPull: true,
|
||||
},
|
||||
{
|
||||
name: "Registry.Auths triggers local pull",
|
||||
imageConfigFn: func(ic *ImageConfig) {
|
||||
ic.Registry.Auths = map[string]AuthConfig{
|
||||
"https://docker.io": {
|
||||
Username: "user",
|
||||
Password: "pass",
|
||||
},
|
||||
}
|
||||
},
|
||||
expectLocalPull: true,
|
||||
},
|
||||
{
|
||||
name: "MaxConcurrentDownloads triggers local pull",
|
||||
imageConfigFn: func(ic *ImageConfig) {
|
||||
ic.MaxConcurrentDownloads = 5
|
||||
},
|
||||
expectLocalPull: true,
|
||||
},
|
||||
{
|
||||
name: "MaxConcurrentDownloads when set to 3 don't trigger local pull",
|
||||
imageConfigFn: func(ic *ImageConfig) {
|
||||
ic.MaxConcurrentDownloads = 3
|
||||
},
|
||||
expectLocalPull: false,
|
||||
},
|
||||
{
|
||||
name: "ImagePullWithSyncFs triggers local pull",
|
||||
imageConfigFn: func(ic *ImageConfig) {
|
||||
ic.ImagePullWithSyncFs = true
|
||||
},
|
||||
expectLocalPull: true,
|
||||
},
|
||||
{
|
||||
name: "Registry.ConfigPath and Headers don't trigger local pull",
|
||||
imageConfigFn: func(ic *ImageConfig) {
|
||||
ic.Registry.ConfigPath = "/etc/containerd/certs.d"
|
||||
ic.Registry.Headers = map[string][]string{
|
||||
"User-Agent": {"containerd/2.x"},
|
||||
}
|
||||
},
|
||||
expectLocalPull: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
config := DefaultImageConfig()
|
||||
imageConfig := &config
|
||||
tc.imageConfigFn(imageConfig)
|
||||
CheckLocalImagePullConfigs(ctx, imageConfig)
|
||||
assert.Equal(t, tc.expectLocalPull, imageConfig.UseLocalImagePull)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -35,6 +35,7 @@ import (
    "github.com/containerd/imgcrypt/v2"
    "github.com/containerd/imgcrypt/v2/images/encryption"
    "github.com/containerd/log"
    "github.com/containerd/platforms"
    distribution "github.com/distribution/reference"
    imagespec "github.com/opencontainers/image-spec/specs-go/v1"
    runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
@@ -44,6 +45,9 @@ import (
    containerdimages "github.com/containerd/containerd/v2/core/images"
    "github.com/containerd/containerd/v2/core/remotes/docker"
    "github.com/containerd/containerd/v2/core/remotes/docker/config"
    "github.com/containerd/containerd/v2/core/transfer"
    transferimage "github.com/containerd/containerd/v2/core/transfer/image"
    "github.com/containerd/containerd/v2/core/transfer/registry"
    "github.com/containerd/containerd/v2/internal/cri/annotations"
    criconfig "github.com/containerd/containerd/v2/internal/cri/config"
    crilabels "github.com/containerd/containerd/v2/internal/cri/labels"
@@ -150,6 +154,7 @@ func (c *CRIImageService) PullImage(ctx context.Context, name string, credential
    if err != nil {
        return "", fmt.Errorf("failed to parse image reference %q: %w", name, err)
    }

    ref := namedRef.String()
    if ref != name {
        log.G(ctx).Debugf("PullImage using normalized image ref: %q", ref)
@@ -160,61 +165,32 @@ func (c *CRIImageService) PullImage(ctx context.Context, name string, credential
        return "", fmt.Errorf("failed to parse image_pull_progress_timeout %q: %w", c.config.ImagePullProgressTimeout, err)
    }

    var (
        pctx, pcancel = context.WithCancel(ctx)

        pullReporter = newPullProgressReporter(ref, pcancel, imagePullProgressTimeout)

        resolver = docker.NewResolver(docker.ResolverOptions{
            Headers: c.config.Registry.Headers,
            Hosts:   c.registryHosts(ctx, credentials, pullReporter.optionUpdateClient),
        })
    )

    defer pcancel()
    snapshotter, err := c.snapshotterFromPodSandboxConfig(ctx, ref, sandboxConfig)
    if err != nil {
        return "", err
    }
    log.G(ctx).Debugf("PullImage %q with snapshotter %s", ref, snapshotter)

    span.SetAttributes(
        tracing.Attribute("image.ref", ref),
        tracing.Attribute("snapshotter.name", snapshotter),
    )

    labels := c.getLabels(ctx, ref)

    pullOpts := []containerd.RemoteOpt{
        containerd.WithResolver(resolver),
        containerd.WithPullSnapshotter(snapshotter),
        containerd.WithPullUnpack,
        containerd.WithPullLabels(labels),
        containerd.WithMaxConcurrentDownloads(c.config.MaxConcurrentDownloads),
        containerd.WithUnpackOpts([]containerd.UnpackOpt{
            containerd.WithUnpackDuplicationSuppressor(c.unpackDuplicationSuppressor),
            containerd.WithUnpackApplyOpts(diff.WithSyncFs(c.config.ImagePullWithSyncFs)),
        }),
    // If UseLocalImagePull is true, use client.Pull to pull the image, else use transfer service by default.
    //
    // Transfer service does not currently support all the CRI image config options.
    // TODO: Add support for DisableSnapshotAnnotations, DiscardUnpackedLayers, ImagePullWithSyncFs and unpackDuplicationSuppressor
    var image containerd.Image
    if c.config.UseLocalImagePull {
        image, err = c.pullImageWithLocalPull(ctx, ref, credentials, snapshotter, labels, imagePullProgressTimeout)
    } else {
        image, err = c.pullImageWithTransferService(ctx, ref, credentials, snapshotter, labels, imagePullProgressTimeout)
    }

    // Temporarily removed for v2 upgrade
    //pullOpts = append(pullOpts, c.encryptedImagesPullOpts()...)
    if !c.config.DisableSnapshotAnnotations {
        pullOpts = append(pullOpts,
            containerd.WithImageHandlerWrapper(snpkg.AppendInfoHandlerWrapper(ref)))
    }

    if c.config.DiscardUnpackedLayers {
        // Allows GC to clean layers up from the content store after unpacking
        pullOpts = append(pullOpts,
            containerd.WithChildLabelMap(containerdimages.ChildGCLabelsFilterLayers))
    }

    pullReporter.start(pctx)
    image, err := c.client.Pull(pctx, ref, pullOpts...)
    pcancel()
    if err != nil {
        return "", fmt.Errorf("failed to pull and unpack image %q: %w", ref, err)
        return "", err
    }

    span.AddEvent("Pull and unpack image complete")

    configDesc, err := image.Config(ctx)
@@ -255,6 +231,104 @@ func (c *CRIImageService) PullImage(ctx context.Context, name string, credential
    return imageID, nil
}

// pullImageWithLocalPull handles image pulling using the local client.
func (c *CRIImageService) pullImageWithLocalPull(
    ctx context.Context,
    ref string,
    credentials func(string) (string, string, error),
    snapshotter string,
    labels map[string]string,
    imagePullProgressTimeout time.Duration,
) (containerd.Image, error) {
    pctx, pcancel := context.WithCancel(ctx)
    defer pcancel()
    pullReporter := newPullProgressReporter(ref, pcancel, imagePullProgressTimeout)
    resolver := docker.NewResolver(docker.ResolverOptions{
        Headers: c.config.Registry.Headers,
        Hosts:   c.registryHosts(ctx, credentials, pullReporter.optionUpdateClient),
    })

    log.G(ctx).Debugf("PullImage %q with snapshotter %s using client.Pull()", ref, snapshotter)
    pullOpts := []containerd.RemoteOpt{
        containerd.WithResolver(resolver),
        containerd.WithPullSnapshotter(snapshotter),
        containerd.WithPullUnpack,
        containerd.WithPullLabels(labels),
        containerd.WithMaxConcurrentDownloads(c.config.MaxConcurrentDownloads),
        containerd.WithUnpackOpts([]containerd.UnpackOpt{
            containerd.WithUnpackDuplicationSuppressor(c.unpackDuplicationSuppressor),
            containerd.WithUnpackApplyOpts(diff.WithSyncFs(c.config.ImagePullWithSyncFs)),
        }),
    }

    // Temporarily removed for v2 upgrade
    //pullOpts = append(pullOpts, c.encryptedImagesPullOpts()...)
    if !c.config.DisableSnapshotAnnotations {
        pullOpts = append(pullOpts,
            containerd.WithImageHandlerWrapper(snpkg.AppendInfoHandlerWrapper(ref)))
    }

    if c.config.DiscardUnpackedLayers {
        // Allows GC to clean layers up from the content store after unpacking
        pullOpts = append(pullOpts,
            containerd.WithChildLabelMap(containerdimages.ChildGCLabelsFilterLayers))
    }

    pullReporter.start(pctx)
    image, err := c.client.Pull(pctx, ref, pullOpts...)
    pcancel()
    if err != nil {
        return nil, fmt.Errorf("failed to pull and unpack image %q: %w", ref, err)
    }
    return image, nil
}

// pullImageWithTransferService handles image pulling using the transfer service.
func (c *CRIImageService) pullImageWithTransferService(
    ctx context.Context,
    ref string,
    credentials func(string) (string, string, error),
    snapshotter string,
    labels map[string]string,
    imagePullProgressTimeout time.Duration,
) (containerd.Image, error) {
    log.G(ctx).Debugf("PullImage %q with snapshotter %s using transfer service", ref, snapshotter)
    rctx, rcancel := context.WithCancel(ctx)
    defer rcancel()
    transferProgressReporter := newTransferProgressReporter(ref, rcancel, imagePullProgressTimeout)

    // Set image store opts
    var sopts []transferimage.StoreOpt
    sopts = append(sopts, transferimage.WithPlatforms(platforms.DefaultSpec()))
    sopts = append(sopts, transferimage.WithUnpack(platforms.DefaultSpec(), snapshotter))
    sopts = append(sopts, transferimage.WithImageLabels(labels))
    is := transferimage.NewStore(ref, sopts...)
    log.G(ctx).Debugf("Getting new CRI credentials")
    ch := newCRICredentials(ref, credentials)
    opts := []registry.Opt{registry.WithCredentials(ch)}
    opts = append(opts, registry.WithHeaders(c.config.Registry.Headers))
    opts = append(opts, registry.WithHostDir(c.config.Registry.ConfigPath))
    reg, err := registry.NewOCIRegistry(ctx, ref, opts...)
    if err != nil {
        return nil, fmt.Errorf("failed to create OCI registry: %w", err)
    }

    transferProgressReporter.start(rctx)
    log.G(ctx).Debugf("Calling cri transfer service")
    err = c.transferrer.Transfer(rctx, reg, is, transfer.WithProgress(transferProgressReporter.createProgressFunc(rctx)))
    rcancel()
    if err != nil {
        return nil, fmt.Errorf("failed to pull and unpack image %q: %w", ref, err)
    }

    // Image should be pulled, unpacked and present in containerd image store at this moment
    image, err := c.client.GetImage(ctx, ref)
    if err != nil {
        return nil, fmt.Errorf("failed to get image %q from containerd image store: %w", ref, err)
    }
    return image, nil
}

// ParseAuth parses AuthConfig and returns username and password/secret required by containerd.
func ParseAuth(auth *runtime.AuthConfig, host string) (string, string, error) {
    if auth == nil {
@@ -775,3 +849,186 @@ func (c *CRIImageService) snapshotterFromPodSandboxConfig(ctx context.Context, i

    return snapshotter, nil
}

type criCredentials struct {
    ref         string
    credentials func(string) (string, string, error)
}

func newCRICredentials(ref string, credentials func(string) (string, string, error)) registry.CredentialHelper {
    return &criCredentials{
        ref:         ref,
        credentials: credentials,
    }
}

// GetCredentials gets credentials from criCredentials; it makes criCredentials a registry.CredentialHelper
func (cc *criCredentials) GetCredentials(ctx context.Context, ref string, host string) (registry.Credentials, error) {
    if cc.credentials == nil {
        return registry.Credentials{}, fmt.Errorf("credential handler not initialized for ref %q", ref)
    }

    if ref != cc.ref {
        return registry.Credentials{}, fmt.Errorf("invalid ref %q, expected %q", ref, cc.ref)
    }

    username, secret, err := cc.credentials(host)
    if err != nil {
        return registry.Credentials{}, fmt.Errorf("failed to get credentials for %q: %w", host, err)
    }
    return registry.Credentials{
        Host:     host,
        Username: username,
        Secret:   secret,
    }, nil
}

type transferProgressReporter struct {
    ref               string
    pc                chan transfer.Progress
    cancel            context.CancelFunc
    timeout           time.Duration
    reqReporter       pullRequestReporter
    statuses          map[string]*transfer.Progress
    lastSeenBytesRead uint64
    lastSeenTimestamp time.Time
}

func newTransferProgressReporter(ref string, cancel context.CancelFunc, timeout time.Duration) *transferProgressReporter {
    return &transferProgressReporter{
        ref:      ref,
        cancel:   cancel,
        timeout:  timeout,
        pc:       make(chan transfer.Progress),
        statuses: make(map[string]*transfer.Progress),
    }
}

func (reporter *transferProgressReporter) handleProgress(p transfer.Progress) {
    // We only need to handle Progress Nodes that represent
    // valid requests to a remote registry, so Progress nodes
    // without 'Name', 'Desc' or 'Total' can be ignored
    if p.Name == "" || p.Desc == nil || p.Total == 0 {
        return
    }

    switch p.Event {
    case "waiting":
        // 'Waiting' events can occur either when the layer is waiting to be
        // downloaded and no progress has been made, or when we have made
        // some progress but are waiting for more content to be downloaded.
        if p.Progress == 0 {
            return
        }
        fallthrough // Handle non-zero waiting progress same as downloading
    case "downloading":
        var curProgress int64
        if node, ok := reporter.statuses[p.Name]; !ok {
            curProgress = p.Progress
            reporter.reqReporter.incRequest()
        } else {
            curProgress = p.Progress - node.Progress
        }
        reporter.statuses[p.Name] = &p
        if curProgress > 0 {
            reporter.IncBytesRead(curProgress)
        }

        // Download may be complete, but waiting for content
        // to be written. In this case, we no longer consider it
        // as an active request.
        if p.Progress == p.Total {
            reporter.reqReporter.decRequest()
            delete(reporter.statuses, p.Name)
        }

    case "complete":
        if node, exists := reporter.statuses[p.Name]; exists {
            if curProgress := p.Progress - node.Progress; curProgress > 0 {
                reporter.IncBytesRead(curProgress)
            }
            reporter.reqReporter.decRequest()
            delete(reporter.statuses, p.Name)
        }
    default:
        return
    }
}

func (reporter *transferProgressReporter) IncBytesRead(bytes int64) {
    reporter.reqReporter.incByteRead(uint64(bytes))
}

func (reporter *transferProgressReporter) start(ctx context.Context) {
    if reporter.timeout == 0 {
        log.G(ctx).Infof("no timeout and will not start pulling image %s reporter", reporter.ref)
        return
    }

    go func() {
        var (
            reportInterval = defaultPullProgressReportInterval
        )

        reporter.lastSeenBytesRead = uint64(0)
        reporter.lastSeenTimestamp = time.Now()

        // check progress more frequently if timeout < default interval
        if reporter.timeout < reportInterval {
            reportInterval = reporter.timeout / 2
        }

        var ticker = time.NewTicker(reportInterval)
        defer ticker.Stop()

        for {
            select {
            case p := <-reporter.pc:
                reporter.handleProgress(p)
            case <-ticker.C:
                reporter.checkProgress(ctx, reportInterval)
                continue
            case <-ctx.Done():
                activeReqs, bytesRead := reporter.reqReporter.status()
                log.G(ctx).Infof("stop pulling image %s: active requests=%v, bytes read=%v", reporter.ref, activeReqs, bytesRead)
                return
            }
        }
    }()
}

func (reporter *transferProgressReporter) checkProgress(ctx context.Context, reportInterval time.Duration) {
    activeReqs, bytesRead := reporter.reqReporter.status()

    lastSeenBytesRead := reporter.lastSeenBytesRead
    lastSeenTimestamp := reporter.lastSeenTimestamp

    log.G(ctx).WithField("ref", reporter.ref).
        WithField("activeReqs", activeReqs).
        WithField("totalBytesRead", bytesRead).
        WithField("lastSeenBytesRead", lastSeenBytesRead).
        WithField("lastSeenTimestamp", lastSeenTimestamp.Format(time.RFC3339)).
        WithField("reportInterval", reportInterval).
        Debugf("progress for image pull")

    if activeReqs == 0 || bytesRead > lastSeenBytesRead {
        reporter.lastSeenBytesRead = bytesRead
        reporter.lastSeenTimestamp = time.Now()
        return
    }

    if time.Since(lastSeenTimestamp) > reporter.timeout {
        log.G(ctx).Errorf("cancel pulling image %s because of no progress in %v", reporter.ref, reporter.timeout)
        reporter.cancel()
    }
}

func (reporter *transferProgressReporter) createProgressFunc(ctx context.Context) transfer.ProgressFunc {
    return func(p transfer.Progress) {
        select {
        case reporter.pc <- p:
        case <-ctx.Done():
            return
        }
    }
}

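The bookkeeping in handleProgress is easiest to follow on a concrete event sequence. The hand-worked trace below is an illustration consistent with the code above (layer names and sizes are invented; both layers have Total = 1000):

    // Event                                  activeReqs   totalBytesRead   statuses
    // layer1 "downloading", Progress=500     1            500              layer1: 500
    // layer1 "waiting",     Progress=500     1            500              layer1: 500   (non-zero waiting falls through to downloading)
    // layer2 "downloading", Progress=200     2            700              layer1: 500, layer2: 200
    // layer1 "complete",    Progress=1000    1            1200             layer2: 200   (complete: count remaining bytes, drop request)
    // layer2 "downloading", Progress=1000    0            2000             (empty)       (Progress == Total: treated as done)
    //
    // checkProgress cancels the pull only when activeReqs > 0 and totalBytesRead
    // has not advanced for longer than the configured image_pull_progress_timeout.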
@ -20,15 +20,18 @@ import (
|
|||
"context"
|
||||
"encoding/base64"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
|
||||
"github.com/containerd/platforms"
|
||||
|
||||
"github.com/containerd/containerd/v2/core/transfer"
|
||||
"github.com/containerd/containerd/v2/internal/cri/annotations"
|
||||
criconfig "github.com/containerd/containerd/v2/internal/cri/config"
|
||||
"github.com/containerd/containerd/v2/internal/cri/labels"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
)
|
||||
|
||||
func TestParseAuth(t *testing.T) {
|
||||
|
@ -490,3 +493,247 @@ func TestImageGetLabels(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTransferProgressReporter(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
setup func(*transferProgressReporter) chan struct{}
|
||||
progress []transfer.Progress
|
||||
check func(*testing.T, *transferProgressReporter, <-chan struct{})
|
||||
}{
|
||||
{
|
||||
name: "PullImageWithCompleteEvent",
|
||||
progress: []transfer.Progress{
|
||||
{
|
||||
Name: "layer1",
|
||||
Desc: &ocispec.Descriptor{
|
||||
MediaType: "application/vnd.oci.image.layer.v1.tar+gzip",
|
||||
Digest: "sha256:abcdef",
|
||||
Size: 1000,
|
||||
},
|
||||
Total: 1000,
|
||||
Progress: 500,
|
||||
Event: "downloading",
|
||||
},
|
||||
{
|
||||
Name: "layer1",
|
||||
Desc: &ocispec.Descriptor{
|
||||
MediaType: "application/vnd.oci.image.layer.v1.tar+gzip",
|
||||
Digest: "sha256:abcdef",
|
||||
Size: 1000,
|
||||
},
|
||||
Total: 1000,
|
||||
Progress: 1000,
|
||||
Event: "complete",
|
||||
},
|
||||
},
|
||||
check: func(t *testing.T, r *transferProgressReporter, cancelCalled <-chan struct{}) {
|
||||
activeReqs, totalBytesRead := r.reqReporter.status()
|
||||
assert.Equal(t, int32(0), activeReqs, "Expected 0 active requests")
|
||||
assert.Equal(t, uint64(1000), totalBytesRead, "Expected 1000 bytes read")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "FinishedDownloadingWithNoCompleteEvent",
|
||||
progress: []transfer.Progress{
|
||||
{
|
||||
Name: "layer1",
|
||||
Desc: &ocispec.Descriptor{
|
||||
MediaType: "application/vnd.oci.image.layer.v1.tar+gzip",
|
||||
Digest: "sha256:abcdef",
|
||||
Size: 1000,
|
||||
},
|
||||
Total: 1000,
|
||||
Progress: 500,
|
||||
Event: "downloading",
|
||||
},
|
||||
{
|
||||
Name: "layer1",
|
||||
Desc: &ocispec.Descriptor{
|
||||
MediaType: "application/vnd.oci.image.layer.v1.tar+gzip",
|
||||
Digest: "sha256:abcdef",
|
||||
Size: 1000,
|
||||
},
|
||||
Total: 1000,
|
||||
Progress: 1000,
|
||||
Event: "downloading",
|
||||
},
|
||||
},
|
||||
check: func(t *testing.T, r *transferProgressReporter, cancelCalled <-chan struct{}) {
|
||||
activeReqs, totalBytesRead := r.reqReporter.status()
|
||||
assert.Equal(t, int32(0), activeReqs, "Expected 0 active requests")
|
||||
assert.Equal(t, uint64(1000), totalBytesRead, "Expected 1000 bytes read")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "NilDescriptorInProgressNode",
|
||||
progress: []transfer.Progress{
|
||||
{
|
||||
Name: "layer1",
|
||||
Total: 1000,
|
||||
Progress: 500,
|
||||
Event: "downloading",
|
||||
},
|
||||
},
|
||||
check: func(t *testing.T, r *transferProgressReporter, cancelCalled <-chan struct{}) {
|
||||
assert.Equal(t, int32(0), r.reqReporter.activeReqs, "Expected zero active request")
|
||||
assert.Equal(t, uint64(0), r.reqReporter.totalBytesRead, "Expected zero bytes read")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "EmptyTotalInProgressNode",
|
||||
progress: []transfer.Progress{
|
||||
{
|
||||
Name: "layer1",
|
||||
Desc: &ocispec.Descriptor{
|
||||
MediaType: "application/vnd.oci.image.layer.v1.tar+gzip",
|
||||
Digest: "sha256:abcdef",
|
||||
Size: 1000,
|
||||
},
|
||||
Total: 0,
|
||||
Progress: 500,
|
||||
Event: "downloading",
|
||||
},
|
||||
},
|
||||
check: func(t *testing.T, r *transferProgressReporter, cancelCalled <-chan struct{}) {
|
||||
activeReqs, totalBytesRead := r.reqReporter.status()
|
||||
assert.Equal(t, int32(0), activeReqs, "Expected zero active request")
|
||||
assert.Equal(t, uint64(0), totalBytesRead, "Expected zero bytes read")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "TimeoutDuringPull",
|
||||
setup: func(r *transferProgressReporter) chan struct{} {
|
||||
r.timeout = 100 * time.Millisecond
|
||||
|
||||
cancelCalled := make(chan struct{})
|
||||
originalCancel := r.cancel
|
||||
r.cancel = func() {
|
||||
originalCancel()
|
||||
close(cancelCalled)
|
||||
}
|
||||
|
||||
return cancelCalled
|
||||
},
|
||||
progress: []transfer.Progress{
|
||||
{
|
||||
Name: "layer1",
|
||||
Desc: &ocispec.Descriptor{
|
||||
MediaType: "application/vnd.oci.image.layer.v1.tar+gzip",
|
||||
Digest: "sha256:abcdef",
|
||||
Size: 1000,
|
||||
},
|
||||
Total: 1000,
|
||||
Progress: 500,
|
||||
Event: "downloading",
|
||||
},
|
||||
{
|
||||
Name: "layer1",
|
||||
Desc: &ocispec.Descriptor{
|
||||
MediaType: "application/vnd.oci.image.layer.v1.tar+gzip",
|
||||
Digest: "sha256:abcdef",
|
||||
Size: 1000,
|
||||
},
|
||||
Total: 1000,
|
||||
Progress: 500,
|
||||
Event: "downloading",
|
||||
},
|
||||
{
|
||||
Name: "layer1",
|
||||
Desc: &ocispec.Descriptor{
|
||||
MediaType: "application/vnd.oci.image.layer.v1.tar+gzip",
|
||||
Digest: "sha256:abcdef",
|
||||
Size: 1000,
|
||||
},
|
||||
Total: 1000,
|
||||
Progress: 500,
|
||||
Event: "downloading",
|
||||
},
|
||||
},
|
||||
check: func(t *testing.T, r *transferProgressReporter, cancelCalled <-chan struct{}) {
|
||||
select {
|
||||
case <-cancelCalled:
|
||||
// Expected behavior: cancel was called
|
||||
case <-time.After(150 * time.Millisecond):
|
||||
t.Error("Cancel function was not called within the expected timeframe")
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "MultipleRequests",
|
||||
progress: []transfer.Progress{
|
||||
{
|
||||
Name: "layer1",
|
||||
Desc: &ocispec.Descriptor{
|
||||
MediaType: "application/vnd.oci.image.layer.v1.tar+gzip",
|
||||
Digest: "sha256:abcdef1",
|
||||
Size: 1000,
|
||||
},
|
||||
Total: 1000,
|
||||
Progress: 500,
|
||||
Event: "downloading",
|
||||
},
|
||||
{
|
||||
Name: "layer2",
|
||||
Desc: &ocispec.Descriptor{
|
||||
MediaType: "application/vnd.oci.image.layer.v1.tar+gzip",
|
||||
Digest: "sha256:abcdef2",
|
||||
Size: 2000,
|
||||
},
|
||||
Total: 2000,
|
||||
Progress: 1000,
|
||||
Event: "downloading",
|
||||
},
|
||||
{
|
||||
Name: "layer1",
|
||||
Desc: &ocispec.Descriptor{
|
||||
MediaType: "application/vnd.oci.image.layer.v1.tar+gzip",
|
||||
Digest: "sha256:abcdef1",
|
||||
Size: 1000,
|
||||
},
|
||||
Total: 1000,
|
||||
Progress: 1000,
|
||||
Event: "complete",
|
||||
},
|
||||
},
|
||||
check: func(t *testing.T, r *transferProgressReporter, cancelCalled <-chan struct{}) {
|
||||
activeReqs, totalBytesRead := r.reqReporter.status()
|
||||
assert.Equal(t, int32(1), activeReqs, "Expected one active request")
|
||||
assert.Equal(t, uint64(2000), totalBytesRead, "Expected 2000 bytes read")
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
reporter := &transferProgressReporter{
|
||||
reqReporter: pullRequestReporter{},
|
||||
pc: make(chan transfer.Progress),
|
||||
statuses: make(map[string]*transfer.Progress),
|
||||
ref: "test-image:latest",
|
||||
timeout: 30 * time.Second,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
var cancelCalled chan struct{}
|
||||
if tt.setup != nil {
|
||||
cancelCalled = tt.setup(reporter)
|
||||
}
|
||||
|
||||
go reporter.start(ctx)
|
||||
|
||||
for _, progress := range tt.progress {
|
||||
reporter.pc <- progress
|
||||
time.Sleep(50 * time.Millisecond) // Allow some time for processing
|
||||
}
|
||||
|
||||
if tt.check != nil {
|
||||
tt.check(t, reporter, cancelCalled)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -24,16 +24,17 @@ import (
    "github.com/containerd/containerd/v2/core/content"
    "github.com/containerd/containerd/v2/core/images"
    "github.com/containerd/containerd/v2/core/snapshots"
    "github.com/containerd/containerd/v2/core/transfer"
    criconfig "github.com/containerd/containerd/v2/internal/cri/config"
    imagestore "github.com/containerd/containerd/v2/internal/cri/store/image"
    snapshotstore "github.com/containerd/containerd/v2/internal/cri/store/snapshot"
    "github.com/containerd/containerd/v2/internal/kmutex"
    "github.com/containerd/log"
    "github.com/containerd/platforms"

    docker "github.com/distribution/reference"
    imagedigest "github.com/opencontainers/go-digest"
    imagespec "github.com/opencontainers/image-spec/specs-go/v1"

    runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)

@@ -65,6 +66,8 @@ type CRIImageService struct {
    imageStore *imagestore.Store
    // snapshotStore stores information of all snapshots.
    snapshotStore *snapshotstore.Store
    // transferrer is used to pull image with transfer service
    transferrer transfer.Transferrer
    // unpackDuplicationSuppressor is used to make sure that there is only
    // one in-flight fetch request or unpack handler for a given descriptor's
    // or chain ID.
@@ -87,6 +90,8 @@ type CRIImageServiceOptions struct {
    Snapshotters map[string]snapshots.Snapshotter

    Client imageClient

    Transferrer transfer.Transferrer
}

// NewService creates a new CRI Image Service
@@ -108,6 +113,7 @@ func NewService(config criconfig.ImageConfig, options *CRIImageServiceOptions) (
    imageFSPaths:                options.ImageFSPaths,
    runtimePlatforms:            options.RuntimePlatforms,
    snapshotStore:               snapshotstore.NewStore(),
    transferrer:                 options.Transferrer,
    unpackDuplicationSuppressor: kmutex.New(),
}

@@ -24,6 +24,7 @@ import (
    containerd "github.com/containerd/containerd/v2/client"
    "github.com/containerd/containerd/v2/core/metadata"
    "github.com/containerd/containerd/v2/core/snapshots"
    "github.com/containerd/containerd/v2/core/transfer"
    criconfig "github.com/containerd/containerd/v2/internal/cri/config"
    "github.com/containerd/containerd/v2/internal/cri/constants"
    "github.com/containerd/containerd/v2/internal/cri/server/images"
@@ -49,6 +50,7 @@ func init() {
    plugins.SandboxStorePlugin,
    plugins.ServicePlugin,  // For client
    plugins.SnapshotPlugin, // For root directory properties
    plugins.TransferPlugin, // For pulling image using transfer service
    plugins.WarningPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
@@ -71,11 +73,21 @@ func init() {
        }
    }

    if !config.UseLocalImagePull {
        criconfig.CheckLocalImagePullConfigs(ic.Context, &config)
    }

    ts, err := ic.GetSingle(plugins.TransferPlugin)
    if err != nil {
        return nil, err
    }

    options := &images.CRIImageServiceOptions{
        Content:          mdb.ContentStore(),
        RuntimePlatforms: map[string]images.ImagePlatform{},
        Snapshotters:     map[string]snapshots.Snapshotter{},
        ImageFSPaths:     map[string]string{},
        Transferrer:      ts.(transfer.Transferrer),
    }

    ctrdCli, err := containerd.New(

@@ -140,6 +140,11 @@ func init() {
        return nil, fmt.Errorf("no matching diff plugins: %w", errdefs.ErrNotFound)
    }

    // If CheckPlatformSupported is false, we will match all platforms
    if !config.CheckPlatformSupported {
        target = platforms.All
    }

    up := unpack.Platform{
        Platform:       target,
        SnapshotterKey: uc.Snapshotter,
@@ -163,6 +168,9 @@ type transferConfig struct {
    // MaxConcurrentUploadedLayers is the max concurrent uploads for push
    MaxConcurrentUploadedLayers int `toml:"max_concurrent_uploaded_layers"`

    // CheckPlatformSupported enables platform check specified in UnpackConfiguration
    CheckPlatformSupported bool `toml:"check_platform_supported"`

    // UnpackConfiguration is used to read config from toml
    UnpackConfiguration []unpackConfiguration `toml:"unpack_config,omitempty"`
@@ -185,5 +193,6 @@ func defaultConfig() *transferConfig {
    return &transferConfig{
        MaxConcurrentDownloads:      3,
        MaxConcurrentUploadedLayers: 3,
        CheckPlatformSupported:      false,
    }
}

@@ -29,7 +29,7 @@ const (
    RuntimePlugin plugin.Type = "io.containerd.runtime.v1"
    // RuntimePluginV2 implements a runtime v2
    RuntimePluginV2 plugin.Type = "io.containerd.runtime.v2"
    // ServicePlugin implements a internal service
    // ServicePlugin implements an internal service
    ServicePlugin plugin.Type = "io.containerd.service.v1"
    // GRPCPlugin implements a grpc service
    GRPCPlugin plugin.Type = "io.containerd.grpc.v1"
@@ -55,7 +55,7 @@ const (
    LeasePlugin plugin.Type = "io.containerd.lease.v1"
    // StreamingPlugin implements a stream manager
    StreamingPlugin plugin.Type = "io.containerd.streaming.v1"
    // TracingProcessorPlugin implements a open telemetry span processor
    // TracingProcessorPlugin implements an open telemetry span processor
    TracingProcessorPlugin plugin.Type = "io.containerd.tracing.processor.v1"
    // NRIApiPlugin implements the NRI adaptation interface for containerd.
    NRIApiPlugin plugin.Type = "io.containerd.nri.v1"