package mesh

import "sync"

// Gossip is the sending interface.
//
// TODO(pb): rename to e.g. Sender
type Gossip interface {
	// GossipUnicast emits a single message to a peer in the mesh.
	//
	// TODO(pb): rename to Unicast?
	//
	// Unicast takes []byte instead of GossipData because "to date there has
	// been no compelling reason [in practice] to do merging on unicast."
	// But there may be some motivation to have unicast Mergeable; see
	// https://github.com/weaveworks/weave/issues/1764
	//
	// TODO(pb): for uniformity of interface, rather take GossipData?
	GossipUnicast(dst PeerName, msg []byte) error

	// GossipBroadcast emits a message to all peers in the mesh.
	//
	// TODO(pb): rename to Broadcast?
	GossipBroadcast(update GossipData)
}

// Gossiper is the receiving interface.
//
// TODO(pb): rename to e.g. Receiver
type Gossiper interface {
	// OnGossipUnicast merges received data into state.
	//
	// TODO(pb): rename to e.g. OnUnicast
	OnGossipUnicast(src PeerName, msg []byte) error

	// OnGossipBroadcast merges received data into state and returns a
	// representation of the received data (typically a delta) for further
	// propagation.
	//
	// TODO(pb): rename to e.g. OnBroadcast
	OnGossipBroadcast(src PeerName, update []byte) (received GossipData, err error)

	// Gossip returns the state of everything we know; gets called periodically.
	Gossip() (complete GossipData)

	// OnGossip merges received data into state and returns "everything new
	// I've just learnt", or nil if nothing in the received data was new.
	OnGossip(msg []byte) (delta GossipData, err error)
}
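
// An implementation sketch (not part of the original file) of the OnGossip
// contract, assuming a hypothetical examplePeer whose state is the stringSet
// sketched after the GossipData interface below. The key point is returning a
// nil GossipData when the message taught us nothing new, which stops further
// propagation:
//
//	type examplePeer struct {
//		sync.Mutex
//		set stringSet
//	}
//
//	func (p *examplePeer) OnGossip(msg []byte) (GossipData, error) {
//		p.Lock()
//		defer p.Unlock()
//		if _, ok := p.set.elems[string(msg)]; ok {
//			return nil, nil // nothing new: do not propagate further
//		}
//		p.set.elems[string(msg)] = struct{}{}
//		// return only the newly learnt element, so peers forward a delta
//		return &stringSet{elems: map[string]struct{}{string(msg): {}}}, nil
//	}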

// GossipData is a merge-able dataset.
// Think: log-structured data.
type GossipData interface {
	// Encode encodes the data into multiple byte-slices.
	Encode() [][]byte

	// Merge combines another GossipData into this one and returns the result.
	//
	// TODO(pb): does it need to leave the original unmodified?
	Merge(GossipData) GossipData
}
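
// A minimal sketch (not part of the original file) of a GossipData
// implementation: a grow-only set of strings whose Merge is set union. Each
// element is encoded as its own byte-slice, so every slice fits comfortably in
// a single protocol message. The stringSet name is illustrative; a real
// implementation would also guard elems with a lock, since mesh calls into
// GossipData from its own goroutines:
//
//	type stringSet struct {
//		elems map[string]struct{}
//	}
//
//	// Encode emits one byte-slice per element.
//	func (s *stringSet) Encode() [][]byte {
//		out := make([][]byte, 0, len(s.elems))
//		for e := range s.elems {
//			out = append(out, []byte(e))
//		}
//		return out
//	}
//
//	// Merge folds another stringSet into this one and returns the union.
//	func (s *stringSet) Merge(other GossipData) GossipData {
//		for e := range other.(*stringSet).elems {
//			s.elems[e] = struct{}{}
//		}
//		return s
//	}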

// gossipSender accumulates GossipData that needs to be sent to one
// destination, and sends it when possible. A gossipSender is one-to-one with a
// gossip channel.
type gossipSender struct {
	sync.Mutex
	makeMsg          func(msg []byte) protocolMsg
	makeBroadcastMsg func(srcName PeerName, msg []byte) protocolMsg
	sender           protocolSender
	gossip           GossipData
	broadcasts       map[PeerName]GossipData
	more             chan<- struct{}
	flush            chan<- chan<- bool // for testing
}

// newGossipSender constructs a usable gossipSender.
func newGossipSender(
	makeMsg func(msg []byte) protocolMsg,
	makeBroadcastMsg func(srcName PeerName, msg []byte) protocolMsg,
	sender protocolSender,
	stop <-chan struct{},
) *gossipSender {
	more := make(chan struct{}, 1)
	flush := make(chan chan<- bool)
	s := &gossipSender{
		makeMsg:          makeMsg,
		makeBroadcastMsg: makeBroadcastMsg,
		sender:           sender,
		broadcasts:       make(map[PeerName]GossipData),
		more:             more,
		flush:            flush,
	}
	go s.run(stop, more, flush)
	return s
}

func (s *gossipSender) run(stop <-chan struct{}, more <-chan struct{}, flush <-chan chan<- bool) {
	sent := false
	for {
		select {
		case <-stop:
			return
		case <-more:
			sentSomething, err := s.deliver(stop)
			if err != nil {
				return
			}
			sent = sent || sentSomething
		case ch := <-flush: // for testing
			// send anything pending, then reply back whether we sent
			// anything since previous flush
			select {
			case <-more:
				sentSomething, err := s.deliver(stop)
				if err != nil {
					return
				}
				sent = sent || sentSomething
			default:
			}
			ch <- sent
			sent = false
		}
	}
}

func (s *gossipSender) deliver(stop <-chan struct{}) (bool, error) {
	sent := false
	// We must not hold our lock when sending, since that would block
	// the callers of Send/Broadcast while we are stuck waiting for
	// network congestion to clear. So we pick and send one piece of
	// data at a time, only holding the lock during the picking.
	for {
		select {
		case <-stop:
			return sent, nil
		default:
		}
		data, makeProtocolMsg := s.pick()
		if data == nil {
			return sent, nil
		}
		for _, msg := range data.Encode() {
			if err := s.sender.SendProtocolMsg(makeProtocolMsg(msg)); err != nil {
				return sent, err
			}
		}
		sent = true
	}
}

func (s *gossipSender) pick() (data GossipData, makeProtocolMsg func(msg []byte) protocolMsg) {
	s.Lock()
	defer s.Unlock()
	switch {
	case s.gossip != nil: // usually more important than broadcasts
		data = s.gossip
		makeProtocolMsg = s.makeMsg
		s.gossip = nil
	case len(s.broadcasts) > 0:
		for srcName, d := range s.broadcasts {
			data = d
			makeProtocolMsg = func(msg []byte) protocolMsg { return s.makeBroadcastMsg(srcName, msg) }
			delete(s.broadcasts, srcName)
			break
		}
	}
	return
}

// Send accumulates the GossipData and will send it eventually.
// Send and Broadcast accumulate into different buckets.
func (s *gossipSender) Send(data GossipData) {
	s.Lock()
	defer s.Unlock()
	if s.empty() {
		defer s.prod()
	}
	if s.gossip == nil {
		s.gossip = data
	} else {
		s.gossip = s.gossip.Merge(data)
	}
}

// Broadcast accumulates the GossipData under the given srcName and will send
// it eventually. Send and Broadcast accumulate into different buckets.
func (s *gossipSender) Broadcast(srcName PeerName, data GossipData) {
	s.Lock()
	defer s.Unlock()
	if s.empty() {
		defer s.prod()
	}
	d, found := s.broadcasts[srcName]
	if !found {
		s.broadcasts[srcName] = data
	} else {
		s.broadcasts[srcName] = d.Merge(data)
	}
}

func (s *gossipSender) empty() bool { return s.gossip == nil && len(s.broadcasts) == 0 }

func (s *gossipSender) prod() {
	select {
	case s.more <- struct{}{}:
	default:
	}
}

// Flush sends all pending data, and returns true if anything was sent since
// the previous flush. For testing.
func (s *gossipSender) Flush() bool {
	ch := make(chan bool)
	s.flush <- ch
	return <-ch
}

// gossipSenders wraps a protocolSender (e.g. a LocalConnection) and yields
// per-channel gossipSenders.
// TODO(pb): may be able to remove this and use makeGossipSender directly
type gossipSenders struct {
	sync.Mutex
	sender  protocolSender
	stop    <-chan struct{}
	senders map[string]*gossipSender
}

// newGossipSenders returns a usable gossipSenders leveraging the protocolSender.
// TODO(pb): is stop chan the best way to do that?
func newGossipSenders(sender protocolSender, stop <-chan struct{}) *gossipSenders {
	return &gossipSenders{
		sender:  sender,
		stop:    stop,
		senders: make(map[string]*gossipSender),
	}
}

// Sender yields the gossipSender for the named channel.
// It will use the factory function if no sender yet exists.
func (gs *gossipSenders) Sender(channelName string, makeGossipSender func(sender protocolSender, stop <-chan struct{}) *gossipSender) *gossipSender {
	gs.Lock()
	defer gs.Unlock()
	s, found := gs.senders[channelName]
	if !found {
		s = makeGossipSender(gs.sender, gs.stop)
		gs.senders[channelName] = s
	}
	return s
}

// Flush flushes all managed senders. Used for testing.
func (gs *gossipSenders) Flush() bool {
	sent := false
	gs.Lock()
	defer gs.Unlock()
	for _, sender := range gs.senders {
		sent = sender.Flush() || sent
	}
	return sent
}

// gossipChannels is an index of channel name to gossip channel.
type gossipChannels map[string]*gossipChannel

type gossipConnection interface {
	gossipSenders() *gossipSenders
}