diff --git a/operator/controllers/operation_controller.go b/operator/controllers/operation_controller.go
index 2419d86..23b1f44 100644
--- a/operator/controllers/operation_controller.go
+++ b/operator/controllers/operation_controller.go
@@ -24,7 +24,6 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/client-go/tools/record"
-	capierrors "sigs.k8s.io/cluster-api/errors"
 	"sigs.k8s.io/cluster-api/util/patch"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -55,7 +54,7 @@ type OperationReconciler struct {
 func (r *OperationReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	err := ctrl.NewControllerManagedBy(mgr).
 		For(&operatorv1.Operation{}).
-		Owns(&operatorv1.RuntimeTaskGroup{}).
+		Owns(&operatorv1.RuntimeTaskGroup{}). // force reconcile of the Operation every time one of the owned TaskGroups changes
 		Complete(r)
 
 	//TODO: watch DS for operation Daemonsets
@@ -106,9 +105,6 @@ func (r *OperationReconciler) Reconcile(req ctrl.Request) (_ ctrl.Result, rerr e
 
 	// Reconcile the Operation
 	if err := r.reconcileOperation(operation, log); err != nil {
-		if requeueErr, ok := errors.Cause(err).(capierrors.HasRequeueAfterError); ok {
-			return ctrl.Result{Requeue: true, RequeueAfter: requeueErr.GetRequeueAfter()}, nil
-		}
 		return ctrl.Result{}, err
 	}
 
@@ -183,7 +179,14 @@ func (r *OperationReconciler) reconcileOperation(operation *operatorv1.Operation
 		}
 	}
 	// Handle non-deleted Operation
-	err = r.reconcileNormal(operation, log)
+
+	// gets the controlled TaskGroup items (desired vs actual)
+	taskGroups, err := r.reconcileTaskGroups(operation, log)
+	if err != nil {
+		return err
+	}
+
+	err = r.reconcileNormal(operation, taskGroups, log)
 	if err != nil {
 		return
 	}
@@ -214,23 +217,18 @@ func (r *OperationReconciler) reconcileLabels(operation *operatorv1.Operation) {
 	}
 }
 
-func (r *OperationReconciler) reconcileNormal(operation *operatorv1.Operation, log logr.Logger) error {
-	// If the Operation doesn't have finalizer, add it.
-	//if !util.Contains(operation.Finalizers, operatorv1.OperationFinalizer) {
-	//	operation.Finalizers = append(operation.Finalizers, operatorv1.OperationFinalizer)
-	//}
-
+func (r *OperationReconciler) reconcileTaskGroups(operation *operatorv1.Operation, log logr.Logger) (*taskGroupReconcileList, error) {
 	// gets all the desired TaskGroup objects for the current operation
 	// Nb. this is the domain knowledge encoded into operation implementations
 	desired, err := operations.TaskGroupList(operation)
 	if err != nil {
-		return errors.Wrap(err, "failed to get desired TaskGroup list")
+		return nil, errors.Wrap(err, "failed to get desired TaskGroup list")
 	}
 
 	// gets the current TaskGroup objects related to this Operation
 	actual, err := listTaskGroupsByLabels(r.Client, operation.Labels)
 	if err != nil {
-		return errors.Wrap(err, "failed to list TaskGroup")
+		return nil, errors.Wrap(err, "failed to list TaskGroup")
 	}
 
 	r.Log.Info("reconciling", "desired-TaskGroups", len(desired.Items), "TaskGroups", len(actual.Items))
@@ -245,64 +243,71 @@ func (r *OperationReconciler) reconcileNormal(operation *operatorv1.Operation, l
 	operation.Status.FailedGroups = int32(len(taskGroups.failed))
 	operation.Status.InvalidGroups = int32(len(taskGroups.invalid))
 
+	return taskGroups, nil
+}
+
+func (r *OperationReconciler) reconcileNormal(operation *operatorv1.Operation, taskGroups *taskGroupReconcileList, log logr.Logger) error {
+	// If the Operation doesn't have finalizer, add it.
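+	// (kept commented out for now: enabling the finalizer would also require
+	// removing it in reconcileDelete before the Operation can actually be deleted)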
+	//if !util.Contains(operation.Finalizers, operatorv1.OperationFinalizer) {
+	//	operation.Finalizers = append(operation.Finalizers, operatorv1.OperationFinalizer)
+	//}
+
 	// if there are TaskGroup not yet completed (pending or running), cleanup error messages (required e.g. after recovery)
 	// NB. It is necessary to give priority to running vs errors so the operation controller keeps alive/restarts
 	// the DaemonsSet for processing tasks
-	activeTaskGroups := len(taskGroups.pending) + len(taskGroups.running)
-	if activeTaskGroups > 0 {
+	if taskGroups.activeTaskGroups() > 0 {
 		operation.Status.ResetError()
-	}
-
-	// if there are invalid combinations (e.g. a TaskGroup without a desired TaskGroup)
-	// stop creating new TaskGroup and eventually set the error
-	if len(taskGroups.invalid) > 0 {
-		if activeTaskGroups == 0 {
+	} else {
+		// if there are invalid combinations (e.g. a TaskGroup without a desired TaskGroup)
+		// set the error and stop creating new TaskGroups
+		if len(taskGroups.invalid) > 0 {
 			// TODO: improve error message
 			operation.Status.SetError(
 				operatorerrors.NewOperationReconciliationError("something invalid"),
 			)
+			return nil
 		}
-		// stop creating new TaskGroup
-		return nil
-	}
 
-	// if there are failed TaskGroup
-	// stop creating new TaskGroup and eventually set the error
-	if len(taskGroups.failed) > 0 {
-		if activeTaskGroups == 0 {
+		// if there are failed TaskGroups
+		// set the error and stop creating new TaskGroups
+		if len(taskGroups.failed) > 0 {
 			// TODO: improve error message
 			operation.Status.SetError(
 				operatorerrors.NewOperationReplicaError("something failed"),
 			)
+			return nil
 		}
-		return nil
 	}
 
 	// TODO: manage adopt tasks/tasks to be orphaned
 
-	// at this point we are sure that there are no invalid current/desired TaskGroup combinations or failed TaskGroup
-
-	// if the completed TaskGroup have reached the number of expected TaskGroup, the Operation is completed
-	if len(taskGroups.completed) == len(taskGroups.all) {
-		// NB. we are setting this condition explicitly in order to avoid that the Operation accidentally
-		// restarts to create TaskGroup
-		operation.Status.Paused = false
-		operation.Status.SetCompletionTime()
-	}
-
-	// otherwise, proceed creating TaskGroup
-
 	// if nil, set the Operation start time
 	if operation.Status.StartTime == nil {
 		operation.Status.SetStartTime()
 
 		//TODO: add a signature so we can detect if someone/something changes the operations while it is processed
+		return nil
 	}
 
+	// if the completed TaskGroups have reached the number of expected TaskGroups, the Operation is completed
+	// NB. we are doing this before checking pause because if everything is completed, it does not make sense to pause
+	if len(taskGroups.completed) == len(taskGroups.all) {
+		// NB. we are setting this condition explicitly to prevent the Operation from
+		// accidentally resuming TaskGroup creation
+		operation.Status.SetCompletionTime()
+	}
+
+	// if the Operation is paused, return
+	if operation.Status.Paused {
+		return nil
+	}
+
+	// otherwise, proceed creating TaskGroups
+
 	// if there are still TaskGroup to be created
 	if len(taskGroups.tobeCreated) > 0 {
 		// if there no TaskGroup not yet completed (pending or running)
-		if activeTaskGroups == 0 {
+		if taskGroups.activeTaskGroups() == 0 {
 			// create the next TaskGroup in the ordered sequence
 			nextTaskGroup := taskGroups.tobeCreated[0].planned
 			log.WithValues("task-group", nextTaskGroup.Name).Info("creating task")
@@ -326,33 +331,36 @@ func (r *OperationReconciler) reconcileDelete(operation *operatorv1.Operation) e
 }
 
 func (r *OperationReconciler) reconcilePhase(operation *operatorv1.Operation) {
-	// Set the phase to "pending" if nil.
-	if operation.Status.Phase == "" {
-		operation.Status.SetTypedPhase(operatorv1.OperationPhasePending)
-	}
-
-	// Set the phase to "running" if start date is set.
-	if operation.Status.StartTime != nil {
-		operation.Status.SetTypedPhase(operatorv1.OperationPhaseRunning)
-	}
-
-	// Set the phase to "paused" if paused set.
-	if operation.Status.Paused {
-		operation.Status.SetTypedPhase(operatorv1.OperationPhasePaused)
-	}
-
-	// Set the phase to "succeeded" if completion date is set.
-	if operation.Status.CompletionTime != nil {
-		operation.Status.SetTypedPhase(operatorv1.OperationPhaseSucceeded)
+	// Set the phase to "deleting" if the deletion timestamp is set.
+	if !operation.DeletionTimestamp.IsZero() {
+		operation.Status.SetTypedPhase(operatorv1.OperationPhaseDeleted)
+		return
 	}
 
 	// Set the phase to "failed" if any of Status.ErrorReason or Status.ErrorMessage is not-nil.
 	if operation.Status.ErrorReason != nil || operation.Status.ErrorMessage != nil {
 		operation.Status.SetTypedPhase(operatorv1.OperationPhaseFailed)
+		return
 	}
 
-	// Set the phase to "deleting" if the deletion timestamp is set.
-	if !operation.DeletionTimestamp.IsZero() {
-		operation.Status.SetTypedPhase(operatorv1.OperationPhaseDeleted)
+	// Set the phase to "succeeded" if completion date is set.
+	if operation.Status.CompletionTime != nil {
+		operation.Status.SetTypedPhase(operatorv1.OperationPhaseSucceeded)
+		return
 	}
+
+	// Set the phase to "paused" if paused set.
+	if operation.Status.Paused {
+		operation.Status.SetTypedPhase(operatorv1.OperationPhasePaused)
+		return
+	}
+
+	// Set the phase to "running" if start date is set.
+	if operation.Status.StartTime != nil {
+		operation.Status.SetTypedPhase(operatorv1.OperationPhaseRunning)
+		return
+	}
+
+	// Set the phase to "pending".
+	operation.Status.SetTypedPhase(operatorv1.OperationPhasePending)
 }
diff --git a/operator/controllers/operation_controller_test.go b/operator/controllers/operation_controller_test.go
index d69c19d..0c4f50e 100644
--- a/operator/controllers/operation_controller_test.go
+++ b/operator/controllers/operation_controller_test.go
@@ -17,11 +17,17 @@ limitations under the License.
package controllers import ( + "context" "reflect" "testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/runtime/log" operatorv1 "k8s.io/kubeadm/operator/api/v1alpha1" ) @@ -257,3 +263,372 @@ func TestOperatorReconcilePause(t *testing.T) { }) } } + +func TestOperationReconciler_Reconcile(t *testing.T) { + type fields struct { + Objs []runtime.Object + } + type args struct { + req ctrl.Request + } + tests := []struct { + name string + fields fields + args args + want ctrl.Result + wantErr bool + }{ + { + name: "Reconcile does nothing if operation does not exist", + fields: fields{}, + args: args{ + req: ctrl.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "foo"}}, + }, + want: ctrl.Result{}, + wantErr: false, + }, + { + name: "Reconcile does nothing if the operation is already completed", + fields: fields{ + Objs: []runtime.Object{ + &operatorv1.Operation{ + TypeMeta: metav1.TypeMeta{ + Kind: "Operation", + APIVersion: operatorv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo-operation", + }, + Status: operatorv1.OperationStatus{ + CompletionTime: timePtr(metav1.Now()), + }, + }, + }, + }, + args: args{ + req: ctrl.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "foo-operation"}}, + }, + want: ctrl.Result{}, + wantErr: false, + }, + { + name: "Reconcile pass", + fields: fields{ + Objs: []runtime.Object{ + &operatorv1.Operation{ + TypeMeta: metav1.TypeMeta{ + Kind: "Operation", + APIVersion: operatorv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo-operation", + }, + Spec: operatorv1.OperationSpec{ + OperatorDescriptor: operatorv1.OperatorDescriptor{ + CustomOperation: &operatorv1.CustomOperationSpec{}, + }, + }, + }, + }, + }, + args: args{ + req: ctrl.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "foo-operation"}}, + }, + want: ctrl.Result{}, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &OperationReconciler{ + Client: fake.NewFakeClientWithScheme(setupScheme(), tt.fields.Objs...), + AgentImage: "some-image", //making reconcile operation pass + MetricsRBAC: false, + Log: log.Log, + recorder: record.NewFakeRecorder(1), + } + got, err := r.Reconcile(tt.args.req) + if (err != nil) != tt.wantErr { + t.Errorf("Reconcile() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("Reconcile() got = %v, want %v", got, tt.want) + } + }) + } +} + +func TestOperationReconciler_reconcileNormal(t *testing.T) { + type args struct { + operation *operatorv1.Operation + taskGroups *taskGroupReconcileList + } + type want struct { + operation *operatorv1.Operation + } + tests := []struct { + name string + args args + want want + wantTaskGroups int + wantErr bool + }{ + { + name: "Reconcile sets error if a taskGroup is failed and no taskGroup is active", + args: args{ + operation: &operatorv1.Operation{}, + taskGroups: &taskGroupReconcileList{ + all: []*taskGroupReconcileItem{ + {}, + }, + failed: []*taskGroupReconcileItem{ + {}, + }, + }, + }, + want: want{ + operation: &operatorv1.Operation{ + Status: operatorv1.OperationStatus{ + ErrorMessage: stringPtr("error"), //using error as a marker for 
"whatever error was raised" + }, + }, + }, + wantTaskGroups: 0, + wantErr: false, + }, + { + name: "Reconcile sets error if a taskGroup is invalid and no taskGroup is active", + args: args{ + operation: &operatorv1.Operation{}, + taskGroups: &taskGroupReconcileList{ + all: []*taskGroupReconcileItem{ + {}, + }, + invalid: []*taskGroupReconcileItem{ + {}, + }, + }, + }, + want: want{ + operation: &operatorv1.Operation{ + Status: operatorv1.OperationStatus{ + ErrorMessage: stringPtr("error"), //using error as a marker for "whatever error was raised" + }, + }, + }, + wantTaskGroups: 0, + wantErr: false, + }, + { + name: "Reconcile set start time", + args: args{ + operation: &operatorv1.Operation{}, + taskGroups: &taskGroupReconcileList{}, + }, + want: want{ + operation: &operatorv1.Operation{ + Status: operatorv1.OperationStatus{ + StartTime: timePtr(metav1.Time{}), //using zero as a marker for "whatever time it started" + }, + }, + }, + wantTaskGroups: 0, + wantErr: false, + }, + { + name: "Reconcile reset error if a taskGroup is active", + args: args{ + operation: &operatorv1.Operation{ + Status: operatorv1.OperationStatus{ + ErrorMessage: stringPtr("error"), + }, + }, + taskGroups: &taskGroupReconcileList{ + all: []*taskGroupReconcileItem{ + {}, + {}, + }, + running: []*taskGroupReconcileItem{ + {}, + }, + failed: []*taskGroupReconcileItem{ // failed should be ignored if a taskGroup is active + {}, + }, + }, + }, + want: want{ + operation: &operatorv1.Operation{ + Status: operatorv1.OperationStatus{ + StartTime: timePtr(metav1.Time{}), //using zero as a marker for "whatever time it started" + }, + }, + }, + wantTaskGroups: 0, + wantErr: false, + }, + { + name: "Reconcile set completion time if no more taskGroup to create", + args: args{ + operation: &operatorv1.Operation{ + Status: operatorv1.OperationStatus{ + StartTime: timePtr(metav1.Now()), + }, + }, + taskGroups: &taskGroupReconcileList{}, //empty list of nodes -> no more task to create + }, + want: want{ + operation: &operatorv1.Operation{ + Status: operatorv1.OperationStatus{ + StartTime: timePtr(metav1.Time{}), //using zero as a marker for "whatever time it started" + CompletionTime: timePtr(metav1.Time{}), //using zero as a marker for "whatever time it completed" + }, + }, + }, + wantTaskGroups: 0, + wantErr: false, + }, + { + name: "Reconcile do nothing if paused", + args: args{ + operation: &operatorv1.Operation{ + Status: operatorv1.OperationStatus{ + StartTime: timePtr(metav1.Now()), + Paused: true, + }, + }, + taskGroups: &taskGroupReconcileList{ + all: []*taskGroupReconcileItem{ + {}, + }, + tobeCreated: []*taskGroupReconcileItem{ + {}, + }, + }, + }, + want: want{ + operation: &operatorv1.Operation{ + Status: operatorv1.OperationStatus{ + StartTime: timePtr(metav1.Time{}), //using zero as a marker for "whatever time it started" + Paused: true, + }, + }, + }, + wantTaskGroups: 0, + wantErr: false, + }, + { + name: "Reconcile creates a taskGroup if nothing running", + args: args{ + operation: &operatorv1.Operation{ + Status: operatorv1.OperationStatus{ + StartTime: timePtr(metav1.Now()), + }, + }, + taskGroups: &taskGroupReconcileList{ + all: []*taskGroupReconcileItem{ + {}, + }, + tobeCreated: []*taskGroupReconcileItem{ + { + planned: &operatorv1.RuntimeTaskGroup{}, + }, + }, + }, + }, + want: want{ + operation: &operatorv1.Operation{ + Status: operatorv1.OperationStatus{ + StartTime: timePtr(metav1.Time{}), //using zero as a marker for "whatever time it started" + }, + }, + }, + wantTaskGroups: 1, + wantErr: false, + }, + { 
+ name: "Reconcile does not creates a taskGroup if something running", + args: args{ + operation: &operatorv1.Operation{ + Status: operatorv1.OperationStatus{ + StartTime: timePtr(metav1.Now()), + }, + }, + taskGroups: &taskGroupReconcileList{ + all: []*taskGroupReconcileItem{ + {}, + {}, + }, + tobeCreated: []*taskGroupReconcileItem{ + {}, + }, + running: []*taskGroupReconcileItem{ + {}, + }, + }, + }, + want: want{ + operation: &operatorv1.Operation{ + Status: operatorv1.OperationStatus{ + StartTime: timePtr(metav1.Time{}), //using zero as a marker for "whatever time it started" + }, + }, + }, + wantTaskGroups: 0, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := fake.NewFakeClientWithScheme(setupScheme()) + + r := &OperationReconciler{ + Client: c, + recorder: record.NewFakeRecorder(1), + Log: log.Log, + } + if err := r.reconcileNormal(tt.args.operation, tt.args.taskGroups, r.Log); (err != nil) != tt.wantErr { + t.Errorf("reconcileNormal() error = %v, wantErr %v", err, tt.wantErr) + } + + fixupWantOperation(tt.want.operation, tt.args.operation) + + if !reflect.DeepEqual(tt.args.operation, tt.want.operation) { + t.Errorf("reconcileNormal() = %v, want %v", tt.args.operation, tt.want.operation) + } + + taskGroups := &operatorv1.RuntimeTaskGroupList{} + if err := c.List(context.Background(), taskGroups); err != nil { + t.Fatalf("List() error = %v", err) + } + + if len(taskGroups.Items) != tt.wantTaskGroups { + t.Errorf("reconcileNormal() = %v taskGroups, want %v taskGroups", len(taskGroups.Items), tt.wantTaskGroups) + } + }) + } +} + +func fixupWantOperation(want *operatorv1.Operation, got *operatorv1.Operation) { + // In case want.StartTime is a marker, replace it with the current CompletionTime + if want.CreationTimestamp.IsZero() { + want.CreationTimestamp = got.CreationTimestamp + } + + // In case want.ErrorMessage is a marker, replace it with the current error + if want.Status.ErrorMessage != nil && *want.Status.ErrorMessage == "error" && got.Status.ErrorMessage != nil { + want.Status.ErrorMessage = got.Status.ErrorMessage + want.Status.ErrorReason = got.Status.ErrorReason + } + + // In case want.StartTime is a marker, replace it with the current CompletionTime + if want.Status.StartTime != nil && want.Status.StartTime.IsZero() && got.Status.StartTime != nil { + want.Status.StartTime = got.Status.StartTime + } + // In case want.CompletionTime is a marker, replace it with the current CompletionTime + if want.Status.CompletionTime != nil && want.Status.CompletionTime.IsZero() && got.Status.CompletionTime != nil { + want.Status.CompletionTime = got.Status.CompletionTime + } +} diff --git a/operator/controllers/runtimetask_controller.go b/operator/controllers/runtimetask_controller.go index 9003acf..0630d4a 100644 --- a/operator/controllers/runtimetask_controller.go +++ b/operator/controllers/runtimetask_controller.go @@ -22,11 +22,9 @@ import ( "time" "github.com/go-logr/logr" - "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/tools/record" - capierrors "sigs.k8s.io/cluster-api/errors" "sigs.k8s.io/cluster-api/util/patch" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -59,7 +57,7 @@ func (r *RuntimeTaskReconciler) SetupWithManager(mgr ctrl.Manager) error { err := ctrl.NewControllerManagedBy(mgr). For(&operatorv1.RuntimeTask{}). 
- Watches( + Watches( // force reconcile Task every time the parent TaskGroup changes &source.Kind{Type: &operatorv1.RuntimeTaskGroup{}}, &handler.EnqueueRequestsFromMapFunc{ToRequests: mapFunc}, ). @@ -93,28 +91,20 @@ func (r *RuntimeTaskReconciler) Reconcile(req ctrl.Request) (_ ctrl.Result, rerr return ctrl.Result{}, nil } - // Fetch the parent TaskGroup instance, if any + // Fetch the parent TaskGroup instance taskgroup, err := getOwnerTaskGroup(ctx, r.Client, task.ObjectMeta) if err != nil { return ctrl.Result{}, err } - // Fetch the parent Operation instance, if any - var operation *operatorv1.Operation - if taskgroup != nil { - operation, err = getOwnerOperation(ctx, r.Client, taskgroup.ObjectMeta) - if err != nil { - return ctrl.Result{}, err - } + // Fetch the parent Operation instance + operation, err := getOwnerOperation(ctx, r.Client, taskgroup.ObjectMeta) + if err != nil { + return ctrl.Result{}, err } // If the controller is set to manage Task for a specific operation, ignore everything else - if r.Operation != "" && r.Operation != operation.Name { - return ctrl.Result{}, nil - } - - // If the controller is set to manage headless Task, ignore everything else - if r.Operation == "" && operation != nil { + if r.Operation != operation.Name { return ctrl.Result{}, nil } @@ -135,23 +125,16 @@ func (r *RuntimeTaskReconciler) Reconcile(req ctrl.Request) (_ ctrl.Result, rerr // Reconcile the Task if err := r.reconcileTask(operation, taskgroup, task, log); err != nil { - if requeueErr, ok := errors.Cause(err).(capierrors.HasRequeueAfterError); ok { - return ctrl.Result{Requeue: true, RequeueAfter: requeueErr.GetRequeueAfter()}, nil - } return ctrl.Result{}, err } return ctrl.Result{}, nil } func (r *RuntimeTaskReconciler) reconcileTask(operation *operatorv1.Operation, taskgroup *operatorv1.RuntimeTaskGroup, task *operatorv1.RuntimeTask, log logr.Logger) (err error) { - // gets relevant settings from top level objects (or use defaults) - executionMode := operatorv1.OperationExecutionModeAuto - operationPaused := false + // gets relevant settings from top level objects + executionMode := operation.Spec.GetTypedOperationExecutionMode() + operationPaused := operation.Status.Paused - if operation != nil { - executionMode = operation.Spec.GetTypedOperationExecutionMode() - operationPaused = operation.Status.Paused - } // Reconcile recovery from errors recovered := r.reconcileRecovery(executionMode, task, log) @@ -210,6 +193,7 @@ func (r *RuntimeTaskReconciler) reconcileRecovery(executionMode operatorv1.Opera task.Status.Paused = true } } + default: //TODO: error (if possible do validation before getting here) } diff --git a/operator/controllers/runtimetask_controller_test.go b/operator/controllers/runtimetask_controller_test.go index 9a4b054..642e8aa 100644 --- a/operator/controllers/runtimetask_controller_test.go +++ b/operator/controllers/runtimetask_controller_test.go @@ -20,8 +20,14 @@ import ( "reflect" "testing" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/runtime/log" "k8s.io/kubeadm/operator/errors" @@ -485,7 +491,7 @@ func TestRuntimeTaskReconciler_reconcileRecovery(t *testing.T) { Status: operatorv1.RuntimeTaskStatus{ ErrorReason: runtimeTaskStatusErrorPtr(errors.RuntimeTaskExecutionError), 
ErrorMessage: stringPtr("error"), - CurrentCommand: 2, + CurrentCommand: 1, }, }, }, @@ -502,7 +508,7 @@ func TestRuntimeTaskReconciler_reconcileRecovery(t *testing.T) { CompletionTime: timePtr(metav1.Time{}), //using zero as a marker for "whatever time it completes" ErrorReason: nil, // error removed ErrorMessage: nil, - CurrentCommand: 2, // next command + CurrentCommand: 1, // next command }, }, events: 1, @@ -525,7 +531,7 @@ func TestRuntimeTaskReconciler_reconcileRecovery(t *testing.T) { t.Errorf("reconcileRecovery() = %v, want %v", got, tt.want.ret) } - fixupTimes(tt.want.task, tt.args.task) + fixupWantTask(tt.want.task, tt.args.task) if !reflect.DeepEqual(tt.args.task, tt.want.task) { t.Errorf("reconcileRecovery() = %v, want %v", tt.args.task, tt.want.task) @@ -785,7 +791,7 @@ func TestRuntimeTaskReconciler_reconcileNormal(t *testing.T) { t.Errorf("reconcileNormal() error = %v, wantErr %v", err, tt.wantErr) } - fixupTimes(tt.want.task, tt.args.task) + fixupWantTask(tt.want.task, tt.args.task) if !reflect.DeepEqual(tt.args.task, tt.want.task) { t.Errorf("reconcileRecovery() = %v, want %v", tt.args.task, tt.want.task) @@ -798,7 +804,305 @@ func TestRuntimeTaskReconciler_reconcileNormal(t *testing.T) { } } -func fixupTimes(want *operatorv1.RuntimeTask, got *operatorv1.RuntimeTask) { +func TestRuntimeTaskReconciler_Reconcile(t *testing.T) { + type fields struct { + NodeName string + Operation string + Objs []runtime.Object + } + type args struct { + req ctrl.Request + } + tests := []struct { + name string + fields fields + args args + want ctrl.Result + wantErr bool + }{ + { + name: "Reconcile does nothing if task does not exist", + fields: fields{}, + args: args{ + req: ctrl.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "foo"}}, + }, + want: ctrl.Result{}, + wantErr: false, + }, + { + name: "Reconcile does nothing if task doesn't target the node the controller is supervising", + fields: fields{ + NodeName: "foo-node", + Objs: []runtime.Object{ + &operatorv1.RuntimeTask{ + TypeMeta: metav1.TypeMeta{ + Kind: "RuntimeTask", + APIVersion: operatorv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "bar-task", + }, + Spec: operatorv1.RuntimeTaskSpec{ + NodeName: "bar-node", + }, + }, + }, + }, + args: args{ + req: ctrl.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "bar-task"}}, + }, + want: ctrl.Result{}, + wantErr: false, + }, + { + name: "Reconcile does nothing if the task is already completed", + fields: fields{ + NodeName: "foo-node", + Objs: []runtime.Object{ + &operatorv1.RuntimeTask{ + TypeMeta: metav1.TypeMeta{ + Kind: "RuntimeTask", + APIVersion: operatorv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo-task", + }, + Spec: operatorv1.RuntimeTaskSpec{ + NodeName: "foo-node", + }, + Status: operatorv1.RuntimeTaskStatus{ + CompletionTime: timePtr(metav1.Now()), + }, + }, + }, + }, + args: args{ + req: ctrl.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "foo-task"}}, + }, + want: ctrl.Result{}, + wantErr: false, + }, + { + name: "Reconcile fails if failing to retrieve parent taskgroup", + fields: fields{ + NodeName: "foo-node", + Objs: []runtime.Object{ + &operatorv1.RuntimeTask{ + TypeMeta: metav1.TypeMeta{ + Kind: "RuntimeTask", + APIVersion: operatorv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo-task", + }, + Spec: operatorv1.RuntimeTaskSpec{ + NodeName: 
"foo-node", + }, + }, + }, + }, + args: args{ + req: ctrl.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "foo-task"}}, + }, + want: ctrl.Result{}, + wantErr: true, + }, + { + name: "Reconcile fails if failing to retrieve parent operation", + fields: fields{ + NodeName: "foo-node", + Objs: []runtime.Object{ + &operatorv1.RuntimeTaskGroup{ + TypeMeta: metav1.TypeMeta{ + Kind: "RuntimeTaskGroup", + APIVersion: operatorv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo-taskgroup", + }, + }, + &operatorv1.RuntimeTask{ + TypeMeta: metav1.TypeMeta{ + Kind: "RuntimeTask", + APIVersion: operatorv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo-task", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: operatorv1.GroupVersion.String(), + Kind: "RuntimeTaskGroup", + Name: "foo-taskgroup", + }, + }, + }, + Spec: operatorv1.RuntimeTaskSpec{ + NodeName: "foo-node", + }, + }, + }, + }, + args: args{ + req: ctrl.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "foo-task"}}, + }, + want: ctrl.Result{}, + wantErr: true, + }, + { + name: "Reconcile does nothing if task doesn't belong to the operation the controller is supervising", + fields: fields{ + NodeName: "foo-node", + Operation: "foo-operation", + Objs: []runtime.Object{ + &operatorv1.Operation{ + TypeMeta: metav1.TypeMeta{ + Kind: "Operation", + APIVersion: operatorv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "bar-operation", + }, + }, + &operatorv1.RuntimeTaskGroup{ + TypeMeta: metav1.TypeMeta{ + Kind: "RuntimeTaskGroup", + APIVersion: operatorv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo-taskgroup", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: operatorv1.GroupVersion.String(), + Kind: "Operation", + Name: "bar-operation", + }, + }, + }, + }, + &operatorv1.RuntimeTask{ + TypeMeta: metav1.TypeMeta{ + Kind: "RuntimeTask", + APIVersion: operatorv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo-task", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: operatorv1.GroupVersion.String(), + Kind: "RuntimeTaskGroup", + Name: "foo-taskgroup", + }, + }, + }, + Spec: operatorv1.RuntimeTaskSpec{ + NodeName: "foo-node", + }, + }, + }, + }, + args: args{ + req: ctrl.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "foo-task"}}, + }, + want: ctrl.Result{}, + wantErr: false, + }, + { + name: "Reconcile pass", + fields: fields{ + NodeName: "foo-node", + Operation: "foo-operation", + Objs: []runtime.Object{ + &operatorv1.Operation{ + TypeMeta: metav1.TypeMeta{ + Kind: "Operation", + APIVersion: operatorv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo-operation", + }, + }, + &operatorv1.RuntimeTaskGroup{ + TypeMeta: metav1.TypeMeta{ + Kind: "RuntimeTaskGroup", + APIVersion: operatorv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo-taskgroup", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: operatorv1.GroupVersion.String(), + Kind: "Operation", + Name: "foo-operation", + }, + }, + }, + }, + &operatorv1.RuntimeTask{ + TypeMeta: metav1.TypeMeta{ + Kind: "RuntimeTask", + APIVersion: operatorv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: 
"foo-task", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: operatorv1.GroupVersion.String(), + Kind: "RuntimeTaskGroup", + Name: "foo-taskgroup", + }, + }, + }, + Spec: operatorv1.RuntimeTaskSpec{ + NodeName: "foo-node", + }, + }, + }, + }, + args: args{ + req: ctrl.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "foo-task"}}, + }, + want: ctrl.Result{}, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &RuntimeTaskReconciler{ + Client: fake.NewFakeClientWithScheme(setupScheme(), tt.fields.Objs...), + NodeName: tt.fields.NodeName, + Operation: tt.fields.Operation, + recorder: record.NewFakeRecorder(1), + Log: log.Log, + } + got, err := r.Reconcile(tt.args.req) + if (err != nil) != tt.wantErr { + t.Errorf("Reconcile() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("Reconcile() got = %v, want %v", got, tt.want) + } + }) + } +} + +func fixupWantTask(want *operatorv1.RuntimeTask, got *operatorv1.RuntimeTask) { + // In case want.StartTime is a marker, replace it with the current CompletionTime + if want.CreationTimestamp.IsZero() { + want.CreationTimestamp = got.CreationTimestamp + } + // In case want.ErrorMessage is a marker, replace it with the current error if want.Status.ErrorMessage != nil && *want.Status.ErrorMessage == "error" && got.Status.ErrorMessage != nil { want.Status.ErrorMessage = got.Status.ErrorMessage @@ -818,3 +1122,17 @@ func fixupTimes(want *operatorv1.RuntimeTask, got *operatorv1.RuntimeTask) { func runtimeTaskStatusErrorPtr(s errors.RuntimeTaskStatusError) *errors.RuntimeTaskStatusError { return &s } + +func setupScheme() *runtime.Scheme { + scheme := runtime.NewScheme() + if err := operatorv1.AddToScheme(scheme); err != nil { + panic(err) + } + if err := corev1.AddToScheme(scheme); err != nil { + panic(err) + } + if err := appsv1.AddToScheme(scheme); err != nil { + panic(err) + } + return scheme +} diff --git a/operator/controllers/runtimetask_reconcile.go b/operator/controllers/runtimetask_reconcile.go index ffd00ec..771135c 100644 --- a/operator/controllers/runtimetask_reconcile.go +++ b/operator/controllers/runtimetask_reconcile.go @@ -150,3 +150,7 @@ func (t *taskReconcileList) deriveViews() { } } } + +func (t *taskReconcileList) activeTasks() int { + return len(t.pending) + len(t.running) +} diff --git a/operator/controllers/runtimetask_reconcile_test.go b/operator/controllers/runtimetask_reconcile_test.go index ac61e2a..a2dbd38 100644 --- a/operator/controllers/runtimetask_reconcile_test.go +++ b/operator/controllers/runtimetask_reconcile_test.go @@ -262,3 +262,7 @@ func timePtr(t metav1.Time) *metav1.Time { func stringPtr(s string) *string { return &s } + +func boolPtr(s bool) *bool { + return &s +} diff --git a/operator/controllers/runtimetaskgroup_controller.go b/operator/controllers/runtimetaskgroup_controller.go index 6b69caa..4200aed 100644 --- a/operator/controllers/runtimetaskgroup_controller.go +++ b/operator/controllers/runtimetaskgroup_controller.go @@ -25,7 +25,6 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" - capierrors "sigs.k8s.io/cluster-api/errors" "sigs.k8s.io/cluster-api/util/patch" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -56,8 +55,8 @@ func (r *RuntimeTaskGroupReconciler) SetupWithManager(mgr ctrl.Manager) error { err := ctrl.NewControllerManagedBy(mgr). 
 		For(&operatorv1.RuntimeTaskGroup{}).
-		Owns(&operatorv1.RuntimeTask{}).
-		Watches(
+		Owns(&operatorv1.RuntimeTask{}). // force reconcile TaskGroup every time one of the owned Tasks changes
+		Watches( // force reconcile TaskGroup every time the parent operation changes
 			&source.Kind{Type: &operatorv1.Operation{}},
 			&handler.EnqueueRequestsFromMapFunc{ToRequests: mapFunc},
 		).
@@ -109,23 +108,15 @@ func (r *RuntimeTaskGroupReconciler) Reconcile(req ctrl.Request) (_ ctrl.Result,
 
 	// Reconcile the TaskGroup
 	if err := r.reconcileTaskGroup(operation, taskgroup, log); err != nil {
-		if requeueErr, ok := errors.Cause(err).(capierrors.HasRequeueAfterError); ok {
-			return ctrl.Result{Requeue: true, RequeueAfter: requeueErr.GetRequeueAfter()}, nil
-		}
 		return ctrl.Result{}, err
 	}
 
 	return ctrl.Result{}, nil
 }
 
 func (r *RuntimeTaskGroupReconciler) reconcileTaskGroup(operation *operatorv1.Operation, taskgroup *operatorv1.RuntimeTaskGroup, log logr.Logger) (err error) {
-	// gets relevant settings from top level objects (or use defaults)
-	executionMode := operatorv1.OperationExecutionModeAuto
-	operationPaused := false
-
-	if operation != nil {
-		executionMode = operation.Spec.GetTypedOperationExecutionMode()
-		operationPaused = operation.Status.Paused
-	}
+	// gets relevant settings from top level objects
+	executionMode := operation.Spec.GetTypedOperationExecutionMode()
+	operationPaused := operation.Status.Paused
 
 	// Reconcile paused override from top level objects
 	r.reconcilePauseOverride(operationPaused, taskgroup)
@@ -134,19 +125,26 @@ func (r *RuntimeTaskGroupReconciler) reconcileTaskGroup(operation *operatorv1.Op
 	if !taskgroup.DeletionTimestamp.IsZero() {
 		err = r.reconcileDelete(taskgroup)
 		if err != nil {
-			return
+			return err
 		}
 	}
 	// Handle non-deleted TaskGroup
-	err = r.reconcileNormal(executionMode, taskgroup, log)
+
+	// gets the controlled Task items (desired vs actual)
+	tasks, err := r.reconcileTasks(executionMode, taskgroup, log)
 	if err != nil {
-		return
+		return err
+	}
+
+	err = r.reconcileNormal(executionMode, taskgroup, tasks, log)
+	if err != nil {
+		return err
 	}
 
 	// Always reconcile Phase at the end
 	r.reconcilePhase(taskgroup)
 
-	return
+	return nil
 }
 
 func (r *RuntimeTaskGroupReconciler) reconcilePauseOverride(operationPaused bool, taskgroup *operatorv1.RuntimeTaskGroup) {
@@ -158,17 +156,12 @@ func (r *RuntimeTaskGroupReconciler) reconcilePauseOverride(operationPaused bool
 	taskgroup.Status.Paused = taskgrouppaused
 }
 
-func (r *RuntimeTaskGroupReconciler) reconcileNormal(executionMode operatorv1.OperationExecutionMode, taskgroup *operatorv1.RuntimeTaskGroup, log logr.Logger) error {
-	// If the TaskGroup doesn't have finalizer, add it.
- //if !util.Contains(taskgroup.Finalizers, operatorv1alpha1.TaskGroupFinalizer) { - // taskgroup.Finalizers = append(taskgroup.Finalizers, operatorv1alpha1.TaskGroupFinalizer) - //} - +func (r *RuntimeTaskGroupReconciler) reconcileTasks(executionMode operatorv1.OperationExecutionMode, taskgroup *operatorv1.RuntimeTaskGroup, log logr.Logger) (*taskReconcileList, error) { // gets all the Node object matching the taskgroup.Spec.NodeSelector // those are the Node where the task taskgroup.Spec.Template should be replicated (desired tasks) nodes, err := listNodesBySelector(r.Client, &taskgroup.Spec.NodeSelector) if err != nil { - return errors.Wrap(err, "failed to list nodes") + return nil, errors.Wrap(err, "failed to list nodes") } desired := filterNodes(nodes, taskgroup.Spec.GetTypedTaskGroupNodeFilter()) @@ -177,7 +170,7 @@ func (r *RuntimeTaskGroupReconciler) reconcileNormal(executionMode operatorv1.Op // those are the current Task objects controlled by this deployment current, err := listTasksBySelector(r.Client, &taskgroup.Spec.Selector) if err != nil { - return errors.Wrap(err, "failed to list tasks") + return nil, errors.Wrap(err, "failed to list tasks") } log.Info("reconciling", "Nodes", len(desired), "Tasks", len(current.Items)) @@ -192,47 +185,58 @@ func (r *RuntimeTaskGroupReconciler) reconcileNormal(executionMode operatorv1.Op taskgroup.Status.FailedNodes = int32(len(tasks.failed)) taskgroup.Status.InvalidNodes = int32(len(tasks.invalid)) + return tasks, nil +} + +func (r *RuntimeTaskGroupReconciler) reconcileNormal(executionMode operatorv1.OperationExecutionMode, taskgroup *operatorv1.RuntimeTaskGroup, tasks *taskReconcileList, log logr.Logger) error { + // If the TaskGroup doesn't have finalizer, add it. + //if !util.Contains(taskgroup.Finalizers, operatorv1alpha1.TaskGroupFinalizer) { + // taskgroup.Finalizers = append(taskgroup.Finalizers, operatorv1alpha1.TaskGroupFinalizer) + //} + // If there are Tasks not yet completed (pending or running), cleanup error messages (required e.g. after recovery) // NB. It is necessary to give priority to running vs errors so the operation controller keeps alive/restarts // the DaemonsSet for processing tasks - activeTask := len(tasks.pending) + len(tasks.running) - if activeTask > 0 { + if tasks.activeTasks() > 0 { taskgroup.Status.ResetError() - } - - // if there are invalid combinations (e.g. a Node with more than one Task, or a Task without a Node), - // stop creating new Tasks and eventually set the error - if len(tasks.invalid) > 0 { - if activeTask == 0 { + } else { + // if there are invalid combinations (e.g. 
a Node with more than one Task, or a Task without a Node), + // set the error and stop creating new Tasks + if len(tasks.invalid) > 0 { taskgroup.Status.SetError( operatorerrors.NewRuntimeTaskGroupReconciliationError("something invalid"), ) + return nil } - return nil - } - // if there are failed tasks - // stop creating new Tasks and eventually set the error - if len(tasks.failed) > 0 { - if activeTask == 0 { + // if there are failed tasks + // set the error and stop creating new Tasks + if len(tasks.failed) > 0 { taskgroup.Status.SetError( operatorerrors.NewRuntimeTaskGroupReplicaError("something failed"), ) + return nil } - return nil } // TODO: manage adopt tasks/tasks to be orphaned - // At this point we are sure that there are no invalid Node/Task combinations or failed task + // if nil, set the TaskGroup start time + if taskgroup.Status.StartTime == nil { + taskgroup.Status.SetStartTime() + + //TODO: add a signature so we can detect if someone/something changes the taskgroup while it is processed + + return nil + } // if the completed Task have reached the number of expected Task, the TaskGroup is completed // NB. we are doing this before checking pause because if everything is completed, does not make sense to pause if len(tasks.completed) == len(tasks.all) { - // NB. we are setting this condition explicitly in order to avoid that the deployment accidentally + // NB. we are setting this condition explicitly in order to avoid that the taskGroup accidentally // restarts to create tasks - taskgroup.Status.Paused = false taskgroup.Status.SetCompletionTime() + return nil } // if the TaskGroup is paused, return @@ -242,19 +246,12 @@ func (r *RuntimeTaskGroupReconciler) reconcileNormal(executionMode operatorv1.Op // otherwise, proceed creating tasks - // if nil, set the TaskGroup start time - if taskgroup.Status.StartTime == nil { - taskgroup.Status.SetStartTime() - - //TODO: add a signature so we can detect if someone/something changes the taskgroup while it is processed - } - // if there are still Tasks to be created if len(tasks.tobeCreated) > 0 { //TODO: manage different deployment strategy e.g. parallel // if there no existing Tasks not yet completed (pending or running) - if activeTask == 0 { + if tasks.activeTasks() == 0 { // create a Task for the next node in the ordered sequence nextNode := tasks.tobeCreated[0].node.Name log.WithValues("node-name", nextNode).Info("creating task") @@ -281,11 +278,12 @@ func (r *RuntimeTaskGroupReconciler) createTasksReplica(executionMode operatorv1 task := &operatorv1.RuntimeTask{ TypeMeta: metav1.TypeMeta{ - Kind: gv.WithKind("Task").Kind, + Kind: "RuntimeTask", APIVersion: gv.String(), }, ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%s-%s", taskgroup.Name, nodeName), //TODO: GeneratedName? + Namespace: taskgroup.Namespace, Labels: taskgroup.Spec.Template.Labels, Annotations: taskgroup.Spec.Template.Annotations, OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(taskgroup, taskgroup.GroupVersionKind())}, @@ -312,33 +310,36 @@ func (r *RuntimeTaskGroupReconciler) reconcileDelete(taskgroup *operatorv1.Runti } func (r *RuntimeTaskGroupReconciler) reconcilePhase(taskgroup *operatorv1.RuntimeTaskGroup) { - // Set the phase to "pending" if nil. - if taskgroup.Status.Phase == "" { - taskgroup.Status.SetTypedPhase(operatorv1.RuntimeTaskGroupPhasePending) - } - - // Set the phase to "running" if start date is set. 
- if taskgroup.Status.StartTime != nil { - taskgroup.Status.SetTypedPhase(operatorv1.RuntimeTaskGroupPhaseRunning) - } - - // Set the phase to "paused" if paused set. - if taskgroup.Status.Paused { - taskgroup.Status.SetTypedPhase(operatorv1.RuntimeTaskGroupPhasePaused) - } - - // Set the phase to "succeeded" if completion date is set. - if taskgroup.Status.CompletionTime != nil { - taskgroup.Status.SetTypedPhase(operatorv1.RuntimeTaskGroupPhaseSucceeded) + // Set the phase to "deleting" if the deletion timestamp is set. + if !taskgroup.DeletionTimestamp.IsZero() { + taskgroup.Status.SetTypedPhase(operatorv1.RuntimeTaskGroupPhaseDeleted) + return } // Set the phase to "failed" if any of Status.ErrorReason or Status.ErrorMessage is not nil. if taskgroup.Status.ErrorReason != nil || taskgroup.Status.ErrorMessage != nil { taskgroup.Status.SetTypedPhase(operatorv1.RuntimeTaskGroupPhaseFailed) + return } - // Set the phase to "deleting" if the deletion timestamp is set. - if !taskgroup.DeletionTimestamp.IsZero() { - taskgroup.Status.SetTypedPhase(operatorv1.RuntimeTaskGroupPhaseDeleted) + // Set the phase to "succeeded" if completion date is set. + if taskgroup.Status.CompletionTime != nil { + taskgroup.Status.SetTypedPhase(operatorv1.RuntimeTaskGroupPhaseSucceeded) + return } + + // Set the phase to "paused" if paused set. + if taskgroup.Status.Paused { + taskgroup.Status.SetTypedPhase(operatorv1.RuntimeTaskGroupPhasePaused) + return + } + + // Set the phase to "running" if start date is set. + if taskgroup.Status.StartTime != nil { + taskgroup.Status.SetTypedPhase(operatorv1.RuntimeTaskGroupPhaseRunning) + return + } + + // Set the phase to "pending". + taskgroup.Status.SetTypedPhase(operatorv1.RuntimeTaskGroupPhasePending) } diff --git a/operator/controllers/runtimetaskgroup_controller_test.go b/operator/controllers/runtimetaskgroup_controller_test.go index 7283f28..80d378e 100644 --- a/operator/controllers/runtimetaskgroup_controller_test.go +++ b/operator/controllers/runtimetaskgroup_controller_test.go @@ -17,11 +17,16 @@ limitations under the License. 
package controllers import ( + "context" "reflect" "testing" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/runtime/log" operatorv1 "k8s.io/kubeadm/operator/api/v1alpha1" ) @@ -249,3 +254,458 @@ func TestRuntimeTaskGorupReconcilePauseOverride(t *testing.T) { }) } } + +func TestRuntimeTaskGroupReconciler_createTasksReplica(t *testing.T) { + type args struct { + executionMode operatorv1.OperationExecutionMode + taskgroup *operatorv1.RuntimeTaskGroup + nodeName string + } + tests := []struct { + name string + args args + want *operatorv1.RuntimeTask + wantErr bool + }{ + { + name: "Create a task", + args: args{ + executionMode: operatorv1.OperationExecutionModeAuto, + taskgroup: &operatorv1.RuntimeTaskGroup{ + TypeMeta: metav1.TypeMeta{ + Kind: "RuntimeTaskGroup", + APIVersion: operatorv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo-taskgroup", + }, + Spec: operatorv1.RuntimeTaskGroupSpec{ + Template: operatorv1.RuntimeTaskTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"foo-label": "foo"}, + Annotations: map[string]string{"foo-annotation": "foo"}, + }, + Spec: operatorv1.RuntimeTaskSpec{ + Commands: []operatorv1.CommandDescriptor{ + {}, + }, + }, + }, + }, + }, + nodeName: "foo-node", + }, + want: &operatorv1.RuntimeTask{ + TypeMeta: metav1.TypeMeta{ + Kind: "RuntimeTask", + APIVersion: operatorv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo-taskgroup-foo-node", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: operatorv1.GroupVersion.String(), + Kind: "RuntimeTaskGroup", + Name: "foo-taskgroup", + UID: "", + Controller: boolPtr(true), + BlockOwnerDeletion: boolPtr(true), + }, + }, + CreationTimestamp: metav1.Time{}, //using zero as a marker for "whatever time it was created" + Labels: map[string]string{"foo-label": "foo"}, + Annotations: map[string]string{"foo-annotation": "foo"}, + }, + Spec: operatorv1.RuntimeTaskSpec{ + NodeName: "foo-node", + Commands: []operatorv1.CommandDescriptor{ + {}, + }, + }, + Status: operatorv1.RuntimeTaskStatus{ + Phase: string(operatorv1.RuntimeTaskPhasePending), + Paused: false, + }, + }, + wantErr: false, + }, + { + name: "Create a paused task if execution mode=Controlled", + args: args{ + executionMode: operatorv1.OperationExecutionModeControlled, + taskgroup: &operatorv1.RuntimeTaskGroup{ + TypeMeta: metav1.TypeMeta{ + Kind: "RuntimeTaskGroup", + APIVersion: operatorv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo-taskgroup", + }, + Spec: operatorv1.RuntimeTaskGroupSpec{ + Template: operatorv1.RuntimeTaskTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"foo-label": "foo"}, + Annotations: map[string]string{"foo-annotation": "foo"}, + }, + Spec: operatorv1.RuntimeTaskSpec{ + Commands: []operatorv1.CommandDescriptor{ + {}, + }, + }, + }, + }, + }, + nodeName: "foo-node", + }, + want: &operatorv1.RuntimeTask{ + TypeMeta: metav1.TypeMeta{ + Kind: "RuntimeTask", + APIVersion: operatorv1.GroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo-taskgroup-foo-node", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: operatorv1.GroupVersion.String(), + Kind: "RuntimeTaskGroup", + Name: "foo-taskgroup", 
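+							// NB. UID stays empty here because the taskgroup fixture is never persisted,
+							// so metav1.NewControllerRef copies its zero-value UID into the owner reference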
+ UID: "", + Controller: boolPtr(true), + BlockOwnerDeletion: boolPtr(true), + }, + }, + CreationTimestamp: metav1.Time{}, //using zero as a marker for "whatever time it was created" + Labels: map[string]string{"foo-label": "foo"}, + Annotations: map[string]string{"foo-annotation": "foo"}, + }, + Spec: operatorv1.RuntimeTaskSpec{ + NodeName: "foo-node", + Commands: []operatorv1.CommandDescriptor{ + {}, + }, + }, + Status: operatorv1.RuntimeTaskStatus{ + Phase: string(operatorv1.RuntimeTaskPhasePending), + Paused: true, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &RuntimeTaskGroupReconciler{ + Client: fake.NewFakeClientWithScheme(setupScheme()), + Log: log.Log, + } + if err := r.createTasksReplica(tt.args.executionMode, tt.args.taskgroup, tt.args.nodeName); (err != nil) != tt.wantErr { + t.Errorf("createTasksReplica() error = %v, wantErr %v", err, tt.wantErr) + } + + got := &operatorv1.RuntimeTask{} + key := client.ObjectKey{ + Namespace: tt.want.Namespace, + Name: tt.want.Name, + } + if err := r.Client.Get(context.Background(), key, got); err != nil { + t.Errorf("Get() error = %v", err) + return + } + + fixupWantTask(tt.want, got) + + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("createTasksReplica() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestRuntimeTaskGroupReconciler_reconcileNormal(t *testing.T) { + type args struct { + executionMode operatorv1.OperationExecutionMode + taskgroup *operatorv1.RuntimeTaskGroup + tasks *taskReconcileList + } + type want struct { + taskgroup *operatorv1.RuntimeTaskGroup + } + tests := []struct { + name string + args args + want want + wantTasks int + wantErr bool + }{ + { + name: "Reconcile sets error if a task is failed and no task is active", + args: args{ + executionMode: "", + taskgroup: &operatorv1.RuntimeTaskGroup{}, + tasks: &taskReconcileList{ + all: []*taskReconcileItem{ + {}, + }, + failed: []*taskReconcileItem{ + {}, + }, + }, + }, + want: want{ + taskgroup: &operatorv1.RuntimeTaskGroup{ + Status: operatorv1.RuntimeTaskGroupStatus{ + ErrorMessage: stringPtr("error"), //using error as a marker for "whatever error was raised" + }, + }, + }, + wantTasks: 0, + wantErr: false, + }, + { + name: "Reconcile sets error if a task is invalid and no task is active", + args: args{ + executionMode: "", + taskgroup: &operatorv1.RuntimeTaskGroup{}, + tasks: &taskReconcileList{ + all: []*taskReconcileItem{ + {}, + }, + invalid: []*taskReconcileItem{ + {}, + }, + }, + }, + want: want{ + taskgroup: &operatorv1.RuntimeTaskGroup{ + Status: operatorv1.RuntimeTaskGroupStatus{ + ErrorMessage: stringPtr("error"), //using error as a marker for "whatever error was raised" + }, + }, + }, + wantTasks: 0, + wantErr: false, + }, + { + name: "Reconcile set start time", + args: args{ + executionMode: "", + taskgroup: &operatorv1.RuntimeTaskGroup{}, + tasks: &taskReconcileList{}, + }, + want: want{ + taskgroup: &operatorv1.RuntimeTaskGroup{ + Status: operatorv1.RuntimeTaskGroupStatus{ + StartTime: timePtr(metav1.Time{}), //using zero as a marker for "whatever time it started" + }, + }, + }, + wantTasks: 0, + wantErr: false, + }, + { + name: "Reconcile reset error if a task is active", + args: args{ + executionMode: "", + taskgroup: &operatorv1.RuntimeTaskGroup{ + Status: operatorv1.RuntimeTaskGroupStatus{ + ErrorMessage: stringPtr("error"), + }, + }, + tasks: &taskReconcileList{ + all: []*taskReconcileItem{ + {}, + {}, + }, + running: []*taskReconcileItem{ + {}, + }, + failed: 
[]*taskReconcileItem{ // failed should be ignored if a task is active + {}, + }, + }, + }, + want: want{ + taskgroup: &operatorv1.RuntimeTaskGroup{ + Status: operatorv1.RuntimeTaskGroupStatus{ + StartTime: timePtr(metav1.Time{}), //using zero as a marker for "whatever time it started" + }, + }, + }, + wantTasks: 0, + wantErr: false, + }, + { + name: "Reconcile set completion time if no more task to create", + args: args{ + executionMode: "", + taskgroup: &operatorv1.RuntimeTaskGroup{ + Status: operatorv1.RuntimeTaskGroupStatus{ + StartTime: timePtr(metav1.Now()), + }, + }, + tasks: &taskReconcileList{}, //empty list of nodes -> no more task to create + }, + want: want{ + taskgroup: &operatorv1.RuntimeTaskGroup{ + Status: operatorv1.RuntimeTaskGroupStatus{ + StartTime: timePtr(metav1.Time{}), //using zero as a marker for "whatever time it started" + CompletionTime: timePtr(metav1.Time{}), //using zero as a marker for "whatever time it completed" + }, + }, + }, + wantTasks: 0, + wantErr: false, + }, + { + name: "Reconcile do nothing if paused", + args: args{ + executionMode: "", + taskgroup: &operatorv1.RuntimeTaskGroup{ + Status: operatorv1.RuntimeTaskGroupStatus{ + StartTime: timePtr(metav1.Now()), + Paused: true, + }, + }, + tasks: &taskReconcileList{ + all: []*taskReconcileItem{ + {}, + }, + tobeCreated: []*taskReconcileItem{ + {}, + }, + }, + }, + want: want{ + taskgroup: &operatorv1.RuntimeTaskGroup{ + Status: operatorv1.RuntimeTaskGroupStatus{ + StartTime: timePtr(metav1.Time{}), //using zero as a marker for "whatever time it started" + Paused: true, + }, + }, + }, + wantTasks: 0, + wantErr: false, + }, + { + name: "Reconcile creates a task if nothing running", + args: args{ + executionMode: "", + taskgroup: &operatorv1.RuntimeTaskGroup{ + Status: operatorv1.RuntimeTaskGroupStatus{ + StartTime: timePtr(metav1.Now()), + }, + }, + tasks: &taskReconcileList{ + all: []*taskReconcileItem{ + {}, + }, + tobeCreated: []*taskReconcileItem{ + { + node: &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + }, + }, + }, + }, + }, + want: want{ + taskgroup: &operatorv1.RuntimeTaskGroup{ + Status: operatorv1.RuntimeTaskGroupStatus{ + StartTime: timePtr(metav1.Time{}), //using zero as a marker for "whatever time it started" + }, + }, + }, + wantTasks: 1, + wantErr: false, + }, + { + name: "Reconcile does not creates a task if something running", + args: args{ + executionMode: "", + taskgroup: &operatorv1.RuntimeTaskGroup{ + Status: operatorv1.RuntimeTaskGroupStatus{ + StartTime: timePtr(metav1.Now()), + }, + }, + tasks: &taskReconcileList{ + all: []*taskReconcileItem{ + {}, + {}, + }, + tobeCreated: []*taskReconcileItem{ + {}, + }, + running: []*taskReconcileItem{ + {}, + }, + }, + }, + want: want{ + taskgroup: &operatorv1.RuntimeTaskGroup{ + Status: operatorv1.RuntimeTaskGroupStatus{ + StartTime: timePtr(metav1.Time{}), //using zero as a marker for "whatever time it started" + }, + }, + }, + wantTasks: 0, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := fake.NewFakeClientWithScheme(setupScheme()) + + r := &RuntimeTaskGroupReconciler{ + Client: c, + recorder: record.NewFakeRecorder(1), + Log: log.Log, + } + if err := r.reconcileNormal(tt.args.executionMode, tt.args.taskgroup, tt.args.tasks, r.Log); (err != nil) != tt.wantErr { + t.Errorf("reconcileNormal() error = %v, wantErr %v", err, tt.wantErr) + } + + fixupWantTaskGroup(tt.want.taskgroup, tt.args.taskgroup) + + if !reflect.DeepEqual(tt.args.taskgroup, tt.want.taskgroup) { + 
t.Errorf("reconcileNormal() = %v, want %v", tt.args.taskgroup, tt.want.taskgroup) + } + + tasks := &operatorv1.RuntimeTaskList{} + if err := c.List(context.Background(), tasks); err != nil { + t.Fatalf("List() error = %v", err) + } + + if len(tasks.Items) != tt.wantTasks { + t.Errorf("reconcileNormal() = %v tasks, want %v tasks", len(tasks.Items), tt.wantTasks) + } + }) + } +} + +func fixupWantTaskGroup(want *operatorv1.RuntimeTaskGroup, got *operatorv1.RuntimeTaskGroup) { + // In case want.StartTime is a marker, replace it with the current CompletionTime + if want.CreationTimestamp.IsZero() { + want.CreationTimestamp = got.CreationTimestamp + } + + // In case want.ErrorMessage is a marker, replace it with the current error + if want.Status.ErrorMessage != nil && *want.Status.ErrorMessage == "error" && got.Status.ErrorMessage != nil { + want.Status.ErrorMessage = got.Status.ErrorMessage + want.Status.ErrorReason = got.Status.ErrorReason + } + + // In case want.StartTime is a marker, replace it with the current CompletionTime + if want.Status.StartTime != nil && want.Status.StartTime.IsZero() && got.Status.StartTime != nil { + want.Status.StartTime = got.Status.StartTime + } + // In case want.CompletionTime is a marker, replace it with the current CompletionTime + if want.Status.CompletionTime != nil && want.Status.CompletionTime.IsZero() && got.Status.CompletionTime != nil { + want.Status.CompletionTime = got.Status.CompletionTime + } +} diff --git a/operator/controllers/runtimetaskgroup_reconcile.go b/operator/controllers/runtimetaskgroup_reconcile.go index 0d475c8..2e24201 100644 --- a/operator/controllers/runtimetaskgroup_reconcile.go +++ b/operator/controllers/runtimetaskgroup_reconcile.go @@ -141,3 +141,7 @@ func (a *taskGroupReconcileList) deriveViews() { } } } + +func (a *taskGroupReconcileList) activeTaskGroups() int { + return len(a.pending) + len(a.running) +} diff --git a/operator/controllers/suite_test.go b/operator/controllers/suite_test.go deleted file mode 100644 index eb85692..0000000 --- a/operator/controllers/suite_test.go +++ /dev/null @@ -1,89 +0,0 @@ -/* -Copyright 2019 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controllers - -import ( - "path/filepath" - "testing" - - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - - "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/envtest" - logf "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/log/zap" - - operatorv1 "k8s.io/kubeadm/operator/api/v1alpha1" - // +kubebuilder:scaffold:imports -) - -// These tests use Ginkgo (BDD-style Go testing framework). Refer to -// http://onsi.github.io/ginkgo/ to learn more about Ginkgo. 
+func fixupWantTaskGroup(want *operatorv1.RuntimeTaskGroup, got *operatorv1.RuntimeTaskGroup) {
+	// In case want.CreationTimestamp is a marker, replace it with the current CreationTimestamp
+	if want.CreationTimestamp.IsZero() {
+		want.CreationTimestamp = got.CreationTimestamp
+	}
+
+	// In case want.ErrorMessage is a marker, replace it with the current error
+	if want.Status.ErrorMessage != nil && *want.Status.ErrorMessage == "error" && got.Status.ErrorMessage != nil {
+		want.Status.ErrorMessage = got.Status.ErrorMessage
+		want.Status.ErrorReason = got.Status.ErrorReason
+	}
+
+	// In case want.StartTime is a marker, replace it with the current StartTime
+	if want.Status.StartTime != nil && want.Status.StartTime.IsZero() && got.Status.StartTime != nil {
+		want.Status.StartTime = got.Status.StartTime
+	}
+	// In case want.CompletionTime is a marker, replace it with the current CompletionTime
+	if want.Status.CompletionTime != nil && want.Status.CompletionTime.IsZero() && got.Status.CompletionTime != nil {
+		want.Status.CompletionTime = got.Status.CompletionTime
+	}
+}
diff --git a/operator/controllers/runtimetaskgroup_reconcile.go b/operator/controllers/runtimetaskgroup_reconcile.go
index 0d475c8..2e24201 100644
--- a/operator/controllers/runtimetaskgroup_reconcile.go
+++ b/operator/controllers/runtimetaskgroup_reconcile.go
@@ -141,3 +141,7 @@ func (a *taskGroupReconcileList) deriveViews() {
 		}
 	}
 }
+
+func (a *taskGroupReconcileList) activeTaskGroups() int {
+	return len(a.pending) + len(a.running)
+}
diff --git a/operator/controllers/suite_test.go b/operator/controllers/suite_test.go
deleted file mode 100644
index eb85692..0000000
--- a/operator/controllers/suite_test.go
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
-Copyright 2019 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package controllers
-
-import (
-	"path/filepath"
-	"testing"
-
-	. "github.com/onsi/ginkgo"
-	. "github.com/onsi/gomega"
-
-	"k8s.io/client-go/kubernetes/scheme"
-	"k8s.io/client-go/rest"
-	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/controller-runtime/pkg/envtest"
-	logf "sigs.k8s.io/controller-runtime/pkg/log"
-	"sigs.k8s.io/controller-runtime/pkg/log/zap"
-
-	operatorv1 "k8s.io/kubeadm/operator/api/v1alpha1"
-	// +kubebuilder:scaffold:imports
-)
-
-// These tests use Ginkgo (BDD-style Go testing framework). Refer to
-// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.
-
-//TODO: Add tests in a follow up PR
-
-var cfg *rest.Config
-var k8sClient client.Client
-var testEnv *envtest.Environment
-
-func TestAPIs(t *testing.T) {
-	RegisterFailHandler(Fail)
-
-	RunSpecsWithDefaultAndCustomReporters(t,
-		"Controller Suite",
-		[]Reporter{envtest.NewlineReporter{}})
-}
-
-var _ = BeforeSuite(func(done Done) {
-	logf.SetLogger(zap.LoggerTo(GinkgoWriter, true))
-
-	By("bootstrapping test environment")
-	testEnv = &envtest.Environment{
-		CRDDirectoryPaths: []string{filepath.Join("..", "config", "crd", "bases")},
-	}
-
-	var err error
-	cfg, err = testEnv.Start()
-	Expect(err).ToNot(HaveOccurred())
-	Expect(cfg).ToNot(BeNil())
-
-	err = operatorv1.AddToScheme(scheme.Scheme)
-	Expect(err).NotTo(HaveOccurred())
-
-	err = operatorv1.AddToScheme(scheme.Scheme)
-	Expect(err).NotTo(HaveOccurred())
-
-	err = operatorv1.AddToScheme(scheme.Scheme)
-	Expect(err).NotTo(HaveOccurred())
-
-	// +kubebuilder:scaffold:scheme
-
-	k8sClient, err = client.New(cfg, client.Options{Scheme: scheme.Scheme})
-	Expect(err).ToNot(HaveOccurred())
-	Expect(k8sClient).ToNot(BeNil())
-
-	close(done)
-}, 60)
-
-var _ = AfterSuite(func() {
-	By("tearing down the test environment")
-	err := testEnv.Stop()
-	Expect(err).ToNot(HaveOccurred())
-})
diff --git a/operator/controllers/util.go b/operator/controllers/util.go
index 71edbc0..1ff637e 100644
--- a/operator/controllers/util.go
+++ b/operator/controllers/util.go
@@ -199,7 +199,7 @@ func createDaemonSet(c client.Client, operation *operatorv1.Operation, namespace
 	extraLabels, err := operations.DaemonSetNodeSelectorLabels(operation)
 	if err != nil {
-		return errors.Wrapf(err, "failed to get operation specific labels DaemonSet %s/%s", daemonSet.Namespace, daemonSet.Name)
+		return errors.Wrapf(err, "failed to get NodeSelector for the operation DaemonSet %s/%s", daemonSet.Namespace, daemonSet.Name)
 	}
 	if len(extraLabels) > 0 {
 		daemonSet.Spec.Template.Spec.NodeSelector = extraLabels
@@ -233,6 +233,9 @@
 	} else {
 		// Expose /metrics on default (insecure) port
+		if daemonSet.Annotations == nil {
+			daemonSet.Annotations = map[string]string{}
+		}
 		daemonSet.Annotations["prometheus.io/scrape"] = "true"
 		daemonSet.Spec.Template.Spec.Containers[0].Ports = []corev1.ContainerPort{
 			{
@@ -314,7 +317,6 @@ func operationToTaskGroupRequests(c client.Client, o handler.MapObject) []ctrl.R
 }
 
 func getOwnerOperation(ctx context.Context, c client.Client, obj metav1.ObjectMeta) (*operatorv1.Operation, error) {
-	//TODO: check for controller ref instead of Operation/Kind
 	for _, ref := range obj.OwnerReferences {
 		if ref.Kind == "Operation" && ref.APIVersion == operatorv1.GroupVersion.String() {
 			operation := &operatorv1.Operation{}
 			key := client.ObjectKey{
@@ -322,14 +324,13 @@ func getOwnerOperation(ctx context.Context, c client.Client, obj metav1.ObjectMe
 				Namespace: obj.Namespace,
 				Name:      ref.Name,
 			}
-			err := c.Get(ctx, key, operation)
-			if err != nil {
+			if err := c.Get(ctx, key, operation); err != nil {
 				return nil, errors.Wrapf(err, "error reading controller ref for %s/%s", obj.Namespace, obj.Name)
 			}
 			return operation, nil
 		}
 	}
-	return nil, nil
+	return nil, errors.Errorf("missing controller ref for %s/%s", obj.Namespace, obj.Name)
 }
 
 type matchingSelector struct {
@@ -418,20 +419,18 @@ func taskGroupToTaskRequests(c client.Client, o handler.MapObject) []ctrl.Reques
 }
 
 func getOwnerTaskGroup(ctx context.Context, c client.Client, obj metav1.ObjectMeta) (*operatorv1.RuntimeTaskGroup, error) {
-	//TODO: check for controller ref instead of Operation/Kind
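+	// the object is expected to have a controller ref to a RuntimeTaskGroup;
+	// a missing ref is now reported as an error instead of returning (nil, nil)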
 	for _, ref := range obj.OwnerReferences {
-		if ref.Kind == "TaskGroup" && ref.APIVersion == operatorv1.GroupVersion.String() {
+		if ref.Kind == "RuntimeTaskGroup" && ref.APIVersion == operatorv1.GroupVersion.String() {
 			taskgroup := &operatorv1.RuntimeTaskGroup{}
 			key := client.ObjectKey{
 				Namespace: obj.Namespace,
 				Name:      ref.Name,
 			}
-			err := c.Get(ctx, key, taskgroup)
-			if err != nil {
+			if err := c.Get(ctx, key, taskgroup); err != nil {
 				return nil, errors.Wrapf(err, "error reading controller ref for %s/%s", obj.Namespace, obj.Name)
 			}
 			return taskgroup, nil
 		}
 	}
-	return nil, nil
+	return nil, errors.Errorf("missing controller ref for %s/%s", obj.Namespace, obj.Name)
 }
diff --git a/operator/controllers/util_test.go b/operator/controllers/util_test.go
new file mode 100644
index 0000000..a96b04d
--- /dev/null
+++ b/operator/controllers/util_test.go
@@ -0,0 +1,107 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controllers
+
+import (
+	"reflect"
+	"testing"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	operatorv1 "k8s.io/kubeadm/operator/api/v1alpha1"
+)
+
+//TODO: more tests
+
+func Test_filterNodes(t *testing.T) {
+	nodes := &corev1.NodeList{
+		Items: []corev1.Node{
+			{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "n3",
+				},
+			},
+			{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "n1",
+				},
+			},
+			{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "n2",
+				},
+			},
+		},
+	}
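+	// NB. the items above are deliberately not in name order (n3, n1, n2): the
+	// head/tail cases below expect filterNodes to order the nodes by name first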
+
+	type args struct {
+		filter operatorv1.RuntimeTaskGroupNodeFilter
+	}
+	tests := []struct {
+		name string
+		args args
+		want []corev1.Node
+	}{
+		{
+			name: "filter all returns all nodes",
+			args: args{
+				filter: operatorv1.RuntimeTaskGroupNodeFilterAll,
+			},
+			want: nodes.Items,
+		},
+		{
+			name: "filter head returns the first node",
+			args: args{
+				filter: operatorv1.RuntimeTaskGroupNodeFilterHead,
+			},
+			want: []corev1.Node{
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: "n1",
+					},
+				},
+			},
+		},
+		{
+			name: "filter tail returns the last two nodes",
+			args: args{
+				filter: operatorv1.RuntimeTaskGroupNodeFilterTail,
+			},
+			want: []corev1.Node{
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: "n2",
+					},
+				},
+				{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: "n3",
+					},
+				},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := filterNodes(nodes, tt.args.filter); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("filterNodes() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
diff --git a/operator/go.mod b/operator/go.mod
index 3441768..5720712 100644
--- a/operator/go.mod
+++ b/operator/go.mod
@@ -4,8 +4,6 @@ go 1.12
 
 require (
 	github.com/go-logr/logr v0.1.0
-	github.com/onsi/ginkgo v1.8.0
-	github.com/onsi/gomega v1.5.0
 	github.com/pkg/errors v0.8.1
 	k8s.io/api v0.0.0-20190409021203-6e4e0e4f393b
 	k8s.io/apimachinery v0.0.0-20190404173353-6a84e37a896d
diff --git a/operator/main.go b/operator/main.go
index 1c6c257..749db6e 100644
--- a/operator/main.go
+++ b/operator/main.go
@@ -20,6 +20,7 @@
 import (
 	"flag"
 	"os"
 
+	"github.com/pkg/errors"
 	"k8s.io/apimachinery/pkg/runtime"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
 	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
@@ -64,25 +65,30 @@ func main() {
 	var enableLeaderElection bool
 
 	// common flags
-	flag.StringVar(&mode, "mode", string(modeManager), "One of [manger, agent].")
-	flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
+	flag.StringVar(&mode, "mode", string(modeManager), "One of [manager, agent]")
+	flag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to")
 
 	// manager flags
-	flag.StringVar(&pod, "manager-pod", "", "The pod the manager is running in.")
-	flag.StringVar(&namespace, "manager-namespace", "", "The namespace the manager is running in.")
-	flag.StringVar(&image, "agent-image", "", "The image that should be used for agent the DaemonSet. If empty, the manager image will be used.")
-	flag.BoolVar(&metricsRBAC, "agent-metrics-rbac", true, "Expose the /metrics endpoint for agent the DaemonSet with RBAC authn/z.")
+	flag.StringVar(&pod, "manager-pod", "", "The pod the manager is running in")
+	flag.StringVar(&namespace, "manager-namespace", "", "The namespace the manager is running in") //TODO: implement in all the controllers
+	flag.StringVar(&image, "agent-image", "", "The image that should be used for the agent DaemonSet. If empty, the manager image will be used") //TODO: remove; always use manager image
+	flag.BoolVar(&metricsRBAC, "agent-metrics-rbac", true, "Use RBAC authn/z for the /metrics endpoint of agents")
 	flag.BoolVar(&enableLeaderElection, "enable-leader-election", false,
-		"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager.")
+		"Enable leader election for controller manager. Enabling this will ensure there is only one active controller manager")
 
 	// agent flags
-	flag.StringVar(&nodeName, "agent-node-name", "", "The node that the agent manager should control.")
-	flag.StringVar(&operation, "agent-operation", "", "The operation that the agent manager should control. If empty, the agent will control headless Task only.")
+	flag.StringVar(&nodeName, "agent-node-name", "", "The node that the agent manager should control")
+	flag.StringVar(&operation, "agent-operation", "", "The operation that the agent manager should control. If empty, the agent will control headless Tasks only")
 
 	flag.Parse()
 
 	ctrl.SetLogger(klogr.New())
 
+	if managerMode(mode) != modeManager && managerMode(mode) != modeAgent {
+		setupLog.Error(errors.New("invalid value"), "unable to create controllers with an invalid --mode value")
+		os.Exit(1)
+	}
+
 	if managerMode(mode) == modeAgent {
 		enableLeaderElection = false
 	}
@@ -122,6 +128,15 @@
 	}
 
 	if managerMode(mode) == modeAgent {
+		if nodeName == "" {
+			setupLog.Error(errors.New("missing value"), "unable to create controller without the --agent-node-name value set", "controller", "RuntimeTask")
+			os.Exit(1)
+		}
+		if operation == "" {
+			setupLog.Error(errors.New("missing value"), "unable to create controller without the --agent-operation value set", "controller", "RuntimeTask")
+			os.Exit(1)
+		}
+
 		if err = (&controllers.RuntimeTaskReconciler{
 			Client:   mgr.GetClient(),
 			NodeName: nodeName,