feat: add preheat otel (#741)

* feat: add preheat otel

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2021-10-21 19:21:56 +08:00
parent 03774882fd
commit 7a8575eca8
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
29 changed files with 399 additions and 374 deletions

View File

@ -20,4 +20,12 @@ import "go.opentelemetry.io/otel/attribute"
const (
AttributeID = attribute.Key("d7y.manager.id")
AttributePreheatType = attribute.Key("d7y.manager.preheat.type")
AttributePreheatURL = attribute.Key("d7y.manager.preheat.url")
)
const (
SpanPreheat = "preheat"
SpanGetLayers = "get-layers"
SpanAuthWithRegistry = "auth-with-registry"
)

View File

@ -41,7 +41,7 @@ func (h *Handlers) CreateCDN(ctx *gin.Context) {
return
}
cdn, err := h.service.CreateCDN(json)
cdn, err := h.service.CreateCDN(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err)
return
@ -68,7 +68,7 @@ func (h *Handlers) DestroyCDN(ctx *gin.Context) {
return
}
if err := h.service.DestroyCDN(params.ID); err != nil {
if err := h.service.DestroyCDN(ctx.Request.Context(), params.ID); err != nil {
ctx.Error(err)
return
}
@ -101,7 +101,7 @@ func (h *Handlers) UpdateCDN(ctx *gin.Context) {
return
}
cdn, err := h.service.UpdateCDN(params.ID, json)
cdn, err := h.service.UpdateCDN(ctx.Request.Context(), params.ID, json)
if err != nil {
ctx.Error(err)
return
@ -128,7 +128,7 @@ func (h *Handlers) GetCDN(ctx *gin.Context) {
return
}
cdn, err := h.service.GetCDN(params.ID)
cdn, err := h.service.GetCDN(ctx.Request.Context(), params.ID)
if err != nil {
ctx.Error(err)
return
@ -157,13 +157,13 @@ func (h *Handlers) GetCDNs(ctx *gin.Context) {
}
h.setPaginationDefault(&query.Page, &query.PerPage)
cdns, err := h.service.GetCDNs(query)
cdns, err := h.service.GetCDNs(ctx.Request.Context(), query)
if err != nil {
ctx.Error(err)
return
}
totalCount, err := h.service.CDNTotalCount(query)
totalCount, err := h.service.CDNTotalCount(ctx.Request.Context(), query)
if err != nil {
ctx.Error(err)
return

View File

@ -42,7 +42,7 @@ func (h *Handlers) CreateCDNCluster(ctx *gin.Context) {
}
if json.SecurityGroupDomain != "" {
cdn, err := h.service.CreateCDNClusterWithSecurityGroupDomain(json)
cdn, err := h.service.CreateCDNClusterWithSecurityGroupDomain(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err)
return
@ -52,7 +52,7 @@ func (h *Handlers) CreateCDNCluster(ctx *gin.Context) {
return
}
cdnCluster, err := h.service.CreateCDNCluster(json)
cdnCluster, err := h.service.CreateCDNCluster(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err)
return
@ -79,7 +79,7 @@ func (h *Handlers) DestroyCDNCluster(ctx *gin.Context) {
return
}
if err := h.service.DestroyCDNCluster(params.ID); err != nil {
if err := h.service.DestroyCDNCluster(ctx.Request.Context(), params.ID); err != nil {
ctx.Error(err)
return
}
@ -113,7 +113,7 @@ func (h *Handlers) UpdateCDNCluster(ctx *gin.Context) {
}
if json.SecurityGroupDomain != "" {
cdn, err := h.service.UpdateCDNClusterWithSecurityGroupDomain(params.ID, json)
cdn, err := h.service.UpdateCDNClusterWithSecurityGroupDomain(ctx.Request.Context(), params.ID, json)
if err != nil {
ctx.Error(err)
return
@ -123,7 +123,7 @@ func (h *Handlers) UpdateCDNCluster(ctx *gin.Context) {
return
}
cdnCluster, err := h.service.UpdateCDNCluster(params.ID, json)
cdnCluster, err := h.service.UpdateCDNCluster(ctx.Request.Context(), params.ID, json)
if err != nil {
ctx.Error(err)
return
@ -150,7 +150,7 @@ func (h *Handlers) GetCDNCluster(ctx *gin.Context) {
return
}
cdnCluster, err := h.service.GetCDNCluster(params.ID)
cdnCluster, err := h.service.GetCDNCluster(ctx.Request.Context(), params.ID)
if err != nil {
ctx.Error(err)
return
@ -179,13 +179,13 @@ func (h *Handlers) GetCDNClusters(ctx *gin.Context) {
}
h.setPaginationDefault(&query.Page, &query.PerPage)
cdns, err := h.service.GetCDNClusters(query)
cdns, err := h.service.GetCDNClusters(ctx.Request.Context(), query)
if err != nil {
ctx.Error(err)
return
}
totalCount, err := h.service.CDNClusterTotalCount(query)
totalCount, err := h.service.CDNClusterTotalCount(ctx.Request.Context(), query)
if err != nil {
ctx.Error(err)
return
@ -214,7 +214,7 @@ func (h *Handlers) AddCDNToCDNCluster(ctx *gin.Context) {
return
}
if err := h.service.AddCDNToCDNCluster(params.ID, params.CDNID); err != nil {
if err := h.service.AddCDNToCDNCluster(ctx.Request.Context(), params.ID, params.CDNID); err != nil {
ctx.Error(err)
return
}
@ -241,7 +241,7 @@ func (h *Handlers) AddSchedulerClusterToCDNCluster(ctx *gin.Context) {
return
}
if err := h.service.AddSchedulerClusterToCDNCluster(params.ID, params.SchedulerClusterID); err != nil {
if err := h.service.AddSchedulerClusterToCDNCluster(ctx.Request.Context(), params.ID, params.SchedulerClusterID); err != nil {
ctx.Error(err)
return
}

View File

@ -41,7 +41,7 @@ func (h *Handlers) CreateOauth(ctx *gin.Context) {
return
}
oauth, err := h.service.CreateOauth(json)
oauth, err := h.service.CreateOauth(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err)
return
@ -68,7 +68,7 @@ func (h *Handlers) DestroyOauth(ctx *gin.Context) {
return
}
if err := h.service.DestroyOauth(params.ID); err != nil {
if err := h.service.DestroyOauth(ctx.Request.Context(), params.ID); err != nil {
ctx.Error(err)
return
}
@ -101,7 +101,7 @@ func (h *Handlers) UpdateOauth(ctx *gin.Context) {
return
}
oauth, err := h.service.UpdateOauth(params.ID, json)
oauth, err := h.service.UpdateOauth(ctx.Request.Context(), params.ID, json)
if err != nil {
ctx.Error(err)
return
@ -128,7 +128,7 @@ func (h *Handlers) GetOauth(ctx *gin.Context) {
return
}
oauth, err := h.service.GetOauth(params.ID)
oauth, err := h.service.GetOauth(ctx.Request.Context(), params.ID)
if err != nil {
ctx.Error(err)
return
@ -157,13 +157,13 @@ func (h *Handlers) GetOauths(ctx *gin.Context) {
}
h.setPaginationDefault(&query.Page, &query.PerPage)
oauth, err := h.service.GetOauths(query)
oauth, err := h.service.GetOauths(ctx.Request.Context(), query)
if err != nil {
ctx.Error(err)
return
}
totalCount, err := h.service.OauthTotalCount(query)
totalCount, err := h.service.OauthTotalCount(ctx.Request.Context(), query)
if err != nil {
ctx.Error(err)
return

View File

@ -32,6 +32,6 @@ import (
// @Router /permissions [get]
func (h *Handlers) GetPermissions(g *gin.Engine) func(ctx *gin.Context) {
return func(ctx *gin.Context) {
ctx.JSON(http.StatusOK, h.service.GetPermissions(g))
ctx.JSON(http.StatusOK, h.service.GetPermissions(ctx.Request.Context(), g))
}
}

View File

@ -41,7 +41,7 @@ func (h *Handlers) CreatePreheat(ctx *gin.Context) {
return
}
preheat, err := h.service.CreatePreheat(json)
preheat, err := h.service.CreatePreheat(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err)
return
@ -68,7 +68,7 @@ func (h *Handlers) GetPreheat(ctx *gin.Context) {
return
}
preheat, err := h.service.GetPreheat(params.ID)
preheat, err := h.service.GetPreheat(ctx.Request.Context(), params.ID)
if err != nil {
ctx.Error(err)
return
@ -95,7 +95,7 @@ func (h *Handlers) CreateV1Preheat(ctx *gin.Context) {
return
}
preheat, err := h.service.CreateV1Preheat(json)
preheat, err := h.service.CreateV1Preheat(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err)
return
@ -122,7 +122,7 @@ func (h *Handlers) GetV1Preheat(ctx *gin.Context) {
return
}
preheat, err := h.service.GetV1Preheat(params.ID)
preheat, err := h.service.GetV1Preheat(ctx.Request.Context(), params.ID)
if err != nil {
ctx.Error(err)
return

View File

@ -40,7 +40,7 @@ func (h *Handlers) CreateRole(ctx *gin.Context) {
return
}
if err := h.service.CreateRole(json); err != nil {
if err := h.service.CreateRole(ctx.Request.Context(), json); err != nil {
ctx.Error(err)
return
}
@ -65,7 +65,7 @@ func (h *Handlers) DestroyRole(ctx *gin.Context) {
return
}
if ok, err := h.service.DestroyRole(params.Role); err != nil {
if ok, err := h.service.DestroyRole(ctx.Request.Context(), params.Role); err != nil {
ctx.Error(err)
return
} else if !ok {
@ -93,7 +93,7 @@ func (h *Handlers) GetRole(ctx *gin.Context) {
return
}
ctx.JSON(http.StatusOK, h.service.GetRole(params.Role))
ctx.JSON(http.StatusOK, h.service.GetRole(ctx.Request.Context(), params.Role))
}
// @Summary Get Roles
@ -106,7 +106,7 @@ func (h *Handlers) GetRole(ctx *gin.Context) {
// @Failure 500
// @Router /roles [get]
func (h *Handlers) GetRoles(ctx *gin.Context) {
roles := h.service.GetRoles()
roles := h.service.GetRoles(ctx.Request.Context())
ctx.JSON(http.StatusOK, roles)
}
@ -134,7 +134,7 @@ func (h *Handlers) AddPermissionForRole(ctx *gin.Context) {
return
}
if ok, err := h.service.AddPermissionForRole(params.Role, json); err != nil {
if ok, err := h.service.AddPermissionForRole(ctx.Request.Context(), params.Role, json); err != nil {
ctx.Error(err)
return
} else if !ok {
@ -169,7 +169,7 @@ func (h *Handlers) DeletePermissionForRole(ctx *gin.Context) {
return
}
if ok, err := h.service.DeletePermissionForRole(params.Role, json); err != nil {
if ok, err := h.service.DeletePermissionForRole(ctx.Request.Context(), params.Role, json); err != nil {
ctx.Error(err)
return
} else if !ok {

View File

@ -41,7 +41,7 @@ func (h *Handlers) CreateScheduler(ctx *gin.Context) {
return
}
scheduler, err := h.service.CreateScheduler(json)
scheduler, err := h.service.CreateScheduler(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err)
return
@ -68,7 +68,7 @@ func (h *Handlers) DestroyScheduler(ctx *gin.Context) {
return
}
if err := h.service.DestroyScheduler(params.ID); err != nil {
if err := h.service.DestroyScheduler(ctx.Request.Context(), params.ID); err != nil {
ctx.Error(err)
return
}
@ -101,7 +101,7 @@ func (h *Handlers) UpdateScheduler(ctx *gin.Context) {
return
}
scheduler, err := h.service.UpdateScheduler(params.ID, json)
scheduler, err := h.service.UpdateScheduler(ctx.Request.Context(), params.ID, json)
if err != nil {
ctx.Error(err)
return
@ -128,7 +128,7 @@ func (h *Handlers) GetScheduler(ctx *gin.Context) {
return
}
scheduler, err := h.service.GetScheduler(params.ID)
scheduler, err := h.service.GetScheduler(ctx.Request.Context(), params.ID)
if err != nil {
ctx.Error(err)
return
@ -157,13 +157,13 @@ func (h *Handlers) GetSchedulers(ctx *gin.Context) {
}
h.setPaginationDefault(&query.Page, &query.PerPage)
schedulers, err := h.service.GetSchedulers(query)
schedulers, err := h.service.GetSchedulers(ctx.Request.Context(), query)
if err != nil {
ctx.Error(err)
return
}
totalCount, err := h.service.SchedulerTotalCount(query)
totalCount, err := h.service.SchedulerTotalCount(ctx.Request.Context(), query)
if err != nil {
ctx.Error(err)
return

View File

@ -42,7 +42,7 @@ func (h *Handlers) CreateSchedulerCluster(ctx *gin.Context) {
}
if json.SecurityGroupDomain != "" {
scheduler, err := h.service.CreateSchedulerClusterWithSecurityGroupDomain(json)
scheduler, err := h.service.CreateSchedulerClusterWithSecurityGroupDomain(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err)
return
@ -52,7 +52,7 @@ func (h *Handlers) CreateSchedulerCluster(ctx *gin.Context) {
return
}
schedulerCluster, err := h.service.CreateSchedulerCluster(json)
schedulerCluster, err := h.service.CreateSchedulerCluster(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err)
return
@ -79,7 +79,7 @@ func (h *Handlers) DestroySchedulerCluster(ctx *gin.Context) {
return
}
if err := h.service.DestroySchedulerCluster(params.ID); err != nil {
if err := h.service.DestroySchedulerCluster(ctx.Request.Context(), params.ID); err != nil {
ctx.Error(err)
return
}
@ -113,7 +113,7 @@ func (h *Handlers) UpdateSchedulerCluster(ctx *gin.Context) {
}
if json.SecurityGroupDomain != "" {
scheduler, err := h.service.UpdateSchedulerClusterWithSecurityGroupDomain(params.ID, json)
scheduler, err := h.service.UpdateSchedulerClusterWithSecurityGroupDomain(ctx.Request.Context(), params.ID, json)
if err != nil {
ctx.Error(err)
return
@ -123,7 +123,7 @@ func (h *Handlers) UpdateSchedulerCluster(ctx *gin.Context) {
return
}
schedulerCluster, err := h.service.UpdateSchedulerCluster(params.ID, json)
schedulerCluster, err := h.service.UpdateSchedulerCluster(ctx.Request.Context(), params.ID, json)
if err != nil {
ctx.Error(err)
return
@ -150,7 +150,7 @@ func (h *Handlers) GetSchedulerCluster(ctx *gin.Context) {
return
}
schedulerCluster, err := h.service.GetSchedulerCluster(params.ID)
schedulerCluster, err := h.service.GetSchedulerCluster(ctx.Request.Context(), params.ID)
if err != nil {
ctx.Error(err)
return
@ -179,13 +179,13 @@ func (h *Handlers) GetSchedulerClusters(ctx *gin.Context) {
}
h.setPaginationDefault(&query.Page, &query.PerPage)
schedulerClusters, err := h.service.GetSchedulerClusters(query)
schedulerClusters, err := h.service.GetSchedulerClusters(ctx.Request.Context(), query)
if err != nil {
ctx.Error(err)
return
}
totalCount, err := h.service.SchedulerClusterTotalCount(query)
totalCount, err := h.service.SchedulerClusterTotalCount(ctx.Request.Context(), query)
if err != nil {
ctx.Error(err)
return
@ -214,7 +214,7 @@ func (h *Handlers) AddSchedulerToSchedulerCluster(ctx *gin.Context) {
return
}
err := h.service.AddSchedulerToSchedulerCluster(params.ID, params.SchedulerID)
err := h.service.AddSchedulerToSchedulerCluster(ctx.Request.Context(), params.ID, params.SchedulerID)
if err != nil {
ctx.Error(err)
return

View File

@ -41,7 +41,7 @@ func (h *Handlers) CreateSecurityGroup(ctx *gin.Context) {
return
}
securityGroup, err := h.service.CreateSecurityGroup(json)
securityGroup, err := h.service.CreateSecurityGroup(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err)
return
@ -68,7 +68,7 @@ func (h *Handlers) DestroySecurityGroup(ctx *gin.Context) {
return
}
if err := h.service.DestroySecurityGroup(params.ID); err != nil {
if err := h.service.DestroySecurityGroup(ctx.Request.Context(), params.ID); err != nil {
ctx.Error(err)
return
}
@ -101,7 +101,7 @@ func (h *Handlers) UpdateSecurityGroup(ctx *gin.Context) {
return
}
securityGroup, err := h.service.UpdateSecurityGroup(params.ID, json)
securityGroup, err := h.service.UpdateSecurityGroup(ctx.Request.Context(), params.ID, json)
if err != nil {
ctx.Error(err)
return
@ -128,7 +128,7 @@ func (h *Handlers) GetSecurityGroup(ctx *gin.Context) {
return
}
securityGroup, err := h.service.GetSecurityGroup(params.ID)
securityGroup, err := h.service.GetSecurityGroup(ctx.Request.Context(), params.ID)
if err != nil {
ctx.Error(err)
return
@ -157,13 +157,13 @@ func (h *Handlers) GetSecurityGroups(ctx *gin.Context) {
}
h.setPaginationDefault(&query.Page, &query.PerPage)
securityGroups, err := h.service.GetSecurityGroups(query)
securityGroups, err := h.service.GetSecurityGroups(ctx.Request.Context(), query)
if err != nil {
ctx.Error(err)
return
}
totalCount, err := h.service.SecurityGroupTotalCount(query)
totalCount, err := h.service.SecurityGroupTotalCount(ctx.Request.Context(), query)
if err != nil {
ctx.Error(err)
return
@ -192,7 +192,7 @@ func (h *Handlers) AddSchedulerClusterToSecurityGroup(ctx *gin.Context) {
return
}
err := h.service.AddSchedulerClusterToSecurityGroup(params.ID, params.SchedulerClusterID)
err := h.service.AddSchedulerClusterToSecurityGroup(ctx.Request.Context(), params.ID, params.SchedulerClusterID)
if err != nil {
ctx.Error(err)
return
@ -220,7 +220,7 @@ func (h *Handlers) AddCDNClusterToSecurityGroup(ctx *gin.Context) {
return
}
err := h.service.AddCDNClusterToSecurityGroup(params.ID, params.CDNClusterID)
err := h.service.AddCDNClusterToSecurityGroup(ctx.Request.Context(), params.ID, params.CDNClusterID)
if err != nil {
ctx.Error(err)
return

View File

@ -42,7 +42,7 @@ func (h *Handlers) GetUser(ctx *gin.Context) {
return
}
user, err := h.service.GetUser(params.ID)
user, err := h.service.GetUser(ctx.Request.Context(), params.ID)
if err != nil {
ctx.Error(err)
return
@ -68,7 +68,7 @@ func (h *Handlers) SignUp(ctx *gin.Context) {
return
}
user, err := h.service.SignUp(json)
user, err := h.service.SignUp(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err)
return
@ -100,7 +100,7 @@ func (h *Handlers) ResetPassword(ctx *gin.Context) {
return
}
if err := h.service.ResetPassword(params.ID, json); err != nil {
if err := h.service.ResetPassword(ctx.Request.Context(), params.ID, json); err != nil {
ctx.Error(err)
return
}
@ -126,7 +126,7 @@ func (h *Handlers) OauthSignin(ctx *gin.Context) {
return
}
authURL, err := h.service.OauthSignin(params.Name)
authURL, err := h.service.OauthSignin(ctx.Request.Context(), params.Name)
if err != nil {
ctx.Error(err)
return
@ -159,7 +159,7 @@ func (h *Handlers) OauthSigninCallback(j *jwt.GinJWTMiddleware) func(*gin.Contex
return
}
user, err := h.service.OauthSigninCallback(params.Name, query.Code)
user, err := h.service.OauthSigninCallback(ctx.Request.Context(), params.Name, query.Code)
if err != nil {
ctx.Error(err)
return
@ -186,7 +186,7 @@ func (h *Handlers) GetRolesForUser(ctx *gin.Context) {
return
}
roles, err := h.service.GetRolesForUser(params.ID)
roles, err := h.service.GetRolesForUser(ctx.Request.Context(), params.ID)
if err != nil {
ctx.Error(err)
return
@ -213,7 +213,7 @@ func (h *Handlers) AddRoleToUser(ctx *gin.Context) {
return
}
if ok, err := h.service.AddRoleForUser(params); err != nil {
if ok, err := h.service.AddRoleForUser(ctx.Request.Context(), params); err != nil {
ctx.Error(err)
return
} else if !ok {
@ -242,7 +242,7 @@ func (h *Handlers) DeleteRoleForUser(ctx *gin.Context) {
return
}
if ok, err := h.service.DeleteRoleForUser(params); err != nil {
if ok, err := h.service.DeleteRoleForUser(ctx.Request.Context(), params); err != nil {
ctx.Error(err)
return
} else if !ok {

View File

@ -17,6 +17,7 @@
package job
import (
"context"
"encoding/json"
"errors"
"fmt"
@ -28,14 +29,19 @@ import (
logger "d7y.io/dragonfly/v2/internal/dflog"
internaljob "d7y.io/dragonfly/v2/internal/job"
"d7y.io/dragonfly/v2/manager/config"
"d7y.io/dragonfly/v2/manager/model"
"d7y.io/dragonfly/v2/manager/types"
"d7y.io/dragonfly/v2/pkg/util/net/httputils"
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
"github.com/distribution/distribution/v3"
"github.com/distribution/distribution/v3/manifest/schema2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
var tracer = otel.Tracer("sender")
type PreheatType string
const (
@ -46,8 +52,8 @@ const (
var accessURLPattern, _ = regexp.Compile("^(.*)://(.*)/v2/(.*)/manifests/(.*)")
type Preheat interface {
CreatePreheat([]model.Scheduler, types.CreatePreheatRequest) (*types.Preheat, error)
GetPreheat(string) (*types.Preheat, error)
CreatePreheat(context.Context, []model.Scheduler, types.CreatePreheatRequest) (*types.Preheat, error)
GetPreheat(context.Context, string) (*types.Preheat, error)
}
type preheat struct {
@ -69,7 +75,7 @@ func newPreheat(job *internaljob.Job, bizTag string) (Preheat, error) {
}, nil
}
func (p *preheat) GetPreheat(id string) (*types.Preheat, error) {
func (p *preheat) GetPreheat(ctx context.Context, id string) (*types.Preheat, error) {
groupJobState, err := p.job.GetGroupJobState(id)
if err != nil {
return nil, err
@ -82,7 +88,13 @@ func (p *preheat) GetPreheat(id string) (*types.Preheat, error) {
}, nil
}
func (p *preheat) CreatePreheat(schedulers []model.Scheduler, json types.CreatePreheatRequest) (*types.Preheat, error) {
func (p *preheat) CreatePreheat(ctx context.Context, schedulers []model.Scheduler, json types.CreatePreheatRequest) (*types.Preheat, error) {
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanPreheat, trace.WithSpanKind(trace.SpanKindProducer))
span.SetAttributes(config.AttributePreheatType.String(json.Type))
span.SetAttributes(config.AttributePreheatURL.String(json.URL))
defer span.End()
url := json.URL
filter := json.Filter
rawheader := json.Headers
@ -100,7 +112,7 @@ func (p *preheat) CreatePreheat(schedulers []model.Scheduler, json types.CreateP
return nil, err
}
files, err = p.getLayers(url, filter, httputils.MapToHeader(rawheader), image)
files, err = p.getLayers(ctx, url, filter, httputils.MapToHeader(rawheader), image)
if err != nil {
return nil, err
}
@ -121,10 +133,10 @@ func (p *preheat) CreatePreheat(schedulers []model.Scheduler, json types.CreateP
logger.Infof("preheat %s file url: %v queues: %v", json.URL, f.URL, queues)
}
return p.createGroupJob(files, queues)
return p.createGroupJob(ctx, files, queues)
}
func (p *preheat) createGroupJob(files []*internaljob.PreheatRequest, queues []internaljob.Queue) (*types.Preheat, error) {
func (p *preheat) createGroupJob(ctx context.Context, files []*internaljob.PreheatRequest, queues []internaljob.Queue) (*types.Preheat, error) {
signatures := []*machineryv1tasks.Signature{}
var urls []string
for i := range files {
@ -151,7 +163,7 @@ func (p *preheat) createGroupJob(files []*internaljob.PreheatRequest, queues []i
return nil, err
}
if _, err := p.job.Server.SendGroup(group, 0); err != nil {
if _, err := p.job.Server.SendGroupWithContext(ctx, group, 0); err != nil {
logger.Error("create preheat group job failed", err)
return nil, err
}
@ -164,8 +176,11 @@ func (p *preheat) createGroupJob(files []*internaljob.PreheatRequest, queues []i
}, nil
}
func (p *preheat) getLayers(url string, filter string, header http.Header, image *preheatImage) ([]*internaljob.PreheatRequest, error) {
resp, err := p.getManifests(url, header)
func (p *preheat) getLayers(ctx context.Context, url string, filter string, header http.Header, image *preheatImage) ([]*internaljob.PreheatRequest, error) {
ctx, span := tracer.Start(ctx, config.SpanGetLayers, trace.WithSpanKind(trace.SpanKindProducer))
defer span.End()
resp, err := p.getManifests(ctx, url, header)
if err != nil {
return nil, err
}
@ -173,11 +188,11 @@ func (p *preheat) getLayers(url string, filter string, header http.Header, image
if resp.StatusCode/100 != 2 {
if resp.StatusCode == http.StatusUnauthorized {
token := getAuthToken(resp.Header)
token := getAuthToken(ctx, resp.Header)
bearer := "Bearer " + token
header.Add("Authorization", bearer)
resp, err = p.getManifests(url, header)
resp, err = p.getManifests(ctx, url, header)
if err != nil {
return nil, err
}
@ -194,8 +209,8 @@ func (p *preheat) getLayers(url string, filter string, header http.Header, image
return layers, nil
}
func (p *preheat) getManifests(url string, header http.Header) (*http.Response, error) {
req, err := http.NewRequest("GET", url, nil)
func (p *preheat) getManifests(ctx context.Context, url string, header http.Header) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
@ -239,13 +254,21 @@ func (p *preheat) parseLayers(resp *http.Response, url, filter string, header ht
return layers, nil
}
func getAuthToken(header http.Header) (token string) {
func getAuthToken(ctx context.Context, header http.Header) (token string) {
ctx, span := tracer.Start(ctx, config.SpanAuthWithRegistry, trace.WithSpanKind(trace.SpanKindProducer))
defer span.End()
authURL := authURL(header.Values("WWW-Authenticate"))
if len(authURL) == 0 {
return
}
resp, err := http.Get(authURL)
req, err := http.NewRequestWithContext(ctx, "GET", authURL, nil)
if err != nil {
return
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return
}

View File

@ -17,6 +17,7 @@
package middlewares
import (
"context"
"net/http"
"time"
@ -69,7 +70,7 @@ func Jwt(service service.REST) (*jwt.GinJWTMiddleware, error) {
return "", jwt.ErrMissingLoginValues
}
user, err := service.SignIn(json)
user, err := service.SignIn(context.TODO(), json)
if err != nil {
return "", jwt.ErrFailedAuthentication
}

View File

@ -1,37 +0,0 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package middlewares
import (
"d7y.io/dragonfly/v2/manager/config"
"github.com/gin-gonic/gin"
"go.opentelemetry.io/otel"
)
const (
TracerName = "dragonfly-manager-rest"
)
func Tracer() gin.HandlerFunc {
return func(c *gin.Context) {
tracer := otel.Tracer(TracerName)
_, span := tracer.Start(c.Request.Context(), c.HandlerName())
span.SetAttributes(config.AttributeID.Float64(c.GetFloat64("id")))
defer span.End()
c.Next()
}
}

View File

@ -96,20 +96,20 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) (
// User
u := apiv1.Group("/users")
u.GET("/:id", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac, h.GetUser)
u.POST("/signin", middlewares.Tracer(), jwt.LoginHandler)
u.POST("/signout", middlewares.Tracer(), jwt.LogoutHandler)
u.POST("/signup", middlewares.Tracer(), h.SignUp)
u.GET("/signin/:name", middlewares.Tracer(), h.OauthSignin)
u.GET("/signin/:name/callback", middlewares.Tracer(), h.OauthSigninCallback(jwt))
u.POST("/refresh_token", middlewares.Tracer(), jwt.RefreshHandler)
u.POST("/:id/reset_password", middlewares.Tracer(), h.ResetPassword)
u.GET("/:id/roles", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac, h.GetRolesForUser)
u.PUT("/:id/roles/:role", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac, h.AddRoleToUser)
u.DELETE("/:id/roles/:role", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac, h.DeleteRoleForUser)
u.GET("/:id", jwt.MiddlewareFunc(), rbac, h.GetUser)
u.POST("/signin", jwt.LoginHandler)
u.POST("/signout", jwt.LogoutHandler)
u.POST("/signup", h.SignUp)
u.GET("/signin/:name", h.OauthSignin)
u.GET("/signin/:name/callback", h.OauthSigninCallback(jwt))
u.POST("/refresh_token", jwt.RefreshHandler)
u.POST("/:id/reset_password", h.ResetPassword)
u.GET("/:id/roles", jwt.MiddlewareFunc(), rbac, h.GetRolesForUser)
u.PUT("/:id/roles/:role", jwt.MiddlewareFunc(), rbac, h.AddRoleToUser)
u.DELETE("/:id/roles/:role", jwt.MiddlewareFunc(), rbac, h.DeleteRoleForUser)
// Role
re := apiv1.Group("/roles", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac)
re := apiv1.Group("/roles", jwt.MiddlewareFunc(), rbac)
re.POST("", h.CreateRole)
re.DELETE("/:role", h.DestroyRole)
re.GET("/:role", h.GetRole)
@ -118,11 +118,11 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) (
re.DELETE("/:role/permissions", h.DeletePermissionForRole)
// Permission
pm := apiv1.Group("/permissions", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac)
pm := apiv1.Group("/permissions", jwt.MiddlewareFunc(), rbac)
pm.GET("", h.GetPermissions(r))
// Oauth
oa := apiv1.Group("/oauth", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac)
oa := apiv1.Group("/oauth", jwt.MiddlewareFunc(), rbac)
oa.POST("", h.CreateOauth)
oa.DELETE(":id", h.DestroyOauth)
oa.PATCH(":id", h.UpdateOauth)
@ -130,7 +130,7 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) (
oa.GET("", h.GetOauths)
// Scheduler Cluster
sc := apiv1.Group("/scheduler-clusters", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac, middlewares.Tracer())
sc := apiv1.Group("/scheduler-clusters", jwt.MiddlewareFunc(), rbac)
sc.POST("", h.CreateSchedulerCluster)
sc.DELETE(":id", h.DestroySchedulerCluster)
sc.PATCH(":id", h.UpdateSchedulerCluster)
@ -139,7 +139,7 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) (
sc.PUT(":id/schedulers/:scheduler_id", h.AddSchedulerToSchedulerCluster)
// Scheduler
s := apiv1.Group("/schedulers", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac)
s := apiv1.Group("/schedulers", jwt.MiddlewareFunc(), rbac)
s.POST("", h.CreateScheduler)
s.DELETE(":id", h.DestroyScheduler)
s.PATCH(":id", h.UpdateScheduler)
@ -147,7 +147,7 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) (
s.GET("", h.GetSchedulers)
// CDN Cluster
cc := apiv1.Group("/cdn-clusters", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac)
cc := apiv1.Group("/cdn-clusters", jwt.MiddlewareFunc(), rbac)
cc.POST("", h.CreateCDNCluster)
cc.DELETE(":id", h.DestroyCDNCluster)
cc.PATCH(":id", h.UpdateCDNCluster)
@ -157,7 +157,7 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) (
cc.PUT(":id/scheduler-clusters/:scheduler_cluster_id", h.AddSchedulerClusterToCDNCluster)
// CDN
c := apiv1.Group("/cdns", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac)
c := apiv1.Group("/cdns", jwt.MiddlewareFunc(), rbac)
c.POST("", h.CreateCDN)
c.DELETE(":id", h.DestroyCDN)
c.PATCH(":id", h.UpdateCDN)
@ -165,7 +165,7 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) (
c.GET("", h.GetCDNs)
// Security Group
sg := apiv1.Group("/security-groups", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac)
sg := apiv1.Group("/security-groups", jwt.MiddlewareFunc(), rbac)
sg.POST("", h.CreateSecurityGroup)
sg.DELETE(":id", h.DestroySecurityGroup)
sg.PATCH(":id", h.UpdateSecurityGroup)
@ -175,7 +175,7 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) (
sg.PUT(":id/cdn-clusters/:cdn_cluster_id", h.AddCDNClusterToSecurityGroup)
// Preheat
ph := apiv1.Group("/preheats", middlewares.Tracer())
ph := apiv1.Group("/preheats")
ph.POST("", h.CreatePreheat)
ph.GET(":id", h.GetPreheat)

View File

@ -17,11 +17,13 @@
package service
import (
"context"
"d7y.io/dragonfly/v2/manager/model"
"d7y.io/dragonfly/v2/manager/types"
)
func (s *rest) CreateCDN(json types.CreateCDNRequest) (*model.CDN, error) {
func (s *rest) CreateCDN(ctx context.Context, json types.CreateCDNRequest) (*model.CDN, error) {
cdn := model.CDN{
HostName: json.HostName,
IDC: json.IDC,
@ -32,29 +34,29 @@ func (s *rest) CreateCDN(json types.CreateCDNRequest) (*model.CDN, error) {
CDNClusterID: json.CDNClusterID,
}
if err := s.db.Create(&cdn).Error; err != nil {
if err := s.db.WithContext(ctx).Create(&cdn).Error; err != nil {
return nil, err
}
return &cdn, nil
}
func (s *rest) DestroyCDN(id uint) error {
func (s *rest) DestroyCDN(ctx context.Context, id uint) error {
cdn := model.CDN{}
if err := s.db.First(&cdn, id).Error; err != nil {
if err := s.db.WithContext(ctx).First(&cdn, id).Error; err != nil {
return err
}
if err := s.db.Unscoped().Delete(&model.CDN{}, id).Error; err != nil {
if err := s.db.WithContext(ctx).Unscoped().Delete(&model.CDN{}, id).Error; err != nil {
return err
}
return nil
}
func (s *rest) UpdateCDN(id uint, json types.UpdateCDNRequest) (*model.CDN, error) {
func (s *rest) UpdateCDN(ctx context.Context, id uint, json types.UpdateCDNRequest) (*model.CDN, error) {
cdn := model.CDN{}
if err := s.db.First(&cdn, id).Updates(model.CDN{
if err := s.db.WithContext(ctx).First(&cdn, id).Updates(model.CDN{
IDC: json.IDC,
Location: json.Location,
IP: json.IP,
@ -68,18 +70,18 @@ func (s *rest) UpdateCDN(id uint, json types.UpdateCDNRequest) (*model.CDN, erro
return &cdn, nil
}
func (s *rest) GetCDN(id uint) (*model.CDN, error) {
func (s *rest) GetCDN(ctx context.Context, id uint) (*model.CDN, error) {
cdn := model.CDN{}
if err := s.db.First(&cdn, id).Error; err != nil {
if err := s.db.WithContext(ctx).First(&cdn, id).Error; err != nil {
return nil, err
}
return &cdn, nil
}
func (s *rest) GetCDNs(q types.GetCDNsQuery) (*[]model.CDN, error) {
func (s *rest) GetCDNs(ctx context.Context, q types.GetCDNsQuery) (*[]model.CDN, error) {
cdns := []model.CDN{}
if err := s.db.Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.CDN{
if err := s.db.WithContext(ctx).Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.CDN{
HostName: q.HostName,
IDC: q.IDC,
Location: q.Location,
@ -94,9 +96,9 @@ func (s *rest) GetCDNs(q types.GetCDNsQuery) (*[]model.CDN, error) {
return &cdns, nil
}
func (s *rest) CDNTotalCount(q types.GetCDNsQuery) (int64, error) {
func (s *rest) CDNTotalCount(ctx context.Context, q types.GetCDNsQuery) (int64, error) {
var count int64
if err := s.db.Model(&model.CDN{}).Where(&model.CDN{
if err := s.db.WithContext(ctx).Model(&model.CDN{}).Where(&model.CDN{
HostName: q.HostName,
IDC: q.IDC,
Location: q.Location,

View File

@ -17,12 +17,14 @@
package service
import (
"context"
"d7y.io/dragonfly/v2/manager/model"
"d7y.io/dragonfly/v2/manager/types"
"d7y.io/dragonfly/v2/pkg/util/structutils"
)
func (s *rest) CreateCDNCluster(json types.CreateCDNClusterRequest) (*model.CDNCluster, error) {
func (s *rest) CreateCDNCluster(ctx context.Context, json types.CreateCDNClusterRequest) (*model.CDNCluster, error) {
config, err := structutils.StructToMap(json.Config)
if err != nil {
return nil, err
@ -34,32 +36,32 @@ func (s *rest) CreateCDNCluster(json types.CreateCDNClusterRequest) (*model.CDNC
Config: config,
}
if err := s.db.Create(&cdnCluster).Error; err != nil {
if err := s.db.WithContext(ctx).Create(&cdnCluster).Error; err != nil {
return nil, err
}
return &cdnCluster, nil
}
func (s *rest) DestroyCDNCluster(id uint) error {
func (s *rest) DestroyCDNCluster(ctx context.Context, id uint) error {
cdnCluster := model.CDNCluster{}
if err := s.db.First(&cdnCluster, id).Error; err != nil {
if err := s.db.WithContext(ctx).First(&cdnCluster, id).Error; err != nil {
return err
}
if err := s.db.Unscoped().Delete(&model.CDNCluster{}, id).Error; err != nil {
if err := s.db.WithContext(ctx).Unscoped().Delete(&model.CDNCluster{}, id).Error; err != nil {
return err
}
return nil
}
func (s *rest) CreateCDNClusterWithSecurityGroupDomain(json types.CreateCDNClusterRequest) (*model.CDNCluster, error) {
func (s *rest) CreateCDNClusterWithSecurityGroupDomain(ctx context.Context, json types.CreateCDNClusterRequest) (*model.CDNCluster, error) {
securityGroup := model.SecurityGroup{
Domain: json.SecurityGroupDomain,
}
if err := s.db.First(&securityGroup).Error; err != nil {
return s.CreateCDNCluster(json)
if err := s.db.WithContext(ctx).First(&securityGroup).Error; err != nil {
return s.CreateCDNCluster(ctx, json)
}
config, err := structutils.StructToMap(json.Config)
@ -73,7 +75,7 @@ func (s *rest) CreateCDNClusterWithSecurityGroupDomain(json types.CreateCDNClust
Config: config,
}
if err := s.db.Model(&securityGroup).Association("CDNClusters").Append(&cdnCluster); err != nil {
if err := s.db.WithContext(ctx).Model(&securityGroup).Association("CDNClusters").Append(&cdnCluster); err != nil {
return nil, err
}
@ -81,14 +83,14 @@ func (s *rest) CreateCDNClusterWithSecurityGroupDomain(json types.CreateCDNClust
return &cdnCluster, nil
}
func (s *rest) UpdateCDNCluster(id uint, json types.UpdateCDNClusterRequest) (*model.CDNCluster, error) {
func (s *rest) UpdateCDNCluster(ctx context.Context, id uint, json types.UpdateCDNClusterRequest) (*model.CDNCluster, error) {
config, err := structutils.StructToMap(json.Config)
if err != nil {
return nil, err
}
cdnCluster := model.CDNCluster{}
if err := s.db.First(&cdnCluster, id).Updates(model.CDNCluster{
if err := s.db.WithContext(ctx).First(&cdnCluster, id).Updates(model.CDNCluster{
Name: json.Name,
BIO: json.BIO,
Config: config,
@ -99,12 +101,12 @@ func (s *rest) UpdateCDNCluster(id uint, json types.UpdateCDNClusterRequest) (*m
return &cdnCluster, nil
}
func (s *rest) UpdateCDNClusterWithSecurityGroupDomain(id uint, json types.UpdateCDNClusterRequest) (*model.CDNCluster, error) {
func (s *rest) UpdateCDNClusterWithSecurityGroupDomain(ctx context.Context, id uint, json types.UpdateCDNClusterRequest) (*model.CDNCluster, error) {
securityGroup := model.SecurityGroup{
Domain: json.SecurityGroupDomain,
}
if err := s.db.First(&securityGroup).Error; err != nil {
return s.UpdateCDNCluster(id, json)
if err := s.db.WithContext(ctx).First(&securityGroup).Error; err != nil {
return s.UpdateCDNCluster(ctx, id, json)
}
config, err := structutils.StructToMap(json.Config)
@ -118,25 +120,25 @@ func (s *rest) UpdateCDNClusterWithSecurityGroupDomain(id uint, json types.Updat
Config: config,
}
if err := s.db.Model(&securityGroup).Association("CDNClusters").Append(&cdnCluster); err != nil {
if err := s.db.WithContext(ctx).Model(&securityGroup).Association("CDNClusters").Append(&cdnCluster); err != nil {
return nil, err
}
return &cdnCluster, nil
}
func (s *rest) GetCDNCluster(id uint) (*model.CDNCluster, error) {
func (s *rest) GetCDNCluster(ctx context.Context, id uint) (*model.CDNCluster, error) {
cdnCluster := model.CDNCluster{}
if err := s.db.First(&cdnCluster, id).Error; err != nil {
if err := s.db.WithContext(ctx).First(&cdnCluster, id).Error; err != nil {
return nil, err
}
return &cdnCluster, nil
}
func (s *rest) GetCDNClusters(q types.GetCDNClustersQuery) (*[]model.CDNCluster, error) {
func (s *rest) GetCDNClusters(ctx context.Context, q types.GetCDNClustersQuery) (*[]model.CDNCluster, error) {
cdnClusters := []model.CDNCluster{}
if err := s.db.Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.CDNCluster{
if err := s.db.WithContext(ctx).Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.CDNCluster{
Name: q.Name,
}).Find(&cdnClusters).Error; err != nil {
return nil, err
@ -145,9 +147,9 @@ func (s *rest) GetCDNClusters(q types.GetCDNClustersQuery) (*[]model.CDNCluster,
return &cdnClusters, nil
}
func (s *rest) CDNClusterTotalCount(q types.GetCDNClustersQuery) (int64, error) {
func (s *rest) CDNClusterTotalCount(ctx context.Context, q types.GetCDNClustersQuery) (int64, error) {
var count int64
if err := s.db.Model(&model.CDNCluster{}).Where(&model.CDNCluster{
if err := s.db.WithContext(ctx).Model(&model.CDNCluster{}).Where(&model.CDNCluster{
Name: q.Name,
}).Count(&count).Error; err != nil {
return 0, err
@ -156,36 +158,36 @@ func (s *rest) CDNClusterTotalCount(q types.GetCDNClustersQuery) (int64, error)
return count, nil
}
func (s *rest) AddCDNToCDNCluster(id, cdnID uint) error {
func (s *rest) AddCDNToCDNCluster(ctx context.Context, id, cdnID uint) error {
cdnCluster := model.CDNCluster{}
if err := s.db.First(&cdnCluster, id).Error; err != nil {
if err := s.db.WithContext(ctx).First(&cdnCluster, id).Error; err != nil {
return err
}
cdn := model.CDN{}
if err := s.db.First(&cdn, cdnID).Error; err != nil {
if err := s.db.WithContext(ctx).First(&cdn, cdnID).Error; err != nil {
return err
}
if err := s.db.Model(&cdnCluster).Association("CDNs").Append(&cdn); err != nil {
if err := s.db.WithContext(ctx).Model(&cdnCluster).Association("CDNs").Append(&cdn); err != nil {
return err
}
return nil
}
func (s *rest) AddSchedulerClusterToCDNCluster(id, schedulerClusterID uint) error {
func (s *rest) AddSchedulerClusterToCDNCluster(ctx context.Context, id, schedulerClusterID uint) error {
cdnCluster := model.CDNCluster{}
if err := s.db.First(&cdnCluster, id).Error; err != nil {
if err := s.db.WithContext(ctx).First(&cdnCluster, id).Error; err != nil {
return err
}
schedulerCluster := model.SchedulerCluster{}
if err := s.db.First(&schedulerCluster, schedulerClusterID).Error; err != nil {
if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil {
return err
}
if err := s.db.Model(&cdnCluster).Association("SchedulerClusters").Append(&schedulerCluster); err != nil {
if err := s.db.WithContext(ctx).Model(&cdnCluster).Association("SchedulerClusters").Append(&schedulerCluster); err != nil {
return err
}

View File

@ -17,11 +17,13 @@
package service
import (
"context"
"d7y.io/dragonfly/v2/manager/model"
"d7y.io/dragonfly/v2/manager/types"
)
func (s *rest) CreateOauth(json types.CreateOauthRequest) (*model.Oauth, error) {
func (s *rest) CreateOauth(ctx context.Context, json types.CreateOauthRequest) (*model.Oauth, error) {
oauth := model.Oauth{
Name: json.Name,
BIO: json.BIO,
@ -30,29 +32,29 @@ func (s *rest) CreateOauth(json types.CreateOauthRequest) (*model.Oauth, error)
RedirectURL: json.RedirectURL,
}
if err := s.db.Create(&oauth).Error; err != nil {
if err := s.db.WithContext(ctx).Create(&oauth).Error; err != nil {
return nil, err
}
return &oauth, nil
}
func (s *rest) DestroyOauth(id uint) error {
func (s *rest) DestroyOauth(ctx context.Context, id uint) error {
oauth := model.Oauth{}
if err := s.db.First(&oauth, id).Error; err != nil {
if err := s.db.WithContext(ctx).First(&oauth, id).Error; err != nil {
return err
}
if err := s.db.Unscoped().Delete(&model.Oauth{}, id).Error; err != nil {
if err := s.db.WithContext(ctx).Unscoped().Delete(&model.Oauth{}, id).Error; err != nil {
return err
}
return nil
}
func (s *rest) UpdateOauth(id uint, json types.UpdateOauthRequest) (*model.Oauth, error) {
func (s *rest) UpdateOauth(ctx context.Context, id uint, json types.UpdateOauthRequest) (*model.Oauth, error) {
oauth := model.Oauth{}
if err := s.db.First(&oauth, id).Updates(model.Oauth{
if err := s.db.WithContext(ctx).First(&oauth, id).Updates(model.Oauth{
Name: json.Name,
BIO: json.BIO,
ClientID: json.ClientID,
@ -65,18 +67,18 @@ func (s *rest) UpdateOauth(id uint, json types.UpdateOauthRequest) (*model.Oauth
return &oauth, nil
}
func (s *rest) GetOauth(id uint) (*model.Oauth, error) {
func (s *rest) GetOauth(ctx context.Context, id uint) (*model.Oauth, error) {
oauth := model.Oauth{}
if err := s.db.First(&oauth, id).Error; err != nil {
if err := s.db.WithContext(ctx).First(&oauth, id).Error; err != nil {
return nil, err
}
return &oauth, nil
}
func (s *rest) GetOauths(q types.GetOauthsQuery) (*[]model.Oauth, error) {
func (s *rest) GetOauths(ctx context.Context, q types.GetOauthsQuery) (*[]model.Oauth, error) {
oauths := []model.Oauth{}
if err := s.db.Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.Oauth{
if err := s.db.WithContext(ctx).Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.Oauth{
Name: q.Name,
ClientID: q.ClientID,
}).Find(&oauths).Error; err != nil {
@ -86,9 +88,9 @@ func (s *rest) GetOauths(q types.GetOauthsQuery) (*[]model.Oauth, error) {
return &oauths, nil
}
func (s *rest) OauthTotalCount(q types.GetOauthsQuery) (int64, error) {
func (s *rest) OauthTotalCount(ctx context.Context, q types.GetOauthsQuery) (int64, error) {
var count int64
if err := s.db.Model(&model.Oauth{}).Where(&model.Oauth{
if err := s.db.WithContext(ctx).Model(&model.Oauth{}).Where(&model.Oauth{
Name: q.Name,
ClientID: q.ClientID,
}).Count(&count).Error; err != nil {

View File

@ -17,10 +17,12 @@
package service
import (
"context"
"d7y.io/dragonfly/v2/manager/permission/rbac"
"github.com/gin-gonic/gin"
)
func (s *rest) GetPermissions(g *gin.Engine) []rbac.Permission {
func (s *rest) GetPermissions(ctx context.Context, g *gin.Engine) []rbac.Permission {
return rbac.GetPermissions(g)
}

View File

@ -17,6 +17,7 @@
package service
import (
"context"
"time"
"d7y.io/dragonfly/v2/manager/model"
@ -39,33 +40,33 @@ const (
V1PreheatingStatusFail = "FAIL"
)
func (s *rest) CreatePreheat(json types.CreatePreheatRequest) (*types.Preheat, error) {
func (s *rest) CreatePreheat(ctx context.Context, json types.CreatePreheatRequest) (*types.Preheat, error) {
if json.SchedulerClusterID != nil {
schedulerCluster := model.SchedulerCluster{}
if err := s.db.First(&schedulerCluster, json.SchedulerClusterID).Error; err != nil {
if err := s.db.WithContext(ctx).First(&schedulerCluster, json.SchedulerClusterID).Error; err != nil {
return nil, err
}
scheduler := model.Scheduler{}
if err := s.db.First(&scheduler, model.Scheduler{
if err := s.db.WithContext(ctx).First(&scheduler, model.Scheduler{
SchedulerClusterID: schedulerCluster.ID,
Status: model.SchedulerStatusActive,
}).Error; err != nil {
return nil, err
}
return s.job.CreatePreheat([]model.Scheduler{scheduler}, json)
return s.job.CreatePreheat(ctx, []model.Scheduler{scheduler}, json)
}
schedulerClusters := []model.SchedulerCluster{}
if err := s.db.Find(&schedulerClusters).Error; err != nil {
if err := s.db.WithContext(ctx).Find(&schedulerClusters).Error; err != nil {
return nil, err
}
var schedulers []model.Scheduler
for _, schedulerCluster := range schedulerClusters {
scheduler := model.Scheduler{}
if err := s.db.First(&scheduler, model.Scheduler{
if err := s.db.WithContext(ctx).First(&scheduler, model.Scheduler{
SchedulerClusterID: schedulerCluster.ID,
Status: model.SchedulerStatusActive,
}).Error; err != nil {
@ -75,15 +76,15 @@ func (s *rest) CreatePreheat(json types.CreatePreheatRequest) (*types.Preheat, e
schedulers = append(schedulers, scheduler)
}
return s.job.CreatePreheat(schedulers, json)
return s.job.CreatePreheat(ctx, schedulers, json)
}
func (s *rest) GetPreheat(id string) (*types.Preheat, error) {
return s.job.GetPreheat(id)
func (s *rest) GetPreheat(ctx context.Context, id string) (*types.Preheat, error) {
return s.job.GetPreheat(ctx, id)
}
func (s *rest) CreateV1Preheat(json types.CreateV1PreheatRequest) (*types.CreateV1PreheatResponse, error) {
p, err := s.CreatePreheat(types.CreatePreheatRequest{
func (s *rest) CreateV1Preheat(ctx context.Context, json types.CreateV1PreheatRequest) (*types.CreateV1PreheatResponse, error) {
p, err := s.CreatePreheat(ctx, types.CreatePreheatRequest{
Type: json.Type,
URL: json.URL,
Filter: json.Filter,
@ -98,8 +99,8 @@ func (s *rest) CreateV1Preheat(json types.CreateV1PreheatRequest) (*types.Create
}, nil
}
func (s *rest) GetV1Preheat(id string) (*types.GetV1PreheatResponse, error) {
p, err := s.job.GetPreheat(id)
func (s *rest) GetV1Preheat(ctx context.Context, id string) (*types.GetV1PreheatResponse, error) {
p, err := s.job.GetPreheat(ctx, id)
if err != nil {
return nil, err
}

View File

@ -17,10 +17,12 @@
package service
import (
"context"
"d7y.io/dragonfly/v2/manager/types"
)
func (s *rest) CreateRole(json types.CreateRoleRequest) error {
func (s *rest) CreateRole(ctx context.Context, json types.CreateRoleRequest) error {
for _, permission := range json.Permissions {
_, err := s.enforcer.AddPermissionForUser(json.Role, permission.Object, permission.Action)
if err != nil {
@ -31,22 +33,22 @@ func (s *rest) CreateRole(json types.CreateRoleRequest) error {
return nil
}
func (s *rest) DestroyRole(role string) (bool, error) {
func (s *rest) DestroyRole(ctx context.Context, role string) (bool, error) {
return s.enforcer.DeleteRole(role)
}
func (s *rest) GetRole(role string) [][]string {
func (s *rest) GetRole(ctx context.Context, role string) [][]string {
return s.enforcer.GetPermissionsForUser(role)
}
func (s *rest) GetRoles() []string {
func (s *rest) GetRoles(ctx context.Context) []string {
return s.enforcer.GetAllSubjects()
}
func (s *rest) AddPermissionForRole(role string, json types.AddPermissionForRoleRequest) (bool, error) {
func (s *rest) AddPermissionForRole(ctx context.Context, role string, json types.AddPermissionForRoleRequest) (bool, error) {
return s.enforcer.AddPermissionForUser(role, json.Object, json.Action)
}
func (s *rest) DeletePermissionForRole(role string, json types.DeletePermissionForRoleRequest) (bool, error) {
func (s *rest) DeletePermissionForRole(ctx context.Context, role string, json types.DeletePermissionForRoleRequest) (bool, error) {
return s.enforcer.DeletePermissionForUser(role, json.Object, json.Action)
}

View File

@ -17,11 +17,13 @@
package service
import (
"context"
"d7y.io/dragonfly/v2/manager/model"
"d7y.io/dragonfly/v2/manager/types"
)
func (s *rest) CreateScheduler(json types.CreateSchedulerRequest) (*model.Scheduler, error) {
func (s *rest) CreateScheduler(ctx context.Context, json types.CreateSchedulerRequest) (*model.Scheduler, error) {
scheduler := model.Scheduler{
HostName: json.HostName,
VIPs: json.VIPs,
@ -33,29 +35,29 @@ func (s *rest) CreateScheduler(json types.CreateSchedulerRequest) (*model.Schedu
SchedulerClusterID: json.SchedulerClusterID,
}
if err := s.db.Create(&scheduler).Error; err != nil {
if err := s.db.WithContext(ctx).Create(&scheduler).Error; err != nil {
return nil, err
}
return &scheduler, nil
}
func (s *rest) DestroyScheduler(id uint) error {
func (s *rest) DestroyScheduler(ctx context.Context, id uint) error {
scheduler := model.Scheduler{}
if err := s.db.First(&scheduler, id).Error; err != nil {
if err := s.db.WithContext(ctx).First(&scheduler, id).Error; err != nil {
return err
}
if err := s.db.Unscoped().Delete(&model.Scheduler{}, id).Error; err != nil {
if err := s.db.WithContext(ctx).Unscoped().Delete(&model.Scheduler{}, id).Error; err != nil {
return err
}
return nil
}
func (s *rest) UpdateScheduler(id uint, json types.UpdateSchedulerRequest) (*model.Scheduler, error) {
func (s *rest) UpdateScheduler(ctx context.Context, id uint, json types.UpdateSchedulerRequest) (*model.Scheduler, error) {
scheduler := model.Scheduler{}
if err := s.db.First(&scheduler, id).Updates(model.Scheduler{
if err := s.db.WithContext(ctx).First(&scheduler, id).Updates(model.Scheduler{
VIPs: json.VIPs,
IDC: json.IDC,
Location: json.Location,
@ -70,18 +72,18 @@ func (s *rest) UpdateScheduler(id uint, json types.UpdateSchedulerRequest) (*mod
return &scheduler, nil
}
func (s *rest) GetScheduler(id uint) (*model.Scheduler, error) {
func (s *rest) GetScheduler(ctx context.Context, id uint) (*model.Scheduler, error) {
scheduler := model.Scheduler{}
if err := s.db.First(&scheduler, id).Error; err != nil {
if err := s.db.WithContext(ctx).First(&scheduler, id).Error; err != nil {
return nil, err
}
return &scheduler, nil
}
func (s *rest) GetSchedulers(q types.GetSchedulersQuery) (*[]model.Scheduler, error) {
func (s *rest) GetSchedulers(ctx context.Context, q types.GetSchedulersQuery) (*[]model.Scheduler, error) {
schedulers := []model.Scheduler{}
if err := s.db.Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.Scheduler{
if err := s.db.WithContext(ctx).Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.Scheduler{
HostName: q.HostName,
IDC: q.IDC,
Location: q.Location,
@ -95,9 +97,9 @@ func (s *rest) GetSchedulers(q types.GetSchedulersQuery) (*[]model.Scheduler, er
return &schedulers, nil
}
func (s *rest) SchedulerTotalCount(q types.GetSchedulersQuery) (int64, error) {
func (s *rest) SchedulerTotalCount(ctx context.Context, q types.GetSchedulersQuery) (int64, error) {
var count int64
if err := s.db.Model(&model.Scheduler{}).Where(&model.Scheduler{
if err := s.db.WithContext(ctx).Model(&model.Scheduler{}).Where(&model.Scheduler{
HostName: q.HostName,
IDC: q.IDC,
Location: q.Location,

View File

@ -17,12 +17,14 @@
package service
import (
"context"
"d7y.io/dragonfly/v2/manager/model"
"d7y.io/dragonfly/v2/manager/types"
"d7y.io/dragonfly/v2/pkg/util/structutils"
)
func (s *rest) CreateSchedulerCluster(json types.CreateSchedulerClusterRequest) (*model.SchedulerCluster, error) {
func (s *rest) CreateSchedulerCluster(ctx context.Context, json types.CreateSchedulerClusterRequest) (*model.SchedulerCluster, error) {
config, err := structutils.StructToMap(json.Config)
if err != nil {
return nil, err
@ -47,12 +49,12 @@ func (s *rest) CreateSchedulerCluster(json types.CreateSchedulerClusterRequest)
IsDefault: json.IsDefault,
}
if err := s.db.Create(&schedulerCluster).Error; err != nil {
if err := s.db.WithContext(ctx).Create(&schedulerCluster).Error; err != nil {
return nil, err
}
if json.CDNClusterID > 0 {
if err := s.AddSchedulerClusterToCDNCluster(json.CDNClusterID, schedulerCluster.ID); err != nil {
if err := s.AddSchedulerClusterToCDNCluster(ctx, json.CDNClusterID, schedulerCluster.ID); err != nil {
return nil, err
}
}
@ -60,12 +62,12 @@ func (s *rest) CreateSchedulerCluster(json types.CreateSchedulerClusterRequest)
return &schedulerCluster, nil
}
func (s *rest) CreateSchedulerClusterWithSecurityGroupDomain(json types.CreateSchedulerClusterRequest) (*model.SchedulerCluster, error) {
func (s *rest) CreateSchedulerClusterWithSecurityGroupDomain(ctx context.Context, json types.CreateSchedulerClusterRequest) (*model.SchedulerCluster, error) {
securityGroup := model.SecurityGroup{
Domain: json.SecurityGroupDomain,
}
if err := s.db.First(&securityGroup).Error; err != nil {
return s.CreateSchedulerCluster(json)
if err := s.db.WithContext(ctx).First(&securityGroup).Error; err != nil {
return s.CreateSchedulerCluster(ctx, json)
}
config, err := structutils.StructToMap(json.Config)
@ -92,12 +94,12 @@ func (s *rest) CreateSchedulerClusterWithSecurityGroupDomain(json types.CreateSc
IsDefault: json.IsDefault,
}
if err := s.db.Model(&securityGroup).Association("SchedulerClusters").Append(&schedulerCluster); err != nil {
if err := s.db.WithContext(ctx).Model(&securityGroup).Association("SchedulerClusters").Append(&schedulerCluster); err != nil {
return nil, err
}
if json.CDNClusterID > 0 {
if err := s.AddSchedulerClusterToCDNCluster(json.CDNClusterID, schedulerCluster.ID); err != nil {
if err := s.AddSchedulerClusterToCDNCluster(ctx, json.CDNClusterID, schedulerCluster.ID); err != nil {
return nil, err
}
}
@ -105,20 +107,20 @@ func (s *rest) CreateSchedulerClusterWithSecurityGroupDomain(json types.CreateSc
return &schedulerCluster, nil
}
func (s *rest) DestroySchedulerCluster(id uint) error {
func (s *rest) DestroySchedulerCluster(ctx context.Context, id uint) error {
schedulerCluster := model.SchedulerCluster{}
if err := s.db.First(&schedulerCluster, id).Error; err != nil {
if err := s.db.WithContext(ctx).First(&schedulerCluster, id).Error; err != nil {
return err
}
if err := s.db.Unscoped().Delete(&model.SchedulerCluster{}, id).Error; err != nil {
if err := s.db.WithContext(ctx).Unscoped().Delete(&model.SchedulerCluster{}, id).Error; err != nil {
return err
}
return nil
}
func (s *rest) UpdateSchedulerCluster(id uint, json types.UpdateSchedulerClusterRequest) (*model.SchedulerCluster, error) {
func (s *rest) UpdateSchedulerCluster(ctx context.Context, id uint, json types.UpdateSchedulerClusterRequest) (*model.SchedulerCluster, error) {
config, err := structutils.StructToMap(json.Config)
if err != nil {
return nil, err
@ -135,7 +137,7 @@ func (s *rest) UpdateSchedulerCluster(id uint, json types.UpdateSchedulerCluster
}
schedulerCluster := model.SchedulerCluster{}
if err := s.db.First(&schedulerCluster, id).Updates(model.SchedulerCluster{
if err := s.db.WithContext(ctx).First(&schedulerCluster, id).Updates(model.SchedulerCluster{
Name: json.Name,
BIO: json.BIO,
Config: config,
@ -147,7 +149,7 @@ func (s *rest) UpdateSchedulerCluster(id uint, json types.UpdateSchedulerCluster
}
if json.CDNClusterID > 0 {
if err := s.AddSchedulerClusterToCDNCluster(json.CDNClusterID, schedulerCluster.ID); err != nil {
if err := s.AddSchedulerClusterToCDNCluster(ctx, json.CDNClusterID, schedulerCluster.ID); err != nil {
return nil, err
}
}
@ -155,12 +157,12 @@ func (s *rest) UpdateSchedulerCluster(id uint, json types.UpdateSchedulerCluster
return &schedulerCluster, nil
}
func (s *rest) UpdateSchedulerClusterWithSecurityGroupDomain(id uint, json types.UpdateSchedulerClusterRequest) (*model.SchedulerCluster, error) {
func (s *rest) UpdateSchedulerClusterWithSecurityGroupDomain(ctx context.Context, id uint, json types.UpdateSchedulerClusterRequest) (*model.SchedulerCluster, error) {
securityGroup := model.SecurityGroup{
Domain: json.SecurityGroupDomain,
}
if err := s.db.First(&securityGroup).Error; err != nil {
return s.UpdateSchedulerCluster(id, json)
if err := s.db.WithContext(ctx).First(&securityGroup).Error; err != nil {
return s.UpdateSchedulerCluster(ctx, id, json)
}
config, err := structutils.StructToMap(json.Config)
@ -187,12 +189,12 @@ func (s *rest) UpdateSchedulerClusterWithSecurityGroupDomain(id uint, json types
IsDefault: json.IsDefault,
}
if err := s.db.Model(&securityGroup).Association("SchedulerClusters").Append(&schedulerCluster); err != nil {
if err := s.db.WithContext(ctx).Model(&securityGroup).Association("SchedulerClusters").Append(&schedulerCluster); err != nil {
return nil, err
}
if json.CDNClusterID > 0 {
if err := s.AddSchedulerClusterToCDNCluster(json.CDNClusterID, schedulerCluster.ID); err != nil {
if err := s.AddSchedulerClusterToCDNCluster(ctx, json.CDNClusterID, schedulerCluster.ID); err != nil {
return nil, err
}
}
@ -200,18 +202,18 @@ func (s *rest) UpdateSchedulerClusterWithSecurityGroupDomain(id uint, json types
return &schedulerCluster, nil
}
func (s *rest) GetSchedulerCluster(id uint) (*model.SchedulerCluster, error) {
func (s *rest) GetSchedulerCluster(ctx context.Context, id uint) (*model.SchedulerCluster, error) {
schedulerCluster := model.SchedulerCluster{}
if err := s.db.Preload("CDNClusters").First(&schedulerCluster, id).Error; err != nil {
if err := s.db.WithContext(ctx).Preload("CDNClusters").First(&schedulerCluster, id).Error; err != nil {
return nil, err
}
return &schedulerCluster, nil
}
func (s *rest) GetSchedulerClusters(q types.GetSchedulerClustersQuery) (*[]model.SchedulerCluster, error) {
func (s *rest) GetSchedulerClusters(ctx context.Context, q types.GetSchedulerClustersQuery) (*[]model.SchedulerCluster, error) {
schedulerClusters := []model.SchedulerCluster{}
if err := s.db.Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.SchedulerCluster{
if err := s.db.WithContext(ctx).Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.SchedulerCluster{
Name: q.Name,
}).Preload("CDNClusters").Find(&schedulerClusters).Error; err != nil {
return nil, err
@ -220,9 +222,9 @@ func (s *rest) GetSchedulerClusters(q types.GetSchedulerClustersQuery) (*[]model
return &schedulerClusters, nil
}
func (s *rest) SchedulerClusterTotalCount(q types.GetSchedulerClustersQuery) (int64, error) {
func (s *rest) SchedulerClusterTotalCount(ctx context.Context, q types.GetSchedulerClustersQuery) (int64, error) {
var count int64
if err := s.db.Model(&model.SchedulerCluster{}).Where(&model.SchedulerCluster{
if err := s.db.WithContext(ctx).Model(&model.SchedulerCluster{}).Where(&model.SchedulerCluster{
Name: q.Name,
}).Count(&count).Error; err != nil {
return 0, err
@ -231,18 +233,18 @@ func (s *rest) SchedulerClusterTotalCount(q types.GetSchedulerClustersQuery) (in
return count, nil
}
func (s *rest) AddSchedulerToSchedulerCluster(id, schedulerID uint) error {
func (s *rest) AddSchedulerToSchedulerCluster(ctx context.Context, id, schedulerID uint) error {
schedulerCluster := model.SchedulerCluster{}
if err := s.db.First(&schedulerCluster, id).Error; err != nil {
if err := s.db.WithContext(ctx).First(&schedulerCluster, id).Error; err != nil {
return err
}
scheduler := model.Scheduler{}
if err := s.db.First(&scheduler, schedulerID).Error; err != nil {
if err := s.db.WithContext(ctx).First(&scheduler, schedulerID).Error; err != nil {
return err
}
if err := s.db.Model(&schedulerCluster).Association("Schedulers").Append(&scheduler); err != nil {
if err := s.db.WithContext(ctx).Model(&schedulerCluster).Association("Schedulers").Append(&scheduler); err != nil {
return err
}

View File

@ -17,11 +17,13 @@
package service
import (
"context"
"d7y.io/dragonfly/v2/manager/model"
"d7y.io/dragonfly/v2/manager/types"
)
func (s *rest) CreateSecurityGroup(json types.CreateSecurityGroupRequest) (*model.SecurityGroup, error) {
func (s *rest) CreateSecurityGroup(ctx context.Context, json types.CreateSecurityGroupRequest) (*model.SecurityGroup, error) {
securityGroup := model.SecurityGroup{
Name: json.Name,
BIO: json.BIO,
@ -29,29 +31,29 @@ func (s *rest) CreateSecurityGroup(json types.CreateSecurityGroupRequest) (*mode
ProxyDomain: json.ProxyDomain,
}
if err := s.db.Create(&securityGroup).Error; err != nil {
if err := s.db.WithContext(ctx).Create(&securityGroup).Error; err != nil {
return nil, err
}
return &securityGroup, nil
}
func (s *rest) DestroySecurityGroup(id uint) error {
func (s *rest) DestroySecurityGroup(ctx context.Context, id uint) error {
securityGroup := model.SecurityGroup{}
if err := s.db.First(&securityGroup, id).Error; err != nil {
if err := s.db.WithContext(ctx).First(&securityGroup, id).Error; err != nil {
return err
}
if err := s.db.Unscoped().Delete(&model.SecurityGroup{}, id).Error; err != nil {
if err := s.db.WithContext(ctx).Unscoped().Delete(&model.SecurityGroup{}, id).Error; err != nil {
return err
}
return nil
}
func (s *rest) UpdateSecurityGroup(id uint, json types.UpdateSecurityGroupRequest) (*model.SecurityGroup, error) {
func (s *rest) UpdateSecurityGroup(ctx context.Context, id uint, json types.UpdateSecurityGroupRequest) (*model.SecurityGroup, error) {
securityGroup := model.SecurityGroup{}
if err := s.db.First(&securityGroup, id).Updates(model.SecurityGroup{
if err := s.db.WithContext(ctx).First(&securityGroup, id).Updates(model.SecurityGroup{
Name: json.Name,
BIO: json.BIO,
Domain: json.Domain,
@ -63,18 +65,18 @@ func (s *rest) UpdateSecurityGroup(id uint, json types.UpdateSecurityGroupReques
return &securityGroup, nil
}
func (s *rest) GetSecurityGroup(id uint) (*model.SecurityGroup, error) {
func (s *rest) GetSecurityGroup(ctx context.Context, id uint) (*model.SecurityGroup, error) {
securityGroup := model.SecurityGroup{}
if err := s.db.First(&securityGroup, id).Error; err != nil {
if err := s.db.WithContext(ctx).First(&securityGroup, id).Error; err != nil {
return nil, err
}
return &securityGroup, nil
}
func (s *rest) GetSecurityGroups(q types.GetSecurityGroupsQuery) (*[]model.SecurityGroup, error) {
func (s *rest) GetSecurityGroups(ctx context.Context, q types.GetSecurityGroupsQuery) (*[]model.SecurityGroup, error) {
securityGroups := []model.SecurityGroup{}
if err := s.db.Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.SecurityGroup{
if err := s.db.WithContext(ctx).Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.SecurityGroup{
Name: q.Name,
Domain: q.Domain,
}).Find(&securityGroups).Error; err != nil {
@ -84,9 +86,9 @@ func (s *rest) GetSecurityGroups(q types.GetSecurityGroupsQuery) (*[]model.Secur
return &securityGroups, nil
}
func (s *rest) SecurityGroupTotalCount(q types.GetSecurityGroupsQuery) (int64, error) {
func (s *rest) SecurityGroupTotalCount(ctx context.Context, q types.GetSecurityGroupsQuery) (int64, error) {
var count int64
if err := s.db.Model(&model.SecurityGroup{}).Where(&model.SecurityGroup{
if err := s.db.WithContext(ctx).Model(&model.SecurityGroup{}).Where(&model.SecurityGroup{
Name: q.Name,
Domain: q.Domain,
}).Count(&count).Error; err != nil {
@ -96,36 +98,36 @@ func (s *rest) SecurityGroupTotalCount(q types.GetSecurityGroupsQuery) (int64, e
return count, nil
}
func (s *rest) AddSchedulerClusterToSecurityGroup(id, schedulerClusterID uint) error {
func (s *rest) AddSchedulerClusterToSecurityGroup(ctx context.Context, id, schedulerClusterID uint) error {
securityGroup := model.SecurityGroup{}
if err := s.db.First(&securityGroup, id).Error; err != nil {
if err := s.db.WithContext(ctx).First(&securityGroup, id).Error; err != nil {
return err
}
schedulerCluster := model.SchedulerCluster{}
if err := s.db.First(&schedulerCluster, schedulerClusterID).Error; err != nil {
if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil {
return err
}
if err := s.db.Model(&securityGroup).Association("SchedulerClusters").Append(&schedulerCluster); err != nil {
if err := s.db.WithContext(ctx).Model(&securityGroup).Association("SchedulerClusters").Append(&schedulerCluster); err != nil {
return err
}
return nil
}
func (s *rest) AddCDNClusterToSecurityGroup(id, cdnClusterID uint) error {
func (s *rest) AddCDNClusterToSecurityGroup(ctx context.Context, id, cdnClusterID uint) error {
securityGroup := model.SecurityGroup{}
if err := s.db.First(&securityGroup, id).Error; err != nil {
if err := s.db.WithContext(ctx).First(&securityGroup, id).Error; err != nil {
return err
}
cdnCluster := model.CDNCluster{}
if err := s.db.First(&cdnCluster, cdnClusterID).Error; err != nil {
if err := s.db.WithContext(ctx).First(&cdnCluster, cdnClusterID).Error; err != nil {
return err
}
if err := s.db.Model(&securityGroup).Association("CDNClusters").Append(&cdnCluster); err != nil {
if err := s.db.WithContext(ctx).Model(&securityGroup).Association("CDNClusters").Append(&cdnCluster); err != nil {
return err
}

View File

@ -17,6 +17,8 @@
package service
import (
"context"
"d7y.io/dragonfly/v2/manager/cache"
"d7y.io/dragonfly/v2/manager/database"
"d7y.io/dragonfly/v2/manager/job"
@ -31,80 +33,80 @@ import (
)
type REST interface {
GetUser(uint) (*model.User, error)
SignIn(types.SignInRequest) (*model.User, error)
SignUp(types.SignUpRequest) (*model.User, error)
OauthSignin(string) (string, error)
OauthSigninCallback(string, string) (*model.User, error)
ResetPassword(uint, types.ResetPasswordRequest) error
GetRolesForUser(uint) ([]string, error)
AddRoleForUser(types.AddRoleForUserParams) (bool, error)
DeleteRoleForUser(types.DeleteRoleForUserParams) (bool, error)
GetUser(context.Context, uint) (*model.User, error)
SignIn(context.Context, types.SignInRequest) (*model.User, error)
SignUp(context.Context, types.SignUpRequest) (*model.User, error)
OauthSignin(context.Context, string) (string, error)
OauthSigninCallback(context.Context, string, string) (*model.User, error)
ResetPassword(context.Context, uint, types.ResetPasswordRequest) error
GetRolesForUser(context.Context, uint) ([]string, error)
AddRoleForUser(context.Context, types.AddRoleForUserParams) (bool, error)
DeleteRoleForUser(context.Context, types.DeleteRoleForUserParams) (bool, error)
CreateRole(json types.CreateRoleRequest) error
DestroyRole(string) (bool, error)
GetRole(string) [][]string
GetRoles() []string
AddPermissionForRole(string, types.AddPermissionForRoleRequest) (bool, error)
DeletePermissionForRole(string, types.DeletePermissionForRoleRequest) (bool, error)
CreateRole(context.Context, types.CreateRoleRequest) error
DestroyRole(context.Context, string) (bool, error)
GetRole(context.Context, string) [][]string
GetRoles(context.Context) []string
AddPermissionForRole(context.Context, string, types.AddPermissionForRoleRequest) (bool, error)
DeletePermissionForRole(context.Context, string, types.DeletePermissionForRoleRequest) (bool, error)
GetPermissions(*gin.Engine) []rbac.Permission
GetPermissions(context.Context, *gin.Engine) []rbac.Permission
CreateOauth(types.CreateOauthRequest) (*model.Oauth, error)
DestroyOauth(uint) error
UpdateOauth(uint, types.UpdateOauthRequest) (*model.Oauth, error)
GetOauth(uint) (*model.Oauth, error)
GetOauths(types.GetOauthsQuery) (*[]model.Oauth, error)
OauthTotalCount(types.GetOauthsQuery) (int64, error)
CreateOauth(context.Context, types.CreateOauthRequest) (*model.Oauth, error)
DestroyOauth(context.Context, uint) error
UpdateOauth(context.Context, uint, types.UpdateOauthRequest) (*model.Oauth, error)
GetOauth(context.Context, uint) (*model.Oauth, error)
GetOauths(context.Context, types.GetOauthsQuery) (*[]model.Oauth, error)
OauthTotalCount(context.Context, types.GetOauthsQuery) (int64, error)
CreateCDNCluster(types.CreateCDNClusterRequest) (*model.CDNCluster, error)
CreateCDNClusterWithSecurityGroupDomain(types.CreateCDNClusterRequest) (*model.CDNCluster, error)
DestroyCDNCluster(uint) error
UpdateCDNCluster(uint, types.UpdateCDNClusterRequest) (*model.CDNCluster, error)
UpdateCDNClusterWithSecurityGroupDomain(uint, types.UpdateCDNClusterRequest) (*model.CDNCluster, error)
GetCDNCluster(uint) (*model.CDNCluster, error)
GetCDNClusters(types.GetCDNClustersQuery) (*[]model.CDNCluster, error)
CDNClusterTotalCount(types.GetCDNClustersQuery) (int64, error)
AddCDNToCDNCluster(uint, uint) error
AddSchedulerClusterToCDNCluster(uint, uint) error
CreateCDNCluster(context.Context, types.CreateCDNClusterRequest) (*model.CDNCluster, error)
CreateCDNClusterWithSecurityGroupDomain(context.Context, types.CreateCDNClusterRequest) (*model.CDNCluster, error)
DestroyCDNCluster(context.Context, uint) error
UpdateCDNCluster(context.Context, uint, types.UpdateCDNClusterRequest) (*model.CDNCluster, error)
UpdateCDNClusterWithSecurityGroupDomain(context.Context, uint, types.UpdateCDNClusterRequest) (*model.CDNCluster, error)
GetCDNCluster(context.Context, uint) (*model.CDNCluster, error)
GetCDNClusters(context.Context, types.GetCDNClustersQuery) (*[]model.CDNCluster, error)
CDNClusterTotalCount(context.Context, types.GetCDNClustersQuery) (int64, error)
AddCDNToCDNCluster(context.Context, uint, uint) error
AddSchedulerClusterToCDNCluster(context.Context, uint, uint) error
CreateCDN(types.CreateCDNRequest) (*model.CDN, error)
DestroyCDN(uint) error
UpdateCDN(uint, types.UpdateCDNRequest) (*model.CDN, error)
GetCDN(uint) (*model.CDN, error)
GetCDNs(types.GetCDNsQuery) (*[]model.CDN, error)
CDNTotalCount(types.GetCDNsQuery) (int64, error)
CreateCDN(context.Context, types.CreateCDNRequest) (*model.CDN, error)
DestroyCDN(context.Context, uint) error
UpdateCDN(context.Context, uint, types.UpdateCDNRequest) (*model.CDN, error)
GetCDN(context.Context, uint) (*model.CDN, error)
GetCDNs(context.Context, types.GetCDNsQuery) (*[]model.CDN, error)
CDNTotalCount(context.Context, types.GetCDNsQuery) (int64, error)
CreateSchedulerCluster(types.CreateSchedulerClusterRequest) (*model.SchedulerCluster, error)
CreateSchedulerClusterWithSecurityGroupDomain(types.CreateSchedulerClusterRequest) (*model.SchedulerCluster, error)
DestroySchedulerCluster(uint) error
UpdateSchedulerCluster(uint, types.UpdateSchedulerClusterRequest) (*model.SchedulerCluster, error)
UpdateSchedulerClusterWithSecurityGroupDomain(uint, types.UpdateSchedulerClusterRequest) (*model.SchedulerCluster, error)
GetSchedulerCluster(uint) (*model.SchedulerCluster, error)
GetSchedulerClusters(types.GetSchedulerClustersQuery) (*[]model.SchedulerCluster, error)
SchedulerClusterTotalCount(types.GetSchedulerClustersQuery) (int64, error)
AddSchedulerToSchedulerCluster(uint, uint) error
CreateSchedulerCluster(context.Context, types.CreateSchedulerClusterRequest) (*model.SchedulerCluster, error)
CreateSchedulerClusterWithSecurityGroupDomain(context.Context, types.CreateSchedulerClusterRequest) (*model.SchedulerCluster, error)
DestroySchedulerCluster(context.Context, uint) error
UpdateSchedulerCluster(context.Context, uint, types.UpdateSchedulerClusterRequest) (*model.SchedulerCluster, error)
UpdateSchedulerClusterWithSecurityGroupDomain(context.Context, uint, types.UpdateSchedulerClusterRequest) (*model.SchedulerCluster, error)
GetSchedulerCluster(context.Context, uint) (*model.SchedulerCluster, error)
GetSchedulerClusters(context.Context, types.GetSchedulerClustersQuery) (*[]model.SchedulerCluster, error)
SchedulerClusterTotalCount(context.Context, types.GetSchedulerClustersQuery) (int64, error)
AddSchedulerToSchedulerCluster(context.Context, uint, uint) error
CreateScheduler(types.CreateSchedulerRequest) (*model.Scheduler, error)
DestroyScheduler(uint) error
UpdateScheduler(uint, types.UpdateSchedulerRequest) (*model.Scheduler, error)
GetScheduler(uint) (*model.Scheduler, error)
GetSchedulers(types.GetSchedulersQuery) (*[]model.Scheduler, error)
SchedulerTotalCount(types.GetSchedulersQuery) (int64, error)
CreateScheduler(context.Context, types.CreateSchedulerRequest) (*model.Scheduler, error)
DestroyScheduler(context.Context, uint) error
UpdateScheduler(context.Context, uint, types.UpdateSchedulerRequest) (*model.Scheduler, error)
GetScheduler(context.Context, uint) (*model.Scheduler, error)
GetSchedulers(context.Context, types.GetSchedulersQuery) (*[]model.Scheduler, error)
SchedulerTotalCount(context.Context, types.GetSchedulersQuery) (int64, error)
CreateSecurityGroup(types.CreateSecurityGroupRequest) (*model.SecurityGroup, error)
DestroySecurityGroup(uint) error
UpdateSecurityGroup(uint, types.UpdateSecurityGroupRequest) (*model.SecurityGroup, error)
GetSecurityGroup(uint) (*model.SecurityGroup, error)
GetSecurityGroups(types.GetSecurityGroupsQuery) (*[]model.SecurityGroup, error)
SecurityGroupTotalCount(types.GetSecurityGroupsQuery) (int64, error)
AddSchedulerClusterToSecurityGroup(uint, uint) error
AddCDNClusterToSecurityGroup(uint, uint) error
CreateSecurityGroup(context.Context, types.CreateSecurityGroupRequest) (*model.SecurityGroup, error)
DestroySecurityGroup(context.Context, uint) error
UpdateSecurityGroup(context.Context, uint, types.UpdateSecurityGroupRequest) (*model.SecurityGroup, error)
GetSecurityGroup(context.Context, uint) (*model.SecurityGroup, error)
GetSecurityGroups(context.Context, types.GetSecurityGroupsQuery) (*[]model.SecurityGroup, error)
SecurityGroupTotalCount(context.Context, types.GetSecurityGroupsQuery) (int64, error)
AddSchedulerClusterToSecurityGroup(context.Context, uint, uint) error
AddCDNClusterToSecurityGroup(context.Context, uint, uint) error
CreatePreheat(types.CreatePreheatRequest) (*types.Preheat, error)
GetPreheat(string) (*types.Preheat, error)
CreateV1Preheat(types.CreateV1PreheatRequest) (*types.CreateV1PreheatResponse, error)
GetV1Preheat(string) (*types.GetV1PreheatResponse, error)
CreatePreheat(context.Context, types.CreatePreheatRequest) (*types.Preheat, error)
GetPreheat(context.Context, string) (*types.Preheat, error)
CreateV1Preheat(context.Context, types.CreateV1PreheatRequest) (*types.CreateV1PreheatResponse, error)
GetV1Preheat(context.Context, string) (*types.GetV1PreheatResponse, error)
}
type rest struct {

View File

@ -65,7 +65,7 @@ func (s *GRPC) GetCDN(ctx context.Context, req *manager.GetCDNRequest) (*manager
// Cache Miss
logger.Infof("%s cache miss", cacheKey)
cdn := model.CDN{}
if err := s.db.Preload("CDNCluster.SecurityGroup").First(&cdn, &model.CDN{
if err := s.db.WithContext(ctx).Preload("CDNCluster.SecurityGroup").First(&cdn, &model.CDN{
HostName: req.HostName,
CDNClusterID: uint(req.CdnClusterId),
}).Error; err != nil {
@ -125,7 +125,7 @@ func (s *GRPC) createCDN(ctx context.Context, req *manager.UpdateCDNRequest) (*m
CDNClusterID: uint(req.CdnClusterId),
}
if err := s.db.Create(&cdn).Error; err != nil {
if err := s.db.WithContext(ctx).Create(&cdn).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
}
@ -143,7 +143,7 @@ func (s *GRPC) createCDN(ctx context.Context, req *manager.UpdateCDNRequest) (*m
func (s *GRPC) UpdateCDN(ctx context.Context, req *manager.UpdateCDNRequest) (*manager.CDN, error) {
cdn := model.CDN{}
if err := s.db.First(&cdn, model.CDN{
if err := s.db.WithContext(ctx).First(&cdn, model.CDN{
HostName: req.HostName,
CDNClusterID: uint(req.CdnClusterId),
}).Error; err != nil {
@ -153,7 +153,7 @@ func (s *GRPC) UpdateCDN(ctx context.Context, req *manager.UpdateCDNRequest) (*m
return nil, status.Error(codes.Unknown, err.Error())
}
if err := s.db.Model(&cdn).Updates(model.CDN{
if err := s.db.WithContext(ctx).Model(&cdn).Updates(model.CDN{
IDC: req.Idc,
Location: req.Location,
IP: req.Ip,
@ -165,7 +165,7 @@ func (s *GRPC) UpdateCDN(ctx context.Context, req *manager.UpdateCDNRequest) (*m
}
if err := s.cache.Delete(
context.TODO(),
ctx,
cache.MakeCDNCacheKey(cdn.HostName, cdn.CDNClusterID),
); err != nil {
logger.Warnf("%s refresh keepalive status failed in cdn cluster %d", cdn.HostName, cdn.CDNClusterID)
@ -196,7 +196,7 @@ func (s *GRPC) GetScheduler(ctx context.Context, req *manager.GetSchedulerReques
// Cache Miss
logger.Infof("%s cache miss", cacheKey)
scheduler := model.Scheduler{}
if err := s.db.Preload("SchedulerCluster.SecurityGroup").Preload("SchedulerCluster.CDNClusters.CDNs", &model.CDN{
if err := s.db.WithContext(ctx).Preload("SchedulerCluster.SecurityGroup").Preload("SchedulerCluster.CDNClusters.CDNs", &model.CDN{
Status: model.CDNStatusActive,
}).First(&scheduler, &model.Scheduler{
HostName: req.HostName,
@ -307,7 +307,7 @@ func (s *GRPC) createScheduler(ctx context.Context, req *manager.UpdateScheduler
SchedulerClusterID: uint(req.SchedulerClusterId),
}
if err := s.db.Create(&scheduler).Error; err != nil {
if err := s.db.WithContext(ctx).Create(&scheduler).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
}
@ -327,7 +327,7 @@ func (s *GRPC) createScheduler(ctx context.Context, req *manager.UpdateScheduler
func (s *GRPC) UpdateScheduler(ctx context.Context, req *manager.UpdateSchedulerRequest) (*manager.Scheduler, error) {
scheduler := model.Scheduler{}
if err := s.db.First(&scheduler, model.Scheduler{
if err := s.db.WithContext(ctx).First(&scheduler, model.Scheduler{
HostName: req.HostName,
SchedulerClusterID: uint(req.SchedulerClusterId),
}).Error; err != nil {
@ -344,7 +344,7 @@ func (s *GRPC) UpdateScheduler(ctx context.Context, req *manager.UpdateScheduler
}
}
if err := s.db.Model(&scheduler).Updates(model.Scheduler{
if err := s.db.WithContext(ctx).Model(&scheduler).Updates(model.Scheduler{
VIPs: req.Vips,
IDC: req.Idc,
Location: req.Location,
@ -357,7 +357,7 @@ func (s *GRPC) UpdateScheduler(ctx context.Context, req *manager.UpdateScheduler
}
if err := s.cache.Delete(
context.TODO(),
ctx,
cache.MakeSchedulerCacheKey(scheduler.HostName, scheduler.SchedulerClusterID),
); err != nil {
logger.Warnf("%s refresh keepalive status failed in scheduler cluster %d", scheduler.HostName, scheduler.SchedulerClusterID)
@ -390,14 +390,14 @@ func (s *GRPC) ListSchedulers(ctx context.Context, req *manager.ListSchedulersRe
// Cache Miss
logger.Infof("%s cache miss", cacheKey)
var schedulerClusters []model.SchedulerCluster
if err := s.db.Preload("SecurityGroup").Find(&schedulerClusters).Error; err != nil {
if err := s.db.WithContext(ctx).Preload("SecurityGroup").Find(&schedulerClusters).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
}
// Search optimal scheduler cluster
schedulerCluster, ok := s.searcher.FindSchedulerCluster(schedulerClusters, req.HostInfo)
if !ok {
if err := s.db.Find(&schedulerCluster, &model.SchedulerCluster{
if err := s.db.WithContext(ctx).Find(&schedulerCluster, &model.SchedulerCluster{
IsDefault: true,
}).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
@ -405,7 +405,7 @@ func (s *GRPC) ListSchedulers(ctx context.Context, req *manager.ListSchedulersRe
}
schedulers := []model.Scheduler{}
if err := s.db.Find(&schedulers, &model.Scheduler{
if err := s.db.WithContext(ctx).Find(&schedulers, &model.Scheduler{
Status: model.SchedulerStatusActive,
SchedulerClusterID: schedulerCluster.ID,
}).Error; err != nil {

View File

@ -17,6 +17,7 @@
package service
import (
"context"
"fmt"
"github.com/pkg/errors"
@ -30,18 +31,18 @@ import (
"golang.org/x/crypto/bcrypt"
)
func (s *rest) GetUser(id uint) (*model.User, error) {
func (s *rest) GetUser(ctx context.Context, id uint) (*model.User, error) {
user := model.User{}
if err := s.db.First(&user, id).Error; err != nil {
if err := s.db.WithContext(ctx).First(&user, id).Error; err != nil {
return nil, err
}
return &user, nil
}
func (s *rest) SignIn(json types.SignInRequest) (*model.User, error) {
func (s *rest) SignIn(ctx context.Context, json types.SignInRequest) (*model.User, error) {
user := model.User{}
if err := s.db.First(&user, model.User{
if err := s.db.WithContext(ctx).First(&user, model.User{
Name: json.Name,
}).Error; err != nil {
return nil, err
@ -54,9 +55,9 @@ func (s *rest) SignIn(json types.SignInRequest) (*model.User, error) {
return &user, nil
}
func (s *rest) ResetPassword(id uint, json types.ResetPasswordRequest) error {
func (s *rest) ResetPassword(ctx context.Context, id uint, json types.ResetPasswordRequest) error {
user := model.User{}
if err := s.db.First(&user, id).Error; err != nil {
if err := s.db.WithContext(ctx).First(&user, id).Error; err != nil {
return err
}
@ -69,7 +70,7 @@ func (s *rest) ResetPassword(id uint, json types.ResetPasswordRequest) error {
return err
}
if err := s.db.First(&user, id).Updates(model.User{
if err := s.db.WithContext(ctx).First(&user, id).Updates(model.User{
EncryptedPassword: string(encryptedPasswordBytes),
}).Error; err != nil {
return err
@ -78,7 +79,7 @@ func (s *rest) ResetPassword(id uint, json types.ResetPasswordRequest) error {
return nil
}
func (s *rest) SignUp(json types.SignUpRequest) (*model.User, error) {
func (s *rest) SignUp(ctx context.Context, json types.SignUpRequest) (*model.User, error) {
encryptedPasswordBytes, err := bcrypt.GenerateFromPassword([]byte(json.Password), bcrypt.MinCost)
if err != nil {
return nil, err
@ -95,7 +96,7 @@ func (s *rest) SignUp(json types.SignUpRequest) (*model.User, error) {
State: model.UserStateEnabled,
}
if err := s.db.Create(&user).Error; err != nil {
if err := s.db.WithContext(ctx).Create(&user).Error; err != nil {
return nil, err
}
@ -106,9 +107,9 @@ func (s *rest) SignUp(json types.SignUpRequest) (*model.User, error) {
return &user, nil
}
func (s *rest) OauthSignin(name string) (string, error) {
func (s *rest) OauthSignin(ctx context.Context, name string) (string, error) {
oauth := model.Oauth{}
if err := s.db.First(&oauth, model.Oauth{Name: name}).Error; err != nil {
if err := s.db.WithContext(ctx).First(&oauth, model.Oauth{Name: name}).Error; err != nil {
return "", err
}
@ -120,9 +121,9 @@ func (s *rest) OauthSignin(name string) (string, error) {
return o.AuthCodeURL(), nil
}
func (s *rest) OauthSigninCallback(name, code string) (*model.User, error) {
func (s *rest) OauthSigninCallback(ctx context.Context, name, code string) (*model.User, error) {
oauth := model.Oauth{}
if err := s.db.First(&oauth, model.Oauth{Name: name}).Error; err != nil {
if err := s.db.WithContext(ctx).First(&oauth, model.Oauth{Name: name}).Error; err != nil {
return nil, err
}
@ -147,7 +148,7 @@ func (s *rest) OauthSigninCallback(name, code string) (*model.User, error) {
Avatar: oauthUser.Avatar,
State: model.UserStateEnabled,
}
if err := s.db.Create(&user).Error; err != nil {
if err := s.db.WithContext(ctx).Create(&user).Error; err != nil {
if err, ok := errors.Cause(err).(*mysql.MySQLError); ok && err.Number == mysqlerr.ER_DUP_ENTRY {
return &user, nil
}
@ -162,14 +163,14 @@ func (s *rest) OauthSigninCallback(name, code string) (*model.User, error) {
return &user, nil
}
func (s *rest) GetRolesForUser(id uint) ([]string, error) {
func (s *rest) GetRolesForUser(ctx context.Context, id uint) ([]string, error) {
return s.enforcer.GetRolesForUser(fmt.Sprint(id))
}
func (s *rest) AddRoleForUser(json types.AddRoleForUserParams) (bool, error) {
func (s *rest) AddRoleForUser(ctx context.Context, json types.AddRoleForUserParams) (bool, error) {
return s.enforcer.AddRoleForUser(fmt.Sprint(json.ID), json.Role)
}
func (s *rest) DeleteRoleForUser(json types.DeleteRoleForUserParams) (bool, error) {
func (s *rest) DeleteRoleForUser(ctx context.Context, json types.DeleteRoleForUserParams) (bool, error) {
return s.enforcer.DeleteRoleForUser(fmt.Sprint(json.ID), json.Role)
}

View File

@ -47,6 +47,7 @@ const (
SpanReportPieceResult = "report-piece-result"
SpanReportPeerResult = "report-peer-result"
SpanPeerLeave = "peer-leave"
SpanPreheat = "preheat"
)
const (

View File

@ -27,8 +27,12 @@ import (
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/core"
"github.com/go-playground/validator/v10"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
var tracer = otel.Tracer("worker")
type Job interface {
Serve() error
Stop()
@ -125,7 +129,12 @@ func (t *job) Stop() {
t.localJob.Worker.Quit()
}
func (t *job) preheat(req string) error {
func (t *job) preheat(ctx context.Context, req string) error {
// machinery can't passing context to worker, refer https://github.com/RichardKnop/machinery/issues/175
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanPreheat, trace.WithSpanKind(trace.SpanKindConsumer))
defer span.End()
request := &internaljob.PreheatRequest{}
if err := internaljob.UnmarshalRequest(req, request); err != nil {
logger.Errorf("unmarshal request err: %v, request body: %s", err, req)
@ -158,7 +167,7 @@ func (t *job) preheat(req string) error {
// Trigger CDN download seeds
plogger := logger.WithTaskIDAndURL(taskID, request.URL)
plogger.Info("ready to preheat")
stream, err := t.service.CDN.GetClient().ObtainSeeds(t.ctx, &cdnsystem.SeedRequest{
stream, err := t.service.CDN.GetClient().ObtainSeeds(ctx, &cdnsystem.SeedRequest{
TaskId: taskID,
Url: request.URL,
UrlMeta: meta,