Compare commits

..

3 Commits

Author SHA1 Message Date
Richard Hrmo 7aaec368da Add featuregate for releasing leader election lease on sigterm 2025-08-12 09:02:21 +02:00
Richard Hrmo 8fe75b1062 go get k8s.io/apiserver/pkg/server && go mod tidy && go mod vendor 2025-08-08 12:48:40 +02:00
Eddie 462cd5450e
Populate `VolumeError.ErrorCode` field in VolumeAttachment object (#662)
* Populate VolumeError.ErrorCode field

Signed-off-by: Eddie Torres <torredil@amazon.com>

* Guard patching AttachError.ErrorCode behind MutableCSINodeAllocatableCount feature gate

Signed-off-by: Eddie Torres <torredil@amazon.com>

* Fix make test

Signed-off-by: Eddie Torres <torredil@amazon.com>

---------

Signed-off-by: Eddie Torres <torredil@amazon.com>
2025-08-01 06:09:38 -07:00
8 changed files with 292 additions and 31 deletions

View File

@ -103,7 +103,7 @@ var (
)
func main() {
flag.Var(utilflag.NewMapStringBool(&featureGates), "feature-gates", "Comma-seprated list of key=value pairs that describe feature gates for alpha/experimental features. "+
flag.Var(utilflag.NewMapStringBool(&featureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features. "+
"Options are:\n"+strings.Join(utilfeature.DefaultFeatureGate.KnownFeatures(), "\n"))
fg := featuregate.NewFeatureGate()
@ -120,7 +120,7 @@ func main() {
}
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(featureGates); err != nil {
logger.Error(err, "Error while parsing feature gates")
logger.Error(err, "failed to store flag gates", "featureGates", featureGates)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
@ -298,17 +298,15 @@ func main() {
)
// handle SIGTERM and SIGINT by cancelling the context.
var (
globalCtx context.Context // shuts down the whole process, incl. leader election
terminate func() // called when all controllers are finished
controllerCtx context.Context // shuts down all controllers on a signal
shutdownHandler <-chan struct{} // called when the signal is received
signalReceived bool
)
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
globalCtx, terminate = context.WithCancel(ctx)
ctx, terminate = context.WithCancel(ctx) // shuts down the whole process, incl. leader election
var cancelControllerCtx context.CancelFunc
controllerCtx, cancelControllerCtx = context.WithCancel(globalCtx)
controllerCtx, cancelControllerCtx = context.WithCancel(ctx)
shutdownHandler = server.SetupSignalHandler()
defer terminate()
@ -316,7 +314,6 @@ func main() {
go func() {
defer cancelControllerCtx()
<-shutdownHandler
signalReceived = true
logger.Info("Received SIGTERM or SIGINT signal, shutting down controller.")
}()
}
@ -324,13 +321,10 @@ func main() {
run := func(ctx context.Context) {
if utilfeature.DefaultFeatureGate.Enabled(features.ReleaseLeaderElectionOnExit) {
var wg sync.WaitGroup
stopCh := shutdownHandler
factory.Start(stopCh)
factory.Start(shutdownHandler)
ctrl.Run(controllerCtx, int(*workerThreads), &wg)
if signalReceived {
wg.Wait()
terminate()
}
wg.Wait()
terminate()
} else {
stopCh := ctx.Done()
factory.Start(stopCh)

10
go.mod
View File

@ -13,11 +13,11 @@ require (
github.com/kubernetes-csi/csi-lib-utils v0.22.0
github.com/kubernetes-csi/csi-test/v5 v5.3.1
google.golang.org/grpc v1.72.1
k8s.io/api v0.33.2
k8s.io/apimachinery v0.33.2
k8s.io/apiserver v0.33.2
k8s.io/client-go v0.33.2
k8s.io/component-base v0.33.2
k8s.io/api v0.33.3
k8s.io/apimachinery v0.33.3
k8s.io/apiserver v0.33.3
k8s.io/client-go v0.33.3
k8s.io/component-base v0.33.3
k8s.io/csi-translation-lib v0.33.0
k8s.io/klog/v2 v2.130.1
)

4
go.sum
View File

@ -292,8 +292,8 @@ k8s.io/api v0.33.0 h1:yTgZVn1XEe6opVpP1FylmNrIFWuDqe2H0V8CT5gxfIU=
k8s.io/api v0.33.0/go.mod h1:CTO61ECK/KU7haa3qq8sarQ0biLq2ju405IZAd9zsiM=
k8s.io/apimachinery v0.33.0 h1:1a6kHrJxb2hs4t8EE5wuR/WxKDwGN1FKH3JvDtA0CIQ=
k8s.io/apimachinery v0.33.0/go.mod h1:BHW0YOu7n22fFv/JkYOEfkUYNRN0fj0BlvMFWA7b+SM=
k8s.io/apiserver v0.33.2 h1:KGTRbxn2wJagJowo29kKBp4TchpO1DRO3g+dB/KOJN4=
k8s.io/apiserver v0.33.2/go.mod h1:9qday04wEAMLPWWo9AwqCZSiIn3OYSZacDyu/AcoM/M=
k8s.io/apiserver v0.33.3 h1:Wv0hGc+QFdMJB4ZSiHrCgN3zL3QRatu56+rpccKC3J4=
k8s.io/apiserver v0.33.3/go.mod h1:05632ifFEe6TxwjdAIrwINHWE2hLwyADFk5mBsQa15E=
k8s.io/client-go v0.33.0 h1:UASR0sAYVUzs2kYuKn/ZakZlcs2bEHaizrrHUZg0G98=
k8s.io/client-go v0.33.0/go.mod h1:kGkd+l/gNGg8GYWAPr0xF1rRKvVWvzh9vmZAMXtaKOg=
k8s.io/component-base v0.33.0 h1:Ot4PyJI+0JAD9covDhwLp9UNkUja209OzsJ4FzScBNk=

View File

@ -26,11 +26,14 @@ import (
"github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/external-attacher/pkg/attacher"
"github.com/kubernetes-csi/external-attacher/pkg/features"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1"
@ -610,11 +613,21 @@ func (h *csiHandler) saveAttachError(ctx context.Context, va *storage.VolumeAtta
logger := klog.FromContext(ctx)
logger.V(4).Info("Saving attach error")
clone := va.DeepCopy()
clone.Status.AttachError = &storage.VolumeError{
volumeError := &storage.VolumeError{
Message: err.Error(),
Time: metav1.Now(),
}
if utilfeature.DefaultFeatureGate.Enabled(features.MutableCSINodeAllocatableCount) {
if st, ok := status.FromError(err); ok {
errorCode := int32(st.Code())
volumeError.ErrorCode = &errorCode
}
}
clone.Status.AttachError = volumeError
var newVa *storage.VolumeAttachment
if newVa, err = h.patchVA(ctx, va, clone, "status"); err != nil {
return va, err

View File

@ -23,9 +23,12 @@ import (
"testing"
"time"
"google.golang.org/grpc/codes"
"github.com/kubernetes-csi/csi-lib-utils/connection"
"github.com/kubernetes-csi/external-attacher/pkg/attacher"
"github.com/kubernetes-csi/external-attacher/pkg/features"
"google.golang.org/grpc/status"
v1 "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@ -33,9 +36,11 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
core "k8s.io/client-go/testing"
featuregatetesting "k8s.io/component-base/featuregate/testing"
csitranslator "k8s.io/csi-translation-lib"
"k8s.io/klog/v2"
_ "k8s.io/klog/v2/ktesting/init"
@ -269,6 +274,16 @@ func patch(original, new interface{}) []byte {
return patch
}
func vaWithAttachErrorAndCode(va *storage.VolumeAttachment, message string, code codes.Code) *storage.VolumeAttachment {
errorCode := int32(code)
va.Status.AttachError = &storage.VolumeError{
Message: message,
Time: metav1.Time{},
ErrorCode: &errorCode,
}
return va
}
func TestCSIHandler(t *testing.T) {
vaGroupResourceVersion := schema.GroupVersionResource{
Group: storage.GroupName,
@ -1403,6 +1418,61 @@ func TestCSIHandler(t *testing.T) {
runTests(t, csiHandlerFactory, tests)
}
func TestVolumeAttachmentWithErrorCode(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.MutableCSINodeAllocatableCount, true)
vaGroupResourceVersion := schema.GroupVersionResource{
Group: storage.GroupName,
Version: "v1",
Resource: "volumeattachments",
}
var noMetadata map[string]string
var noAttrs map[string]string
var noSecrets map[string]string
var notDetached = false
var success error
var readWrite = false
test := testCase{
name: "CSI attach fails with gRPC error -> controller saves ErrorCode and retries",
initialObjects: []runtime.Object{pvWithFinalizer(), csiNode()},
addedVA: va(false, "", nil),
expectedActions: []core.Action{
core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName,
types.MergePatchType, patch(va(false, "", nil), va(false, fin, ann))),
// The CSI call fails, so the controller saves the error status.
core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone,
testPVName+"-"+testNodeName,
types.MergePatchType, patch(va(false, fin, ann),
vaWithAttachErrorAndCode(va(false, fin, ann), "rpc error: code = ResourceExhausted desc = mock rpc error", codes.ResourceExhausted)), "status"),
// On retry, the controller reads the original VA again and tries to re-apply the finalizer/annotation.
core.NewPatchAction(vaGroupResourceVersion, metav1.NamespaceNone, testPVName+"-"+testNodeName,
types.MergePatchType, patch(
vaWithAttachErrorAndCode(va(false, "", nil), "rpc error: code = ResourceExhausted desc = mock rpc error", codes.ResourceExhausted),
vaWithAttachErrorAndCode(va(false, fin, ann), "rpc error: code = ResourceExhausted desc = mock rpc error", codes.ResourceExhausted),
)),
// The CSI call succeeds now, and the controller clears the error and marks the VA as attached.
core.NewPatchSubresourceAction(vaGroupResourceVersion, metav1.NamespaceNone,
testPVName+"-"+testNodeName,
types.MergePatchType, patch(
vaWithAttachErrorAndCode(va(false, fin, ann), "rpc error: code = ResourceExhausted desc = mock rpc error", codes.ResourceExhausted),
va(true /*attached*/, fin, ann),
),
"status"),
},
expectedCSICalls: []csiCall{
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, status.Error(codes.ResourceExhausted, "mock rpc error"), notDetached, noMetadata, 0},
{"attach", testVolumeHandle, testNodeID, noAttrs, noSecrets, readWrite, success, notDetached, noMetadata, 0},
},
}
runTests(t, csiHandlerFactory, []testCase{test})
}
func TestCSIHandlerReconcileVA(t *testing.T) {
nID := map[string]string{
vaNodeIDAnnotation: testNodeID,

View File

@ -22,16 +22,25 @@ import (
)
const (
// Releases leader election lease on sigterm / sigint
ReleaseLeaderElectionOnExit featuregate.Feature = "ReleaseLeaderElectionOnExit"
// owner: @torredil @gnufied @msau42
// kep: https://kep.k8s.io/4876
// alpha: v1.33
// beta: v1.34
//
// Makes CSINode.Spec.Drivers[*].Allocatable.Count mutable, allowing CSI drivers to
// update the number of volumes that can be allocated on a node. Additionally, enables
// setting ErrorCode field in VolumeAttachment status.
ReleaseLeaderElectionOnExit featuregate.Feature = "ReleaseLeaderElectionOnExit"
MutableCSINodeAllocatableCount featuregate.Feature = "MutableCSINodeAllocatableCount"
)
func init() {
feature.DefaultMutableFeatureGate.Add(defaultKubernetesFeatureGates)
}
// defaultKubernetesFeatureGates consists of all known feature keys specific to external-snapshotter.
// defaultKubernetesFeatureGates consists of all known feature keys specific to external-attacher.
// To add a new feature, define a key for it above and add it here.
var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
ReleaseLeaderElectionOnExit: {Default: false, PreRelease: featuregate.Alpha},
ReleaseLeaderElectionOnExit: {Default: false, PreRelease: featuregate.Alpha},
MutableCSINodeAllocatableCount: {Default: false, PreRelease: featuregate.Beta},
}

View File

@ -0,0 +1,174 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"fmt"
"strings"
"sync"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/component-base/featuregate"
)
var (
overrideLock sync.Mutex
featureFlagOverride map[featuregate.Feature]string
emulationVersionOverride string
emulationVersionOverrideValue *version.Version
)
func init() {
featureFlagOverride = map[featuregate.Feature]string{}
}
// SetFeatureGateDuringTest sets the specified gate to the specified value for duration of the test.
// Fails when it detects second call to the same flag or is unable to set or restore feature flag.
//
// WARNING: Can leak set variable when called in test calling t.Parallel(), however second attempt to set the same feature flag will cause fatal.
//
// Example use:
//
// featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.<FeatureName>, true)
func SetFeatureGateDuringTest(tb TB, gate featuregate.FeatureGate, f featuregate.Feature, value bool) {
tb.Helper()
detectParallelOverrideCleanup := detectParallelOverride(tb, f)
originalValue := gate.Enabled(f)
originalEmuVer := gate.(featuregate.MutableVersionedFeatureGate).EmulationVersion()
originalExplicitlySet := gate.(featuregate.MutableVersionedFeatureGate).ExplicitlySet(f)
// Specially handle AllAlpha and AllBeta
if f == "AllAlpha" || f == "AllBeta" {
// Iterate over individual gates so their individual values get restored
for k, v := range gate.(featuregate.MutableFeatureGate).GetAll() {
if k == "AllAlpha" || k == "AllBeta" {
continue
}
if (f == "AllAlpha" && v.PreRelease == featuregate.Alpha) || (f == "AllBeta" && v.PreRelease == featuregate.Beta) {
SetFeatureGateDuringTest(tb, gate, k, value)
}
}
}
if err := gate.(featuregate.MutableFeatureGate).Set(fmt.Sprintf("%s=%v", f, value)); err != nil {
tb.Errorf("error setting %s=%v: %v", f, value, err)
}
tb.Cleanup(func() {
tb.Helper()
detectParallelOverrideCleanup()
emuVer := gate.(featuregate.MutableVersionedFeatureGate).EmulationVersion()
if !emuVer.EqualTo(originalEmuVer) {
tb.Fatalf("change of feature gate emulation version from %s to %s in the chain of SetFeatureGateDuringTest is not allowed\nuse SetFeatureGateEmulationVersionDuringTest to change emulation version in tests",
originalEmuVer.String(), emuVer.String())
}
if originalExplicitlySet {
if err := gate.(featuregate.MutableFeatureGate).Set(fmt.Sprintf("%s=%v", f, originalValue)); err != nil {
tb.Errorf("error restoring %s=%v: %v", f, originalValue, err)
}
} else {
if err := gate.(featuregate.MutableVersionedFeatureGate).ResetFeatureValueToDefault(f); err != nil {
tb.Errorf("error restoring %s=%v: %v", f, originalValue, err)
}
}
})
}
// SetFeatureGateEmulationVersionDuringTest sets the specified gate to the specified emulation version for duration of the test.
// Fails when it detects second call to set a different emulation version or is unable to set or restore emulation version.
// WARNING: Can leak set variable when called in test calling t.Parallel(), however second attempt to set a different emulation version will cause fatal.
// Example use:
// featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.31"))
func SetFeatureGateEmulationVersionDuringTest(tb TB, gate featuregate.FeatureGate, ver *version.Version) {
tb.Helper()
detectParallelOverrideCleanup := detectParallelOverrideEmulationVersion(tb, ver)
originalEmuVer := gate.(featuregate.MutableVersionedFeatureGate).EmulationVersion()
if err := gate.(featuregate.MutableVersionedFeatureGate).SetEmulationVersion(ver); err != nil {
tb.Fatalf("failed to set emulation version to %s during test: %v", ver.String(), err)
}
tb.Cleanup(func() {
tb.Helper()
detectParallelOverrideCleanup()
if err := gate.(featuregate.MutableVersionedFeatureGate).SetEmulationVersion(originalEmuVer); err != nil {
tb.Fatalf("failed to restore emulation version to %s during test", originalEmuVer.String())
}
})
}
func detectParallelOverride(tb TB, f featuregate.Feature) func() {
tb.Helper()
overrideLock.Lock()
defer overrideLock.Unlock()
beforeOverrideTestName := featureFlagOverride[f]
if beforeOverrideTestName != "" && !sameTestOrSubtest(tb, beforeOverrideTestName) {
tb.Fatalf("Detected parallel setting of a feature gate by both %q and %q", beforeOverrideTestName, tb.Name())
}
featureFlagOverride[f] = tb.Name()
return func() {
tb.Helper()
overrideLock.Lock()
defer overrideLock.Unlock()
if afterOverrideTestName := featureFlagOverride[f]; afterOverrideTestName != tb.Name() {
tb.Fatalf("Detected parallel setting of a feature gate between both %q and %q", afterOverrideTestName, tb.Name())
}
featureFlagOverride[f] = beforeOverrideTestName
}
}
func detectParallelOverrideEmulationVersion(tb TB, ver *version.Version) func() {
tb.Helper()
overrideLock.Lock()
defer overrideLock.Unlock()
beforeOverrideTestName := emulationVersionOverride
beforeOverrideValue := emulationVersionOverrideValue
if ver.EqualTo(beforeOverrideValue) {
return func() {}
}
if beforeOverrideTestName != "" && !sameTestOrSubtest(tb, beforeOverrideTestName) {
tb.Fatalf("Detected parallel setting of a feature gate emulation version by both %q and %q", beforeOverrideTestName, tb.Name())
}
emulationVersionOverride = tb.Name()
emulationVersionOverrideValue = ver
return func() {
tb.Helper()
overrideLock.Lock()
defer overrideLock.Unlock()
if afterOverrideTestName := emulationVersionOverride; afterOverrideTestName != tb.Name() {
tb.Fatalf("Detected parallel setting of a feature gate emulation version between both %q and %q", afterOverrideTestName, tb.Name())
}
emulationVersionOverride = beforeOverrideTestName
emulationVersionOverrideValue = beforeOverrideValue
}
}
func sameTestOrSubtest(tb TB, testName string) bool {
// Assumes that "/" is not used in test names.
return tb.Name() == testName || strings.HasPrefix(tb.Name(), testName+"/")
}
type TB interface {
Cleanup(func())
Error(args ...any)
Errorf(format string, args ...any)
Fatal(args ...any)
Fatalf(format string, args ...any)
Helper()
Name() string
}

11
vendor/modules.txt vendored
View File

@ -505,7 +505,7 @@ gopkg.in/inf.v0
# gopkg.in/yaml.v3 v3.0.1
## explicit
gopkg.in/yaml.v3
# k8s.io/api v0.33.2 => k8s.io/api v0.33.0
# k8s.io/api v0.33.3 => k8s.io/api v0.33.0
## explicit; go 1.24.0
k8s.io/api/admission/v1
k8s.io/api/admission/v1beta1
@ -567,7 +567,7 @@ k8s.io/api/storage/v1
k8s.io/api/storage/v1alpha1
k8s.io/api/storage/v1beta1
k8s.io/api/storagemigration/v1alpha1
# k8s.io/apimachinery v0.33.2 => k8s.io/apimachinery v0.33.0
# k8s.io/apimachinery v0.33.3 => k8s.io/apimachinery v0.33.0
## explicit; go 1.24.0
k8s.io/apimachinery/pkg/api/equality
k8s.io/apimachinery/pkg/api/errors
@ -634,7 +634,7 @@ k8s.io/apimachinery/pkg/version
k8s.io/apimachinery/pkg/watch
k8s.io/apimachinery/third_party/forked/golang/json
k8s.io/apimachinery/third_party/forked/golang/reflect
# k8s.io/apiserver v0.33.2
# k8s.io/apiserver v0.33.3
## explicit; go 1.24.0
k8s.io/apiserver/pkg/admission
k8s.io/apiserver/pkg/admission/configuration
@ -763,7 +763,7 @@ k8s.io/apiserver/pkg/util/x509metrics
k8s.io/apiserver/pkg/validation
k8s.io/apiserver/pkg/warning
k8s.io/apiserver/plugin/pkg/authenticator/token/webhook
# k8s.io/client-go v0.33.2 => k8s.io/client-go v0.33.0
# k8s.io/client-go v0.33.3 => k8s.io/client-go v0.33.0
## explicit; go 1.24.0
k8s.io/client-go/applyconfigurations
k8s.io/client-go/applyconfigurations/admissionregistration/v1
@ -1100,11 +1100,12 @@ k8s.io/client-go/util/homedir
k8s.io/client-go/util/keyutil
k8s.io/client-go/util/watchlist
k8s.io/client-go/util/workqueue
# k8s.io/component-base v0.33.2 => k8s.io/component-base v0.33.0
# k8s.io/component-base v0.33.3 => k8s.io/component-base v0.33.0
## explicit; go 1.24.0
k8s.io/component-base/cli/flag
k8s.io/component-base/compatibility
k8s.io/component-base/featuregate
k8s.io/component-base/featuregate/testing
k8s.io/component-base/logs
k8s.io/component-base/logs/api/v1
k8s.io/component-base/logs/internal/setverbositylevel