Merge pull request #1861 from fabriziopandini/taskgroup-controller

kubeadm-operator: implement the taskgroup controller
Kubernetes Prow Robot 2019-10-31 07:59:37 -07:00 committed by GitHub
commit fe0d2ce5a6
3 changed files with 561 additions and 12 deletions


@ -0,0 +1,151 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"sort"
corev1 "k8s.io/api/core/v1"
operatorv1 "k8s.io/kubeadm/operator/api/v1alpha1"
)
// Reconciling Tasks is implemented by matching the current Tasks with the desired Tasks
// (one for each Node in scope), so the controller can determine what to do next.
// taskReconcileItem defines the match between a desired Task and the corresponding current Node.
// Supported combinations are:
// - desired existing, current missing (current to be created)
// - desired existing, current existing (current to be operated)
// - desired missing, current existing (invalid)
type taskReconcileItem struct {
name string
node *corev1.Node
tasks []operatorv1.RuntimeTask
}
func newTaskGroupChildProxy(node *corev1.Node, tasks ...operatorv1.RuntimeTask) *taskReconcileItem {
var name string
if node != nil {
name = node.Name
} else {
name = tasks[0].Spec.NodeName
}
return &taskReconcileItem{
name: name,
node: node,
tasks: tasks,
}
}
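// Usage sketch (editorial note, not part of the commit): the proxy is built either
// from a Node that has no Task yet, or from an orphaned Task that has no Node:
//
// newTaskGroupChildProxy(&node)     // desired existing, current missing
// newTaskGroupChildProxy(nil, task) // desired missing, current existing (invalid)
//
// NB. when node is nil the name falls back to tasks[0].Spec.NodeName, so callers
// must pass at least one task in that case.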
type taskReconcileList struct {
all []*taskReconcileItem
invalid []*taskReconcileItem
tobeCreated []*taskReconcileItem
pending []*taskReconcileItem
running []*taskReconcileItem
completed []*taskReconcileItem
failed []*taskReconcileItem
}
func reconcileTasks(nodes []corev1.Node, tasks *operatorv1.RuntimeTaskList) *taskReconcileList {
// Build an empty match for each desired Task (1 for each node)
// N.B. we are storing matches in a Map so we can match Node and Task by NodeName
matchMap := map[string]*taskReconcileItem{}
for _, n := range nodes {
x := n // copy the node into a local variable so the pointer taken below is not overwritten on the next iteration
matchMap[x.Name] = newTaskGroupChildProxy(&x)
}
// Match the current Tasks with the desired Tasks (one for each node in scope).
for _, t := range tasks.Items {
// in case a current task has a corresponding desired task, match them
// NB. if there is more than one match we track it, but this is an inconsistency
// (more than one Task for the same node)
if v, ok := matchMap[t.Spec.NodeName]; ok {
// TODO(fabriziopandini): might be we want to check if the task was exactly the expected task
v.tasks = append(v.tasks, t)
continue
}
// in case a current task does not have a corresponding desired task, we track it, but this is an inconsistency
// (a Task not matching any existing Node)
matchMap[t.Spec.NodeName] = newTaskGroupChildProxy(nil, t)
}
// Transpose the matchMap into a list
matchList := &taskReconcileList{
all: []*taskReconcileItem{},
invalid: []*taskReconcileItem{},
tobeCreated: []*taskReconcileItem{},
pending: []*taskReconcileItem{},
running: []*taskReconcileItem{},
completed: []*taskReconcileItem{},
failed: []*taskReconcileItem{},
}
for _, v := range matchMap {
matchList.all = append(matchList.all, v)
}
// ensure the list is sorted in a predictable way
sort.Slice(matchList.all, func(i, j int) bool { return matchList.all[i].name < matchList.all[j].name })
// Build all the derived views, so we can have a quick glance at tasks in different states
matchList.deriveViews()
return matchList
}
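// Worked example (illustrative only): given Nodes {n1, n2} and current Tasks
// {t1 with NodeName=n1, t2 with NodeName=n3}, reconcileTasks returns three items:
//
// n1 -> {node: n1, tasks: [t1]}  // desired and current existing (to be operated)
// n2 -> {node: n2, tasks: []}    // desired existing, current missing (to be created)
// n3 -> {node: nil, tasks: [t2]} // desired missing, current existing (invalid)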
func (t *taskReconcileList) deriveViews() {
for _, v := range t.all {
switch {
case v.node != nil:
switch len(v.tasks) {
case 0:
// If there is no Task for a Node, the task has to be created by this controller
t.tobeCreated = append(t.tobeCreated, v)
case 1:
// Failed (and not recovering)
if (v.tasks[0].Status.ErrorReason != nil || v.tasks[0].Status.ErrorMessage != nil) &&
(v.tasks[0].Spec.GetTypedTaskRecoveryStrategy() == operatorv1.RuntimeTaskRecoveryUnknownStrategy) {
t.failed = append(t.failed, v)
continue
}
// Completed
if v.tasks[0].Status.CompletionTime != nil {
t.completed = append(t.completed, v)
continue
}
// Running (NB. paused or recovering Tasks fall into this counter as well)
if v.tasks[0].Status.StartTime != nil {
t.running = append(t.running, v)
continue
}
// Pending
t.pending = append(t.pending, v)
default:
// if there is more than one Task for the same node, this is an invalid condition
// NB. in this case it counts as a single replica, even if there is more than one Task
t.invalid = append(t.invalid, v)
}
case v.node == nil:
// if there is a Task without a matching Node, this is an invalid condition
t.invalid = append(t.invalid, v)
}
}
}
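// NB. for a Node with exactly one Task the checks above run in strict order: failed
// (error set and no recovery strategy), then completed (CompletionTime set), then
// running (StartTime set), then pending as the fallback; this ordering is what makes
// a paused or recovering Task count as running rather than failed.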


@ -18,36 +18,326 @@ package controllers
import (
"context"
"fmt"
"github.com/go-logr/logr"
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
operatorv1 "k8s.io/kubeadm/operator/api/v1alpha1"
operatorerrors "k8s.io/kubeadm/operator/errors"
capierrors "sigs.k8s.io/cluster-api/errors"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
operatorv1 "k8s.io/kubeadm/operator/api/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
// RuntimeTaskGroupReconciler reconciles a RuntimeTaskGroup object
type RuntimeTaskGroupReconciler struct {
client.Client
recorder record.EventRecorder
Log logr.Logger
}
// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch
// +kubebuilder:rbac:groups=operator.kubeadm.x-k8s.io,resources=runtimetaskgroups,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=operator.kubeadm.x-k8s.io,resources=runtimetaskgroups/status,verbs=get;update;patch
// SetupWithManager configures the controller for calling the reconciler
func (r *RuntimeTaskGroupReconciler) SetupWithManager(mgr ctrl.Manager) error {
var mapFunc handler.ToRequestsFunc = func(o handler.MapObject) []reconcile.Request {
return operationToTaskGroupRequests(r.Client, o)
}
err := ctrl.NewControllerManagedBy(mgr).
For(&operatorv1.RuntimeTaskGroup{}).
Owns(&operatorv1.RuntimeTask{}).
Watches(
&source.Kind{Type: &operatorv1.Operation{}},
&handler.EnqueueRequestsFromMapFunc{ToRequests: mapFunc},
).
Complete(r)
r.recorder = mgr.GetEventRecorderFor("taskgroup-controller")
return err
}
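// NB. (editorial note) the additional watch re-enqueues all the TaskGroups matching
// an Operation's labels whenever that Operation changes (see
// operationToTaskGroupRequests), so e.g. pausing an Operation propagates to its
// TaskGroups without waiting for a TaskGroup event.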
// Reconcile reconciles a RuntimeTaskGroup object
func (r *RuntimeTaskGroupReconciler) Reconcile(req ctrl.Request) (_ ctrl.Result, rerr error) {
ctx := context.Background()
log := r.Log.WithValues("task-group", req.NamespacedName)
// Fetch the TaskGroup instance
taskgroup := &operatorv1.RuntimeTaskGroup{}
if err := r.Client.Get(ctx, req.NamespacedName, taskgroup); err != nil {
if apierrors.IsNotFound(err) {
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}
// Ignore the TaskGroup if it is already completed
if taskgroup.Status.CompletionTime != nil {
return ctrl.Result{}, nil
}
// Fetch the Operation instance
operation, err := getOwnerOperation(ctx, r.Client, taskgroup.ObjectMeta)
if err != nil {
return ctrl.Result{}, err
}
// Initialize the patch helper
patchHelper, err := patch.NewHelper(taskgroup, r)
if err != nil {
return ctrl.Result{}, err
}
// Always attempt to Patch the TaskGroup object and status after each reconciliation.
defer func() {
if err := patchHelper.Patch(ctx, taskgroup); err != nil {
log.Error(err, "failed to patch TaskGroup")
if rerr == nil {
rerr = err
}
}
}()
// Reconcile the TaskGroup
if err := r.reconcileTaskGroup(operation, taskgroup, log); err != nil {
if requeueErr, ok := errors.Cause(err).(capierrors.HasRequeueAfterError); ok {
return ctrl.Result{Requeue: true, RequeueAfter: requeueErr.GetRequeueAfter()}, nil
}
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
}
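// NB. (editorial note) the deferred patchHelper.Patch above persists whatever
// reconcileTaskGroup changed on the object (status counters, phase, paused override);
// a patch failure is surfaced as the reconcile error only when reconciliation itself
// succeeded, so the original error is never masked.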
func (r *RuntimeTaskGroupReconciler) reconcileTaskGroup(operation *operatorv1.Operation, taskgroup *operatorv1.RuntimeTaskGroup, log logr.Logger) (err error) {
// get relevant settings from the top-level objects (or use defaults)
executionMode := operatorv1.OperationExecutionModeAuto
operationPaused := false
if operation != nil {
executionMode = operation.Spec.GetTypedOperationExecutionMode()
operationPaused = operation.Status.Paused
}
// Reconcile paused override from top level objects
r.reconcilePauseOverride(operationPaused, taskgroup)
// Handle deleted TaskGroup
if !taskgroup.DeletionTimestamp.IsZero() {
err = r.reconcileDelete(taskgroup)
if err != nil {
return
}
}
// Handle non-deleted TaskGroup
err = r.reconcileNormal(executionMode, taskgroup, log)
if err != nil {
return
}
// Always reconcile Phase at the end
r.reconcilePhase(taskgroup)
return
}
func (r *RuntimeTaskGroupReconciler) reconcilePauseOverride(operationPaused bool, taskgroup *operatorv1.RuntimeTaskGroup) {
// record paused override state change, if any
taskgrouppaused := operationPaused
recordPausedChange(r.recorder, taskgroup, taskgroup.Status.Paused, taskgrouppaused, "by top level objects")
// update status with paused override setting from top level objects
taskgroup.Status.Paused = taskgrouppaused
}
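// Example (editorial note): setting Operation.Status.Paused to true makes every owned
// TaskGroup report Status.Paused=true on its next reconcile; recordPausedChange
// presumably emits an event only when the value actually flips.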
func (r *RuntimeTaskGroupReconciler) reconcileNormal(executionMode operatorv1.OperationExecutionMode, taskgroup *operatorv1.RuntimeTaskGroup, log logr.Logger) error {
// If the TaskGroup doesn't have finalizer, add it.
//if !util.Contains(taskgroup.Finalizers, operatorv1alpha1.TaskGroupFinalizer) {
// taskgroup.Finalizers = append(taskgroup.Finalizers, operatorv1alpha1.TaskGroupFinalizer)
//}
// get all the Node objects matching taskgroup.Spec.NodeSelector;
// those are the Nodes where the Task defined in taskgroup.Spec.Template should be replicated (desired Tasks)
nodes, err := listNodesBySelector(r.Client, &taskgroup.Spec.NodeSelector)
if err != nil {
return errors.Wrap(err, "failed to list nodes")
}
desired := filterNodes(nodes, taskgroup.Spec.GetTypedTaskGroupNodeFilter())
// get all the Task objects matching taskgroup.Spec.Selector;
// those are the current Tasks controlled by this TaskGroup
current, err := listTasksBySelector(r.Client, &taskgroup.Spec.Selector)
if err != nil {
return errors.Wrap(err, "failed to list tasks")
}
log.Info("reconciling", "Nodes", len(desired), "Tasks", len(current.Items))
// match current and desired state, so the controller can determine what is necessary to do next
tasks := reconcileTasks(desired, current)
// update replica counters
taskgroup.Status.Nodes = int32(len(tasks.all))
taskgroup.Status.RunningNodes = int32(len(tasks.running))
taskgroup.Status.SucceededNodes = int32(len(tasks.completed))
taskgroup.Status.FailedNodes = int32(len(tasks.failed))
taskgroup.Status.InvalidNodes = int32(len(tasks.invalid))
// If there are Tasks not yet completed (pending or running), clean up error messages (required e.g. after recovery)
// NB. It is necessary to give priority to running over errors so the operation controller keeps alive/restarts
// the DaemonSet responsible for processing Tasks
activeTask := len(tasks.pending) + len(tasks.running)
if activeTask > 0 {
taskgroup.Status.ResetError()
}
// if there are invalid combinations (e.g. a Node with more than one Task, or a Task without a Node),
// stop creating new Tasks and, once no Task is still active, set the error
if len(tasks.invalid) > 0 {
if activeTask == 0 {
taskgroup.Status.SetError(
operatorerrors.NewRuntimeTaskGroupReconciliationError("something invalid"),
)
}
return nil
}
// if there are failed Tasks,
// stop creating new Tasks and, once no Task is still active, set the error
if len(tasks.failed) > 0 {
if activeTask == 0 {
taskgroup.Status.SetError(
operatorerrors.NewRuntimeTaskGroupReplicaError("something failed"),
)
}
return nil
}
// TODO: manage adopt tasks/tasks to be orphaned
// At this point we are sure that there are no invalid Node/Task combinations or failed Tasks.
// If the number of completed Tasks has reached the number of expected Tasks, the TaskGroup is completed.
// NB. we are doing this before checking pause because, if everything is completed, it does not make sense to pause
if len(tasks.completed) == len(tasks.all) {
// NB. we are setting this condition explicitly in order to avoid that the controller
// accidentally resumes creating Tasks
taskgroup.Status.Paused = false
taskgroup.Status.SetCompletionTime()
}
// if the TaskGroup is paused, return
if taskgroup.Status.Paused {
return nil
}
// otherwise, proceed with creating Tasks
// if not set yet, record the TaskGroup start time
if taskgroup.Status.StartTime == nil {
taskgroup.Status.SetStartTime()
//TODO: add a signature so we can detect if someone/something changes the taskgroup while it is processed
}
// if there are still Tasks to be created
if len(tasks.tobeCreated) > 0 {
//TODO: manage different deployment strategy e.g. parallel
// if there are no existing Tasks still active (pending or running)
if activeTask == 0 {
// create a Task for the next node in the ordered sequence
nextNode := tasks.tobeCreated[0].node.Name
log.WithValues("node-name", nextNode).Info("creating task")
err := r.createTasksReplica(executionMode, taskgroup, nextNode)
if err != nil {
return errors.Wrap(err, "Failed to create Task replica")
}
}
}
return nil
}
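// NB. (editorial note) the net effect of reconcileNormal is a sequential rollout:
// Tasks are created one at a time, in node-name order, and only while no other Task
// is pending or running; a parallel strategy is left as a TODO above.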
func (r *RuntimeTaskGroupReconciler) createTasksReplica(executionMode operatorv1.OperationExecutionMode, taskgroup *operatorv1.RuntimeTaskGroup, nodeName string) error {
r.Log.Info("Creating task replica", "node", nodeName)
gv := operatorv1.GroupVersion
paused := false
if executionMode == operatorv1.OperationExecutionModeControlled {
paused = true
}
task := &operatorv1.RuntimeTask{
TypeMeta: metav1.TypeMeta{
Kind: gv.WithKind("Task").Kind,
APIVersion: gv.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-%s", taskgroup.Name, nodeName), //TODO: use GenerateName?
Labels: taskgroup.Spec.Template.Labels,
Annotations: taskgroup.Spec.Template.Annotations,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(taskgroup, taskgroup.GroupVersionKind())},
},
Spec: operatorv1.RuntimeTaskSpec{
NodeName: nodeName,
Commands: taskgroup.Spec.Template.Spec.Commands,
},
Status: operatorv1.RuntimeTaskStatus{
Phase: string(operatorv1.RuntimeTaskPhasePending),
Paused: paused,
},
}
return r.Client.Create(context.Background(), task)
}
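// Illustrative example only: a TaskGroup named "upgrade-workers" reconciling Node
// "node-01" creates a Task named "upgrade-workers-node-01", owned by the TaskGroup
// and born paused when the execution mode is "Controlled" (presumably so that each
// replica can be resumed explicitly).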
func (r *RuntimeTaskGroupReconciler) reconcileDelete(taskgroup *operatorv1.RuntimeTaskGroup) error {
// TaskGroup is deleted so remove the finalizer.
//taskgroup.Finalizers = util.Filter(taskgroup.Finalizers, operatorv1alpha1.TaskGroupFinalizer)
return nil
}
func (r *RuntimeTaskGroupReconciler) reconcilePhase(taskgroup *operatorv1.RuntimeTaskGroup) {
// Set the phase to "pending" if nil.
if taskgroup.Status.Phase == "" {
taskgroup.Status.SetTypedPhase(operatorv1.RuntimeTaskGroupPhasePending)
}
// Set the phase to "running" if start date is set.
if taskgroup.Status.StartTime != nil {
taskgroup.Status.SetTypedPhase(operatorv1.RuntimeTaskGroupPhaseRunning)
}
// Set the phase to "paused" if paused set.
if taskgroup.Status.Paused {
taskgroup.Status.SetTypedPhase(operatorv1.RuntimeTaskGroupPhasePaused)
}
// Set the phase to "succeeded" if completion date is set.
if taskgroup.Status.CompletionTime != nil {
taskgroup.Status.SetTypedPhase(operatorv1.RuntimeTaskGroupPhaseSucceeded)
}
// Set the phase to "failed" if any of Status.ErrorReason or Status.ErrorMessage is not nil.
if taskgroup.Status.ErrorReason != nil || taskgroup.Status.ErrorMessage != nil {
taskgroup.Status.SetTypedPhase(operatorv1.RuntimeTaskGroupPhaseFailed)
}
// Set the phase to "deleting" if the deletion timestamp is set.
if !taskgroup.DeletionTimestamp.IsZero() {
taskgroup.Status.SetTypedPhase(operatorv1.RuntimeTaskGroupPhaseDeleted)
}
}
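// NB. (editorial note) the checks above are deliberately sequential and later checks
// win: an error reason forces "failed" even when a start or completion time is set,
// and a deletion timestamp always results in "deleted".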


@ -19,6 +19,7 @@ package controllers
import (
"context"
"fmt"
"sort"
"strings"
"github.com/pkg/errors"
@ -27,12 +28,15 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
operatorv1 "k8s.io/kubeadm/operator/api/v1alpha1"
"k8s.io/kubeadm/operator/operations"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
)
func getImage(c client.Client, namespace, name string) (string, error) {
@ -286,3 +290,107 @@ func recordPausedChange(recorder record.EventRecorder, obj runtime.Object, curre
recorder.Event(obj, corev1.EventTypeNormal, reason, message)
}
}
func operationToTaskGroupRequests(c client.Client, o handler.MapObject) []ctrl.Request {
var result []ctrl.Request
operation, ok := o.Object.(*operatorv1.Operation)
if !ok {
return nil
}
actual, err := listTaskGroupsByLabels(c, operation.Labels)
if err != nil {
return nil
}
for _, tg := range actual.Items {
name := client.ObjectKey{Namespace: tg.Namespace, Name: tg.Name}
result = append(result, ctrl.Request{NamespacedName: name})
}
return result
}
func getOwnerOperation(ctx context.Context, c client.Client, obj metav1.ObjectMeta) (*operatorv1.Operation, error) {
//TODO: check for controller ref instead of Operation/Kind
for _, ref := range obj.OwnerReferences {
if ref.Kind == "Operation" && ref.APIVersion == operatorv1.GroupVersion.String() {
operation := &operatorv1.Operation{}
key := client.ObjectKey{
Namespace: obj.Namespace,
Name: ref.Name,
}
err := c.Get(ctx, key, operation)
if err != nil {
return nil, errors.Wrapf(err, "error reading controller ref for %s/%s", obj.Namespace, obj.Name)
}
return operation, nil
}
}
return nil, nil
}
type matchingSelector struct {
selector labels.Selector
}
func (m matchingSelector) ApplyToList(opts *client.ListOptions) {
opts.LabelSelector = m.selector
}
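// NB. (editorial note) matchingSelector adapts a pre-parsed labels.Selector to
// controller-runtime's client.ListOption interface used by List below; the built-in
// client.MatchingLabels helper accepts a map[string]string instead, which is why
// listNodesBySelector needs this small shim.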
func listNodesBySelector(c client.Client, selector *metav1.LabelSelector) (*corev1.NodeList, error) {
s, err := metav1.LabelSelectorAsSelector(selector)
if err != nil {
return nil, errors.Wrap(err, "failed to convert TaskGroup.Spec.NodeSelector to a selector")
}
o := matchingSelector{selector: s}
nodes := &corev1.NodeList{}
if err := c.List(
context.Background(), nodes,
o,
); err != nil {
return nil, err
}
return nodes, nil
}
func filterNodes(nodes *corev1.NodeList, filter operatorv1.RuntimeTaskGroupNodeFilter) []corev1.Node {
if len(nodes.Items) == 0 {
return nodes.Items
}
if filter == operatorv1.RuntimeTaskGroupNodeFilterAll || filter == operatorv1.RuntimeTaskGroupNodeUnknownFilter {
return nodes.Items
}
// in order to ensure a predictable result, nodes are sorted by name before applying the filter
sort.Slice(nodes.Items, func(i, j int) bool { return nodes.Items[i].Name < nodes.Items[j].Name })
if filter == operatorv1.RuntimeTaskGroupNodeFilterHead {
return nodes.Items[:1]
}
// filter == operatorv1.RuntimeTaskGroupNodeFilterTail
return nodes.Items[1:]
}
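// Example (illustrative): with Nodes {a, b, c}, FilterHead returns [a] and FilterTail
// returns [b, c]; sorting by name first makes the head/tail split deterministic
// across reconciles.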
func listTasksBySelector(c client.Client, selector *metav1.LabelSelector) (*operatorv1.RuntimeTaskList, error) {
selectorMap, err := metav1.LabelSelectorAsMap(selector)
if err != nil {
return nil, errors.Wrap(err, "failed to convert TaskGroup.Spec.Selector to a selector")
}
tasks := &operatorv1.RuntimeTaskList{}
if err := c.List(
context.Background(), tasks,
client.MatchingLabels(selectorMap),
); err != nil {
return nil, err
}
return tasks, nil
}