diff --git a/client/config/peerhost.go b/client/config/peerhost.go
index cea488600..8e75cdbd1 100644
--- a/client/config/peerhost.go
+++ b/client/config/peerhost.go
@@ -983,4 +983,6 @@ type PeerExchangeOption struct {
 	ReSyncInterval time.Duration `mapstructure:"reSyncInterval" yaml:"reSyncInterval"`
 	// ReplicaThreshold is used for keeping replicas in all peers is not bigger than threshold to save storage
 	ReplicaThreshold int `mapstructure:"replicaThreshold" yaml:"replicaThreshold"`
+	// ReplicaCleanPercentage is the percentage probability of cleaning the local replica once the threshold is reached, valid values: [0, 100], 0 disables cleaning
+	ReplicaCleanPercentage int32 `mapstructure:"replicaCleanPercentage" yaml:"replicaCleanPercentage"`
 }
diff --git a/client/config/peerhost_darwin.go b/client/config/peerhost_darwin.go
index f6874f2f8..4b1860df7 100644
--- a/client/config/peerhost_darwin.go
+++ b/client/config/peerhost_darwin.go
@@ -191,11 +191,12 @@ var peerHostConfig = func() *DaemonOption {
 		LogMaxAge:     DefaultLogRotateMaxAge,
 		LogMaxBackups: DefaultLogRotateMaxBackups,
 		PeerExchange: PeerExchangeOption{
-			Enable:                false,
-			InitialInterval:       time.Minute,
-			InitialBroadcastDelay: 3 * time.Minute,
-			ReSyncInterval:        10 * time.Minute,
-			ReplicaThreshold:      2,
+			Enable:                 false,
+			InitialInterval:        time.Minute,
+			InitialBroadcastDelay:  3 * time.Minute,
+			ReSyncInterval:         10 * time.Minute,
+			ReplicaThreshold:       2,
+			ReplicaCleanPercentage: 1,
 		},
 	}
 }
diff --git a/client/config/peerhost_linux.go b/client/config/peerhost_linux.go
index 5fc661401..e4f7256e6 100644
--- a/client/config/peerhost_linux.go
+++ b/client/config/peerhost_linux.go
@@ -191,11 +191,12 @@ var peerHostConfig = func() *DaemonOption {
 		LogMaxAge:     DefaultLogRotateMaxAge,
 		LogMaxBackups: DefaultLogRotateMaxBackups,
 		PeerExchange: PeerExchangeOption{
-			Enable:                false,
-			InitialInterval:       time.Minute,
-			InitialBroadcastDelay: 3 * time.Minute,
-			ReSyncInterval:        10 * time.Minute,
-			ReplicaThreshold:      2,
+			Enable:                 false,
+			InitialInterval:        time.Minute,
+			InitialBroadcastDelay:  3 * time.Minute,
+			ReSyncInterval:         10 * time.Minute,
+			ReplicaThreshold:       2,
+			ReplicaCleanPercentage: 1,
 		},
 	}
 }
diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go
index 4dd54d99c..5a53b60d7 100644
--- a/client/daemon/daemon.go
+++ b/client/daemon/daemon.go
@@ -257,7 +257,8 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
 			},
 			pex.WithInitialRetryInterval(opt.PeerExchange.InitialInterval),
 			pex.WithReSyncInterval(opt.PeerExchange.ReSyncInterval),
-			pex.WithReplicaThreshold(opt.PeerExchange.ReplicaThreshold))
+			pex.WithReplicaThreshold(opt.PeerExchange.ReplicaThreshold),
+			pex.WithReplicaCleanPercentage(opt.PeerExchange.ReplicaCleanPercentage))
 		if err != nil {
 			return nil, err
 		}
diff --git a/client/daemon/pex/peer_exchange.go b/client/daemon/pex/peer_exchange.go
index 63c1ba886..e04b95ef6 100644
--- a/client/daemon/pex/peer_exchange.go
+++ b/client/daemon/pex/peer_exchange.go
@@ -43,9 +43,10 @@ type peerExchange struct {
 }
 
 type peerExchangeConfig struct {
-	initialRetryInterval time.Duration
-	reSyncInterval       time.Duration
-	replicaThreshold     int
+	initialRetryInterval   time.Duration
+	reSyncInterval         time.Duration
+	replicaThreshold       int
+	replicaCleanPercentage int32
 }
 
 func WithName(name string) func(*memberlist.Config, *peerExchangeConfig) {
@@ -102,6 +103,23 @@ func WithReplicaThreshold(threshold int) func(*memberlist.Config, *peerExchangeC
 	}
 }
 
