diff --git a/go.mod b/go.mod index 82d95fca5..f146cab4a 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.21 require ( - d7y.io/api/v2 v2.0.156 + d7y.io/api/v2 v2.0.157 github.com/MysteriousPotato/go-lockable v1.0.0 github.com/RichardKnop/machinery v1.10.8 github.com/Showmax/go-fqdn v1.0.0 diff --git a/go.sum b/go.sum index e42dda4d8..74cef5ce0 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= -d7y.io/api/v2 v2.0.156 h1:KjKnhCeSQaxugUCiZcq5BkulmKgqKHBh0iZDvU5eDZM= -d7y.io/api/v2 v2.0.156/go.mod h1:wvA3IIh6Gjwsh1HsxujyELTh4cqlNtXIaYxDeVobN/w= +d7y.io/api/v2 v2.0.157 h1:nzVAY1TBazmYzXwbNuGl//leUvsyYEt/fqObNEJ9CEw= +d7y.io/api/v2 v2.0.157/go.mod h1:wvA3IIh6Gjwsh1HsxujyELTh4cqlNtXIaYxDeVobN/w= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= diff --git a/scheduler/resource/host.go b/scheduler/resource/host.go index 36f3ae27d..7256939e6 100644 --- a/scheduler/resource/host.go +++ b/scheduler/resource/host.go @@ -52,6 +52,13 @@ func WithConcurrentUploadLimit(limit int32) HostOption { } } +// WithDisableShared sets host's DisableShared. +func WithDisableShared(disableShared bool) HostOption { + return func(h *Host) { + h.DisableShared = disableShared + } +} + // WithOS sets host's os. func WithOS(os string) HostOption { return func(h *Host) { @@ -152,6 +159,10 @@ type Host struct { // ObjectStoragePort is object storage port. ObjectStoragePort int32 + // DisableShared is whether the host is disabled for + // shared with other peers. + DisableShared bool + // Host OS. OS string @@ -363,6 +374,7 @@ func NewHost( Hostname: hostname, Port: port, DownloadPort: downloadPort, + DisableShared: false, ConcurrentUploadLimit: atomic.NewInt32(int32(concurrentUploadLimit)), ConcurrentUploadCount: atomic.NewInt32(0), UploadCount: atomic.NewInt64(0), diff --git a/scheduler/resource/peer_manager.go b/scheduler/resource/peer_manager.go index 2976b165b..c7eaa5e83 100644 --- a/scheduler/resource/peer_manager.go +++ b/scheduler/resource/peer_manager.go @@ -167,6 +167,18 @@ func (p *peerManager) RunGC() error { return true } + // If host is disabled shared, then set the peer state to PeerStateLeave and then delete peer. + // Avoid the disabled shared host to be scheduled, and store the unused peer in the peer manager. + if peer.Host.DisableShared { + peer.Log.Info("peer host is disabled shared, causing the peer to leave") + if err := peer.FSM.Event(context.Background(), PeerEventLeave); err != nil { + peer.Log.Errorf("peer fsm event failed: %s", err.Error()) + return true + } + + return true + } + // If the peer's elapsed of downloading piece exceeds the pieceDownloadTimeout, // then sets the peer state to PeerStateLeave and then delete peer. if peer.FSM.Is(PeerStateRunning) || peer.FSM.Is(PeerStateBackToSource) { diff --git a/scheduler/resource/peer_manager_test.go b/scheduler/resource/peer_manager_test.go index 7437491b2..8598a686c 100644 --- a/scheduler/resource/peer_manager_test.go +++ b/scheduler/resource/peer_manager_test.go @@ -347,6 +347,35 @@ func TestPeerManager_RunGC(t *testing.T) { assert.Equal(peer.FSM.Current(), PeerStateLeave) }, }, + { + name: "peer reclaimed with disabled shared", + gcConfig: &config.GCConfig{ + PieceDownloadTimeout: 5 * time.Minute, + PeerGCInterval: 1 * time.Second, + PeerTTL: 1 * time.Microsecond, + HostTTL: 10 * time.Second, + }, + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, + expect: func(t *testing.T, peerManager PeerManager, mockHost *Host, mockTask *Task, mockPeer *Peer) { + assert := assert.New(t) + peerManager.Store(mockPeer) + mockPeer.Host.DisableShared = true + err := peerManager.RunGC() + assert.NoError(err) + + peer, loaded := peerManager.Load(mockPeer.ID) + assert.Equal(loaded, true) + assert.Equal(peer.FSM.Current(), PeerStateLeave) + + err = peerManager.RunGC() + assert.NoError(err) + + _, loaded = peerManager.Load(mockPeer.ID) + assert.Equal(loaded, false) + }, + }, { name: "peer download piece timeout and peer state is PeerStateRunning", gcConfig: &config.GCConfig{ diff --git a/scheduler/scheduling/scheduling.go b/scheduler/scheduling/scheduling.go index e63e8d48a..e9af5195d 100644 --- a/scheduler/scheduling/scheduling.go +++ b/scheduler/scheduling/scheduling.go @@ -516,6 +516,12 @@ func (s *scheduling) filterCandidateParents(peer *resource.Peer, blocklist set.S continue } + // Candidate parent is disable shared. + if candidateParent.Host.DisableShared { + peer.Log.Debugf("parent %s host %s is not selected because it is disable shared", candidateParent.ID, candidateParent.Host.ID) + continue + } + // Candidate parent host is not allowed to be the same as the peer host, // because dfdaemon cannot handle the situation // where two tasks are downloading and downloading each other. diff --git a/scheduler/scheduling/scheduling_test.go b/scheduler/scheduling/scheduling_test.go index 27a9b935c..008975254 100644 --- a/scheduler/scheduling/scheduling_test.go +++ b/scheduler/scheduling/scheduling_test.go @@ -838,6 +838,22 @@ func TestScheduling_FindCandidateParents(t *testing.T) { assert.False(ok) }, }, + { + name: "parent is disabled share data with other peers", + mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) { + peer.FSM.SetState(resource.PeerStateReceivedNormal) + mockPeers[0].FSM.SetState(resource.PeerStateRunning) + peer.Task.StorePeer(peer) + peer.Task.StorePeer(mockPeers[0]) + mockPeers[0].Host.DisableShared = true + + md.GetSchedulerClusterConfig().Return(types.SchedulerClusterConfig{}, errors.New("foo")).Times(1) + }, + expect: func(t *testing.T, peer *resource.Peer, mockPeers []*resource.Peer, parents []*resource.Peer, ok bool) { + assert := assert.New(t) + assert.False(ok) + }, + }, { name: "find back-to-source parent", mock: func(peer *resource.Peer, mockPeers []*resource.Peer, blocklist set.SafeSet[string], md *configmocks.MockDynconfigInterfaceMockRecorder) { diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index c082c57bf..4e49eb65d 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -513,6 +513,7 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ host, loaded := v.resource.HostManager().Load(req.Host.GetId()) if !loaded { options := []resource.HostOption{ + resource.WithDisableShared(req.Host.GetDisableShared()), resource.WithOS(req.Host.GetOs()), resource.WithPlatform(req.Host.GetPlatform()), resource.WithPlatformFamily(req.Host.GetPlatformFamily()), @@ -610,6 +611,7 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ host.Port = req.Host.GetPort() host.DownloadPort = req.Host.GetDownloadPort() host.Type = types.HostType(req.Host.GetType()) + host.DisableShared = req.Host.GetDisableShared() host.OS = req.Host.GetOs() host.Platform = req.Host.GetPlatform() host.PlatformFamily = req.Host.GetPlatformFamily() diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index eeef14cc3..1fdfd2ceb 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -435,6 +435,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { Ip: "127.0.0.1", Port: 8003, DownloadPort: 8001, + DisableShared: true, Os: "darwin", Platform: "darwin", PlatformFamily: "Standalone Workstation", @@ -505,6 +506,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { assert.Equal(host.IP, req.Host.Ip) assert.Equal(host.Port, req.Host.Port) assert.Equal(host.DownloadPort, req.Host.DownloadPort) + assert.Equal(host.DisableShared, req.Host.DisableShared) assert.Equal(host.OS, req.Host.Os) assert.Equal(host.Platform, req.Host.Platform) assert.Equal(host.PlatformVersion, req.Host.PlatformVersion) @@ -541,6 +543,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { Ip: "127.0.0.1", Port: 8003, DownloadPort: 8001, + DisableShared: false, Os: "darwin", Platform: "darwin", PlatformFamily: "Standalone Workstation", @@ -611,6 +614,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { assert.Equal(host.IP, req.Host.Ip) assert.Equal(host.Port, req.Host.Port) assert.Equal(host.DownloadPort, req.Host.DownloadPort) + assert.Equal(host.DisableShared, req.Host.DisableShared) assert.Equal(host.OS, req.Host.Os) assert.Equal(host.Platform, req.Host.Platform) assert.Equal(host.PlatformVersion, req.Host.PlatformVersion) @@ -647,6 +651,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { Ip: "127.0.0.1", Port: 8003, DownloadPort: 8001, + DisableShared: true, Os: "darwin", Platform: "darwin", PlatformFamily: "Standalone Workstation", @@ -718,6 +723,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { assert.Equal(host.IP, req.Host.Ip) assert.Equal(host.Port, req.Host.Port) assert.Equal(host.DownloadPort, req.Host.DownloadPort) + assert.Equal(host.DisableShared, req.Host.DisableShared) assert.Equal(host.OS, req.Host.Os) assert.Equal(host.Platform, req.Host.Platform) assert.Equal(host.PlatformVersion, req.Host.PlatformVersion) @@ -749,6 +755,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { Ip: "127.0.0.1", Port: 8003, DownloadPort: 8001, + DisableShared: false, Os: "darwin", Platform: "darwin", PlatformFamily: "Standalone Workstation", @@ -820,6 +827,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { assert.Equal(host.IP, req.Host.Ip) assert.Equal(host.Port, req.Host.Port) assert.Equal(host.DownloadPort, req.Host.DownloadPort) + assert.Equal(host.DisableShared, req.Host.DisableShared) assert.Equal(host.OS, req.Host.Os) assert.Equal(host.Platform, req.Host.Platform) assert.Equal(host.PlatformVersion, req.Host.PlatformVersion)