From 060429c67575f9988ecde1e415897426fcd81f0e Mon Sep 17 00:00:00 2001
From: Gaius
Date: Thu, 17 Feb 2022 15:03:54 +0800
Subject: [PATCH] feat: scheduler blocks cdn (#1079)

* feat: scheduler blocks cdn

Signed-off-by: Gaius
---
 docs/en/deployment/configuration/cdn.yaml     |   6 +-
 docs/en/deployment/configuration/dfget.yaml   |  52 ++--
 docs/en/deployment/configuration/manager.yaml |   2 +-
 .../deployment/configuration/scheduler.yaml   |  34 ++-
 docs/zh-CN/deployment/configuration/cdn.yaml  |  10 +-
 .../zh-CN/deployment/configuration/dfget.yaml |  52 ++--
 .../deployment/configuration/manager.yaml     |   2 +-
 .../deployment/configuration/scheduler.yaml   |  27 +-
 scheduler/config/config.go                    |  11 +
 scheduler/config/config_test.go               |   3 +
 scheduler/config/testdata/scheduler.yaml      |   3 +
 scheduler/job/job.go                          |  31 +-
 scheduler/service/service.go                  |  48 ++-
 scheduler/service/service_test.go             | 275 +++++++++++++++++-
 14 files changed, 428 insertions(+), 128 deletions(-)

diff --git a/docs/en/deployment/configuration/cdn.yaml b/docs/en/deployment/configuration/cdn.yaml
index 081815327..5f2f8d353 100644
--- a/docs/en/deployment/configuration/cdn.yaml
+++ b/docs/en/deployment/configuration/cdn.yaml
@@ -44,11 +44,11 @@ base:
   # logDir is the log storage directory
   # in linux, default value is /var/log/dragonfly
   # in macos(just for testing), default value is /Users/$USER/.dragonfly/logs
-  logDir: ''
+  logDir: ""

   # manager configuration
   manager:
-    addr: ''
+    addr: ""
     cdnClusterID: 0
     keepAlive:
       interval: 5s
@@ -116,7 +116,7 @@ verbose: false
 pprof-port: -1

 # jaeger endpoint url, like: http://jaeger.dragonfly.svc:14268/api/traces
-jaeger: ''
+jaeger: ""

 # service name used in tracer
 # default: dragonfly-cdn
diff --git a/docs/en/deployment/configuration/dfget.yaml b/docs/en/deployment/configuration/dfget.yaml
index b7c786db0..620e4f240 100644
--- a/docs/en/deployment/configuration/dfget.yaml
+++ b/docs/en/deployment/configuration/dfget.yaml
@@ -8,22 +8,22 @@ gcInterval: 1m0s
 # daemon work directory, daemon will change current working directory to this
 # in linux, default value is /usr/local/dragonfly
 # in macos(just for testing), default value is /Users/$USER/.dragonfly
-workHome: ''
+workHome: ""

 # cacheDir is dynconfig cache storage directory
 # in linux, default value is /var/cache/dragonfly
 # in macos(just for testing), default value is /Users/$USER/.dragonfly/cache
-cacheDir: ''
+cacheDir: ""

 # logDir is the log storage directory
 # in linux, default value is /var/log/dragonfly
 # in macos(just for testing), default value is /Users/$USER/.dragonfly/logs
-logDir: ''
+logDir: ""

 # dataDir is the download data storage directory
 # in linux, default value is /var/lib/dragonfly
 # in macos(just for testing), default value is /Users/$USER/.dragonfly/data
-dataDir: ''
+dataDir: ""

 # when daemon exit, keep peer task data or not
 # it is usefully when upgrade daemon service, all local cache will be saved
@@ -41,7 +41,7 @@ verbose: false
 pprof-port: -1

 # jaeger endpoint url, like: http://jaeger.dragonfly.svc:14268/api/traces
-jaeger: ''
+jaeger: ""

 # all addresses of all schedulers
 # the schedulers of all daemons should be same in one region or zone.
@@ -75,13 +75,13 @@ host:
   # when local ip is different with access ip, advertiseIP should be set
   advertiseIP: 0.0.0.0
   # geographical location, separated by "|" characters
-  location: ''
+  location: ""
   # idc deployed by daemon
-  idc: ''
+  idc: ""
   # security domain deployed by daemon, network isolation between different security domains
-  securityDomain: ''
+  securityDomain: ""
   # network topology, separated by "|" characters
-  netTopology: ''
+  netTopology: ""
   # daemon hostname
   # hostname: ""
@@ -116,9 +116,9 @@ download:
   # security option
   security:
     insecure: true
-    cacert: ''
-    cert: ''
-    key: ''
+    cacert: ""
+    cert: ""
+    key: ""
     tlsConfig: null
   # download service listen address
   # current, only support unix domain socket
@@ -131,9 +131,9 @@ download:
   peerGRPC:
     security:
       insecure: true
-      cacert: ''
-      cert: ''
-      key: ''
+      cacert: ""
+      cert: ""
+      key: ""
     tcpListen:
       # listen address
       listen: 0.0.0.0
@@ -151,9 +151,9 @@ upload:
   rateLimit: 100Mi
   security:
     insecure: true
-    cacert: ''
-    cert: ''
-    key: ''
+    cacert: ""
+    cert: ""
+    key: ""
   tcpListen:
     # listen address
     listen: 0.0.0.0
@@ -196,17 +196,17 @@ proxy:
   # when defaultFilter: "Expires&Signature", for example:
   # http://localhost/xyz?Expires=111&Signature=222 and http://localhost/xyz?Expires=333&Signature=999
   # is same task
-  defaultFilter: 'Expires&Signature'
+  defaultFilter: "Expires&Signature"
   security:
     insecure: true
-    cacert: ''
-    cert: ''
-    key: ''
+    cacert: ""
+    cert: ""
+    key: ""
   tcpListen:
     # namespace stands the linux net namespace, like /proc/1/ns/net
     # it's useful for running daemon in pod with ip allocated and listening the special port in host net namespace
     # Linux only
-    namespace: ''
+    namespace: ""
     # listen address
     listen: 0.0.0.0
     # listen port, daemon will try to listen
@@ -248,8 +248,8 @@ proxy:

   hijackHTTPS:
     # key pair used to hijack https requests
-    cert: ''
-    key: ''
+    cert: ""
+    key: ""
     hosts:
       - regx: mirror.aliyuncs.com:443 # regexp to match request hosts
         # whether to ignore https certificate errors
@@ -260,7 +260,7 @@ proxy:
   maxConcurrency: 0
   whiteList:
     # the host of the whitelist
-    - host: ''
+    - host: ""
       # match whitelist hosts
      regx:
      # port that need to be added to the whitelist
diff --git a/docs/en/deployment/configuration/manager.yaml b/docs/en/deployment/configuration/manager.yaml
index 85466f259..d1b61d0b4 100644
--- a/docs/en/deployment/configuration/manager.yaml
+++ b/docs/en/deployment/configuration/manager.yaml
@@ -70,4 +70,4 @@ verbose: false
 pprof-port: -1

 # jaeger endpoint url, like: http://jaeger.dragonfly.svc:14268/api/traces
-jaeger: ''
+jaeger: ""
diff --git a/docs/en/deployment/configuration/scheduler.yaml b/docs/en/deployment/configuration/scheduler.yaml
index 138acebea..e5a8d2e90 100644
--- a/docs/en/deployment/configuration/scheduler.yaml
+++ b/docs/en/deployment/configuration/scheduler.yaml
@@ -11,22 +11,24 @@ server:
   # cacheDir is dynconfig cache storage directory
   # in linux, default value is /var/cache/dragonfly
   # in macos(just for testing), default value is /Users/$USER/.dragonfly/cache
-  cacheDir: ''
+  cacheDir: ""
   # logDir is the log storage directory
   # in linux, default value is /var/log/dragonfly
   # in macos(just for testing), default value is /Users/$USER/.dragonfly/logs
-  logDir: ''
+  logDir: ""

 # scheduler policy configuration
 scheduler:
   # algorithm configuration to use different scheduling algorithms,
   # default configuration supports "default" and "ml"
-  # "default" is the rule-based scheduling algorithm, "ml" is the machine learning scheduling algorithm
+  # "default" is the rule-based scheduling algorithm,
+  # "ml" is the machine learning scheduling algorithm
   # It also supports user plugin extension, the algorithm value is "plugin",
   # and the compiled `d7y-scheduler-plugin-evaluator.so` file is added to
   # the dragonfly working directory plugins
   algorithm: default
-  # backSourceCount is the number of backsource clients when the CDN is unavailable
+  # backSourceCount is the number of backsource clients
+  # when the CDN is unavailable
   backSourceCount: 3
   # retry scheduling back-to-source limit times
   retryBackSourceLimit: 5
@@ -57,11 +59,11 @@ dynConfig:
 # scheduler host configuration
 host:
   # idc is the idc of scheduler instance
-  idc: ''
+  idc: ""
   # netTopology is the net topology of scheduler instance
-  netTopology: ''
+  netTopology: ""
   # location is the location of scheduler instance
-  location: ''
+  location: ""

 # manager configuration
 manager:
@@ -76,7 +78,15 @@ manager:
     # interval
     interval: 5s

-# machinery async job configuration, see https://github.com/RichardKnop/machinery
+# cdn configuration
+cdn:
+  # scheduler enable cdn as P2P peer,
+  # if the value is false, P2P network will not be back-to-source through
+  # cdn but by dfdaemon and preheat feature does not work
+  enable: true
+
+# machinery async job configuration,
+# see https://github.com/RichardKnop/machinery
 job:
   # scheduler enable job service
   enable: true
@@ -89,11 +99,11 @@ job:
   # redis configuration
   redis:
     # host
-    host: ''
+    host: ""
     # port
     port: 6379
     # password
-    password: ''
+    password: ""
     # brokerDB
     brokerDB: 1
     # backendDB
@@ -104,7 +114,7 @@ metrics:
   # scheduler enable metrics service
   enable: false
   # metrics service address
-  addr: ':8000'
+  addr: ":8000"
   # enable peer host metrics
   enablePeerHost: false

@@ -119,4 +129,4 @@ verbose: false
 pprof-port: -1

 # jaeger endpoint url, like: http://jaeger.dragonfly.svc:14268/api/traces
-jaeger: ''
+jaeger: ""
diff --git a/docs/zh-CN/deployment/configuration/cdn.yaml b/docs/zh-CN/deployment/configuration/cdn.yaml
index 75050b443..be87fe999 100644
--- a/docs/zh-CN/deployment/configuration/cdn.yaml
+++ b/docs/zh-CN/deployment/configuration/cdn.yaml
@@ -48,12 +48,12 @@ base:
   # cdn 日志目录
   # linux 上默认目录 /var/log/dragonfly
   # macos(仅开发、测试), 默认目录是 /Users/$USER/.dragonfly/logs
-  logDir: ''
+  logDir: ""

   # CDN 连接的 manager,可以不指定。
   # 各项配置默认值如下。如果 addr 为空字符串,CDN将不会连接manager。
   manager:
-    addr: ''
+    addr: ""
     cdnClusterID: 0
     keepAlive:
       interval: 5s
@@ -61,9 +61,9 @@ base:
   # 主机信息
   host:
     # 地理位置
-    location: ''
+    location: ""
     # IDC(Internet Data Center),互联网数据中心
-    idc: ''
+    idc: ""

 # 开启数据收集服务
 # metrics:
@@ -123,7 +123,7 @@ pprof-port: -1

 # jaeger 地址
 # 默认使用空字符串(不配置 jaeger), 例如: http://jaeger.dragonfly.svc:14268/api/traces
-jaeger: ''
+jaeger: ""

 # tracer 中使用的 service-name
 # 默认值:dragonfly-cdn
diff --git a/docs/zh-CN/deployment/configuration/dfget.yaml b/docs/zh-CN/deployment/configuration/dfget.yaml
index 6cdd275c9..ccc395f5e 100644
--- a/docs/zh-CN/deployment/configuration/dfget.yaml
+++ b/docs/zh-CN/deployment/configuration/dfget.yaml
@@ -8,22 +8,22 @@ gcInterval: 1m0s
 # daemon 工作目录
 # linux 上默认目录 /usr/local/dragonfly
 # macos(仅开发、测试), 默认目录是 /Users/$USER/.dragonfly
-workHome: ''
+workHome: ""

 # daemon 动态配置缓存目录
 # linux 上默认目录 /var/cache/dragonfly
 # macos(仅开发、测试), 默认目录是 /Users/$USER/.dragonfly/cache
-cacheDir: ''
+cacheDir: ""

 # daemon 日志目录
 # linux 上默认目录 /var/log/dragonfly
 # macos(仅开发、测试), 默认目录是 /Users/$USER/.dragonfly/logs
-logDir: ''
+logDir: ""

 # daemon 数据目录
 # linux 上默认目录为 /var/lib/dragonfly
 # macos(仅开发、测试), 默认目录是 /Users/$USER/.dragonfly/data/
-dataDir: ''
+dataDir: ""

 # 当 daemon 退出是, 是否保存缓存数据
 # 保留缓存数据在升级 daemon 的时候比较有用
@@ -41,7 +41,7 @@ pprof-port: -1

 # jaeger 地址
 # 默认使用空字符串(不配置 jaeger), 例如: http://jaeger.dragonfly.svc:14268/api/traces
-jaeger: ''
+jaeger: ""

 # 调度器地址
 # 尽量使用同一个地区的调度器.
@@ -73,13 +73,13 @@ host:
   # 其他 daemon 可以通过这个 IP 地址连接过来
   advertiseIP: 0.0.0.0
   # 地理信息, 通过 "|" 符号分隔
-  location: ''
+  location: ""
   # 机房信息
-  idc: ''
+  idc: ""
   # 安全域信息,不同安全域之间网络隔离
-  securityDomain: ''
+  securityDomain: ""
   # 网络拓扑结构,通过 "|" 符号分隔
-  netTopology: ''
+  netTopology: ""
   # 主机名称
   # hostname: ""
@@ -96,9 +96,9 @@ download:
   # 安全选项
   security:
     insecure: true
-    cacert: ''
-    cert: ''
-    key: ''
+    cacert: ""
+    cert: ""
+    key: ""
     tlsConfig: null
   # 下载服务监听地址,dfget 下载文件将通过该地址连接到 daemon
   # 目前是支持 unix domain socket
@@ -111,9 +111,9 @@ download:
   peerGRPC:
     security:
       insecure: true
-      cacert: ''
-      cert: ''
-      key: ''
+      cacert: ""
+      cert: ""
+      key: ""
     tcpListen:
       # 监听地址
       listen: 0.0.0.0
@@ -130,9 +130,9 @@ upload:
   rateLimit: 100Mi
   security:
     insecure: true
-    cacert: ''
-    cert: ''
-    key: ''
+    cacert: ""
+    cert: ""
+    key: ""
   tcpListen:
     # 监听地址
     listen: 0.0.0.0
@@ -174,17 +174,17 @@ proxy:
   # 例如:defaultFilter: "Expires&Signature":
   # http://localhost/xyz?Expires=111&Signature=222 and http://localhost/xyz?Expires=333&Signature=999
   # 是相同的 task
-  defaultFilter: 'Expires&Signature'
+  defaultFilter: "Expires&Signature"
   security:
     insecure: true
-    cacert: ''
-    cert: ''
-    key: ''
+    cacert: ""
+    cert: ""
+    key: ""
   tcpListen:
     # 监听的网络命名空间, 例如:/proc/1/ns/net
     # 主要用在部署 kubernetes 中的时候,daemon 不使用 host network 时,监听宿主机的端口
    # 仅支持 Linux
-    namespace: ''
+    namespace: ""
     # 监听地址
     listen: 0.0.0.0
     # 监听端口
@@ -222,8 +222,8 @@ proxy:
   hijackHTTPS:
     # https 劫持的证书和密钥
     # 建议自签 CA 并更新主机证书链
-    cert: ''
-    key: ''
+    cert: ""
+    key: ""
     # 需要走蜻蜓 p2p 的流量
     hosts:
       - regx: mirror.aliyuncs.com:443 # 正则匹配
         # 是否忽略 https 证书错误
@@ -236,7 +236,7 @@ proxy:
   # 白名单,如果设置了,仅白名单内可以走代理,其他的都拒绝
   whiteList:
     # 主机信息
-    - host: ''
+    - host: ""
       # 正则匹配
      regx:
      # 端口白名单
diff --git a/docs/zh-CN/deployment/configuration/manager.yaml b/docs/zh-CN/deployment/configuration/manager.yaml
index 7b7d3ff67..dbefdffa1 100644
--- a/docs/zh-CN/deployment/configuration/manager.yaml
+++ b/docs/zh-CN/deployment/configuration/manager.yaml
@@ -69,4 +69,4 @@ pprof-port: -1

 # jaeger 地址
 # 默认使用空字符串(不配置 jaeger), 例如: http://jaeger.dragonfly.svc:14268/api/traces
-jaeger: ''
+jaeger: ""
diff --git a/docs/zh-CN/deployment/configuration/scheduler.yaml b/docs/zh-CN/deployment/configuration/scheduler.yaml
index 270b2b0f1..a626a8aee 100644
--- a/docs/zh-CN/deployment/configuration/scheduler.yaml
+++ b/docs/zh-CN/deployment/configuration/scheduler.yaml
@@ -11,11 +11,11 @@ server:
   # daemon 动态配置缓存目录
   # linux 上默认目录 /var/cache/dragonfly
   # macos(仅开发、测试), 默认目录是 /Users/$USER/.dragonfly/cache
-  cacheDir: ''
+  cacheDir: ""
   # daemon 日志目录
   # linux 上默认目录 /var/log/dragonfly
   # macos(仅开发、测试), 默认目录是 /Users/$USER/.dragonfly/logs
-  logDir: ''
+  logDir: ""

 # scheduler 调度策略配置
 scheduler:
@@ -55,18 +55,18 @@ dynConfig:
 # 实例主机信息
 host:
   # 实例所在机房
-  idc: ''
+  idc: ""
   # 实例网络拓扑信息
-  netTopology: ''
+  netTopology: ""
   # 实例所在的地理位置信息
-  location: ''
+  location: ""

 # manager 配置
 manager:
   # 启动与 manager 的连接
   enable: true
   # manager 访问地址
-  addr: ''
+  addr: ""
   # 注册的 scheduler 集群 ID
   schedulerClusterID:
   # manager 心跳配置
@@ -74,6 +74,13 @@ manager:
     # 保持心跳的时间间隔
     interval: 5s

+# cdn 配置
+cdn:
+  # 启动 cdn 作为 P2P 网络节点,
+  # 如果值为 false 第一次回源请求不通过 cdn 而是通过 dfdaemon 直接回源,
+  # 而且无法使用预热功能
+  enable: true
+
 # machinery 异步任务配置,配置参考 https://github.com/RichardKnop/machinery
 job:
   # 启动 job 服务
@@ -87,11 +94,11 @@ job:
   # redis 配置
   redis:
     # 服务地址
-    host: ''
+    host: ""
     # 服务端口
     port: 6379
     # 密码
-    password: ''
+    password: ""
     # broker 数据库
     brokerDB: 1
     # backend 数据库
@@ -102,7 +109,7 @@ metrics:
   # 启动数据收集服务
   enable: false
   # 数据服务地址
-  addr: ':8000'
+  addr: ":8000"
   # 开机收集 peer host 数据
   enablePeerHost: false

@@ -117,4 +124,4 @@ pprof-port: -1

 # jaeger 地址
 # 默认使用空字符串(不配置 jaeger), 例如: http://jaeger.dragonfly.svc:14268/api/traces
-jaeger: ''
+jaeger: ""
diff --git a/scheduler/config/config.go b/scheduler/config/config.go
index 805484c12..ba8329d7e 100644
--- a/scheduler/config/config.go
+++ b/scheduler/config/config.go
@@ -42,6 +42,9 @@ type Config struct {
 	// Manager configuration
 	Manager *ManagerConfig `yaml:"manager" mapstructure:"manager"`

+	// CDN configuration
+	CDN *CDNConfig `yaml:"cdn" mapstructure:"cdn"`
+
 	// Host configuration
 	Host *HostConfig `yaml:"host" mapstructure:"host"`

@@ -87,6 +90,9 @@ func New() *Config {
 				Interval: 5 * time.Second,
 			},
 		},
+		CDN: &CDNConfig{
+			Enable: true,
+		},
 		Job: &JobConfig{
 			Enable:          true,
 			GlobalWorkerNum: 10,
@@ -301,6 +307,11 @@ type ManagerConfig struct {
 	KeepAlive KeepAliveConfig `yaml:"keepAlive" mapstructure:"keepAlive"`
 }

+type CDNConfig struct {
+	// Enable is to enable cdn as P2P peer
+	Enable bool `yaml:"enable" mapstructure:"enable"`
+}
+
 type KeepAliveConfig struct {
 	// Keep alive interval
 	Interval time.Duration `yaml:"interval" mapstructure:"interval"`
diff --git a/scheduler/config/config_test.go b/scheduler/config/config_test.go
index dd8a0551a..7d2e15218 100644
--- a/scheduler/config/config_test.go
+++ b/scheduler/config/config_test.go
@@ -70,6 +70,9 @@ func TestConfig_Load(t *testing.T) {
 				Interval: 5 * time.Second,
 			},
 		},
+		CDN: &CDNConfig{
+			Enable: true,
+		},
 		Job: &JobConfig{
 			Enable:          true,
 			GlobalWorkerNum: 1,
diff --git a/scheduler/config/testdata/scheduler.yaml b/scheduler/config/testdata/scheduler.yaml
index df6159f29..65207ab6d 100644
--- a/scheduler/config/testdata/scheduler.yaml
+++ b/scheduler/config/testdata/scheduler.yaml
@@ -36,6 +36,9 @@ manager:
   keepAlive:
     interval: 5000000000

+cdn:
+  enable: true
+
 job:
   enable: true
   globalWorkerNum: 1
diff --git a/scheduler/job/job.go b/scheduler/job/job.go
index d0b1c7be9..4514927ae 100644
--- a/scheduler/job/job.go
+++ b/scheduler/job/job.go
@@ -18,6 +18,7 @@ package job

 import (
 	"context"
+	"errors"
 	"strings"

 	"github.com/go-http-utils/headers"
@@ -101,36 +102,40 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) {
 	return t, nil
 }

-func (t *job) Serve() {
+func (j *job) Serve() {
 	go func() {
-		logger.Infof("ready to launch %d worker(s) on global queue", t.config.Job.GlobalWorkerNum)
-		if err := t.globalJob.LaunchWorker("global_worker", int(t.config.Job.GlobalWorkerNum)); err != nil {
+		logger.Infof("ready to launch %d worker(s) on global queue", j.config.Job.GlobalWorkerNum)
+		if err := j.globalJob.LaunchWorker("global_worker", int(j.config.Job.GlobalWorkerNum)); err != nil {
 			logger.Fatalf("global queue worker error: %v", err)
 		}
 	}()

 	go func() {
-		logger.Infof("ready to launch %d worker(s) on scheduler queue", t.config.Job.SchedulerWorkerNum)
-		if err := t.schedulerJob.LaunchWorker("scheduler_worker", int(t.config.Job.SchedulerWorkerNum)); err != nil {
+		logger.Infof("ready to launch %d worker(s) on scheduler queue", j.config.Job.SchedulerWorkerNum)
+		if err := j.schedulerJob.LaunchWorker("scheduler_worker", int(j.config.Job.SchedulerWorkerNum)); err != nil {
 			logger.Fatalf("scheduler queue worker error: %v", err)
 		}
 	}()

 	go func() {
-		logger.Infof("ready to launch %d worker(s) on local queue", t.config.Job.LocalWorkerNum)
-		if err := t.localJob.LaunchWorker("local_worker", int(t.config.Job.LocalWorkerNum)); err != nil {
+		logger.Infof("ready to launch %d worker(s) on local queue", j.config.Job.LocalWorkerNum)
+		if err := j.localJob.LaunchWorker("local_worker", int(j.config.Job.LocalWorkerNum)); err != nil {
 			logger.Fatalf("scheduler queue worker error: %v", err)
 		}
 	}()
 }

-func (t *job) Stop() {
-	t.globalJob.Worker.Quit()
-	t.schedulerJob.Worker.Quit()
-	t.localJob.Worker.Quit()
+func (j *job) Stop() {
+	j.globalJob.Worker.Quit()
+	j.schedulerJob.Worker.Quit()
+	j.localJob.Worker.Quit()
 }

-func (t *job) preheat(ctx context.Context, req string) error {
+func (j *job) preheat(ctx context.Context, req string) error {
+	if !j.config.CDN.Enable {
+		return errors.New("scheduler has disabled cdn")
+	}
+
 	request := &internaljob.PreheatRequest{}
 	if err := internaljob.UnmarshalRequest(req, request); err != nil {
 		logger.Errorf("unmarshal request err: %v, request body: %s", err, req)
@@ -161,7 +166,7 @@ func (t *job) preheat(ctx context.Context, req string) error {
 	log := logger.WithTaskIDAndURL(taskID, request.URL)
 	log.Infof("preheat %s headers: %#v, tag: %s, range: %s, filter: %s, digest: %s",
 		request.URL, urlMeta.Header, urlMeta.Tag, urlMeta.Range, urlMeta.Filter, urlMeta.Digest)
-	stream, err := t.resource.CDN().Client().ObtainSeeds(ctx, &cdnsystem.SeedRequest{
+	stream, err := j.resource.CDN().Client().ObtainSeeds(ctx, &cdnsystem.SeedRequest{
 		TaskId:  taskID,
 		Url:     request.URL,
 		UrlMeta: urlMeta,
diff --git a/scheduler/service/service.go b/scheduler/service/service.go
index bda65ef7c..3f142607a 100644
--- a/scheduler/service/service.go
+++ b/scheduler/service/service.go
@@ -358,20 +358,10 @@ func (s *Service) registerTask(ctx context.Context, req *rpcscheduler.PeerTaskRe
 		return nil, err
 	}

-	// Start seed cdn task
-	go func() {
-		task.Log.Infof("trigger cdn download task and task status is %s", task.FSM.Current())
-		peer, endOfPiece, err := s.resource.CDN().TriggerTask(context.Background(), task)
-		if err != nil {
-			task.Log.Errorf("trigger cdn download task failed: %v", err)
-			s.handleTaskFail(ctx, task)
-			return
-		}
-
-		// Update the task status first to help peer scheduling evaluation and scoring
-		s.handleTaskSuccess(ctx, task, endOfPiece)
-		s.handlePeerSuccess(ctx, peer)
-	}()
+	// Start trigger cdn task
+	if s.config.CDN.Enable {
+		go s.triggerCDNTask(ctx, task)
+	}

 	return task, nil
 }
@@ -409,6 +399,21 @@ func (s *Service) registerPeer(ctx context.Context, req *rpcscheduler.PeerTaskRe
 	return peer
 }

+// triggerCDNTask starts trigger cdn task
+func (s *Service) triggerCDNTask(ctx context.Context, task *resource.Task) {
+	task.Log.Infof("trigger cdn download task and task status is %s", task.FSM.Current())
+	peer, endOfPiece, err := s.resource.CDN().TriggerTask(context.Background(), task)
+	if err != nil {
+		task.Log.Errorf("trigger cdn download task failed: %v", err)
+		s.handleTaskFail(ctx, task)
+		return
+	}
+
+	// Update the task status first to help peer scheduling evaluation and scoring
+	s.handleTaskSuccess(ctx, task, endOfPiece)
+	s.handlePeerSuccess(ctx, peer)
+}
+
 // handleBeginOfPiece handles begin of piece
 func (s *Service) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) {
 	switch peer.FSM.Current() {
@@ -501,18 +506,11 @@ func (s *Service) handlePieceFail(ctx context.Context, peer *resource.Peer, piec
 		fallthrough
 	case base.Code_CDNTaskNotFound:
 		s.handlePeerFail(ctx, parent)
-		go func() {
-			parent.Log.Info("cdn restart seed task")
-			cdnPeer, endOfPiece, err := s.resource.CDN().TriggerTask(context.Background(), parent.Task)
-			if err != nil {
-				peer.Log.Errorf("retrigger task failed: %v", err)
-				s.handleTaskFail(ctx, parent.Task)
-				return
-			}
-
-			s.handleTaskSuccess(ctx, cdnPeer.Task, endOfPiece)
-			s.handlePeerSuccess(ctx, cdnPeer)
-		}()
+
+		// Start trigger cdn task
+		if s.config.CDN.Enable {
+			go s.triggerCDNTask(ctx, parent.Task)
+		}
 	default:
 	}
diff --git a/scheduler/service/service_test.go b/scheduler/service/service_test.go
index b032c207a..970a9503a 100644
--- a/scheduler/service/service_test.go
+++ b/scheduler/service/service_test.go
@@ -1212,12 +1212,19 @@ func TestService_LeaveTask(t *testing.T) {

 func TestService_registerTask(t *testing.T) {
 	tests := []struct {
-		name string
-		req  *rpcscheduler.PeerTaskRequest
-		run  func(t *testing.T, svc *Service, req *rpcscheduler.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mc *resource.MockCDNMockRecorder)
+		name   string
+		config *config.Config
+		req    *rpcscheduler.PeerTaskRequest
+		run    func(t *testing.T, svc *Service, req *rpcscheduler.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mc *resource.MockCDNMockRecorder)
 	}{
 		{
 			name: "task already exists and state is TaskStateRunning",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN: &config.CDNConfig{
+					Enable: true,
+				},
+			},
 			req: &rpcscheduler.PeerTaskRequest{
 				Url:     mockTaskURL,
 				UrlMeta: mockTaskURLMeta,
 			},
 		{
 			name: "task already exists and state is TaskStateSucceeded",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN: &config.CDNConfig{
+					Enable: true,
+				},
+			},
 			req: &rpcscheduler.PeerTaskRequest{
 				Url:     mockTaskURL,
 				UrlMeta: mockTaskURLMeta,
 			},
 		{
 			name: "task state is TaskStatePending",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN: &config.CDNConfig{
+					Enable: true,
+				},
+			},
 			req: &rpcscheduler.PeerTaskRequest{
 				Url:     mockTaskURL,
 				UrlMeta: mockTaskURLMeta,
 			},
 		{
 			name: "task state is TaskStateFailed",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN: &config.CDNConfig{
+					Enable: true,
+				},
+			},
 			req: &rpcscheduler.PeerTaskRequest{
 				Url:     mockTaskURL,
 				UrlMeta: mockTaskURLMeta,
 			},
 		{
 			name: "task state is TaskStatePending, but trigger cdn failed",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN: &config.CDNConfig{
+					Enable: true,
+				},
+			},
 			req: &rpcscheduler.PeerTaskRequest{
 				Url:     mockTaskURL,
 				UrlMeta: mockTaskURLMeta,
 			},
 		{
 			name: "task state is TaskStateFailed, but trigger cdn failed",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN: &config.CDNConfig{
+					Enable: true,
+				},
+			},
 			req: &rpcscheduler.PeerTaskRequest{
 				Url:     mockTaskURL,
 				UrlMeta: mockTaskURLMeta,
 			},
@@ -1352,6 +1389,57 @@ func TestService_registerTask(t *testing.T) {
 					mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &rpcscheduler.PeerResult{}, errors.New("foo")).Times(1),
 				)
+
 				task, err := svc.registerTask(context.Background(), req)
 				assert := assert.New(t)
 				assert.NoError(err)
+				assert.EqualValues(mockTask, task)
+			},
+		},
+		{
+			name: "task state is TaskStatePending and disable cdn",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN: &config.CDNConfig{
+					Enable: false,
+				},
+			},
+			req: &rpcscheduler.PeerTaskRequest{
+				Url:     mockTaskURL,
+				UrlMeta: mockTaskURLMeta,
+			},
+			run: func(t *testing.T, svc *Service, req *rpcscheduler.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
+
+				mockTask.FSM.SetState(resource.TaskStatePending)
+				gomock.InOrder(
+					mr.TaskManager().Return(taskManager).Times(1),
+					mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1),
+				)
+
+				task, err := svc.registerTask(context.Background(), req)
+				assert := assert.New(t)
+				assert.NoError(err)
+				assert.EqualValues(mockTask, task)
+			},
+		},
+		{
+			name: "task state is TaskStateFailed and disable cdn",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN: &config.CDNConfig{
+					Enable: false,
+				},
+			},
+			req: &rpcscheduler.PeerTaskRequest{
+				Url:     mockTaskURL,
+				UrlMeta: mockTaskURLMeta,
+			},
+			run: func(t *testing.T, svc *Service, req *rpcscheduler.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
+				mockTask.FSM.SetState(resource.TaskStateFailed)
+				gomock.InOrder(
+					mr.TaskManager().Return(taskManager).Times(1),
+					mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1),
+				)
+
 				task, err := svc.registerTask(context.Background(), req)
 				assert := assert.New(t)
 				assert.NoError(err)
@@ -1367,7 +1455,8 @@ func TestService_registerTask(t *testing.T) {
 			scheduler := mocks.NewMockScheduler(ctl)
 			res := resource.NewMockResource(ctl)
 			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
-			svc := New(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig)
+			svc := New(tc.config, res, scheduler, dynconfig)
+
 			taskManager := resource.NewMockTaskManager(ctl)
 			mockHost := resource.NewHost(mockRawHost)
 			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta)
@@ -1466,6 +1555,69 @@ func TestService_registerHost(t *testing.T) {
 	}
 }

+func TestService_triggerCDNTask(t *testing.T) {
+	tests := []struct {
+		name   string
+		mock   func(task *resource.Task, peer *resource.Peer, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mc *resource.MockCDNMockRecorder)
+		expect func(t *testing.T, task *resource.Task, peer *resource.Peer)
+	}{
+		{
+			name: "trigger cdn task",
+			mock: func(task *resource.Task, peer *resource.Peer, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mc *resource.MockCDNMockRecorder) {
+				task.FSM.SetState(resource.TaskStateRunning)
+				peer.FSM.SetState(resource.PeerStateRunning)
+				gomock.InOrder(
+					mr.CDN().Return(cdn).Times(1),
+					mc.TriggerTask(gomock.Any(), gomock.Any()).Return(peer, &rpcscheduler.PeerResult{
+						TotalPieceCount: 3,
+						ContentLength:   1024,
+					}, nil).Times(1),
+				)
+			},
+			expect: func(t *testing.T, task *resource.Task, peer *resource.Peer) {
+				assert := assert.New(t)
+				assert.True(task.FSM.Is(resource.TaskStateSucceeded))
+				assert.Equal(task.TotalPieceCount.Load(), int32(3))
+				assert.Equal(task.ContentLength.Load(), int64(1024))
+				assert.True(peer.FSM.Is(resource.PeerStateSucceeded))
+			},
+		},
+		{
+			name: "trigger cdn task failed",
+			mock: func(task *resource.Task, peer *resource.Peer, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mc *resource.MockCDNMockRecorder) {
+				task.FSM.SetState(resource.TaskStateRunning)
+				gomock.InOrder(
+					mr.CDN().Return(cdn).Times(1),
+					mc.TriggerTask(gomock.Any(), gomock.Any()).Return(peer, &rpcscheduler.PeerResult{}, errors.New("foo")).Times(1),
+				)
+			},
+			expect: func(t *testing.T, task *resource.Task, peer *resource.Peer) {
+				assert := assert.New(t)
+				assert.True(task.FSM.Is(resource.TaskStateFailed))
+			},
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			ctl := gomock.NewController(t)
+			defer ctl.Finish()
+			scheduler := mocks.NewMockScheduler(ctl)
+			res := resource.NewMockResource(ctl)
+			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
+			cdn := resource.NewMockCDN(ctl)
+			mockHost := resource.NewHost(mockRawHost)
+			task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta)
+			peer := resource.NewPeer(mockPeerID, task, mockHost)
+			svc := New(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig)
+
+			tc.mock(task, peer, cdn, res.EXPECT(), cdn.EXPECT())
+			svc.triggerCDNTask(context.Background(), task)
+			tc.expect(t, task, peer)
+		})
+	}
+}
+
 func TestService_handleBeginOfPiece(t *testing.T) {
 	tests := []struct {
 		name string
@@ -1688,13 +1840,19 @@ func TestService_handlePieceFail(t *testing.T) {
 	tests := []struct {
 		name   string
+		config *config.Config
 		piece  *rpcscheduler.PieceResult
 		peer   *resource.Peer
 		parent *resource.Peer
 		run    func(t *testing.T, svc *Service, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder)
 	}{
 		{
-			name:  "peer state is PeerStateBackToSource",
+			name: "peer state is PeerStateBackToSource",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN:       &config.CDNConfig{Enable: true},
+				Metrics:   &config.MetricsConfig{EnablePeerHost: true},
+			},
 			piece:  &rpcscheduler.PieceResult{},
 			peer:   resource.NewPeer(mockPeerID, mockTask, mockHost),
 			parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost),
 			run: func(t *testing.T, svc *Service, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
 				peer.FSM.SetState(resource.PeerStateBackToSource)

 				svc.handlePieceFail(context.Background(), peer, piece)
 				assert := assert.New(t)
 				assert.True(peer.FSM.Is(resource.PeerStateBackToSource))
 			},
 		},
 		{
 			name: "can not found parent",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN:       &config.CDNConfig{Enable: true},
+				Metrics:   &config.MetricsConfig{EnablePeerHost: true},
+			},
 			piece: &rpcscheduler.PieceResult{
 				Code:   base.Code_ClientWaitPieceReady,
 				DstPid: mockCDNPeerID,
 			},
 			peer:   resource.NewPeer(mockPeerID, mockTask, mockHost),
 			parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost),
 			run: func(t *testing.T, svc *Service, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
 				peer.FSM.SetState(resource.PeerStateRunning)
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(parent.ID)).Return(nil, false).Times(1),
 				)

 				svc.handlePieceFail(context.Background(), peer, piece)
 				assert := assert.New(t)
 				assert.True(peer.FSM.Is(resource.PeerStateRunning))
 			},
 		},
 		{
 			name: "piece result code is Code_ClientPieceDownloadFail and parent state set PeerEventDownloadFailed",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN:       &config.CDNConfig{Enable: true},
+				Metrics:   &config.MetricsConfig{EnablePeerHost: true},
+			},
 			piece: &rpcscheduler.PieceResult{
 				Code:   base.Code_ClientPieceDownloadFail,
 				DstPid: mockCDNPeerID,
 			},
 			peer:   resource.NewPeer(mockPeerID, mockTask, mockHost),
 			parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost),
 			run: func(t *testing.T, svc *Service, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
 				peer.FSM.SetState(resource.PeerStateRunning)
 				parent.FSM.SetState(resource.PeerStateRunning)
 				blocklist := set.NewSafeSet()
 				blocklist.Add(parent.ID)
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1),
 					ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
 				)

 				svc.handlePieceFail(context.Background(), peer, piece)
 				assert := assert.New(t)
 				assert.True(parent.FSM.Is(resource.PeerStateFailed))
 			},
 		},
 		{
 			name: "piece result code is Code_PeerTaskNotFound and parent state set PeerEventDownloadFailed",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN:       &config.CDNConfig{Enable: true},
+				Metrics:   &config.MetricsConfig{EnablePeerHost: true},
+			},
 			piece: &rpcscheduler.PieceResult{
 				Code:   base.Code_PeerTaskNotFound,
 				DstPid: mockCDNPeerID,
 			},
 			peer:   resource.NewPeer(mockPeerID, mockTask, mockHost),
 			parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost),
 			run: func(t *testing.T, svc *Service, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
 				peer.FSM.SetState(resource.PeerStateRunning)
 				parent.FSM.SetState(resource.PeerStateRunning)
 				blocklist := set.NewSafeSet()
 				blocklist.Add(parent.ID)
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1),
 					ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
 				)

 				svc.handlePieceFail(context.Background(), peer, piece)
 				assert := assert.New(t)
 				assert.True(parent.FSM.Is(resource.PeerStateFailed))
 			},
 		},
 		{
 			name: "piece result code is Code_ClientPieceNotFound and parent is not CDN",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN:       &config.CDNConfig{Enable: true},
+				Metrics:   &config.MetricsConfig{EnablePeerHost: true},
+			},
 			piece: &rpcscheduler.PieceResult{
 				Code:   base.Code_ClientPieceNotFound,
 				DstPid: mockCDNPeerID,
 			},
 			peer:   resource.NewPeer(mockPeerID, mockTask, mockHost),
 			parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost),
 			run: func(t *testing.T, svc *Service, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
 				peer.FSM.SetState(resource.PeerStateRunning)
 				parent.FSM.SetState(resource.PeerStateRunning)
 				blocklist := set.NewSafeSet()
 				blocklist.Add(parent.ID)
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1),
 					ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
 				)

 				svc.handlePieceFail(context.Background(), peer, piece)
 				assert := assert.New(t)
 				assert.True(parent.FSM.Is(resource.PeerStateRunning))
 			},
 		},
 		{
 			name: "piece result code is Code_CDNError and parent state set PeerEventDownloadFailed",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN:       &config.CDNConfig{Enable: true},
+				Metrics:   &config.MetricsConfig{EnablePeerHost: true},
+			},
 			piece: &rpcscheduler.PieceResult{
 				Code:   base.Code_CDNError,
 				DstPid: mockCDNPeerID,
 			},
 			peer:   resource.NewPeer(mockPeerID, mockTask, mockHost),
 			parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost),
 			run: func(t *testing.T, svc *Service, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
 				peer.FSM.SetState(resource.PeerStateRunning)
 				parent.FSM.SetState(resource.PeerStateRunning)
 				blocklist := set.NewSafeSet()
 				blocklist.Add(parent.ID)
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1),
 					ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
 				)

 				svc.handlePieceFail(context.Background(), peer, piece)
 				assert := assert.New(t)
 				assert.True(parent.FSM.Is(resource.PeerStateFailed))
 			},
 		},
 		{
 			name: "piece result code is Code_CDNTaskDownloadFail and parent state set PeerEventDownloadFailed",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN:       &config.CDNConfig{Enable: true},
+				Metrics:   &config.MetricsConfig{EnablePeerHost: true},
+			},
 			piece: &rpcscheduler.PieceResult{
 				Code:   base.Code_CDNTaskDownloadFail,
 				DstPid: mockCDNPeerID,
 			},
 			peer:   resource.NewPeer(mockPeerID, mockTask, mockHost),
 			parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost),
 			run: func(t *testing.T, svc *Service, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
 				peer.FSM.SetState(resource.PeerStateRunning)
 				parent.FSM.SetState(resource.PeerStateRunning)
 				blocklist := set.NewSafeSet()
 				blocklist.Add(parent.ID)
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1),
 					ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
 				)

 				svc.handlePieceFail(context.Background(), peer, piece)
 				assert := assert.New(t)
 				assert.True(parent.FSM.Is(resource.PeerStateFailed))
 			},
 		},
 		{
 			name: "piece result code is Code_ClientPieceRequestFail",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN:       &config.CDNConfig{Enable: true},
+				Metrics:   &config.MetricsConfig{EnablePeerHost: true},
+			},
 			piece: &rpcscheduler.PieceResult{
 				Code:   base.Code_ClientPieceRequestFail,
 				DstPid: mockCDNPeerID,
 			},
 			peer:   resource.NewPeer(mockPeerID, mockTask, mockHost),
 			parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost),
 			run: func(t *testing.T, svc *Service, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
 				peer.FSM.SetState(resource.PeerStateRunning)
 				parent.FSM.SetState(resource.PeerStateRunning)
 				blocklist := set.NewSafeSet()
 				blocklist.Add(parent.ID)
 				gomock.InOrder(
 					mr.PeerManager().Return(peerManager).Times(1),
 					mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1),
 					ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
 				)

 				svc.handlePieceFail(context.Background(), peer, piece)
 				assert := assert.New(t)
 				assert.True(peer.FSM.Is(resource.PeerStateRunning))
 				assert.True(parent.FSM.Is(resource.PeerStateRunning))
 			},
 		},
@@ -1876,8 +2069,78 @@ func TestService_handlePieceFail(t *testing.T) {
+		{
+			name: "piece result code is Code_CDNTaskNotFound",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN:       &config.CDNConfig{Enable: true},
+				Metrics:   &config.MetricsConfig{EnablePeerHost: true},
+			},
+			piece: &rpcscheduler.PieceResult{
+				Code:   base.Code_CDNTaskNotFound,
+				DstPid: mockCDNPeerID,
+			},
+			peer:   resource.NewPeer(mockPeerID, mockTask, mockHost),
+			parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost),
+			run: func(t *testing.T, svc *Service, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
+				var wg sync.WaitGroup
+				wg.Add(2)
+				defer wg.Wait()
+
+				peer.FSM.SetState(resource.PeerStateRunning)
+				parent.FSM.SetState(resource.PeerStateRunning)
+				blocklist := set.NewSafeSet()
+				blocklist.Add(parent.ID)
+				gomock.InOrder(
+					mr.PeerManager().Return(peerManager).Times(1),
+					mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1),
+					ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
+					mr.CDN().Do(func() { wg.Done() }).Return(cdn).Times(1),
+					mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(peer, &rpcscheduler.PeerResult{}, nil).Times(1),
+				)
+
+				svc.handlePieceFail(context.Background(), peer, piece)
+				assert := assert.New(t)
+				assert.True(parent.FSM.Is(resource.PeerStateFailed))
+			},
+		},
+		{
+			name: "piece result code is Code_CDNTaskNotFound and disable cdn",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN:       &config.CDNConfig{Enable: false},
+				Metrics:   &config.MetricsConfig{EnablePeerHost: true},
+			},
+			piece: &rpcscheduler.PieceResult{
+				Code:   base.Code_CDNTaskNotFound,
+				DstPid: mockCDNPeerID,
+			},
+			peer:   resource.NewPeer(mockPeerID, mockTask, mockHost),
+			parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost),
+			run: func(t *testing.T, svc *Service, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
+				peer.FSM.SetState(resource.PeerStateRunning)
+				parent.FSM.SetState(resource.PeerStateRunning)
+				blocklist := set.NewSafeSet()
+				blocklist.Add(parent.ID)
+				gomock.InOrder(
+					mr.PeerManager().Return(peerManager).Times(1),
+					mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1),
+					ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
+				)
+
+				svc.handlePieceFail(context.Background(), peer, piece)
+				assert := assert.New(t)
+				assert.True(peer.FSM.Is(resource.PeerStateRunning))
+				assert.True(parent.FSM.Is(resource.PeerStateFailed))
+			},
+		},
 		{
 			name: "piece result code is unknow",
+			config: &config.Config{
+				Scheduler: mockSchedulerConfig,
+				CDN:       &config.CDNConfig{Enable: true},
+				Metrics:   &config.MetricsConfig{EnablePeerHost: true},
+			},
 			piece: &rpcscheduler.PieceResult{
 				Code:   base.Code_ClientPieceRequestFail,
 				DstPid: mockCDNPeerID,
@@ -1912,7 +2175,7 @@ func TestService_handlePieceFail(t *testing.T) {
 			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
 			peerManager := resource.NewMockPeerManager(ctl)
 			cdn := resource.NewMockCDN(ctl)
-			svc := New(&config.Config{Scheduler: mockSchedulerConfig, Metrics: &config.MetricsConfig{EnablePeerHost: true}}, res, scheduler, dynconfig)
+			svc := New(tc.config, res, scheduler, dynconfig)

 			tc.run(t, svc, tc.peer, tc.parent, tc.piece, peerManager, cdn, scheduler.EXPECT(), res.EXPECT(), peerManager.EXPECT(), cdn.EXPECT())
 		})
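
Usage sketch (illustrative, not part of the diff above): the cdn.enable switch added
by this patch defaults to true, both in scheduler.yaml and in the Go defaults set by
config.New. A scheduler that should block cdn as a P2P peer would set:

    # scheduler.yaml fragment; all other values keep their defaults
    cdn:
      enable: false

With enable: false, registerTask and handlePieceFail skip triggerCDNTask, the first
back-to-source download is performed by dfdaemon instead of cdn, and preheat jobs
are rejected with the error "scheduler has disabled cdn".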