refactor the grace system (#226)

Signed-off-by: yunbo <yunbo10124scut@gmail.com>
Co-authored-by: yunbo <yunbo10124scut@gmail.com>
This commit is contained in:
myname4423 2024-08-13 15:17:38 +08:00 committed by GitHub
parent 78273c2998
commit 5378dc2cf7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 997 additions and 422 deletions

View File

@ -76,19 +76,18 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error {
klog.Infof("rollout(%s/%s) canary step jumped", c.Rollout.Namespace, c.Rollout.Name)
return nil
}
gracePeriodSeconds := util.GracePeriodSecondsOrDefault(c.Rollout.Spec.Strategy.GetTrafficRouting(), defaultGracePeriodSeconds)
// When the first batch is trafficRouting rolling and the next steps are rolling release,
// We need to clean up the canary-related resources first and then rollout the rest of the batch.
currentStep := c.Rollout.Spec.Strategy.Canary.Steps[canaryStatus.CurrentStepIndex-1]
if currentStep.Traffic == nil && len(currentStep.Matches) == 0 {
tr := newTrafficRoutingContext(c)
done, err := m.trafficRoutingManager.FinalisingTrafficRouting(tr, false)
done, err := m.trafficRoutingManager.FinalisingTrafficRouting(tr)
c.NewStatus.CanaryStatus.LastUpdateTime = tr.LastUpdateTime
if err != nil {
return err
} else if !done {
klog.Infof("rollout(%s/%s) cleaning up canary-related resources", c.Rollout.Namespace, c.Rollout.Name)
expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second)
expectedTime := time.Now().Add(tr.RecheckDuration)
c.RecheckTime = &expectedTime
return nil
}
@ -116,11 +115,11 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error {
expectedReplicas, _ := intstr.GetScaledValueFromIntOrPercent(currentStep.Replicas, int(c.Workload.Replicas), true)
if expectedReplicas >= int(c.Workload.Replicas) && v1beta1.IsRealPartition(c.Rollout) {
klog.Infof("special case detected: rollout(%s/%s) restore stable Service", c.Rollout.Namespace, c.Rollout.Name)
done, err := m.trafficRoutingManager.RestoreStableService(tr)
retry, err := m.trafficRoutingManager.RestoreStableService(tr)
if err != nil {
return err
} else if !done {
expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second)
} else if retry {
expectedTime := time.Now().Add(tr.RecheckDuration)
c.RecheckTime = &expectedTime
return nil
}
@ -146,11 +145,11 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error {
if canaryStatus.CurrentStepIndex == 1 {
if !tr.DisableGenerateCanaryService {
klog.Infof("special case detected: rollout(%s/%s) patch stable Service", c.Rollout.Namespace, c.Rollout.Name)
done, err := m.trafficRoutingManager.PatchStableService(tr)
retry, err := m.trafficRoutingManager.PatchStableService(tr)
if err != nil {
return err
} else if !done {
expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second)
} else if retry {
expectedTime := time.Now().Add(tr.RecheckDuration)
c.RecheckTime = &expectedTime
return nil
}
@ -195,7 +194,13 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error {
klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name,
canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateTrafficRouting, canaryStatus.CurrentStepState)
}
expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second)
// in two cases, we should wait the default grace period
// - a period after CanaryStepStateUpgrade is just done (https://github.com/openkruise/rollouts/pull/185)
// - a period after CanaryStepStateTrafficRouting is just done
if tr.RecheckDuration <= 0 {
tr.RecheckDuration = time.Duration(trafficrouting.GetGraceSeconds(c.Rollout.Spec.Strategy.GetTrafficRouting(), defaultGracePeriodSeconds)) * time.Second
}
expectedTime := time.Now().Add(tr.RecheckDuration)
c.RecheckTime = &expectedTime
case v1beta1.CanaryStepStateMetricsAnalysis:
@ -369,7 +374,7 @@ func (m *canaryReleaseManager) doCanaryFinalising(c *RolloutContext) (bool, erro
}
tr := newTrafficRoutingContext(c)
// 2. remove stable service the pod revision selector, so stable service will be selector all version pods.
done, err := m.trafficRoutingManager.FinalisingTrafficRouting(tr, true)
done, err := m.trafficRoutingManager.FinalisingTrafficRouting(tr)
c.NewStatus.CanaryStatus.LastUpdateTime = tr.LastUpdateTime
if err != nil || !done {
return done, err
@ -380,7 +385,7 @@ func (m *canaryReleaseManager) doCanaryFinalising(c *RolloutContext) (bool, erro
return done, err
}
// 4. modify network api(ingress or gateway api) configuration, and route 100% traffic to stable pods.
done, err = m.trafficRoutingManager.FinalisingTrafficRouting(tr, false)
done, err = m.trafficRoutingManager.FinalisingTrafficRouting(tr)
c.NewStatus.CanaryStatus.LastUpdateTime = tr.LastUpdateTime
if err != nil || !done {
return done, err

View File

@ -88,6 +88,7 @@ var (
Ingress: &v1beta1.IngressTrafficRouting{
Name: "echoserver",
},
GracePeriodSeconds: 0, // To facilitate testing, don't wait after traffic routing operation
},
},
},

View File

@ -431,11 +431,6 @@ func (r *RolloutReconciler) doProgressingReset(c *RolloutContext) (bool, error)
if subStatus == nil {
return true, nil
}
gracePeriodSeconds := c.Rollout.Spec.Strategy.GetTrafficRouting()[0].GracePeriodSeconds
// To ensure respect for graceful time between these steps, we set start timer before the first step
if len(subStatus.FinalisingStep) == 0 {
subStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
}
tr := newTrafficRoutingContext(c)
klog.Infof("rollout(%s/%s) Finalising Step is %s", c.Rollout.Namespace, c.Rollout.Name, subStatus.FinalisingStep)
switch subStatus.FinalisingStep {
@ -446,21 +441,11 @@ func (r *RolloutReconciler) doProgressingReset(c *RolloutContext) (bool, error)
// firstly, restore the gateway resources (ingress/gatewayAPI/Istio), that means
// only stable Service will accept the traffic
case v1beta1.FinalisingStepTypeGateway:
//TODO - RestoreGateway returns (bool, error) pair instead of error only.
// return (fasle, nil): gateway is patched successfully, but we need time to observe; recheck later
// return (true, nil): gateway is patched successfully, and accepts the update successfully; go to next step then
// return (false, error): gateway encounters error when patched, or the update is not accepted; recheck later
err := r.trafficRoutingManager.RestoreGateway(tr)
if err != nil {
retry, err := r.trafficRoutingManager.RestoreGateway(tr)
if err != nil || retry {
subStatus.LastUpdateTime = tr.LastUpdateTime
return false, err
}
// usually, GracePeriodSeconds means duration to wait after an operation is done,
// we use defaultGracePeriodSeconds+1 here because the timer started before the RestoreGateway step
if subStatus.LastUpdateTime != nil && time.Since(subStatus.LastUpdateTime.Time) < time.Second*time.Duration(gracePeriodSeconds+1) {
klog.Infof("rollout(%s/%s) in step (%s), and wait %d seconds", c.Rollout.Namespace, c.Rollout.Name, subStatus.FinalisingStep, gracePeriodSeconds+1)
return false, nil
}
klog.Infof("rollout(%s/%s) in step (%s), and success", c.Rollout.Namespace, c.Rollout.Name, subStatus.FinalisingStep)
subStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
subStatus.FinalisingStep = v1beta1.FinalisingStepTypeDeleteBR
@ -490,7 +475,8 @@ func (r *RolloutReconciler) doProgressingReset(c *RolloutContext) (bool, error)
first step of v3 release.
*/
case v1beta1.FinalisingStepTypeDeleteCanaryService:
err := r.trafficRoutingManager.RemoveCanaryService(tr)
// ignore the grace period because it is the last step
_, err := r.trafficRoutingManager.RemoveCanaryService(tr)
if err != nil {
subStatus.LastUpdateTime = tr.LastUpdateTime
return false, err

View File

@ -533,7 +533,7 @@ func TestReconcileRolloutProgressing(t *testing.T) {
},
},
{
name: "ReconcileRolloutProgressing rolling -> continueRelease1",
name: "ReconcileRolloutProgressing rolling -> continueRelease1", // add grace time to test the first step: restoring the gateway
getObj: func() ([]*apps.Deployment, []*apps.ReplicaSet) {
dep1 := deploymentDemo.DeepCopy()
dep1.Spec.Template.Spec.Containers[0].Image = "echoserver:v3"
@ -558,10 +558,14 @@ func TestReconcileRolloutProgressing(t *testing.T) {
return []*apps.Deployment{dep1, dep2}, []*apps.ReplicaSet{rs1, rs2}
},
getNetwork: func() ([]*corev1.Service, []*netv1.Ingress) {
return []*corev1.Service{demoService.DeepCopy()}, []*netv1.Ingress{demoIngress.DeepCopy()}
c1 := demoIngress.DeepCopy()
c2 := demoIngress.DeepCopy()
c2.Name = c2.Name + "-canary"
return []*corev1.Service{demoService.DeepCopy()}, []*netv1.Ingress{c1, c2}
},
getRollout: func() (*v1beta1.Rollout, *v1beta1.BatchRelease, *v1alpha1.TrafficRouting) {
obj := rolloutDemo.DeepCopy()
obj.Spec.Strategy.Canary.TrafficRoutings[0].GracePeriodSeconds = 1 // add grace time to test fine step in continuous logic
obj.Status.CanaryStatus.ObservedWorkloadGeneration = 2
obj.Status.CanaryStatus.RolloutHash = "f55bvd874d5f2fzvw46bv966x4bwbdv4wx6bd9f7b46ww788954b8z8w29b7wxfd"
obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1"
@ -635,7 +639,7 @@ func TestReconcileRolloutProgressing(t *testing.T) {
obj.Status.CanaryStatus.CanaryReadyReplicas = 3
obj.Status.CanaryStatus.PodTemplateHash = "pod-template-hash-v2"
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
obj.Status.CanaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeDeleteCanaryService
obj.Status.CanaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeDeleteBR
cond := util.GetRolloutCondition(obj.Status, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
util.SetRolloutCondition(&obj.Status, *cond)

View File

@ -123,13 +123,13 @@ func (r *TrafficRoutingReconciler) Reconcile(ctx context.Context, req ctrl.Reque
done, err = r.trafficRoutingManager.DoTrafficRouting(newTrafficRoutingContext(tr))
}
case v1alpha1.TrafficRoutingPhaseFinalizing:
done, err = r.trafficRoutingManager.FinalisingTrafficRouting(newTrafficRoutingContext(tr), false)
done, err = r.trafficRoutingManager.FinalisingTrafficRouting(newTrafficRoutingContext(tr))
if done {
newStatus.Phase = v1alpha1.TrafficRoutingPhaseHealthy
newStatus.Message = "TrafficRouting is Healthy"
}
case v1alpha1.TrafficRoutingPhaseTerminating:
done, err = r.trafficRoutingManager.FinalisingTrafficRouting(newTrafficRoutingContext(tr), false)
done, err = r.trafficRoutingManager.FinalisingTrafficRouting(newTrafficRoutingContext(tr))
if done {
// remove trafficRouting finalizer
err = r.handleFinalizer(tr)

View File

@ -27,12 +27,13 @@ import (
"github.com/openkruise/rollouts/pkg/trafficrouting/network/gateway"
"github.com/openkruise/rollouts/pkg/trafficrouting/network/ingress"
"github.com/openkruise/rollouts/pkg/util"
"github.com/openkruise/rollouts/pkg/util/grace"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
utilpointer "k8s.io/utils/pointer"
"k8s.io/utils/integer"
"sigs.k8s.io/controller-runtime/pkg/client"
)
@ -59,6 +60,8 @@ type TrafficRoutingContext struct {
LastUpdateTime *metav1.Time
// won't work for Ingress and Gateway
DisableGenerateCanaryService bool
// recheck time
RecheckDuration time.Duration
}
// Manager responsible for adjusting network resources
@ -158,8 +161,6 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) {
klog.Errorf("%s patch canary service(%s) selector failed: %s", c.Key, canaryService.Name, err.Error())
return false, err
}
// update canary service time, and wait 3 seconds, just to be safe
c.LastUpdateTime = &metav1.Time{Time: time.Now()}
klog.Infof("%s patch canary service(%s) selector(%s=%s) success",
c.Key, canaryService.Name, c.RevisionLabelKey, c.CanaryRevision)
serviceModified = true
@ -172,12 +173,12 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) {
return false, err
}
serviceModified = true
// update stable service time, and wait 3 seconds, just to be safe
c.LastUpdateTime = &metav1.Time{Time: time.Now()}
klog.Infof("add %s stable service(%s) selector(%s=%s) success",
c.Key, stableService.Name, c.RevisionLabelKey, c.StableRevision)
}
if serviceModified {
// modification occurred, wait a grace period
c.LastUpdateTime = &metav1.Time{Time: time.Now()}
return false, nil
}
}
@ -199,211 +200,139 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) {
return true, nil
}
func (m *Manager) FinalisingTrafficRouting(c *TrafficRoutingContext, onlyRestoreStableService bool) (bool, error) {
func (m *Manager) FinalisingTrafficRouting(c *TrafficRoutingContext) (bool, error) {
if len(c.ObjectRef) == 0 {
return true, nil
}
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds
}
cServiceName := getCanaryServiceName(trafficRouting.Service, c.OnlyTrafficRouting, c.DisableGenerateCanaryService)
trController, err := newNetworkProvider(m.Client, c, trafficRouting.Service, cServiceName)
if err != nil {
klog.Errorf("%s newTrafficRoutingController failed: %s", c.Key, err.Error())
return false, err
}
noCanaryService := c.OnlyTrafficRouting || c.DisableGenerateCanaryService
// The "already-finalised" conditions of this FinalisingTrafficRouting function are:
// 1. the stable service has no selector
// 2. AND canary service has been cleaned up
var stableServiceRestored, canaryServiceRemoved bool
// check condition 1
stableService := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: c.Namespace, Name: trafficRouting.Service}}
if err := m.Get(context.TODO(), client.ObjectKeyFromObject(stableService), stableService); err != nil && !errors.IsNotFound(err) {
klog.Errorf("%s get stable service(%s) failed: %s", c.Key, trafficRouting.Service, err.Error())
return false, err
} else {
stableServiceRestored = errors.IsNotFound(err) || stableService.Spec.Selector[c.RevisionLabelKey] == ""
}
// check condition 2
cService := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: c.Namespace, Name: cServiceName}}
if err := m.Get(context.TODO(), client.ObjectKeyFromObject(cService), cService); err != nil && !errors.IsNotFound(err) {
klog.Errorf("%s get canary service(%s) failed: %s", c.Key, cServiceName, err.Error())
return false, err
} else {
// if noCanaryService is true, we have never created canary service
canaryServiceRemoved = errors.IsNotFound(err) || noCanaryService
}
// only if both conditions are met
if canaryServiceRemoved && stableServiceRestored {
/*
even both of the conditions are met, we call Finalise
1. In rollout failure case, this step ensures that the canary-ingress can be deleted in a time.
2. For scenario that noCanaryService is true, stable Service is never patched, canary Service is never created,
What we need to do is just to call Finalise
note that, calling Finalise even for multiple times is not a big thing:
1. it does nothing if it has already called once, since the corresponding annotation has been cleared
2. the Finalish won't wait graceful period for now
*/
if err = trController.Finalise(context.TODO()); err != nil {
return false, err
}
return true, nil
}
klog.Infof("%s start finalising traffic routing", c.Key)
// remove stable service the pod revision selector, so stable service will be selector all version pods.
verify, err := m.restoreStableService(c)
if err != nil || !verify {
if retry, err := m.RestoreStableService(c); err != nil || retry {
return false, err
} else if onlyRestoreStableService {
return true, nil
}
// First route 100% traffic to stable service
c.Strategy.Traffic = utilpointer.StringPtr("0%")
verify, err = trController.EnsureRoutes(context.TODO(), &c.Strategy)
if err != nil {
return false, err
} else if !verify {
c.LastUpdateTime = &metav1.Time{Time: time.Now()}
return false, nil
}
if c.LastUpdateTime != nil {
// After restore the stable service configuration, give network provider 3 seconds to react
if verifyTime := c.LastUpdateTime.Add(time.Second * time.Duration(trafficRouting.GracePeriodSeconds)); verifyTime.After(time.Now()) {
klog.Infof("%s route 100% traffic to stable service, and wait a moment", c.Key)
return false, nil
}
}
klog.Infof("%s restore stable service success", c.Key)
// modify network(ingress & gateway api) configuration, route all traffic to stable service
if err = trController.Finalise(context.TODO()); err != nil {
if retry, err := m.RestoreGateway(c); err != nil || retry {
return false, err
}
// end to end deployment scenario OR disableGenerateCanaryService is true, don't remove the canary service;
// because canary service is stable service (ie. no external canary service was created at all)
if !noCanaryService {
// remove canary service
err = m.Delete(context.TODO(), cService)
if err != nil && !errors.IsNotFound(err) {
klog.Errorf("%s remove canary service(%s) failed: %s", c.Key, cService.Name, err.Error())
return false, err
}
klog.Infof("%s remove canary service(%s) success", c.Key, cService.Name)
klog.Infof("%s restore gateway success", c.Key)
// remove canary service
if retry, err := m.RemoveCanaryService(c); err != nil || retry {
return false, err
}
klog.Infof("%s remove canary service success, finalising traffic routing is done", c.Key)
return true, nil
}
// RestoreGateway restore gateway resources without graceful time
func (m *Manager) RestoreGateway(c *TrafficRoutingContext) error {
// returns:
// - if error is not nil, usually we need to retry later. Only if error is nil, we consider the bool.
// - The bool value indicates whether retry is needed. If true, it usually means
// gateway resources have been updated and we need to wait for `graceSeconds`.
//
// only if error is nil AND retry is false, this calling can be considered as completed
func (m *Manager) RestoreGateway(c *TrafficRoutingContext) (bool, error) {
if len(c.ObjectRef) == 0 {
return nil
return false, nil
}
trafficRouting := c.ObjectRef[0]
cServiceName := getCanaryServiceName(trafficRouting.Service, c.OnlyTrafficRouting, c.DisableGenerateCanaryService)
trController, err := newNetworkProvider(m.Client, c, trafficRouting.Service, cServiceName)
// build up the network provider
stableService := c.ObjectRef[0].Service
cServiceName := getCanaryServiceName(stableService, c.OnlyTrafficRouting, c.DisableGenerateCanaryService)
trController, err := newNetworkProvider(m.Client, c, stableService, cServiceName)
if err != nil {
klog.Errorf("%s newTrafficRoutingController failed: %s", c.Key, err.Error())
return err
return false, err
}
return trController.Finalise(context.TODO())
}
// RemoveCanaryService find and delete canary Service. stable Service won't be modified
func (m *Manager) RemoveCanaryService(c *TrafficRoutingContext) error {
if len(c.ObjectRef) == 0 {
return nil
}
trafficRouting := c.ObjectRef[0]
cServiceName := getCanaryServiceName(trafficRouting.Service, c.OnlyTrafficRouting, c.DisableGenerateCanaryService)
cService := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: c.Namespace, Name: cServiceName}}
// end to end deployment scenario OR disableGenerateCanaryService is true, don't remove the canary service;
// because canary service is stable service (ie. no external canary service was created at all)
if !(c.OnlyTrafficRouting || c.DisableGenerateCanaryService) {
// remove canary service
err := m.Delete(context.TODO(), cService)
if err != nil && !errors.IsNotFound(err) {
klog.Errorf("%s remove canary service(%s) failed: %s", c.Key, cService.Name, err.Error())
return err
// restore Gateway/Ingress/Istio
graceSeconds := GetGraceSeconds(c.ObjectRef, defaultGracePeriodSeconds)
retry, remaining, err := grace.RunWithGraceSeconds(string(c.OwnerRef.UID), "restoreGateway", graceSeconds, func() (bool, error) {
modified, err := trController.Finalise(context.TODO())
if modified {
c.LastUpdateTime = &metav1.Time{Time: time.Now()}
}
klog.Infof("%s remove canary service(%s) success", c.Key, cService.Name)
}
return nil
return modified, err
})
UpdateRecheckDuration(c, remaining)
return retry, err
}
// returning (false, nil) means the update has been submitted, and no error occurred
// but we need to wait graceful time before returning true
// returns:
// - if error is not nil, usually we need to retry later. Only if error is nil, we consider the bool.
// - The bool value indicates whether retry is needed. If true, it usually means
// canary service has been deleted and we need to wait for `graceSeconds`.
//
// only if error is nil AND retry is false, this calling can be considered as completed
func (m *Manager) RemoveCanaryService(c *TrafficRoutingContext) (bool, error) {
if len(c.ObjectRef) == 0 {
return false, nil
}
// end to end deployment scenario OR disableGenerateCanaryService is true, don't remove the canary service;
// because canary service is stable service (ie. canary service is never created from the beginning)
if c.OnlyTrafficRouting || c.DisableGenerateCanaryService {
return false, nil
}
cServiceName := getCanaryServiceName(c.ObjectRef[0].Service, c.OnlyTrafficRouting, c.DisableGenerateCanaryService)
cService := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: c.Namespace, Name: cServiceName}}
key := types.NamespacedName{
Namespace: c.Namespace,
Name: cServiceName,
}
// remove canary service
graceSeconds := GetGraceSeconds(c.ObjectRef, defaultGracePeriodSeconds)
retry, remaining, err := grace.RunWithGraceSeconds(key.String(), "removeCanaryService", graceSeconds, func() (bool, error) {
err := m.Delete(context.TODO(), cService)
if errors.IsNotFound(err) {
return false, nil
}
if err != nil {
klog.Errorf("%s remove canary service(%s) failed: %s", c.Key, cService.Name, err.Error())
return false, err
}
return true, nil
})
UpdateRecheckDuration(c, remaining)
return retry, err
}
// returns:
// - if error is not nil, usually we need to retry later. Only if error is nil, we consider the bool.
// - The bool value indicates whether retry is needed. If true, it usually means
// stable service has been updated (ie. patched) and we need to wait for `graceSeconds`.
//
// only if error is nil AND retry is false, this calling can be considered as completed
func (m *Manager) PatchStableService(c *TrafficRoutingContext) (bool, error) {
if len(c.ObjectRef) == 0 {
return true, nil
return false, nil
}
if c.OnlyTrafficRouting || c.DisableGenerateCanaryService {
return true, nil
}
gracePeriodSeconds := util.GracePeriodSecondsOrDefault(c.ObjectRef, defaultGracePeriodSeconds)
trafficRouting := c.ObjectRef[0]
//fetch stable service
// fetch stable service
stableService := &corev1.Service{}
err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService)
serviceName := c.ObjectRef[0].Service
err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: serviceName}, stableService)
if err != nil {
klog.Errorf("%s get stable service(%s) failed: %s", c.Key, trafficRouting.Service, err.Error())
// not found, wait a moment, retry
if errors.IsNotFound(err) {
return false, nil
}
klog.Errorf("%s get stable service(%s) failed: %s", c.Key, serviceName, err.Error())
return false, err
}
if stableService.Spec.Selector[c.RevisionLabelKey] == c.StableRevision {
if c.LastUpdateTime == nil {
return true, nil
}
if time.Since(c.LastUpdateTime.Time) < time.Second*time.Duration(gracePeriodSeconds) {
klog.Infof("%s do something special: add stable service(%s) selector(%s=%s) success, but we need wait %d seconds", c.Key, stableService.Name, c.RevisionLabelKey, c.StableRevision, gracePeriodSeconds)
return false, nil
}
klog.Infof("%s do something special: add stable service(%s) selector(%s=%s) success and complete", c.Key, stableService.Name, c.RevisionLabelKey, c.StableRevision)
return true, nil
}
// patch stable service to only select the stable pods
body := fmt.Sprintf(`{"spec":{"selector":{"%s":"%s"}}}`, c.RevisionLabelKey, c.StableRevision)
if err = m.Patch(context.TODO(), stableService, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil {
klog.Errorf("%s patch stable service(%s) selector failed: %s", c.Key, stableService.Name, err.Error())
return false, err
}
c.LastUpdateTime = &metav1.Time{Time: time.Now()}
klog.Infof("%s do something special: add stable service(%s) selector(%s=%s) success, but we need wait %d seconds", c.Key, stableService.Name, c.RevisionLabelKey, c.StableRevision, gracePeriodSeconds)
return false, nil
}
// returning (false, nil) means the update has been submitted, and no error occurred
// but we need to wait graceful time before returning true
func (m *Manager) RestoreStableService(c *TrafficRoutingContext) (bool, error) {
if len(c.ObjectRef) == 0 {
return true, nil
}
trafficRouting := c.ObjectRef[0]
//fetch stable service
stableService := &corev1.Service{}
err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService)
if err != nil {
if errors.IsNotFound(err) {
return true, nil
}
klog.Errorf("%s get stable service(%s) failed: %s", c.Key, trafficRouting.Service, err.Error())
return false, err
}
// restore stable Service
verify, err := m.restoreStableService(c)
if err != nil || !verify {
return false, err
}
return true, nil
graceSeconds := GetGraceSeconds(c.ObjectRef, defaultGracePeriodSeconds)
retry, remaining, err := grace.RunWithGraceSeconds(string(stableService.UID), "patchService", graceSeconds, func() (bool, error) {
modified := false
if stableService.Spec.Selector[c.RevisionLabelKey] != c.StableRevision {
body := fmt.Sprintf(`{"spec":{"selector":{"%s":"%s"}}}`, c.RevisionLabelKey, c.StableRevision)
if err = m.Patch(context.TODO(), stableService, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil {
klog.Errorf("%s patch stable service(%s) selector failed: %s", c.Key, stableService.Name, err.Error())
return false, err
}
c.LastUpdateTime = &metav1.Time{Time: time.Now()}
modified = true
}
return modified, nil
})
UpdateRecheckDuration(c, remaining)
return retry, err
}
func newNetworkProvider(c client.Client, con *TrafficRoutingContext, sService, cService string) (network.NetworkProvider, error) {
@ -475,41 +404,46 @@ func (m *Manager) createCanaryService(c *TrafficRoutingContext, cService string,
}
// remove stable service the pod revision selector, so stable service will be selector all version pods.
func (m *Manager) restoreStableService(c *TrafficRoutingContext) (bool, error) {
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds
}
//fetch stable service
stableService := &corev1.Service{}
err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService)
if err != nil {
if errors.IsNotFound(err) {
return true, nil
}
klog.Errorf("%s get stable service(%s) failed: %s", c.Key, trafficRouting.Service, err.Error())
return false, err
}
if stableService.Spec.Selector[c.RevisionLabelKey] != "" {
body := fmt.Sprintf(`{"spec":{"selector":{"%s":null}}}`, c.RevisionLabelKey)
if err = m.Patch(context.TODO(), stableService, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil {
klog.Errorf("%s patch stable service(%s) failed: %s", c.Key, trafficRouting.Service, err.Error())
return false, err
}
klog.Infof("remove %s stable service(%s) pod revision selector, and wait a moment", c.Key, trafficRouting.Service)
c.LastUpdateTime = &metav1.Time{Time: time.Now()}
// returns:
// - if error is not nil, usually we need to retry later. Only if error is nil, we consider the bool.
// - The bool value indicates whether retry is needed. If true, it usually means
// stable service has been updated (ie. restored) and we need to wait for `graceSeconds`.
//
// only if error is nil AND retry is false, this calling can be considered as completed
func (m *Manager) RestoreStableService(c *TrafficRoutingContext) (bool, error) {
if len(c.ObjectRef) == 0 {
return false, nil
}
if c.LastUpdateTime == nil {
// fetch the stable Service
stableService := &corev1.Service{}
serviceName := c.ObjectRef[0].Service
err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: serviceName}, stableService)
if errors.IsNotFound(err) {
return true, nil
}
// After restore the stable service configuration, give network provider 3 seconds to react
if verifyTime := c.LastUpdateTime.Add(time.Second * time.Duration(trafficRouting.GracePeriodSeconds)); verifyTime.After(time.Now()) {
klog.Infof("%s restoring stable service(%s), and wait a moment", c.Key, trafficRouting.Service)
return false, nil
if err != nil {
klog.Errorf("%s get stable service(%s) failed: %s", c.Key, serviceName, err.Error())
return false, err
}
klog.Infof("%s doFinalising stable service(%s) success", c.Key, trafficRouting.Service)
return true, nil
// restore stable Service
graceSeconds := GetGraceSeconds(c.ObjectRef, defaultGracePeriodSeconds)
retry, remaining, err := grace.RunWithGraceSeconds(string(stableService.UID), "restoreService", graceSeconds, func() (bool, error) {
modified := false
if stableService.Spec.Selector[c.RevisionLabelKey] != "" {
body := fmt.Sprintf(`{"spec":{"selector":{"%s":null}}}`, c.RevisionLabelKey)
if err = m.Patch(context.TODO(), stableService, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil {
klog.Errorf("%s patch stable service(%s) failed: %s", c.Key, serviceName, err.Error())
return false, err
}
c.LastUpdateTime = &metav1.Time{Time: time.Now()}
modified = true
}
return modified, nil
})
UpdateRecheckDuration(c, remaining)
return retry, err
}
func getCanaryServiceName(sService string, onlyTrafficRouting bool, disableGenerateCanaryService bool) string {
@ -518,3 +452,27 @@ func getCanaryServiceName(sService string, onlyTrafficRouting bool, disableGener
}
return fmt.Sprintf("%s-canary", sService)
}
func GetGraceSeconds(refs []v1beta1.TrafficRoutingRef, defaultSeconds int32) (graceSeconds int32) {
if len(refs) == 0 {
klog.Infof("no trafficRoutingRef, use defaultGracePeriodSeconds(%d)", defaultSeconds)
return defaultSeconds
}
for i := range refs {
graceSeconds = integer.Int32Max(graceSeconds, refs[i].GracePeriodSeconds)
}
// user may intentionally set graceSeconds as 0 (if not provided, defaults to 3)
// we respect it
if graceSeconds < 0 {
klog.Infof("negative graceSeconds(%d), use defaultGracePeriodSeconds(%d)", graceSeconds, defaultSeconds)
return defaultSeconds
}
klog.Infof("use graceSeconds(%d)", graceSeconds)
return
}
func UpdateRecheckDuration(c *TrafficRoutingContext, remaining time.Duration) {
if c.RecheckDuration < remaining {
c.RecheckDuration = remaining
}
}

View File

@ -29,6 +29,7 @@ import (
"github.com/openkruise/rollouts/api/v1beta1"
"github.com/openkruise/rollouts/pkg/util"
"github.com/openkruise/rollouts/pkg/util/configuration"
"github.com/openkruise/rollouts/pkg/util/grace"
apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
netv1 "k8s.io/api/networking/v1"
@ -156,6 +157,8 @@ var (
Ingress: &v1beta1.IngressTrafficRouting{
Name: "echoserver",
},
// webhook doesn't work in unit test, we manually set gracePeriodSeconds to 1
GracePeriodSeconds: 1,
},
},
},
@ -932,15 +935,16 @@ func TestDoTrafficRoutingWithIstio(t *testing.T) {
func TestFinalisingTrafficRouting(t *testing.T) {
cases := []struct {
name string
getObj func() ([]*corev1.Service, []*netv1.Ingress)
getRollout func() (*v1beta1.Rollout, *util.Workload)
onlyRestoreStableService bool
expectObj func() ([]*corev1.Service, []*netv1.Ingress)
expectDone bool
name string
getObj func() ([]*corev1.Service, []*netv1.Ingress)
getRollout func() (*v1beta1.Rollout, *util.Workload)
onlyTrafficRouting bool
expectObj func() ([]*corev1.Service, []*netv1.Ingress)
expectNotFound func() ([]*corev1.Service, []*netv1.Ingress)
expectDone bool
}{
{
name: "FinalisingTrafficRouting test1",
name: "FinalisingTrafficRouting test1", // will restore the stable service
getObj: func() ([]*corev1.Service, []*netv1.Ingress) {
s1 := demoService.DeepCopy()
s1.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey] = "podtemplatehash-v1"
@ -962,7 +966,6 @@ func TestFinalisingTrafficRouting(t *testing.T) {
obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-time.Hour)}
return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey}
},
onlyRestoreStableService: true,
expectObj: func() ([]*corev1.Service, []*netv1.Ingress) {
s1 := demoService.DeepCopy()
s2 := demoService.DeepCopy()
@ -976,10 +979,13 @@ func TestFinalisingTrafficRouting(t *testing.T) {
c2.Spec.Rules[0].HTTP.Paths[0].Backend.Service.Name = "echoserver-canary"
return []*corev1.Service{s1, s2}, []*netv1.Ingress{c1, c2}
},
expectNotFound: func() ([]*corev1.Service, []*netv1.Ingress) {
return nil, nil
},
expectDone: false,
},
{
name: "FinalisingTrafficRouting test2",
name: "stable Service already clear", // will restore the gateway
getObj: func() ([]*corev1.Service, []*netv1.Ingress) {
s1 := demoService.DeepCopy()
s2 := demoService.DeepCopy()
@ -997,129 +1003,137 @@ func TestFinalisingTrafficRouting(t *testing.T) {
obj := demoRollout.DeepCopy()
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateCompleted
obj.Status.CanaryStatus.CurrentStepIndex = 4
obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(time.Hour)}
obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-time.Hour)}
return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey}
},
onlyRestoreStableService: true,
expectObj: func() ([]*corev1.Service, []*netv1.Ingress) {
s1 := demoService.DeepCopy()
s2 := demoService.DeepCopy()
s2.Name = "echoserver-canary"
s2.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey] = "podtemplatehash-v2"
c1 := demoIngress.DeepCopy()
c2 := demoIngress.DeepCopy()
c2.Name = "echoserver-canary"
c2.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)] = "true"
c2.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)] = "100"
c2.Spec.Rules[0].HTTP.Paths[0].Backend.Service.Name = "echoserver-canary"
return []*corev1.Service{s1, s2}, []*netv1.Ingress{c1, c2}
},
expectDone: false,
},
{
name: "FinalisingTrafficRouting test3",
getObj: func() ([]*corev1.Service, []*netv1.Ingress) {
s1 := demoService.DeepCopy()
s2 := demoService.DeepCopy()
s2.Name = "echoserver-canary"
s2.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey] = "podtemplatehash-v2"
c1 := demoIngress.DeepCopy()
c2 := demoIngress.DeepCopy()
c2.Name = "echoserver-canary"
c2.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)] = "true"
c2.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)] = "100"
c2.Spec.Rules[0].HTTP.Paths[0].Backend.Service.Name = "echoserver-canary"
return []*corev1.Service{s1, s2}, []*netv1.Ingress{c1, c2}
},
getRollout: func() (*v1beta1.Rollout, *util.Workload) {
obj := demoRollout.DeepCopy()
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateCompleted
obj.Status.CanaryStatus.CurrentStepIndex = 4
obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-10 * time.Second)}
return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey}
},
onlyRestoreStableService: true,
expectObj: func() ([]*corev1.Service, []*netv1.Ingress) {
s1 := demoService.DeepCopy()
s2 := demoService.DeepCopy()
s2.Name = "echoserver-canary"
s2.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey] = "podtemplatehash-v2"
c1 := demoIngress.DeepCopy()
c2 := demoIngress.DeepCopy()
c2.Name = "echoserver-canary"
c2.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)] = "true"
c2.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)] = "100"
c2.Spec.Rules[0].HTTP.Paths[0].Backend.Service.Name = "echoserver-canary"
return []*corev1.Service{s1, s2}, []*netv1.Ingress{c1, c2}
},
expectDone: true,
},
{
name: "FinalisingTrafficRouting test4",
getObj: func() ([]*corev1.Service, []*netv1.Ingress) {
s1 := demoService.DeepCopy()
s2 := demoService.DeepCopy()
s2.Name = "echoserver-canary"
s2.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey] = "podtemplatehash-v2"
c1 := demoIngress.DeepCopy()
c2 := demoIngress.DeepCopy()
c2.Name = "echoserver-canary"
c2.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)] = "true"
c2.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)] = "100"
c2.Spec.Rules[0].HTTP.Paths[0].Backend.Service.Name = "echoserver-canary"
return []*corev1.Service{s1, s2}, []*netv1.Ingress{c1, c2}
},
getRollout: func() (*v1beta1.Rollout, *util.Workload) {
obj := demoRollout.DeepCopy()
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateCompleted
obj.Status.CanaryStatus.CurrentStepIndex = 4
obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-3 * time.Second)}
return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey}
},
onlyRestoreStableService: false,
expectObj: func() ([]*corev1.Service, []*netv1.Ingress) {
s1 := demoService.DeepCopy()
s2 := demoService.DeepCopy()
s2.Name = "echoserver-canary"
s2.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey] = "podtemplatehash-v2"
c1 := demoIngress.DeepCopy()
c2 := demoIngress.DeepCopy()
c2.Name = "echoserver-canary"
c2.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)] = "true"
c2.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)] = "0"
c2.Spec.Rules[0].HTTP.Paths[0].Backend.Service.Name = "echoserver-canary"
return []*corev1.Service{s1, s2}, []*netv1.Ingress{c1, c2}
},
expectDone: false,
},
{
name: "FinalisingTrafficRouting test5",
getObj: func() ([]*corev1.Service, []*netv1.Ingress) {
s1 := demoService.DeepCopy()
s2 := demoService.DeepCopy()
s2.Name = "echoserver-canary"
s2.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey] = "podtemplatehash-v2"
c1 := demoIngress.DeepCopy()
c2 := demoIngress.DeepCopy()
c2.Name = "echoserver-canary"
c2.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)] = "true"
c2.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)] = "0"
c2.Spec.Rules[0].HTTP.Paths[0].Backend.Service.Name = "echoserver-canary"
return []*corev1.Service{s1, s2}, []*netv1.Ingress{c1, c2}
},
getRollout: func() (*v1beta1.Rollout, *util.Workload) {
obj := demoRollout.DeepCopy()
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateCompleted
obj.Status.CanaryStatus.CurrentStepIndex = 4
obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-3 * time.Second)}
return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey}
},
onlyRestoreStableService: false,
expectObj: func() ([]*corev1.Service, []*netv1.Ingress) {
s1 := demoService.DeepCopy()
c1 := demoIngress.DeepCopy()
return []*corev1.Service{s1}, []*netv1.Ingress{c1}
},
expectNotFound: func() ([]*corev1.Service, []*netv1.Ingress) {
c2 := demoIngress.DeepCopy()
c2.Name = "echoserver-canary"
return nil, []*netv1.Ingress{c2}
},
expectDone: false,
},
{
name: "gateway already restored", // will remove the canary service
getObj: func() ([]*corev1.Service, []*netv1.Ingress) {
s1 := demoService.DeepCopy()
s2 := demoService.DeepCopy()
s2.Name = "echoserver-canary"
s2.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey] = "podtemplatehash-v2"
c1 := demoIngress.DeepCopy()
return []*corev1.Service{s1, s2}, []*netv1.Ingress{c1}
},
getRollout: func() (*v1beta1.Rollout, *util.Workload) {
obj := demoRollout.DeepCopy()
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateCompleted
obj.Status.CanaryStatus.CurrentStepIndex = 4
obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-time.Hour)}
return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey}
},
expectObj: func() ([]*corev1.Service, []*netv1.Ingress) {
s1 := demoService.DeepCopy()
c1 := demoIngress.DeepCopy()
return []*corev1.Service{s1}, []*netv1.Ingress{c1}
},
expectNotFound: func() ([]*corev1.Service, []*netv1.Ingress) {
s2 := demoService.DeepCopy()
s2.Name = "echoserver-canary"
c2 := demoIngress.DeepCopy()
c2.Name = "echoserver-canary"
return []*corev1.Service{s2}, []*netv1.Ingress{c2}
},
expectDone: false,
},
{
name: "canary Service already clear", // all is done
getObj: func() ([]*corev1.Service, []*netv1.Ingress) {
s1 := demoService.DeepCopy()
c1 := demoIngress.DeepCopy()
return []*corev1.Service{s1}, []*netv1.Ingress{c1}
},
getRollout: func() (*v1beta1.Rollout, *util.Workload) {
obj := demoRollout.DeepCopy()
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateCompleted
obj.Status.CanaryStatus.CurrentStepIndex = 4
obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-time.Hour)}
return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey}
},
expectObj: func() ([]*corev1.Service, []*netv1.Ingress) {
s1 := demoService.DeepCopy()
c1 := demoIngress.DeepCopy()
return []*corev1.Service{s1}, []*netv1.Ingress{c1}
},
expectNotFound: func() ([]*corev1.Service, []*netv1.Ingress) {
s2 := demoService.DeepCopy()
s2.Name = "echoserver-canary"
c2 := demoIngress.DeepCopy()
c2.Name = "echoserver-canary"
return []*corev1.Service{s2}, []*netv1.Ingress{c2}
},
expectDone: true,
},
{
name: "OnlyTrafficRouting true - test 1",
getObj: func() ([]*corev1.Service, []*netv1.Ingress) {
s1 := demoService.DeepCopy()
c1 := demoIngress.DeepCopy()
c2 := demoIngress.DeepCopy()
c2.Name = "echoserver-canary"
c2.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)] = "true"
c2.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)] = "100"
c2.Spec.Rules[0].HTTP.Paths[0].Backend.Service.Name = "echoserver-canary"
return []*corev1.Service{s1}, []*netv1.Ingress{c1, c2}
},
getRollout: func() (*v1beta1.Rollout, *util.Workload) {
obj := demoRollout.DeepCopy()
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateCompleted
obj.Status.CanaryStatus.CurrentStepIndex = 4
obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-time.Hour)}
return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey}
},
onlyTrafficRouting: true,
expectObj: func() ([]*corev1.Service, []*netv1.Ingress) {
s1 := demoService.DeepCopy()
c1 := demoIngress.DeepCopy()
return []*corev1.Service{s1}, []*netv1.Ingress{c1}
},
expectNotFound: func() ([]*corev1.Service, []*netv1.Ingress) {
c2 := demoIngress.DeepCopy()
c2.Name = "echoserver-canary"
return nil, []*netv1.Ingress{c2}
},
expectDone: false,
},
{
name: "OnlyTrafficRouting true - test 2",
getObj: func() ([]*corev1.Service, []*netv1.Ingress) {
s1 := demoService.DeepCopy()
c1 := demoIngress.DeepCopy()
return []*corev1.Service{s1}, []*netv1.Ingress{c1}
},
getRollout: func() (*v1beta1.Rollout, *util.Workload) {
obj := demoRollout.DeepCopy()
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateCompleted
obj.Status.CanaryStatus.CurrentStepIndex = 4
obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-time.Hour)}
return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey}
},
onlyTrafficRouting: true,
expectObj: func() ([]*corev1.Service, []*netv1.Ingress) {
s1 := demoService.DeepCopy()
c1 := demoIngress.DeepCopy()
return []*corev1.Service{s1}, []*netv1.Ingress{c1}
},
expectNotFound: func() ([]*corev1.Service, []*netv1.Ingress) {
c2 := demoIngress.DeepCopy()
c2.Name = "echoserver-canary"
return nil, []*netv1.Ingress{c2}
},
expectDone: true,
},
}
@ -1138,18 +1152,19 @@ func TestFinalisingTrafficRouting(t *testing.T) {
newStatus := rollout.Status.DeepCopy()
currentStep := rollout.Spec.Strategy.Canary.Steps[newStatus.CanaryStatus.CurrentStepIndex-1]
c := &TrafficRoutingContext{
Key: fmt.Sprintf("Rollout(%s/%s)", rollout.Namespace, rollout.Name),
Namespace: rollout.Namespace,
ObjectRef: rollout.Spec.Strategy.Canary.TrafficRoutings,
Strategy: currentStep.TrafficRoutingStrategy,
OwnerRef: *metav1.NewControllerRef(rollout, v1beta1.SchemeGroupVersion.WithKind("Rollout")),
RevisionLabelKey: workload.RevisionLabelKey,
StableRevision: newStatus.CanaryStatus.StableRevision,
CanaryRevision: newStatus.CanaryStatus.PodTemplateHash,
LastUpdateTime: newStatus.CanaryStatus.LastUpdateTime,
Key: fmt.Sprintf("Rollout(%s/%s)", rollout.Namespace, rollout.Name),
Namespace: rollout.Namespace,
ObjectRef: rollout.Spec.Strategy.Canary.TrafficRoutings,
Strategy: currentStep.TrafficRoutingStrategy,
OwnerRef: *metav1.NewControllerRef(rollout, v1beta1.SchemeGroupVersion.WithKind("Rollout")),
RevisionLabelKey: workload.RevisionLabelKey,
StableRevision: newStatus.CanaryStatus.StableRevision,
CanaryRevision: newStatus.CanaryStatus.PodTemplateHash,
LastUpdateTime: newStatus.CanaryStatus.LastUpdateTime,
OnlyTrafficRouting: cs.onlyTrafficRouting,
}
manager := NewTrafficRoutingManager(client)
done, err := manager.FinalisingTrafficRouting(c, cs.onlyRestoreStableService)
done, err := manager.FinalisingTrafficRouting(c)
if err != nil {
t.Fatalf("DoTrafficRouting failed: %s", err)
}
@ -1163,6 +1178,16 @@ func TestFinalisingTrafficRouting(t *testing.T) {
for _, obj := range ig {
checkObjEqual(client, t, obj)
}
ss, ig = cs.expectNotFound()
for _, obj := range ss {
checkNotFound(client, t, obj)
}
for _, obj := range ig {
checkNotFound(client, t, obj)
}
// empty the grace expectations to avoid making effects on the following test
grace.ResetExpectations()
})
}
}
@ -1175,6 +1200,7 @@ func TestRestoreGateway(t *testing.T) {
onlyTrafficRouting bool
expectObj func() ([]*corev1.Service, []*netv1.Ingress)
expectNotFound func() ([]*corev1.Service, []*netv1.Ingress)
retry bool
}{
{
name: "Restore Gateway test1",
@ -1212,6 +1238,7 @@ func TestRestoreGateway(t *testing.T) {
c2.Name = "echoserver-canary"
return nil, []*netv1.Ingress{c2}
},
retry: true,
},
}
@ -1241,9 +1268,12 @@ func TestRestoreGateway(t *testing.T) {
OnlyTrafficRouting: cs.onlyTrafficRouting,
}
manager := NewTrafficRoutingManager(cli)
err := manager.RestoreGateway(c)
retry, err := manager.RestoreGateway(c)
if err != nil {
t.Fatalf("DoTrafficRouting failed: %s", err)
t.Fatalf("RestoreGateway failed: %s", err)
}
if retry != cs.retry {
t.Fatalf("RestoreGateway expect(%v), but get(%v)", cs.retry, retry)
}
ss, ig = cs.expectObj()
for _, obj := range ss {
@ -1260,6 +1290,12 @@ func TestRestoreGateway(t *testing.T) {
for _, obj := range ig {
checkNotFound(cli, t, obj)
}
// the second call, it should be no error and no retry
time.Sleep(1 * time.Second)
retry, err = manager.RestoreGateway(c)
if err != nil || retry {
t.Fatalf("RestoreGateway failed: %s", err)
}
})
}
}
@ -1272,6 +1308,7 @@ func TestRemoveCanaryService(t *testing.T) {
onlyTrafficRouting bool
expectObj func() ([]*corev1.Service, []*netv1.Ingress)
expectNotFound func() ([]*corev1.Service, []*netv1.Ingress)
retry bool
}{
{
name: "Restore Gateway test1",
@ -1311,6 +1348,7 @@ func TestRemoveCanaryService(t *testing.T) {
s2.Name = "echoserver-canary"
return []*corev1.Service{s2}, nil
},
retry: true,
},
}
@ -1340,9 +1378,12 @@ func TestRemoveCanaryService(t *testing.T) {
OnlyTrafficRouting: cs.onlyTrafficRouting,
}
manager := NewTrafficRoutingManager(cli)
err := manager.RemoveCanaryService(c)
retry, err := manager.RemoveCanaryService(c)
if err != nil {
t.Fatalf("DoTrafficRouting failed: %s", err)
t.Fatalf("RemoveCanaryService failed: %s", err)
}
if retry != cs.retry {
t.Fatalf("RemoveCanaryService expect(%v), but get(%v)", false, retry)
}
ss, ig = cs.expectObj()
for _, obj := range ss {
@ -1359,6 +1400,12 @@ func TestRemoveCanaryService(t *testing.T) {
for _, obj := range ig {
checkNotFound(cli, t, obj)
}
// the second call, it should be no error and no retry
time.Sleep(1 * time.Second)
retry, err = manager.RemoveCanaryService(c)
if err != nil || retry {
t.Fatalf("RemoveCanaryService failed: %s", err)
}
})
}
}

