diff --git a/scheduler/resource/persistentcache/host.go b/scheduler/resource/persistentcache/host.go new file mode 100644 index 000000000..8d934af8e --- /dev/null +++ b/scheduler/resource/persistentcache/host.go @@ -0,0 +1,267 @@ +/* + * Copyright 2024 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package persistentcache + +import ( + "time" + + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/types" +) + +// Host contains content for host. +type Host struct { + // ID is host id. + ID string + + // Type is host type. + Type types.HostType + + // Hostname is host name. + Hostname string + + // IP is host ip. + IP string + + // Port is grpc service port. + Port int32 + + // DownloadPort is piece downloading port. + DownloadPort int32 + + // DisableShared is whether the host is disabled for + // shared with other peers. + DisableShared bool + + // Host OS. + OS string + + // Host platform. + Platform string + + // Host platform family. + PlatformFamily string + + // Host platform version. + PlatformVersion string + + // Host kernel version. + KernelVersion string + + // CPU Stat. + CPU CPU + + // Memory Stat. + Memory Memory + + // Network Stat. + Network Network + + // Dist Stat. + Disk Disk + + // Build information. + Build Build + + // AnnounceInterval is the interval between host announces to scheduler. + AnnounceInterval time.Duration + + // ConcurrentUploadLimit is concurrent upload limit count. + ConcurrentUploadLimit int32 + + // ConcurrentUploadCount is concurrent upload count. + ConcurrentUploadCount int32 + + // UploadCount is total upload count. + UploadCount int64 + + // UploadFailedCount is upload failed count. + UploadFailedCount int64 + + // CreatedAt is host create time. + CreatedAt time.Time + + // UpdatedAt is host update time. + UpdatedAt time.Time + + // Host log. + Log *logger.SugaredLoggerOnWith +} + +// CPU contains content for cpu. +type CPU struct { + // Number of logical cores in the system. + LogicalCount uint32 `csv:"logicalCount"` + + // Number of physical cores in the system. + PhysicalCount uint32 `csv:"physicalCount"` + + // Percent calculates the percentage of cpu used. + Percent float64 `csv:"percent"` + + // Calculates the percentage of cpu used by process. + ProcessPercent float64 `csv:"processPercent"` + + // Times contains the amounts of time the CPU has spent performing different kinds of work. + Times CPUTimes `csv:"times"` +} + +// CPUTimes contains content for cpu times. +type CPUTimes struct { + // CPU time of user. + User float64 `csv:"user"` + + // CPU time of system. + System float64 `csv:"system"` + + // CPU time of idle. + Idle float64 `csv:"idle"` + + // CPU time of nice. + Nice float64 `csv:"nice"` + + // CPU time of iowait. + Iowait float64 `csv:"iowait"` + + // CPU time of irq. + Irq float64 `csv:"irq"` + + // CPU time of softirq. + Softirq float64 `csv:"softirq"` + + // CPU time of steal. + Steal float64 `csv:"steal"` + + // CPU time of guest. + Guest float64 `csv:"guest"` + + // CPU time of guest nice. + GuestNice float64 `csv:"guestNice"` +} + +// Memory contains content for memory. +type Memory struct { + // Total amount of RAM on this system. + Total uint64 `csv:"total"` + + // RAM available for programs to allocate. + Available uint64 `csv:"available"` + + // RAM used by programs. + Used uint64 `csv:"used"` + + // Percentage of RAM used by programs. + UsedPercent float64 `csv:"usedPercent"` + + // Calculates the percentage of memory used by process. + ProcessUsedPercent float64 `csv:"processUsedPercent"` + + // This is the kernel's notion of free memory. + Free uint64 `csv:"free"` +} + +// Network contains content for network. +type Network struct { + // Return count of tcp connections opened and status is ESTABLISHED. + TCPConnectionCount uint32 `csv:"tcpConnectionCount"` + + // Return count of upload tcp connections opened and status is ESTABLISHED. + UploadTCPConnectionCount uint32 `csv:"uploadTCPConnectionCount"` + + // Location path(area|country|province|city|...). + Location string `csv:"location"` + + // IDC where the peer host is located + IDC string `csv:"idc"` +} + +// Build contains content for build. +type Build struct { + // Git version. + GitVersion string `csv:"gitVersion"` + + // Git commit. + GitCommit string `csv:"gitCommit"` + + // Golang version. + GoVersion string `csv:"goVersion"` + + // Build platform. + Platform string `csv:"platform"` +} + +// Disk contains content for disk. +type Disk struct { + // Total amount of disk on the data path of dragonfly. + Total uint64 `csv:"total"` + + // Free amount of disk on the data path of dragonfly. + Free uint64 `csv:"free"` + + // Used amount of disk on the data path of dragonfly. + Used uint64 `csv:"used"` + + // Used percent of disk on the data path of dragonfly directory. + UsedPercent float64 `csv:"usedPercent"` + + // Total amount of indoes on the data path of dragonfly directory. + InodesTotal uint64 `csv:"inodesTotal"` + + // Used amount of indoes on the data path of dragonfly directory. + InodesUsed uint64 `csv:"inodesUsed"` + + // Free amount of indoes on the data path of dragonfly directory. + InodesFree uint64 `csv:"inodesFree"` + + // Used percent of indoes on the data path of dragonfly directory. + InodesUsedPercent float64 `csv:"inodesUsedPercent"` +} + +// New host instance. +func NewHost( + id, hostname, ip, os, platform, platformFamily, platformVersion, kernelVersion string, port, downloadPort, concurrentUploadLimit, concurrentUploadCount int32, + UploadCount, UploadFailedCount int64, disableShared bool, typ types.HostType, cpu CPU, memory Memory, network Network, disk Disk, + build Build, announceInterval time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith, +) *Host { + return &Host{ + ID: id, + Type: types.HostType(typ), + Hostname: hostname, + IP: ip, + Port: port, + DownloadPort: downloadPort, + DisableShared: disableShared, + OS: os, + Platform: platform, + PlatformFamily: platformFamily, + PlatformVersion: platformVersion, + KernelVersion: kernelVersion, + CPU: cpu, + Memory: memory, + Network: network, + Disk: disk, + Build: build, + AnnounceInterval: announceInterval, + ConcurrentUploadLimit: concurrentUploadLimit, + ConcurrentUploadCount: concurrentUploadCount, + UploadCount: UploadCount, + UploadFailedCount: UploadFailedCount, + CreatedAt: createdAt, + UpdatedAt: updatedAt, + Log: logger.WithHost(id, hostname, ip), + } +} diff --git a/scheduler/resource/persistentcache/host_manager.go b/scheduler/resource/persistentcache/host_manager.go new file mode 100644 index 000000000..07b7634c3 --- /dev/null +++ b/scheduler/resource/persistentcache/host_manager.go @@ -0,0 +1,504 @@ +/* + * Copyright 2024 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//go:generate mockgen -destination host_manager_mock.go -source host_manager.go -package persistentcache + +package persistentcache + +import ( + "context" + "fmt" + "strconv" + "time" + + "github.com/redis/go-redis/v9" + + logger "d7y.io/dragonfly/v2/internal/dflog" + pkgredis "d7y.io/dragonfly/v2/pkg/redis" + pkgtypes "d7y.io/dragonfly/v2/pkg/types" + "d7y.io/dragonfly/v2/scheduler/config" +) + +// HostManager is the interface used for host manager. +type HostManager interface { + // Load returns host for a key. + Load(context.Context, string) (*Host, bool) + + // Store sets host. + Store(context.Context, *Host) + + // Delete deletes host for a key. + Delete(context.Context, string) + + // LoadAll returns all hosts. + LoadAll(context.Context) ([]*Host, error) +} + +// hostManager contains content for host manager. +type hostManager struct { + // Config is scheduler config. + config *config.Config + + // Redis universal client interface. + rdb redis.UniversalClient +} + +// TODO: Use newTaskManager for resource management. +// New host manager interface. +// nolint +func newHostManager(cfg *config.Config, rdb redis.UniversalClient) HostManager { + return &hostManager{config: cfg, rdb: rdb} +} + +// Load returns host for a key. +func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) { + rawHost, err := t.rdb.HGetAll(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, hostID)).Result() + if err != nil { + fmt.Println("getting host failed from Redis:", err) + return nil, false + } + + // Set integer fields from raw host. + port, err := strconv.ParseInt(rawHost["port"], 10, 32) + if err != nil { + fmt.Println("parsing port failed:", err) + return nil, false + } + + downloadPort, err := strconv.ParseInt(rawHost["download_port"], 10, 32) + if err != nil { + fmt.Println("parsing download port failed:", err) + return nil, false + } + + concurrentUploadLimit, err := strconv.ParseInt(rawHost["concurrent_upload_limit"], 10, 32) + if err != nil { + fmt.Println("parsing concurrent upload limit failed:", err) + return nil, false + } + + concurrentUploadCount, err := strconv.ParseInt(rawHost["concurrent_upload_count"], 10, 32) + if err != nil { + fmt.Println("parsing concurrent upload count failed:", err) + return nil, false + } + + uploadCount, err := strconv.ParseInt(rawHost["upload_count"], 10, 64) + if err != nil { + fmt.Println("parsing upload count failed:", err) + return nil, false + } + + uploadFailedCount, err := strconv.ParseInt(rawHost["upload_failed_count"], 10, 64) + if err != nil { + fmt.Println("parsing upload failed count failed:", err) + return nil, false + } + + // Set boolean fields from raw host. + diableShared, err := strconv.ParseBool(rawHost["disable_shared"]) + if err != nil { + fmt.Println("parsing disable shared failed:", err) + return nil, false + } + + // Set cpu fields from raw host. + cpuLogicalCount, err := strconv.ParseUint(rawHost["cpu_logical_count"], 10, 32) + if err != nil { + fmt.Println("parsing cpu logical count failed:", err) + return nil, false + } + + cpuPhysicalCount, err := strconv.ParseUint(rawHost["cpu_physical_count"], 10, 32) + if err != nil { + fmt.Println("parsing cpu physical count failed:", err) + return nil, false + } + + cpuPercent, err := strconv.ParseFloat(rawHost["cpu_percent"], 64) + if err != nil { + fmt.Println("parsing cpu percent failed:", err) + return nil, false + } + + cpuProcessPercent, err := strconv.ParseFloat(rawHost["cpu_processe_percent"], 64) + if err != nil { + fmt.Println("parsing cpu process percent failed:", err) + return nil, false + } + + cpuTimesUser, err := strconv.ParseFloat(rawHost["cpu_times_user"], 64) + if err != nil { + fmt.Println("parsing cpu times user failed:", err) + return nil, false + } + + cpuTimesSystem, err := strconv.ParseFloat(rawHost["cpu_times_system"], 64) + if err != nil { + fmt.Println("parsing cpu times system failed:", err) + return nil, false + } + + cpuTimesIdle, err := strconv.ParseFloat(rawHost["cpu_times_idle"], 64) + if err != nil { + fmt.Println("parsing cpu times idle failed:", err) + return nil, false + } + + cpuTimesNice, err := strconv.ParseFloat(rawHost["cpu_times_nice"], 64) + if err != nil { + fmt.Println("parsing cpu times nice failed:", err) + return nil, false + } + + cpuTimesIowait, err := strconv.ParseFloat(rawHost["cpu_times_iowait"], 64) + if err != nil { + fmt.Println("parsing cpu times iowait failed:", err) + return nil, false + } + + cpuTimesIrq, err := strconv.ParseFloat(rawHost["cpu_times_irq"], 64) + if err != nil { + fmt.Println("parsing cpu times irq failed:", err) + return nil, false + } + + cpuTimesSoftirq, err := strconv.ParseFloat(rawHost["cpu_times_softirq"], 64) + if err != nil { + fmt.Println("parsing cpu times softirq failed:", err) + return nil, false + } + + cpuTimesSteal, err := strconv.ParseFloat(rawHost["cpu_times_steal"], 64) + if err != nil { + fmt.Println("parsing cpu times steal failed:", err) + return nil, false + } + + cpuTimesGuest, err := strconv.ParseFloat(rawHost["cpu_times_guest"], 64) + if err != nil { + fmt.Println("parsing cpu times guest failed:", err) + return nil, false + } + + cpuTimesGuestNice, err := strconv.ParseFloat(rawHost["cpu_times_guest_nice"], 64) + if err != nil { + fmt.Println("parsing cpu times guest nice failed:", err) + return nil, false + } + + cpu := CPU{ + LogicalCount: uint32(cpuLogicalCount), + PhysicalCount: uint32(cpuPhysicalCount), + Percent: cpuPercent, + ProcessPercent: cpuProcessPercent, + Times: CPUTimes{ + User: cpuTimesUser, + System: cpuTimesSystem, + Idle: cpuTimesIdle, + Nice: cpuTimesNice, + Iowait: cpuTimesIowait, + Irq: cpuTimesIrq, + Softirq: cpuTimesSoftirq, + Steal: cpuTimesSteal, + Guest: cpuTimesGuest, + GuestNice: cpuTimesGuestNice, + }, + } + + // Set memory fields from raw host. + memoryTotal, err := strconv.ParseUint(rawHost["memory_total"], 10, 64) + if err != nil { + fmt.Println("parsing memory total failed:", err) + return nil, false + } + + memoryAvailable, err := strconv.ParseUint(rawHost["memory_available"], 10, 64) + if err != nil { + fmt.Println("parsing memory available failed:", err) + return nil, false + } + + memoryUsed, err := strconv.ParseUint(rawHost["memory_used"], 10, 64) + if err != nil { + fmt.Println("parsing memory used failed:", err) + return nil, false + } + + memoryUsedPercent, err := strconv.ParseFloat(rawHost["memory_used_percent"], 64) + if err != nil { + fmt.Println("parsing memory used percent failed:", err) + return nil, false + } + + memoryProcessUsedPercent, err := strconv.ParseFloat(rawHost["memory_processe_used_percent"], 64) + if err != nil { + fmt.Println("parsing memory process used percent failed:", err) + return nil, false + } + + memoryFree, err := strconv.ParseUint(rawHost["memory_free"], 10, 64) + if err != nil { + fmt.Println("parsing memory free failed:", err) + return nil, false + } + + memory := Memory{ + Total: memoryTotal, + Available: memoryAvailable, + Used: memoryUsed, + UsedPercent: memoryUsedPercent, + ProcessUsedPercent: memoryProcessUsedPercent, + Free: memoryFree, + } + + // Set network fields from raw host. + networkTCPConnectionCount, err := strconv.ParseUint(rawHost["network_tcp_connection_count"], 10, 32) + if err != nil { + fmt.Println("parsing network tcp connection count failed:", err) + return nil, false + } + + networkUploadTCPConnectionCount, err := strconv.ParseUint(rawHost["network_upload_tcp_connection_count"], 10, 32) + if err != nil { + fmt.Println("parsing network upload tcp connection count failed:", err) + return nil, false + } + + network := Network{ + TCPConnectionCount: uint32(networkTCPConnectionCount), + UploadTCPConnectionCount: uint32(networkUploadTCPConnectionCount), + Location: rawHost["network_location"], + IDC: rawHost["network_idc"], + } + + // Set disk fields from raw host. + diskTotal, err := strconv.ParseUint(rawHost["disk_total"], 10, 64) + if err != nil { + fmt.Println("parsing disk total failed:", err) + return nil, false + } + + diskFree, err := strconv.ParseUint(rawHost["disk_free"], 10, 64) + if err != nil { + fmt.Println("parsing disk free failed:", err) + return nil, false + } + + diskUsed, err := strconv.ParseUint(rawHost["disk_used"], 10, 64) + if err != nil { + fmt.Println("parsing disk used failed:", err) + return nil, false + } + + diskUsedPercent, err := strconv.ParseFloat(rawHost["disk_used_percent"], 64) + if err != nil { + fmt.Println("parsing disk used percent failed:", err) + return nil, false + } + + diskInodesTotal, err := strconv.ParseUint(rawHost["disk_inodes_total"], 10, 64) + if err != nil { + fmt.Println("parsing disk inodes total failed:", err) + return nil, false + } + + diskInodesUsed, err := strconv.ParseUint(rawHost["disk_inodes_used"], 10, 64) + if err != nil { + fmt.Println("parsing disk inodes used failed:", err) + return nil, false + } + + diskInodesFree, err := strconv.ParseUint(rawHost["disk_inodes_free"], 10, 64) + if err != nil { + fmt.Println("parsing disk inodes free failed:", err) + return nil, false + } + + diskInodesUsedPercent, err := strconv.ParseFloat(rawHost["disk_inodes_used_percent"], 64) + if err != nil { + fmt.Println("parsing disk inodes used percent failed:", err) + return nil, false + } + + disk := Disk{ + Total: diskTotal, + Free: diskFree, + Used: diskUsed, + UsedPercent: diskUsedPercent, + InodesTotal: diskInodesTotal, + InodesUsed: diskInodesUsed, + InodesFree: diskInodesFree, + InodesUsedPercent: diskInodesUsedPercent, + } + + build := Build{ + GitVersion: rawHost["build_git_version"], + GitCommit: rawHost["build_git_commit"], + GoVersion: rawHost["build_go_version"], + Platform: rawHost["build_platform"], + } + + // Set time fields from raw host. + announceInterval, err := strconv.ParseInt(rawHost["announce_interval"], 10, 32) + if err != nil { + fmt.Println("parsing announce interval failed:", err) + return nil, false + } + + createdAt, err := time.Parse(time.RFC3339, rawHost["created_at"]) + if err != nil { + fmt.Println("parsing created at failed:", err) + return nil, false + } + + updatedAt, err := time.Parse(time.RFC3339, rawHost["updated_at"]) + if err != nil { + fmt.Println("parsing updated at failed:", err) + return nil, false + } + + return NewHost( + rawHost["id"], + rawHost["hostname"], + rawHost["ip"], + rawHost["os"], + rawHost["platform"], + rawHost["platform_family"], + rawHost["platform_version"], + rawHost["kernel_version"], + int32(port), + int32(downloadPort), + int32(concurrentUploadLimit), + int32(concurrentUploadCount), + uploadCount, + uploadFailedCount, + diableShared, + pkgtypes.ParseHostType(rawHost["type"]), + cpu, + memory, + network, + disk, + build, + time.Duration(announceInterval), + createdAt, + updatedAt, + logger.WithHost(rawHost["id"], rawHost["hostname"], rawHost["ip"]), + ), true +} + +// Store sets host. +func (t *hostManager) Store(ctx context.Context, host *Host) { + t.rdb.HSet(ctx, + pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, host.ID), + "id", host.ID, + "type", host.Type.Name(), + "hostname", host.Hostname, + "ip", host.IP, + "port", host.Port, + "download_port", host.DownloadPort, + "disable_shared", host.DisableShared, + "os", host.OS, + "platform", host.Platform, + "platform_family", host.PlatformFamily, + "platform_version", host.PlatformVersion, + "kernel_version", host.KernelVersion, + "cpu_logical_count", host.CPU.LogicalCount, + "cpu_physical_count", host.CPU.PhysicalCount, + "cpu_percent", host.CPU.Percent, + "cpu_processe_percent", host.CPU.ProcessPercent, + "cpu_times_user", host.CPU.Times.User, + "cpu_times_system", host.CPU.Times.System, + "cpu_times_idle", host.CPU.Times.Idle, + "cpu_times_nice", host.CPU.Times.Nice, + "cpu_times_iowait", host.CPU.Times.Iowait, + "cpu_times_irq", host.CPU.Times.Irq, + "cpu_times_softirq", host.CPU.Times.Softirq, + "cpu_times_steal", host.CPU.Times.Steal, + "cpu_times_guest", host.CPU.Times.Guest, + "cpu_times_guest_nice", host.CPU.Times.GuestNice, + "memory_total", host.Memory.Total, + "memory_available", host.Memory.Available, + "memory_used", host.Memory.Used, + "memory_used_percent", host.Memory.UsedPercent, + "memory_processe_used_percent", host.Memory.ProcessUsedPercent, + "memory_free", host.Memory.Free, + "network_tcp_connection_count", host.Network.TCPConnectionCount, + "network_upload_tcp_connection_count", host.Network.UploadTCPConnectionCount, + "network_location", host.Network.Location, + "network_idc", host.Network.IDC, + "disk_total", host.Disk.Total, + "disk_free", host.Disk.Free, + "disk_used", host.Disk.Used, + "disk_used_percent", host.Disk.UsedPercent, + "disk_inodes_total", host.Disk.InodesTotal, + "disk_inodes_used", host.Disk.InodesUsed, + "disk_inodes_free", host.Disk.InodesFree, + "disk_inodes_used_percent", host.Disk.InodesUsedPercent, + "build_git_version", host.Build.GitVersion, + "build_git_commit", host.Build.GitCommit, + "build_go_version", host.Build.GoVersion, + "build_platform", host.Build.Platform, + "announce_interval", host.AnnounceInterval, + "concurrent_upload_limit", host.ConcurrentUploadLimit, + "concurrent_upload_count", host.ConcurrentUploadCount, + "upload_count", host.UploadCount, + "upload_failed_count", host.UploadFailedCount, + "created_at", host.CreatedAt.Format(time.RFC3339), + "updated_at", host.UpdatedAt.Format(time.RFC3339)) +} + +// Delete deletes host for a key. +func (t *hostManager) Delete(ctx context.Context, hostID string) { + t.rdb.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, hostID)) +} + +// LoadAll returns all hosts. +func (t *hostManager) LoadAll(ctx context.Context) ([]*Host, error) { + var ( + hosts []*Host + cursor uint64 + ) + + for { + var ( + hostKeys []string + err error + ) + + hostKeys, cursor, err = t.rdb.Scan(ctx, cursor, pkgredis.MakePersistentCacheHostsInScheduler(t.config.Manager.SchedulerClusterID), 10).Result() + if err != nil { + logger.Warn("scan hosts failed") + return nil, err + } + + for _, hostKey := range hostKeys { + host, loaded := t.Load(ctx, hostKey) + if !loaded { + logger.WithHostID(hostKey).Warn("load host failed") + continue + } + + hosts = append(hosts, host) + } + + if cursor == 0 { + break + } + } + + return hosts, nil +} diff --git a/scheduler/resource/persistentcache/host_manager_mock.go b/scheduler/resource/persistentcache/host_manager_mock.go new file mode 100644 index 000000000..dd682f3fd --- /dev/null +++ b/scheduler/resource/persistentcache/host_manager_mock.go @@ -0,0 +1,94 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: host_manager.go +// +// Generated by this command: +// +// mockgen -destination host_manager_mock.go -source host_manager.go -package persistentcache +// + +// Package persistentcache is a generated GoMock package. +package persistentcache + +import ( + context "context" + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockHostManager is a mock of HostManager interface. +type MockHostManager struct { + ctrl *gomock.Controller + recorder *MockHostManagerMockRecorder +} + +// MockHostManagerMockRecorder is the mock recorder for MockHostManager. +type MockHostManagerMockRecorder struct { + mock *MockHostManager +} + +// NewMockHostManager creates a new mock instance. +func NewMockHostManager(ctrl *gomock.Controller) *MockHostManager { + mock := &MockHostManager{ctrl: ctrl} + mock.recorder = &MockHostManagerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockHostManager) EXPECT() *MockHostManagerMockRecorder { + return m.recorder +} + +// Delete mocks base method. +func (m *MockHostManager) Delete(arg0 context.Context, arg1 string) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Delete", arg0, arg1) +} + +// Delete indicates an expected call of Delete. +func (mr *MockHostManagerMockRecorder) Delete(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockHostManager)(nil).Delete), arg0, arg1) +} + +// Load mocks base method. +func (m *MockHostManager) Load(arg0 context.Context, arg1 string) (*Host, bool) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Load", arg0, arg1) + ret0, _ := ret[0].(*Host) + ret1, _ := ret[1].(bool) + return ret0, ret1 +} + +// Load indicates an expected call of Load. +func (mr *MockHostManagerMockRecorder) Load(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Load", reflect.TypeOf((*MockHostManager)(nil).Load), arg0, arg1) +} + +// LoadAll mocks base method. +func (m *MockHostManager) LoadAll(arg0 context.Context) ([]*Host, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LoadAll", arg0) + ret0, _ := ret[0].([]*Host) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// LoadAll indicates an expected call of LoadAll. +func (mr *MockHostManagerMockRecorder) LoadAll(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadAll", reflect.TypeOf((*MockHostManager)(nil).LoadAll), arg0) +} + +// Store mocks base method. +func (m *MockHostManager) Store(arg0 context.Context, arg1 *Host) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Store", arg0, arg1) +} + +// Store indicates an expected call of Store. +func (mr *MockHostManagerMockRecorder) Store(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Store", reflect.TypeOf((*MockHostManager)(nil).Store), arg0, arg1) +} diff --git a/scheduler/resource/persistentcache/task_manager.go b/scheduler/resource/persistentcache/task_manager.go index 6274c66a3..3603308e0 100644 --- a/scheduler/resource/persistentcache/task_manager.go +++ b/scheduler/resource/persistentcache/task_manager.go @@ -103,7 +103,7 @@ func (t *taskManager) Load(ctx context.Context, taskID string) (*Task, bool) { } // Set time fields from raw task. - ttl, err := strconv.Atoi(rawTask["ttl"]) + ttl, err := strconv.ParseInt(rawTask["ttl"], 10, 32) if err != nil { fmt.Println("parsing ttl failed:", err) return nil, false @@ -146,9 +146,9 @@ func (t *taskManager) Load(ctx context.Context, taskID string) (*Task, bool) { ), true } -// Store sets task persistent cache task. +// Store sets persistent cache task. func (t *taskManager) Store(ctx context.Context, task *Task) error { - _, err := t.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error { + if _, err := t.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error { t.rdb.HSet(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), "id", task.ID, @@ -167,9 +167,12 @@ func (t *taskManager) Store(ctx context.Context, task *Task) error { t.rdb.Expire(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), task.TTL) return nil - }) + }); err != nil { + task.Log.Warnf("store task failed: %v", err) + return err + } - return err + return nil } // Delete deletes persistent cache task for a key.