feat: register service to manager (#475)

* feat: register service to manager

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2021-07-20 10:14:16 +08:00
parent aff7951c47
commit 899c2432b6
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
11 changed files with 322 additions and 1109 deletions

View File

@ -164,9 +164,7 @@ func (s *Server) register(ctx context.Context) error {
location := s.config.Host.Location
downloadPort := int32(s.config.DownloadPort)
var cdn *manager.CDN
var err error
cdn, err = s.managerClient.CreateCDN(ctx, &manager.CreateCDNRequest{
cdn, err := s.managerClient.UpdateCDN(ctx, &manager.UpdateCDNRequest{
SourceType: manager.SourceType_CDN_SOURCE,
HostName: iputils.HostName,
Ip: ip,
@ -176,22 +174,10 @@ func (s *Server) register(ctx context.Context) error {
DownloadPort: downloadPort,
})
if err != nil {
cdn, err = s.managerClient.UpdateCDN(ctx, &manager.UpdateCDNRequest{
SourceType: manager.SourceType_CDN_SOURCE,
HostName: iputils.HostName,
Ip: ip,
Port: port,
Idc: idc,
Location: location,
DownloadPort: downloadPort,
})
if err != nil {
logger.Errorf("update cdn to manager failed %v", err)
return err
}
logger.Infof("update cdn %s successfully", cdn.HostName)
logger.Errorf("update cdn %s to manager failed %v", cdn.HostName, err)
return err
}
logger.Infof("create cdn %s successfully", cdn.HostName)
logger.Infof("update cdn %s to manager successfully", cdn.HostName)
cdnClusterID := s.config.Manager.CDNClusterID
if cdnClusterID != 0 {
@ -199,7 +185,7 @@ func (s *Server) register(ctx context.Context) error {
CdnId: cdn.Id,
CdnClusterId: cdnClusterID,
}); err != nil {
logger.Warnf("add cdn to cdn cluster failed %v", err)
logger.Warnf("add cdn %s to cdn cluster %s failed %v", cdn.HostName, cdnClusterID, err)
return err
}
logger.Infof("add cdn %s to cdn cluster %s successfully", cdn.HostName, cdnClusterID)

View File

@ -2,6 +2,7 @@ package service
import (
"context"
"errors"
"io"
logger "d7y.io/dragonfly/v2/internal/dflog"
@ -101,7 +102,7 @@ func (s *GRPC) GetCDN(ctx context.Context, req *manager.GetCDNRequest) (*manager
return &pbCDN, nil
}
func (s *GRPC) CreateCDN(ctx context.Context, req *manager.CreateCDNRequest) (*manager.CDN, error) {
func (s *GRPC) createCDN(ctx context.Context, req *manager.UpdateCDNRequest) (*manager.CDN, error) {
if err := req.Validate(); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
@ -116,7 +117,7 @@ func (s *GRPC) CreateCDN(ctx context.Context, req *manager.CreateCDNRequest) (*m
}
if err := s.db.Create(&cdn).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}
return &manager.CDN{
@ -136,14 +137,21 @@ func (s *GRPC) UpdateCDN(ctx context.Context, req *manager.UpdateCDNRequest) (*m
}
cdn := model.CDN{}
if err := s.db.First(&cdn, model.CDN{HostName: req.HostName}).Updates(model.CDN{
if err := s.db.First(&cdn, model.CDN{HostName: req.HostName}).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return s.createCDN(ctx, req)
}
return nil, status.Error(codes.Unknown, err.Error())
}
if err := s.db.Model(&cdn).Updates(model.CDN{
IDC: req.Idc,
Location: req.Location,
IP: req.Ip,
Port: req.Port,
DownloadPort: req.DownloadPort,
}).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}
if err := s.cache.Delete(
@ -171,16 +179,16 @@ func (s *GRPC) AddCDNToCDNCluster(ctx context.Context, req *manager.AddCDNToCDNC
cdnCluster := model.CDNCluster{}
if err := s.db.First(&cdnCluster, req.CdnClusterId).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}
cdn := model.CDN{}
if err := s.db.First(&cdn, req.CdnId).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}
if err := s.db.Model(&cdnCluster).Association("CDNs").Append(&cdn); err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}
if err := s.cache.Delete(
@ -288,7 +296,7 @@ func (s *GRPC) GetScheduler(ctx context.Context, req *manager.GetSchedulerReques
return &pbScheduler, nil
}
func (s *GRPC) CreateScheduler(ctx context.Context, req *manager.CreateSchedulerRequest) (*manager.Scheduler, error) {
func (s *GRPC) createScheduler(ctx context.Context, req *manager.UpdateSchedulerRequest) (*manager.Scheduler, error) {
if err := req.Validate(); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
@ -311,7 +319,7 @@ func (s *GRPC) CreateScheduler(ctx context.Context, req *manager.CreateScheduler
}
if err := s.db.Create(&scheduler).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}
return &manager.Scheduler{
@ -332,6 +340,14 @@ func (s *GRPC) UpdateScheduler(ctx context.Context, req *manager.UpdateScheduler
return nil, status.Error(codes.InvalidArgument, err.Error())
}
scheduler := model.Scheduler{}
if err := s.db.First(&scheduler, model.Scheduler{HostName: req.HostName}).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return s.createScheduler(ctx, req)
}
return nil, status.Error(codes.Unknown, err.Error())
}
var netConfig datatypes.JSONMap
if len(req.NetConfig) > 0 {
if err := netConfig.UnmarshalJSON(req.NetConfig); err != nil {
@ -339,8 +355,7 @@ func (s *GRPC) UpdateScheduler(ctx context.Context, req *manager.UpdateScheduler
}
}
scheduler := model.Scheduler{}
if err := s.db.First(&scheduler, model.Scheduler{HostName: req.HostName}).Updates(model.Scheduler{
if err := s.db.Model(&scheduler).Updates(model.Scheduler{
VIPs: req.Vips,
IDC: req.Idc,
Location: req.Location,
@ -348,7 +363,7 @@ func (s *GRPC) UpdateScheduler(ctx context.Context, req *manager.UpdateScheduler
IP: req.Ip,
Port: req.Port,
}).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}
if err := s.cache.Delete(
@ -378,16 +393,16 @@ func (s *GRPC) AddSchedulerClusterToSchedulerCluster(ctx context.Context, req *m
schedulerCluster := model.SchedulerCluster{}
if err := s.db.First(&schedulerCluster, req.SchedulerClusterId).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}
scheduler := model.Scheduler{}
if err := s.db.First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}
if err := s.db.Model(&schedulerCluster).Association("Schedulers").Append(&scheduler); err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}
if err := s.cache.Delete(
@ -474,7 +489,7 @@ func (s *GRPC) KeepAlive(m manager.Manager_KeepAliveServer) error {
req, err := m.Recv()
if err != nil {
logger.Errorf("keepalive failed for the first time: %v", err)
return err
return status.Error(codes.Unknown, err.Error())
}
if err := req.Validate(); err != nil {
return status.Error(codes.InvalidArgument, err.Error())
@ -492,7 +507,7 @@ func (s *GRPC) KeepAlive(m manager.Manager_KeepAliveServer) error {
}).Updates(model.Scheduler{
Status: model.SchedulerStatusActive,
}).Error; err != nil {
return err
return status.Error(codes.Unknown, err.Error())
}
if err := s.cache.Delete(
@ -511,7 +526,7 @@ func (s *GRPC) KeepAlive(m manager.Manager_KeepAliveServer) error {
}).Updates(model.CDN{
Status: model.CDNStatusActive,
}).Error; err != nil {
return err
return status.Error(codes.Unknown, err.Error())
}
if err := s.cache.Delete(
@ -533,7 +548,7 @@ func (s *GRPC) KeepAlive(m manager.Manager_KeepAliveServer) error {
}).Updates(model.Scheduler{
Status: model.SchedulerStatusInactive,
}).Error; err != nil {
return err
return status.Error(codes.Unknown, err.Error())
}
if err := s.cache.Delete(
@ -552,7 +567,7 @@ func (s *GRPC) KeepAlive(m manager.Manager_KeepAliveServer) error {
}).Updates(model.CDN{
Status: model.CDNStatusInactive,
}).Error; err != nil {
return err
return status.Error(codes.Unknown, err.Error())
}
if err := s.cache.Delete(
@ -568,7 +583,7 @@ func (s *GRPC) KeepAlive(m manager.Manager_KeepAliveServer) error {
return nil
}
logger.Errorf("%s keepalive failed: %v", hostName, err)
return err
return status.Error(codes.Unknown, err.Error())
}
logger.Debugf("%s type of %s send keepalive request", sourceType, hostName)

View File

@ -12,6 +12,7 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// SeederClient is the client API for Seeder service.
@ -33,7 +34,7 @@ func NewSeederClient(cc grpc.ClientConnInterface) SeederClient {
}
func (c *seederClient) ObtainSeeds(ctx context.Context, in *SeedRequest, opts ...grpc.CallOption) (Seeder_ObtainSeedsClient, error) {
stream, err := c.cc.NewStream(ctx, &_Seeder_serviceDesc.Streams[0], "/cdnsystem.Seeder/ObtainSeeds", opts...)
stream, err := c.cc.NewStream(ctx, &Seeder_ServiceDesc.Streams[0], "/cdnsystem.Seeder/ObtainSeeds", opts...)
if err != nil {
return nil, err
}
@ -104,7 +105,7 @@ type UnsafeSeederServer interface {
}
func RegisterSeederServer(s grpc.ServiceRegistrar, srv SeederServer) {
s.RegisterService(&_Seeder_serviceDesc, srv)
s.RegisterService(&Seeder_ServiceDesc, srv)
}
func _Seeder_ObtainSeeds_Handler(srv interface{}, stream grpc.ServerStream) error {
@ -146,7 +147,10 @@ func _Seeder_GetPieceTasks_Handler(srv interface{}, ctx context.Context, dec fun
return interceptor(ctx, in, info, handler)
}
var _Seeder_serviceDesc = grpc.ServiceDesc{
// Seeder_ServiceDesc is the grpc.ServiceDesc for Seeder service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Seeder_ServiceDesc = grpc.ServiceDesc{
ServiceName: "cdnsystem.Seeder",
HandlerType: (*SeederServer)(nil),
Methods: []grpc.MethodDesc{

View File

@ -13,6 +13,7 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// DaemonClient is the client API for Daemon service.
@ -36,7 +37,7 @@ func NewDaemonClient(cc grpc.ClientConnInterface) DaemonClient {
}
func (c *daemonClient) Download(ctx context.Context, in *DownRequest, opts ...grpc.CallOption) (Daemon_DownloadClient, error) {
stream, err := c.cc.NewStream(ctx, &_Daemon_serviceDesc.Streams[0], "/dfdaemon.Daemon/Download", opts...)
stream, err := c.cc.NewStream(ctx, &Daemon_ServiceDesc.Streams[0], "/dfdaemon.Daemon/Download", opts...)
if err != nil {
return nil, err
}
@ -121,7 +122,7 @@ type UnsafeDaemonServer interface {
}
func RegisterDaemonServer(s grpc.ServiceRegistrar, srv DaemonServer) {
s.RegisterService(&_Daemon_serviceDesc, srv)
s.RegisterService(&Daemon_ServiceDesc, srv)
}
func _Daemon_Download_Handler(srv interface{}, stream grpc.ServerStream) error {
@ -181,7 +182,10 @@ func _Daemon_CheckHealth_Handler(srv interface{}, ctx context.Context, dec func(
return interceptor(ctx, in, info, handler)
}
var _Daemon_serviceDesc = grpc.ServiceDesc{
// Daemon_ServiceDesc is the grpc.ServiceDesc for Daemon service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Daemon_ServiceDesc = grpc.ServiceDesc{
ServiceName: "dfdaemon.Daemon",
HandlerType: (*DaemonServer)(nil),
Methods: []grpc.MethodDesc{

File diff suppressed because it is too large Load Diff

View File

@ -390,159 +390,6 @@ var _ interface {
ErrorName() string
} = GetCDNRequestValidationError{}
// Validate checks the field values on CreateCDNRequest with the rules defined
// in the proto definition for this message. If any rules are violated, an
// error is returned.
func (m *CreateCDNRequest) Validate() error {
if m == nil {
return nil
}
if _, ok := SourceType_name[int32(m.GetSourceType())]; !ok {
return CreateCDNRequestValidationError{
field: "SourceType",
reason: "value must be one of the defined enum values",
}
}
if err := m._validateHostname(m.GetHostName()); err != nil {
return CreateCDNRequestValidationError{
field: "HostName",
reason: "value must be a valid hostname",
cause: err,
}
}
if m.GetIdc() != "" {
if l := utf8.RuneCountInString(m.GetIdc()); l < 1 || l > 1024 {
return CreateCDNRequestValidationError{
field: "Idc",
reason: "value length must be between 1 and 1024 runes, inclusive",
}
}
}
if m.GetLocation() != "" {
if utf8.RuneCountInString(m.GetLocation()) > 1024 {
return CreateCDNRequestValidationError{
field: "Location",
reason: "value length must be at most 1024 runes",
}
}
}
if ip := net.ParseIP(m.GetIp()); ip == nil {
return CreateCDNRequestValidationError{
field: "Ip",
reason: "value must be a valid IP address",
}
}
if val := m.GetPort(); val < 1024 || val >= 65535 {
return CreateCDNRequestValidationError{
field: "Port",
reason: "value must be inside range [1024, 65535)",
}
}
if val := m.GetDownloadPort(); val < 1024 || val >= 65535 {
return CreateCDNRequestValidationError{
field: "DownloadPort",
reason: "value must be inside range [1024, 65535)",
}
}
return nil
}
func (m *CreateCDNRequest) _validateHostname(host string) error {
s := strings.ToLower(strings.TrimSuffix(host, "."))
if len(host) > 253 {
return errors.New("hostname cannot exceed 253 characters")
}
for _, part := range strings.Split(s, ".") {
if l := len(part); l == 0 || l > 63 {
return errors.New("hostname part must be non-empty and cannot exceed 63 characters")
}
if part[0] == '-' {
return errors.New("hostname parts cannot begin with hyphens")
}
if part[len(part)-1] == '-' {
return errors.New("hostname parts cannot end with hyphens")
}
for _, r := range part {
if (r < 'a' || r > 'z') && (r < '0' || r > '9') && r != '-' {
return fmt.Errorf("hostname parts can only contain alphanumeric characters or hyphens, got %q", string(r))
}
}
}
return nil
}
// CreateCDNRequestValidationError is the validation error returned by
// CreateCDNRequest.Validate if the designated constraints aren't met.
type CreateCDNRequestValidationError struct {
field string
reason string
cause error
key bool
}
// Field function returns field value.
func (e CreateCDNRequestValidationError) Field() string { return e.field }
// Reason function returns reason value.
func (e CreateCDNRequestValidationError) Reason() string { return e.reason }
// Cause function returns cause value.
func (e CreateCDNRequestValidationError) Cause() error { return e.cause }
// Key function returns key value.
func (e CreateCDNRequestValidationError) Key() bool { return e.key }
// ErrorName returns error name.
func (e CreateCDNRequestValidationError) ErrorName() string { return "CreateCDNRequestValidationError" }
// Error satisfies the builtin error interface
func (e CreateCDNRequestValidationError) Error() string {
cause := ""
if e.cause != nil {
cause = fmt.Sprintf(" | caused by: %v", e.cause)
}
key := ""
if e.key {
key = "key for "
}
return fmt.Sprintf(
"invalid %sCreateCDNRequest.%s: %s%s",
key,
e.field,
e.reason,
cause)
}
var _ error = CreateCDNRequestValidationError{}
var _ interface {
Field() string
Reason() string
Key() bool
Cause() error
ErrorName() string
} = CreateCDNRequestValidationError{}
// Validate checks the field values on UpdateCDNRequest with the rules defined
// in the proto definition for this message. If any rules are violated, an
// error is returned.
@ -588,37 +435,25 @@ func (m *UpdateCDNRequest) Validate() error {
}
if m.GetIp() != "" {
if ip := net.ParseIP(m.GetIp()); ip == nil {
return UpdateCDNRequestValidationError{
field: "Ip",
reason: "value must be a valid IP address",
}
if ip := net.ParseIP(m.GetIp()); ip == nil {
return UpdateCDNRequestValidationError{
field: "Ip",
reason: "value must be a valid IP address",
}
}
if m.GetPort() != 0 {
if val := m.GetPort(); val < 1024 || val >= 65535 {
return UpdateCDNRequestValidationError{
field: "Port",
reason: "value must be inside range [1024, 65535)",
}
if val := m.GetPort(); val < 1024 || val >= 65535 {
return UpdateCDNRequestValidationError{
field: "Port",
reason: "value must be inside range [1024, 65535)",
}
}
if m.GetDownloadPort() != 0 {
if val := m.GetDownloadPort(); val < 1024 || val >= 65535 {
return UpdateCDNRequestValidationError{
field: "DownloadPort",
reason: "value must be inside range [1024, 65535)",
}
if val := m.GetDownloadPort(); val < 1024 || val >= 65535 {
return UpdateCDNRequestValidationError{
field: "DownloadPort",
reason: "value must be inside range [1024, 65535)",
}
}
return nil
@ -1094,176 +929,6 @@ var _ interface {
ErrorName() string
} = GetSchedulerRequestValidationError{}
// Validate checks the field values on CreateSchedulerRequest with the rules
// defined in the proto definition for this message. If any rules are
// violated, an error is returned.
func (m *CreateSchedulerRequest) Validate() error {
if m == nil {
return nil
}
if _, ok := SourceType_name[int32(m.GetSourceType())]; !ok {
return CreateSchedulerRequestValidationError{
field: "SourceType",
reason: "value must be one of the defined enum values",
}
}
if err := m._validateHostname(m.GetHostName()); err != nil {
return CreateSchedulerRequestValidationError{
field: "HostName",
reason: "value must be a valid hostname",
cause: err,
}
}
if m.GetVips() != "" {
if l := utf8.RuneCountInString(m.GetVips()); l < 1 || l > 1024 {
return CreateSchedulerRequestValidationError{
field: "Vips",
reason: "value length must be between 1 and 1024 runes, inclusive",
}
}
}
if m.GetIdc() != "" {
if l := utf8.RuneCountInString(m.GetIdc()); l < 1 || l > 1024 {
return CreateSchedulerRequestValidationError{
field: "Idc",
reason: "value length must be between 1 and 1024 runes, inclusive",
}
}
}
if m.GetLocation() != "" {
if utf8.RuneCountInString(m.GetLocation()) > 1024 {
return CreateSchedulerRequestValidationError{
field: "Location",
reason: "value length must be at most 1024 runes",
}
}
}
if len(m.GetNetConfig()) > 0 {
if len(m.GetNetConfig()) < 1 {
return CreateSchedulerRequestValidationError{
field: "NetConfig",
reason: "value length must be at least 1 bytes",
}
}
}
if ip := net.ParseIP(m.GetIp()); ip == nil {
return CreateSchedulerRequestValidationError{
field: "Ip",
reason: "value must be a valid IP address",
}
}
if val := m.GetPort(); val < 1024 || val >= 65535 {
return CreateSchedulerRequestValidationError{
field: "Port",
reason: "value must be inside range [1024, 65535)",
}
}
return nil
}
func (m *CreateSchedulerRequest) _validateHostname(host string) error {
s := strings.ToLower(strings.TrimSuffix(host, "."))
if len(host) > 253 {
return errors.New("hostname cannot exceed 253 characters")
}
for _, part := range strings.Split(s, ".") {
if l := len(part); l == 0 || l > 63 {
return errors.New("hostname part must be non-empty and cannot exceed 63 characters")
}
if part[0] == '-' {
return errors.New("hostname parts cannot begin with hyphens")
}
if part[len(part)-1] == '-' {
return errors.New("hostname parts cannot end with hyphens")
}
for _, r := range part {
if (r < 'a' || r > 'z') && (r < '0' || r > '9') && r != '-' {
return fmt.Errorf("hostname parts can only contain alphanumeric characters or hyphens, got %q", string(r))
}
}
}
return nil
}
// CreateSchedulerRequestValidationError is the validation error returned by
// CreateSchedulerRequest.Validate if the designated constraints aren't met.
type CreateSchedulerRequestValidationError struct {
field string
reason string
cause error
key bool
}
// Field function returns field value.
func (e CreateSchedulerRequestValidationError) Field() string { return e.field }
// Reason function returns reason value.
func (e CreateSchedulerRequestValidationError) Reason() string { return e.reason }
// Cause function returns cause value.
func (e CreateSchedulerRequestValidationError) Cause() error { return e.cause }
// Key function returns key value.
func (e CreateSchedulerRequestValidationError) Key() bool { return e.key }
// ErrorName returns error name.
func (e CreateSchedulerRequestValidationError) ErrorName() string {
return "CreateSchedulerRequestValidationError"
}
// Error satisfies the builtin error interface
func (e CreateSchedulerRequestValidationError) Error() string {
cause := ""
if e.cause != nil {
cause = fmt.Sprintf(" | caused by: %v", e.cause)
}
key := ""
if e.key {
key = "key for "
}
return fmt.Sprintf(
"invalid %sCreateSchedulerRequest.%s: %s%s",
key,
e.field,
e.reason,
cause)
}
var _ error = CreateSchedulerRequestValidationError{}
var _ interface {
Field() string
Reason() string
Key() bool
Cause() error
ErrorName() string
} = CreateSchedulerRequestValidationError{}
// Validate checks the field values on UpdateSchedulerRequest with the rules
// defined in the proto definition for this message. If any rules are
// violated, an error is returned.
@ -1331,26 +996,18 @@ func (m *UpdateSchedulerRequest) Validate() error {
}
if m.GetIp() != "" {
if ip := net.ParseIP(m.GetIp()); ip == nil {
return UpdateSchedulerRequestValidationError{
field: "Ip",
reason: "value must be a valid IP address",
}
if ip := net.ParseIP(m.GetIp()); ip == nil {
return UpdateSchedulerRequestValidationError{
field: "Ip",
reason: "value must be a valid IP address",
}
}
if m.GetPort() != 0 {
if val := m.GetPort(); val < 1024 || val >= 65535 {
return UpdateSchedulerRequestValidationError{
field: "Port",
reason: "value must be inside range [1024, 65535)",
}
if val := m.GetPort(); val < 1024 || val >= 65535 {
return UpdateSchedulerRequestValidationError{
field: "Port",
reason: "value must be inside range [1024, 65535)",
}
}
return nil

View File

@ -62,24 +62,14 @@ message GetCDNRequest {
string host_name = 2 [(validate.rules).string.hostname = true];
}
message CreateCDNRequest {
SourceType source_type = 1 [(validate.rules).enum.defined_only = true];
string host_name = 2 [(validate.rules).string.hostname = true];
string idc = 4 [(validate.rules).string = {min_len: 1, max_len: 1024, ignore_empty: true}];
string location = 5 [(validate.rules).string = {max_len: 1024, ignore_empty: true}];
string ip = 6 [(validate.rules).string.ip = true];
int32 port = 7 [(validate.rules).int32 = {gte: 1024, lt: 65535}];
int32 download_port = 8 [(validate.rules).int32 = {gte: 1024, lt: 65535}];
}
message UpdateCDNRequest {
SourceType source_type = 1 [(validate.rules).enum.defined_only = true];
string host_name = 2 [(validate.rules).string.hostname = true];
string idc = 3 [(validate.rules).string = {min_len: 1, max_len: 1024, ignore_empty: true}];
string location = 4 [(validate.rules).string = {max_len: 1024, ignore_empty: true}];
string ip = 5 [(validate.rules).string = {ip: true, ignore_empty: true}];
int32 port = 6 [(validate.rules).int32 = {gte: 1024, lt: 65535, ignore_empty: true}];
int32 download_port = 7 [(validate.rules).int32 = {gte: 1024, lt: 65535, ignore_empty: true}];
string ip = 5 [(validate.rules).string = {ip: true}];
int32 port = 6 [(validate.rules).int32 = {gte: 1024, lt: 65535}];
int32 download_port = 7 [(validate.rules).int32 = {gte: 1024, lt: 65535}];
}
message AddCDNToCDNClusterRequest {
@ -116,17 +106,6 @@ message GetSchedulerRequest {
string host_name = 2 [(validate.rules).string.hostname = true];
}
message CreateSchedulerRequest {
SourceType source_type = 1 [(validate.rules).enum.defined_only = true];
string host_name = 2 [(validate.rules).string.hostname = true];
string vips = 4 [(validate.rules).string = {min_len: 1, max_len: 1024, ignore_empty: true}];
string idc = 5 [(validate.rules).string = {min_len: 1, max_len: 1024, ignore_empty: true}];
string location = 6 [(validate.rules).string = {max_len: 1024, ignore_empty: true}];
bytes net_config = 7 [(validate.rules).bytes = {min_len: 1, ignore_empty: true}];
string ip = 8 [(validate.rules).string.ip = true];
int32 port = 9 [(validate.rules).int32 = {gte: 1024, lt: 65535}];
}
message UpdateSchedulerRequest {
SourceType source_type = 1 [(validate.rules).enum.defined_only = true];
string host_name = 2 [(validate.rules).string.hostname = true];
@ -134,8 +113,8 @@ message UpdateSchedulerRequest {
string idc = 5 [(validate.rules).string = {min_len: 1, max_len: 1024, ignore_empty: true}];
string location = 6 [(validate.rules).string = {max_len: 1024, ignore_empty: true}];
bytes net_config = 7 [(validate.rules).bytes = {min_len: 1, ignore_empty: true}];
string ip = 8 [(validate.rules).string = {ip: true, ignore_empty: true}];
int32 port = 9 [(validate.rules).int32 = {gte: 1024, lt: 65535, ignore_empty: true}];
string ip = 8 [(validate.rules).string = {ip: true}];
int32 port = 9 [(validate.rules).int32 = {gte: 1024, lt: 65535}];
}
message AddSchedulerClusterToSchedulerClusterRequest {
@ -164,16 +143,12 @@ message KeepAliveRequest {
service Manager {
// Get CDN and CDN cluster configuration
rpc GetCDN(GetCDNRequest) returns(CDN);
// Create CDN configuration
rpc CreateCDN(CreateCDNRequest) returns(CDN);
// Update CDN configuration
rpc UpdateCDN(UpdateCDNRequest) returns(CDN);
// AddCDNToCDNCluster add cdn to cdn cluster
rpc AddCDNToCDNCluster(AddCDNToCDNClusterRequest) returns(google.protobuf.Empty);
// Get Scheduler and Scheduler cluster configuration
rpc GetScheduler(GetSchedulerRequest)returns(Scheduler);
// Create scheduler configuration
rpc CreateScheduler(CreateSchedulerRequest) returns(Scheduler);
// Update scheduler configuration
rpc UpdateScheduler(UpdateSchedulerRequest) returns(Scheduler);
// AddSchedulerClusterToSchedulerCluster add scheduler to scheduler cluster

View File

@ -12,6 +12,7 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// ManagerClient is the client API for Manager service.
@ -20,16 +21,12 @@ const _ = grpc.SupportPackageIsVersion7
type ManagerClient interface {
// Get CDN and CDN cluster configuration
GetCDN(ctx context.Context, in *GetCDNRequest, opts ...grpc.CallOption) (*CDN, error)
// Create CDN configuration
CreateCDN(ctx context.Context, in *CreateCDNRequest, opts ...grpc.CallOption) (*CDN, error)
// Update CDN configuration
UpdateCDN(ctx context.Context, in *UpdateCDNRequest, opts ...grpc.CallOption) (*CDN, error)
// AddCDNToCDNCluster add cdn to cdn cluster
AddCDNToCDNCluster(ctx context.Context, in *AddCDNToCDNClusterRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
// Get Scheduler and Scheduler cluster configuration
GetScheduler(ctx context.Context, in *GetSchedulerRequest, opts ...grpc.CallOption) (*Scheduler, error)
// Create scheduler configuration
CreateScheduler(ctx context.Context, in *CreateSchedulerRequest, opts ...grpc.CallOption) (*Scheduler, error)
// Update scheduler configuration
UpdateScheduler(ctx context.Context, in *UpdateSchedulerRequest, opts ...grpc.CallOption) (*Scheduler, error)
// AddSchedulerClusterToSchedulerCluster add scheduler to scheduler cluster
@ -57,15 +54,6 @@ func (c *managerClient) GetCDN(ctx context.Context, in *GetCDNRequest, opts ...g
return out, nil
}
func (c *managerClient) CreateCDN(ctx context.Context, in *CreateCDNRequest, opts ...grpc.CallOption) (*CDN, error) {
out := new(CDN)
err := c.cc.Invoke(ctx, "/manager.Manager/CreateCDN", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *managerClient) UpdateCDN(ctx context.Context, in *UpdateCDNRequest, opts ...grpc.CallOption) (*CDN, error) {
out := new(CDN)
err := c.cc.Invoke(ctx, "/manager.Manager/UpdateCDN", in, out, opts...)
@ -93,15 +81,6 @@ func (c *managerClient) GetScheduler(ctx context.Context, in *GetSchedulerReques
return out, nil
}
func (c *managerClient) CreateScheduler(ctx context.Context, in *CreateSchedulerRequest, opts ...grpc.CallOption) (*Scheduler, error) {
out := new(Scheduler)
err := c.cc.Invoke(ctx, "/manager.Manager/CreateScheduler", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *managerClient) UpdateScheduler(ctx context.Context, in *UpdateSchedulerRequest, opts ...grpc.CallOption) (*Scheduler, error) {
out := new(Scheduler)
err := c.cc.Invoke(ctx, "/manager.Manager/UpdateScheduler", in, out, opts...)
@ -130,7 +109,7 @@ func (c *managerClient) ListSchedulers(ctx context.Context, in *ListSchedulersRe
}
func (c *managerClient) KeepAlive(ctx context.Context, opts ...grpc.CallOption) (Manager_KeepAliveClient, error) {
stream, err := c.cc.NewStream(ctx, &_Manager_serviceDesc.Streams[0], "/manager.Manager/KeepAlive", opts...)
stream, err := c.cc.NewStream(ctx, &Manager_ServiceDesc.Streams[0], "/manager.Manager/KeepAlive", opts...)
if err != nil {
return nil, err
}
@ -169,16 +148,12 @@ func (x *managerKeepAliveClient) CloseAndRecv() (*emptypb.Empty, error) {
type ManagerServer interface {
// Get CDN and CDN cluster configuration
GetCDN(context.Context, *GetCDNRequest) (*CDN, error)
// Create CDN configuration
CreateCDN(context.Context, *CreateCDNRequest) (*CDN, error)
// Update CDN configuration
UpdateCDN(context.Context, *UpdateCDNRequest) (*CDN, error)
// AddCDNToCDNCluster add cdn to cdn cluster
AddCDNToCDNCluster(context.Context, *AddCDNToCDNClusterRequest) (*emptypb.Empty, error)
// Get Scheduler and Scheduler cluster configuration
GetScheduler(context.Context, *GetSchedulerRequest) (*Scheduler, error)
// Create scheduler configuration
CreateScheduler(context.Context, *CreateSchedulerRequest) (*Scheduler, error)
// Update scheduler configuration
UpdateScheduler(context.Context, *UpdateSchedulerRequest) (*Scheduler, error)
// AddSchedulerClusterToSchedulerCluster add scheduler to scheduler cluster
@ -197,9 +172,6 @@ type UnimplementedManagerServer struct {
func (UnimplementedManagerServer) GetCDN(context.Context, *GetCDNRequest) (*CDN, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetCDN not implemented")
}
func (UnimplementedManagerServer) CreateCDN(context.Context, *CreateCDNRequest) (*CDN, error) {
return nil, status.Errorf(codes.Unimplemented, "method CreateCDN not implemented")
}
func (UnimplementedManagerServer) UpdateCDN(context.Context, *UpdateCDNRequest) (*CDN, error) {
return nil, status.Errorf(codes.Unimplemented, "method UpdateCDN not implemented")
}
@ -209,9 +181,6 @@ func (UnimplementedManagerServer) AddCDNToCDNCluster(context.Context, *AddCDNToC
func (UnimplementedManagerServer) GetScheduler(context.Context, *GetSchedulerRequest) (*Scheduler, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetScheduler not implemented")
}
func (UnimplementedManagerServer) CreateScheduler(context.Context, *CreateSchedulerRequest) (*Scheduler, error) {
return nil, status.Errorf(codes.Unimplemented, "method CreateScheduler not implemented")
}
func (UnimplementedManagerServer) UpdateScheduler(context.Context, *UpdateSchedulerRequest) (*Scheduler, error) {
return nil, status.Errorf(codes.Unimplemented, "method UpdateScheduler not implemented")
}
@ -234,7 +203,7 @@ type UnsafeManagerServer interface {
}
func RegisterManagerServer(s grpc.ServiceRegistrar, srv ManagerServer) {
s.RegisterService(&_Manager_serviceDesc, srv)
s.RegisterService(&Manager_ServiceDesc, srv)
}
func _Manager_GetCDN_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
@ -255,24 +224,6 @@ func _Manager_GetCDN_Handler(srv interface{}, ctx context.Context, dec func(inte
return interceptor(ctx, in, info, handler)
}
func _Manager_CreateCDN_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CreateCDNRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ManagerServer).CreateCDN(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/manager.Manager/CreateCDN",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ManagerServer).CreateCDN(ctx, req.(*CreateCDNRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Manager_UpdateCDN_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(UpdateCDNRequest)
if err := dec(in); err != nil {
@ -327,24 +278,6 @@ func _Manager_GetScheduler_Handler(srv interface{}, ctx context.Context, dec fun
return interceptor(ctx, in, info, handler)
}
func _Manager_CreateScheduler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CreateSchedulerRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ManagerServer).CreateScheduler(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/manager.Manager/CreateScheduler",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ManagerServer).CreateScheduler(ctx, req.(*CreateSchedulerRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Manager_UpdateScheduler_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(UpdateSchedulerRequest)
if err := dec(in); err != nil {
@ -425,7 +358,10 @@ func (x *managerKeepAliveServer) Recv() (*KeepAliveRequest, error) {
return m, nil
}
var _Manager_serviceDesc = grpc.ServiceDesc{
// Manager_ServiceDesc is the grpc.ServiceDesc for Manager service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Manager_ServiceDesc = grpc.ServiceDesc{
ServiceName: "manager.Manager",
HandlerType: (*ManagerServer)(nil),
Methods: []grpc.MethodDesc{
@ -433,10 +369,6 @@ var _Manager_serviceDesc = grpc.ServiceDesc{
MethodName: "GetCDN",
Handler: _Manager_GetCDN_Handler,
},
{
MethodName: "CreateCDN",
Handler: _Manager_CreateCDN_Handler,
},
{
MethodName: "UpdateCDN",
Handler: _Manager_UpdateCDN_Handler,
@ -449,10 +381,6 @@ var _Manager_serviceDesc = grpc.ServiceDesc{
MethodName: "GetScheduler",
Handler: _Manager_GetScheduler_Handler,
},
{
MethodName: "CreateScheduler",
Handler: _Manager_CreateScheduler_Handler,
},
{
MethodName: "UpdateScheduler",
Handler: _Manager_UpdateScheduler_Handler,

View File

@ -12,6 +12,7 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// SchedulerClient is the client API for Scheduler service.
@ -48,7 +49,7 @@ func (c *schedulerClient) RegisterPeerTask(ctx context.Context, in *PeerTaskRequ
}
func (c *schedulerClient) ReportPieceResult(ctx context.Context, opts ...grpc.CallOption) (Scheduler_ReportPieceResultClient, error) {
stream, err := c.cc.NewStream(ctx, &_Scheduler_serviceDesc.Streams[0], "/scheduler.Scheduler/ReportPieceResult", opts...)
stream, err := c.cc.NewStream(ctx, &Scheduler_ServiceDesc.Streams[0], "/scheduler.Scheduler/ReportPieceResult", opts...)
if err != nil {
return nil, err
}
@ -139,7 +140,7 @@ type UnsafeSchedulerServer interface {
}
func RegisterSchedulerServer(s grpc.ServiceRegistrar, srv SchedulerServer) {
s.RegisterService(&_Scheduler_serviceDesc, srv)
s.RegisterService(&Scheduler_ServiceDesc, srv)
}
func _Scheduler_RegisterPeerTask_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
@ -222,7 +223,10 @@ func _Scheduler_LeaveTask_Handler(srv interface{}, ctx context.Context, dec func
return interceptor(ctx, in, info, handler)
}
var _Scheduler_serviceDesc = grpc.ServiceDesc{
// Scheduler_ServiceDesc is the grpc.ServiceDesc for Scheduler service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Scheduler_ServiceDesc = grpc.ServiceDesc{
ServiceName: "scheduler.Scheduler",
HandlerType: (*SchedulerServer)(nil),
Methods: []grpc.MethodDesc{

View File

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: ../../internal/rpc/manager/manager_grpc.pb.go
// Source: ../../pkg/rpc/manager/manager_grpc.pb.go
// Package mocks is a generated GoMock package.
package mocks
@ -78,46 +78,6 @@ func (mr *MockManagerClientMockRecorder) AddSchedulerClusterToSchedulerCluster(c
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSchedulerClusterToSchedulerCluster", reflect.TypeOf((*MockManagerClient)(nil).AddSchedulerClusterToSchedulerCluster), varargs...)
}
// CreateCDN mocks base method.
func (m *MockManagerClient) CreateCDN(ctx context.Context, in *manager.CreateCDNRequest, opts ...grpc.CallOption) (*manager.CDN, error) {
m.ctrl.T.Helper()
varargs := []interface{}{ctx, in}
for _, a := range opts {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "CreateCDN", varargs...)
ret0, _ := ret[0].(*manager.CDN)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CreateCDN indicates an expected call of CreateCDN.
func (mr *MockManagerClientMockRecorder) CreateCDN(ctx, in interface{}, opts ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{ctx, in}, opts...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateCDN", reflect.TypeOf((*MockManagerClient)(nil).CreateCDN), varargs...)
}
// CreateScheduler mocks base method.
func (m *MockManagerClient) CreateScheduler(ctx context.Context, in *manager.CreateSchedulerRequest, opts ...grpc.CallOption) (*manager.Scheduler, error) {
m.ctrl.T.Helper()
varargs := []interface{}{ctx, in}
for _, a := range opts {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "CreateScheduler", varargs...)
ret0, _ := ret[0].(*manager.Scheduler)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CreateScheduler indicates an expected call of CreateScheduler.
func (mr *MockManagerClientMockRecorder) CreateScheduler(ctx, in interface{}, opts ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{ctx, in}, opts...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateScheduler", reflect.TypeOf((*MockManagerClient)(nil).CreateScheduler), varargs...)
}
// GetCDN mocks base method.
func (m *MockManagerClient) GetCDN(ctx context.Context, in *manager.GetCDNRequest, opts ...grpc.CallOption) (*manager.CDN, error) {
m.ctrl.T.Helper()
@ -428,36 +388,6 @@ func (mr *MockManagerServerMockRecorder) AddSchedulerClusterToSchedulerCluster(a
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSchedulerClusterToSchedulerCluster", reflect.TypeOf((*MockManagerServer)(nil).AddSchedulerClusterToSchedulerCluster), arg0, arg1)
}
// CreateCDN mocks base method.
func (m *MockManagerServer) CreateCDN(arg0 context.Context, arg1 *manager.CreateCDNRequest) (*manager.CDN, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateCDN", arg0, arg1)
ret0, _ := ret[0].(*manager.CDN)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CreateCDN indicates an expected call of CreateCDN.
func (mr *MockManagerServerMockRecorder) CreateCDN(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateCDN", reflect.TypeOf((*MockManagerServer)(nil).CreateCDN), arg0, arg1)
}
// CreateScheduler mocks base method.
func (m *MockManagerServer) CreateScheduler(arg0 context.Context, arg1 *manager.CreateSchedulerRequest) (*manager.Scheduler, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateScheduler", arg0, arg1)
ret0, _ := ret[0].(*manager.Scheduler)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CreateScheduler indicates an expected call of CreateScheduler.
func (mr *MockManagerServerMockRecorder) CreateScheduler(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateScheduler", reflect.TypeOf((*MockManagerServer)(nil).CreateScheduler), arg0, arg1)
}
// GetCDN mocks base method.
func (m *MockManagerServer) GetCDN(arg0 context.Context, arg1 *manager.GetCDNRequest) (*manager.CDN, error) {
m.ctrl.T.Helper()

View File

@ -175,7 +175,7 @@ func (s *Server) register(ctx context.Context) error {
var scheduler *manager.Scheduler
var err error
scheduler, err = s.managerClient.CreateScheduler(ctx, &manager.CreateSchedulerRequest{
scheduler, err = s.managerClient.UpdateScheduler(ctx, &manager.UpdateSchedulerRequest{
SourceType: manager.SourceType_SCHEDULER_SOURCE,
HostName: iputils.HostName,
Ip: ip,
@ -184,21 +184,10 @@ func (s *Server) register(ctx context.Context) error {
Location: location,
})
if err != nil {
scheduler, err = s.managerClient.UpdateScheduler(ctx, &manager.UpdateSchedulerRequest{
SourceType: manager.SourceType_SCHEDULER_SOURCE,
HostName: iputils.HostName,
Ip: ip,
Port: port,
Idc: idc,
Location: location,
})
if err != nil {
logger.Warnf("update scheduler to manager failed %v", err)
return err
}
logger.Infof("update scheduler %s successfully", scheduler.HostName)
logger.Warnf("update scheduler %s to manager failed %v", scheduler.HostName, err)
return err
}
logger.Infof("create scheduler %s successfully", scheduler.HostName)
logger.Infof("update scheduler %s to manager successfully", scheduler.HostName)
schedulerClusterID := s.config.Manager.SchedulerClusterID
if schedulerClusterID != 0 {
@ -206,7 +195,7 @@ func (s *Server) register(ctx context.Context) error {
SchedulerId: scheduler.Id,
SchedulerClusterId: schedulerClusterID,
}); err != nil {
logger.Warnf("add scheduler to scheduler cluster failed %v", err)
logger.Warnf("add scheduler %s to scheduler cluster %s failed %v", scheduler.HostName, schedulerClusterID, err)
return err
}
logger.Infof("add scheduler %s to scheduler cluster %s successfully", scheduler.HostName, schedulerClusterID)