/*
Copyright 2019 LitmusChaos Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
	"context"
	"fmt"
	"os"
	"reflect"
	"strings"
	"time"

	"github.com/go-logr/logr"
	litmuschaosv1alpha1 "github.com/litmuschaos/chaos-operator/api/litmuschaos/v1alpha1"
	"github.com/litmuschaos/chaos-operator/pkg/analytics"
	dynamicclientset "github.com/litmuschaos/chaos-operator/pkg/client/dynamic"
	chaosTypes "github.com/litmuschaos/chaos-operator/pkg/types"
	"github.com/litmuschaos/chaos-operator/pkg/utils"
	"github.com/litmuschaos/chaos-operator/pkg/utils/retry"
	"github.com/litmuschaos/elves/kubernetes/container"
	"github.com/litmuschaos/elves/kubernetes/pod"
	"github.com/pkg/errors"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

const finalizer = "chaosengine.litmuschaos.io/finalizer"

// ChaosEngineReconciler reconciles a ChaosEngine object
type ChaosEngineReconciler struct {
	// This client, initialized using mgr.GetClient(), is a split client
	// that reads objects from the cache and writes to the apiserver.
	client.Client
	// Scheme is used for serializing and deserializing API objects (group, version, and kind).
	Scheme *runtime.Scheme
	// Recorder is an event recorder for recording Event resources to the
	// Kubernetes API.
	Recorder record.EventRecorder
}

// reconcileEngine bundles the reconciler with a request-scoped logger
type reconcileEngine struct {
	r         *ChaosEngineReconciler
	reqLogger logr.Logger
}

// podEngineRunner holds the desired engine-runner pod and the pod fetched from the cluster
type podEngineRunner struct {
	pod, engineRunner *corev1.Pod
	*reconcileEngine
}

//+kubebuilder:rbac:groups=litmuschaos.io,resources=chaosengines,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=litmuschaos.io,resources=chaosengines/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=litmuschaos.io,resources=chaosengines/finalizers,verbs=update

// Reconcile reads the state of the cluster for a ChaosEngine object and makes changes based on the state read
// and what is in the ChaosEngine.Spec
// Note:
// The Controller will requeue the Request to be processed again if the returned error is non-nil or
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue
func (r *ChaosEngineReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
	reqLogger := startReqLogger(request)
	engine := &chaosTypes.EngineInfo{}

	if err := r.getChaosEngineInstance(engine, request); err != nil {
		if k8serrors.IsNotFound(err) {
			// Request object not found, could have been deleted after reconcile request.
			// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
			// Return and don't requeue
			return reconcile.Result{}, nil
		}
		return reconcile.Result{}, err
	}

	// Handle deletion of ChaosEngine
	if engine.Instance.ObjectMeta.GetDeletionTimestamp() != nil {
		return r.reconcileForDelete(engine, request)
	}

	// Start the reconcile by setting default values into ChaosEngine
	if requeue, err := r.initEngine(engine); err != nil {
		if requeue {
			return reconcile.Result{Requeue: true}, nil
		}
		return reconcile.Result{}, err
	}

	// Handling of normal execution of ChaosEngine
	if engine.Instance.Spec.EngineState == litmuschaosv1alpha1.EngineStateActive && engine.Instance.Status.EngineStatus == litmuschaosv1alpha1.EngineStatusInitialized {
		return r.reconcileForCreationAndRunning(engine, reqLogger)
	}

	// Handling graceful completion of ChaosEngine
	if engine.Instance.Spec.EngineState == litmuschaosv1alpha1.EngineStateStop && engine.Instance.Status.EngineStatus == litmuschaosv1alpha1.EngineStatusCompleted {
		return r.reconcileForComplete(engine, request)
	}

	// Handling forceful abort of ChaosEngine
	if engine.Instance.Spec.EngineState == litmuschaosv1alpha1.EngineStateStop && engine.Instance.Status.EngineStatus == litmuschaosv1alpha1.EngineStatusInitialized {
		return r.reconcileForDelete(engine, request)
	}

	// Handling restarting of ChaosEngine post abort
	if engine.Instance.Spec.EngineState == litmuschaosv1alpha1.EngineStateActive && (engine.Instance.Status.EngineStatus == litmuschaosv1alpha1.EngineStatusStopped) {
		return r.reconcileForRestartAfterAbort(engine, request)
	}

	// Handling restarting of ChaosEngine post completion
	if engine.Instance.Spec.EngineState == litmuschaosv1alpha1.EngineStateActive && (engine.Instance.Status.EngineStatus == litmuschaosv1alpha1.EngineStatusCompleted) {
		return r.reconcileForRestartAfterComplete(engine, request)
	}

	return ctrl.Result{}, nil
}

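// The dispatch above (after the deletion-timestamp check) implements a small
// state machine over (spec.engineState, status.engineStatus):
//
//	active + initialized -> launch/track the chaos-runner pod
//	stop   + completed   -> graceful cleanup as per jobCleanUpPolicy
//	stop   + initialized -> forceful abort and finalizer removal
//	active + stopped     -> restart after an earlier abort
//	active + completed   -> restart after an earlier completion
//
// Any other combination is a no-op until the next watch event.
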
// getChaosRunnerENV returns the env required for chaos-runner
func getChaosRunnerENV(engine *chaosTypes.EngineInfo, ClientUUID string) []corev1.EnvVar {
	var envDetails utils.ENVDetails
	envDetails.SetEnv("CHAOSENGINE", engine.Instance.Name).
		SetEnv("TARGETS", engine.Targets).
		SetEnv("EXPERIMENT_LIST", strings.Join(engine.AppExperiments, ",")).
		SetEnv("CHAOS_SVC_ACC", engine.Instance.Spec.ChaosServiceAccount).
		SetEnv("AUXILIARY_APPINFO", engine.Instance.Spec.AuxiliaryAppInfo).
		SetEnv("CLIENT_UUID", ClientUUID).
		SetEnv("CHAOS_NAMESPACE", engine.Instance.Namespace)

	return envDetails.ENV
}

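// For illustration only: with a single experiment "pod-delete" targeting a
// deployment, the runner container ends up with env similar to
//
//	CHAOSENGINE=engine-nginx
//	TARGETS=deployment:default:[nginx]
//	EXPERIMENT_LIST=pod-delete
//	CHAOS_SVC_ACC=litmus-admin
//	CHAOS_NAMESPACE=default
//
// The concrete values here are hypothetical; they depend on the ChaosEngine spec.
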
// getChaosRunnerLabels returns the labels required for chaos-runner
func getChaosRunnerLabels(cr *litmuschaosv1alpha1.ChaosEngine) map[string]string {
	labels := map[string]string{
		"app":                         cr.Name,
		"chaosUID":                    string(cr.UID),
		"app.kubernetes.io/component": "chaos-runner",
		"app.kubernetes.io/part-of":   "litmus",
	}
	for k, v := range cr.Spec.Components.Runner.RunnerLabels {
		labels[k] = v
	}
	return labels
}

// newGoRunnerPodForCR defines a new go-based Runner Pod
func (r *ChaosEngineReconciler) newGoRunnerPodForCR(engine *chaosTypes.EngineInfo) (*corev1.Pod, error) {
	var experiment litmuschaosv1alpha1.ChaosExperiment
	if err := r.Client.Get(context.TODO(), types.NamespacedName{Name: engine.Instance.Spec.Experiments[0].Name, Namespace: engine.Instance.Namespace}, &experiment); err != nil {
		return nil, err
	}

	engine.VolumeOpts.VolumeOperations(engine.Instance.Spec.Components.Runner.ConfigMaps, engine.Instance.Spec.Components.Runner.Secrets)

	containerForRunner := container.NewBuilder().
		WithEnvsNew(getChaosRunnerENV(engine, analytics.ClientUUID)).
		WithName("chaos-runner").
		WithImage(engine.Instance.Spec.Components.Runner.Image).
		WithImagePullPolicy(corev1.PullIfNotPresent)

	if engine.Instance.Spec.Components.Runner.ImagePullPolicy != "" {
		containerForRunner.WithImagePullPolicy(engine.Instance.Spec.Components.Runner.ImagePullPolicy)
	}

	if engine.Instance.Spec.Components.Runner.Args != nil {
		containerForRunner.WithArgumentsNew(engine.Instance.Spec.Components.Runner.Args)
	}

	if engine.VolumeOpts.VolumeMounts != nil {
		containerForRunner.WithVolumeMountsNew(engine.VolumeOpts.VolumeMounts)
	}

	if engine.Instance.Spec.Components.Runner.Command != nil {
		containerForRunner.WithCommandNew(engine.Instance.Spec.Components.Runner.Command)
	}

	if !reflect.DeepEqual(engine.Instance.Spec.Components.Runner.Resources, corev1.ResourceRequirements{}) {
		containerForRunner.WithResourceRequirements(engine.Instance.Spec.Components.Runner.Resources)
	}

	if !reflect.DeepEqual(experiment.Spec.Definition.SecurityContext.ContainerSecurityContext, corev1.SecurityContext{}) {
		containerForRunner.WithSecurityContext(experiment.Spec.Definition.SecurityContext.ContainerSecurityContext)
	}

	podForRunner := pod.NewBuilder().
		WithName(engine.Instance.Name + "-runner").
		WithNamespace(engine.Instance.Namespace).
		WithAnnotations(engine.Instance.Spec.Components.Runner.RunnerAnnotation).
		WithLabels(getChaosRunnerLabels(engine.Instance)).
		WithServiceAccountName(engine.Instance.Spec.ChaosServiceAccount).
		WithRestartPolicy("OnFailure").
		WithContainerBuilder(containerForRunner)

	if engine.Instance.Spec.Components.Runner.Tolerations != nil {
		podForRunner.WithTolerations(engine.Instance.Spec.Components.Runner.Tolerations...)
	}

	if len(engine.Instance.Spec.Components.Runner.NodeSelector) != 0 {
		podForRunner.WithNodeSelector(engine.Instance.Spec.Components.Runner.NodeSelector)
	}

	if engine.VolumeOpts.VolumeBuilders != nil {
		podForRunner.WithVolumeBuilders(engine.VolumeOpts.VolumeBuilders)
	}

	if engine.Instance.Spec.Components.Runner.ImagePullSecrets != nil {
		podForRunner.WithImagePullSecrets(engine.Instance.Spec.Components.Runner.ImagePullSecrets)
	}

	if !reflect.DeepEqual(experiment.Spec.Definition.SecurityContext.PodSecurityContext, corev1.PodSecurityContext{}) {
		podForRunner.WithSecurityContext(experiment.Spec.Definition.SecurityContext.PodSecurityContext)
	}

	runnerPod, err := podForRunner.Build()
	if err != nil {
		return nil, err
	}
	if err := controllerutil.SetControllerReference(engine.Instance, runnerPod, r.Scheme); err != nil {
		return nil, err
	}
	return runnerPod, nil
}

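// The resulting manifest is, roughly, the following (names and image are
// placeholders; annotations, tolerations, volumes, security contexts, etc.
// are added only when set on the ChaosEngine or ChaosExperiment):
//
//	apiVersion: v1
//	kind: Pod
//	metadata:
//	  name: <engine-name>-runner
//	  namespace: <engine-namespace>
//	  labels:
//	    app: <engine-name>
//	    chaosUID: <engine-uid>
//	    app.kubernetes.io/component: chaos-runner
//	    app.kubernetes.io/part-of: litmus
//	spec:
//	  serviceAccountName: <chaosServiceAccount>
//	  restartPolicy: OnFailure
//	  containers:
//	  - name: chaos-runner
//	    image: <runner-image>
//	    imagePullPolicy: IfNotPresent
//	    env: [...]
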
// engineRunnerPod checks whether the engineRunner pod already exists and creates it if not
func engineRunnerPod(runnerPod *podEngineRunner) error {
	if err := runnerPod.r.Client.Get(context.TODO(), types.NamespacedName{Name: runnerPod.engineRunner.Name, Namespace: runnerPod.engineRunner.Namespace}, runnerPod.pod); err != nil && k8serrors.IsNotFound(err) {
		runnerPod.reqLogger.Info("Creating a new engineRunner Pod", "Pod.Namespace", runnerPod.engineRunner.Namespace, "Pod.Name", runnerPod.engineRunner.Name)
		if err = runnerPod.r.Client.Create(context.TODO(), runnerPod.engineRunner); err != nil {
			if k8serrors.IsAlreadyExists(err) {
				runnerPod.reqLogger.Info("Skip reconcile: engineRunner Pod already exists", "Pod.Namespace", runnerPod.pod.Namespace, "Pod.Name", runnerPod.pod.Name)
				return nil
			}
			return err
		}

		// Pod created successfully - don't reconcile
		runnerPod.reqLogger.Info("engineRunner Pod created successfully")
		return nil
	} else if err != nil {
		return err
	}
	runnerPod.reqLogger.Info("Skip reconcile: engineRunner Pod already exists", "Pod.Namespace", runnerPod.pod.Namespace, "Pod.Name", runnerPod.pod.Name)
	return nil
}

// getChaosEngineInstance fetches the ChaosEngine instance for the request
func (r *ChaosEngineReconciler) getChaosEngineInstance(engine *chaosTypes.EngineInfo, request reconcile.Request) error {
	instance := &litmuschaosv1alpha1.ChaosEngine{}
	if err := r.Client.Get(context.TODO(), request.NamespacedName, instance); err != nil {
		// Error reading the object - requeue the request.
		return err
	}
	engine.Instance = instance
	engine.AppInfo = instance.Spec.Appinfo
	engine.Selectors = instance.Spec.Selectors
	return nil
}

// checkEngineRunnerPod checks if the engineRunner pod already exists, else creates it
func (r *ChaosEngineReconciler) checkEngineRunnerPod(engine *chaosTypes.EngineInfo, reqLogger logr.Logger) error {
	if len(engine.AppExperiments) == 0 {
		return errors.New("application experiment list is empty")
	}

	engineRunner, err := r.newGoRunnerPodForCR(engine)
	if err != nil {
		return err
	}

	// Create an object of engine reconcile.
	engineReconcile := &reconcileEngine{
		r:         r,
		reqLogger: reqLogger,
	}
	// Creates an object of engineRunner Pod
	runnerPod := &podEngineRunner{
		pod:             &corev1.Pod{},
		engineRunner:    engineRunner,
		reconcileEngine: engineReconcile,
	}

	return engineRunnerPod(runnerPod)
}

// setChaosResourceImage sets the runner image from the engine spec;
// if it is not set there, the image is taken from the chaos-operator env,
// and if neither is set, the default image is used
func setChaosResourceImage(engine *chaosTypes.EngineInfo) {
	ChaosRunnerImage := os.Getenv("CHAOS_RUNNER_IMAGE")

	if engine.Instance.Spec.Components.Runner.Image == "" && ChaosRunnerImage == "" {
		engine.Instance.Spec.Components.Runner.Image = chaosTypes.DefaultChaosRunnerImage
	} else if engine.Instance.Spec.Components.Runner.Image == "" {
		engine.Instance.Spec.Components.Runner.Image = ChaosRunnerImage
	}
}

// reconcileForDelete reconciles for deletion/force deletion of Chaos Engine
func (r *ChaosEngineReconciler) reconcileForDelete(engine *chaosTypes.EngineInfo, request reconcile.Request) (reconcile.Result, error) {
	// take the patch base before mutating the instance, so that the status update
	// and finalizer removal below go out as a single merge patch
	patch := client.MergeFrom(engine.Instance.DeepCopy())

	chaosTypes.Log.Info("Checking if there are any chaos resources to be deleted for", "chaosengine", engine.Instance.Name)

	chaosPodList := &corev1.PodList{}
	opts := []client.ListOption{
		client.InNamespace(request.NamespacedName.Namespace),
		client.MatchingLabels{"chaosUID": string(engine.Instance.UID)},
	}
	if err := r.Client.List(context.TODO(), chaosPodList, opts...); err != nil {
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos stop) Unable to list chaos experiment pods")
		return reconcile.Result{}, err
	}

	if len(chaosPodList.Items) != 0 {
		chaosTypes.Log.Info("Performing a force delete of chaos experiment pods", "chaosengine", engine.Instance.Name)
		if err := r.forceRemoveChaosResources(engine, request); err != nil {
			r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos stop) Unable to delete chaos experiment pods")
			return reconcile.Result{}, err
		}
	}

	// update the chaos status in result for abort cases
	if err := r.updateChaosStatus(engine, request); err != nil {
		return reconcile.Result{}, err
	}

	if engine.Instance.ObjectMeta.Finalizers != nil {
		engine.Instance.ObjectMeta.Finalizers = utils.RemoveString(engine.Instance.ObjectMeta.Finalizers, finalizer)
	}

	// Update ChaosEngine ExperimentStatuses, with aborted Status.
	updateExperimentStatusesForStop(engine)
	engine.Instance.Status.EngineStatus = litmuschaosv1alpha1.EngineStatusStopped

	if err := r.Client.Patch(context.TODO(), engine.Instance, patch); err != nil && !k8serrors.IsNotFound(err) {
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos stop) Unable to update chaosengine")
		return reconcile.Result{}, fmt.Errorf("unable to remove finalizer from chaosEngine Resource, due to error: %v", err)
	}

	// we are repeating this condition/check here as we want the events for 'ChaosEngineStopped'
	// generated only after successful finalizer removal from the chaosengine resource
	if len(chaosPodList.Items) != 0 {
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeNormal, "ChaosEngineStopped", "Chaos resources deleted successfully")
	}

	return reconcile.Result{}, nil
}

// forceRemoveChaosResources force removes all chaos-related pods and jobs
func (r *ChaosEngineReconciler) forceRemoveChaosResources(engine *chaosTypes.EngineInfo, request reconcile.Request) error {
	optsDelete := []client.DeleteAllOfOption{client.InNamespace(request.NamespacedName.Namespace), client.MatchingLabels{"chaosUID": string(engine.Instance.UID)}, client.PropagationPolicy(v1.DeletePropagationBackground)}
	if engine.Instance.Spec.TerminationGracePeriodSeconds != 0 {
		optsDelete = append(optsDelete, client.GracePeriodSeconds(engine.Instance.Spec.TerminationGracePeriodSeconds))
	}

	var (
		deleteEvent []string
		errList     []error
	)

	if errJob := r.Client.DeleteAllOf(context.TODO(), &batchv1.Job{}, optsDelete...); errJob != nil {
		errList = append(errList, errJob)
		deleteEvent = append(deleteEvent, "Jobs, ")
	}

	if errPod := r.Client.DeleteAllOf(context.TODO(), &corev1.Pod{}, optsDelete...); errPod != nil {
		errList = append(errList, errPod)
		deleteEvent = append(deleteEvent, "Pods, ")
	}
	if errList != nil {
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos stop) Unable to delete chaos resources: %v allocated to chaosengine", strings.Join(deleteEvent, ""))
		return fmt.Errorf("unable to delete ChaosResources due to %v", errList)
	}

	return nil
}

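// Roughly the kubectl equivalent of the force cleanup above (values are
// placeholders):
//
//	kubectl delete jobs,pods -n <engine-namespace> -l chaosUID=<engine-uid>
//
// Deletion uses background propagation, so dependents are garbage collected
// asynchronously after the owning objects are removed.
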
// updateEngineState updates the Chaos Engine spec with the given state
func (r *ChaosEngineReconciler) updateEngineState(engine *chaosTypes.EngineInfo, state litmuschaosv1alpha1.EngineState) error {
	patch := client.MergeFrom(engine.Instance.DeepCopy())
	engine.Instance.Spec.EngineState = state

	if err := r.Client.Patch(context.TODO(), engine.Instance, patch); err != nil {
		return fmt.Errorf("unable to patch state of chaosEngine Resource, due to error: %v", err)
	}

	return nil
}

// checkRunnerContainerCompletedStatus checks whether the runner pod's chaos-runner container has terminated with reason Completed
func (r *ChaosEngineReconciler) checkRunnerContainerCompletedStatus(engine *chaosTypes.EngineInfo) (bool, error) {
	runnerPod := corev1.Pod{}
	isCompleted := false

	err := r.Client.Get(context.TODO(), types.NamespacedName{Name: engine.Instance.Name + "-runner", Namespace: engine.Instance.Namespace}, &runnerPod)
	if err != nil {
		return isCompleted, err
	}

	if runnerPod.Status.Phase == corev1.PodRunning || runnerPod.Status.Phase == corev1.PodSucceeded {
		for _, container := range runnerPod.Status.ContainerStatuses {
			if container.Name == "chaos-runner" && container.State.Terminated != nil {
				if container.State.Terminated.Reason == "Completed" {
					// the runner is considered complete only once the terminated
					// container is also reported as not ready
					isCompleted = !container.Ready
				}
			}
		}
	}

	return isCompleted, nil
}

// gracefullyRemoveDefaultChaosResources removes the chaos resources gracefully when the jobCleanUpPolicy is 'delete'
func (r *ChaosEngineReconciler) gracefullyRemoveDefaultChaosResources(engine *chaosTypes.EngineInfo, request reconcile.Request) (reconcile.Result, error) {
	if engine.Instance.Spec.JobCleanUpPolicy == litmuschaosv1alpha1.CleanUpPolicyDelete {
		if err := r.gracefullyRemoveChaosPods(engine, request); err != nil {
			return reconcile.Result{}, err
		}
	}

	return reconcile.Result{}, nil
}

// gracefullyRemoveChaosPods removes the default chaos resources gracefully
func (r *ChaosEngineReconciler) gracefullyRemoveChaosPods(engine *chaosTypes.EngineInfo, request reconcile.Request) error {
	optsList := []client.ListOption{
		client.InNamespace(request.NamespacedName.Namespace), client.MatchingLabels{"app": engine.Instance.Name, "chaosUID": string(engine.Instance.UID)},
	}

	var podList corev1.PodList
	if errList := r.Client.List(context.TODO(), &podList, optsList...); errList != nil {
		return errList
	}

	for _, v := range podList.Items {
		if errDel := r.Client.Delete(context.TODO(), &v, []client.DeleteOption{}...); errDel != nil {
			return errDel
		}
	}

	return nil
}

// reconcileForComplete reconciles for graceful completion of Chaos Engine
func (r *ChaosEngineReconciler) reconcileForComplete(engine *chaosTypes.EngineInfo, request reconcile.Request) (reconcile.Result, error) {
	if _, err := r.gracefullyRemoveDefaultChaosResources(engine, request); err != nil {
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos completion) Unable to delete chaos pods upon chaos completion")
		return reconcile.Result{}, err
	}

	if err := r.updateEngineState(engine, litmuschaosv1alpha1.EngineStateStop); err != nil {
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos completion) Unable to update chaosengine")
		return reconcile.Result{}, fmt.Errorf("unable to Update Engine State: %v", err)
	}

	return reconcile.Result{}, nil
}

// reconcileForRestartAfterAbort reconciles for restart of ChaosEngine after it was aborted previously
func (r *ChaosEngineReconciler) reconcileForRestartAfterAbort(engine *chaosTypes.EngineInfo, request reconcile.Request) (reconcile.Result, error) {
	if err := r.forceRemoveChaosResources(engine, request); err != nil {
		return reconcile.Result{}, err
	}

	if requeue, err := r.updateEngineForRestart(engine); err != nil {
		if requeue {
			return reconcile.Result{Requeue: true}, nil
		}
		return reconcile.Result{}, err
	}

	return reconcile.Result{}, nil
}

// reconcileForRestartAfterComplete reconciles for restart of ChaosEngine after it has completed successfully
func (r *ChaosEngineReconciler) reconcileForRestartAfterComplete(engine *chaosTypes.EngineInfo, request reconcile.Request) (reconcile.Result, error) {
	patch := client.MergeFrom(engine.Instance.DeepCopy())

	if err := r.forceRemoveChaosResources(engine, request); err != nil {
		return reconcile.Result{}, err
	}

	engine.Instance.Status.EngineStatus = litmuschaosv1alpha1.EngineStatusInitialized
	engine.Instance.Status.Experiments = nil

	// finalizers have been retained in a completed chaosengine till this point (as chaos pods may be "retained")
	// as per the jobCleanUpPolicy. The stale finalizer is removed so that initEngine() generates the
	// ChaosEngineInitialized event and re-adds the finalizer before starting chaos.
	if engine.Instance.ObjectMeta.Finalizers != nil {
		engine.Instance.ObjectMeta.Finalizers = utils.RemoveString(engine.Instance.ObjectMeta.Finalizers, finalizer)
	}

	if err := r.Client.Patch(context.TODO(), engine.Instance, patch); err != nil {
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos restart) Unable to update chaosengine")
		return reconcile.Result{}, fmt.Errorf("unable to patch state & remove stale finalizer in chaosEngine Resource, due to error: %v", err)
	}

	return reconcile.Result{}, nil
}

// initEngine initializes the Chaos Engine and adds a finalizer to it
func (r *ChaosEngineReconciler) initEngine(engine *chaosTypes.EngineInfo) (bool, error) {
	if engine.Instance.Spec.EngineState == "" {
		engine.Instance.Spec.EngineState = litmuschaosv1alpha1.EngineStateActive
	}

	if engine.Instance.Spec.EngineState == litmuschaosv1alpha1.EngineStateActive && engine.Instance.Status.EngineStatus == "" {
		engine.Instance.Status.EngineStatus = litmuschaosv1alpha1.EngineStatusInitialized
	}

	if engine.Instance.Status.EngineStatus == litmuschaosv1alpha1.EngineStatusInitialized {
		if engine.Instance.ObjectMeta.Finalizers == nil {
			engine.Instance.ObjectMeta.Finalizers = append(engine.Instance.ObjectMeta.Finalizers, finalizer)
			if err := r.Client.Update(context.TODO(), engine.Instance, &client.UpdateOptions{}); err != nil {
				if k8serrors.IsConflict(err) {
					return true, err
				}
				return false, fmt.Errorf("unable to initialize ChaosEngine, because of Update Error: %v", err)
			}
			// generate the ChaosEngineInitialized event once the finalizer has been added
			r.Recorder.Eventf(engine.Instance, corev1.EventTypeNormal, "ChaosEngineInitialized", "Identifying app under test & launching %s", engine.Instance.Name+"-runner")
		}
	}

	return false, nil
}

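// initEngine, updateEngineForComplete and updateEngineForRestart share a
// convention: on an update conflict they return (true, err) so that their
// callers requeue the request instead of treating the conflict as a failure.
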
// reconcileForCreationAndRunning reconciles for Chaos execution of Chaos Engine
func (r *ChaosEngineReconciler) reconcileForCreationAndRunning(engine *chaosTypes.EngineInfo, reqLogger logr.Logger) (reconcile.Result, error) {
	var runner corev1.Pod
	if err := r.Client.Get(context.TODO(), types.NamespacedName{Name: engine.Instance.Name + "-runner", Namespace: engine.Instance.Namespace}, &runner); err != nil {
		if k8serrors.IsNotFound(err) {
			return r.createRunnerPod(engine, reqLogger)
		}
		return reconcile.Result{}, err
	}

	isCompleted, err := r.checkRunnerContainerCompletedStatus(engine)
	if err != nil {
		if k8serrors.IsNotFound(err) {
			return reconcile.Result{Requeue: true}, nil
		}
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos running) Unable to check chaos status")
		return reconcile.Result{}, err
	}

	if isCompleted {
		if requeue, err := r.updateEngineForComplete(engine, isCompleted); err != nil {
			if requeue {
				return reconcile.Result{Requeue: true}, nil
			}
			r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos completed) Unable to update chaos engine")
			return reconcile.Result{}, err
		}
	}

	reqLogger.Info("Skip reconcile: engineRunner Pod already exists", "Pod.Namespace", runner.Namespace, "Pod.Name", runner.Name)

	return reconcile.Result{}, nil
}

// createRunnerPod derives the experiment details and launches the engine-runner pod
func (r *ChaosEngineReconciler) createRunnerPod(engine *chaosTypes.EngineInfo, reqLogger logr.Logger) (reconcile.Result, error) {
	if err := r.setExperimentDetails(engine); err != nil {
		if updateEngineErr := r.updateEngineState(engine, litmuschaosv1alpha1.EngineStateStop); updateEngineErr != nil {
			r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos stop) Unable to update chaosengine")
			return reconcile.Result{}, fmt.Errorf("unable to Update Engine State: %v", updateEngineErr)
		}
		return reconcile.Result{}, err
	}

	// Check if the engineRunner pod already exists, else create
	if err := r.checkEngineRunnerPod(engine, reqLogger); err != nil {
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeWarning, "ChaosResourcesOperationFailed", "(chaos start) Unable to get chaos resources")
		return reconcile.Result{}, err
	}
	return reconcile.Result{}, nil
}

// setExperimentDetails derives the runner image, targets and experiment list for the engine
func (r *ChaosEngineReconciler) setExperimentDetails(engine *chaosTypes.EngineInfo) error {
	// Get the image for runner pod from chaosengine spec, operator env or default values.
	setChaosResourceImage(engine)

	if engine.Selectors != nil && engine.Selectors.Workloads == nil && engine.Selectors.Pods == nil {
		return fmt.Errorf("specify one out of workloads or pods")
	}

	if (engine.AppInfo.AppKind != "") != (engine.AppInfo.Applabel != "") {
		return fmt.Errorf("incomplete appinfo, provide appkind and applabel both")
	}

	engine.Targets = getTargets(engine)

	var appExperiments []string
	for _, exp := range engine.Instance.Spec.Experiments {
		appExperiments = append(appExperiments, exp.Name)
	}
	engine.AppExperiments = appExperiments

	chaosTypes.Log.Info("Targets derived from the chaosengine", "targets", engine.Targets)
	chaosTypes.Log.Info("Experiment list derived from the chaosengine", "appExperiments", appExperiments)
	chaosTypes.Log.Info("Runner image derived from the chaosengine", "runnerImage", engine.Instance.Spec.Components.Runner.Image)
	return nil
}

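// getTargets serializes the chaos targets into the string passed to the
// chaos-runner via the TARGETS env var. Each entry has the form
// "<kind>:<namespace>:[<names-or-labels>]" and entries are joined by ";",
// for example (illustrative values only):
//
//	deployment:default:[nginx];pod:default:[nginx-abc]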
func getTargets(engine *chaosTypes.EngineInfo) string {
	if engine.Selectors == nil && reflect.DeepEqual(engine.AppInfo, litmuschaosv1alpha1.ApplicationParams{}) {
		return ""
	}

	var targets []string

	if engine.Selectors != nil {
		if engine.Selectors.Workloads != nil {
			for _, w := range engine.Selectors.Workloads {
				var filter string
				if w.Names != "" {
					filter = w.Names
				} else {
					filter = w.Labels
				}

				target := strings.Join([]string{string(w.Kind), w.Namespace, fmt.Sprintf("[%v]", filter)}, ":")
				targets = append(targets, target)
			}
			return strings.Join(targets, ";")
		}

		for _, w := range engine.Selectors.Pods {
			target := strings.Join([]string{"pod", w.Namespace, fmt.Sprintf("[%v]", w.Names)}, ":")
			targets = append(targets, target)
		}
		return strings.Join(targets, ";")
	}

	if engine.AppInfo.Appns == "" {
		engine.AppInfo.Appns = engine.Instance.Namespace
	}

	if engine.AppInfo.AppKind == "" {
		engine.AppInfo.AppKind = "KIND"
	}
	return strings.Join([]string{engine.AppInfo.AppKind, engine.AppInfo.Appns, fmt.Sprintf("[%v]", engine.AppInfo.Applabel)}, ":")
}

// updateExperimentStatusesForStop updates ChaosEngine.Status.Experiments with the Aborted status.
func updateExperimentStatusesForStop(engine *chaosTypes.EngineInfo) {
	for i := range engine.Instance.Status.Experiments {
		if engine.Instance.Status.Experiments[i].Status == litmuschaosv1alpha1.ExperimentStatusRunning || engine.Instance.Status.Experiments[i].Status == litmuschaosv1alpha1.ExperimentStatusWaiting {
			engine.Instance.Status.Experiments[i].Status = litmuschaosv1alpha1.ExperimentStatusAborted
			engine.Instance.Status.Experiments[i].Verdict = "Stopped"
			engine.Instance.Status.Experiments[i].LastUpdateTime = v1.Now()
		}
	}
}

func startReqLogger(request reconcile.Request) logr.Logger {
	reqLogger := chaosTypes.Log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)
	reqLogger.Info("Reconciling ChaosEngine")

	return reqLogger
}

func (r *ChaosEngineReconciler) updateEngineForComplete(engine *chaosTypes.EngineInfo, isCompleted bool) (bool, error) {
	if engine.Instance.Status.EngineStatus != litmuschaosv1alpha1.EngineStatusCompleted {
		engine.Instance.Status.EngineStatus = litmuschaosv1alpha1.EngineStatusCompleted
		engine.Instance.Spec.EngineState = litmuschaosv1alpha1.EngineStateStop
		if err := r.Client.Update(context.TODO(), engine.Instance, &client.UpdateOptions{}); err != nil {
			if k8serrors.IsConflict(err) {
				return true, err
			}
			return false, fmt.Errorf("unable to update ChaosEngine Status, due to update error: %v", err)
		}
		r.Recorder.Eventf(engine.Instance, corev1.EventTypeNormal, "ChaosEngineCompleted", "ChaosEngine completed, will delete or retain the resources according to jobCleanUpPolicy")
	}

	return false, nil
}

func (r *ChaosEngineReconciler) updateEngineForRestart(engine *chaosTypes.EngineInfo) (bool, error) {
	r.Recorder.Eventf(engine.Instance, corev1.EventTypeNormal, "RestartInProgress", "ChaosEngine is restarted")
	engine.Instance.Status.EngineStatus = litmuschaosv1alpha1.EngineStatusInitialized
	engine.Instance.Status.Experiments = nil
	if err := r.Client.Update(context.TODO(), engine.Instance, &client.UpdateOptions{}); err != nil {
		if k8serrors.IsConflict(err) {
			return true, err
		}
		return false, fmt.Errorf("unable to restart ChaosEngine, due to update error: %v", err)
	}

	return false, nil
}

// updateChaosStatus updates the chaos status inside the chaosresult
func (r *ChaosEngineReconciler) updateChaosStatus(engine *chaosTypes.EngineInfo, request reconcile.Request) error {
	if err := r.waitForChaosPodTermination(engine, request); err != nil {
		return err
	}

	// skipping CRD validation for the namespace scoped operator
	if os.Getenv("WATCH_NAMESPACE") == "" {
		found, err := isResultCRDAvailable()
		if err != nil {
			return err
		}
		if !found {
			return nil
		}
	}

	return r.updateChaosResult(engine, request)
}

// updateChaosResult updates the chaos status and annotations inside the chaosresult
func (r *ChaosEngineReconciler) updateChaosResult(engine *chaosTypes.EngineInfo, request reconcile.Request) error {
	chaosresultList := &litmuschaosv1alpha1.ChaosResultList{}
	opts := []client.ListOption{
		client.InNamespace(request.NamespacedName.Namespace),
		client.MatchingLabels{},
	}

	if err := r.Client.List(context.TODO(), chaosresultList, opts...); err != nil {
		return err
	}

	for _, result := range chaosresultList.Items {
		if result.Labels["chaosUID"] == string(engine.Instance.UID) {
			if len(result.ObjectMeta.Annotations) == 0 {
				return nil
			}
			targetsList, annotations := getChaosStatus(result)
			result.Status.History.Targets = targetsList
			result.ObjectMeta.Annotations = annotations

			chaosTypes.Log.Info("updating chaos status inside chaosresult", "chaosresult", result.Name)
			return r.Client.Update(context.TODO(), &result, &client.UpdateOptions{})
		}
	}

	return nil
}

// waitForChaosPodTermination waits until all chaos pods have terminated after an abort
func (r *ChaosEngineReconciler) waitForChaosPodTermination(engine *chaosTypes.EngineInfo, request reconcile.Request) error {
	opts := []client.ListOption{
		client.InNamespace(request.NamespacedName.Namespace),
		client.MatchingLabels{"chaosUID": string(engine.Instance.UID)},
	}

	// poll every second, for up to 180 attempts (~3 minutes)
	return retry.
		Times(uint(180)).
		Wait(1 * time.Second).
		Try(func(attempt uint) error {
			chaosPodList := &corev1.PodList{}
			if err := r.Client.List(context.TODO(), chaosPodList, opts...); err != nil {
				return err
			}
			if len(chaosPodList.Items) != 0 {
				return errors.Errorf("chaos pods are not deleted yet")
			}
			return nil
		})
}

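// ChaosResult annotations processed below use keys of the form "<kind>/<name>"
// and values such as "injected", "reverted" or "targeted". For illustration
// (hypothetical object name), an annotation "pod/nginx-abc: injected" is folded
// into history.targets as {Name: "nginx-abc", Kind: "pod", ChaosStatus: "injected"}
// and then removed from the annotations map.
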
// getChaosStatus returns the target application details along with their chaos status
func getChaosStatus(result litmuschaosv1alpha1.ChaosResult) ([]litmuschaosv1alpha1.TargetDetails, map[string]string) {
	annotations := result.ObjectMeta.Annotations

	targetsList := result.Status.History.Targets
	for k, v := range annotations {
		switch strings.ToLower(v) {
		case "injected", "reverted", "targeted":
			kind := strings.TrimSpace(strings.Split(k, "/")[0])
			name := strings.TrimSpace(strings.Split(k, "/")[1])
			if !updateTargets(name, v, &targetsList) {
				targetsList = append(targetsList, litmuschaosv1alpha1.TargetDetails{
					Name:        name,
					Kind:        kind,
					ChaosStatus: v,
				})
			}
			delete(annotations, k)
		}
	}

	return targetsList, annotations
}

// isResultCRDAvailable checks for the existence of the chaosresult CRD inside the cluster
func isResultCRDAvailable() (bool, error) {
	dynamicClient, err := dynamicclientset.CreateClientSet()
	if err != nil {
		return false, err
	}

	// defining the gvr for the requested resource
	gvr := schema.GroupVersionResource{
		Group:    "apiextensions.k8s.io",
		Version:  "v1",
		Resource: "customresourcedefinitions",
	}

	resultList, err := (*dynamicClient).Resource(gvr).List(context.Background(), v1.ListOptions{})
	if err != nil {
		return false, err
	}

	// check the presence of chaosresult CRD inside cluster
	for _, crd := range resultList.Items {
		if crd.GetName() == chaosTypes.ResultCRDName {
			return true, nil
		}
	}

	return false, nil
}

// updateTargets updates the chaos status of targets that are already present inside history.targets
func updateTargets(name, status string, data *[]litmuschaosv1alpha1.TargetDetails) bool {
	for i := range *data {
		if (*data)[i].Name == name {
			(*data)[i].ChaosStatus = status
			return true
		}
	}

	return false
}

// SetupWithManager sets up the controller with the Manager.
func (r *ChaosEngineReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&litmuschaosv1alpha1.ChaosEngine{}).
		Owns(&corev1.Pod{}).
		Complete(r)
}
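
// The snippet below is an illustrative sketch (not part of this package) of
// how the reconciler is typically wired into a controller-runtime manager,
// e.g. from the operator's main.go; scheme registration and flag handling are
// assumed to be done by the caller.
//
//	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
//	if err != nil {
//		panic(err)
//	}
//	if err := (&ChaosEngineReconciler{
//		Client:   mgr.GetClient(),
//		Scheme:   mgr.GetScheme(),
//		Recorder: mgr.GetEventRecorderFor("chaos-operator"),
//	}).SetupWithManager(mgr); err != nil {
//		panic(err)
//	}
//	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
//		panic(err)
//	}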