feat: add jobs api (#751)

* feat: add jobs api

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2021-10-26 20:33:09 +08:00
parent 65d24cdee5
commit 1dccea01ce
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
20 changed files with 644 additions and 197 deletions

View File

@ -2,32 +2,34 @@
Use preheat apis for preheating. First create a POST request for preheating, you can refer to [create preheat api document](../api-reference/api-reference.md#create-preheat)
If the `scheduler_cluster_id` does not exist, it means to preheat all scheduler clusters.
If the `scheduler_cluster_ids` does not exist, it means to preheat all scheduler clusters.
```bash
curl --request POST 'http://dragonfly-manager:8080/api/v1/preheats' \
curl --location --request POST 'http://dragonfly-manager:8080/api/v1/jobs' \
--header 'Content-Type: application/json' \
--data-raw '{
"type": "image",
"url": "https://registry-1.docker.io/v2/library/busybox/manifests/latest",
"scheduler_cluster_id": 1
"type": "preheat",
"args": {
"type": "image",
"url": "https://registry-1.docker.io/v2/library/redis/manifests/latest"
}
}'
```
If the output of command above has content like
```bash
{"id":"group_28439e0b-d4c3-43bf-945e-482b54c49dc5","status":"PENDING","create_at":"2021-10-09T11:54:50.6182794Z"}
{ "id": 1 "task_id": "group_4d1ea00e-740f-4dbf-a47e-dbdc08eb33e1", "type": "preheat", "status": "PENDING", "args": { "filter": "", "headers": null, "type": "image", "url": "https://registry-1.docker.io/v2/library/redis/manifests/latest" }}
```
Polling the preheating status with id. if status is `SUCCESS`, preheating is successful, you can refer to [get preheat api document](../api-reference/api-reference.md#get-preheat)
```bash
curl --request GET 'http://dragonfly-manager:8080/api/v1/preheats/group_28439e0b-d4c3-43bf-945e-482b54c49dc5'
curl --request GET 'http://dragonfly-manager:8080/api/v1/jobs/1'
```
If the status is `SUCCESS`, the preheating is successful.
```bash
{"id":"group_28439e0b-d4c3-43bf-945e-482b54c49dc5","status":"SUCCESS","create_at":"2021-10-09T11:54:50.5712334Z"}
```
{ "id": 1 "task_id": "group_4d1ea00e-740f-4dbf-a47e-dbdc08eb33e1", "type": "preheat", "status": "SUCCESS", "args": { "filter": "", "headers": null, "type": "image", "url": "https://registry-1.docker.io/v2/library/redis/manifests/latest" }}
```

View File

@ -10,32 +10,34 @@ TODO
用户使用 api 进行预热。首先发送 POST 请求创建预热任务,具体 api 可以参考文档 [create preheat api document](../../api/api.md#create-preheat)。
如果 `scheduler_cluster_id` 不存在,表示对所有 scheduler cluster 进行预热。
如果 `scheduler_cluster_ids` 不存在,表示对所有 scheduler cluster 进行预热。
```bash
curl --request POST 'http://dragonfly-manager:8080/api/v1/preheats' \
curl --location --request POST 'http://dragonfly-manager:8080/api/v1/jobs' \
--header 'Content-Type: application/json' \
--data-raw '{
"type": "image",
"url": "https://registry-1.docker.io/v2/library/busybox/manifests/latest",
"scheduler_cluster_id": 1
"type": "preheat",
"args": {
"type": "image",
"url": "https://registry-1.docker.io/v2/library/redis/manifests/latest"
}
}'
```
命令行日志返回预热任务 ID。
```bash
{"id":"group_28439e0b-d4c3-43bf-945e-482b54c49dc5","status":"PENDING","create_at":"2021-10-09T11:54:50.6182794Z"}
{ "id": 1 "task_id": "group_4d1ea00e-740f-4dbf-a47e-dbdc08eb33e1", "type": "preheat", "status": "PENDING", "args": { "filter": "", "headers": null, "type": "image", "url": "https://registry-1.docker.io/v2/library/redis/manifests/latest" }}
```
使用预热任务 ID 轮训查询任务是否成功,具体 api 可以参考文档 [get preheat api document](../../api/api.md#get-preheat)。
```bash
curl --request GET 'http://dragonfly-manager:8080/api/v1/preheats/group_28439e0b-d4c3-43bf-945e-482b54c49dc5'
curl --request GET 'http://dragonfly-manager:8080/api/v1/jobs/1'
```
如果返回预热任务状态为 `SUCCESS`,表示预热成功。
```bash
{"id":"group_28439e0b-d4c3-43bf-945e-482b54c49dc5","status":"SUCCESS","create_at":"2021-10-09T11:54:50.5712334Z"}
{ "id": 1 "task_id": "group_4d1ea00e-740f-4dbf-a47e-dbdc08eb33e1", "type": "preheat", "status": "SUCCESS", "args": { "filter": "", "headers": null, "type": "image", "url": "https://registry-1.docker.io/v2/library/redis/manifests/latest" }}
```

View File

@ -47,6 +47,7 @@ func init() {
SetStatPeerLogger(log)
SetStatSeedLogger(log)
SetDownloadLogger(log)
SetJobLogger(sugar)
}
}

View File

@ -107,6 +107,7 @@ func newMyqsl(cfg *config.MysqlConfig) (*gorm.DB, error) {
func migrate(db *gorm.DB) error {
return db.Set("gorm:table_options", "DEFAULT CHARSET=utf8mb4 ROW_FORMAT=Dynamic").AutoMigrate(
&model.Job{},
&model.CDNCluster{},
&model.CDN{},
&model.SchedulerCluster{},

171
manager/handlers/job.go Normal file
View File

@ -0,0 +1,171 @@
package handlers
import (
"net/http"
"d7y.io/dragonfly/v2/internal/job"
"d7y.io/dragonfly/v2/manager/types"
"github.com/gin-gonic/gin"
"github.com/gin-gonic/gin/binding"
)
// @Summary Create Job
// @Description create by json config
// @Tags Job
// @Accept json
// @Produce json
// @Param Job body types.CreateJobRequest true "Job"
// @Success 200 {object} model.Job
// @Failure 400
// @Failure 404
// @Failure 500
// @Router /jobs [post]
func (h *Handlers) CreateJob(ctx *gin.Context) {
var json types.CreateJobRequest
if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}
switch json.Type {
case job.PreheatJob:
var json types.CreatePreheatJobRequest
if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}
job, err := h.service.CreatePreheatJob(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err)
return
}
ctx.JSON(http.StatusOK, job)
default:
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "Unknow type"})
}
}
// @Summary Destroy Job
// @Description Destroy by id
// @Tags Job
// @Accept json
// @Produce json
// @Param id path string true "id"
// @Success 200
// @Failure 400
// @Failure 404
// @Failure 500
// @Router /jobs/{id} [delete]
func (h *Handlers) DestroyJob(ctx *gin.Context) {
var params types.JobParams
if err := ctx.ShouldBindUri(&params); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}
if err := h.service.DestroyJob(ctx.Request.Context(), params.ID); err != nil {
ctx.Error(err)
return
}
ctx.Status(http.StatusOK)
}
// @Summary Update Job
// @Description Update by json config
// @Tags Job
// @Accept json
// @Produce json
// @Param id path string true "id"
// @Param Job body types.UpdateJobRequest true "Job"
// @Success 200 {object} model.Job
// @Failure 400
// @Failure 404
// @Failure 500
// @Router /jobs/{id} [patch]
func (h *Handlers) UpdateJob(ctx *gin.Context) {
var params types.JobParams
if err := ctx.ShouldBindUri(&params); err != nil {
ctx.Error(err)
return
}
var json types.UpdateJobRequest
if err := ctx.ShouldBindJSON(&json); err != nil {
ctx.Error(err)
return
}
job, err := h.service.UpdateJob(ctx.Request.Context(), params.ID, json)
if err != nil {
ctx.Error(err)
return
}
ctx.JSON(http.StatusOK, job)
}
// @Summary Get Job
// @Description Get Job by id
// @Tags Job
// @Accept json
// @Produce json
// @Param id path string true "id"
// @Success 200 {object} model.Job
// @Failure 400
// @Failure 404
// @Failure 500
// @Router /jobs/{id} [get]
func (h *Handlers) GetJob(ctx *gin.Context) {
var params types.JobParams
if err := ctx.ShouldBindUri(&params); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}
job, err := h.service.GetJob(ctx.Request.Context(), params.ID)
if err != nil {
ctx.Error(err)
return
}
ctx.JSON(http.StatusOK, job)
}
// @Summary Get Jobs
// @Description Get Jobs
// @Tags Job
// @Accept json
// @Produce json
// @Param page query int true "current page" default(0)
// @Param per_page query int true "return max item count, default 10, max 50" default(10) minimum(2) maximum(50)
// @Success 200 {object} []model.Job
// @Failure 400
// @Failure 404
// @Failure 500
// @Router /jobs [get]
func (h *Handlers) GetJobs(ctx *gin.Context) {
var query types.GetJobsQuery
if err := ctx.ShouldBindQuery(&query); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}
h.setPaginationDefault(&query.Page, &query.PerPage)
jobs, err := h.service.GetJobs(ctx.Request.Context(), query)
if err != nil {
ctx.Error(err)
return
}
totalCount, err := h.service.JobTotalCount(ctx.Request.Context(), query)
if err != nil {
ctx.Error(err)
return
}
h.setPaginationLinkHeader(ctx, query.Page, query.PerPage, int(totalCount))
ctx.JSON(http.StatusOK, jobs)
}

View File

@ -23,60 +23,6 @@ import (
"github.com/gin-gonic/gin"
)
// @Summary Create Preheat
// @Description create by json config
// @Tags Preheat
// @Accept json
// @Produce json
// @Param CDN body types.CreatePreheatRequest true "Preheat"
// @Success 200 {object} types.Preheat
// @Failure 400
// @Failure 404
// @Failure 500
// @Router /preheats [post]
func (h *Handlers) CreatePreheat(ctx *gin.Context) {
var json types.CreatePreheatRequest
if err := ctx.ShouldBindJSON(&json); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}
preheat, err := h.service.CreatePreheat(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err)
return
}
ctx.JSON(http.StatusOK, preheat)
}
// @Summary Get Preheat
// @Description Get Preheat by id
// @Tags Preheat
// @Accept json
// @Produce json
// @Param id path string true "id"
// @Success 200 {object} types.Preheat
// @Failure 400
// @Failure 404
// @Failure 500
// @Router /preheats/{id} [get]
func (h *Handlers) GetPreheat(ctx *gin.Context) {
var params types.PreheatParams
if err := ctx.ShouldBindUri(&params); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}
preheat, err := h.service.GetPreheat(ctx.Request.Context(), params.ID)
if err != nil {
ctx.Error(err)
return
}
ctx.JSON(http.StatusOK, preheat)
}
// @Summary Create V1 Preheat
// @Description create by json config
// @Tags Preheat
@ -116,7 +62,7 @@ func (h *Handlers) CreateV1Preheat(ctx *gin.Context) {
// @Failure 500
// @Router /preheats/{id} [get]
func (h *Handlers) GetV1Preheat(ctx *gin.Context) {
var params types.PreheatParams
var params types.V1PreheatParams
if err := ctx.ShouldBindUri(&params); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return

View File

@ -22,6 +22,7 @@ import (
)
type Job struct {
*internaljob.Job
Preheat
}
@ -43,6 +44,20 @@ func New(cfg *config.Config) (*Job, error) {
}
return &Job{
Job: j,
Preheat: p,
}, nil
}
func (j *Job) GetGroupJobState(id string) (*internaljob.GroupJobState, error) {
groupJobState, err := j.Job.GetGroupJobState(id)
if err != nil {
return nil, err
}
return &internaljob.GroupJobState{
GroupUUID: groupJobState.GroupUUID,
State: groupJobState.State,
CreatedAt: groupJobState.CreatedAt,
}, nil
}

View File

@ -52,8 +52,7 @@ const (
var accessURLPattern, _ = regexp.Compile("^(.*)://(.*)/v2/(.*)/manifests/(.*)")
type Preheat interface {
CreatePreheat(context.Context, []model.Scheduler, types.CreatePreheatRequest) (*types.Preheat, error)
GetPreheat(context.Context, string) (*types.Preheat, error)
CreatePreheat(context.Context, []model.Scheduler, types.PreheatArgs) (*internaljob.GroupJobState, error)
}
type preheat struct {
@ -75,20 +74,7 @@ func newPreheat(job *internaljob.Job, bizTag string) (Preheat, error) {
}, nil
}
func (p *preheat) GetPreheat(ctx context.Context, id string) (*types.Preheat, error) {
groupJobState, err := p.job.GetGroupJobState(id)
if err != nil {
return nil, err
}
return &types.Preheat{
ID: groupJobState.GroupUUID,
Status: groupJobState.State,
CreatedAt: groupJobState.CreatedAt,
}, nil
}
func (p *preheat) CreatePreheat(ctx context.Context, schedulers []model.Scheduler, json types.CreatePreheatRequest) (*types.Preheat, error) {
func (p *preheat) CreatePreheat(ctx context.Context, schedulers []model.Scheduler, json types.PreheatArgs) (*internaljob.GroupJobState, error) {
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanPreheat, trace.WithSpanKind(trace.SpanKindProducer))
span.SetAttributes(config.AttributePreheatType.String(json.Type))
@ -136,7 +122,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []model.Schedule
return p.createGroupJob(ctx, files, queues)
}
func (p *preheat) createGroupJob(ctx context.Context, files []*internaljob.PreheatRequest, queues []internaljob.Queue) (*types.Preheat, error) {
func (p *preheat) createGroupJob(ctx context.Context, files []*internaljob.PreheatRequest, queues []internaljob.Queue) (*internaljob.GroupJobState, error) {
signatures := []*machineryv1tasks.Signature{}
var urls []string
for i := range files {
@ -169,9 +155,9 @@ func (p *preheat) createGroupJob(ctx context.Context, files []*internaljob.Prehe
}
logger.Infof("create preheat group job successed, group uuid: %s urls:%s", group.GroupUUID, urls)
return &types.Preheat{
ID: group.GroupUUID,
Status: machineryv1tasks.StatePending,
return &internaljob.GroupJobState{
GroupUUID: group.GroupUUID,
State: machineryv1tasks.StatePending,
CreatedAt: time.Now(),
}, nil
}

View File

@ -21,9 +21,10 @@ type CDNCluster struct {
Name string `gorm:"column:name;type:varchar(256);index:uk_cdn_cluster_name,unique;not null;comment:name" json:"name"`
BIO string `gorm:"column:bio;type:varchar(1024);comment:biography" json:"bio"`
Config JSONMap `gorm:"column:config;not null;comment:configuration" json:"config"`
SchedulerClusters []SchedulerCluster `gorm:"many2many:cdn_cluster_scheduler_cluster;" json:"-"`
SchedulerClusters []SchedulerCluster `gorm:"many2many:cdn_cluster_scheduler_cluster;" json:"scheduler_clusters"`
IsDefault bool `gorm:"column:is_default;not null;default:false;comment:default cdn cluster" json:"is_default"`
CDNs []CDN `json:"-"`
SecurityGroupID uint `gorm:"comment:security group id" json:"security_group_id"`
SecurityGroup SecurityGroup `json:"-"`
Jobs []Job `gorm:"many2many:job_cdn_cluster;" json:"jobs"`
}

31
manager/model/job.go Normal file
View File

@ -0,0 +1,31 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package model
type Job struct {
Model
TaskID string `gorm:"column:task_id;type:varchar(256);not null;comment:task id" json:"task_id"`
BIO string `gorm:"column:bio;type:varchar(1024);comment:biography" json:"bio"`
Type string `gorm:"column:type;type:varchar(256);comment:type" json:"type"`
Status string `gorm:"column:status;type:varchar(256);not null;default:'PENDING';comment:service status" json:"status"`
Args JSONMap `gorm:"column:args;not null;comment:task request args" json:"args"`
Result JSONMap `gorm:"column:result;comment:task result" json:"result"`
UserID uint `gorm:"column:user_id;comment:user id" json:"user_id"`
User User `json:"-"`
CDNClusters []CDNCluster `gorm:"many2many:job_cdn_cluster;" json:"cdn_clusters"`
SchedulerClusters []SchedulerCluster `gorm:"many2many:job_scheduler_cluster;" json:"scheduler_clusters"`
}

View File

@ -28,4 +28,5 @@ type SchedulerCluster struct {
Schedulers []Scheduler `json:"-"`
SecurityGroupID uint `gorm:"comment:security group id" json:"security_group_id"`
SecurityGroup SecurityGroup `json:"-"`
Jobs []Job `gorm:"many2many:job_scheduler_cluster;" json:"jobs"`
}

View File

@ -182,10 +182,13 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) (
config.GET(":id", h.GetConfig)
config.GET("", h.GetConfigs)
// Preheat
ph := apiv1.Group("/preheats")
ph.POST("", h.CreatePreheat)
ph.GET(":id", h.GetPreheat)
// Job
job := apiv1.Group("/jobs")
job.POST("", h.CreateJob)
job.DELETE(":id", h.DestroyJob)
job.PATCH(":id", h.UpdateJob)
job.GET(":id", h.GetJob)
job.GET("", h.GetJobs)
// Compatible with the V1 preheat.
pv1 := r.Group("preheats")

242
manager/service/job.go Normal file
View File

@ -0,0 +1,242 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package service
import (
"context"
"fmt"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/manager/model"
"d7y.io/dragonfly/v2/manager/types"
"d7y.io/dragonfly/v2/pkg/retry"
"d7y.io/dragonfly/v2/pkg/util/structutils"
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
)
func (s *rest) CreatePreheatJob(ctx context.Context, json types.CreatePreheatJobRequest) (*model.Job, error) {
var schedulers []model.Scheduler
var schedulerClusters []model.SchedulerCluster
if len(json.SchedulerClusterIDs) != 0 {
for _, schedulerClusterID := range json.SchedulerClusterIDs {
schedulerCluster := model.SchedulerCluster{}
if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil {
return nil, err
}
schedulerClusters = append(schedulerClusters, schedulerCluster)
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, model.Scheduler{
SchedulerClusterID: schedulerCluster.ID,
Status: model.SchedulerStatusActive,
}).Error; err != nil {
return nil, err
}
schedulers = append(schedulers, scheduler)
}
} else {
if err := s.db.WithContext(ctx).Find(&schedulerClusters).Error; err != nil {
return nil, err
}
for _, schedulerCluster := range schedulerClusters {
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, model.Scheduler{
SchedulerClusterID: schedulerCluster.ID,
Status: model.SchedulerStatusActive,
}).Error; err != nil {
continue
}
schedulers = append(schedulers, scheduler)
}
}
groupJobState, err := s.job.CreatePreheat(ctx, schedulers, json.Args)
if err != nil {
return nil, err
}
args, err := structutils.StructToMap(json.Args)
if err != nil {
return nil, err
}
job := model.Job{
TaskID: groupJobState.GroupUUID,
BIO: json.BIO,
Type: json.Type,
Status: groupJobState.State,
Args: args,
UserID: json.UserID,
SchedulerClusters: schedulerClusters,
}
if err := s.db.WithContext(ctx).Create(&job).Error; err != nil {
return nil, err
}
go s.pollingJob(context.Background(), job.ID, job.TaskID)
return &job, nil
}
func (s *rest) pollingJob(ctx context.Context, id uint, taskID string) {
var job model.Job
retry.Run(ctx, func() (interface{}, bool, error) {
groupJob, err := s.job.GetGroupJobState(taskID)
if err != nil {
logger.Errorf("polling job %d and task %s failed: %v", id, taskID, err)
return nil, false, err
}
if err := s.db.WithContext(ctx).First(&job, id).Updates(model.Job{
Status: groupJob.State,
}).Error; err != nil {
logger.Errorf("polling job %d and task %s store failed: %v", id, taskID, err)
return nil, true, err
}
switch job.Status {
case machineryv1tasks.StateSuccess:
logger.Infof("polling job %d and task %s is finally successful", id, taskID)
return nil, true, nil
case machineryv1tasks.StateFailure:
logger.Errorf("polling job %d and task %s is finally failed", id, taskID)
return nil, true, nil
default:
return nil, false, fmt.Errorf("polling job %d and task %s status is %s", id, taskID, job.Status)
}
}, 5, 10, 120, nil)
// Polling timeout
if job.Status != machineryv1tasks.StateSuccess && job.Status != machineryv1tasks.StateFailure {
job := model.Job{}
if err := s.db.WithContext(ctx).First(&job, id).Updates(model.Job{
Status: machineryv1tasks.StateFailure,
}).Error; err != nil {
logger.Errorf("polling job %d and task %s store failed: %v", id, taskID, err)
}
logger.Errorf("polling job %d and task %s timeout", id, taskID)
}
}
func (s *rest) DestroyJob(ctx context.Context, id uint) error {
job := model.Job{}
if err := s.db.WithContext(ctx).First(&job, id).Error; err != nil {
return err
}
if err := s.db.WithContext(ctx).Unscoped().Delete(&model.Job{}, id).Error; err != nil {
return err
}
return nil
}
func (s *rest) UpdateJob(ctx context.Context, id uint, json types.UpdateJobRequest) (*model.Job, error) {
job := model.Job{}
if err := s.db.WithContext(ctx).Preload("CDNClusters").Preload("SchedulerClusters").First(&job, id).Updates(model.Job{
BIO: json.BIO,
UserID: json.UserID,
}).Error; err != nil {
return nil, err
}
return &job, nil
}
func (s *rest) GetJob(ctx context.Context, id uint) (*model.Job, error) {
job := model.Job{}
if err := s.db.WithContext(ctx).Preload("CDNClusters").Preload("SchedulerClusters").First(&job, id).Error; err != nil {
return nil, err
}
return &job, nil
}
func (s *rest) GetJobs(ctx context.Context, q types.GetJobsQuery) (*[]model.Job, error) {
jobs := []model.Job{}
if err := s.db.WithContext(ctx).Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.Job{
Type: q.Type,
Status: q.Status,
UserID: q.UserID,
}).Find(&jobs).Error; err != nil {
return nil, err
}
return &jobs, nil
}
func (s *rest) JobTotalCount(ctx context.Context, q types.GetJobsQuery) (int64, error) {
var count int64
if err := s.db.WithContext(ctx).Model(&model.Job{}).Where(&model.Job{
Type: q.Type,
Status: q.Status,
UserID: q.UserID,
}).Count(&count).Error; err != nil {
return 0, err
}
return count, nil
}
func (s *rest) AddJobToSchedulerClusters(ctx context.Context, id, schedulerClusterIDs []uint) error {
job := model.Job{}
if err := s.db.WithContext(ctx).First(&job, id).Error; err != nil {
return err
}
var schedulerClusters []*model.SchedulerCluster
for _, schedulerClusterID := range schedulerClusterIDs {
schedulerCluster := model.SchedulerCluster{}
if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil {
return err
}
schedulerClusters = append(schedulerClusters, &schedulerCluster)
}
if err := s.db.WithContext(ctx).Model(&job).Association("SchedulerClusters").Append(schedulerClusters); err != nil {
return err
}
return nil
}
func (s *rest) AddJobToCDNClusters(ctx context.Context, id, cdnClusterIDs []uint) error {
job := model.Job{}
if err := s.db.WithContext(ctx).First(&job, id).Error; err != nil {
return err
}
var cdnClusters []*model.CDNCluster
for _, cdnClusterID := range cdnClusterIDs {
cdnCluster := model.CDNCluster{}
if err := s.db.WithContext(ctx).First(&cdnCluster, cdnClusterID).Error; err != nil {
return err
}
cdnClusters = append(cdnClusters, &cdnCluster)
}
if err := s.db.WithContext(ctx).Model(&job).Association("CDNClusters").Append(cdnClusters); err != nil {
return err
}
return nil
}

View File

@ -18,10 +18,14 @@ package service
import (
"context"
"time"
"strconv"
logger "d7y.io/dragonfly/v2/internal/dflog"
internaljob "d7y.io/dragonfly/v2/internal/job"
"d7y.io/dragonfly/v2/manager/model"
"d7y.io/dragonfly/v2/manager/types"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
)
@ -40,76 +44,41 @@ const (
V1PreheatingStatusFail = "FAIL"
)
func (s *rest) CreatePreheat(ctx context.Context, json types.CreatePreheatRequest) (*types.Preheat, error) {
if json.SchedulerClusterID != nil {
schedulerCluster := model.SchedulerCluster{}
if err := s.db.WithContext(ctx).First(&schedulerCluster, json.SchedulerClusterID).Error; err != nil {
return nil, err
}
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, model.Scheduler{
SchedulerClusterID: schedulerCluster.ID,
Status: model.SchedulerStatusActive,
}).Error; err != nil {
return nil, err
}
return s.job.CreatePreheat(ctx, []model.Scheduler{scheduler}, json)
}
schedulerClusters := []model.SchedulerCluster{}
if err := s.db.WithContext(ctx).Find(&schedulerClusters).Error; err != nil {
return nil, err
}
var schedulers []model.Scheduler
for _, schedulerCluster := range schedulerClusters {
scheduler := model.Scheduler{}
if err := s.db.WithContext(ctx).First(&scheduler, model.Scheduler{
SchedulerClusterID: schedulerCluster.ID,
Status: model.SchedulerStatusActive,
}).Error; err != nil {
continue
}
schedulers = append(schedulers, scheduler)
}
return s.job.CreatePreheat(ctx, schedulers, json)
}
func (s *rest) GetPreheat(ctx context.Context, id string) (*types.Preheat, error) {
return s.job.GetPreheat(ctx, id)
}
func (s *rest) CreateV1Preheat(ctx context.Context, json types.CreateV1PreheatRequest) (*types.CreateV1PreheatResponse, error) {
p, err := s.CreatePreheat(ctx, types.CreatePreheatRequest{
Type: json.Type,
URL: json.URL,
Filter: json.Filter,
Headers: json.Headers,
job, err := s.CreatePreheatJob(ctx, types.CreatePreheatJobRequest{
Type: internaljob.PreheatJob,
Args: types.PreheatArgs{
Type: json.Type,
URL: json.URL,
Filter: json.Filter,
Headers: json.Headers,
},
})
if err != nil {
return nil, err
}
return &types.CreateV1PreheatResponse{
ID: p.ID,
ID: strconv.FormatUint(uint64(job.ID), 10),
}, nil
}
func (s *rest) GetV1Preheat(ctx context.Context, id string) (*types.GetV1PreheatResponse, error) {
p, err := s.job.GetPreheat(ctx, id)
func (s *rest) GetV1Preheat(ctx context.Context, rawID string) (*types.GetV1PreheatResponse, error) {
id, err := strconv.ParseUint(rawID, 10, 32)
if err != nil {
return nil, err
logger.Errorf("preheat convert error", err)
}
job := model.Job{}
if err := s.db.WithContext(ctx).First(&job, uint(id)).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
}
return &types.GetV1PreheatResponse{
ID: p.ID,
Status: convertStatus(p.Status),
StartTime: p.CreatedAt.String(),
FinishTime: time.Now().String(),
ID: strconv.FormatUint(uint64(job.ID), 10),
Status: convertStatus(job.Status),
StartTime: job.CreatedAt.String(),
FinishTime: job.UpdatedAt.String(),
}, nil
}

View File

@ -110,8 +110,13 @@ type REST interface {
GetConfigs(context.Context, types.GetConfigsQuery) (*[]model.Config, error)
ConfigTotalCount(context.Context, types.GetConfigsQuery) (int64, error)
CreatePreheat(context.Context, types.CreatePreheatRequest) (*types.Preheat, error)
GetPreheat(context.Context, string) (*types.Preheat, error)
CreatePreheatJob(context.Context, types.CreatePreheatJobRequest) (*model.Job, error)
DestroyJob(context.Context, uint) error
UpdateJob(context.Context, uint, types.UpdateJobRequest) (*model.Job, error)
GetJob(context.Context, uint) (*model.Job, error)
GetJobs(context.Context, types.GetJobsQuery) (*[]model.Job, error)
JobTotalCount(context.Context, types.GetJobsQuery) (int64, error)
CreateV1Preheat(context.Context, types.CreateV1PreheatRequest) (*types.CreateV1PreheatResponse, error)
GetV1Preheat(context.Context, string) (*types.GetV1PreheatResponse, error)
}

60
manager/types/job.go Normal file
View File

@ -0,0 +1,60 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package types
type CreateJobRequest struct {
BIO string `json:"bio" binding:"omitempty"`
Type string `json:"type" binding:"required"`
Args map[string]interface{} `json:"args" binding:"omitempty"`
Result map[string]interface{} `json:"result" binding:"omitempty"`
UserID uint `json:"user_id" binding:"omitempty"`
CDNClusterIDs []uint `json:"cdn_cluster_ids" binding:"omitempty"`
SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
}
type UpdateJobRequest struct {
BIO string `json:"bio" binding:"omitempty"`
UserID uint `json:"user_id" binding:"omitempty"`
}
type JobParams struct {
ID uint `uri:"id" binding:"required"`
}
type GetJobsQuery struct {
Type string `form:"type" binding:"omitempty"`
Status string `form:"status" binding:"omitempty,oneof=PENDING RECEIVED STARTED RETRY SUCCESS FAILURE"`
UserID uint `form:"user_id" binding:"omitempty"`
Page int `form:"page" binding:"omitempty,gte=1"`
PerPage int `form:"per_page" binding:"omitempty,gte=1,lte=50"`
}
type CreatePreheatJobRequest struct {
BIO string `json:"bio" binding:"omitempty"`
Type string `json:"type" binding:"required"`
Args PreheatArgs `json:"args" binding:"omitempty"`
Result map[string]interface{} `json:"result" binding:"omitempty"`
UserID uint `json:"user_id" binding:"omitempty"`
SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
}
type PreheatArgs struct {
Type string `json:"type" binding:"required,oneof=image file"`
URL string `json:"url" binding:"required"`
Filter string `json:"filter" binding:"omitempty"`
Headers map[string]string `json:"headers" binding:"omitempty"`
}

View File

@ -16,26 +16,6 @@
package types
import "time"
type PreheatParams struct {
ID string `uri:"id" binding:"required"`
}
type CreatePreheatRequest struct {
SchedulerClusterID *uint `json:"scheduler_cluster_id" binding:"omitempty"`
Type string `json:"type" binding:"required,oneof=image file"`
URL string `json:"url" binding:"required"`
Filter string `json:"filter" binding:"omitempty"`
Headers map[string]string `json:"headers" binding:"omitempty"`
}
type Preheat struct {
ID string `json:"id"`
Status string `json:"status"`
CreatedAt time.Time `json:"create_at"`
}
type CreateV1PreheatRequest struct {
Type string `json:"type" binding:"required,oneof=image file"`
URL string `json:"url" binding:"required"`
@ -44,12 +24,16 @@ type CreateV1PreheatRequest struct {
}
type CreateV1PreheatResponse struct {
ID string `json:"ID"`
ID string `json:"id"`
}
type GetV1PreheatResponse struct {
ID string `json:"ID"`
ID string `json:"id"`
Status string `json:"status"`
StartTime string `json:"startTime,omitempty"`
FinishTime string `json:"finishTime,omitempty"`
}
type V1PreheatParams struct {
ID string `uri:"id" binding:"required"`
}

View File

@ -68,7 +68,7 @@ func New(ctx context.Context, cfg *config.JobConfig, clusterID uint, hostname st
logger.Errorf("create scheduler job queue error: %v", err)
return nil, err
}
logger.Infof("create global job queue: %v", schedulerJob)
logger.Infof("create scheduler job queue: %v", schedulerJob)
localQueue, err := internaljob.GetSchedulerQueue(clusterID, hostname)
if err != nil {

View File

@ -21,7 +21,7 @@ const (
managerService = "dragonfly-manager.dragonfly-system.svc"
managerPort = "8080"
preheatPath = "api/v1/preheats"
preheatPath = "api/v1/jobs"
managerTag = "d7y/manager"
dragonflyNamespace = "dragonfly-system"

View File

@ -25,8 +25,11 @@ import (
"time"
"d7y.io/dragonfly/v2/internal/idgen"
internaljob "d7y.io/dragonfly/v2/internal/job"
"d7y.io/dragonfly/v2/manager/model"
"d7y.io/dragonfly/v2/manager/types"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/util/structutils"
"d7y.io/dragonfly/v2/test/e2e/e2eutil"
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
. "github.com/onsi/ginkgo" //nolint
@ -53,17 +56,25 @@ var _ = Describe("Preheat with manager", func() {
sha256sum1 := strings.Split(string(out), " ")[0]
// preheat file
out, err = fsPod.CurlCommand("POST", map[string]string{"Content-Type": "application/json"},
map[string]interface{}{"type": "file", "url": url},
req, err := structutils.StructToMap(types.CreatePreheatJobRequest{
Type: internaljob.PreheatJob,
Args: types.PreheatArgs{
Type: "file",
URL: url,
},
})
Expect(err).NotTo(HaveOccurred())
out, err = fsPod.CurlCommand("POST", map[string]string{"Content-Type": "application/json"}, req,
fmt.Sprintf("http://%s:%s/%s", managerService, managerPort, preheatPath)).CombinedOutput()
fmt.Println(string(out))
Expect(err).NotTo(HaveOccurred())
// wait for success
preheatJob := &types.Preheat{}
err = json.Unmarshal(out, preheatJob)
job := &model.Job{}
err = json.Unmarshal(out, job)
Expect(err).NotTo(HaveOccurred())
done := waitForDone(preheatJob, fsPod)
done := waitForDone(job, fsPod)
Expect(done).Should(BeTrue())
// generate task_id, also the filename
@ -100,17 +111,25 @@ var _ = Describe("Preheat with manager", func() {
fsPod := getFileServerExec()
// preheat file
out, err := fsPod.CurlCommand("POST", map[string]string{"Content-Type": "application/json"},
map[string]interface{}{"type": "image", "url": url},
req, err := structutils.StructToMap(types.CreatePreheatJobRequest{
Type: internaljob.PreheatJob,
Args: types.PreheatArgs{
Type: "image",
URL: url,
},
})
Expect(err).NotTo(HaveOccurred())
out, err := fsPod.CurlCommand("POST", map[string]string{"Content-Type": "application/json"}, req,
fmt.Sprintf("http://%s:%s/%s", managerService, managerPort, preheatPath)).CombinedOutput()
fmt.Println(string(out))
Expect(err).NotTo(HaveOccurred())
// wait for success
preheatJob := &types.Preheat{}
err = json.Unmarshal(out, preheatJob)
job := &model.Job{}
err = json.Unmarshal(out, job)
Expect(err).NotTo(HaveOccurred())
done := waitForDone(preheatJob, fsPod)
done := waitForDone(job, fsPod)
Expect(done).Should(BeTrue())
for i, cdnTaskID := range cdnTaskIDs {
@ -153,17 +172,25 @@ var _ = Describe("Preheat with manager", func() {
fsPod := getFileServerExec()
// use a curl to preheat the same file, git a id to wait for success
out, err = fsPod.CurlCommand("POST", map[string]string{"Content-Type": "application/json"},
map[string]interface{}{"type": "file", "url": url},
req, err := structutils.StructToMap(types.CreatePreheatJobRequest{
Type: internaljob.PreheatJob,
Args: types.PreheatArgs{
Type: "file",
URL: url,
},
})
Expect(err).NotTo(HaveOccurred())
out, err = fsPod.CurlCommand("POST", map[string]string{"Content-Type": "application/json"}, req,
fmt.Sprintf("http://%s:%s/%s", managerService, managerPort, preheatPath)).CombinedOutput()
fmt.Println(string(out))
Expect(err).NotTo(HaveOccurred())
// wait for success
preheatJob := &types.Preheat{}
err = json.Unmarshal(out, preheatJob)
job := &model.Job{}
err = json.Unmarshal(out, job)
Expect(err).NotTo(HaveOccurred())
done := waitForDone(preheatJob, fsPod)
done := waitForDone(job, fsPod)
Expect(done).Should(BeTrue())
// generate task id to find the file
@ -179,7 +206,7 @@ var _ = Describe("Preheat with manager", func() {
})
})
func waitForDone(preheat *types.Preheat, pod *e2eutil.PodExec) bool {
func waitForDone(preheat *model.Job, pod *e2eutil.PodExec) bool {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
@ -190,7 +217,7 @@ func waitForDone(preheat *types.Preheat, pod *e2eutil.PodExec) bool {
return false
case <-ticker.C:
out, err := pod.CurlCommand("", nil, nil,
fmt.Sprintf("http://%s:%s/%s/%s", managerService, managerPort, preheatPath, preheat.ID)).CombinedOutput()
fmt.Sprintf("http://%s:%s/%s/%d", managerService, managerPort, preheatPath, preheat.ID)).CombinedOutput()
fmt.Println(string(out))
Expect(err).NotTo(HaveOccurred())
err = json.Unmarshal(out, preheat)