ServiceQualities support serverless pod (#212)

This commit is contained in:
berg 2025-04-22 20:21:32 +08:00 committed by GitHub
parent 897e706a85
commit 3da984ff96
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 389 additions and 23 deletions

View File

@ -18,7 +18,7 @@ package gameserverset
import (
"context"
kruisev1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
kruiseV1beta1 "github.com/openkruise/kruise-api/apps/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@ -37,6 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
@ -81,6 +82,17 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
return err
}
if err = c.Watch(source.Kind(mgr.GetCache(), &kruisev1alpha1.PodProbeMarker{}, &handler.TypedEnqueueRequestForObject[*kruisev1alpha1.PodProbeMarker]{}, predicate.TypedFuncs[*kruisev1alpha1.PodProbeMarker]{
UpdateFunc: func(e event.TypedUpdateEvent[*kruisev1alpha1.PodProbeMarker]) bool {
oldScS := e.ObjectOld
newScS := e.ObjectNew
return oldScS.Status.ObservedGeneration != newScS.Status.ObservedGeneration
},
})); err != nil {
klog.Error(err)
return err
}
if err = watchPod(mgr, c); err != nil {
klog.Error(err)
return err
@ -178,6 +190,16 @@ func (r *GameServerSetReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return reconcile.Result{}, err
}
gsm := NewGameServerSetManager(gss, r.Client, r.recorder)
// The serverless scenario PodProbeMarker takes effect during the Webhook phase, so need to create the PodProbeMarker in advance.
err, done := gsm.SyncPodProbeMarker()
if err != nil {
klog.Errorf("GameServerSet %s failed to synchronize PodProbeMarker in %s,because of %s.", namespacedName.Name, namespacedName.Namespace, err.Error())
return reconcile.Result{}, err
} else if !done {
return reconcile.Result{}, nil
}
// get advanced statefulset
asts := &kruiseV1beta1.StatefulSet{}
err = r.Get(ctx, namespacedName, asts)
@ -207,7 +229,7 @@ func (r *GameServerSetReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return reconcile.Result{}, err
}
gsm := NewGameServerSetManager(gss, asts, podList.Items, r.Client, r.recorder)
gsm.SyncStsAndPodList(asts, podList.Items)
// kill game servers
newReplicas := gsm.GetReplicasAfterKilling()
@ -242,12 +264,6 @@ func (r *GameServerSetReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return reconcile.Result{}, nil
}
err = gsm.SyncPodProbeMarker()
if err != nil {
klog.Errorf("GameServerSet %s failed to synchronize PodProbeMarker in %s,because of %s.", namespacedName.Name, namespacedName.Namespace, err.Error())
return reconcile.Result{}, err
}
// sync GameServerSet Status
err = gsm.SyncStatus()
if err != nil {

View File

@ -18,10 +18,6 @@ package gameserverset
import (
"context"
"sort"
"strconv"
"sync"
kruiseV1alpha1 "github.com/openkruise/kruise-api/apps/v1alpha1"
kruiseV1beta1 "github.com/openkruise/kruise-api/apps/v1beta1"
corev1 "k8s.io/api/core/v1"
@ -36,6 +32,9 @@ import (
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sort"
"strconv"
"sync"
gameKruiseV1alpha1 "github.com/openkruise/kruise-game/apis/v1alpha1"
"github.com/openkruise/kruise-game/pkg/util"
@ -47,10 +46,19 @@ type Control interface {
SyncStatus() error
IsNeedToScale() bool
IsNeedToUpdateWorkload() bool
SyncPodProbeMarker() error
SyncPodProbeMarker() (error, bool)
GetReplicasAfterKilling() *int32
SyncStsAndPodList(asts *kruiseV1beta1.StatefulSet, gsList []corev1.Pod)
}
const (
DefaultTimeoutSeconds = 5
DefaultInitialDelaySeconds = 10
DefaultPeriodSeconds = 3
DefaultSuccessThreshold = 1
DefaultFailureThreshold = 3
)
const (
ScaleReason = "Scale"
CreatePPMReason = "CreatePpm"
@ -67,16 +75,19 @@ type GameServerSetManager struct {
eventRecorder record.EventRecorder
}
func NewGameServerSetManager(gss *gameKruiseV1alpha1.GameServerSet, asts *kruiseV1beta1.StatefulSet, gsList []corev1.Pod, c client.Client, recorder record.EventRecorder) Control {
func NewGameServerSetManager(gss *gameKruiseV1alpha1.GameServerSet, c client.Client, recorder record.EventRecorder) Control {
return &GameServerSetManager{
gameServerSet: gss,
asts: asts,
podList: gsList,
client: c,
eventRecorder: recorder,
}
}
func (manager *GameServerSetManager) SyncStsAndPodList(asts *kruiseV1beta1.StatefulSet, gsList []corev1.Pod) {
manager.asts = asts
manager.podList = gsList
}
func (manager *GameServerSetManager) GetReplicasAfterKilling() *int32 {
gss := manager.gameServerSet
asts := manager.asts
@ -326,7 +337,7 @@ func (manager *GameServerSetManager) UpdateWorkload() error {
return retryErr
}
func (manager *GameServerSetManager) SyncPodProbeMarker() error {
func (manager *GameServerSetManager) SyncPodProbeMarker() (error, bool) {
gss := manager.gameServerSet
sqs := gss.Spec.ServiceQualities
c := manager.client
@ -341,27 +352,37 @@ func (manager *GameServerSetManager) SyncPodProbeMarker() error {
if err != nil {
if errors.IsNotFound(err) {
if sqs == nil {
return nil
return nil, true
}
// create ppm
manager.eventRecorder.Event(gss, corev1.EventTypeNormal, CreatePPMReason, "create PodProbeMarker")
return c.Create(ctx, createPpm(gss))
ppm = createPpm(gss)
klog.Infof("GameserverSet(%s/%s) create PodProbeMarker(%s)", gss.Namespace, gss.Name, ppm.Name)
return c.Create(ctx, ppm), false
}
return err
return err, false
}
// delete ppm
if sqs == nil {
return c.Delete(ctx, ppm)
klog.Infof("GameserverSet(%s/%s) ServiceQualities is empty, and delete PodProbeMarker", gss.Namespace, gss.Name)
return c.Delete(ctx, ppm), false
}
// update ppm
if util.GetHash(gss.Spec.ServiceQualities) != ppm.GetAnnotations()[gameKruiseV1alpha1.PpmHashKey] {
ppm.Spec.Probes = constructProbes(gss)
by, _ := json.Marshal(ppm.Spec.Probes)
manager.eventRecorder.Event(gss, corev1.EventTypeNormal, UpdatePPMReason, "update PodProbeMarker")
return c.Update(ctx, ppm)
klog.Infof("GameserverSet(%s/%s) update PodProbeMarker(%s) body(%s)", gss.Namespace, gss.Name, ppm.Name, string(by))
return c.Update(ctx, ppm), false
}
return nil
// Determine PodProbeMarker Status to ensure that PodProbeMarker resources have been processed by kruise-manager
if ppm.Generation != ppm.Status.ObservedGeneration {
klog.Infof("GameserverSet(%s/%s) PodProbeMarker(%s) status observedGeneration is inconsistent, and wait a moment", gss.Namespace, gss.Name, ppm.Name)
return nil, false
}
return nil, true
}
func constructProbes(gss *gameKruiseV1alpha1.GameServerSet) []kruiseV1alpha1.PodContainerProbe {
@ -375,6 +396,21 @@ func constructProbes(gss *gameKruiseV1alpha1.GameServerSet) []kruiseV1alpha1.Pod
},
PodConditionType: util.AddPrefixGameKruise(sq.Name),
}
if probe.Probe.TimeoutSeconds == 0 {
probe.Probe.TimeoutSeconds = DefaultTimeoutSeconds
}
if probe.Probe.InitialDelaySeconds == 0 {
probe.Probe.InitialDelaySeconds = DefaultInitialDelaySeconds
}
if probe.Probe.PeriodSeconds == 0 {
probe.Probe.PeriodSeconds = DefaultPeriodSeconds
}
if probe.Probe.SuccessThreshold == 0 {
probe.Probe.SuccessThreshold = DefaultSuccessThreshold
}
if probe.Probe.FailureThreshold == 0 {
probe.Probe.FailureThreshold = DefaultFailureThreshold
}
probes = append(probes, probe)
}
return probes

View File

@ -1264,3 +1264,317 @@ func TestGameServerSetManager_UpdateWorkload(t *testing.T) {
}
}
}
func TestGameServerSetManager_SyncPodProbeMarker(t *testing.T) {
tests := []struct {
name string
getGss func() *gameKruiseV1alpha1.GameServerSet
getPPM func() *kruiseV1alpha1.PodProbeMarker
newPPM func() *kruiseV1alpha1.PodProbeMarker
expectedDone bool
}{
{
name: "first create PPM",
getGss: func() *gameKruiseV1alpha1.GameServerSet {
obj := &gameKruiseV1alpha1.GameServerSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "xxx",
Name: "case0",
},
Spec: gameKruiseV1alpha1.GameServerSetSpec{
ServiceQualities: []gameKruiseV1alpha1.ServiceQuality{
{
Name: "healthy",
ContainerName: "main",
Probe: corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"/bin/sh", "-c", "/healthy.sh"},
},
},
},
},
},
},
}
return obj
},
getPPM: func() *kruiseV1alpha1.PodProbeMarker {
return nil
},
newPPM: func() *kruiseV1alpha1.PodProbeMarker {
obj := &kruiseV1alpha1.PodProbeMarker{
Spec: kruiseV1alpha1.PodProbeMarkerSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"game.kruise.io/owner-gss": "case0",
},
},
Probes: []kruiseV1alpha1.PodContainerProbe{
{
Name: "healthy",
ContainerName: "main",
PodConditionType: "game.kruise.io/healthy",
Probe: kruiseV1alpha1.ContainerProbeSpec{
Probe: corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"/bin/sh", "-c", "/healthy.sh"},
},
},
InitialDelaySeconds: DefaultInitialDelaySeconds,
TimeoutSeconds: DefaultTimeoutSeconds,
PeriodSeconds: DefaultPeriodSeconds,
SuccessThreshold: DefaultSuccessThreshold,
FailureThreshold: DefaultFailureThreshold,
},
},
},
},
},
}
return obj
},
expectedDone: false,
},
{
name: "second check PPM status, and false",
getGss: func() *gameKruiseV1alpha1.GameServerSet {
obj := &gameKruiseV1alpha1.GameServerSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "xxx",
Name: "case0",
},
Spec: gameKruiseV1alpha1.GameServerSetSpec{
ServiceQualities: []gameKruiseV1alpha1.ServiceQuality{
{
Name: "healthy",
ContainerName: "main",
Probe: corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"/bin/sh", "-c", "/healthy.sh"},
},
},
},
},
},
},
}
return obj
},
getPPM: func() *kruiseV1alpha1.PodProbeMarker {
obj := &kruiseV1alpha1.PodProbeMarker{
ObjectMeta: metav1.ObjectMeta{
Namespace: "xxx",
Name: "case0",
Generation: 1,
Annotations: map[string]string{
"game.kruise.io/ppm-hash": "3716291985",
},
},
Spec: kruiseV1alpha1.PodProbeMarkerSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"game.kruise.io/owner-gss": "case0",
},
},
Probes: []kruiseV1alpha1.PodContainerProbe{
{
Name: "healthy",
ContainerName: "main",
PodConditionType: "game.kruise.io/healthy",
Probe: kruiseV1alpha1.ContainerProbeSpec{
Probe: corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"/bin/sh", "-c", "/healthy.sh"},
},
},
InitialDelaySeconds: DefaultInitialDelaySeconds,
TimeoutSeconds: DefaultTimeoutSeconds,
PeriodSeconds: DefaultPeriodSeconds,
SuccessThreshold: DefaultSuccessThreshold,
FailureThreshold: DefaultFailureThreshold,
},
},
},
},
},
}
return obj
},
newPPM: func() *kruiseV1alpha1.PodProbeMarker {
obj := &kruiseV1alpha1.PodProbeMarker{
Spec: kruiseV1alpha1.PodProbeMarkerSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"game.kruise.io/owner-gss": "case0",
},
},
Probes: []kruiseV1alpha1.PodContainerProbe{
{
Name: "healthy",
ContainerName: "main",
PodConditionType: "game.kruise.io/healthy",
Probe: kruiseV1alpha1.ContainerProbeSpec{
Probe: corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"/bin/sh", "-c", "/healthy.sh"},
},
},
InitialDelaySeconds: DefaultInitialDelaySeconds,
TimeoutSeconds: DefaultTimeoutSeconds,
PeriodSeconds: DefaultPeriodSeconds,
SuccessThreshold: DefaultSuccessThreshold,
FailureThreshold: DefaultFailureThreshold,
},
},
},
},
},
}
return obj
},
expectedDone: false,
},
{
name: "third check PPM status, and true",
getGss: func() *gameKruiseV1alpha1.GameServerSet {
obj := &gameKruiseV1alpha1.GameServerSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "xxx",
Name: "case0",
},
Spec: gameKruiseV1alpha1.GameServerSetSpec{
ServiceQualities: []gameKruiseV1alpha1.ServiceQuality{
{
Name: "healthy",
ContainerName: "main",
Probe: corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"/bin/sh", "-c", "/healthy.sh"},
},
},
},
},
},
},
}
return obj
},
getPPM: func() *kruiseV1alpha1.PodProbeMarker {
obj := &kruiseV1alpha1.PodProbeMarker{
ObjectMeta: metav1.ObjectMeta{
Namespace: "xxx",
Name: "case0",
Generation: 1,
Annotations: map[string]string{
"game.kruise.io/ppm-hash": "3716291985",
},
},
Spec: kruiseV1alpha1.PodProbeMarkerSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"game.kruise.io/owner-gss": "case0",
},
},
Probes: []kruiseV1alpha1.PodContainerProbe{
{
Name: "healthy",
ContainerName: "main",
PodConditionType: "game.kruise.io/healthy",
Probe: kruiseV1alpha1.ContainerProbeSpec{
Probe: corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"/bin/sh", "-c", "/healthy.sh"},
},
},
InitialDelaySeconds: DefaultInitialDelaySeconds,
TimeoutSeconds: DefaultTimeoutSeconds,
PeriodSeconds: DefaultPeriodSeconds,
SuccessThreshold: DefaultSuccessThreshold,
FailureThreshold: DefaultFailureThreshold,
},
},
},
},
},
Status: kruiseV1alpha1.PodProbeMarkerStatus{
ObservedGeneration: 1,
},
}
return obj
},
newPPM: func() *kruiseV1alpha1.PodProbeMarker {
obj := &kruiseV1alpha1.PodProbeMarker{
Spec: kruiseV1alpha1.PodProbeMarkerSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"game.kruise.io/owner-gss": "case0",
},
},
Probes: []kruiseV1alpha1.PodContainerProbe{
{
Name: "healthy",
ContainerName: "main",
PodConditionType: "game.kruise.io/healthy",
Probe: kruiseV1alpha1.ContainerProbeSpec{
Probe: corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Exec: &corev1.ExecAction{
Command: []string{"/bin/sh", "-c", "/healthy.sh"},
},
},
InitialDelaySeconds: DefaultInitialDelaySeconds,
TimeoutSeconds: DefaultTimeoutSeconds,
PeriodSeconds: DefaultPeriodSeconds,
SuccessThreshold: DefaultSuccessThreshold,
FailureThreshold: DefaultFailureThreshold,
},
},
},
},
},
}
return obj
},
expectedDone: true,
},
}
recorder := record.NewFakeRecorder(100)
for _, test := range tests {
gss := test.getGss()
objs := []client.Object{gss}
ppm := test.getPPM()
if ppm != nil {
objs = append(objs, ppm)
}
c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(objs...).Build()
manager := &GameServerSetManager{
gameServerSet: gss,
client: c,
eventRecorder: recorder,
}
err, done := manager.SyncPodProbeMarker()
if err != nil {
t.Errorf("SyncPodProbeMarker failed: %s", err.Error())
} else if done != test.expectedDone {
t.Errorf("expected(%v), but get(%v)", test.expectedDone, done)
}
newObj := &kruiseV1alpha1.PodProbeMarker{}
if err = manager.client.Get(context.TODO(), types.NamespacedName{
Namespace: gss.Namespace,
Name: gss.Name,
}, newObj); err != nil {
t.Error(err)
}
if !reflect.DeepEqual(newObj.Spec, test.newPPM().Spec) {
t.Errorf("expect new asts spec %v but got %v", test.newPPM().Spec, newObj.Spec)
}
}
}