add concurrency control when pulling images in daemon (#1918)

* add concurrency control when pulling images in daemon
add unit tests for the daemon and the limited worker pool
exclude pkg/client from code coverage

Signed-off-by: Abner-1 <yuanyuxing.yyx@alibaba-inc.com>

* use the chan worker pool as the default daemon worker pool

Signed-off-by: Abner-1 <yuanyuxing.yyx@alibaba-inc.com>

---------

Signed-off-by: Abner-1 <yuanyuxing.yyx@alibaba-inc.com>
Abner 2025-04-16 21:57:24 +08:00 committed by GitHub
parent a79a4fb21e
commit 318165b7ea
15 changed files with 576 additions and 45 deletions

View File

@ -5,8 +5,8 @@ on:
branches:
- master
- release-*
pull_request: {}
workflow_dispatch: {}
pull_request: { }
workflow_dispatch: { }
# Declare default permissions as read only.
permissions: read-all
@ -91,7 +91,6 @@ jobs:
# run: find ./ -name "*.sh" | grep -v vendor | xargs shellcheck
# - name: Lint markdown files
# run: find ./ -name "*.md" | grep -v vendor | grep -v commandline | grep -v .github | grep -v swagger | grep -v api | xargs mdl -r ~MD010,~MD013,~MD014,~MD022,~MD024,~MD029,~MD031,~MD032,~MD033,~MD036
# - name: Check markdown links
# run: |
# set +e

View File

@ -52,14 +52,16 @@ lint: golangci-lint ## Run golangci-lint against code.
test: generate fmt vet manifests envtest ## Run tests
echo $(ENVTEST)
go build -o pkg/daemon/criruntime/imageruntime/fake_plugin/fake-credential-plugin pkg/daemon/criruntime/imageruntime/fake_plugin/main.go && chmod +x pkg/daemon/criruntime/imageruntime/fake_plugin/fake-credential-plugin
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test -race ./pkg/... -coverprofile cover.out
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test -race ./pkg/... -coverprofile raw-cover.out
rm pkg/daemon/criruntime/imageruntime/fake_plugin/fake-credential-plugin
grep -v "pkg/client" raw-cover.out > cover.out
atest:
echo $(ENVTEST)
go build -o pkg/daemon/criruntime/imageruntime/fake_plugin/fake-credential-plugin pkg/daemon/criruntime/imageruntime/fake_plugin/main.go && chmod +x pkg/daemon/criruntime/imageruntime/fake_plugin/fake-credential-plugin
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test -race ./pkg/... -coverprofile cover.out
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test -race ./pkg/... -coverprofile raw-cover.out
rm pkg/daemon/criruntime/imageruntime/fake_plugin/fake-credential-plugin
grep -v "pkg/client" raw-cover.out > cover.out
coverage-report: ## Generate cover.html from cover.out
go tool cover -html=cover.out -o cover.html

View File

@ -130,6 +130,10 @@ type NodeImageStatus struct {
// +optional
Pulling int32 `json:"pulling"`
// The number of pulling tasks which are waiting.
// +optional
Waiting int32 `json:"waiting"`
// all statuses of active image pulling tasks
ImageStatuses map[string]ImageStatus `json:"imageStatuses,omitempty"`

View File

@ -47,6 +47,13 @@ var (
enablePprof = flag.Bool("enable-pprof", true, "Enable pprof for daemon.")
pluginConfigFile = flag.String("plugin-config-file", "/kruise/CredentialProviderPlugin.yaml", "The path of plugin config file.")
pluginBinDir = flag.String("plugin-bin-dir", "/kruise/plugins", "The path of directory of plugin binaries.")
// TODO: After the feature is stable, the default value should also be restricted, e.g. 5.
// Users can set this value to limit the number of workers for pulling images,
// preventing the consumption of all available disk IOPS or network bandwidth,
// which could otherwise impact the performance of other running pods.
maxWorkersForPullImage = flag.Int("max-workers-for-pull-image", -1, "The maximum number of workers for pulling images.")
)
func main() {
@ -71,7 +78,7 @@ func main() {
}()
}
ctx := signals.SetupSignalHandler()
d, err := daemon.NewDaemon(cfg, *bindAddr)
d, err := daemon.NewDaemon(cfg, *bindAddr, *maxWorkersForPullImage)
if err != nil {
klog.Fatalf("Failed to new daemon: %v", err)
}
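For readers skimming the diff: the flag defaults to -1, and only positive values enable the limited pool (see the controller change further below). A minimal sketch of that gating, reusing the ImagePullWorkerPool/NewChanPool names added by this commit; the helper function itself is hypothetical and not part of the change:

// Hypothetical helper illustrating how the flag value gates the pool;
// it mirrors the `opts.MaxWorkersForPullImages > 0` check in the controller.
func setupPullWorkerPool(maxWorkers int) ImagePullWorkerPool {
	if maxWorkers <= 0 {
		return nil // -1 (the default) or 0: pulls run unthrottled
	}
	pool := NewChanPool(maxWorkers) // at most maxWorkers concurrent pulls
	go pool.Start()
	return pool
}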

View File

@ -312,6 +312,10 @@ spec:
description: The number of pulling tasks which reached phase Succeeded.
format: int32
type: integer
waiting:
description: The number of pulling tasks which are waiting.
format: int32
type: integer
required:
- desired
type: object

View File

@ -103,6 +103,7 @@ spec:
- --logtostderr=true
- -v=5
- --feature-gates=AllAlpha=true,AllBeta=true
- --max-workers-for-pull-image=2
image: controller:latest
imagePullPolicy: Always
securityContext:

View File

@ -23,7 +23,19 @@ import (
"net/http"
"sync"
"github.com/prometheus/client_golang/prometheus/promhttp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/metrics"
kruiseapis "github.com/openkruise/kruise/apis"
"github.com/openkruise/kruise/pkg/client"
@ -36,18 +48,6 @@ import (
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
"github.com/openkruise/kruise/pkg/features"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/prometheus/client_golang/prometheus/promhttp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)
var (
@ -85,7 +85,7 @@ type daemon struct {
}
// NewDaemon create a daemon
func NewDaemon(cfg *rest.Config, bindAddress string) (Daemon, error) {
func NewDaemon(cfg *rest.Config, bindAddress string, MaxWorkersForPullImages int) (Daemon, error) {
if cfg == nil {
return nil, fmt.Errorf("cfg can not be nil")
}
@ -133,6 +133,8 @@ func NewDaemon(cfg *rest.Config, bindAddress string) (Daemon, error) {
PodInformer: podInformer,
RuntimeFactory: runtimeFactory,
Healthz: healthz,
MaxWorkersForPullImages: MaxWorkersForPullImages,
}
puller, err := imagepuller.NewController(opts, secretManager, cfg)

View File

@ -0,0 +1,49 @@
package imagepuller
import (
"sync"
"testing"
"time"
)
func TestChanPool_Submit_Start_Stop(t *testing.T) {
taskCount := 10
poolSize := 3
pool := NewChanPool(poolSize)
go pool.Start()
var wg sync.WaitGroup
wg.Add(taskCount)
executionCount := 0
mu := sync.Mutex{}
for i := 0; i < taskCount; i++ {
pool.Submit(func() {
defer wg.Done()
time.Sleep(10 * time.Millisecond)
mu.Lock()
executionCount++
mu.Unlock()
})
}
wg.Wait()
pool.Stop()
if executionCount != taskCount {
t.Errorf("expected %d tasks to be executed, but got %d", taskCount, executionCount)
}
}
func TestChanPool_StopWithoutTasks(t *testing.T) {
pool := NewChanPool(2)
go pool.Start()
pool.Stop()
select {
case <-time.After(100 * time.Millisecond):
t.Errorf("expected Stop to complete immediately, but it took too long")
default:
}
}

View File

@ -24,13 +24,6 @@ import (
"reflect"
"time"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/client"
kruiseclient "github.com/openkruise/kruise/pkg/client/clientset/versioned"
listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1"
daemonoptions "github.com/openkruise/kruise/pkg/daemon/options"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
utilimagejob "github.com/openkruise/kruise/pkg/util/imagejob"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -45,6 +38,14 @@ import (
"k8s.io/client-go/tools/reference"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/client"
kruiseclient "github.com/openkruise/kruise/pkg/client/clientset/versioned"
listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1"
daemonoptions "github.com/openkruise/kruise/pkg/daemon/options"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
utilimagejob "github.com/openkruise/kruise/pkg/util/imagejob"
)
type Controller struct {
@ -65,12 +66,12 @@ func NewController(opts daemonoptions.Options, secretManager daemonutil.SecretMa
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: genericClient.KubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(opts.Scheme, v1.EventSource{Component: "kruise-daemon-imagepuller", Host: opts.NodeName})
queue := workqueue.NewNamedRateLimitingQueue(
// Backoff duration from 500ms to 50~55s
// For nodeimage controller will mark a image:tag task failed (not responded for a long time) if daemon does not report status in 60s.
workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 50*time.Second+time.Millisecond*time.Duration(rand.Intn(5000))),
"imagepuller",
)
// Backoff duration from 500ms to 50~55s
// For nodeimage controller will mark an image:tag task failed (not responded for a long time) if daemon does not report status in 60s.
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 50*time.Second+time.Millisecond*time.Duration(rand.Intn(5000)))
queue := workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{
Name: "imagepuller",
})
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@ -94,6 +95,11 @@ func NewController(opts daemonoptions.Options, secretManager daemonutil.SecretMa
},
})
if opts.MaxWorkersForPullImages > 0 {
klog.InfoS("set image pull worker number", "worker", opts.MaxWorkersForPullImages)
workerLimitedPool = NewChanPool(opts.MaxWorkersForPullImages)
go workerLimitedPool.Start()
}
puller, err := newRealPuller(opts.RuntimeFactory.GetImageService(), secretManager, recorder)
if err != nil {
return nil, fmt.Errorf("failed to new puller: %v", err)
@ -165,6 +171,9 @@ func (c *Controller) Run(stop <-chan struct{}) {
klog.Info("Started puller controller successfully")
<-stop
if workerLimitedPool != nil {
workerLimitedPool.Stop()
}
}
// processNextWorkItem will read a single work item off the workqueue and
@ -244,6 +253,8 @@ func (c *Controller) sync(key string) (retErr error) {
newStatus.Failed++
case appsv1alpha1.ImagePhasePulling:
newStatus.Pulling++
case appsv1alpha1.ImagePhaseWaiting:
newStatus.Waiting++
}
}
}
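The rate limiter comment above explains the chosen backoff range (500ms base, capped around 50~55s): the nodeimage controller marks an image:tag task failed if the daemon does not report status within 60s, so retry delays must stay under that deadline. An illustrative standalone snippet showing the per-item delay sequence such a limiter produces; the cap is fixed at 50s here instead of the controller's randomized 50~55s:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Exponential backoff per item: 500ms, 1s, 2s, 4s, 8s, 16s, 32s, then capped at 50s.
	rl := workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 50*time.Second)
	for i := 0; i < 9; i++ {
		fmt.Println(rl.When("nginx:latest"))
	}
}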

View File

@ -0,0 +1,358 @@
package imagepuller
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/reference"
"k8s.io/klog/v2"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
"github.com/openkruise/kruise/pkg/daemon/criruntime/imageruntime"
)
type fakeRuntime struct {
mu sync.Mutex
images map[string]*imageStatus
}
type imageStatus struct {
progress int
statusCh chan int
}
func (f *fakeRuntime) PullImage(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret, sandboxConfig *appsv1alpha1.SandboxConfig) (imageruntime.ImagePullStatusReader, error) {
f.mu.Lock()
defer f.mu.Unlock()
key := imageName + ":" + tag
if _, exist := f.images[key]; !exist {
f.images[key] = &imageStatus{
progress: 0,
statusCh: make(chan int, 10),
}
}
r := newFakeStatusReader(key, f.images[key])
go func() {
ticker := time.NewTicker(time.Second * 1)
defer ticker.Stop()
for {
select {
case <-ticker.C:
func() {
f.mu.Lock()
defer f.mu.Unlock()
val, exist := f.images[key]
if !exist {
return
}
if val.progress >= 100 {
r.ch <- imageruntime.ImagePullStatus{
Err: nil,
Process: 100,
DetailInfo: "finished",
Finish: true,
}
}
}()
case <-r.done:
func() {
f.mu.Lock()
defer f.mu.Unlock()
delete(f.images, key)
}()
return
}
}
}()
return &r, nil
}
func (f *fakeRuntime) ListImages(ctx context.Context) ([]imageruntime.ImageInfo, error) {
f.mu.Lock()
defer f.mu.Unlock()
var images []imageruntime.ImageInfo
for name := range f.images {
images = append(images, imageruntime.ImageInfo{ID: name})
}
return images, nil
}
func (f *fakeRuntime) increaseProgress(image string, delta int) {
key := image
f.mu.Lock()
defer f.mu.Unlock()
klog.Infof("increase progress %v +%v", key, delta)
if status, exist := f.images[key]; exist {
status.progress += delta
if status.progress > 100 {
status.progress = 100
}
status.statusCh <- status.progress
}
}
func (f *fakeRuntime) clean() {
f.mu.Lock()
defer f.mu.Unlock()
f.images = make(map[string]*imageStatus)
}
type fakeStatusReader struct {
image string
status *imageStatus
ch chan imageruntime.ImagePullStatus
mu sync.Mutex
closed bool
done chan struct{}
}
func newFakeStatusReader(image string, status *imageStatus) fakeStatusReader {
return fakeStatusReader{
image: image,
status: status,
ch: make(chan imageruntime.ImagePullStatus, 10),
done: make(chan struct{}),
}
}
func (r *fakeStatusReader) C() <-chan imageruntime.ImagePullStatus {
return r.ch
}
func (r *fakeStatusReader) Close() {
r.mu.Lock()
defer r.mu.Unlock()
if !r.closed {
klog.InfoS("Closing reader", "image", r.image)
close(r.done)
r.closed = true
}
}
type fakeSecret struct {
}
func (f *fakeSecret) GetSecrets(secret []appsv1alpha1.ReferenceObject) ([]v1.Secret, error) {
return nil, nil
}
// test case here
func realPullerSyncFn(t *testing.T, limitedPool bool) {
eventRecorder := record.NewFakeRecorder(100)
secretManager := fakeSecret{}
baseNodeImage := &appsv1alpha1.NodeImage{
Spec: appsv1alpha1.NodeImageSpec{
Images: map[string]appsv1alpha1.ImageSpec{
"nginx": {
Tags: []appsv1alpha1.ImageTagSpec{
{Tag: "latest", Version: 1},
},
},
},
},
}
testCases := []struct {
name string
prePools map[string]struct{}
inputSpec *appsv1alpha1.NodeImage
expectPools sets.String
expectErr bool
}{
{
name: "add new image",
inputSpec: baseNodeImage,
expectPools: sets.NewString("nginx"),
},
{
name: "remove image",
prePools: map[string]struct{}{"redis": {}},
inputSpec: baseNodeImage,
expectPools: sets.NewString("nginx"),
},
{
name: "add image",
prePools: map[string]struct{}{"nginx": {}},
inputSpec: &appsv1alpha1.NodeImage{
Spec: appsv1alpha1.NodeImageSpec{
Images: map[string]appsv1alpha1.ImageSpec{
"nginx": {
Tags: []appsv1alpha1.ImageTagSpec{
{Tag: "latest", Version: 1},
},
},
"busybox": {
Tags: []appsv1alpha1.ImageTagSpec{
{Tag: "1.15", Version: 1},
},
},
},
},
},
expectPools: sets.NewString("nginx", "busybox"),
},
}
fakeRuntime := &fakeRuntime{images: make(map[string]*imageStatus)}
workerLimitedPool = NewChanPool(2)
go workerLimitedPool.Start()
p, _ := newRealPuller(fakeRuntime, &secretManager, eventRecorder)
nameSuffuix := "_NoLimitPool"
if limitedPool {
nameSuffuix = "_LimitPool"
}
for _, tc := range testCases {
t.Run(tc.name+nameSuffuix, func(t *testing.T) {
for poolName := range tc.prePools {
p.workerPools[poolName] = newRealWorkerPool(poolName, fakeRuntime, &secretManager, eventRecorder)
}
ref, _ := reference.GetReference(scheme, tc.inputSpec)
err := p.Sync(tc.inputSpec, ref)
if (err != nil) != tc.expectErr {
t.Fatalf("expect error %v, but got %v", tc.expectErr, err)
}
actualPools := sets.String{}
for name := range p.workerPools {
p.workerPools[name].Stop()
actualPools.Insert(name)
}
if !actualPools.Equal(tc.expectPools) {
t.Errorf("expect pools %v, but got %v", tc.expectPools.List(), actualPools.List())
}
for prePool := range tc.prePools {
if !tc.expectPools.Has(prePool) {
if pool, exists := p.workerPools[prePool]; exists {
if pool.(*realWorkerPool).active {
t.Errorf("expect pool %s to be stopped", prePool)
}
}
}
}
})
}
t.Log("clean fake runtime")
time.Sleep(1 * time.Second)
infos, _ := fakeRuntime.ListImages(context.Background())
for _, info := range infos {
fakeRuntime.increaseProgress(info.ID, 100)
}
time.Sleep(2 * time.Second)
// clear errgroup
for _, val := range p.workerPools {
val.Stop()
}
fakeRuntime.clean()
}
func TestRealPullerSync(t *testing.T) {
realPullerSyncFn(t, false)
realPullerSyncFn(t, true)
}
var scheme *runtime.Scheme
func init() {
scheme = runtime.NewScheme()
utilruntime.Must(appsv1alpha1.AddToScheme(scheme))
utilruntime.Must(appsv1beta1.AddToScheme(scheme))
utilruntime.Must(policyv1alpha1.AddToScheme(scheme))
}
func TestRealPullerSyncWithLimitedPool(t *testing.T) {
eventRecorder := record.NewFakeRecorder(100)
secretManager := fakeSecret{}
baseNodeImage := &appsv1alpha1.NodeImage{
Spec: appsv1alpha1.NodeImageSpec{
Images: map[string]appsv1alpha1.ImageSpec{
"nginx": {
Tags: []appsv1alpha1.ImageTagSpec{
{Tag: "latest", Version: 1},
},
},
"busybox": {
Tags: []appsv1alpha1.ImageTagSpec{
{Tag: "latest", Version: 1},
},
},
"redis": {
Tags: []appsv1alpha1.ImageTagSpec{
{Tag: "latest", Version: 1},
},
},
},
},
}
r := &fakeRuntime{images: make(map[string]*imageStatus)}
workerLimitedPool = NewChanPool(2)
go workerLimitedPool.Start()
p, _ := newRealPuller(r, &secretManager, eventRecorder)
ref, _ := reference.GetReference(scheme, baseNodeImage)
err := p.Sync(baseNodeImage, ref)
assert.Nil(t, err)
time.Sleep(100 * time.Millisecond)
checkPhase := func(expectRunning, expectWaiting int) {
running, waiting := 0, 0
for imageName := range baseNodeImage.Spec.Images {
status := p.GetStatus(imageName)
t.Log(status)
if status == nil {
t.Errorf("expect image %s to be running", imageName)
continue
}
if status.Tags[0].Phase == appsv1alpha1.ImagePhasePulling {
running++
} else if status.Tags[0].Phase == appsv1alpha1.ImagePhaseWaiting {
waiting++
}
}
assert.Equal(t, expectRunning, running)
assert.Equal(t, expectWaiting, waiting)
}
// first pulling 2, and 1 waiting
checkPhase(2, 1)
infos, _ := r.ListImages(context.Background())
if len(infos) > 0 {
r.increaseProgress(infos[0].ID, 100)
}
time.Sleep(2 * time.Second)
// when 1 success, 2 pulling, 0 waiting
checkPhase(2, 0)
infos, _ = r.ListImages(context.Background())
for _, info := range infos {
r.increaseProgress(info.ID, 100)
}
time.Sleep(2 * time.Second)
// when 3 success, 0 pulling, 0 waiting
checkPhase(0, 0)
t.Log("clean fake runtime")
// clear errgroup
for _, val := range p.workerPools {
val.Stop()
}
r.clean()
}

View File

@ -23,15 +23,16 @@ import (
"sync"
"time"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
runtimeimage "github.com/openkruise/kruise/pkg/daemon/criruntime/imageruntime"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
"github.com/openkruise/kruise/pkg/util"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
runtimeimage "github.com/openkruise/kruise/pkg/daemon/criruntime/imageruntime"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
"github.com/openkruise/kruise/pkg/util"
)
const (
@ -47,6 +48,8 @@ const (
PullImageFailed = "PullImageFailed"
)
var workerLimitedPool ImagePullWorkerPool
type puller interface {
Sync(obj *appsv1alpha1.NodeImage, ref *v1.ObjectReference) error
GetStatus(imageName string) *appsv1alpha1.ImageStatus
@ -264,6 +267,8 @@ func (w *realWorkerPool) UpdateStatus(status *appsv1alpha1.ImageTagStatus) {
}
func newPullWorker(name string, tagSpec appsv1alpha1.ImageTagSpec, sandboxConfig *appsv1alpha1.SandboxConfig, secrets []v1.Secret, runtime runtimeimage.ImageService, statusUpdater imageStatusUpdater, ref *v1.ObjectReference, eventRecorder record.EventRecorder) *pullWorker {
image := name + ":" + tagSpec.Tag
klog.V(5).InfoS("new pull worker", "image", image)
o := &pullWorker{
name: name,
tagSpec: tagSpec,
@ -276,7 +281,31 @@ func newPullWorker(name string, tagSpec appsv1alpha1.ImageTagSpec, sandboxConfig
active: true,
stopCh: make(chan struct{}),
}
go o.Run()
go func() {
newStatus := &appsv1alpha1.ImageTagStatus{
Tag: tagSpec.Tag,
Phase: appsv1alpha1.ImagePhaseWaiting,
Version: tagSpec.Version,
}
o.statusUpdater.UpdateStatus(newStatus)
klog.V(5).InfoS("pull worker waiting", "image", image)
fn := func() {
klog.V(5).InfoS("pull worker start", "image", image)
o.Run()
klog.V(5).InfoS("pull worker end", "image", image)
}
if workerLimitedPool != nil {
workerLimitedPool.Submit(func() {
fn()
})
} else {
// no limited_pool
klog.V(5).InfoS("pull worker without limited_pool", "image", image)
fn()
}
}()
return o
}
@ -444,8 +473,16 @@ func (w *pullWorker) doPullImage(ctx context.Context, newStatus *appsv1alpha1.Im
// make it asynchronous for CRI runtime will block in pulling image
var statusReader runtimeimage.ImagePullStatusReader
pullChan := make(chan struct{})
// Why add this? Directly assigning to `statusReader` and `err` can cause race conditions.
// Case: When a worker is pulling data, stopping the pull worker might lead to concurrent writes to the local parameter `err`.
// (line 483 and 503)
// To avoid this, we introduce `readerCh` and `errCh` for communication between multiple goroutines.
readerCh := make(chan runtimeimage.ImagePullStatusReader, 1)
errCh := make(chan error, 1)
go func() {
statusReader, err = w.runtime.PullImage(ctx, w.name, tag, w.secrets, w.sandboxConfig)
statusReader, err := w.runtime.PullImage(ctx, w.name, tag, w.secrets, w.sandboxConfig)
readerCh <- statusReader
errCh <- err
close(pullChan)
}()
@ -453,6 +490,7 @@ func (w *pullWorker) doPullImage(ctx context.Context, newStatus *appsv1alpha1.Im
select {
case <-pullChan:
}
statusReader := <-readerCh
if statusReader != nil {
statusReader.Close()
}
@ -468,6 +506,8 @@ func (w *pullWorker) doPullImage(ctx context.Context, newStatus *appsv1alpha1.Im
klog.V(2).InfoS("Pulling image canceled", "name", w.name, "tag", tag)
return fmt.Errorf("pulling image %s:%s is canceled", w.name, tag)
case <-pullChan:
statusReader = <-readerCh
err = <-errCh
if err != nil {
return err
}
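The readerCh/errCh change above is a standard Go pattern: rather than letting the pulling goroutine assign to locals captured by the closure (which can race with the select branches that read them, as the inline comment notes), the goroutine hands its results over buffered channels and each branch receives them exactly once. A minimal self-contained sketch of the pattern, with illustrative names rather than kruise APIs:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// slowOp stands in for a blocking call such as a CRI image pull.
func slowOp(ctx context.Context) (string, error) {
	select {
	case <-time.After(50 * time.Millisecond):
		return "layers pulled", nil
	case <-ctx.Done():
		return "", errors.New("canceled")
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Buffered channels let the goroutine finish even if the caller has
	// already given up; results are received, not shared through locals.
	resultCh := make(chan string, 1)
	errCh := make(chan error, 1)
	go func() {
		res, err := slowOp(ctx)
		resultCh <- res
		errCh <- err
	}()

	select {
	case <-ctx.Done():
		fmt.Println("gave up:", ctx.Err())
	case res := <-resultCh:
		if err := <-errCh; err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println("result:", res)
	}
}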

View File

@ -0,0 +1,50 @@
package imagepuller
import (
"sync"
"k8s.io/klog/v2"
)
type Task func()
type ImagePullWorkerPool interface {
Submit(fn Task)
Start()
Stop()
}
type chanPool struct {
queue chan Task
wg sync.WaitGroup
maxWorks int
}
func NewChanPool(n int) ImagePullWorkerPool {
return &chanPool{
queue: make(chan Task, n),
maxWorks: n,
}
}
func (p *chanPool) Submit(task Task) {
p.queue <- task
}
func (p *chanPool) Start() {
for i := 0; i < p.maxWorks; i++ {
go func() {
defer p.wg.Done()
for task := range p.queue {
task()
}
}()
p.wg.Add(1)
}
}
func (p *chanPool) Stop() {
close(p.queue)
p.wg.Wait()
klog.Info("all worker in image pull worker pool stopped")
}
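A note on the pool's semantics: the queue channel is created with capacity n and drained by n worker goroutines, so at most n tasks run at once and up to n more can be queued before Submit blocks; and because Stop closes the queue, any Submit after Stop panics (send on a closed channel). A brief usage sketch in the same package, assuming the time package is imported; the function name is illustrative and not part of the commit:

// Illustrative usage of the pool added above: start, submit, then stop.
func demoChanPool() {
	pool := NewChanPool(2) // bound concurrent pulls to 2
	go pool.Start()
	for i := 0; i < 5; i++ {
		pool.Submit(func() { // blocks once 2 tasks run and 2 more are queued
			time.Sleep(10 * time.Millisecond) // stand-in for an image pull
		})
	}
	// Stop closes the queue and waits for the workers to drain it,
	// so all five tasks finish before Stop returns.
	pool.Stop()
}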

View File

@ -17,11 +17,12 @@ limitations under the License.
package options
import (
daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime"
daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
)
type Options struct {
@ -32,4 +33,6 @@ type Options struct {
RuntimeFactory daemonruntime.Factory
Healthz *daemonutil.Healthz
MaxWorkersForPullImages int
}

View File

@ -363,7 +363,7 @@ func (c *Controller) syncUpdateNodePodProbeStatus() error {
klog.ErrorS(err, "NodePodProbe update status failed", "nodeName", c.nodeName)
return err
}
klog.InfoS("NodePodProbe(%s) update status success", "nodeName", c.nodeName, "from", commonutil.DumpJSON(npp.Status), "to", commonutil.DumpJSON(nppClone.Status))
klog.InfoS("NodePodProbe update status success", "nodeName", c.nodeName, "from", commonutil.DumpJSON(npp.Status), "to", commonutil.DumpJSON(nppClone.Status))
return nil
}

View File

@ -23,15 +23,16 @@ import (
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/test/e2e/framework"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/rand"
clientset "k8s.io/client-go/kubernetes"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/test/e2e/framework"
)
var _ = SIGDescribe("ContainerMeta", func() {