feat: scheduler add default biz tag (#1164)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2022-03-16 14:01:38 +08:00
parent 010887ab66
commit 603db21e17
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
3 changed files with 13 additions and 6 deletions

View File

@ -35,11 +35,11 @@ import (
)
const (
// Default value of biz tag
DefaultBizTag = "unknow"
// Download tiny file timeout
downloadTinyFileContextTimeout = 2 * time.Minute
// Default value of biz tag
defaultBizTag = "unknow"
)
const (
@ -159,7 +159,7 @@ type Peer struct {
func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
p := &Peer{
ID: id,
BizTag: defaultBizTag,
BizTag: DefaultBizTag,
Pieces: &bitset.BitSet{},
pieceCosts: []int64{},
Stream: &atomic.Value{},

View File

@ -78,7 +78,7 @@ func (s *Service) RegisterPeerTask(ctx context.Context, req *rpcscheduler.PeerTa
}
host := s.registerHost(ctx, req)
peer := s.registerPeer(ctx, req, task, host)
peer.Log.Infof("register peer task request: %#v", req)
peer.Log.Infof("register peer task request: %#v %#v %#v", req, req.UrlMeta, req.HostLoad)
// Task has been successful
if task.FSM.Is(resource.TaskStateSucceeded) {
@ -435,7 +435,12 @@ func (s *Service) registerHost(ctx context.Context, req *rpcscheduler.PeerTaskRe
// registerPeer creates a new peer or reuses a previous peer
func (s *Service) registerPeer(ctx context.Context, req *rpcscheduler.PeerTaskRequest, task *resource.Task, host *resource.Host) *resource.Peer {
peer, loaded := s.resource.PeerManager().LoadOrStore(resource.NewPeer(req.PeerId, task, host, resource.WithBizTag(req.UrlMeta.Tag)))
var options []resource.PeerOption
if req.UrlMeta.Tag != "" {
options = append(options, resource.WithBizTag(req.UrlMeta.Tag))
}
peer, loaded := s.resource.PeerManager().LoadOrStore(resource.NewPeer(req.PeerId, task, host, options...))
if !loaded {
peer.Log.Info("create new peer")
return peer

View File

@ -1778,6 +1778,7 @@ func TestService_registerPeer(t *testing.T) {
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
assert.Equal(peer.ID, mockPeerID)
assert.Equal(peer.BizTag, resource.DefaultBizTag)
},
},
{
@ -1795,6 +1796,7 @@ func TestService_registerPeer(t *testing.T) {
expect: func(t *testing.T, peer *resource.Peer) {
assert := assert.New(t)
assert.Equal(peer.ID, mockPeerID)
assert.Equal(peer.BizTag, resource.DefaultBizTag)
},
},
}