mirror of https://github.com/fluxcd/cli-utils.git
fix: Stop StatusWatcher on Forbidden API error
- This matches previous StatusPoller behavior which would error and exit if there was a 403 Forbidden error from the apiserver. - Handle status error before synchronization with immediate exit
This commit is contained in:
parent
2d682225d8
commit
57bbe71b9b
|
@ -121,6 +121,9 @@ func (tsr *TaskStatusRunner) Run(
|
|||
statusEvent.Error)
|
||||
if currentTask != nil {
|
||||
currentTask.Cancel(taskContext)
|
||||
} else {
|
||||
// tasks not started yet - abort now
|
||||
return complete(abortReason)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
@ -207,6 +210,9 @@ func (tsr *TaskStatusRunner) Run(
|
|||
klog.V(7).Infof("Runner aborting: %v", abortReason)
|
||||
if currentTask != nil {
|
||||
currentTask.Cancel(taskContext)
|
||||
} else {
|
||||
// tasks not started yet - abort now
|
||||
return complete(abortReason)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,8 @@ package watcher
|
|||
|
||||
import (
|
||||
"context"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
|
@ -55,3 +57,36 @@ func (f *DynamicInformerFactory) NewInformer(ctx context.Context, mapping *meta.
|
|||
f.Indexers,
|
||||
)
|
||||
}
|
||||
|
||||
// resourceNotFoundMessage is the condition message for metav1.StatusReasonNotFound.
|
||||
// This is necessary because the Informer doesn't properly wrap list errors.
|
||||
// https://github.com/kubernetes/client-go/blob/v0.24.0/tools/cache/reflector.go#L325
|
||||
// https://github.com/kubernetes/apimachinery/blob/v0.24.0/pkg/api/errors/errors.go#L448
|
||||
// TODO: Remove once fix is released (1.25+): https://github.com/kubernetes/kubernetes/pull/110076
|
||||
const resourceNotFoundMessage = "the server could not find the requested resource"
|
||||
|
||||
// containsNotFoundMessage checks if the error string contains the message for
|
||||
// StatusReasonNotFound.
|
||||
func containsNotFoundMessage(err error) bool {
|
||||
return strings.Contains(err.Error(), resourceNotFoundMessage)
|
||||
}
|
||||
|
||||
// resourceForbiddenMessagePattern is a regex pattern to match the condition
|
||||
// message for metav1.StatusForbidden.
|
||||
// This is necessary because the Informer doesn't properly wrap list errors.
|
||||
// https://github.com/kubernetes/client-go/blob/v0.24.0/tools/cache/reflector.go#L325
|
||||
// https://github.com/kubernetes/apimachinery/blob/v0.24.0/pkg/api/errors/errors.go#L458
|
||||
// https://github.com/kubernetes/apimachinery/blob/v0.24.0/pkg/api/errors/errors.go#L208
|
||||
// https://github.com/kubernetes/apiserver/blob/master/pkg/endpoints/handlers/responsewriters/errors.go#L51
|
||||
// TODO: Remove once fix is released (1.25+): https://github.com/kubernetes/kubernetes/pull/110076
|
||||
const resourceForbiddenMessagePattern = `(.+) is forbidden: User "(.*)" cannot (.+) resource "(.*)" in API group "(.*)"`
|
||||
|
||||
// resourceForbiddenMessageRegexp is the pre-compiled Regexp of
|
||||
// resourceForbiddenMessagePattern.
|
||||
var resourceForbiddenMessageRegexp = regexp.MustCompile(resourceForbiddenMessagePattern)
|
||||
|
||||
// containsForbiddenMessage checks if the error string contains the message for
|
||||
// StatusForbidden.
|
||||
func containsForbiddenMessage(err error) bool {
|
||||
return resourceForbiddenMessageRegexp.Match([]byte(err.Error()))
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@ package watcher
|
|||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -54,13 +54,7 @@ func TestResourceNotFoundError(t *testing.T) {
|
|||
// dynamicClient converts Status objects from the apiserver into errors.
|
||||
// So we can just return the right error here to simulate an error from
|
||||
// the apiserver.
|
||||
name := "" // unused by LIST requests
|
||||
// The apisevrer confusingly does not return apierrors.NewNotFound,
|
||||
// which has a nice constant for its error message.
|
||||
// err = apierrors.NewNotFound(exampleGR, name)
|
||||
// Instead it uses apierrors.NewGenericServerResponse, which uses
|
||||
// a hard-coded error message.
|
||||
err = apierrors.NewGenericServerResponse(http.StatusNotFound, "list", exampleGR, name, "unused", -1, false)
|
||||
err = newGenericServerResponse(action, newNotFoundResourceStatusError(action))
|
||||
return true, nil, err
|
||||
})
|
||||
},
|
||||
|
@ -88,13 +82,7 @@ func TestResourceNotFoundError(t *testing.T) {
|
|||
// dynamicClient converts Status objects from the apiserver into errors.
|
||||
// So we can just return the right error here to simulate an error from
|
||||
// the apiserver.
|
||||
name := "" // unused by LIST requests
|
||||
// The apisevrer confusingly does not return apierrors.NewNotFound,
|
||||
// which has a nice constant for its error message.
|
||||
// err = apierrors.NewNotFound(exampleGR, name)
|
||||
// Instead it uses apierrors.NewGenericServerResponse, which uses
|
||||
// a hard-coded error message.
|
||||
err = apierrors.NewGenericServerResponse(http.StatusNotFound, "list", exampleGR, name, "unused", -1, false)
|
||||
err = newGenericServerResponse(action, newNotFoundResourceStatusError(action))
|
||||
return true, nil, err
|
||||
})
|
||||
},
|
||||
|
@ -110,7 +98,67 @@ func TestResourceNotFoundError(t *testing.T) {
|
|||
t.Errorf("Expected typed NotFound error, but got untyped NotFound error: %v", err)
|
||||
default:
|
||||
// If we got this error, the test is probably broken.
|
||||
t.Errorf("Expected untyped NotFound error, but got a different error: %v", err)
|
||||
t.Errorf("Expected typed NotFound error, but got a different error: %v", err)
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "List resource forbidden error",
|
||||
setup: func(fakeClient *dynamicfake.FakeDynamicClient) {
|
||||
fakeClient.PrependReactor("list", exampleGR.Resource, func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
|
||||
listAction := action.(clienttesting.ListAction)
|
||||
if listAction.GetNamespace() != namespace {
|
||||
assert.Fail(t, "Received unexpected LIST namespace: %s", listAction.GetNamespace())
|
||||
return false, nil, nil
|
||||
}
|
||||
// dynamicClient converts Status objects from the apiserver into errors.
|
||||
// So we can just return the right error here to simulate an error from
|
||||
// the apiserver.
|
||||
err = newGenericServerResponse(action, newForbiddenResourceStatusError(action))
|
||||
return true, nil, err
|
||||
})
|
||||
},
|
||||
errorHandler: func(t *testing.T, err error) {
|
||||
switch {
|
||||
case apierrors.IsForbidden(err):
|
||||
// If we got this error, something changed in the apiserver or
|
||||
// client. If the client changed, it might be safe to stop parsing
|
||||
// the error string.
|
||||
t.Errorf("Expected untyped Forbidden error, but got typed Forbidden error: %v", err)
|
||||
case containsForbiddenMessage(err):
|
||||
// This is the expected hack, because the Informer/Reflector
|
||||
// doesn't wrap the error with "%w".
|
||||
t.Logf("Received expected untyped Forbidden error: %v", err)
|
||||
default:
|
||||
// If we got this error, the test is probably broken.
|
||||
t.Errorf("Expected untyped Forbidden error, but got a different error: %v", err)
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Watch resource forbidden error",
|
||||
setup: func(fakeClient *dynamicfake.FakeDynamicClient) {
|
||||
fakeClient.PrependWatchReactor(exampleGR.Resource, func(action clienttesting.Action) (handled bool, ret watch.Interface, err error) {
|
||||
// dynamicClient converts Status objects from the apiserver into errors.
|
||||
// So we can just return the right error here to simulate an error from
|
||||
// the apiserver.
|
||||
err = newGenericServerResponse(action, newForbiddenResourceStatusError(action))
|
||||
return true, nil, err
|
||||
})
|
||||
},
|
||||
errorHandler: func(t *testing.T, err error) {
|
||||
switch {
|
||||
case apierrors.IsForbidden(err):
|
||||
// This is the expected behavior, because the
|
||||
// Informer/Reflector DOES wrap watch errors
|
||||
t.Logf("Received expected untyped Forbidden error: %v", err)
|
||||
case containsForbiddenMessage(err):
|
||||
// If this happens, there was a regression.
|
||||
// Watch errors are expected to be wrapped with "%w"
|
||||
t.Errorf("Expected typed Forbidden error, but got untyped Forbidden error: %v", err)
|
||||
default:
|
||||
// If we got this error, the test is probably broken.
|
||||
t.Errorf("Expected typed Forbidden error, but got a different error: %v", err)
|
||||
}
|
||||
},
|
||||
},
|
||||
|
@ -164,3 +212,43 @@ func TestResourceNotFoundError(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
// newForbiddenResourceStatusError emulates a Forbidden error from the apiserver
|
||||
// for a namespace-scoped resource.
|
||||
// https://github.com/kubernetes/apiserver/blob/master/pkg/endpoints/handlers/responsewriters/errors.go#L36
|
||||
func newForbiddenResourceStatusError(action clienttesting.Action) *apierrors.StatusError {
|
||||
username := "unused"
|
||||
verb := action.GetVerb()
|
||||
resource := action.GetResource().Resource
|
||||
if subresource := action.GetSubresource(); len(subresource) > 0 {
|
||||
resource = resource + "/" + subresource
|
||||
}
|
||||
apiGroup := action.GetResource().Group
|
||||
namespace := action.GetNamespace()
|
||||
|
||||
// https://github.com/kubernetes/apiserver/blob/master/pkg/endpoints/handlers/responsewriters/errors.go#L51
|
||||
err := fmt.Errorf("User %q cannot %s resource %q in API group %q in the namespace %q",
|
||||
username, verb, resource, apiGroup, namespace)
|
||||
|
||||
qualifiedResource := action.GetResource().GroupResource()
|
||||
name := "" // unused by ListAndWatch
|
||||
return apierrors.NewForbidden(qualifiedResource, name, err)
|
||||
}
|
||||
|
||||
// newNotFoundResourceStatusError emulates a NotFOund error from the apiserver
|
||||
// for a resource (not an object).
|
||||
func newNotFoundResourceStatusError(action clienttesting.Action) *apierrors.StatusError {
|
||||
qualifiedResource := action.GetResource().GroupResource()
|
||||
name := "" // unused by ListAndWatch
|
||||
return apierrors.NewNotFound(qualifiedResource, name)
|
||||
}
|
||||
|
||||
// newGenericServerResponse emulates a StatusError from the apiserver.
|
||||
func newGenericServerResponse(action clienttesting.Action, statusError *apierrors.StatusError) *apierrors.StatusError {
|
||||
errorCode := int(statusError.ErrStatus.Code)
|
||||
verb := action.GetVerb()
|
||||
qualifiedResource := action.GetResource().GroupResource()
|
||||
name := statusError.ErrStatus.Details.Name
|
||||
// https://github.com/kubernetes/apimachinery/blob/v0.24.0/pkg/api/errors/errors.go#L435
|
||||
return apierrors.NewGenericServerResponse(errorCode, verb, qualifiedResource, name, statusError.Error(), -1, false)
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
"sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
|
||||
)
|
||||
|
||||
|
@ -37,6 +38,7 @@ func newEventFunnel(ctx context.Context) *eventFunnel {
|
|||
go func() {
|
||||
defer func() {
|
||||
// Don't close counterCh, otherwise AddInputChannel may panic.
|
||||
klog.V(5).Info("Closing funnel")
|
||||
close(funnel.outCh)
|
||||
close(funnel.doneCh)
|
||||
}()
|
||||
|
@ -48,6 +50,7 @@ func newEventFunnel(ctx context.Context) *eventFunnel {
|
|||
select {
|
||||
case delta := <-funnel.counterCh:
|
||||
inputs += delta
|
||||
klog.V(5).Infof("Funnel input channels (%+d): %d", delta, inputs)
|
||||
case <-ctxDoneCh:
|
||||
// Stop waiting for context closure.
|
||||
// Nil channel avoids busy waiting.
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -92,7 +91,9 @@ type ObjectStatusReporter struct {
|
|||
// ObjectFilter is used to decide which objects to ingore.
|
||||
ObjectFilter ObjectFilter
|
||||
|
||||
// TODO: handle automatic?
|
||||
// RESTScope specifies whether to ListAndWatch resources at the namespace
|
||||
// or cluster (root) level. Using root scope is more efficient, but
|
||||
// namespace scope may require fewer permissions.
|
||||
RESTScope meta.RESTScope
|
||||
|
||||
// lock guards modification of the subsequent stateful fields
|
||||
|
@ -220,10 +221,15 @@ func (w *ObjectStatusReporter) Start(ctx context.Context) <-chan event.Event {
|
|||
return w.funnel.OutputChannel()
|
||||
}
|
||||
|
||||
// Stop triggers the cancellation of the reporter context, and closure of the
|
||||
// event channel without sending an error event.
|
||||
func (w *ObjectStatusReporter) Stop() {
|
||||
klog.V(4).Info("Stopping reporter")
|
||||
w.cancel()
|
||||
}
|
||||
|
||||
// HasSynced returns true if all the started informers have been synced.
|
||||
//
|
||||
// TODO: provide a callback function, channel, or event to avoid needing to block with a rety loop.
|
||||
//
|
||||
// Use the following to block waiting for synchronization:
|
||||
// synced := cache.WaitForCacheSync(stopCh, informer.HasSynced)
|
||||
func (w *ObjectStatusReporter) HasSynced() bool {
|
||||
|
@ -322,17 +328,6 @@ func (w *ObjectStatusReporter) startInformerNow(
|
|||
|
||||
informer := w.InformerFactory.NewInformer(ctx, mapping, gkn.Namespace)
|
||||
|
||||
// Handler called when ListAndWatch errors.
|
||||
// Custom handler stops the informer if the resource is NotFound (CRD deleted).
|
||||
err = informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
|
||||
w.watchErrorHandler(gkn, err)
|
||||
})
|
||||
if err != nil {
|
||||
// Should never happen.
|
||||
// Informer can't have started yet. We just created it.
|
||||
return fmt.Errorf("failed to set error handler on new informer for %v: %v", mapping.Resource, err)
|
||||
}
|
||||
|
||||
w.informerRefs[gkn].SetInformer(informer)
|
||||
|
||||
eventCh := make(chan event.Event)
|
||||
|
@ -344,6 +339,17 @@ func (w *ObjectStatusReporter) startInformerNow(
|
|||
return fmt.Errorf("informer failed to build event handler: %w", err)
|
||||
}
|
||||
|
||||
// Handler called when ListAndWatch errors.
|
||||
// Custom handler stops the informer if the resource is NotFound (CRD deleted).
|
||||
err = informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
|
||||
w.watchErrorHandler(gkn, eventCh, err)
|
||||
})
|
||||
if err != nil {
|
||||
// Should never happen.
|
||||
// Informer can't have started yet. We just created it.
|
||||
return fmt.Errorf("failed to set error handler on new informer for %v: %v", mapping.Resource, err)
|
||||
}
|
||||
|
||||
informer.AddEventHandler(w.eventHandler(ctx, eventCh))
|
||||
|
||||
// Start the informer in the background.
|
||||
|
@ -699,65 +705,61 @@ func (w *ObjectStatusReporter) newStatusCheckTaskFunc(
|
|||
}
|
||||
|
||||
func (w *ObjectStatusReporter) handleFatalError(eventCh chan<- event.Event, err error) {
|
||||
klog.V(5).Infof("Reporter error: %v", err)
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
klog.V(5).Infof("Watch closed: %v", err)
|
||||
return
|
||||
}
|
||||
eventCh <- event.Event{
|
||||
Type: event.ErrorEvent,
|
||||
Error: err,
|
||||
}
|
||||
// Terminate the reporter.
|
||||
w.cancel()
|
||||
w.Stop()
|
||||
}
|
||||
|
||||
// watchErrorHandler logs errors and cancels the informer for this GroupKind
|
||||
// if the NotFound error is received, which usually means the CRD was deleted.
|
||||
// Based on DefaultWatchErrorHandler from k8s.io/client-go@v0.23.2/tools/cache/reflector.go
|
||||
func (w *ObjectStatusReporter) watchErrorHandler(gkn GroupKindNamespace, err error) {
|
||||
// Note: Informers use a stop channel, not a Context, so we don't expect
|
||||
// Canceled or DeadlineExceeded here.
|
||||
func (w *ObjectStatusReporter) watchErrorHandler(gkn GroupKindNamespace, eventCh chan<- event.Event, err error) {
|
||||
switch {
|
||||
// Stop channel closed
|
||||
case err == io.EOF:
|
||||
// Stop channel closed
|
||||
klog.V(5).Infof("Watch closed: %v: %v", gkn, err)
|
||||
case err == io.ErrUnexpectedEOF:
|
||||
// Keep retrying
|
||||
klog.V(1).Infof("Watch closed: %v: %v", gkn, err)
|
||||
case errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded):
|
||||
// Context cancelled
|
||||
klog.V(5).Infof("Watch closed: %v: %v", gkn, err)
|
||||
case apierrors.IsResourceExpired(err): // resourceVersion too old
|
||||
// Keep retrying
|
||||
klog.V(5).Infof("Watch closed: %v: %v", gkn, err)
|
||||
case apierrors.IsGone(err): // DEPRECATED
|
||||
// Keep retrying
|
||||
klog.V(5).Infof("Watch closed: %v: %v", gkn, err)
|
||||
case apierrors.IsNotFound(err) || containsNotFoundMessage(err): // CRD deleted or not created
|
||||
// Stop watching this resource
|
||||
klog.V(3).Infof("Watch error: %v: stopping all watchers for this GroupKind: %v", gkn, err)
|
||||
klog.V(5).Infof("ListAndWatch error (termination expected): %v: %v", gkn, err)
|
||||
|
||||
// Stop all informers for this GK
|
||||
// Watch connection closed
|
||||
case err == io.ErrUnexpectedEOF:
|
||||
klog.V(1).Infof("ListAndWatch error (retry expected): %v: %v", gkn, err)
|
||||
|
||||
// Context done
|
||||
case errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded):
|
||||
klog.V(5).Infof("ListAndWatch error (termination expected): %v: %v", gkn, err)
|
||||
|
||||
// resourceVersion too old
|
||||
case apierrors.IsResourceExpired(err):
|
||||
// Keep retrying
|
||||
klog.V(5).Infof("ListAndWatch error (retry expected): %v: %v", gkn, err)
|
||||
|
||||
// Resource unregistered (DEPRECATED, see NotFound)
|
||||
case apierrors.IsGone(err):
|
||||
klog.V(5).Infof("ListAndWatch error (retry expected): %v: %v", gkn, err)
|
||||
|
||||
// Resource not registered
|
||||
case apierrors.IsNotFound(err) || containsNotFoundMessage(err):
|
||||
klog.V(3).Infof("ListAndWatch error (termination expected): %v: stopping all informers for this GroupKind: %v", gkn, err)
|
||||
w.forEachTargetWithGroupKind(gkn.GroupKind(), func(gkn GroupKindNamespace) {
|
||||
w.stopInformer(gkn)
|
||||
})
|
||||
|
||||
// Insufficient permissions
|
||||
case apierrors.IsForbidden(err) || containsForbiddenMessage(err):
|
||||
klog.V(3).Infof("ListAndWatch error (termination expected): %v: stopping all informers: %v", gkn, err)
|
||||
w.handleFatalError(eventCh, err)
|
||||
|
||||
// Unexpected error
|
||||
default:
|
||||
// Keep retrying
|
||||
klog.Warningf("Watch error (will retry): %v: %v", gkn, err)
|
||||
klog.Warningf("ListAndWatch error (retry expected): %v: %v", gkn, err)
|
||||
}
|
||||
}
|
||||
|
||||
// resourceNotFoundMessage is the condition message for metav1.StatusReasonNotFound.
|
||||
const resourceNotFoundMessage = "the server could not find the requested resource"
|
||||
|
||||
// containsNotFoundMessage checks if the error string contains the message for
|
||||
// StatusReasonNotFound.
|
||||
// See k8s.io/apimachinery@v0.23.2/pkg/api/errors/errors.go
|
||||
// This is necessary because the Informer doesn't properly wrap errors.
|
||||
func containsNotFoundMessage(err error) bool {
|
||||
return strings.Contains(err.Error(), resourceNotFoundMessage)
|
||||
}
|
||||
|
||||
// informerReference tracks informer lifecycle.
|
||||
type informerReference struct {
|
||||
// lock guards the subsequent stateful fields
|
||||
|
|
Loading…
Reference in New Issue