From 01287f4fe7f3f7ba6e795efad0e668c9f8e8bb12 Mon Sep 17 00:00:00 2001 From: Gaius Date: Tue, 11 Jul 2023 11:23:44 +0800 Subject: [PATCH] fix: change model state in the same scheduler id (#2537) * fix: change model state Signed-off-by: Gaius * feat: remove objectStorageConfig in manager rpcserver Signed-off-by: Gaius --------- Signed-off-by: Gaius --- manager/manager.go | 2 +- manager/models/model.go | 2 +- manager/rpcserver/manager_server_v1.go | 41 +++++++++++--------------- manager/rpcserver/manager_server_v2.go | 41 +++++++++++--------------- manager/rpcserver/rpcserver.go | 25 +++++++--------- manager/service/model.go | 21 +++++++++++-- 6 files changed, 67 insertions(+), 65 deletions(-) diff --git a/manager/manager.go b/manager/manager.go index 7deabf074..9389dd06f 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -214,7 +214,7 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) { } // Initialize GRPC server - _, grpcServer, err := rpcserver.New(cfg, db, cache, searcher, objectStorage, &cfg.ObjectStorage, options...) + _, grpcServer, err := rpcserver.New(cfg, db, cache, searcher, objectStorage, options...) if err != nil { return nil, err } diff --git a/manager/models/model.go b/manager/models/model.go index 1a0d48fa8..32d4e2a12 100644 --- a/manager/models/model.go +++ b/manager/models/model.go @@ -35,7 +35,7 @@ const ( // TODO(Gaius) Add regression analysis parameters. type Model struct { BaseModel - Name string `gorm:"column:name;type:varchar(256);index:uk_model_name,unique;not null;comment:name" json:"name"` + Name string `gorm:"column:name;type:varchar(256);not null;comment:name" json:"name"` Type string `gorm:"column:type;type:varchar(256);index:uk_model,unique;not null;comment:type" json:"type"` BIO string `gorm:"column:bio;type:varchar(1024);comment:biography" json:"bio"` Version string `gorm:"column:version;type:varchar(256);index:uk_model,unique;not null;comment:model version" json:"version"` diff --git a/manager/rpcserver/manager_server_v1.go b/manager/rpcserver/manager_server_v1.go index 283828f9e..ec6bdc5e7 100644 --- a/manager/rpcserver/manager_server_v1.go +++ b/manager/rpcserver/manager_server_v1.go @@ -75,25 +75,20 @@ type managerServerV1 struct { // Object storage interface. objectStorage objectstorage.ObjectStorage - - // Object storage configuration. - objectStorageConfig *config.ObjectStorageConfig } // newManagerServerV1 returns v1 version of the manager server. func newManagerServerV1( cfg *config.Config, database *database.Database, cache *cache.Cache, peerCache pkgcache.Cache, searcher searcher.Searcher, - objectStorage objectstorage.ObjectStorage, objectStorageConfig *config.ObjectStorageConfig, -) managerv1.ManagerServer { + objectStorage objectstorage.ObjectStorage) managerv1.ManagerServer { return &managerServerV1{ - config: cfg, - db: database.DB, - rdb: database.RDB, - cache: cache, - peerCache: peerCache, - searcher: searcher, - objectStorage: objectStorage, - objectStorageConfig: objectStorageConfig, + config: cfg, + db: database.DB, + rdb: database.RDB, + cache: cache, + peerCache: peerCache, + searcher: searcher, + objectStorage: objectStorage, } } @@ -611,17 +606,17 @@ func (s *managerServerV1) ListSchedulers(ctx context.Context, req *managerv1.Lis // Get object storage configuration. func (s *managerServerV1) GetObjectStorage(ctx context.Context, req *managerv1.GetObjectStorageRequest) (*managerv1.ObjectStorage, error) { - if !s.objectStorageConfig.Enable { + if !s.config.ObjectStorage.Enable { return nil, status.Error(codes.Internal, "object storage is disabled") } return &managerv1.ObjectStorage{ - Name: s.objectStorageConfig.Name, - Region: s.objectStorageConfig.Region, - Endpoint: s.objectStorageConfig.Endpoint, - AccessKey: s.objectStorageConfig.AccessKey, - SecretKey: s.objectStorageConfig.SecretKey, - S3ForcePathStyle: s.objectStorageConfig.S3ForcePathStyle, + Name: s.config.ObjectStorage.Name, + Region: s.config.ObjectStorage.Region, + Endpoint: s.config.ObjectStorage.Endpoint, + AccessKey: s.config.ObjectStorage.AccessKey, + SecretKey: s.config.ObjectStorage.SecretKey, + S3ForcePathStyle: s.config.ObjectStorage.S3ForcePathStyle, }, nil } @@ -629,13 +624,13 @@ func (s *managerServerV1) GetObjectStorage(ctx context.Context, req *managerv1.G func (s *managerServerV1) ListBuckets(ctx context.Context, req *managerv1.ListBucketsRequest) (*managerv1.ListBucketsResponse, error) { log := logger.WithHostnameAndIP(req.Hostname, req.Ip) - if !s.objectStorageConfig.Enable { + if !s.config.ObjectStorage.Enable { log.Warn("object storage is disabled") return nil, status.Error(codes.Internal, "object storage is disabled") } var pbListBucketsResponse managerv1.ListBucketsResponse - cacheKey := pkgredis.MakeBucketKeyInManager(s.objectStorageConfig.Name) + cacheKey := pkgredis.MakeBucketKeyInManager(s.config.ObjectStorage.Name) // Cache hit. if err := s.cache.Get(ctx, cacheKey, &pbListBucketsResponse); err != nil { @@ -749,7 +744,7 @@ func (s *managerServerV1) ListApplications(ctx context.Context, req *managerv1.L func (s *managerServerV1) CreateModel(ctx context.Context, req *managerv1.CreateModelRequest) (*emptypb.Empty, error) { log := logger.WithHostnameAndIP(req.GetHostname(), req.GetIp()) - if !s.objectStorageConfig.Enable { + if !s.config.ObjectStorage.Enable { log.Warn("object storage is disabled") return nil, status.Error(codes.Internal, "object storage is disabled") } diff --git a/manager/rpcserver/manager_server_v2.go b/manager/rpcserver/manager_server_v2.go index 9fa227aed..baee3ac67 100644 --- a/manager/rpcserver/manager_server_v2.go +++ b/manager/rpcserver/manager_server_v2.go @@ -75,25 +75,20 @@ type managerServerV2 struct { // Object storage interface. objectStorage objectstorage.ObjectStorage - - // Object storage configuration. - objectStorageConfig *config.ObjectStorageConfig } // newManagerServerV2 returns v2 version of the manager server. func newManagerServerV2( cfg *config.Config, database *database.Database, cache *cache.Cache, peerCache pkgcache.Cache, searcher searcher.Searcher, - objectStorage objectstorage.ObjectStorage, objectStorageConfig *config.ObjectStorageConfig, -) managerv2.ManagerServer { + objectStorage objectstorage.ObjectStorage) managerv2.ManagerServer { return &managerServerV2{ - config: cfg, - db: database.DB, - rdb: database.RDB, - cache: cache, - peerCache: peerCache, - searcher: searcher, - objectStorage: objectStorage, - objectStorageConfig: objectStorageConfig, + config: cfg, + db: database.DB, + rdb: database.RDB, + cache: cache, + peerCache: peerCache, + searcher: searcher, + objectStorage: objectStorage, } } @@ -610,29 +605,29 @@ func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.Lis // Get object storage configuration. func (s *managerServerV2) GetObjectStorage(ctx context.Context, req *managerv2.GetObjectStorageRequest) (*managerv2.ObjectStorage, error) { - if !s.objectStorageConfig.Enable { + if !s.config.ObjectStorage.Enable { return nil, status.Error(codes.NotFound, "object storage is disabled") } return &managerv2.ObjectStorage{ - Name: s.objectStorageConfig.Name, - Region: s.objectStorageConfig.Region, - Endpoint: s.objectStorageConfig.Endpoint, - AccessKey: s.objectStorageConfig.AccessKey, - SecretKey: s.objectStorageConfig.SecretKey, - S3ForcePathStyle: s.objectStorageConfig.S3ForcePathStyle, + Name: s.config.ObjectStorage.Name, + Region: s.config.ObjectStorage.Region, + Endpoint: s.config.ObjectStorage.Endpoint, + AccessKey: s.config.ObjectStorage.AccessKey, + SecretKey: s.config.ObjectStorage.SecretKey, + S3ForcePathStyle: s.config.ObjectStorage.S3ForcePathStyle, }, nil } // List buckets configuration. func (s *managerServerV2) ListBuckets(ctx context.Context, req *managerv2.ListBucketsRequest) (*managerv2.ListBucketsResponse, error) { - if !s.objectStorageConfig.Enable { + if !s.config.ObjectStorage.Enable { return nil, status.Error(codes.NotFound, "object storage is disabled") } log := logger.WithHostnameAndIP(req.Hostname, req.Ip) var pbListBucketsResponse managerv2.ListBucketsResponse - cacheKey := pkgredis.MakeBucketKeyInManager(s.objectStorageConfig.Name) + cacheKey := pkgredis.MakeBucketKeyInManager(s.config.ObjectStorage.Name) // Cache hit. if err := s.cache.Get(ctx, cacheKey, &pbListBucketsResponse); err != nil { @@ -746,7 +741,7 @@ func (s *managerServerV2) ListApplications(ctx context.Context, req *managerv2.L func (s *managerServerV2) CreateModel(ctx context.Context, req *managerv2.CreateModelRequest) (*emptypb.Empty, error) { log := logger.WithHostnameAndIP(req.GetHostname(), req.GetIp()) - if !s.objectStorageConfig.Enable { + if !s.config.ObjectStorage.Enable { log.Warn("object storage is disabled") return nil, status.Error(codes.Internal, "object storage is disabled") } diff --git a/manager/rpcserver/rpcserver.go b/manager/rpcserver/rpcserver.go index 164c6692b..59bcb35a5 100644 --- a/manager/rpcserver/rpcserver.go +++ b/manager/rpcserver/rpcserver.go @@ -81,9 +81,6 @@ type Server struct { // Object storage interface. objectStorage objectstorage.ObjectStorage - // Object storage configuration. - objectStorageConfig *config.ObjectStorageConfig - // serverOptions is server options of grpc. serverOptions []grpc.ServerOption @@ -123,17 +120,15 @@ func WithGRPCServerOptions(opts []grpc.ServerOption) Option { // New returns a new manager server from the given options. func New( cfg *config.Config, database *database.Database, cache *cache.Cache, searcher searcher.Searcher, - objectStorage objectstorage.ObjectStorage, objectStorageConfig *config.ObjectStorageConfig, opts ...Option, -) (*Server, *grpc.Server, error) { + objectStorage objectstorage.ObjectStorage, opts ...Option) (*Server, *grpc.Server, error) { s := &Server{ - config: cfg, - db: database.DB, - rdb: database.RDB, - cache: cache, - peerCache: pkgcache.New(DefaultPeerCacheExpiration, DefaultPeerCacheCleanupInterval), - searcher: searcher, - objectStorage: objectStorage, - objectStorageConfig: objectStorageConfig, + config: cfg, + db: database.DB, + rdb: database.RDB, + cache: cache, + peerCache: pkgcache.New(DefaultPeerCacheExpiration, DefaultPeerCacheCleanupInterval), + searcher: searcher, + objectStorage: objectStorage, } // Peer cache is evicted, and the metrics of the peer should be released. @@ -150,8 +145,8 @@ func New( } return s, managerserver.New( - newManagerServerV1(s.config, database, s.cache, s.peerCache, s.searcher, s.objectStorage, s.objectStorageConfig), - newManagerServerV2(s.config, database, s.cache, s.peerCache, s.searcher, s.objectStorage, s.objectStorageConfig), + newManagerServerV1(s.config, database, s.cache, s.peerCache, s.searcher, s.objectStorage), + newManagerServerV2(s.config, database, s.cache, s.peerCache, s.searcher, s.objectStorage), newSecurityServerV1(s.selfSignedCert), s.serverOptions...), nil } diff --git a/manager/service/model.go b/manager/service/model.go index aed592cc2..c049155fc 100644 --- a/manager/service/model.go +++ b/manager/service/model.go @@ -38,6 +38,20 @@ func (s *service) DestroyModel(ctx context.Context, id uint) error { return err } + // If the model is active, return an error. + if model.State == models.ModelVersionStateActive { + return errors.New("cannot delete an active model") + } + + version, err := strconv.Atoi(model.Version) + if err != nil { + return err + } + + if err := s.objectStorage.DeleteObject(ctx, s.config.Trainer.BucketName, types.MakeObjectKeyOfModelFile(model.Name, version)); err != nil { + return err + } + if err := s.db.WithContext(ctx).Unscoped().Delete(&models.Model{}, id).Error; err != nil { return err } @@ -116,12 +130,15 @@ func (s *service) updateModelStateToActive(ctx context.Context, model *models.Mo return err } - if err := tx.Model(&models.Model{}).Where("state = ?", models.ModelVersionStateActive).Update("state", models.ModelVersionStateInactive).Error; err != nil { + if err := tx.Model(&models.Model{}).Where(&models.Model{ + SchedulerID: model.SchedulerID, + State: models.ModelVersionStateActive, + }).Updates(&models.Model{State: models.ModelVersionStateInactive}).Error; err != nil { tx.Rollback() return err } - if err := tx.Model(model).Update("state", models.ModelVersionStateActive).Error; err != nil { + if err := tx.Model(model).Updates(&models.Model{State: models.ModelVersionStateActive}).Error; err != nil { tx.Rollback() return err }