feat: add sync peer job for scheduler (#2663)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-08-22 19:58:27 +08:00 committed by GitHub
parent 036e32c1b8
commit dfde8bdce6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 385 additions and 28 deletions

View File

@ -24,7 +24,11 @@ const (
// Job Name. // Job Name.
const ( const (
// PreheatJob is the name of preheat job.
PreheatJob = "preheat" PreheatJob = "preheat"
// SyncPeersJob is the name of syncing peers job.
SyncPeersJob = "sync_peers"
) )
// Machinery server configuration. // Machinery server configuration.

View File

@ -284,6 +284,9 @@ type TCPListenPortRange struct {
type JobConfig struct { type JobConfig struct {
// Preheat configuration. // Preheat configuration.
Preheat PreheatConfig `yaml:"preheat" mapstructure:"preheat"` Preheat PreheatConfig `yaml:"preheat" mapstructure:"preheat"`
// Sync peers configuration.
SyncPeers SyncPeersConfig `yaml:"syncPeers" mapstructure:"syncPeers"`
} }
type PreheatConfig struct { type PreheatConfig struct {
@ -294,6 +297,12 @@ type PreheatConfig struct {
TLS *PreheatTLSClientConfig `yaml:"tls" mapstructure:"tls"` TLS *PreheatTLSClientConfig `yaml:"tls" mapstructure:"tls"`
} }
type SyncPeersConfig struct {
// Interval is the interval for syncing all peers information from the scheduler and
// display peers information in the manager console.
Interval time.Duration `yaml:"interval" mapstructure:"interval"`
}
type PreheatTLSClientConfig struct { type PreheatTLSClientConfig struct {
// CACert is the CA certificate for preheat tls handshake, it can be path or PEM format string. // CACert is the CA certificate for preheat tls handshake, it can be path or PEM format string.
CACert types.PEMContent `yaml:"caCert" mapstructure:"caCert"` CACert types.PEMContent `yaml:"caCert" mapstructure:"caCert"`
@ -427,6 +436,9 @@ func New() *Config {
Preheat: PreheatConfig{ Preheat: PreheatConfig{
RegistryTimeout: DefaultJobPreheatRegistryTimeout, RegistryTimeout: DefaultJobPreheatRegistryTimeout,
}, },
SyncPeers: SyncPeersConfig{
Interval: DefaultJobSyncPeersInterval,
},
}, },
ObjectStorage: ObjectStorageConfig{ ObjectStorage: ObjectStorageConfig{
Enable: false, Enable: false,
@ -607,6 +619,10 @@ func (cfg *Config) Validate() error {
return errors.New("preheat requires parameter registryTimeout") return errors.New("preheat requires parameter registryTimeout")
} }
if cfg.Job.SyncPeers.Interval <= MinJobSyncPeersInterval {
return errors.New("syncPeers requires parameter interval and it must be greater than 12 hours")
}
if cfg.ObjectStorage.Enable { if cfg.ObjectStorage.Enable {
if cfg.ObjectStorage.Name == "" { if cfg.ObjectStorage.Name == "" {
return errors.New("objectStorage requires parameter name") return errors.New("objectStorage requires parameter name")

View File

@ -194,6 +194,9 @@ func TestConfig_Load(t *testing.T) {
CACert: "foo", CACert: "foo",
}, },
}, },
SyncPeers: SyncPeersConfig{
Interval: 13 * time.Hour,
},
}, },
ObjectStorage: ObjectStorageConfig{ ObjectStorage: ObjectStorageConfig{
Enable: true, Enable: true,
@ -741,6 +744,21 @@ func TestConfig_Validate(t *testing.T) {
assert.EqualError(err, "preheat requires parameter registryTimeout") assert.EqualError(err, "preheat requires parameter registryTimeout")
}, },
}, },
{
name: "syncPeers requires parameter interval",
config: New(),
mock: func(cfg *Config) {
cfg.Auth.JWT = mockJWTConfig
cfg.Database.Type = DatabaseTypeMysql
cfg.Database.Mysql = mockMysqlConfig
cfg.Database.Redis = mockRedisConfig
cfg.Job.SyncPeers.Interval = 11 * time.Hour
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "syncPeers requires parameter interval and it must be greater than 12 hours")
},
},
{ {
name: "objectStorage requires parameter name", name: "objectStorage requires parameter name",
config: New(), config: New(),

View File

@ -26,6 +26,7 @@ const (
const ( const (
SpanPreheat = "preheat" SpanPreheat = "preheat"
SpanSyncPeers = "sync-peers"
SpanGetLayers = "get-layers" SpanGetLayers = "get-layers"
SpanAuthWithRegistry = "auth-with-registry" SpanAuthWithRegistry = "auth-with-registry"
) )

View File

@ -92,6 +92,12 @@ const (
const ( const (
// DefaultJobPreheatRegistryTimeout is the default timeout for requesting registry to get token and manifest. // DefaultJobPreheatRegistryTimeout is the default timeout for requesting registry to get token and manifest.
DefaultJobPreheatRegistryTimeout = 1 * time.Minute DefaultJobPreheatRegistryTimeout = 1 * time.Minute
// DefaultJobSyncPeersInterval is the default interval for syncing all peers information from the scheduler.
DefaultJobSyncPeersInterval = 24 * time.Hour
// MinJobSyncPeersInterval is the min interval for syncing all peers information from the scheduler.
MinJobSyncPeersInterval = 12 * time.Hour
) )
const ( const (

View File

@ -68,6 +68,9 @@ job:
registryTimeout: 1m registryTimeout: 1m
tls: tls:
caCert: testdata/ca.crt caCert: testdata/ca.crt
syncPeers:
interval: 13h
objectStorage: objectStorage:
enable: true enable: true
name: s3 name: s3

View File

@ -20,15 +20,24 @@ import (
"crypto/x509" "crypto/x509"
"errors" "errors"
"go.opentelemetry.io/otel"
internaljob "d7y.io/dragonfly/v2/internal/job" internaljob "d7y.io/dragonfly/v2/internal/job"
"d7y.io/dragonfly/v2/manager/config" "d7y.io/dragonfly/v2/manager/config"
"d7y.io/dragonfly/v2/manager/models"
) )
// tracer is a global tracer for job.
var tracer = otel.Tracer("manager")
// Job is an implementation of job.
type Job struct { type Job struct {
*internaljob.Job *internaljob.Job
Preheat Preheat
SyncPeers
} }
// New returns a new Job.
func New(cfg *config.Config) (*Job, error) { func New(cfg *config.Config) (*Job, error) {
j, err := internaljob.New(&internaljob.Config{ j, err := internaljob.New(&internaljob.Config{
Addrs: cfg.Database.Redis.Addrs, Addrs: cfg.Database.Redis.Addrs,
@ -50,13 +59,34 @@ func New(cfg *config.Config) (*Job, error) {
} }
} }
p, err := newPreheat(j, cfg.Job.Preheat.RegistryTimeout, certPool) preheat, err := newPreheat(j, cfg.Job.Preheat.RegistryTimeout, certPool)
if err != nil {
return nil, err
}
syncPeers, err := newSyncPeers(j)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Job{ return &Job{
Job: j, Job: j,
Preheat: p, Preheat: preheat,
SyncPeers: syncPeers,
}, nil }, nil
} }
// getSchedulerQueues gets scheduler queues.
func getSchedulerQueues(schedulers []models.Scheduler) []internaljob.Queue {
var queues []internaljob.Queue
for _, scheduler := range schedulers {
queue, err := internaljob.GetSchedulerQueue(scheduler.SchedulerClusterID, scheduler.Hostname)
if err != nil {
continue
}
queues = append(queues, queue)
}
return queues
}

View File

@ -0,0 +1,58 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: sync_peers.go
// Package mocks is a generated GoMock package.
package mocks
import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
)
// MockSyncPeers is a mock of SyncPeers interface.
type MockSyncPeers struct {
ctrl *gomock.Controller
recorder *MockSyncPeersMockRecorder
}
// MockSyncPeersMockRecorder is the mock recorder for MockSyncPeers.
type MockSyncPeersMockRecorder struct {
mock *MockSyncPeers
}
// NewMockSyncPeers creates a new mock instance.
func NewMockSyncPeers(ctrl *gomock.Controller) *MockSyncPeers {
mock := &MockSyncPeers{ctrl: ctrl}
mock.recorder = &MockSyncPeersMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockSyncPeers) EXPECT() *MockSyncPeersMockRecorder {
return m.recorder
}
// Serve mocks base method.
func (m *MockSyncPeers) Serve() {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Serve")
}
// Serve indicates an expected call of Serve.
func (mr *MockSyncPeersMockRecorder) Serve() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Serve", reflect.TypeOf((*MockSyncPeers)(nil).Serve))
}
// Stop mocks base method.
func (m *MockSyncPeers) Stop() {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Stop")
}
// Stop indicates an expected call of Stop.
func (mr *MockSyncPeersMockRecorder) Stop() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockSyncPeers)(nil).Stop))
}

View File

@ -36,7 +36,6 @@ import (
"github.com/distribution/distribution/v3/manifest/schema2" "github.com/distribution/distribution/v3/manifest/schema2"
"github.com/go-http-utils/headers" "github.com/go-http-utils/headers"
"github.com/google/uuid" "github.com/google/uuid"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
logger "d7y.io/dragonfly/v2/internal/dflog" logger "d7y.io/dragonfly/v2/internal/dflog"
@ -47,8 +46,6 @@ import (
nethttp "d7y.io/dragonfly/v2/pkg/net/http" nethttp "d7y.io/dragonfly/v2/pkg/net/http"
) )
var tracer = otel.Tracer("manager")
type PreheatType string type PreheatType string
const ( const (
@ -59,18 +56,23 @@ const (
PreheatFileType PreheatType = "file" PreheatFileType PreheatType = "file"
) )
// accessURLPattern is the pattern of access url.
var accessURLPattern, _ = regexp.Compile("^(.*)://(.*)/v2/(.*)/manifests/(.*)") var accessURLPattern, _ = regexp.Compile("^(.*)://(.*)/v2/(.*)/manifests/(.*)")
// Preheat is an interface for preheat job.
type Preheat interface { type Preheat interface {
// CreatePreheat creates a preheat job.
CreatePreheat(context.Context, []models.Scheduler, types.PreheatArgs) (*internaljob.GroupJobState, error) CreatePreheat(context.Context, []models.Scheduler, types.PreheatArgs) (*internaljob.GroupJobState, error)
} }
// preheat is an implementation of Preheat.
type preheat struct { type preheat struct {
job *internaljob.Job job *internaljob.Job
httpRequestTimeout time.Duration httpRequestTimeout time.Duration
rootCAs *x509.CertPool rootCAs *x509.CertPool
} }
// preheatImage is image information for preheat.
type preheatImage struct { type preheatImage struct {
protocol string protocol string
domain string domain string
@ -78,10 +80,12 @@ type preheatImage struct {
tag string tag string
} }
// newPreheat creates a new Preheat.
func newPreheat(job *internaljob.Job, httpRequestTimeout time.Duration, rootCAs *x509.CertPool) (Preheat, error) { func newPreheat(job *internaljob.Job, httpRequestTimeout time.Duration, rootCAs *x509.CertPool) (Preheat, error) {
return &preheat{job, httpRequestTimeout, rootCAs}, nil return &preheat{job, httpRequestTimeout, rootCAs}, nil
} }
// CreatePreheat creates a preheat job.
func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Scheduler, json types.PreheatArgs) (*internaljob.GroupJobState, error) { func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Scheduler, json types.PreheatArgs) (*internaljob.GroupJobState, error) {
var span trace.Span var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanPreheat, trace.WithSpanKind(trace.SpanKindProducer)) ctx, span = tracer.Start(ctx, config.SpanPreheat, trace.WithSpanKind(trace.SpanKindProducer))
@ -127,6 +131,7 @@ func (p *preheat) CreatePreheat(ctx context.Context, schedulers []models.Schedul
return p.createGroupJob(ctx, files, queues) return p.createGroupJob(ctx, files, queues)
} }
// createGroupJob creates a group job.
func (p *preheat) createGroupJob(ctx context.Context, files []internaljob.PreheatRequest, queues []internaljob.Queue) (*internaljob.GroupJobState, error) { func (p *preheat) createGroupJob(ctx context.Context, files []internaljob.PreheatRequest, queues []internaljob.Queue) (*internaljob.GroupJobState, error) {
var signatures []*machineryv1tasks.Signature var signatures []*machineryv1tasks.Signature
for _, queue := range queues { for _, queue := range queues {
@ -169,6 +174,7 @@ func (p *preheat) createGroupJob(ctx context.Context, files []internaljob.Prehea
}, nil }, nil
} }
// getLayers gets layers of image.
func (p *preheat) getLayers(ctx context.Context, url, tag, filter string, header http.Header, image *preheatImage) ([]internaljob.PreheatRequest, error) { func (p *preheat) getLayers(ctx context.Context, url, tag, filter string, header http.Header, image *preheatImage) ([]internaljob.PreheatRequest, error) {
ctx, span := tracer.Start(ctx, config.SpanGetLayers, trace.WithSpanKind(trace.SpanKindProducer)) ctx, span := tracer.Start(ctx, config.SpanGetLayers, trace.WithSpanKind(trace.SpanKindProducer))
defer span.End() defer span.End()
@ -204,6 +210,7 @@ func (p *preheat) getLayers(ctx context.Context, url, tag, filter string, header
return layers, nil return layers, nil
} }
// getManifests gets manifests of image.
func (p *preheat) getManifests(ctx context.Context, url string, header http.Header, timeout time.Duration) (*http.Response, error) { func (p *preheat) getManifests(ctx context.Context, url string, header http.Header, timeout time.Duration) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil { if err != nil {
@ -229,6 +236,7 @@ func (p *preheat) getManifests(ctx context.Context, url string, header http.Head
return resp, nil return resp, nil
} }
// parseLayers parses layers of image.
func (p *preheat) parseLayers(resp *http.Response, url, tag, filter string, header http.Header, image *preheatImage) ([]internaljob.PreheatRequest, error) { func (p *preheat) parseLayers(resp *http.Response, url, tag, filter string, header http.Header, image *preheatImage) ([]internaljob.PreheatRequest, error) {
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
@ -255,6 +263,7 @@ func (p *preheat) parseLayers(resp *http.Response, url, tag, filter string, head
return layers, nil return layers, nil
} }
// getAuthToken gets auth token from registry.
func getAuthToken(ctx context.Context, header http.Header, timeout time.Duration, rootCAs *x509.CertPool) (string, error) { func getAuthToken(ctx context.Context, header http.Header, timeout time.Duration, rootCAs *x509.CertPool) (string, error) {
ctx, span := tracer.Start(ctx, config.SpanAuthWithRegistry, trace.WithSpanKind(trace.SpanKindProducer)) ctx, span := tracer.Start(ctx, config.SpanAuthWithRegistry, trace.WithSpanKind(trace.SpanKindProducer))
defer span.End() defer span.End()
@ -298,6 +307,7 @@ func getAuthToken(ctx context.Context, header http.Header, timeout time.Duration
return token, nil return token, nil
} }
// authURL gets auth url from www-authenticate header.
func authURL(wwwAuth []string) string { func authURL(wwwAuth []string) string {
// Bearer realm="<auth-service-url>",service="<service>",scope="repository:<name>:pull" // Bearer realm="<auth-service-url>",service="<service>",scope="repository:<name>:pull"
if len(wwwAuth) == 0 { if len(wwwAuth) == 0 {
@ -315,10 +325,12 @@ func authURL(wwwAuth []string) string {
return fmt.Sprintf("%s?%s", host, query) return fmt.Sprintf("%s?%s", host, query)
} }
// layerURL gets layer url.
func layerURL(protocol string, domain string, name string, digest string) string { func layerURL(protocol string, domain string, name string, digest string) string {
return fmt.Sprintf("%s://%s/v2/%s/blobs/%s", protocol, domain, name, digest) return fmt.Sprintf("%s://%s/v2/%s/blobs/%s", protocol, domain, name, digest)
} }
// parseAccessURL parses access url.
func parseAccessURL(url string) (*preheatImage, error) { func parseAccessURL(url string) (*preheatImage, error) {
r := accessURLPattern.FindStringSubmatch(url) r := accessURLPattern.FindStringSubmatch(url)
if len(r) != 5 { if len(r) != 5 {
@ -332,17 +344,3 @@ func parseAccessURL(url string) (*preheatImage, error) {
tag: r[4], tag: r[4],
}, nil }, nil
} }
func getSchedulerQueues(schedulers []models.Scheduler) []internaljob.Queue {
var queues []internaljob.Queue
for _, scheduler := range schedulers {
queue, err := internaljob.GetSchedulerQueue(scheduler.SchedulerClusterID, scheduler.Hostname)
if err != nil {
continue
}
queues = append(queues, queue)
}
return queues
}

104
manager/job/sync_peers.go Normal file
View File

@ -0,0 +1,104 @@
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//go:generate mockgen -destination mocks/sync_peers_mock.go -source sync_peers.go -package mocks
package job
import (
"context"
"fmt"
"time"
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
logger "d7y.io/dragonfly/v2/internal/dflog"
internaljob "d7y.io/dragonfly/v2/internal/job"
"d7y.io/dragonfly/v2/manager/config"
"d7y.io/dragonfly/v2/manager/models"
)
// SyncPeers is an interface for sync peers.
type SyncPeers interface {
// Started sync peers server.
Serve()
// Stop sync peers server.
Stop()
}
// syncPeers is an implementation of SyncPeers.
type syncPeers struct {
job *internaljob.Job
}
// newSyncPeers returns a new SyncPeers.
func newSyncPeers(job *internaljob.Job) (SyncPeers, error) {
return &syncPeers{job}, nil
}
// TODO Implement function.
// Started sync peers server.
func (s *syncPeers) Serve() {
}
// TODO Implement function.
// Stop sync peers server.
func (s *syncPeers) Stop() {
}
// createSyncPeers creates sync peers.
func (s *syncPeers) createSyncPeers(ctx context.Context, schedulers []models.Scheduler) (*internaljob.GroupJobState, error) {
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanSyncPeers, trace.WithSpanKind(trace.SpanKindProducer))
defer span.End()
// Initialize queues
queues := getSchedulerQueues(schedulers)
var signatures []*machineryv1tasks.Signature
for _, queue := range queues {
signatures = append(signatures, &machineryv1tasks.Signature{
UUID: fmt.Sprintf("task_%s", uuid.New().String()),
Name: internaljob.SyncPeersJob,
RoutingKey: queue.String(),
})
}
group, err := machineryv1tasks.NewGroup(signatures...)
if err != nil {
return nil, err
}
var tasks []machineryv1tasks.Signature
for _, signature := range signatures {
tasks = append(tasks, *signature)
}
logger.Infof("create sync peers group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks)
if _, err := s.job.Server.SendGroupWithContext(ctx, group, 0); err != nil {
logger.Errorf("create sync peers group %s failed", group.GroupUUID, err)
return nil, err
}
return &internaljob.GroupJobState{
GroupUUID: group.GroupUUID,
State: machineryv1tasks.StatePending,
CreatedAt: time.Now(),
}, nil
}

View File

@ -432,6 +432,20 @@ func (mr *MockServiceMockRecorder) DestroyOauth(arg0, arg1 interface{}) *gomock.
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DestroyOauth", reflect.TypeOf((*MockService)(nil).DestroyOauth), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DestroyOauth", reflect.TypeOf((*MockService)(nil).DestroyOauth), arg0, arg1)
} }
// DestroyPeer mocks base method.
func (m *MockService) DestroyPeer(arg0 context.Context, arg1 uint) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DestroyPeer", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// DestroyPeer indicates an expected call of DestroyPeer.
func (mr *MockServiceMockRecorder) DestroyPeer(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DestroyPeer", reflect.TypeOf((*MockService)(nil).DestroyPeer), arg0, arg1)
}
// DestroyPersonalAccessToken mocks base method. // DestroyPersonalAccessToken mocks base method.
func (m *MockService) DestroyPersonalAccessToken(arg0 context.Context, arg1 uint) error { func (m *MockService) DestroyPersonalAccessToken(arg0 context.Context, arg1 uint) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -733,19 +747,35 @@ func (mr *MockServiceMockRecorder) GetOauths(arg0, arg1 interface{}) *gomock.Cal
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOauths", reflect.TypeOf((*MockService)(nil).GetOauths), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOauths", reflect.TypeOf((*MockService)(nil).GetOauths), arg0, arg1)
} }
// GetPeers mocks base method. // GetPeer mocks base method.
func (m *MockService) GetPeers(arg0 context.Context) ([]string, error) { func (m *MockService) GetPeer(arg0 context.Context, arg1 uint) (*models.Peer, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetPeers", arg0) ret := m.ctrl.Call(m, "GetPeer", arg0, arg1)
ret0, _ := ret[0].([]string) ret0, _ := ret[0].(*models.Peer)
ret1, _ := ret[1].(error) ret1, _ := ret[1].(error)
return ret0, ret1 return ret0, ret1
} }
// GetPeers indicates an expected call of GetPeers. // GetPeer indicates an expected call of GetPeer.
func (mr *MockServiceMockRecorder) GetPeers(arg0 interface{}) *gomock.Call { func (mr *MockServiceMockRecorder) GetPeer(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPeers", reflect.TypeOf((*MockService)(nil).GetPeers), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPeer", reflect.TypeOf((*MockService)(nil).GetPeer), arg0, arg1)
}
// GetPeers mocks base method.
func (m *MockService) GetPeers(arg0 context.Context, arg1 types.GetPeersQuery) ([]models.Peer, int64, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetPeers", arg0, arg1)
ret0, _ := ret[0].([]models.Peer)
ret1, _ := ret[1].(int64)
ret2, _ := ret[2].(error)
return ret0, ret1, ret2
}
// GetPeers indicates an expected call of GetPeers.
func (mr *MockServiceMockRecorder) GetPeers(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPeers", reflect.TypeOf((*MockService)(nil).GetPeers), arg0, arg1)
} }
// GetPermissions mocks base method. // GetPermissions mocks base method.

View File

@ -44,10 +44,12 @@ const (
preheatTimeout = 20 * time.Minute preheatTimeout = 20 * time.Minute
) )
// Job is an interface for job.
type Job interface { type Job interface {
Serve() Serve()
} }
// job is an implementation of Job.
type job struct { type job struct {
globalJob *internaljob.Job globalJob *internaljob.Job
schedulerJob *internaljob.Job schedulerJob *internaljob.Job
@ -56,6 +58,7 @@ type job struct {
config *config.Config config *config.Config
} }
// New creates a new Job.
func New(cfg *config.Config, resource resource.Resource) (Job, error) { func New(cfg *config.Config, resource resource.Resource) (Job, error) {
redisConfig := &internaljob.Config{ redisConfig := &internaljob.Config{
Addrs: cfg.Database.Redis.Addrs, Addrs: cfg.Database.Redis.Addrs,
@ -103,6 +106,7 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) {
namedJobFuncs := map[string]any{ namedJobFuncs := map[string]any{
internaljob.PreheatJob: t.preheat, internaljob.PreheatJob: t.preheat,
internaljob.SyncPeersJob: t.syncPeers,
} }
if err := localJob.RegisterJob(namedJobFuncs); err != nil { if err := localJob.RegisterJob(namedJobFuncs); err != nil {
@ -113,6 +117,7 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) {
return t, nil return t, nil
} }
// Serve starts the job.
func (j *job) Serve() { func (j *job) Serve() {
go func() { go func() {
logger.Infof("ready to launch %d worker(s) on global queue", j.config.Job.GlobalWorkerNum) logger.Infof("ready to launch %d worker(s) on global queue", j.config.Job.GlobalWorkerNum)
@ -142,6 +147,7 @@ func (j *job) Serve() {
}() }()
} }
// preheat is a job to preheat.
func (j *job) preheat(ctx context.Context, req string) error { func (j *job) preheat(ctx context.Context, req string) error {
ctx, cancel := context.WithTimeout(ctx, preheatTimeout) ctx, cancel := context.WithTimeout(ctx, preheatTimeout)
defer cancel() defer cancel()
@ -204,3 +210,20 @@ func (j *job) preheat(ctx context.Context, req string) error {
} }
} }
} }
// syncPeers is a job to sync peers.
func (j *job) syncPeers() ([]*resource.Host, error) {
var hosts []*resource.Host
j.resource.HostManager().Range(func(key, value any) bool {
host, ok := value.(*resource.Host)
if !ok {
logger.Errorf("invalid host %v %v", key, value)
return true
}
hosts = append(hosts, host)
return true
})
return hosts, nil
}

View File

@ -48,6 +48,10 @@ type HostManager interface {
// Delete deletes host for a key. // Delete deletes host for a key.
Delete(string) Delete(string)
// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
Range(f func(any, any) bool)
// LoadRandomHosts loads host randomly through the Range of sync.Map. // LoadRandomHosts loads host randomly through the Range of sync.Map.
LoadRandomHosts(int, set.SafeSet[string]) []*Host LoadRandomHosts(int, set.SafeSet[string]) []*Host
@ -107,6 +111,12 @@ func (h *hostManager) Delete(key string) {
h.Map.Delete(key) h.Map.Delete(key)
} }
// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
func (h *hostManager) Range(f func(key, value any) bool) {
h.Map.Range(f)
}
// LoadRandomHosts loads host randomly through the Range of sync.Map. // LoadRandomHosts loads host randomly through the Range of sync.Map.
func (h *hostManager) LoadRandomHosts(n int, blocklist set.SafeSet[string]) []*Host { func (h *hostManager) LoadRandomHosts(n int, blocklist set.SafeSet[string]) []*Host {
hosts := make([]*Host, 0, n) hosts := make([]*Host, 0, n)

View File

@ -90,6 +90,18 @@ func (mr *MockHostManagerMockRecorder) LoadRandomHosts(arg0, arg1 interface{}) *
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadRandomHosts", reflect.TypeOf((*MockHostManager)(nil).LoadRandomHosts), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadRandomHosts", reflect.TypeOf((*MockHostManager)(nil).LoadRandomHosts), arg0, arg1)
} }
// Range mocks base method.
func (m *MockHostManager) Range(f func(any, any) bool) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Range", f)
}
// Range indicates an expected call of Range.
func (mr *MockHostManagerMockRecorder) Range(f interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Range", reflect.TypeOf((*MockHostManager)(nil).Range), f)
}
// RunGC mocks base method. // RunGC mocks base method.
func (m *MockHostManager) RunGC() error { func (m *MockHostManager) RunGC() error {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View File

@ -48,6 +48,10 @@ type PeerManager interface {
// Delete deletes peer for a key. // Delete deletes peer for a key.
Delete(string) Delete(string)
// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
Range(f func(any, any) bool)
// Try to reclaim peer. // Try to reclaim peer.
RunGC() error RunGC() error
} }
@ -140,6 +144,12 @@ func (p *peerManager) Delete(key string) {
} }
} }
// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
func (p *peerManager) Range(f func(key, value any) bool) {
p.Map.Range(f)
}
// Try to reclaim peer. // Try to reclaim peer.
func (p *peerManager) RunGC() error { func (p *peerManager) RunGC() error {
p.Map.Range(func(_, value any) bool { p.Map.Range(func(_, value any) bool {

View File

@ -75,6 +75,18 @@ func (mr *MockPeerManagerMockRecorder) LoadOrStore(arg0 interface{}) *gomock.Cal
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadOrStore", reflect.TypeOf((*MockPeerManager)(nil).LoadOrStore), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadOrStore", reflect.TypeOf((*MockPeerManager)(nil).LoadOrStore), arg0)
} }
// Range mocks base method.
func (m *MockPeerManager) Range(f func(any, any) bool) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Range", f)
}
// Range indicates an expected call of Range.
func (mr *MockPeerManagerMockRecorder) Range(f interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Range", reflect.TypeOf((*MockPeerManager)(nil).Range), f)
}
// RunGC mocks base method. // RunGC mocks base method.
func (m *MockPeerManager) RunGC() error { func (m *MockPeerManager) RunGC() error {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View File

@ -46,6 +46,10 @@ type TaskManager interface {
// Delete deletes task for a key. // Delete deletes task for a key.
Delete(string) Delete(string)
// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
Range(f func(any, any) bool)
// Try to reclaim task. // Try to reclaim task.
RunGC() error RunGC() error
} }
@ -102,6 +106,12 @@ func (t *taskManager) Delete(key string) {
t.Map.Delete(key) t.Map.Delete(key)
} }
// Range calls f sequentially for each key and value present in the map.
// If f returns false, range stops the iteration.
func (t *taskManager) Range(f func(key, value any) bool) {
t.Map.Range(f)
}
// Try to reclaim task. // Try to reclaim task.
func (t *taskManager) RunGC() error { func (t *taskManager) RunGC() error {
t.Map.Range(func(_, value any) bool { t.Map.Range(func(_, value any) bool {

View File

@ -75,6 +75,18 @@ func (mr *MockTaskManagerMockRecorder) LoadOrStore(arg0 interface{}) *gomock.Cal
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadOrStore", reflect.TypeOf((*MockTaskManager)(nil).LoadOrStore), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadOrStore", reflect.TypeOf((*MockTaskManager)(nil).LoadOrStore), arg0)
} }
// Range mocks base method.
func (m *MockTaskManager) Range(f func(any, any) bool) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Range", f)
}
// Range indicates an expected call of Range.
func (mr *MockTaskManagerMockRecorder) Range(f interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Range", reflect.TypeOf((*MockTaskManager)(nil).Range), f)
}
// RunGC mocks base method. // RunGC mocks base method.
func (m *MockTaskManager) RunGC() error { func (m *MockTaskManager) RunGC() error {
m.ctrl.T.Helper() m.ctrl.T.Helper()