feat: add persistent cache task for scheduler (#3545)
Signed-off-by: Gaius <gaius.qi@gmail.com>
parent 7253f0fc63
commit 53f5e9c79c
@@ -141,6 +141,12 @@ func WithTask(taskID, url string) *SugaredLoggerOnWith {
	}
}

func WithPersistentCacheTask(taskID string) *SugaredLoggerOnWith {
	return &SugaredLoggerOnWith{
		withArgs: []any{"taskID", taskID},
	}
}

func WithHost(hostID, hostname, ip string) *SugaredLoggerOnWith {
	return &SugaredLoggerOnWith{
		withArgs: []any{"hostID", hostID, "hostname", hostname, "ip", ip},
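As a brief illustration, a hedged sketch of using the new helper (the task ID below is a hypothetical placeholder):

	// Every line logged through this logger carries the taskID field.
	log := logger.WithPersistentCacheTask("task-1") // hypothetical ID
	log.Infof("persistent cache task created")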
@@ -41,6 +41,18 @@ const (
	// SchedulersNamespace prefix of schedulers namespace cache key.
	SchedulersNamespace = "schedulers"

	// SchedulerClustersNamespace prefix of scheduler clusters namespace cache key.
	SchedulerClustersNamespace = "scheduler-clusters"

	// PersistentCacheTasksNamespace prefix of persistent cache tasks namespace cache key.
	PersistentCacheTasksNamespace = "persistent-cache-tasks"

	// PersistentCachePeersNamespace prefix of persistent cache peers namespace cache key.
	PersistentCachePeersNamespace = "persistent-cache-peers"

	// PersistentCacheHostsNamespace prefix of persistent cache hosts namespace cache key.
	PersistentCacheHostsNamespace = "persistent-cache-hosts"

	// ApplicationsNamespace prefix of applications namespace cache key.
	ApplicationsNamespace = "applications"
@@ -137,6 +149,35 @@ func MakeKeyInScheduler(namespace, id string) string {
	return fmt.Sprintf("%s:%s", MakeNamespaceKeyInScheduler(namespace), id)
}

// MakePersistentCacheTaskKeyInScheduler make persistent cache task key in scheduler.
func MakePersistentCacheTaskKeyInScheduler(schedulerClusterID uint, taskID string) string {
	return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s", schedulerClusterID, PersistentCacheTasksNamespace, taskID))
}

// MakePersistentCacheTasksInScheduler make persistent cache tasks key prefix in scheduler.
func MakePersistentCacheTasksInScheduler(schedulerClusterID uint) string {
	return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s", schedulerClusterID, PersistentCacheTasksNamespace))
}

// MakePersistentCachePeerKeyInScheduler make persistent cache peer key in scheduler.
func MakePersistentCachePeerKeyInScheduler(schedulerClusterID uint, peerID string) string {
	return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s", schedulerClusterID, PersistentCachePeersNamespace, peerID))
}

// MakePersistentCachePeersInScheduler make persistent cache peers key prefix in scheduler.
func MakePersistentCachePeersInScheduler(schedulerClusterID uint) string {
	return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s", schedulerClusterID, PersistentCachePeersNamespace))
}

// MakePersistentCacheHostKeyInScheduler make persistent cache host key in scheduler.
func MakePersistentCacheHostKeyInScheduler(schedulerClusterID uint, hostID string) string {
	return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s", schedulerClusterID, PersistentCacheHostsNamespace, hostID))
}

// MakePersistentCacheHostsInScheduler make persistent cache hosts key prefix in scheduler.
func MakePersistentCacheHostsInScheduler(schedulerClusterID uint) string {
	return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s", schedulerClusterID, PersistentCacheHostsNamespace))
}

// MakeNetworkTopologyKeyInScheduler make network topology key in scheduler.
func MakeNetworkTopologyKeyInScheduler(srcHostID, destHostID string) string {
	return MakeKeyInScheduler(NetworkTopologyNamespace, fmt.Sprintf("%s:%s", srcHostID, destHostID))
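For concreteness, a hedged sketch of the key shapes these helpers produce for scheduler cluster 1 (this assumes MakeNamespaceKeyInScheduler prepends a "scheduler" namespace, which is defined elsewhere in pkg/redis):

	// Per-task key, as used by the task manager's Load/Store/Delete below:
	MakePersistentCacheTaskKeyInScheduler(1, "task-1")
	// => "scheduler:scheduler-clusters:1:persistent-cache-tasks:task-1"

	// Key prefix for scanning all tasks of a cluster, as used by LoadAll:
	MakePersistentCacheTasksInScheduler(1)
	// => "scheduler:scheduler-clusters:1:persistent-cache-tasks"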
@@ -0,0 +1,144 @@
/*
 * Copyright 2024 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package persistentcache

import (
	"context"
	"time"

	"github.com/looplab/fsm"

	logger "d7y.io/dragonfly/v2/internal/dflog"
	"d7y.io/dragonfly/v2/pkg/digest"
)

const (
	// Task has been created but has not started uploading.
	TaskStatePending = "Pending"

	// Task is uploading resources to the p2p cluster.
	TaskStateUploading = "Uploading"

	// Task has been uploaded successfully.
	TaskStateSucceeded = "Succeeded"

	// Task upload has failed.
	TaskStateFailed = "Failed"
)

const (
	// Task is uploading.
	TaskEventUpload = "Upload"

	// Task uploaded successfully.
	TaskEventUploadSucceeded = "UploadSucceeded"

	// Task upload failed.
	TaskEventUploadFailed = "UploadFailed"
)

// Task contains content for persistent cache task.
type Task struct {
	// ID is task id.
	ID string

	// Replica count of the persistent cache task. The persistent cache task will
	// not be deleted when dfdaemon runs garbage collection. It will only be deleted
	// when the task is deleted by the user.
	PersistentReplicaCount uint64

	// Replica count of the cache task. If the cache task is not persistent,
	// the persistent cache task will be deleted when dfdaemon runs garbage collection.
	ReplicaCount uint64

	// Digest of the persistent cache task content, for example md5:xxx or sha256:yyy.
	Digest *digest.Digest

	// Tag is used to distinguish different persistent cache tasks.
	Tag string

	// Application of persistent cache task.
	Application string

	// Persistent cache task piece length.
	PieceLength int32

	// ContentLength is persistent cache task total content length.
	ContentLength int64

	// TotalPieceCount is total piece count.
	TotalPieceCount int32

	// Persistent cache task state machine.
	FSM *fsm.FSM

	// TTL is persistent cache task time to live.
	TTL time.Duration

	// CreatedAt is persistent cache task create time.
	CreatedAt time.Time

	// UpdatedAt is persistent cache task update time.
	UpdatedAt time.Time

	// Persistent cache task log.
	Log *logger.SugaredLoggerOnWith
}

// NewTask returns a new persistent cache task instance.
func NewTask(id, tag, application, state string, persistentReplicaCount uint64, replicaCount uint64, pieceLength int32,
	contentLength int64, totalPieceCount int32, digest *digest.Digest, ttl time.Duration, createdAt, updatedAt time.Time,
	log *logger.SugaredLoggerOnWith) *Task {
	t := &Task{
		ID:                     id,
		PersistentReplicaCount: persistentReplicaCount,
		ReplicaCount:           replicaCount,
		Digest:                 digest,
		Tag:                    tag,
		Application:            application,
		PieceLength:            pieceLength,
		ContentLength:          contentLength,
		TotalPieceCount:        totalPieceCount,
		TTL:                    ttl,
		CreatedAt:              createdAt,
		UpdatedAt:              updatedAt,
		Log:                    log,
	}

	// Initialize state machine.
	t.FSM = fsm.NewFSM(
		TaskStatePending,
		fsm.Events{
			{Name: TaskEventUpload, Src: []string{TaskStatePending, TaskStateFailed}, Dst: TaskStateUploading},
			{Name: TaskEventUploadSucceeded, Src: []string{TaskStateUploading}, Dst: TaskStateSucceeded},
			{Name: TaskEventUploadFailed, Src: []string{TaskStateUploading}, Dst: TaskStateFailed},
		},
		fsm.Callbacks{
			TaskEventUpload: func(ctx context.Context, e *fsm.Event) {
				t.Log.Infof("task state is %s", e.FSM.Current())
			},
			TaskEventUploadSucceeded: func(ctx context.Context, e *fsm.Event) {
				t.Log.Infof("task state is %s", e.FSM.Current())
			},
			TaskEventUploadFailed: func(ctx context.Context, e *fsm.Event) {
				t.Log.Infof("task state is %s", e.FSM.Current())
			},
		},
	)
	t.FSM.SetState(state)

	return t
}
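For illustration, a hedged sketch of driving the state machine above (assumes a *Task built by NewTask, so it starts in Pending; looplab/fsm rejects transitions not listed in the events table):

	ctx := context.Background()
	if err := task.FSM.Event(ctx, TaskEventUpload); err != nil {
		task.Log.Errorf("upload event failed: %s", err.Error())
		return
	}

	// ... upload replicas to the p2p cluster ...

	if err := task.FSM.Event(ctx, TaskEventUploadSucceeded); err != nil {
		task.Log.Errorf("upload succeeded event failed: %s", err.Error())
		return
	}
	// On error, fire TaskEventUploadFailed instead; a Failed task may re-enter
	// Uploading via TaskEventUpload, per the Src lists above.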
@@ -0,0 +1,215 @@
/*
 * Copyright 2024 The Dragonfly Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//go:generate mockgen -destination task_manager_mock.go -source task_manager.go -package persistentcache

package persistentcache

import (
	"context"
	"strconv"
	"strings"
	"time"

	"github.com/redis/go-redis/v9"

	logger "d7y.io/dragonfly/v2/internal/dflog"
	"d7y.io/dragonfly/v2/pkg/digest"
	pkgredis "d7y.io/dragonfly/v2/pkg/redis"
	"d7y.io/dragonfly/v2/scheduler/config"
)

// TaskManager is the interface used for persistent cache task manager.
type TaskManager interface {
	// Load returns persistent cache task for a key.
	Load(context.Context, string) (*Task, bool)

	// Store sets persistent cache task.
	Store(context.Context, *Task) error

	// Delete deletes persistent cache task for a key.
	Delete(context.Context, string)

	// LoadAll returns all persistent cache tasks.
	LoadAll(context.Context) ([]*Task, error)
}

// taskManager contains content for persistent cache task manager.
type taskManager struct {
	// Config is scheduler config.
	config *config.Config

	// Redis universal client interface.
	rdb redis.UniversalClient
}

// TODO: Use newTaskManager for resource management.
// newTaskManager returns a new persistent cache task manager.
// nolint
func newTaskManager(cfg *config.Config, rdb redis.UniversalClient) TaskManager {
	return &taskManager{config: cfg, rdb: rdb}
}

// Load returns persistent cache task for a key.
func (t *taskManager) Load(ctx context.Context, taskID string) (*Task, bool) {
	log := logger.WithPersistentCacheTask(taskID)
	rawTask, err := t.rdb.HGetAll(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, taskID)).Result()
	if err != nil {
		log.Errorf("getting task failed from redis: %s", err.Error())
		return nil, false
	}

	// Set integer fields from raw task.
	persistentReplicaCount, err := strconv.ParseUint(rawTask["persistent_replica_count"], 10, 64)
	if err != nil {
		log.Errorf("parsing persistent replica count failed: %s", err.Error())
		return nil, false
	}

	replicaCount, err := strconv.ParseUint(rawTask["replica_count"], 10, 64)
	if err != nil {
		log.Errorf("parsing replica count failed: %s", err.Error())
		return nil, false
	}

	pieceLength, err := strconv.ParseInt(rawTask["piece_length"], 10, 32)
	if err != nil {
		log.Errorf("parsing piece length failed: %s", err.Error())
		return nil, false
	}

	contentLength, err := strconv.ParseInt(rawTask["content_length"], 10, 64)
	if err != nil {
		log.Errorf("parsing content length failed: %s", err.Error())
		return nil, false
	}

	totalPieceCount, err := strconv.ParseInt(rawTask["total_piece_count"], 10, 32)
	if err != nil {
		log.Errorf("parsing total piece count failed: %s", err.Error())
		return nil, false
	}

	// Set time fields from raw task.
	ttl, err := strconv.ParseInt(rawTask["ttl"], 10, 64)
	if err != nil {
		log.Errorf("parsing ttl failed: %s", err.Error())
		return nil, false
	}

	createdAt, err := time.Parse(time.RFC3339, rawTask["created_at"])
	if err != nil {
		log.Errorf("parsing created at failed: %s", err.Error())
		return nil, false
	}

	updatedAt, err := time.Parse(time.RFC3339, rawTask["updated_at"])
	if err != nil {
		log.Errorf("parsing updated at failed: %s", err.Error())
		return nil, false
	}

	// Set digest from raw task.
	digest, err := digest.Parse(rawTask["digest"])
	if err != nil {
		log.Errorf("parsing digest failed: %s", err.Error())
		return nil, false
	}

	return NewTask(
		rawTask["id"],
		rawTask["tag"],
		rawTask["application"],
		rawTask["state"],
		persistentReplicaCount,
		replicaCount,
		int32(pieceLength),
		contentLength,
		int32(totalPieceCount),
		digest,
		time.Duration(ttl),
		createdAt,
		updatedAt,
		logger.WithPersistentCacheTask(rawTask["id"]),
	), true
}

// Store sets persistent cache task.
func (t *taskManager) Store(ctx context.Context, task *Task) error {
	_, err := t.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
		// Queue both commands on the pipeliner so they run in one transaction.
		pipe.HSet(ctx,
			pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID),
			"id", task.ID,
			"persistent_replica_count", task.PersistentReplicaCount,
			"replica_count", task.ReplicaCount,
			"digest", task.Digest.String(),
			"tag", task.Tag,
			"application", task.Application,
			"piece_length", task.PieceLength,
			"content_length", task.ContentLength,
			"total_piece_count", task.TotalPieceCount,
			"state", TaskStatePending,
			"ttl", task.TTL,
			"created_at", task.CreatedAt.Format(time.RFC3339),
			"updated_at", task.UpdatedAt.Format(time.RFC3339))

		pipe.Expire(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), task.TTL)
		return nil
	})

	return err
}

// Delete deletes persistent cache task for a key.
func (t *taskManager) Delete(ctx context.Context, taskID string) {
	t.rdb.Del(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, taskID))
}

// LoadAll returns all persistent cache tasks.
func (t *taskManager) LoadAll(ctx context.Context) ([]*Task, error) {
	var (
		tasks  []*Task
		cursor uint64
	)

	for {
		var (
			taskKeys []string
			err      error
		)

		// Match all task keys under this scheduler cluster; the trailing ":*" is
		// needed because SCAN treats the pattern as a glob, not a prefix.
		taskKeys, cursor, err = t.rdb.Scan(ctx, cursor, pkgredis.MakePersistentCacheTasksInScheduler(t.config.Manager.SchedulerClusterID)+":*", 10).Result()
		if err != nil {
			logger.Warn("scan tasks failed")
			return nil, err
		}

		for _, taskKey := range taskKeys {
			// Trim the key prefix to recover the task ID expected by Load.
			taskID := strings.TrimPrefix(taskKey, pkgredis.MakePersistentCacheTasksInScheduler(t.config.Manager.SchedulerClusterID)+":")
			task, loaded := t.Load(ctx, taskID)
			if !loaded {
				logger.WithTaskID(taskID).Warn("load task failed")
				continue
			}

			tasks = append(tasks, task)
		}

		if cursor == 0 {
			break
		}
	}

	return tasks, nil
}
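For orientation, a hedged usage sketch of the manager above (cfg, rdb, ctx, and taskDigest are assumed to be in scope; newTaskManager is unexported, so this would sit inside the persistentcache package):

	// Persist a task, then read it back from Redis.
	tm := newTaskManager(cfg, rdb)

	task := NewTask("task-1", "tag", "app", TaskStatePending,
		2, 3, 4*1024*1024, 64*1024*1024, 16,
		taskDigest, 24*time.Hour, time.Now(), time.Now(),
		logger.WithPersistentCacheTask("task-1"))
	if err := tm.Store(ctx, task); err != nil {
		task.Log.Errorf("store task failed: %s", err.Error())
	}

	if loaded, ok := tm.Load(ctx, "task-1"); ok {
		loaded.Log.Infof("loaded task with ttl %s", loaded.TTL)
	}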
@@ -0,0 +1,96 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: task_manager.go
//
// Generated by this command:
//
//	mockgen -destination task_manager_mock.go -source task_manager.go -package persistentcache
//

// Package persistentcache is a generated GoMock package.
package persistentcache

import (
	context "context"
	reflect "reflect"

	gomock "go.uber.org/mock/gomock"
)

// MockTaskManager is a mock of TaskManager interface.
type MockTaskManager struct {
	ctrl     *gomock.Controller
	recorder *MockTaskManagerMockRecorder
}

// MockTaskManagerMockRecorder is the mock recorder for MockTaskManager.
type MockTaskManagerMockRecorder struct {
	mock *MockTaskManager
}

// NewMockTaskManager creates a new mock instance.
func NewMockTaskManager(ctrl *gomock.Controller) *MockTaskManager {
	mock := &MockTaskManager{ctrl: ctrl}
	mock.recorder = &MockTaskManagerMockRecorder{mock}
	return mock
}

// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockTaskManager) EXPECT() *MockTaskManagerMockRecorder {
	return m.recorder
}

// Delete mocks base method.
func (m *MockTaskManager) Delete(arg0 context.Context, arg1 string) {
	m.ctrl.T.Helper()
	m.ctrl.Call(m, "Delete", arg0, arg1)
}

// Delete indicates an expected call of Delete.
func (mr *MockTaskManagerMockRecorder) Delete(arg0, arg1 any) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockTaskManager)(nil).Delete), arg0, arg1)
}

// Load mocks base method.
func (m *MockTaskManager) Load(arg0 context.Context, arg1 string) (*Task, bool) {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "Load", arg0, arg1)
	ret0, _ := ret[0].(*Task)
	ret1, _ := ret[1].(bool)
	return ret0, ret1
}

// Load indicates an expected call of Load.
func (mr *MockTaskManagerMockRecorder) Load(arg0, arg1 any) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Load", reflect.TypeOf((*MockTaskManager)(nil).Load), arg0, arg1)
}

// LoadAll mocks base method.
func (m *MockTaskManager) LoadAll(arg0 context.Context) ([]*Task, error) {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "LoadAll", arg0)
	ret0, _ := ret[0].([]*Task)
	ret1, _ := ret[1].(error)
	return ret0, ret1
}

// LoadAll indicates an expected call of LoadAll.
func (mr *MockTaskManagerMockRecorder) LoadAll(arg0 any) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadAll", reflect.TypeOf((*MockTaskManager)(nil).LoadAll), arg0)
}

// Store mocks base method.
func (m *MockTaskManager) Store(arg0 context.Context, arg1 *Task) error {
	m.ctrl.T.Helper()
	ret := m.ctrl.Call(m, "Store", arg0, arg1)
	ret0, _ := ret[0].(error)
	return ret0
}

// Store indicates an expected call of Store.
func (mr *MockTaskManagerMockRecorder) Store(arg0, arg1 any) *gomock.Call {
	mr.mock.ctrl.T.Helper()
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Store", reflect.TypeOf((*MockTaskManager)(nil).Store), arg0, arg1)
}
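A hedged sketch of exercising the generated mock in a test (hypothetical test body; assumes the usual testing and gomock imports in a persistentcache test file):

	func TestTaskManager_LoadAll_WithMock(t *testing.T) {
		ctl := gomock.NewController(t)
		defer ctl.Finish()

		// Expect exactly one LoadAll call and return an empty task list.
		tm := NewMockTaskManager(ctl)
		tm.EXPECT().LoadAll(gomock.Any()).Return([]*Task{}, nil).Times(1)

		tasks, err := tm.LoadAll(context.Background())
		if err != nil {
			t.Fatalf("LoadAll failed: %s", err)
		}
		if len(tasks) != 0 {
			t.Fatalf("expected no tasks, got %d", len(tasks))
		}
	}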
@@ -19,7 +19,6 @@
package standard

import (
	"github.com/redis/go-redis/v9"
	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
@@ -64,9 +63,6 @@ type resource struct {
	// Scheduler config.
	config *config.Config

	// Redis universal client interface.
	rdb redis.UniversalClient

	// TransportCredentials stores the Authenticator required to setup a client connection.
	transportCredentials credentials.TransportCredentials
}
@@ -82,13 +78,6 @@ func WithTransportCredentials(creds credentials.TransportCredentials) Option {
	}
}

// WithRedisClient returns an Option which configures the redis client.
func WithRedisClient(rdb redis.UniversalClient) Option {
	return func(r *resource) {
		r.rdb = rdb
	}
}

// New returns Resource interface.
func New(cfg *config.Config, gc gc.GC, dynconfig config.DynconfigInterface, options ...Option) (Resource, error) {
	resource := &resource{config: cfg}
@@ -28,7 +28,6 @@ import (
	commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
	schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
	schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"

	logger "d7y.io/dragonfly/v2/internal/dflog"
	"d7y.io/dragonfly/v2/pkg/container/set"
@@ -529,32 +528,3 @@ func (t *Task) ReportPieceResultToPeers(peerPacket *schedulerv1.PeerPacket, even
		}
	}
}

// AnnouncePeers announces all peers in the task with the state code.
// Used only in v2 version of the grpc.
func (t *Task) AnnouncePeers(resp *schedulerv2.AnnouncePeerResponse, event string) {
	for _, vertex := range t.DAG.GetVertices() {
		peer := vertex.Value
		if peer == nil {
			continue
		}

		if peer.FSM.Is(PeerStateRunning) {
			stream, loaded := peer.LoadAnnouncePeerStream()
			if !loaded {
				continue
			}

			if err := stream.Send(resp); err != nil {
				t.Log.Errorf("send response to peer %s failed: %s", peer.ID, err.Error())
				continue
			}
			t.Log.Infof("task announces peer %s response %#v", peer.ID, resp.Response)

			if err := peer.FSM.Event(context.Background(), event); err != nil {
				peer.Log.Errorf("peer fsm event failed: %s", err.Error())
				continue
			}
		}
	}
}
@@ -28,8 +28,6 @@ import (
	commonv2 "d7y.io/api/v2/pkg/apis/common/v2"
	schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1"
	v1mocks "d7y.io/api/v2/pkg/apis/scheduler/v1/mocks"
	schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
	v2mocks "d7y.io/api/v2/pkg/apis/scheduler/v2/mocks"

	"d7y.io/dragonfly/v2/pkg/container/set"
	"d7y.io/dragonfly/v2/pkg/digest"
@@ -1578,86 +1576,3 @@ func TestTask_ReportPieceResultToPeers(t *testing.T) {
		})
	}
}

func TestTask_AnnouncePeers(t *testing.T) {
	tests := []struct {
		name string
		run  func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder)
	}{
		{
			name: "peer state is PeerStatePending",
			run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) {
				mockPeer.FSM.SetState(PeerStatePending)
				task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed)

				assert := assert.New(t)
				assert.True(mockPeer.FSM.Is(PeerStatePending))
			},
		},
		{
			name: "peer state is PeerStateRunning and stream is empty",
			run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) {
				mockPeer.FSM.SetState(PeerStateRunning)
				task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed)

				assert := assert.New(t)
				assert.True(mockPeer.FSM.Is(PeerStateRunning))
			},
		},
		{
			name: "peer state is PeerStateRunning and stream sending failed",
			run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) {
				mockPeer.FSM.SetState(PeerStateRunning)
				mockPeer.StoreAnnouncePeerStream(stream)
				ms.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{})).Return(errors.New("foo")).Times(1)

				task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed)

				assert := assert.New(t)
				assert.True(mockPeer.FSM.Is(PeerStateRunning))
			},
		},
		{
			name: "peer state is PeerStateRunning and state changing failed",
			run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) {
				mockPeer.FSM.SetState(PeerStateRunning)
				mockPeer.StoreAnnouncePeerStream(stream)
				ms.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{})).Return(errors.New("foo")).Times(1)

				task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed)

				assert := assert.New(t)
				assert.True(mockPeer.FSM.Is(PeerStateRunning))
			},
		},
		{
			name: "peer state is PeerStateRunning and announce peer successfully",
			run: func(t *testing.T, task *Task, mockPeer *Peer, stream schedulerv2.Scheduler_AnnouncePeerServer, ms *v2mocks.MockScheduler_AnnouncePeerServerMockRecorder) {
				mockPeer.FSM.SetState(PeerStateRunning)
				mockPeer.StoreAnnouncePeerStream(stream)
				ms.Send(gomock.Eq(&schedulerv2.AnnouncePeerResponse{})).Return(nil).Times(1)

				task.AnnouncePeers(&schedulerv2.AnnouncePeerResponse{}, PeerEventDownloadFailed)

				assert := assert.New(t)
				assert.True(mockPeer.FSM.Is(PeerStateFailed))
			},
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			ctl := gomock.NewController(t)
			defer ctl.Finish()
			stream := v2mocks.NewMockScheduler_AnnouncePeerServer(ctl)

			mockHost := NewHost(
				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
			task := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit)
			mockPeer := NewPeer(mockPeerID, mockResourceConfig, task, mockHost)
			task.StorePeer(mockPeer)
			tc.run(t, task, mockPeer, stream, stream.EXPECT())
		})
	}
}
@@ -230,10 +230,6 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
		resourceOptions = append(resourceOptions, resource.WithTransportCredentials(clientTransportCredentials))
	}

	if rdb != nil {
		resourceOptions = append(resourceOptions, resource.WithRedisClient(rdb))
	}

	resource, err := resource.New(cfg, s.gc, dynconfig, resourceOptions...)
	if err != nil {
		return nil, err