// system-agent/pkg/applyinator/applyinator.go

package applyinator
import (
"bufio"
"bytes"
"compress/gzip"
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/rancher/system-agent/pkg/image"
"github.com/rancher/system-agent/pkg/prober"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
type Applyinator struct {
mu *sync.Mutex
workDir string
preserveWorkDir bool
appliedPlanDir string
interlockDir string
imageUtil *image.Utility
}
// CalculatedPlan is passed into Applyinator and is a Plan with checksum calculated
type CalculatedPlan struct {
Plan Plan
Checksum string
}
type Plan struct {
Files []File `json:"files,omitempty"`
OneTimeInstructions []OneTimeInstruction `json:"instructions,omitempty"`
Probes map[string]prober.Probe `json:"probes,omitempty"`
PeriodicInstructions []PeriodicInstruction `json:"periodicInstructions,omitempty"`
// ResetFailureCountOnStartup denotes whether the system-agent should reset the failure count
// and applied-checksum for plans that are force applied each time the system-agent starts.
ResetFailureCountOnStartup bool `json:"resetFailureCountOnStartup,omitempty"`
}
type CommonInstruction struct {
Name string `json:"name,omitempty"`
Image string `json:"image,omitempty"`
Env []string `json:"env,omitempty"`
Args []string `json:"args,omitempty"`
Command string `json:"command,omitempty"`
}
type PeriodicInstruction struct {
CommonInstruction
PeriodSeconds int `json:"periodSeconds,omitempty"` // default 600, i.e. 10 minutes
SaveStderrOutput bool `json:"saveStderrOutput,omitempty"`
}
type PeriodicInstructionOutput struct {
Name string `json:"name"`
Stdout []byte `json:"stdout"` // Stdout is a byte array of the gzip+base64 stdout output
Stderr []byte `json:"stderr"` // Stderr is a byte array of the gzip+base64 stderr output
ExitCode int `json:"exitCode"` // ExitCode is an int representing the exit code of the last run instruction
LastSuccessfulRunTime string `json:"lastSuccessfulRunTime"` // LastSuccessfulRunTime is a time.UnixDate formatted string of the last successful time (exit code 0) the instruction was run
Failures int `json:"failures"` // Failures is the number of times the periodic instruction has failed to run
LastFailedRunTime string `json:"lastFailedRunTime"` // LastFailedRunTime is a time.UnixDate formatted string of the most recent time the periodic instruction failed
}
type OneTimeInstruction struct {
CommonInstruction
SaveOutput bool `json:"saveOutput,omitempty"`
}
// File describes a file or directory to reconcile on disk. Path is an absolute path, e.g. `/etc/kubernetes/ssl/ca.pem`,
// and Content is base64 encoded. If Directory is true, a directory is created rather than a file.
type File struct {
Content string `json:"content,omitempty"`
Directory bool `json:"directory,omitempty"`
UID int `json:"uid,omitempty"`
GID int `json:"gid,omitempty"`
Path string `json:"path,omitempty"`
Permissions string `json:"permissions,omitempty"` // internally, the string will be converted to a uint32 to satisfy os.FileMode
}
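
// Illustrative sketch, not part of the upstream source: constructing a File entry the way a plan would carry it.
// The path, payload and permissions below are hypothetical; the payload is base64 encoded as the Content field expects.
func exampleCAFile() File {
    return File{
        Path:        "/etc/kubernetes/ssl/ca.pem",
        Content:     "aGVsbG8K", // base64 of "hello\n"; a real plan would carry the encoded certificate here
        Permissions: "0600",
        UID:         0,
        GID:         0,
    }
}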
const appliedPlanFileSuffix = "-applied.plan"
const applyinatorDateCodeLayout = "20060102-150405"
const defaultCommand = "/run.sh"
const cattleAgentExecutionPwdEnvKey = "CATTLE_AGENT_EXECUTION_PWD"
const cattleAgentAttemptKey = "CATTLE_AGENT_ATTEMPT_NUMBER"
const planRetentionPolicyCount = 64
const restartPendingInterlockFile = "restart-pending"
const applyinatorActiveInterlockFile = "applyinator-active"
const restartPendingTimeout = 5 * time.Minute // Wait a maximum of 5 minutes before force-applying a plan if a restart is pending.
func NewApplyinator(workDir string, preserveWorkDir bool, appliedPlanDir, interlockDir string, imageUtil *image.Utility) *Applyinator {
return &Applyinator{
mu: &sync.Mutex{},
workDir: workDir,
preserveWorkDir: preserveWorkDir,
appliedPlanDir: appliedPlanDir,
interlockDir: interlockDir,
imageUtil: imageUtil,
}
}
func CalculatePlan(rawPlan []byte) (CalculatedPlan, error) {
var cp CalculatedPlan
var plan Plan
if err := json.Unmarshal(rawPlan, &plan); err != nil {
return cp, err
}
cp.Checksum = checksum(rawPlan)
cp.Plan = plan
return cp, nil
}
func checksum(input []byte) string {
h := sha256.New()
h.Write(input)
return fmt.Sprintf("%x", h.Sum(nil))
}
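
// Illustrative sketch, not part of the upstream source: calculating a plan from a raw JSON document.
// The file entry and single one-time instruction shown here are hypothetical.
func examplePlanCalculation() (CalculatedPlan, error) {
    rawPlan := []byte(`{"files":[{"path":"/etc/example/placeholder","directory":true}],"instructions":[{"name":"example","command":"/usr/bin/true"}]}`)
    return CalculatePlan(rawPlan)
}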
type ApplyOutput struct {
OneTimeOutput []byte
OneTimeApplySucceeded bool
PeriodicOutput []byte
PeriodicApplySucceeded bool
}
type ApplyInput struct {
CalculatedPlan CalculatedPlan
RunOneTimeInstructions bool
OneTimeInstructionAttempts int
ReconcileFiles bool
ExistingOneTimeOutput []byte
ExistingPeriodicOutput []byte
}
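
// Illustrative sketch, not part of the upstream source: one way a caller might wire the pieces together.
// The work and applied-plan directories are hypothetical, the interlock directory is left empty, and error
// handling is collapsed into a single return.
func exampleApply(ctx context.Context, rawPlan []byte, imageUtil *image.Utility) (ApplyOutput, error) {
    a := NewApplyinator("/var/lib/example/work", false, "/var/lib/example/applied", "", imageUtil)
    cp, err := CalculatePlan(rawPlan)
    if err != nil {
        return ApplyOutput{}, err
    }
    return a.Apply(ctx, ApplyInput{
        CalculatedPlan:         cp,
        RunOneTimeInstructions: true,
        ReconcileFiles:         true,
    })
}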
// Apply applies the given CalculatedPlan. The ApplyInput carries a flag indicating whether to run the one-time
// instructions, the existing one-time instruction output, and the existing periodic output, the latter being a
// gzip-compressed, JSON-marshalled map of PeriodicInstructionOutput entries keyed by PeriodicInstructionOutput.Name.
// Apply returns revised versions of those outputs and, if requested, runs the one-time instructions. Notably,
// ApplyOutput.OneTimeApplySucceeded will be false if ApplyInput.RunOneTimeInstructions is false.
func (a *Applyinator) Apply(ctx context.Context, input ApplyInput) (ApplyOutput, error) {
logrus.Debugf("[Applyinator] Applying plan with checksum %s", input.CalculatedPlan.Checksum)
logrus.Tracef("[Applyinator] Applying plan - attempting to get lock")
output := ApplyOutput{
OneTimeOutput: input.ExistingOneTimeOutput,
PeriodicOutput: input.ExistingPeriodicOutput,
}
a.mu.Lock()
logrus.Tracef("[Applyinator] Applying plan - lock achieved")
defer a.mu.Unlock()
now := time.Now()
nowUnixTimeString := now.Format(time.UnixDate)
nowString := now.Format(applyinatorDateCodeLayout)
// Check to see if we are safe to apply.
if a.interlockDir != "" {
restartPendingInterlockFilePath := filepath.Join(a.interlockDir, restartPendingInterlockFile)
applyinatorActiveInterlockFilePath := filepath.Join(a.interlockDir, applyinatorActiveInterlockFile)
// First, check for and remove the applyinator-active interlock file, as the applyinator is not actually active at this point.
if _, err := os.Stat(applyinatorActiveInterlockFilePath); err == nil {
err = os.Remove(applyinatorActiveInterlockFilePath)
if err != nil {
logrus.Errorf("unable to remove applyinator active interlock file %s: %v", applyinatorActiveInterlockFilePath, err)
}
}
if _, err := os.Stat(restartPendingInterlockFilePath); err == nil {
// check the restart pending interlock file to see if we've passed our threshold for blocking
fileContents, err := os.ReadFile(restartPendingInterlockFilePath)
if err != nil {
return output, fmt.Errorf("unable to read restart pending interlock file %s: %w", restartPendingInterlockFilePath, err)
}
// Parse the time out of the file and determine if we have passed our time threshold
t, err := time.Parse(time.UnixDate, string(fileContents))
if err != nil {
// If we are unable to parse the first observed time out of the file, write "now" as the first observed time of the file.
if err := os.WriteFile(restartPendingInterlockFilePath, []byte(nowUnixTimeString), 0600); err != nil {
return output, fmt.Errorf("unable to write first-observed time to restart pending interlock file %s: %w", restartPendingInterlockFilePath, err)
}
return output, fmt.Errorf("restart is pending for system-agent, waiting %s until ignoring pending restart", restartPendingTimeout.String())
}
if now.Before(t.Add(restartPendingTimeout)) {
return output, fmt.Errorf("restart is pending for system-agent, waiting %s until ignoring pending restart", t.Add(restartPendingTimeout).Sub(now).String())
}
// remove the restart pending file
err = os.Remove(restartPendingInterlockFilePath)
if err != nil {
logrus.Errorf("error encountered while removing restart pending interlock file %s: %v", restartPendingInterlockFilePath, err)
}
}
// At this point, there is no restart-pending and we can continue with applyinator reconciliation, so create the applyinator-active file
err := os.WriteFile(applyinatorActiveInterlockFilePath, []byte(nowUnixTimeString), 0600)
if err != nil {
logrus.Errorf("unable to write applyinator active interlock file %s: %v", applyinatorActiveInterlockFilePath, err)
}
defer func() {
// Remove the Applyinator Active Interlock File
err = os.Remove(applyinatorActiveInterlockFilePath)
if err != nil {
logrus.Errorf("unable to remove applyinator active interlock file %s: %v", applyinatorActiveInterlockFilePath, err)
}
}()
}
executionDir := filepath.Join(a.workDir, nowString)
logrus.Tracef("[Applyinator] Applying calculated node plan contents %v", input.CalculatedPlan.Checksum)
logrus.Tracef("[Applyinator] Using %s as execution directory", executionDir)
if a.appliedPlanDir != "" {
logrus.Debugf("[Applyinator] Writing applied calculated plan contents to historical plan directory %s", a.appliedPlanDir)
if err := os.MkdirAll(a.appliedPlanDir, 0700); err != nil {
logrus.Errorf("error creawting applied plan directory: %v", err)
}
if err := a.writePlanToDisk(now, &input.CalculatedPlan); err != nil {
logrus.Errorf("error writing applied plan to disk: %v", err)
}
if err := a.appliedPlanRetentionPolicy(planRetentionPolicyCount); err != nil {
logrus.Errorf("error while applying plan retention policy: %v", err)
}
}
if input.ReconcileFiles {
for _, file := range input.CalculatedPlan.Plan.Files {
if file.Directory {
logrus.Debugf("[Applyinator] Creating directory %s", file.Path)
if err := createDirectory(file); err != nil {
return output, err
}
} else {
logrus.Debugf("[Applyinator] Writing file %s", file.Path)
if err := writeBase64ContentToFile(file); err != nil {
return output, err
}
}
}
}
if !a.preserveWorkDir {
logrus.Debugf("[Applyinator] Cleaning working directory before applying %s", a.workDir)
if err := os.RemoveAll(a.workDir); err != nil {
return output, err
}
}
if input.RunOneTimeInstructions {
logrus.Infof("[Applyinator] Applying one-time instructions for plan with checksum %s", input.CalculatedPlan.Checksum)
executionOutputs := map[string][]byte{}
if len(input.ExistingOneTimeOutput) > 0 {
objectBuffer, err := generateByteBufferFromBytes(input.ExistingOneTimeOutput)
if err != nil {
return output, err
}
if err := json.Unmarshal(objectBuffer.Bytes(), &executionOutputs); err != nil {
return output, err
}
}
oneTimeApplySucceeded := true
for index, instruction := range input.CalculatedPlan.Plan.OneTimeInstructions {
logrus.Debugf("[Applyinator] Executing instruction %d attempt %d for plan %s", index, input.OneTimeInstructionAttempts, input.CalculatedPlan.Checksum)
executionInstructionDir := filepath.Join(executionDir, input.CalculatedPlan.Checksum+"_"+strconv.Itoa(index))
prefix := input.CalculatedPlan.Checksum + "_" + strconv.Itoa(index)
executeOutput, _, exitCode, err := a.execute(ctx, prefix, executionInstructionDir, instruction.CommonInstruction, true, input.OneTimeInstructionAttempts)
if err != nil || exitCode != 0 {
logrus.Errorf("error executing instruction %d: %v", index, err)
oneTimeApplySucceeded = false
}
if instruction.Name == "" && instruction.SaveOutput {
logrus.Errorf("instruction does not have a name set, cannot save output data")
} else if instruction.SaveOutput {
executionOutputs[instruction.Name] = executeOutput
}
// If we have failed to apply our one-time instructions, we need to break in order to stop subsequent instructions from executing.
if !oneTimeApplySucceeded {
break
}
}
output.OneTimeApplySucceeded = oneTimeApplySucceeded
marshalledExecutionOutputs, err := json.Marshal(executionOutputs)
if err != nil {
return output, err
}
oneTimeApplyOutput, err := gzipByteSlice(marshalledExecutionOutputs)
if err != nil {
return output, err
}
output.OneTimeOutput = oneTimeApplyOutput
}
periodicOutputs := map[string]PeriodicInstructionOutput{}
if len(input.ExistingPeriodicOutput) > 0 {
objectBuffer, err := generateByteBufferFromBytes(input.ExistingPeriodicOutput)
if err != nil {
return output, err
}
if err := json.Unmarshal(objectBuffer.Bytes(), &periodicOutputs); err != nil {
return output, err
}
}
periodicApplySucceeded := true
for index, instruction := range input.CalculatedPlan.Plan.PeriodicInstructions {
if instruction.Name == "" {
logrus.Errorf("periodic instruction %d did not have name, unable to run", index)
continue
}
var previousRunTime, lastFailureTime string
var failures int
if po, ok := periodicOutputs[instruction.Name]; ok {
if po.LastSuccessfulRunTime != "" {
logrus.Debugf("[Applyinator] Got periodic output for instruction %s and am now parsing last successful run time %s", instruction.Name, po.LastSuccessfulRunTime)
t, err := time.Parse(time.UnixDate, po.LastSuccessfulRunTime)
if err != nil {
logrus.Errorf("error encountered during parsing of last successful run time: %v", err)
} else {
previousRunTime = po.LastSuccessfulRunTime
if instruction.PeriodSeconds == 0 {
instruction.PeriodSeconds = 600 // set default period to 600 seconds
}
if now.Before(t.Add(time.Second*time.Duration(instruction.PeriodSeconds))) && !input.RunOneTimeInstructions {
logrus.Debugf("[Applyinator] Not running periodic instruction %s as period duration has not elapsed since last successful run", instruction.Name)
continue
}
}
}
if po.LastFailedRunTime != "" {
logrus.Debugf("[Applyinator] Got periodic output for instruction %s and am now parsing last failed time %s", instruction.Name, po.LastFailedRunTime)
t, err := time.Parse(time.UnixDate, po.LastFailedRunTime)
if err != nil {
logrus.Errorf("error encountered during parsing of last failed run time: %+v", err)
} else {
lastFailureTime = po.LastFailedRunTime
failures = po.Failures
failureCooldown := failures
if failures > 6 {
failureCooldown = 6
} else if failures == 0 {
failureCooldown = 1
}
logrus.Debugf("[Applyinator] Instruction %s - Last failed run attempt was %s, failures: %d, failureCooldown: %d", instruction.Name, lastFailureTime, failures, failureCooldown)
if now.Before(t.Add(time.Second*time.Duration(30*failureCooldown))) && !input.RunOneTimeInstructions {
logrus.Debugf("[Applyinator] Not running periodic instruction %s as failure cooldown has not elapsed since last failed run", instruction.Name)
continue
}
}
}
}
logrus.Debugf("[Applyinator] Executing periodic instruction %d for plan %s", index, input.CalculatedPlan.Checksum)
executionInstructionDir := filepath.Join(executionDir, input.CalculatedPlan.Checksum+"_"+strconv.Itoa(index))
prefix := input.CalculatedPlan.Checksum + "_" + strconv.Itoa(index)
stdout, stderr, exitCode, err := a.execute(ctx, prefix, executionInstructionDir, instruction.CommonInstruction, false, failures+1)
if err != nil || exitCode != 0 {
periodicApplySucceeded = false
}
lsrt := nowUnixTimeString
if exitCode != 0 {
lsrt = previousRunTime
lastFailureTime = nowUnixTimeString
failures++
} else {
// reset last failure time and failure count
lastFailureTime = ""
failures = 0
}
if !instruction.SaveStderrOutput {
stderr = []byte{}
}
periodicOutputs[instruction.Name] = PeriodicInstructionOutput{
Name: instruction.Name,
Stdout: stdout,
Stderr: stderr,
ExitCode: exitCode,
LastSuccessfulRunTime: lsrt,
LastFailedRunTime: lastFailureTime,
Failures: failures,
}
if !periodicApplySucceeded {
break
}
}
output.PeriodicApplySucceeded = periodicApplySucceeded
marshalledExecutionOutputs, err := json.Marshal(periodicOutputs)
if err != nil {
return output, err
}
periodicApplyOutput, err := gzipByteSlice(marshalledExecutionOutputs)
if err != nil {
return output, err
}
output.PeriodicOutput = periodicApplyOutput
return output, nil
}
func gzipByteSlice(input []byte) ([]byte, error) {
var gzOutput bytes.Buffer
gzWriter := gzip.NewWriter(&gzOutput)
if _, err := gzWriter.Write(input); err != nil {
return []byte{}, err
}
if err := gzWriter.Close(); err != nil {
return []byte{}, err
}
return gzOutput.Bytes(), nil
}
func generateByteBufferFromBytes(input []byte) (*bytes.Buffer, error) {
buffer := bytes.NewBuffer(input)
gzReader, err := gzip.NewReader(buffer)
if err != nil {
return nil, err
}
var objectBuffer bytes.Buffer
_, err = io.Copy(&objectBuffer, gzReader)
if err != nil {
return nil, err
}
return &objectBuffer, nil
}
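
// Illustrative sketch, not part of the upstream source: gzipByteSlice and generateByteBufferFromBytes are
// inverses, which is how Apply re-reads the periodic output it produced on a previous run.
func exampleOutputRoundTrip(outputs map[string]PeriodicInstructionOutput) (map[string]PeriodicInstructionOutput, error) {
    marshalled, err := json.Marshal(outputs)
    if err != nil {
        return nil, err
    }
    compressed, err := gzipByteSlice(marshalled)
    if err != nil {
        return nil, err
    }
    buffer, err := generateByteBufferFromBytes(compressed)
    if err != nil {
        return nil, err
    }
    decoded := map[string]PeriodicInstructionOutput{}
    if err := json.Unmarshal(buffer.Bytes(), &decoded); err != nil {
        return nil, err
    }
    return decoded, nil
}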
func (a *Applyinator) appliedPlanRetentionPolicy(retention int) error {
planFiles, err := a.getAppliedPlanFiles()
if err != nil {
return err
}
if len(planFiles) <= retention {
return nil
}
sort.Slice(planFiles, func(i, j int) bool {
return planFiles[i].Name() < planFiles[j].Name()
})
delCount := len(planFiles) - retention
for _, df := range planFiles[:delCount] {
historicalPlanFile := filepath.Join(a.appliedPlanDir, df.Name())
logrus.Infof("[Applyinator] Removing historical applied plan (retention policy count: %d) %s", retention, historicalPlanFile)
if err := os.Remove(historicalPlanFile); err != nil {
return err
}
}
return nil
}
func (a *Applyinator) getAppliedPlanFiles() ([]os.DirEntry, error) {
var planFiles []os.DirEntry
dirListedPlanFiles, err := os.ReadDir(a.appliedPlanDir)
if err != nil {
return nil, err
}
for _, f := range dirListedPlanFiles {
if strings.HasSuffix(f.Name(), appliedPlanFileSuffix) && !f.IsDir() {
planFiles = append(planFiles, f)
}
}
return planFiles, nil
}
func (a *Applyinator) writePlanToDisk(now time.Time, plan *CalculatedPlan) error {
planFiles, err := a.getAppliedPlanFiles()
if err != nil {
return err
}
file := now.Format(applyinatorDateCodeLayout) + appliedPlanFileSuffix
anpString, err := json.Marshal(plan)
if err != nil {
return err
}
if len(planFiles) != 0 {
sort.Slice(planFiles, func(i, j int) bool {
return planFiles[i].Name() > planFiles[j].Name()
})
existingFileContent, err := os.ReadFile(filepath.Join(a.appliedPlanDir, planFiles[0].Name()))
if err != nil {
return err
}
if bytes.Equal(existingFileContent, anpString) {
logrus.Debugf("[Applyinator] Not writing applied plan to file %s as the last file written (%s) had identical contents", file, planFiles[0].Name())
return nil
}
}
return writeContentToFile(filepath.Join(a.appliedPlanDir, file), os.Getuid(), os.Getgid(), 0600, anpString)
}
func (a *Applyinator) execute(ctx context.Context, prefix, executionDir string, instruction CommonInstruction, combinedOutput bool, attempt int) ([]byte, []byte, int, error) {
if instruction.Image == "" {
logrus.Infof("[Applyinator] No image provided, creating empty working directory %s", executionDir)
if err := createDirectory(File{Directory: true, Path: executionDir}); err != nil {
logrus.Errorf("error while creating empty working directory: %v", err)
return nil, nil, -1, err
}
} else {
logrus.Infof("[Applyinator] Extracting image %s to directory %s", instruction.Image, executionDir)
if err := a.imageUtil.Stage(executionDir, instruction.Image); err != nil {
logrus.Errorf("error while staging: %v", err)
return nil, nil, -1, err
}
}
command := instruction.Command
if command == "" {
logrus.Debugf("[Applyinator] Command was not specified, defaulting to %s%s", executionDir, defaultCommand)
command = executionDir + defaultCommand
}
cmd := exec.CommandContext(ctx, command, instruction.Args...)
logrus.Infof("[Applyinator] Running command: %s %v", instruction.Command, instruction.Args)
cmd.Env = os.Environ()
cmd.Env = append(cmd.Env, instruction.Env...)
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", cattleAgentExecutionPwdEnvKey, executionDir))
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%d", cattleAgentAttemptKey, attempt))
cmd.Env = append(cmd.Env, "PATH="+os.Getenv("PATH")+":"+executionDir)
cmd.Dir = executionDir
stdout, err := cmd.StdoutPipe()
if err != nil {
logrus.Errorf("error setting up stdout pipe: %v", err)
return nil, nil, -1, err
}
defer stdout.Close()
stderr, err := cmd.StderrPipe()
if err != nil {
logrus.Errorf("error setting up stderr pipe: %v", err)
return nil, nil, -1, err
}
defer stderr.Close()
var (
eg = errgroup.Group{}
stdoutWriteLock = &sync.Mutex{}
stderrWriteLock = &sync.Mutex{}
stdoutBuffer = &bytes.Buffer{}
stderrBuffer = &bytes.Buffer{}
)
if combinedOutput {
// Share a single buffer and lock so stderr lines are interleaved with stdout in the returned output.
stderrBuffer = stdoutBuffer
stderrWriteLock = stdoutWriteLock
}
eg.Go(func() error {
return streamLogs("["+prefix+":stdout]", stdoutBuffer, stdout, stdoutWriteLock)
})
eg.Go(func() error {
return streamLogs("["+prefix+":stderr]", stderrBuffer, stderr, stderrWriteLock)
})
if err := cmd.Start(); err != nil {
return nil, nil, -1, err
}
// Wait for I/O to complete before calling cmd.Wait() because cmd.Wait() will close the I/O pipes.
_ = eg.Wait()
exitCode := 0
if err := cmd.Wait(); err != nil {
if ee, ok := err.(*exec.ExitError); ok {
exitCode = ee.ExitCode()
} else {
exitCode = -1
}
}
logrus.Infof("[Applyinator] Command %s %v finished with err: %v and exit code: %d", instruction.Command, instruction.Args, err, exitCode)
return stdoutBuffer.Bytes(), stderrBuffer.Bytes(), exitCode, err
}
// streamLogs accepts a prefix, outputBuffer, reader, and buffer lock and will scan input from the reader and write it
// to the output buffer while also logging anything that comes from the reader with the prefix.
func streamLogs(prefix string, outputBuffer *bytes.Buffer, reader io.Reader, lock *sync.Mutex) error {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
logrus.Infof("%s: %s", prefix, scanner.Text())
lock.Lock()
outputBuffer.Write(append(scanner.Bytes(), []byte("\n")...))
lock.Unlock()
}
return nil
}
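
// Illustrative sketch, not part of the upstream source: streamLogs works against any io.Reader, so a
// strings.Reader can stand in for a command's stdout pipe when experimenting with the prefixing behaviour.
func exampleStreamLogs() ([]byte, error) {
    var buffer bytes.Buffer
    lock := &sync.Mutex{}
    if err := streamLogs("[example:stdout]", &buffer, strings.NewReader("line one\nline two\n"), lock); err != nil {
        return nil, err
    }
    return buffer.Bytes(), nil
}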