Uses runnerInfo as a value type for runner map, to store runtime info of the runner. Currently the slot and the model reference used.

This commit is contained in:
ilopezluna 2025-07-02 16:07:41 +02:00
parent d47c0f84bb
commit dfeb5677b4
No known key found for this signature in database
GPG Key ID: 5171774528BC2462
2 changed files with 51 additions and 43 deletions

View File

@ -48,6 +48,14 @@ type runnerKey struct {
mode inference.BackendMode
}
// runnerInfo holds information about a runner including its slot and the original model reference used to load it.
type runnerInfo struct {
// slot is the slot index where the runner is stored.
slot int
// modelRef is the original model reference (tag) used to load the runner.
modelRef string
}
// loader manages the loading and unloading of backend runners. It regulates
// active backends in a manner that avoids exhausting system resources. Loaders
// assume that all of their backends have been installed, so no load requests
@ -80,7 +88,7 @@ type loader struct {
// polling. Each signaling channel should be buffered (with size 1).
waiters map[chan<- struct{}]bool
// runners maps runner keys to their slot index.
runners map[runnerKey]int
runners map[runnerKey]runnerInfo
// slots maps slot indices to associated runners. A slot is considered free
// if the runner value in it is nil.
slots []*runner
@ -151,7 +159,7 @@ func newLoader(
guard: make(chan struct{}, 1),
availableMemory: totalMemory,
waiters: make(map[chan<- struct{}]bool),
runners: make(map[runnerKey]int, nSlots),
runners: make(map[runnerKey]runnerInfo, nSlots),
slots: make([]*runner, nSlots),
references: make([]uint, nSlots),
allocations: make([]uint64, nSlots),
@ -196,12 +204,12 @@ func (l *loader) broadcast() {
// lock. It returns the number of remaining runners.
func (l *loader) evict(idleOnly bool) int {
now := time.Now()
for r, slot := range l.runners {
unused := l.references[slot] == 0
idle := unused && now.Sub(l.timestamps[slot]) > l.runnerIdleTimeout
for r, runnerInfo := range l.runners {
unused := l.references[runnerInfo.slot] == 0
idle := unused && now.Sub(l.timestamps[runnerInfo.slot]) > l.runnerIdleTimeout
defunct := false
select {
case <-l.slots[slot].done:
case <-l.slots[runnerInfo.slot].done:
defunct = true
default:
}
@ -209,11 +217,11 @@ func (l *loader) evict(idleOnly bool) int {
l.log.Infof("Evicting %s backend runner with model %s in %s mode",
r.backend, r.modelID, r.mode,
)
l.slots[slot].terminate()
l.slots[slot] = nil
l.availableMemory += l.allocations[slot]
l.allocations[slot] = 0
l.timestamps[slot] = time.Time{}
l.slots[runnerInfo.slot].terminate()
l.slots[runnerInfo.slot] = nil
l.availableMemory += l.allocations[runnerInfo.slot]
l.allocations[runnerInfo.slot] = 0
l.timestamps[runnerInfo.slot] = time.Time{}
delete(l.runners, r)
}
}
@ -224,17 +232,17 @@ func (l *loader) evict(idleOnly bool) int {
// It returns the number of remaining runners.
func (l *loader) evictRunner(backend, model string, mode inference.BackendMode) int {
allBackends := backend == ""
for r, slot := range l.runners {
unused := l.references[slot] == 0
for r, runnerInfo := range l.runners {
unused := l.references[runnerInfo.slot] == 0
if unused && (allBackends || r.backend == backend) && r.modelID == model && r.mode == mode {
l.log.Infof("Evicting %s backend runner with model %s in %s mode",
r.backend, r.modelID, r.mode,
)
l.slots[slot].terminate()
l.slots[slot] = nil
l.availableMemory += l.allocations[slot]
l.allocations[slot] = 0
l.timestamps[slot] = time.Time{}
l.slots[runnerInfo.slot].terminate()
l.slots[runnerInfo.slot] = nil
l.availableMemory += l.allocations[runnerInfo.slot]
l.allocations[runnerInfo.slot] = 0
l.timestamps[runnerInfo.slot] = time.Time{}
delete(l.runners, r)
}
}
@ -283,15 +291,15 @@ func stopAndDrainTimer(timer *time.Timer) {
func (l *loader) idleCheckDuration() time.Duration {
// Compute the oldest usage time for any idle runner.
var oldest time.Time
for _, slot := range l.runners {
for _, runnerInfo := range l.runners {
select {
case <-l.slots[slot].done:
case <-l.slots[runnerInfo.slot].done:
// Check immediately if a runner is defunct
return 0
default:
}
if l.references[slot] == 0 {
timestamp := l.timestamps[slot]
if l.references[runnerInfo.slot] == 0 {
timestamp := l.timestamps[runnerInfo.slot]
if oldest.IsZero() || timestamp.Before(oldest) {
oldest = timestamp
}
@ -382,7 +390,7 @@ func (l *loader) run(ctx context.Context) {
// load allocates a runner using the specified backend and modelID. If allocated,
// it should be released by the caller using the release mechanism (once the
// runner is no longer needed).
func (l *loader) load(ctx context.Context, backendName, modelID string, mode inference.BackendMode) (*runner, error) {
func (l *loader) load(ctx context.Context, backendName, modelID, modelRef string, mode inference.BackendMode) (*runner, error) {
// Grab the backend.
backend, ok := l.backends[backendName]
if !ok {
@ -430,17 +438,17 @@ func (l *loader) load(ctx context.Context, backendName, modelID string, mode inf
existing, ok := l.runners[runnerKey{backendName, modelID, mode}]
if ok {
select {
case <-l.slots[existing].done:
l.log.Warnf("%s runner for %s is defunct. Waiting for it to be evicted.", backendName, modelID)
if l.references[existing] == 0 {
case <-l.slots[existing.slot].done:
l.log.Warnf("%s runner for %s is defunct. Waiting for it to be evicted.", backendName, existing.modelRef)
if l.references[existing.slot] == 0 {
l.evictRunner(backendName, modelID, mode)
} else {
goto WaitForChange
}
default:
l.references[existing] += 1
l.timestamps[existing] = time.Time{}
return l.slots[existing], nil
l.references[existing.slot] += 1
l.timestamps[existing.slot] = time.Time{}
return l.slots[existing.slot], nil
}
}
@ -492,7 +500,7 @@ func (l *loader) load(ctx context.Context, backendName, modelID string, mode inf
// Perform registration and return the runner.
l.availableMemory -= memory
l.runners[runnerKey{backendName, modelID, mode}] = slot
l.runners[runnerKey{backendName, modelID, mode}] = runnerInfo{slot, modelRef}
l.slots[slot] = runner
l.references[slot] = 1
l.allocations[slot] = memory
@ -524,17 +532,17 @@ func (l *loader) release(runner *runner) {
slot := l.runners[runnerKey{runner.backend.Name(), runner.model, runner.mode}]
// Decrement the runner's reference count.
l.references[slot] -= 1
l.references[slot.slot] -= 1
// If the runner's reference count is now zero, then check if it is still
// active, and record now as its idle start time and signal the idle
// checker.
if l.references[slot] == 0 {
if l.references[slot.slot] == 0 {
select {
case <-runner.done:
l.evictRunner(runner.backend.Name(), runner.model, runner.mode)
default:
l.timestamps[slot] = time.Now()
l.timestamps[slot.slot] = time.Now()
select {
case l.idleCheck <- struct{}{}:
default:

View File

@ -241,7 +241,7 @@ func (s *Scheduler) handleOpenAIInference(w http.ResponseWriter, r *http.Request
modelID := s.modelManager.ResolveModelID(request.Model)
// Request a runner to execute the request and defer its release.
runner, err := s.loader.load(r.Context(), backend.Name(), modelID, backendMode)
runner, err := s.loader.load(r.Context(), backend.Name(), modelID, request.Model, backendMode)
if err != nil {
http.Error(w, fmt.Errorf("unable to load runner: %w", err).Error(), http.StatusInternalServerError)
return
@ -297,17 +297,17 @@ func (s *Scheduler) getLoaderStatus(ctx context.Context) []BackendStatus {
result := make([]BackendStatus, 0, len(s.loader.runners))
for key, slot := range s.loader.runners {
if s.loader.slots[slot] != nil {
for key, runnerInfo := range s.loader.runners {
if s.loader.slots[runnerInfo.slot] != nil {
status := BackendStatus{
BackendName: key.backend,
ModelName: key.modelID,
ModelName: runnerInfo.modelRef,
Mode: key.mode.String(),
LastUsed: time.Time{},
}
if s.loader.references[slot] == 0 {
status.LastUsed = s.loader.timestamps[slot]
if s.loader.references[runnerInfo.slot] == 0 {
status.LastUsed = s.loader.timestamps[runnerInfo.slot]
}
result = append(result, status)
@ -444,8 +444,8 @@ func (s *Scheduler) GetAllActiveRunners() []metrics.ActiveRunner {
mode: parseBackendMode(backend.Mode),
}
if slot, exists := s.loader.runners[key]; exists {
socket, err := RunnerSocketPath(slot)
if runnerInfo, exists := s.loader.runners[key]; exists {
socket, err := RunnerSocketPath(runnerInfo.slot)
if err != nil {
s.log.Warnf("Failed to get socket path for runner %s/%s: %v", backend.BackendName, backend.ModelName, err)
continue
@ -482,9 +482,9 @@ func (s *Scheduler) GetLlamaCppSocket() (string, error) {
mode: parseBackendMode(backend.Mode),
}
if slot, exists := s.loader.runners[key]; exists {
if runnerInfo, exists := s.loader.runners[key]; exists {
// Use the RunnerSocketPath function to get the socket path
return RunnerSocketPath(slot)
return RunnerSocketPath(runnerInfo.slot)
}
}
}