diff --git a/manager/config/constant_otel.go b/manager/config/constant_otel.go index 2f1d412ca..086c38fdc 100644 --- a/manager/config/constant_otel.go +++ b/manager/config/constant_otel.go @@ -19,5 +19,13 @@ package config import "go.opentelemetry.io/otel/attribute" const ( - AttributeID = attribute.Key("d7y.manager.id") + AttributeID = attribute.Key("d7y.manager.id") + AttributePreheatType = attribute.Key("d7y.manager.preheat.type") + AttributePreheatURL = attribute.Key("d7y.manager.preheat.url") +) + +const ( + SpanPreheat = "preheat" + SpanGetLayers = "get-layers" + SpanAuthWithRegistry = "auth-with-registry" ) diff --git a/manager/handlers/cdn.go b/manager/handlers/cdn.go index fe53a5b17..d85c07297 100644 --- a/manager/handlers/cdn.go +++ b/manager/handlers/cdn.go @@ -41,7 +41,7 @@ func (h *Handlers) CreateCDN(ctx *gin.Context) { return } - cdn, err := h.service.CreateCDN(json) + cdn, err := h.service.CreateCDN(ctx.Request.Context(), json) if err != nil { ctx.Error(err) return @@ -68,7 +68,7 @@ func (h *Handlers) DestroyCDN(ctx *gin.Context) { return } - if err := h.service.DestroyCDN(params.ID); err != nil { + if err := h.service.DestroyCDN(ctx.Request.Context(), params.ID); err != nil { ctx.Error(err) return } @@ -101,7 +101,7 @@ func (h *Handlers) UpdateCDN(ctx *gin.Context) { return } - cdn, err := h.service.UpdateCDN(params.ID, json) + cdn, err := h.service.UpdateCDN(ctx.Request.Context(), params.ID, json) if err != nil { ctx.Error(err) return @@ -128,7 +128,7 @@ func (h *Handlers) GetCDN(ctx *gin.Context) { return } - cdn, err := h.service.GetCDN(params.ID) + cdn, err := h.service.GetCDN(ctx.Request.Context(), params.ID) if err != nil { ctx.Error(err) return @@ -157,13 +157,13 @@ func (h *Handlers) GetCDNs(ctx *gin.Context) { } h.setPaginationDefault(&query.Page, &query.PerPage) - cdns, err := h.service.GetCDNs(query) + cdns, err := h.service.GetCDNs(ctx.Request.Context(), query) if err != nil { ctx.Error(err) return } - totalCount, err := h.service.CDNTotalCount(query) + totalCount, err := h.service.CDNTotalCount(ctx.Request.Context(), query) if err != nil { ctx.Error(err) return diff --git a/manager/handlers/cdn_cluster.go b/manager/handlers/cdn_cluster.go index 34def4c24..7ffc4d8ac 100644 --- a/manager/handlers/cdn_cluster.go +++ b/manager/handlers/cdn_cluster.go @@ -42,7 +42,7 @@ func (h *Handlers) CreateCDNCluster(ctx *gin.Context) { } if json.SecurityGroupDomain != "" { - cdn, err := h.service.CreateCDNClusterWithSecurityGroupDomain(json) + cdn, err := h.service.CreateCDNClusterWithSecurityGroupDomain(ctx.Request.Context(), json) if err != nil { ctx.Error(err) return @@ -52,7 +52,7 @@ func (h *Handlers) CreateCDNCluster(ctx *gin.Context) { return } - cdnCluster, err := h.service.CreateCDNCluster(json) + cdnCluster, err := h.service.CreateCDNCluster(ctx.Request.Context(), json) if err != nil { ctx.Error(err) return @@ -79,7 +79,7 @@ func (h *Handlers) DestroyCDNCluster(ctx *gin.Context) { return } - if err := h.service.DestroyCDNCluster(params.ID); err != nil { + if err := h.service.DestroyCDNCluster(ctx.Request.Context(), params.ID); err != nil { ctx.Error(err) return } @@ -113,7 +113,7 @@ func (h *Handlers) UpdateCDNCluster(ctx *gin.Context) { } if json.SecurityGroupDomain != "" { - cdn, err := h.service.UpdateCDNClusterWithSecurityGroupDomain(params.ID, json) + cdn, err := h.service.UpdateCDNClusterWithSecurityGroupDomain(ctx.Request.Context(), params.ID, json) if err != nil { ctx.Error(err) return @@ -123,7 +123,7 @@ func (h *Handlers) UpdateCDNCluster(ctx *gin.Context) { return } - cdnCluster, err := h.service.UpdateCDNCluster(params.ID, json) + cdnCluster, err := h.service.UpdateCDNCluster(ctx.Request.Context(), params.ID, json) if err != nil { ctx.Error(err) return @@ -150,7 +150,7 @@ func (h *Handlers) GetCDNCluster(ctx *gin.Context) { return } - cdnCluster, err := h.service.GetCDNCluster(params.ID) + cdnCluster, err := h.service.GetCDNCluster(ctx.Request.Context(), params.ID) if err != nil { ctx.Error(err) return @@ -179,13 +179,13 @@ func (h *Handlers) GetCDNClusters(ctx *gin.Context) { } h.setPaginationDefault(&query.Page, &query.PerPage) - cdns, err := h.service.GetCDNClusters(query) + cdns, err := h.service.GetCDNClusters(ctx.Request.Context(), query) if err != nil { ctx.Error(err) return } - totalCount, err := h.service.CDNClusterTotalCount(query) + totalCount, err := h.service.CDNClusterTotalCount(ctx.Request.Context(), query) if err != nil { ctx.Error(err) return @@ -214,7 +214,7 @@ func (h *Handlers) AddCDNToCDNCluster(ctx *gin.Context) { return } - if err := h.service.AddCDNToCDNCluster(params.ID, params.CDNID); err != nil { + if err := h.service.AddCDNToCDNCluster(ctx.Request.Context(), params.ID, params.CDNID); err != nil { ctx.Error(err) return } @@ -241,7 +241,7 @@ func (h *Handlers) AddSchedulerClusterToCDNCluster(ctx *gin.Context) { return } - if err := h.service.AddSchedulerClusterToCDNCluster(params.ID, params.SchedulerClusterID); err != nil { + if err := h.service.AddSchedulerClusterToCDNCluster(ctx.Request.Context(), params.ID, params.SchedulerClusterID); err != nil { ctx.Error(err) return } diff --git a/manager/handlers/oauth.go b/manager/handlers/oauth.go index 2c6f09780..cabc98a52 100644 --- a/manager/handlers/oauth.go +++ b/manager/handlers/oauth.go @@ -41,7 +41,7 @@ func (h *Handlers) CreateOauth(ctx *gin.Context) { return } - oauth, err := h.service.CreateOauth(json) + oauth, err := h.service.CreateOauth(ctx.Request.Context(), json) if err != nil { ctx.Error(err) return @@ -68,7 +68,7 @@ func (h *Handlers) DestroyOauth(ctx *gin.Context) { return } - if err := h.service.DestroyOauth(params.ID); err != nil { + if err := h.service.DestroyOauth(ctx.Request.Context(), params.ID); err != nil { ctx.Error(err) return } @@ -101,7 +101,7 @@ func (h *Handlers) UpdateOauth(ctx *gin.Context) { return } - oauth, err := h.service.UpdateOauth(params.ID, json) + oauth, err := h.service.UpdateOauth(ctx.Request.Context(), params.ID, json) if err != nil { ctx.Error(err) return @@ -128,7 +128,7 @@ func (h *Handlers) GetOauth(ctx *gin.Context) { return } - oauth, err := h.service.GetOauth(params.ID) + oauth, err := h.service.GetOauth(ctx.Request.Context(), params.ID) if err != nil { ctx.Error(err) return @@ -157,13 +157,13 @@ func (h *Handlers) GetOauths(ctx *gin.Context) { } h.setPaginationDefault(&query.Page, &query.PerPage) - oauth, err := h.service.GetOauths(query) + oauth, err := h.service.GetOauths(ctx.Request.Context(), query) if err != nil { ctx.Error(err) return } - totalCount, err := h.service.OauthTotalCount(query) + totalCount, err := h.service.OauthTotalCount(ctx.Request.Context(), query) if err != nil { ctx.Error(err) return diff --git a/manager/handlers/permission.go b/manager/handlers/permission.go index 51e9d68db..8d43f2453 100644 --- a/manager/handlers/permission.go +++ b/manager/handlers/permission.go @@ -32,6 +32,6 @@ import ( // @Router /permissions [get] func (h *Handlers) GetPermissions(g *gin.Engine) func(ctx *gin.Context) { return func(ctx *gin.Context) { - ctx.JSON(http.StatusOK, h.service.GetPermissions(g)) + ctx.JSON(http.StatusOK, h.service.GetPermissions(ctx.Request.Context(), g)) } } diff --git a/manager/handlers/preheat.go b/manager/handlers/preheat.go index bb32b4370..8353a5dff 100644 --- a/manager/handlers/preheat.go +++ b/manager/handlers/preheat.go @@ -41,7 +41,7 @@ func (h *Handlers) CreatePreheat(ctx *gin.Context) { return } - preheat, err := h.service.CreatePreheat(json) + preheat, err := h.service.CreatePreheat(ctx.Request.Context(), json) if err != nil { ctx.Error(err) return @@ -68,7 +68,7 @@ func (h *Handlers) GetPreheat(ctx *gin.Context) { return } - preheat, err := h.service.GetPreheat(params.ID) + preheat, err := h.service.GetPreheat(ctx.Request.Context(), params.ID) if err != nil { ctx.Error(err) return @@ -95,7 +95,7 @@ func (h *Handlers) CreateV1Preheat(ctx *gin.Context) { return } - preheat, err := h.service.CreateV1Preheat(json) + preheat, err := h.service.CreateV1Preheat(ctx.Request.Context(), json) if err != nil { ctx.Error(err) return @@ -122,7 +122,7 @@ func (h *Handlers) GetV1Preheat(ctx *gin.Context) { return } - preheat, err := h.service.GetV1Preheat(params.ID) + preheat, err := h.service.GetV1Preheat(ctx.Request.Context(), params.ID) if err != nil { ctx.Error(err) return diff --git a/manager/handlers/role.go b/manager/handlers/role.go index 318da93a3..085d3a54c 100644 --- a/manager/handlers/role.go +++ b/manager/handlers/role.go @@ -40,7 +40,7 @@ func (h *Handlers) CreateRole(ctx *gin.Context) { return } - if err := h.service.CreateRole(json); err != nil { + if err := h.service.CreateRole(ctx.Request.Context(), json); err != nil { ctx.Error(err) return } @@ -65,7 +65,7 @@ func (h *Handlers) DestroyRole(ctx *gin.Context) { return } - if ok, err := h.service.DestroyRole(params.Role); err != nil { + if ok, err := h.service.DestroyRole(ctx.Request.Context(), params.Role); err != nil { ctx.Error(err) return } else if !ok { @@ -93,7 +93,7 @@ func (h *Handlers) GetRole(ctx *gin.Context) { return } - ctx.JSON(http.StatusOK, h.service.GetRole(params.Role)) + ctx.JSON(http.StatusOK, h.service.GetRole(ctx.Request.Context(), params.Role)) } // @Summary Get Roles @@ -106,7 +106,7 @@ func (h *Handlers) GetRole(ctx *gin.Context) { // @Failure 500 // @Router /roles [get] func (h *Handlers) GetRoles(ctx *gin.Context) { - roles := h.service.GetRoles() + roles := h.service.GetRoles(ctx.Request.Context()) ctx.JSON(http.StatusOK, roles) } @@ -134,7 +134,7 @@ func (h *Handlers) AddPermissionForRole(ctx *gin.Context) { return } - if ok, err := h.service.AddPermissionForRole(params.Role, json); err != nil { + if ok, err := h.service.AddPermissionForRole(ctx.Request.Context(), params.Role, json); err != nil { ctx.Error(err) return } else if !ok { @@ -169,7 +169,7 @@ func (h *Handlers) DeletePermissionForRole(ctx *gin.Context) { return } - if ok, err := h.service.DeletePermissionForRole(params.Role, json); err != nil { + if ok, err := h.service.DeletePermissionForRole(ctx.Request.Context(), params.Role, json); err != nil { ctx.Error(err) return } else if !ok { diff --git a/manager/handlers/scheduler.go b/manager/handlers/scheduler.go index ce3d799a5..856569850 100644 --- a/manager/handlers/scheduler.go +++ b/manager/handlers/scheduler.go @@ -41,7 +41,7 @@ func (h *Handlers) CreateScheduler(ctx *gin.Context) { return } - scheduler, err := h.service.CreateScheduler(json) + scheduler, err := h.service.CreateScheduler(ctx.Request.Context(), json) if err != nil { ctx.Error(err) return @@ -68,7 +68,7 @@ func (h *Handlers) DestroyScheduler(ctx *gin.Context) { return } - if err := h.service.DestroyScheduler(params.ID); err != nil { + if err := h.service.DestroyScheduler(ctx.Request.Context(), params.ID); err != nil { ctx.Error(err) return } @@ -101,7 +101,7 @@ func (h *Handlers) UpdateScheduler(ctx *gin.Context) { return } - scheduler, err := h.service.UpdateScheduler(params.ID, json) + scheduler, err := h.service.UpdateScheduler(ctx.Request.Context(), params.ID, json) if err != nil { ctx.Error(err) return @@ -128,7 +128,7 @@ func (h *Handlers) GetScheduler(ctx *gin.Context) { return } - scheduler, err := h.service.GetScheduler(params.ID) + scheduler, err := h.service.GetScheduler(ctx.Request.Context(), params.ID) if err != nil { ctx.Error(err) return @@ -157,13 +157,13 @@ func (h *Handlers) GetSchedulers(ctx *gin.Context) { } h.setPaginationDefault(&query.Page, &query.PerPage) - schedulers, err := h.service.GetSchedulers(query) + schedulers, err := h.service.GetSchedulers(ctx.Request.Context(), query) if err != nil { ctx.Error(err) return } - totalCount, err := h.service.SchedulerTotalCount(query) + totalCount, err := h.service.SchedulerTotalCount(ctx.Request.Context(), query) if err != nil { ctx.Error(err) return diff --git a/manager/handlers/scheduler_cluster.go b/manager/handlers/scheduler_cluster.go index 095e63a96..a54b2460b 100644 --- a/manager/handlers/scheduler_cluster.go +++ b/manager/handlers/scheduler_cluster.go @@ -42,7 +42,7 @@ func (h *Handlers) CreateSchedulerCluster(ctx *gin.Context) { } if json.SecurityGroupDomain != "" { - scheduler, err := h.service.CreateSchedulerClusterWithSecurityGroupDomain(json) + scheduler, err := h.service.CreateSchedulerClusterWithSecurityGroupDomain(ctx.Request.Context(), json) if err != nil { ctx.Error(err) return @@ -52,7 +52,7 @@ func (h *Handlers) CreateSchedulerCluster(ctx *gin.Context) { return } - schedulerCluster, err := h.service.CreateSchedulerCluster(json) + schedulerCluster, err := h.service.CreateSchedulerCluster(ctx.Request.Context(), json) if err != nil { ctx.Error(err) return @@ -79,7 +79,7 @@ func (h *Handlers) DestroySchedulerCluster(ctx *gin.Context) { return } - if err := h.service.DestroySchedulerCluster(params.ID); err != nil { + if err := h.service.DestroySchedulerCluster(ctx.Request.Context(), params.ID); err != nil { ctx.Error(err) return } @@ -113,7 +113,7 @@ func (h *Handlers) UpdateSchedulerCluster(ctx *gin.Context) { } if json.SecurityGroupDomain != "" { - scheduler, err := h.service.UpdateSchedulerClusterWithSecurityGroupDomain(params.ID, json) + scheduler, err := h.service.UpdateSchedulerClusterWithSecurityGroupDomain(ctx.Request.Context(), params.ID, json) if err != nil { ctx.Error(err) return @@ -123,7 +123,7 @@ func (h *Handlers) UpdateSchedulerCluster(ctx *gin.Context) { return } - schedulerCluster, err := h.service.UpdateSchedulerCluster(params.ID, json) + schedulerCluster, err := h.service.UpdateSchedulerCluster(ctx.Request.Context(), params.ID, json) if err != nil { ctx.Error(err) return @@ -150,7 +150,7 @@ func (h *Handlers) GetSchedulerCluster(ctx *gin.Context) { return } - schedulerCluster, err := h.service.GetSchedulerCluster(params.ID) + schedulerCluster, err := h.service.GetSchedulerCluster(ctx.Request.Context(), params.ID) if err != nil { ctx.Error(err) return @@ -179,13 +179,13 @@ func (h *Handlers) GetSchedulerClusters(ctx *gin.Context) { } h.setPaginationDefault(&query.Page, &query.PerPage) - schedulerClusters, err := h.service.GetSchedulerClusters(query) + schedulerClusters, err := h.service.GetSchedulerClusters(ctx.Request.Context(), query) if err != nil { ctx.Error(err) return } - totalCount, err := h.service.SchedulerClusterTotalCount(query) + totalCount, err := h.service.SchedulerClusterTotalCount(ctx.Request.Context(), query) if err != nil { ctx.Error(err) return @@ -214,7 +214,7 @@ func (h *Handlers) AddSchedulerToSchedulerCluster(ctx *gin.Context) { return } - err := h.service.AddSchedulerToSchedulerCluster(params.ID, params.SchedulerID) + err := h.service.AddSchedulerToSchedulerCluster(ctx.Request.Context(), params.ID, params.SchedulerID) if err != nil { ctx.Error(err) return diff --git a/manager/handlers/security_group.go b/manager/handlers/security_group.go index 2457688ba..3356bff32 100644 --- a/manager/handlers/security_group.go +++ b/manager/handlers/security_group.go @@ -41,7 +41,7 @@ func (h *Handlers) CreateSecurityGroup(ctx *gin.Context) { return } - securityGroup, err := h.service.CreateSecurityGroup(json) + securityGroup, err := h.service.CreateSecurityGroup(ctx.Request.Context(), json) if err != nil { ctx.Error(err) return @@ -68,7 +68,7 @@ func (h *Handlers) DestroySecurityGroup(ctx *gin.Context) { return } - if err := h.service.DestroySecurityGroup(params.ID); err != nil { + if err := h.service.DestroySecurityGroup(ctx.Request.Context(), params.ID); err != nil { ctx.Error(err) return } @@ -101,7 +101,7 @@ func (h *Handlers) UpdateSecurityGroup(ctx *gin.Context) { return } - securityGroup, err := h.service.UpdateSecurityGroup(params.ID, json) + securityGroup, err := h.service.UpdateSecurityGroup(ctx.Request.Context(), params.ID, json) if err != nil { ctx.Error(err) return @@ -128,7 +128,7 @@ func (h *Handlers) GetSecurityGroup(ctx *gin.Context) { return } - securityGroup, err := h.service.GetSecurityGroup(params.ID) + securityGroup, err := h.service.GetSecurityGroup(ctx.Request.Context(), params.ID) if err != nil { ctx.Error(err) return @@ -157,13 +157,13 @@ func (h *Handlers) GetSecurityGroups(ctx *gin.Context) { } h.setPaginationDefault(&query.Page, &query.PerPage) - securityGroups, err := h.service.GetSecurityGroups(query) + securityGroups, err := h.service.GetSecurityGroups(ctx.Request.Context(), query) if err != nil { ctx.Error(err) return } - totalCount, err := h.service.SecurityGroupTotalCount(query) + totalCount, err := h.service.SecurityGroupTotalCount(ctx.Request.Context(), query) if err != nil { ctx.Error(err) return @@ -192,7 +192,7 @@ func (h *Handlers) AddSchedulerClusterToSecurityGroup(ctx *gin.Context) { return } - err := h.service.AddSchedulerClusterToSecurityGroup(params.ID, params.SchedulerClusterID) + err := h.service.AddSchedulerClusterToSecurityGroup(ctx.Request.Context(), params.ID, params.SchedulerClusterID) if err != nil { ctx.Error(err) return @@ -220,7 +220,7 @@ func (h *Handlers) AddCDNClusterToSecurityGroup(ctx *gin.Context) { return } - err := h.service.AddCDNClusterToSecurityGroup(params.ID, params.CDNClusterID) + err := h.service.AddCDNClusterToSecurityGroup(ctx.Request.Context(), params.ID, params.CDNClusterID) if err != nil { ctx.Error(err) return diff --git a/manager/handlers/user.go b/manager/handlers/user.go index 5cc405e2e..c66086e7e 100644 --- a/manager/handlers/user.go +++ b/manager/handlers/user.go @@ -42,7 +42,7 @@ func (h *Handlers) GetUser(ctx *gin.Context) { return } - user, err := h.service.GetUser(params.ID) + user, err := h.service.GetUser(ctx.Request.Context(), params.ID) if err != nil { ctx.Error(err) return @@ -68,7 +68,7 @@ func (h *Handlers) SignUp(ctx *gin.Context) { return } - user, err := h.service.SignUp(json) + user, err := h.service.SignUp(ctx.Request.Context(), json) if err != nil { ctx.Error(err) return @@ -100,7 +100,7 @@ func (h *Handlers) ResetPassword(ctx *gin.Context) { return } - if err := h.service.ResetPassword(params.ID, json); err != nil { + if err := h.service.ResetPassword(ctx.Request.Context(), params.ID, json); err != nil { ctx.Error(err) return } @@ -126,7 +126,7 @@ func (h *Handlers) OauthSignin(ctx *gin.Context) { return } - authURL, err := h.service.OauthSignin(params.Name) + authURL, err := h.service.OauthSignin(ctx.Request.Context(), params.Name) if err != nil { ctx.Error(err) return @@ -159,7 +159,7 @@ func (h *Handlers) OauthSigninCallback(j *jwt.GinJWTMiddleware) func(*gin.Contex return } - user, err := h.service.OauthSigninCallback(params.Name, query.Code) + user, err := h.service.OauthSigninCallback(ctx.Request.Context(), params.Name, query.Code) if err != nil { ctx.Error(err) return @@ -186,7 +186,7 @@ func (h *Handlers) GetRolesForUser(ctx *gin.Context) { return } - roles, err := h.service.GetRolesForUser(params.ID) + roles, err := h.service.GetRolesForUser(ctx.Request.Context(), params.ID) if err != nil { ctx.Error(err) return @@ -213,7 +213,7 @@ func (h *Handlers) AddRoleToUser(ctx *gin.Context) { return } - if ok, err := h.service.AddRoleForUser(params); err != nil { + if ok, err := h.service.AddRoleForUser(ctx.Request.Context(), params); err != nil { ctx.Error(err) return } else if !ok { @@ -242,7 +242,7 @@ func (h *Handlers) DeleteRoleForUser(ctx *gin.Context) { return } - if ok, err := h.service.DeleteRoleForUser(params); err != nil { + if ok, err := h.service.DeleteRoleForUser(ctx.Request.Context(), params); err != nil { ctx.Error(err) return } else if !ok { diff --git a/manager/job/preheat.go b/manager/job/preheat.go index f6f51897c..5935bc8f5 100644 --- a/manager/job/preheat.go +++ b/manager/job/preheat.go @@ -17,6 +17,7 @@ package job import ( + "context" "encoding/json" "errors" "fmt" @@ -28,14 +29,19 @@ import ( logger "d7y.io/dragonfly/v2/internal/dflog" internaljob "d7y.io/dragonfly/v2/internal/job" + "d7y.io/dragonfly/v2/manager/config" "d7y.io/dragonfly/v2/manager/model" "d7y.io/dragonfly/v2/manager/types" "d7y.io/dragonfly/v2/pkg/util/net/httputils" machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks" "github.com/distribution/distribution/v3" "github.com/distribution/distribution/v3/manifest/schema2" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" ) +var tracer = otel.Tracer("sender") + type PreheatType string const ( @@ -46,8 +52,8 @@ const ( var accessURLPattern, _ = regexp.Compile("^(.*)://(.*)/v2/(.*)/manifests/(.*)") type Preheat interface { - CreatePreheat([]model.Scheduler, types.CreatePreheatRequest) (*types.Preheat, error) - GetPreheat(string) (*types.Preheat, error) + CreatePreheat(context.Context, []model.Scheduler, types.CreatePreheatRequest) (*types.Preheat, error) + GetPreheat(context.Context, string) (*types.Preheat, error) } type preheat struct { @@ -69,7 +75,7 @@ func newPreheat(job *internaljob.Job, bizTag string) (Preheat, error) { }, nil } -func (p *preheat) GetPreheat(id string) (*types.Preheat, error) { +func (p *preheat) GetPreheat(ctx context.Context, id string) (*types.Preheat, error) { groupJobState, err := p.job.GetGroupJobState(id) if err != nil { return nil, err @@ -82,7 +88,13 @@ func (p *preheat) GetPreheat(id string) (*types.Preheat, error) { }, nil } -func (p *preheat) CreatePreheat(schedulers []model.Scheduler, json types.CreatePreheatRequest) (*types.Preheat, error) { +func (p *preheat) CreatePreheat(ctx context.Context, schedulers []model.Scheduler, json types.CreatePreheatRequest) (*types.Preheat, error) { + var span trace.Span + ctx, span = tracer.Start(ctx, config.SpanPreheat, trace.WithSpanKind(trace.SpanKindProducer)) + span.SetAttributes(config.AttributePreheatType.String(json.Type)) + span.SetAttributes(config.AttributePreheatURL.String(json.URL)) + defer span.End() + url := json.URL filter := json.Filter rawheader := json.Headers @@ -100,7 +112,7 @@ func (p *preheat) CreatePreheat(schedulers []model.Scheduler, json types.CreateP return nil, err } - files, err = p.getLayers(url, filter, httputils.MapToHeader(rawheader), image) + files, err = p.getLayers(ctx, url, filter, httputils.MapToHeader(rawheader), image) if err != nil { return nil, err } @@ -121,10 +133,10 @@ func (p *preheat) CreatePreheat(schedulers []model.Scheduler, json types.CreateP logger.Infof("preheat %s file url: %v queues: %v", json.URL, f.URL, queues) } - return p.createGroupJob(files, queues) + return p.createGroupJob(ctx, files, queues) } -func (p *preheat) createGroupJob(files []*internaljob.PreheatRequest, queues []internaljob.Queue) (*types.Preheat, error) { +func (p *preheat) createGroupJob(ctx context.Context, files []*internaljob.PreheatRequest, queues []internaljob.Queue) (*types.Preheat, error) { signatures := []*machineryv1tasks.Signature{} var urls []string for i := range files { @@ -151,7 +163,7 @@ func (p *preheat) createGroupJob(files []*internaljob.PreheatRequest, queues []i return nil, err } - if _, err := p.job.Server.SendGroup(group, 0); err != nil { + if _, err := p.job.Server.SendGroupWithContext(ctx, group, 0); err != nil { logger.Error("create preheat group job failed", err) return nil, err } @@ -164,8 +176,11 @@ func (p *preheat) createGroupJob(files []*internaljob.PreheatRequest, queues []i }, nil } -func (p *preheat) getLayers(url string, filter string, header http.Header, image *preheatImage) ([]*internaljob.PreheatRequest, error) { - resp, err := p.getManifests(url, header) +func (p *preheat) getLayers(ctx context.Context, url string, filter string, header http.Header, image *preheatImage) ([]*internaljob.PreheatRequest, error) { + ctx, span := tracer.Start(ctx, config.SpanGetLayers, trace.WithSpanKind(trace.SpanKindProducer)) + defer span.End() + + resp, err := p.getManifests(ctx, url, header) if err != nil { return nil, err } @@ -173,11 +188,11 @@ func (p *preheat) getLayers(url string, filter string, header http.Header, image if resp.StatusCode/100 != 2 { if resp.StatusCode == http.StatusUnauthorized { - token := getAuthToken(resp.Header) + token := getAuthToken(ctx, resp.Header) bearer := "Bearer " + token header.Add("Authorization", bearer) - resp, err = p.getManifests(url, header) + resp, err = p.getManifests(ctx, url, header) if err != nil { return nil, err } @@ -194,8 +209,8 @@ func (p *preheat) getLayers(url string, filter string, header http.Header, image return layers, nil } -func (p *preheat) getManifests(url string, header http.Header) (*http.Response, error) { - req, err := http.NewRequest("GET", url, nil) +func (p *preheat) getManifests(ctx context.Context, url string, header http.Header) (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return nil, err } @@ -239,13 +254,21 @@ func (p *preheat) parseLayers(resp *http.Response, url, filter string, header ht return layers, nil } -func getAuthToken(header http.Header) (token string) { +func getAuthToken(ctx context.Context, header http.Header) (token string) { + ctx, span := tracer.Start(ctx, config.SpanAuthWithRegistry, trace.WithSpanKind(trace.SpanKindProducer)) + defer span.End() + authURL := authURL(header.Values("WWW-Authenticate")) if len(authURL) == 0 { return } - resp, err := http.Get(authURL) + req, err := http.NewRequestWithContext(ctx, "GET", authURL, nil) + if err != nil { + return + } + + resp, err := http.DefaultClient.Do(req) if err != nil { return } diff --git a/manager/middlewares/jwt.go b/manager/middlewares/jwt.go index 9facb585f..ac2e913fa 100644 --- a/manager/middlewares/jwt.go +++ b/manager/middlewares/jwt.go @@ -17,6 +17,7 @@ package middlewares import ( + "context" "net/http" "time" @@ -69,7 +70,7 @@ func Jwt(service service.REST) (*jwt.GinJWTMiddleware, error) { return "", jwt.ErrMissingLoginValues } - user, err := service.SignIn(json) + user, err := service.SignIn(context.TODO(), json) if err != nil { return "", jwt.ErrFailedAuthentication } diff --git a/manager/middlewares/tracer.go b/manager/middlewares/tracer.go deleted file mode 100644 index 771b7d90e..000000000 --- a/manager/middlewares/tracer.go +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2020 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package middlewares - -import ( - "d7y.io/dragonfly/v2/manager/config" - "github.com/gin-gonic/gin" - "go.opentelemetry.io/otel" -) - -const ( - TracerName = "dragonfly-manager-rest" -) - -func Tracer() gin.HandlerFunc { - return func(c *gin.Context) { - tracer := otel.Tracer(TracerName) - _, span := tracer.Start(c.Request.Context(), c.HandlerName()) - span.SetAttributes(config.AttributeID.Float64(c.GetFloat64("id"))) - defer span.End() - c.Next() - } -} diff --git a/manager/router/router.go b/manager/router/router.go index db7e54a86..7658346ff 100644 --- a/manager/router/router.go +++ b/manager/router/router.go @@ -96,20 +96,20 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) ( // User u := apiv1.Group("/users") - u.GET("/:id", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac, h.GetUser) - u.POST("/signin", middlewares.Tracer(), jwt.LoginHandler) - u.POST("/signout", middlewares.Tracer(), jwt.LogoutHandler) - u.POST("/signup", middlewares.Tracer(), h.SignUp) - u.GET("/signin/:name", middlewares.Tracer(), h.OauthSignin) - u.GET("/signin/:name/callback", middlewares.Tracer(), h.OauthSigninCallback(jwt)) - u.POST("/refresh_token", middlewares.Tracer(), jwt.RefreshHandler) - u.POST("/:id/reset_password", middlewares.Tracer(), h.ResetPassword) - u.GET("/:id/roles", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac, h.GetRolesForUser) - u.PUT("/:id/roles/:role", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac, h.AddRoleToUser) - u.DELETE("/:id/roles/:role", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac, h.DeleteRoleForUser) + u.GET("/:id", jwt.MiddlewareFunc(), rbac, h.GetUser) + u.POST("/signin", jwt.LoginHandler) + u.POST("/signout", jwt.LogoutHandler) + u.POST("/signup", h.SignUp) + u.GET("/signin/:name", h.OauthSignin) + u.GET("/signin/:name/callback", h.OauthSigninCallback(jwt)) + u.POST("/refresh_token", jwt.RefreshHandler) + u.POST("/:id/reset_password", h.ResetPassword) + u.GET("/:id/roles", jwt.MiddlewareFunc(), rbac, h.GetRolesForUser) + u.PUT("/:id/roles/:role", jwt.MiddlewareFunc(), rbac, h.AddRoleToUser) + u.DELETE("/:id/roles/:role", jwt.MiddlewareFunc(), rbac, h.DeleteRoleForUser) // Role - re := apiv1.Group("/roles", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac) + re := apiv1.Group("/roles", jwt.MiddlewareFunc(), rbac) re.POST("", h.CreateRole) re.DELETE("/:role", h.DestroyRole) re.GET("/:role", h.GetRole) @@ -118,11 +118,11 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) ( re.DELETE("/:role/permissions", h.DeletePermissionForRole) // Permission - pm := apiv1.Group("/permissions", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac) + pm := apiv1.Group("/permissions", jwt.MiddlewareFunc(), rbac) pm.GET("", h.GetPermissions(r)) // Oauth - oa := apiv1.Group("/oauth", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac) + oa := apiv1.Group("/oauth", jwt.MiddlewareFunc(), rbac) oa.POST("", h.CreateOauth) oa.DELETE(":id", h.DestroyOauth) oa.PATCH(":id", h.UpdateOauth) @@ -130,7 +130,7 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) ( oa.GET("", h.GetOauths) // Scheduler Cluster - sc := apiv1.Group("/scheduler-clusters", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac, middlewares.Tracer()) + sc := apiv1.Group("/scheduler-clusters", jwt.MiddlewareFunc(), rbac) sc.POST("", h.CreateSchedulerCluster) sc.DELETE(":id", h.DestroySchedulerCluster) sc.PATCH(":id", h.UpdateSchedulerCluster) @@ -139,7 +139,7 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) ( sc.PUT(":id/schedulers/:scheduler_id", h.AddSchedulerToSchedulerCluster) // Scheduler - s := apiv1.Group("/schedulers", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac) + s := apiv1.Group("/schedulers", jwt.MiddlewareFunc(), rbac) s.POST("", h.CreateScheduler) s.DELETE(":id", h.DestroyScheduler) s.PATCH(":id", h.UpdateScheduler) @@ -147,7 +147,7 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) ( s.GET("", h.GetSchedulers) // CDN Cluster - cc := apiv1.Group("/cdn-clusters", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac) + cc := apiv1.Group("/cdn-clusters", jwt.MiddlewareFunc(), rbac) cc.POST("", h.CreateCDNCluster) cc.DELETE(":id", h.DestroyCDNCluster) cc.PATCH(":id", h.UpdateCDNCluster) @@ -157,7 +157,7 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) ( cc.PUT(":id/scheduler-clusters/:scheduler_cluster_id", h.AddSchedulerClusterToCDNCluster) // CDN - c := apiv1.Group("/cdns", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac) + c := apiv1.Group("/cdns", jwt.MiddlewareFunc(), rbac) c.POST("", h.CreateCDN) c.DELETE(":id", h.DestroyCDN) c.PATCH(":id", h.UpdateCDN) @@ -165,7 +165,7 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) ( c.GET("", h.GetCDNs) // Security Group - sg := apiv1.Group("/security-groups", jwt.MiddlewareFunc(), middlewares.Tracer(), rbac) + sg := apiv1.Group("/security-groups", jwt.MiddlewareFunc(), rbac) sg.POST("", h.CreateSecurityGroup) sg.DELETE(":id", h.DestroySecurityGroup) sg.PATCH(":id", h.UpdateSecurityGroup) @@ -175,7 +175,7 @@ func Init(cfg *config.Config, service service.REST, enforcer *casbin.Enforcer) ( sg.PUT(":id/cdn-clusters/:cdn_cluster_id", h.AddCDNClusterToSecurityGroup) // Preheat - ph := apiv1.Group("/preheats", middlewares.Tracer()) + ph := apiv1.Group("/preheats") ph.POST("", h.CreatePreheat) ph.GET(":id", h.GetPreheat) diff --git a/manager/service/cdn.go b/manager/service/cdn.go index 3aad015f8..489cf3878 100644 --- a/manager/service/cdn.go +++ b/manager/service/cdn.go @@ -17,11 +17,13 @@ package service import ( + "context" + "d7y.io/dragonfly/v2/manager/model" "d7y.io/dragonfly/v2/manager/types" ) -func (s *rest) CreateCDN(json types.CreateCDNRequest) (*model.CDN, error) { +func (s *rest) CreateCDN(ctx context.Context, json types.CreateCDNRequest) (*model.CDN, error) { cdn := model.CDN{ HostName: json.HostName, IDC: json.IDC, @@ -32,29 +34,29 @@ func (s *rest) CreateCDN(json types.CreateCDNRequest) (*model.CDN, error) { CDNClusterID: json.CDNClusterID, } - if err := s.db.Create(&cdn).Error; err != nil { + if err := s.db.WithContext(ctx).Create(&cdn).Error; err != nil { return nil, err } return &cdn, nil } -func (s *rest) DestroyCDN(id uint) error { +func (s *rest) DestroyCDN(ctx context.Context, id uint) error { cdn := model.CDN{} - if err := s.db.First(&cdn, id).Error; err != nil { + if err := s.db.WithContext(ctx).First(&cdn, id).Error; err != nil { return err } - if err := s.db.Unscoped().Delete(&model.CDN{}, id).Error; err != nil { + if err := s.db.WithContext(ctx).Unscoped().Delete(&model.CDN{}, id).Error; err != nil { return err } return nil } -func (s *rest) UpdateCDN(id uint, json types.UpdateCDNRequest) (*model.CDN, error) { +func (s *rest) UpdateCDN(ctx context.Context, id uint, json types.UpdateCDNRequest) (*model.CDN, error) { cdn := model.CDN{} - if err := s.db.First(&cdn, id).Updates(model.CDN{ + if err := s.db.WithContext(ctx).First(&cdn, id).Updates(model.CDN{ IDC: json.IDC, Location: json.Location, IP: json.IP, @@ -68,18 +70,18 @@ func (s *rest) UpdateCDN(id uint, json types.UpdateCDNRequest) (*model.CDN, erro return &cdn, nil } -func (s *rest) GetCDN(id uint) (*model.CDN, error) { +func (s *rest) GetCDN(ctx context.Context, id uint) (*model.CDN, error) { cdn := model.CDN{} - if err := s.db.First(&cdn, id).Error; err != nil { + if err := s.db.WithContext(ctx).First(&cdn, id).Error; err != nil { return nil, err } return &cdn, nil } -func (s *rest) GetCDNs(q types.GetCDNsQuery) (*[]model.CDN, error) { +func (s *rest) GetCDNs(ctx context.Context, q types.GetCDNsQuery) (*[]model.CDN, error) { cdns := []model.CDN{} - if err := s.db.Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.CDN{ + if err := s.db.WithContext(ctx).Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.CDN{ HostName: q.HostName, IDC: q.IDC, Location: q.Location, @@ -94,9 +96,9 @@ func (s *rest) GetCDNs(q types.GetCDNsQuery) (*[]model.CDN, error) { return &cdns, nil } -func (s *rest) CDNTotalCount(q types.GetCDNsQuery) (int64, error) { +func (s *rest) CDNTotalCount(ctx context.Context, q types.GetCDNsQuery) (int64, error) { var count int64 - if err := s.db.Model(&model.CDN{}).Where(&model.CDN{ + if err := s.db.WithContext(ctx).Model(&model.CDN{}).Where(&model.CDN{ HostName: q.HostName, IDC: q.IDC, Location: q.Location, diff --git a/manager/service/cdn_cluster.go b/manager/service/cdn_cluster.go index 711c3d556..42fe0aa0e 100644 --- a/manager/service/cdn_cluster.go +++ b/manager/service/cdn_cluster.go @@ -17,12 +17,14 @@ package service import ( + "context" + "d7y.io/dragonfly/v2/manager/model" "d7y.io/dragonfly/v2/manager/types" "d7y.io/dragonfly/v2/pkg/util/structutils" ) -func (s *rest) CreateCDNCluster(json types.CreateCDNClusterRequest) (*model.CDNCluster, error) { +func (s *rest) CreateCDNCluster(ctx context.Context, json types.CreateCDNClusterRequest) (*model.CDNCluster, error) { config, err := structutils.StructToMap(json.Config) if err != nil { return nil, err @@ -34,32 +36,32 @@ func (s *rest) CreateCDNCluster(json types.CreateCDNClusterRequest) (*model.CDNC Config: config, } - if err := s.db.Create(&cdnCluster).Error; err != nil { + if err := s.db.WithContext(ctx).Create(&cdnCluster).Error; err != nil { return nil, err } return &cdnCluster, nil } -func (s *rest) DestroyCDNCluster(id uint) error { +func (s *rest) DestroyCDNCluster(ctx context.Context, id uint) error { cdnCluster := model.CDNCluster{} - if err := s.db.First(&cdnCluster, id).Error; err != nil { + if err := s.db.WithContext(ctx).First(&cdnCluster, id).Error; err != nil { return err } - if err := s.db.Unscoped().Delete(&model.CDNCluster{}, id).Error; err != nil { + if err := s.db.WithContext(ctx).Unscoped().Delete(&model.CDNCluster{}, id).Error; err != nil { return err } return nil } -func (s *rest) CreateCDNClusterWithSecurityGroupDomain(json types.CreateCDNClusterRequest) (*model.CDNCluster, error) { +func (s *rest) CreateCDNClusterWithSecurityGroupDomain(ctx context.Context, json types.CreateCDNClusterRequest) (*model.CDNCluster, error) { securityGroup := model.SecurityGroup{ Domain: json.SecurityGroupDomain, } - if err := s.db.First(&securityGroup).Error; err != nil { - return s.CreateCDNCluster(json) + if err := s.db.WithContext(ctx).First(&securityGroup).Error; err != nil { + return s.CreateCDNCluster(ctx, json) } config, err := structutils.StructToMap(json.Config) @@ -73,7 +75,7 @@ func (s *rest) CreateCDNClusterWithSecurityGroupDomain(json types.CreateCDNClust Config: config, } - if err := s.db.Model(&securityGroup).Association("CDNClusters").Append(&cdnCluster); err != nil { + if err := s.db.WithContext(ctx).Model(&securityGroup).Association("CDNClusters").Append(&cdnCluster); err != nil { return nil, err } @@ -81,14 +83,14 @@ func (s *rest) CreateCDNClusterWithSecurityGroupDomain(json types.CreateCDNClust return &cdnCluster, nil } -func (s *rest) UpdateCDNCluster(id uint, json types.UpdateCDNClusterRequest) (*model.CDNCluster, error) { +func (s *rest) UpdateCDNCluster(ctx context.Context, id uint, json types.UpdateCDNClusterRequest) (*model.CDNCluster, error) { config, err := structutils.StructToMap(json.Config) if err != nil { return nil, err } cdnCluster := model.CDNCluster{} - if err := s.db.First(&cdnCluster, id).Updates(model.CDNCluster{ + if err := s.db.WithContext(ctx).First(&cdnCluster, id).Updates(model.CDNCluster{ Name: json.Name, BIO: json.BIO, Config: config, @@ -99,12 +101,12 @@ func (s *rest) UpdateCDNCluster(id uint, json types.UpdateCDNClusterRequest) (*m return &cdnCluster, nil } -func (s *rest) UpdateCDNClusterWithSecurityGroupDomain(id uint, json types.UpdateCDNClusterRequest) (*model.CDNCluster, error) { +func (s *rest) UpdateCDNClusterWithSecurityGroupDomain(ctx context.Context, id uint, json types.UpdateCDNClusterRequest) (*model.CDNCluster, error) { securityGroup := model.SecurityGroup{ Domain: json.SecurityGroupDomain, } - if err := s.db.First(&securityGroup).Error; err != nil { - return s.UpdateCDNCluster(id, json) + if err := s.db.WithContext(ctx).First(&securityGroup).Error; err != nil { + return s.UpdateCDNCluster(ctx, id, json) } config, err := structutils.StructToMap(json.Config) @@ -118,25 +120,25 @@ func (s *rest) UpdateCDNClusterWithSecurityGroupDomain(id uint, json types.Updat Config: config, } - if err := s.db.Model(&securityGroup).Association("CDNClusters").Append(&cdnCluster); err != nil { + if err := s.db.WithContext(ctx).Model(&securityGroup).Association("CDNClusters").Append(&cdnCluster); err != nil { return nil, err } return &cdnCluster, nil } -func (s *rest) GetCDNCluster(id uint) (*model.CDNCluster, error) { +func (s *rest) GetCDNCluster(ctx context.Context, id uint) (*model.CDNCluster, error) { cdnCluster := model.CDNCluster{} - if err := s.db.First(&cdnCluster, id).Error; err != nil { + if err := s.db.WithContext(ctx).First(&cdnCluster, id).Error; err != nil { return nil, err } return &cdnCluster, nil } -func (s *rest) GetCDNClusters(q types.GetCDNClustersQuery) (*[]model.CDNCluster, error) { +func (s *rest) GetCDNClusters(ctx context.Context, q types.GetCDNClustersQuery) (*[]model.CDNCluster, error) { cdnClusters := []model.CDNCluster{} - if err := s.db.Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.CDNCluster{ + if err := s.db.WithContext(ctx).Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.CDNCluster{ Name: q.Name, }).Find(&cdnClusters).Error; err != nil { return nil, err @@ -145,9 +147,9 @@ func (s *rest) GetCDNClusters(q types.GetCDNClustersQuery) (*[]model.CDNCluster, return &cdnClusters, nil } -func (s *rest) CDNClusterTotalCount(q types.GetCDNClustersQuery) (int64, error) { +func (s *rest) CDNClusterTotalCount(ctx context.Context, q types.GetCDNClustersQuery) (int64, error) { var count int64 - if err := s.db.Model(&model.CDNCluster{}).Where(&model.CDNCluster{ + if err := s.db.WithContext(ctx).Model(&model.CDNCluster{}).Where(&model.CDNCluster{ Name: q.Name, }).Count(&count).Error; err != nil { return 0, err @@ -156,36 +158,36 @@ func (s *rest) CDNClusterTotalCount(q types.GetCDNClustersQuery) (int64, error) return count, nil } -func (s *rest) AddCDNToCDNCluster(id, cdnID uint) error { +func (s *rest) AddCDNToCDNCluster(ctx context.Context, id, cdnID uint) error { cdnCluster := model.CDNCluster{} - if err := s.db.First(&cdnCluster, id).Error; err != nil { + if err := s.db.WithContext(ctx).First(&cdnCluster, id).Error; err != nil { return err } cdn := model.CDN{} - if err := s.db.First(&cdn, cdnID).Error; err != nil { + if err := s.db.WithContext(ctx).First(&cdn, cdnID).Error; err != nil { return err } - if err := s.db.Model(&cdnCluster).Association("CDNs").Append(&cdn); err != nil { + if err := s.db.WithContext(ctx).Model(&cdnCluster).Association("CDNs").Append(&cdn); err != nil { return err } return nil } -func (s *rest) AddSchedulerClusterToCDNCluster(id, schedulerClusterID uint) error { +func (s *rest) AddSchedulerClusterToCDNCluster(ctx context.Context, id, schedulerClusterID uint) error { cdnCluster := model.CDNCluster{} - if err := s.db.First(&cdnCluster, id).Error; err != nil { + if err := s.db.WithContext(ctx).First(&cdnCluster, id).Error; err != nil { return err } schedulerCluster := model.SchedulerCluster{} - if err := s.db.First(&schedulerCluster, schedulerClusterID).Error; err != nil { + if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil { return err } - if err := s.db.Model(&cdnCluster).Association("SchedulerClusters").Append(&schedulerCluster); err != nil { + if err := s.db.WithContext(ctx).Model(&cdnCluster).Association("SchedulerClusters").Append(&schedulerCluster); err != nil { return err } diff --git a/manager/service/oauth.go b/manager/service/oauth.go index e00948c1d..b5c2b220f 100644 --- a/manager/service/oauth.go +++ b/manager/service/oauth.go @@ -17,11 +17,13 @@ package service import ( + "context" + "d7y.io/dragonfly/v2/manager/model" "d7y.io/dragonfly/v2/manager/types" ) -func (s *rest) CreateOauth(json types.CreateOauthRequest) (*model.Oauth, error) { +func (s *rest) CreateOauth(ctx context.Context, json types.CreateOauthRequest) (*model.Oauth, error) { oauth := model.Oauth{ Name: json.Name, BIO: json.BIO, @@ -30,29 +32,29 @@ func (s *rest) CreateOauth(json types.CreateOauthRequest) (*model.Oauth, error) RedirectURL: json.RedirectURL, } - if err := s.db.Create(&oauth).Error; err != nil { + if err := s.db.WithContext(ctx).Create(&oauth).Error; err != nil { return nil, err } return &oauth, nil } -func (s *rest) DestroyOauth(id uint) error { +func (s *rest) DestroyOauth(ctx context.Context, id uint) error { oauth := model.Oauth{} - if err := s.db.First(&oauth, id).Error; err != nil { + if err := s.db.WithContext(ctx).First(&oauth, id).Error; err != nil { return err } - if err := s.db.Unscoped().Delete(&model.Oauth{}, id).Error; err != nil { + if err := s.db.WithContext(ctx).Unscoped().Delete(&model.Oauth{}, id).Error; err != nil { return err } return nil } -func (s *rest) UpdateOauth(id uint, json types.UpdateOauthRequest) (*model.Oauth, error) { +func (s *rest) UpdateOauth(ctx context.Context, id uint, json types.UpdateOauthRequest) (*model.Oauth, error) { oauth := model.Oauth{} - if err := s.db.First(&oauth, id).Updates(model.Oauth{ + if err := s.db.WithContext(ctx).First(&oauth, id).Updates(model.Oauth{ Name: json.Name, BIO: json.BIO, ClientID: json.ClientID, @@ -65,18 +67,18 @@ func (s *rest) UpdateOauth(id uint, json types.UpdateOauthRequest) (*model.Oauth return &oauth, nil } -func (s *rest) GetOauth(id uint) (*model.Oauth, error) { +func (s *rest) GetOauth(ctx context.Context, id uint) (*model.Oauth, error) { oauth := model.Oauth{} - if err := s.db.First(&oauth, id).Error; err != nil { + if err := s.db.WithContext(ctx).First(&oauth, id).Error; err != nil { return nil, err } return &oauth, nil } -func (s *rest) GetOauths(q types.GetOauthsQuery) (*[]model.Oauth, error) { +func (s *rest) GetOauths(ctx context.Context, q types.GetOauthsQuery) (*[]model.Oauth, error) { oauths := []model.Oauth{} - if err := s.db.Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.Oauth{ + if err := s.db.WithContext(ctx).Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.Oauth{ Name: q.Name, ClientID: q.ClientID, }).Find(&oauths).Error; err != nil { @@ -86,9 +88,9 @@ func (s *rest) GetOauths(q types.GetOauthsQuery) (*[]model.Oauth, error) { return &oauths, nil } -func (s *rest) OauthTotalCount(q types.GetOauthsQuery) (int64, error) { +func (s *rest) OauthTotalCount(ctx context.Context, q types.GetOauthsQuery) (int64, error) { var count int64 - if err := s.db.Model(&model.Oauth{}).Where(&model.Oauth{ + if err := s.db.WithContext(ctx).Model(&model.Oauth{}).Where(&model.Oauth{ Name: q.Name, ClientID: q.ClientID, }).Count(&count).Error; err != nil { diff --git a/manager/service/permission.go b/manager/service/permission.go index 04f9e50fe..157c2705b 100644 --- a/manager/service/permission.go +++ b/manager/service/permission.go @@ -17,10 +17,12 @@ package service import ( + "context" + "d7y.io/dragonfly/v2/manager/permission/rbac" "github.com/gin-gonic/gin" ) -func (s *rest) GetPermissions(g *gin.Engine) []rbac.Permission { +func (s *rest) GetPermissions(ctx context.Context, g *gin.Engine) []rbac.Permission { return rbac.GetPermissions(g) } diff --git a/manager/service/preheat.go b/manager/service/preheat.go index c6bff3e34..89e5e2f01 100644 --- a/manager/service/preheat.go +++ b/manager/service/preheat.go @@ -17,6 +17,7 @@ package service import ( + "context" "time" "d7y.io/dragonfly/v2/manager/model" @@ -39,33 +40,33 @@ const ( V1PreheatingStatusFail = "FAIL" ) -func (s *rest) CreatePreheat(json types.CreatePreheatRequest) (*types.Preheat, error) { +func (s *rest) CreatePreheat(ctx context.Context, json types.CreatePreheatRequest) (*types.Preheat, error) { if json.SchedulerClusterID != nil { schedulerCluster := model.SchedulerCluster{} - if err := s.db.First(&schedulerCluster, json.SchedulerClusterID).Error; err != nil { + if err := s.db.WithContext(ctx).First(&schedulerCluster, json.SchedulerClusterID).Error; err != nil { return nil, err } scheduler := model.Scheduler{} - if err := s.db.First(&scheduler, model.Scheduler{ + if err := s.db.WithContext(ctx).First(&scheduler, model.Scheduler{ SchedulerClusterID: schedulerCluster.ID, Status: model.SchedulerStatusActive, }).Error; err != nil { return nil, err } - return s.job.CreatePreheat([]model.Scheduler{scheduler}, json) + return s.job.CreatePreheat(ctx, []model.Scheduler{scheduler}, json) } schedulerClusters := []model.SchedulerCluster{} - if err := s.db.Find(&schedulerClusters).Error; err != nil { + if err := s.db.WithContext(ctx).Find(&schedulerClusters).Error; err != nil { return nil, err } var schedulers []model.Scheduler for _, schedulerCluster := range schedulerClusters { scheduler := model.Scheduler{} - if err := s.db.First(&scheduler, model.Scheduler{ + if err := s.db.WithContext(ctx).First(&scheduler, model.Scheduler{ SchedulerClusterID: schedulerCluster.ID, Status: model.SchedulerStatusActive, }).Error; err != nil { @@ -75,15 +76,15 @@ func (s *rest) CreatePreheat(json types.CreatePreheatRequest) (*types.Preheat, e schedulers = append(schedulers, scheduler) } - return s.job.CreatePreheat(schedulers, json) + return s.job.CreatePreheat(ctx, schedulers, json) } -func (s *rest) GetPreheat(id string) (*types.Preheat, error) { - return s.job.GetPreheat(id) +func (s *rest) GetPreheat(ctx context.Context, id string) (*types.Preheat, error) { + return s.job.GetPreheat(ctx, id) } -func (s *rest) CreateV1Preheat(json types.CreateV1PreheatRequest) (*types.CreateV1PreheatResponse, error) { - p, err := s.CreatePreheat(types.CreatePreheatRequest{ +func (s *rest) CreateV1Preheat(ctx context.Context, json types.CreateV1PreheatRequest) (*types.CreateV1PreheatResponse, error) { + p, err := s.CreatePreheat(ctx, types.CreatePreheatRequest{ Type: json.Type, URL: json.URL, Filter: json.Filter, @@ -98,8 +99,8 @@ func (s *rest) CreateV1Preheat(json types.CreateV1PreheatRequest) (*types.Create }, nil } -func (s *rest) GetV1Preheat(id string) (*types.GetV1PreheatResponse, error) { - p, err := s.job.GetPreheat(id) +func (s *rest) GetV1Preheat(ctx context.Context, id string) (*types.GetV1PreheatResponse, error) { + p, err := s.job.GetPreheat(ctx, id) if err != nil { return nil, err } diff --git a/manager/service/role.go b/manager/service/role.go index f71415534..e2ba5e142 100644 --- a/manager/service/role.go +++ b/manager/service/role.go @@ -17,10 +17,12 @@ package service import ( + "context" + "d7y.io/dragonfly/v2/manager/types" ) -func (s *rest) CreateRole(json types.CreateRoleRequest) error { +func (s *rest) CreateRole(ctx context.Context, json types.CreateRoleRequest) error { for _, permission := range json.Permissions { _, err := s.enforcer.AddPermissionForUser(json.Role, permission.Object, permission.Action) if err != nil { @@ -31,22 +33,22 @@ func (s *rest) CreateRole(json types.CreateRoleRequest) error { return nil } -func (s *rest) DestroyRole(role string) (bool, error) { +func (s *rest) DestroyRole(ctx context.Context, role string) (bool, error) { return s.enforcer.DeleteRole(role) } -func (s *rest) GetRole(role string) [][]string { +func (s *rest) GetRole(ctx context.Context, role string) [][]string { return s.enforcer.GetPermissionsForUser(role) } -func (s *rest) GetRoles() []string { +func (s *rest) GetRoles(ctx context.Context) []string { return s.enforcer.GetAllSubjects() } -func (s *rest) AddPermissionForRole(role string, json types.AddPermissionForRoleRequest) (bool, error) { +func (s *rest) AddPermissionForRole(ctx context.Context, role string, json types.AddPermissionForRoleRequest) (bool, error) { return s.enforcer.AddPermissionForUser(role, json.Object, json.Action) } -func (s *rest) DeletePermissionForRole(role string, json types.DeletePermissionForRoleRequest) (bool, error) { +func (s *rest) DeletePermissionForRole(ctx context.Context, role string, json types.DeletePermissionForRoleRequest) (bool, error) { return s.enforcer.DeletePermissionForUser(role, json.Object, json.Action) } diff --git a/manager/service/scheduler.go b/manager/service/scheduler.go index 20b309cc9..33792eb6a 100644 --- a/manager/service/scheduler.go +++ b/manager/service/scheduler.go @@ -17,11 +17,13 @@ package service import ( + "context" + "d7y.io/dragonfly/v2/manager/model" "d7y.io/dragonfly/v2/manager/types" ) -func (s *rest) CreateScheduler(json types.CreateSchedulerRequest) (*model.Scheduler, error) { +func (s *rest) CreateScheduler(ctx context.Context, json types.CreateSchedulerRequest) (*model.Scheduler, error) { scheduler := model.Scheduler{ HostName: json.HostName, VIPs: json.VIPs, @@ -33,29 +35,29 @@ func (s *rest) CreateScheduler(json types.CreateSchedulerRequest) (*model.Schedu SchedulerClusterID: json.SchedulerClusterID, } - if err := s.db.Create(&scheduler).Error; err != nil { + if err := s.db.WithContext(ctx).Create(&scheduler).Error; err != nil { return nil, err } return &scheduler, nil } -func (s *rest) DestroyScheduler(id uint) error { +func (s *rest) DestroyScheduler(ctx context.Context, id uint) error { scheduler := model.Scheduler{} - if err := s.db.First(&scheduler, id).Error; err != nil { + if err := s.db.WithContext(ctx).First(&scheduler, id).Error; err != nil { return err } - if err := s.db.Unscoped().Delete(&model.Scheduler{}, id).Error; err != nil { + if err := s.db.WithContext(ctx).Unscoped().Delete(&model.Scheduler{}, id).Error; err != nil { return err } return nil } -func (s *rest) UpdateScheduler(id uint, json types.UpdateSchedulerRequest) (*model.Scheduler, error) { +func (s *rest) UpdateScheduler(ctx context.Context, id uint, json types.UpdateSchedulerRequest) (*model.Scheduler, error) { scheduler := model.Scheduler{} - if err := s.db.First(&scheduler, id).Updates(model.Scheduler{ + if err := s.db.WithContext(ctx).First(&scheduler, id).Updates(model.Scheduler{ VIPs: json.VIPs, IDC: json.IDC, Location: json.Location, @@ -70,18 +72,18 @@ func (s *rest) UpdateScheduler(id uint, json types.UpdateSchedulerRequest) (*mod return &scheduler, nil } -func (s *rest) GetScheduler(id uint) (*model.Scheduler, error) { +func (s *rest) GetScheduler(ctx context.Context, id uint) (*model.Scheduler, error) { scheduler := model.Scheduler{} - if err := s.db.First(&scheduler, id).Error; err != nil { + if err := s.db.WithContext(ctx).First(&scheduler, id).Error; err != nil { return nil, err } return &scheduler, nil } -func (s *rest) GetSchedulers(q types.GetSchedulersQuery) (*[]model.Scheduler, error) { +func (s *rest) GetSchedulers(ctx context.Context, q types.GetSchedulersQuery) (*[]model.Scheduler, error) { schedulers := []model.Scheduler{} - if err := s.db.Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.Scheduler{ + if err := s.db.WithContext(ctx).Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.Scheduler{ HostName: q.HostName, IDC: q.IDC, Location: q.Location, @@ -95,9 +97,9 @@ func (s *rest) GetSchedulers(q types.GetSchedulersQuery) (*[]model.Scheduler, er return &schedulers, nil } -func (s *rest) SchedulerTotalCount(q types.GetSchedulersQuery) (int64, error) { +func (s *rest) SchedulerTotalCount(ctx context.Context, q types.GetSchedulersQuery) (int64, error) { var count int64 - if err := s.db.Model(&model.Scheduler{}).Where(&model.Scheduler{ + if err := s.db.WithContext(ctx).Model(&model.Scheduler{}).Where(&model.Scheduler{ HostName: q.HostName, IDC: q.IDC, Location: q.Location, diff --git a/manager/service/scheduler_cluster.go b/manager/service/scheduler_cluster.go index d570a9b9c..2e87b69cb 100644 --- a/manager/service/scheduler_cluster.go +++ b/manager/service/scheduler_cluster.go @@ -17,12 +17,14 @@ package service import ( + "context" + "d7y.io/dragonfly/v2/manager/model" "d7y.io/dragonfly/v2/manager/types" "d7y.io/dragonfly/v2/pkg/util/structutils" ) -func (s *rest) CreateSchedulerCluster(json types.CreateSchedulerClusterRequest) (*model.SchedulerCluster, error) { +func (s *rest) CreateSchedulerCluster(ctx context.Context, json types.CreateSchedulerClusterRequest) (*model.SchedulerCluster, error) { config, err := structutils.StructToMap(json.Config) if err != nil { return nil, err @@ -47,12 +49,12 @@ func (s *rest) CreateSchedulerCluster(json types.CreateSchedulerClusterRequest) IsDefault: json.IsDefault, } - if err := s.db.Create(&schedulerCluster).Error; err != nil { + if err := s.db.WithContext(ctx).Create(&schedulerCluster).Error; err != nil { return nil, err } if json.CDNClusterID > 0 { - if err := s.AddSchedulerClusterToCDNCluster(json.CDNClusterID, schedulerCluster.ID); err != nil { + if err := s.AddSchedulerClusterToCDNCluster(ctx, json.CDNClusterID, schedulerCluster.ID); err != nil { return nil, err } } @@ -60,12 +62,12 @@ func (s *rest) CreateSchedulerCluster(json types.CreateSchedulerClusterRequest) return &schedulerCluster, nil } -func (s *rest) CreateSchedulerClusterWithSecurityGroupDomain(json types.CreateSchedulerClusterRequest) (*model.SchedulerCluster, error) { +func (s *rest) CreateSchedulerClusterWithSecurityGroupDomain(ctx context.Context, json types.CreateSchedulerClusterRequest) (*model.SchedulerCluster, error) { securityGroup := model.SecurityGroup{ Domain: json.SecurityGroupDomain, } - if err := s.db.First(&securityGroup).Error; err != nil { - return s.CreateSchedulerCluster(json) + if err := s.db.WithContext(ctx).First(&securityGroup).Error; err != nil { + return s.CreateSchedulerCluster(ctx, json) } config, err := structutils.StructToMap(json.Config) @@ -92,12 +94,12 @@ func (s *rest) CreateSchedulerClusterWithSecurityGroupDomain(json types.CreateSc IsDefault: json.IsDefault, } - if err := s.db.Model(&securityGroup).Association("SchedulerClusters").Append(&schedulerCluster); err != nil { + if err := s.db.WithContext(ctx).Model(&securityGroup).Association("SchedulerClusters").Append(&schedulerCluster); err != nil { return nil, err } if json.CDNClusterID > 0 { - if err := s.AddSchedulerClusterToCDNCluster(json.CDNClusterID, schedulerCluster.ID); err != nil { + if err := s.AddSchedulerClusterToCDNCluster(ctx, json.CDNClusterID, schedulerCluster.ID); err != nil { return nil, err } } @@ -105,20 +107,20 @@ func (s *rest) CreateSchedulerClusterWithSecurityGroupDomain(json types.CreateSc return &schedulerCluster, nil } -func (s *rest) DestroySchedulerCluster(id uint) error { +func (s *rest) DestroySchedulerCluster(ctx context.Context, id uint) error { schedulerCluster := model.SchedulerCluster{} - if err := s.db.First(&schedulerCluster, id).Error; err != nil { + if err := s.db.WithContext(ctx).First(&schedulerCluster, id).Error; err != nil { return err } - if err := s.db.Unscoped().Delete(&model.SchedulerCluster{}, id).Error; err != nil { + if err := s.db.WithContext(ctx).Unscoped().Delete(&model.SchedulerCluster{}, id).Error; err != nil { return err } return nil } -func (s *rest) UpdateSchedulerCluster(id uint, json types.UpdateSchedulerClusterRequest) (*model.SchedulerCluster, error) { +func (s *rest) UpdateSchedulerCluster(ctx context.Context, id uint, json types.UpdateSchedulerClusterRequest) (*model.SchedulerCluster, error) { config, err := structutils.StructToMap(json.Config) if err != nil { return nil, err @@ -135,7 +137,7 @@ func (s *rest) UpdateSchedulerCluster(id uint, json types.UpdateSchedulerCluster } schedulerCluster := model.SchedulerCluster{} - if err := s.db.First(&schedulerCluster, id).Updates(model.SchedulerCluster{ + if err := s.db.WithContext(ctx).First(&schedulerCluster, id).Updates(model.SchedulerCluster{ Name: json.Name, BIO: json.BIO, Config: config, @@ -147,7 +149,7 @@ func (s *rest) UpdateSchedulerCluster(id uint, json types.UpdateSchedulerCluster } if json.CDNClusterID > 0 { - if err := s.AddSchedulerClusterToCDNCluster(json.CDNClusterID, schedulerCluster.ID); err != nil { + if err := s.AddSchedulerClusterToCDNCluster(ctx, json.CDNClusterID, schedulerCluster.ID); err != nil { return nil, err } } @@ -155,12 +157,12 @@ func (s *rest) UpdateSchedulerCluster(id uint, json types.UpdateSchedulerCluster return &schedulerCluster, nil } -func (s *rest) UpdateSchedulerClusterWithSecurityGroupDomain(id uint, json types.UpdateSchedulerClusterRequest) (*model.SchedulerCluster, error) { +func (s *rest) UpdateSchedulerClusterWithSecurityGroupDomain(ctx context.Context, id uint, json types.UpdateSchedulerClusterRequest) (*model.SchedulerCluster, error) { securityGroup := model.SecurityGroup{ Domain: json.SecurityGroupDomain, } - if err := s.db.First(&securityGroup).Error; err != nil { - return s.UpdateSchedulerCluster(id, json) + if err := s.db.WithContext(ctx).First(&securityGroup).Error; err != nil { + return s.UpdateSchedulerCluster(ctx, id, json) } config, err := structutils.StructToMap(json.Config) @@ -187,12 +189,12 @@ func (s *rest) UpdateSchedulerClusterWithSecurityGroupDomain(id uint, json types IsDefault: json.IsDefault, } - if err := s.db.Model(&securityGroup).Association("SchedulerClusters").Append(&schedulerCluster); err != nil { + if err := s.db.WithContext(ctx).Model(&securityGroup).Association("SchedulerClusters").Append(&schedulerCluster); err != nil { return nil, err } if json.CDNClusterID > 0 { - if err := s.AddSchedulerClusterToCDNCluster(json.CDNClusterID, schedulerCluster.ID); err != nil { + if err := s.AddSchedulerClusterToCDNCluster(ctx, json.CDNClusterID, schedulerCluster.ID); err != nil { return nil, err } } @@ -200,18 +202,18 @@ func (s *rest) UpdateSchedulerClusterWithSecurityGroupDomain(id uint, json types return &schedulerCluster, nil } -func (s *rest) GetSchedulerCluster(id uint) (*model.SchedulerCluster, error) { +func (s *rest) GetSchedulerCluster(ctx context.Context, id uint) (*model.SchedulerCluster, error) { schedulerCluster := model.SchedulerCluster{} - if err := s.db.Preload("CDNClusters").First(&schedulerCluster, id).Error; err != nil { + if err := s.db.WithContext(ctx).Preload("CDNClusters").First(&schedulerCluster, id).Error; err != nil { return nil, err } return &schedulerCluster, nil } -func (s *rest) GetSchedulerClusters(q types.GetSchedulerClustersQuery) (*[]model.SchedulerCluster, error) { +func (s *rest) GetSchedulerClusters(ctx context.Context, q types.GetSchedulerClustersQuery) (*[]model.SchedulerCluster, error) { schedulerClusters := []model.SchedulerCluster{} - if err := s.db.Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.SchedulerCluster{ + if err := s.db.WithContext(ctx).Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.SchedulerCluster{ Name: q.Name, }).Preload("CDNClusters").Find(&schedulerClusters).Error; err != nil { return nil, err @@ -220,9 +222,9 @@ func (s *rest) GetSchedulerClusters(q types.GetSchedulerClustersQuery) (*[]model return &schedulerClusters, nil } -func (s *rest) SchedulerClusterTotalCount(q types.GetSchedulerClustersQuery) (int64, error) { +func (s *rest) SchedulerClusterTotalCount(ctx context.Context, q types.GetSchedulerClustersQuery) (int64, error) { var count int64 - if err := s.db.Model(&model.SchedulerCluster{}).Where(&model.SchedulerCluster{ + if err := s.db.WithContext(ctx).Model(&model.SchedulerCluster{}).Where(&model.SchedulerCluster{ Name: q.Name, }).Count(&count).Error; err != nil { return 0, err @@ -231,18 +233,18 @@ func (s *rest) SchedulerClusterTotalCount(q types.GetSchedulerClustersQuery) (in return count, nil } -func (s *rest) AddSchedulerToSchedulerCluster(id, schedulerID uint) error { +func (s *rest) AddSchedulerToSchedulerCluster(ctx context.Context, id, schedulerID uint) error { schedulerCluster := model.SchedulerCluster{} - if err := s.db.First(&schedulerCluster, id).Error; err != nil { + if err := s.db.WithContext(ctx).First(&schedulerCluster, id).Error; err != nil { return err } scheduler := model.Scheduler{} - if err := s.db.First(&scheduler, schedulerID).Error; err != nil { + if err := s.db.WithContext(ctx).First(&scheduler, schedulerID).Error; err != nil { return err } - if err := s.db.Model(&schedulerCluster).Association("Schedulers").Append(&scheduler); err != nil { + if err := s.db.WithContext(ctx).Model(&schedulerCluster).Association("Schedulers").Append(&scheduler); err != nil { return err } diff --git a/manager/service/security_group.go b/manager/service/security_group.go index cd9ee709e..805fdd36d 100644 --- a/manager/service/security_group.go +++ b/manager/service/security_group.go @@ -17,11 +17,13 @@ package service import ( + "context" + "d7y.io/dragonfly/v2/manager/model" "d7y.io/dragonfly/v2/manager/types" ) -func (s *rest) CreateSecurityGroup(json types.CreateSecurityGroupRequest) (*model.SecurityGroup, error) { +func (s *rest) CreateSecurityGroup(ctx context.Context, json types.CreateSecurityGroupRequest) (*model.SecurityGroup, error) { securityGroup := model.SecurityGroup{ Name: json.Name, BIO: json.BIO, @@ -29,29 +31,29 @@ func (s *rest) CreateSecurityGroup(json types.CreateSecurityGroupRequest) (*mode ProxyDomain: json.ProxyDomain, } - if err := s.db.Create(&securityGroup).Error; err != nil { + if err := s.db.WithContext(ctx).Create(&securityGroup).Error; err != nil { return nil, err } return &securityGroup, nil } -func (s *rest) DestroySecurityGroup(id uint) error { +func (s *rest) DestroySecurityGroup(ctx context.Context, id uint) error { securityGroup := model.SecurityGroup{} - if err := s.db.First(&securityGroup, id).Error; err != nil { + if err := s.db.WithContext(ctx).First(&securityGroup, id).Error; err != nil { return err } - if err := s.db.Unscoped().Delete(&model.SecurityGroup{}, id).Error; err != nil { + if err := s.db.WithContext(ctx).Unscoped().Delete(&model.SecurityGroup{}, id).Error; err != nil { return err } return nil } -func (s *rest) UpdateSecurityGroup(id uint, json types.UpdateSecurityGroupRequest) (*model.SecurityGroup, error) { +func (s *rest) UpdateSecurityGroup(ctx context.Context, id uint, json types.UpdateSecurityGroupRequest) (*model.SecurityGroup, error) { securityGroup := model.SecurityGroup{} - if err := s.db.First(&securityGroup, id).Updates(model.SecurityGroup{ + if err := s.db.WithContext(ctx).First(&securityGroup, id).Updates(model.SecurityGroup{ Name: json.Name, BIO: json.BIO, Domain: json.Domain, @@ -63,18 +65,18 @@ func (s *rest) UpdateSecurityGroup(id uint, json types.UpdateSecurityGroupReques return &securityGroup, nil } -func (s *rest) GetSecurityGroup(id uint) (*model.SecurityGroup, error) { +func (s *rest) GetSecurityGroup(ctx context.Context, id uint) (*model.SecurityGroup, error) { securityGroup := model.SecurityGroup{} - if err := s.db.First(&securityGroup, id).Error; err != nil { + if err := s.db.WithContext(ctx).First(&securityGroup, id).Error; err != nil { return nil, err } return &securityGroup, nil } -func (s *rest) GetSecurityGroups(q types.GetSecurityGroupsQuery) (*[]model.SecurityGroup, error) { +func (s *rest) GetSecurityGroups(ctx context.Context, q types.GetSecurityGroupsQuery) (*[]model.SecurityGroup, error) { securityGroups := []model.SecurityGroup{} - if err := s.db.Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.SecurityGroup{ + if err := s.db.WithContext(ctx).Scopes(model.Paginate(q.Page, q.PerPage)).Where(&model.SecurityGroup{ Name: q.Name, Domain: q.Domain, }).Find(&securityGroups).Error; err != nil { @@ -84,9 +86,9 @@ func (s *rest) GetSecurityGroups(q types.GetSecurityGroupsQuery) (*[]model.Secur return &securityGroups, nil } -func (s *rest) SecurityGroupTotalCount(q types.GetSecurityGroupsQuery) (int64, error) { +func (s *rest) SecurityGroupTotalCount(ctx context.Context, q types.GetSecurityGroupsQuery) (int64, error) { var count int64 - if err := s.db.Model(&model.SecurityGroup{}).Where(&model.SecurityGroup{ + if err := s.db.WithContext(ctx).Model(&model.SecurityGroup{}).Where(&model.SecurityGroup{ Name: q.Name, Domain: q.Domain, }).Count(&count).Error; err != nil { @@ -96,36 +98,36 @@ func (s *rest) SecurityGroupTotalCount(q types.GetSecurityGroupsQuery) (int64, e return count, nil } -func (s *rest) AddSchedulerClusterToSecurityGroup(id, schedulerClusterID uint) error { +func (s *rest) AddSchedulerClusterToSecurityGroup(ctx context.Context, id, schedulerClusterID uint) error { securityGroup := model.SecurityGroup{} - if err := s.db.First(&securityGroup, id).Error; err != nil { + if err := s.db.WithContext(ctx).First(&securityGroup, id).Error; err != nil { return err } schedulerCluster := model.SchedulerCluster{} - if err := s.db.First(&schedulerCluster, schedulerClusterID).Error; err != nil { + if err := s.db.WithContext(ctx).First(&schedulerCluster, schedulerClusterID).Error; err != nil { return err } - if err := s.db.Model(&securityGroup).Association("SchedulerClusters").Append(&schedulerCluster); err != nil { + if err := s.db.WithContext(ctx).Model(&securityGroup).Association("SchedulerClusters").Append(&schedulerCluster); err != nil { return err } return nil } -func (s *rest) AddCDNClusterToSecurityGroup(id, cdnClusterID uint) error { +func (s *rest) AddCDNClusterToSecurityGroup(ctx context.Context, id, cdnClusterID uint) error { securityGroup := model.SecurityGroup{} - if err := s.db.First(&securityGroup, id).Error; err != nil { + if err := s.db.WithContext(ctx).First(&securityGroup, id).Error; err != nil { return err } cdnCluster := model.CDNCluster{} - if err := s.db.First(&cdnCluster, cdnClusterID).Error; err != nil { + if err := s.db.WithContext(ctx).First(&cdnCluster, cdnClusterID).Error; err != nil { return err } - if err := s.db.Model(&securityGroup).Association("CDNClusters").Append(&cdnCluster); err != nil { + if err := s.db.WithContext(ctx).Model(&securityGroup).Association("CDNClusters").Append(&cdnCluster); err != nil { return err } diff --git a/manager/service/service.go b/manager/service/service.go index a62d2aeac..0fc50dfb3 100644 --- a/manager/service/service.go +++ b/manager/service/service.go @@ -17,6 +17,8 @@ package service import ( + "context" + "d7y.io/dragonfly/v2/manager/cache" "d7y.io/dragonfly/v2/manager/database" "d7y.io/dragonfly/v2/manager/job" @@ -31,80 +33,80 @@ import ( ) type REST interface { - GetUser(uint) (*model.User, error) - SignIn(types.SignInRequest) (*model.User, error) - SignUp(types.SignUpRequest) (*model.User, error) - OauthSignin(string) (string, error) - OauthSigninCallback(string, string) (*model.User, error) - ResetPassword(uint, types.ResetPasswordRequest) error - GetRolesForUser(uint) ([]string, error) - AddRoleForUser(types.AddRoleForUserParams) (bool, error) - DeleteRoleForUser(types.DeleteRoleForUserParams) (bool, error) + GetUser(context.Context, uint) (*model.User, error) + SignIn(context.Context, types.SignInRequest) (*model.User, error) + SignUp(context.Context, types.SignUpRequest) (*model.User, error) + OauthSignin(context.Context, string) (string, error) + OauthSigninCallback(context.Context, string, string) (*model.User, error) + ResetPassword(context.Context, uint, types.ResetPasswordRequest) error + GetRolesForUser(context.Context, uint) ([]string, error) + AddRoleForUser(context.Context, types.AddRoleForUserParams) (bool, error) + DeleteRoleForUser(context.Context, types.DeleteRoleForUserParams) (bool, error) - CreateRole(json types.CreateRoleRequest) error - DestroyRole(string) (bool, error) - GetRole(string) [][]string - GetRoles() []string - AddPermissionForRole(string, types.AddPermissionForRoleRequest) (bool, error) - DeletePermissionForRole(string, types.DeletePermissionForRoleRequest) (bool, error) + CreateRole(context.Context, types.CreateRoleRequest) error + DestroyRole(context.Context, string) (bool, error) + GetRole(context.Context, string) [][]string + GetRoles(context.Context) []string + AddPermissionForRole(context.Context, string, types.AddPermissionForRoleRequest) (bool, error) + DeletePermissionForRole(context.Context, string, types.DeletePermissionForRoleRequest) (bool, error) - GetPermissions(*gin.Engine) []rbac.Permission + GetPermissions(context.Context, *gin.Engine) []rbac.Permission - CreateOauth(types.CreateOauthRequest) (*model.Oauth, error) - DestroyOauth(uint) error - UpdateOauth(uint, types.UpdateOauthRequest) (*model.Oauth, error) - GetOauth(uint) (*model.Oauth, error) - GetOauths(types.GetOauthsQuery) (*[]model.Oauth, error) - OauthTotalCount(types.GetOauthsQuery) (int64, error) + CreateOauth(context.Context, types.CreateOauthRequest) (*model.Oauth, error) + DestroyOauth(context.Context, uint) error + UpdateOauth(context.Context, uint, types.UpdateOauthRequest) (*model.Oauth, error) + GetOauth(context.Context, uint) (*model.Oauth, error) + GetOauths(context.Context, types.GetOauthsQuery) (*[]model.Oauth, error) + OauthTotalCount(context.Context, types.GetOauthsQuery) (int64, error) - CreateCDNCluster(types.CreateCDNClusterRequest) (*model.CDNCluster, error) - CreateCDNClusterWithSecurityGroupDomain(types.CreateCDNClusterRequest) (*model.CDNCluster, error) - DestroyCDNCluster(uint) error - UpdateCDNCluster(uint, types.UpdateCDNClusterRequest) (*model.CDNCluster, error) - UpdateCDNClusterWithSecurityGroupDomain(uint, types.UpdateCDNClusterRequest) (*model.CDNCluster, error) - GetCDNCluster(uint) (*model.CDNCluster, error) - GetCDNClusters(types.GetCDNClustersQuery) (*[]model.CDNCluster, error) - CDNClusterTotalCount(types.GetCDNClustersQuery) (int64, error) - AddCDNToCDNCluster(uint, uint) error - AddSchedulerClusterToCDNCluster(uint, uint) error + CreateCDNCluster(context.Context, types.CreateCDNClusterRequest) (*model.CDNCluster, error) + CreateCDNClusterWithSecurityGroupDomain(context.Context, types.CreateCDNClusterRequest) (*model.CDNCluster, error) + DestroyCDNCluster(context.Context, uint) error + UpdateCDNCluster(context.Context, uint, types.UpdateCDNClusterRequest) (*model.CDNCluster, error) + UpdateCDNClusterWithSecurityGroupDomain(context.Context, uint, types.UpdateCDNClusterRequest) (*model.CDNCluster, error) + GetCDNCluster(context.Context, uint) (*model.CDNCluster, error) + GetCDNClusters(context.Context, types.GetCDNClustersQuery) (*[]model.CDNCluster, error) + CDNClusterTotalCount(context.Context, types.GetCDNClustersQuery) (int64, error) + AddCDNToCDNCluster(context.Context, uint, uint) error + AddSchedulerClusterToCDNCluster(context.Context, uint, uint) error - CreateCDN(types.CreateCDNRequest) (*model.CDN, error) - DestroyCDN(uint) error - UpdateCDN(uint, types.UpdateCDNRequest) (*model.CDN, error) - GetCDN(uint) (*model.CDN, error) - GetCDNs(types.GetCDNsQuery) (*[]model.CDN, error) - CDNTotalCount(types.GetCDNsQuery) (int64, error) + CreateCDN(context.Context, types.CreateCDNRequest) (*model.CDN, error) + DestroyCDN(context.Context, uint) error + UpdateCDN(context.Context, uint, types.UpdateCDNRequest) (*model.CDN, error) + GetCDN(context.Context, uint) (*model.CDN, error) + GetCDNs(context.Context, types.GetCDNsQuery) (*[]model.CDN, error) + CDNTotalCount(context.Context, types.GetCDNsQuery) (int64, error) - CreateSchedulerCluster(types.CreateSchedulerClusterRequest) (*model.SchedulerCluster, error) - CreateSchedulerClusterWithSecurityGroupDomain(types.CreateSchedulerClusterRequest) (*model.SchedulerCluster, error) - DestroySchedulerCluster(uint) error - UpdateSchedulerCluster(uint, types.UpdateSchedulerClusterRequest) (*model.SchedulerCluster, error) - UpdateSchedulerClusterWithSecurityGroupDomain(uint, types.UpdateSchedulerClusterRequest) (*model.SchedulerCluster, error) - GetSchedulerCluster(uint) (*model.SchedulerCluster, error) - GetSchedulerClusters(types.GetSchedulerClustersQuery) (*[]model.SchedulerCluster, error) - SchedulerClusterTotalCount(types.GetSchedulerClustersQuery) (int64, error) - AddSchedulerToSchedulerCluster(uint, uint) error + CreateSchedulerCluster(context.Context, types.CreateSchedulerClusterRequest) (*model.SchedulerCluster, error) + CreateSchedulerClusterWithSecurityGroupDomain(context.Context, types.CreateSchedulerClusterRequest) (*model.SchedulerCluster, error) + DestroySchedulerCluster(context.Context, uint) error + UpdateSchedulerCluster(context.Context, uint, types.UpdateSchedulerClusterRequest) (*model.SchedulerCluster, error) + UpdateSchedulerClusterWithSecurityGroupDomain(context.Context, uint, types.UpdateSchedulerClusterRequest) (*model.SchedulerCluster, error) + GetSchedulerCluster(context.Context, uint) (*model.SchedulerCluster, error) + GetSchedulerClusters(context.Context, types.GetSchedulerClustersQuery) (*[]model.SchedulerCluster, error) + SchedulerClusterTotalCount(context.Context, types.GetSchedulerClustersQuery) (int64, error) + AddSchedulerToSchedulerCluster(context.Context, uint, uint) error - CreateScheduler(types.CreateSchedulerRequest) (*model.Scheduler, error) - DestroyScheduler(uint) error - UpdateScheduler(uint, types.UpdateSchedulerRequest) (*model.Scheduler, error) - GetScheduler(uint) (*model.Scheduler, error) - GetSchedulers(types.GetSchedulersQuery) (*[]model.Scheduler, error) - SchedulerTotalCount(types.GetSchedulersQuery) (int64, error) + CreateScheduler(context.Context, types.CreateSchedulerRequest) (*model.Scheduler, error) + DestroyScheduler(context.Context, uint) error + UpdateScheduler(context.Context, uint, types.UpdateSchedulerRequest) (*model.Scheduler, error) + GetScheduler(context.Context, uint) (*model.Scheduler, error) + GetSchedulers(context.Context, types.GetSchedulersQuery) (*[]model.Scheduler, error) + SchedulerTotalCount(context.Context, types.GetSchedulersQuery) (int64, error) - CreateSecurityGroup(types.CreateSecurityGroupRequest) (*model.SecurityGroup, error) - DestroySecurityGroup(uint) error - UpdateSecurityGroup(uint, types.UpdateSecurityGroupRequest) (*model.SecurityGroup, error) - GetSecurityGroup(uint) (*model.SecurityGroup, error) - GetSecurityGroups(types.GetSecurityGroupsQuery) (*[]model.SecurityGroup, error) - SecurityGroupTotalCount(types.GetSecurityGroupsQuery) (int64, error) - AddSchedulerClusterToSecurityGroup(uint, uint) error - AddCDNClusterToSecurityGroup(uint, uint) error + CreateSecurityGroup(context.Context, types.CreateSecurityGroupRequest) (*model.SecurityGroup, error) + DestroySecurityGroup(context.Context, uint) error + UpdateSecurityGroup(context.Context, uint, types.UpdateSecurityGroupRequest) (*model.SecurityGroup, error) + GetSecurityGroup(context.Context, uint) (*model.SecurityGroup, error) + GetSecurityGroups(context.Context, types.GetSecurityGroupsQuery) (*[]model.SecurityGroup, error) + SecurityGroupTotalCount(context.Context, types.GetSecurityGroupsQuery) (int64, error) + AddSchedulerClusterToSecurityGroup(context.Context, uint, uint) error + AddCDNClusterToSecurityGroup(context.Context, uint, uint) error - CreatePreheat(types.CreatePreheatRequest) (*types.Preheat, error) - GetPreheat(string) (*types.Preheat, error) - CreateV1Preheat(types.CreateV1PreheatRequest) (*types.CreateV1PreheatResponse, error) - GetV1Preheat(string) (*types.GetV1PreheatResponse, error) + CreatePreheat(context.Context, types.CreatePreheatRequest) (*types.Preheat, error) + GetPreheat(context.Context, string) (*types.Preheat, error) + CreateV1Preheat(context.Context, types.CreateV1PreheatRequest) (*types.CreateV1PreheatResponse, error) + GetV1Preheat(context.Context, string) (*types.GetV1PreheatResponse, error) } type rest struct { diff --git a/manager/service/service_grpc.go b/manager/service/service_grpc.go index 6bba39841..2d254a5cd 100644 --- a/manager/service/service_grpc.go +++ b/manager/service/service_grpc.go @@ -65,7 +65,7 @@ func (s *GRPC) GetCDN(ctx context.Context, req *manager.GetCDNRequest) (*manager // Cache Miss logger.Infof("%s cache miss", cacheKey) cdn := model.CDN{} - if err := s.db.Preload("CDNCluster.SecurityGroup").First(&cdn, &model.CDN{ + if err := s.db.WithContext(ctx).Preload("CDNCluster.SecurityGroup").First(&cdn, &model.CDN{ HostName: req.HostName, CDNClusterID: uint(req.CdnClusterId), }).Error; err != nil { @@ -125,7 +125,7 @@ func (s *GRPC) createCDN(ctx context.Context, req *manager.UpdateCDNRequest) (*m CDNClusterID: uint(req.CdnClusterId), } - if err := s.db.Create(&cdn).Error; err != nil { + if err := s.db.WithContext(ctx).Create(&cdn).Error; err != nil { return nil, status.Error(codes.Unknown, err.Error()) } @@ -143,7 +143,7 @@ func (s *GRPC) createCDN(ctx context.Context, req *manager.UpdateCDNRequest) (*m func (s *GRPC) UpdateCDN(ctx context.Context, req *manager.UpdateCDNRequest) (*manager.CDN, error) { cdn := model.CDN{} - if err := s.db.First(&cdn, model.CDN{ + if err := s.db.WithContext(ctx).First(&cdn, model.CDN{ HostName: req.HostName, CDNClusterID: uint(req.CdnClusterId), }).Error; err != nil { @@ -153,7 +153,7 @@ func (s *GRPC) UpdateCDN(ctx context.Context, req *manager.UpdateCDNRequest) (*m return nil, status.Error(codes.Unknown, err.Error()) } - if err := s.db.Model(&cdn).Updates(model.CDN{ + if err := s.db.WithContext(ctx).Model(&cdn).Updates(model.CDN{ IDC: req.Idc, Location: req.Location, IP: req.Ip, @@ -165,7 +165,7 @@ func (s *GRPC) UpdateCDN(ctx context.Context, req *manager.UpdateCDNRequest) (*m } if err := s.cache.Delete( - context.TODO(), + ctx, cache.MakeCDNCacheKey(cdn.HostName, cdn.CDNClusterID), ); err != nil { logger.Warnf("%s refresh keepalive status failed in cdn cluster %d", cdn.HostName, cdn.CDNClusterID) @@ -196,7 +196,7 @@ func (s *GRPC) GetScheduler(ctx context.Context, req *manager.GetSchedulerReques // Cache Miss logger.Infof("%s cache miss", cacheKey) scheduler := model.Scheduler{} - if err := s.db.Preload("SchedulerCluster.SecurityGroup").Preload("SchedulerCluster.CDNClusters.CDNs", &model.CDN{ + if err := s.db.WithContext(ctx).Preload("SchedulerCluster.SecurityGroup").Preload("SchedulerCluster.CDNClusters.CDNs", &model.CDN{ Status: model.CDNStatusActive, }).First(&scheduler, &model.Scheduler{ HostName: req.HostName, @@ -307,7 +307,7 @@ func (s *GRPC) createScheduler(ctx context.Context, req *manager.UpdateScheduler SchedulerClusterID: uint(req.SchedulerClusterId), } - if err := s.db.Create(&scheduler).Error; err != nil { + if err := s.db.WithContext(ctx).Create(&scheduler).Error; err != nil { return nil, status.Error(codes.Unknown, err.Error()) } @@ -327,7 +327,7 @@ func (s *GRPC) createScheduler(ctx context.Context, req *manager.UpdateScheduler func (s *GRPC) UpdateScheduler(ctx context.Context, req *manager.UpdateSchedulerRequest) (*manager.Scheduler, error) { scheduler := model.Scheduler{} - if err := s.db.First(&scheduler, model.Scheduler{ + if err := s.db.WithContext(ctx).First(&scheduler, model.Scheduler{ HostName: req.HostName, SchedulerClusterID: uint(req.SchedulerClusterId), }).Error; err != nil { @@ -344,7 +344,7 @@ func (s *GRPC) UpdateScheduler(ctx context.Context, req *manager.UpdateScheduler } } - if err := s.db.Model(&scheduler).Updates(model.Scheduler{ + if err := s.db.WithContext(ctx).Model(&scheduler).Updates(model.Scheduler{ VIPs: req.Vips, IDC: req.Idc, Location: req.Location, @@ -357,7 +357,7 @@ func (s *GRPC) UpdateScheduler(ctx context.Context, req *manager.UpdateScheduler } if err := s.cache.Delete( - context.TODO(), + ctx, cache.MakeSchedulerCacheKey(scheduler.HostName, scheduler.SchedulerClusterID), ); err != nil { logger.Warnf("%s refresh keepalive status failed in scheduler cluster %d", scheduler.HostName, scheduler.SchedulerClusterID) @@ -390,14 +390,14 @@ func (s *GRPC) ListSchedulers(ctx context.Context, req *manager.ListSchedulersRe // Cache Miss logger.Infof("%s cache miss", cacheKey) var schedulerClusters []model.SchedulerCluster - if err := s.db.Preload("SecurityGroup").Find(&schedulerClusters).Error; err != nil { + if err := s.db.WithContext(ctx).Preload("SecurityGroup").Find(&schedulerClusters).Error; err != nil { return nil, status.Error(codes.Unknown, err.Error()) } // Search optimal scheduler cluster schedulerCluster, ok := s.searcher.FindSchedulerCluster(schedulerClusters, req.HostInfo) if !ok { - if err := s.db.Find(&schedulerCluster, &model.SchedulerCluster{ + if err := s.db.WithContext(ctx).Find(&schedulerCluster, &model.SchedulerCluster{ IsDefault: true, }).Error; err != nil { return nil, status.Error(codes.Unknown, err.Error()) @@ -405,7 +405,7 @@ func (s *GRPC) ListSchedulers(ctx context.Context, req *manager.ListSchedulersRe } schedulers := []model.Scheduler{} - if err := s.db.Find(&schedulers, &model.Scheduler{ + if err := s.db.WithContext(ctx).Find(&schedulers, &model.Scheduler{ Status: model.SchedulerStatusActive, SchedulerClusterID: schedulerCluster.ID, }).Error; err != nil { diff --git a/manager/service/user.go b/manager/service/user.go index a8115aa49..6dee7d2c8 100644 --- a/manager/service/user.go +++ b/manager/service/user.go @@ -17,6 +17,7 @@ package service import ( + "context" "fmt" "github.com/pkg/errors" @@ -30,18 +31,18 @@ import ( "golang.org/x/crypto/bcrypt" ) -func (s *rest) GetUser(id uint) (*model.User, error) { +func (s *rest) GetUser(ctx context.Context, id uint) (*model.User, error) { user := model.User{} - if err := s.db.First(&user, id).Error; err != nil { + if err := s.db.WithContext(ctx).First(&user, id).Error; err != nil { return nil, err } return &user, nil } -func (s *rest) SignIn(json types.SignInRequest) (*model.User, error) { +func (s *rest) SignIn(ctx context.Context, json types.SignInRequest) (*model.User, error) { user := model.User{} - if err := s.db.First(&user, model.User{ + if err := s.db.WithContext(ctx).First(&user, model.User{ Name: json.Name, }).Error; err != nil { return nil, err @@ -54,9 +55,9 @@ func (s *rest) SignIn(json types.SignInRequest) (*model.User, error) { return &user, nil } -func (s *rest) ResetPassword(id uint, json types.ResetPasswordRequest) error { +func (s *rest) ResetPassword(ctx context.Context, id uint, json types.ResetPasswordRequest) error { user := model.User{} - if err := s.db.First(&user, id).Error; err != nil { + if err := s.db.WithContext(ctx).First(&user, id).Error; err != nil { return err } @@ -69,7 +70,7 @@ func (s *rest) ResetPassword(id uint, json types.ResetPasswordRequest) error { return err } - if err := s.db.First(&user, id).Updates(model.User{ + if err := s.db.WithContext(ctx).First(&user, id).Updates(model.User{ EncryptedPassword: string(encryptedPasswordBytes), }).Error; err != nil { return err @@ -78,7 +79,7 @@ func (s *rest) ResetPassword(id uint, json types.ResetPasswordRequest) error { return nil } -func (s *rest) SignUp(json types.SignUpRequest) (*model.User, error) { +func (s *rest) SignUp(ctx context.Context, json types.SignUpRequest) (*model.User, error) { encryptedPasswordBytes, err := bcrypt.GenerateFromPassword([]byte(json.Password), bcrypt.MinCost) if err != nil { return nil, err @@ -95,7 +96,7 @@ func (s *rest) SignUp(json types.SignUpRequest) (*model.User, error) { State: model.UserStateEnabled, } - if err := s.db.Create(&user).Error; err != nil { + if err := s.db.WithContext(ctx).Create(&user).Error; err != nil { return nil, err } @@ -106,9 +107,9 @@ func (s *rest) SignUp(json types.SignUpRequest) (*model.User, error) { return &user, nil } -func (s *rest) OauthSignin(name string) (string, error) { +func (s *rest) OauthSignin(ctx context.Context, name string) (string, error) { oauth := model.Oauth{} - if err := s.db.First(&oauth, model.Oauth{Name: name}).Error; err != nil { + if err := s.db.WithContext(ctx).First(&oauth, model.Oauth{Name: name}).Error; err != nil { return "", err } @@ -120,9 +121,9 @@ func (s *rest) OauthSignin(name string) (string, error) { return o.AuthCodeURL(), nil } -func (s *rest) OauthSigninCallback(name, code string) (*model.User, error) { +func (s *rest) OauthSigninCallback(ctx context.Context, name, code string) (*model.User, error) { oauth := model.Oauth{} - if err := s.db.First(&oauth, model.Oauth{Name: name}).Error; err != nil { + if err := s.db.WithContext(ctx).First(&oauth, model.Oauth{Name: name}).Error; err != nil { return nil, err } @@ -147,7 +148,7 @@ func (s *rest) OauthSigninCallback(name, code string) (*model.User, error) { Avatar: oauthUser.Avatar, State: model.UserStateEnabled, } - if err := s.db.Create(&user).Error; err != nil { + if err := s.db.WithContext(ctx).Create(&user).Error; err != nil { if err, ok := errors.Cause(err).(*mysql.MySQLError); ok && err.Number == mysqlerr.ER_DUP_ENTRY { return &user, nil } @@ -162,14 +163,14 @@ func (s *rest) OauthSigninCallback(name, code string) (*model.User, error) { return &user, nil } -func (s *rest) GetRolesForUser(id uint) ([]string, error) { +func (s *rest) GetRolesForUser(ctx context.Context, id uint) ([]string, error) { return s.enforcer.GetRolesForUser(fmt.Sprint(id)) } -func (s *rest) AddRoleForUser(json types.AddRoleForUserParams) (bool, error) { +func (s *rest) AddRoleForUser(ctx context.Context, json types.AddRoleForUserParams) (bool, error) { return s.enforcer.AddRoleForUser(fmt.Sprint(json.ID), json.Role) } -func (s *rest) DeleteRoleForUser(json types.DeleteRoleForUserParams) (bool, error) { +func (s *rest) DeleteRoleForUser(ctx context.Context, json types.DeleteRoleForUserParams) (bool, error) { return s.enforcer.DeleteRoleForUser(fmt.Sprint(json.ID), json.Role) } diff --git a/scheduler/config/constants_otel.go b/scheduler/config/constants_otel.go index 37ebcf6db..a9ae117e5 100644 --- a/scheduler/config/constants_otel.go +++ b/scheduler/config/constants_otel.go @@ -47,6 +47,7 @@ const ( SpanReportPieceResult = "report-piece-result" SpanReportPeerResult = "report-peer-result" SpanPeerLeave = "peer-leave" + SpanPreheat = "preheat" ) const ( diff --git a/scheduler/job/job.go b/scheduler/job/job.go index 97a57f40f..33edc442c 100644 --- a/scheduler/job/job.go +++ b/scheduler/job/job.go @@ -27,8 +27,12 @@ import ( "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/core" "github.com/go-playground/validator/v10" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" ) +var tracer = otel.Tracer("worker") + type Job interface { Serve() error Stop() @@ -125,7 +129,12 @@ func (t *job) Stop() { t.localJob.Worker.Quit() } -func (t *job) preheat(req string) error { +func (t *job) preheat(ctx context.Context, req string) error { + // machinery can't passing context to worker, refer https://github.com/RichardKnop/machinery/issues/175 + var span trace.Span + ctx, span = tracer.Start(ctx, config.SpanPreheat, trace.WithSpanKind(trace.SpanKindConsumer)) + defer span.End() + request := &internaljob.PreheatRequest{} if err := internaljob.UnmarshalRequest(req, request); err != nil { logger.Errorf("unmarshal request err: %v, request body: %s", err, req) @@ -158,7 +167,7 @@ func (t *job) preheat(req string) error { // Trigger CDN download seeds plogger := logger.WithTaskIDAndURL(taskID, request.URL) plogger.Info("ready to preheat") - stream, err := t.service.CDN.GetClient().ObtainSeeds(t.ctx, &cdnsystem.SeedRequest{ + stream, err := t.service.CDN.GetClient().ObtainSeeds(ctx, &cdnsystem.SeedRequest{ TaskId: taskID, Url: request.URL, UrlMeta: meta,