feat: change PersistentCacheTask to value type and ensure empty slice response (#3945)
Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
parent
4265552bef
commit
18a8026e45
|
|
@ -827,10 +827,10 @@ func (mr *MockServiceMockRecorder) GetPermissions(arg0, arg1 any) *gomock.Call {
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPersistentCacheTask mocks base method.
|
// GetPersistentCacheTask mocks base method.
|
||||||
func (m *MockService) GetPersistentCacheTask(arg0 context.Context, arg1 uint, arg2 string) (*types.PersistentCacheTask, error) {
|
func (m *MockService) GetPersistentCacheTask(arg0 context.Context, arg1 uint, arg2 string) (types.PersistentCacheTask, error) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
ret := m.ctrl.Call(m, "GetPersistentCacheTask", arg0, arg1, arg2)
|
ret := m.ctrl.Call(m, "GetPersistentCacheTask", arg0, arg1, arg2)
|
||||||
ret0, _ := ret[0].(*types.PersistentCacheTask)
|
ret0, _ := ret[0].(types.PersistentCacheTask)
|
||||||
ret1, _ := ret[1].(error)
|
ret1, _ := ret[1].(error)
|
||||||
return ret0, ret1
|
return ret0, ret1
|
||||||
}
|
}
|
||||||
|
|
@ -842,10 +842,10 @@ func (mr *MockServiceMockRecorder) GetPersistentCacheTask(arg0, arg1, arg2 any)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPersistentCacheTasks mocks base method.
|
// GetPersistentCacheTasks mocks base method.
|
||||||
func (m *MockService) GetPersistentCacheTasks(arg0 context.Context, arg1 types.GetPersistentCacheTasksQuery) ([]*types.PersistentCacheTask, int64, error) {
|
func (m *MockService) GetPersistentCacheTasks(arg0 context.Context, arg1 types.GetPersistentCacheTasksQuery) ([]types.PersistentCacheTask, int64, error) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
ret := m.ctrl.Call(m, "GetPersistentCacheTasks", arg0, arg1)
|
ret := m.ctrl.Call(m, "GetPersistentCacheTasks", arg0, arg1)
|
||||||
ret0, _ := ret[0].([]*types.PersistentCacheTask)
|
ret0, _ := ret[0].([]types.PersistentCacheTask)
|
||||||
ret1, _ := ret[1].(int64)
|
ret1, _ := ret[1].(int64)
|
||||||
ret2, _ := ret[2].(error)
|
ret2, _ := ret[2].(error)
|
||||||
return ret0, ret1, ret2
|
return ret0, ret1, ret2
|
||||||
|
|
|
||||||
|
|
@ -38,24 +38,28 @@ func (s *service) DestroyPersistentCacheTask(ctx context.Context, schedulerClust
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPersistentCacheTask retrieves a persistent cache task from Redis based on query parameters.
|
// GetPersistentCacheTask retrieves a persistent cache task from Redis based on query parameters.
|
||||||
func (s *service) GetPersistentCacheTask(ctx context.Context, schedulerClusterID uint, id string) (*types.PersistentCacheTask, error) {
|
func (s *service) GetPersistentCacheTask(ctx context.Context, schedulerClusterID uint, id string) (types.PersistentCacheTask, error) {
|
||||||
return s.loadTask(ctx, schedulerClusterID, id)
|
return s.loadTask(ctx, schedulerClusterID, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPersistentCacheTasks retrieves persistent cache tasks from Redis based on query parameters.
|
// GetPersistentCacheTasks retrieves persistent cache tasks from Redis based on query parameters.
|
||||||
func (s *service) GetPersistentCacheTasks(ctx context.Context, q types.GetPersistentCacheTasksQuery) ([]*types.PersistentCacheTask, int64, error) {
|
func (s *service) GetPersistentCacheTasks(ctx context.Context, q types.GetPersistentCacheTasksQuery) ([]types.PersistentCacheTask, int64, error) {
|
||||||
tasks, err := s.loadAllTasks(ctx, q.SchedulerClusterID)
|
tasks, err := s.loadAllTasks(ctx, q.SchedulerClusterID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(tasks) == 0 {
|
||||||
|
return []types.PersistentCacheTask{}, 0, nil
|
||||||
|
}
|
||||||
|
|
||||||
return tasks, int64(len(tasks)), nil
|
return tasks, int64(len(tasks)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// loadAllTasks loads all persistent cache tasks from Redis based on the provided scheduler cluster ID.
|
// loadAllTasks loads all persistent cache tasks from Redis based on the provided scheduler cluster ID.
|
||||||
func (s *service) loadAllTasks(ctx context.Context, schedulerClusterID uint) ([]*types.PersistentCacheTask, error) {
|
func (s *service) loadAllTasks(ctx context.Context, schedulerClusterID uint) ([]types.PersistentCacheTask, error) {
|
||||||
var (
|
var (
|
||||||
tasks []*types.PersistentCacheTask
|
tasks []types.PersistentCacheTask
|
||||||
cursor uint64
|
cursor uint64
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -126,18 +130,18 @@ func (s *service) loadAllTasks(ctx context.Context, schedulerClusterID uint) ([]
|
||||||
}
|
}
|
||||||
|
|
||||||
// loadTask loads a task from Redis based on the provided key.
|
// loadTask loads a task from Redis based on the provided key.
|
||||||
func (s *service) loadTask(ctx context.Context, schedulerClusterID uint, id string) (*types.PersistentCacheTask, error) {
|
func (s *service) loadTask(ctx context.Context, schedulerClusterID uint, id string) (types.PersistentCacheTask, error) {
|
||||||
taskKey := pkgredis.MakePersistentCacheTaskKeyInScheduler(schedulerClusterID, id)
|
taskKey := pkgredis.MakePersistentCacheTaskKeyInScheduler(schedulerClusterID, id)
|
||||||
rawTask, err := s.rdb.HGetAll(ctx, taskKey).Result()
|
rawTask, err := s.rdb.HGetAll(ctx, taskKey).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return types.PersistentCacheTask{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(rawTask) == 0 {
|
if len(rawTask) == 0 {
|
||||||
return nil, errors.New("task not found")
|
return types.PersistentCacheTask{}, errors.New("task not found")
|
||||||
}
|
}
|
||||||
|
|
||||||
task := &types.PersistentCacheTask{
|
task := types.PersistentCacheTask{
|
||||||
ID: rawTask["id"],
|
ID: rawTask["id"],
|
||||||
Tag: rawTask["tag"],
|
Tag: rawTask["tag"],
|
||||||
Application: rawTask["application"],
|
Application: rawTask["application"],
|
||||||
|
|
@ -147,55 +151,55 @@ func (s *service) loadTask(ctx context.Context, schedulerClusterID uint, id stri
|
||||||
// Parse PersistentReplicaCount.
|
// Parse PersistentReplicaCount.
|
||||||
persistentReplicaCount, err := strconv.ParseUint(rawTask["persistent_replica_count"], 10, 64)
|
persistentReplicaCount, err := strconv.ParseUint(rawTask["persistent_replica_count"], 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return types.PersistentCacheTask{}, err
|
||||||
}
|
}
|
||||||
task.PersistentReplicaCount = persistentReplicaCount
|
task.PersistentReplicaCount = persistentReplicaCount
|
||||||
|
|
||||||
// Parse PieceLength.
|
// Parse PieceLength.
|
||||||
pieceLength, err := strconv.ParseUint(rawTask["piece_length"], 10, 64)
|
pieceLength, err := strconv.ParseUint(rawTask["piece_length"], 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return types.PersistentCacheTask{}, err
|
||||||
}
|
}
|
||||||
task.PieceLength = pieceLength
|
task.PieceLength = pieceLength
|
||||||
|
|
||||||
// Parse ContentLength.
|
// Parse ContentLength.
|
||||||
contentLength, err := strconv.ParseUint(rawTask["content_length"], 10, 64)
|
contentLength, err := strconv.ParseUint(rawTask["content_length"], 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return types.PersistentCacheTask{}, err
|
||||||
}
|
}
|
||||||
task.ContentLength = contentLength
|
task.ContentLength = contentLength
|
||||||
|
|
||||||
// Parse TotalPieceCount.
|
// Parse TotalPieceCount.
|
||||||
totalPieceCount, err := strconv.ParseUint(rawTask["total_piece_count"], 10, 32)
|
totalPieceCount, err := strconv.ParseUint(rawTask["total_piece_count"], 10, 32)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return types.PersistentCacheTask{}, err
|
||||||
}
|
}
|
||||||
task.TotalPieceCount = uint32(totalPieceCount)
|
task.TotalPieceCount = uint32(totalPieceCount)
|
||||||
|
|
||||||
// Parse TTL.
|
// Parse TTL.
|
||||||
ttl, err := strconv.ParseInt(rawTask["ttl"], 10, 64)
|
ttl, err := strconv.ParseInt(rawTask["ttl"], 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return types.PersistentCacheTask{}, err
|
||||||
}
|
}
|
||||||
task.TTL = time.Duration(ttl)
|
task.TTL = time.Duration(ttl)
|
||||||
|
|
||||||
// Parse CreatedAt.
|
// Parse CreatedAt.
|
||||||
createdAt, err := time.Parse(time.RFC3339, rawTask["created_at"])
|
createdAt, err := time.Parse(time.RFC3339, rawTask["created_at"])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return types.PersistentCacheTask{}, err
|
||||||
}
|
}
|
||||||
task.CreatedAt = createdAt
|
task.CreatedAt = createdAt
|
||||||
|
|
||||||
// Parse UpdatedAt.
|
// Parse UpdatedAt.
|
||||||
updatedAt, err := time.Parse(time.RFC3339, rawTask["updated_at"])
|
updatedAt, err := time.Parse(time.RFC3339, rawTask["updated_at"])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return types.PersistentCacheTask{}, err
|
||||||
}
|
}
|
||||||
task.UpdatedAt = updatedAt
|
task.UpdatedAt = updatedAt
|
||||||
|
|
||||||
peers, err := s.loadAllPeersByTaskID(ctx, schedulerClusterID, task.ID)
|
peers, err := s.loadAllPeersByTaskID(ctx, schedulerClusterID, task.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return types.PersistentCacheTask{}, err
|
||||||
}
|
}
|
||||||
task.Peers = peers
|
task.Peers = peers
|
||||||
return task, nil
|
return task, nil
|
||||||
|
|
|
||||||
|
|
@ -139,8 +139,8 @@ type Service interface {
|
||||||
GetPersonalAccessTokens(context.Context, types.GetPersonalAccessTokensQuery) ([]models.PersonalAccessToken, int64, error)
|
GetPersonalAccessTokens(context.Context, types.GetPersonalAccessTokensQuery) ([]models.PersonalAccessToken, int64, error)
|
||||||
|
|
||||||
DestroyPersistentCacheTask(context.Context, uint, string) error
|
DestroyPersistentCacheTask(context.Context, uint, string) error
|
||||||
GetPersistentCacheTask(context.Context, uint, string) (*types.PersistentCacheTask, error)
|
GetPersistentCacheTask(context.Context, uint, string) (types.PersistentCacheTask, error)
|
||||||
GetPersistentCacheTasks(context.Context, types.GetPersistentCacheTasksQuery) ([]*types.PersistentCacheTask, int64, error)
|
GetPersistentCacheTasks(context.Context, types.GetPersistentCacheTasksQuery) ([]types.PersistentCacheTask, int64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type service struct {
|
type service struct {
|
||||||
|
|
|
||||||
|
|
@ -29,17 +29,17 @@ type PersistentCacheTaskParams struct {
|
||||||
|
|
||||||
type DestroyPersistentCacheTaskQuery struct {
|
type DestroyPersistentCacheTaskQuery struct {
|
||||||
// SchedulerClusterID is the scheduler cluster id of the persistent cache.
|
// SchedulerClusterID is the scheduler cluster id of the persistent cache.
|
||||||
SchedulerClusterID uint `json:"scheduler_cluster_id" binding:"required"`
|
SchedulerClusterID uint `form:"scheduler_cluster_id" binding:"required"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type GetPersistentCacheTaskQuery struct {
|
type GetPersistentCacheTaskQuery struct {
|
||||||
// SchedulerClusterID is the scheduler cluster id of the persistent cache.
|
// SchedulerClusterID is the scheduler cluster id of the persistent cache.
|
||||||
SchedulerClusterID uint `json:"scheduler_cluster_id" binding:"required"`
|
SchedulerClusterID uint `form:"scheduler_cluster_id" binding:"required"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type GetPersistentCacheTasksQuery struct {
|
type GetPersistentCacheTasksQuery struct {
|
||||||
// SchedulerClusterID is the scheduler cluster id of the persistent cache.
|
// SchedulerClusterID is the scheduler cluster id of the persistent cache.
|
||||||
SchedulerClusterID uint `json:"scheduler_cluster_id" binding:"required"`
|
SchedulerClusterID uint `form:"scheduler_cluster_id" binding:"required"`
|
||||||
|
|
||||||
// Page is the page number of the persistent cache list.
|
// Page is the page number of the persistent cache list.
|
||||||
Page int `form:"page" binding:"omitempty,gte=1"`
|
Page int `form:"page" binding:"omitempty,gte=1"`
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue