From 58959be0c216ea4e888012a88a5adbd07cb05c08 Mon Sep 17 00:00:00 2001 From: Gaius Date: Tue, 29 Oct 2024 22:24:18 +0800 Subject: [PATCH] feat: implement delete persistent cache task in scheduler (#3619) Signed-off-by: Gaius --- client/config/mocks/dynconfig_mock.go | 2 + .../daemon/announcer/mocks/announcer_mock.go | 1 + .../objectstorage/mocks/objectstorage_mock.go | 1 + client/daemon/peer/peertask_manager_mock.go | 3 + client/daemon/peer/piece_downloader_mock.go | 1 + client/daemon/peer/piece_manager_mock.go | 1 + .../daemon/proxy/mocks/proxy_manager_mock.go | 2 + .../daemon/rpcserver/mocks/rpcserver_mock.go | 2 + .../storage/mocks/stroage_manager_mock.go | 3 + .../upload/mocks/upload_manager_mock.go | 1 + client/dfstore/mocks/dfstore_mock.go | 1 + client/util/mocks/keepalive_mock.go | 1 + internal/dynconfig/mocks/dynconfig_mock.go | 1 + .../dynconfig/mocks/manager_client_mock.go | 1 + manager/auth/oauth/mocks/oauth_mock.go | 1 + manager/job/mocks/gc_mock.go | 1 + manager/job/mocks/preheat_mock.go | 1 + manager/job/mocks/sync_peers_mock.go | 1 + manager/job/mocks/task_mock.go | 1 + manager/searcher/mocks/searcher_mock.go | 1 + manager/service/mocks/service_mock.go | 1 + pkg/cache/cache_mock.go | 1 + pkg/container/set/mocks/safe_set_mock.go | 1 + pkg/container/set/mocks/set_mock.go | 1 + pkg/dfpath/mocks/dfpath_mock.go | 1 + pkg/digest/mocks/digest_reader_mock.go | 1 + pkg/gc/gc_mock.go | 1 + pkg/gc/logger_mock.go | 1 + pkg/gc/runner_mock.go | 1 + pkg/graph/dag/mocks/dag_mock.go | 1 + pkg/graph/dg/mocks/dg_mock.go | 1 + pkg/objectstorage/mocks/objectstorage_mock.go | 1 + pkg/rpc/cdnsystem/client/mocks/client_mock.go | 1 + .../dfdaemon/client/mocks/client_v1_mock.go | 1 + .../dfdaemon/client/mocks/client_v2_mock.go | 1 + pkg/rpc/health/client/mocks/client_mock.go | 1 + .../manager/client/mocks/client_v1_mock.go | 1 + .../manager/client/mocks/client_v2_mock.go | 1 + .../scheduler/client/mocks/client_v1_mock.go | 1 + .../scheduler/client/mocks/client_v2_mock.go | 1 + pkg/source/mocks/mock_source_client.go | 5 + scheduler/announcer/mocks/announcer_mock.go | 1 + scheduler/config/config.go | 8 ++ scheduler/config/mocks/dynconfig_mock.go | 2 + scheduler/job/mocks/job_mock.go | 1 + .../resource/persistentcache/host_manager.go | 17 ++- .../persistentcache/host_manager_mock.go | 13 +- .../resource/persistentcache/peer_manager.go | 128 ++++++++++++++++-- .../persistentcache/peer_manager_mock.go | 30 ++++ .../resource/persistentcache/resource.go | 7 +- .../resource/persistentcache/resource_mock.go | 1 + .../resource/persistentcache/task_manager.go | 20 ++- .../persistentcache/task_manager_mock.go | 7 +- .../resource/standard/host_manager_mock.go | 1 + .../resource/standard/peer_manager_mock.go | 1 + scheduler/resource/standard/resource_mock.go | 1 + .../standard/seed_peer_client_mock.go | 1 + scheduler/resource/standard/seed_peer_mock.go | 1 + .../resource/standard/task_manager_mock.go | 1 + scheduler/scheduler.go | 24 +++- scheduler/scheduling/mocks/scheduling_mock.go | 1 + scheduler/service/service_v2.go | 3 +- scheduler/storage/mocks/storage_mock.go | 1 + 63 files changed, 277 insertions(+), 45 deletions(-) diff --git a/client/config/mocks/dynconfig_mock.go b/client/config/mocks/dynconfig_mock.go index 3e81ac71b..3ae8ab700 100644 --- a/client/config/mocks/dynconfig_mock.go +++ b/client/config/mocks/dynconfig_mock.go @@ -22,6 +22,7 @@ import ( type MockDynconfig struct { ctrl *gomock.Controller recorder *MockDynconfigMockRecorder + isgomock struct{} } // MockDynconfigMockRecorder is the mock recorder for MockDynconfig. @@ -226,6 +227,7 @@ func (mr *MockDynconfigMockRecorder) Stop() *gomock.Call { type MockObserver struct { ctrl *gomock.Controller recorder *MockObserverMockRecorder + isgomock struct{} } // MockObserverMockRecorder is the mock recorder for MockObserver. diff --git a/client/daemon/announcer/mocks/announcer_mock.go b/client/daemon/announcer/mocks/announcer_mock.go index bfa333c14..99db30f6d 100644 --- a/client/daemon/announcer/mocks/announcer_mock.go +++ b/client/daemon/announcer/mocks/announcer_mock.go @@ -19,6 +19,7 @@ import ( type MockAnnouncer struct { ctrl *gomock.Controller recorder *MockAnnouncerMockRecorder + isgomock struct{} } // MockAnnouncerMockRecorder is the mock recorder for MockAnnouncer. diff --git a/client/daemon/objectstorage/mocks/objectstorage_mock.go b/client/daemon/objectstorage/mocks/objectstorage_mock.go index 8840a1adb..270831ed3 100644 --- a/client/daemon/objectstorage/mocks/objectstorage_mock.go +++ b/client/daemon/objectstorage/mocks/objectstorage_mock.go @@ -20,6 +20,7 @@ import ( type MockObjectStorage struct { ctrl *gomock.Controller recorder *MockObjectStorageMockRecorder + isgomock struct{} } // MockObjectStorageMockRecorder is the mock recorder for MockObjectStorage. diff --git a/client/daemon/peer/peertask_manager_mock.go b/client/daemon/peer/peertask_manager_mock.go index 7a1daad83..a415cd203 100644 --- a/client/daemon/peer/peertask_manager_mock.go +++ b/client/daemon/peer/peertask_manager_mock.go @@ -26,6 +26,7 @@ import ( type MockTaskManager struct { ctrl *gomock.Controller recorder *MockTaskManagerMockRecorder + isgomock struct{} } // MockTaskManagerMockRecorder is the mock recorder for MockTaskManager. @@ -183,6 +184,7 @@ func (mr *MockTaskManagerMockRecorder) Subscribe(request any) *gomock.Call { type MockTask struct { ctrl *gomock.Controller recorder *MockTaskMockRecorder + isgomock struct{} } // MockTaskMockRecorder is the mock recorder for MockTask. @@ -416,6 +418,7 @@ func (mr *MockTaskMockRecorder) UpdateSourceErrorStatus(st any) *gomock.Call { type MockLogger struct { ctrl *gomock.Controller recorder *MockLoggerMockRecorder + isgomock struct{} } // MockLoggerMockRecorder is the mock recorder for MockLogger. diff --git a/client/daemon/peer/piece_downloader_mock.go b/client/daemon/peer/piece_downloader_mock.go index ea525d7b4..742d4aa0d 100644 --- a/client/daemon/peer/piece_downloader_mock.go +++ b/client/daemon/peer/piece_downloader_mock.go @@ -21,6 +21,7 @@ import ( type MockPieceDownloader struct { ctrl *gomock.Controller recorder *MockPieceDownloaderMockRecorder + isgomock struct{} } // MockPieceDownloaderMockRecorder is the mock recorder for MockPieceDownloader. diff --git a/client/daemon/peer/piece_manager_mock.go b/client/daemon/peer/piece_manager_mock.go index e39759d8f..2677ab832 100644 --- a/client/daemon/peer/piece_manager_mock.go +++ b/client/daemon/peer/piece_manager_mock.go @@ -25,6 +25,7 @@ import ( type MockPieceManager struct { ctrl *gomock.Controller recorder *MockPieceManagerMockRecorder + isgomock struct{} } // MockPieceManagerMockRecorder is the mock recorder for MockPieceManager. diff --git a/client/daemon/proxy/mocks/proxy_manager_mock.go b/client/daemon/proxy/mocks/proxy_manager_mock.go index 25b62813b..6b4d3d5a8 100644 --- a/client/daemon/proxy/mocks/proxy_manager_mock.go +++ b/client/daemon/proxy/mocks/proxy_manager_mock.go @@ -21,6 +21,7 @@ import ( type MockManager struct { ctrl *gomock.Controller recorder *MockManagerMockRecorder + isgomock struct{} } // MockManagerMockRecorder is the mock recorder for MockManager. @@ -112,6 +113,7 @@ func (mr *MockManagerMockRecorder) Watch(arg0 any) *gomock.Call { type MockConfigWatcher struct { ctrl *gomock.Controller recorder *MockConfigWatcherMockRecorder + isgomock struct{} } // MockConfigWatcherMockRecorder is the mock recorder for MockConfigWatcher. diff --git a/client/daemon/rpcserver/mocks/rpcserver_mock.go b/client/daemon/rpcserver/mocks/rpcserver_mock.go index 91b7bc90f..b0aae60ad 100644 --- a/client/daemon/rpcserver/mocks/rpcserver_mock.go +++ b/client/daemon/rpcserver/mocks/rpcserver_mock.go @@ -23,6 +23,7 @@ import ( type MockServer struct { ctrl *gomock.Controller recorder *MockServerMockRecorder + isgomock struct{} } // MockServerMockRecorder is the mock recorder for MockServer. @@ -124,6 +125,7 @@ func (mr *MockServerMockRecorder) Stop() *gomock.Call { type MockResultSender struct { ctrl *gomock.Controller recorder *MockResultSenderMockRecorder + isgomock struct{} } // MockResultSenderMockRecorder is the mock recorder for MockResultSender. diff --git a/client/daemon/storage/mocks/stroage_manager_mock.go b/client/daemon/storage/mocks/stroage_manager_mock.go index 80c7f0aeb..d1f7c40bb 100644 --- a/client/daemon/storage/mocks/stroage_manager_mock.go +++ b/client/daemon/storage/mocks/stroage_manager_mock.go @@ -26,6 +26,7 @@ import ( type MockTaskStorageDriver struct { ctrl *gomock.Controller recorder *MockTaskStorageDriverMockRecorder + isgomock struct{} } // MockTaskStorageDriverMockRecorder is the mock recorder for MockTaskStorageDriver. @@ -197,6 +198,7 @@ func (mr *MockTaskStorageDriverMockRecorder) WritePiece(ctx, req any) *gomock.Ca type MockReclaimer struct { ctrl *gomock.Controller recorder *MockReclaimerMockRecorder + isgomock struct{} } // MockReclaimerMockRecorder is the mock recorder for MockReclaimer. @@ -260,6 +262,7 @@ func (mr *MockReclaimerMockRecorder) Reclaim() *gomock.Call { type MockManager struct { ctrl *gomock.Controller recorder *MockManagerMockRecorder + isgomock struct{} } // MockManagerMockRecorder is the mock recorder for MockManager. diff --git a/client/daemon/upload/mocks/upload_manager_mock.go b/client/daemon/upload/mocks/upload_manager_mock.go index 27113f76a..de3d84a60 100644 --- a/client/daemon/upload/mocks/upload_manager_mock.go +++ b/client/daemon/upload/mocks/upload_manager_mock.go @@ -20,6 +20,7 @@ import ( type MockManager struct { ctrl *gomock.Controller recorder *MockManagerMockRecorder + isgomock struct{} } // MockManagerMockRecorder is the mock recorder for MockManager. diff --git a/client/dfstore/mocks/dfstore_mock.go b/client/dfstore/mocks/dfstore_mock.go index 160345601..a48252f40 100644 --- a/client/dfstore/mocks/dfstore_mock.go +++ b/client/dfstore/mocks/dfstore_mock.go @@ -24,6 +24,7 @@ import ( type MockDfstore struct { ctrl *gomock.Controller recorder *MockDfstoreMockRecorder + isgomock struct{} } // MockDfstoreMockRecorder is the mock recorder for MockDfstore. diff --git a/client/util/mocks/keepalive_mock.go b/client/util/mocks/keepalive_mock.go index 3c3cd1336..e79ab7874 100644 --- a/client/util/mocks/keepalive_mock.go +++ b/client/util/mocks/keepalive_mock.go @@ -20,6 +20,7 @@ import ( type MockKeepAlive struct { ctrl *gomock.Controller recorder *MockKeepAliveMockRecorder + isgomock struct{} } // MockKeepAliveMockRecorder is the mock recorder for MockKeepAlive. diff --git a/internal/dynconfig/mocks/dynconfig_mock.go b/internal/dynconfig/mocks/dynconfig_mock.go index 5ffc0ea66..2cb002d55 100644 --- a/internal/dynconfig/mocks/dynconfig_mock.go +++ b/internal/dynconfig/mocks/dynconfig_mock.go @@ -19,6 +19,7 @@ import ( type MockDynconfig[T any] struct { ctrl *gomock.Controller recorder *MockDynconfigMockRecorder[T] + isgomock struct{} } // MockDynconfigMockRecorder is the mock recorder for MockDynconfig. diff --git a/internal/dynconfig/mocks/manager_client_mock.go b/internal/dynconfig/mocks/manager_client_mock.go index 1d421bb11..4ebc19b5c 100644 --- a/internal/dynconfig/mocks/manager_client_mock.go +++ b/internal/dynconfig/mocks/manager_client_mock.go @@ -19,6 +19,7 @@ import ( type MockManagerClient struct { ctrl *gomock.Controller recorder *MockManagerClientMockRecorder + isgomock struct{} } // MockManagerClientMockRecorder is the mock recorder for MockManagerClient. diff --git a/manager/auth/oauth/mocks/oauth_mock.go b/manager/auth/oauth/mocks/oauth_mock.go index 507fcbb8c..597c50f98 100644 --- a/manager/auth/oauth/mocks/oauth_mock.go +++ b/manager/auth/oauth/mocks/oauth_mock.go @@ -21,6 +21,7 @@ import ( type MockOauth struct { ctrl *gomock.Controller recorder *MockOauthMockRecorder + isgomock struct{} } // MockOauthMockRecorder is the mock recorder for MockOauth. diff --git a/manager/job/mocks/gc_mock.go b/manager/job/mocks/gc_mock.go index fd85f2bc4..5c6601589 100644 --- a/manager/job/mocks/gc_mock.go +++ b/manager/job/mocks/gc_mock.go @@ -19,6 +19,7 @@ import ( type MockGC struct { ctrl *gomock.Controller recorder *MockGCMockRecorder + isgomock struct{} } // MockGCMockRecorder is the mock recorder for MockGC. diff --git a/manager/job/mocks/preheat_mock.go b/manager/job/mocks/preheat_mock.go index 40078cdc5..5654f8a8e 100644 --- a/manager/job/mocks/preheat_mock.go +++ b/manager/job/mocks/preheat_mock.go @@ -23,6 +23,7 @@ import ( type MockPreheat struct { ctrl *gomock.Controller recorder *MockPreheatMockRecorder + isgomock struct{} } // MockPreheatMockRecorder is the mock recorder for MockPreheat. diff --git a/manager/job/mocks/sync_peers_mock.go b/manager/job/mocks/sync_peers_mock.go index 9135da7f1..5cf9824e5 100644 --- a/manager/job/mocks/sync_peers_mock.go +++ b/manager/job/mocks/sync_peers_mock.go @@ -20,6 +20,7 @@ import ( type MockSyncPeers struct { ctrl *gomock.Controller recorder *MockSyncPeersMockRecorder + isgomock struct{} } // MockSyncPeersMockRecorder is the mock recorder for MockSyncPeers. diff --git a/manager/job/mocks/task_mock.go b/manager/job/mocks/task_mock.go index 82b59c267..41cc6b700 100644 --- a/manager/job/mocks/task_mock.go +++ b/manager/job/mocks/task_mock.go @@ -23,6 +23,7 @@ import ( type MockTask struct { ctrl *gomock.Controller recorder *MockTaskMockRecorder + isgomock struct{} } // MockTaskMockRecorder is the mock recorder for MockTask. diff --git a/manager/searcher/mocks/searcher_mock.go b/manager/searcher/mocks/searcher_mock.go index 60cf54b99..a234829a6 100644 --- a/manager/searcher/mocks/searcher_mock.go +++ b/manager/searcher/mocks/searcher_mock.go @@ -22,6 +22,7 @@ import ( type MockSearcher struct { ctrl *gomock.Controller recorder *MockSearcherMockRecorder + isgomock struct{} } // MockSearcherMockRecorder is the mock recorder for MockSearcher. diff --git a/manager/service/mocks/service_mock.go b/manager/service/mocks/service_mock.go index 9785b54a8..b0eb8cbdf 100644 --- a/manager/service/mocks/service_mock.go +++ b/manager/service/mocks/service_mock.go @@ -25,6 +25,7 @@ import ( type MockService struct { ctrl *gomock.Controller recorder *MockServiceMockRecorder + isgomock struct{} } // MockServiceMockRecorder is the mock recorder for MockService. diff --git a/pkg/cache/cache_mock.go b/pkg/cache/cache_mock.go index 41bd52af5..503bae25f 100644 --- a/pkg/cache/cache_mock.go +++ b/pkg/cache/cache_mock.go @@ -21,6 +21,7 @@ import ( type MockCache struct { ctrl *gomock.Controller recorder *MockCacheMockRecorder + isgomock struct{} } // MockCacheMockRecorder is the mock recorder for MockCache. diff --git a/pkg/container/set/mocks/safe_set_mock.go b/pkg/container/set/mocks/safe_set_mock.go index e3be339e6..f3b7cc302 100644 --- a/pkg/container/set/mocks/safe_set_mock.go +++ b/pkg/container/set/mocks/safe_set_mock.go @@ -19,6 +19,7 @@ import ( type MockSafeSet[T comparable] struct { ctrl *gomock.Controller recorder *MockSafeSetMockRecorder[T] + isgomock struct{} } // MockSafeSetMockRecorder is the mock recorder for MockSafeSet. diff --git a/pkg/container/set/mocks/set_mock.go b/pkg/container/set/mocks/set_mock.go index 7f6f48193..9701c4602 100644 --- a/pkg/container/set/mocks/set_mock.go +++ b/pkg/container/set/mocks/set_mock.go @@ -19,6 +19,7 @@ import ( type MockSet[T comparable] struct { ctrl *gomock.Controller recorder *MockSetMockRecorder[T] + isgomock struct{} } // MockSetMockRecorder is the mock recorder for MockSet. diff --git a/pkg/dfpath/mocks/dfpath_mock.go b/pkg/dfpath/mocks/dfpath_mock.go index 64d03bd94..ad0c50272 100644 --- a/pkg/dfpath/mocks/dfpath_mock.go +++ b/pkg/dfpath/mocks/dfpath_mock.go @@ -20,6 +20,7 @@ import ( type MockDfpath struct { ctrl *gomock.Controller recorder *MockDfpathMockRecorder + isgomock struct{} } // MockDfpathMockRecorder is the mock recorder for MockDfpath. diff --git a/pkg/digest/mocks/digest_reader_mock.go b/pkg/digest/mocks/digest_reader_mock.go index fd47e0516..7d37f9934 100644 --- a/pkg/digest/mocks/digest_reader_mock.go +++ b/pkg/digest/mocks/digest_reader_mock.go @@ -19,6 +19,7 @@ import ( type MockReader struct { ctrl *gomock.Controller recorder *MockReaderMockRecorder + isgomock struct{} } // MockReaderMockRecorder is the mock recorder for MockReader. diff --git a/pkg/gc/gc_mock.go b/pkg/gc/gc_mock.go index 371a49e78..a9632f934 100644 --- a/pkg/gc/gc_mock.go +++ b/pkg/gc/gc_mock.go @@ -19,6 +19,7 @@ import ( type MockGC struct { ctrl *gomock.Controller recorder *MockGCMockRecorder + isgomock struct{} } // MockGCMockRecorder is the mock recorder for MockGC. diff --git a/pkg/gc/logger_mock.go b/pkg/gc/logger_mock.go index 54df67f85..1409184cd 100644 --- a/pkg/gc/logger_mock.go +++ b/pkg/gc/logger_mock.go @@ -19,6 +19,7 @@ import ( type MockLogger struct { ctrl *gomock.Controller recorder *MockLoggerMockRecorder + isgomock struct{} } // MockLoggerMockRecorder is the mock recorder for MockLogger. diff --git a/pkg/gc/runner_mock.go b/pkg/gc/runner_mock.go index cd22300c4..953ed0398 100644 --- a/pkg/gc/runner_mock.go +++ b/pkg/gc/runner_mock.go @@ -19,6 +19,7 @@ import ( type MockRunner struct { ctrl *gomock.Controller recorder *MockRunnerMockRecorder + isgomock struct{} } // MockRunnerMockRecorder is the mock recorder for MockRunner. diff --git a/pkg/graph/dag/mocks/dag_mock.go b/pkg/graph/dag/mocks/dag_mock.go index 32d5337ea..ac185c751 100644 --- a/pkg/graph/dag/mocks/dag_mock.go +++ b/pkg/graph/dag/mocks/dag_mock.go @@ -20,6 +20,7 @@ import ( type MockDAG[T comparable] struct { ctrl *gomock.Controller recorder *MockDAGMockRecorder[T] + isgomock struct{} } // MockDAGMockRecorder is the mock recorder for MockDAG. diff --git a/pkg/graph/dg/mocks/dg_mock.go b/pkg/graph/dg/mocks/dg_mock.go index 96ff48e5c..d2a2c5790 100644 --- a/pkg/graph/dg/mocks/dg_mock.go +++ b/pkg/graph/dg/mocks/dg_mock.go @@ -20,6 +20,7 @@ import ( type MockDG[T comparable] struct { ctrl *gomock.Controller recorder *MockDGMockRecorder[T] + isgomock struct{} } // MockDGMockRecorder is the mock recorder for MockDG. diff --git a/pkg/objectstorage/mocks/objectstorage_mock.go b/pkg/objectstorage/mocks/objectstorage_mock.go index 5ca5899e3..c6b32e6a0 100644 --- a/pkg/objectstorage/mocks/objectstorage_mock.go +++ b/pkg/objectstorage/mocks/objectstorage_mock.go @@ -23,6 +23,7 @@ import ( type MockObjectStorage struct { ctrl *gomock.Controller recorder *MockObjectStorageMockRecorder + isgomock struct{} } // MockObjectStorageMockRecorder is the mock recorder for MockObjectStorage. diff --git a/pkg/rpc/cdnsystem/client/mocks/client_mock.go b/pkg/rpc/cdnsystem/client/mocks/client_mock.go index f7d715880..b05a311e8 100644 --- a/pkg/rpc/cdnsystem/client/mocks/client_mock.go +++ b/pkg/rpc/cdnsystem/client/mocks/client_mock.go @@ -23,6 +23,7 @@ import ( type MockClient struct { ctrl *gomock.Controller recorder *MockClientMockRecorder + isgomock struct{} } // MockClientMockRecorder is the mock recorder for MockClient. diff --git a/pkg/rpc/dfdaemon/client/mocks/client_v1_mock.go b/pkg/rpc/dfdaemon/client/mocks/client_v1_mock.go index af329d716..a5e1c7f05 100644 --- a/pkg/rpc/dfdaemon/client/mocks/client_v1_mock.go +++ b/pkg/rpc/dfdaemon/client/mocks/client_v1_mock.go @@ -23,6 +23,7 @@ import ( type MockV1 struct { ctrl *gomock.Controller recorder *MockV1MockRecorder + isgomock struct{} } // MockV1MockRecorder is the mock recorder for MockV1. diff --git a/pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go b/pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go index 0a4b58de5..7a2acc49a 100644 --- a/pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go +++ b/pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go @@ -23,6 +23,7 @@ import ( type MockV2 struct { ctrl *gomock.Controller recorder *MockV2MockRecorder + isgomock struct{} } // MockV2MockRecorder is the mock recorder for MockV2. diff --git a/pkg/rpc/health/client/mocks/client_mock.go b/pkg/rpc/health/client/mocks/client_mock.go index 6f65fd171..129a274e0 100644 --- a/pkg/rpc/health/client/mocks/client_mock.go +++ b/pkg/rpc/health/client/mocks/client_mock.go @@ -22,6 +22,7 @@ import ( type MockClient struct { ctrl *gomock.Controller recorder *MockClientMockRecorder + isgomock struct{} } // MockClientMockRecorder is the mock recorder for MockClient. diff --git a/pkg/rpc/manager/client/mocks/client_v1_mock.go b/pkg/rpc/manager/client/mocks/client_v1_mock.go index 9e32fd692..5da058217 100644 --- a/pkg/rpc/manager/client/mocks/client_v1_mock.go +++ b/pkg/rpc/manager/client/mocks/client_v1_mock.go @@ -23,6 +23,7 @@ import ( type MockV1 struct { ctrl *gomock.Controller recorder *MockV1MockRecorder + isgomock struct{} } // MockV1MockRecorder is the mock recorder for MockV1. diff --git a/pkg/rpc/manager/client/mocks/client_v2_mock.go b/pkg/rpc/manager/client/mocks/client_v2_mock.go index 281171e90..4d7ce07d4 100644 --- a/pkg/rpc/manager/client/mocks/client_v2_mock.go +++ b/pkg/rpc/manager/client/mocks/client_v2_mock.go @@ -23,6 +23,7 @@ import ( type MockV2 struct { ctrl *gomock.Controller recorder *MockV2MockRecorder + isgomock struct{} } // MockV2MockRecorder is the mock recorder for MockV2. diff --git a/pkg/rpc/scheduler/client/mocks/client_v1_mock.go b/pkg/rpc/scheduler/client/mocks/client_v1_mock.go index 0e9b055b3..0b766d3a6 100644 --- a/pkg/rpc/scheduler/client/mocks/client_v1_mock.go +++ b/pkg/rpc/scheduler/client/mocks/client_v1_mock.go @@ -22,6 +22,7 @@ import ( type MockV1 struct { ctrl *gomock.Controller recorder *MockV1MockRecorder + isgomock struct{} } // MockV1MockRecorder is the mock recorder for MockV1. diff --git a/pkg/rpc/scheduler/client/mocks/client_v2_mock.go b/pkg/rpc/scheduler/client/mocks/client_v2_mock.go index afabcaabf..38f31f4e3 100644 --- a/pkg/rpc/scheduler/client/mocks/client_v2_mock.go +++ b/pkg/rpc/scheduler/client/mocks/client_v2_mock.go @@ -23,6 +23,7 @@ import ( type MockV2 struct { ctrl *gomock.Controller recorder *MockV2MockRecorder + isgomock struct{} } // MockV2MockRecorder is the mock recorder for MockV2. diff --git a/pkg/source/mocks/mock_source_client.go b/pkg/source/mocks/mock_source_client.go index 3d1de355c..b00766130 100644 --- a/pkg/source/mocks/mock_source_client.go +++ b/pkg/source/mocks/mock_source_client.go @@ -20,6 +20,7 @@ import ( type MockResourceClient struct { ctrl *gomock.Controller recorder *MockResourceClientMockRecorder + isgomock struct{} } // MockResourceClientMockRecorder is the mock recorder for MockResourceClient. @@ -118,6 +119,7 @@ func (mr *MockResourceClientMockRecorder) IsSupportRange(request any) *gomock.Ca type MockResourceMetadataGetter struct { ctrl *gomock.Controller recorder *MockResourceMetadataGetterMockRecorder + isgomock struct{} } // MockResourceMetadataGetterMockRecorder is the mock recorder for MockResourceMetadataGetter. @@ -156,6 +158,7 @@ func (mr *MockResourceMetadataGetterMockRecorder) GetMetadata(request any) *gomo type MockResourceLister struct { ctrl *gomock.Controller recorder *MockResourceListerMockRecorder + isgomock struct{} } // MockResourceListerMockRecorder is the mock recorder for MockResourceLister. @@ -194,6 +197,7 @@ func (mr *MockResourceListerMockRecorder) List(request any) *gomock.Call { type MockClientManager struct { ctrl *gomock.Controller recorder *MockClientManagerMockRecorder + isgomock struct{} } // MockClientManagerMockRecorder is the mock recorder for MockClientManager. @@ -282,6 +286,7 @@ func (mr *MockClientManagerMockRecorder) UnRegister(scheme any) *gomock.Call { type MockHook struct { ctrl *gomock.Controller recorder *MockHookMockRecorder + isgomock struct{} } // MockHookMockRecorder is the mock recorder for MockHook. diff --git a/scheduler/announcer/mocks/announcer_mock.go b/scheduler/announcer/mocks/announcer_mock.go index ace08c76b..81076fc12 100644 --- a/scheduler/announcer/mocks/announcer_mock.go +++ b/scheduler/announcer/mocks/announcer_mock.go @@ -19,6 +19,7 @@ import ( type MockAnnouncer struct { ctrl *gomock.Controller recorder *MockAnnouncerMockRecorder + isgomock struct{} } // MockAnnouncerMockRecorder is the mock recorder for MockAnnouncer. diff --git a/scheduler/config/config.go b/scheduler/config/config.go index cbf7f1c9a..f2ccb5de2 100644 --- a/scheduler/config/config.go +++ b/scheduler/config/config.go @@ -49,6 +49,9 @@ type Config struct { // SeedPeer configuration. SeedPeer SeedPeerConfig `yaml:"seedPeer" mapstructure:"seedPeer"` + // Peer configuration. + Peer PeerConfig `yaml:"peer" mapstructure:"peer"` + // Host configuration. Host HostConfig `yaml:"host" mapstructure:"host"` @@ -231,6 +234,11 @@ type SeedPeerConfig struct { TaskDownloadTimeout time.Duration `yaml:"taskDownloadTimeout" mapstructure:"taskDownloadTimeout"` } +type PeerConfig struct { + // TLS client configuration. + TLS *GRPCTLSClientConfig `yaml:"tls" mapstructure:"tls"` +} + type KeepAliveConfig struct { // Keep alive interval. Interval time.Duration `yaml:"interval" mapstructure:"interval"` diff --git a/scheduler/config/mocks/dynconfig_mock.go b/scheduler/config/mocks/dynconfig_mock.go index f33532bf0..54f3e8a84 100644 --- a/scheduler/config/mocks/dynconfig_mock.go +++ b/scheduler/config/mocks/dynconfig_mock.go @@ -23,6 +23,7 @@ import ( type MockDynconfigInterface struct { ctrl *gomock.Controller recorder *MockDynconfigInterfaceMockRecorder + isgomock struct{} } // MockDynconfigInterfaceMockRecorder is the mock recorder for MockDynconfigInterface. @@ -261,6 +262,7 @@ func (mr *MockDynconfigInterfaceMockRecorder) Stop() *gomock.Call { type MockObserver struct { ctrl *gomock.Controller recorder *MockObserverMockRecorder + isgomock struct{} } // MockObserverMockRecorder is the mock recorder for MockObserver. diff --git a/scheduler/job/mocks/job_mock.go b/scheduler/job/mocks/job_mock.go index 96a994d8c..d008915ae 100644 --- a/scheduler/job/mocks/job_mock.go +++ b/scheduler/job/mocks/job_mock.go @@ -19,6 +19,7 @@ import ( type MockJob struct { ctrl *gomock.Controller recorder *MockJobMockRecorder + isgomock struct{} } // MockJobMockRecorder is the mock recorder for MockJob. diff --git a/scheduler/resource/persistentcache/host_manager.go b/scheduler/resource/persistentcache/host_manager.go index 0ed0e87e8..a338e7e0f 100644 --- a/scheduler/resource/persistentcache/host_manager.go +++ b/scheduler/resource/persistentcache/host_manager.go @@ -37,10 +37,10 @@ type HostManager interface { Load(context.Context, string) (*Host, bool) // Store sets host. - Store(context.Context, *Host) + Store(context.Context, *Host) error // Delete deletes host by a key. - Delete(context.Context, string) + Delete(context.Context, string) error // LoadAll returns all hosts. LoadAll(context.Context) ([]*Host, error) @@ -427,8 +427,8 @@ func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) { } // Store sets host. -func (t *hostManager) Store(ctx context.Context, host *Host) { - t.rdb.HSet(ctx, +func (t *hostManager) Store(ctx context.Context, host *Host) error { + _, err := t.rdb.HSet(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, host.ID), "id", host.ID, "type", host.Type.Name(), @@ -488,12 +488,15 @@ func (t *hostManager) Store(ctx context.Context, host *Host) { "upload_count", host.UploadCount, "upload_failed_count", host.UploadFailedCount, "created_at", host.CreatedAt.Format(time.RFC3339), - "updated_at", host.UpdatedAt.Format(time.RFC3339)) + "updated_at", host.UpdatedAt.Format(time.RFC3339)).Result() + + return err } // Delete deletes host by a key. -func (t *hostManager) Delete(ctx context.Context, hostID string) { - t.rdb.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, hostID)) +func (t *hostManager) Delete(ctx context.Context, hostID string) error { + _, err := t.rdb.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, hostID)).Result() + return err } // LoadAll returns all hosts. diff --git a/scheduler/resource/persistentcache/host_manager_mock.go b/scheduler/resource/persistentcache/host_manager_mock.go index dd682f3fd..68cee0e56 100644 --- a/scheduler/resource/persistentcache/host_manager_mock.go +++ b/scheduler/resource/persistentcache/host_manager_mock.go @@ -20,6 +20,7 @@ import ( type MockHostManager struct { ctrl *gomock.Controller recorder *MockHostManagerMockRecorder + isgomock struct{} } // MockHostManagerMockRecorder is the mock recorder for MockHostManager. @@ -40,9 +41,11 @@ func (m *MockHostManager) EXPECT() *MockHostManagerMockRecorder { } // Delete mocks base method. -func (m *MockHostManager) Delete(arg0 context.Context, arg1 string) { +func (m *MockHostManager) Delete(arg0 context.Context, arg1 string) error { m.ctrl.T.Helper() - m.ctrl.Call(m, "Delete", arg0, arg1) + ret := m.ctrl.Call(m, "Delete", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 } // Delete indicates an expected call of Delete. @@ -82,9 +85,11 @@ func (mr *MockHostManagerMockRecorder) LoadAll(arg0 any) *gomock.Call { } // Store mocks base method. -func (m *MockHostManager) Store(arg0 context.Context, arg1 *Host) { +func (m *MockHostManager) Store(arg0 context.Context, arg1 *Host) error { m.ctrl.T.Helper() - m.ctrl.Call(m, "Store", arg0, arg1) + ret := m.ctrl.Call(m, "Store", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 } // Store indicates an expected call of Store. diff --git a/scheduler/resource/persistentcache/peer_manager.go b/scheduler/resource/persistentcache/peer_manager.go index 510c9ff2a..199067094 100644 --- a/scheduler/resource/persistentcache/peer_manager.go +++ b/scheduler/resource/persistentcache/peer_manager.go @@ -22,14 +22,20 @@ import ( "context" "encoding/json" "errors" + "fmt" "strconv" "time" "github.com/bits-and-blooms/bitset" - "github.com/redis/go-redis/v9" + redis "github.com/redis/go-redis/v9" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + + dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2" logger "d7y.io/dragonfly/v2/internal/dflog" pkgredis "d7y.io/dragonfly/v2/pkg/redis" + dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client" "d7y.io/dragonfly/v2/scheduler/config" ) @@ -46,6 +52,12 @@ type PeerManager interface { // LoadAll returns all peers. LoadAll(context.Context) ([]*Peer, error) + + // LoadAllByTaskID returns all peers by task id. + LoadAllByTaskID(context.Context, string) ([]*Peer, error) + + // DeleteAllByTaskID deletes all peers by task id. + DeleteAllByTaskID(context.Context, string) error } // peerManager contains content for peer manager. @@ -61,11 +73,14 @@ type peerManager struct { // Redis universal client interface. rdb redis.UniversalClient + + // transportCredentials is used to mTLS for peer grpc connection. + transportCredentials credentials.TransportCredentials } // New peer manager interface. -func newPeerManager(cfg *config.Config, rdb redis.UniversalClient, taskManager TaskManager, hostManager HostManager) PeerManager { - return &peerManager{config: cfg, rdb: rdb, taskManager: taskManager, hostManager: hostManager} +func newPeerManager(cfg *config.Config, rdb redis.UniversalClient, taskManager TaskManager, hostManager HostManager, transportCredentials credentials.TransportCredentials) PeerManager { + return &peerManager{config: cfg, rdb: rdb, taskManager: taskManager, hostManager: hostManager, transportCredentials: transportCredentials} } // Load returns persistent cache peer by a key. @@ -157,7 +172,7 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error { if _, err := p.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error { // Store peer information and set expiration. - pipe.HSet(ctx, + if _, err := pipe.HSet(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peer.ID), "id", peer.ID, "persistent", peer.Persistent, @@ -168,15 +183,33 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error { "host_id", peer.Host.ID, "ttl", peer.Cost, "created_at", peer.CreatedAt.Format(time.RFC3339), - "updated_at", peer.UpdatedAt.Format(time.RFC3339)) - pipe.Expire(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peer.ID), peer.Task.TTL) + "updated_at", peer.UpdatedAt.Format(time.RFC3339)).Result(); err != nil { + peer.Log.Errorf("store peer failed: %v", err) + return err + } - // Store the association with task and set expiration. - pipe.SAdd(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), peer.ID) - pipe.Expire(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), peer.Task.TTL) + if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peer.ID), peer.Task.TTL).Result(); err != nil { + peer.Log.Errorf("set peer ttl failed: %v", err) + return err + } + + // Store the joint-set with task and set expiration. + if _, err := pipe.SAdd(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), peer.ID).Result(); err != nil { + peer.Log.Errorf("add peer id to task joint-set failed: %v", err) + return err + } + + if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), peer.Task.TTL).Result(); err != nil { + peer.Log.Errorf("set task joint-set ttl failed: %v", err) + return err + } + + // Store the joint-set with host. + if _, err := pipe.SAdd(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), peer.ID).Result(); err != nil { + peer.Log.Errorf("add peer id to host joint-set failed: %v", err) + return err + } - // Store the association with host. - pipe.SAdd(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), peer.ID) return nil }); err != nil { peer.Log.Errorf("store peer failed: %v", err) @@ -195,9 +228,21 @@ func (p *peerManager) Delete(ctx context.Context, peerID string) error { return errors.New("getting peer failed from redis") } - pipe.Del(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID)) - pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["task_id"]), peerID) - pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["host_id"]), peerID) + if _, err := pipe.Del(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID)).Result(); err != nil { + log.Errorf("delete peer failed: %v", err) + return err + } + + if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["task_id"]), peerID).Result(); err != nil { + log.Errorf("delete peer id from task joint-set failed: %v", err) + return err + } + + if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["host_id"]), peerID).Result(); err != nil { + log.Errorf("delete peer id from host joint-set failed: %v", err) + return err + } + return nil }); err != nil { log.Errorf("store peer failed: %v", err) @@ -243,3 +288,58 @@ func (p *peerManager) LoadAll(ctx context.Context) ([]*Peer, error) { return peers, nil } + +// LoadAllByTaskID returns all persistent cache peers by task id. +func (p *peerManager) LoadAllByTaskID(ctx context.Context, taskID string) ([]*Peer, error) { + log := logger.WithTaskID(taskID) + peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, taskID)).Result() + if err != nil { + log.Error("get peer ids failed") + return nil, err + } + + peers := make([]*Peer, 0, len(peerIDs)) + for _, peerID := range peerIDs { + peer, loaded := p.Load(ctx, peerID) + if !loaded { + log.Errorf("load peer %s failed", peerID) + continue + } + + peers = append(peers, peer) + } + + return peers, nil +} + +// DeleteAllByTaskID deletes all persistent cache peers by task id. +func (p *peerManager) DeleteAllByTaskID(ctx context.Context, taskID string) error { + log := logger.WithTaskID(taskID) + peers, err := p.LoadAllByTaskID(ctx, taskID) + if err != nil { + log.Error("load peers failed") + return err + } + + for _, peer := range peers { + addr := fmt.Sprintf("%s:%d", peer.Host.IP, peer.Host.Port) + client, err := dfdaemonclient.GetV2ByAddr(ctx, addr, grpc.WithTransportCredentials(p.transportCredentials)) + if err != nil { + log.Errorf("get dfdaemon client failed: %v", err) + continue + } + + if err := client.DeletePersistentCacheTask(ctx, &dfdaemonv2.DeletePersistentCacheTaskRequest{TaskId: taskID}); err != nil { + log.Errorf("delete task %s failed", taskID) + continue + } + + if err := p.Delete(ctx, peer.ID); err != nil { + log.Errorf("delete peer %s failed", peer.ID) + continue + } + } + + p.taskManager.Delete(ctx, taskID) + return nil +} diff --git a/scheduler/resource/persistentcache/peer_manager_mock.go b/scheduler/resource/persistentcache/peer_manager_mock.go index bbbc5322d..4231d7e5e 100644 --- a/scheduler/resource/persistentcache/peer_manager_mock.go +++ b/scheduler/resource/persistentcache/peer_manager_mock.go @@ -20,6 +20,7 @@ import ( type MockPeerManager struct { ctrl *gomock.Controller recorder *MockPeerManagerMockRecorder + isgomock struct{} } // MockPeerManagerMockRecorder is the mock recorder for MockPeerManager. @@ -53,6 +54,20 @@ func (mr *MockPeerManagerMockRecorder) Delete(arg0, arg1 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockPeerManager)(nil).Delete), arg0, arg1) } +// DeleteAllByTaskID mocks base method. +func (m *MockPeerManager) DeleteAllByTaskID(arg0 context.Context, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteAllByTaskID", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteAllByTaskID indicates an expected call of DeleteAllByTaskID. +func (mr *MockPeerManagerMockRecorder) DeleteAllByTaskID(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAllByTaskID", reflect.TypeOf((*MockPeerManager)(nil).DeleteAllByTaskID), arg0, arg1) +} + // Load mocks base method. func (m *MockPeerManager) Load(arg0 context.Context, arg1 string) (*Peer, bool) { m.ctrl.T.Helper() @@ -83,6 +98,21 @@ func (mr *MockPeerManagerMockRecorder) LoadAll(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadAll", reflect.TypeOf((*MockPeerManager)(nil).LoadAll), arg0) } +// LoadAllByTaskID mocks base method. +func (m *MockPeerManager) LoadAllByTaskID(arg0 context.Context, arg1 string) ([]*Peer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LoadAllByTaskID", arg0, arg1) + ret0, _ := ret[0].([]*Peer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// LoadAllByTaskID indicates an expected call of LoadAllByTaskID. +func (mr *MockPeerManagerMockRecorder) LoadAllByTaskID(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadAllByTaskID", reflect.TypeOf((*MockPeerManager)(nil).LoadAllByTaskID), arg0, arg1) +} + // Store mocks base method. func (m *MockPeerManager) Store(arg0 context.Context, arg1 *Peer) error { m.ctrl.T.Helper() diff --git a/scheduler/resource/persistentcache/resource.go b/scheduler/resource/persistentcache/resource.go index ae706d462..d1c0b0705 100644 --- a/scheduler/resource/persistentcache/resource.go +++ b/scheduler/resource/persistentcache/resource.go @@ -19,7 +19,8 @@ package persistentcache import ( - "github.com/redis/go-redis/v9" + redis "github.com/redis/go-redis/v9" + "google.golang.org/grpc/credentials" "d7y.io/dragonfly/v2/scheduler/config" ) @@ -49,10 +50,10 @@ type resource struct { } // New returns Resource interface. -func New(cfg *config.Config, rdb redis.UniversalClient) Resource { +func New(cfg *config.Config, rdb redis.UniversalClient, transportCredentials credentials.TransportCredentials) Resource { taskManager := newTaskManager(cfg, rdb) hostManager := newHostManager(cfg, rdb) - peerManager := newPeerManager(cfg, rdb, taskManager, hostManager) + peerManager := newPeerManager(cfg, rdb, taskManager, hostManager, transportCredentials) return &resource{peerManager, taskManager, hostManager} } diff --git a/scheduler/resource/persistentcache/resource_mock.go b/scheduler/resource/persistentcache/resource_mock.go index 213586874..d536ef903 100644 --- a/scheduler/resource/persistentcache/resource_mock.go +++ b/scheduler/resource/persistentcache/resource_mock.go @@ -19,6 +19,7 @@ import ( type MockResource struct { ctrl *gomock.Controller recorder *MockResourceMockRecorder + isgomock struct{} } // MockResourceMockRecorder is the mock recorder for MockResource. diff --git a/scheduler/resource/persistentcache/task_manager.go b/scheduler/resource/persistentcache/task_manager.go index 506b6e049..9789cea72 100644 --- a/scheduler/resource/persistentcache/task_manager.go +++ b/scheduler/resource/persistentcache/task_manager.go @@ -40,7 +40,7 @@ type TaskManager interface { Store(context.Context, *Task) error // Delete deletes persistent cache task by a key. - Delete(context.Context, string) + Delete(context.Context, string) error // LoadAll returns all persistent cache tasks. LoadAll(context.Context) ([]*Task, error) @@ -147,7 +147,7 @@ func (t *taskManager) Load(ctx context.Context, taskID string) (*Task, bool) { // Store sets persistent cache task. func (t *taskManager) Store(ctx context.Context, task *Task) error { if _, err := t.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error { - pipe.HSet(ctx, + if _, err := pipe.HSet(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), "id", task.ID, "persistent_replica_count", task.PersistentReplicaCount, @@ -161,9 +161,16 @@ func (t *taskManager) Store(ctx context.Context, task *Task) error { "state", task.FSM.Current(), "ttl", task.TTL, "created_at", task.CreatedAt.Format(time.RFC3339), - "updated_at", task.UpdatedAt.Format(time.RFC3339)) + "updated_at", task.UpdatedAt.Format(time.RFC3339)).Result(); err != nil { + task.Log.Errorf("store task failed: %v", err) + return err + } + + if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), task.TTL).Result(); err != nil { + task.Log.Errorf("set task ttl failed: %v", err) + return err + } - pipe.Expire(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), task.TTL) return nil }); err != nil { task.Log.Errorf("store task failed: %v", err) @@ -174,8 +181,9 @@ func (t *taskManager) Store(ctx context.Context, task *Task) error { } // Delete deletes persistent cache task by a key. -func (t *taskManager) Delete(ctx context.Context, taskID string) { - t.rdb.Del(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, taskID)) +func (t *taskManager) Delete(ctx context.Context, taskID string) error { + _, err := t.rdb.Del(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, taskID)).Result() + return err } // LoadAll returns all persistent cache tasks. diff --git a/scheduler/resource/persistentcache/task_manager_mock.go b/scheduler/resource/persistentcache/task_manager_mock.go index 5e2cbaed4..4b5c30880 100644 --- a/scheduler/resource/persistentcache/task_manager_mock.go +++ b/scheduler/resource/persistentcache/task_manager_mock.go @@ -20,6 +20,7 @@ import ( type MockTaskManager struct { ctrl *gomock.Controller recorder *MockTaskManagerMockRecorder + isgomock struct{} } // MockTaskManagerMockRecorder is the mock recorder for MockTaskManager. @@ -40,9 +41,11 @@ func (m *MockTaskManager) EXPECT() *MockTaskManagerMockRecorder { } // Delete mocks base method. -func (m *MockTaskManager) Delete(arg0 context.Context, arg1 string) { +func (m *MockTaskManager) Delete(arg0 context.Context, arg1 string) error { m.ctrl.T.Helper() - m.ctrl.Call(m, "Delete", arg0, arg1) + ret := m.ctrl.Call(m, "Delete", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 } // Delete indicates an expected call of Delete. diff --git a/scheduler/resource/standard/host_manager_mock.go b/scheduler/resource/standard/host_manager_mock.go index fe8a49af9..7e8affa80 100644 --- a/scheduler/resource/standard/host_manager_mock.go +++ b/scheduler/resource/standard/host_manager_mock.go @@ -20,6 +20,7 @@ import ( type MockHostManager struct { ctrl *gomock.Controller recorder *MockHostManagerMockRecorder + isgomock struct{} } // MockHostManagerMockRecorder is the mock recorder for MockHostManager. diff --git a/scheduler/resource/standard/peer_manager_mock.go b/scheduler/resource/standard/peer_manager_mock.go index d05449ad4..f3439b268 100644 --- a/scheduler/resource/standard/peer_manager_mock.go +++ b/scheduler/resource/standard/peer_manager_mock.go @@ -19,6 +19,7 @@ import ( type MockPeerManager struct { ctrl *gomock.Controller recorder *MockPeerManagerMockRecorder + isgomock struct{} } // MockPeerManagerMockRecorder is the mock recorder for MockPeerManager. diff --git a/scheduler/resource/standard/resource_mock.go b/scheduler/resource/standard/resource_mock.go index a2e8d877f..515198793 100644 --- a/scheduler/resource/standard/resource_mock.go +++ b/scheduler/resource/standard/resource_mock.go @@ -19,6 +19,7 @@ import ( type MockResource struct { ctrl *gomock.Controller recorder *MockResourceMockRecorder + isgomock struct{} } // MockResourceMockRecorder is the mock recorder for MockResource. diff --git a/scheduler/resource/standard/seed_peer_client_mock.go b/scheduler/resource/standard/seed_peer_client_mock.go index 2a66b57b7..3eb7a6f45 100644 --- a/scheduler/resource/standard/seed_peer_client_mock.go +++ b/scheduler/resource/standard/seed_peer_client_mock.go @@ -26,6 +26,7 @@ import ( type MockSeedPeerClient struct { ctrl *gomock.Controller recorder *MockSeedPeerClientMockRecorder + isgomock struct{} } // MockSeedPeerClientMockRecorder is the mock recorder for MockSeedPeerClient. diff --git a/scheduler/resource/standard/seed_peer_mock.go b/scheduler/resource/standard/seed_peer_mock.go index 85cd577af..9dbd5c254 100644 --- a/scheduler/resource/standard/seed_peer_mock.go +++ b/scheduler/resource/standard/seed_peer_mock.go @@ -23,6 +23,7 @@ import ( type MockSeedPeer struct { ctrl *gomock.Controller recorder *MockSeedPeerMockRecorder + isgomock struct{} } // MockSeedPeerMockRecorder is the mock recorder for MockSeedPeer. diff --git a/scheduler/resource/standard/task_manager_mock.go b/scheduler/resource/standard/task_manager_mock.go index 9e2d38a2b..b3ea2b81d 100644 --- a/scheduler/resource/standard/task_manager_mock.go +++ b/scheduler/resource/standard/task_manager_mock.go @@ -19,6 +19,7 @@ import ( type MockTaskManager struct { ctrl *gomock.Controller recorder *MockTaskManagerMockRecorder + isgomock struct{} } // MockTaskManagerMockRecorder is the mock recorder for MockTaskManager. diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 99f443724..39bd52958 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -158,32 +158,42 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err // Initialize GC. s.gc = gc.New(gc.WithLogger(logger.GCLogger)) - // Initialize client transport credentials. - clientTransportCredentials := rpc.NewInsecureCredentials() + // Initialize seed peer client transport credentials. + seedPeerClientTransportCredentials := rpc.NewInsecureCredentials() if cfg.SeedPeer.TLS != nil { - clientTransportCredentials, err = rpc.NewClientCredentials(cfg.SeedPeer.TLS.CACert, cfg.SeedPeer.TLS.Cert, cfg.SeedPeer.TLS.Key) + seedPeerClientTransportCredentials, err = rpc.NewClientCredentials(cfg.SeedPeer.TLS.CACert, cfg.SeedPeer.TLS.Cert, cfg.SeedPeer.TLS.Key) if err != nil { - logger.Errorf("failed to create client credentials: %v", err) + logger.Errorf("failed to create seed peer client credentials: %v", err) return nil, err } } // Initialize dynconfig. - dynconfig, err := config.NewDynconfig(s.managerClient, filepath.Join(d.CacheDir(), dynconfig.CacheDirName), cfg, clientTransportCredentials) + dynconfig, err := config.NewDynconfig(s.managerClient, filepath.Join(d.CacheDir(), dynconfig.CacheDirName), cfg, seedPeerClientTransportCredentials) if err != nil { return nil, err } s.dynconfig = dynconfig // Initialize resource. - resource, err := standard.New(cfg, s.gc, dynconfig, clientTransportCredentials) + resource, err := standard.New(cfg, s.gc, dynconfig, seedPeerClientTransportCredentials) if err != nil { return nil, err } s.resource = resource + // Initialize seed peer client transport credentials. + peerClientTransportCredentials := rpc.NewInsecureCredentials() + if cfg.Peer.TLS != nil { + peerClientTransportCredentials, err = rpc.NewClientCredentials(cfg.Peer.TLS.CACert, cfg.Peer.TLS.Cert, cfg.Peer.TLS.Key) + if err != nil { + logger.Errorf("failed to create peer client credentials: %v", err) + return nil, err + } + } + // Initialize persistent cache resource. - s.persistentCacheResource = persistentcache.New(cfg, rdb) + s.persistentCacheResource = persistentcache.New(cfg, rdb, peerClientTransportCredentials) // Initialize job service. if cfg.Job.Enable && rdb != nil { diff --git a/scheduler/scheduling/mocks/scheduling_mock.go b/scheduler/scheduling/mocks/scheduling_mock.go index 605ab99e5..e42272737 100644 --- a/scheduler/scheduling/mocks/scheduling_mock.go +++ b/scheduler/scheduling/mocks/scheduling_mock.go @@ -22,6 +22,7 @@ import ( type MockScheduling struct { ctrl *gomock.Controller recorder *MockSchedulingMockRecorder + isgomock struct{} } // MockSchedulingMockRecorder is the mock recorder for MockScheduling. diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 1162a20b3..f111263d7 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -1545,8 +1545,7 @@ func (v *V2) StatPersistentCacheTask(ctx context.Context, req *schedulerv2.StatP }, nil } -// TODO Implement the following methods. // DeletePersistentCacheTask releases persistent cache task in scheduler. func (v *V2) DeletePersistentCacheTask(ctx context.Context, req *schedulerv2.DeletePersistentCacheTaskRequest) error { - return nil + return v.persistentCacheResource.PeerManager().DeleteAllByTaskID(ctx, req.GetTaskId()) } diff --git a/scheduler/storage/mocks/storage_mock.go b/scheduler/storage/mocks/storage_mock.go index 1474c1917..dc388b26d 100644 --- a/scheduler/storage/mocks/storage_mock.go +++ b/scheduler/storage/mocks/storage_mock.go @@ -21,6 +21,7 @@ import ( type MockStorage struct { ctrl *gomock.Controller recorder *MockStorageMockRecorder + isgomock struct{} } // MockStorageMockRecorder is the mock recorder for MockStorage.