diff --git a/client/daemon/pex/peer_exchange_test.go b/client/daemon/pex/peer_exchange_test.go deleted file mode 100644 index 5a774c952..000000000 --- a/client/daemon/pex/peer_exchange_test.go +++ /dev/null @@ -1,385 +0,0 @@ -/* - * Copyright 2024 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package pex - -import ( - "context" - "fmt" - "io" - "log" - "net" - "testing" - "time" - - "github.com/hashicorp/memberlist" - "github.com/phayes/freeport" - "github.com/stretchr/testify/assert" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/grpclog" - "google.golang.org/protobuf/types/known/emptypb" - - commonv1 "d7y.io/api/v2/pkg/apis/common/v1" - dfdaemonv1 "d7y.io/api/v2/pkg/apis/dfdaemon/v1" - - "d7y.io/dragonfly/v2/pkg/retry" -) - -func TestPeerExchange(t *testing.T) { - grpclog.SetLoggerV2(grpclog.NewLoggerV2(io.Discard, io.Discard, io.Discard)) - testCases := []struct { - name string - memberCount int - genPeers func() []*dfdaemonv1.PeerMetadata - }{ - { - name: "normal peers", - memberCount: 3, - genPeers: func() []*dfdaemonv1.PeerMetadata { - peers := []*dfdaemonv1.PeerMetadata{ - { - TaskId: "task-1", - PeerId: "peer-1", - State: dfdaemonv1.PeerState_Running, - }, - { - TaskId: "task-2", - PeerId: "peer-2", - State: dfdaemonv1.PeerState_Success, - }, - { - TaskId: "task-3", - PeerId: "peer-3", - State: dfdaemonv1.PeerState_Running, - }, - } - return peers - }, - }, - { - 
name: "normal peers with deleted", - memberCount: 3, - genPeers: func() []*dfdaemonv1.PeerMetadata { - peers := []*dfdaemonv1.PeerMetadata{ - { - TaskId: "task-1", - PeerId: "peer-1", - State: dfdaemonv1.PeerState_Running, - }, - { - TaskId: "task-2", - PeerId: "peer-2", - State: dfdaemonv1.PeerState_Success, - }, - { - TaskId: "task-3", - PeerId: "peer-3", - State: dfdaemonv1.PeerState_Deleted, - }, - } - return peers - }, - }, - { - name: "normal peers with failed", - memberCount: 3, - genPeers: func() []*dfdaemonv1.PeerMetadata { - peers := []*dfdaemonv1.PeerMetadata{ - { - TaskId: "task-1", - PeerId: "peer-1", - State: dfdaemonv1.PeerState_Running, - }, - { - TaskId: "task-2", - PeerId: "peer-2", - State: dfdaemonv1.PeerState_Success, - }, - { - TaskId: "task-3", - PeerId: "peer-3", - State: dfdaemonv1.PeerState_Deleted, - }, - { - TaskId: "task-4", - PeerId: "peer-4", - State: dfdaemonv1.PeerState_Failed, - }, - } - return peers - }, - }, - { - name: "normal peers with failed", - memberCount: 10, - genPeers: func() []*dfdaemonv1.PeerMetadata { - peers := []*dfdaemonv1.PeerMetadata{ - { - TaskId: "task-1", - PeerId: "peer-1", - State: dfdaemonv1.PeerState_Running, - }, - { - TaskId: "task-2", - PeerId: "peer-2", - State: dfdaemonv1.PeerState_Success, - }, - { - TaskId: "task-3", - PeerId: "peer-3", - State: dfdaemonv1.PeerState_Deleted, - }, - { - TaskId: "task-4", - PeerId: "peer-4", - State: dfdaemonv1.PeerState_Failed, - }, - } - return peers - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - assert := assert.New(t) - - memberCount := tc.memberCount - peers := tc.genPeers() - - pexServers := setupMembers(assert, memberCount) - - // 1. 
ensure members' connections - // for large scale members, need wait more time for inter-connections ready - _, _, err := retry.Run(context.Background(), float64(memberCount/3), 10, 10, func() (data any, cancel bool, err error) { - var shouldRetry bool - for _, pex := range pexServers { - memberKeys := pex.memberManager.memberPool.MemberKeys() - if memberCount-1 != len(memberKeys) { - shouldRetry = true - } - } - if shouldRetry { - return nil, false, fmt.Errorf("members count not match") - } - return nil, false, nil - }) - - // make sure all members are ready - if err != nil { - for _, pex := range pexServers { - memberKeys := pex.memberManager.memberPool.MemberKeys() - assert.Equal(memberCount-1, len(memberKeys), - fmt.Sprintf("%s should have %d members", pex.localMember.HostID, memberCount-1)) - } - return - } - - // 2. broadcast peers in all pex servers - for _, p := range peers { - for _, pex := range pexServers { - peer := &dfdaemonv1.PeerMetadata{ - TaskId: p.TaskId, - PeerId: genPeerID(p, pex), - State: p.State, - } - pex.PeerSearchBroadcaster().BroadcastPeer(peer) - } - } - - time.Sleep(3 * time.Second) - - // 3. 
verify peers - for _, peer := range peers { - for _, pex := range pexServers { - searchPeerResult := pex.PeerSearchBroadcaster().SearchPeer(peer.TaskId) - // check peer state - switch peer.State { - case dfdaemonv1.PeerState_Running, dfdaemonv1.PeerState_Success: - assert.Truef(searchPeerResult.Type != SearchPeerResultTypeNotFound, - "%s should have task %s", pex.localMember.HostID, peer.TaskId) - // other members + local member - assert.Equalf(memberCount, len(searchPeerResult.Peers), - "%s should have %d peers for task %s", pex.localMember.HostID, memberCount, peer.TaskId) - - // check all peers is in other members - for _, realPeer := range searchPeerResult.Peers { - if realPeer.isLocal { - continue - } - var found bool - found = isPeerExistInOtherPEXServers(pexServers, pex.localMember.HostID, peer, realPeer) - assert.Truef(found, "peer %s/%s in %s should be found in other members", peer.TaskId, realPeer.PeerID, pex.localMember.HostID) - } - case dfdaemonv1.PeerState_Failed, dfdaemonv1.PeerState_Deleted: - assert.Truef(searchPeerResult.Type == SearchPeerResultTypeNotFound, "%s should not have task %s", pex.localMember.HostID, peer.TaskId) - assert.Equalf(0, len(searchPeerResult.Peers), - "%s should not have any peers for task %s", pex.localMember.HostID, peer.TaskId) - default: - - } - } - } - }) - } -} - -func isPeerExistInOtherPEXServers(peerExchangeServers []*peerExchange, hostID string, peer *dfdaemonv1.PeerMetadata, real *DestPeer) bool { - for _, pex := range peerExchangeServers { - // skip for local - if pex.localMember.HostID == hostID { - continue - } - // verify peer with host id - if genPeerID(peer, pex) == real.PeerID { - return true - } - } - return false -} - -func genPeerID(peer *dfdaemonv1.PeerMetadata, pex *peerExchange) string { - return peer.PeerId + "-" + pex.localMember.HostID -} - -type mockServer struct { - PeerExchangeFunc func(dfdaemonv1.Daemon_PeerExchangeServer) error -} - -func (m *mockServer) LeaveHost(ctx context.Context, empty 
*emptypb.Empty) (*emptypb.Empty, error) { - panic("should not be invoked") -} - -func (m *mockServer) Download(request *dfdaemonv1.DownRequest, server dfdaemonv1.Daemon_DownloadServer) error { - panic("should not be invoked") -} - -func (m *mockServer) GetPieceTasks(ctx context.Context, request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) { - panic("should not be invoked") -} - -func (m *mockServer) CheckHealth(ctx context.Context, empty *emptypb.Empty) (*emptypb.Empty, error) { - panic("should not be invoked") -} - -func (m *mockServer) SyncPieceTasks(server dfdaemonv1.Daemon_SyncPieceTasksServer) error { - panic("should not be invoked") -} - -func (m *mockServer) StatTask(ctx context.Context, request *dfdaemonv1.StatTaskRequest) (*emptypb.Empty, error) { - panic("should not be invoked") -} - -func (m *mockServer) ImportTask(ctx context.Context, request *dfdaemonv1.ImportTaskRequest) (*emptypb.Empty, error) { - panic("should not be invoked") -} - -func (m *mockServer) ExportTask(ctx context.Context, request *dfdaemonv1.ExportTaskRequest) (*emptypb.Empty, error) { - panic("should not be invoked") -} - -func (m *mockServer) DeleteTask(ctx context.Context, request *dfdaemonv1.DeleteTaskRequest) (*emptypb.Empty, error) { - panic("should not be invoked") -} - -func (m *mockServer) PeerExchange(server dfdaemonv1.Daemon_PeerExchangeServer) error { - return m.PeerExchangeFunc(server) -} - -type testMember struct { - idx int - rpcPort, gossipPort int -} - -func setupMember(assert *assert.Assertions, member *testMember, members []*memberlist.Node) *peerExchange { - listen, err := net.Listen("tcp", fmt.Sprintf(":%d", member.rpcPort)) - assert.Nil(err) - - memberMeta := &MemberMeta{ - HostID: fmt.Sprintf("host-%d", member.idx), - IP: "127.0.0.1", - RPCPort: int32(member.rpcPort), - ProxyPort: 0, - } - - lister := NewStaticPeerMemberLister(members) - - pex, err := NewPeerExchange( - func(task, peer string) error { - return nil - }, - lister, - time.Minute, - 
[]grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - }, - WithName(fmt.Sprintf("node-%d", member.idx)), - WithBindPort(member.gossipPort), - WithAdvertisePort(member.gossipPort), - WithAdvertiseAddr("127.0.0.1"), - WithInitialRetryInterval(10*time.Microsecond), - WithReSyncInterval(10*time.Microsecond), - ) - assert.Nil(err) - - ms := &mockServer{ - PeerExchangeFunc: pex.PeerExchangeRPC().PeerExchange, - } - - s := grpc.NewServer() - dfdaemonv1.RegisterDaemonServer(s, ms) - go func() { - if err := s.Serve(listen); err != nil { - log.Fatalf("grpc Serve exited with error: %v", err) - } - }() - go func() { - if err := pex.Serve(memberMeta); err != nil { - log.Fatalf("pex Serve exited with error: %v", err) - } - }() - return pex.(*peerExchange) -} - -func setupMembers(assert *assert.Assertions, memberCount int) []*peerExchange { - var ( - testMembers []*testMember - members []*memberlist.Node - peerExchangeServers []*peerExchange - ) - - ports, err := freeport.GetFreePorts(2 * memberCount) - assert.Nil(err) - - for i := range memberCount { - rpcPort, gossipPort := ports[2*i], ports[2*i+1] - testMembers = append(testMembers, &testMember{ - idx: i, - rpcPort: rpcPort, - gossipPort: gossipPort, - }) - members = append(members, &memberlist.Node{ - Addr: net.ParseIP("127.0.0.1"), - Port: uint16(gossipPort), - }) - } - - for i := range memberCount { - peerExchangeServers = append(peerExchangeServers, setupMember(assert, testMembers[i], members)) - } - return peerExchangeServers -} diff --git a/internal/job/constants.go b/internal/job/constants.go index 6b853a821..5689163a4 100644 --- a/internal/job/constants.go +++ b/internal/job/constants.go @@ -40,8 +40,8 @@ const ( // Machinery server configuration. 
const ( DefaultResultsExpireIn = 86400 - DefaultRedisMaxIdle = 10 - DefaultRedisMaxActive = 50 + DefaultRedisMaxIdle = 5 + DefaultRedisMaxActive = 20 DefaultRedisIdleTimeout = 30 DefaultRedisReadTimeout = 60 DefaultRedisWriteTimeout = 60 diff --git a/manager/cache/cache.go b/manager/cache/cache.go index 1f153011b..5e1ab6e45 100644 --- a/manager/cache/cache.go +++ b/manager/cache/cache.go @@ -45,6 +45,8 @@ func New(cfg *config.Config) (*Cache, error) { Password: cfg.Database.Redis.Password, SentinelUsername: cfg.Database.Redis.SentinelUsername, SentinelPassword: cfg.Database.Redis.SentinelPassword, + PoolSize: cfg.Database.Redis.PoolSize, + PoolTimeout: cfg.Database.Redis.PoolTimeout, }) if err != nil { return nil, err diff --git a/manager/config/config.go b/manager/config/config.go index 94b343828..2c2da0098 100644 --- a/manager/config/config.go +++ b/manager/config/config.go @@ -234,6 +234,12 @@ type RedisConfig struct { // BackendDB is server backend DB name. BackendDB int `yaml:"backendDB" mapstructure:"backendDB"` + // PoolSize is the maximum number of socket connections in the redis connection pool. + PoolSize int `yaml:"poolSize" mapstructure:"poolSize"` + + // PoolTimeout is the amount of time the client waits for a free connection if all connections are busy before returning an error. + PoolTimeout time.Duration `yaml:"poolTimeout" mapstructure:"poolTimeout"` + // Proxy is redis proxy configuration. // If enabled, the manager starts a TCP proxy (defaulting to port 65100) that // forwards requests to the Redis service. 
This allows Schedulers to connect to Redis @@ -433,9 +439,11 @@ func New() *Config { Migrate: true, }, Redis: RedisConfig{ - DB: DefaultRedisDB, - BrokerDB: DefaultRedisBrokerDB, - BackendDB: DefaultRedisBackendDB, + DB: DefaultRedisDB, + BrokerDB: DefaultRedisBrokerDB, + BackendDB: DefaultRedisBackendDB, + PoolSize: DefaultRedisPoolSize, + PoolTimeout: DefaultRedisPoolTimeout, Proxy: RedisProxyConfig{ Enable: false, Addr: DefaultRedisProxyAddr, diff --git a/manager/config/config_test.go b/manager/config/config_test.go index cec354878..d08cefeaf 100644 --- a/manager/config/config_test.go +++ b/manager/config/config_test.go @@ -159,12 +159,14 @@ func TestConfig_Load(t *testing.T) { Migrate: true, }, Redis: RedisConfig{ - Password: "bar", - Addrs: []string{"foo", "bar"}, - MasterName: "baz", - DB: 0, - BrokerDB: 1, - BackendDB: 2, + Password: "bar", + Addrs: []string{"foo", "bar"}, + MasterName: "baz", + PoolSize: 10, + PoolTimeout: 1 * time.Second, + DB: 0, + BrokerDB: 1, + BackendDB: 2, Proxy: RedisProxyConfig{ Enable: true, Addr: ":65101", diff --git a/manager/config/constants.go b/manager/config/constants.go index e474e1544..b1665c636 100644 --- a/manager/config/constants.go +++ b/manager/config/constants.go @@ -57,6 +57,12 @@ const ( ) const ( + // DefaultRedisPoolSize is default pool size for redis. + DefaultRedisPoolSize = 20 + + // DefaultRedisPoolTimeout is default pool timeout for redis. + DefaultRedisPoolTimeout = 10 * time.Second + // DefaultRedisDB is default db for redis. DefaultRedisDB = 0 @@ -78,7 +84,7 @@ const ( DefaultLFUCacheTTL = 3 * time.Minute // DefaultLFUCacheSize is default size for lfu cache. 
- DefaultLFUCacheSize = 10 * 1000 + DefaultLFUCacheSize = 8 * 1000 ) const ( diff --git a/manager/config/testdata/manager.yaml b/manager/config/testdata/manager.yaml index c33606a9a..ad436f9e2 100644 --- a/manager/config/testdata/manager.yaml +++ b/manager/config/testdata/manager.yaml @@ -57,6 +57,8 @@ database: addrs: [foo, bar] masterName: baz password: bar + poolSize: 10 + poolTimeout: 1s db: 0 brokerDB: 1 backendDB: 2