feat: add host manager for persistent cache (#3546)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2024-09-30 14:05:52 +08:00 committed by GitHub
parent 53f5e9c79c
commit bd8ecfbf29
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 873 additions and 5 deletions

View File

@ -0,0 +1,267 @@
/*
* Copyright 2024 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package persistentcache
import (
"time"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/types"
)
// Host contains content for host.
type Host struct {
// ID is host id.
ID string
// Type is host type.
Type types.HostType
// Hostname is host name.
Hostname string
// IP is host ip.
IP string
// Port is grpc service port.
Port int32
// DownloadPort is piece downloading port.
DownloadPort int32
// DisableShared is whether the host is disabled for
// shared with other peers.
DisableShared bool
// Host OS.
OS string
// Host platform.
Platform string
// Host platform family.
PlatformFamily string
// Host platform version.
PlatformVersion string
// Host kernel version.
KernelVersion string
// CPU Stat.
CPU CPU
// Memory Stat.
Memory Memory
// Network Stat.
Network Network
// Dist Stat.
Disk Disk
// Build information.
Build Build
// AnnounceInterval is the interval between host announces to scheduler.
AnnounceInterval time.Duration
// ConcurrentUploadLimit is concurrent upload limit count.
ConcurrentUploadLimit int32
// ConcurrentUploadCount is concurrent upload count.
ConcurrentUploadCount int32
// UploadCount is total upload count.
UploadCount int64
// UploadFailedCount is upload failed count.
UploadFailedCount int64
// CreatedAt is host create time.
CreatedAt time.Time
// UpdatedAt is host update time.
UpdatedAt time.Time
// Host log.
Log *logger.SugaredLoggerOnWith
}
// CPU contains content for cpu.
type CPU struct {
// Number of logical cores in the system.
LogicalCount uint32 `csv:"logicalCount"`
// Number of physical cores in the system.
PhysicalCount uint32 `csv:"physicalCount"`
// Percent calculates the percentage of cpu used.
Percent float64 `csv:"percent"`
// Calculates the percentage of cpu used by process.
ProcessPercent float64 `csv:"processPercent"`
// Times contains the amounts of time the CPU has spent performing different kinds of work.
Times CPUTimes `csv:"times"`
}
// CPUTimes contains content for cpu times.
type CPUTimes struct {
// CPU time of user.
User float64 `csv:"user"`
// CPU time of system.
System float64 `csv:"system"`
// CPU time of idle.
Idle float64 `csv:"idle"`
// CPU time of nice.
Nice float64 `csv:"nice"`
// CPU time of iowait.
Iowait float64 `csv:"iowait"`
// CPU time of irq.
Irq float64 `csv:"irq"`
// CPU time of softirq.
Softirq float64 `csv:"softirq"`
// CPU time of steal.
Steal float64 `csv:"steal"`
// CPU time of guest.
Guest float64 `csv:"guest"`
// CPU time of guest nice.
GuestNice float64 `csv:"guestNice"`
}
// Memory contains content for memory.
type Memory struct {
// Total amount of RAM on this system.
Total uint64 `csv:"total"`
// RAM available for programs to allocate.
Available uint64 `csv:"available"`
// RAM used by programs.
Used uint64 `csv:"used"`
// Percentage of RAM used by programs.
UsedPercent float64 `csv:"usedPercent"`
// Calculates the percentage of memory used by process.
ProcessUsedPercent float64 `csv:"processUsedPercent"`
// This is the kernel's notion of free memory.
Free uint64 `csv:"free"`
}
// Network contains content for network.
type Network struct {
// Return count of tcp connections opened and status is ESTABLISHED.
TCPConnectionCount uint32 `csv:"tcpConnectionCount"`
// Return count of upload tcp connections opened and status is ESTABLISHED.
UploadTCPConnectionCount uint32 `csv:"uploadTCPConnectionCount"`
// Location path(area|country|province|city|...).
Location string `csv:"location"`
// IDC where the peer host is located
IDC string `csv:"idc"`
}
// Build contains content for build.
type Build struct {
// Git version.
GitVersion string `csv:"gitVersion"`
// Git commit.
GitCommit string `csv:"gitCommit"`
// Golang version.
GoVersion string `csv:"goVersion"`
// Build platform.
Platform string `csv:"platform"`
}
// Disk contains content for disk.
type Disk struct {
// Total amount of disk on the data path of dragonfly.
Total uint64 `csv:"total"`
// Free amount of disk on the data path of dragonfly.
Free uint64 `csv:"free"`
// Used amount of disk on the data path of dragonfly.
Used uint64 `csv:"used"`
// Used percent of disk on the data path of dragonfly directory.
UsedPercent float64 `csv:"usedPercent"`
// Total amount of indoes on the data path of dragonfly directory.
InodesTotal uint64 `csv:"inodesTotal"`
// Used amount of indoes on the data path of dragonfly directory.
InodesUsed uint64 `csv:"inodesUsed"`
// Free amount of indoes on the data path of dragonfly directory.
InodesFree uint64 `csv:"inodesFree"`
// Used percent of indoes on the data path of dragonfly directory.
InodesUsedPercent float64 `csv:"inodesUsedPercent"`
}
// New host instance.
func NewHost(
id, hostname, ip, os, platform, platformFamily, platformVersion, kernelVersion string, port, downloadPort, concurrentUploadLimit, concurrentUploadCount int32,
UploadCount, UploadFailedCount int64, disableShared bool, typ types.HostType, cpu CPU, memory Memory, network Network, disk Disk,
build Build, announceInterval time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith,
) *Host {
return &Host{
ID: id,
Type: types.HostType(typ),
Hostname: hostname,
IP: ip,
Port: port,
DownloadPort: downloadPort,
DisableShared: disableShared,
OS: os,
Platform: platform,
PlatformFamily: platformFamily,
PlatformVersion: platformVersion,
KernelVersion: kernelVersion,
CPU: cpu,
Memory: memory,
Network: network,
Disk: disk,
Build: build,
AnnounceInterval: announceInterval,
ConcurrentUploadLimit: concurrentUploadLimit,
ConcurrentUploadCount: concurrentUploadCount,
UploadCount: UploadCount,
UploadFailedCount: UploadFailedCount,
CreatedAt: createdAt,
UpdatedAt: updatedAt,
Log: logger.WithHost(id, hostname, ip),
}
}

View File

@ -0,0 +1,504 @@
/*
* Copyright 2024 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//go:generate mockgen -destination host_manager_mock.go -source host_manager.go -package persistentcache
package persistentcache
import (
"context"
"fmt"
"strconv"
"time"
"github.com/redis/go-redis/v9"
logger "d7y.io/dragonfly/v2/internal/dflog"
pkgredis "d7y.io/dragonfly/v2/pkg/redis"
pkgtypes "d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
)
// HostManager is the interface used for host manager.
type HostManager interface {
// Load returns host for a key.
Load(context.Context, string) (*Host, bool)
// Store sets host.
Store(context.Context, *Host)
// Delete deletes host for a key.
Delete(context.Context, string)
// LoadAll returns all hosts.
LoadAll(context.Context) ([]*Host, error)
}
// hostManager contains content for host manager.
type hostManager struct {
// Config is scheduler config.
config *config.Config
// Redis universal client interface.
rdb redis.UniversalClient
}
// TODO: Use newTaskManager for resource management.
// New host manager interface.
// nolint
func newHostManager(cfg *config.Config, rdb redis.UniversalClient) HostManager {
return &hostManager{config: cfg, rdb: rdb}
}
// Load returns host for a key.
func (t *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
rawHost, err := t.rdb.HGetAll(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, hostID)).Result()
if err != nil {
fmt.Println("getting host failed from Redis:", err)
return nil, false
}
// Set integer fields from raw host.
port, err := strconv.ParseInt(rawHost["port"], 10, 32)
if err != nil {
fmt.Println("parsing port failed:", err)
return nil, false
}
downloadPort, err := strconv.ParseInt(rawHost["download_port"], 10, 32)
if err != nil {
fmt.Println("parsing download port failed:", err)
return nil, false
}
concurrentUploadLimit, err := strconv.ParseInt(rawHost["concurrent_upload_limit"], 10, 32)
if err != nil {
fmt.Println("parsing concurrent upload limit failed:", err)
return nil, false
}
concurrentUploadCount, err := strconv.ParseInt(rawHost["concurrent_upload_count"], 10, 32)
if err != nil {
fmt.Println("parsing concurrent upload count failed:", err)
return nil, false
}
uploadCount, err := strconv.ParseInt(rawHost["upload_count"], 10, 64)
if err != nil {
fmt.Println("parsing upload count failed:", err)
return nil, false
}
uploadFailedCount, err := strconv.ParseInt(rawHost["upload_failed_count"], 10, 64)
if err != nil {
fmt.Println("parsing upload failed count failed:", err)
return nil, false
}
// Set boolean fields from raw host.
diableShared, err := strconv.ParseBool(rawHost["disable_shared"])
if err != nil {
fmt.Println("parsing disable shared failed:", err)
return nil, false
}
// Set cpu fields from raw host.
cpuLogicalCount, err := strconv.ParseUint(rawHost["cpu_logical_count"], 10, 32)
if err != nil {
fmt.Println("parsing cpu logical count failed:", err)
return nil, false
}
cpuPhysicalCount, err := strconv.ParseUint(rawHost["cpu_physical_count"], 10, 32)
if err != nil {
fmt.Println("parsing cpu physical count failed:", err)
return nil, false
}
cpuPercent, err := strconv.ParseFloat(rawHost["cpu_percent"], 64)
if err != nil {
fmt.Println("parsing cpu percent failed:", err)
return nil, false
}
cpuProcessPercent, err := strconv.ParseFloat(rawHost["cpu_processe_percent"], 64)
if err != nil {
fmt.Println("parsing cpu process percent failed:", err)
return nil, false
}
cpuTimesUser, err := strconv.ParseFloat(rawHost["cpu_times_user"], 64)
if err != nil {
fmt.Println("parsing cpu times user failed:", err)
return nil, false
}
cpuTimesSystem, err := strconv.ParseFloat(rawHost["cpu_times_system"], 64)
if err != nil {
fmt.Println("parsing cpu times system failed:", err)
return nil, false
}
cpuTimesIdle, err := strconv.ParseFloat(rawHost["cpu_times_idle"], 64)
if err != nil {
fmt.Println("parsing cpu times idle failed:", err)
return nil, false
}
cpuTimesNice, err := strconv.ParseFloat(rawHost["cpu_times_nice"], 64)
if err != nil {
fmt.Println("parsing cpu times nice failed:", err)
return nil, false
}
cpuTimesIowait, err := strconv.ParseFloat(rawHost["cpu_times_iowait"], 64)
if err != nil {
fmt.Println("parsing cpu times iowait failed:", err)
return nil, false
}
cpuTimesIrq, err := strconv.ParseFloat(rawHost["cpu_times_irq"], 64)
if err != nil {
fmt.Println("parsing cpu times irq failed:", err)
return nil, false
}
cpuTimesSoftirq, err := strconv.ParseFloat(rawHost["cpu_times_softirq"], 64)
if err != nil {
fmt.Println("parsing cpu times softirq failed:", err)
return nil, false
}
cpuTimesSteal, err := strconv.ParseFloat(rawHost["cpu_times_steal"], 64)
if err != nil {
fmt.Println("parsing cpu times steal failed:", err)
return nil, false
}
cpuTimesGuest, err := strconv.ParseFloat(rawHost["cpu_times_guest"], 64)
if err != nil {
fmt.Println("parsing cpu times guest failed:", err)
return nil, false
}
cpuTimesGuestNice, err := strconv.ParseFloat(rawHost["cpu_times_guest_nice"], 64)
if err != nil {
fmt.Println("parsing cpu times guest nice failed:", err)
return nil, false
}
cpu := CPU{
LogicalCount: uint32(cpuLogicalCount),
PhysicalCount: uint32(cpuPhysicalCount),
Percent: cpuPercent,
ProcessPercent: cpuProcessPercent,
Times: CPUTimes{
User: cpuTimesUser,
System: cpuTimesSystem,
Idle: cpuTimesIdle,
Nice: cpuTimesNice,
Iowait: cpuTimesIowait,
Irq: cpuTimesIrq,
Softirq: cpuTimesSoftirq,
Steal: cpuTimesSteal,
Guest: cpuTimesGuest,
GuestNice: cpuTimesGuestNice,
},
}
// Set memory fields from raw host.
memoryTotal, err := strconv.ParseUint(rawHost["memory_total"], 10, 64)
if err != nil {
fmt.Println("parsing memory total failed:", err)
return nil, false
}
memoryAvailable, err := strconv.ParseUint(rawHost["memory_available"], 10, 64)
if err != nil {
fmt.Println("parsing memory available failed:", err)
return nil, false
}
memoryUsed, err := strconv.ParseUint(rawHost["memory_used"], 10, 64)
if err != nil {
fmt.Println("parsing memory used failed:", err)
return nil, false
}
memoryUsedPercent, err := strconv.ParseFloat(rawHost["memory_used_percent"], 64)
if err != nil {
fmt.Println("parsing memory used percent failed:", err)
return nil, false
}
memoryProcessUsedPercent, err := strconv.ParseFloat(rawHost["memory_processe_used_percent"], 64)
if err != nil {
fmt.Println("parsing memory process used percent failed:", err)
return nil, false
}
memoryFree, err := strconv.ParseUint(rawHost["memory_free"], 10, 64)
if err != nil {
fmt.Println("parsing memory free failed:", err)
return nil, false
}
memory := Memory{
Total: memoryTotal,
Available: memoryAvailable,
Used: memoryUsed,
UsedPercent: memoryUsedPercent,
ProcessUsedPercent: memoryProcessUsedPercent,
Free: memoryFree,
}
// Set network fields from raw host.
networkTCPConnectionCount, err := strconv.ParseUint(rawHost["network_tcp_connection_count"], 10, 32)
if err != nil {
fmt.Println("parsing network tcp connection count failed:", err)
return nil, false
}
networkUploadTCPConnectionCount, err := strconv.ParseUint(rawHost["network_upload_tcp_connection_count"], 10, 32)
if err != nil {
fmt.Println("parsing network upload tcp connection count failed:", err)
return nil, false
}
network := Network{
TCPConnectionCount: uint32(networkTCPConnectionCount),
UploadTCPConnectionCount: uint32(networkUploadTCPConnectionCount),
Location: rawHost["network_location"],
IDC: rawHost["network_idc"],
}
// Set disk fields from raw host.
diskTotal, err := strconv.ParseUint(rawHost["disk_total"], 10, 64)
if err != nil {
fmt.Println("parsing disk total failed:", err)
return nil, false
}
diskFree, err := strconv.ParseUint(rawHost["disk_free"], 10, 64)
if err != nil {
fmt.Println("parsing disk free failed:", err)
return nil, false
}
diskUsed, err := strconv.ParseUint(rawHost["disk_used"], 10, 64)
if err != nil {
fmt.Println("parsing disk used failed:", err)
return nil, false
}
diskUsedPercent, err := strconv.ParseFloat(rawHost["disk_used_percent"], 64)
if err != nil {
fmt.Println("parsing disk used percent failed:", err)
return nil, false
}
diskInodesTotal, err := strconv.ParseUint(rawHost["disk_inodes_total"], 10, 64)
if err != nil {
fmt.Println("parsing disk inodes total failed:", err)
return nil, false
}
diskInodesUsed, err := strconv.ParseUint(rawHost["disk_inodes_used"], 10, 64)
if err != nil {
fmt.Println("parsing disk inodes used failed:", err)
return nil, false
}
diskInodesFree, err := strconv.ParseUint(rawHost["disk_inodes_free"], 10, 64)
if err != nil {
fmt.Println("parsing disk inodes free failed:", err)
return nil, false
}
diskInodesUsedPercent, err := strconv.ParseFloat(rawHost["disk_inodes_used_percent"], 64)
if err != nil {
fmt.Println("parsing disk inodes used percent failed:", err)
return nil, false
}
disk := Disk{
Total: diskTotal,
Free: diskFree,
Used: diskUsed,
UsedPercent: diskUsedPercent,
InodesTotal: diskInodesTotal,
InodesUsed: diskInodesUsed,
InodesFree: diskInodesFree,
InodesUsedPercent: diskInodesUsedPercent,
}
build := Build{
GitVersion: rawHost["build_git_version"],
GitCommit: rawHost["build_git_commit"],
GoVersion: rawHost["build_go_version"],
Platform: rawHost["build_platform"],
}
// Set time fields from raw host.
announceInterval, err := strconv.ParseInt(rawHost["announce_interval"], 10, 32)
if err != nil {
fmt.Println("parsing announce interval failed:", err)
return nil, false
}
createdAt, err := time.Parse(time.RFC3339, rawHost["created_at"])
if err != nil {
fmt.Println("parsing created at failed:", err)
return nil, false
}
updatedAt, err := time.Parse(time.RFC3339, rawHost["updated_at"])
if err != nil {
fmt.Println("parsing updated at failed:", err)
return nil, false
}
return NewHost(
rawHost["id"],
rawHost["hostname"],
rawHost["ip"],
rawHost["os"],
rawHost["platform"],
rawHost["platform_family"],
rawHost["platform_version"],
rawHost["kernel_version"],
int32(port),
int32(downloadPort),
int32(concurrentUploadLimit),
int32(concurrentUploadCount),
uploadCount,
uploadFailedCount,
diableShared,
pkgtypes.ParseHostType(rawHost["type"]),
cpu,
memory,
network,
disk,
build,
time.Duration(announceInterval),
createdAt,
updatedAt,
logger.WithHost(rawHost["id"], rawHost["hostname"], rawHost["ip"]),
), true
}
// Store sets host.
func (t *hostManager) Store(ctx context.Context, host *Host) {
t.rdb.HSet(ctx,
pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, host.ID),
"id", host.ID,
"type", host.Type.Name(),
"hostname", host.Hostname,
"ip", host.IP,
"port", host.Port,
"download_port", host.DownloadPort,
"disable_shared", host.DisableShared,
"os", host.OS,
"platform", host.Platform,
"platform_family", host.PlatformFamily,
"platform_version", host.PlatformVersion,
"kernel_version", host.KernelVersion,
"cpu_logical_count", host.CPU.LogicalCount,
"cpu_physical_count", host.CPU.PhysicalCount,
"cpu_percent", host.CPU.Percent,
"cpu_processe_percent", host.CPU.ProcessPercent,
"cpu_times_user", host.CPU.Times.User,
"cpu_times_system", host.CPU.Times.System,
"cpu_times_idle", host.CPU.Times.Idle,
"cpu_times_nice", host.CPU.Times.Nice,
"cpu_times_iowait", host.CPU.Times.Iowait,
"cpu_times_irq", host.CPU.Times.Irq,
"cpu_times_softirq", host.CPU.Times.Softirq,
"cpu_times_steal", host.CPU.Times.Steal,
"cpu_times_guest", host.CPU.Times.Guest,
"cpu_times_guest_nice", host.CPU.Times.GuestNice,
"memory_total", host.Memory.Total,
"memory_available", host.Memory.Available,
"memory_used", host.Memory.Used,
"memory_used_percent", host.Memory.UsedPercent,
"memory_processe_used_percent", host.Memory.ProcessUsedPercent,
"memory_free", host.Memory.Free,
"network_tcp_connection_count", host.Network.TCPConnectionCount,
"network_upload_tcp_connection_count", host.Network.UploadTCPConnectionCount,
"network_location", host.Network.Location,
"network_idc", host.Network.IDC,
"disk_total", host.Disk.Total,
"disk_free", host.Disk.Free,
"disk_used", host.Disk.Used,
"disk_used_percent", host.Disk.UsedPercent,
"disk_inodes_total", host.Disk.InodesTotal,
"disk_inodes_used", host.Disk.InodesUsed,
"disk_inodes_free", host.Disk.InodesFree,
"disk_inodes_used_percent", host.Disk.InodesUsedPercent,
"build_git_version", host.Build.GitVersion,
"build_git_commit", host.Build.GitCommit,
"build_go_version", host.Build.GoVersion,
"build_platform", host.Build.Platform,
"announce_interval", host.AnnounceInterval,
"concurrent_upload_limit", host.ConcurrentUploadLimit,
"concurrent_upload_count", host.ConcurrentUploadCount,
"upload_count", host.UploadCount,
"upload_failed_count", host.UploadFailedCount,
"created_at", host.CreatedAt.Format(time.RFC3339),
"updated_at", host.UpdatedAt.Format(time.RFC3339))
}
// Delete deletes host for a key.
func (t *hostManager) Delete(ctx context.Context, hostID string) {
t.rdb.Del(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(t.config.Manager.SchedulerClusterID, hostID))
}
// LoadAll returns all hosts.
func (t *hostManager) LoadAll(ctx context.Context) ([]*Host, error) {
var (
hosts []*Host
cursor uint64
)
for {
var (
hostKeys []string
err error
)
hostKeys, cursor, err = t.rdb.Scan(ctx, cursor, pkgredis.MakePersistentCacheHostsInScheduler(t.config.Manager.SchedulerClusterID), 10).Result()
if err != nil {
logger.Warn("scan hosts failed")
return nil, err
}
for _, hostKey := range hostKeys {
host, loaded := t.Load(ctx, hostKey)
if !loaded {
logger.WithHostID(hostKey).Warn("load host failed")
continue
}
hosts = append(hosts, host)
}
if cursor == 0 {
break
}
}
return hosts, nil
}

View File

@ -0,0 +1,94 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: host_manager.go
//
// Generated by this command:
//
// mockgen -destination host_manager_mock.go -source host_manager.go -package persistentcache
//
// Package persistentcache is a generated GoMock package.
package persistentcache
import (
context "context"
reflect "reflect"
gomock "go.uber.org/mock/gomock"
)
// MockHostManager is a mock of HostManager interface.
type MockHostManager struct {
ctrl *gomock.Controller
recorder *MockHostManagerMockRecorder
}
// MockHostManagerMockRecorder is the mock recorder for MockHostManager.
type MockHostManagerMockRecorder struct {
mock *MockHostManager
}
// NewMockHostManager creates a new mock instance.
func NewMockHostManager(ctrl *gomock.Controller) *MockHostManager {
mock := &MockHostManager{ctrl: ctrl}
mock.recorder = &MockHostManagerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockHostManager) EXPECT() *MockHostManagerMockRecorder {
return m.recorder
}
// Delete mocks base method.
func (m *MockHostManager) Delete(arg0 context.Context, arg1 string) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Delete", arg0, arg1)
}
// Delete indicates an expected call of Delete.
func (mr *MockHostManagerMockRecorder) Delete(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockHostManager)(nil).Delete), arg0, arg1)
}
// Load mocks base method.
func (m *MockHostManager) Load(arg0 context.Context, arg1 string) (*Host, bool) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Load", arg0, arg1)
ret0, _ := ret[0].(*Host)
ret1, _ := ret[1].(bool)
return ret0, ret1
}
// Load indicates an expected call of Load.
func (mr *MockHostManagerMockRecorder) Load(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Load", reflect.TypeOf((*MockHostManager)(nil).Load), arg0, arg1)
}
// LoadAll mocks base method.
func (m *MockHostManager) LoadAll(arg0 context.Context) ([]*Host, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "LoadAll", arg0)
ret0, _ := ret[0].([]*Host)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// LoadAll indicates an expected call of LoadAll.
func (mr *MockHostManagerMockRecorder) LoadAll(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadAll", reflect.TypeOf((*MockHostManager)(nil).LoadAll), arg0)
}
// Store mocks base method.
func (m *MockHostManager) Store(arg0 context.Context, arg1 *Host) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Store", arg0, arg1)
}
// Store indicates an expected call of Store.
func (mr *MockHostManagerMockRecorder) Store(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Store", reflect.TypeOf((*MockHostManager)(nil).Store), arg0, arg1)
}

View File

@ -103,7 +103,7 @@ func (t *taskManager) Load(ctx context.Context, taskID string) (*Task, bool) {
} }
// Set time fields from raw task. // Set time fields from raw task.
ttl, err := strconv.Atoi(rawTask["ttl"]) ttl, err := strconv.ParseInt(rawTask["ttl"], 10, 32)
if err != nil { if err != nil {
fmt.Println("parsing ttl failed:", err) fmt.Println("parsing ttl failed:", err)
return nil, false return nil, false
@ -146,9 +146,9 @@ func (t *taskManager) Load(ctx context.Context, taskID string) (*Task, bool) {
), true ), true
} }
// Store sets task persistent cache task. // Store sets persistent cache task.
func (t *taskManager) Store(ctx context.Context, task *Task) error { func (t *taskManager) Store(ctx context.Context, task *Task) error {
_, err := t.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error { if _, err := t.rdb.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
t.rdb.HSet(ctx, t.rdb.HSet(ctx,
pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID),
"id", task.ID, "id", task.ID,
@ -167,9 +167,12 @@ func (t *taskManager) Store(ctx context.Context, task *Task) error {
t.rdb.Expire(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), task.TTL) t.rdb.Expire(ctx, pkgredis.MakePersistentCacheTaskKeyInScheduler(t.config.Manager.SchedulerClusterID, task.ID), task.TTL)
return nil return nil
}) }); err != nil {
task.Log.Warnf("store task failed: %v", err)
return err
}
return err return nil
} }
// Delete deletes persistent cache task for a key. // Delete deletes persistent cache task for a key.