diff --git a/docs/en/preheat/api.md b/docs/en/preheat/api.md index 98b5a6cf9..41e0c2afe 100644 --- a/docs/en/preheat/api.md +++ b/docs/en/preheat/api.md @@ -2,32 +2,34 @@ Use preheat apis for preheating. First create a POST request for preheating, you can refer to [create preheat api document](../api-reference/api-reference.md#create-preheat) -If the `scheduler_cluster_id` does not exist, it means to preheat all scheduler clusters. +If the `scheduler_cluster_ids` does not exist, it means to preheat all scheduler clusters. ```bash -curl --request POST 'http://dragonfly-manager:8080/api/v1/preheats' \ +curl --location --request POST 'http://dragonfly-manager:8080/api/v1/jobs' \ --header 'Content-Type: application/json' \ --data-raw '{ - "type": "image", - "url": "https://registry-1.docker.io/v2/library/busybox/manifests/latest", - "scheduler_cluster_id": 1 + "type": "preheat", + "args": { + "type": "image", + "url": "https://registry-1.docker.io/v2/library/redis/manifests/latest" + } }' ``` If the output of command above has content like ```bash -{"id":"group_28439e0b-d4c3-43bf-945e-482b54c49dc5","status":"PENDING","create_at":"2021-10-09T11:54:50.6182794Z"} +{ "id": 1 "task_id": "group_4d1ea00e-740f-4dbf-a47e-dbdc08eb33e1", "type": "preheat", "status": "PENDING", "args": { "filter": "", "headers": null, "type": "image", "url": "https://registry-1.docker.io/v2/library/redis/manifests/latest" }} ``` Polling the preheating status with id. if status is `SUCCESS`, preheating is successful, you can refer to [get preheat api document](../api-reference/api-reference.md#get-preheat) ```bash -curl --request GET 'http://dragonfly-manager:8080/api/v1/preheats/group_28439e0b-d4c3-43bf-945e-482b54c49dc5' +curl --request GET 'http://dragonfly-manager:8080/api/v1/jobs/1' ``` If the status is `SUCCESS`, the preheating is successful. ```bash -{"id":"group_28439e0b-d4c3-43bf-945e-482b54c49dc5","status":"SUCCESS","create_at":"2021-10-09T11:54:50.5712334Z"} -``` \ No newline at end of file +{ "id": 1 "task_id": "group_4d1ea00e-740f-4dbf-a47e-dbdc08eb33e1", "type": "preheat", "status": "SUCCESS", "args": { "filter": "", "headers": null, "type": "image", "url": "https://registry-1.docker.io/v2/library/redis/manifests/latest" }} +``` diff --git a/docs/zh-CN/user-guide/preheat/preheat.md b/docs/zh-CN/user-guide/preheat/preheat.md index 85ec40a8a..f7878679b 100644 --- a/docs/zh-CN/user-guide/preheat/preheat.md +++ b/docs/zh-CN/user-guide/preheat/preheat.md @@ -10,32 +10,34 @@ TODO 用户使用 api 进行预热。首先发送 POST 请求创建预热任务,具体 api 可以参考文档 [create preheat api document](../../api/api.md#create-preheat)。 -如果 `scheduler_cluster_id` 不存在,表示对所有 scheduler cluster 进行预热。 +如果 `scheduler_cluster_ids` 不存在,表示对所有 scheduler cluster 进行预热。 ```bash -curl --request POST 'http://dragonfly-manager:8080/api/v1/preheats' \ +curl --location --request POST 'http://dragonfly-manager:8080/api/v1/jobs' \ --header 'Content-Type: application/json' \ --data-raw '{ - "type": "image", - "url": "https://registry-1.docker.io/v2/library/busybox/manifests/latest", - "scheduler_cluster_id": 1 + "type": "preheat", + "args": { + "type": "image", + "url": "https://registry-1.docker.io/v2/library/redis/manifests/latest" + } }' ``` 命令行日志返回预热任务 ID。 ```bash -{"id":"group_28439e0b-d4c3-43bf-945e-482b54c49dc5","status":"PENDING","create_at":"2021-10-09T11:54:50.6182794Z"} +{ "id": 1 "task_id": "group_4d1ea00e-740f-4dbf-a47e-dbdc08eb33e1", "type": "preheat", "status": "PENDING", "args": { "filter": "", "headers": null, "type": "image", "url": "https://registry-1.docker.io/v2/library/redis/manifests/latest" }} ``` 使用预热任务 ID 轮训查询任务是否成功,具体 api 可以参考文档 [get preheat api document](../../api/api.md#get-preheat)。 ```bash -curl --request GET 'http://dragonfly-manager:8080/api/v1/preheats/group_28439e0b-d4c3-43bf-945e-482b54c49dc5' +curl --request GET 'http://dragonfly-manager:8080/api/v1/jobs/1' ``` 如果返回预热任务状态为 `SUCCESS`,表示预热成功。 ```bash -{"id":"group_28439e0b-d4c3-43bf-945e-482b54c49dc5","status":"SUCCESS","create_at":"2021-10-09T11:54:50.5712334Z"} +{ "id": 1 "task_id": "group_4d1ea00e-740f-4dbf-a47e-dbdc08eb33e1", "type": "preheat", "status": "SUCCESS", "args": { "filter": "", "headers": null, "type": "image", "url": "https://registry-1.docker.io/v2/library/redis/manifests/latest" }} ``` diff --git a/internal/dflog/logger.go b/internal/dflog/logger.go index 2ef940ed8..3458132d0 100644 --- a/internal/dflog/logger.go +++ b/internal/dflog/logger.go @@ -47,6 +47,7 @@ func init() { SetStatPeerLogger(log) SetStatSeedLogger(log) SetDownloadLogger(log) + SetJobLogger(sugar) } } diff --git a/manager/database/database.go b/manager/database/database.go index 4d1ee37fc..f409af27f 100644 --- a/manager/database/database.go +++ b/manager/database/database.go @@ -107,6 +107,7 @@ func newMyqsl(cfg *config.MysqlConfig) (*gorm.DB, error) { func migrate(db *gorm.DB) error { return db.Set("gorm:table_options", "DEFAULT CHARSET=utf8mb4 ROW_FORMAT=Dynamic").AutoMigrate( + &model.Job{}, &model.CDNCluster{}, &model.CDN{}, &model.SchedulerCluster{}, diff --git a/manager/handlers/job.go b/manager/handlers/job.go new file mode 100644 index 000000000..a0ba86c59 --- /dev/null +++ b/manager/handlers/job.go @@ -0,0 +1,171 @@ +package handlers + +import ( + "net/http" + + "d7y.io/dragonfly/v2/internal/job" + "d7y.io/dragonfly/v2/manager/types" + "github.com/gin-gonic/gin" + "github.com/gin-gonic/gin/binding" +) + +// @Summary Create Job +// @Description create by json config +// @Tags Job +// @Accept json +// @Produce json +// @Param Job body types.CreateJobRequest true "Job" +// @Success 200 {object} model.Job +// @Failure 400 +// @Failure 404 +// @Failure 500 +// @Router /jobs [post] +func (h *Handlers) CreateJob(ctx *gin.Context) { + var json types.CreateJobRequest + if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) + return + } + + switch json.Type { + case job.PreheatJob: + var json types.CreatePreheatJobRequest + if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) + return + } + + job, err := h.service.CreatePreheatJob(ctx.Request.Context(), json) + if err != nil { + ctx.Error(err) + return + } + + ctx.JSON(http.StatusOK, job) + default: + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "Unknow type"}) + } +} + +// @Summary Destroy Job +// @Description Destroy by id +// @Tags Job +// @Accept json +// @Produce json +// @Param id path string true "id" +// @Success 200 +// @Failure 400 +// @Failure 404 +// @Failure 500 +// @Router /jobs/{id} [delete] +func (h *Handlers) DestroyJob(ctx *gin.Context) { + var params types.JobParams + if err := ctx.ShouldBindUri(¶ms); err != nil { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) + return + } + + if err := h.service.DestroyJob(ctx.Request.Context(), params.ID); err != nil { + ctx.Error(err) + return + } + + ctx.Status(http.StatusOK) +} + +// @Summary Update Job +// @Description Update by json config +// @Tags Job +// @Accept json +// @Produce json +// @Param id path string true "id" +// @Param Job body types.UpdateJobRequest true "Job" +// @Success 200 {object} model.Job +// @Failure 400 +// @Failure 404 +// @Failure 500 +// @Router /jobs/{id} [patch] +func (h *Handlers) UpdateJob(ctx *gin.Context) { + var params types.JobParams + if err := ctx.ShouldBindUri(¶ms); err != nil { + ctx.Error(err) + return + } + + var json types.UpdateJobRequest + if err := ctx.ShouldBindJSON(&json); err != nil { + ctx.Error(err) + return + } + + job, err := h.service.UpdateJob(ctx.Request.Context(), params.ID, json) + if err != nil { + ctx.Error(err) + return + } + + ctx.JSON(http.StatusOK, job) +} + +// @Summary Get Job +// @Description Get Job by id +// @Tags Job +// @Accept json +// @Produce json +// @Param id path string true "id" +// @Success 200 {object} model.Job +// @Failure 400 +// @Failure 404 +// @Failure 500 +// @Router /jobs/{id} [get] +func (h *Handlers) GetJob(ctx *gin.Context) { + var params types.JobParams + if err := ctx.ShouldBindUri(¶ms); err != nil { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) + return + } + + job, err := h.service.GetJob(ctx.Request.Context(), params.ID) + if err != nil { + ctx.Error(err) + return + } + + ctx.JSON(http.StatusOK, job) +} + +// @Summary Get Jobs +// @Description Get Jobs +// @Tags Job +// @Accept json +// @Produce json +// @Param page query int true "current page" default(0) +// @Param per_page query int true "return max item count, default 10, max 50" default(10) minimum(2) maximum(50) +// @Success 200 {object} []model.Job +// @Failure 400 +// @Failure 404 +// @Failure 500 +// @Router /jobs [get] +func (h *Handlers) GetJobs(ctx *gin.Context) { + var query types.GetJobsQuery + if err := ctx.ShouldBindQuery(&query); err != nil { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) + return + } + + h.setPaginationDefault(&query.Page, &query.PerPage) + jobs, err := h.service.GetJobs(ctx.Request.Context(), query) + if err != nil { + ctx.Error(err) + return + } + + totalCount, err := h.service.JobTotalCount(ctx.Request.Context(), query) + if err != nil { + ctx.Error(err) + return + } + + h.setPaginationLinkHeader(ctx, query.Page, query.PerPage, int(totalCount)) + ctx.JSON(http.StatusOK, jobs) +} diff --git a/manager/handlers/preheat.go b/manager/handlers/preheat.go index 8353a5dff..36ace9e58 100644 --- a/manager/handlers/preheat.go +++ b/manager/handlers/preheat.go @@ -23,60 +23,6 @@ import ( "github.com/gin-gonic/gin" ) -// @Summary Create Preheat -// @Description create by json config -// @Tags Preheat -// @Accept json -// @Produce json -// @Param CDN body types.CreatePreheatRequest true "Preheat" -// @Success 200 {object} types.Preheat -// @Failure 400 -// @Failure 404 -// @Failure 500 -// @Router /preheats [post] -func (h *Handlers) CreatePreheat(ctx *gin.Context) { - var json types.CreatePreheatRequest - if err := ctx.ShouldBindJSON(&json); err != nil { - ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) - return - } - - preheat, err := h.service.CreatePreheat(ctx.Request.Context(), json) - if err != nil { - ctx.Error(err) - return - } - - ctx.JSON(http.StatusOK, preheat) -} - -// @Summary Get Preheat -// @Description Get Preheat by id -// @Tags Preheat -// @Accept json -// @Produce json -// @Param id path string true "id" -// @Success 200 {object} types.Preheat -// @Failure 400 -// @Failure 404 -// @Failure 500 -// @Router /preheats/{id} [get] -func (h *Handlers) GetPreheat(ctx *gin.Context) { - var params types.PreheatParams - if err := ctx.ShouldBindUri(¶ms); err != nil { - ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) - return - } - - preheat, err := h.service.GetPreheat(ctx.Request.Context(), params.ID) - if err != nil { - ctx.Error(err) - return - } - - ctx.JSON(http.StatusOK, preheat) -} - // @Summary Create V1 Preheat // @Description create by json config // @Tags Preheat @@ -116,7 +62,7 @@ func (h *Handlers) CreateV1Preheat(ctx *gin.Context) { // @Failure 500 // @Router /preheats/{id} [get] func (h *Handlers) GetV1Preheat(ctx *gin.Context) { - var params types.PreheatParams + var params types.V1PreheatParams if err := ctx.ShouldBindUri(¶ms); err != nil { ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) return diff --git a/manager/job/job.go b/manager/job/job.go index 9837c8a9d..6f90ac0bd 100644 --- a/manager/job/job.go +++ b/manager/job/job.go @@ -22,6 +22,7 @@ import ( ) type Job struct { + *internaljob.Job Preheat } @@ -43,6 +44,20 @@ func New(cfg *config.Config) (*Job, error) { } return &Job{ + Job: j, Preheat: p, }, nil } + +func (j *Job) GetGroupJobState(id string) (*internaljob.GroupJobState, error) { + groupJobState, err := j.Job.GetGroupJobState(id) + if err != nil { + return nil, err + } + + return &internaljob.GroupJobState{ + GroupUUID: groupJobState.GroupUUID, + State: groupJobState.State, + CreatedAt: groupJobState.CreatedAt, + }, nil +} diff --git a/manager/job/preheat.go b/manager/job/preheat.go index 5935bc8f5..7d3a92f46 100644 --- a/manager/job/preheat.go +++ b/manager/job/preheat.go @@ -52,8 +52,7 @@ const ( var accessURLPattern, _ = regexp.Compile("^(.*)://(.*)/v2/(.*)/manifests/(.*)") type Preheat interface { - CreatePreheat(context.Context, []model.Scheduler, types.CreatePreheatRequest) (*types.Preheat, error) - GetPreheat(context.Context, string) (*types.Preheat, error) + CreatePreheat(context.Context, []model.Scheduler, types.PreheatArgs) (*internaljob.GroupJobState, error) } type preheat struct { @@ -75,20 +74,7 @@ func newPreheat(job *internaljob.Job, bizTag string) (Preheat, error) { }, nil } -func (p *preheat) GetPreheat(ctx context.Context, id string) (*types.Preheat, error) { - groupJobState, err := p.job.GetGroupJobState(id) - if err != nil { - return nil, err - } - - return &types.Preheat{ - ID: groupJobState.GroupUUID, - Status: groupJobState.State, - CreatedAt: groupJobState.CreatedAt, - }, nil -} - -func (p *preheat) CreatePreheat(ctx context.Context, schedulers []model.Scheduler, json types.CreatePreheatRequest) (*types.Preheat, error) { +func (p *preheat) CreatePreheat(ctx context.Context, schedulers []model.Scheduler, json types.PreheatArgs) (*internaljob.GroupJobState, error) { var span trace.Span ctx, span = tracer.Start(ctx, config.SpanPreheat, trace.WithSpanKind(trace.SpanKindProducer)) span.SetAttributes(config.AttributePreheatType.String(json.Type)) @@ -136,7 +122,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []model.Schedule return p.createGroupJob(ctx, files, queues) } -func (p *preheat) createGroupJob(ctx context.Context, files []*internaljob.PreheatRequest, queues []internaljob.Queue) (*types.Preheat, error) { +func (p *preheat) createGroupJob(ctx context.Context, files []*internaljob.PreheatRequest, queues []internaljob.Queue) (*internaljob.GroupJobState, error) { signatures := []*machineryv1tasks.Signature{} var urls []string for i := range files { @@ -169,9 +155,9 @@ func (p *preheat) createGroupJob(ctx context.Context, files []*internaljob.Prehe } logger.Infof("create preheat group job successed, group uuid: %s, urls:%s", group.GroupUUID, urls) - return &types.Preheat{ - ID: group.GroupUUID, - Status: machineryv1tasks.StatePending, + return &internaljob.GroupJobState{ + GroupUUID: group.GroupUUID, + State: machineryv1tasks.StatePending, CreatedAt: time.Now(), }, nil } diff --git a/manager/model/cdn_cluster.go b/manager/model/cdn_cluster.go index 9081b94f2..ebc516ca1 100644 --- a/manager/model/cdn_cluster.go +++ b/manager/model/cdn_cluster.go @@ -21,9 +21,10 @@ type CDNCluster struct { Name string `gorm:"column:name;type:varchar(256);index:uk_cdn_cluster_name,unique;not null;comment:name" json:"name"` BIO string `gorm:"column:bio;type:varchar(1024);comment:biography" json:"bio"` Config JSONMap `gorm:"column:config;not null;comment:configuration" json:"config"` - SchedulerClusters []SchedulerCluster `gorm:"many2many:cdn_cluster_scheduler_cluster;" json:"-"` + SchedulerClusters []SchedulerCluster `gorm:"many2many:cdn_cluster_scheduler_cluster;" json:"scheduler_clusters"` IsDefault bool `gorm:"column:is_default;not null;default:false;comment:default cdn cluster" json:"is_default"` CDNs []CDN `json:"-"` SecurityGroupID uint `gorm:"comment:security group id" json:"security_group_id"` SecurityGroup SecurityGroup `json:"-"` + Jobs []Job `gorm:"many2many:job_cdn_cluster;" json:"jobs"` } diff --git a/manager/model/job.go b/manager/model/job.go new file mode 100644 index 000000000..663fd8fb2 --- /dev/null +++ b/manager/model/job.go @@ -0,0 +1,31 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package model + +type Job struct { + Model + TaskID string `gorm:"column:task_id;type:varchar(256);not null;comment:task id" json:"task_id"` + BIO string `gorm:"column:bio;type:varchar(1024);comment:biography" json:"bio"` + Type string `gorm:"column:type;type:varchar(256);comment:type" json:"type"` + Status string `gorm:"column:status;type:varchar(256);not null;default:'PENDING';comment:service status" json:"status"` + Args JSONMap `gorm:"column:args;not null;comment:task request args" json:"args"` + Result JSONMap `gorm:"column:result;comment:task result" json:"result"` + UserID uint `gorm:"column:user_id;comment:user id" json:"user_id"` + User User `json:"-"` + CDNClusters []CDNCluster `gorm:"many2many:job_cdn_cluster;" json:"cdn_clusters"` + SchedulerClusters []SchedulerCluster `gorm:"many2many:job_scheduler_cluster;" json:"scheduler_clusters"` +} diff --git a/manager/model/scheduler_cluster.go b/manager/model/scheduler_cluster.go index a40ef725e..c0f7f5122 100644 --- a/manager/model/scheduler_cluster.go +++ b/manager/model/scheduler_cluster.go @@ -28,4 +28,5 @@ type SchedulerCluster struct { Schedulers []Scheduler `json:"-"` SecurityGroupID uint `gorm:"comment:security group id" json:"security_group_id"` SecurityGroup SecurityGroup `json:"-"` + Jobs []Job `gorm:"many2many:job_scheduler_cluster;" json:"jobs"` } diff --git a/manager/router/router.go b/manager/router/router.go index 87dd88707..abd3358f6 100644 --- a/manager/router/router.go +++ b/manager/router/router.go @@ -182,10 +182,13 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) ( config.GET(":id", h.GetConfig) config.GET("", h.GetConfigs) - // Preheat - ph := apiv1.Group("/preheats") - ph.POST("", h.CreatePreheat) - ph.GET(":id", h.GetPreheat) + // Job + job := apiv1.Group("/jobs") + job.POST("", h.CreateJob) + job.DELETE(":id", h.DestroyJob) + job.PATCH(":id", h.UpdateJob) + job.GET(":id", h.GetJob) + job.GET("", h.GetJobs) // Compatible with the V1 preheat. pv1 := r.Group("preheats") diff --git a/manager/service/job.go b/manager/service/job.go new file mode 100644 index 000000000..82cf67bf6 --- /dev/null +++ b/manager/service/job.go @@ -0,0 +1,242 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package service + +import ( + "context" + "fmt" + + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/manager/model" + "d7y.io/dragonfly/v2/manager/types" + "d7y.io/dragonfly/v2/pkg/retry" + "d7y.io/dragonfly/v2/pkg/util/structutils" + machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks" +) + +func (s *rest) CreatePreheatJob(ctx context.Context, json types.CreatePreheatJobRequest) (*model.Job, error) { + var schedulers []model.Scheduler + var schedulerClusters []model.SchedulerCluster + + if len(json.SchedulerClusterIDs) != 0 { + for _, schedulerClusterID := range json.SchedulerClusterIDs { + schedulerCluster := model.SchedulerCluster{} + if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil { + return nil, err + } + schedulerClusters = append(schedulerClusters, schedulerCluster) + + scheduler := model.Scheduler{} + if err := s.db.WithContext(ctx).First(&scheduler, model.Scheduler{ + SchedulerClusterID: schedulerCluster.ID, + Status: model.SchedulerStatusActive, + }).Error; err != nil { + return nil, err + } + schedulers = append(schedulers, scheduler) + } + } else { + if err := s.db.WithContext(ctx).Find(&schedulerClusters).Error; err != nil { + return nil, err + } + + for _, schedulerCluster := range schedulerClusters { + scheduler := model.Scheduler{} + if err := s.db.WithContext(ctx).First(&scheduler, model.Scheduler{ + SchedulerClusterID: schedulerCluster.ID, + Status: model.SchedulerStatusActive, + }).Error; err != nil { + continue + } + + schedulers = append(schedulers, scheduler) + } + } + + groupJobState, err := s.job.CreatePreheat(ctx, schedulers, json.Args) + if err != nil { + return nil, err + } + + args, err := structutils.StructToMap(json.Args) + if err != nil { + return nil, err + } + + job := model.Job{ + TaskID: groupJobState.GroupUUID, + BIO: json.BIO, + Type: json.Type, + Status: groupJobState.State, + Args: args, + UserID: json.UserID, + SchedulerClusters: schedulerClusters, + } + + if err := s.db.WithContext(ctx).Create(&job).Error; err != nil { + return nil, err + } + + go s.pollingJob(context.Background(), job.ID, job.TaskID) + + return &job, nil +} + +func (s *rest) pollingJob(ctx context.Context, id uint, taskID string) { + var job model.Job + + retry.Run(ctx, func() (interface{}, bool, error) { + groupJob, err := s.job.GetGroupJobState(taskID) + if err != nil { + logger.Errorf("polling job %d and task %s failed: %v", id, taskID, err) + return nil, false, err + } + + if err := s.db.WithContext(ctx).First(&job, id).Updates(model.Job{ + Status: groupJob.State, + }).Error; err != nil { + logger.Errorf("polling job %d and task %s store failed: %v", id, taskID, err) + return nil, true, err + } + + switch job.Status { + case machineryv1tasks.StateSuccess: + logger.Infof("polling job %d and task %s is finally successful", id, taskID) + return nil, true, nil + case machineryv1tasks.StateFailure: + logger.Errorf("polling job %d and task %s is finally failed", id, taskID) + return nil, true, nil + default: + return nil, false, fmt.Errorf("polling job %d and task %s status is %s", id, taskID, job.Status) + } + }, 5, 10, 120, nil) + + // Polling timeout + if job.Status != machineryv1tasks.StateSuccess && job.Status != machineryv1tasks.StateFailure { + job := model.Job{} + if err := s.db.WithContext(ctx).First(&job, id).Updates(model.Job{ + Status: machineryv1tasks.StateFailure, + }).Error; err != nil { + logger.Errorf("polling job %d and task %s store failed: %v", id, taskID, err) + } + logger.Errorf("polling job %d and task %s timeout", id, taskID) + } +} + +func (s *rest) DestroyJob(ctx context.Context, id uint) error { + job := model.Job{} + if err := s.db.WithContext(ctx).First(&job, id).Error; err != nil { + return err + } + + if err := s.db.WithContext(ctx).Unscoped().Delete(&model.Job{}, id).Error; err != nil { + return err + } + + return nil +} + +func (s *rest) UpdateJob(ctx context.Context, id uint, json types.UpdateJobRequest) (*model.Job, error) { + job := model.Job{} + if err := s.db.WithContext(ctx).Preload("CDNClusters").Preload("SchedulerClusters").First(&job, id).Updates(model.Job{ + BIO: json.BIO, + UserID: json.UserID, + }).Error; err != nil { + return nil, err + } + + return &job, nil +} + +func (s *rest) GetJob(ctx context.Context, id uint) (*model.Job, error) { + job := model.Job{} + if err := s.db.WithContext(ctx).Preload("CDNClusters").Preload("SchedulerClusters").First(&job, id).Error; err != nil { + return nil, err + } + + return &job, nil +} + +func (s *rest) GetJobs(ctx context.Context, q types.GetJobsQuery) (*[]model.Job, error) { + jobs := []model.Job{} + if err := s.db.WithContext(ctx).Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.Job{ + Type: q.Type, + Status: q.Status, + UserID: q.UserID, + }).Find(&jobs).Error; err != nil { + return nil, err + } + + return &jobs, nil +} + +func (s *rest) JobTotalCount(ctx context.Context, q types.GetJobsQuery) (int64, error) { + var count int64 + if err := s.db.WithContext(ctx).Model(&model.Job{}).Where(&model.Job{ + Type: q.Type, + Status: q.Status, + UserID: q.UserID, + }).Count(&count).Error; err != nil { + return 0, err + } + + return count, nil +} + +func (s *rest) AddJobToSchedulerClusters(ctx context.Context, id, schedulerClusterIDs []uint) error { + job := model.Job{} + if err := s.db.WithContext(ctx).First(&job, id).Error; err != nil { + return err + } + + var schedulerClusters []*model.SchedulerCluster + for _, schedulerClusterID := range schedulerClusterIDs { + schedulerCluster := model.SchedulerCluster{} + if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil { + return err + } + schedulerClusters = append(schedulerClusters, &schedulerCluster) + } + + if err := s.db.WithContext(ctx).Model(&job).Association("SchedulerClusters").Append(schedulerClusters); err != nil { + return err + } + + return nil +} + +func (s *rest) AddJobToCDNClusters(ctx context.Context, id, cdnClusterIDs []uint) error { + job := model.Job{} + if err := s.db.WithContext(ctx).First(&job, id).Error; err != nil { + return err + } + + var cdnClusters []*model.CDNCluster + for _, cdnClusterID := range cdnClusterIDs { + cdnCluster := model.CDNCluster{} + if err := s.db.WithContext(ctx).First(&cdnCluster, cdnClusterID).Error; err != nil { + return err + } + cdnClusters = append(cdnClusters, &cdnCluster) + } + + if err := s.db.WithContext(ctx).Model(&job).Association("CDNClusters").Append(cdnClusters); err != nil { + return err + } + + return nil +} diff --git a/manager/service/preheat.go b/manager/service/preheat.go index 89e5e2f01..f8bcd0c8d 100644 --- a/manager/service/preheat.go +++ b/manager/service/preheat.go @@ -18,10 +18,14 @@ package service import ( "context" - "time" + "strconv" + logger "d7y.io/dragonfly/v2/internal/dflog" + internaljob "d7y.io/dragonfly/v2/internal/job" "d7y.io/dragonfly/v2/manager/model" "d7y.io/dragonfly/v2/manager/types" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks" ) @@ -40,76 +44,41 @@ const ( V1PreheatingStatusFail = "FAIL" ) -func (s *rest) CreatePreheat(ctx context.Context, json types.CreatePreheatRequest) (*types.Preheat, error) { - if json.SchedulerClusterID != nil { - schedulerCluster := model.SchedulerCluster{} - if err := s.db.WithContext(ctx).First(&schedulerCluster, json.SchedulerClusterID).Error; err != nil { - return nil, err - } - - scheduler := model.Scheduler{} - if err := s.db.WithContext(ctx).First(&scheduler, model.Scheduler{ - SchedulerClusterID: schedulerCluster.ID, - Status: model.SchedulerStatusActive, - }).Error; err != nil { - return nil, err - } - - return s.job.CreatePreheat(ctx, []model.Scheduler{scheduler}, json) - } - - schedulerClusters := []model.SchedulerCluster{} - if err := s.db.WithContext(ctx).Find(&schedulerClusters).Error; err != nil { - return nil, err - } - - var schedulers []model.Scheduler - for _, schedulerCluster := range schedulerClusters { - scheduler := model.Scheduler{} - if err := s.db.WithContext(ctx).First(&scheduler, model.Scheduler{ - SchedulerClusterID: schedulerCluster.ID, - Status: model.SchedulerStatusActive, - }).Error; err != nil { - continue - } - - schedulers = append(schedulers, scheduler) - } - - return s.job.CreatePreheat(ctx, schedulers, json) -} - -func (s *rest) GetPreheat(ctx context.Context, id string) (*types.Preheat, error) { - return s.job.GetPreheat(ctx, id) -} - func (s *rest) CreateV1Preheat(ctx context.Context, json types.CreateV1PreheatRequest) (*types.CreateV1PreheatResponse, error) { - p, err := s.CreatePreheat(ctx, types.CreatePreheatRequest{ - Type: json.Type, - URL: json.URL, - Filter: json.Filter, - Headers: json.Headers, + job, err := s.CreatePreheatJob(ctx, types.CreatePreheatJobRequest{ + Type: internaljob.PreheatJob, + Args: types.PreheatArgs{ + Type: json.Type, + URL: json.URL, + Filter: json.Filter, + Headers: json.Headers, + }, }) if err != nil { return nil, err } return &types.CreateV1PreheatResponse{ - ID: p.ID, + ID: strconv.FormatUint(uint64(job.ID), 10), }, nil } -func (s *rest) GetV1Preheat(ctx context.Context, id string) (*types.GetV1PreheatResponse, error) { - p, err := s.job.GetPreheat(ctx, id) +func (s *rest) GetV1Preheat(ctx context.Context, rawID string) (*types.GetV1PreheatResponse, error) { + id, err := strconv.ParseUint(rawID, 10, 32) if err != nil { - return nil, err + logger.Errorf("preheat convert error", err) + } + + job := model.Job{} + if err := s.db.WithContext(ctx).First(&job, uint(id)).Error; err != nil { + return nil, status.Error(codes.Unknown, err.Error()) } return &types.GetV1PreheatResponse{ - ID: p.ID, - Status: convertStatus(p.Status), - StartTime: p.CreatedAt.String(), - FinishTime: time.Now().String(), + ID: strconv.FormatUint(uint64(job.ID), 10), + Status: convertStatus(job.Status), + StartTime: job.CreatedAt.String(), + FinishTime: job.UpdatedAt.String(), }, nil } diff --git a/manager/service/service.go b/manager/service/service.go index 935163fb7..b075794cf 100644 --- a/manager/service/service.go +++ b/manager/service/service.go @@ -110,8 +110,13 @@ type REST interface { GetConfigs(context.Context, types.GetConfigsQuery) (*[]model.Config, error) ConfigTotalCount(context.Context, types.GetConfigsQuery) (int64, error) - CreatePreheat(context.Context, types.CreatePreheatRequest) (*types.Preheat, error) - GetPreheat(context.Context, string) (*types.Preheat, error) + CreatePreheatJob(context.Context, types.CreatePreheatJobRequest) (*model.Job, error) + DestroyJob(context.Context, uint) error + UpdateJob(context.Context, uint, types.UpdateJobRequest) (*model.Job, error) + GetJob(context.Context, uint) (*model.Job, error) + GetJobs(context.Context, types.GetJobsQuery) (*[]model.Job, error) + JobTotalCount(context.Context, types.GetJobsQuery) (int64, error) + CreateV1Preheat(context.Context, types.CreateV1PreheatRequest) (*types.CreateV1PreheatResponse, error) GetV1Preheat(context.Context, string) (*types.GetV1PreheatResponse, error) } diff --git a/manager/types/job.go b/manager/types/job.go new file mode 100644 index 000000000..9b68432a9 --- /dev/null +++ b/manager/types/job.go @@ -0,0 +1,60 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package types + +type CreateJobRequest struct { + BIO string `json:"bio" binding:"omitempty"` + Type string `json:"type" binding:"required"` + Args map[string]interface{} `json:"args" binding:"omitempty"` + Result map[string]interface{} `json:"result" binding:"omitempty"` + UserID uint `json:"user_id" binding:"omitempty"` + CDNClusterIDs []uint `json:"cdn_cluster_ids" binding:"omitempty"` + SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"` +} + +type UpdateJobRequest struct { + BIO string `json:"bio" binding:"omitempty"` + UserID uint `json:"user_id" binding:"omitempty"` +} + +type JobParams struct { + ID uint `uri:"id" binding:"required"` +} + +type GetJobsQuery struct { + Type string `form:"type" binding:"omitempty"` + Status string `form:"status" binding:"omitempty,oneof=PENDING RECEIVED STARTED RETRY SUCCESS FAILURE"` + UserID uint `form:"user_id" binding:"omitempty"` + Page int `form:"page" binding:"omitempty,gte=1"` + PerPage int `form:"per_page" binding:"omitempty,gte=1,lte=50"` +} + +type CreatePreheatJobRequest struct { + BIO string `json:"bio" binding:"omitempty"` + Type string `json:"type" binding:"required"` + Args PreheatArgs `json:"args" binding:"omitempty"` + Result map[string]interface{} `json:"result" binding:"omitempty"` + UserID uint `json:"user_id" binding:"omitempty"` + SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"` +} + +type PreheatArgs struct { + Type string `json:"type" binding:"required,oneof=image file"` + URL string `json:"url" binding:"required"` + Filter string `json:"filter" binding:"omitempty"` + Headers map[string]string `json:"headers" binding:"omitempty"` +} diff --git a/manager/types/preheat.go b/manager/types/preheat.go index 6ccec7a1c..930de0c8a 100644 --- a/manager/types/preheat.go +++ b/manager/types/preheat.go @@ -16,26 +16,6 @@ package types -import "time" - -type PreheatParams struct { - ID string `uri:"id" binding:"required"` -} - -type CreatePreheatRequest struct { - SchedulerClusterID *uint `json:"scheduler_cluster_id" binding:"omitempty"` - Type string `json:"type" binding:"required,oneof=image file"` - URL string `json:"url" binding:"required"` - Filter string `json:"filter" binding:"omitempty"` - Headers map[string]string `json:"headers" binding:"omitempty"` -} - -type Preheat struct { - ID string `json:"id"` - Status string `json:"status"` - CreatedAt time.Time `json:"create_at"` -} - type CreateV1PreheatRequest struct { Type string `json:"type" binding:"required,oneof=image file"` URL string `json:"url" binding:"required"` @@ -44,12 +24,16 @@ type CreateV1PreheatRequest struct { } type CreateV1PreheatResponse struct { - ID string `json:"ID"` + ID string `json:"id"` } type GetV1PreheatResponse struct { - ID string `json:"ID"` + ID string `json:"id"` Status string `json:"status"` StartTime string `json:"startTime,omitempty"` FinishTime string `json:"finishTime,omitempty"` } + +type V1PreheatParams struct { + ID string `uri:"id" binding:"required"` +} diff --git a/scheduler/job/job.go b/scheduler/job/job.go index 33edc442c..5a8c03c6f 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -68,7 +68,7 @@ func New(ctx context.Context, cfg *config.JobConfig, clusterID uint, hostname st logger.Errorf("create scheduler job queue error: %v", err) return nil, err } - logger.Infof("create global job queue: %v", schedulerJob) + logger.Infof("create scheduler job queue: %v", schedulerJob) localQueue, err := internaljob.GetSchedulerQueue(clusterID, hostname) if err != nil { diff --git a/test/e2e/manager/constants.go b/test/e2e/manager/constants.go index 292e9880f..fdd18a9b4 100644 --- a/test/e2e/manager/constants.go +++ b/test/e2e/manager/constants.go @@ -21,7 +21,7 @@ const ( managerService = "dragonfly-manager.dragonfly-system.svc" managerPort = "8080" - preheatPath = "api/v1/preheats" + preheatPath = "api/v1/jobs" managerTag = "d7y/manager" dragonflyNamespace = "dragonfly-system" diff --git a/test/e2e/manager/preheat.go b/test/e2e/manager/preheat.go index 95ea511d3..00d4e35e9 100644 --- a/test/e2e/manager/preheat.go +++ b/test/e2e/manager/preheat.go @@ -25,8 +25,11 @@ import ( "time" "d7y.io/dragonfly/v2/internal/idgen" + internaljob "d7y.io/dragonfly/v2/internal/job" + "d7y.io/dragonfly/v2/manager/model" "d7y.io/dragonfly/v2/manager/types" "d7y.io/dragonfly/v2/pkg/rpc/base" + "d7y.io/dragonfly/v2/pkg/util/structutils" "d7y.io/dragonfly/v2/test/e2e/e2eutil" machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks" . "github.com/onsi/ginkgo" //nolint @@ -53,17 +56,25 @@ var _ = Describe("Preheat with manager", func() { sha256sum1 := strings.Split(string(out), " ")[0] // preheat file - out, err = fsPod.CurlCommand("POST", map[string]string{"Content-Type": "application/json"}, - map[string]interface{}{"type": "file", "url": url}, + req, err := structutils.StructToMap(types.CreatePreheatJobRequest{ + Type: internaljob.PreheatJob, + Args: types.PreheatArgs{ + Type: "file", + URL: url, + }, + }) + Expect(err).NotTo(HaveOccurred()) + + out, err = fsPod.CurlCommand("POST", map[string]string{"Content-Type": "application/json"}, req, fmt.Sprintf("http://%s:%s/%s", managerService, managerPort, preheatPath)).CombinedOutput() fmt.Println(string(out)) Expect(err).NotTo(HaveOccurred()) // wait for success - preheatJob := &types.Preheat{} - err = json.Unmarshal(out, preheatJob) + job := &model.Job{} + err = json.Unmarshal(out, job) Expect(err).NotTo(HaveOccurred()) - done := waitForDone(preheatJob, fsPod) + done := waitForDone(job, fsPod) Expect(done).Should(BeTrue()) // generate task_id, also the filename @@ -100,17 +111,25 @@ var _ = Describe("Preheat with manager", func() { fsPod := getFileServerExec() // preheat file - out, err := fsPod.CurlCommand("POST", map[string]string{"Content-Type": "application/json"}, - map[string]interface{}{"type": "image", "url": url}, + req, err := structutils.StructToMap(types.CreatePreheatJobRequest{ + Type: internaljob.PreheatJob, + Args: types.PreheatArgs{ + Type: "image", + URL: url, + }, + }) + Expect(err).NotTo(HaveOccurred()) + + out, err := fsPod.CurlCommand("POST", map[string]string{"Content-Type": "application/json"}, req, fmt.Sprintf("http://%s:%s/%s", managerService, managerPort, preheatPath)).CombinedOutput() fmt.Println(string(out)) Expect(err).NotTo(HaveOccurred()) // wait for success - preheatJob := &types.Preheat{} - err = json.Unmarshal(out, preheatJob) + job := &model.Job{} + err = json.Unmarshal(out, job) Expect(err).NotTo(HaveOccurred()) - done := waitForDone(preheatJob, fsPod) + done := waitForDone(job, fsPod) Expect(done).Should(BeTrue()) for i, cdnTaskID := range cdnTaskIDs { @@ -153,17 +172,25 @@ var _ = Describe("Preheat with manager", func() { fsPod := getFileServerExec() // use a curl to preheat the same file, git a id to wait for success - out, err = fsPod.CurlCommand("POST", map[string]string{"Content-Type": "application/json"}, - map[string]interface{}{"type": "file", "url": url}, + req, err := structutils.StructToMap(types.CreatePreheatJobRequest{ + Type: internaljob.PreheatJob, + Args: types.PreheatArgs{ + Type: "file", + URL: url, + }, + }) + + Expect(err).NotTo(HaveOccurred()) + out, err = fsPod.CurlCommand("POST", map[string]string{"Content-Type": "application/json"}, req, fmt.Sprintf("http://%s:%s/%s", managerService, managerPort, preheatPath)).CombinedOutput() fmt.Println(string(out)) Expect(err).NotTo(HaveOccurred()) // wait for success - preheatJob := &types.Preheat{} - err = json.Unmarshal(out, preheatJob) + job := &model.Job{} + err = json.Unmarshal(out, job) Expect(err).NotTo(HaveOccurred()) - done := waitForDone(preheatJob, fsPod) + done := waitForDone(job, fsPod) Expect(done).Should(BeTrue()) // generate task id to find the file @@ -179,7 +206,7 @@ var _ = Describe("Preheat with manager", func() { }) }) -func waitForDone(preheat *types.Preheat, pod *e2eutil.PodExec) bool { +func waitForDone(preheat *model.Job, pod *e2eutil.PodExec) bool { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) @@ -190,7 +217,7 @@ func waitForDone(preheat *types.Preheat, pod *e2eutil.PodExec) bool { return false case <-ticker.C: out, err := pod.CurlCommand("", nil, nil, - fmt.Sprintf("http://%s:%s/%s/%s", managerService, managerPort, preheatPath, preheat.ID)).CombinedOutput() + fmt.Sprintf("http://%s:%s/%s/%d", managerService, managerPort, preheatPath, preheat.ID)).CombinedOutput() fmt.Println(string(out)) Expect(err).NotTo(HaveOccurred()) err = json.Unmarshal(out, preheat)