diff --git a/pkg/rpc/cdnsystem/client/client.go b/pkg/rpc/cdnsystem/client/client.go index 8c9e6a102..a0eed6ba3 100644 --- a/pkg/rpc/cdnsystem/client/client.go +++ b/pkg/rpc/cdnsystem/client/client.go @@ -24,9 +24,9 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" - "google.golang.org/grpc/backoff" "google.golang.org/grpc/credentials/insecure" cdnsystemv1 "d7y.io/api/pkg/apis/cdnsystem/v1" @@ -39,25 +39,30 @@ import ( ) const ( - backoffBaseDelay = 1 * time.Second - backoffMultiplier = 1.2 - backoffJitter = 0.2 - backoffMaxDelay = 120 * time.Second - minConnectTime = 3 * time.Second //fast fail, leave time to try other scheduler + // maxRetries is maximum number of retries. + maxRetries = 3 + + // backoffWaitBetween is waiting for a fixed period of + // time between calls in backoff linear. + backoffWaitBetween = 500 * time.Millisecond + + // perRetryTimeout is GRPC timeout per call (including initial call) on this call. + perRetryTimeout = 3 * time.Second ) +// defaultDialOptions is default dial options of manager client. var defaultDialOptions = []grpc.DialOption{ grpc.WithDefaultServiceConfig(balancer.BalancerServiceConfig), grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithConnectParams(grpc.ConnectParams{ - Backoff: backoff.Config{ - BaseDelay: backoffBaseDelay, - Multiplier: backoffMultiplier, - Jitter: backoffJitter, - MaxDelay: backoffMaxDelay, - }, - MinConnectTimeout: minConnectTime, - }), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + grpc_prometheus.UnaryClientInterceptor, + grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), + grpc_retry.UnaryClientInterceptor( + grpc_retry.WithPerRetryTimeout(perRetryTimeout), + grpc_retry.WithMax(maxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)), + ), + )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), diff --git a/pkg/rpc/manager/client/client.go b/pkg/rpc/manager/client/client.go index 15895691d..b7f943651 100644 --- a/pkg/rpc/manager/client/client.go +++ b/pkg/rpc/manager/client/client.go @@ -25,9 +25,9 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" - "google.golang.org/grpc/backoff" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" @@ -40,13 +40,65 @@ import ( ) const ( - contextTimeout = 2 * time.Minute - backoffBaseDelay = 1 * time.Second - backoffMultiplier = 1.6 - backoffJitter = 0.2 - backoffMaxDelay = 10 * time.Second + // maxRetries is maximum number of retries. + maxRetries = 3 + + // backoffWaitBetween is waiting for a fixed period of + // time between calls in backoff linear. + backoffWaitBetween = 500 * time.Millisecond + + // perRetryTimeout is GRPC timeout per call (including initial call) on this call. + perRetryTimeout = 5 * time.Second ) +// defaultDialOptions is default dial options of manager client. +var defaultDialOptions = []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + grpc_prometheus.UnaryClientInterceptor, + grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), + grpc_retry.UnaryClientInterceptor( + grpc_retry.WithPerRetryTimeout(perRetryTimeout), + grpc_retry.WithMax(maxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)), + ), + )), + grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( + grpc_prometheus.StreamClientInterceptor, + grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), + )), +} + +// GetClient returns manager client. +func GetClient(target string, options ...grpc.DialOption) (Client, error) { + conn, err := grpc.Dial( + target, + append(defaultDialOptions, options...)..., + ) + if err != nil { + return nil, err + } + + return &client{ + ManagerClient: managerv1.NewManagerClient(conn), + conn: conn, + }, nil +} + +// GetClientByAddr returns manager client with addresses. +func GetClientByAddr(netAddrs []dfnet.NetAddr, opts ...grpc.DialOption) (Client, error) { + for _, netAddr := range netAddrs { + ipReachable := reachable.New(&reachable.Config{Address: netAddr.Addr}) + if err := ipReachable.Check(); err == nil { + logger.Infof("use %s address for manager grpc client", netAddr.Addr) + return GetClient(netAddr.Addr, opts...) + } + logger.Warnf("%s manager address can not reachable", netAddr.Addr) + } + + return nil, errors.New("can not find available manager addresses") +} + // Client is the interface for grpc client. type Client interface { // Update Seed peer configuration. @@ -80,100 +132,34 @@ type client struct { conn *grpc.ClientConn } -// GetClient returns manager client. -func GetClient(target string, options ...grpc.DialOption) (Client, error) { - dialOptions := []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock(), - grpc.WithConnectParams(grpc.ConnectParams{ - Backoff: backoff.Config{ - BaseDelay: backoffBaseDelay, - Multiplier: backoffMultiplier, - Jitter: backoffJitter, - MaxDelay: backoffMaxDelay, - }, - }), - grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - grpc_prometheus.StreamClientInterceptor, - grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), - )), - } - dialOptions = append(dialOptions, options...) - - conn, err := grpc.Dial( - target, - dialOptions..., - ) - if err != nil { - return nil, err - } - - return &client{ - ManagerClient: managerv1.NewManagerClient(conn), - conn: conn, - }, nil -} - -// GetClientByAddr returns manager client with addresses. -func GetClientByAddr(netAddrs []dfnet.NetAddr, opts ...grpc.DialOption) (Client, error) { - for _, netAddr := range netAddrs { - ipReachable := reachable.New(&reachable.Config{Address: netAddr.Addr}) - if err := ipReachable.Check(); err == nil { - logger.Infof("use %s address for manager grpc client", netAddr.Addr) - return GetClient(netAddr.Addr, opts...) - } - logger.Warnf("%s manager address can not reachable", netAddr.Addr) - } - - return nil, errors.New("can not find available manager addresses") -} - // Update SeedPeer configuration. func (c *client) UpdateSeedPeer(req *managerv1.UpdateSeedPeerRequest) (*managerv1.SeedPeer, error) { - ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) - defer cancel() - - return c.ManagerClient.UpdateSeedPeer(ctx, req) + return c.ManagerClient.UpdateSeedPeer(context.Background(), req) } // Get Scheduler and Scheduler cluster configuration. func (c *client) GetScheduler(req *managerv1.GetSchedulerRequest) (*managerv1.Scheduler, error) { - ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) - defer cancel() - - return c.ManagerClient.GetScheduler(ctx, req) + return c.ManagerClient.GetScheduler(context.Background(), req) } // Update scheduler configuration. func (c *client) UpdateScheduler(req *managerv1.UpdateSchedulerRequest) (*managerv1.Scheduler, error) { - ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) - defer cancel() - - return c.ManagerClient.UpdateScheduler(ctx, req) + return c.ManagerClient.UpdateScheduler(context.Background(), req) } // List acitve schedulers configuration. func (c *client) ListSchedulers(req *managerv1.ListSchedulersRequest) (*managerv1.ListSchedulersResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) - defer cancel() - - return c.ManagerClient.ListSchedulers(ctx, req) + return c.ManagerClient.ListSchedulers(context.Background(), req) } // Get object storage configuration. func (c *client) GetObjectStorage(req *managerv1.GetObjectStorageRequest) (*managerv1.ObjectStorage, error) { - ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) - defer cancel() - - return c.ManagerClient.GetObjectStorage(ctx, req) + return c.ManagerClient.GetObjectStorage(context.Background(), req) } // List buckets configuration. func (c *client) ListBuckets(req *managerv1.ListBucketsRequest) (*managerv1.ListBucketsResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) - defer cancel() - - return c.ManagerClient.ListBuckets(ctx, req) + return c.ManagerClient.ListBuckets(context.Background(), req) } // List acitve schedulers configuration. diff --git a/pkg/rpc/scheduler/client/client.go b/pkg/rpc/scheduler/client/client.go index 2e33022bc..3140eb489 100644 --- a/pkg/rpc/scheduler/client/client.go +++ b/pkg/rpc/scheduler/client/client.go @@ -24,9 +24,9 @@ import ( grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" - "google.golang.org/grpc/backoff" "google.golang.org/grpc/credentials/insecure" commonv1 "d7y.io/api/pkg/apis/common/v1" @@ -39,13 +39,52 @@ import ( ) const ( - backoffBaseDelay = 1 * time.Second - backoffMultiplier = 1.2 - backoffJitter = 0.2 - backoffMaxDelay = 120 * time.Second - minConnectTime = 3 * time.Second //fast fail, leave time to try other scheduler + // maxRetries is maximum number of retries. + maxRetries = 3 + + // backoffWaitBetween is waiting for a fixed period of + // time between calls in backoff linear. + backoffWaitBetween = 500 * time.Millisecond + + // perRetryTimeout is GRPC timeout per call (including initial call) on this call. + perRetryTimeout = 3 * time.Second ) +// defaultDialOptions is default dial options of manager client. +var defaultDialOptions = []grpc.DialOption{ + grpc.WithDefaultServiceConfig(balancer.BalancerServiceConfig), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + grpc_prometheus.UnaryClientInterceptor, + grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), + grpc_retry.UnaryClientInterceptor( + grpc_retry.WithPerRetryTimeout(perRetryTimeout), + grpc_retry.WithMax(maxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)), + ), + )), + grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( + grpc_prometheus.StreamClientInterceptor, + grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), + )), +} + +// GetClient get scheduler clients using resolver and balancer, +func GetClient(options ...grpc.DialOption) (Client, error) { + conn, err := grpc.Dial( + resolver.SchedulerVirtualTarget, + append(defaultDialOptions, options...)..., + ) + if err != nil { + return nil, err + } + + return &client{ + conn, + schedulerv1.NewSchedulerClient(conn), + }, nil +} + // NewBeginOfPiece creates begin of piece. func NewBeginOfPiece(taskID, peerID string) *schedulerv1.PieceResult { return &schedulerv1.PieceResult{ @@ -69,41 +108,6 @@ func NewEndOfPiece(taskID, peerID string, finishedCount int32) *schedulerv1.Piec } } -// GetClient get scheduler clients using resolver and balancer, -func GetClient(options ...grpc.DialOption) (Client, error) { - dialOptions := []grpc.DialOption{ - grpc.WithDefaultServiceConfig(balancer.BalancerServiceConfig), - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithConnectParams(grpc.ConnectParams{ - Backoff: backoff.Config{ - BaseDelay: backoffBaseDelay, - Multiplier: backoffMultiplier, - Jitter: backoffJitter, - MaxDelay: backoffMaxDelay, - }, - MinConnectTimeout: minConnectTime, - }), - grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - grpc_prometheus.StreamClientInterceptor, - grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), - )), - } - dialOptions = append(dialOptions, options...) - - conn, err := grpc.Dial( - resolver.SchedulerVirtualTarget, - dialOptions..., - ) - if err != nil { - return nil, err - } - - return &client{ - conn, - schedulerv1.NewSchedulerClient(conn), - }, nil -} - // Client is the interface for grpc client. type Client interface { // RegisterPeerTask registers a peer into task. diff --git a/scheduler/resource/seed_peer.go b/scheduler/resource/seed_peer.go index e3bee74a1..d63352a28 100644 --- a/scheduler/resource/seed_peer.go +++ b/scheduler/resource/seed_peer.go @@ -33,6 +33,7 @@ import ( const ( // SeedTag Default value of tag label for seed peer. SeedTag = "d7y/seed" + // SeedApplication Default value of application label for seed peer. SeedApplication = "d7y/seed" )