package mesh import "sync" // Gossip is the sending interface. // // TODO(pb): rename to e.g. Sender type Gossip interface { // GossipUnicast emits a single message to a peer in the mesh. // // TODO(pb): rename to Unicast? // // Unicast takes []byte instead of GossipData because "to date there has // been no compelling reason [in practice] to do merging on unicast." // But there may be some motivation to have unicast Mergeable; see // https://github.com/weaveworks/weave/issues/1764 // // TODO(pb): for uniformity of interface, rather take GossipData? GossipUnicast(dst PeerName, msg []byte) error // GossipBroadcast emits a message to all peers in the mesh. // // TODO(pb): rename to Broadcast? GossipBroadcast(update GossipData) } // Gossiper is the receiving interface. // // TODO(pb): rename to e.g. Receiver type Gossiper interface { // OnGossipUnicast merges received data into state. // // TODO(pb): rename to e.g. OnUnicast OnGossipUnicast(src PeerName, msg []byte) error // OnGossipBroadcast merges received data into state and returns a // representation of the received data (typically a delta) for further // propagation. // // TODO(pb): rename to e.g. OnBroadcast OnGossipBroadcast(src PeerName, update []byte) (received GossipData, err error) // Gossip returns the state of everything we know; gets called periodically. Gossip() (complete GossipData) // OnGossip merges received data into state and returns "everything new // I've just learnt", or nil if nothing in the received data was new. OnGossip(msg []byte) (delta GossipData, err error) } // GossipData is a merge-able dataset. // Think: log-structured data. type GossipData interface { // Encode encodes the data into multiple byte-slices. Encode() [][]byte // Merge combines another GossipData into this one and returns the result. // // TODO(pb): does it need to be leave the original unmodified? Merge(GossipData) GossipData } // GossipSender accumulates GossipData that needs to be sent to one // destination, and sends it when possible. GossipSender is one-to-one with a // channel. type gossipSender struct { sync.Mutex makeMsg func(msg []byte) protocolMsg makeBroadcastMsg func(srcName PeerName, msg []byte) protocolMsg sender protocolSender gossip GossipData broadcasts map[PeerName]GossipData more chan<- struct{} flush chan<- chan<- bool // for testing } // NewGossipSender constructs a usable GossipSender. func newGossipSender( makeMsg func(msg []byte) protocolMsg, makeBroadcastMsg func(srcName PeerName, msg []byte) protocolMsg, sender protocolSender, stop <-chan struct{}, ) *gossipSender { more := make(chan struct{}, 1) flush := make(chan chan<- bool) s := &gossipSender{ makeMsg: makeMsg, makeBroadcastMsg: makeBroadcastMsg, sender: sender, broadcasts: make(map[PeerName]GossipData), more: more, flush: flush, } go s.run(stop, more, flush) return s } func (s *gossipSender) run(stop <-chan struct{}, more <-chan struct{}, flush <-chan chan<- bool) { sent := false for { select { case <-stop: return case <-more: sentSomething, err := s.deliver(stop) if err != nil { return } sent = sent || sentSomething case ch := <-flush: // for testing // send anything pending, then reply back whether we sent // anything since previous flush select { case <-more: sentSomething, err := s.deliver(stop) if err != nil { return } sent = sent || sentSomething default: } ch <- sent sent = false } } } func (s *gossipSender) deliver(stop <-chan struct{}) (bool, error) { sent := false // We must not hold our lock when sending, since that would block // the callers of Send/Broadcast while we are stuck waiting for // network congestion to clear. So we pick and send one piece of // data at a time, only holding the lock during the picking. for { select { case <-stop: return sent, nil default: } data, makeProtocolMsg := s.pick() if data == nil { return sent, nil } for _, msg := range data.Encode() { if err := s.sender.SendProtocolMsg(makeProtocolMsg(msg)); err != nil { return sent, err } } sent = true } } func (s *gossipSender) pick() (data GossipData, makeProtocolMsg func(msg []byte) protocolMsg) { s.Lock() defer s.Unlock() switch { case s.gossip != nil: // usually more important than broadcasts data = s.gossip makeProtocolMsg = s.makeMsg s.gossip = nil case len(s.broadcasts) > 0: for srcName, d := range s.broadcasts { data = d makeProtocolMsg = func(msg []byte) protocolMsg { return s.makeBroadcastMsg(srcName, msg) } delete(s.broadcasts, srcName) break } } return } // Send accumulates the GossipData and will send it eventually. // Send and Broadcast accumulate into different buckets. func (s *gossipSender) Send(data GossipData) { s.Lock() defer s.Unlock() if s.empty() { defer s.prod() } if s.gossip == nil { s.gossip = data } else { s.gossip = s.gossip.Merge(data) } } // Broadcast accumulates the GossipData under the given srcName and will send // it eventually. Send and Broadcast accumulate into different buckets. func (s *gossipSender) Broadcast(srcName PeerName, data GossipData) { s.Lock() defer s.Unlock() if s.empty() { defer s.prod() } d, found := s.broadcasts[srcName] if !found { s.broadcasts[srcName] = data } else { s.broadcasts[srcName] = d.Merge(data) } } func (s *gossipSender) empty() bool { return s.gossip == nil && len(s.broadcasts) == 0 } func (s *gossipSender) prod() { select { case s.more <- struct{}{}: default: } } // Flush sends all pending data, and returns true if anything was sent since // the previous flush. For testing. func (s *gossipSender) Flush() bool { ch := make(chan bool) s.flush <- ch return <-ch } // gossipSenders wraps a ProtocolSender (e.g. a LocalConnection) and yields // per-channel GossipSenders. // TODO(pb): may be able to remove this and use makeGossipSender directly type gossipSenders struct { sync.Mutex sender protocolSender stop <-chan struct{} senders map[string]*gossipSender } // NewGossipSenders returns a usable GossipSenders leveraging the ProtocolSender. // TODO(pb): is stop chan the best way to do that? func newGossipSenders(sender protocolSender, stop <-chan struct{}) *gossipSenders { return &gossipSenders{ sender: sender, stop: stop, senders: make(map[string]*gossipSender), } } // Sender yields the GossipSender for the named channel. // It will use the factory function if no sender yet exists. func (gs *gossipSenders) Sender(channelName string, makeGossipSender func(sender protocolSender, stop <-chan struct{}) *gossipSender) *gossipSender { gs.Lock() defer gs.Unlock() s, found := gs.senders[channelName] if !found { s = makeGossipSender(gs.sender, gs.stop) gs.senders[channelName] = s } return s } // Flush flushes all managed senders. Used for testing. func (gs *gossipSenders) Flush() bool { sent := false gs.Lock() defer gs.Unlock() for _, sender := range gs.senders { sent = sender.Flush() || sent } return sent } // GossipChannels is an index of channel name to gossip channel. type gossipChannels map[string]*gossipChannel type gossipConnection interface { gossipSenders() *gossipSenders }