feat: correct grpc error code and implement StatPeer and LeavePeer (#2115)

Correct grpc error from codes.Unknow to codes.Internal in gorm
operation. Implement StatPeer and LeavePeer interface in v2 version of
grpc.

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-02-27 16:50:43 +08:00
parent 1b9353ba8d
commit 7306aec508
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
7 changed files with 619 additions and 105 deletions

View File

@ -113,7 +113,7 @@ func (s *managerServerV1) GetSeedPeer(ctx context.Context, req *managerv1.GetSee
HostName: req.HostName,
SeedPeerClusterID: uint(req.SeedPeerClusterId),
}).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
// Marshal config of seed peer cluster.
@ -184,7 +184,8 @@ func (s *managerServerV1) UpdateSeedPeer(ctx context.Context, req *managerv1.Upd
if errors.Is(err, gorm.ErrRecordNotFound) {
return s.createSeedPeer(ctx, req)
}
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if err := s.db.WithContext(ctx).Model(&seedPeer).Updates(model.SeedPeer{
@ -197,7 +198,7 @@ func (s *managerServerV1) UpdateSeedPeer(ctx context.Context, req *managerv1.Upd
ObjectStoragePort: req.ObjectStoragePort,
SeedPeerClusterID: uint(req.SeedPeerClusterId),
}).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if err := s.cache.Delete(
@ -237,7 +238,7 @@ func (s *managerServerV1) createSeedPeer(ctx context.Context, req *managerv1.Upd
}
if err := s.db.WithContext(ctx).Create(&seedPeer).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return &managerv1.SeedPeer{
@ -276,7 +277,7 @@ func (s *managerServerV1) GetScheduler(ctx context.Context, req *managerv1.GetSc
HostName: req.HostName,
SchedulerClusterID: uint(req.SchedulerClusterId),
}).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
// Marshal config of scheduler.
@ -373,7 +374,8 @@ func (s *managerServerV1) UpdateScheduler(ctx context.Context, req *managerv1.Up
if errors.Is(err, gorm.ErrRecordNotFound) {
return s.createScheduler(ctx, req)
}
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if err := s.db.WithContext(ctx).Model(&scheduler).Updates(model.Scheduler{
@ -383,7 +385,7 @@ func (s *managerServerV1) UpdateScheduler(ctx context.Context, req *managerv1.Up
Port: req.Port,
SchedulerClusterID: uint(req.SchedulerClusterId),
}).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if err := s.cache.Delete(
@ -417,7 +419,7 @@ func (s *managerServerV1) createScheduler(ctx context.Context, req *managerv1.Up
}
if err := s.db.WithContext(ctx).Create(&scheduler).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return &managerv1.Scheduler{
@ -463,7 +465,7 @@ func (s *managerServerV1) ListSchedulers(ctx context.Context, req *managerv1.Lis
log.Debugf("%s cache miss", cacheKey)
var schedulerClusters []model.SchedulerCluster
if err := s.db.WithContext(ctx).Preload("SecurityGroup.SecurityRules").Preload("SeedPeerClusters.SeedPeers", "state = ?", "active").Preload("Schedulers", "state = ?", "active").Find(&schedulerClusters).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
log.Debugf("list scheduler clusters %v with hostInfo %#v", getSchedulerClusterNames(schedulerClusters), req.HostInfo)
@ -541,7 +543,7 @@ func (s *managerServerV1) ListSchedulers(ctx context.Context, req *managerv1.Lis
// Get object storage configuration.
func (s *managerServerV1) GetObjectStorage(ctx context.Context, req *managerv1.GetObjectStorageRequest) (*managerv1.ObjectStorage, error) {
if !s.objectStorageConfig.Enable {
return nil, status.Error(codes.NotFound, "object storage is disabled")
return nil, status.Error(codes.Internal, "object storage is disabled")
}
return &managerv1.ObjectStorage{
@ -557,7 +559,7 @@ func (s *managerServerV1) GetObjectStorage(ctx context.Context, req *managerv1.G
// List buckets configuration.
func (s *managerServerV1) ListBuckets(ctx context.Context, req *managerv1.ListBucketsRequest) (*managerv1.ListBucketsResponse, error) {
if !s.objectStorageConfig.Enable {
return nil, status.Error(codes.NotFound, "object storage is disabled")
return nil, status.Error(codes.Internal, "object storage is disabled")
}
log := logger.WithHostnameAndIP(req.HostName, req.Ip)
@ -574,7 +576,7 @@ func (s *managerServerV1) ListBuckets(ctx context.Context, req *managerv1.ListBu
log.Debugf("%s cache miss", cacheKey)
buckets, err := s.objectStorage.ListBucketMetadatas(ctx)
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
// Construct schedulers.
@ -601,7 +603,7 @@ func (s *managerServerV1) ListBuckets(ctx context.Context, req *managerv1.ListBu
func (s *managerServerV1) ListModels(ctx context.Context, req *managerv1.ListModelsRequest) (*managerv1.ListModelsResponse, error) {
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
models := []*managerv1.Model{}
@ -609,7 +611,7 @@ func (s *managerServerV1) ListModels(ctx context.Context, req *managerv1.ListMod
for iter.Next(ctx) {
var model types.Model
if err := s.rdb.Get(ctx, iter.Val()).Scan(&model); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
models = append(models, &managerv1.Model{
@ -633,12 +635,12 @@ func (s *managerServerV1) ListModels(ctx context.Context, req *managerv1.ListMod
func (s *managerServerV1) GetModel(ctx context.Context, req *managerv1.GetModelRequest) (*managerv1.Model, error) {
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
var model types.Model
if err := s.rdb.Get(ctx, cache.MakeModelKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId)).Scan(&model); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return &managerv1.Model{
@ -657,7 +659,7 @@ func (s *managerServerV1) GetModel(ctx context.Context, req *managerv1.GetModelR
func (s *managerServerV1) CreateModel(ctx context.Context, req *managerv1.CreateModelRequest) (*managerv1.Model, error) {
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
model := types.Model{
@ -672,7 +674,7 @@ func (s *managerServerV1) CreateModel(ctx context.Context, req *managerv1.Create
}
if _, err := s.rdb.Set(ctx, cache.MakeModelKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, model.ID), &model, 0).Result(); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return &managerv1.Model{
@ -691,7 +693,7 @@ func (s *managerServerV1) CreateModel(ctx context.Context, req *managerv1.Create
func (s *managerServerV1) UpdateModel(ctx context.Context, req *managerv1.UpdateModelRequest) (*managerv1.Model, error) {
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
model, err := s.GetModel(ctx, &managerv1.GetModelRequest{
@ -699,7 +701,7 @@ func (s *managerServerV1) UpdateModel(ctx context.Context, req *managerv1.Update
ModelId: req.ModelId,
})
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
model.VersionId = req.VersionId
@ -715,7 +717,7 @@ func (s *managerServerV1) UpdateModel(ctx context.Context, req *managerv1.Update
CreatedAt: model.CreatedAt.AsTime(),
UpdatedAt: model.UpdatedAt.AsTime(),
}, 0).Result(); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return model, nil
@ -727,16 +729,16 @@ func (s *managerServerV1) DeleteModel(ctx context.Context, req *managerv1.Delete
SchedulerId: req.SchedulerId,
ModelId: req.ModelId,
}); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if _, err := s.rdb.Del(ctx, cache.MakeModelKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId)).Result(); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return nil, nil
@ -746,7 +748,7 @@ func (s *managerServerV1) DeleteModel(ctx context.Context, req *managerv1.Delete
func (s *managerServerV1) ListModelVersions(ctx context.Context, req *managerv1.ListModelVersionsRequest) (*managerv1.ListModelVersionsResponse, error) {
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
modelVersions := []*managerv1.ModelVersion{}
@ -754,7 +756,7 @@ func (s *managerServerV1) ListModelVersions(ctx context.Context, req *managerv1.
for iter.Next(ctx) {
var modelVersion types.ModelVersion
if err := s.rdb.Get(ctx, iter.Val()).Scan(&modelVersion); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
modelVersions = append(modelVersions, &managerv1.ModelVersion{
@ -778,12 +780,12 @@ func (s *managerServerV1) ListModelVersions(ctx context.Context, req *managerv1.
func (s *managerServerV1) GetModelVersion(ctx context.Context, req *managerv1.GetModelVersionRequest) (*managerv1.ModelVersion, error) {
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
var modelVersion types.ModelVersion
if err := s.rdb.Get(ctx, cache.MakeModelVersionKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId, req.VersionId)).Scan(&modelVersion); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return &managerv1.ModelVersion{
@ -802,7 +804,7 @@ func (s *managerServerV1) GetModelVersion(ctx context.Context, req *managerv1.Ge
func (s *managerServerV1) CreateModelVersion(ctx context.Context, req *managerv1.CreateModelVersionRequest) (*managerv1.ModelVersion, error) {
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
modelVersion := types.ModelVersion{
@ -817,7 +819,7 @@ func (s *managerServerV1) CreateModelVersion(ctx context.Context, req *managerv1
}
if _, err := s.rdb.Set(ctx, cache.MakeModelVersionKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId, modelVersion.ID), &modelVersion, 0).Result(); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return &managerv1.ModelVersion{
@ -836,7 +838,7 @@ func (s *managerServerV1) CreateModelVersion(ctx context.Context, req *managerv1
func (s *managerServerV1) UpdateModelVersion(ctx context.Context, req *managerv1.UpdateModelVersionRequest) (*managerv1.ModelVersion, error) {
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
modelVersion, err := s.GetModelVersion(ctx, &managerv1.GetModelVersionRequest{
@ -845,7 +847,7 @@ func (s *managerServerV1) UpdateModelVersion(ctx context.Context, req *managerv1
VersionId: req.VersionId,
})
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if req.Mae > 0 {
@ -880,7 +882,7 @@ func (s *managerServerV1) UpdateModelVersion(ctx context.Context, req *managerv1
CreatedAt: modelVersion.CreatedAt.AsTime(),
UpdatedAt: modelVersion.UpdatedAt.AsTime(),
}, 0).Result(); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return modelVersion, nil
@ -893,16 +895,16 @@ func (s *managerServerV1) DeleteModelVersion(ctx context.Context, req *managerv1
ModelId: req.ModelId,
VersionId: req.VersionId,
}); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if _, err := s.rdb.Del(ctx, cache.MakeModelVersionKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId, req.VersionId)).Result(); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return nil, nil
@ -986,7 +988,7 @@ func (s *managerServerV1) KeepAlive(stream managerv1.Manager_KeepAliveServer) er
req, err := stream.Recv()
if err != nil {
logger.Errorf("keepalive failed for the first time: %s", err.Error())
return status.Error(codes.Unknown, err.Error())
return status.Error(codes.Internal, err.Error())
}
hostName := req.HostName
ip := req.Ip
@ -1005,7 +1007,7 @@ func (s *managerServerV1) KeepAlive(stream managerv1.Manager_KeepAliveServer) er
}).Updates(model.Scheduler{
State: model.SchedulerStateActive,
}).Error; err != nil {
return status.Error(codes.Unknown, err.Error())
return status.Error(codes.Internal, err.Error())
}
if err := s.cache.Delete(
@ -1025,7 +1027,7 @@ func (s *managerServerV1) KeepAlive(stream managerv1.Manager_KeepAliveServer) er
}).Updates(model.SeedPeer{
State: model.SeedPeerStateActive,
}).Error; err != nil {
return status.Error(codes.Unknown, err.Error())
return status.Error(codes.Internal, err.Error())
}
if err := s.cache.Delete(
@ -1048,7 +1050,7 @@ func (s *managerServerV1) KeepAlive(stream managerv1.Manager_KeepAliveServer) er
}).Updates(model.Scheduler{
State: model.SchedulerStateInactive,
}).Error; err != nil {
return status.Error(codes.Unknown, err.Error())
return status.Error(codes.Internal, err.Error())
}
if err := s.cache.Delete(
@ -1068,7 +1070,7 @@ func (s *managerServerV1) KeepAlive(stream managerv1.Manager_KeepAliveServer) er
}).Updates(model.SeedPeer{
State: model.SeedPeerStateInactive,
}).Error; err != nil {
return status.Error(codes.Unknown, err.Error())
return status.Error(codes.Internal, err.Error())
}
if err := s.cache.Delete(

View File

@ -113,7 +113,7 @@ func (s *managerServerV2) GetSeedPeer(ctx context.Context, req *managerv2.GetSee
HostName: req.HostName,
SeedPeerClusterID: uint(req.SeedPeerClusterId),
}).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
// Marshal config of seed peer cluster.
@ -184,7 +184,8 @@ func (s *managerServerV2) UpdateSeedPeer(ctx context.Context, req *managerv2.Upd
if errors.Is(err, gorm.ErrRecordNotFound) {
return s.createSeedPeer(ctx, req)
}
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if err := s.db.WithContext(ctx).Model(&seedPeer).Updates(model.SeedPeer{
@ -197,7 +198,7 @@ func (s *managerServerV2) UpdateSeedPeer(ctx context.Context, req *managerv2.Upd
ObjectStoragePort: req.ObjectStoragePort,
SeedPeerClusterID: uint(req.SeedPeerClusterId),
}).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if err := s.cache.Delete(
@ -237,7 +238,7 @@ func (s *managerServerV2) createSeedPeer(ctx context.Context, req *managerv2.Upd
}
if err := s.db.WithContext(ctx).Create(&seedPeer).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return &managerv2.SeedPeer{
@ -276,7 +277,7 @@ func (s *managerServerV2) GetScheduler(ctx context.Context, req *managerv2.GetSc
HostName: req.HostName,
SchedulerClusterID: uint(req.SchedulerClusterId),
}).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
// Marshal config of scheduler.
@ -373,7 +374,8 @@ func (s *managerServerV2) UpdateScheduler(ctx context.Context, req *managerv2.Up
if errors.Is(err, gorm.ErrRecordNotFound) {
return s.createScheduler(ctx, req)
}
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if err := s.db.WithContext(ctx).Model(&scheduler).Updates(model.Scheduler{
@ -383,7 +385,7 @@ func (s *managerServerV2) UpdateScheduler(ctx context.Context, req *managerv2.Up
Port: req.Port,
SchedulerClusterID: uint(req.SchedulerClusterId),
}).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if err := s.cache.Delete(
@ -417,7 +419,7 @@ func (s *managerServerV2) createScheduler(ctx context.Context, req *managerv2.Up
}
if err := s.db.WithContext(ctx).Create(&scheduler).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return &managerv2.Scheduler{
@ -463,7 +465,7 @@ func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.Lis
log.Debugf("%s cache miss", cacheKey)
var schedulerClusters []model.SchedulerCluster
if err := s.db.WithContext(ctx).Preload("SecurityGroup.SecurityRules").Preload("SeedPeerClusters.SeedPeers", "state = ?", "active").Preload("Schedulers", "state = ?", "active").Find(&schedulerClusters).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
log.Debugf("list scheduler clusters %v with hostInfo %#v", getSchedulerClusterNames(schedulerClusters), req.HostInfo)
@ -573,7 +575,7 @@ func (s *managerServerV2) ListBuckets(ctx context.Context, req *managerv2.ListBu
log.Debugf("%s cache miss", cacheKey)
buckets, err := s.objectStorage.ListBucketMetadatas(ctx)
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
// Construct schedulers.
@ -600,7 +602,7 @@ func (s *managerServerV2) ListBuckets(ctx context.Context, req *managerv2.ListBu
func (s *managerServerV2) ListModels(ctx context.Context, req *managerv2.ListModelsRequest) (*managerv2.ListModelsResponse, error) {
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
models := []*managerv2.Model{}
@ -608,7 +610,7 @@ func (s *managerServerV2) ListModels(ctx context.Context, req *managerv2.ListMod
for iter.Next(ctx) {
var model types.Model
if err := s.rdb.Get(ctx, iter.Val()).Scan(&model); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
models = append(models, &managerv2.Model{
@ -632,12 +634,12 @@ func (s *managerServerV2) ListModels(ctx context.Context, req *managerv2.ListMod
func (s *managerServerV2) GetModel(ctx context.Context, req *managerv2.GetModelRequest) (*managerv2.Model, error) {
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
var model types.Model
if err := s.rdb.Get(ctx, cache.MakeModelKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId)).Scan(&model); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return &managerv2.Model{
@ -656,7 +658,7 @@ func (s *managerServerV2) GetModel(ctx context.Context, req *managerv2.GetModelR
func (s *managerServerV2) CreateModel(ctx context.Context, req *managerv2.CreateModelRequest) (*managerv2.Model, error) {
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
model := types.Model{
@ -671,7 +673,7 @@ func (s *managerServerV2) CreateModel(ctx context.Context, req *managerv2.Create
}
if _, err := s.rdb.Set(ctx, cache.MakeModelKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, model.ID), &model, 0).Result(); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return &managerv2.Model{
@ -690,7 +692,7 @@ func (s *managerServerV2) CreateModel(ctx context.Context, req *managerv2.Create
func (s *managerServerV2) UpdateModel(ctx context.Context, req *managerv2.UpdateModelRequest) (*managerv2.Model, error) {
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
model, err := s.GetModel(ctx, &managerv2.GetModelRequest{
@ -698,7 +700,7 @@ func (s *managerServerV2) UpdateModel(ctx context.Context, req *managerv2.Update
ModelId: req.ModelId,
})
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
model.VersionId = req.VersionId
@ -714,7 +716,7 @@ func (s *managerServerV2) UpdateModel(ctx context.Context, req *managerv2.Update
CreatedAt: model.CreatedAt.AsTime(),
UpdatedAt: model.UpdatedAt.AsTime(),
}, 0).Result(); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return model, nil
@ -726,16 +728,16 @@ func (s *managerServerV2) DeleteModel(ctx context.Context, req *managerv2.Delete
SchedulerId: req.SchedulerId,
ModelId: req.ModelId,
}); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if _, err := s.rdb.Del(ctx, cache.MakeModelKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId)).Result(); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return nil, nil
@ -745,7 +747,7 @@ func (s *managerServerV2) DeleteModel(ctx context.Context, req *managerv2.Delete
func (s *managerServerV2) ListModelVersions(ctx context.Context, req *managerv2.ListModelVersionsRequest) (*managerv2.ListModelVersionsResponse, error) {
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
modelVersions := []*managerv2.ModelVersion{}
@ -753,7 +755,7 @@ func (s *managerServerV2) ListModelVersions(ctx context.Context, req *managerv2.
for iter.Next(ctx) {
var modelVersion types.ModelVersion
if err := s.rdb.Get(ctx, iter.Val()).Scan(&modelVersion); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
modelVersions = append(modelVersions, &managerv2.ModelVersion{
@ -777,12 +779,12 @@ func (s *managerServerV2) ListModelVersions(ctx context.Context, req *managerv2.
func (s *managerServerV2) GetModelVersion(ctx context.Context, req *managerv2.GetModelVersionRequest) (*managerv2.ModelVersion, error) {
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
var modelVersion types.ModelVersion
if err := s.rdb.Get(ctx, cache.MakeModelVersionKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId, req.VersionId)).Scan(&modelVersion); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return &managerv2.ModelVersion{
@ -801,7 +803,7 @@ func (s *managerServerV2) GetModelVersion(ctx context.Context, req *managerv2.Ge
func (s *managerServerV2) CreateModelVersion(ctx context.Context, req *managerv2.CreateModelVersionRequest) (*managerv2.ModelVersion, error) {
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
modelVersion := types.ModelVersion{
@ -816,7 +818,7 @@ func (s *managerServerV2) CreateModelVersion(ctx context.Context, req *managerv2
}
if _, err := s.rdb.Set(ctx, cache.MakeModelVersionKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId, modelVersion.ID), &modelVersion, 0).Result(); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return &managerv2.ModelVersion{
@ -835,7 +837,7 @@ func (s *managerServerV2) CreateModelVersion(ctx context.Context, req *managerv2
func (s *managerServerV2) UpdateModelVersion(ctx context.Context, req *managerv2.UpdateModelVersionRequest) (*managerv2.ModelVersion, error) {
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
modelVersion, err := s.GetModelVersion(ctx, &managerv2.GetModelVersionRequest{
@ -844,7 +846,7 @@ func (s *managerServerV2) UpdateModelVersion(ctx context.Context, req *managerv2
VersionId: req.VersionId,
})
if err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if req.Mae > 0 {
@ -879,7 +881,7 @@ func (s *managerServerV2) UpdateModelVersion(ctx context.Context, req *managerv2
CreatedAt: modelVersion.CreatedAt.AsTime(),
UpdatedAt: modelVersion.UpdatedAt.AsTime(),
}, 0).Result(); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return modelVersion, nil
@ -892,16 +894,16 @@ func (s *managerServerV2) DeleteModelVersion(ctx context.Context, req *managerv2
ModelId: req.ModelId,
VersionId: req.VersionId,
}); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if _, err := s.rdb.Del(ctx, cache.MakeModelVersionKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId, req.VersionId)).Result(); err != nil {
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
return nil, nil
@ -927,7 +929,7 @@ func (s *managerServerV2) ListApplications(ctx context.Context, req *managerv2.L
return nil, status.Error(codes.NotFound, err.Error())
}
return nil, status.Error(codes.Unknown, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if len(applications) == 0 {
@ -985,7 +987,7 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er
req, err := stream.Recv()
if err != nil {
logger.Errorf("keepalive failed for the first time: %s", err.Error())
return status.Error(codes.Unknown, err.Error())
return status.Error(codes.Internal, err.Error())
}
hostName := req.HostName
ip := req.Ip
@ -1004,7 +1006,7 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er
}).Updates(model.Scheduler{
State: model.SchedulerStateActive,
}).Error; err != nil {
return status.Error(codes.Unknown, err.Error())
return status.Error(codes.Internal, err.Error())
}
if err := s.cache.Delete(
@ -1024,7 +1026,7 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er
}).Updates(model.SeedPeer{
State: model.SeedPeerStateActive,
}).Error; err != nil {
return status.Error(codes.Unknown, err.Error())
return status.Error(codes.Internal, err.Error())
}
if err := s.cache.Delete(
@ -1047,7 +1049,7 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er
}).Updates(model.Scheduler{
State: model.SchedulerStateInactive,
}).Error; err != nil {
return status.Error(codes.Unknown, err.Error())
return status.Error(codes.Internal, err.Error())
}
if err := s.cache.Delete(
@ -1067,7 +1069,7 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er
}).Updates(model.SeedPeer{
State: model.SeedPeerStateInactive,
}).Error; err != nil {
return status.Error(codes.Unknown, err.Error())
return status.Error(codes.Internal, err.Error())
}
if err := s.cache.Delete(

View File

@ -46,6 +46,48 @@ var (
// Variables declared for metrics.
var (
AnnouncePeerCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_peer_total",
Help: "Counter of the number of the announcing peer.",
}, []string{"tag", "app"})
AnnouncePeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "announce_peer_failure_total",
Help: "Counter of the number of failed of the announcing peer.",
}, []string{"tag", "app"})
StatPeerCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "stat_peer_total",
Help: "Counter of the number of the stat peer.",
})
StatPeerFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "stat_peer_failure_total",
Help: "Counter of the number of failed of the stat peer.",
})
LeavePeerCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "leave_peer_total",
Help: "Counter of the number of the leaving peer.",
})
LeavePeerFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "leave_peer_failure_total",
Help: "Counter of the number of failed of the leaving peer.",
})
RegisterTaskCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,

View File

@ -25,6 +25,7 @@ import (
schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/metrics"
"d7y.io/dragonfly/v2/scheduler/resource"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/service"
@ -54,14 +55,27 @@ func (s *schedulerServerV2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePe
return nil
}
// Checks information of peer.
// StatPeer checks information of peer.
func (s *schedulerServerV2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*commonv2.Peer, error) {
return nil, nil
metrics.StatPeerCount.Inc()
peer, err := s.service.StatPeer(ctx, req)
if err != nil {
metrics.StatPeerFailureCount.Inc()
return nil, err
}
return peer, nil
}
// LeavePeer releases peer in scheduler.
func (s *schedulerServerV2) LeavePeer(ctx context.Context, req *schedulerv2.LeavePeerRequest) (*emptypb.Empty, error) {
return nil, nil
metrics.LeavePeerCount.Inc()
if err := s.service.LeavePeer(ctx, req); err != nil {
metrics.LeavePeerFailureCount.Inc()
return new(emptypb.Empty), err
}
return new(emptypb.Empty), nil
}
// TODO exchange peer api definition.

View File

@ -198,6 +198,16 @@ var (
}
mockURLMetaRange = "0-9"
mockPieceMD5 = digest.New(digest.AlgorithmMD5, "86d3f3a95c324c9479bd8986968f4327")
mockPiece = resource.Piece{
Number: 1,
ParentID: "foo",
Offset: 2,
Length: 10,
Digest: mockPieceMD5,
TrafficType: commonv2.TrafficType_REMOTE_PEER,
Cost: 1 * time.Minute,
CreatedAt: time.Now(),
}
)
func TestService_NewV1(t *testing.T) {
@ -227,7 +237,7 @@ func TestService_NewV1(t *testing.T) {
}
}
func TestService_RegisterPeerTask(t *testing.T) {
func TestServiceV1_RegisterPeerTask(t *testing.T) {
tests := []struct {
name string
req *schedulerv1.PeerTaskRequest
@ -911,7 +921,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
}
}
func TestService_ReportPieceResult(t *testing.T) {
func TestServiceV1_ReportPieceResult(t *testing.T) {
tests := []struct {
name string
mock func(
@ -1164,7 +1174,7 @@ func TestService_ReportPieceResult(t *testing.T) {
}
}
func TestService_ReportPeerResult(t *testing.T) {
func TestServiceV1_ReportPeerResult(t *testing.T) {
tests := []struct {
name string
req *schedulerv1.PeerResult
@ -1361,7 +1371,7 @@ func TestService_ReportPeerResult(t *testing.T) {
}
}
func TestService_StatTask(t *testing.T) {
func TestServiceV1_StatTask(t *testing.T) {
tests := []struct {
name string
mock func(mockTask *resource.Task, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder)
@ -1423,7 +1433,7 @@ func TestService_StatTask(t *testing.T) {
}
}
func TestService_AnnounceTask(t *testing.T) {
func TestServiceV1_AnnounceTask(t *testing.T) {
tests := []struct {
name string
req *schedulerv1.AnnounceTaskRequest
@ -1722,7 +1732,7 @@ func TestService_AnnounceTask(t *testing.T) {
}
}
func TestService_LeaveTask(t *testing.T) {
func TestServiceV1_LeaveTask(t *testing.T) {
tests := []struct {
name string
mock func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder)
@ -1920,7 +1930,7 @@ func TestService_LeaveTask(t *testing.T) {
}
}
func TestService_LeaveHost(t *testing.T) {
func TestServiceV1_LeaveHost(t *testing.T) {
tests := []struct {
name string
mock func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder)
@ -2137,7 +2147,7 @@ func TestService_LeaveHost(t *testing.T) {
}
}
func TestService_triggerTask(t *testing.T) {
func TestServiceV1_triggerTask(t *testing.T) {
tests := []struct {
name string
config *config.Config
@ -2604,7 +2614,7 @@ func TestService_triggerTask(t *testing.T) {
}
}
func TestService_storeTask(t *testing.T) {
func TestServiceV1_storeTask(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T, svc *V1, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder)
@ -2698,7 +2708,7 @@ func TestService_storeTask(t *testing.T) {
}
}
func TestService_storeHost(t *testing.T) {
func TestServiceV1_storeHost(t *testing.T) {
tests := []struct {
name string
peerHost *schedulerv1.PeerHost
@ -2783,7 +2793,7 @@ func TestService_storeHost(t *testing.T) {
}
}
func TestService_storePeer(t *testing.T) {
func TestServiceV1_storePeer(t *testing.T) {
tests := []struct {
name string
run func(t *testing.T, svc *V1, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder)
@ -2863,7 +2873,7 @@ func TestService_storePeer(t *testing.T) {
}
}
func TestService_triggerSeedPeerTask(t *testing.T) {
func TestServiceV1_triggerSeedPeerTask(t *testing.T) {
tests := []struct {
name string
mock func(task *resource.Task, peer *resource.Peer, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mc *resource.MockSeedPeerMockRecorder)
@ -2929,7 +2939,7 @@ func TestService_triggerSeedPeerTask(t *testing.T) {
}
}
func TestService_handleBeginOfPiece(t *testing.T) {
func TestServiceV1_handleBeginOfPiece(t *testing.T) {
tests := []struct {
name string
mock func(peer *resource.Peer, scheduling *mocks.MockSchedulingMockRecorder)
@ -3010,7 +3020,7 @@ func TestService_handleBeginOfPiece(t *testing.T) {
}
}
func TestService_handlePieceSuccess(t *testing.T) {
func TestServiceV1_handlePieceSuccess(t *testing.T) {
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
@ -3145,7 +3155,7 @@ func TestService_handlePieceSuccess(t *testing.T) {
}
}
func TestService_handlePieceFail(t *testing.T) {
func TestServiceV1_handlePieceFail(t *testing.T) {
tests := []struct {
name string
@ -3339,7 +3349,7 @@ func TestService_handlePieceFail(t *testing.T) {
}
}
func TestService_handlePeerSuccess(t *testing.T) {
func TestServiceV1_handlePeerSuccess(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if _, err := w.Write([]byte{1}); err != nil {
w.WriteHeader(http.StatusInternalServerError)
@ -3464,7 +3474,7 @@ func TestService_handlePeerSuccess(t *testing.T) {
}
}
func TestService_handlePeerFail(t *testing.T) {
func TestServiceV1_handlePeerFail(t *testing.T) {
tests := []struct {
name string
@ -3545,7 +3555,7 @@ func TestService_handlePeerFail(t *testing.T) {
}
}
func TestService_handleTaskSuccess(t *testing.T) {
func TestServiceV1_handleTaskSuccess(t *testing.T) {
tests := []struct {
name string
result *schedulerv1.PeerResult
@ -3626,7 +3636,7 @@ func TestService_handleTaskSuccess(t *testing.T) {
}
}
func TestService_handleTaskFail(t *testing.T) {
func TestServiceV1_handleTaskFail(t *testing.T) {
rst := status.Newf(codes.Aborted, "response is not valid")
st, err := rst.WithDetails(&errordetailsv1.SourceError{Temporary: false})
if err != nil {

View File

@ -18,11 +18,19 @@ package service
import (
"context"
"fmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
commonv2 "d7y.io/api/pkg/apis/common/v2"
schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/metrics"
"d7y.io/dragonfly/v2/scheduler/resource"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/storage"
@ -69,13 +77,196 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
return nil
}
// Checks information of peer.
// StatPeer checks information of peer.
func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*commonv2.Peer, error) {
return nil, nil
logger.WithTaskID(req.TaskId).Infof("stat peer request: %#v", req)
peer, loaded := v.resource.PeerManager().Load(req.PeerId)
if !loaded {
return nil, status.Errorf(codes.NotFound, "peer %s not found", req.PeerId)
}
resp := &commonv2.Peer{
Id: peer.ID,
Priority: peer.Priority,
Cost: durationpb.New(peer.Cost.Load()),
State: peer.FSM.Current(),
NeedBackToSource: peer.NeedBackToSource.Load(),
CreatedAt: timestamppb.New(peer.CreatedAt.Load()),
UpdatedAt: timestamppb.New(peer.UpdatedAt.Load()),
}
// Set range to response.
if peer.Range != nil {
resp.Range = &commonv2.Range{
Start: peer.Range.Start,
Length: peer.Range.Length,
}
}
// Set pieces to response.
peer.Pieces.Range(func(key, value any) bool {
piece, ok := value.(*resource.Piece)
if !ok {
peer.Log.Errorf("invalid piece %s %#v", key, value)
return true
}
respPiece := &commonv2.Piece{
Number: piece.Number,
ParentId: piece.ParentID,
Offset: piece.Offset,
Length: piece.Length,
TrafficType: piece.TrafficType,
Cost: durationpb.New(piece.Cost),
CreatedAt: timestamppb.New(piece.CreatedAt),
}
if piece.Digest != nil {
respPiece.Digest = piece.Digest.String()
}
resp.Pieces = append(resp.Pieces, respPiece)
return true
})
// Set task to response.
resp.Task = &commonv2.Task{
Id: peer.Task.ID,
Type: peer.Task.Type,
Url: peer.Task.URL,
Tag: peer.Task.Tag,
Application: peer.Task.Application,
Filters: peer.Task.Filters,
Header: peer.Task.Header,
PieceLength: peer.Task.PieceLength,
ContentLength: peer.Task.ContentLength.Load(),
PieceCount: peer.Task.TotalPieceCount.Load(),
SizeScope: peer.Task.SizeScope(),
State: peer.Task.FSM.Current(),
PeerCount: int32(peer.Task.PeerCount()),
CreatedAt: timestamppb.New(peer.Task.CreatedAt.Load()),
UpdatedAt: timestamppb.New(peer.Task.UpdatedAt.Load()),
}
// Set digest to response.
if peer.Task.Digest != nil {
resp.Task.Digest = peer.Task.Digest.String()
}
// Set pieces to response.
peer.Task.Pieces.Range(func(key, value any) bool {
piece, ok := value.(*resource.Piece)
if !ok {
peer.Task.Log.Errorf("invalid piece %s %#v", key, value)
return true
}
respPiece := &commonv2.Piece{
Number: piece.Number,
ParentId: piece.ParentID,
Offset: piece.Offset,
Length: piece.Length,
TrafficType: piece.TrafficType,
Cost: durationpb.New(piece.Cost),
CreatedAt: timestamppb.New(piece.CreatedAt),
}
if piece.Digest != nil {
respPiece.Digest = piece.Digest.String()
}
resp.Task.Pieces = append(resp.Task.Pieces, respPiece)
return true
})
// Set host to response.
resp.Host = &commonv2.Host{
Id: peer.Host.ID,
Type: uint32(peer.Host.Type),
Hostname: peer.Host.Hostname,
Ip: peer.Host.IP,
Port: peer.Host.Port,
DownloadPort: peer.Host.DownloadPort,
Os: peer.Host.OS,
Platform: peer.Host.Platform,
PlatformFamily: peer.Host.PlatformFamily,
PlatformVersion: peer.Host.PlatformVersion,
KernelVersion: peer.Host.KernelVersion,
Cpu: &commonv2.CPU{
LogicalCount: peer.Host.CPU.LogicalCount,
PhysicalCount: peer.Host.CPU.PhysicalCount,
Percent: peer.Host.CPU.Percent,
ProcessPercent: peer.Host.CPU.ProcessPercent,
Times: &commonv2.CPUTimes{
User: peer.Host.CPU.Times.User,
System: peer.Host.CPU.Times.System,
Idle: peer.Host.CPU.Times.Idle,
Nice: peer.Host.CPU.Times.Nice,
Iowait: peer.Host.CPU.Times.Iowait,
Irq: peer.Host.CPU.Times.Irq,
Softirq: peer.Host.CPU.Times.Softirq,
Steal: peer.Host.CPU.Times.Steal,
Guest: peer.Host.CPU.Times.Guest,
GuestNice: peer.Host.CPU.Times.GuestNice,
},
},
Memory: &commonv2.Memory{
Total: peer.Host.Memory.Total,
Available: peer.Host.Memory.Available,
Used: peer.Host.Memory.Used,
UsedPercent: peer.Host.Memory.UsedPercent,
ProcessUsedPercent: peer.Host.Memory.ProcessUsedPercent,
Free: peer.Host.Memory.Free,
},
Network: &commonv2.Network{
TcpConnectionCount: peer.Host.Network.TCPConnectionCount,
UploadTcpConnectionCount: peer.Host.Network.UploadTCPConnectionCount,
SecurityDomain: peer.Host.Network.SecurityDomain,
Location: peer.Host.Network.Location,
Idc: peer.Host.Network.IDC,
},
Disk: &commonv2.Disk{
Total: peer.Host.Disk.Total,
Free: peer.Host.Disk.Free,
Used: peer.Host.Disk.Used,
UsedPercent: peer.Host.Disk.UsedPercent,
InodesTotal: peer.Host.Disk.InodesTotal,
InodesUsed: peer.Host.Disk.InodesUsed,
InodesFree: peer.Host.Disk.InodesFree,
InodesUsedPercent: peer.Host.Disk.InodesUsedPercent,
},
Build: &commonv2.Build{
GitVersion: peer.Host.Build.GitVersion,
GitCommit: peer.Host.Build.GitCommit,
GoVersion: peer.Host.Build.GoVersion,
Platform: peer.Host.Build.Platform,
},
}
return resp, nil
}
// LeavePeer releases peer in scheduler.
func (v *V2) LeavePeer(ctx context.Context, req *schedulerv2.LeavePeerRequest) error {
logger.WithTaskAndPeerID(req.TaskId, req.PeerId).Infof("leave peer request: %#v", req)
peer, loaded := v.resource.PeerManager().Load(req.PeerId)
if !loaded {
metrics.LeavePeerFailureCount.Inc()
msg := fmt.Sprintf("peer %s not found", req.PeerId)
logger.Error(msg)
return status.Error(codes.NotFound, msg)
}
metrics.LeavePeerCount.Inc()
if err := peer.FSM.Event(ctx, resource.PeerEventLeave); err != nil {
metrics.LeavePeerFailureCount.Inc()
msg := fmt.Sprintf("peer fsm event failed: %s", err.Error())
peer.Log.Error(msg)
return status.Error(codes.FailedPrecondition, msg)
}
return nil
}

View File

@ -17,11 +17,19 @@
package service
import (
"context"
"reflect"
"testing"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
commonv2 "d7y.io/api/pkg/apis/common/v2"
schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2"
"d7y.io/dragonfly/v2/scheduler/config"
configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks"
@ -56,3 +64,248 @@ func TestService_NewV2(t *testing.T) {
})
}
}
func TestServiceV2_StatPeer(t *testing.T) {
tests := []struct {
name string
mock func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder)
expect func(t *testing.T, peer *resource.Peer, resp *commonv2.Peer, err error)
}{
{
name: "peer not found",
mock: func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Any()).Return(nil, false).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer, resp *commonv2.Peer, err error) {
assert := assert.New(t)
assert.ErrorIs(err, status.Errorf(codes.NotFound, "peer %s not found", mockPeerID))
},
},
{
name: "peer has been loaded",
mock: func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
peer.StorePiece(&mockPiece)
peer.Task.StorePiece(&mockPiece)
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Any()).Return(peer, true).Times(1),
)
},
expect: func(t *testing.T, peer *resource.Peer, resp *commonv2.Peer, err error) {
assert := assert.New(t)
assert.EqualValues(resp, &commonv2.Peer{
Id: peer.ID,
Range: &commonv2.Range{
Start: peer.Range.Start,
Length: peer.Range.Length,
},
Priority: peer.Priority,
Pieces: []*commonv2.Piece{
{
Number: mockPiece.Number,
ParentId: mockPiece.ParentID,
Offset: mockPiece.Offset,
Length: mockPiece.Length,
Digest: mockPiece.Digest.String(),
TrafficType: mockPiece.TrafficType,
Cost: durationpb.New(mockPiece.Cost),
CreatedAt: timestamppb.New(mockPiece.CreatedAt),
},
},
Cost: durationpb.New(peer.Cost.Load()),
State: peer.FSM.Current(),
Task: &commonv2.Task{
Id: peer.Task.ID,
Type: peer.Task.Type,
Url: peer.Task.URL,
Digest: peer.Task.Digest.String(),
Tag: peer.Task.Tag,
Application: peer.Task.Application,
Filters: peer.Task.Filters,
Header: peer.Task.Header,
PieceLength: peer.Task.PieceLength,
ContentLength: peer.Task.ContentLength.Load(),
PieceCount: peer.Task.TotalPieceCount.Load(),
SizeScope: peer.Task.SizeScope(),
Pieces: []*commonv2.Piece{
{
Number: mockPiece.Number,
ParentId: mockPiece.ParentID,
Offset: mockPiece.Offset,
Length: mockPiece.Length,
Digest: mockPiece.Digest.String(),
TrafficType: mockPiece.TrafficType,
Cost: durationpb.New(mockPiece.Cost),
CreatedAt: timestamppb.New(mockPiece.CreatedAt),
},
},
State: peer.Task.FSM.Current(),
PeerCount: int32(peer.Task.PeerCount()),
CreatedAt: timestamppb.New(peer.Task.CreatedAt.Load()),
UpdatedAt: timestamppb.New(peer.Task.UpdatedAt.Load()),
},
Host: &commonv2.Host{
Id: peer.Host.ID,
Type: uint32(peer.Host.Type),
Hostname: peer.Host.Hostname,
Ip: peer.Host.IP,
Port: peer.Host.Port,
DownloadPort: peer.Host.DownloadPort,
Os: peer.Host.OS,
Platform: peer.Host.Platform,
PlatformFamily: peer.Host.PlatformFamily,
PlatformVersion: peer.Host.PlatformVersion,
KernelVersion: peer.Host.KernelVersion,
Cpu: &commonv2.CPU{
LogicalCount: peer.Host.CPU.LogicalCount,
PhysicalCount: peer.Host.CPU.PhysicalCount,
Percent: peer.Host.CPU.Percent,
ProcessPercent: peer.Host.CPU.ProcessPercent,
Times: &commonv2.CPUTimes{
User: peer.Host.CPU.Times.User,
System: peer.Host.CPU.Times.System,
Idle: peer.Host.CPU.Times.Idle,
Nice: peer.Host.CPU.Times.Nice,
Iowait: peer.Host.CPU.Times.Iowait,
Irq: peer.Host.CPU.Times.Irq,
Softirq: peer.Host.CPU.Times.Softirq,
Steal: peer.Host.CPU.Times.Steal,
Guest: peer.Host.CPU.Times.Guest,
GuestNice: peer.Host.CPU.Times.GuestNice,
},
},
Memory: &commonv2.Memory{
Total: peer.Host.Memory.Total,
Available: peer.Host.Memory.Available,
Used: peer.Host.Memory.Used,
UsedPercent: peer.Host.Memory.UsedPercent,
ProcessUsedPercent: peer.Host.Memory.ProcessUsedPercent,
Free: peer.Host.Memory.Free,
},
Network: &commonv2.Network{
TcpConnectionCount: peer.Host.Network.TCPConnectionCount,
UploadTcpConnectionCount: peer.Host.Network.UploadTCPConnectionCount,
SecurityDomain: peer.Host.Network.SecurityDomain,
Location: peer.Host.Network.Location,
Idc: peer.Host.Network.IDC,
},
Disk: &commonv2.Disk{
Total: peer.Host.Disk.Total,
Free: peer.Host.Disk.Free,
Used: peer.Host.Disk.Used,
UsedPercent: peer.Host.Disk.UsedPercent,
InodesTotal: peer.Host.Disk.InodesTotal,
InodesUsed: peer.Host.Disk.InodesUsed,
InodesFree: peer.Host.Disk.InodesFree,
InodesUsedPercent: peer.Host.Disk.InodesUsedPercent,
},
Build: &commonv2.Build{
GitVersion: peer.Host.Build.GitVersion,
GitCommit: peer.Host.Build.GitCommit,
GoVersion: peer.Host.Build.GoVersion,
Platform: peer.Host.Build.Platform,
},
},
NeedBackToSource: peer.NeedBackToSource.Load(),
CreatedAt: timestamppb.New(peer.CreatedAt.Load()),
UpdatedAt: timestamppb.New(peer.UpdatedAt.Load()),
})
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost, resource.WithRange(mockPeerRange))
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
tc.mock(peer, peerManager, scheduling.EXPECT(), res.EXPECT(), peerManager.EXPECT())
resp, err := svc.StatPeer(context.Background(), &schedulerv2.StatPeerRequest{TaskId: mockTaskID, PeerId: mockPeerID})
tc.expect(t, peer, resp, err)
})
}
}
func TestService_LeavePeer(t *testing.T) {
tests := []struct {
name string
mock func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder)
expect func(t *testing.T, err error)
}{
{
name: "peer not found",
mock: func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Any()).Return(nil, false).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.ErrorIs(err, status.Errorf(codes.NotFound, "peer %s not found", mockPeerID))
},
},
{
name: "peer fsm event failed",
mock: func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
peer.FSM.SetState(resource.PeerStateLeave)
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Any()).Return(peer, true).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.ErrorIs(err, status.Error(codes.FailedPrecondition, "peer fsm event failed: event Leave inappropriate in current state Leave"))
},
},
{
name: "peer leaves succeeded",
mock: func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) {
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Any()).Return(peer, true).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.NoError(err)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost, resource.WithRange(mockPeerRange))
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
tc.mock(peer, peerManager, scheduling.EXPECT(), res.EXPECT(), peerManager.EXPECT())
tc.expect(t, svc.LeavePeer(context.Background(), &schedulerv2.LeavePeerRequest{TaskId: mockTaskID, PeerId: mockPeerID}))
})
}
}