cli-utils/pkg/apply/mutator/apply_time_mutator.go

296 lines
9.4 KiB
Go

// Copyright 2021 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package mutator
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
"github.com/fluxcd/cli-utils/pkg/apply/cache"
"github.com/fluxcd/cli-utils/pkg/jsonpath"
"github.com/fluxcd/cli-utils/pkg/kstatus/status"
"github.com/fluxcd/cli-utils/pkg/object"
"github.com/fluxcd/cli-utils/pkg/object/mutation"
)
// ApplyTimeMutator mutates an object by injecting values specified by the
// apply-time-mutation annotation.
// Source objects named by the annotation are resolved with Mapper and read
// either from ResourceCache or, failing that, from the cluster via Client.
// The optional ResourceCache will be used to speed up source object lookups,
// if specified.
// Implements the Mutator interface
type ApplyTimeMutator struct {
// Client fetches source objects from the cluster when they are not served
// from the cache.
Client dynamic.Interface
// Mapper resolves a source reference's group/kind (and version, if given)
// to a REST mapping, which also determines whether the source is
// namespace-scoped.
Mapper meta.RESTMapper
// ResourceCache is optional; when nil, every source lookup goes to the
// cluster and nothing is cached.
ResourceCache cache.ResourceCache
}
// Name reports the fixed identifier of this mutator, intended for use in
// log output.
func (atm *ApplyTimeMutator) Name() string {
	const mutatorName = "ApplyTimeMutator"
	return mutatorName
}
// Mutate parses the apply-time-mutation annotation and loops through the
// substitutions, applying each of them to the supplied target object.
//
// For each substitution the source object is resolved through the REST
// mapper and looked up via getObject, the value at the source path is read,
// and the result is written to the target path of obj. When the substitution
// has a token, every occurrence of the token inside the existing (string)
// target value is replaced by the stringified source value; without a token
// the whole target value is replaced by the source value.
//
// A source reference without a namespace inherits the target's namespace
// when the source kind is namespace-scoped. References that resolve to the
// target object itself are rejected.
//
// Returns true with a reason, if mutation was performed. If the object has
// no annotation, it returns false with no error. On error, the returned bool
// still reports whether an earlier substitution had already been written to
// obj.
func (atm *ApplyTimeMutator) Mutate(ctx context.Context, obj *unstructured.Unstructured) (bool, string, error) {
	mutated := false
	reason := ""

	targetRef := mutation.ResourceReferenceFromUnstructured(obj)

	if !mutation.HasAnnotation(obj) {
		return mutated, reason, nil
	}

	subs, err := mutation.ReadAnnotation(obj)
	if err != nil {
		return mutated, reason, fmt.Errorf("failed to read annotation in object (%s): %w", targetRef, err)
	}

	klog.V(4).Infof("target object: %s", targetRef)
	klog.V(7).Infof("target object YAML:\n%s", object.YamlStringer{O: obj})

	// validate no self-references
	// Early validation to avoid GETs, but won't catch sources with implicit namespace.
	for _, sub := range subs {
		if targetRef.Equal(sub.SourceRef) {
			return mutated, reason, fmt.Errorf("invalid self-reference (%s)", sub.SourceRef)
		}
	}

	for _, sub := range subs {
		// Work on a local copy so namespace defaulting does not alter the
		// parsed substitution.
		sourceRef := sub.SourceRef

		// lookup REST mapping
		sourceMapping, err := atm.getMapping(sourceRef)
		if err != nil {
			return mutated, reason, fmt.Errorf("failed to identify source object mapping (%s): %w", sourceRef, err)
		}

		// Default source namespace to target namespace, if namespace-scoped
		if sourceRef.Namespace == "" && sourceMapping.Scope.Name() == meta.RESTScopeNameNamespace {
			sourceRef.Namespace = targetRef.Namespace
		}

		// validate no self-references
		// Re-check to catch sources with implicit namespace. This must use the
		// defaulted sourceRef; sub.SourceRef was already checked above and
		// still has the empty namespace.
		if targetRef.Equal(sourceRef) {
			return mutated, reason, fmt.Errorf("invalid self-reference (%s)", sourceRef)
		}

		// lookup source object from cache or cluster
		sourceObj, err := atm.getObject(ctx, sourceMapping, sourceRef)
		if err != nil {
			return mutated, reason, fmt.Errorf("failed to get source object (%s): %w", sourceRef, err)
		}

		klog.V(4).Infof("source object: %s", sourceRef)
		klog.V(7).Infof("source object YAML:\n%s", object.YamlStringer{O: sourceObj})

		// lookup target field in target object
		targetValue, _, err := readFieldValue(obj, sub.TargetPath)
		if err != nil {
			return mutated, reason, fmt.Errorf("failed to read field (%s) from target object (%s): %w", sub.TargetPath, targetRef, err)
		}

		// lookup source field in source object
		sourceValue, found, err := readFieldValue(sourceObj, sub.SourcePath)
		if err != nil {
			return mutated, reason, fmt.Errorf("failed to read field (%s) from source object (%s): %w", sub.SourcePath, sourceRef, err)
		}
		if !found {
			return mutated, reason, fmt.Errorf("source field (%s) not present in source object (%s)", sub.SourcePath, sourceRef)
		}

		var newValue interface{}
		if sub.Token == "" {
			// token not specified, replace the entire target value with the source value
			newValue = sourceValue
		} else {
			// token specified, substitute token for source field value in target field value
			targetValueString, ok := targetValue.(string)
			if !ok {
				return mutated, reason, fmt.Errorf("token is specified, but target field value is %T, expected string", targetValue)
			}

			sourceValueString, err := valueToString(sourceValue)
			if err != nil {
				// The value being stringified comes from the source object,
				// so report the source reference.
				return mutated, reason, fmt.Errorf("failed to stringify source field value (%s): %w", sourceRef, err)
			}

			// Substitute token for source field value, if present.
			// If not present, do nothing. This is common on updates.
			newValue = strings.ReplaceAll(targetValueString, sub.Token, sourceValueString)
		}

		klog.V(5).Infof("substitution: targetRef=(%s), sourceRef=(%s): sourceValue=(%v), token=(%s), oldTargetValue=(%v), newTargetValue=(%v)",
			targetRef, sourceRef, sourceValue, sub.Token, targetValue, newValue)

		// update target field in target object
		err = writeFieldValue(obj, sub.TargetPath, newValue)
		if err != nil {
			return mutated, reason, fmt.Errorf("failed to set field in target object (%s): %w", targetRef, err)
		}

		mutated = true
		reason = fmt.Sprintf("object contained annotation: %s", mutation.Annotation)
	}

	if mutated {
		klog.V(4).Infof("mutated target object: %s", targetRef)
		klog.V(7).Infof("mutated target object YAML:\n%s", object.YamlStringer{O: obj})
	}

	return mutated, reason, nil
}
// getMapping resolves the REST mapping for the referenced object's
// group/kind. The reference's version is passed to the mapper only when it is
// non-empty; otherwise the mapper is called with no version argument.
func (atm *ApplyTimeMutator) getMapping(ref mutation.ResourceReference) (*meta.RESTMapping, error) {
	gvk := ref.GroupVersionKind()

	// A nil slice expands to "no versions", matching a call without the
	// variadic argument.
	var versions []string
	if gvk.Version != "" {
		versions = append(versions, gvk.Version)
	}

	restMapping, lookupErr := atm.Mapper.RESTMapping(gvk.GroupKind(), versions...)
	if lookupErr != nil {
		return nil, lookupErr
	}
	return restMapping, nil
}
// getObject looks up the referenced source object. A cached copy is returned
// when a cache is configured and the cached entry has a resource whose status
// is Current; in every other case the object is fetched from the cluster and,
// if a cache is configured, the fetched result (including a NotFound outcome)
// is stored in it.
//
// An error is returned for references with an empty name or kind, for any
// cluster error, and when the object does not exist.
func (atm *ApplyTimeMutator) getObject(ctx context.Context, mapping *meta.RESTMapping, ref mutation.ResourceReference) (*unstructured.Unstructured, error) {
	// Reject references that cannot identify an object.
	switch {
	case ref.Name == "":
		return nil, fmt.Errorf("invalid source object: empty name")
	case ref.Kind == "":
		return nil, fmt.Errorf("invalid source object: empty kind")
	}

	id := ref.ToObjMetadata()
	cacheEnabled := atm.ResourceCache != nil

	// Prefer the cached copy, but only if it is current/reconciled.
	if cacheEnabled {
		if cached := atm.ResourceCache.Get(id); cached.Resource != nil && cached.Status == status.CurrentStatus {
			return cached.Resource, nil
		}
	}

	// Fall back to the cluster.
	fetched, getErr := atm.Client.
		Resource(mapping.Resource).
		Namespace(ref.Namespace).
		Get(ctx, ref.Name, metav1.GetOptions{})
	missing := getErr != nil && apierrors.IsNotFound(getErr)
	if getErr != nil && !missing {
		return nil, fmt.Errorf("failed to retrieve object from cluster: %w", getErr)
	}

	// Record the outcome, NotFound included, so the cache reflects it.
	// This will add external objects to the cache,
	// but the user won't get status events for them.
	if cacheEnabled {
		atm.ResourceCache.Put(id, computeStatus(fetched))
	}

	if missing {
		return nil, fmt.Errorf("object not found: %w", getErr)
	}
	return fetched, nil
}
// computeStatus builds the cache entry for obj.
//
// A nil obj yields a NotFound entry. Otherwise the status is delegated to
// status.Compute; if that fails, the failure is logged at verbosity 3 and an
// Unknown entry (without a status message) is returned instead of an error.
func computeStatus(obj *unstructured.Unstructured) cache.ResourceStatus {
	if obj == nil {
		return cache.ResourceStatus{
			Resource:      obj,
			Status:        status.NotFoundStatus,
			StatusMessage: "Object not found",
		}
	}
	result, err := status.Compute(obj)
	if err != nil {
		// Guarded so the reference is only built when the message is emitted.
		if klog.V(3).Enabled() {
			ref := mutation.ResourceReferenceFromUnstructured(obj)
			// %v, not %d: err is an error value, not an integer.
			klog.Infof("failed to compute object status (%s): %v", ref, err)
		}
		return cache.ResourceStatus{
			Resource: obj,
			Status:   status.UnknownStatus,
			//StatusMessage: fmt.Sprintf("Failed to compute status: %s", err),
		}
	}
	return cache.ResourceStatus{
		Resource:      obj,
		Status:        result.Status,
		StatusMessage: result.Message,
	}
}
// readFieldValue evaluates the JSONPath expression path against obj and
// returns the single matching value.
//
// The boolean result is true only on success. An error is returned when path
// is empty, when the expression cannot be evaluated, or when it matches
// anything other than exactly one value.
func readFieldValue(obj *unstructured.Unstructured, path string) (interface{}, bool, error) {
	if path == "" {
		return nil, false, errors.New("empty path expression")
	}

	values, err := jsonpath.Get(obj.Object, path)
	if err != nil {
		return nil, false, err
	}
	if len(values) != 1 {
		// Message previously ended with an unbalanced ")".
		return nil, false, fmt.Errorf("expected 1 match, but found %d", len(values))
	}
	return values[0], true, nil
}
// writeFieldValue sets value at the location selected by the JSONPath
// expression path inside obj.
//
// An error is returned when path is empty, when jsonpath.Set fails, or when
// the number of matches it reports is not exactly one.
func writeFieldValue(obj *unstructured.Unstructured, path string, value interface{}) error {
	if path == "" {
		return errors.New("empty path expression")
	}

	found, err := jsonpath.Set(obj.Object, path, value)
	if err != nil {
		return err
	}
	if found != 1 {
		// Message previously ended with an unbalanced ")".
		return fmt.Errorf("expected 1 match, but found %d", found)
	}
	return nil
}
// valueToString converts an interface{} to a string, formatting as json for
// maps, lists. Designed to handle yaml/json/krm primitives.
//
// Strings are returned unchanged; int, int32, int64, float32, float64 and
// bool are formatted with %v; every other value (including nil) is encoded
// with json.Marshal. If encoding fails, the returned error wraps the
// underlying json error so callers can inspect it.
func valueToString(value interface{}) (string, error) {
	switch typed := value.(type) {
	case string:
		return typed, nil
	case int, int32, int64, float32, float64, bool:
		return fmt.Sprintf("%v", typed), nil
	default:
		jsonBytes, err := json.Marshal(typed)
		if err != nil {
			// Keep the cause in the chain instead of discarding it.
			return "", fmt.Errorf("failed to marshal value to json: %#v: %w", value, err)
		}
		return string(jsonBytes), nil
	}
}