View File

@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/klog/v2"
utilpointer "k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
@ -112,11 +113,6 @@ func (r *customController) Initialize(ctx context.Context) error {
// when ensuring routes, first execute lua for all custom providers, then update
func (r *customController) EnsureRoutes(ctx context.Context, strategy *v1beta1.TrafficRoutingStrategy) (bool, error) {
done := true
// *strategy.Weight == 0 indicates traffic routing is doing finalising and tries to route whole traffic to stable service
// then directly do finalising
if strategy.Traffic != nil && *strategy.Traffic == "0%" {
return true, nil
}
var err error
customNetworkRefList := make([]*unstructured.Unstructured, len(r.conf.TrafficConf))
@ -182,8 +178,9 @@ func (r *customController) EnsureRoutes(ctx context.Context, strategy *v1beta1.T
return done, nil
}
func (r *customController) Finalise(ctx context.Context) error {
done := true
func (r *customController) Finalise(ctx context.Context) (bool, error) {
modified := false
errList := field.ErrorList{}
for _, ref := range r.conf.TrafficConf {
obj := &unstructured.Unstructured{}
obj.SetAPIVersion(ref.APIVersion)
@ -193,19 +190,19 @@ func (r *customController) Finalise(ctx context.Context) error {
klog.Infof("custom network provider %s(%s/%s) not found when finalising", ref.Kind, r.conf.RolloutNs, ref.Name)
continue
}
errList = append(errList, field.InternalError(field.NewPath("GetCustomNetworkProvider"), err))
klog.Errorf("failed to get %s(%s/%s) when finalising, process next first", ref.Kind, r.conf.RolloutNs, ref.Name)
done = false
continue
}
if err := r.restoreObject(obj); err != nil {
done = false
if updated, err := r.restoreObject(obj); err != nil {
errList = append(errList, field.InternalError(field.NewPath("RestoreCustomNetworkProvider"), err))
klog.Errorf("failed to restore %s(%s/%s) when finalising: %s", ref.Kind, r.conf.RolloutNs, ref.Name, err.Error())
} else if updated {
modified = true
}
}
if !done {
return fmt.Errorf("finalising work for %s is not done", r.conf.Key)
}
return nil
return modified, errList.ToAggregate()
}
// store spec of an object in OriginalSpecAnnotation
@ -237,11 +234,11 @@ func (r *customController) storeObject(obj *unstructured.Unstructured) error {
}
// restore an object from spec stored in OriginalSpecAnnotation
func (r *customController) restoreObject(obj *unstructured.Unstructured) error {
func (r *customController) restoreObject(obj *unstructured.Unstructured) (modified bool, err error) {
annotations := obj.GetAnnotations()
if annotations == nil || annotations[OriginalSpecAnnotation] == "" {
klog.Infof("OriginalSpecAnnotation not found in custom network provider %s(%s/%s)", obj.GetKind(), r.conf.RolloutNs, obj.GetName())
return nil
return false, nil
}
oSpecStr := annotations[OriginalSpecAnnotation]
var oSpec Data
@ -251,10 +248,10 @@ func (r *customController) restoreObject(obj *unstructured.Unstructured) error {
obj.SetLabels(oSpec.Labels)
if err := r.Update(context.TODO(), obj); err != nil {
klog.Errorf("failed to restore object %s(%s/%s) from annotation(%s): %s", obj.GetKind(), r.conf.RolloutNs, obj.GetName(), OriginalSpecAnnotation, err.Error())
return err
return false, err
}
klog.Infof("restore custom network provider %s(%s/%s) from annotation(%s) success", obj.GetKind(), obj.GetNamespace(), obj.GetName(), OriginalSpecAnnotation)
return nil
return true, nil
}
func (r *customController) executeLuaForCanary(spec Data, strategy *v1beta1.TrafficRoutingStrategy, luaScript string) (Data, error) {

View File

@ -596,6 +596,7 @@ func TestFinalise(t *testing.T) {
getUnstructured func() *unstructured.Unstructured
getConfig func() Config
expectUnstructured func() *unstructured.Unstructured
modified bool
}{
{
name: "test1, finalise VirtualService",
@ -631,6 +632,7 @@ func TestFinalise(t *testing.T) {
_ = u.UnmarshalJSON([]byte(virtualServiceDemo))
return u
},
modified: true,
},
}
@ -643,10 +645,13 @@ func TestFinalise(t *testing.T) {
return
}
c, _ := NewCustomController(fakeCli, cs.getConfig())
err = c.Finalise(context.TODO())
modified, err := c.Finalise(context.TODO())
if err != nil {
t.Fatalf("Initialize failed: %s", err.Error())
}
if cs.modified != modified {
t.Fatalf("is modified: expect(%v), but get(%v)", cs.modified, modified)
}
checkEqual(fakeCli, t, cs.expectUnstructured())
})
}

View File

@ -94,20 +94,20 @@ func (r *gatewayController) EnsureRoutes(ctx context.Context, strategy *v1beta1.
return false, nil
}
func (r *gatewayController) Finalise(ctx context.Context) error {
func (r *gatewayController) Finalise(ctx context.Context) (bool, error) {
httpRoute := &gatewayv1beta1.HTTPRoute{}
err := r.Get(ctx, types.NamespacedName{Namespace: r.conf.Namespace, Name: *r.conf.TrafficConf.HTTPRouteName}, httpRoute)
if err != nil {
if errors.IsNotFound(err) {
return nil
return false, nil
}
klog.Errorf("%s get HTTPRoute failed: %s", r.conf.Key, err.Error())
return err
return false, err
}
// desired rule
desiredRule := r.buildDesiredHTTPRoute(httpRoute.Spec.Rules, utilpointer.Int32(-1), nil, nil)
if reflect.DeepEqual(httpRoute.Spec.Rules, desiredRule) {
return nil
return false, nil
}
routeClone := &gatewayv1beta1.HTTPRoute{}
if err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
@ -119,10 +119,10 @@ func (r *gatewayController) Finalise(ctx context.Context) error {
return r.Client.Update(context.TODO(), routeClone)
}); err != nil {
klog.Errorf("update %s httpRoute(%s) failed: %s", r.conf.Key, httpRoute.Name, err.Error())
return err
return false, err
}
klog.Infof("%s TrafficRouting Finalise success", r.conf.Key)
return nil
return true, nil
}
func (r *gatewayController) buildDesiredHTTPRoute(rules []gatewayv1beta1.HTTPRouteRule, weight *int32, matches []v1beta1.HttpRouteMatch,

View File

@ -147,23 +147,23 @@ func (r *ingressController) EnsureRoutes(ctx context.Context, strategy *v1beta1.
return false, nil
}
func (r *ingressController) Finalise(ctx context.Context) error {
func (r *ingressController) Finalise(ctx context.Context) (bool, error) {
canaryIngress := &netv1.Ingress{}
err := r.Get(ctx, types.NamespacedName{Namespace: r.conf.Namespace, Name: r.canaryIngressName}, canaryIngress)
if err != nil && !errors.IsNotFound(err) {
klog.Errorf("%s get canary ingress(%s) failed: %s", r.conf.Key, r.canaryIngressName, err.Error())
return err
return false, err
}
if errors.IsNotFound(err) || !canaryIngress.DeletionTimestamp.IsZero() {
return nil
return false, nil
}
// immediate delete canary ingress
if err = r.Delete(ctx, canaryIngress); err != nil {
klog.Errorf("%s remove canary ingress(%s) failed: %s", r.conf.Key, canaryIngress.Name, err.Error())
return err
return false, err
}
klog.Infof("%s remove canary ingress(%s) success", r.conf.Key, canaryIngress.Name)
return nil
return true, nil
}
func (r *ingressController) buildCanaryIngress(stableIngress *netv1.Ingress) *netv1.Ingress {

View File

@ -707,6 +707,7 @@ func TestFinalise(t *testing.T) {
getConfigmap func() *corev1.ConfigMap
getIngress func() []*netv1.Ingress
expectIngress func() *netv1.Ingress
modified bool
}{
{
name: "finalise test1",
@ -726,6 +727,7 @@ func TestFinalise(t *testing.T) {
expectIngress: func() *netv1.Ingress {
return nil
},
modified: true,
},
}
@ -749,11 +751,14 @@ func TestFinalise(t *testing.T) {
t.Fatalf("NewIngressTrafficRouting failed: %s", err.Error())
return
}
err = controller.Finalise(context.TODO())
modified, err := controller.Finalise(context.TODO())
if err != nil {
t.Fatalf("EnsureRoutes failed: %s", err.Error())
return
}
if modified != cs.modified {
t.Fatalf("expect(%v), but get(%v)", cs.modified, modified)
}
canaryIngress := &netv1.Ingress{}
err = fakeCli.Get(context.TODO(), client.ObjectKey{Name: "echoserver-canary"}, canaryIngress)
if err != nil {

View File

@ -35,6 +35,6 @@ type NetworkProvider interface {
// When the first set weight is returned false, mainly to give the provider some time to process, only when again ensure, will return true
EnsureRoutes(ctx context.Context, strategy *v1beta1.TrafficRoutingStrategy) (bool, error)
// Finalise will do some cleanup work after the canary rollout complete, such as delete canary ingress.
// Finalise is called with a 3-second delay after completing the canary.
Finalise(ctx context.Context) error
// if error is nil, the return bool value means if the resources are modified
Finalise(ctx context.Context) (bool, error)
}

View File

@ -0,0 +1,171 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package grace
import (
"flag"
"sync"
"time"
"k8s.io/klog/v2"
)
type Action string
const (
// Create action
Create Action = "create"
// Delete action
Delete Action = "delete"
// action includes: patch service selector
Update Action = "patch"
// action includes: remove service selector/restore gateway
Restore Action = "unpatch"
)
// Define variables for the default expectation timeout and DefaultGraceExpectations instance.
var (
ExpectationGraceTimeout time.Duration
DefaultGraceExpectations = NewGraceExpectations()
)
func init() {
flag.DurationVar(&ExpectationGraceTimeout, "grace-timeout", time.Minute*5, "The grace expectation timeout. Defaults 5min")
DefaultGraceExpectations.StartCleaner(ExpectationGraceTimeout)
}
// NewGraceExpectations returns a GraceExpectations.
func NewGraceExpectations() *realGraceExpectations {
return &realGraceExpectations{
controllerCache: make(map[string]timeCache),
}
}
type timeCache map[Action]*time.Time
type realGraceExpectations struct {
sync.RWMutex
controllerCache map[string]timeCache // key: parent key, workload namespace/name
}
func (r *realGraceExpectations) GetExpectations(controllerKey string) timeCache {
r.RLock()
defer r.RUnlock()
expectations := r.controllerCache[controllerKey]
if expectations == nil {
return nil
}
res := make(timeCache, len(expectations))
for k, v := range expectations {
res[k] = v
}
return res
}
func (r *realGraceExpectations) Expect(controllerKey string, action Action) {
r.Lock()
defer r.Unlock()
expectations := r.controllerCache[controllerKey]
if expectations == nil {
expectations = make(timeCache)
r.controllerCache[controllerKey] = expectations
}
recordTime := time.Now()
expectations[action] = &recordTime
}
func (r *realGraceExpectations) Observe(controllerKey string, action Action) {
r.Lock()
defer r.Unlock()
expectations := r.controllerCache[controllerKey]
if expectations == nil {
return
}
delete(expectations, action)
if len(expectations) == 0 {
delete(r.controllerCache, controllerKey)
}
}
func (r *realGraceExpectations) SatisfiedExpectations(controllerKey string, action Action, graceSeconds int32) (bool, time.Duration) {
r.RLock()
defer r.RUnlock()
expectations := r.controllerCache[controllerKey]
if expectations == nil {
return true, 0
}
recordTime, ok := expectations[action]
if !ok {
return true, 0
}
remaining := time.Duration(graceSeconds)*time.Second - time.Since(*recordTime)
if remaining <= 0 {
return true, 0
}
return false, remaining
}
func (r *realGraceExpectations) DeleteExpectations(controllerKey string) {
r.Lock()
defer r.Unlock()
delete(r.controllerCache, controllerKey)
}
// cleaning outdated items
func (r *realGraceExpectations) CleanOutdatedItems(interval time.Duration) {
r.Lock()
defer r.Unlock()
for controllerKey, expectations := range r.controllerCache {
for action, recordTime := range expectations {
if time.Since(*recordTime) > interval {
delete(expectations, action)
}
}
if len(expectations) == 0 {
klog.Infof("clean outdated item: %s", controllerKey)
delete(r.controllerCache, controllerKey)
}
}
}
// Start a goroutine to clean outdated items every 5 minutes
func (r *realGraceExpectations) StartCleaner(interval time.Duration) {
klog.Infof("start grace expectations cleaner, interval: %v", interval)
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
klog.Info("---clean outdated items---")
r.CleanOutdatedItems(interval)
}
}()
}
// warning: only used for test
func (r *realGraceExpectations) resetExpectations() {
r.Lock()
defer r.Unlock()
for controllerKey := range r.controllerCache {
delete(r.controllerCache, controllerKey)
}
}

View File

@ -0,0 +1,209 @@
package grace
import (
"testing"
"time"
)
func TestGetExpectations(t *testing.T) {
r := NewGraceExpectations()
now := time.Now()
r.controllerCache["testKey"] = timeCache{
Create: &now,
}
result := r.GetExpectations("testKey")
if result == nil || len(result) != 1 || result[Create] == nil {
t.Errorf("expected timeCache with one Create action, got %v", result)
}
}
func TestExpect(t *testing.T) {
r := NewGraceExpectations()
r.Expect("testKey", Create)
if _, exists := r.controllerCache["testKey"][Create]; !exists {
t.Errorf("expected Create action for testKey to be recorded")
}
}
func TestObserve(t *testing.T) {
r := NewGraceExpectations()
r.Expect("testKey", Create)
r.Observe("testKey", Create)
if _, exists := r.controllerCache["testKey"]; exists {
t.Errorf("expected testKey to be removed from cache after Observe")
}
}
func TestSatisfiedExpectations(t *testing.T) {
r := NewGraceExpectations()
r.Expect("testKey", Create)
// Should be unsatisfied if graceSeconds is 0 (immediate timeout)
satisfied, _ := r.SatisfiedExpectations("testKey", Create, 0)
if !satisfied {
t.Errorf("expected expectations to be satisfied immediately for 0 graceSeconds")
}
// Set a new expectation with some future grace period
r.Expect("testKey", Create)
satisfied, _ = r.SatisfiedExpectations("testKey", Create, 60)
if satisfied {
t.Errorf("expected expectations to be unsatisfied for 60 second grace period")
}
}
func TestDeleteExpectations(t *testing.T) {
r := NewGraceExpectations()
r.Expect("testKey", Create)
r.DeleteExpectations("testKey")
if _, exists := r.controllerCache["testKey"]; exists {
t.Errorf("expected testKey to be deleted from cache after DeleteExpectations")
}
}
func TestCleanOutdatedItems(t *testing.T) {
r := NewGraceExpectations()
// Set an expectation well in the past so it is outdated
past := time.Now().Add(-time.Hour)
r.controllerCache["testKey"] = timeCache{
Create: &past,
}
r.CleanOutdatedItems(ExpectationGraceTimeout)
if _, exists := r.controllerCache["testKey"]; exists {
t.Errorf("expected testKey to be removed by CleanOutdatedItems")
}
// Set a recent expectation to ensure it is not cleaned out
r.Expect("testKey", Create)
recent := time.Now()
r.controllerCache["testKey"][Create] = &recent
r.CleanOutdatedItems(ExpectationGraceTimeout)
if _, exists := r.controllerCache["testKey"]; !exists {
t.Errorf("expected testKey to not be removed by CleanOutdatedItems")
}
}
func TestResetExpectations(t *testing.T) {
r := NewGraceExpectations()
r.Expect("testKey", Create)
r.resetExpectations()
if _, exists := r.controllerCache["testKey"]; exists {
t.Errorf("expected controller cache to be empty after resetExpectations")
}
}
func TestComprehensive(t *testing.T) {
// Initialize realGraceExpectations
r := NewGraceExpectations()
// Add expectations
r.Expect("testController1", Create)
r.Expect("testController1", Delete)
r.Expect("testController2", Update)
// Validate that expectations are correctly added
if expectations := r.GetExpectations("testController1"); len(expectations) != 2 {
t.Errorf("expected 2 actions for testController1, got %d", len(expectations))
}
if expectations := r.GetExpectations("testController2"); len(expectations) != 1 {
t.Errorf("expected 1 action for testController2, got %d", len(expectations))
}
// Observe and remove a specific action
r.Observe("testController1", Create)
if expectations := r.GetExpectations("testController1"); len(expectations) != 1 {
t.Errorf("expected 1 action for testController1 after observation, got %d", len(expectations))
}
// Check satisfaction status for an existing expectation
satisfied, remaining := r.SatisfiedExpectations("testController1", Delete, 0)
if !satisfied || remaining > 0 {
t.Errorf("expected unsatisfied expectation for testController1 Delete action")
}
// Check satisfaction status for a non-existing action
satisfied, _ = r.SatisfiedExpectations("testController1", Create, 0)
if !satisfied {
t.Errorf("expected satisfied status for non-existing Create action on testController1")
}
// Advance the time and check satisfaction after timeout
r.Expect("testController3", Restore)
time.Sleep(2 * time.Second)
satisfied, _ = r.SatisfiedExpectations("testController3", Restore, 1) // 1 second grace period
if !satisfied {
t.Errorf("expected satisfied status for Restore action on testController3 after timeout")
}
// Delete expectations for a controller key
r.DeleteExpectations("testController1")
if expectations := r.GetExpectations("testController1"); len(expectations) != 0 {
t.Errorf("expected no actions for testController1 after deletion")
}
// Clean outdated items
r.Expect("testController4", Update)
outdated := time.Now().Add(-10 * time.Minute) // Set a past record time to be outdated
r.controllerCache["testController4"][Update] = &outdated
r.CleanOutdatedItems(ExpectationGraceTimeout)
if expectations := r.GetExpectations("testController4"); len(expectations) != 0 {
t.Errorf("expected no actions for testController4 after clean")
}
// Reset expectations
r.Expect("testController5", Delete)
r.resetExpectations()
if len(r.controllerCache) != 0 {
t.Errorf("expected empty controller cache after reset")
}
}
func TestStartCleaner(t *testing.T) {
// Shorten the interval for faster testing
testInterval := time.Millisecond * 100
// Initialize realGraceExpectations
r := NewGraceExpectations()
// Add mixed expectations
past := time.Now().Add(-10 * time.Minute)
recent := time.Now().Add(testInterval * 3)
r.controllerCache["testController1"] = timeCache{
Create: &past,
Update: &recent,
}
r.controllerCache["testController2"] = timeCache{
Delete: &past,
Restore: &recent,
}
// Start the cleaner with a short interval
r.StartCleaner(testInterval)
// Wait for the cleaner to run
time.Sleep(testInterval * 3)
// Verify the outdated items have been cleaned and recent ones remain
if len(r.controllerCache["testController1"]) != 1 || r.controllerCache["testController1"][Update] != &recent {
t.Errorf("expected only recent Update action for testController1 to remain, found %d", len(r.controllerCache["testController1"]))
}
if len(r.controllerCache["testController2"]) != 1 || r.controllerCache["testController2"][Restore] != &recent {
t.Errorf("expected only recent Restore action for testController2 to remain, found %d", len(r.controllerCache["testController2"]))
}
}

View File

@ -0,0 +1,65 @@
/*
Copyright 2022 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package grace
import (
"time"
"k8s.io/klog/v2"
)
func RunWithGraceSeconds(key, action string, graceSeconds int32, f func() (bool, error)) (bool, time.Duration, error) {
return runWithGraceSeconds(DefaultGraceExpectations, key, action, graceSeconds, f)
}
// Returns:
// - error: If the passed function itself returns an error, the error is directly returned.
// - bool: Tells the caller whether a retry is needed.
// - time.Duration: The remaining time to wait. (only valid when error is nil)
// The passed function `f` needs to be idempotent.
// - The bool value returned by `f` indicates whether the related resources were updated in this call.
// - If resources were updated, we need to wait for `graceSeconds`.
func runWithGraceSeconds(e *realGraceExpectations, key, action string, graceSeconds int32, f func() (bool, error)) (bool, time.Duration, error) {
modified, err := f()
if err != nil {
return true, 0, err
}
// if user specify 0, it means no need to wait
if graceSeconds == 0 {
e.Observe(key, Action(action))
return false, 0, nil
}
// if f return true, it means some resources are modified by f in this call
// we need to wait a grace period
if modified {
e.Expect(key, Action(action))
klog.Infof("function return modified, expectation created, key: %s, action: %s, expect to wait %d seconds", key, action, graceSeconds)
return true, time.Duration(graceSeconds) * time.Second, nil
}
if satisfied, remaining := e.SatisfiedExpectations(key, Action(action), graceSeconds); !satisfied {
klog.Infof("expectation unsatisfied, key: %s, action: %s, remaining/graceSeconds: %.1f/%d", key, action, remaining.Seconds(), graceSeconds)
return true, remaining, nil
}
e.Observe(key, Action(action))
klog.Infof("expectation satisfied, key: %s, action: %s", key, action)
return false, 0, nil
}
// warning: only used for test
func ResetExpectations() {
DefaultGraceExpectations.resetExpectations()
}

View File

@ -0,0 +1,132 @@
package grace
import (
"errors"
"testing"
"time"
)
func TestRunWithGraceSeconds(t *testing.T) {
tests := []struct {
name string
graceSeconds int32
modified bool
err error
expectedRetry bool
expectedErr error
}{
{name: "No modification, no grace period", graceSeconds: 0, modified: false, err: nil, expectedRetry: false, expectedErr: nil},
{name: "Modification, with grace period", graceSeconds: 10, modified: true, err: nil, expectedRetry: true, expectedErr: nil},
{name: "No modification, expectation unsatisfied", graceSeconds: 10, modified: false, err: nil, expectedRetry: true, expectedErr: nil},
{name: "Function returns error", graceSeconds: 10, modified: false, err: errors.New("test error"), expectedRetry: true, expectedErr: errors.New("test error")},
}
graceExpectations := NewGraceExpectations()
for _, cs := range tests {
t.Run(cs.name, func(t *testing.T) {
f := func() (bool, error) {
return cs.modified, cs.err
}
retry, _, err := runWithGraceSeconds(graceExpectations, "testKey", "create", cs.graceSeconds, f)
if retry != cs.expectedRetry {
t.Errorf("expected retry: %v, got: %v", cs.expectedRetry, retry)
}
if !equalErr(err, cs.expectedErr) {
t.Errorf("expected error: %v, got: %v", cs.expectedErr, err)
}
})
}
// Additional test to verify the timeout behavior
t.Run("Satisfaction of expectation over time", func(t *testing.T) {
graceSeconds := int32(1) // 1 second grace period
graceExpectations := NewGraceExpectations()
f := func() (bool, error) {
return true, nil
}
retry, _, err := runWithGraceSeconds(graceExpectations, "testKey2", "delete", graceSeconds, f)
if retry != true {
t.Errorf("expected retry: true after modification, got: %v", retry)
}
if err != nil {
t.Errorf("expected no error, got: %v", err)
}
// Wait for the grace period to be satisfied
time.Sleep(time.Duration(graceSeconds) * time.Second)
f2 := func() (bool, error) {
return false, nil
}
retry, _, err = runWithGraceSeconds(graceExpectations, "testKey2", "delete", graceSeconds, f2)
if retry != false {
t.Errorf("expected retry: false after grace period satisfied, got: %v", retry)
}
if err != nil {
t.Errorf("expected no error, got: %v", err)
}
})
}
// Comprehensive test for RunWithGraceSeconds using testFunction
func TestRunWithGraceSecondsComprehensive(t *testing.T) {
graceExpectations := NewGraceExpectations()
tests := []struct {
name string
expectedState int
graceSeconds int32
expectedRetry bool
expectedErr error
}{
{"No modification needed", 0, 0, false, nil},
{"Initial modification, create expectation", 1, 10, true, nil},
{"Function errors", -1, 10, true, errors.New("test error")},
}
for _, cs := range tests {
t.Run(cs.name, func(t *testing.T) {
f := testFunction(cs.expectedState)
retry, _, err := runWithGraceSeconds(graceExpectations, cs.name, "create", cs.graceSeconds, f)
if !equalErr(err, cs.expectedErr) {
t.Errorf("%s: expected error: %v, but got none", cs.name, cs.expectedErr)
}
if retry != cs.expectedRetry {
t.Errorf("%s: expected retry: %v, got: %v", cs.name, cs.expectedRetry, retry)
}
})
}
}
// Test function generator
func testFunction(expectedState int) func() (bool, error) {
currentState := 0
return func() (bool, error) {
if expectedState < 0 {
return true, errors.New("test error")
}
if currentState == expectedState {
return false, nil
} else {
currentState++
return true, nil
}
}
}
func equalErr(err1, err2 error) bool {
if err1 == nil && err2 == nil {
return true
}
if err1 != nil && err2 != nil {
return true
}
return false
}

View File

@ -190,13 +190,3 @@ func CheckNextBatchIndexWithCorrect(rollout *rolloutv1beta1.Rollout) {
}
}
}
func GracePeriodSecondsOrDefault(refs []rolloutv1beta1.TrafficRoutingRef, defaultSeconds int32) int32 {
if len(refs) == 0 {
return defaultSeconds
}
if refs[0].GracePeriodSeconds < 0 {
return defaultSeconds
}
return refs[0].GracePeriodSeconds
}