feat: merge sync peer with peer table in manager (#2668)

Signed-off-by: Gaius <gaius.qi@gmail.com>
Gaius 2023-08-24 10:28:08 +08:00 committed by GitHub
parent 63392f2d29
commit fe28ba4c8e
21 changed files with 363 additions and 92 deletions

View File

@ -52,14 +52,15 @@ type Announcer interface {
// announcer provides announce function.
type announcer struct {
config *config.DaemonOption
dynconfig config.Dynconfig
hostID string
daemonPort int32
daemonDownloadPort int32
schedulerClient schedulerclient.V1
managerClient managerclient.V1
done chan struct{}
config *config.DaemonOption
dynconfig config.Dynconfig
hostID string
daemonPort int32
daemonDownloadPort int32
daemonObjectStoragePort int32
schedulerClient schedulerclient.V1
managerClient managerclient.V1
done chan struct{}
}
// Option is a functional option for configuring the announcer.
@ -72,6 +73,13 @@ func WithManagerClient(client managerclient.V1) Option {
}
}
// WithObjectStoragePort sets the daemonObjectStoragePort.
func WithObjectStoragePort(port int32) Option {
return func(a *announcer) {
a.daemonObjectStoragePort = port
}
}
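
For reference, WithObjectStoragePort follows the announcer's existing functional-option pattern, so callers opt in only when they actually serve object storage. A minimal sketch of that wiring, assuming cfg, dynconfig, hostID, the ports and schedulerClient are already in scope (the daemon's Serve does essentially this, as shown further below):

// Only announce an object storage port when the service is enabled;
// otherwise the scheduler keeps seeing the zero value.
var options []announcer.Option
if cfg.ObjectStorage.Enable {
	options = append(options, announcer.WithObjectStoragePort(int32(objectStoragePort)))
}
a := announcer.New(cfg, dynconfig, hostID, daemonPort, daemonDownloadPort, schedulerClient, options...)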
// New returns a new Announcer interface.
func New(cfg *config.DaemonOption, dynconfig config.Dynconfig, hostID string, daemonPort int32, daemonDownloadPort int32, schedulerClient schedulerclient.V1, options ...Option) Announcer {
a := &announcer{
@ -153,6 +161,11 @@ func (a *announcer) newAnnounceHostRequest() (*schedulerv1.AnnounceHostRequest,
hostType = types.HostTypeSuperSeedName
}
var objectStoragePort int32
if a.config.ObjectStorage.Enable {
objectStoragePort = a.daemonObjectStoragePort
}
pid := os.Getpid()
h, err := host.Info()
@ -223,17 +236,18 @@ func (a *announcer) newAnnounceHostRequest() (*schedulerv1.AnnounceHostRequest,
}
return &schedulerv1.AnnounceHostRequest{
Id: a.hostID,
Type: hostType,
Hostname: a.config.Host.Hostname,
Ip: a.config.Host.AdvertiseIP.String(),
Port: a.daemonPort,
DownloadPort: a.daemonDownloadPort,
Os: h.OS,
Platform: h.Platform,
PlatformFamily: h.PlatformFamily,
PlatformVersion: h.PlatformVersion,
KernelVersion: h.KernelVersion,
Id: a.hostID,
Type: hostType,
Hostname: a.config.Host.Hostname,
Ip: a.config.Host.AdvertiseIP.String(),
Port: a.daemonPort,
DownloadPort: a.daemonDownloadPort,
ObjectStoragePort: objectStoragePort,
Os: h.OS,
Platform: h.Platform,
PlatformFamily: h.PlatformFamily,
PlatformVersion: h.PlatformVersion,
KernelVersion: h.KernelVersion,
Cpu: &schedulerv1.CPU{
LogicalCount: uint32(cpuLogicalCount),
PhysicalCount: uint32(cpuPhysicalCount),

View File

@ -562,12 +562,15 @@ func (cd *clientDaemon) Serve() error {
cd.schedPeerHost.DownPort = int32(uploadPort)
// prepare object storage service listen
var objectStorageListener net.Listener
var (
objectStorageListener net.Listener
objectStoragePort int
)
if cd.Option.ObjectStorage.Enable {
if cd.Option.ObjectStorage.TCPListen == nil {
return errors.New("object storage tcp listen option is empty")
}
objectStorageListener, _, err = cd.prepareTCPListener(cd.Option.ObjectStorage.ListenOption, true)
objectStorageListener, objectStoragePort, err = cd.prepareTCPListener(cd.Option.ObjectStorage.ListenOption, true)
if err != nil {
logger.Errorf("failed to listen for object storage service: %v", err)
return err
@ -678,6 +681,11 @@ func (cd *clientDaemon) Serve() error {
if cd.managerClient != nil {
announcerOptions = append(announcerOptions, announcer.WithManagerClient(cd.managerClient))
}
if cd.Option.ObjectStorage.Enable {
announcerOptions = append(announcerOptions, announcer.WithObjectStoragePort(int32(objectStoragePort)))
}
cd.announcer = announcer.New(&cd.Option, cd.dynconfig, cd.schedPeerHost.Id, cd.schedPeerHost.RpcPort,
cd.schedPeerHost.DownPort, cd.schedulerClient, announcerOptions...)
go func() {

go.mod (2 changed lines)
View File

@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.20
require (
d7y.io/api/v2 v2.0.21
d7y.io/api/v2 v2.0.23
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0

go.sum (4 changed lines)
View File

@ -50,8 +50,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api/v2 v2.0.21 h1:g/hiw1KhkroQjqsnetGdjutEJv4zsNY8LoM2jDM2UYg=
d7y.io/api/v2 v2.0.21/go.mod h1:lwCvFjtRVsyTKsiXfh2W0Jdv+5tQGR/vFj+TknwnusY=
d7y.io/api/v2 v2.0.23 h1:s0vDhh5P1jfKO/dee2DhQiUG7eFORVF2/M9O9SLRNQI=
d7y.io/api/v2 v2.0.23/go.mod h1:lwCvFjtRVsyTKsiXfh2W0Jdv+5tQGR/vFj+TknwnusY=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=

View File

@ -123,6 +123,12 @@ func With(args ...any) *SugaredLoggerOnWith {
}
}
func WithScheduler(hostname, ip string, clusterID uint64) *SugaredLoggerOnWith {
return &SugaredLoggerOnWith{
withArgs: []any{"hostname", hostname, "ip", ip, "clusterID", clusterID},
}
}
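
As a usage note, the new WithScheduler helper is what the manager's sync-peers loop (further below) uses to scope its log fields to a single scheduler. A minimal sketch with made-up values:

// Hypothetical hostname, IP and cluster ID; the sync-peers loop passes the
// scheduler's real values so every log line carries that context.
log := logger.WithScheduler("scheduler-0", "192.168.1.2", 1)
log.Infof("sync peers count is %d", 42)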
func WithPeer(hostID, taskID, peerID string) *SugaredLoggerOnWith {
return &SugaredLoggerOnWith{
withArgs: []any{"hostID", hostID, "taskID", taskID, "peerID", peerID},

View File

@ -149,6 +149,15 @@ func (t *Job) GetGroupJobState(groupID string) (*GroupJobState, error) {
}, nil
}
func MarshalResponse(v any) (string, error) {
b, err := json.Marshal(v)
if err != nil {
return "", err
}
return string(b), nil
}
func MarshalRequest(v any) ([]machineryv1tasks.Arg, error) {
b, err := json.Marshal(v)
if err != nil {
@ -169,6 +178,7 @@ func UnmarshalResponse(data []reflect.Value, v any) error {
if err := json.Unmarshal([]byte(data[0].String()), v); err != nil {
return err
}
return nil
}
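
To make the pairing concrete: the scheduler returns internaljob.MarshalResponse(hosts) as its machinery task result, and the manager decodes that payload with internaljob.UnmarshalResponse, as the sync-peers diff below shows. A minimal round-trip sketch with a toy host type, assuming the reflect and internaljob imports; in production machinery builds the []reflect.Value, it is constructed by hand here only to exercise the helper:

type host struct {
	ID string `json:"id"`
}

func roundTrip() ([]host, error) {
	// Serialize the result as the scheduler-side job does.
	payload, err := internaljob.MarshalResponse([]host{{ID: "foo"}})
	if err != nil {
		return nil, err
	}

	// Decode it as the manager does when it receives the task result.
	var hosts []host
	if err := internaljob.UnmarshalResponse([]reflect.Value{reflect.ValueOf(payload)}, &hosts); err != nil {
		return nil, err
	}
	return hosts, nil
}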

View File

@ -24,7 +24,7 @@ import (
"github.com/stretchr/testify/assert"
)
func TestJobMarshal(t *testing.T) {
func TestJob_MarshalRequest(t *testing.T) {
tests := []struct {
name string
value any
@ -110,7 +110,7 @@ func TestJobMarshal(t *testing.T) {
}
}
func TestJobUnmarshal(t *testing.T) {
func TestJob_UnmarshalResponse(t *testing.T) {
tests := []struct {
name string
data []reflect.Value
@ -206,7 +206,7 @@ func TestJobUnmarshal(t *testing.T) {
}
}
func TestUnmarshalRequest(t *testing.T) {
func TestJob_UnmarshalRequest(t *testing.T) {
tests := []struct {
name string
data string

View File

@ -22,7 +22,7 @@ import (
"github.com/stretchr/testify/assert"
)
func TestJobGetSchedulerQueue(t *testing.T) {
func TestJob_GetSchedulerQueue(t *testing.T) {
tests := []struct {
name string
clusterID uint

View File

@ -441,6 +441,7 @@ func New() *Config {
},
SyncPeers: SyncPeersConfig{
Interval: DefaultJobSyncPeersInterval,
Timeout: DefaultJobSyncPeersTimeout,
},
},
ObjectStorage: ObjectStorageConfig{
@ -626,6 +627,10 @@ func (cfg *Config) Validate() error {
return errors.New("syncPeers requires parameter interval and it must be greater than 12 hours")
}
if cfg.Job.SyncPeers.Timeout == 0 {
return errors.New("syncPeers requires parameter timeout")
}
if cfg.ObjectStorage.Enable {
if cfg.ObjectStorage.Name == "" {
return errors.New("objectStorage requires parameter name")

View File

@ -196,6 +196,7 @@ func TestConfig_Load(t *testing.T) {
},
SyncPeers: SyncPeersConfig{
Interval: 13 * time.Hour,
Timeout: 2 * time.Minute,
},
},
ObjectStorage: ObjectStorageConfig{
@ -759,6 +760,21 @@ func TestConfig_Validate(t *testing.T) {
assert.EqualError(err, "syncPeers requires parameter interval and it must be greater than 12 hours")
},
},
{
name: "syncPeers requires parameter timeout",
config: New(),
mock: func(cfg *Config) {
cfg.Auth.JWT = mockJWTConfig
cfg.Database.Type = DatabaseTypeMysql
cfg.Database.Mysql = mockMysqlConfig
cfg.Database.Redis = mockRedisConfig
cfg.Job.SyncPeers.Timeout = 0
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "syncPeers requires parameter timeout")
},
},
{
name: "objectStorage requires parameter name",
config: New(),

View File

@ -98,6 +98,9 @@ const (
// MinJobSyncPeersInterval is the min interval for syncing all peers information from the scheduler.
MinJobSyncPeersInterval = 12 * time.Hour
// DefaultJobSyncPeersTimeout is the default timeout for syncing all peers information from the scheduler.
DefaultJobSyncPeersTimeout = 10 * time.Minute
)
const (

View File

@ -70,6 +70,7 @@ job:
caCert: testdata/ca.crt
syncPeers:
interval: 13h
timeout: 2m
objectStorage:
enable: true

View File

@ -5,6 +5,7 @@
package mocks
import (
context "context"
reflect "reflect"
gomock "github.com/golang/mock/gomock"
@ -33,6 +34,20 @@ func (m *MockSyncPeers) EXPECT() *MockSyncPeersMockRecorder {
return m.recorder
}
// Run mocks base method.
func (m *MockSyncPeers) Run(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Run", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Run indicates an expected call of Run.
func (mr *MockSyncPeersMockRecorder) Run(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockSyncPeers)(nil).Run), arg0)
}
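
Because the mock was regenerated, tests can now stub Run alongside Serve in the usual gomock style. A hedged sketch, assuming a *testing.T named t and the standard NewMockSyncPeers constructor that gomock generates for this mock:

ctrl := gomock.NewController(t)
defer ctrl.Finish()

// Expect exactly one Run call and have it succeed.
syncPeers := mocks.NewMockSyncPeers(ctrl)
syncPeers.EXPECT().Run(gomock.Any()).Return(nil).Times(1)

if err := syncPeers.Run(context.Background()); err != nil {
	t.Fatal(err)
}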
// Serve mocks base method.
func (m *MockSyncPeers) Serve() {
m.ctrl.T.Helper()

View File

@ -32,10 +32,15 @@ import (
internaljob "d7y.io/dragonfly/v2/internal/job"
"d7y.io/dragonfly/v2/manager/config"
"d7y.io/dragonfly/v2/manager/models"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/scheduler/resource"
)
// SyncPeers is an interface for sync peers.
type SyncPeers interface {
// Run sync peers.
Run(context.Context) error
// Started sync peers server.
Serve()
@ -61,37 +66,56 @@ func newSyncPeers(cfg *config.Config, job *internaljob.Job, gdb *gorm.DB) (SyncP
}, nil
}
// TODO Implement function.
// Run sync peers.
func (s *syncPeers) Run(ctx context.Context) error {
// Find all of the scheduler clusters that have active schedulers.
var candidateSchedulerClusters []models.SchedulerCluster
if err := s.db.WithContext(ctx).Find(&candidateSchedulerClusters).Error; err != nil {
return err
}
// Find all of the schedulers that have an active scheduler cluster.
var candidateSchedulers []models.Scheduler
for _, candidateSchedulerCluster := range candidateSchedulerClusters {
var scheduler models.Scheduler
if err := s.db.WithContext(ctx).Preload("SchedulerCluster").First(&scheduler, models.Scheduler{
SchedulerClusterID: candidateSchedulerCluster.ID,
State: models.SchedulerStateActive,
}).Error; err != nil {
continue
}
candidateSchedulers = append(candidateSchedulers, scheduler)
}
// Send sync peer requests to all available schedulers,
// and merge the sync peer results with the data in
// the peer table in the database.
for _, scheduler := range candidateSchedulers {
log := logger.WithScheduler(scheduler.Hostname, scheduler.IP, uint64(scheduler.SchedulerClusterID))
// Send sync peer request to scheduler.
results, err := s.createSyncPeers(ctx, scheduler)
if err != nil {
log.Error(err)
continue
}
log.Infof("sync peers count is %d", len(results))
// Merge sync peer results with the data in the peer table.
s.mergePeers(ctx, scheduler, results, log)
}
return nil
}
// Started sync peers server.
func (s *syncPeers) Serve() {
tick := time.NewTicker(s.config.Job.SyncPeers.Interval)
for {
select {
case <-tick.C:
// Find all of the scheduler clusters that has active schedulers.
var candidateSchedulerClusters []models.SchedulerCluster
if err := s.db.WithContext(context.Background()).Find(&candidateSchedulerClusters).Error; err != nil {
logger.Errorf("find candidate scheduler clusters failed: %v", err)
break
}
var candidateSchedulers []models.Scheduler
for _, candidateSchedulerCluster := range candidateSchedulerClusters {
var schedulers []models.Scheduler
if err := s.db.WithContext(context.Background()).Preload("SchedulerCluster").Find(&schedulers, models.Scheduler{
SchedulerClusterID: candidateSchedulerCluster.ID,
State: models.SchedulerStateActive,
}).Error; err != nil {
continue
}
candidateSchedulers = append(candidateSchedulers, schedulers...)
}
for _, scheduler := range candidateSchedulers {
if _, err := s.createSyncPeers(context.Background(), scheduler); err != nil {
logger.Error(err)
}
if err := s.Run(context.Background()); err != nil {
logger.Errorf("sync peers failed: %v", err)
}
case <-s.done:
return
@ -105,7 +129,7 @@ func (s *syncPeers) Stop() {
}
// createSyncPeers creates sync peers.
func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Scheduler) (any, error) {
func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Scheduler) ([]*resource.Host, error) {
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanSyncPeers, trace.WithSpanKind(trace.SpanKindProducer))
defer span.End()
@ -123,6 +147,7 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu
RoutingKey: queue.String(),
}
// Send sync peer task to worker.
logger.Infof("create sync peers in queue %v, task: %#v", queue, task)
asyncResult, err := s.job.Server.SendTaskWithContext(ctx, task)
if err != nil {
@ -130,5 +155,104 @@ func (s *syncPeers) createSyncPeers(ctx context.Context, scheduler models.Schedu
return nil, err
}
return asyncResult.GetWithTimeout(s.config.Job.SyncPeers.Timeout, DefaultTaskPollingInterval)
// Get sync peer task result.
results, err := asyncResult.GetWithTimeout(s.config.Job.SyncPeers.Timeout, DefaultTaskPollingInterval)
if err != nil {
return nil, err
}
// Unmarshal sync peer task result.
var hosts []*resource.Host
if err := internaljob.UnmarshalResponse(results, &hosts); err != nil {
return nil, err
}
if len(hosts) == 0 {
return nil, fmt.Errorf("can not find peers")
}
return hosts, nil
}
// Merge sync peer results with the data in the peer table.
func (s *syncPeers) mergePeers(ctx context.Context, scheduler models.Scheduler, results []*resource.Host, log *logger.SugaredLoggerOnWith) {
// Convert sync peer results from slice to map.
syncPeers := make(map[string]*resource.Host)
for _, result := range results {
syncPeers[result.ID] = result
}
rows, err := s.db.Model(&models.Peer{}).Find(&models.Peer{SchedulerClusterID: scheduler.ID}).Rows()
if err != nil {
log.Error(err)
return
}
defer rows.Close()
for rows.Next() {
peer := models.Peer{}
if err := s.db.ScanRows(rows, &peer); err != nil {
log.Error(err)
continue
}
// If the peer exists in the sync peer results, update the peer data in the database with
// the sync peer results and delete the sync peer from the sync peers map.
id := idgen.HostIDV2(peer.IP, peer.Hostname)
if syncPeer, ok := syncPeers[id]; ok {
if err := s.db.WithContext(ctx).Preload("User").First(&models.Peer{}, peer.ID).Updates(models.Peer{
Type: syncPeer.Type.Name(),
IDC: syncPeer.Network.IDC,
Location: syncPeer.Network.Location,
Port: syncPeer.Port,
DownloadPort: syncPeer.DownloadPort,
ObjectStoragePort: syncPeer.ObjectStoragePort,
State: models.PeerStateActive,
OS: syncPeer.OS,
Platform: syncPeer.Platform,
PlatformFamily: syncPeer.PlatformFamily,
PlatformVersion: syncPeer.PlatformVersion,
KernelVersion: syncPeer.KernelVersion,
GitVersion: syncPeer.Build.GitVersion,
GitCommit: syncPeer.Build.GitCommit,
BuildPlatform: syncPeer.Build.Platform,
}).Error; err != nil {
log.Error(err)
}
// Delete the sync peer from the sync peers map.
delete(syncPeers, id)
} else {
// If the peer does not exist in the sync peer results, delete the peer in the database.
if err := s.db.WithContext(ctx).Unscoped().Delete(&models.Peer{}, peer.ID).Error; err != nil {
log.Error(err)
}
}
}
// Insert the sync peers that do not exist in the database into the peer table.
for _, syncPeer := range syncPeers {
if err := s.db.WithContext(ctx).Create(&models.Peer{
Hostname: syncPeer.Hostname,
Type: syncPeer.Type.Name(),
IDC: syncPeer.Network.IDC,
Location: syncPeer.Network.Location,
IP: syncPeer.IP,
Port: syncPeer.Port,
DownloadPort: syncPeer.DownloadPort,
ObjectStoragePort: syncPeer.ObjectStoragePort,
State: models.PeerStateActive,
OS: syncPeer.OS,
Platform: syncPeer.Platform,
PlatformFamily: syncPeer.PlatformFamily,
PlatformVersion: syncPeer.PlatformVersion,
KernelVersion: syncPeer.KernelVersion,
GitVersion: syncPeer.Build.GitVersion,
GitCommit: syncPeer.Build.GitCommit,
BuildPlatform: syncPeer.Build.Platform,
SchedulerClusterID: uint(syncPeer.SchedulerClusterID),
}).Error; err != nil {
log.Error(err)
}
}
}
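
In short, mergePeers is a map-based reconciliation keyed by the host ID (recomputed for database rows via idgen.HostIDV2(ip, hostname)): rows that match a reported host are updated, rows with no match are hard-deleted (Unscoped, so no soft delete), and whatever is still left in the map is inserted as a new peer. A condensed sketch of that pattern with toy types, not the real models:

type reportedHost struct{ ID string }
type peerRow struct{ ID string }

// reconcile applies the same three-way merge: update matches, remove stale
// rows, and insert the remainder of the reported set.
func reconcile(reported map[string]reportedHost, stored []peerRow,
	update func(peerRow, reportedHost), remove func(peerRow), insert func(reportedHost)) {
	for _, row := range stored {
		if h, ok := reported[row.ID]; ok {
			update(row, h)
			delete(reported, row.ID)
		} else {
			remove(row)
		}
	}
	for _, h := range reported {
		insert(h)
	}
}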

View File

@ -86,16 +86,19 @@ func EmbedFolder(fsEmbed embed.FS, targetPath string) static.ServeFileSystem {
// Server is the manager server.
type Server struct {
// Server configuration
// Server configuration.
config *config.Config
// GRPC server
// Job server.
job *job.Job
// GRPC server.
grpcServer *grpc.Server
// REST server
// REST server.
restServer *http.Server
// Metrics server
// Metrics server.
metricsServer *http.Server
}
@ -103,34 +106,35 @@ type Server struct {
func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
s := &Server{config: cfg}
// Initialize database
// Initialize database.
db, err := database.New(cfg)
if err != nil {
return nil, err
}
// Initialize enforcer
// Initialize enforcer.
enforcer, err := rbac.NewEnforcer(db.DB)
if err != nil {
return nil, err
}
// Initialize cache
// Initialize cache.
cache, err := cache.New(cfg)
if err != nil {
return nil, err
}
// Initialize searcher
// Initialize searcher.
searcher := searcher.New(d.PluginDir())
// Initialize job
// Initialize job.
job, err := job.New(cfg, db.DB)
if err != nil {
return nil, err
}
s.job = job
// Initialize object storage
// Initialize object storage.
var objectStorage objectstorage.ObjectStorage
if cfg.ObjectStorage.Enable {
objectStorage, err = objectstorage.New(
@ -146,7 +150,7 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
}
}
// Initialize REST server
// Initialize REST server.
restService := service.New(cfg, db, cache, job, enforcer, objectStorage)
router, err := router.Init(cfg, d.LogDir(), restService, db, enforcer, EmbedFolder(assets, assetsTargetPath))
if err != nil {
@ -157,7 +161,7 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
Handler: router,
}
// Initialize roles and check roles
// Initialize roles and check roles.
err = rbac.InitRBAC(enforcer, router, db.DB)
if err != nil {
return nil, err
@ -213,7 +217,7 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
)
}
// Initialize GRPC server
// Initialize GRPC server.
_, grpcServer, err := rpcserver.New(cfg, db, cache, searcher, objectStorage, options...)
if err != nil {
return nil, err
@ -221,7 +225,7 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
s.grpcServer = grpcServer
// Initialize prometheus
// Initialize prometheus.
if cfg.Metrics.Enable {
s.metricsServer = metrics.New(&cfg.Metrics, grpcServer)
}
@ -231,7 +235,7 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
// Serve starts the manager server.
func (s *Server) Serve() error {
// Started REST server
// Started REST server.
go func() {
logger.Infof("started rest server at %s", s.restServer.Addr)
if s.config.Server.REST.TLS != nil {
@ -251,7 +255,7 @@ func (s *Server) Serve() error {
}
}()
// Started metrics server
// Started metrics server.
if s.metricsServer != nil {
go func() {
logger.Infof("started metrics server at %s", s.metricsServer.Addr)
@ -264,14 +268,20 @@ func (s *Server) Serve() error {
}()
}
// Generate GRPC listener
// Started job server.
go func() {
logger.Info("started job server")
s.job.Serve()
}()
// Generate GRPC listener.
lis, _, err := rpc.ListenWithPortRange(s.config.Server.GRPC.ListenIP.String(), s.config.Server.GRPC.PortRange.Start, s.config.Server.GRPC.PortRange.End)
if err != nil {
logger.Fatalf("net listener failed to start: %v", err)
}
defer lis.Close()
// Started GRPC server
// Started GRPC server.
logger.Infof("started grpc server at %s://%s", lis.Addr().Network(), lis.Addr().String())
if err := s.grpcServer.Serve(lis); err != nil {
logger.Errorf("stoped grpc server: %+v", err)
@ -283,14 +293,14 @@ func (s *Server) Serve() error {
// Stop stops the manager server.
func (s *Server) Stop() {
// Stop REST server
// Stop REST server.
if err := s.restServer.Shutdown(context.Background()); err != nil {
logger.Errorf("rest server failed to stop: %+v", err)
} else {
logger.Info("rest server closed under request")
}
// Stop metrics server
// Stop metrics server.
if s.metricsServer != nil {
if err := s.metricsServer.Shutdown(context.Background()); err != nil {
logger.Errorf("metrics server failed to stop: %+v", err)
@ -299,7 +309,10 @@ func (s *Server) Stop() {
}
}
// Stop GRPC server
// Stop job server.
s.job.Stop()
// Stop GRPC server.
stopped := make(chan struct{})
go func() {
s.grpcServer.GracefulStop()

View File

@ -27,7 +27,7 @@ import (
)
func TestDigest_Reader(t *testing.T) {
logger := logger.With("test", "digest")
log := logger.With("test", "digest")
tests := []struct {
name string
@ -40,7 +40,7 @@ func TestDigest_Reader(t *testing.T) {
name: "sha1 reader",
algorithm: AlgorithmSHA1,
data: []byte("foo"),
options: []Option{WithLogger(logger)},
options: []Option{WithLogger(log)},
run: func(t *testing.T, data []byte, reader Reader, err error) {
assert := assert.New(t)
assert.NoError(err)
@ -55,7 +55,7 @@ func TestDigest_Reader(t *testing.T) {
name: "sha256 reader",
algorithm: AlgorithmSHA256,
data: []byte("foo"),
options: []Option{WithLogger(logger)},
options: []Option{WithLogger(log)},
run: func(t *testing.T, data []byte, reader Reader, err error) {
assert := assert.New(t)
assert.NoError(err)
@ -70,7 +70,7 @@ func TestDigest_Reader(t *testing.T) {
name: "sha512 reader",
algorithm: AlgorithmSHA512,
data: []byte("foo"),
options: []Option{WithLogger(logger)},
options: []Option{WithLogger(log)},
run: func(t *testing.T, data []byte, reader Reader, err error) {
assert := assert.New(t)
assert.NoError(err)
@ -85,7 +85,7 @@ func TestDigest_Reader(t *testing.T) {
name: "md5 reader",
algorithm: AlgorithmMD5,
data: []byte("foo"),
options: []Option{WithLogger(logger)},
options: []Option{WithLogger(log)},
run: func(t *testing.T, data []byte, reader Reader, err error) {
assert := assert.New(t)
assert.NoError(err)
@ -100,7 +100,7 @@ func TestDigest_Reader(t *testing.T) {
name: "sha1 reader with encoded",
algorithm: AlgorithmSHA1,
data: []byte("foo"),
options: []Option{WithLogger(logger), WithEncoded("0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33")},
options: []Option{WithLogger(log), WithEncoded("0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33")},
run: func(t *testing.T, data []byte, reader Reader, err error) {
assert := assert.New(t)
assert.NoError(err)
@ -115,7 +115,7 @@ func TestDigest_Reader(t *testing.T) {
name: "sha256 reader with encoded",
algorithm: AlgorithmSHA256,
data: []byte("foo"),
options: []Option{WithLogger(logger), WithEncoded("2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae")},
options: []Option{WithLogger(log), WithEncoded("2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae")},
run: func(t *testing.T, data []byte, reader Reader, err error) {
assert := assert.New(t)
assert.NoError(err)
@ -130,7 +130,7 @@ func TestDigest_Reader(t *testing.T) {
name: "sha512 reader with encoded",
algorithm: AlgorithmSHA512,
data: []byte("foo"),
options: []Option{WithLogger(logger), WithEncoded("f7fbba6e0636f890e56fbbf3283e524c6fa3204ae298382d624741d0dc6638326e282c41be5e4254d8820772c5518a2c5a8c0c7f7eda19594a7eb539453e1ed7")},
options: []Option{WithLogger(log), WithEncoded("f7fbba6e0636f890e56fbbf3283e524c6fa3204ae298382d624741d0dc6638326e282c41be5e4254d8820772c5518a2c5a8c0c7f7eda19594a7eb539453e1ed7")},
run: func(t *testing.T, data []byte, reader Reader, err error) {
assert := assert.New(t)
assert.NoError(err)
@ -145,7 +145,7 @@ func TestDigest_Reader(t *testing.T) {
name: "md5 reader with encoded",
algorithm: AlgorithmMD5,
data: []byte("foo"),
options: []Option{WithLogger(logger), WithEncoded("acbd18db4cc2f85cedef654fccc4a4d8")},
options: []Option{WithLogger(log), WithEncoded("acbd18db4cc2f85cedef654fccc4a4d8")},
run: func(t *testing.T, data []byte, reader Reader, err error) {
assert := assert.New(t)
assert.NoError(err)
@ -160,7 +160,7 @@ func TestDigest_Reader(t *testing.T) {
name: "sha1 reader with invalid encoded",
algorithm: AlgorithmSHA1,
data: []byte("foo"),
options: []Option{WithLogger(logger), WithEncoded("bar")},
options: []Option{WithLogger(log), WithEncoded("bar")},
run: func(t *testing.T, data []byte, reader Reader, err error) {
assert := assert.New(t)
assert.NoError(err)
@ -173,7 +173,7 @@ func TestDigest_Reader(t *testing.T) {
name: "sha256 reader with invalid encoded",
algorithm: AlgorithmSHA256,
data: []byte("foo"),
options: []Option{WithLogger(logger), WithEncoded("bar")},
options: []Option{WithLogger(log), WithEncoded("bar")},
run: func(t *testing.T, data []byte, reader Reader, err error) {
assert := assert.New(t)
assert.NoError(err)
@ -186,7 +186,7 @@ func TestDigest_Reader(t *testing.T) {
name: "sha512 reader with invalid encoded",
algorithm: AlgorithmSHA512,
data: []byte("foo"),
options: []Option{WithLogger(logger), WithEncoded("bar")},
options: []Option{WithLogger(log), WithEncoded("bar")},
run: func(t *testing.T, data []byte, reader Reader, err error) {
assert := assert.New(t)
assert.NoError(err)
@ -199,7 +199,7 @@ func TestDigest_Reader(t *testing.T) {
name: "md5 reader with invalid encoded",
algorithm: AlgorithmMD5,
data: []byte("foo"),
options: []Option{WithLogger(logger), WithEncoded("bar")},
options: []Option{WithLogger(log), WithEncoded("bar")},
run: func(t *testing.T, data []byte, reader Reader, err error) {
assert := assert.New(t)
assert.NoError(err)
@ -212,7 +212,7 @@ func TestDigest_Reader(t *testing.T) {
name: "new reader with invalid algorithm",
algorithm: "",
data: []byte("foo"),
options: []Option{WithLogger(logger)},
options: []Option{WithLogger(log)},
run: func(t *testing.T, data []byte, reader Reader, err error) {
assert := assert.New(t)
assert.Error(err)

View File

@ -212,7 +212,7 @@ func (j *job) preheat(ctx context.Context, req string) error {
}
// syncPeers is a job to sync peers.
func (j *job) syncPeers() ([]*resource.Host, error) {
func (j *job) syncPeers() (string, error) {
var hosts []*resource.Host
j.resource.HostManager().Range(func(key, value any) bool {
host, ok := value.(*resource.Host)
@ -225,5 +225,5 @@ func (j *job) syncPeers() ([]*resource.Host, error) {
return true
})
return hosts, nil
return internaljob.MarshalResponse(hosts)
}

View File

@ -38,6 +38,13 @@ func WithSchedulerClusterID(id uint64) HostOption {
}
}
// WithObjectStoragePort sets host's ObjectStoragePort.
func WithObjectStoragePort(port int32) HostOption {
return func(h *Host) {
h.ObjectStoragePort = port
}
}
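
On the scheduler's resource side, the option plugs into NewHost just like the other HostOptions; the AnnounceHost handlers further below only pass it when the announced port is non-zero. A minimal sketch with made-up values:

// Hypothetical ID, addresses and ports; the real call sites take these
// from the announce request.
host := resource.NewHost(
	"host-id", "192.168.1.2", "foo", 8003, 8001,
	types.HostTypeNormal, resource.WithObjectStoragePort(8004),
)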
// WithConcurrentUploadLimit sets host's ConcurrentUploadLimit.
func WithConcurrentUploadLimit(limit int32) HostOption {
return func(h *Host) {
@ -135,6 +142,9 @@ type Host struct {
// DownloadPort is piece downloading port.
DownloadPort int32
// ObjectStoragePort is object storage port.
ObjectStoragePort int32
// Host OS.
OS string

View File

@ -150,6 +150,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
@ -173,6 +174,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.IP, mockRawSeedHost.IP)
assert.Equal(host.Port, mockRawSeedHost.Port)
assert.Equal(host.DownloadPort, mockRawSeedHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultSeedPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
@ -209,6 +211,31 @@ func TestHost_NewHost(t *testing.T) {
assert.NotNil(host.Log)
},
},
{
name: "new host and set object storage port",
rawHost: mockRawHost,
options: []HostOption{WithObjectStoragePort(1)},
expect: func(t *testing.T, host *Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.ID)
assert.Equal(host.Type, types.HostTypeNormal)
assert.Equal(host.Hostname, mockRawHost.Hostname)
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(1))
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
assert.Equal(host.UploadFailedCount.Load(), int64(0))
assert.NotNil(host.Peers)
assert.Equal(host.PeerCount.Load(), int32(0))
assert.NotEmpty(host.CreatedAt.Load())
assert.NotEmpty(host.UpdatedAt.Load())
assert.NotNil(host.Log)
},
},
{
name: "new host and set upload loadlimit",
rawHost: mockRawHost,
@ -221,6 +248,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
@ -245,6 +273,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.OS, "linux")
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
@ -270,6 +299,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.Platform, "ubuntu")
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
@ -295,6 +325,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.PlatformFamily, "debian")
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
@ -320,6 +351,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.PlatformVersion, "22.04")
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
@ -344,6 +376,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.Equal(host.KernelVersion, "5.15.0-27-generic")
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
@ -369,6 +402,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.EqualValues(host.CPU, mockCPU)
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
@ -394,6 +428,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.EqualValues(host.Memory, mockMemory)
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
@ -419,6 +454,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.EqualValues(host.Network, mockNetwork)
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
@ -444,6 +480,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.EqualValues(host.Disk, mockDisk)
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
@ -469,6 +506,7 @@ func TestHost_NewHost(t *testing.T) {
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.ObjectStoragePort, int32(0))
assert.EqualValues(host.Build, mockBuild)
assert.Equal(host.SchedulerClusterID, uint64(0))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))

View File

@ -552,6 +552,10 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ
options = append(options, resource.WithSchedulerClusterID(req.GetSchedulerClusterId()))
}
if req.GetObjectStoragePort() != 0 {
options = append(options, resource.WithObjectStoragePort(req.GetObjectStoragePort()))
}
host = resource.NewHost(
req.GetId(), req.GetIp(), req.GetHostname(), req.GetPort(), req.GetDownloadPort(),
types.ParseHostType(req.GetType()), options...,

View File

@ -540,6 +540,10 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
options = append(options, resource.WithSchedulerClusterID(req.Host.GetSchedulerClusterId()))
}
if req.Host.GetObjectStoragePort() != 0 {
options = append(options, resource.WithObjectStoragePort(req.Host.GetObjectStoragePort()))
}
host = resource.NewHost(
req.Host.GetId(), req.Host.GetIp(), req.Host.GetHostname(),
req.Host.GetPort(), req.Host.GetDownloadPort(), types.HostType(req.Host.GetType()),