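// Package upgrade provides the system-upgrade-controller Controller, which
// registers the upgrade CRDs and starts the job, node, plan and secret handlers.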
package upgrade
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/rancher/system-upgrade-controller/pkg/crds"
|
|
upgradectl "github.com/rancher/system-upgrade-controller/pkg/generated/controllers/upgrade.cattle.io"
|
|
"github.com/rancher/system-upgrade-controller/pkg/version"
|
|
"github.com/rancher/wrangler/v3/pkg/apply"
|
|
"github.com/rancher/wrangler/v3/pkg/crd"
|
|
batchctl "github.com/rancher/wrangler/v3/pkg/generated/controllers/batch"
|
|
corectl "github.com/rancher/wrangler/v3/pkg/generated/controllers/core"
|
|
"github.com/rancher/wrangler/v3/pkg/leader"
|
|
"github.com/rancher/wrangler/v3/pkg/schemes"
|
|
"github.com/rancher/wrangler/v3/pkg/start"
|
|
"github.com/sirupsen/logrus"
|
|
corev1 "k8s.io/api/core/v1"
|
|
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/client-go/kubernetes"
|
|
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
"k8s.io/client-go/rest"
|
|
"k8s.io/client-go/tools/record"
|
|
)
|
|
|
|
// readyDuration is the time to wait for CRDs to be ready.
const readyDuration = time.Minute
|
|
|
|
// Sentinel errors exported by this package. Compare against them with
// errors.Is.
var (
	// ErrPlanNotReady reports that a plan is not valid and resolved.
	ErrPlanNotReady = errors.New("plan is not valid and resolved")

	// ErrOutsideWindow reports that the current time is not within the
	// configured window.
	ErrOutsideWindow = errors.New("current time is not within configured window")

	// ErrControllerNameRequired is returned by NewController when name is
	// empty.
	ErrControllerNameRequired = errors.New("controller name is required")

	// ErrControllerNamespaceRequired is returned by NewController when
	// namespace is empty.
	ErrControllerNamespaceRequired = errors.New("controller namespace is required")
)
|
|
|
|
type Controller struct {
|
|
Namespace string
|
|
Name string
|
|
NodeName string
|
|
|
|
cfg *rest.Config
|
|
kcs *kubernetes.Clientset
|
|
|
|
clusterID string
|
|
leaderElect bool
|
|
|
|
coreFactory *corectl.Factory
|
|
batchFactory *batchctl.Factory
|
|
upgradeFactory *upgradectl.Factory
|
|
|
|
apply apply.Apply
|
|
recorder record.EventRecorder
|
|
}
|
|
|
|
func NewController(cfg *rest.Config, namespace, name, nodeName string, leaderElect bool, resync time.Duration) (ctl *Controller, err error) {
|
|
if namespace == "" {
|
|
return nil, ErrControllerNamespaceRequired
|
|
}
|
|
if name == "" {
|
|
return nil, ErrControllerNameRequired
|
|
}
|
|
|
|
if nodeName == "" {
|
|
nodeName, err = os.Hostname()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if cfg == nil {
|
|
cfg, err = rest.InClusterConfig()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
ctl = &Controller{
|
|
Namespace: namespace,
|
|
Name: name,
|
|
NodeName: nodeName,
|
|
cfg: cfg,
|
|
leaderElect: leaderElect,
|
|
}
|
|
|
|
ctl.kcs, err = kubernetes.NewForConfig(cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ctl.coreFactory, err = corectl.NewFactoryFromConfigWithOptions(cfg, &corectl.FactoryOptions{
|
|
Namespace: namespace,
|
|
Resync: resync,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ctl.batchFactory, err = batchctl.NewFactoryFromConfigWithOptions(cfg, &batchctl.FactoryOptions{
|
|
Namespace: namespace,
|
|
Resync: resync,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ctl.upgradeFactory, err = upgradectl.NewFactoryFromConfigWithOptions(cfg, &corectl.FactoryOptions{
|
|
Namespace: namespace,
|
|
Resync: resync,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ctl.apply, err = apply.NewForConfig(cfg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
eventBroadcaster := record.NewBroadcaster()
|
|
eventBroadcaster.StartStructuredLogging(0)
|
|
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: ctl.kcs.CoreV1().Events(metav1.NamespaceAll)})
|
|
ctl.recorder = eventBroadcaster.NewRecorder(schemes.All, corev1.EventSource{Component: ctl.Name, Host: ctl.NodeName})
|
|
|
|
return ctl, nil
|
|
}
|
|
|
|
func (ctl *Controller) Start(ctx context.Context, threads int) error {
|
|
// This is consistent with events attached to the node generated by the kubelet
|
|
// https://github.com/kubernetes/kubernetes/blob/612130dd2f4188db839ea5c2dea07a96b0ad8d1c/pkg/kubelet/kubelet.go#L479-L485
|
|
nodeRef := &corev1.ObjectReference{
|
|
Kind: "Node",
|
|
Name: ctl.NodeName,
|
|
UID: types.UID(ctl.NodeName),
|
|
Namespace: "",
|
|
}
|
|
|
|
// cluster id hack: see https://groups.google.com/forum/#!msg/kubernetes-sig-architecture/mVGobfD4TpY/nkdbkX1iBwAJ
|
|
systemNS, err := ctl.kcs.CoreV1().Namespaces().Get(ctx, metav1.NamespaceSystem, metav1.GetOptions{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ctl.clusterID = fmt.Sprintf("%s", systemNS.UID)
|
|
|
|
if err := ctl.registerCRD(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
// register our handlers
|
|
if err := ctl.handleJobs(ctx); err != nil {
|
|
return err
|
|
}
|
|
if err := ctl.handleNodes(ctx); err != nil {
|
|
return err
|
|
}
|
|
if err := ctl.handlePlans(ctx); err != nil {
|
|
return err
|
|
}
|
|
if err := ctl.handleSecrets(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
appName := fmt.Sprintf("%s %s (%s)", version.Program, version.Version, version.GitCommit)
|
|
run := func(ctx context.Context) {
|
|
if err := start.All(ctx, threads, ctl.coreFactory, ctl.batchFactory, ctl.upgradeFactory); err != nil {
|
|
ctl.recorder.Eventf(nodeRef, corev1.EventTypeWarning, "StartFailed", "%s failed to start controllers for %s/%s: %v", appName, ctl.Namespace, ctl.Name, err)
|
|
logrus.Panicf("Failed to start controllers: %v", err)
|
|
}
|
|
ctl.recorder.Eventf(nodeRef, corev1.EventTypeNormal, "Started", "%s running as %s/%s", appName, ctl.Namespace, ctl.Name)
|
|
}
|
|
|
|
if ctl.leaderElect {
|
|
ctl.recorder.Eventf(nodeRef, corev1.EventTypeNormal, "Starting", "%s starting leader election for %s/%s", appName, ctl.Namespace, ctl.Name)
|
|
leader.RunOrDie(ctx, ctl.Namespace, ctl.Name, ctl.kcs, run)
|
|
} else {
|
|
run(ctx)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (ctl *Controller) registerCRD(ctx context.Context) error {
|
|
crds, err := crds.List()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
client, err := clientset.NewForConfig(ctl.cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return crd.BatchCreateCRDs(ctx, client.ApiextensionsV1().CustomResourceDefinitions(), nil, readyDuration, crds)
|
|
}
|