From a102af5f834b9cd0a91165e1ea1e7f85c6fed10a Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 14 Sep 2015 17:32:52 -0700 Subject: [PATCH] refactor naming API and etcd-based impl --- naming/etcd/etcd.go | 190 +++++++++++++++++--------------------------- naming/naming.go | 46 +++++++++-- 2 files changed, 109 insertions(+), 127 deletions(-) diff --git a/naming/etcd/etcd.go b/naming/etcd/etcd.go index 915e22717..e140068a9 100644 --- a/naming/etcd/etcd.go +++ b/naming/etcd/etcd.go @@ -1,145 +1,97 @@ package etcd import ( - "log" - "sync" - etcdcl "github.com/coreos/etcd/client" "golang.org/x/net/context" "google.golang.org/grpc/naming" ) -type kv struct { - key, value string -} - -// recvBuffer is an unbounded channel of *kv to record all the pending changes from etcd server. -type recvBuffer struct { - c chan *kv - mu sync.Mutex - stopping bool - backlog []*kv -} - -func newRecvBuffer() *recvBuffer { - b := &recvBuffer{ - c: make(chan *kv, 1), - } - return b -} - -func (b *recvBuffer) put(r *kv) { - b.mu.Lock() - defer b.mu.Unlock() - if b.stopping { - return - } - b.backlog = append(b.backlog, r) - select { - case b.c <- b.backlog[0]: - b.backlog = b.backlog[1:] - default: - } -} - -func (b *recvBuffer) load() { - b.mu.Lock() - defer b.mu.Unlock() - if b.stopping || len(b.backlog) == 0 { - return - } - select { - case b.c <- b.backlog[0]: - b.backlog = b.backlog[1:] - default: - } -} - -func (b *recvBuffer) get() <-chan *kv { - return b.c -} - -// stop terminates the recvBuffer. After it is called, the recvBuffer is not usable any more. -func (b *recvBuffer) stop() { - b.mu.Lock() - b.stopping = true - close(b.c) - b.mu.Unlock() -} - -type etcdNR struct { - kAPI etcdcl.KeysAPI - recv *recvBuffer +type watcher struct { + wr etcdcl.Watcher ctx context.Context cancel context.CancelFunc } -// NewETCDNR creates an etcd NameResolver. -func NewETCDNR(cfg etcdcl.Config) (naming.Resolver, error) { - c, err := etcdcl.New(cfg) - if err != nil { - return nil, err - } - kAPI := etcdcl.NewKeysAPI(c) - ctx, cancel := context.WithCancel(context.Background()) - return &etcdNR{ - kAPI: kAPI, - recv: newRecvBuffer(), - ctx: ctx, - cancel: cancel, - }, nil -} - -// getNode builds the resulting key-value map starting from node recursively. -func getNode(node *etcdcl.Node, res map[string]string) { - if !node.Dir { - res[node.Key] = node.Value - return - } - for _, val := range node.Nodes { - getNode(val, res) - } -} - -func (nr *etcdNR) Get(target string) map[string]string { - resp, err := nr.kAPI.Get(nr.ctx, target, &etcdcl.GetOptions{Recursive: true, Sort: true}) - if err != nil { - log.Printf("etcdNR.Get(_) stopped: %v", err) - return nil - } - res := make(map[string]string) - getNode(resp.Node, res) - return res -} - -func (nr *etcdNR) Watch(target string) { - watcher := nr.kAPI.Watcher(target, &etcdcl.WatcherOptions{Recursive: true}) +func (w *watcher) Next() (*naming.Update, error) { for { - resp, err := watcher.Next(nr.ctx) + resp, err := w.wr.Next(w.ctx) if err != nil { - log.Printf("etcdNR.Watch(_) stopped: %v", err) - break + return nil, err } if resp.Node.Dir { continue } - entry := &kv{key: resp.Node.Key, value: resp.Node.Value} - nr.recv.put(entry) + var act naming.OP + if resp.Action == "set" { + if resp.PrevNode == nil { + act = naming.Add + } else { + act = naming.Modify + } + } else if resp.Action == "delete" { + act = naming.Delete + } + if act == naming.No { + continue + } + return &naming.Update{ + Op: act, + Key: resp.Node.Key, + Val: resp.Node.Value, + }, nil } } -func (nr *etcdNR) GetUpdate() (string, string) { - i := <-nr.recv.get() - nr.recv.load() - if i == nil { - return "", "" +func (w *watcher) Stop() { + w.cancel() +} + +type resolver struct { + kapi etcdcl.KeysAPI +} + +func (r *resolver) NewWatcher(target string) naming.Watcher { + ctx, cancel := context.WithCancel(context.Background()) + return &watcher{ + wr: r.kapi.Watcher(target, &etcdcl.WatcherOptions{Recursive: true}), + ctx: ctx, + cancel: cancel, } - // returns key and the corresponding value of the updated kv pair - return i.key, i.value } -func (nr *etcdNR) Stop() { - nr.recv.stop() - nr.cancel() +// getNode reports the naming.Update starting from node recursively. +func getNode(node *etcdcl.Node) (updates []*naming.Update) { + for _, v := range node.Nodes { + updates = append(updates, getNode(v)...) + } + if !node.Dir { + entry := &naming.Update{ + Op: naming.Add, + Key: node.Key, + Val: node.Value, + } + updates = []*naming.Update{entry} + } + return +} + +func (r *resolver) Resolve(target string) ([]*naming.Update, error) { + resp, err := r.kapi.Get(context.Background(), target, &etcdcl.GetOptions{Recursive: true}) + if err != nil { + return nil, err + } + updates := getNode(resp.Node) + return updates, nil +} + +// NewResolver creates an etcd-based naming.Resolver. +func NewResolver(cfg etcdcl.Config) (naming.Resolver, error) { + c, err := etcdcl.New(cfg) + if err != nil { + return nil, err + } + return &resolver{ + kapi: etcdcl.NewKeysAPI(c), + }, nil } diff --git a/naming/naming.go b/naming/naming.go index a6a319f75..a1fd33570 100644 --- a/naming/naming.go +++ b/naming/naming.go @@ -1,13 +1,43 @@ package naming -// Resolver dose name resolution and watches for the resolution changes. +// OP defines the corresponding operations for a name resolution change. +type OP uint8 + +const ( + // No indicates there are no changes. + No OP = iota + // Add indicates a new address is added. + Add + // Delete indicates an exisiting address is deleted. + Delete + // Modify indicates an existing address is modified. + Modify +) + +type ServiceConfig interface{} + +// Update defines a name resolution change. +type Update struct { + // Op indicates the operation of the update. + Op OP + Key string + Val string + Config ServiceConfig +} + +// Resolver does one-shot name resolution and creates a Watcher to +// watch the future updates. type Resolver interface { - // Get gets a snapshot of the current name resolution results for target. - Get(target string) map[string]string - // Watch watches for the name resolution changes on target. It blocks until Stop() is invoked. The watch results are obtained via GetUpdate(). - Watch(target string) - // GetUpdate returns a name resolution change when watch is triggered. It blocks until it observes a change. The caller needs to call it again to get the next change. - GetUpdate() (string, string) - // Stop shuts down the NameResolver. + // Resolve returns the name resolution results. + Resolve(target string) ([]*Update, error) + // NewWatcher creates a Watcher to watch the changes on target. + NewWatcher(target string) Watcher +} + +// Watcher watches the updates for a particular target. +type Watcher interface { + // Next blocks until an update or error happens. + Next() (*Update, error) + // Stop stops the Watcher. Stop() }