system-upgrade-controller/pkg/upgrade/controller.go

package upgrade

import (
	"context"
	"errors"
	"fmt"
	"os"
	"time"

	"github.com/rancher/system-upgrade-controller/pkg/crds"
	upgradectl "github.com/rancher/system-upgrade-controller/pkg/generated/controllers/upgrade.cattle.io"
	"github.com/rancher/system-upgrade-controller/pkg/version"
	"github.com/rancher/wrangler/v3/pkg/apply"
	"github.com/rancher/wrangler/v3/pkg/crd"
	batchctl "github.com/rancher/wrangler/v3/pkg/generated/controllers/batch"
	corectl "github.com/rancher/wrangler/v3/pkg/generated/controllers/core"
	"github.com/rancher/wrangler/v3/pkg/leader"
	"github.com/rancher/wrangler/v3/pkg/schemes"
	"github.com/rancher/wrangler/v3/pkg/start"
	"github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/record"
)

const (
	// readyDuration is the time to wait for CRDs to be ready.
	readyDuration = time.Minute * 1
)

var (
	ErrPlanNotReady                = errors.New("plan is not valid and resolved")
	ErrOutsideWindow               = errors.New("current time is not within configured window")
	ErrControllerNameRequired      = errors.New("controller name is required")
	ErrControllerNamespaceRequired = errors.New("controller namespace is required")
)
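
// Controller holds the Kubernetes clients, wrangler controller factories, and
// event recorder used to reconcile upgrade plans within a single namespace.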
type Controller struct {
	Namespace string
	Name      string
	NodeName  string

	cfg       *rest.Config
	kcs       *kubernetes.Clientset
	clusterID string

	leaderElect bool

	coreFactory    *corectl.Factory
	batchFactory   *batchctl.Factory
	upgradeFactory *upgradectl.Factory

	apply    apply.Apply
	recorder record.EventRecorder
}
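
// NewController constructs a Controller for the given namespace and name.
// The node name defaults to the local hostname and the REST config defaults
// to the in-cluster config when either is not supplied.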
func NewController(cfg *rest.Config, namespace, name, nodeName string, leaderElect bool, resync time.Duration) (ctl *Controller, err error) {
	if namespace == "" {
		return nil, ErrControllerNamespaceRequired
	}
	if name == "" {
		return nil, ErrControllerNameRequired
	}
	if nodeName == "" {
		nodeName, err = os.Hostname()
		if err != nil {
			return nil, err
		}
	}
	if cfg == nil {
		cfg, err = rest.InClusterConfig()
		if err != nil {
			return nil, err
		}
	}
	ctl = &Controller{
		Namespace:   namespace,
		Name:        name,
		NodeName:    nodeName,
		cfg:         cfg,
		leaderElect: leaderElect,
	}
	ctl.kcs, err = kubernetes.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}
	ctl.coreFactory, err = corectl.NewFactoryFromConfigWithOptions(cfg, &corectl.FactoryOptions{
		Namespace: namespace,
		Resync:    resync,
	})
	if err != nil {
		return nil, err
	}
	ctl.batchFactory, err = batchctl.NewFactoryFromConfigWithOptions(cfg, &batchctl.FactoryOptions{
		Namespace: namespace,
		Resync:    resync,
	})
	if err != nil {
		return nil, err
	}
	ctl.upgradeFactory, err = upgradectl.NewFactoryFromConfigWithOptions(cfg, &upgradectl.FactoryOptions{
		Namespace: namespace,
		Resync:    resync,
	})
	if err != nil {
		return nil, err
	}
	ctl.apply, err = apply.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}
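
	// Record events both to the structured log and to the apiserver,
	// attributed to this controller instance by component name and node.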
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartStructuredLogging(0)
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: ctl.kcs.CoreV1().Events(metav1.NamespaceAll)})
	ctl.recorder = eventBroadcaster.NewRecorder(schemes.All, corev1.EventSource{Component: ctl.Name, Host: ctl.NodeName})

	return ctl, nil
}
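
// Start registers the CRDs and event handlers, then runs the controllers,
// optionally behind leader election.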
func (ctl *Controller) Start(ctx context.Context, threads int) error {
	// This is consistent with events attached to the node generated by the kubelet
	// https://github.com/kubernetes/kubernetes/blob/612130dd2f4188db839ea5c2dea07a96b0ad8d1c/pkg/kubelet/kubelet.go#L479-L485
	nodeRef := &corev1.ObjectReference{
		Kind:      "Node",
		Name:      ctl.NodeName,
		UID:       types.UID(ctl.NodeName),
		Namespace: "",
	}

	// cluster id hack: see https://groups.google.com/forum/#!msg/kubernetes-sig-architecture/mVGobfD4TpY/nkdbkX1iBwAJ
	systemNS, err := ctl.kcs.CoreV1().Namespaces().Get(ctx, metav1.NamespaceSystem, metav1.GetOptions{})
	if err != nil {
		return err
	}
	ctl.clusterID = fmt.Sprintf("%s", systemNS.UID)

	if err := ctl.registerCRD(ctx); err != nil {
		return err
	}

	// register our handlers
	if err := ctl.handleJobs(ctx); err != nil {
		return err
	}
	if err := ctl.handleNodes(ctx); err != nil {
		return err
	}
	if err := ctl.handlePlans(ctx); err != nil {
		return err
	}
	if err := ctl.handleSecrets(ctx); err != nil {
		return err
	}

	appName := fmt.Sprintf("%s %s (%s)", version.Program, version.Version, version.GitCommit)
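
	// run starts all controller factories; a failure to start is recorded as a
	// warning event on the node and is fatal.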
	run := func(ctx context.Context) {
		if err := start.All(ctx, threads, ctl.coreFactory, ctl.batchFactory, ctl.upgradeFactory); err != nil {
			ctl.recorder.Eventf(nodeRef, corev1.EventTypeWarning, "StartFailed", "%s failed to start controllers for %s/%s: %v", appName, ctl.Namespace, ctl.Name, err)
			logrus.Panicf("Failed to start controllers: %v", err)
		}
		ctl.recorder.Eventf(nodeRef, corev1.EventTypeNormal, "Started", "%s running as %s/%s", appName, ctl.Namespace, ctl.Name)
	}

	if ctl.leaderElect {
		ctl.recorder.Eventf(nodeRef, corev1.EventTypeNormal, "Starting", "%s starting leader election for %s/%s", appName, ctl.Namespace, ctl.Name)
		leader.RunOrDie(ctx, ctl.Namespace, ctl.Name, ctl.kcs, run)
	} else {
		run(ctx)
	}

	return nil
}
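
// registerCRD installs the controller's CustomResourceDefinitions, waiting up
// to readyDuration for them to be ready.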
func (ctl *Controller) registerCRD(ctx context.Context) error {
	crds, err := crds.List()
	if err != nil {
		return err
	}

	client, err := clientset.NewForConfig(ctl.cfg)
	if err != nil {
		return err
	}

	return crd.BatchCreateCRDs(ctx, client.ApiextensionsV1().CustomResourceDefinitions(), nil, readyDuration, crds)
}