feat: scheduler blocks cdn (#1079)

* feat: scheduler blocks cdn

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2022-02-17 15:03:54 +08:00
parent d52782ff38
commit 060429c675
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
14 changed files with 428 additions and 128 deletions

View File

@ -44,11 +44,11 @@ base:
# logDir is the log storage directory
# in linux, default value is /var/log/dragonfly
# in macos(just for testing), default value is /Users/$USER/.dragonfly/logs
logDir: ''
logDir: ""
# manager configuration
manager:
addr: ''
addr: ""
cdnClusterID: 0
keepAlive:
interval: 5s
@ -116,7 +116,7 @@ verbose: false
pprof-port: -1
# jaeger endpoint url, like: http://jaeger.dragonfly.svc:14268/api/traces
jaeger: ''
jaeger: ""
# service name used in tracer
# default: dragonfly-cdn

View File

@ -8,22 +8,22 @@ gcInterval: 1m0s
# daemon work directory, daemon will change current working directory to this
# in linux, default value is /usr/local/dragonfly
# in macos(just for testing), default value is /Users/$USER/.dragonfly
workHome: ''
workHome: ""
# cacheDir is dynconfig cache storage directory
# in linux, default value is /var/cache/dragonfly
# in macos(just for testing), default value is /Users/$USER/.dragonfly/cache
cacheDir: ''
cacheDir: ""
# logDir is the log storage directory
# in linux, default value is /var/log/dragonfly
# in macos(just for testing), default value is /Users/$USER/.dragonfly/logs
logDir: ''
logDir: ""
# dataDir is the download data storage directory
# in linux, default value is /var/lib/dragonfly
# in macos(just for testing), default value is /Users/$USER/.dragonfly/data
dataDir: ''
dataDir: ""
# when daemon exit, keep peer task data or not
# it is usefully when upgrade daemon service, all local cache will be saved
@ -41,7 +41,7 @@ verbose: false
pprof-port: -1
# jaeger endpoint url, like: http://jaeger.dragonfly.svc:14268/api/traces
jaeger: ''
jaeger: ""
# all addresses of all schedulers
# the schedulers of all daemons should be same in one region or zone.
@ -75,13 +75,13 @@ host:
# when local ip is different with access ip, advertiseIP should be set
advertiseIP: 0.0.0.0
# geographical location, separated by "|" characters
location: ''
location: ""
# idc deployed by daemon
idc: ''
idc: ""
# security domain deployed by daemon, network isolation between different security domains
securityDomain: ''
securityDomain: ""
# network topology, separated by "|" characters
netTopology: ''
netTopology: ""
# daemon hostname
# hostname: ""
@ -116,9 +116,9 @@ download:
# security option
security:
insecure: true
cacert: ''
cert: ''
key: ''
cacert: ""
cert: ""
key: ""
tlsConfig: null
# download service listen address
# current, only support unix domain socket
@ -131,9 +131,9 @@ download:
peerGRPC:
security:
insecure: true
cacert: ''
cert: ''
key: ''
cacert: ""
cert: ""
key: ""
tcpListen:
# listen address
listen: 0.0.0.0
@ -151,9 +151,9 @@ upload:
rateLimit: 100Mi
security:
insecure: true
cacert: ''
cert: ''
key: ''
cacert: ""
cert: ""
key: ""
tcpListen:
# listen address
listen: 0.0.0.0
@ -196,17 +196,17 @@ proxy:
# when defaultFilter: "Expires&Signature", for example:
# http://localhost/xyz?Expires=111&Signature=222 and http://localhost/xyz?Expires=333&Signature=999
# is same task
defaultFilter: 'Expires&Signature'
defaultFilter: "Expires&Signature"
security:
insecure: true
cacert: ''
cert: ''
key: ''
cacert: ""
cert: ""
key: ""
tcpListen:
# namespace stands the linux net namespace, like /proc/1/ns/net
# it's useful for running daemon in pod with ip allocated and listening the special port in host net namespace
# Linux only
namespace: ''
namespace: ""
# listen address
listen: 0.0.0.0
# listen port, daemon will try to listen
@ -248,8 +248,8 @@ proxy:
hijackHTTPS:
# key pair used to hijack https requests
cert: ''
key: ''
cert: ""
key: ""
hosts:
- regx: mirror.aliyuncs.com:443 # regexp to match request hosts
# whether to ignore https certificate errors
@ -260,7 +260,7 @@ proxy:
maxConcurrency: 0
whiteList:
# the host of the whitelist
- host: ''
- host: ""
# match whitelist hosts
regx:
# port that need to be added to the whitelist

View File

@ -70,4 +70,4 @@ verbose: false
pprof-port: -1
# jaeger endpoint url, like: http://jaeger.dragonfly.svc:14268/api/traces
jaeger: ''
jaeger: ""

View File

@ -11,22 +11,24 @@ server:
# cacheDir is dynconfig cache storage directory
# in linux, default value is /var/cache/dragonfly
# in macos(just for testing), default value is /Users/$USER/.dragonfly/cache
cacheDir: ''
cacheDir: ""
# logDir is the log storage directory
# in linux, default value is /var/log/dragonfly
# in macos(just for testing), default value is /Users/$USER/.dragonfly/logs
logDir: ''
logDir: ""
# scheduler policy configuration
scheduler:
# algorithm configuration to use different scheduling algorithms,
# default configuration supports "default" and "ml"
# "default" is the rule-based scheduling algorithm, "ml" is the machine learning scheduling algorithm
# "default" is the rule-based scheduling algorithm,
# "ml" is the machine learning scheduling algorithm
# It also supports user plugin extension, the algorithm value is "plugin",
# and the compiled `d7y-scheduler-plugin-evaluator.so` file is added to
# the dragonfly working directory plugins
algorithm: default
# backSourceCount is the number of backsource clients when the CDN is unavailable
# backSourceCount is the number of backsource clients
# when the CDN is unavailable
backSourceCount: 3
# retry scheduling back-to-source limit times
retryBackSourceLimit: 5
@ -57,11 +59,11 @@ dynConfig:
# scheduler host configuration
host:
# idc is the idc of scheduler instance
idc: ''
idc: ""
# netTopology is the net topology of scheduler instance
netTopology: ''
netTopology: ""
# location is the location of scheduler instance
location: ''
location: ""
# manager configuration
manager:
@ -76,7 +78,15 @@ manager:
# interval
interval: 5s
# machinery async job configuration, see https://github.com/RichardKnop/machinery
# cdn configuration
cdn:
# scheduler enable cdn as P2P peer,
# if the value is false, P2P network will not be back-to-source through
# cdn but by dfdaemon and preheat feature does not work
enable: true
# machinery async job configuration,
# see https://github.com/RichardKnop/machinery
job:
# scheduler enable job service
enable: true
@ -89,11 +99,11 @@ job:
# redis configuration
redis:
# host
host: ''
host: ""
# port
port: 6379
# password
password: ''
password: ""
# brokerDB
brokerDB: 1
# backendDB
@ -104,7 +114,7 @@ metrics:
# scheduler enable metrics service
enable: false
# metrics service address
addr: ':8000'
addr: ":8000"
# enable peer host metrics
enablePeerHost: false
@ -119,4 +129,4 @@ verbose: false
pprof-port: -1
# jaeger endpoint url, like: http://jaeger.dragonfly.svc:14268/api/traces
jaeger: ''
jaeger: ""

View File

@ -48,12 +48,12 @@ base:
# cdn 日志目录
# linux 上默认目录 /var/log/dragonfly
# macos(仅开发、测试), 默认目录是 /Users/$USER/.dragonfly/logs
logDir: ''
logDir: ""
# CDN 连接的 manager可以不指定。
# 各项配置默认值如下。如果 addr 为空字符串CDN将不会连接manager。
manager:
addr: ''
addr: ""
cdnClusterID: 0
keepAlive:
interval: 5s
@ -61,9 +61,9 @@ base:
# 主机信息
host:
# 地理位置
location: ''
location: ""
# IDC(Internet Data Center),互联网数据中心
idc: ''
idc: ""
# 开启数据收集服务
# metrics:
@ -123,7 +123,7 @@ pprof-port: -1
# jaeger 地址
# 默认使用空字符串(不配置 jaeger, 例如: http://jaeger.dragonfly.svc:14268/api/traces
jaeger: ''
jaeger: ""
# tracer 中使用的 service-name
# 默认值dragonfly-cdn

View File

@ -8,22 +8,22 @@ gcInterval: 1m0s
# daemon 工作目录
# linux 上默认目录 /usr/local/dragonfly
# macos(仅开发、测试), 默认目录是 /Users/$USER/.dragonfly
workHome: ''
workHome: ""
# daemon 动态配置缓存目录
# linux 上默认目录 /var/cache/dragonfly
# macos(仅开发、测试), 默认目录是 /Users/$USER/.dragonfly/cache
cacheDir: ''
cacheDir: ""
# daemon 日志目录
# linux 上默认目录 /var/log/dragonfly
# macos(仅开发、测试), 默认目录是 /Users/$USER/.dragonfly/logs
logDir: ''
logDir: ""
# daemon 数据目录
# linux 上默认目录为 /var/lib/dragonfly
# macos(仅开发、测试), 默认目录是 /Users/$USER/.dragonfly/data/
dataDir: ''
dataDir: ""
# 当 daemon 退出是, 是否保存缓存数据
# 保留缓存数据在升级 daemon 的时候比较有用
@ -41,7 +41,7 @@ pprof-port: -1
# jaeger 地址
# 默认使用空字符串(不配置 jaeger, 例如: http://jaeger.dragonfly.svc:14268/api/traces
jaeger: ''
jaeger: ""
# 调度器地址
# 尽量使用同一个地区的调度器.
@ -73,13 +73,13 @@ host:
# 其他 daemon 可以通过这个 IP 地址连接过来
advertiseIP: 0.0.0.0
# 地理信息, 通过 "|" 符号分隔
location: ''
location: ""
# 机房信息
idc: ''
idc: ""
# 安全域信息,不同安全域之间网络隔离
securityDomain: ''
securityDomain: ""
# 网络拓扑结构,通过 "|" 符号分隔
netTopology: ''
netTopology: ""
# 主机名称
# hostname: ""
@ -96,9 +96,9 @@ download:
# 安全选项
security:
insecure: true
cacert: ''
cert: ''
key: ''
cacert: ""
cert: ""
key: ""
tlsConfig: null
# 下载服务监听地址dfget 下载文件将通过该地址连接到 daemon
# 目前是支持 unix domain socket
@ -111,9 +111,9 @@ download:
peerGRPC:
security:
insecure: true
cacert: ''
cert: ''
key: ''
cacert: ""
cert: ""
key: ""
tcpListen:
# 监听地址
listen: 0.0.0.0
@ -130,9 +130,9 @@ upload:
rateLimit: 100Mi
security:
insecure: true
cacert: ''
cert: ''
key: ''
cacert: ""
cert: ""
key: ""
tcpListen:
# 监听地址
listen: 0.0.0.0
@ -174,17 +174,17 @@ proxy:
# 例如defaultFilter: "Expires&Signature":
# http://localhost/xyz?Expires=111&Signature=222 and http://localhost/xyz?Expires=333&Signature=999
# 是相同的 task
defaultFilter: 'Expires&Signature'
defaultFilter: "Expires&Signature"
security:
insecure: true
cacert: ''
cert: ''
key: ''
cacert: ""
cert: ""
key: ""
tcpListen:
# 监听的网络命名空间, 例如:/proc/1/ns/net
# 主要用在部署 kubernetes 中的时候daemon 不使用 host network 时,监听宿主机的端口
# 仅支持 Linux
namespace: ''
namespace: ""
# 监听地址
listen: 0.0.0.0
# 监听端口
@ -222,8 +222,8 @@ proxy:
hijackHTTPS:
# https 劫持的证书和密钥
# 建议自签 CA 并更新主机证书链
cert: ''
key: ''
cert: ""
key: ""
# 需要走蜻蜓 p2p 的流量
hosts:
- regx: mirror.aliyuncs.com:443 # 正则匹配
@ -236,7 +236,7 @@ proxy:
# 白名单,如果设置了,仅白名单内可以走代理,其他的都拒绝
whiteList:
# 主机信息
- host: ''
- host: ""
# 正则匹配
regx:
# 端口白名单

View File

@ -69,4 +69,4 @@ pprof-port: -1
# jaeger 地址
# 默认使用空字符串(不配置 jaeger, 例如: http://jaeger.dragonfly.svc:14268/api/traces
jaeger: ''
jaeger: ""

View File

@ -11,11 +11,11 @@ server:
# daemon 动态配置缓存目录
# linux 上默认目录 /var/cache/dragonfly
# macos(仅开发、测试), 默认目录是 /Users/$USER/.dragonfly/cache
cacheDir: ''
cacheDir: ""
# daemon 日志目录
# linux 上默认目录 /var/log/dragonfly
# macos(仅开发、测试), 默认目录是 /Users/$USER/.dragonfly/logs
logDir: ''
logDir: ""
# scheduler 调度策略配置
scheduler:
@ -55,18 +55,18 @@ dynConfig:
# 实例主机信息
host:
# 实例所在机房
idc: ''
idc: ""
# 实例网络拓扑信息
netTopology: ''
netTopology: ""
# 实例所在的地理位置信息
location: ''
location: ""
# manager 配置
manager:
# 启动与 manager 的连接
enable: true
# manager 访问地址
addr: ''
addr: ""
# 注册的 scheduler 集群 ID
schedulerClusterID:
# manager 心跳配置
@ -74,6 +74,13 @@ manager:
# 保持心跳的时间间隔
interval: 5s
# cdn 配置
cdn:
# 启动 cdn 作为 P2P 网络节点,
# 如果值为 false 第一次回源请求不通过 cdn 而是通过 dfdaemon 直接回源,
# 而且无法使用预热功能
enable: true
# machinery 异步任务配置,配置参考 https://github.com/RichardKnop/machinery
job:
# 启动 job 服务
@ -87,11 +94,11 @@ job:
# redis 配置
redis:
# 服务地址
host: ''
host: ""
# 服务端口
port: 6379
# 密码
password: ''
password: ""
# broker 数据库
brokerDB: 1
# backend 数据库
@ -102,7 +109,7 @@ metrics:
# 启动数据收集服务
enable: false
# 数据服务地址
addr: ':8000'
addr: ":8000"
# 开机收集 peer host 数据
enablePeerHost: false
@ -117,4 +124,4 @@ pprof-port: -1
# jaeger 地址
# 默认使用空字符串(不配置 jaeger, 例如: http://jaeger.dragonfly.svc:14268/api/traces
jaeger: ''
jaeger: ""

View File

@ -42,6 +42,9 @@ type Config struct {
// Manager configuration
Manager *ManagerConfig `yaml:"manager" mapstructure:"manager"`
// CDN configuration
CDN *CDNConfig `yaml:"cdn" mapstructure:"cdn"`
// Host configuration
Host *HostConfig `yaml:"host" mapstructure:"host"`
@ -87,6 +90,9 @@ func New() *Config {
Interval: 5 * time.Second,
},
},
CDN: &CDNConfig{
Enable: true,
},
Job: &JobConfig{
Enable: true,
GlobalWorkerNum: 10,
@ -301,6 +307,11 @@ type ManagerConfig struct {
KeepAlive KeepAliveConfig `yaml:"keepAlive" mapstructure:"keepAlive"`
}
type CDNConfig struct {
// Enable is to enable cdn as P2P peer
Enable bool `yaml:"enable" mapstructure:"enable"`
}
type KeepAliveConfig struct {
// Keep alive interval
Interval time.Duration `yaml:"interval" mapstructure:"interval"`

View File

@ -70,6 +70,9 @@ func TestConfig_Load(t *testing.T) {
Interval: 5 * time.Second,
},
},
CDN: &CDNConfig{
Enable: true,
},
Job: &JobConfig{
Enable: true,
GlobalWorkerNum: 1,

View File

@ -36,6 +36,9 @@ manager:
keepAlive:
interval: 5000000000
cdn:
enable: true
job:
enable: true
globalWorkerNum: 1

View File

@ -18,6 +18,7 @@ package job
import (
"context"
"errors"
"strings"
"github.com/go-http-utils/headers"
@ -101,36 +102,40 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) {
return t, nil
}
func (t *job) Serve() {
func (j *job) Serve() {
go func() {
logger.Infof("ready to launch %d worker(s) on global queue", t.config.Job.GlobalWorkerNum)
if err := t.globalJob.LaunchWorker("global_worker", int(t.config.Job.GlobalWorkerNum)); err != nil {
logger.Infof("ready to launch %d worker(s) on global queue", j.config.Job.GlobalWorkerNum)
if err := j.globalJob.LaunchWorker("global_worker", int(j.config.Job.GlobalWorkerNum)); err != nil {
logger.Fatalf("global queue worker error: %v", err)
}
}()
go func() {
logger.Infof("ready to launch %d worker(s) on scheduler queue", t.config.Job.SchedulerWorkerNum)
if err := t.schedulerJob.LaunchWorker("scheduler_worker", int(t.config.Job.SchedulerWorkerNum)); err != nil {
logger.Infof("ready to launch %d worker(s) on scheduler queue", j.config.Job.SchedulerWorkerNum)
if err := j.schedulerJob.LaunchWorker("scheduler_worker", int(j.config.Job.SchedulerWorkerNum)); err != nil {
logger.Fatalf("scheduler queue worker error: %v", err)
}
}()
go func() {
logger.Infof("ready to launch %d worker(s) on local queue", t.config.Job.LocalWorkerNum)
if err := t.localJob.LaunchWorker("local_worker", int(t.config.Job.LocalWorkerNum)); err != nil {
logger.Infof("ready to launch %d worker(s) on local queue", j.config.Job.LocalWorkerNum)
if err := j.localJob.LaunchWorker("local_worker", int(j.config.Job.LocalWorkerNum)); err != nil {
logger.Fatalf("scheduler queue worker error: %v", err)
}
}()
}
func (t *job) Stop() {
t.globalJob.Worker.Quit()
t.schedulerJob.Worker.Quit()
t.localJob.Worker.Quit()
func (j *job) Stop() {
j.globalJob.Worker.Quit()
j.schedulerJob.Worker.Quit()
j.localJob.Worker.Quit()
}
func (t *job) preheat(ctx context.Context, req string) error {
func (j *job) preheat(ctx context.Context, req string) error {
if !j.config.CDN.Enable {
return errors.New("scheduler has disabled cdn")
}
request := &internaljob.PreheatRequest{}
if err := internaljob.UnmarshalRequest(req, request); err != nil {
logger.Errorf("unmarshal request err: %v, request body: %s", err, req)
@ -161,7 +166,7 @@ func (t *job) preheat(ctx context.Context, req string) error {
log := logger.WithTaskIDAndURL(taskID, request.URL)
log.Infof("preheat %s headers: %#v, tag: %s, range: %s, filter: %s, digest: %s",
request.URL, urlMeta.Header, urlMeta.Tag, urlMeta.Range, urlMeta.Filter, urlMeta.Digest)
stream, err := t.resource.CDN().Client().ObtainSeeds(ctx, &cdnsystem.SeedRequest{
stream, err := j.resource.CDN().Client().ObtainSeeds(ctx, &cdnsystem.SeedRequest{
TaskId: taskID,
Url: request.URL,
UrlMeta: urlMeta,

View File

@ -358,20 +358,10 @@ func (s *Service) registerTask(ctx context.Context, req *rpcscheduler.PeerTaskRe
return nil, err
}
// Start seed cdn task
go func() {
task.Log.Infof("trigger cdn download task and task status is %s", task.FSM.Current())
peer, endOfPiece, err := s.resource.CDN().TriggerTask(context.Background(), task)
if err != nil {
task.Log.Errorf("trigger cdn download task failed: %v", err)
s.handleTaskFail(ctx, task)
return
}
// Update the task status first to help peer scheduling evaluation and scoring
s.handleTaskSuccess(ctx, task, endOfPiece)
s.handlePeerSuccess(ctx, peer)
}()
// Start trigger cdn task
if s.config.CDN.Enable {
go s.triggerCDNTask(ctx, task)
}
return task, nil
}
@ -409,6 +399,21 @@ func (s *Service) registerPeer(ctx context.Context, req *rpcscheduler.PeerTaskRe
return peer
}
// triggerCDNTask starts trigger cdn task
func (s *Service) triggerCDNTask(ctx context.Context, task *resource.Task) {
task.Log.Infof("trigger cdn download task and task status is %s", task.FSM.Current())
peer, endOfPiece, err := s.resource.CDN().TriggerTask(context.Background(), task)
if err != nil {
task.Log.Errorf("trigger cdn download task failed: %v", err)
s.handleTaskFail(ctx, task)
return
}
// Update the task status first to help peer scheduling evaluation and scoring
s.handleTaskSuccess(ctx, task, endOfPiece)
s.handlePeerSuccess(ctx, peer)
}
// handleBeginOfPiece handles begin of piece
func (s *Service) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) {
switch peer.FSM.Current() {
@ -501,18 +506,11 @@ func (s *Service) handlePieceFail(ctx context.Context, peer *resource.Peer, piec
fallthrough
case base.Code_CDNTaskNotFound:
s.handlePeerFail(ctx, parent)
go func() {
parent.Log.Info("cdn restart seed task")
cdnPeer, endOfPiece, err := s.resource.CDN().TriggerTask(context.Background(), parent.Task)
if err != nil {
peer.Log.Errorf("retrigger task failed: %v", err)
s.handleTaskFail(ctx, parent.Task)
return
}
s.handleTaskSuccess(ctx, cdnPeer.Task, endOfPiece)
s.handlePeerSuccess(ctx, cdnPeer)
}()
// Start trigger cdn task
if s.config.CDN.Enable {
go s.triggerCDNTask(ctx, parent.Task)
}
default:
}

View File

@ -1212,12 +1212,19 @@ func TestService_LeaveTask(t *testing.T) {
func TestService_registerTask(t *testing.T) {
tests := []struct {
name string
req *rpcscheduler.PeerTaskRequest
run func(t *testing.T, svc *Service, req *rpcscheduler.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mc *resource.MockCDNMockRecorder)
name string
config *config.Config
req *rpcscheduler.PeerTaskRequest
run func(t *testing.T, svc *Service, req *rpcscheduler.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mc *resource.MockCDNMockRecorder)
}{
{
name: "task already exists and state is TaskStateRunning",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{
Enable: true,
},
},
req: &rpcscheduler.PeerTaskRequest{
Url: mockTaskURL,
UrlMeta: mockTaskURLMeta,
@ -1239,6 +1246,12 @@ func TestService_registerTask(t *testing.T) {
},
{
name: "task already exists and state is TaskStateSucceeded",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{
Enable: true,
},
},
req: &rpcscheduler.PeerTaskRequest{
Url: mockTaskURL,
UrlMeta: mockTaskURLMeta,
@ -1260,6 +1273,12 @@ func TestService_registerTask(t *testing.T) {
},
{
name: "task state is TaskStatePending",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{
Enable: true,
},
},
req: &rpcscheduler.PeerTaskRequest{
Url: mockTaskURL,
UrlMeta: mockTaskURLMeta,
@ -1285,6 +1304,12 @@ func TestService_registerTask(t *testing.T) {
},
{
name: "task state is TaskStateFailed",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{
Enable: true,
},
},
req: &rpcscheduler.PeerTaskRequest{
Url: mockTaskURL,
UrlMeta: mockTaskURLMeta,
@ -1310,6 +1335,12 @@ func TestService_registerTask(t *testing.T) {
},
{
name: "task state is TaskStatePending, but trigger cdn failed",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{
Enable: true,
},
},
req: &rpcscheduler.PeerTaskRequest{
Url: mockTaskURL,
UrlMeta: mockTaskURLMeta,
@ -1335,6 +1366,12 @@ func TestService_registerTask(t *testing.T) {
},
{
name: "task state is TaskStateFailed, but trigger cdn failed",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{
Enable: true,
},
},
req: &rpcscheduler.PeerTaskRequest{
Url: mockTaskURL,
UrlMeta: mockTaskURLMeta,
@ -1352,6 +1389,57 @@ func TestService_registerTask(t *testing.T) {
mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(mockPeer, &rpcscheduler.PeerResult{}, errors.New("foo")).Times(1),
)
task, err := svc.registerTask(context.Background(), req)
assert := assert.New(t)
assert.NoError(err)
assert.EqualValues(mockTask, task)
},
},
{
name: "task state is TaskStatePending and disable cdn",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{
Enable: false,
},
},
req: &rpcscheduler.PeerTaskRequest{
Url: mockTaskURL,
UrlMeta: mockTaskURLMeta,
},
run: func(t *testing.T, svc *Service, req *rpcscheduler.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
mockTask.FSM.SetState(resource.TaskStatePending)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1),
)
task, err := svc.registerTask(context.Background(), req)
assert := assert.New(t)
assert.NoError(err)
assert.EqualValues(mockTask, task)
},
},
{
name: "task state is TaskStateFailed and disable cdn",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{
Enable: false,
},
},
req: &rpcscheduler.PeerTaskRequest{
Url: mockTaskURL,
UrlMeta: mockTaskURLMeta,
},
run: func(t *testing.T, svc *Service, req *rpcscheduler.PeerTaskRequest, mockTask *resource.Task, mockPeer *resource.Peer, taskManager resource.TaskManager, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
mockTask.FSM.SetState(resource.TaskStateFailed)
gomock.InOrder(
mr.TaskManager().Return(taskManager).Times(1),
mt.LoadOrStore(gomock.Any()).Return(mockTask, true).Times(1),
)
task, err := svc.registerTask(context.Background(), req)
assert := assert.New(t)
assert.NoError(err)
@ -1367,7 +1455,8 @@ func TestService_registerTask(t *testing.T) {
scheduler := mocks.NewMockScheduler(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
svc := New(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig)
svc := New(tc.config, res, scheduler, dynconfig)
taskManager := resource.NewMockTaskManager(ctl)
mockHost := resource.NewHost(mockRawHost)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta)
@ -1466,6 +1555,69 @@ func TestService_registerHost(t *testing.T) {
}
}
func TestService_triggerCDNTask(t *testing.T) {
tests := []struct {
name string
mock func(task *resource.Task, peer *resource.Peer, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mc *resource.MockCDNMockRecorder)
expect func(t *testing.T, task *resource.Task, peer *resource.Peer)
}{
{
name: "trigger cdn task",
mock: func(task *resource.Task, peer *resource.Peer, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mc *resource.MockCDNMockRecorder) {
task.FSM.SetState(resource.TaskStateRunning)
peer.FSM.SetState(resource.PeerStateRunning)
gomock.InOrder(
mr.CDN().Return(cdn).Times(1),
mc.TriggerTask(gomock.Any(), gomock.Any()).Return(peer, &rpcscheduler.PeerResult{
TotalPieceCount: 3,
ContentLength: 1024,
}, nil).Times(1),
)
},
expect: func(t *testing.T, task *resource.Task, peer *resource.Peer) {
assert := assert.New(t)
assert.True(task.FSM.Is(resource.TaskStateSucceeded))
assert.Equal(task.TotalPieceCount.Load(), int32(3))
assert.Equal(task.ContentLength.Load(), int64(1024))
assert.True(peer.FSM.Is(resource.PeerStateSucceeded))
},
},
{
name: "trigger cdn task failed",
mock: func(task *resource.Task, peer *resource.Peer, cdn resource.CDN, mr *resource.MockResourceMockRecorder, mc *resource.MockCDNMockRecorder) {
task.FSM.SetState(resource.TaskStateRunning)
gomock.InOrder(
mr.CDN().Return(cdn).Times(1),
mc.TriggerTask(gomock.Any(), gomock.Any()).Return(peer, &rpcscheduler.PeerResult{}, errors.New("foo")).Times(1),
)
},
expect: func(t *testing.T, task *resource.Task, peer *resource.Peer) {
assert := assert.New(t)
assert.True(task.FSM.Is(resource.TaskStateFailed))
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
scheduler := mocks.NewMockScheduler(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
cdn := resource.NewMockCDN(ctl)
mockHost := resource.NewHost(mockRawHost)
task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskBackToSourceLimit, mockTaskURLMeta)
peer := resource.NewPeer(mockPeerID, task, mockHost)
svc := New(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig)
tc.mock(task, peer, cdn, res.EXPECT(), cdn.EXPECT())
svc.triggerCDNTask(context.Background(), task)
tc.expect(t, task, peer)
})
}
}
func TestService_handleBeginOfPiece(t *testing.T) {
tests := []struct {
name string
@ -1688,13 +1840,19 @@ func TestService_handlePieceFail(t *testing.T) {
tests := []struct {
name string
config *config.Config
piece *rpcscheduler.PieceResult
peer *resource.Peer
parent *resource.Peer
run func(t *testing.T, svc *Service, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder)
}{
{
name: "peer state is PeerStateBackToSource",
name: "peer state is PeerStateBackToSource",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{Enable: true},
Metrics: &config.MetricsConfig{EnablePeerHost: true},
},
piece: &rpcscheduler.PieceResult{},
peer: resource.NewPeer(mockPeerID, mockTask, mockHost),
parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost),
@ -1708,6 +1866,11 @@ func TestService_handlePieceFail(t *testing.T) {
},
{
name: "can not found parent",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{Enable: true},
Metrics: &config.MetricsConfig{EnablePeerHost: true},
},
piece: &rpcscheduler.PieceResult{
Code: base.Code_ClientWaitPieceReady,
DstPid: mockCDNPeerID,
@ -1729,6 +1892,11 @@ func TestService_handlePieceFail(t *testing.T) {
},
{
name: "piece result code is Code_ClientPieceDownloadFail and parent state set PeerEventDownloadFailed",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{Enable: true},
Metrics: &config.MetricsConfig{EnablePeerHost: true},
},
piece: &rpcscheduler.PieceResult{
Code: base.Code_ClientPieceDownloadFail,
DstPid: mockCDNPeerID,
@ -1754,6 +1922,11 @@ func TestService_handlePieceFail(t *testing.T) {
},
{
name: "piece result code is Code_PeerTaskNotFound and parent state set PeerEventDownloadFailed",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{Enable: true},
Metrics: &config.MetricsConfig{EnablePeerHost: true},
},
piece: &rpcscheduler.PieceResult{
Code: base.Code_PeerTaskNotFound,
DstPid: mockCDNPeerID,
@ -1779,6 +1952,11 @@ func TestService_handlePieceFail(t *testing.T) {
},
{
name: "piece result code is Code_ClientPieceNotFound and parent is not CDN",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{Enable: true},
Metrics: &config.MetricsConfig{EnablePeerHost: true},
},
piece: &rpcscheduler.PieceResult{
Code: base.Code_ClientPieceNotFound,
DstPid: mockCDNPeerID,
@ -1803,6 +1981,11 @@ func TestService_handlePieceFail(t *testing.T) {
},
{
name: "piece result code is Code_CDNError and parent state set PeerEventDownloadFailed",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{Enable: true},
Metrics: &config.MetricsConfig{EnablePeerHost: true},
},
piece: &rpcscheduler.PieceResult{
Code: base.Code_CDNError,
DstPid: mockCDNPeerID,
@ -1828,6 +2011,11 @@ func TestService_handlePieceFail(t *testing.T) {
},
{
name: "piece result code is Code_CDNTaskDownloadFail and parent state set PeerEventDownloadFailed",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{Enable: true},
Metrics: &config.MetricsConfig{EnablePeerHost: true},
},
piece: &rpcscheduler.PieceResult{
Code: base.Code_CDNTaskDownloadFail,
DstPid: mockCDNPeerID,
@ -1853,6 +2041,11 @@ func TestService_handlePieceFail(t *testing.T) {
},
{
name: "piece result code is Code_ClientPieceRequestFail",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{Enable: true},
Metrics: &config.MetricsConfig{EnablePeerHost: true},
},
piece: &rpcscheduler.PieceResult{
Code: base.Code_ClientPieceRequestFail,
DstPid: mockCDNPeerID,
@ -1876,8 +2069,78 @@ func TestService_handlePieceFail(t *testing.T) {
assert.True(parent.FSM.Is(resource.PeerStateRunning))
},
},
{
name: "piece result code is Code_CDNTaskNotFound",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{Enable: true},
Metrics: &config.MetricsConfig{EnablePeerHost: true},
},
piece: &rpcscheduler.PieceResult{
Code: base.Code_CDNTaskNotFound,
DstPid: mockCDNPeerID,
},
peer: resource.NewPeer(mockPeerID, mockTask, mockHost),
parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost),
run: func(t *testing.T, svc *Service, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
var wg sync.WaitGroup
wg.Add(2)
defer wg.Wait()
peer.FSM.SetState(resource.PeerStateRunning)
parent.FSM.SetState(resource.PeerStateRunning)
blocklist := set.NewSafeSet()
blocklist.Add(parent.ID)
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1),
ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
mr.CDN().Do(func() { wg.Done() }).Return(cdn).Times(1),
mc.TriggerTask(gomock.Any(), gomock.Any()).Do(func(ctx context.Context, task *resource.Task) { wg.Done() }).Return(peer, &rpcscheduler.PeerResult{}, nil).Times(1),
)
svc.handlePieceFail(context.Background(), peer, piece)
assert := assert.New(t)
assert.True(parent.FSM.Is(resource.PeerStateFailed))
},
},
{
name: "piece result code is Code_CDNTaskNotFound and disable cdn",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{Enable: false},
Metrics: &config.MetricsConfig{EnablePeerHost: true},
},
piece: &rpcscheduler.PieceResult{
Code: base.Code_CDNTaskNotFound,
DstPid: mockCDNPeerID,
},
peer: resource.NewPeer(mockPeerID, mockTask, mockHost),
parent: resource.NewPeer(mockCDNPeerID, mockTask, mockHost),
run: func(t *testing.T, svc *Service, peer *resource.Peer, parent *resource.Peer, piece *rpcscheduler.PieceResult, peerManager resource.PeerManager, cdn resource.CDN, ms *mocks.MockSchedulerMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, mc *resource.MockCDNMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning)
parent.FSM.SetState(resource.PeerStateRunning)
blocklist := set.NewSafeSet()
blocklist.Add(parent.ID)
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(parent.ID)).Return(parent, true).Times(1),
ms.ScheduleParent(gomock.Any(), gomock.Eq(peer), gomock.Eq(blocklist)).Return().Times(1),
)
svc.handlePieceFail(context.Background(), peer, piece)
assert := assert.New(t)
assert.True(peer.FSM.Is(resource.PeerStateRunning))
assert.True(parent.FSM.Is(resource.PeerStateFailed))
},
},
{
name: "piece result code is unknow",
config: &config.Config{
Scheduler: mockSchedulerConfig,
CDN: &config.CDNConfig{Enable: true},
Metrics: &config.MetricsConfig{EnablePeerHost: true},
},
piece: &rpcscheduler.PieceResult{
Code: base.Code_ClientPieceRequestFail,
DstPid: mockCDNPeerID,
@ -1912,7 +2175,7 @@ func TestService_handlePieceFail(t *testing.T) {
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
peerManager := resource.NewMockPeerManager(ctl)
cdn := resource.NewMockCDN(ctl)
svc := New(&config.Config{Scheduler: mockSchedulerConfig, Metrics: &config.MetricsConfig{EnablePeerHost: true}}, res, scheduler, dynconfig)
svc := New(tc.config, res, scheduler, dynconfig)
tc.run(t, svc, tc.peer, tc.parent, tc.piece, peerManager, cdn, scheduler.EXPECT(), res.EXPECT(), peerManager.EXPECT(), cdn.EXPECT())
})