chaos-operator/controllers/chaosengine_controller.go

863 lines
34 KiB
Go

/*
Copyright 2019 LitmusChaos Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"fmt"
"github.com/go-logr/logr"
litmuschaosv1alpha1 "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1"
"github.com/litmuschaos/chaos-operator/pkg/analytics"
dynamicclientset "github.com/litmuschaos/chaos-operator/pkg/client/dynamic"
chaosTypes "github.com/litmuschaos/chaos-operator/pkg/types"
"github.com/litmuschaos/chaos-operator/pkg/utils"
"github.com/litmuschaos/chaos-operator/pkg/utils/retry"
"github.com/litmuschaos/elves/kubernetes/container"
"github.com/litmuschaos/elves/kubernetes/pod"
"github.com/pkg/errors"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"os"
"reflect"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"strings"
"time"
)
// finalizer is the finalizer name this controller appends to a ChaosEngine in
// initEngine; the same string is removed again when the engine is stopped,
// deleted or restarted after completion.
const finalizer = "chaosengine.litmuschaos.io/finalizer"
// ChaosEngineReconciler reconciles a ChaosEngine object.
// It embeds the Kubernetes client used for every read and write issued by this
// controller and carries the scheme and event recorder used alongside it.
type ChaosEngineReconciler struct {
// Client performs all API reads and writes for the reconciler.
// NOTE(review): the inherited comment described this as the manager's split
// client (reads from the cache, writes to the apiserver) — confirm where the
// reconciler is constructed.
client.Client
// Scheme is handed to controllerutil.SetControllerReference when the runner
// pod is given the ChaosEngine as its controller owner.
Scheme *runtime.Scheme
// Recorder emits Kubernetes Events against the ChaosEngine being reconciled.
Recorder record.EventRecorder
}
// reconcileEngine bundles the reconciler with the logger of the request that is
// currently being processed, so helpers can reach both through one value.
type reconcileEngine struct {
// r is the reconciler whose client is used for API calls.
r *ChaosEngineReconciler
// reqLogger is the request-scoped logger.
reqLogger logr.Logger
}
// podEngineRunner carries everything engineRunnerPod needs to look up, and if
// necessary create, the chaos-runner pod.
type podEngineRunner struct {
// pod receives the result of the lookup of an existing runner pod;
// engineRunner is the desired runner pod that is created when none exists.
pod, engineRunner *corev1.Pod
// reconcileEngine provides the reconciler (client) and request logger.
*reconcileEngine
}
//+kubebuilder:rbac:groups=litmuschaos.io,resources=chaosengines,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=litmuschaos.io,resources=chaosengines/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=litmuschaos.io,resources=chaosengines/finalizers,verbs=update
// Reconcile reads the ChaosEngine named by the request and drives it towards the
// state requested in ChaosEngine.Spec.
//
// After loading the object it handles deletion first, then runs initEngine, and
// finally dispatches on the (Spec.EngineState, Status.EngineStatus) pair:
//   - active + initialized: create or monitor the runner pod
//   - stop + completed: graceful completion
//   - stop + initialized: forceful abort
//   - active + stopped: restart after abort
//   - active + completed: restart after completion
//
// Any other combination is left untouched.
//
// Note:
// The Controller will requeue the Request to be processed again if the returned error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue
func (r *ChaosEngineReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
	reqLogger := startReqLogger(request)

	engine := &chaosTypes.EngineInfo{}
	fetchErr := r.getChaosEngineInstance(engine, request)
	if k8serrors.IsNotFound(fetchErr) {
		// The object is gone (deleted after the request was queued). Owned
		// objects are garbage collected, so there is nothing left to do and
		// no reason to requeue.
		return reconcile.Result{}, nil
	}
	if fetchErr != nil {
		return reconcile.Result{}, fetchErr
	}

	// A deletion timestamp means the ChaosEngine is being deleted.
	if engine.Instance.ObjectMeta.GetDeletionTimestamp() != nil {
		return r.reconcileForDelete(engine, request)
	}

	// Default the engine state/status and make sure the finalizer is present.
	requeue, initErr := r.initEngine(engine)
	switch {
	case initErr != nil && requeue:
		return reconcile.Result{Requeue: true}, nil
	case initErr != nil:
		return reconcile.Result{}, initErr
	}

	// Read state and status only after initEngine, which may have defaulted them.
	state := engine.Instance.Spec.EngineState
	status := engine.Instance.Status.EngineStatus

	switch {
	case state == litmuschaosv1alpha1.EngineStateActive && status == litmuschaosv1alpha1.EngineStatusInitialized:
		// Normal execution of the ChaosEngine.
		return r.reconcileForCreationAndRunning(engine, reqLogger)
	case state == litmuschaosv1alpha1.EngineStateStop && status == litmuschaosv1alpha1.EngineStatusCompleted:
		// Graceful completion of the ChaosEngine.
		return r.reconcileForComplete(engine, request)
	case state == litmuschaosv1alpha1.EngineStateStop && status == litmuschaosv1alpha1.EngineStatusInitialized:
		// Forceful abort of the ChaosEngine.
		return r.reconcileForDelete(engine, request)
	case state == litmuschaosv1alpha1.EngineStateActive && status == litmuschaosv1alpha1.EngineStatusStopped:
		// Restart of the ChaosEngine after an abort.
		return r.reconcileForRestartAfterAbort(engine, request)
	case state == litmuschaosv1alpha1.EngineStateActive && status == litmuschaosv1alpha1.EngineStatusCompleted:
		// Restart of the ChaosEngine after completion.
		return r.reconcileForRestartAfterComplete(engine, request)
	}

	return ctrl.Result{}, nil
}
// getChaosRunnerENV assembles the environment variables handed to the
// chaos-runner container: engine name, targets, comma-separated experiment
// list, service account, auxiliary app info, client UUID and chaos namespace.
func getChaosRunnerENV(engine *chaosTypes.EngineInfo, ClientUUID string) []corev1.EnvVar {
	experimentList := strings.Join(engine.AppExperiments, ",")

	var envDetails utils.ENVDetails
	envDetails.SetEnv("CHAOSENGINE", engine.Instance.Name)
	envDetails.SetEnv("TARGETS", engine.Targets)
	envDetails.SetEnv("EXPERIMENT_LIST", experimentList)
	envDetails.SetEnv("CHAOS_SVC_ACC", engine.Instance.Spec.ChaosServiceAccount)
	envDetails.SetEnv("AUXILIARY_APPINFO", engine.Instance.Spec.AuxiliaryAppInfo)
	envDetails.SetEnv("CLIENT_UUID", ClientUUID)
	envDetails.SetEnv("CHAOS_NAMESPACE", engine.Instance.Namespace)

	return envDetails.ENV
}
// getChaosRunnerLabels builds the label set for the chaos-runner pod. The
// built-in labels are written first; labels from the engine's runner spec are
// applied afterwards and therefore win on key collisions.
func getChaosRunnerLabels(cr *litmuschaosv1alpha1.ChaosEngine) map[string]string {
	custom := cr.Spec.Components.Runner.RunnerLabels

	labels := make(map[string]string, len(custom)+4)
	labels["app"] = cr.Name
	labels["chaosUID"] = string(cr.UID)
	labels["app.kubernetes.io/component"] = "chaos-runner"
	labels["app.kubernetes.io/part-of"] = "litmus"

	for key, value := range custom {
		labels[key] = value
	}
	return labels
}
// newGoRunnerPodForCR builds (but does not create) the chaos-runner pod for the
// given engine.
//
// The first experiment listed in the engine spec is fetched from the engine's
// namespace, and its pod/container security contexts are applied to the runner
// when they are set. Everything else (image, pull policy, args, command,
// resources, volumes, tolerations, node selector, image pull secrets, labels
// and annotations) comes from the engine's runner component spec. The engine is
// set as the controller owner of the returned pod.
//
// An error is returned when the engine lists no experiments, when the
// experiment lookup fails, when the pod builder fails, or when the owner
// reference cannot be set.
func (r *ChaosEngineReconciler) newGoRunnerPodForCR(engine *chaosTypes.EngineInfo) (*corev1.Pod, error) {
	// Guard the Experiments[0] access below: an empty list would otherwise
	// panic with an index-out-of-range error.
	if len(engine.Instance.Spec.Experiments) == 0 {
		return nil, fmt.Errorf("no experiments specified in chaosengine %q", engine.Instance.Name)
	}

	var experiment litmuschaosv1alpha1.ChaosExperiment
	experimentKey := types.NamespacedName{
		Name:      engine.Instance.Spec.Experiments[0].Name,
		Namespace: engine.Instance.Namespace,
	}
	if err := r.Client.Get(context.TODO(), experimentKey, &experiment); err != nil {
		return nil, err
	}

	// Populates engine.VolumeOpts from the configmaps and secrets of the runner spec.
	engine.VolumeOpts.VolumeOperations(engine.Instance.Spec.Components.Runner.ConfigMaps, engine.Instance.Spec.Components.Runner.Secrets)

	containerForRunner := container.NewBuilder().
		WithEnvsNew(getChaosRunnerENV(engine, analytics.ClientUUID)).
		WithName("chaos-runner").
		WithImage(engine.Instance.Spec.Components.Runner.Image).
		WithImagePullPolicy(corev1.PullIfNotPresent)

	// Optional container settings: only applied when present in the spec.
	if engine.Instance.Spec.Components.Runner.ImagePullPolicy != "" {
		containerForRunner.WithImagePullPolicy(engine.Instance.Spec.Components.Runner.ImagePullPolicy)
	}
	if engine.Instance.Spec.Components.Runner.Args != nil {
		containerForRunner.WithArgumentsNew(engine.Instance.Spec.Components.Runner.Args)
	}
	if engine.VolumeOpts.VolumeMounts != nil {
		containerForRunner.WithVolumeMountsNew(engine.VolumeOpts.VolumeMounts)
	}
	if engine.Instance.Spec.Components.Runner.Command != nil {
		containerForRunner.WithCommandNew(engine.Instance.Spec.Components.Runner.Command)
	}
	if !reflect.DeepEqual(engine.Instance.Spec.Components.Runner.Resources, corev1.ResourceRequirements{}) {
		containerForRunner.WithResourceRequirements(engine.Instance.Spec.Components.Runner.Resources)
	}
	if !reflect.DeepEqual(experiment.Spec.Definition.SecurityContext.ContainerSecurityContext, corev1.SecurityContext{}) {
		containerForRunner.WithSecurityContext(experiment.Spec.Definition.SecurityContext.ContainerSecurityContext)
	}

	podForRunner := pod.NewBuilder().
		WithName(engine.Instance.Name + "-runner").
		WithNamespace(engine.Instance.Namespace).
		WithAnnotations(engine.Instance.Spec.Components.Runner.RunnerAnnotation).
		WithLabels(getChaosRunnerLabels(engine.Instance)).
		WithServiceAccountName(engine.Instance.Spec.ChaosServiceAccount).
		WithRestartPolicy("OnFailure").
		WithContainerBuilder(containerForRunner)

	// Optional pod settings: only applied when present in the spec.
	if engine.Instance.Spec.Components.Runner.Tolerations != nil {
		podForRunner.WithTolerations(engine.Instance.Spec.Components.Runner.Tolerations...)
	}
	if len(engine.Instance.Spec.Components.Runner.NodeSelector) != 0 {
		podForRunner.WithNodeSelector(engine.Instance.Spec.Components.Runner.NodeSelector)
	}
	if engine.VolumeOpts.VolumeBuilders != nil {
		podForRunner.WithVolumeBuilders(engine.VolumeOpts.VolumeBuilders)
	}
	if engine.Instance.Spec.Components.Runner.ImagePullSecrets != nil {
		podForRunner.WithImagePullSecrets(engine.Instance.Spec.Components.Runner.ImagePullSecrets)
	}
	if !reflect.DeepEqual(experiment.Spec.Definition.SecurityContext.PodSecurityContext, corev1.PodSecurityContext{}) {
		podForRunner.WithSecurityContext(experiment.Spec.Definition.SecurityContext.PodSecurityContext)
	}

	runnerPod, err := podForRunner.Build()
	if err != nil {
		return nil, err
	}

	// Make the ChaosEngine the controller owner of the runner pod.
	if err := controllerutil.SetControllerReference(engine.Instance, runnerPod, r.Scheme); err != nil {
		return nil, err
	}
	return runnerPod, nil
}
// engineRunnerPod makes sure the engine's runner pod exists.
//
// It looks up the pod named by runnerPod.engineRunner (storing the result in
// runnerPod.pod). When the pod is found nothing is changed. When it is not
// found, runnerPod.engineRunner is created; an AlreadyExists error from the
// create call is treated as success. Any other lookup or create error is
// returned.
func engineRunnerPod(runnerPod *podEngineRunner) error {
	desired := runnerPod.engineRunner
	key := types.NamespacedName{Name: desired.Name, Namespace: desired.Namespace}

	err := runnerPod.r.Client.Get(context.TODO(), key, runnerPod.pod)
	if err == nil {
		runnerPod.reqLogger.Info("Skip reconcile: engineRunner Pod already exists", "Pod.Namespace", runnerPod.pod.Namespace, "Pod.Name", runnerPod.pod.Name)
		return nil
	}
	if !k8serrors.IsNotFound(err) {
		return err
	}

	runnerPod.reqLogger.Info("Creating a new engineRunner Pod", "Pod.Namespace", desired.Namespace, "Pod.Name", desired.Name)
	if err := runnerPod.r.Client.Create(context.TODO(), desired); err != nil {
		if k8serrors.IsAlreadyExists(err) {
			// The lookup above returned NotFound, so runnerPod.pod is still
			// empty here; log the identity of the pod we tried to create.
			runnerPod.reqLogger.Info("Skip reconcile: engineRunner Pod already exists", "Pod.Namespace", desired.Namespace, "Pod.Name", desired.Name)
			return nil
		}
		return err
	}

	// Pod created successfully - don't reconcile
	runnerPod.reqLogger.Info("engineRunner Pod created successfully")
	return nil
}
// getChaosEngineInstance loads the ChaosEngine named by the request and stores
// it, together with its appinfo and selectors, on engine. The client error is
// returned unchanged so callers can test it (e.g. for NotFound).
func (r *ChaosEngineReconciler) getChaosEngineInstance(engine *chaosTypes.EngineInfo, request reconcile.Request) error {
	var fetched litmuschaosv1alpha1.ChaosEngine
	err := r.Client.Get(context.TODO(), request.NamespacedName, &fetched)
	if err != nil {
		return err
	}

	engine.Instance = &fetched
	engine.AppInfo = fetched.Spec.Appinfo
	engine.Selectors = fetched.Spec.Selectors
	return nil
}
// checkEngineRunnerPod builds the runner pod for the engine and ensures it
// exists in the cluster, creating it when missing. It fails early when the
// engine has no experiments to run.
func (r *ChaosEngineReconciler) checkEngineRunnerPod(engine *chaosTypes.EngineInfo, reqLogger logr.Logger) error {
	if len(engine.AppExperiments) == 0 {
		return errors.New("application experiment list is empty")
	}

	desiredRunner, buildErr := r.newGoRunnerPodForCR(engine)
	if buildErr != nil {
		return buildErr
	}

	// pod is an empty placeholder that receives the lookup result;
	// engineRunner is the pod to create when the lookup finds nothing.
	return engineRunnerPod(&podEngineRunner{
		pod:          &corev1.Pod{},
		engineRunner: desiredRunner,
		reconcileEngine: &reconcileEngine{
			r:         r,
			reqLogger: reqLogger,
		},
	})
}
// setChaosResourceImage resolves the runner image when the engine spec leaves
// it empty. Precedence: the image in the engine spec, then the
// CHAOS_RUNNER_IMAGE environment variable of the operator, then the built-in
// default image.
func setChaosResourceImage(engine *chaosTypes.EngineInfo) {
	// An image set in the engine spec always wins.
	if engine.Instance.Spec.Components.Runner.Image != "" {
		return
	}

	if fromEnv := os.Getenv("CHAOS_RUNNER_IMAGE"); fromEnv != "" {
		engine.Instance.Spec.Components.Runner.Image = fromEnv
		return
	}
	engine.Instance.Spec.Components.Runner.Image = chaosTypes.DefaultChaosRunnerImage
}
// reconcileForDelete handles both deletion of a ChaosEngine and a forceful
// abort (engine state set to stop while chaos is still initialized).
//
// It force-deletes any chaos resources labelled with the engine UID, waits for
// the pods to go away and updates the chaos result, then removes the chaos
// finalizer, marks running/waiting experiments as aborted and sets the engine
// status to stopped in a single patch. A NotFound error from that patch is
// ignored. The ChaosEngineStopped event is only emitted when chaos pods were
// found at the start and the patch succeeded.
func (r *ChaosEngineReconciler) reconcileForDelete(engine *chaosTypes.EngineInfo, request reconcile.Request) (reconcile.Result, error) {
	// Snapshot the object before mutating it so that only the delta is patched.
	patch := client.MergeFrom(engine.Instance.DeepCopy())

	chaosTypes.Log.Info("Checking if there are any chaos resources to be deleted for", "chaosengine", engine.Instance.Name)

	chaosPodList := &corev1.PodList{}
	opts := []client.ListOption{
		client.InNamespace(request.NamespacedName.Namespace),
		client.MatchingLabels{"chaosUID": string(engine.Instance.UID)},
	}
	if err := r.Client.List(context.TODO(), chaosPodList, opts...); err != nil {
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos stop) Unable to list chaos experiment pods")
		return reconcile.Result{}, err
	}

	hadChaosPods := len(chaosPodList.Items) != 0
	if hadChaosPods {
		chaosTypes.Log.Info("Performing a force delete of chaos experiment pods", "chaosengine", engine.Instance.Name)
		if err := r.forceRemoveChaosResources(engine, request); err != nil {
			r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos stop) Unable to delete chaos experiment pods")
			return reconcile.Result{}, err
		}
	}

	// update the chaos status in result for abort cases
	if err := r.updateChaosStatus(engine, request); err != nil {
		return reconcile.Result{}, err
	}

	// Use the package-level constant so the name removed here can never drift
	// from the name added in initEngine.
	if engine.Instance.ObjectMeta.Finalizers != nil {
		engine.Instance.ObjectMeta.Finalizers = utils.RemoveString(engine.Instance.ObjectMeta.Finalizers, finalizer)
	}

	// Update ChaosEngine ExperimentStatuses, with aborted Status.
	updateExperimentStatusesForStop(engine)
	engine.Instance.Status.EngineStatus = litmuschaosv1alpha1.EngineStatusStopped

	if err := r.Client.Patch(context.TODO(), engine.Instance, patch); err != nil && !k8serrors.IsNotFound(err) {
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos stop) Unable to update chaosengine")
		return reconcile.Result{}, fmt.Errorf("unable to remove finalizer from chaosEngine Resource, due to error: %w", err)
	}

	// The 'ChaosEngineStopped' event must only be generated after the
	// finalizer has been removed successfully, hence the deferred check.
	if hadChaosPods {
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeNormal, "ChaosEngineStopped", "Chaos resources deleted successfully")
	}
	return reconcile.Result{}, nil
}
// forceRemoveChaosResources deletes every Job and Pod in the request namespace
// that carries the engine's chaosUID label, using background propagation. When
// the engine spec sets a non-zero TerminationGracePeriodSeconds it is passed on
// as the delete grace period.
//
// Both deletions are always attempted. If either fails, a warning event naming
// the affected resource kinds is recorded and an error listing all failures is
// returned.
func (r *ChaosEngineReconciler) forceRemoveChaosResources(engine *chaosTypes.EngineInfo, request reconcile.Request) error {
	optsDelete := []client.DeleteAllOfOption{
		client.InNamespace(request.NamespacedName.Namespace),
		client.MatchingLabels{"chaosUID": string(engine.Instance.UID)},
		client.PropagationPolicy(v1.DeletePropagationBackground),
	}
	if engine.Instance.Spec.TerminationGracePeriodSeconds != 0 {
		optsDelete = append(optsDelete, client.GracePeriodSeconds(engine.Instance.Spec.TerminationGracePeriodSeconds))
	}

	var (
		failedKinds []string
		errs        []error
	)
	if errJob := r.Client.DeleteAllOf(context.TODO(), &batchv1.Job{}, optsDelete...); errJob != nil {
		errs = append(errs, errJob)
		failedKinds = append(failedKinds, "Jobs")
	}
	if errPod := r.Client.DeleteAllOf(context.TODO(), &corev1.Pod{}, optsDelete...); errPod != nil {
		errs = append(errs, errPod)
		failedKinds = append(failedKinds, "Pods")
	}

	if len(errs) != 0 {
		// Join with the separator instead of pre-suffixing each kind, so the
		// message does not end in a dangling ", ".
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos stop) Unable to delete chaos resources: %v allocated to chaosengine", strings.Join(failedKinds, ", "))
		return fmt.Errorf("unable to delete ChaosResources due to %v", errs)
	}
	return nil
}
// updateEngineState sets Spec.EngineState of the ChaosEngine to the given state
// and persists the change with a merge patch computed against the object as it
// was on entry. The underlying client error is wrapped, so callers can still
// inspect it with errors.Is / errors.As.
func (r *ChaosEngineReconciler) updateEngineState(engine *chaosTypes.EngineInfo, state litmuschaosv1alpha1.EngineState) error {
	patch := client.MergeFrom(engine.Instance.DeepCopy())
	engine.Instance.Spec.EngineState = state
	if err := r.Client.Patch(context.TODO(), engine.Instance, patch); err != nil {
		return fmt.Errorf("unable to patch state of chaosEngine Resource, due to error: %w", err)
	}
	return nil
}
// checkRunnerContainerCompletedStatus reports whether the chaos-runner
// container of the engine's runner pod has finished.
//
// The runner pod ("<engine name>-runner") is fetched; the lookup error is
// returned as-is. Completion is only considered while the pod phase is Running
// or Succeeded, and is reported when the "chaos-runner" container is terminated
// with reason "Completed" and is not ready.
func (r *ChaosEngineReconciler) checkRunnerContainerCompletedStatus(engine *chaosTypes.EngineInfo) (bool, error) {
	var runnerPod corev1.Pod
	key := types.NamespacedName{
		Name:      engine.Instance.Name + "-runner",
		Namespace: engine.Instance.Namespace,
	}
	if err := r.Client.Get(context.TODO(), key, &runnerPod); err != nil {
		return false, err
	}

	phase := runnerPod.Status.Phase
	if phase != corev1.PodRunning && phase != corev1.PodSucceeded {
		return false, nil
	}

	completed := false
	for _, status := range runnerPod.Status.ContainerStatuses {
		if status.Name != "chaos-runner" || status.State.Terminated == nil {
			continue
		}
		if status.State.Terminated.Reason == "Completed" {
			completed = !status.Ready
		}
	}
	return completed, nil
}
// gracefullyRemoveDefaultChaosResources deletes the engine's chaos pods, but
// only when the engine's jobCleanUpPolicy is "delete"; for any other policy it
// does nothing. The returned Result is always empty.
func (r *ChaosEngineReconciler) gracefullyRemoveDefaultChaosResources(engine *chaosTypes.EngineInfo, request reconcile.Request) (reconcile.Result, error) {
	if engine.Instance.Spec.JobCleanUpPolicy != litmuschaosv1alpha1.CleanUpPolicyDelete {
		return reconcile.Result{}, nil
	}
	return reconcile.Result{}, r.gracefullyRemoveChaosPods(engine, request)
}
// gracefullyRemoveChaosPods lists the pods in the request namespace labelled
// with both the engine name ("app") and the engine UID ("chaosUID") and deletes
// them one by one with default delete options. It stops at, and returns, the
// first list or delete error.
func (r *ChaosEngineReconciler) gracefullyRemoveChaosPods(engine *chaosTypes.EngineInfo, request reconcile.Request) error {
	selector := client.MatchingLabels{
		"app":      engine.Instance.Name,
		"chaosUID": string(engine.Instance.UID),
	}

	var chaosPods corev1.PodList
	if err := r.Client.List(context.TODO(), &chaosPods, client.InNamespace(request.NamespacedName.Namespace), selector); err != nil {
		return err
	}

	for i := range chaosPods.Items {
		if err := r.Client.Delete(context.TODO(), &chaosPods.Items[i]); err != nil {
			return err
		}
	}
	return nil
}
// reconcileForComplete handles a ChaosEngine that finished gracefully: chaos
// pods are removed according to the jobCleanUpPolicy and the engine state is
// patched to stop. A warning event is recorded for whichever step fails.
func (r *ChaosEngineReconciler) reconcileForComplete(engine *chaosTypes.EngineInfo, request reconcile.Request) (reconcile.Result, error) {
	_, cleanupErr := r.gracefullyRemoveDefaultChaosResources(engine, request)
	if cleanupErr != nil {
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos completion) Unable to delete chaos pods upon chaos completion")
		return reconcile.Result{}, cleanupErr
	}

	stateErr := r.updateEngineState(engine, litmuschaosv1alpha1.EngineStateStop)
	if stateErr != nil {
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos completion) Unable to update chaosengine")
		return reconcile.Result{}, fmt.Errorf("unable to Update Engine State: %v", stateErr)
	}

	return reconcile.Result{}, nil
}
// reconcileForRestartAfterAbort restarts a ChaosEngine that was aborted
// earlier: leftover chaos resources are force-deleted and the engine is reset
// for a new run. When the reset reports that a requeue is appropriate, the
// request is requeued and the error is suppressed.
func (r *ChaosEngineReconciler) reconcileForRestartAfterAbort(engine *chaosTypes.EngineInfo, request reconcile.Request) (reconcile.Result, error) {
	if err := r.forceRemoveChaosResources(engine, request); err != nil {
		return reconcile.Result{}, err
	}

	requeue, err := r.updateEngineForRestart(engine)
	if err == nil {
		return reconcile.Result{}, nil
	}
	if requeue {
		return reconcile.Result{Requeue: true}, nil
	}
	return reconcile.Result{}, err
}
// reconcileForRestartAfterComplete restarts a ChaosEngine that had completed:
// leftover chaos resources are force-deleted, the engine status is reset to
// initialized, the recorded experiment statuses are cleared and the stale chaos
// finalizer is removed, all persisted with one merge patch. The patch error is
// wrapped so callers can still inspect the underlying cause.
func (r *ChaosEngineReconciler) reconcileForRestartAfterComplete(engine *chaosTypes.EngineInfo, request reconcile.Request) (reconcile.Result, error) {
	// Snapshot the object before mutating it so that only the delta is patched.
	patch := client.MergeFrom(engine.Instance.DeepCopy())

	if err := r.forceRemoveChaosResources(engine, request); err != nil {
		return reconcile.Result{}, err
	}

	engine.Instance.Status.EngineStatus = litmuschaosv1alpha1.EngineStatusInitialized
	engine.Instance.Status.Experiments = nil

	// finalizers have been retained in a completed chaosengine till this point (as chaos pods may be "retained")
	// as per the jobCleanUpPolicy. Stale finalizer is removed so that initEngine() generates the
	// ChaosEngineInitialized event and re-adds the finalizer before starting chaos.
	// The package-level constant is used so this always matches what initEngine adds.
	if engine.Instance.ObjectMeta.Finalizers != nil {
		engine.Instance.ObjectMeta.Finalizers = utils.RemoveString(engine.Instance.ObjectMeta.Finalizers, finalizer)
	}

	if err := r.Client.Patch(context.TODO(), engine.Instance, patch); err != nil {
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos restart) Unable to update chaosengine")
		return reconcile.Result{}, fmt.Errorf("unable to patch state & remove stale finalizer in chaosEngine Resource, due to error: %w", err)
	}
	return reconcile.Result{}, nil
}
// initEngine defaults the engine state and status and makes sure the chaos
// finalizer is present on an initialized engine.
//
// An empty Spec.EngineState becomes active; an active engine with an empty
// status becomes initialized. For an initialized engine that does not yet carry
// the chaos finalizer, the finalizer is appended, the object is updated and the
// ChaosEngineInitialized event is emitted. Finalizers owned by other
// controllers do not prevent the chaos finalizer from being added.
//
// The boolean result is only meaningful together with a non-nil error: it is
// true when the update failed with a conflict and the request should simply be
// requeued.
func (r *ChaosEngineReconciler) initEngine(engine *chaosTypes.EngineInfo) (bool, error) {
	if engine.Instance.Spec.EngineState == "" {
		engine.Instance.Spec.EngineState = litmuschaosv1alpha1.EngineStateActive
	}
	if engine.Instance.Spec.EngineState == litmuschaosv1alpha1.EngineStateActive && engine.Instance.Status.EngineStatus == "" {
		engine.Instance.Status.EngineStatus = litmuschaosv1alpha1.EngineStatusInitialized
	}
	if engine.Instance.Status.EngineStatus != litmuschaosv1alpha1.EngineStatusInitialized {
		return false, nil
	}

	// Look for our own finalizer rather than testing the list for nil: a
	// finalizer added by someone else must not stop ours from being added.
	for _, existing := range engine.Instance.ObjectMeta.Finalizers {
		if existing == finalizer {
			return false, nil
		}
	}

	engine.Instance.ObjectMeta.Finalizers = append(engine.Instance.ObjectMeta.Finalizers, finalizer)
	if err := r.Client.Update(context.TODO(), engine.Instance, &client.UpdateOptions{}); err != nil {
		if k8serrors.IsConflict(err) {
			return true, err
		}
		return false, fmt.Errorf("unable to initialize ChaosEngine, because of Update Error: %w", err)
	}

	// generate the ChaosEngineInitialized event once finalizer has been added
	r.Recorder.Eventf(engine.Instance, corev1.EventTypeNormal, "ChaosEngineInitialized", "Identifying app under test & launching %s", engine.Instance.Name+"-runner")
	return false, nil
}
// reconcileForCreationAndRunning drives an active, initialized ChaosEngine.
//
// When the runner pod ("<engine name>-runner") does not exist it is created.
// Otherwise the runner container is checked for completion and, once it has
// completed, the engine is marked completed. Conflicts while updating the
// engine, and a runner pod that disappears between the two lookups, lead to a
// requeue instead of an error.
func (r *ChaosEngineReconciler) reconcileForCreationAndRunning(engine *chaosTypes.EngineInfo, reqLogger logr.Logger) (reconcile.Result, error) {
	runnerKey := types.NamespacedName{
		Name:      engine.Instance.Name + "-runner",
		Namespace: engine.Instance.Namespace,
	}

	var runner corev1.Pod
	if getErr := r.Client.Get(context.TODO(), runnerKey, &runner); getErr != nil {
		if k8serrors.IsNotFound(getErr) {
			return r.createRunnerPod(engine, reqLogger)
		}
		return reconcile.Result{}, getErr
	}

	isCompleted, statusErr := r.checkRunnerContainerCompletedStatus(engine)
	switch {
	case statusErr == nil:
		// fall through to the completion handling below
	case k8serrors.IsNotFound(statusErr):
		return reconcile.Result{Requeue: true}, nil
	default:
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos running) Unable to check chaos status")
		return reconcile.Result{}, statusErr
	}

	if isCompleted {
		requeue, updateErr := r.updateEngineForComplete(engine, isCompleted)
		if updateErr != nil && requeue {
			return reconcile.Result{Requeue: true}, nil
		}
		if updateErr != nil {
			r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos completed) Unable to update chaos engine")
			return reconcile.Result{}, updateErr
		}
	}

	reqLogger.Info("Skip reconcile: engineRunner Pod already exists", "Pod.Namespace", runner.Namespace, "Pod.Name", runner.Name)
	return reconcile.Result{}, nil
}
// createRunnerPod derives the experiment details from the engine and creates
// the runner pod if it does not exist yet.
//
// When the experiment details are invalid, the engine state is patched to stop
// and the validation error is returned. If that patch fails as well, the
// returned error wraps the patch failure and also mentions the validation
// error that triggered it.
func (r *ChaosEngineReconciler) createRunnerPod(engine *chaosTypes.EngineInfo, reqLogger logr.Logger) (reconcile.Result, error) {
	if err := r.setExperimentDetails(engine); err != nil {
		if updateEngineErr := r.updateEngineState(engine, litmuschaosv1alpha1.EngineStateStop); updateEngineErr != nil {
			r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos stop) Unable to update chaosengine")
			// Report the failure of the state update itself, not only the
			// error that caused the update to be attempted.
			return reconcile.Result{}, fmt.Errorf("unable to Update Engine State: %w (while handling: %v)", updateEngineErr, err)
		}
		return reconcile.Result{}, err
	}

	// Check if the engineRunner pod already exists, else create
	if err := r.checkEngineRunnerPod(engine, reqLogger); err != nil {
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos start) Unable to get chaos resources")
		return reconcile.Result{}, err
	}
	return reconcile.Result{}, nil
}
// setExperimentDetails resolves the runner image, validates the target
// description of the engine and stores the derived targets string and the list
// of experiment names on engine.
//
// It fails when selectors are given without workloads or pods, or when only
// one of appkind/applabel is set in appinfo.
func (r *ChaosEngineReconciler) setExperimentDetails(engine *chaosTypes.EngineInfo) error {
	// Get the image for runner pod from chaosengine spec,operator env or default values.
	setChaosResourceImage(engine)

	selectors := engine.Selectors
	if selectors != nil && selectors.Workloads == nil && selectors.Pods == nil {
		return fmt.Errorf("specify one out of workloads or pods")
	}

	// appkind and applabel must be provided together or not at all.
	hasKind := engine.AppInfo.AppKind != ""
	hasLabel := engine.AppInfo.Applabel != ""
	if hasKind != hasLabel {
		return fmt.Errorf("incomplete appinfo, provide appkind and applabel both")
	}

	engine.Targets = getTargets(engine)

	var experimentNames []string
	for i := range engine.Instance.Spec.Experiments {
		experimentNames = append(experimentNames, engine.Instance.Spec.Experiments[i].Name)
	}
	engine.AppExperiments = experimentNames

	chaosTypes.Log.Info("Targets derived from Chaosengine is ", "targets", engine.Targets)
	chaosTypes.Log.Info("Exp list derived from chaosengine is ", "appExpirements", experimentNames)
	chaosTypes.Log.Info("Runner image derived from chaosengine is", "runnerImage", engine.Instance.Spec.Components.Runner.Image)
	return nil
}
// getTargets renders the chaos targets of the engine as a string.
//
// With selectors set, each workload becomes "<kind>:<namespace>:[<names or
// labels>]" (names take precedence over labels); when no workloads are set,
// each pod selector becomes "pod:<namespace>:[<names>]". Entries are joined
// with ";". Without selectors the legacy appinfo is rendered as
// "<appkind>:<appns>:[<applabel>]", defaulting appns to the engine namespace
// and appkind to "KIND"; those defaults are written back to engine.AppInfo.
// An engine with neither selectors nor appinfo yields "".
func getTargets(engine *chaosTypes.EngineInfo) string {
	selectors := engine.Selectors

	if selectors == nil {
		if reflect.DeepEqual(engine.AppInfo, litmuschaosv1alpha1.ApplicationParams{}) {
			return ""
		}
		if engine.AppInfo.Appns == "" {
			engine.AppInfo.Appns = engine.Instance.Namespace
		}
		if engine.AppInfo.AppKind == "" {
			engine.AppInfo.AppKind = "KIND"
		}
		return strings.Join([]string{engine.AppInfo.AppKind, engine.AppInfo.Appns, fmt.Sprintf("[%v]", engine.AppInfo.Applabel)}, ":")
	}

	var targets []string

	// Workloads take precedence over pods whenever the workloads list is set.
	if selectors.Workloads != nil {
		for _, workload := range selectors.Workloads {
			filter := workload.Labels
			if workload.Names != "" {
				filter = workload.Names
			}
			targets = append(targets, string(workload.Kind)+":"+workload.Namespace+":["+filter+"]")
		}
		return strings.Join(targets, ";")
	}

	for _, podSelector := range selectors.Pods {
		parts := []string{"pod", podSelector.Namespace, fmt.Sprintf("[%v]", podSelector.Names)}
		targets = append(targets, strings.Join(parts, ":"))
	}
	return strings.Join(targets, ";")
}
// updateExperimentStatusesForStop marks every experiment of the engine that is
// still running or waiting as aborted, sets its verdict to "Stopped" and stamps
// the current time as its last update. Experiments in any other state are left
// untouched.
func updateExperimentStatusesForStop(engine *chaosTypes.EngineInfo) {
	experiments := engine.Instance.Status.Experiments
	for i := range experiments {
		switch experiments[i].Status {
		case litmuschaosv1alpha1.ExperimentStatusRunning, litmuschaosv1alpha1.ExperimentStatusWaiting:
			experiments[i].Status = litmuschaosv1alpha1.ExperimentStatusAborted
			experiments[i].Verdict = "Stopped"
			experiments[i].LastUpdateTime = v1.Now()
		}
	}
}
// startReqLogger returns a logger tagged with the namespace and name of the
// request and logs the start of the reconciliation through it.
func startReqLogger(request reconcile.Request) logr.Logger {
	logger := chaosTypes.Log.WithValues(
		"Request.Namespace", request.Namespace,
		"Request.Name", request.Name,
	)
	logger.Info("Reconciling ChaosEngine")
	return logger
}
// updateEngineForComplete marks the engine as completed (status completed,
// state stop), persists it and emits the ChaosEngineCompleted event. An engine
// that is already completed is left alone. The isCompleted argument is not
// consulted.
//
// The boolean result is true only when the update failed with a conflict, in
// which case the caller is expected to requeue.
func (r *ChaosEngineReconciler) updateEngineForComplete(engine *chaosTypes.EngineInfo, isCompleted bool) (bool, error) {
	if engine.Instance.Status.EngineStatus == litmuschaosv1alpha1.EngineStatusCompleted {
		return false, nil
	}

	engine.Instance.Status.EngineStatus = litmuschaosv1alpha1.EngineStatusCompleted
	engine.Instance.Spec.EngineState = litmuschaosv1alpha1.EngineStateStop

	err := r.Client.Update(context.TODO(), engine.Instance, &client.UpdateOptions{})
	switch {
	case err == nil:
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeNormal, "ChaosEngineCompleted", "ChaosEngine completed, will delete or retain the resources according to jobCleanUpPolicy")
		return false, nil
	case k8serrors.IsConflict(err):
		return true, err
	default:
		return false, fmt.Errorf("unable to update ChaosEngine Status, due to update error: %v", err)
	}
}
// updateEngineForRestart emits the RestartInProgress event, resets the engine
// status to initialized, clears the recorded experiment statuses and persists
// the engine.
//
// The boolean result is true only when the update failed with a conflict, in
// which case the caller is expected to requeue.
func (r *ChaosEngineReconciler) updateEngineForRestart(engine *chaosTypes.EngineInfo) (bool, error) {
	r.Recorder.Eventf(engine.Instance, corev1.EventTypeNormal, "RestartInProgress", "ChaosEngine is restarted")

	engine.Instance.Status.EngineStatus = litmuschaosv1alpha1.EngineStatusInitialized
	engine.Instance.Status.Experiments = nil

	err := r.Client.Update(context.TODO(), engine.Instance, &client.UpdateOptions{})
	if err == nil {
		return false, nil
	}
	if k8serrors.IsConflict(err) {
		return true, err
	}
	return false, fmt.Errorf("unable to restart ChaosEngine, due to update error: %v", err)
}
// updateChaosStatus waits for the engine's chaos pods to terminate and then
// updates the chaos status inside the matching chaosresult.
//
// When WATCH_NAMESPACE is empty, the presence of the chaosresult CRD is checked
// first and the update is skipped silently if the CRD is missing. When
// WATCH_NAMESPACE is set (namespace scoped operator) that check is skipped.
func (r *ChaosEngineReconciler) updateChaosStatus(engine *chaosTypes.EngineInfo, request reconcile.Request) error {
	if err := r.waitForChaosPodTermination(engine, request); err != nil {
		return err
	}

	// skipping CRD validation for the namespace scoped operator
	watchesAllNamespaces := os.Getenv("WATCH_NAMESPACE") == ""
	if watchesAllNamespaces {
		found, err := isResultCRDAvailable()
		switch {
		case err != nil:
			return err
		case !found:
			return nil
		}
	}

	return r.updateChaosResult(engine, request)
}
// updateChaosResult moves the chaos status of the targets from the annotations
// of the engine's chaosresult into its status history.
//
// All chaosresults of the request namespace are listed and the first one whose
// "chaosUID" label equals the engine UID is processed: if it has no annotations
// nothing is done, otherwise its targets and annotations are rewritten and the
// object is updated. Later matches are not visited.
func (r *ChaosEngineReconciler) updateChaosResult(engine *chaosTypes.EngineInfo, request reconcile.Request) error {
	results := &litmuschaosv1alpha1.ChaosResultList{}
	listErr := r.Client.List(context.TODO(), results,
		client.InNamespace(request.NamespacedName.Namespace),
		client.MatchingLabels{},
	)
	if listErr != nil {
		return listErr
	}

	engineUID := string(engine.Instance.UID)
	for i := range results.Items {
		result := results.Items[i]
		if result.Labels["chaosUID"] != engineUID {
			continue
		}
		if len(result.ObjectMeta.Annotations) == 0 {
			return nil
		}

		result.Status.History.Targets, result.ObjectMeta.Annotations = getChaosStatus(result)
		chaosTypes.Log.Info("updating chaos status inside chaosresult", "chaosresult", result.Name)
		return r.Client.Update(context.TODO(), &result, &client.UpdateOptions{})
	}
	return nil
}
// waitForChaosPodTermination polls until no pod labelled with the engine's
// chaosUID is left in the request namespace. The check is configured for 180
// tries with a one second wait, and the retry helper's result is returned.
func (r *ChaosEngineReconciler) waitForChaosPodTermination(engine *chaosTypes.EngineInfo, request reconcile.Request) error {
	listOpts := []client.ListOption{
		client.InNamespace(request.NamespacedName.Namespace),
		client.MatchingLabels{"chaosUID": string(engine.Instance.UID)},
	}

	// podsGone succeeds once the list of chaos pods comes back empty.
	podsGone := func(attempt uint) error {
		remaining := &corev1.PodList{}
		if err := r.Client.List(context.TODO(), remaining, listOpts...); err != nil {
			return err
		}
		if len(remaining.Items) != 0 {
			return errors.Errorf("chaos pods are not deleted yet")
		}
		return nil
	}

	return retry.
		Times(uint(180)).
		Wait(1 * time.Second).
		Try(podsGone)
}
// getChaosStatus folds the chaos status recorded in the annotations of a
// chaosresult into its list of targets.
//
// Every annotation whose value is "injected", "reverted" or "targeted"
// (case-insensitive) and whose key has the form "<kind>/<name>" is consumed:
// the target called <name> gets the value as its chaos status (a new target is
// appended when none exists yet) and the annotation is removed. Annotations
// with other values, or with keys that do not contain a "/", are left in place.
//
// It returns the updated targets and the remaining annotations. The annotation
// map of the given result is modified in place.
func getChaosStatus(result litmuschaosv1alpha1.ChaosResult) ([]litmuschaosv1alpha1.TargetDetails, map[string]string) {
	annotations := result.ObjectMeta.Annotations
	targetsList := result.Status.History.Targets

	for k, v := range annotations {
		switch strings.ToLower(v) {
		case "injected", "reverted", "targeted":
			parts := strings.Split(k, "/")
			if len(parts) < 2 {
				// Not a "<kind>/<name>" key: indexing parts[1] would panic,
				// and there is no target name to record. Keep the annotation.
				continue
			}
			kind := strings.TrimSpace(parts[0])
			name := strings.TrimSpace(parts[1])
			if !updateTargets(name, v, &targetsList) {
				targetsList = append(targetsList, litmuschaosv1alpha1.TargetDetails{
					Name:        name,
					Kind:        kind,
					ChaosStatus: v,
				})
			}
			// Deleting the current key while ranging over a map is safe in Go.
			delete(annotations, k)
		}
	}
	return targetsList, annotations
}
// isResultCRDAvailable reports whether a CustomResourceDefinition named
// chaosTypes.ResultCRDName is present in the cluster. It lists the
// apiextensions.k8s.io/v1 customresourcedefinitions through a dynamic client
// and compares the names; client creation and list errors are returned.
func isResultCRDAvailable() (bool, error) {
	dynamicClient, err := dynamicclientset.CreateClientSet()
	if err != nil {
		return false, err
	}

	// defining the gvr for the requested resource
	crdResource := schema.GroupVersionResource{
		Group:    "apiextensions.k8s.io",
		Version:  "v1",
		Resource: "customresourcedefinitions",
	}

	crds, err := (*dynamicClient).Resource(crdResource).List(context.Background(), v1.ListOptions{})
	if err != nil {
		return false, err
	}

	// check the presence of chaosresult CRD inside cluster
	for i := range crds.Items {
		if crds.Items[i].GetName() == chaosTypes.ResultCRDName {
			return true, nil
		}
	}
	return false, nil
}
// updateTargets sets the chaos status of the first target in data whose name
// equals name and reports whether such a target was found. The slice pointed
// to by data is modified in place; nothing is appended.
func updateTargets(name, status string, data *[]litmuschaosv1alpha1.TargetDetails) bool {
	targets := *data
	for i := range targets {
		if targets[i].Name != name {
			continue
		}
		targets[i].ChaosStatus = status
		return true
	}
	return false
}
// SetupWithManager registers the reconciler with the manager as the controller
// for ChaosEngine objects and for the Pods they own.
func (r *ChaosEngineReconciler) SetupWithManager(mgr ctrl.Manager) error {
	engineController := ctrl.NewControllerManagedBy(mgr).For(&litmuschaosv1alpha1.ChaosEngine{})
	engineController = engineController.Owns(&corev1.Pod{})
	return engineController.Complete(r)
}