feat: implement delete persistent cache task in scheduler (#3619)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2024-10-29 22:24:18 +08:00 committed by GitHub
parent c1ed128a93
commit 58959be0c2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
63 changed files with 277 additions and 45 deletions

View File

@ -22,6 +22,7 @@ import (
type MockDynconfig struct {
ctrl *gomock.Controller
recorder *MockDynconfigMockRecorder
isgomock struct{}
}
// MockDynconfigMockRecorder is the mock recorder for MockDynconfig.
@ -226,6 +227,7 @@ func (mr *MockDynconfigMockRecorder) Stop() *gomock.Call {
type MockObserver struct {
ctrl *gomock.Controller
recorder *MockObserverMockRecorder
isgomock struct{}
}
// MockObserverMockRecorder is the mock recorder for MockObserver.

View File

@ -19,6 +19,7 @@ import (
type MockAnnouncer struct {
ctrl *gomock.Controller
recorder *MockAnnouncerMockRecorder
isgomock struct{}
}
// MockAnnouncerMockRecorder is the mock recorder for MockAnnouncer.

View File

@ -20,6 +20,7 @@ import (
type MockObjectStorage struct {
ctrl *gomock.Controller
recorder *MockObjectStorageMockRecorder
isgomock struct{}
}
// MockObjectStorageMockRecorder is the mock recorder for MockObjectStorage.

View File

@ -26,6 +26,7 @@ import (
type MockTaskManager struct {
ctrl *gomock.Controller
recorder *MockTaskManagerMockRecorder
isgomock struct{}
}
// MockTaskManagerMockRecorder is the mock recorder for MockTaskManager.
@ -183,6 +184,7 @@ func (mr *MockTaskManagerMockRecorder) Subscribe(request any) *gomock.Call {
type MockTask struct {
ctrl *gomock.Controller
recorder *MockTaskMockRecorder
isgomock struct{}
}
// MockTaskMockRecorder is the mock recorder for MockTask.
@ -416,6 +418,7 @@ func (mr *MockTaskMockRecorder) UpdateSourceErrorStatus(st any) *gomock.Call {
type MockLogger struct {
ctrl *gomock.Controller
recorder *MockLoggerMockRecorder
isgomock struct{}
}
// MockLoggerMockRecorder is the mock recorder for MockLogger.

View File

@ -21,6 +21,7 @@ import (
type MockPieceDownloader struct {
ctrl *gomock.Controller
recorder *MockPieceDownloaderMockRecorder
isgomock struct{}
}
// MockPieceDownloaderMockRecorder is the mock recorder for MockPieceDownloader.

View File

@ -25,6 +25,7 @@ import (
type MockPieceManager struct {
ctrl *gomock.Controller
recorder *MockPieceManagerMockRecorder
isgomock struct{}
}
// MockPieceManagerMockRecorder is the mock recorder for MockPieceManager.

View File

@ -21,6 +21,7 @@ import (
type MockManager struct {
ctrl *gomock.Controller
recorder *MockManagerMockRecorder
isgomock struct{}
}
// MockManagerMockRecorder is the mock recorder for MockManager.
@ -112,6 +113,7 @@ func (mr *MockManagerMockRecorder) Watch(arg0 any) *gomock.Call {
type MockConfigWatcher struct {
ctrl *gomock.Controller
recorder *MockConfigWatcherMockRecorder
isgomock struct{}
}
// MockConfigWatcherMockRecorder is the mock recorder for MockConfigWatcher.

View File

@ -23,6 +23,7 @@ import (
type MockServer struct {
ctrl *gomock.Controller
recorder *MockServerMockRecorder
isgomock struct{}
}
// MockServerMockRecorder is the mock recorder for MockServer.
@ -124,6 +125,7 @@ func (mr *MockServerMockRecorder) Stop() *gomock.Call {
type MockResultSender struct {
ctrl *gomock.Controller
recorder *MockResultSenderMockRecorder
isgomock struct{}
}
// MockResultSenderMockRecorder is the mock recorder for MockResultSender.

View File

@ -26,6 +26,7 @@ import (
type MockTaskStorageDriver struct {
ctrl *gomock.Controller
recorder *MockTaskStorageDriverMockRecorder
isgomock struct{}
}
// MockTaskStorageDriverMockRecorder is the mock recorder for MockTaskStorageDriver.
@ -197,6 +198,7 @@ func (mr *MockTaskStorageDriverMockRecorder) WritePiece(ctx, req any) *gomock.Ca
type MockReclaimer struct {
ctrl *gomock.Controller
recorder *MockReclaimerMockRecorder
isgomock struct{}
}
// MockReclaimerMockRecorder is the mock recorder for MockReclaimer.
@ -260,6 +262,7 @@ func (mr *MockReclaimerMockRecorder) Reclaim() *gomock.Call {
type MockManager struct {
ctrl *gomock.Controller
recorder *MockManagerMockRecorder
isgomock struct{}
}
// MockManagerMockRecorder is the mock recorder for MockManager.

View File

@ -20,6 +20,7 @@ import (
type MockManager struct {
ctrl *gomock.Controller
recorder *MockManagerMockRecorder
isgomock struct{}
}
// MockManagerMockRecorder is the mock recorder for MockManager.

View File

@ -24,6 +24,7 @@ import (
type MockDfstore struct {
ctrl *gomock.Controller
recorder *MockDfstoreMockRecorder
isgomock struct{}
}
// MockDfstoreMockRecorder is the mock recorder for MockDfstore.

View File

@ -20,6 +20,7 @@ import (
type MockKeepAlive struct {
ctrl *gomock.Controller
recorder *MockKeepAliveMockRecorder
isgomock struct{}
}
// MockKeepAliveMockRecorder is the mock recorder for MockKeepAlive.

View File

@ -19,6 +19,7 @@ import (
type MockDynconfig[T any] struct {
ctrl *gomock.Controller
recorder *MockDynconfigMockRecorder[T]
isgomock struct{}
}
// MockDynconfigMockRecorder is the mock recorder for MockDynconfig.

View File

@ -19,6 +19,7 @@ import (
type MockManagerClient struct {
ctrl *gomock.Controller
recorder *MockManagerClientMockRecorder
isgomock struct{}
}
// MockManagerClientMockRecorder is the mock recorder for MockManagerClient.

View File

@ -21,6 +21,7 @@ import (
type MockOauth struct {
ctrl *gomock.Controller
recorder *MockOauthMockRecorder
isgomock struct{}
}
// MockOauthMockRecorder is the mock recorder for MockOauth.

View File

@ -19,6 +19,7 @@ import (
type MockGC struct {
ctrl *gomock.Controller
recorder *MockGCMockRecorder
isgomock struct{}
}
// MockGCMockRecorder is the mock recorder for MockGC.

View File

@ -23,6 +23,7 @@ import (
type MockPreheat struct {
ctrl *gomock.Controller
recorder *MockPreheatMockRecorder
isgomock struct{}
}
// MockPreheatMockRecorder is the mock recorder for MockPreheat.

View File

@ -20,6 +20,7 @@ import (
type MockSyncPeers struct {
ctrl *gomock.Controller
recorder *MockSyncPeersMockRecorder
isgomock struct{}
}
// MockSyncPeersMockRecorder is the mock recorder for MockSyncPeers.

View File

@ -23,6 +23,7 @@ import (
type MockTask struct {
ctrl *gomock.Controller
recorder *MockTaskMockRecorder
isgomock struct{}
}
// MockTaskMockRecorder is the mock recorder for MockTask.

View File

@ -22,6 +22,7 @@ import (
type MockSearcher struct {
ctrl *gomock.Controller
recorder *MockSearcherMockRecorder
isgomock struct{}
}
// MockSearcherMockRecorder is the mock recorder for MockSearcher.

View File

@ -25,6 +25,7 @@ import (
type MockService struct {
ctrl *gomock.Controller
recorder *MockServiceMockRecorder
isgomock struct{}
}
// MockServiceMockRecorder is the mock recorder for MockService.

View File

@ -21,6 +21,7 @@ import (
type MockCache struct {
ctrl *gomock.Controller
recorder *MockCacheMockRecorder
isgomock struct{}
}
// MockCacheMockRecorder is the mock recorder for MockCache.

View File

@ -19,6 +19,7 @@ import (
type MockSafeSet[T comparable] struct {
ctrl *gomock.Controller
recorder *MockSafeSetMockRecorder[T]
isgomock struct{}
}
// MockSafeSetMockRecorder is the mock recorder for MockSafeSet.

View File

@ -19,6 +19,7 @@ import (
type MockSet[T comparable] struct {
ctrl *gomock.Controller
recorder *MockSetMockRecorder[T]
isgomock struct{}
}
// MockSetMockRecorder is the mock recorder for MockSet.

View File

@ -20,6 +20,7 @@ import (
type MockDfpath struct {
ctrl *gomock.Controller
recorder *MockDfpathMockRecorder
isgomock struct{}
}
// MockDfpathMockRecorder is the mock recorder for MockDfpath.

View File

@ -19,6 +19,7 @@ import (
type MockReader struct {
ctrl *gomock.Controller
recorder *MockReaderMockRecorder
isgomock struct{}
}
// MockReaderMockRecorder is the mock recorder for MockReader.

View File

@ -19,6 +19,7 @@ import (
type MockGC struct {
ctrl *gomock.Controller
recorder *MockGCMockRecorder
isgomock struct{}
}
// MockGCMockRecorder is the mock recorder for MockGC.

View File

@ -19,6 +19,7 @@ import (
type MockLogger struct {
ctrl *gomock.Controller
recorder *MockLoggerMockRecorder
isgomock struct{}
}
// MockLoggerMockRecorder is the mock recorder for MockLogger.

View File

@ -19,6 +19,7 @@ import (
type MockRunner struct {
ctrl *gomock.Controller
recorder *MockRunnerMockRecorder
isgomock struct{}
}
// MockRunnerMockRecorder is the mock recorder for MockRunner.

View File

@ -20,6 +20,7 @@ import (
type MockDAG[T comparable] struct {
ctrl *gomock.Controller
recorder *MockDAGMockRecorder[T]
isgomock struct{}
}
// MockDAGMockRecorder is the mock recorder for MockDAG.

View File

@ -20,6 +20,7 @@ import (
type MockDG[T comparable] struct {
ctrl *gomock.Controller
recorder *MockDGMockRecorder[T]
isgomock struct{}
}
// MockDGMockRecorder is the mock recorder for MockDG.

View File

@ -23,6 +23,7 @@ import (
type MockObjectStorage struct {
ctrl *gomock.Controller
recorder *MockObjectStorageMockRecorder
isgomock struct{}
}
// MockObjectStorageMockRecorder is the mock recorder for MockObjectStorage.

View File

@ -23,6 +23,7 @@ import (
type MockClient struct {
ctrl *gomock.Controller
recorder *MockClientMockRecorder
isgomock struct{}
}
// MockClientMockRecorder is the mock recorder for MockClient.

View File

@ -23,6 +23,7 @@ import (
type MockV1 struct {
ctrl *gomock.Controller
recorder *MockV1MockRecorder
isgomock struct{}
}
// MockV1MockRecorder is the mock recorder for MockV1.

View File

@ -23,6 +23,7 @@ import (
type MockV2 struct {
ctrl *gomock.Controller
recorder *MockV2MockRecorder
isgomock struct{}
}
// MockV2MockRecorder is the mock recorder for MockV2.

View File

@ -22,6 +22,7 @@ import (
type MockClient struct {
ctrl *gomock.Controller
recorder *MockClientMockRecorder
isgomock struct{}
}
// MockClientMockRecorder is the mock recorder for MockClient.

View File

@ -23,6 +23,7 @@ import (
type MockV1 struct {
ctrl *gomock.Controller
recorder *MockV1MockRecorder
isgomock struct{}
}
// MockV1MockRecorder is the mock recorder for MockV1.

View File

@ -23,6 +23,7 @@ import (
type MockV2 struct {
ctrl *gomock.Controller
recorder *MockV2MockRecorder
isgomock struct{}
}
// MockV2MockRecorder is the mock recorder for MockV2.

View File

@ -22,6 +22,7 @@ import (
type MockV1 struct {
ctrl *gomock.Controller
recorder *MockV1MockRecorder
isgomock struct{}
}
// MockV1MockRecorder is the mock recorder for MockV1.

View File

@ -23,6 +23,7 @@ import (
type MockV2 struct {
ctrl *gomock.Controller
recorder *MockV2MockRecorder
isgomock struct{}
}
// MockV2MockRecorder is the mock recorder for MockV2.

View File

@ -20,6 +20,7 @@ import (
type MockResourceClient struct {
ctrl *gomock.Controller
recorder *MockResourceClientMockRecorder
isgomock struct{}
}
// MockResourceClientMockRecorder is the mock recorder for MockResourceClient.
@ -118,6 +119,7 @@ func (mr *MockResourceClientMockRecorder) IsSupportRange(request any) *gomock.Ca
type MockResourceMetadataGetter struct {
ctrl *gomock.Controller
recorder *MockResourceMetadataGetterMockRecorder
isgomock struct{}
}
// MockResourceMetadataGetterMockRecorder is the mock recorder for MockResourceMetadataGetter.
@ -156,6 +158,7 @@ func (mr *MockResourceMetadataGetterMockRecorder) GetMetadata(request any) *gomo
type MockResourceLister struct {
ctrl *gomock.Controller
recorder *MockResourceListerMockRecorder
isgomock struct{}
}
// MockResourceListerMockRecorder is the mock recorder for MockResourceLister.
@ -194,6 +197,7 @@ func (mr *MockResourceListerMockRecorder) List(request any) *gomock.Call {
type MockClientManager struct {
ctrl *gomock.Controller
recorder *MockClientManagerMockRecorder
isgomock struct{}
}
// MockClientManagerMockRecorder is the mock recorder for MockClientManager.
@ -282,6 +286,7 @@ func (mr *MockClientManagerMockRecorder) UnRegister(scheme any) *gomock.Call {
type MockHook struct {
ctrl *gomock.Controller
recorder *MockHookMockRecorder
isgomock struct{}
}
// MockHookMockRecorder is the mock recorder for MockHook.

View File

@ -19,6 +19,7 @@ import (
type MockAnnouncer struct {
ctrl *gomock.Controller
recorder *MockAnnouncerMockRecorder
isgomock struct{}
}
// MockAnnouncerMockRecorder is the mock recorder for MockAnnouncer.

View File

@ -49,6 +49,9 @@ type Config struct {
// SeedPeer configuration.
SeedPeer SeedPeerConfig `yaml:"seedPeer" mapstructure:"seedPeer"`
// Peer configuration.
Peer PeerConfig `yaml:"peer" mapstructure:"peer"`
// Host configuration.
Host HostConfig `yaml:"host" mapstructure:"host"`
@ -231,6 +234,11 @@ type SeedPeerConfig struct {
TaskDownloadTimeout time.Duration `yaml:"taskDownloadTimeout" mapstructure:"taskDownloadTimeout"`
}
type PeerConfig struct {
// TLS client configuration.
TLS *GRPCTLSClientConfig `yaml:"tls" mapstructure:"tls"`
}
type KeepAliveConfig struct {
// Keep alive interval.
Interval time.Duration `yaml:"interval" mapstructure:"interval"`

View File

@ -23,6 +23,7 @@ import (
type MockDynconfigInterface struct {
ctrl *gomock.Controller
recorder *MockDynconfigInterfaceMockRecorder
isgomock struct{}
}
// MockDynconfigInterfaceMockRecorder is the mock recorder for MockDynconfigInterface.
@ -261,6 +262,7 @@ func (mr *MockDynconfigInterfaceMockRecorder) Stop() *gomock.Call {
type MockObserver struct {
ctrl *gomock.Controller
recorder *MockObserverMockRecorder
isgomock struct{}
}
// MockObserverMockRecorder is the mock recorder for MockObserver.

View File

@ -19,6 +19,7 @@ import (
type MockJob struct {
ctrl *gomock.Controller
recorder *MockJobMockRecorder
isgomock struct{}
}
// MockJobMockRecorder is the mock recorder for MockJob.

View File

@ -37,10 +37,10 @@ type HostManager interface {
Load(context.Context, string) (*Host, bool)
// Store sets host.
Store(context.Context, *Host)
Store(context.Context, *Host) error
// Delete deletes host by a key.
Delete(context.Context, string)
Delete(context.Context, string) error
// LoadAll returns all hosts.
LoadAll(context.Context) ([]*Host, error)
@ -427,8 +427,8 @@ func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
}
// Store sets host.
func (t *hostManager) Store(ctx context.Context, host *Host) {
t.rdb.HSet(ctx,
func (t *hostManager) Store(ctx context.Context, host *Host) error {
_, err := t.rdb.HSet(ctx,
pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, host.ID),
"id", host.ID,
"type", host.Type.Name(),
@ -488,12 +488,15 @@ func (t *hostManager) Store(ctx context.Context, host *Host) {
"upload_count", host.UploadCount,
"upload_failed_count", host.UploadFailedCount,
"created_at", host.CreatedAt.Format(time.RFC3339),
"updated_at", host.UpdatedAt.Format(time.RFC3339))
"updated_at", host.UpdatedAt.Format(time.RFC3339)).Result()
return err
}
// Delete deletes host by a key.
func (t *hostManager) Delete(ctx context.Context, hostID string) {
t.rdb.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, hostID))
func (t *hostManager) Delete(ctx context.Context, hostID string) error {
_, err := t.rdb.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, hostID)).Result()
return err
}
// LoadAll returns all hosts.

View File

@ -20,6 +20,7 @@ import (
type MockHostManager struct {
ctrl *gomock.Controller
recorder *MockHostManagerMockRecorder
isgomock struct{}
}
// MockHostManagerMockRecorder is the mock recorder for MockHostManager.
@ -40,9 +41,11 @@ func (m *MockHostManager) EXPECT() *MockHostManagerMockRecorder {
}
// Delete mocks base method.
func (m *MockHostManager) Delete(arg0 context.Context, arg1 string) {
func (m *MockHostManager) Delete(arg0 context.Context, arg1 string) error {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Delete", arg0, arg1)
ret := m.ctrl.Call(m, "Delete", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// Delete indicates an expected call of Delete.
@ -82,9 +85,11 @@ func (mr *MockHostManagerMockRecorder) LoadAll(arg0 any) *gomock.Call {
}
// Store mocks base method.
func (m *MockHostManager) Store(arg0 context.Context, arg1 *Host) {
func (m *MockHostManager) Store(arg0 context.Context, arg1 *Host) error {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Store", arg0, arg1)
ret := m.ctrl.Call(m, "Store", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// Store indicates an expected call of Store.

View File

@ -22,14 +22,20 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"time"
"github.com/bits-and-blooms/bitset"
"github.com/redis/go-redis/v9"
redis "github.com/redis/go-redis/v9"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
logger "d7y.io/dragonfly/v2/internal/dflog"
pkgredis "d7y.io/dragonfly/v2/pkg/redis"
dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
"d7y.io/dragonfly/v2/scheduler/config"
)
@ -46,6 +52,12 @@ type PeerManager interface {
// LoadAll returns all peers.
LoadAll(context.Context) ([]*Peer, error)
// LoadAllByTaskID returns all peers by task id.
LoadAllByTaskID(context.Context, string) ([]*Peer, error)
// DeleteAllByTaskID deletes all peers by task id.
DeleteAllByTaskID(context.Context, string) error
}
// peerManager contains content for peer manager.
@ -61,11 +73,14 @@ type peerManager struct {
// Redis universal client interface.
rdb redis.UniversalClient
// transportCredentials is used to mTLS for peer grpc connection.
transportCredentials credentials.TransportCredentials
}
// New peer manager interface.
func newPeerManager(cfg *config.Config, rdb redis.UniversalClient, taskManager TaskManager, hostManager HostManager) PeerManager {
return &peerManager{config: cfg, rdb: rdb, taskManager: taskManager, hostManager: hostManager}
func newPeerManager(cfg *config.Config, rdb redis.UniversalClient, taskManager TaskManager, hostManager HostManager, transportCredentials credentials.TransportCredentials) PeerManager {
return &peerManager{config: cfg, rdb: rdb, taskManager: taskManager, hostManager: hostManager, transportCredentials: transportCredentials}
}
// Load returns persistent cache peer by a key.
@ -157,7 +172,7 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
if _, err := p.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
// Store peer information and set expiration.
pipe.HSet(ctx,
if _, err := pipe.HSet(ctx,
pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peer.ID),
"id", peer.ID,
"persistent", peer.Persistent,
@ -168,15 +183,33 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
"host_id", peer.Host.ID,
"ttl", peer.Cost,
"created_at", peer.CreatedAt.Format(time.RFC3339),
"updated_at", peer.UpdatedAt.Format(time.RFC3339))
pipe.Expire(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peer.ID), peer.Task.TTL)
"updated_at", peer.UpdatedAt.Format(time.RFC3339)).Result(); err != nil {
peer.Log.Errorf("store peer failed: %v", err)
return err
}
// Store the association with task and set expiration.
pipe.SAdd(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), peer.ID)
pipe.Expire(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), peer.Task.TTL)
if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peer.ID), peer.Task.TTL).Result(); err != nil {
peer.Log.Errorf("set peer ttl failed: %v", err)
return err
}
// Store the joint-set with task and set expiration.
if _, err := pipe.SAdd(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Task.ID), peer.ID).Result(); err != nil {
peer.Log.Errorf("add peer id to task joint-set failed: %v", err)
return err
}
if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), peer.Task.TTL).Result(); err != nil {
peer.Log.Errorf("set task joint-set ttl failed: %v", err)
return err
}
// Store the joint-set with host.
if _, err := pipe.SAdd(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), peer.ID).Result(); err != nil {
peer.Log.Errorf("add peer id to host joint-set failed: %v", err)
return err
}
// Store the association with host.
pipe.SAdd(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, peer.Host.ID), peer.ID)
return nil
}); err != nil {
peer.Log.Errorf("store peer failed: %v", err)
@ -195,9 +228,21 @@ func (p *peerManager) Delete(ctx context.Context, peerID string) error {
return errors.New("getting peer failed from redis")
}
pipe.Del(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID))
pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["task_id"]), peerID)
pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["host_id"]), peerID)
if _, err := pipe.Del(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peerID)).Result(); err != nil {
log.Errorf("delete peer failed: %v", err)
return err
}
if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["task_id"]), peerID).Result(); err != nil {
log.Errorf("delete peer id from task joint-set failed: %v", err)
return err
}
if _, err := pipe.SRem(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheHostInScheduler(p.config.Manager.SchedulerClusterID, rawPeer["host_id"]), peerID).Result(); err != nil {
log.Errorf("delete peer id from host joint-set failed: %v", err)
return err
}
return nil
}); err != nil {
log.Errorf("store peer failed: %v", err)
@ -243,3 +288,58 @@ func (p *peerManager) LoadAll(ctx context.Context) ([]*Peer, error) {
return peers, nil
}
// LoadAllByTaskID returns all persistent cache peers by task id.
func (p *peerManager) LoadAllByTaskID(ctx context.Context, taskID string) ([]*Peer, error) {
log := logger.WithTaskID(taskID)
peerIDs, err := p.rdb.SMembers(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(p.config.Manager.SchedulerClusterID, taskID)).Result()
if err != nil {
log.Error("get peer ids failed")
return nil, err
}
peers := make([]*Peer, 0, len(peerIDs))
for _, peerID := range peerIDs {
peer, loaded := p.Load(ctx, peerID)
if !loaded {
log.Errorf("load peer %s failed", peerID)
continue
}
peers = append(peers, peer)
}
return peers, nil
}
// DeleteAllByTaskID deletes all persistent cache peers by task id.
func (p *peerManager) DeleteAllByTaskID(ctx context.Context, taskID string) error {
log := logger.WithTaskID(taskID)
peers, err := p.LoadAllByTaskID(ctx, taskID)
if err != nil {
log.Error("load peers failed")
return err
}
for _, peer := range peers {
addr := fmt.Sprintf("%s:%d", peer.Host.IP, peer.Host.Port)
client, err := dfdaemonclient.GetV2ByAddr(ctx, addr, grpc.WithTransportCredentials(p.transportCredentials))
if err != nil {
log.Errorf("get dfdaemon client failed: %v", err)
continue
}
if err := client.DeletePersistentCacheTask(ctx, &dfdaemonv2.DeletePersistentCacheTaskRequest{TaskId: taskID}); err != nil {
log.Errorf("delete task %s failed", taskID)
continue
}
if err := p.Delete(ctx, peer.ID); err != nil {
log.Errorf("delete peer %s failed", peer.ID)
continue
}
}
p.taskManager.Delete(ctx, taskID)
return nil
}

View File

@ -20,6 +20,7 @@ import (
type MockPeerManager struct {
ctrl *gomock.Controller
recorder *MockPeerManagerMockRecorder
isgomock struct{}
}
// MockPeerManagerMockRecorder is the mock recorder for MockPeerManager.
@ -53,6 +54,20 @@ func (mr *MockPeerManagerMockRecorder) Delete(arg0, arg1 any) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockPeerManager)(nil).Delete), arg0, arg1)
}
// DeleteAllByTaskID mocks base method.
func (m *MockPeerManager) DeleteAllByTaskID(arg0 context.Context, arg1 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DeleteAllByTaskID", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// DeleteAllByTaskID indicates an expected call of DeleteAllByTaskID.
func (mr *MockPeerManagerMockRecorder) DeleteAllByTaskID(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteAllByTaskID", reflect.TypeOf((*MockPeerManager)(nil).DeleteAllByTaskID), arg0, arg1)
}
// Load mocks base method.
func (m *MockPeerManager) Load(arg0 context.Context, arg1 string) (*Peer, bool) {
m.ctrl.T.Helper()
@ -83,6 +98,21 @@ func (mr *MockPeerManagerMockRecorder) LoadAll(arg0 any) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadAll", reflect.TypeOf((*MockPeerManager)(nil).LoadAll), arg0)
}
// LoadAllByTaskID mocks base method.
func (m *MockPeerManager) LoadAllByTaskID(arg0 context.Context, arg1 string) ([]*Peer, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "LoadAllByTaskID", arg0, arg1)
ret0, _ := ret[0].([]*Peer)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// LoadAllByTaskID indicates an expected call of LoadAllByTaskID.
func (mr *MockPeerManagerMockRecorder) LoadAllByTaskID(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadAllByTaskID", reflect.TypeOf((*MockPeerManager)(nil).LoadAllByTaskID), arg0, arg1)
}
// Store mocks base method.
func (m *MockPeerManager) Store(arg0 context.Context, arg1 *Peer) error {
m.ctrl.T.Helper()

View File

@ -19,7 +19,8 @@
package persistentcache
import (
"github.com/redis/go-redis/v9"
redis "github.com/redis/go-redis/v9"
"google.golang.org/grpc/credentials"
"d7y.io/dragonfly/v2/scheduler/config"
)
@ -49,10 +50,10 @@ type resource struct {
}
// New returns Resource interface.
func New(cfg *config.Config, rdb redis.UniversalClient) Resource {
func New(cfg *config.Config, rdb redis.UniversalClient, transportCredentials credentials.TransportCredentials) Resource {
taskManager := newTaskManager(cfg, rdb)
hostManager := newHostManager(cfg, rdb)
peerManager := newPeerManager(cfg, rdb, taskManager, hostManager)
peerManager := newPeerManager(cfg, rdb, taskManager, hostManager, transportCredentials)
return &resource{peerManager, taskManager, hostManager}
}

View File

@ -19,6 +19,7 @@ import (
type MockResource struct {
ctrl *gomock.Controller
recorder *MockResourceMockRecorder
isgomock struct{}
}
// MockResourceMockRecorder is the mock recorder for MockResource.

View File

@ -40,7 +40,7 @@ type TaskManager interface {
Store(context.Context, *Task) error
// Delete deletes persistent cache task by a key.
Delete(context.Context, string)
Delete(context.Context, string) error
// LoadAll returns all persistent cache tasks.
LoadAll(context.Context) ([]*Task, error)
@ -147,7 +147,7 @@ func (t *taskManager) Load(ctx context.Context, taskID string) (*Task, bool) {
// Store sets persistent cache task.
func (t *taskManager) Store(ctx context.Context, task *Task) error {
if _, err := t.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.HSet(ctx,
if _, err := pipe.HSet(ctx,
pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID),
"id", task.ID,
"persistent_replica_count", task.PersistentReplicaCount,
@ -161,9 +161,16 @@ func (t *taskManager) Store(ctx context.Context, task *Task) error {
"state", task.FSM.Current(),
"ttl", task.TTL,
"created_at", task.CreatedAt.Format(time.RFC3339),
"updated_at", task.UpdatedAt.Format(time.RFC3339))
"updated_at", task.UpdatedAt.Format(time.RFC3339)).Result(); err != nil {
task.Log.Errorf("store task failed: %v", err)
return err
}
if _, err := pipe.Expire(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), task.TTL).Result(); err != nil {
task.Log.Errorf("set task ttl failed: %v", err)
return err
}
pipe.Expire(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), task.TTL)
return nil
}); err != nil {
task.Log.Errorf("store task failed: %v", err)
@ -174,8 +181,9 @@ func (t *taskManager) Store(ctx context.Context, task *Task) error {
}
// Delete deletes persistent cache task by a key.
func (t *taskManager) Delete(ctx context.Context, taskID string) {
t.rdb.Del(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, taskID))
func (t *taskManager) Delete(ctx context.Context, taskID string) error {
_, err := t.rdb.Del(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, taskID)).Result()
return err
}
// LoadAll returns all persistent cache tasks.

View File

@ -20,6 +20,7 @@ import (
type MockTaskManager struct {
ctrl *gomock.Controller
recorder *MockTaskManagerMockRecorder
isgomock struct{}
}
// MockTaskManagerMockRecorder is the mock recorder for MockTaskManager.
@ -40,9 +41,11 @@ func (m *MockTaskManager) EXPECT() *MockTaskManagerMockRecorder {
}
// Delete mocks base method.
func (m *MockTaskManager) Delete(arg0 context.Context, arg1 string) {
func (m *MockTaskManager) Delete(arg0 context.Context, arg1 string) error {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Delete", arg0, arg1)
ret := m.ctrl.Call(m, "Delete", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// Delete indicates an expected call of Delete.

View File

@ -20,6 +20,7 @@ import (
type MockHostManager struct {
ctrl *gomock.Controller
recorder *MockHostManagerMockRecorder
isgomock struct{}
}
// MockHostManagerMockRecorder is the mock recorder for MockHostManager.

View File

@ -19,6 +19,7 @@ import (
type MockPeerManager struct {
ctrl *gomock.Controller
recorder *MockPeerManagerMockRecorder
isgomock struct{}
}
// MockPeerManagerMockRecorder is the mock recorder for MockPeerManager.

View File

@ -19,6 +19,7 @@ import (
type MockResource struct {
ctrl *gomock.Controller
recorder *MockResourceMockRecorder
isgomock struct{}
}
// MockResourceMockRecorder is the mock recorder for MockResource.

View File

@ -26,6 +26,7 @@ import (
type MockSeedPeerClient struct {
ctrl *gomock.Controller
recorder *MockSeedPeerClientMockRecorder
isgomock struct{}
}
// MockSeedPeerClientMockRecorder is the mock recorder for MockSeedPeerClient.

View File

@ -23,6 +23,7 @@ import (
type MockSeedPeer struct {
ctrl *gomock.Controller
recorder *MockSeedPeerMockRecorder
isgomock struct{}
}
// MockSeedPeerMockRecorder is the mock recorder for MockSeedPeer.

View File

@ -19,6 +19,7 @@ import (
type MockTaskManager struct {
ctrl *gomock.Controller
recorder *MockTaskManagerMockRecorder
isgomock struct{}
}
// MockTaskManagerMockRecorder is the mock recorder for MockTaskManager.

View File

@ -158,32 +158,42 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
// Initialize GC.
s.gc = gc.New(gc.WithLogger(logger.GCLogger))
// Initialize client transport credentials.
clientTransportCredentials := rpc.NewInsecureCredentials()
// Initialize seed peer client transport credentials.
seedPeerClientTransportCredentials := rpc.NewInsecureCredentials()
if cfg.SeedPeer.TLS != nil {
clientTransportCredentials, err = rpc.NewClientCredentials(cfg.SeedPeer.TLS.CACert, cfg.SeedPeer.TLS.Cert, cfg.SeedPeer.TLS.Key)
seedPeerClientTransportCredentials, err = rpc.NewClientCredentials(cfg.SeedPeer.TLS.CACert, cfg.SeedPeer.TLS.Cert, cfg.SeedPeer.TLS.Key)
if err != nil {
logger.Errorf("failed to create client credentials: %v", err)
logger.Errorf("failed to create seed peer client credentials: %v", err)
return nil, err
}
}
// Initialize dynconfig.
dynconfig, err := config.NewDynconfig(s.managerClient, filepath.Join(d.CacheDir(), dynconfig.CacheDirName), cfg, clientTransportCredentials)
dynconfig, err := config.NewDynconfig(s.managerClient, filepath.Join(d.CacheDir(), dynconfig.CacheDirName), cfg, seedPeerClientTransportCredentials)
if err != nil {
return nil, err
}
s.dynconfig = dynconfig
// Initialize resource.
resource, err := standard.New(cfg, s.gc, dynconfig, clientTransportCredentials)
resource, err := standard.New(cfg, s.gc, dynconfig, seedPeerClientTransportCredentials)
if err != nil {
return nil, err
}
s.resource = resource
// Initialize seed peer client transport credentials.
peerClientTransportCredentials := rpc.NewInsecureCredentials()
if cfg.Peer.TLS != nil {
peerClientTransportCredentials, err = rpc.NewClientCredentials(cfg.Peer.TLS.CACert, cfg.Peer.TLS.Cert, cfg.Peer.TLS.Key)
if err != nil {
logger.Errorf("failed to create peer client credentials: %v", err)
return nil, err
}
}
// Initialize persistent cache resource.
s.persistentCacheResource = persistentcache.New(cfg, rdb)
s.persistentCacheResource = persistentcache.New(cfg, rdb, peerClientTransportCredentials)
// Initialize job service.
if cfg.Job.Enable && rdb != nil {

View File

@ -22,6 +22,7 @@ import (
type MockScheduling struct {
ctrl *gomock.Controller
recorder *MockSchedulingMockRecorder
isgomock struct{}
}
// MockSchedulingMockRecorder is the mock recorder for MockScheduling.

View File

@ -1545,8 +1545,7 @@ func (v *V2) StatPersistentCacheTask(ctx context.Context, req *schedulerv2.StatP
}, nil
}
// TODO Implement the following methods.
// DeletePersistentCacheTask releases persistent cache task in scheduler.
func (v *V2) DeletePersistentCacheTask(ctx context.Context, req *schedulerv2.DeletePersistentCacheTaskRequest) error {
return nil
return v.persistentCacheResource.PeerManager().DeleteAllByTaskID(ctx, req.GetTaskId())
}

View File

@ -21,6 +21,7 @@ import (
type MockStorage struct {
ctrl *gomock.Controller
recorder *MockStorageMockRecorder
isgomock struct{}
}
// MockStorageMockRecorder is the mock recorder for MockStorage.