OpenAIRecorder: Remove records on model eviction/termination

Signed-off-by: Dorin Geman <dorin.geman@docker.com>

commit 3904f2314d
parent be8f3e6696
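In short: NewScheduler now creates a single OpenAIRecorder and threads it through newLoader and run so that every runner holds the shared recorder; when a runner is terminated (i.e. its model is evicted or shut down), it calls RemoveModel and the recorder drops that model's stored request/response pairs. A runnable sketch of this lifecycle follows the diff.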
@@ -13,6 +13,7 @@ import (
     "github.com/docker/model-runner/pkg/inference"
     "github.com/docker/model-runner/pkg/inference/models"
     "github.com/docker/model-runner/pkg/logging"
+    "github.com/docker/model-runner/pkg/metrics"
 )

 const (
@@ -92,6 +93,8 @@ type loader struct {
     timestamps []time.Time
     // runnerConfigs maps model names to runner configurations
     runnerConfigs map[runnerKey]inference.BackendConfiguration
+    // openAIRecorder is used to record OpenAI API inference requests and responses.
+    openAIRecorder *metrics.OpenAIRecorder
 }

 // newLoader creates a new loader.
@@ -99,6 +102,7 @@ func newLoader(
     log logging.Logger,
     backends map[string]inference.Backend,
     modelManager *models.Manager,
+    openAIRecorder *metrics.OpenAIRecorder,
 ) *loader {
     // Compute the number of runner slots to allocate. Because of RAM and VRAM
     // limitations, it's unlikely that we'll ever be able to fully populate
@@ -153,6 +157,7 @@ func newLoader(
         allocations:    make([]uint64, nSlots),
         timestamps:     make([]time.Time, nSlots),
         runnerConfigs:  make(map[runnerKey]inference.BackendConfiguration),
+        openAIRecorder: openAIRecorder,
     }
     l.guard <- struct{}{}
     return l
@@ -462,7 +467,7 @@ func (l *loader) load(ctx context.Context, backendName, model string, mode infer
     }
     // Create the runner.
     l.log.Infof("Loading %s backend runner with model %s in %s mode", backendName, model, mode)
-    runner, err := run(l.log, backend, model, mode, slot, runnerConfig)
+    runner, err := run(l.log, backend, model, mode, slot, runnerConfig, l.openAIRecorder)
     if err != nil {
         l.log.Warnf("Unable to start %s backend runner with model %s in %s mode: %v",
             backendName, model, mode, err,

@@ -15,6 +15,7 @@ import (

     "github.com/docker/model-runner/pkg/inference"
     "github.com/docker/model-runner/pkg/logging"
+    "github.com/docker/model-runner/pkg/metrics"
 )

 const (
@@ -63,6 +64,8 @@ type runner struct {
     proxy *httputil.ReverseProxy
     // proxyLog is the stream used for logging by proxy.
     proxyLog io.Closer
+    // openAIRecorder is used to record OpenAI API inference requests and responses.
+    openAIRecorder *metrics.OpenAIRecorder
     // err is the error returned by the runner's backend, only valid after done is closed.
     err error
 }
@@ -75,6 +78,7 @@ func run(
     mode inference.BackendMode,
     slot int,
     runnerConfig *inference.BackendConfiguration,
+    openAIRecorder *metrics.OpenAIRecorder,
 ) (*runner, error) {
     // Create a dialer / transport that target backend on the specified slot.
     socket, err := RunnerSocketPath(slot)
@@ -124,16 +128,17 @@ func run(
     runDone := make(chan struct{})

     r := &runner{
-        log:       log,
-        backend:   backend,
-        model:     model,
-        mode:      mode,
-        cancel:    runCancel,
-        done:      runDone,
-        transport: transport,
-        client:    client,
-        proxy:     proxy,
-        proxyLog:  proxyLog,
+        log:            log,
+        backend:        backend,
+        model:          model,
+        mode:           mode,
+        cancel:         runCancel,
+        done:           runDone,
+        transport:      transport,
+        client:         client,
+        proxy:          proxy,
+        proxyLog:       proxyLog,
+        openAIRecorder: openAIRecorder,
     }

     proxy.ErrorHandler = func(w http.ResponseWriter, req *http.Request, err error) {
@@ -236,6 +241,8 @@ func (r *runner) terminate() {
     if err := r.proxyLog.Close(); err != nil {
         r.log.Warnf("Unable to close reverse proxy log writer: %v", err)
     }
+
+    r.openAIRecorder.RemoveModel(r.model)
 }

 // ServeHTTP implements net/http.Handler.ServeHTTP. It forwards requests to the

@@ -56,6 +56,8 @@ func NewScheduler(
     allowedOrigins []string,
     tracker *metrics.Tracker,
 ) *Scheduler {
+    openAIRecorder := metrics.NewOpenAIRecorder(log.WithField("component", "openai-recorder"))
+
     // Create the scheduler.
     s := &Scheduler{
         log:            log,
@@ -63,10 +65,10 @@ func NewScheduler(
         defaultBackend: defaultBackend,
         modelManager:   modelManager,
         installer:      newInstaller(log, backends, httpClient),
-        loader:         newLoader(log, backends, modelManager),
+        loader:         newLoader(log, backends, modelManager, openAIRecorder),
         router:         http.NewServeMux(),
         tracker:        tracker,
-        openAIRecorder: metrics.NewOpenAIRecorder(log.WithField("component", "openai-recorder")),
+        openAIRecorder: openAIRecorder,
     }

     // Register routes.

@@ -226,3 +226,15 @@ func (r *OpenAIRecorder) GetRecordsByModel(model string) []*RequestResponsePair

     return nil
 }
+
+func (r *OpenAIRecorder) RemoveModel(model string) {
+    r.m.Lock()
+    defer r.m.Unlock()
+
+    if _, exists := r.records[model]; exists {
+        delete(r.records, model)
+        r.log.Infof("Removed records for model: %s", model)
+    } else {
+        r.log.Warnf("No records found for model: %s", model)
+    }
+}
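
For context, here is a minimal, runnable sketch of the lifecycle this commit wires up. The types are simplified stand-ins (recorder for metrics.OpenAIRecorder, runner for the scheduler's runner type) and the model name is a hypothetical example; only the lock-guarded record map and the terminate-time cleanup mirror the actual change.

package main

import (
	"fmt"
	"sync"
)

// recorder is a simplified stand-in for metrics.OpenAIRecorder: per-model
// request/response records guarded by a mutex.
type recorder struct {
	m       sync.Mutex
	records map[string][]string
}

func newRecorder() *recorder {
	return &recorder{records: make(map[string][]string)}
}

// Record appends an entry for a model (stands in for recording an OpenAI
// API request/response pair).
func (r *recorder) Record(model, entry string) {
	r.m.Lock()
	defer r.m.Unlock()
	r.records[model] = append(r.records[model], entry)
}

// RemoveModel mirrors the method added by this commit: once a model's runner
// is gone, delete its records so evicted models don't keep growing memory.
func (r *recorder) RemoveModel(model string) {
	r.m.Lock()
	defer r.m.Unlock()
	if _, exists := r.records[model]; exists {
		delete(r.records, model)
		fmt.Printf("Removed records for model: %s\n", model)
	} else {
		fmt.Printf("No records found for model: %s\n", model)
	}
}

// runner is a simplified stand-in for the scheduler's runner: it holds the
// shared recorder and cleans up its model's records when terminated.
type runner struct {
	model    string
	recorder *recorder
}

func (r *runner) terminate() {
	// ... stop the backend process, close the proxy log, etc. ...
	r.recorder.RemoveModel(r.model)
}

func main() {
	rec := newRecorder()                                // one shared recorder, as NewScheduler now creates
	run := &runner{model: "ai/llama3.2", recorder: rec} // recorder threaded into the runner, as in run()
	rec.Record(run.model, "POST /v1/chat/completions")
	run.terminate() // eviction/termination drops the model's records
	run.terminate() // a second call logs that no records remain
}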