refactor naming API and etcd-based impl

This commit is contained in:
iamqizhao 2015-09-14 17:32:52 -07:00
parent 448ae5f6ab
commit a102af5f83
2 changed files with 109 additions and 127 deletions

View File

@ -1,145 +1,97 @@
package etcd
import (
"log"
"sync"
etcdcl "github.com/coreos/etcd/client"
"golang.org/x/net/context"
"google.golang.org/grpc/naming"
)
type kv struct {
key, value string
}
// recvBuffer is an unbounded channel of *kv to record all the pending changes from etcd server.
type recvBuffer struct {
c chan *kv
mu sync.Mutex
stopping bool
backlog []*kv
}
func newRecvBuffer() *recvBuffer {
b := &recvBuffer{
c: make(chan *kv, 1),
}
return b
}
func (b *recvBuffer) put(r *kv) {
b.mu.Lock()
defer b.mu.Unlock()
if b.stopping {
return
}
b.backlog = append(b.backlog, r)
select {
case b.c <- b.backlog[0]:
b.backlog = b.backlog[1:]
default:
}
}
func (b *recvBuffer) load() {
b.mu.Lock()
defer b.mu.Unlock()
if b.stopping || len(b.backlog) == 0 {
return
}
select {
case b.c <- b.backlog[0]:
b.backlog = b.backlog[1:]
default:
}
}
func (b *recvBuffer) get() <-chan *kv {
return b.c
}
// stop terminates the recvBuffer. After it is called, the recvBuffer is not usable any more.
func (b *recvBuffer) stop() {
b.mu.Lock()
b.stopping = true
close(b.c)
b.mu.Unlock()
}
type etcdNR struct {
kAPI etcdcl.KeysAPI
recv *recvBuffer
type watcher struct {
wr etcdcl.Watcher
ctx context.Context
cancel context.CancelFunc
}
// NewETCDNR creates an etcd NameResolver.
func NewETCDNR(cfg etcdcl.Config) (naming.Resolver, error) {
c, err := etcdcl.New(cfg)
if err != nil {
return nil, err
}
kAPI := etcdcl.NewKeysAPI(c)
ctx, cancel := context.WithCancel(context.Background())
return &etcdNR{
kAPI: kAPI,
recv: newRecvBuffer(),
ctx: ctx,
cancel: cancel,
}, nil
}
// getNode builds the resulting key-value map starting from node recursively.
func getNode(node *etcdcl.Node, res map[string]string) {
if !node.Dir {
res[node.Key] = node.Value
return
}
for _, val := range node.Nodes {
getNode(val, res)
}
}
func (nr *etcdNR) Get(target string) map[string]string {
resp, err := nr.kAPI.Get(nr.ctx, target, &etcdcl.GetOptions{Recursive: true, Sort: true})
if err != nil {
log.Printf("etcdNR.Get(_) stopped: %v", err)
return nil
}
res := make(map[string]string)
getNode(resp.Node, res)
return res
}
func (nr *etcdNR) Watch(target string) {
watcher := nr.kAPI.Watcher(target, &etcdcl.WatcherOptions{Recursive: true})
func (w *watcher) Next() (*naming.Update, error) {
for {
resp, err := watcher.Next(nr.ctx)
resp, err := w.wr.Next(w.ctx)
if err != nil {
log.Printf("etcdNR.Watch(_) stopped: %v", err)
break
return nil, err
}
if resp.Node.Dir {
continue
}
entry := &kv{key: resp.Node.Key, value: resp.Node.Value}
nr.recv.put(entry)
var act naming.OP
if resp.Action == "set" {
if resp.PrevNode == nil {
act = naming.Add
} else {
act = naming.Modify
}
} else if resp.Action == "delete" {
act = naming.Delete
}
if act == naming.No {
continue
}
return &naming.Update{
Op: act,
Key: resp.Node.Key,
Val: resp.Node.Value,
}, nil
}
}
func (nr *etcdNR) GetUpdate() (string, string) {
i := <-nr.recv.get()
nr.recv.load()
if i == nil {
return "", ""
func (w *watcher) Stop() {
w.cancel()
}
type resolver struct {
kapi etcdcl.KeysAPI
}
func (r *resolver) NewWatcher(target string) naming.Watcher {
ctx, cancel := context.WithCancel(context.Background())
return &watcher{
wr: r.kapi.Watcher(target, &etcdcl.WatcherOptions{Recursive: true}),
ctx: ctx,
cancel: cancel,
}
// returns key and the corresponding value of the updated kv pair
return i.key, i.value
}
func (nr *etcdNR) Stop() {
nr.recv.stop()
nr.cancel()
// getNode reports the naming.Update starting from node recursively.
func getNode(node *etcdcl.Node) (updates []*naming.Update) {
for _, v := range node.Nodes {
updates = append(updates, getNode(v)...)
}
if !node.Dir {
entry := &naming.Update{
Op: naming.Add,
Key: node.Key,
Val: node.Value,
}
updates = []*naming.Update{entry}
}
return
}
func (r *resolver) Resolve(target string) ([]*naming.Update, error) {
resp, err := r.kapi.Get(context.Background(), target, &etcdcl.GetOptions{Recursive: true})
if err != nil {
return nil, err
}
updates := getNode(resp.Node)
return updates, nil
}
// NewResolver creates an etcd-based naming.Resolver.
func NewResolver(cfg etcdcl.Config) (naming.Resolver, error) {
c, err := etcdcl.New(cfg)
if err != nil {
return nil, err
}
return &resolver{
kapi: etcdcl.NewKeysAPI(c),
}, nil
}

View File

@ -1,13 +1,43 @@
package naming
// Resolver dose name resolution and watches for the resolution changes.
// OP defines the corresponding operations for a name resolution change.
type OP uint8
const (
// No indicates there are no changes.
No OP = iota
// Add indicates a new address is added.
Add
// Delete indicates an exisiting address is deleted.
Delete
// Modify indicates an existing address is modified.
Modify
)
type ServiceConfig interface{}
// Update defines a name resolution change.
type Update struct {
// Op indicates the operation of the update.
Op OP
Key string
Val string
Config ServiceConfig
}
// Resolver does one-shot name resolution and creates a Watcher to
// watch the future updates.
type Resolver interface {
// Get gets a snapshot of the current name resolution results for target.
Get(target string) map[string]string
// Watch watches for the name resolution changes on target. It blocks until Stop() is invoked. The watch results are obtained via GetUpdate().
Watch(target string)
// GetUpdate returns a name resolution change when watch is triggered. It blocks until it observes a change. The caller needs to call it again to get the next change.
GetUpdate() (string, string)
// Stop shuts down the NameResolver.
// Resolve returns the name resolution results.
Resolve(target string) ([]*Update, error)
// NewWatcher creates a Watcher to watch the changes on target.
NewWatcher(target string) Watcher
}
// Watcher watches the updates for a particular target.
type Watcher interface {
// Next blocks until an update or error happens.
Next() (*Update, error)
// Stop stops the Watcher.
Stop()
}