diff --git a/go.mod b/go.mod index af31c075f..9085951ad 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.21 require ( - d7y.io/api/v2 v2.0.123 + d7y.io/api/v2 v2.0.126 github.com/MysteriousPotato/go-lockable v1.0.0 github.com/RichardKnop/machinery v1.10.8 github.com/Showmax/go-fqdn v1.0.0 diff --git a/go.sum b/go.sum index 60eb0adc1..8b703a904 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= -d7y.io/api/v2 v2.0.123 h1:GGJy9DIaVYDHpS5PfiW2/Ad93jS4237uhmZx/lH9Zhc= -d7y.io/api/v2 v2.0.123/go.mod h1:5n5c+0oceb9/Ih4xL6UNRwQEZhBztiHMf4ghb+wGx4U= +d7y.io/api/v2 v2.0.126 h1:mlVZHBJwOQL9PZcnsVN9Etcru+rr2nbDXBuDC8v0PhY= +d7y.io/api/v2 v2.0.126/go.mod h1:5n5c+0oceb9/Ih4xL6UNRwQEZhBztiHMf4ghb+wGx4U= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= diff --git a/scheduler/config/constants.go b/scheduler/config/constants.go index 373a45fb0..e3a77685e 100644 --- a/scheduler/config/constants.go +++ b/scheduler/config/constants.go @@ -85,7 +85,7 @@ const ( DefaultSchedulerTaskGCInterval = 30 * time.Minute // DefaultSchedulerHostGCInterval is default interval for host gc. - DefaultSchedulerHostGCInterval = 6 * time.Hour + DefaultSchedulerHostGCInterval = 5 * time.Minute // DefaultSchedulerHostTTL is default ttl for host. DefaultSchedulerHostTTL = 1 * time.Hour diff --git a/scheduler/resource/host.go b/scheduler/resource/host.go index b0e35bba3..36f3ae27d 100644 --- a/scheduler/resource/host.go +++ b/scheduler/resource/host.go @@ -122,6 +122,13 @@ func WithBuild(build Build) HostOption { } } +// WithAnnounceInterval sets host's announce interval. +func WithAnnounceInterval(announceInterval time.Duration) HostOption { + return func(h *Host) { + h.AnnounceInterval = announceInterval + } +} + // Host contains content for host. type Host struct { // ID is host id. @@ -178,6 +185,9 @@ type Host struct { // SchedulerClusterID is the scheduler cluster id matched by scopes. SchedulerClusterID uint64 + // AnnounceInterval is the interval between host announces to scheduler. + AnnounceInterval time.Duration + // ConcurrentUploadLimit is concurrent upload limit count. ConcurrentUploadLimit *atomic.Int32 diff --git a/scheduler/resource/host_manager.go b/scheduler/resource/host_manager.go index 344240c90..6bd477112 100644 --- a/scheduler/resource/host_manager.go +++ b/scheduler/resource/host_manager.go @@ -20,6 +20,7 @@ package resource import ( "sync" + "time" "d7y.io/dragonfly/v2/pkg/container/set" pkggc "d7y.io/dragonfly/v2/pkg/gc" @@ -142,7 +143,7 @@ func (h *hostManager) LoadRandomHosts(n int, blocklist set.SafeSet[string]) []*H return hosts } -// Try to reclaim host. +// RunGC tries to reclaim host. func (h *hostManager) RunGC() error { h.Map.Range(func(_, value any) bool { host, ok := value.(*Host) @@ -151,11 +152,22 @@ func (h *hostManager) RunGC() error { return true } + // If the host's elapsed exceeds twice the announcing interval, + // then leave peers in host. + elapsed := time.Since(host.UpdatedAt.Load()) + if host.AnnounceInterval > 0 && elapsed > host.AnnounceInterval*2 { + host.Log.Info("host elapsed exceeds twice the announce interval, causing the host to leave peers") + host.LeavePeers() + return true + } + + // Reclaim the host. if host.PeerCount.Load() == 0 && host.ConcurrentUploadCount.Load() == 0 && host.Type == types.HostTypeNormal { host.Log.Info("host has been reclaimed") h.Delete(host.ID) + return true } return true diff --git a/scheduler/resource/host_manager_test.go b/scheduler/resource/host_manager_test.go index 976657205..4a245b9cb 100644 --- a/scheduler/resource/host_manager_test.go +++ b/scheduler/resource/host_manager_test.go @@ -503,6 +503,30 @@ func TestHostManager_RunGC(t *testing.T) { assert.Equal(host.ID, mockSeedHost.ID) }, }, + { + name: "host elapsed exceeds twice the announce interval", + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, + expect: func(t *testing.T, hostManager HostManager, mockHost *Host, mockPeer *Peer) { + assert := assert.New(t) + mockHost.AnnounceInterval = 1 * time.Microsecond + hostManager.Store(mockHost) + mockHost.StorePeer(mockPeer) + err := hostManager.RunGC() + assert.NoError(err) + + mockHost.Peers.Range(func(_, value any) bool { + peer := value.(*Peer) + assert.True(peer.FSM.Is(PeerStateLeave)) + return true + }) + + host, loaded := hostManager.Load(mockHost.ID) + assert.Equal(loaded, true) + assert.Equal(host.ID, mockHost.ID) + }, + }, } for _, tc := range tests { diff --git a/scheduler/resource/host_test.go b/scheduler/resource/host_test.go index 0f2923d4d..be73e4ed1 100644 --- a/scheduler/resource/host_test.go +++ b/scheduler/resource/host_test.go @@ -126,6 +126,8 @@ var ( Platform: "darwin", } + mockAnnounceInterval = 5 * time.Minute + mockHostID = idgen.HostIDV2("127.0.0.1", "foo") mockSeedHostID = idgen.HostIDV2("127.0.0.1", "bar") mockHostLocation = "baz" @@ -152,6 +154,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -176,6 +179,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.DownloadPort, mockRawSeedHost.DownloadPort) assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultSeedPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -225,6 +229,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.Equal(host.ObjectStoragePort, int32(1)) assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -250,6 +255,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -276,6 +282,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.OS, "linux") assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -302,6 +309,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.Platform, "ubuntu") assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -328,6 +336,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.PlatformFamily, "debian") assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -353,6 +362,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.PlatformVersion, "22.04") + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -379,6 +389,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.ObjectStoragePort, int32(0)) assert.Equal(host.KernelVersion, "5.15.0-27-generic") assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -405,6 +416,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.ObjectStoragePort, int32(0)) assert.EqualValues(host.CPU, mockCPU) assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -431,6 +443,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.ObjectStoragePort, int32(0)) assert.EqualValues(host.Memory, mockMemory) assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -457,6 +470,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.ObjectStoragePort, int32(0)) assert.EqualValues(host.Network, mockNetwork) assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -483,6 +497,7 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.ObjectStoragePort, int32(0)) assert.EqualValues(host.Disk, mockDisk) assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -509,6 +524,33 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.ObjectStoragePort, int32(0)) assert.EqualValues(host.Build, mockBuild) assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, time.Duration(0)) + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEmpty(host.CreatedAt.Load()) + assert.NotEmpty(host.UpdatedAt.Load()) + assert.NotNil(host.Log) + }, + }, + { + name: "new host and set announce interval", + rawHost: mockRawHost, + options: []HostOption{WithAnnounceInterval(mockAnnounceInterval)}, + expect: func(t *testing.T, host *Host) { + assert := assert.New(t) + assert.Equal(host.ID, mockRawHost.ID) + assert.Equal(host.Type, types.HostTypeNormal) + assert.Equal(host.Hostname, mockRawHost.Hostname) + assert.Equal(host.IP, mockRawHost.IP) + assert.Equal(host.Port, mockRawHost.Port) + assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.ObjectStoragePort, int32(0)) + assert.Equal(host.SchedulerClusterID, uint64(0)) + assert.Equal(host.AnnounceInterval, 5*time.Minute) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go index 8ce1c58dd..1f792cf02 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -183,6 +183,8 @@ var ( Platform: "darwin", } + mockInterval = durationpb.New(5 * time.Minute).AsDuration() + mockPeerHost = &schedulerv1.PeerHost{ Id: mockHostID, Ip: "127.0.0.1", diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 1791a1959..170386737 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -584,6 +584,10 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ options = append(options, resource.WithSchedulerClusterID(uint64(v.config.Manager.SchedulerClusterID))) } + if req.GetInterval() != nil { + options = append(options, resource.WithAnnounceInterval(req.GetInterval().AsDuration())) + } + host = resource.NewHost( req.Host.GetId(), req.Host.GetIp(), req.Host.GetHostname(), req.Host.GetPort(), req.Host.GetDownloadPort(), types.HostType(req.Host.GetType()), @@ -673,6 +677,10 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ } } + if req.GetInterval() != nil { + host.AnnounceInterval = req.GetInterval().AsDuration() + } + return nil } diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index 74902b365..ce8546681 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -28,6 +28,7 @@ import ( "strconv" "sync" "testing" + "time" "github.com/stretchr/testify/assert" "go.uber.org/mock/gomock" @@ -488,6 +489,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { Platform: &mockBuild.Platform, }, }, + Interval: durationpb.New(5 * time.Minute), }, run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( @@ -512,6 +514,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { assert.EqualValues(host.Network, mockNetwork) assert.EqualValues(host.Disk, mockDisk) assert.EqualValues(host.Build, mockBuild) + assert.EqualValues(host.AnnounceInterval, mockInterval) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -592,6 +595,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { Platform: &mockBuild.Platform, }, }, + Interval: durationpb.New(5 * time.Minute), }, run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( @@ -616,6 +620,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { assert.EqualValues(host.Network, mockNetwork) assert.EqualValues(host.Disk, mockDisk) assert.EqualValues(host.Build, mockBuild) + assert.EqualValues(host.AnnounceInterval, mockInterval) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -696,6 +701,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { Platform: &mockBuild.Platform, }, }, + Interval: durationpb.New(5 * time.Minute), }, run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( @@ -721,6 +727,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { assert.EqualValues(host.Network, mockNetwork) assert.EqualValues(host.Disk, mockDisk) assert.EqualValues(host.Build, mockBuild) + assert.EqualValues(host.AnnounceInterval, mockInterval) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0)) @@ -796,6 +803,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { Platform: &mockBuild.Platform, }, }, + Interval: durationpb.New(5 * time.Minute), }, run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( @@ -821,6 +829,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { assert.EqualValues(host.Network, mockNetwork) assert.EqualValues(host.Disk, mockDisk) assert.EqualValues(host.Build, mockBuild) + assert.EqualValues(host.AnnounceInterval, mockInterval) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200)) assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) assert.Equal(host.UploadCount.Load(), int64(0))