From f016c396ec693962d09101dd2f1836ed760479dd Mon Sep 17 00:00:00 2001 From: justinsb Date: Fri, 30 Dec 2022 10:55:40 -0500 Subject: [PATCH] gce: try to avoid concurrent IAM project operations We set up a process-wide table of mutexes, to avoid concurrent IAM operations on GCE projects. Best-effort is reasonable here, we will retry, but avoiding concurrent operations just avoids logspam and a needless retry from self-conflicts. --- pkg/mutexes/localmutexes.go | 68 +++++++++++++++++++ upup/pkg/fi/cloudup/gce/gce_cloud.go | 6 ++ .../fi/cloudup/gcetasks/projectiambinding.go | 5 ++ 3 files changed, 79 insertions(+) create mode 100644 pkg/mutexes/localmutexes.go diff --git a/pkg/mutexes/localmutexes.go b/pkg/mutexes/localmutexes.go new file mode 100644 index 0000000000..f128eddeb3 --- /dev/null +++ b/pkg/mutexes/localmutexes.go @@ -0,0 +1,68 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package mutexes + +import ( + "sync" +) + +var InProcess LocalMutexes + +// LocalMutexes is a store of named mutexes, used to avoid concurrent local operations. +// For example, GCE project IAM mutation uses a read / conditional-write approach, +// so we try to avoid making two local concurrent calls to the same project. +type LocalMutexes struct { + mutex sync.Mutex + mutexes map[string]*localMutex +} + +func (m *LocalMutexes) Get(key string) LocalMutex { + m.mutex.Lock() + defer m.mutex.Unlock() + + if m.mutexes == nil { + m.mutexes = make(map[string]*localMutex) + } + + l := m.mutexes[key] + if l == nil { + l = &localMutex{} + m.mutexes[key] = l + } + return l +} + +// LocalMutex is the interface for local mutexes. +type LocalMutex interface { + Lock() + Unlock() +} + +// localMutex implements LocalMutex. +type localMutex struct { + m sync.Mutex +} + +// Lock implements LocalMutex. +func (m *localMutex) Lock() { + m.m.Lock() +} + +// Unlock implements LocalMutex. +func (m *localMutex) Unlock() { + m.m.Unlock() +} diff --git a/upup/pkg/fi/cloudup/gce/gce_cloud.go b/upup/pkg/fi/cloudup/gce/gce_cloud.go index 685e3c8afa..4481ac6be5 100644 --- a/upup/pkg/fi/cloudup/gce/gce_cloud.go +++ b/upup/pkg/fi/cloudup/gce/gce_cloud.go @@ -33,6 +33,7 @@ import ( "k8s.io/kops/dnsprovider/pkg/dnsprovider" "k8s.io/kops/dnsprovider/pkg/dnsprovider/providers/google/clouddns" "k8s.io/kops/pkg/apis/kops" + "k8s.io/kops/pkg/mutexes" "k8s.io/kops/upup/pkg/fi" "k8s.io/kops/upup/pkg/fi/cloudup/gce/gcemetadata" ) @@ -55,6 +56,11 @@ type GCECloud interface { CloudResourceManager() *cloudresourcemanager.Service } +// MutexForProjectIAM returns a mutex to prevent local concurrent operations on project IAM. +func MutexForProjectIAM(projectID string) mutexes.LocalMutex { + return mutexes.InProcess.Get("iam/projects/" + projectID) +} + type gceCloudImplementation struct { compute *computeClientImpl storage *storage.Service diff --git a/upup/pkg/fi/cloudup/gcetasks/projectiambinding.go b/upup/pkg/fi/cloudup/gcetasks/projectiambinding.go index 546402674d..24c61359f1 100644 --- a/upup/pkg/fi/cloudup/gcetasks/projectiambinding.go +++ b/upup/pkg/fi/cloudup/gcetasks/projectiambinding.go @@ -104,6 +104,11 @@ func (_ *ProjectIAMBinding) RenderGCE(t *gce.GCEAPITarget, a, e, changes *Projec member := fi.ValueOf(e.Member) role := fi.ValueOf(e.Role) + // Avoid concurrent operations + localMutex := gce.MutexForProjectIAM(projectID) + localMutex.Lock() + defer localMutex.Unlock() + request := &cloudresourcemanager.GetIamPolicyRequest{} policy, err := t.Cloud.CloudResourceManager().Projects.GetIamPolicy(projectID, request).Context(ctx).Do() if err != nil {