fix: change model state in the same scheduler id (#2537)

* fix: change model state

Signed-off-by: Gaius <gaius.qi@gmail.com>

* feat: remove objectStorageConfig in manager rpcserver

Signed-off-by: Gaius <gaius.qi@gmail.com>

---------

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-07-11 11:23:44 +08:00 committed by GitHub
parent 28f162a43c
commit 01287f4fe7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 67 additions and 65 deletions

View File

@ -214,7 +214,7 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
}
// Initialize GRPC server
_, grpcServer, err := rpcserver.New(cfg, db, cache, searcher, objectStorage, &cfg.ObjectStorage, options...)
_, grpcServer, err := rpcserver.New(cfg, db, cache, searcher, objectStorage, options...)
if err != nil {
return nil, err
}

View File

@ -35,7 +35,7 @@ const (
// TODO(Gaius) Add regression analysis parameters.
type Model struct {
BaseModel
Name string `gorm:"column:name;type:varchar(256);index:uk_model_name,unique;not null;comment:name" json:"name"`
Name string `gorm:"column:name;type:varchar(256);not null;comment:name" json:"name"`
Type string `gorm:"column:type;type:varchar(256);index:uk_model,unique;not null;comment:type" json:"type"`
BIO string `gorm:"column:bio;type:varchar(1024);comment:biography" json:"bio"`
Version string `gorm:"column:version;type:varchar(256);index:uk_model,unique;not null;comment:model version" json:"version"`

View File

@ -75,25 +75,20 @@ type managerServerV1 struct {
// Object storage interface.
objectStorage objectstorage.ObjectStorage
// Object storage configuration.
objectStorageConfig *config.ObjectStorageConfig
}
// newManagerServerV1 returns v1 version of the manager server.
func newManagerServerV1(
cfg *config.Config, database *database.Database, cache *cache.Cache, peerCache pkgcache.Cache, searcher searcher.Searcher,
objectStorage objectstorage.ObjectStorage, objectStorageConfig *config.ObjectStorageConfig,
) managerv1.ManagerServer {
objectStorage objectstorage.ObjectStorage) managerv1.ManagerServer {
return &managerServerV1{
config: cfg,
db: database.DB,
rdb: database.RDB,
cache: cache,
peerCache: peerCache,
searcher: searcher,
objectStorage: objectStorage,
objectStorageConfig: objectStorageConfig,
config: cfg,
db: database.DB,
rdb: database.RDB,
cache: cache,
peerCache: peerCache,
searcher: searcher,
objectStorage: objectStorage,
}
}
@ -611,17 +606,17 @@ func (s *managerServerV1) ListSchedulers(ctx context.Context, req *managerv1.Lis
// Get object storage configuration.
func (s *managerServerV1) GetObjectStorage(ctx context.Context, req *managerv1.GetObjectStorageRequest) (*managerv1.ObjectStorage, error) {
if !s.objectStorageConfig.Enable {
if !s.config.ObjectStorage.Enable {
return nil, status.Error(codes.Internal, "object storage is disabled")
}
return &managerv1.ObjectStorage{
Name: s.objectStorageConfig.Name,
Region: s.objectStorageConfig.Region,
Endpoint: s.objectStorageConfig.Endpoint,
AccessKey: s.objectStorageConfig.AccessKey,
SecretKey: s.objectStorageConfig.SecretKey,
S3ForcePathStyle: s.objectStorageConfig.S3ForcePathStyle,
Name: s.config.ObjectStorage.Name,
Region: s.config.ObjectStorage.Region,
Endpoint: s.config.ObjectStorage.Endpoint,
AccessKey: s.config.ObjectStorage.AccessKey,
SecretKey: s.config.ObjectStorage.SecretKey,
S3ForcePathStyle: s.config.ObjectStorage.S3ForcePathStyle,
}, nil
}
@ -629,13 +624,13 @@ func (s *managerServerV1) GetObjectStorage(ctx context.Context, req *managerv1.G
func (s *managerServerV1) ListBuckets(ctx context.Context, req *managerv1.ListBucketsRequest) (*managerv1.ListBucketsResponse, error) {
log := logger.WithHostnameAndIP(req.Hostname, req.Ip)
if !s.objectStorageConfig.Enable {
if !s.config.ObjectStorage.Enable {
log.Warn("object storage is disabled")
return nil, status.Error(codes.Internal, "object storage is disabled")
}
var pbListBucketsResponse managerv1.ListBucketsResponse
cacheKey := pkgredis.MakeBucketKeyInManager(s.objectStorageConfig.Name)
cacheKey := pkgredis.MakeBucketKeyInManager(s.config.ObjectStorage.Name)
// Cache hit.
if err := s.cache.Get(ctx, cacheKey, &pbListBucketsResponse); err != nil {
@ -749,7 +744,7 @@ func (s *managerServerV1) ListApplications(ctx context.Context, req *managerv1.L
func (s *managerServerV1) CreateModel(ctx context.Context, req *managerv1.CreateModelRequest) (*emptypb.Empty, error) {
log := logger.WithHostnameAndIP(req.GetHostname(), req.GetIp())
if !s.objectStorageConfig.Enable {
if !s.config.ObjectStorage.Enable {
log.Warn("object storage is disabled")
return nil, status.Error(codes.Internal, "object storage is disabled")
}

View File

@ -75,25 +75,20 @@ type managerServerV2 struct {
// Object storage interface.
objectStorage objectstorage.ObjectStorage
// Object storage configuration.
objectStorageConfig *config.ObjectStorageConfig
}
// newManagerServerV2 returns v2 version of the manager server.
func newManagerServerV2(
cfg *config.Config, database *database.Database, cache *cache.Cache, peerCache pkgcache.Cache, searcher searcher.Searcher,
objectStorage objectstorage.ObjectStorage, objectStorageConfig *config.ObjectStorageConfig,
) managerv2.ManagerServer {
objectStorage objectstorage.ObjectStorage) managerv2.ManagerServer {
return &managerServerV2{
config: cfg,
db: database.DB,
rdb: database.RDB,
cache: cache,
peerCache: peerCache,
searcher: searcher,
objectStorage: objectStorage,
objectStorageConfig: objectStorageConfig,
config: cfg,
db: database.DB,
rdb: database.RDB,
cache: cache,
peerCache: peerCache,
searcher: searcher,
objectStorage: objectStorage,
}
}
@ -610,29 +605,29 @@ func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.Lis
// Get object storage configuration.
func (s *managerServerV2) GetObjectStorage(ctx context.Context, req *managerv2.GetObjectStorageRequest) (*managerv2.ObjectStorage, error) {
if !s.objectStorageConfig.Enable {
if !s.config.ObjectStorage.Enable {
return nil, status.Error(codes.NotFound, "object storage is disabled")
}
return &managerv2.ObjectStorage{
Name: s.objectStorageConfig.Name,
Region: s.objectStorageConfig.Region,
Endpoint: s.objectStorageConfig.Endpoint,
AccessKey: s.objectStorageConfig.AccessKey,
SecretKey: s.objectStorageConfig.SecretKey,
S3ForcePathStyle: s.objectStorageConfig.S3ForcePathStyle,
Name: s.config.ObjectStorage.Name,
Region: s.config.ObjectStorage.Region,
Endpoint: s.config.ObjectStorage.Endpoint,
AccessKey: s.config.ObjectStorage.AccessKey,
SecretKey: s.config.ObjectStorage.SecretKey,
S3ForcePathStyle: s.config.ObjectStorage.S3ForcePathStyle,
}, nil
}
// List buckets configuration.
func (s *managerServerV2) ListBuckets(ctx context.Context, req *managerv2.ListBucketsRequest) (*managerv2.ListBucketsResponse, error) {
if !s.objectStorageConfig.Enable {
if !s.config.ObjectStorage.Enable {
return nil, status.Error(codes.NotFound, "object storage is disabled")
}
log := logger.WithHostnameAndIP(req.Hostname, req.Ip)
var pbListBucketsResponse managerv2.ListBucketsResponse
cacheKey := pkgredis.MakeBucketKeyInManager(s.objectStorageConfig.Name)
cacheKey := pkgredis.MakeBucketKeyInManager(s.config.ObjectStorage.Name)
// Cache hit.
if err := s.cache.Get(ctx, cacheKey, &pbListBucketsResponse); err != nil {
@ -746,7 +741,7 @@ func (s *managerServerV2) ListApplications(ctx context.Context, req *managerv2.L
func (s *managerServerV2) CreateModel(ctx context.Context, req *managerv2.CreateModelRequest) (*emptypb.Empty, error) {
log := logger.WithHostnameAndIP(req.GetHostname(), req.GetIp())
if !s.objectStorageConfig.Enable {
if !s.config.ObjectStorage.Enable {
log.Warn("object storage is disabled")
return nil, status.Error(codes.Internal, "object storage is disabled")
}

View File

@ -81,9 +81,6 @@ type Server struct {
// Object storage interface.
objectStorage objectstorage.ObjectStorage
// Object storage configuration.
objectStorageConfig *config.ObjectStorageConfig
// serverOptions is server options of grpc.
serverOptions []grpc.ServerOption
@ -123,17 +120,15 @@ func WithGRPCServerOptions(opts []grpc.ServerOption) Option {
// New returns a new manager server from the given options.
func New(
cfg *config.Config, database *database.Database, cache *cache.Cache, searcher searcher.Searcher,
objectStorage objectstorage.ObjectStorage, objectStorageConfig *config.ObjectStorageConfig, opts ...Option,
) (*Server, *grpc.Server, error) {
objectStorage objectstorage.ObjectStorage, opts ...Option) (*Server, *grpc.Server, error) {
s := &Server{
config: cfg,
db: database.DB,
rdb: database.RDB,
cache: cache,
peerCache: pkgcache.New(DefaultPeerCacheExpiration, DefaultPeerCacheCleanupInterval),
searcher: searcher,
objectStorage: objectStorage,
objectStorageConfig: objectStorageConfig,
config: cfg,
db: database.DB,
rdb: database.RDB,
cache: cache,
peerCache: pkgcache.New(DefaultPeerCacheExpiration, DefaultPeerCacheCleanupInterval),
searcher: searcher,
objectStorage: objectStorage,
}
// Peer cache is evicted, and the metrics of the peer should be released.
@ -150,8 +145,8 @@ func New(
}
return s, managerserver.New(
newManagerServerV1(s.config, database, s.cache, s.peerCache, s.searcher, s.objectStorage, s.objectStorageConfig),
newManagerServerV2(s.config, database, s.cache, s.peerCache, s.searcher, s.objectStorage, s.objectStorageConfig),
newManagerServerV1(s.config, database, s.cache, s.peerCache, s.searcher, s.objectStorage),
newManagerServerV2(s.config, database, s.cache, s.peerCache, s.searcher, s.objectStorage),
newSecurityServerV1(s.selfSignedCert),
s.serverOptions...), nil
}

View File

@ -38,6 +38,20 @@ func (s *service) DestroyModel(ctx context.Context, id uint) error {
return err
}
// If the model is active, return an error.
if model.State == models.ModelVersionStateActive {
return errors.New("cannot delete an active model")
}
version, err := strconv.Atoi(model.Version)
if err != nil {
return err
}
if err := s.objectStorage.DeleteObject(ctx, s.config.Trainer.BucketName, types.MakeObjectKeyOfModelFile(model.Name, version)); err != nil {
return err
}
if err := s.db.WithContext(ctx).Unscoped().Delete(&models.Model{}, id).Error; err != nil {
return err
}
@ -116,12 +130,15 @@ func (s *service) updateModelStateToActive(ctx context.Context, model *models.Mo
return err
}
if err := tx.Model(&models.Model{}).Where("state = ?", models.ModelVersionStateActive).Update("state", models.ModelVersionStateInactive).Error; err != nil {
if err := tx.Model(&models.Model{}).Where(&models.Model{
SchedulerID: model.SchedulerID,
State: models.ModelVersionStateActive,
}).Updates(&models.Model{State: models.ModelVersionStateInactive}).Error; err != nil {
tx.Rollback()
return err
}
if err := tx.Model(model).Update("state", models.ModelVersionStateActive).Error; err != nil {
if err := tx.Model(model).Updates(&models.Model{State: models.ModelVersionStateActive}).Error; err != nil {
tx.Rollback()
return err
}