+// WithReplicaCleanPercentage sets the percentage probability of cleaning the local replica
+// once the replica threshold is reached. 0 disables cleaning, values above 100 are capped
+// at 100 and negative values are ignored.
+func WithReplicaCleanPercentage(percentage int32) func(*memberlist.Config, *peerExchangeConfig) {
+	return func(memberConfig *memberlist.Config, pexConfig *peerExchangeConfig) {
+		switch {
+		case percentage < 0:
+			// invalid value, keep the current setting
+		case percentage > 100:
+			pexConfig.replicaCleanPercentage = 100
+		default:
+			// 0 is a valid value (tryReclaim treats it as "never reclaim"), so it must not be dropped
+			pexConfig.replicaCleanPercentage = percentage
+		}
+	}
+}
+
 func NewPeerExchange(
 	reclaim ReclaimFunc,
 	lister InitialMemberLister,
@@ -130,6 +148,7 @@ func NewPeerExchange(
 	logger.Infof("peer exchange initial retry interval: %s", pexConfig.initialRetryInterval)
 	logger.Infof("peer exchange re-sync interval: %s", pexConfig.reSyncInterval)
 	logger.Infof("peer exchange replica threshold: %d", pexConfig.replicaThreshold)
+	logger.Infof("peer exchange replica clean percentage: %d", pexConfig.replicaCleanPercentage)
 
 	pex := &peerExchange{
 		config: pexConfig,
@@ -168,7 +187,11 @@ func (p *peerExchange) SearchPeer(task string) SearchPeerResult {
 	case SearchPeerResultTypeLocal:
 		// check replica threshold and reclaim local cache
 		if len(searchPeerResult.Peers) > p.config.replicaThreshold {
-			p.tryReclaim(task, searchPeerResult)
+			if p.tryReclaim(task, searchPeerResult) {
+				// change result type to remote and drop local peer
+				searchPeerResult.Type = SearchPeerResultTypeRemote
+				searchPeerResult.Peers = searchPeerResult.Peers[1:]
+			}
 		}
 	case SearchPeerResultTypeRemote:
 		if len(searchPeerResult.Peers) < p.config.replicaThreshold {
@@ -179,18 +202,24 @@ func (p *peerExchange) SearchPeer(task string) SearchPeerResult {
 	return searchPeerResult
 }
 
-func (p *peerExchange) tryReclaim(task string, searchPeerResult SearchPeerResult) {
-	r := rand.New(rand.NewSource(time.Now().UnixNano()))
-	// reclaim with 1% probability for shrink double reclaim with other members
-	if r.Int31n(100) == 0 {
-		peer := searchPeerResult.Peers[0].PeerID
-		searchPeerResult.Type = SearchPeerResultTypeRemote
-		p.memberManager.logger.Debugf("task %s replica threshold reached, try to reclaim local peer cache %s", task, peer)
-		err := p.reclaim(task, peer)
-		if err != nil {
-			p.memberManager.logger.Warnf("task %s peer %s reclaim local cache error: %s", task, peer, err)
-		}
+func (p *peerExchange) tryReclaim(task string, searchPeerResult SearchPeerResult) bool {
+	if p.config.replicaCleanPercentage == 0 {
+		return false
 	}
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	// reclaim only with the configured probability to reduce double reclaims with other members
+	// Int31n returns a value in [0, n), +1 maps it to the percentage range [1, 100]
+	if r.Int31n(100)+1 > p.config.replicaCleanPercentage {
+		return false
+	}
+	// when Type is SearchPeerResultTypeLocal, peer 0 is always the local peer
+	peer := searchPeerResult.Peers[0].PeerID
+	p.memberManager.logger.Debugf("task %s replica threshold reached, try to reclaim local peer cache %s", task, peer)
+	err := p.reclaim(task, peer)
+	if err != nil {
+		p.memberManager.logger.Warnf("task %s peer %s reclaim local cache error: %s", task, peer, err)
+	}
+	return true
 }
 
 func (p *peerExchange) BroadcastPeer(data *dfdaemonv1.PeerMetadata) {