dragonfly/cdnsystem/supervisor/task/manager.go

248 lines
8.2 KiB
Go

/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package task
import (
"context"
"encoding/json"
"fmt"
"time"
"d7y.io/dragonfly/v2/cdnsystem/config"
cdnerrors "d7y.io/dragonfly/v2/cdnsystem/errors"
"d7y.io/dragonfly/v2/cdnsystem/supervisor"
"d7y.io/dragonfly/v2/cdnsystem/supervisor/gc"
"d7y.io/dragonfly/v2/cdnsystem/types"
"d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/structure/syncmap"
"d7y.io/dragonfly/v2/pkg/synclock"
"d7y.io/dragonfly/v2/pkg/util/stringutils"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)
// Compile-time assertions that *Manager implements both the
// supervisor.SeedTaskMgr and the gc.Executor interfaces.
var (
	_ supervisor.SeedTaskMgr = (*Manager)(nil)
	_ gc.Executor            = (*Manager)(nil)
)

// tracer is the package-wide OpenTelemetry tracer used to open the task
// manager's spans. It is assigned in init.
var tracer trace.Tracer

func init() {
	const instrumentationName = "cdn-task-manager"
	tracer = otel.Tracer(instrumentationName)
}
// Manager is the in-memory implementation of supervisor.SeedTaskMgr. It keeps
// seed task metadata plus a per-task access time, and delegates CDN seeding to
// a supervisor.CDNMgr and progress tracking to a supervisor.SeedProgressMgr.
type Manager struct {
	// cfg is the CDN system configuration (GC delays/intervals, task expiry).
	cfg *config.Config

	// taskStore maps taskID -> *types.SeedTask.
	taskStore *syncmap.SyncMap

	// accessTimeMap maps taskID -> time.Time of the last access; GC uses it
	// to decide which tasks have expired.
	accessTimeMap *syncmap.SyncMap

	// taskURLUnReachableStore holds per-task URL-unreachable records.
	// NOTE(review): populated outside this file chunk — confirm value type.
	taskURLUnReachableStore *syncmap.SyncMap

	// cdnMgr runs the actual CDN seeding (TriggerCDN).
	cdnMgr supervisor.CDNMgr

	// progressMgr initialises, watches, publishes and clears seed progress.
	progressMgr supervisor.SeedProgressMgr
}
// NewManager builds a Manager backed by empty in-memory stores.
//
// Before returning, it hands the manager to progressMgr via SetTaskMgr. It also
// registers the manager with the gc package under the name "task", using
// cfg.GCInitialDelay and cfg.GCMetaInterval. The returned error is currently
// always nil.
func NewManager(cfg *config.Config, cdnMgr supervisor.CDNMgr, progressMgr supervisor.SeedProgressMgr) (*Manager, error) {
	tm := new(Manager)
	tm.cfg = cfg
	tm.taskStore = syncmap.NewSyncMap()
	tm.accessTimeMap = syncmap.NewSyncMap()
	tm.taskURLUnReachableStore = syncmap.NewSyncMap()
	tm.cdnMgr = cdnMgr
	tm.progressMgr = progressMgr

	// Wire the back-reference first, then hand the manager to the GC framework.
	progressMgr.SetTaskMgr(tm)
	gc.Register("task", cfg.GCInitialDelay, cfg.GCMetaInterval, tm)

	return tm, nil
}
// Register adds or refreshes the seed task described by req and triggers a CDN
// seeding run for it when one is needed. It returns the channel that
// progressMgr.WatchSeedProgress provides for the task's seed pieces.
//
// The call is traced under config.SpanTaskRegister. Every failure path records
// its error on the span before returning it.
func (tm *Manager) Register(ctx context.Context, req *types.TaskRegisterRequest) (pieceChan <-chan *types.SeedPiece, err error) {
	var span trace.Span
	ctx, span = tracer.Start(ctx, config.SpanTaskRegister)
	defer span.End()

	task, err := tm.addOrUpdateTask(ctx, req)
	if err != nil {
		span.RecordError(err)
		logger.WithTaskID(req.TaskID).Infof("failed to add or update task with req: %+v: %v", req, err)
		return nil, err
	}

	// Attaching the task snapshot to the span is best-effort. A marshal failure
	// must not fail registration, but it must not emit an empty attribute either.
	if taskBytes, marshalErr := json.Marshal(task); marshalErr == nil {
		span.SetAttributes(config.AttributeTaskInfo.String(string(taskBytes)))
	} else {
		logger.WithTaskID(task.TaskID).Warnf("failed to marshal task info for tracing: %v", marshalErr)
	}
	logger.WithTaskID(task.TaskID).Debugf("success get task info: %+v", task)

	// update accessTime for taskId
	if addErr := tm.accessTimeMap.Add(task.TaskID, time.Now()); addErr != nil {
		logger.WithTaskID(task.TaskID).Warnf("failed to update accessTime: %v", addErr)
	}

	// trigger CDN
	if triggerErr := tm.triggerCdnSyncAction(ctx, task); triggerErr != nil {
		span.RecordError(triggerErr)
		return nil, errors.Wrapf(triggerErr, "trigger cdn")
	}
	logger.WithTaskID(task.TaskID).Infof("successfully trigger cdn sync action")

	// watch seed progress
	pieceChan, err = tm.progressMgr.WatchSeedProgress(ctx, task.TaskID)
	if err != nil {
		span.RecordError(err)
	}
	return pieceChan, err
}
// triggerCdnSyncAction starts a background CDN seeding run for task. It does
// nothing when the task is already running or has already been downloaded
// successfully (task.IsFrozen() reports false).
//
// The frozen state is checked twice. The first check runs under
// synclock.Lock(taskID, true). The second re-checks under
// synclock.Lock(taskID, false), which is held until the task has been switched
// to types.TaskInfoCdnStatusRunning.
// NOTE(review): this assumes `true` selects the shared lock and `false` the
// exclusive one — confirm against pkg/synclock.
//
// The goroutine first writes the result of cdnMgr.TriggerCDN back to the task
// store, and only then publishes it to watchers. Anyone re-reading the task
// after the publication therefore sees the final status.
func (tm *Manager) triggerCdnSyncAction(ctx context.Context, task *types.SeedTask) error {
	var span trace.Span
	ctx, span = tracer.Start(ctx, config.SpanTriggerCDNSyncAction)
	defer span.End()

	synclock.Lock(task.TaskID, true)
	if !task.IsFrozen() {
		span.SetAttributes(config.AttributeTaskStatus.String(task.CdnStatus))
		logger.WithTaskID(task.TaskID).Infof("seedTask is running or has been downloaded successfully, status: %s", task.CdnStatus)
		synclock.UnLock(task.TaskID, true)
		return nil
	}
	synclock.UnLock(task.TaskID, true)

	synclock.Lock(task.TaskID, false)
	defer synclock.UnLock(task.TaskID, false)
	// reconfirm: the status may have changed while no lock was held
	span.SetAttributes(config.AttributeTaskStatus.String(task.CdnStatus))
	if !task.IsFrozen() {
		logger.WithTaskID(task.TaskID).Infof("reconfirm find seedTask is running or has been downloaded successfully, status: %s", task.CdnStatus)
		return nil
	}
	if task.IsWait() {
		tm.progressMgr.InitSeedProgress(ctx, task.TaskID)
		logger.WithTaskID(task.TaskID).Infof("successfully init seed progress for task")
	}
	if _, err := tm.updateTask(task.TaskID, &types.SeedTask{
		CdnStatus: types.TaskInfoCdnStatusRunning,
	}); err != nil {
		return errors.Wrapf(err, "update task")
	}

	// triggerCDN goroutine
	// NOTE(review): ctx derives from the caller's context. If that context is
	// request-scoped and TriggerCDN honours cancellation, a cancelled request
	// also aborts this shared seeding run — confirm this is intended.
	go func() {
		updateTaskInfo, err := tm.cdnMgr.TriggerCDN(ctx, task)
		if err != nil {
			logger.WithTaskID(task.TaskID).Errorf("trigger cdn get error: %v", err)
		}

		updatedTask, updateErr := tm.updateTask(task.TaskID, updateTaskInfo)

		// Publish only after the store reflects the outcome. PublishTask still
		// runs in its own goroutine and is attempted even if the update failed.
		go func() {
			if err := tm.progressMgr.PublishTask(ctx, task.TaskID, updateTaskInfo); err != nil {
				logger.WithTaskID(task.TaskID).Errorf("failed to publish task: %v", err)
			}
		}()

		if updateErr != nil {
			logger.WithTaskID(task.TaskID).Errorf("failed to update task: %v", updateErr)
			return
		}
		logger.WithTaskID(task.TaskID).Infof("successfully update task cdn updatedTask: %+v", updatedTask)
	}()
	return nil
}
// getTask looks up taskID in the task store without touching its access time.
//
// It fails with:
//   - cdnerrors.ErrInvalidValue for a blank ID;
//   - cdnerrors.ErrDataNotFound when the store has no entry;
//   - cdnerrors.ErrConvertFailed when the stored value is not a *types.SeedTask.
//
// Any other store error is returned unchanged.
func (tm *Manager) getTask(taskID string) (*types.SeedTask, error) {
	if stringutils.IsBlank(taskID) {
		return nil, errors.Wrap(cdnerrors.ErrInvalidValue, "taskID is empty")
	}

	v, err := tm.taskStore.Get(taskID)
	switch {
	case err == nil:
		// found; fall through to the type check below
	case errors.Cause(err) == dferrors.ErrDataNotFound:
		return nil, errors.Wrapf(cdnerrors.ErrDataNotFound, "task not found")
	default:
		return nil, err
	}

	seedTask, ok := v.(*types.SeedTask)
	if !ok {
		return nil, errors.Wrapf(cdnerrors.ErrConvertFailed, "origin object: %+v", v)
	}
	return seedTask, nil
}
// Get returns the seed task registered under taskID. On a successful lookup it
// also refreshes the task's access time, which GC compares against
// cfg.TaskExpireTime.
//
// Failed lookups (blank ID, unknown task, conversion failure) deliberately do
// not record an access time. Otherwise unknown IDs would create accessTimeMap
// entries that only GC would later remove.
func (tm *Manager) Get(taskID string) (*types.SeedTask, error) {
	task, err := tm.getTask(taskID)
	if err != nil {
		return nil, err
	}
	// update accessTime for taskID
	if addErr := tm.accessTimeMap.Add(taskID, time.Now()); addErr != nil {
		logger.WithTaskID(taskID).Warnf("failed to update accessTime: %v", addErr)
	}
	return task, nil
}
// Exist reports whether a task is known for taskID. A blank taskID never
// exists.
//
// Lookup errors other than "not found" (e.g. a stored value of an unexpected
// type) are treated as existing, so callers do not discard state for them.
func (tm *Manager) Exist(taskID string) bool {
	if stringutils.IsBlank(taskID) {
		return false
	}
	_, err := tm.getTask(taskID)
	return err == nil || !cdnerrors.IsDataNotFound(err)
}
// GetAccessTime returns the live taskID -> last-access time.Time map.
//
// The map is shared, not copied, so callers observe concurrent updates. The
// error is currently always nil.
func (tm *Manager) GetAccessTime() (*syncmap.SyncMap, error) {
	return tm.accessTimeMap, nil
}
// Delete drops all in-memory state held for taskID: its access time, its
// taskURLUnReachableStore entry, the task itself, and its seed progress (via
// progressMgr.Clear). It always returns nil.
func (tm *Manager) Delete(taskID string) error {
	tm.accessTimeMap.Delete(taskID)
	tm.taskURLUnReachableStore.Delete(taskID)
	tm.taskStore.Delete(taskID)
	tm.progressMgr.Clear(taskID)
	return nil
}
// GetPieces returns the seed pieces that the progress manager holds for
// taskID. The lookup runs while holding synclock.Lock(taskID, true).
func (tm *Manager) GetPieces(ctx context.Context, taskID string) (pieces []*types.SeedPiece, err error) {
	synclock.Lock(taskID, true)
	defer synclock.UnLock(taskID, true)

	pieces, err = tm.progressMgr.GetPieces(ctx, taskID)
	return pieces, err
}
// gcTasksTimeout is the soft time budget for one task-metadata GC run. A run
// that exceeds it is not interrupted; a warning is logged instead.
const gcTasksTimeout = 2 * time.Second
// GC implements gc.Executor for task metadata.
//
// It snapshots the task IDs present in the access-time map. It then deletes
// (via Delete) every task whose last access is at least cfg.TaskExpireTime ago.
// Tasks whose access time cannot be read, or whose deletion fails, are logged
// and skipped. A warning is emitted when the run takes longer than
// gcTasksTimeout.
func (tm *Manager) GC() error {
	logger.Debugf("start the task meta gc job")
	gcLogger := logger.GcLogger.With("type", "meta")
	startTime := time.Now()

	// get all taskIDs and the corresponding accessTime
	taskAccessMap, err := tm.GetAccessTime()
	if err != nil {
		return fmt.Errorf("gc tasks: failed to get task accessTime map for GC: %w", err)
	}

	// The key slice is taken once, so tasks registered during this run are left
	// for the next one.
	taskIDs := taskAccessMap.ListKeyAsStringSlice()
	totalTaskNums := len(taskIDs)
	var removedTaskCount int
	for _, taskID := range taskIDs {
		atime, err := taskAccessMap.GetAsTime(taskID)
		if err != nil {
			gcLogger.Errorf("gc tasks: failed to get access time taskID(%s): %v", taskID, err)
			continue
		}
		if time.Since(atime) < tm.cfg.TaskExpireTime {
			continue
		}
		// gc task memory data
		gcLogger.Infof("gc task: start to deal with task: %s", taskID)
		if err := tm.Delete(taskID); err != nil {
			gcLogger.Errorf("gc tasks: failed to delete task(%s): %v", taskID, err)
			continue
		}
		removedTaskCount++
	}

	// slow GC detected, report it with a log warning
	if timeDuring := time.Since(startTime); timeDuring > gcTasksTimeout {
		gcLogger.Warnf("gc tasks: %d cost: %.3f", removedTaskCount, timeDuring.Seconds())
	}
	gcLogger.Infof("gc tasks: successfully full gc task count(%d), remainder count(%d)", removedTaskCount, totalTaskNums-removedTaskCount)
	return nil
}