feat: update api version to 2.0.8 (#2566)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-07-25 13:45:54 +08:00 committed by GitHub
parent b4a390db08
commit bd3d76a97c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 200 additions and 200 deletions

2
go.mod
View File

@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.20 go 1.20
require ( require (
d7y.io/api/v2 v2.0.7 d7y.io/api/v2 v2.0.8
github.com/MysteriousPotato/go-lockable v1.0.0 github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.6 github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0 github.com/Showmax/go-fqdn v1.0.0

4
go.sum
View File

@ -50,8 +50,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api/v2 v2.0.7 h1:jfvjTaZ4Q1Z/Ak25u3EU2UuXSDlmVI8OWiwqFW8961A= d7y.io/api/v2 v2.0.8 h1:LVrbv83ubb2wMyazsgoteQJC+J70qg7JDg7/1zdOYmg=
d7y.io/api/v2 v2.0.7/go.mod h1:lwCvFjtRVsyTKsiXfh2W0Jdv+5tQGR/vFj+TknwnusY= d7y.io/api/v2 v2.0.8/go.mod h1:lwCvFjtRVsyTKsiXfh2W0Jdv+5tQGR/vFj+TknwnusY=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=

View File

@ -128,11 +128,11 @@ func (s *schedulerServerV2) StatTask(ctx context.Context, req *schedulerv2.StatT
func (s *schedulerServerV2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequest) (*emptypb.Empty, error) { func (s *schedulerServerV2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequest) (*emptypb.Empty, error) {
// Collect AnnounceHostCount metrics. // Collect AnnounceHostCount metrics.
metrics.AnnounceHostCount.WithLabelValues(req.Host.Os, req.Host.Platform, req.Host.PlatformFamily, req.Host.PlatformVersion, metrics.AnnounceHostCount.WithLabelValues(req.Host.Os, req.Host.Platform, req.Host.PlatformFamily, req.Host.PlatformVersion,
req.Host.KernelVersion, req.Host.Build.GitVersion, req.Host.Build.GitCommit, req.Host.Build.GoVersion, req.Host.Build.Platform).Inc() req.Host.KernelVersion, req.Host.Build.GitVersion, req.Host.Build.GitCommit, req.Host.Build.GetGoVersion(), req.Host.Build.Platform).Inc()
if err := s.service.AnnounceHost(ctx, req); err != nil { if err := s.service.AnnounceHost(ctx, req); err != nil {
// Collect AnnounceHostFailureCount metrics. // Collect AnnounceHostFailureCount metrics.
metrics.AnnounceHostFailureCount.WithLabelValues(req.Host.Os, req.Host.Platform, req.Host.PlatformFamily, req.Host.PlatformVersion, metrics.AnnounceHostFailureCount.WithLabelValues(req.Host.Os, req.Host.Platform, req.Host.PlatformFamily, req.Host.PlatformVersion,
req.Host.KernelVersion, req.Host.Build.GitVersion, req.Host.Build.GitCommit, req.Host.Build.GoVersion, req.Host.Build.Platform).Inc() req.Host.KernelVersion, req.Host.Build.GitVersion, req.Host.Build.GitCommit, req.Host.Build.GetGoVersion(), req.Host.Build.Platform).Inc()
return nil, err return nil, err
} }

View File

@ -671,8 +671,8 @@ func ConstructSuccessSmallTaskResponse(candidateParent *resource.Peer) *schedule
Network: &commonv2.Network{ Network: &commonv2.Network{
TcpConnectionCount: candidateParent.Host.Network.TCPConnectionCount, TcpConnectionCount: candidateParent.Host.Network.TCPConnectionCount,
UploadTcpConnectionCount: candidateParent.Host.Network.UploadTCPConnectionCount, UploadTcpConnectionCount: candidateParent.Host.Network.UploadTCPConnectionCount,
Location: candidateParent.Host.Network.Location, Location: &candidateParent.Host.Network.Location,
Idc: candidateParent.Host.Network.IDC, Idc: &candidateParent.Host.Network.IDC,
}, },
Disk: &commonv2.Disk{ Disk: &commonv2.Disk{
Total: candidateParent.Host.Disk.Total, Total: candidateParent.Host.Disk.Total,
@ -687,7 +687,7 @@ func ConstructSuccessSmallTaskResponse(candidateParent *resource.Peer) *schedule
Build: &commonv2.Build{ Build: &commonv2.Build{
GitVersion: candidateParent.Host.Build.GitVersion, GitVersion: candidateParent.Host.Build.GitVersion,
GitCommit: candidateParent.Host.Build.GitCommit, GitCommit: candidateParent.Host.Build.GitCommit,
GoVersion: candidateParent.Host.Build.GoVersion, GoVersion: &candidateParent.Host.Build.GoVersion,
Platform: candidateParent.Host.Build.Platform, Platform: candidateParent.Host.Build.Platform,
}, },
} }
@ -845,8 +845,8 @@ func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can
Network: &commonv2.Network{ Network: &commonv2.Network{
TcpConnectionCount: candidateParent.Host.Network.TCPConnectionCount, TcpConnectionCount: candidateParent.Host.Network.TCPConnectionCount,
UploadTcpConnectionCount: candidateParent.Host.Network.UploadTCPConnectionCount, UploadTcpConnectionCount: candidateParent.Host.Network.UploadTCPConnectionCount,
Location: candidateParent.Host.Network.Location, Location: &candidateParent.Host.Network.Location,
Idc: candidateParent.Host.Network.IDC, Idc: &candidateParent.Host.Network.IDC,
}, },
Disk: &commonv2.Disk{ Disk: &commonv2.Disk{
Total: candidateParent.Host.Disk.Total, Total: candidateParent.Host.Disk.Total,
@ -861,7 +861,7 @@ func ConstructSuccessNormalTaskResponse(dynconfig config.DynconfigInterface, can
Build: &commonv2.Build{ Build: &commonv2.Build{
GitVersion: candidateParent.Host.Build.GitVersion, GitVersion: candidateParent.Host.Build.GitVersion,
GitCommit: candidateParent.Host.Build.GitCommit, GitCommit: candidateParent.Host.Build.GitCommit,
GoVersion: candidateParent.Host.Build.GoVersion, GoVersion: &candidateParent.Host.Build.GoVersion,
Platform: candidateParent.Host.Build.Platform, Platform: candidateParent.Host.Build.Platform,
}, },
} }

View File

@ -1351,8 +1351,8 @@ func TestScheduling_ConstructSuccessSmallTaskResponse(t *testing.T) {
Network: &commonv2.Network{ Network: &commonv2.Network{
TcpConnectionCount: candidateParent.Host.Network.TCPConnectionCount, TcpConnectionCount: candidateParent.Host.Network.TCPConnectionCount,
UploadTcpConnectionCount: candidateParent.Host.Network.UploadTCPConnectionCount, UploadTcpConnectionCount: candidateParent.Host.Network.UploadTCPConnectionCount,
Location: candidateParent.Host.Network.Location, Location: &candidateParent.Host.Network.Location,
Idc: candidateParent.Host.Network.IDC, Idc: &candidateParent.Host.Network.IDC,
}, },
Disk: &commonv2.Disk{ Disk: &commonv2.Disk{
Total: candidateParent.Host.Disk.Total, Total: candidateParent.Host.Disk.Total,
@ -1367,7 +1367,7 @@ func TestScheduling_ConstructSuccessSmallTaskResponse(t *testing.T) {
Build: &commonv2.Build{ Build: &commonv2.Build{
GitVersion: candidateParent.Host.Build.GitVersion, GitVersion: candidateParent.Host.Build.GitVersion,
GitCommit: candidateParent.Host.Build.GitCommit, GitCommit: candidateParent.Host.Build.GitCommit,
GoVersion: candidateParent.Host.Build.GoVersion, GoVersion: &candidateParent.Host.Build.GoVersion,
Platform: candidateParent.Host.Build.Platform, Platform: candidateParent.Host.Build.Platform,
}, },
}, },
@ -1509,8 +1509,8 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
Network: &commonv2.Network{ Network: &commonv2.Network{
TcpConnectionCount: candidateParents[0].Host.Network.TCPConnectionCount, TcpConnectionCount: candidateParents[0].Host.Network.TCPConnectionCount,
UploadTcpConnectionCount: candidateParents[0].Host.Network.UploadTCPConnectionCount, UploadTcpConnectionCount: candidateParents[0].Host.Network.UploadTCPConnectionCount,
Location: candidateParents[0].Host.Network.Location, Location: &candidateParents[0].Host.Network.Location,
Idc: candidateParents[0].Host.Network.IDC, Idc: &candidateParents[0].Host.Network.IDC,
}, },
Disk: &commonv2.Disk{ Disk: &commonv2.Disk{
Total: candidateParents[0].Host.Disk.Total, Total: candidateParents[0].Host.Disk.Total,
@ -1525,7 +1525,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
Build: &commonv2.Build{ Build: &commonv2.Build{
GitVersion: candidateParents[0].Host.Build.GitVersion, GitVersion: candidateParents[0].Host.Build.GitVersion,
GitCommit: candidateParents[0].Host.Build.GitCommit, GitCommit: candidateParents[0].Host.Build.GitCommit,
GoVersion: candidateParents[0].Host.Build.GoVersion, GoVersion: &candidateParents[0].Host.Build.GoVersion,
Platform: candidateParents[0].Host.Build.Platform, Platform: candidateParents[0].Host.Build.Platform,
}, },
}, },
@ -1641,8 +1641,8 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
Network: &commonv2.Network{ Network: &commonv2.Network{
TcpConnectionCount: candidateParents[0].Host.Network.TCPConnectionCount, TcpConnectionCount: candidateParents[0].Host.Network.TCPConnectionCount,
UploadTcpConnectionCount: candidateParents[0].Host.Network.UploadTCPConnectionCount, UploadTcpConnectionCount: candidateParents[0].Host.Network.UploadTCPConnectionCount,
Location: candidateParents[0].Host.Network.Location, Location: &candidateParents[0].Host.Network.Location,
Idc: candidateParents[0].Host.Network.IDC, Idc: &candidateParents[0].Host.Network.IDC,
}, },
Disk: &commonv2.Disk{ Disk: &commonv2.Disk{
Total: candidateParents[0].Host.Disk.Total, Total: candidateParents[0].Host.Disk.Total,
@ -1657,7 +1657,7 @@ func TestScheduling_ConstructSuccessNormalTaskResponse(t *testing.T) {
Build: &commonv2.Build{ Build: &commonv2.Build{
GitVersion: candidateParents[0].Host.Build.GitVersion, GitVersion: candidateParents[0].Host.Build.GitVersion,
GitCommit: candidateParents[0].Host.Build.GitCommit, GitCommit: candidateParents[0].Host.Build.GitCommit,
GoVersion: candidateParents[0].Host.Build.GoVersion, GoVersion: &candidateParents[0].Host.Build.GoVersion,
Platform: candidateParents[0].Host.Build.Platform, Platform: candidateParents[0].Host.Build.Platform,
}, },
}, },

View File

@ -276,8 +276,8 @@ var (
Network: &commonv2.Network{ Network: &commonv2.Network{
TcpConnectionCount: mockNetwork.TCPConnectionCount, TcpConnectionCount: mockNetwork.TCPConnectionCount,
UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount, UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount,
Location: mockNetwork.Location, Location: &mockNetwork.Location,
Idc: mockNetwork.IDC, Idc: &mockNetwork.IDC,
}, },
Disk: &commonv2.Disk{ Disk: &commonv2.Disk{
Total: mockDisk.Total, Total: mockDisk.Total,
@ -292,7 +292,7 @@ var (
Build: &commonv2.Build{ Build: &commonv2.Build{
GitVersion: mockBuild.GitVersion, GitVersion: mockBuild.GitVersion,
GitCommit: mockBuild.GitCommit, GitCommit: mockBuild.GitCommit,
GoVersion: mockBuild.GoVersion, GoVersion: &mockBuild.GoVersion,
Platform: mockBuild.Platform, Platform: mockBuild.Platform,
}, },
}, },

View File

@ -106,77 +106,77 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
return err return err
} }
logger := logger.WithPeer(req.HostId, req.TaskId, req.PeerId) logger := logger.WithPeer(req.GetHostId(), req.GetTaskId(), req.GetPeerId())
switch announcePeerRequest := req.GetRequest().(type) { switch announcePeerRequest := req.GetRequest().(type) {
case *schedulerv2.AnnouncePeerRequest_RegisterPeerRequest: case *schedulerv2.AnnouncePeerRequest_RegisterPeerRequest:
logger.Infof("receive AnnouncePeerRequest_RegisterPeerRequest: %s", announcePeerRequest.RegisterPeerRequest.Download.Url) logger.Infof("receive AnnouncePeerRequest_RegisterPeerRequest: %s", announcePeerRequest.RegisterPeerRequest.Download.Url)
if err := v.handleRegisterPeerRequest(ctx, stream, req.HostId, req.TaskId, req.PeerId, announcePeerRequest.RegisterPeerRequest); err != nil { if err := v.handleRegisterPeerRequest(ctx, stream, req.GetHostId(), req.GetTaskId(), req.GetPeerId(), announcePeerRequest.RegisterPeerRequest); err != nil {
logger.Error(err) logger.Error(err)
return err return err
} }
case *schedulerv2.AnnouncePeerRequest_RegisterSeedPeerRequest: case *schedulerv2.AnnouncePeerRequest_RegisterSeedPeerRequest:
logger.Infof("receive AnnouncePeerRequest_RegisterSeedPeerRequest: %s", announcePeerRequest.RegisterSeedPeerRequest.Download.Url) logger.Infof("receive AnnouncePeerRequest_RegisterSeedPeerRequest: %s", announcePeerRequest.RegisterSeedPeerRequest.Download.Url)
if err := v.handleRegisterSeedPeerRequest(ctx, stream, req.HostId, req.TaskId, req.PeerId, announcePeerRequest.RegisterSeedPeerRequest); err != nil { if err := v.handleRegisterSeedPeerRequest(ctx, stream, req.GetHostId(), req.GetTaskId(), req.GetPeerId(), announcePeerRequest.RegisterSeedPeerRequest); err != nil {
logger.Error(err) logger.Error(err)
return err return err
} }
case *schedulerv2.AnnouncePeerRequest_DownloadPeerStartedRequest: case *schedulerv2.AnnouncePeerRequest_DownloadPeerStartedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPeerStartedRequest: %#v", announcePeerRequest.DownloadPeerStartedRequest) logger.Infof("receive AnnouncePeerRequest_DownloadPeerStartedRequest: %#v", announcePeerRequest.DownloadPeerStartedRequest)
if err := v.handleDownloadPeerStartedRequest(ctx, req.PeerId); err != nil { if err := v.handleDownloadPeerStartedRequest(ctx, req.GetPeerId()); err != nil {
logger.Error(err) logger.Error(err)
return err return err
} }
case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceStartedRequest: case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceStartedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceStartedRequest: %#v", announcePeerRequest.DownloadPeerBackToSourceStartedRequest) logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceStartedRequest: %#v", announcePeerRequest.DownloadPeerBackToSourceStartedRequest)
if err := v.handleDownloadPeerBackToSourceStartedRequest(ctx, req.PeerId); err != nil { if err := v.handleDownloadPeerBackToSourceStartedRequest(ctx, req.GetPeerId()); err != nil {
logger.Error(err) logger.Error(err)
return err return err
} }
case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest: case *schedulerv2.AnnouncePeerRequest_DownloadPeerFinishedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPeerFinishedRequest: %#v", announcePeerRequest.DownloadPeerFinishedRequest) logger.Infof("receive AnnouncePeerRequest_DownloadPeerFinishedRequest: %#v", announcePeerRequest.DownloadPeerFinishedRequest)
if err := v.handleDownloadPeerFinishedRequest(ctx, req.PeerId); err != nil { if err := v.handleDownloadPeerFinishedRequest(ctx, req.GetPeerId()); err != nil {
logger.Error(err) logger.Error(err)
return err return err
} }
case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest: case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest: %#v", announcePeerRequest.DownloadPeerBackToSourceFinishedRequest) logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceFinishedRequest: %#v", announcePeerRequest.DownloadPeerBackToSourceFinishedRequest)
if err := v.handleDownloadPeerBackToSourceFinishedRequest(ctx, req.PeerId, announcePeerRequest.DownloadPeerBackToSourceFinishedRequest); err != nil { if err := v.handleDownloadPeerBackToSourceFinishedRequest(ctx, req.GetPeerId(), announcePeerRequest.DownloadPeerBackToSourceFinishedRequest); err != nil {
logger.Error(err) logger.Error(err)
return err return err
} }
case *schedulerv2.AnnouncePeerRequest_DownloadPeerFailedRequest: case *schedulerv2.AnnouncePeerRequest_DownloadPeerFailedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPeerFailedRequest: %#v", announcePeerRequest.DownloadPeerFailedRequest) logger.Infof("receive AnnouncePeerRequest_DownloadPeerFailedRequest: %#v", announcePeerRequest.DownloadPeerFailedRequest)
if err := v.handleDownloadPeerFailedRequest(ctx, req.PeerId); err != nil { if err := v.handleDownloadPeerFailedRequest(ctx, req.GetPeerId()); err != nil {
logger.Error(err) logger.Error(err)
return err return err
} }
case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFailedRequest: case *schedulerv2.AnnouncePeerRequest_DownloadPeerBackToSourceFailedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceFailedRequest: %#v", announcePeerRequest.DownloadPeerBackToSourceFailedRequest) logger.Infof("receive AnnouncePeerRequest_DownloadPeerBackToSourceFailedRequest: %#v", announcePeerRequest.DownloadPeerBackToSourceFailedRequest)
if err := v.handleDownloadPeerBackToSourceFailedRequest(ctx, req.PeerId); err != nil { if err := v.handleDownloadPeerBackToSourceFailedRequest(ctx, req.GetPeerId()); err != nil {
logger.Error(err) logger.Error(err)
return err return err
} }
case *schedulerv2.AnnouncePeerRequest_DownloadPieceFinishedRequest: case *schedulerv2.AnnouncePeerRequest_DownloadPieceFinishedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPieceFinishedRequest: %#v", announcePeerRequest.DownloadPieceFinishedRequest) logger.Infof("receive AnnouncePeerRequest_DownloadPieceFinishedRequest: %#v", announcePeerRequest.DownloadPieceFinishedRequest)
if err := v.handleDownloadPieceFinishedRequest(ctx, req.PeerId, announcePeerRequest.DownloadPieceFinishedRequest); err != nil { if err := v.handleDownloadPieceFinishedRequest(ctx, req.GetPeerId(), announcePeerRequest.DownloadPieceFinishedRequest); err != nil {
logger.Error(err) logger.Error(err)
return err return err
} }
case *schedulerv2.AnnouncePeerRequest_DownloadPieceBackToSourceFinishedRequest: case *schedulerv2.AnnouncePeerRequest_DownloadPieceBackToSourceFinishedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPieceBackToSourceFinishedRequest: %#v", announcePeerRequest.DownloadPieceBackToSourceFinishedRequest) logger.Infof("receive AnnouncePeerRequest_DownloadPieceBackToSourceFinishedRequest: %#v", announcePeerRequest.DownloadPieceBackToSourceFinishedRequest)
if err := v.handleDownloadPieceBackToSourceFinishedRequest(ctx, req.PeerId, announcePeerRequest.DownloadPieceBackToSourceFinishedRequest); err != nil { if err := v.handleDownloadPieceBackToSourceFinishedRequest(ctx, req.GetPeerId(), announcePeerRequest.DownloadPieceBackToSourceFinishedRequest); err != nil {
logger.Error(err) logger.Error(err)
return err return err
} }
case *schedulerv2.AnnouncePeerRequest_DownloadPieceFailedRequest: case *schedulerv2.AnnouncePeerRequest_DownloadPieceFailedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPieceFailedRequest: %#v", announcePeerRequest.DownloadPieceFailedRequest) logger.Infof("receive AnnouncePeerRequest_DownloadPieceFailedRequest: %#v", announcePeerRequest.DownloadPieceFailedRequest)
if err := v.handleDownloadPieceFailedRequest(ctx, req.PeerId, announcePeerRequest.DownloadPieceFailedRequest); err != nil { if err := v.handleDownloadPieceFailedRequest(ctx, req.GetPeerId(), announcePeerRequest.DownloadPieceFailedRequest); err != nil {
logger.Error(err) logger.Error(err)
return err return err
} }
case *schedulerv2.AnnouncePeerRequest_DownloadPieceBackToSourceFailedRequest: case *schedulerv2.AnnouncePeerRequest_DownloadPieceBackToSourceFailedRequest:
logger.Infof("receive AnnouncePeerRequest_DownloadPieceBackToSourceFailedRequest: %#v", announcePeerRequest.DownloadPieceBackToSourceFailedRequest) logger.Infof("receive AnnouncePeerRequest_DownloadPieceBackToSourceFailedRequest: %#v", announcePeerRequest.DownloadPieceBackToSourceFailedRequest)
if err := v.handleDownloadPieceBackToSourceFailedRequest(ctx, req.PeerId, announcePeerRequest.DownloadPieceBackToSourceFailedRequest); err != nil { if err := v.handleDownloadPieceBackToSourceFailedRequest(ctx, req.GetPeerId(), announcePeerRequest.DownloadPieceBackToSourceFailedRequest); err != nil {
logger.Error(err) logger.Error(err)
return err return err
} }
@ -193,11 +193,11 @@ func (v *V2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePeerServer) error
// StatPeer checks information of peer. // StatPeer checks information of peer.
func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*commonv2.Peer, error) { func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*commonv2.Peer, error) {
logger.WithTaskID(req.TaskId).Infof("stat peer request: %#v", req) logger.WithTaskID(req.GetTaskId()).Infof("stat peer request: %#v", req)
peer, loaded := v.resource.PeerManager().Load(req.PeerId) peer, loaded := v.resource.PeerManager().Load(req.GetPeerId())
if !loaded { if !loaded {
return nil, status.Errorf(codes.NotFound, "peer %s not found", req.PeerId) return nil, status.Errorf(codes.NotFound, "peer %s not found", req.GetPeerId())
} }
resp := &commonv2.Peer{ resp := &commonv2.Peer{
@ -336,8 +336,8 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c
Network: &commonv2.Network{ Network: &commonv2.Network{
TcpConnectionCount: peer.Host.Network.TCPConnectionCount, TcpConnectionCount: peer.Host.Network.TCPConnectionCount,
UploadTcpConnectionCount: peer.Host.Network.UploadTCPConnectionCount, UploadTcpConnectionCount: peer.Host.Network.UploadTCPConnectionCount,
Location: peer.Host.Network.Location, Location: &peer.Host.Network.Location,
Idc: peer.Host.Network.IDC, Idc: &peer.Host.Network.IDC,
}, },
Disk: &commonv2.Disk{ Disk: &commonv2.Disk{
Total: peer.Host.Disk.Total, Total: peer.Host.Disk.Total,
@ -352,7 +352,7 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c
Build: &commonv2.Build{ Build: &commonv2.Build{
GitVersion: peer.Host.Build.GitVersion, GitVersion: peer.Host.Build.GitVersion,
GitCommit: peer.Host.Build.GitCommit, GitCommit: peer.Host.Build.GitCommit,
GoVersion: peer.Host.Build.GoVersion, GoVersion: &peer.Host.Build.GoVersion,
Platform: peer.Host.Build.Platform, Platform: peer.Host.Build.Platform,
}, },
} }
@ -362,11 +362,11 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c
// LeavePeer releases peer in scheduler. // LeavePeer releases peer in scheduler.
func (v *V2) LeavePeer(ctx context.Context, req *schedulerv2.LeavePeerRequest) error { func (v *V2) LeavePeer(ctx context.Context, req *schedulerv2.LeavePeerRequest) error {
logger.WithTaskAndPeerID(req.TaskId, req.PeerId).Infof("leave peer request: %#v", req) logger.WithTaskAndPeerID(req.GetTaskId(), req.GetPeerId()).Infof("leave peer request: %#v", req)
peer, loaded := v.resource.PeerManager().Load(req.PeerId) peer, loaded := v.resource.PeerManager().Load(req.GetPeerId())
if !loaded { if !loaded {
msg := fmt.Sprintf("peer %s not found", req.PeerId) msg := fmt.Sprintf("peer %s not found", req.GetPeerId())
logger.Error(msg) logger.Error(msg)
return status.Error(codes.NotFound, msg) return status.Error(codes.NotFound, msg)
} }
@ -388,11 +388,11 @@ func (v *V2) ExchangePeer(ctx context.Context, req *schedulerv2.ExchangePeerRequ
// StatTask checks information of task. // StatTask checks information of task.
func (v *V2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*commonv2.Task, error) { func (v *V2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*commonv2.Task, error) {
logger.WithTaskID(req.Id).Infof("stat task request: %#v", req) logger.WithTaskID(req.GetId()).Infof("stat task request: %#v", req)
task, loaded := v.resource.TaskManager().Load(req.Id) task, loaded := v.resource.TaskManager().Load(req.GetId())
if !loaded { if !loaded {
msg := fmt.Sprintf("task %s not found", req.Id) msg := fmt.Sprintf("task %s not found", req.GetId())
logger.Error(msg) logger.Error(msg)
return nil, status.Error(codes.NotFound, msg) return nil, status.Error(codes.NotFound, msg)
} }
@ -451,7 +451,7 @@ func (v *V2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*c
// AnnounceHost announces host to scheduler. // AnnounceHost announces host to scheduler.
func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequest) error { func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequest) error {
logger.WithHostID(req.Host.Id).Infof("announce host request: %#v", req.Host) logger.WithHostID(req.Host.GetId()).Infof("announce host request: %#v", req.GetHost())
// Get scheduler cluster client config by manager. // Get scheduler cluster client config by manager.
var concurrentUploadLimit int32 var concurrentUploadLimit int32
@ -459,86 +459,86 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
concurrentUploadLimit = int32(clientConfig.LoadLimit) concurrentUploadLimit = int32(clientConfig.LoadLimit)
} }
host, loaded := v.resource.HostManager().Load(req.Host.Id) host, loaded := v.resource.HostManager().Load(req.Host.GetId())
if !loaded { if !loaded {
options := []resource.HostOption{ options := []resource.HostOption{
resource.WithOS(req.Host.Os), resource.WithOS(req.Host.GetOs()),
resource.WithPlatform(req.Host.Platform), resource.WithPlatform(req.Host.GetPlatform()),
resource.WithPlatformFamily(req.Host.PlatformFamily), resource.WithPlatformFamily(req.Host.GetPlatformFamily()),
resource.WithPlatformVersion(req.Host.PlatformVersion), resource.WithPlatformVersion(req.Host.GetPlatformVersion()),
resource.WithKernelVersion(req.Host.KernelVersion), resource.WithKernelVersion(req.Host.GetKernelVersion()),
} }
if concurrentUploadLimit > 0 { if concurrentUploadLimit > 0 {
options = append(options, resource.WithConcurrentUploadLimit(concurrentUploadLimit)) options = append(options, resource.WithConcurrentUploadLimit(concurrentUploadLimit))
} }
if req.Host.Cpu != nil { if req.Host.GetCpu() != nil {
options = append(options, resource.WithCPU(resource.CPU{ options = append(options, resource.WithCPU(resource.CPU{
LogicalCount: req.Host.Cpu.LogicalCount, LogicalCount: req.Host.Cpu.GetLogicalCount(),
PhysicalCount: req.Host.Cpu.PhysicalCount, PhysicalCount: req.Host.Cpu.GetPhysicalCount(),
Percent: req.Host.Cpu.Percent, Percent: req.Host.Cpu.GetPercent(),
ProcessPercent: req.Host.Cpu.ProcessPercent, ProcessPercent: req.Host.Cpu.GetProcessPercent(),
Times: resource.CPUTimes{ Times: resource.CPUTimes{
User: req.Host.Cpu.Times.User, User: req.Host.Cpu.Times.GetUser(),
System: req.Host.Cpu.Times.System, System: req.Host.Cpu.Times.GetSystem(),
Idle: req.Host.Cpu.Times.Idle, Idle: req.Host.Cpu.Times.GetIdle(),
Nice: req.Host.Cpu.Times.Nice, Nice: req.Host.Cpu.Times.GetNice(),
Iowait: req.Host.Cpu.Times.Iowait, Iowait: req.Host.Cpu.Times.GetIowait(),
Irq: req.Host.Cpu.Times.Irq, Irq: req.Host.Cpu.Times.GetIrq(),
Softirq: req.Host.Cpu.Times.Softirq, Softirq: req.Host.Cpu.Times.GetSoftirq(),
Steal: req.Host.Cpu.Times.Steal, Steal: req.Host.Cpu.Times.GetSteal(),
Guest: req.Host.Cpu.Times.Guest, Guest: req.Host.Cpu.Times.GetGuest(),
GuestNice: req.Host.Cpu.Times.GuestNice, GuestNice: req.Host.Cpu.Times.GetGuest(),
}, },
})) }))
} }
if req.Host.Memory != nil { if req.Host.GetMemory() != nil {
options = append(options, resource.WithMemory(resource.Memory{ options = append(options, resource.WithMemory(resource.Memory{
Total: req.Host.Memory.Total, Total: req.Host.Memory.GetTotal(),
Available: req.Host.Memory.Available, Available: req.Host.Memory.GetAvailable(),
Used: req.Host.Memory.Used, Used: req.Host.Memory.GetUsed(),
UsedPercent: req.Host.Memory.UsedPercent, UsedPercent: req.Host.Memory.GetUsedPercent(),
ProcessUsedPercent: req.Host.Memory.ProcessUsedPercent, ProcessUsedPercent: req.Host.Memory.GetProcessUsedPercent(),
Free: req.Host.Memory.Free, Free: req.Host.Memory.GetFree(),
})) }))
} }
if req.Host.Network != nil { if req.Host.GetNetwork() != nil {
options = append(options, resource.WithNetwork(resource.Network{ options = append(options, resource.WithNetwork(resource.Network{
TCPConnectionCount: req.Host.Network.TcpConnectionCount, TCPConnectionCount: req.Host.Network.GetTcpConnectionCount(),
UploadTCPConnectionCount: req.Host.Network.UploadTcpConnectionCount, UploadTCPConnectionCount: req.Host.Network.GetUploadTcpConnectionCount(),
Location: req.Host.Network.Location, Location: req.Host.Network.GetLocation(),
IDC: req.Host.Network.Idc, IDC: req.Host.Network.GetIdc(),
})) }))
} }
if req.Host.Disk != nil { if req.Host.GetDisk() != nil {
options = append(options, resource.WithDisk(resource.Disk{ options = append(options, resource.WithDisk(resource.Disk{
Total: req.Host.Disk.Total, Total: req.Host.Disk.GetTotal(),
Free: req.Host.Disk.Free, Free: req.Host.Disk.GetFree(),
Used: req.Host.Disk.Used, Used: req.Host.Disk.GetUsed(),
UsedPercent: req.Host.Disk.UsedPercent, UsedPercent: req.Host.Disk.GetUsedPercent(),
InodesTotal: req.Host.Disk.InodesTotal, InodesTotal: req.Host.Disk.GetInodesTotal(),
InodesUsed: req.Host.Disk.InodesUsed, InodesUsed: req.Host.Disk.GetInodesUsed(),
InodesFree: req.Host.Disk.InodesFree, InodesFree: req.Host.Disk.GetInodesFree(),
InodesUsedPercent: req.Host.Disk.InodesUsedPercent, InodesUsedPercent: req.Host.Disk.GetInodesUsedPercent(),
})) }))
} }
if req.Host.Build != nil { if req.Host.GetBuild() != nil {
options = append(options, resource.WithBuild(resource.Build{ options = append(options, resource.WithBuild(resource.Build{
GitVersion: req.Host.Build.GitVersion, GitVersion: req.Host.Build.GetGitVersion(),
GitCommit: req.Host.Build.GitCommit, GitCommit: req.Host.Build.GetGitCommit(),
GoVersion: req.Host.Build.GoVersion, GoVersion: req.Host.Build.GetGoVersion(),
Platform: req.Host.Build.Platform, Platform: req.Host.Build.GetPlatform(),
})) }))
} }
host = resource.NewHost( host = resource.NewHost(
req.Host.Id, req.Host.Ip, req.Host.Hostname, req.Host.GetId(), req.Host.GetIp(), req.Host.GetHostname(),
req.Host.Port, req.Host.DownloadPort, types.HostType(req.Host.Type), req.Host.GetPort(), req.Host.GetDownloadPort(), types.HostType(req.Host.GetType()),
options..., options...,
) )
@ -548,80 +548,80 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
} }
// Host already exists and updates properties. // Host already exists and updates properties.
host.Port = req.Host.Port host.Port = req.Host.GetPort()
host.DownloadPort = req.Host.DownloadPort host.DownloadPort = req.Host.GetDownloadPort()
host.Type = types.HostType(req.Host.Type) host.Type = types.HostType(req.Host.GetType())
host.OS = req.Host.Os host.OS = req.Host.GetOs()
host.Platform = req.Host.Platform host.Platform = req.Host.GetPlatform()
host.PlatformFamily = req.Host.PlatformFamily host.PlatformFamily = req.Host.GetPlatformFamily()
host.PlatformVersion = req.Host.PlatformVersion host.PlatformVersion = req.Host.GetPlatformVersion()
host.KernelVersion = req.Host.KernelVersion host.KernelVersion = req.Host.GetKernelVersion()
host.UpdatedAt.Store(time.Now()) host.UpdatedAt.Store(time.Now())
if concurrentUploadLimit > 0 { if concurrentUploadLimit > 0 {
host.ConcurrentUploadLimit.Store(concurrentUploadLimit) host.ConcurrentUploadLimit.Store(concurrentUploadLimit)
} }
if req.Host.Cpu != nil { if req.Host.GetCpu() != nil {
host.CPU = resource.CPU{ host.CPU = resource.CPU{
LogicalCount: req.Host.Cpu.LogicalCount, LogicalCount: req.Host.Cpu.GetLogicalCount(),
PhysicalCount: req.Host.Cpu.PhysicalCount, PhysicalCount: req.Host.Cpu.GetPhysicalCount(),
Percent: req.Host.Cpu.Percent, Percent: req.Host.Cpu.GetPercent(),
ProcessPercent: req.Host.Cpu.ProcessPercent, ProcessPercent: req.Host.Cpu.GetProcessPercent(),
Times: resource.CPUTimes{ Times: resource.CPUTimes{
User: req.Host.Cpu.Times.User, User: req.Host.Cpu.Times.GetUser(),
System: req.Host.Cpu.Times.System, System: req.Host.Cpu.Times.GetSystem(),
Idle: req.Host.Cpu.Times.Idle, Idle: req.Host.Cpu.Times.GetIdle(),
Nice: req.Host.Cpu.Times.Nice, Nice: req.Host.Cpu.Times.GetNice(),
Iowait: req.Host.Cpu.Times.Iowait, Iowait: req.Host.Cpu.Times.GetIowait(),
Irq: req.Host.Cpu.Times.Irq, Irq: req.Host.Cpu.Times.GetIrq(),
Softirq: req.Host.Cpu.Times.Softirq, Softirq: req.Host.Cpu.Times.GetSoftirq(),
Steal: req.Host.Cpu.Times.Steal, Steal: req.Host.Cpu.Times.GetSteal(),
Guest: req.Host.Cpu.Times.Guest, Guest: req.Host.Cpu.Times.GetGuest(),
GuestNice: req.Host.Cpu.Times.GuestNice, GuestNice: req.Host.Cpu.Times.GetGuestNice(),
}, },
} }
} }
if req.Host.Memory != nil { if req.Host.GetMemory() != nil {
host.Memory = resource.Memory{ host.Memory = resource.Memory{
Total: req.Host.Memory.Total, Total: req.Host.Memory.GetTotal(),
Available: req.Host.Memory.Available, Available: req.Host.Memory.GetAvailable(),
Used: req.Host.Memory.Used, Used: req.Host.Memory.GetUsed(),
UsedPercent: req.Host.Memory.UsedPercent, UsedPercent: req.Host.Memory.GetUsedPercent(),
ProcessUsedPercent: req.Host.Memory.ProcessUsedPercent, ProcessUsedPercent: req.Host.Memory.GetProcessUsedPercent(),
Free: req.Host.Memory.Free, Free: req.Host.Memory.GetFree(),
} }
} }
if req.Host.Network != nil { if req.Host.GetNetwork() != nil {
host.Network = resource.Network{ host.Network = resource.Network{
TCPConnectionCount: req.Host.Network.TcpConnectionCount, TCPConnectionCount: req.Host.Network.GetTcpConnectionCount(),
UploadTCPConnectionCount: req.Host.Network.UploadTcpConnectionCount, UploadTCPConnectionCount: req.Host.Network.GetUploadTcpConnectionCount(),
Location: req.Host.Network.Location, Location: req.Host.Network.GetLocation(),
IDC: req.Host.Network.Idc, IDC: req.Host.Network.GetIdc(),
} }
} }
if req.Host.Disk != nil { if req.Host.GetDisk() != nil {
host.Disk = resource.Disk{ host.Disk = resource.Disk{
Total: req.Host.Disk.Total, Total: req.Host.Disk.GetTotal(),
Free: req.Host.Disk.Free, Free: req.Host.Disk.GetFree(),
Used: req.Host.Disk.Used, Used: req.Host.Disk.GetUsed(),
UsedPercent: req.Host.Disk.UsedPercent, UsedPercent: req.Host.Disk.GetUsedPercent(),
InodesTotal: req.Host.Disk.InodesTotal, InodesTotal: req.Host.Disk.GetInodesTotal(),
InodesUsed: req.Host.Disk.InodesUsed, InodesUsed: req.Host.Disk.GetInodesUsed(),
InodesFree: req.Host.Disk.InodesFree, InodesFree: req.Host.Disk.GetInodesFree(),
InodesUsedPercent: req.Host.Disk.InodesUsedPercent, InodesUsedPercent: req.Host.Disk.GetInodesUsedPercent(),
} }
} }
if req.Host.Build != nil { if req.Host.GetBuild() != nil {
host.Build = resource.Build{ host.Build = resource.Build{
GitVersion: req.Host.Build.GitVersion, GitVersion: req.Host.Build.GetGitVersion(),
GitCommit: req.Host.Build.GitCommit, GitCommit: req.Host.Build.GetGitCommit(),
GoVersion: req.Host.Build.GoVersion, GoVersion: req.Host.Build.GetGoVersion(),
Platform: req.Host.Build.Platform, Platform: req.Host.Build.GetPlatform(),
} }
} }
@ -630,11 +630,11 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
// LeaveHost releases host in scheduler. // LeaveHost releases host in scheduler.
func (v *V2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest) error { func (v *V2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest) error {
logger.WithHostID(req.Id).Infof("leave host request: %#v", req) logger.WithHostID(req.GetId()).Infof("leave host request: %#v", req)
host, loaded := v.resource.HostManager().Load(req.Id) host, loaded := v.resource.HostManager().Load(req.GetId())
if !loaded { if !loaded {
msg := fmt.Sprintf("host %s not found", req.Id) msg := fmt.Sprintf("host %s not found", req.GetId())
logger.Error(msg) logger.Error(msg)
return status.Error(codes.NotFound, msg) return status.Error(codes.NotFound, msg)
} }
@ -660,13 +660,13 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {
return err return err
} }
logger := logger.WithHost(req.Host.Id, req.Host.Hostname, req.Host.Ip) logger := logger.WithHost(req.Host.GetId(), req.Host.GetHostname(), req.Host.GetIp())
switch syncProbesRequest := req.GetRequest().(type) { switch syncProbesRequest := req.GetRequest().(type) {
case *schedulerv2.SyncProbesRequest_ProbeStartedRequest: case *schedulerv2.SyncProbesRequest_ProbeStartedRequest:
// Find probed hosts in network topology. Based on the source host information, // Find probed hosts in network topology. Based on the source host information,
// the most candidate hosts will be evaluated. // the most candidate hosts will be evaluated.
logger.Info("receive SyncProbesRequest_ProbeStartedRequest") logger.Info("receive SyncProbesRequest_ProbeStartedRequest")
hosts, err := v.networkTopology.FindProbedHosts(req.Host.Id) hosts, err := v.networkTopology.FindProbedHosts(req.Host.GetId())
if err != nil { if err != nil {
logger.Error(err) logger.Error(err)
return status.Error(codes.FailedPrecondition, err.Error()) return status.Error(codes.FailedPrecondition, err.Error())
@ -715,8 +715,8 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {
Network: &commonv2.Network{ Network: &commonv2.Network{
TcpConnectionCount: host.Network.TCPConnectionCount, TcpConnectionCount: host.Network.TCPConnectionCount,
UploadTcpConnectionCount: host.Network.UploadTCPConnectionCount, UploadTcpConnectionCount: host.Network.UploadTCPConnectionCount,
Location: host.Network.Location, Location: &host.Network.Location,
Idc: host.Network.IDC, Idc: &host.Network.IDC,
}, },
Disk: &commonv2.Disk{ Disk: &commonv2.Disk{
Total: host.Disk.Total, Total: host.Disk.Total,
@ -731,7 +731,7 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {
Build: &commonv2.Build{ Build: &commonv2.Build{
GitVersion: host.Build.GitVersion, GitVersion: host.Build.GitVersion,
GitCommit: host.Build.GitCommit, GitCommit: host.Build.GitCommit,
GoVersion: host.Build.GoVersion, GoVersion: &host.Build.GoVersion,
Platform: host.Build.Platform, Platform: host.Build.Platform,
}, },
}) })
@ -755,12 +755,12 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {
continue continue
} }
if err := v.networkTopology.Store(req.Host.Id, probedHost.ID); err != nil { if err := v.networkTopology.Store(req.Host.GetId(), probedHost.ID); err != nil {
logger.Errorf("store failed: %s", err.Error()) logger.Errorf("store failed: %s", err.Error())
continue continue
} }
if err := v.networkTopology.Probes(req.Host.Id, probe.Host.Id).Enqueue(&networktopology.Probe{ if err := v.networkTopology.Probes(req.Host.GetId(), probe.Host.Id).Enqueue(&networktopology.Probe{
Host: probedHost, Host: probedHost,
RTT: probe.Rtt.AsDuration(), RTT: probe.Rtt.AsDuration(),
CreatedAt: probe.CreatedAt.AsTime(), CreatedAt: probe.CreatedAt.AsTime(),
@ -791,7 +791,7 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {
// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest. // handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.
func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error { func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error {
// Handle resource included host, task, and peer. // Handle resource included host, task, and peer.
_, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.Download) _, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.GetDownload())
if err != nil { if err != nil {
return err return err
} }
@ -828,7 +828,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
// handleRegisterSeedPeerRequest handles RegisterSeedPeerRequest of AnnouncePeerRequest. // handleRegisterSeedPeerRequest handles RegisterSeedPeerRequest of AnnouncePeerRequest.
func (v *V2) handleRegisterSeedPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterSeedPeerRequest) error { func (v *V2) handleRegisterSeedPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterSeedPeerRequest) error {
// Handle resource included host, task, and peer. // Handle resource included host, task, and peer.
_, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.Download) _, task, peer, err := v.handleResource(ctx, stream, hostID, taskID, peerID, req.GetDownload())
if err != nil { if err != nil {
return err return err
} }
@ -969,8 +969,8 @@ func (v *V2) handleDownloadPeerBackToSourceFinishedRequest(ctx context.Context,
// Handle task with peer back-to-source finished request, peer can only represent // Handle task with peer back-to-source finished request, peer can only represent
// a successful task after downloading the complete task. // a successful task after downloading the complete task.
if peer.Range == nil && !peer.Task.FSM.Is(resource.TaskStateSucceeded) { if peer.Range == nil && !peer.Task.FSM.Is(resource.TaskStateSucceeded) {
peer.Task.ContentLength.Store(req.ContentLength) peer.Task.ContentLength.Store(req.GetContentLength())
peer.Task.TotalPieceCount.Store(req.PieceCount) peer.Task.TotalPieceCount.Store(req.GetPieceCount())
if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownloadSucceeded); err != nil { if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownloadSucceeded); err != nil {
return status.Error(codes.Internal, err.Error()) return status.Error(codes.Internal, err.Error())
} }
@ -1063,17 +1063,17 @@ func (v *V2) handleDownloadPeerBackToSourceFailedRequest(ctx context.Context, pe
func (v *V2) handleDownloadPieceFinishedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPieceFinishedRequest) error { func (v *V2) handleDownloadPieceFinishedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPieceFinishedRequest) error {
// Construct piece. // Construct piece.
piece := &resource.Piece{ piece := &resource.Piece{
Number: req.Piece.Number, Number: req.Piece.GetNumber(),
ParentID: req.Piece.ParentId, ParentID: req.Piece.GetParentId(),
Offset: req.Piece.Offset, Offset: req.Piece.GetOffset(),
Length: req.Piece.Length, Length: req.Piece.GetLength(),
TrafficType: req.Piece.TrafficType, TrafficType: req.Piece.GetTrafficType(),
Cost: req.Piece.Cost.AsDuration(), Cost: req.Piece.GetCost().AsDuration(),
CreatedAt: req.Piece.CreatedAt.AsTime(), CreatedAt: req.Piece.GetCreatedAt().AsTime(),
} }
if len(req.Piece.Digest) > 0 { if len(req.Piece.GetDigest()) > 0 {
d, err := digest.Parse(req.Piece.Digest) d, err := digest.Parse(req.Piece.GetDigest())
if err != nil { if err != nil {
return status.Errorf(codes.InvalidArgument, err.Error()) return status.Errorf(codes.InvalidArgument, err.Error())
} }
@ -1126,17 +1126,17 @@ func (v *V2) handleDownloadPieceFinishedRequest(ctx context.Context, peerID stri
func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPieceBackToSourceFinishedRequest) error { func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPieceBackToSourceFinishedRequest) error {
// Construct piece. // Construct piece.
piece := &resource.Piece{ piece := &resource.Piece{
Number: req.Piece.Number, Number: req.Piece.GetNumber(),
ParentID: req.Piece.ParentId, ParentID: req.Piece.GetParentId(),
Offset: req.Piece.Offset, Offset: req.Piece.GetOffset(),
Length: req.Piece.Length, Length: req.Piece.GetLength(),
TrafficType: req.Piece.TrafficType, TrafficType: req.Piece.GetTrafficType(),
Cost: req.Piece.Cost.AsDuration(), Cost: req.Piece.GetCost().AsDuration(),
CreatedAt: req.Piece.CreatedAt.AsTime(), CreatedAt: req.Piece.GetCreatedAt().AsTime(),
} }
if len(req.Piece.Digest) > 0 { if len(req.Piece.GetDigest()) > 0 {
d, err := digest.Parse(req.Piece.Digest) d, err := digest.Parse(req.Piece.GetDigest())
if err != nil { if err != nil {
return status.Errorf(codes.InvalidArgument, err.Error()) return status.Errorf(codes.InvalidArgument, err.Error())
} }
@ -1182,20 +1182,20 @@ func (v *V2) handleDownloadPieceFailedRequest(ctx context.Context, peerID string
} }
// Collect DownloadPieceCount and DownloadPieceFailureCount metrics. // Collect DownloadPieceCount and DownloadPieceFailureCount metrics.
metrics.DownloadPieceCount.WithLabelValues(req.Piece.TrafficType.String(), peer.Task.Type.String(), metrics.DownloadPieceCount.WithLabelValues(req.Piece.GetTrafficType().String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
metrics.DownloadPieceFailureCount.WithLabelValues(req.Piece.TrafficType.String(), peer.Task.Type.String(), metrics.DownloadPieceFailureCount.WithLabelValues(req.Piece.GetTrafficType().String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
if req.Temporary { if req.Temporary {
// Handle peer with piece temporary failed request. // Handle peer with piece temporary failed request.
peer.UpdatedAt.Store(time.Now()) peer.UpdatedAt.Store(time.Now())
peer.BlockParents.Add(req.Piece.ParentId) peer.BlockParents.Add(req.Piece.GetParentId())
if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil { if err := v.scheduling.ScheduleCandidateParents(ctx, peer, peer.BlockParents); err != nil {
return status.Error(codes.FailedPrecondition, err.Error()) return status.Error(codes.FailedPrecondition, err.Error())
} }
if parent, loaded := v.resource.PeerManager().Load(req.Piece.ParentId); loaded { if parent, loaded := v.resource.PeerManager().Load(req.Piece.GetParentId()); loaded {
parent.Host.UploadFailedCount.Inc() parent.Host.UploadFailedCount.Inc()
} }
@ -1221,9 +1221,9 @@ func (v *V2) handleDownloadPieceBackToSourceFailedRequest(ctx context.Context, p
peer.Task.UpdatedAt.Store(time.Now()) peer.Task.UpdatedAt.Store(time.Now())
// Collect DownloadPieceCount and DownloadPieceFailureCount metrics. // Collect DownloadPieceCount and DownloadPieceFailureCount metrics.
metrics.DownloadPieceCount.WithLabelValues(req.Piece.TrafficType.String(), peer.Task.Type.String(), metrics.DownloadPieceCount.WithLabelValues(req.Piece.GetTrafficType().String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
metrics.DownloadPieceFailureCount.WithLabelValues(req.Piece.TrafficType.String(), peer.Task.Type.String(), metrics.DownloadPieceFailureCount.WithLabelValues(req.Piece.GetTrafficType().String(), peer.Task.Type.String(),
peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc() peer.Task.Tag, peer.Task.Application, peer.Host.Type.Name()).Inc()
return status.Error(codes.Internal, "download piece from source failed") return status.Error(codes.Internal, "download piece from source failed")

View File

@ -207,8 +207,8 @@ func TestServiceV2_StatPeer(t *testing.T) {
Network: &commonv2.Network{ Network: &commonv2.Network{
TcpConnectionCount: peer.Host.Network.TCPConnectionCount, TcpConnectionCount: peer.Host.Network.TCPConnectionCount,
UploadTcpConnectionCount: peer.Host.Network.UploadTCPConnectionCount, UploadTcpConnectionCount: peer.Host.Network.UploadTCPConnectionCount,
Location: peer.Host.Network.Location, Location: &peer.Host.Network.Location,
Idc: peer.Host.Network.IDC, Idc: &peer.Host.Network.IDC,
}, },
Disk: &commonv2.Disk{ Disk: &commonv2.Disk{
Total: peer.Host.Disk.Total, Total: peer.Host.Disk.Total,
@ -223,7 +223,7 @@ func TestServiceV2_StatPeer(t *testing.T) {
Build: &commonv2.Build{ Build: &commonv2.Build{
GitVersion: peer.Host.Build.GitVersion, GitVersion: peer.Host.Build.GitVersion,
GitCommit: peer.Host.Build.GitCommit, GitCommit: peer.Host.Build.GitCommit,
GoVersion: peer.Host.Build.GoVersion, GoVersion: &peer.Host.Build.GoVersion,
Platform: peer.Host.Build.Platform, Platform: peer.Host.Build.Platform,
}, },
}, },
@ -464,8 +464,8 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
Network: &commonv2.Network{ Network: &commonv2.Network{
TcpConnectionCount: mockNetwork.TCPConnectionCount, TcpConnectionCount: mockNetwork.TCPConnectionCount,
UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount, UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount,
Location: mockNetwork.Location, Location: &mockNetwork.Location,
Idc: mockNetwork.IDC, Idc: &mockNetwork.IDC,
}, },
Disk: &commonv2.Disk{ Disk: &commonv2.Disk{
Total: mockDisk.Total, Total: mockDisk.Total,
@ -480,7 +480,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
Build: &commonv2.Build{ Build: &commonv2.Build{
GitVersion: mockBuild.GitVersion, GitVersion: mockBuild.GitVersion,
GitCommit: mockBuild.GitCommit, GitCommit: mockBuild.GitCommit,
GoVersion: mockBuild.GoVersion, GoVersion: &mockBuild.GoVersion,
Platform: mockBuild.Platform, Platform: mockBuild.Platform,
}, },
}, },
@ -568,8 +568,8 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
Network: &commonv2.Network{ Network: &commonv2.Network{
TcpConnectionCount: mockNetwork.TCPConnectionCount, TcpConnectionCount: mockNetwork.TCPConnectionCount,
UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount, UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount,
Location: mockNetwork.Location, Location: &mockNetwork.Location,
Idc: mockNetwork.IDC, Idc: &mockNetwork.IDC,
}, },
Disk: &commonv2.Disk{ Disk: &commonv2.Disk{
Total: mockDisk.Total, Total: mockDisk.Total,
@ -584,7 +584,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
Build: &commonv2.Build{ Build: &commonv2.Build{
GitVersion: mockBuild.GitVersion, GitVersion: mockBuild.GitVersion,
GitCommit: mockBuild.GitCommit, GitCommit: mockBuild.GitCommit,
GoVersion: mockBuild.GoVersion, GoVersion: &mockBuild.GoVersion,
Platform: mockBuild.Platform, Platform: mockBuild.Platform,
}, },
}, },
@ -672,8 +672,8 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
Network: &commonv2.Network{ Network: &commonv2.Network{
TcpConnectionCount: mockNetwork.TCPConnectionCount, TcpConnectionCount: mockNetwork.TCPConnectionCount,
UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount, UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount,
Location: mockNetwork.Location, Location: &mockNetwork.Location,
Idc: mockNetwork.IDC, Idc: &mockNetwork.IDC,
}, },
Disk: &commonv2.Disk{ Disk: &commonv2.Disk{
Total: mockDisk.Total, Total: mockDisk.Total,
@ -688,7 +688,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
Build: &commonv2.Build{ Build: &commonv2.Build{
GitVersion: mockBuild.GitVersion, GitVersion: mockBuild.GitVersion,
GitCommit: mockBuild.GitCommit, GitCommit: mockBuild.GitCommit,
GoVersion: mockBuild.GoVersion, GoVersion: &mockBuild.GoVersion,
Platform: mockBuild.Platform, Platform: mockBuild.Platform,
}, },
}, },
@ -772,8 +772,8 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
Network: &commonv2.Network{ Network: &commonv2.Network{
TcpConnectionCount: mockNetwork.TCPConnectionCount, TcpConnectionCount: mockNetwork.TCPConnectionCount,
UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount, UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount,
Location: mockNetwork.Location, Location: &mockNetwork.Location,
Idc: mockNetwork.IDC, Idc: &mockNetwork.IDC,
}, },
Disk: &commonv2.Disk{ Disk: &commonv2.Disk{
Total: mockDisk.Total, Total: mockDisk.Total,
@ -788,7 +788,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
Build: &commonv2.Build{ Build: &commonv2.Build{
GitVersion: mockBuild.GitVersion, GitVersion: mockBuild.GitVersion,
GitCommit: mockBuild.GitCommit, GitCommit: mockBuild.GitCommit,
GoVersion: mockBuild.GoVersion, GoVersion: &mockBuild.GoVersion,
Platform: mockBuild.Platform, Platform: mockBuild.Platform,
}, },
}, },