diff --git a/manager/rpcserver/manager_server_v1.go b/manager/rpcserver/manager_server_v1.go index 6228cd07e..a69294e28 100644 --- a/manager/rpcserver/manager_server_v1.go +++ b/manager/rpcserver/manager_server_v1.go @@ -113,7 +113,7 @@ func (s *managerServerV1) GetSeedPeer(ctx context.Context, req *managerv1.GetSee HostName: req.HostName, SeedPeerClusterID: uint(req.SeedPeerClusterId), }).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } // Marshal config of seed peer cluster. @@ -184,7 +184,8 @@ func (s *managerServerV1) UpdateSeedPeer(ctx context.Context, req *managerv1.Upd if errors.Is(err, gorm.ErrRecordNotFound) { return s.createSeedPeer(ctx, req) } - return nil, status.Error(codes.Unknown, err.Error()) + + return nil, status.Error(codes.Internal, err.Error()) } if err := s.db.WithContext(ctx).Model(&seedPeer).Updates(model.SeedPeer{ @@ -197,7 +198,7 @@ func (s *managerServerV1) UpdateSeedPeer(ctx context.Context, req *managerv1.Upd ObjectStoragePort: req.ObjectStoragePort, SeedPeerClusterID: uint(req.SeedPeerClusterId), }).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } if err := s.cache.Delete( @@ -237,7 +238,7 @@ func (s *managerServerV1) createSeedPeer(ctx context.Context, req *managerv1.Upd } if err := s.db.WithContext(ctx).Create(&seedPeer).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return &managerv1.SeedPeer{ @@ -276,7 +277,7 @@ func (s *managerServerV1) GetScheduler(ctx context.Context, req *managerv1.GetSc HostName: req.HostName, SchedulerClusterID: uint(req.SchedulerClusterId), }).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } // Marshal config of scheduler. 
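The hunks above switch database and cache failures from `codes.Unknown` to `codes.Internal`, giving callers a well-defined code to branch on. A minimal caller-side sketch of what that enables; the `handleGetSeedPeer` helper and the `*grpc.ClientConn` are assumptions for illustration, not part of this diff:

```go
package main

import (
	"context"

	managerv1 "d7y.io/api/pkg/apis/manager/v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// handleGetSeedPeer is a hypothetical caller illustrating the new error
// contract: Internal marks server-side failures (database, cache), while
// NotFound keeps meaning "no such record".
func handleGetSeedPeer(ctx context.Context, conn *grpc.ClientConn, req *managerv1.GetSeedPeerRequest) {
	if _, err := managerv1.NewManagerClient(conn).GetSeedPeer(ctx, req); err != nil {
		switch status.Code(err) {
		case codes.Internal:
			// Server-side failure: retry with backoff or alert.
		case codes.NotFound:
			// The seed peer genuinely does not exist.
		default:
			// Anything else, including codes.Unknown from older managers.
		}
	}
}
```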
@@ -373,7 +374,8 @@ func (s *managerServerV1) UpdateScheduler(ctx context.Context, req *managerv1.Up if errors.Is(err, gorm.ErrRecordNotFound) { return s.createScheduler(ctx, req) } - return nil, status.Error(codes.Unknown, err.Error()) + + return nil, status.Error(codes.Internal, err.Error()) } if err := s.db.WithContext(ctx).Model(&scheduler).Updates(model.Scheduler{ @@ -383,7 +385,7 @@ func (s *managerServerV1) UpdateScheduler(ctx context.Context, req *managerv1.Up Port: req.Port, SchedulerClusterID: uint(req.SchedulerClusterId), }).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } if err := s.cache.Delete( @@ -417,7 +419,7 @@ func (s *managerServerV1) createScheduler(ctx context.Context, req *managerv1.Up } if err := s.db.WithContext(ctx).Create(&scheduler).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return &managerv1.Scheduler{ @@ -463,7 +465,7 @@ func (s *managerServerV1) ListSchedulers(ctx context.Context, req *managerv1.Lis log.Debugf("%s cache miss", cacheKey) var schedulerClusters []model.SchedulerCluster if err := s.db.WithContext(ctx).Preload("SecurityGroup.SecurityRules").Preload("SeedPeerClusters.SeedPeers", "state = ?", "active").Preload("Schedulers", "state = ?", "active").Find(&schedulerClusters).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } log.Debugf("list scheduler clusters %v with hostInfo %#v", getSchedulerClusterNames(schedulerClusters), req.HostInfo) @@ -541,7 +543,7 @@ func (s *managerServerV1) ListSchedulers(ctx context.Context, req *managerv1.Lis // Get object storage configuration. func (s *managerServerV1) GetObjectStorage(ctx context.Context, req *managerv1.GetObjectStorageRequest) (*managerv1.ObjectStorage, error) { if !s.objectStorageConfig.Enable { - return nil, status.Error(codes.NotFound, "object storage is disabled") + return nil, status.Error(codes.Internal, "object storage is disabled") } return &managerv1.ObjectStorage{ @@ -557,7 +559,7 @@ func (s *managerServerV1) GetObjectStorage(ctx context.Context, req *managerv1.G // List buckets configuration. func (s *managerServerV1) ListBuckets(ctx context.Context, req *managerv1.ListBucketsRequest) (*managerv1.ListBucketsResponse, error) { if !s.objectStorageConfig.Enable { - return nil, status.Error(codes.NotFound, "object storage is disabled") + return nil, status.Error(codes.Internal, "object storage is disabled") } log := logger.WithHostnameAndIP(req.HostName, req.Ip) @@ -574,7 +576,7 @@ func (s *managerServerV1) ListBuckets(ctx context.Context, req *managerv1.ListBu log.Debugf("%s cache miss", cacheKey) buckets, err := s.objectStorage.ListBucketMetadatas(ctx) if err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } // Construct schedulers. 
@@ -601,7 +603,7 @@ func (s *managerServerV1) ListBuckets(ctx context.Context, req *managerv1.ListBu func (s *managerServerV1) ListModels(ctx context.Context, req *managerv1.ListModelsRequest) (*managerv1.ListModelsResponse, error) { scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } models := []*managerv1.Model{} @@ -609,7 +611,7 @@ func (s *managerServerV1) ListModels(ctx context.Context, req *managerv1.ListMod for iter.Next(ctx) { var model types.Model if err := s.rdb.Get(ctx, iter.Val()).Scan(&model); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } models = append(models, &managerv1.Model{ @@ -633,12 +635,12 @@ func (s *managerServerV1) ListModels(ctx context.Context, req *managerv1.ListMod func (s *managerServerV1) GetModel(ctx context.Context, req *managerv1.GetModelRequest) (*managerv1.Model, error) { scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } var model types.Model if err := s.rdb.Get(ctx, cache.MakeModelKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId)).Scan(&model); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return &managerv1.Model{ @@ -657,7 +659,7 @@ func (s *managerServerV1) GetModel(ctx context.Context, req *managerv1.GetModelR func (s *managerServerV1) CreateModel(ctx context.Context, req *managerv1.CreateModelRequest) (*managerv1.Model, error) { scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } model := types.Model{ @@ -672,7 +674,7 @@ func (s *managerServerV1) CreateModel(ctx context.Context, req *managerv1.Create } if _, err := s.rdb.Set(ctx, cache.MakeModelKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, model.ID), &model, 0).Result(); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return &managerv1.Model{ @@ -691,7 +693,7 @@ func (s *managerServerV1) CreateModel(ctx context.Context, req *managerv1.Create func (s *managerServerV1) UpdateModel(ctx context.Context, req *managerv1.UpdateModelRequest) (*managerv1.Model, error) { scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } model, err := s.GetModel(ctx, &managerv1.GetModelRequest{ @@ -699,7 +701,7 @@ func (s *managerServerV1) UpdateModel(ctx context.Context, req *managerv1.Update ModelId: req.ModelId, }) if err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } model.VersionId = req.VersionId @@ -715,7 +717,7 @@ func (s *managerServerV1) UpdateModel(ctx context.Context, req *managerv1.Update CreatedAt: model.CreatedAt.AsTime(), UpdatedAt: model.UpdatedAt.AsTime(), }, 0).Result(); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + 
return nil, status.Error(codes.Internal, err.Error()) } return model, nil @@ -727,16 +729,16 @@ func (s *managerServerV1) DeleteModel(ctx context.Context, req *managerv1.Delete SchedulerId: req.SchedulerId, ModelId: req.ModelId, }); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } if _, err := s.rdb.Del(ctx, cache.MakeModelKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId)).Result(); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return nil, nil @@ -746,7 +748,7 @@ func (s *managerServerV1) DeleteModel(ctx context.Context, req *managerv1.Delete func (s *managerServerV1) ListModelVersions(ctx context.Context, req *managerv1.ListModelVersionsRequest) (*managerv1.ListModelVersionsResponse, error) { scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } modelVersions := []*managerv1.ModelVersion{} @@ -754,7 +756,7 @@ func (s *managerServerV1) ListModelVersions(ctx context.Context, req *managerv1. for iter.Next(ctx) { var modelVersion types.ModelVersion if err := s.rdb.Get(ctx, iter.Val()).Scan(&modelVersion); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } modelVersions = append(modelVersions, &managerv1.ModelVersion{ @@ -778,12 +780,12 @@ func (s *managerServerV1) ListModelVersions(ctx context.Context, req *managerv1. 
func (s *managerServerV1) GetModelVersion(ctx context.Context, req *managerv1.GetModelVersionRequest) (*managerv1.ModelVersion, error) { scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } var modelVersion types.ModelVersion if err := s.rdb.Get(ctx, cache.MakeModelVersionKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId, req.VersionId)).Scan(&modelVersion); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return &managerv1.ModelVersion{ @@ -802,7 +804,7 @@ func (s *managerServerV1) GetModelVersion(ctx context.Context, req *managerv1.Ge func (s *managerServerV1) CreateModelVersion(ctx context.Context, req *managerv1.CreateModelVersionRequest) (*managerv1.ModelVersion, error) { scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } modelVersion := types.ModelVersion{ @@ -817,7 +819,7 @@ func (s *managerServerV1) CreateModelVersion(ctx context.Context, req *managerv1 } if _, err := s.rdb.Set(ctx, cache.MakeModelVersionKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId, modelVersion.ID), &modelVersion, 0).Result(); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return &managerv1.ModelVersion{ @@ -836,7 +838,7 @@ func (s *managerServerV1) CreateModelVersion(ctx context.Context, req *managerv1 func (s *managerServerV1) UpdateModelVersion(ctx context.Context, req *managerv1.UpdateModelVersionRequest) (*managerv1.ModelVersion, error) { scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } modelVersion, err := s.GetModelVersion(ctx, &managerv1.GetModelVersionRequest{ @@ -845,7 +847,7 @@ func (s *managerServerV1) UpdateModelVersion(ctx context.Context, req *managerv1 VersionId: req.VersionId, }) if err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } if req.Mae > 0 { @@ -880,7 +882,7 @@ func (s *managerServerV1) UpdateModelVersion(ctx context.Context, req *managerv1 CreatedAt: modelVersion.CreatedAt.AsTime(), UpdatedAt: modelVersion.UpdatedAt.AsTime(), }, 0).Result(); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return modelVersion, nil @@ -893,16 +895,16 @@ func (s *managerServerV1) DeleteModelVersion(ctx context.Context, req *managerv1 ModelId: req.ModelId, VersionId: req.VersionId, }); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } if _, err := s.rdb.Del(ctx, cache.MakeModelVersionKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId, req.VersionId)).Result(); err != nil { - return 
nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return nil, nil @@ -986,7 +988,7 @@ func (s *managerServerV1) KeepAlive(stream managerv1.Manager_KeepAliveServer) er req, err := stream.Recv() if err != nil { logger.Errorf("keepalive failed for the first time: %s", err.Error()) - return status.Error(codes.Unknown, err.Error()) + return status.Error(codes.Internal, err.Error()) } hostName := req.HostName ip := req.Ip @@ -1005,7 +1007,7 @@ func (s *managerServerV1) KeepAlive(stream managerv1.Manager_KeepAliveServer) er }).Updates(model.Scheduler{ State: model.SchedulerStateActive, }).Error; err != nil { - return status.Error(codes.Unknown, err.Error()) + return status.Error(codes.Internal, err.Error()) } if err := s.cache.Delete( @@ -1025,7 +1027,7 @@ func (s *managerServerV1) KeepAlive(stream managerv1.Manager_KeepAliveServer) er }).Updates(model.SeedPeer{ State: model.SeedPeerStateActive, }).Error; err != nil { - return status.Error(codes.Unknown, err.Error()) + return status.Error(codes.Internal, err.Error()) } if err := s.cache.Delete( @@ -1048,7 +1050,7 @@ func (s *managerServerV1) KeepAlive(stream managerv1.Manager_KeepAliveServer) er }).Updates(model.Scheduler{ State: model.SchedulerStateInactive, }).Error; err != nil { - return status.Error(codes.Unknown, err.Error()) + return status.Error(codes.Internal, err.Error()) } if err := s.cache.Delete( @@ -1068,7 +1070,7 @@ func (s *managerServerV1) KeepAlive(stream managerv1.Manager_KeepAliveServer) er }).Updates(model.SeedPeer{ State: model.SeedPeerStateInactive, }).Error; err != nil { - return status.Error(codes.Unknown, err.Error()) + return status.Error(codes.Internal, err.Error()) } if err := s.cache.Delete( diff --git a/manager/rpcserver/manager_server_v2.go b/manager/rpcserver/manager_server_v2.go index 2250a01a1..116431bf7 100644 --- a/manager/rpcserver/manager_server_v2.go +++ b/manager/rpcserver/manager_server_v2.go @@ -113,7 +113,7 @@ func (s *managerServerV2) GetSeedPeer(ctx context.Context, req *managerv2.GetSee HostName: req.HostName, SeedPeerClusterID: uint(req.SeedPeerClusterId), }).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } // Marshal config of seed peer cluster. 
@@ -184,7 +184,8 @@ func (s *managerServerV2) UpdateSeedPeer(ctx context.Context, req *managerv2.Upd if errors.Is(err, gorm.ErrRecordNotFound) { return s.createSeedPeer(ctx, req) } - return nil, status.Error(codes.Unknown, err.Error()) + + return nil, status.Error(codes.Internal, err.Error()) } if err := s.db.WithContext(ctx).Model(&seedPeer).Updates(model.SeedPeer{ @@ -197,7 +198,7 @@ func (s *managerServerV2) UpdateSeedPeer(ctx context.Context, req *managerv2.Upd ObjectStoragePort: req.ObjectStoragePort, SeedPeerClusterID: uint(req.SeedPeerClusterId), }).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } if err := s.cache.Delete( @@ -237,7 +238,7 @@ func (s *managerServerV2) createSeedPeer(ctx context.Context, req *managerv2.Upd } if err := s.db.WithContext(ctx).Create(&seedPeer).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return &managerv2.SeedPeer{ @@ -276,7 +277,7 @@ func (s *managerServerV2) GetScheduler(ctx context.Context, req *managerv2.GetSc HostName: req.HostName, SchedulerClusterID: uint(req.SchedulerClusterId), }).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } // Marshal config of scheduler. @@ -373,7 +374,8 @@ func (s *managerServerV2) UpdateScheduler(ctx context.Context, req *managerv2.Up if errors.Is(err, gorm.ErrRecordNotFound) { return s.createScheduler(ctx, req) } - return nil, status.Error(codes.Unknown, err.Error()) + + return nil, status.Error(codes.Internal, err.Error()) } if err := s.db.WithContext(ctx).Model(&scheduler).Updates(model.Scheduler{ @@ -383,7 +385,7 @@ func (s *managerServerV2) UpdateScheduler(ctx context.Context, req *managerv2.Up Port: req.Port, SchedulerClusterID: uint(req.SchedulerClusterId), }).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } if err := s.cache.Delete( @@ -417,7 +419,7 @@ func (s *managerServerV2) createScheduler(ctx context.Context, req *managerv2.Up } if err := s.db.WithContext(ctx).Create(&scheduler).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return &managerv2.Scheduler{ @@ -463,7 +465,7 @@ func (s *managerServerV2) ListSchedulers(ctx context.Context, req *managerv2.Lis log.Debugf("%s cache miss", cacheKey) var schedulerClusters []model.SchedulerCluster if err := s.db.WithContext(ctx).Preload("SecurityGroup.SecurityRules").Preload("SeedPeerClusters.SeedPeers", "state = ?", "active").Preload("Schedulers", "state = ?", "active").Find(&schedulerClusters).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } log.Debugf("list scheduler clusters %v with hostInfo %#v", getSchedulerClusterNames(schedulerClusters), req.HostInfo) @@ -573,7 +575,7 @@ func (s *managerServerV2) ListBuckets(ctx context.Context, req *managerv2.ListBu log.Debugf("%s cache miss", cacheKey) buckets, err := s.objectStorage.ListBucketMetadatas(ctx) if err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } // Construct schedulers. 
@@ -600,7 +602,7 @@ func (s *managerServerV2) ListBuckets(ctx context.Context, req *managerv2.ListBu func (s *managerServerV2) ListModels(ctx context.Context, req *managerv2.ListModelsRequest) (*managerv2.ListModelsResponse, error) { scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } models := []*managerv2.Model{} @@ -608,7 +610,7 @@ func (s *managerServerV2) ListModels(ctx context.Context, req *managerv2.ListMod for iter.Next(ctx) { var model types.Model if err := s.rdb.Get(ctx, iter.Val()).Scan(&model); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } models = append(models, &managerv2.Model{ @@ -632,12 +634,12 @@ func (s *managerServerV2) ListModels(ctx context.Context, req *managerv2.ListMod func (s *managerServerV2) GetModel(ctx context.Context, req *managerv2.GetModelRequest) (*managerv2.Model, error) { scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } var model types.Model if err := s.rdb.Get(ctx, cache.MakeModelKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId)).Scan(&model); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return &managerv2.Model{ @@ -656,7 +658,7 @@ func (s *managerServerV2) GetModel(ctx context.Context, req *managerv2.GetModelR func (s *managerServerV2) CreateModel(ctx context.Context, req *managerv2.CreateModelRequest) (*managerv2.Model, error) { scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } model := types.Model{ @@ -671,7 +673,7 @@ func (s *managerServerV2) CreateModel(ctx context.Context, req *managerv2.Create } if _, err := s.rdb.Set(ctx, cache.MakeModelKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, model.ID), &model, 0).Result(); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return &managerv2.Model{ @@ -690,7 +692,7 @@ func (s *managerServerV2) CreateModel(ctx context.Context, req *managerv2.Create func (s *managerServerV2) UpdateModel(ctx context.Context, req *managerv2.UpdateModelRequest) (*managerv2.Model, error) { scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } model, err := s.GetModel(ctx, &managerv2.GetModelRequest{ @@ -698,7 +700,7 @@ func (s *managerServerV2) UpdateModel(ctx context.Context, req *managerv2.Update ModelId: req.ModelId, }) if err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } model.VersionId = req.VersionId @@ -714,7 +716,7 @@ func (s *managerServerV2) UpdateModel(ctx context.Context, req *managerv2.Update CreatedAt: model.CreatedAt.AsTime(), UpdatedAt: model.UpdatedAt.AsTime(), }, 0).Result(); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + 
return nil, status.Error(codes.Internal, err.Error()) } return model, nil @@ -726,16 +728,16 @@ func (s *managerServerV2) DeleteModel(ctx context.Context, req *managerv2.Delete SchedulerId: req.SchedulerId, ModelId: req.ModelId, }); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } if _, err := s.rdb.Del(ctx, cache.MakeModelKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId)).Result(); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return nil, nil @@ -745,7 +747,7 @@ func (s *managerServerV2) DeleteModel(ctx context.Context, req *managerv2.Delete func (s *managerServerV2) ListModelVersions(ctx context.Context, req *managerv2.ListModelVersionsRequest) (*managerv2.ListModelVersionsResponse, error) { scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } modelVersions := []*managerv2.ModelVersion{} @@ -753,7 +755,7 @@ func (s *managerServerV2) ListModelVersions(ctx context.Context, req *managerv2. for iter.Next(ctx) { var modelVersion types.ModelVersion if err := s.rdb.Get(ctx, iter.Val()).Scan(&modelVersion); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } modelVersions = append(modelVersions, &managerv2.ModelVersion{ @@ -777,12 +779,12 @@ func (s *managerServerV2) ListModelVersions(ctx context.Context, req *managerv2. 
func (s *managerServerV2) GetModelVersion(ctx context.Context, req *managerv2.GetModelVersionRequest) (*managerv2.ModelVersion, error) { scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } var modelVersion types.ModelVersion if err := s.rdb.Get(ctx, cache.MakeModelVersionKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId, req.VersionId)).Scan(&modelVersion); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return &managerv2.ModelVersion{ @@ -801,7 +803,7 @@ func (s *managerServerV2) GetModelVersion(ctx context.Context, req *managerv2.Ge func (s *managerServerV2) CreateModelVersion(ctx context.Context, req *managerv2.CreateModelVersionRequest) (*managerv2.ModelVersion, error) { scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } modelVersion := types.ModelVersion{ @@ -816,7 +818,7 @@ func (s *managerServerV2) CreateModelVersion(ctx context.Context, req *managerv2 } if _, err := s.rdb.Set(ctx, cache.MakeModelVersionKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId, modelVersion.ID), &modelVersion, 0).Result(); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return &managerv2.ModelVersion{ @@ -835,7 +837,7 @@ func (s *managerServerV2) CreateModelVersion(ctx context.Context, req *managerv2 func (s *managerServerV2) UpdateModelVersion(ctx context.Context, req *managerv2.UpdateModelVersionRequest) (*managerv2.ModelVersion, error) { scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } modelVersion, err := s.GetModelVersion(ctx, &managerv2.GetModelVersionRequest{ @@ -844,7 +846,7 @@ func (s *managerServerV2) UpdateModelVersion(ctx context.Context, req *managerv2 VersionId: req.VersionId, }) if err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } if req.Mae > 0 { @@ -879,7 +881,7 @@ func (s *managerServerV2) UpdateModelVersion(ctx context.Context, req *managerv2 CreatedAt: modelVersion.CreatedAt.AsTime(), UpdatedAt: modelVersion.UpdatedAt.AsTime(), }, 0).Result(); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return modelVersion, nil @@ -892,16 +894,16 @@ func (s *managerServerV2) DeleteModelVersion(ctx context.Context, req *managerv2 ModelId: req.ModelId, VersionId: req.VersionId, }); err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } scheduler := model.Scheduler{} if err := s.db.WithContext(ctx).First(&scheduler, req.SchedulerId).Error; err != nil { - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } if _, err := s.rdb.Del(ctx, cache.MakeModelVersionKey(scheduler.SchedulerClusterID, scheduler.HostName, scheduler.IP, req.ModelId, req.VersionId)).Result(); err != nil { - return 
nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } return nil, nil @@ -927,7 +929,7 @@ func (s *managerServerV2) ListApplications(ctx context.Context, req *managerv2.L return nil, status.Error(codes.NotFound, err.Error()) } - return nil, status.Error(codes.Unknown, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } if len(applications) == 0 { @@ -985,7 +987,7 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er req, err := stream.Recv() if err != nil { logger.Errorf("keepalive failed for the first time: %s", err.Error()) - return status.Error(codes.Unknown, err.Error()) + return status.Error(codes.Internal, err.Error()) } hostName := req.HostName ip := req.Ip @@ -1004,7 +1006,7 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er }).Updates(model.Scheduler{ State: model.SchedulerStateActive, }).Error; err != nil { - return status.Error(codes.Unknown, err.Error()) + return status.Error(codes.Internal, err.Error()) } if err := s.cache.Delete( @@ -1024,7 +1026,7 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er }).Updates(model.SeedPeer{ State: model.SeedPeerStateActive, }).Error; err != nil { - return status.Error(codes.Unknown, err.Error()) + return status.Error(codes.Internal, err.Error()) } if err := s.cache.Delete( @@ -1047,7 +1049,7 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er }).Updates(model.Scheduler{ State: model.SchedulerStateInactive, }).Error; err != nil { - return status.Error(codes.Unknown, err.Error()) + return status.Error(codes.Internal, err.Error()) } if err := s.cache.Delete( @@ -1067,7 +1069,7 @@ func (s *managerServerV2) KeepAlive(stream managerv2.Manager_KeepAliveServer) er }).Updates(model.SeedPeer{ State: model.SeedPeerStateInactive, }).Error; err != nil { - return status.Error(codes.Unknown, err.Error()) + return status.Error(codes.Internal, err.Error()) } if err := s.cache.Delete( diff --git a/scheduler/metrics/metrics.go b/scheduler/metrics/metrics.go index 31c304ad3..acb80741b 100644 --- a/scheduler/metrics/metrics.go +++ b/scheduler/metrics/metrics.go @@ -46,6 +46,48 @@ var ( // Variables declared for metrics. 
var ( + AnnouncePeerCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "announce_peer_total", + Help: "Counter of the number of announce peer requests.", + }, []string{"tag", "app"}) + + AnnouncePeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "announce_peer_failure_total", + Help: "Counter of the number of failed announce peer requests.", + }, []string{"tag", "app"}) + + StatPeerCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "stat_peer_total", + Help: "Counter of the number of stat peer requests.", + }) + + StatPeerFailureCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "stat_peer_failure_total", + Help: "Counter of the number of failed stat peer requests.", + }) + + LeavePeerCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "leave_peer_total", + Help: "Counter of the number of leave peer requests.", + }) + + LeavePeerFailureCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "leave_peer_failure_total", + Help: "Counter of the number of failed leave peer requests.", + }) + RegisterTaskCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, diff --git a/scheduler/rpcserver/scheduler_server_v2.go b/scheduler/rpcserver/scheduler_server_v2.go index 990ea7c4b..81697470e 100644 --- a/scheduler/rpcserver/scheduler_server_v2.go +++ b/scheduler/rpcserver/scheduler_server_v2.go @@ -25,6 +25,7 @@ import ( schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2" "d7y.io/dragonfly/v2/scheduler/config" + "d7y.io/dragonfly/v2/scheduler/metrics" "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/scheduling" "d7y.io/dragonfly/v2/scheduler/service" @@ -54,14 +55,27 @@ func (s *schedulerServerV2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePe return nil } -// Checks information of peer. +// StatPeer checks information of peer. func (s *schedulerServerV2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*commonv2.Peer, error) { - return nil, nil + metrics.StatPeerCount.Inc() + peer, err := s.service.StatPeer(ctx, req) + if err != nil { + metrics.StatPeerFailureCount.Inc() + return nil, err + } + + return peer, nil } // LeavePeer releases peer in scheduler. func (s *schedulerServerV2) LeavePeer(ctx context.Context, req *schedulerv2.LeavePeerRequest) (*emptypb.Empty, error) { - return nil, nil + metrics.LeavePeerCount.Inc() + if err := s.service.LeavePeer(ctx, req); err != nil { + metrics.LeavePeerFailureCount.Inc() + return new(emptypb.Empty), err + } + + return new(emptypb.Empty), nil } // TODO exchange peer api definition.
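The counters added in metrics.go follow a total/failure pair: the handler increments the total on entry and the failure counter on the error path, so a success rate can be derived from the two. A hedged sketch of asserting that wiring with prometheus/client_golang's testutil package; the test name and expected values are illustrative, not from this diff:

```go
package rpcserver

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"

	"d7y.io/dragonfly/v2/scheduler/metrics"
)

// A sketch only: the counters are process-global, so real tests must
// account for increments made by other cases.
func TestStatPeerCounters(t *testing.T) {
	metrics.StatPeerCount.Inc()
	metrics.StatPeerFailureCount.Inc()

	// testutil.ToFloat64 reads the current value of a single metric.
	if got := testutil.ToFloat64(metrics.StatPeerCount); got < 1 {
		t.Errorf("stat_peer_total = %v, want >= 1", got)
	}
	if got := testutil.ToFloat64(metrics.StatPeerFailureCount); got < 1 {
		t.Errorf("stat_peer_failure_total = %v, want >= 1", got)
	}
}
```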
diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go index 6dee55035..b05fec126 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -198,6 +198,16 @@ var ( } mockURLMetaRange = "0-9" mockPieceMD5 = digest.New(digest.AlgorithmMD5, "86d3f3a95c324c9479bd8986968f4327") + mockPiece = resource.Piece{ + Number: 1, + ParentID: "foo", + Offset: 2, + Length: 10, + Digest: mockPieceMD5, + TrafficType: commonv2.TrafficType_REMOTE_PEER, + Cost: 1 * time.Minute, + CreatedAt: time.Now(), + } ) func TestService_NewV1(t *testing.T) { @@ -227,7 +237,7 @@ func TestService_NewV1(t *testing.T) { } } -func TestService_RegisterPeerTask(t *testing.T) { +func TestServiceV1_RegisterPeerTask(t *testing.T) { tests := []struct { name string req *schedulerv1.PeerTaskRequest @@ -911,7 +921,7 @@ func TestService_RegisterPeerTask(t *testing.T) { } } -func TestService_ReportPieceResult(t *testing.T) { +func TestServiceV1_ReportPieceResult(t *testing.T) { tests := []struct { name string mock func( @@ -1164,7 +1174,7 @@ func TestService_ReportPieceResult(t *testing.T) { } } -func TestService_ReportPeerResult(t *testing.T) { +func TestServiceV1_ReportPeerResult(t *testing.T) { tests := []struct { name string req *schedulerv1.PeerResult @@ -1361,7 +1371,7 @@ func TestService_ReportPeerResult(t *testing.T) { } } -func TestService_StatTask(t *testing.T) { +func TestServiceV1_StatTask(t *testing.T) { tests := []struct { name string mock func(mockTask *resource.Task, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder) @@ -1423,7 +1433,7 @@ func TestService_StatTask(t *testing.T) { } } -func TestService_AnnounceTask(t *testing.T) { +func TestServiceV1_AnnounceTask(t *testing.T) { tests := []struct { name string req *schedulerv1.AnnounceTaskRequest @@ -1722,7 +1732,7 @@ func TestService_AnnounceTask(t *testing.T) { } } -func TestService_LeaveTask(t *testing.T) { +func TestServiceV1_LeaveTask(t *testing.T) { tests := []struct { name string mock func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) @@ -1920,7 +1930,7 @@ func TestService_LeaveTask(t *testing.T) { } } -func TestService_LeaveHost(t *testing.T) { +func TestServiceV1_LeaveHost(t *testing.T) { tests := []struct { name string mock func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) @@ -2137,7 +2147,7 @@ func TestService_LeaveHost(t *testing.T) { } } -func TestService_triggerTask(t *testing.T) { +func TestServiceV1_triggerTask(t *testing.T) { tests := []struct { name string config *config.Config @@ -2604,7 +2614,7 @@ func TestService_triggerTask(t *testing.T) { } } -func TestService_storeTask(t *testing.T) { +func TestServiceV1_storeTask(t *testing.T) { tests := []struct { name string run func(t *testing.T, svc *V1, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder) @@ -2698,7 +2708,7 @@ func TestService_storeTask(t *testing.T) { } } -func TestService_storeHost(t *testing.T) { +func TestServiceV1_storeHost(t *testing.T) { tests := []struct { name string peerHost *schedulerv1.PeerHost @@ -2783,7 +2793,7 @@ func TestService_storeHost(t *testing.T) { } } -func TestService_storePeer(t *testing.T) { +func 
TestServiceV1_storePeer(t *testing.T) { tests := []struct { name string run func(t *testing.T, svc *V1, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) @@ -2863,7 +2873,7 @@ func TestService_storePeer(t *testing.T) { } } -func TestService_triggerSeedPeerTask(t *testing.T) { +func TestServiceV1_triggerSeedPeerTask(t *testing.T) { tests := []struct { name string mock func(task *resource.Task, peer *resource.Peer, seedPeer resource.SeedPeer, mr *resource.MockResourceMockRecorder, mc *resource.MockSeedPeerMockRecorder) @@ -2929,7 +2939,7 @@ func TestService_triggerSeedPeerTask(t *testing.T) { } } -func TestService_handleBeginOfPiece(t *testing.T) { +func TestServiceV1_handleBeginOfPiece(t *testing.T) { tests := []struct { name string mock func(peer *resource.Peer, scheduling *mocks.MockSchedulingMockRecorder) @@ -3010,7 +3020,7 @@ func TestService_handleBeginOfPiece(t *testing.T) { } } -func TestService_handlePieceSuccess(t *testing.T) { +func TestServiceV1_handlePieceSuccess(t *testing.T) { mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) @@ -3145,7 +3155,7 @@ func TestService_handlePieceSuccess(t *testing.T) { } } -func TestService_handlePieceFail(t *testing.T) { +func TestServiceV1_handlePieceFail(t *testing.T) { tests := []struct { name string @@ -3339,7 +3349,7 @@ func TestService_handlePieceFail(t *testing.T) { } } -func TestService_handlePeerSuccess(t *testing.T) { +func TestServiceV1_handlePeerSuccess(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if _, err := w.Write([]byte{1}); err != nil { w.WriteHeader(http.StatusInternalServerError) @@ -3464,7 +3474,7 @@ func TestService_handlePeerSuccess(t *testing.T) { } } -func TestService_handlePeerFail(t *testing.T) { +func TestServiceV1_handlePeerFail(t *testing.T) { tests := []struct { name string @@ -3545,7 +3555,7 @@ func TestService_handlePeerFail(t *testing.T) { } } -func TestService_handleTaskSuccess(t *testing.T) { +func TestServiceV1_handleTaskSuccess(t *testing.T) { tests := []struct { name string result *schedulerv1.PeerResult @@ -3626,7 +3636,7 @@ func TestService_handleTaskSuccess(t *testing.T) { } } -func TestService_handleTaskFail(t *testing.T) { +func TestServiceV1_handleTaskFail(t *testing.T) { rst := status.Newf(codes.Aborted, "response is not valid") st, err := rst.WithDetails(&errordetailsv1.SourceError{Temporary: false}) if err != nil { diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index b46531964..f8c7585aa 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -18,11 +18,18 @@ package service import ( "context" + "fmt" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" commonv2 "d7y.io/api/pkg/apis/common/v2" schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2" + logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/scheduling" "d7y.io/dragonfly/v2/scheduler/storage" @@ -69,13 +76,193 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error return nil } -// Checks information of peer. +// StatPeer checks information of peer.
func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*commonv2.Peer, error) { - return nil, nil + logger.WithTaskID(req.TaskId).Infof("stat peer request: %#v", req) + + peer, loaded := v.resource.PeerManager().Load(req.PeerId) + if !loaded { + return nil, status.Errorf(codes.NotFound, "peer %s not found", req.PeerId) + } + + resp := &commonv2.Peer{ + Id: peer.ID, + Priority: peer.Priority, + Cost: durationpb.New(peer.Cost.Load()), + State: peer.FSM.Current(), + NeedBackToSource: peer.NeedBackToSource.Load(), + CreatedAt: timestamppb.New(peer.CreatedAt.Load()), + UpdatedAt: timestamppb.New(peer.UpdatedAt.Load()), + } + + // Set range to response. + if peer.Range != nil { + resp.Range = &commonv2.Range{ + Start: peer.Range.Start, + Length: peer.Range.Length, + } + } + + // Set pieces to response. + peer.Pieces.Range(func(key, value any) bool { + piece, ok := value.(*resource.Piece) + if !ok { + peer.Log.Errorf("invalid piece %s %#v", key, value) + return true + } + + respPiece := &commonv2.Piece{ + Number: piece.Number, + ParentId: piece.ParentID, + Offset: piece.Offset, + Length: piece.Length, + TrafficType: piece.TrafficType, + Cost: durationpb.New(piece.Cost), + CreatedAt: timestamppb.New(piece.CreatedAt), + } + + if piece.Digest != nil { + respPiece.Digest = piece.Digest.String() + } + + resp.Pieces = append(resp.Pieces, respPiece) + return true + }) + + // Set task to response. + resp.Task = &commonv2.Task{ + Id: peer.Task.ID, + Type: peer.Task.Type, + Url: peer.Task.URL, + Tag: peer.Task.Tag, + Application: peer.Task.Application, + Filters: peer.Task.Filters, + Header: peer.Task.Header, + PieceLength: peer.Task.PieceLength, + ContentLength: peer.Task.ContentLength.Load(), + PieceCount: peer.Task.TotalPieceCount.Load(), + SizeScope: peer.Task.SizeScope(), + State: peer.Task.FSM.Current(), + PeerCount: int32(peer.Task.PeerCount()), + CreatedAt: timestamppb.New(peer.Task.CreatedAt.Load()), + UpdatedAt: timestamppb.New(peer.Task.UpdatedAt.Load()), + } + + // Set digest to response. + if peer.Task.Digest != nil { + resp.Task.Digest = peer.Task.Digest.String() + } + + // Set pieces to response. + peer.Task.Pieces.Range(func(key, value any) bool { + piece, ok := value.(*resource.Piece) + if !ok { + peer.Task.Log.Errorf("invalid piece %s %#v", key, value) + return true + } + + respPiece := &commonv2.Piece{ + Number: piece.Number, + ParentId: piece.ParentID, + Offset: piece.Offset, + Length: piece.Length, + TrafficType: piece.TrafficType, + Cost: durationpb.New(piece.Cost), + CreatedAt: timestamppb.New(piece.CreatedAt), + } + + if piece.Digest != nil { + respPiece.Digest = piece.Digest.String() + } + + resp.Task.Pieces = append(resp.Task.Pieces, respPiece) + return true + }) + + // Set host to response. 
+ resp.Host = &commonv2.Host{ + Id: peer.Host.ID, + Type: uint32(peer.Host.Type), + Hostname: peer.Host.Hostname, + Ip: peer.Host.IP, + Port: peer.Host.Port, + DownloadPort: peer.Host.DownloadPort, + Os: peer.Host.OS, + Platform: peer.Host.Platform, + PlatformFamily: peer.Host.PlatformFamily, + PlatformVersion: peer.Host.PlatformVersion, + KernelVersion: peer.Host.KernelVersion, + Cpu: &commonv2.CPU{ + LogicalCount: peer.Host.CPU.LogicalCount, + PhysicalCount: peer.Host.CPU.PhysicalCount, + Percent: peer.Host.CPU.Percent, + ProcessPercent: peer.Host.CPU.ProcessPercent, + Times: &commonv2.CPUTimes{ + User: peer.Host.CPU.Times.User, + System: peer.Host.CPU.Times.System, + Idle: peer.Host.CPU.Times.Idle, + Nice: peer.Host.CPU.Times.Nice, + Iowait: peer.Host.CPU.Times.Iowait, + Irq: peer.Host.CPU.Times.Irq, + Softirq: peer.Host.CPU.Times.Softirq, + Steal: peer.Host.CPU.Times.Steal, + Guest: peer.Host.CPU.Times.Guest, + GuestNice: peer.Host.CPU.Times.GuestNice, + }, + }, + Memory: &commonv2.Memory{ + Total: peer.Host.Memory.Total, + Available: peer.Host.Memory.Available, + Used: peer.Host.Memory.Used, + UsedPercent: peer.Host.Memory.UsedPercent, + ProcessUsedPercent: peer.Host.Memory.ProcessUsedPercent, + Free: peer.Host.Memory.Free, + }, + Network: &commonv2.Network{ + TcpConnectionCount: peer.Host.Network.TCPConnectionCount, + UploadTcpConnectionCount: peer.Host.Network.UploadTCPConnectionCount, + SecurityDomain: peer.Host.Network.SecurityDomain, + Location: peer.Host.Network.Location, + Idc: peer.Host.Network.IDC, + }, + Disk: &commonv2.Disk{ + Total: peer.Host.Disk.Total, + Free: peer.Host.Disk.Free, + Used: peer.Host.Disk.Used, + UsedPercent: peer.Host.Disk.UsedPercent, + InodesTotal: peer.Host.Disk.InodesTotal, + InodesUsed: peer.Host.Disk.InodesUsed, + InodesFree: peer.Host.Disk.InodesFree, + InodesUsedPercent: peer.Host.Disk.InodesUsedPercent, + }, + Build: &commonv2.Build{ + GitVersion: peer.Host.Build.GitVersion, + GitCommit: peer.Host.Build.GitCommit, + GoVersion: peer.Host.Build.GoVersion, + Platform: peer.Host.Build.Platform, + }, + } + + return resp, nil } // LeavePeer releases peer in scheduler. 
func (v *V2) LeavePeer(ctx context.Context, req *schedulerv2.LeavePeerRequest) error { + logger.WithTaskAndPeerID(req.TaskId, req.PeerId).Infof("leave peer request: %#v", req) + + peer, loaded := v.resource.PeerManager().Load(req.PeerId) + if !loaded { + msg := fmt.Sprintf("peer %s not found", req.PeerId) + logger.Error(msg) + return status.Error(codes.NotFound, msg) + } + + if err := peer.FSM.Event(ctx, resource.PeerEventLeave); err != nil { + msg := fmt.Sprintf("peer fsm event failed: %s", err.Error()) + peer.Log.Error(msg) + return status.Error(codes.FailedPrecondition, msg) + } + return nil } diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index 0bdb37ad6..4a4287820 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -17,11 +17,19 @@ package service import ( + "context" "reflect" "testing" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" + + commonv2 "d7y.io/api/pkg/apis/common/v2" + schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2" "d7y.io/dragonfly/v2/scheduler/config" configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks" @@ -56,3 +64,248 @@ func TestService_NewV2(t *testing.T) { }) } } + +func TestServiceV2_StatPeer(t *testing.T) { + tests := []struct { + name string + mock func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) + expect func(t *testing.T, peer *resource.Peer, resp *commonv2.Peer, err error) + }{ + { + name: "peer not found", + mock: func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + gomock.InOrder( + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Any()).Return(nil, false).Times(1), + ) + }, + expect: func(t *testing.T, peer *resource.Peer, resp *commonv2.Peer, err error) { + assert := assert.New(t) + assert.ErrorIs(err, status.Errorf(codes.NotFound, "peer %s not found", mockPeerID)) + }, + }, + { + name: "peer has been loaded", + mock: func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + peer.StorePiece(&mockPiece) + peer.Task.StorePiece(&mockPiece) + gomock.InOrder( + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Any()).Return(peer, true).Times(1), + ) + }, + expect: func(t *testing.T, peer *resource.Peer, resp *commonv2.Peer, err error) { + assert := assert.New(t) + assert.EqualValues(resp, &commonv2.Peer{ + Id: peer.ID, + Range: &commonv2.Range{ + Start: peer.Range.Start, + Length: peer.Range.Length, + }, + Priority: peer.Priority, + Pieces: []*commonv2.Piece{ + { + Number: mockPiece.Number, + ParentId: mockPiece.ParentID, + Offset: mockPiece.Offset, + Length: mockPiece.Length, + Digest: mockPiece.Digest.String(), + TrafficType: mockPiece.TrafficType, + Cost: durationpb.New(mockPiece.Cost), + CreatedAt: timestamppb.New(mockPiece.CreatedAt), + }, + }, + Cost: durationpb.New(peer.Cost.Load()), + State: peer.FSM.Current(), + Task: &commonv2.Task{ + Id: peer.Task.ID, +
Type: peer.Task.Type, + Url: peer.Task.URL, + Digest: peer.Task.Digest.String(), + Tag: peer.Task.Tag, + Application: peer.Task.Application, + Filters: peer.Task.Filters, + Header: peer.Task.Header, + PieceLength: peer.Task.PieceLength, + ContentLength: peer.Task.ContentLength.Load(), + PieceCount: peer.Task.TotalPieceCount.Load(), + SizeScope: peer.Task.SizeScope(), + Pieces: []*commonv2.Piece{ + { + Number: mockPiece.Number, + ParentId: mockPiece.ParentID, + Offset: mockPiece.Offset, + Length: mockPiece.Length, + Digest: mockPiece.Digest.String(), + TrafficType: mockPiece.TrafficType, + Cost: durationpb.New(mockPiece.Cost), + CreatedAt: timestamppb.New(mockPiece.CreatedAt), + }, + }, + State: peer.Task.FSM.Current(), + PeerCount: int32(peer.Task.PeerCount()), + CreatedAt: timestamppb.New(peer.Task.CreatedAt.Load()), + UpdatedAt: timestamppb.New(peer.Task.UpdatedAt.Load()), + }, + Host: &commonv2.Host{ + Id: peer.Host.ID, + Type: uint32(peer.Host.Type), + Hostname: peer.Host.Hostname, + Ip: peer.Host.IP, + Port: peer.Host.Port, + DownloadPort: peer.Host.DownloadPort, + Os: peer.Host.OS, + Platform: peer.Host.Platform, + PlatformFamily: peer.Host.PlatformFamily, + PlatformVersion: peer.Host.PlatformVersion, + KernelVersion: peer.Host.KernelVersion, + Cpu: &commonv2.CPU{ + LogicalCount: peer.Host.CPU.LogicalCount, + PhysicalCount: peer.Host.CPU.PhysicalCount, + Percent: peer.Host.CPU.Percent, + ProcessPercent: peer.Host.CPU.ProcessPercent, + Times: &commonv2.CPUTimes{ + User: peer.Host.CPU.Times.User, + System: peer.Host.CPU.Times.System, + Idle: peer.Host.CPU.Times.Idle, + Nice: peer.Host.CPU.Times.Nice, + Iowait: peer.Host.CPU.Times.Iowait, + Irq: peer.Host.CPU.Times.Irq, + Softirq: peer.Host.CPU.Times.Softirq, + Steal: peer.Host.CPU.Times.Steal, + Guest: peer.Host.CPU.Times.Guest, + GuestNice: peer.Host.CPU.Times.GuestNice, + }, + }, + Memory: &commonv2.Memory{ + Total: peer.Host.Memory.Total, + Available: peer.Host.Memory.Available, + Used: peer.Host.Memory.Used, + UsedPercent: peer.Host.Memory.UsedPercent, + ProcessUsedPercent: peer.Host.Memory.ProcessUsedPercent, + Free: peer.Host.Memory.Free, + }, + Network: &commonv2.Network{ + TcpConnectionCount: peer.Host.Network.TCPConnectionCount, + UploadTcpConnectionCount: peer.Host.Network.UploadTCPConnectionCount, + SecurityDomain: peer.Host.Network.SecurityDomain, + Location: peer.Host.Network.Location, + Idc: peer.Host.Network.IDC, + }, + Disk: &commonv2.Disk{ + Total: peer.Host.Disk.Total, + Free: peer.Host.Disk.Free, + Used: peer.Host.Disk.Used, + UsedPercent: peer.Host.Disk.UsedPercent, + InodesTotal: peer.Host.Disk.InodesTotal, + InodesUsed: peer.Host.Disk.InodesUsed, + InodesFree: peer.Host.Disk.InodesFree, + InodesUsedPercent: peer.Host.Disk.InodesUsedPercent, + }, + Build: &commonv2.Build{ + GitVersion: peer.Host.Build.GitVersion, + GitCommit: peer.Host.Build.GitCommit, + GoVersion: peer.Host.Build.GoVersion, + Platform: peer.Host.Build.Platform, + }, + }, + NeedBackToSource: peer.NeedBackToSource.Load(), + CreatedAt: timestamppb.New(peer.CreatedAt.Load()), + UpdatedAt: timestamppb.New(peer.UpdatedAt.Load()), + }) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + scheduling := mocks.NewMockScheduling(ctl) + res := resource.NewMockResource(ctl) + dynconfig := configmocks.NewMockDynconfigInterface(ctl) + storage := storagemocks.NewMockStorage(ctl) + peerManager := resource.NewMockPeerManager(ctl) + mockHost := resource.NewHost( + mockRawHost.ID, 
mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost, resource.WithRange(mockPeerRange)) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + + tc.mock(peer, peerManager, scheduling.EXPECT(), res.EXPECT(), peerManager.EXPECT()) + resp, err := svc.StatPeer(context.Background(), &schedulerv2.StatPeerRequest{TaskId: mockTaskID, PeerId: mockPeerID}) + tc.expect(t, peer, resp, err) + }) + } +} + +func TestServiceV2_LeavePeer(t *testing.T) { + tests := []struct { + name string + mock func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) + expect func(t *testing.T, err error) + }{ + { + name: "peer not found", + mock: func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + gomock.InOrder( + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Any()).Return(nil, false).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.ErrorIs(err, status.Errorf(codes.NotFound, "peer %s not found", mockPeerID)) + }, + }, + { + name: "peer fsm event failed", + mock: func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + peer.FSM.SetState(resource.PeerStateLeave) + gomock.InOrder( + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Any()).Return(peer, true).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.ErrorIs(err, status.Error(codes.FailedPrecondition, "peer fsm event failed: event Leave inappropriate in current state Leave")) + }, + }, + { + name: "peer leaves succeeded", + mock: func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + gomock.InOrder( + mr.PeerManager().Return(peerManager).Times(1), + mp.Load(gomock.Any()).Return(peer, true).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.NoError(err) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + scheduling := mocks.NewMockScheduling(ctl) + res := resource.NewMockResource(ctl) + dynconfig := configmocks.NewMockDynconfigInterface(ctl) + storage := storagemocks.NewMockStorage(ctl) + peerManager := resource.NewMockPeerManager(ctl) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) + mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + peer := 
resource.NewPeer(mockSeedPeerID, mockTask, mockHost, resource.WithRange(mockPeerRange)) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + + tc.mock(peer, peerManager, scheduling.EXPECT(), res.EXPECT(), peerManager.EXPECT()) + tc.expect(t, svc.LeavePeer(context.Background(), &schedulerv2.LeavePeerRequest{TaskId: mockTaskID, PeerId: mockPeerID})) + }) + } +}
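For reference, a minimal caller-side sketch of the v2 LeavePeer error contract exercised by TestServiceV2_LeavePeer above; the connection and IDs are assumed, and treating NotFound as success is one possible policy, not part of this change:

```go
package main

import (
	"context"

	schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// leavePeer mirrors the error contract verified in TestServiceV2_LeavePeer.
func leavePeer(ctx context.Context, conn *grpc.ClientConn, taskID, peerID string) error {
	_, err := schedulerv2.NewSchedulerClient(conn).LeavePeer(ctx, &schedulerv2.LeavePeerRequest{
		TaskId: taskID,
		PeerId: peerID,
	})
	switch status.Code(err) {
	case codes.OK:
		return nil
	case codes.NotFound:
		// Peer already gone; one reasonable policy is to treat this as success.
		return nil
	case codes.FailedPrecondition:
		// The peer FSM rejected the Leave event (e.g. already in Leave state).
		return err
	default:
		return err
	}
}
```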