Refactor: Use PostBind to update Reservation

This commit is contained in:
leggasai 2025-06-22 15:21:32 +08:00
parent 8db0404b0c
commit d1afec69b2
7 changed files with 70 additions and 9 deletions

View File

@ -198,6 +198,7 @@ func (alloc *Action) allocateResources(queues *util.PriorityQueue, jobsMap map[a
pendingTasks[job.UID] = tasksQueue
}
} else {
// todo: 需要解耦
if job.IsUseReservation() {
stmt = alloc.allocateResourcesForReservationTasks(tasks, job, jobs, allNodes)
} else {

View File

@ -924,11 +924,8 @@ func (sc *SchedulerCache) Bind(ctx context.Context, bindContexts []*BindContext)
for _, bindContext := range bindContexts {
if reason, ok := errMsg[bindContext.TaskInfo.UID]; !ok {
task := bindContext.TaskInfo
if task.IsUseReservationTask() {
if err := sc.syncBindToReservationTask(task); err != nil {
klog.Errorf("Failed to sync task %s to reservation task, err: %v", task.Name, err)
}
if err := sc.executePostBind(ctx, bindContext); err != nil {
klog.Errorf("Failed to execute postBind for task %s/%s, err: %v", bindContext.TaskInfo.Namespace, bindContext.TaskInfo.Name, err)
}
sc.Recorder.Eventf(bindContext.TaskInfo.Pod, v1.EventTypeNormal, "Scheduled", "Successfully assigned %v/%v to %v", bindContext.TaskInfo.Namespace, bindContext.TaskInfo.Name, bindContext.TaskInfo.NodeName)
} else {
@ -1231,6 +1228,8 @@ func (sc *SchedulerCache) processSyncHyperNode() {
// AddBindTask add task to be bind to a cache which consumes by go runtime
func (sc *SchedulerCache) AddBindTask(bindContext *BindContext) error {
klog.V(5).Infof("add bind task %v/%v", bindContext.TaskInfo.Namespace, bindContext.TaskInfo.Name)
// todo: 需要解耦
if bindContext.TaskInfo.IsReservationTask() {
return sc.processReservationTask(bindContext.TaskInfo)
}
@ -1259,6 +1258,7 @@ func (sc *SchedulerCache) AddBindTask(bindContext *BindContext) error {
}
task.NumaInfo = bindContext.TaskInfo.NumaInfo.Clone()
// todo: 需要解耦
if bindContext.TaskInfo.IsUseReservationTask() {
reservationTask := bindContext.TaskInfo.ReservationTaskInfo
// Remove reservation task from the node if it exists to release the reserved resources on node.
@ -1380,6 +1380,10 @@ func (sc *SchedulerCache) processReservationTask(taskInfo *schedulingapi.TaskInf
return nil
}
func (sc *SchedulerCache) SyncBindToReservationTask(task *schedulingapi.TaskInfo) error {
return sc.syncBindToReservationTask(task)
}
func (sc *SchedulerCache) syncBindToReservationTask(taskInfo *schedulingapi.TaskInfo) error {
klog.V(1).Infof("sync bind to reservation task %v/%v", taskInfo.Namespace, taskInfo.Name)
reservationTask := taskInfo.ReservationTaskInfo
@ -1457,6 +1461,24 @@ func (sc *SchedulerCache) executePreBinds(ctx context.Context, bindContexts []*B
return successfulBindContexts
}
// executePostBinds executes PostBind for one bindContext
func (sc *SchedulerCache) executePostBind(ctx context.Context, bindContext *BindContext) error {
sc.binderRegistry.mu.RLock()
defer sc.binderRegistry.mu.RUnlock()
for _, postBinder := range sc.binderRegistry.postBinders {
if postBinder != nil {
if err := postBinder.PostBind(ctx, bindContext); err != nil {
klog.Errorf("PostBind failed for task %s/%s: %v",
bindContext.TaskInfo.Namespace, bindContext.TaskInfo.Name, err)
return err
}
}
}
return nil
}
// BindTask do k8s binding with a goroutine
func (sc *SchedulerCache) BindTask() {
klog.V(5).Infof("batch bind task count %d", sc.batchNum)

View File

@ -21,14 +21,15 @@ func GetBindMethod() Binder {
// BinderRegistry is used to hold the registered binders, such as pre-binders, post-binders
type BinderRegistry struct {
mu sync.RWMutex
preBinders map[string]PreBinder
// Can add postBinders in the future
mu sync.RWMutex
preBinders map[string]PreBinder
postBinders map[string]PostBinder
}
func NewBinderRegistry() *BinderRegistry {
return &BinderRegistry{
preBinders: make(map[string]PreBinder),
preBinders: make(map[string]PreBinder),
postBinders: make(map[string]PostBinder),
}
}
@ -43,4 +44,9 @@ func (r *BinderRegistry) Register(name string, binder interface{}) {
klog.V(5).Infof("Register preBinder %s successfully", name)
r.preBinders[name] = pb
}
if pb, ok := binder.(PostBinder); ok {
klog.V(5).Infof("Register postBinder %s successfully", name)
r.postBinders[name] = pb
}
}

View File

@ -91,6 +91,8 @@ type Cache interface {
// GetReservationCache returns the reservation cache
GetReservationCache() *ReservationCache
SyncBindToReservationTask(task *api.TaskInfo) error
}
// Binder interface for binding task and hostname
@ -121,3 +123,7 @@ type PreBinder interface {
// PreBindRollBack is called when the pre-bind or bind fails.
PreBindRollBack(ctx context.Context, bindCtx *BindContext)
}
type PostBinder interface {
PostBind(ctx context.Context, bindCtx *BindContext) error
}

View File

@ -116,6 +116,7 @@ func (rc *ReservationCache) SyncTaskStatus(task *schedulerapi.TaskInfo, job *sch
return nil
}
// todo: Async use Queue
func (rc *ReservationCache) syncReservation(reservation *schedulerapi.ReservationInfo, job *schedulerapi.JobInfo) error {
rsveV1beta1, err := rc.vcClient.SchedulingV1beta1().Reservations(reservation.Reservation.Namespace).Get(context.TODO(), reservation.Reservation.Name, metav1.GetOptions{})
if err != nil {

View File

@ -965,3 +965,7 @@ func (ssn *Session) CheckReservationMatch(job *api.JobInfo) bool {
klog.V(1).Infof("[debug]: used: %v", used)
return true
}
func (ssn *Session) Cache() cache.Cache {
return ssn.cache
}

View File

@ -17,12 +17,14 @@ limitations under the License.
package reservation
import (
"context"
"fmt"
"k8s.io/klog/v2"
"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/cache"
"volcano.sh/volcano/pkg/scheduler/framework"
)
@ -33,6 +35,7 @@ const (
type reservationPlugin struct {
// Arguments given for the plugin
session *framework.Session
}
// New function returns prioritizePlugin object
@ -46,6 +49,8 @@ func (rp *reservationPlugin) Name() string {
func (rp *reservationPlugin) OnSessionOpen(ssn *framework.Session) {
klog.V(5).Infof("Enter reservation plugin ...")
rp.session = ssn
defer func() {
klog.V(5).Infof("Leaving reservation plugin...")
}()
@ -123,6 +128,22 @@ func (rp *reservationPlugin) OnSessionOpen(ssn *framework.Session) {
return nil
}
ssn.AddBestNodeFn(rp.Name(), bestNodeFn)
ssn.RegisterBinder(rp.Name(), rp)
}
func (rp *reservationPlugin) PostBind(ctx context.Context, bindCtx *cache.BindContext) error {
task := bindCtx.TaskInfo
if !task.IsUseReservationTask() {
return nil
}
if err := rp.session.Cache().SyncBindToReservationTask(task); err != nil {
klog.Errorf("Failed to sync task %s to reservation task, err: %v", task.Name, err)
}
return nil
}
func (rp *reservationPlugin) OnSessionClose(ssn *framework.Session) {}