diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 20d735feb..1618a0f62 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -5,8 +5,8 @@ on:
     branches:
       - master
       - release-*
-  pull_request: {}
-  workflow_dispatch: {}
+  pull_request: { }
+  workflow_dispatch: { }
 
 # Declare default permissions as read only.
 permissions: read-all
@@ -91,7 +91,6 @@ jobs:
 #        run: find ./ -name "*.sh" | grep -v vendor | xargs shellcheck
 #      - name: Lint markdown files
 #        run: find ./ -name "*.md" | grep -v vendor | grep -v commandline | grep -v .github | grep -v swagger | grep -v api | xargs mdl -r ~MD010,~MD013,~MD014,~MD022,~MD024,~MD029,~MD031,~MD032,~MD033,~MD036
-
 #      - name: Check markdown links
 #        run: |
 #          set +e
diff --git a/Makefile b/Makefile
index fb52264a7..fcc4ac8ac 100644
--- a/Makefile
+++ b/Makefile
@@ -52,14 +52,16 @@ lint: golangci-lint ## Run golangci-lint against code.
 test: generate fmt vet manifests envtest ## Run tests
 	echo $(ENVTEST)
 	go build -o pkg/daemon/criruntime/imageruntime/fake_plugin/fake-credential-plugin pkg/daemon/criruntime/imageruntime/fake_plugin/main.go && chmod +x pkg/daemon/criruntime/imageruntime/fake_plugin/fake-credential-plugin
-	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test -race ./pkg/... -coverprofile cover.out
+	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test -race ./pkg/... -coverprofile raw-cover.out
 	rm pkg/daemon/criruntime/imageruntime/fake_plugin/fake-credential-plugin
+	grep -v "pkg/client" raw-cover.out > cover.out
 
 atest:
 	echo $(ENVTEST)
 	go build -o pkg/daemon/criruntime/imageruntime/fake_plugin/fake-credential-plugin pkg/daemon/criruntime/imageruntime/fake_plugin/main.go && chmod +x pkg/daemon/criruntime/imageruntime/fake_plugin/fake-credential-plugin
-	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test -race ./pkg/... -coverprofile cover.out
+	KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) -p path)" go test -race ./pkg/... -coverprofile raw-cover.out
 	rm pkg/daemon/criruntime/imageruntime/fake_plugin/fake-credential-plugin
+	grep -v "pkg/client" raw-cover.out > cover.out
 
 coverage-report: ## Generate cover.html from cover.out
 	go tool cover -html=cover.out -o cover.html
diff --git a/apis/apps/v1alpha1/nodeimage_types.go b/apis/apps/v1alpha1/nodeimage_types.go
index 1edaca18d..bdaddf023 100644
--- a/apis/apps/v1alpha1/nodeimage_types.go
+++ b/apis/apps/v1alpha1/nodeimage_types.go
@@ -130,6 +130,10 @@ type NodeImageStatus struct {
 	// +optional
 	Pulling int32 `json:"pulling"`
 
+	// The number of pulling tasks which are waiting.
+	// +optional
+	Waiting int32 `json:"waiting"`
+
 	// all statuses of active image pulling tasks
 	ImageStatuses map[string]ImageStatus `json:"imageStatuses,omitempty"`
 
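For context, the new counter is observable straight from the NodeImage status. A minimal sketch of reading it with the generated kruise clientset — the kubeconfig path and the node name "node-1" are illustrative assumptions, and error handling is trimmed:

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/clientcmd"

	kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
)

func main() {
	// Hypothetical kubeconfig location; in-cluster config works the same way.
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kruiseclientset.NewForConfigOrDie(cfg)

	// NodeImage objects are cluster-scoped, one per node.
	ni, err := client.AppsV1alpha1().NodeImages().Get(context.TODO(), "node-1", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("desired=%d pulling=%d waiting=%d succeeded=%d failed=%d\n",
		ni.Status.Desired, ni.Status.Pulling, ni.Status.Waiting,
		ni.Status.Succeeded, ni.Status.Failed)
}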
diff --git a/cmd/daemon/main.go b/cmd/daemon/main.go
index 594d16d73..0a6eb4590 100644
--- a/cmd/daemon/main.go
+++ b/cmd/daemon/main.go
@@ -47,6 +47,13 @@ var (
 	enablePprof      = flag.Bool("enable-pprof", true, "Enable pprof for daemon.")
 	pluginConfigFile = flag.String("plugin-config-file", "/kruise/CredentialProviderPlugin.yaml", "The path of plugin config file.")
 	pluginBinDir     = flag.String("plugin-bin-dir", "/kruise/plugins", "The path of directory of plugin binaries.")
+
+	// TODO: After the feature is stable, the default value should also be restricted, e.g. 5.
+
+	// Users can set this value to limit the number of workers for pulling images,
+	// preventing the consumption of all available disk IOPS or network bandwidth,
+	// which could otherwise impact the performance of other running pods.
+	maxWorkersForPullImage = flag.Int("max-workers-for-pull-image", -1, "The maximum number of workers for pulling images.")
 )
 
 func main() {
@@ -71,7 +78,7 @@ func main() {
 		}()
 	}
 	ctx := signals.SetupSignalHandler()
-	d, err := daemon.NewDaemon(cfg, *bindAddr)
+	d, err := daemon.NewDaemon(cfg, *bindAddr, *maxWorkersForPullImage)
 	if err != nil {
 		klog.Fatalf("Failed to new daemon: %v", err)
 	}
diff --git a/config/crd/bases/apps.kruise.io_nodeimages.yaml b/config/crd/bases/apps.kruise.io_nodeimages.yaml
index 9447d67f4..a39bc2d35 100644
--- a/config/crd/bases/apps.kruise.io_nodeimages.yaml
+++ b/config/crd/bases/apps.kruise.io_nodeimages.yaml
@@ -312,6 +312,10 @@ spec:
               description: The number of pulling tasks which reached phase Succeeded.
               format: int32
               type: integer
+            waiting:
+              description: The number of pulling tasks which are waiting.
+              format: int32
+              type: integer
           required:
           - desired
           type: object
diff --git a/config/manager/manager.yaml b/config/manager/manager.yaml
index 82ce7fef0..3a49b3bed 100644
--- a/config/manager/manager.yaml
+++ b/config/manager/manager.yaml
@@ -103,6 +103,7 @@ spec:
         - --logtostderr=true
         - -v=5
         - --feature-gates=AllAlpha=true,AllBeta=true
+        - --max-workers-for-pull-image=2
         image: controller:latest
         imagePullPolicy: Always
         securityContext:
diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go
index 249061907..7b2c0057f 100644
--- a/pkg/daemon/daemon.go
+++ b/pkg/daemon/daemon.go
@@ -23,7 +23,19 @@ import (
 	"net/http"
 	"sync"
 
+	"github.com/prometheus/client_golang/prometheus/promhttp"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/watch"
+	clientset "k8s.io/client-go/kubernetes"
+	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/klog/v2"
+	runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/metrics"
 
 	kruiseapis "github.com/openkruise/kruise/apis"
 	"github.com/openkruise/kruise/pkg/client"
@@ -36,18 +48,6 @@ import (
 	daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
 	"github.com/openkruise/kruise/pkg/features"
 	utilfeature "github.com/openkruise/kruise/pkg/util/feature"
-	"github.com/prometheus/client_golang/prometheus/promhttp"
-	v1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/watch"
-	clientset "k8s.io/client-go/kubernetes"
-	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
-	"k8s.io/client-go/rest"
-	"k8s.io/client-go/tools/cache"
-	"k8s.io/klog/v2"
-	runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/metrics"
 )
 
 var (
@@ -85,7 +85,7 @@ type daemon struct {
 }
 
 // NewDaemon create a daemon
-func NewDaemon(cfg *rest.Config, bindAddress string) (Daemon, error) {
+func NewDaemon(cfg *rest.Config, bindAddress string, maxWorkersForPullImages int) (Daemon, error) {
 	if cfg == nil {
 		return nil, fmt.Errorf("cfg can not be nil")
 	}
@@ -133,6 +133,8 @@ func NewDaemon(cfg *rest.Config, bindAddress string) (Daemon, error) {
 		PodInformer:    podInformer,
 		RuntimeFactory: runtimeFactory,
 		Healthz:        healthz,
+
+		MaxWorkersForPullImages: maxWorkersForPullImages,
 	}
 	puller, err := imagepuller.NewController(opts, secretManager, cfg)
diff --git a/pkg/daemon/imagepuller/imagepull_worker_pool_test.go b/pkg/daemon/imagepuller/imagepull_worker_pool_test.go
new file mode 100644
index 000000000..18616d35e
--- /dev/null
+++ b/pkg/daemon/imagepuller/imagepull_worker_pool_test.go
@@ -0,0 +1,53 @@
+package imagepuller
+
+import (
+	"sync"
+	"testing"
+	"time"
+)
+
+func TestChanPool_Submit_Start_Stop(t *testing.T) {
+	taskCount := 10
+	poolSize := 3
+	pool := NewChanPool(poolSize)
+	go pool.Start()
+	var wg sync.WaitGroup
+	wg.Add(taskCount)
+
+	executionCount := 0
+	mu := sync.Mutex{}
+
+	for i := 0; i < taskCount; i++ {
+		pool.Submit(func() {
+			defer wg.Done()
+			time.Sleep(10 * time.Millisecond)
+			mu.Lock()
+			executionCount++
+			mu.Unlock()
+		})
+	}
+
+	wg.Wait()
+	pool.Stop()
+
+	if executionCount != taskCount {
+		t.Errorf("expected %d tasks to be executed, but got %d", taskCount, executionCount)
+	}
+}
+
+func TestChanPool_StopWithoutTasks(t *testing.T) {
+	pool := NewChanPool(2)
+	go pool.Start()
+
+	// Run Stop in a goroutine so the test can bound how long it takes.
+	done := make(chan struct{})
+	go func() {
+		pool.Stop()
+		close(done)
+	}()
+
+	select {
+	case <-done:
+	case <-time.After(100 * time.Millisecond):
+		t.Errorf("expected Stop to complete immediately, but it took too long")
+	}
+}
diff --git a/pkg/daemon/imagepuller/imagepuller_controller.go b/pkg/daemon/imagepuller/imagepuller_controller.go
index 398f0c21d..5ea8d29e5 100644
--- a/pkg/daemon/imagepuller/imagepuller_controller.go
+++ b/pkg/daemon/imagepuller/imagepuller_controller.go
@@ -24,13 +24,6 @@ import (
 	"reflect"
 	"time"
 
-	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
-	"github.com/openkruise/kruise/pkg/client"
-	kruiseclient "github.com/openkruise/kruise/pkg/client/clientset/versioned"
-	listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1"
-	daemonoptions "github.com/openkruise/kruise/pkg/daemon/options"
-	daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
-	utilimagejob "github.com/openkruise/kruise/pkg/util/imagejob"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -45,6 +38,14 @@ import (
 	"k8s.io/client-go/tools/reference"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
+
+	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
+	"github.com/openkruise/kruise/pkg/client"
+	kruiseclient "github.com/openkruise/kruise/pkg/client/clientset/versioned"
+	listersalpha1 "github.com/openkruise/kruise/pkg/client/listers/apps/v1alpha1"
+	daemonoptions "github.com/openkruise/kruise/pkg/daemon/options"
+	daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
+	utilimagejob "github.com/openkruise/kruise/pkg/util/imagejob"
 )
 
 type Controller struct {
@@ -65,12 +66,12 @@ func NewController(opts daemonoptions.Options, secretManager daemonutil.SecretMa
 	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: genericClient.KubeClient.CoreV1().Events("")})
 	recorder := eventBroadcaster.NewRecorder(opts.Scheme, v1.EventSource{Component: "kruise-daemon-imagepuller", Host: opts.NodeName})
 
-	queue := workqueue.NewNamedRateLimitingQueue(
-		// Backoff duration from 500ms to 50~55s
-		// For nodeimage controller will mark a image:tag task failed (not responded for a long time) if daemon does not report status in 60s.
-		workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 50*time.Second+time.Millisecond*time.Duration(rand.Intn(5000))),
-		"imagepuller",
-	)
+	// Backoff duration from 500ms to 50~55s, because the nodeimage controller will mark an image:tag
+	// task as failed (not responded for a long time) if the daemon does not report status within 60s.
+	rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 50*time.Second+time.Millisecond*time.Duration(rand.Intn(5000)))
+	queue := workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{
+		Name: "imagepuller",
+	})
 
 	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
@@ -94,6 +95,11 @@ func NewController(opts daemonoptions.Options, secretManager daemonutil.SecretMa
 		},
 	})
 
+	if opts.MaxWorkersForPullImages > 0 {
+		klog.InfoS("set image pull worker number", "worker", opts.MaxWorkersForPullImages)
+		workerLimitedPool = NewChanPool(opts.MaxWorkersForPullImages)
+		go workerLimitedPool.Start()
+	}
 	puller, err := newRealPuller(opts.RuntimeFactory.GetImageService(), secretManager, recorder)
 	if err != nil {
 		return nil, fmt.Errorf("failed to new puller: %v", err)
@@ -165,6 +171,9 @@ func (c *Controller) Run(stop <-chan struct{}) {
 
 	klog.Info("Started puller controller successfully")
 	<-stop
+	if workerLimitedPool != nil {
+		workerLimitedPool.Stop()
+	}
 }
 
 // processNextWorkItem will read a single work item off the workqueue and
@@ -244,6 +253,8 @@ func (c *Controller) sync(key string) (retErr error) {
 				newStatus.Failed++
 			case appsv1alpha1.ImagePhasePulling:
 				newStatus.Pulling++
+			case appsv1alpha1.ImagePhaseWaiting:
+				newStatus.Waiting++
 			}
 		}
 	}
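The backoff comment above is easier to see in isolation: each failure of the same key doubles the delay from the 500ms base until it saturates at the randomized 50~55s cap, keeping retries inside the nodeimage controller's 60s deadline. A small sketch against the client-go workqueue API — the fixed 55s cap and the item key are illustrative:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same shape as the controller's limiter: base 500ms; the real code adds a
	// random 0~5s to the 50s cap, fixed here at 55s for determinism.
	limiter := workqueue.NewItemExponentialFailureRateLimiter(500*time.Millisecond, 55*time.Second)

	for i := 0; i < 9; i++ {
		// Every When call for the same item counts as one more failure, so the
		// returned delay doubles: 500ms, 1s, 2s, 4s, ... up to the cap.
		fmt.Println(limiter.When("nginx:latest"))
	}

	// Forget resets the failure count once the task finally syncs cleanly.
	limiter.Forget("nginx:latest")
}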
diff --git a/pkg/daemon/imagepuller/imagepuller_test.go b/pkg/daemon/imagepuller/imagepuller_test.go
new file mode 100644
index 000000000..32e0e59ef
--- /dev/null
+++ b/pkg/daemon/imagepuller/imagepuller_test.go
@@ -0,0 +1,362 @@
+package imagepuller
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/tools/reference"
+	"k8s.io/klog/v2"
+
+	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
+	appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
+	policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
+	"github.com/openkruise/kruise/pkg/daemon/criruntime/imageruntime"
+)
+
+type fakeRuntime struct {
+	mu     sync.Mutex
+	images map[string]*imageStatus
+}
+
+type imageStatus struct {
+	progress int
+	statusCh chan int
+}
+
+func (f *fakeRuntime) PullImage(ctx context.Context, imageName, tag string, pullSecrets []v1.Secret, sandboxConfig *appsv1alpha1.SandboxConfig) (imageruntime.ImagePullStatusReader, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	key := imageName + ":" + tag
+	if _, exist := f.images[key]; !exist {
+		f.images[key] = &imageStatus{
+			progress: 0,
+			statusCh: make(chan int, 10),
+		}
+	}
+	r := newFakeStatusReader(key, f.images[key])
+	go func() {
+		ticker := time.NewTicker(time.Second * 1)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ticker.C:
+				func() {
+					f.mu.Lock()
+					defer f.mu.Unlock()
+					val, exist := f.images[key]
+					if !exist {
+						return
+					}
+					if val.progress >= 100 {
+						r.ch <- imageruntime.ImagePullStatus{
+							Err:        nil,
+							Process:    100,
+							DetailInfo: "finished",
+							Finish:     true,
+						}
+					}
+				}()
+			case <-r.done:
+				func() {
+					f.mu.Lock()
+					defer f.mu.Unlock()
+					delete(f.images, key)
+				}()
+				return
+			}
+		}
+	}()
+	return &r, nil
+}
+
+func (f *fakeRuntime) ListImages(ctx context.Context) ([]imageruntime.ImageInfo, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	var images []imageruntime.ImageInfo
+	for name := range f.images {
+		images = append(images, imageruntime.ImageInfo{ID: name})
+	}
+	return images, nil
+}
+
+func (f *fakeRuntime) increaseProgress(image string, delta int) {
+	key := image
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	klog.Infof("increase progress %v +%v", key, delta)
+
+	if status, exist := f.images[key]; exist {
+		status.progress += delta
+		if status.progress > 100 {
+			status.progress = 100
+		}
+		status.statusCh <- status.progress
+	}
+}
+
+func (f *fakeRuntime) clean() {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.images = make(map[string]*imageStatus)
+}
+
+type fakeStatusReader struct {
+	image  string
+	status *imageStatus
+	ch     chan imageruntime.ImagePullStatus
+	mu     sync.Mutex
+	closed bool
+	done   chan struct{}
+}
+
+func newFakeStatusReader(image string, status *imageStatus) fakeStatusReader {
+	return fakeStatusReader{
+		image:  image,
+		status: status,
+		ch:     make(chan imageruntime.ImagePullStatus, 10),
+		done:   make(chan struct{}),
+	}
+}
+
+func (r *fakeStatusReader) C() <-chan imageruntime.ImagePullStatus {
+	return r.ch
+}
+
+func (r *fakeStatusReader) Close() {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	if !r.closed {
+		klog.InfoS("Closing reader", "image", r.image)
+		close(r.done)
+		r.closed = true
+	}
+}
+
+type fakeSecret struct {
+}
+
+func (f *fakeSecret) GetSecrets(secret []appsv1alpha1.ReferenceObject) ([]v1.Secret, error) {
+	return nil, nil
+}
+
+// test cases for the real puller sync flow, with and without the limited pool
+func realPullerSyncFn(t *testing.T, limitedPool bool) {
+	eventRecorder := record.NewFakeRecorder(100)
+	secretManager := fakeSecret{}
+
+	baseNodeImage := &appsv1alpha1.NodeImage{
+		Spec: appsv1alpha1.NodeImageSpec{
+			Images: map[string]appsv1alpha1.ImageSpec{
+				"nginx": {
+					Tags: []appsv1alpha1.ImageTagSpec{
+						{Tag: "latest", Version: 1},
+					},
+				},
+			},
+		},
+	}
+
+	testCases := []struct {
+		name        string
+		prePools    map[string]struct{}
+		inputSpec   *appsv1alpha1.NodeImage
+		expectPools sets.String
+		expectErr   bool
+	}{
+		{
+			name:        "add new image",
+			inputSpec:   baseNodeImage,
+			expectPools: sets.NewString("nginx"),
+		},
+		{
+			name:        "remove image",
+			prePools:    map[string]struct{}{"redis": {}},
+			inputSpec:   baseNodeImage,
+			expectPools: sets.NewString("nginx"),
+		},
+		{
+			name:     "add image",
+			prePools: map[string]struct{}{"nginx": {}},
+			inputSpec: &appsv1alpha1.NodeImage{
+				Spec: appsv1alpha1.NodeImageSpec{
+					Images: map[string]appsv1alpha1.ImageSpec{
+						"nginx": {
+							Tags: []appsv1alpha1.ImageTagSpec{
+								{Tag: "latest", Version: 1},
+							},
+						},
+						"busybox": {
+							Tags: []appsv1alpha1.ImageTagSpec{
+								{Tag: "1.15", Version: 1},
+							},
+						},
+					},
+				},
+			},
+			expectPools: sets.NewString("nginx", "busybox"),
+		},
+	}
+	fakeRuntime := &fakeRuntime{images: make(map[string]*imageStatus)}
+	if limitedPool {
+		workerLimitedPool = NewChanPool(2)
+		go workerLimitedPool.Start()
+	} else {
+		workerLimitedPool = nil
+	}
+	p, _ := newRealPuller(fakeRuntime, &secretManager, eventRecorder)
+
+	nameSuffix := "_NoLimitPool"
+	if limitedPool {
+		nameSuffix = "_LimitPool"
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name+nameSuffix, func(t *testing.T) {
+			for poolName := range tc.prePools {
+				p.workerPools[poolName] = newRealWorkerPool(poolName, fakeRuntime, &secretManager, eventRecorder)
+			}
+			ref, _ := reference.GetReference(scheme, tc.inputSpec)
+			err := p.Sync(tc.inputSpec, ref)
+			if (err != nil) != tc.expectErr {
+				t.Fatalf("expect error %v, but got %v", tc.expectErr, err)
+			}
+
+			actualPools := sets.String{}
+			for name := range p.workerPools {
+				p.workerPools[name].Stop()
+				actualPools.Insert(name)
+			}
+			if !actualPools.Equal(tc.expectPools) {
+				t.Errorf("expect pools %v, but got %v", tc.expectPools.List(), actualPools.List())
+			}
+
+			for prePool := range tc.prePools {
+				if !tc.expectPools.Has(prePool) {
+					if pool, exists := p.workerPools[prePool]; exists {
+						if pool.(*realWorkerPool).active {
+							t.Errorf("expect pool %s to be stopped", prePool)
+						}
+					}
+				}
+			}
+		})
+	}
+
+	t.Log("clean fake runtime")
+	time.Sleep(1 * time.Second)
+	infos, _ := fakeRuntime.ListImages(context.Background())
+	for _, info := range infos {
+		fakeRuntime.increaseProgress(info.ID, 100)
+	}
+	time.Sleep(2 * time.Second)
+
+	// stop all worker pools
+	for _, val := range p.workerPools {
+		val.Stop()
+	}
+	fakeRuntime.clean()
+}
+
+func TestRealPullerSync(t *testing.T) {
+	realPullerSyncFn(t, false)
+	realPullerSyncFn(t, true)
+}
+
+var scheme *runtime.Scheme
+
+func init() {
+	scheme = runtime.NewScheme()
+	utilruntime.Must(appsv1alpha1.AddToScheme(scheme))
+	utilruntime.Must(appsv1beta1.AddToScheme(scheme))
+	utilruntime.Must(policyv1alpha1.AddToScheme(scheme))
+}
+
+func TestRealPullerSyncWithLimitedPool(t *testing.T) {
+	eventRecorder := record.NewFakeRecorder(100)
+	secretManager := fakeSecret{}
+
+	baseNodeImage := &appsv1alpha1.NodeImage{
+		Spec: appsv1alpha1.NodeImageSpec{
+			Images: map[string]appsv1alpha1.ImageSpec{
+				"nginx": {
+					Tags: []appsv1alpha1.ImageTagSpec{
+						{Tag: "latest", Version: 1},
+					},
+				},
+				"busybox": {
+					Tags: []appsv1alpha1.ImageTagSpec{
+						{Tag: "latest", Version: 1},
+					},
+				},
+				"redis": {
+					Tags: []appsv1alpha1.ImageTagSpec{
+						{Tag: "latest", Version: 1},
+					},
+				},
+			},
+		},
+	}
+
+	r := &fakeRuntime{images: make(map[string]*imageStatus)}
+	workerLimitedPool = NewChanPool(2)
+	go workerLimitedPool.Start()
+	p, _ := newRealPuller(r, &secretManager, eventRecorder)
+
+	ref, _ := reference.GetReference(scheme, baseNodeImage)
+	err := p.Sync(baseNodeImage, ref)
+	assert.Nil(t, err)
+
+	time.Sleep(100 * time.Millisecond)
+	checkPhase := func(expectRunning, expectWaiting int) {
+		running, waiting := 0, 0
+		for imageName := range baseNodeImage.Spec.Images {
+			status := p.GetStatus(imageName)
+			t.Log(status)
+			if status == nil {
+				t.Errorf("expect image %s to be running", imageName)
+				continue
+			}
+			if status.Tags[0].Phase == appsv1alpha1.ImagePhasePulling {
+				running++
+			} else if status.Tags[0].Phase == appsv1alpha1.ImagePhaseWaiting {
+				waiting++
+			}
+		}
+		assert.Equal(t, expectRunning, running)
+		assert.Equal(t, expectWaiting, waiting)
+	}
+	// at first, 2 tasks are pulling and 1 is waiting
+	checkPhase(2, 1)
+	infos, _ := r.ListImages(context.Background())
+	if len(infos) > 0 {
+		r.increaseProgress(infos[0].ID, 100)
+	}
+	time.Sleep(2 * time.Second)
+	// after 1 succeeds, 2 are pulling and 0 are waiting
+	checkPhase(2, 0)
+	infos, _ = r.ListImages(context.Background())
+	for _, info := range infos {
+		r.increaseProgress(info.ID, 100)
+	}
+	time.Sleep(2 * time.Second)
+	// after all 3 succeed, 0 pulling and 0 waiting
+	checkPhase(0, 0)
+
+	t.Log("clean fake runtime")
+	// stop all worker pools
+	for _, val := range p.workerPools {
+		val.Stop()
+	}
+
+	r.clean()
+}
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - runtimeimage "github.com/openkruise/kruise/pkg/daemon/criruntime/imageruntime" - daemonutil "github.com/openkruise/kruise/pkg/daemon/util" - "github.com/openkruise/kruise/pkg/util" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + runtimeimage "github.com/openkruise/kruise/pkg/daemon/criruntime/imageruntime" + daemonutil "github.com/openkruise/kruise/pkg/daemon/util" + "github.com/openkruise/kruise/pkg/util" ) const ( @@ -47,6 +48,8 @@ const ( PullImageFailed = "PullImageFailed" ) +var workerLimitedPool ImagePullWorkerPool + type puller interface { Sync(obj *appsv1alpha1.NodeImage, ref *v1.ObjectReference) error GetStatus(imageName string) *appsv1alpha1.ImageStatus @@ -264,6 +267,8 @@ func (w *realWorkerPool) UpdateStatus(status *appsv1alpha1.ImageTagStatus) { } func newPullWorker(name string, tagSpec appsv1alpha1.ImageTagSpec, sandboxConfig *appsv1alpha1.SandboxConfig, secrets []v1.Secret, runtime runtimeimage.ImageService, statusUpdater imageStatusUpdater, ref *v1.ObjectReference, eventRecorder record.EventRecorder) *pullWorker { + image := name + ":" + tagSpec.Tag + klog.V(5).InfoS("new pull worker", "image", image) o := &pullWorker{ name: name, tagSpec: tagSpec, @@ -276,7 +281,31 @@ func newPullWorker(name string, tagSpec appsv1alpha1.ImageTagSpec, sandboxConfig active: true, stopCh: make(chan struct{}), } - go o.Run() + + go func() { + newStatus := &appsv1alpha1.ImageTagStatus{ + Tag: tagSpec.Tag, + Phase: appsv1alpha1.ImagePhaseWaiting, + Version: tagSpec.Version, + } + o.statusUpdater.UpdateStatus(newStatus) + klog.V(5).InfoS("pull worker waiting", "image", image) + fn := func() { + klog.V(5).InfoS("pull worker start", "image", image) + o.Run() + klog.V(5).InfoS("pull worker end", "image", image) + } + if workerLimitedPool != nil { + workerLimitedPool.Submit(func() { + fn() + }) + } else { + // no limited_pool + klog.V(5).InfoS("pull worker without limited_pool", "image", image) + fn() + } + + }() return o } @@ -444,8 +473,16 @@ func (w *pullWorker) doPullImage(ctx context.Context, newStatus *appsv1alpha1.Im // make it asynchronous for CRI runtime will block in pulling image var statusReader runtimeimage.ImagePullStatusReader pullChan := make(chan struct{}) + // Why add this? Directly assigning to `statusReader` and `err` can cause race conditions. + // Case: When a worker is pulling data, stopping the pull worker might lead to concurrent writes to the local parameter `err`. + // (line 483 and 503) + // To avoid this, we introduce `readerCh` and `errCh` for communication between multiple goroutines. 
diff --git a/pkg/daemon/imagepuller/imagepuller_worker_pool.go b/pkg/daemon/imagepuller/imagepuller_worker_pool.go
new file mode 100644
index 000000000..e21cf746c
--- /dev/null
+++ b/pkg/daemon/imagepuller/imagepuller_worker_pool.go
@@ -0,0 +1,53 @@
+package imagepuller
+
+import (
+	"sync"
+
+	"k8s.io/klog/v2"
+)
+
+type Task func()
+
+type ImagePullWorkerPool interface {
+	Submit(fn Task)
+	Start()
+	Stop()
+}
+
+type chanPool struct {
+	queue      chan Task
+	wg         sync.WaitGroup
+	maxWorkers int
+}
+
+func NewChanPool(n int) ImagePullWorkerPool {
+	return &chanPool{
+		queue:      make(chan Task, n),
+		maxWorkers: n,
+	}
+}
+
+// Submit enqueues a task; it blocks when the queue buffer is full.
+func (p *chanPool) Submit(task Task) {
+	p.queue <- task
+}
+
+func (p *chanPool) Start() {
+	for i := 0; i < p.maxWorkers; i++ {
+		// Add must happen before the goroutine starts; otherwise Stop could
+		// observe a zero WaitGroup counter and return while workers still run.
+		p.wg.Add(1)
+		go func() {
+			defer p.wg.Done()
+			for task := range p.queue {
+				task()
+			}
+		}()
+	}
+}
+
+func (p *chanPool) Stop() {
+	close(p.queue)
+	p.wg.Wait()
+	klog.Info("all workers in image pull worker pool stopped")
+}
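Taken together, the pool is a fixed set of consumers draining one buffered channel: Start launches maxWorkers goroutines, Submit blocks once n tasks are already queued (which is what parks extra pull workers in the Waiting phase), and Stop closes the channel and waits for in-flight tasks. Note the wg.Add(1) placed before each go statement above; adding it after the goroutine starts could let Stop return before all workers exit. A usage sketch, written as if it lived in the imagepuller package — pool size, timings, and the function name are illustrative:

func ExampleChanPool_usage() {
	pool := NewChanPool(2) // at most 2 tasks execute concurrently
	go pool.Start()

	var done int32
	for i := 0; i < 5; i++ {
		pool.Submit(func() { // blocks once 2 tasks run and 2 more sit in the buffer
			time.Sleep(100 * time.Millisecond)
			atomic.AddInt32(&done, 1)
		})
	}

	pool.Stop() // closes the queue and waits for all submitted tasks to finish
	fmt.Println(atomic.LoadInt32(&done))
	// Output: 5
}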
diff --git a/pkg/daemon/options/options.go b/pkg/daemon/options/options.go
index df63be520..be92be487 100644
--- a/pkg/daemon/options/options.go
+++ b/pkg/daemon/options/options.go
@@ -17,11 +17,12 @@ limitations under the License.
 package options
 
 import (
-	daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime"
-	daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/tools/cache"
 	runtimeclient "sigs.k8s.io/controller-runtime/pkg/client"
+
+	daemonruntime "github.com/openkruise/kruise/pkg/daemon/criruntime"
+	daemonutil "github.com/openkruise/kruise/pkg/daemon/util"
 )
 
 type Options struct {
@@ -32,4 +33,6 @@ type Options struct {
 	RuntimeFactory daemonruntime.Factory
 
 	Healthz *daemonutil.Healthz
+
+	MaxWorkersForPullImages int
 }
diff --git a/pkg/daemon/podprobe/pod_probe_controller.go b/pkg/daemon/podprobe/pod_probe_controller.go
index 1108fe32b..124fe291e 100644
--- a/pkg/daemon/podprobe/pod_probe_controller.go
+++ b/pkg/daemon/podprobe/pod_probe_controller.go
@@ -363,7 +363,7 @@ func (c *Controller) syncUpdateNodePodProbeStatus() error {
 		klog.ErrorS(err, "NodePodProbe update status failed", "nodeName", c.nodeName)
 		return err
 	}
-	klog.InfoS("NodePodProbe(%s) update status success", "nodeName", c.nodeName, "from", commonutil.DumpJSON(npp.Status), "to", commonutil.DumpJSON(nppClone.Status))
+	klog.InfoS("NodePodProbe update status success", "nodeName", c.nodeName, "from", commonutil.DumpJSON(npp.Status), "to", commonutil.DumpJSON(nppClone.Status))
 	return nil
 }
diff --git a/test/e2e/apps/containermeta.go b/test/e2e/apps/containermeta.go
index b166b6025..f46d0e7e2 100644
--- a/test/e2e/apps/containermeta.go
+++ b/test/e2e/apps/containermeta.go
@@ -23,15 +23,16 @@ import (
 	"github.com/onsi/ginkgo"
 	"github.com/onsi/gomega"
 
-	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
-	kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
-	"github.com/openkruise/kruise/pkg/util"
-	"github.com/openkruise/kruise/test/e2e/framework"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/rand"
 	clientset "k8s.io/client-go/kubernetes"
+
+	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
+	kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
+	"github.com/openkruise/kruise/pkg/util"
+	"github.com/openkruise/kruise/test/e2e/framework"
 )
 
 var _ = SIGDescribe("ContainerMeta", func() {
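A closing note on the pod_probe_controller.go hunk above: klog.InfoS expects a constant message plus key/value pairs, so the %s verb in the removed line would have been printed literally rather than substituted. A minimal sketch contrasting the two styles — logUpdate and node-1 are illustrative names:

package main

import "k8s.io/klog/v2"

// logUpdate contrasts structured and printf-style klog calls.
func logUpdate(nodeName string) {
	// Structured logging: the message stays constant and the variability goes
	// into key/value pairs, which is what the fix above switches to.
	klog.InfoS("NodePodProbe update status success", "nodeName", nodeName)

	// Printf-style formatting belongs to Infof instead.
	klog.Infof("NodePodProbe(%s) update status success", nodeName)
}

func main() {
	logUpdate("node-1")
}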