*: add etcd2

This commit is contained in:
Gyu-Ho Lee 2016-03-22 02:04:49 -07:00
parent 934c2dec84
commit e3cb652892
8 changed files with 244 additions and 70 deletions

View File

@ -203,11 +203,9 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
if err != nil {
return nil, err
}
if err := os.RemoveAll(etcdDataDir); err != nil {
return nil, err
}
f, err := openToAppend(t.req.DatabaseLogPath)
if err != nil {
return nil, err
@ -221,7 +219,7 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
peerURLs := make([]string, clusterN)
members := make([]string, clusterN)
for i, u := range peerIPs {
names[i] = fmt.Sprintf("etcd-%d", i)
names[i] = fmt.Sprintf("etcd-%d", i+1)
clientURLs[i] = fmt.Sprintf("http://%s:2379", u)
peerURLs[i] = fmt.Sprintf("http://%s:2380", u)
members[i] = fmt.Sprintf("%s=%s", names[i], peerURLs[i])
@ -264,6 +262,68 @@ func (t *transporterServer) Transfer(ctx context.Context, r *Request) (*Response
log.Printf("Exiting %s", cmd.Path)
}()
case Request_etcd2:
_, err := os.Stat(etcdBinaryPath)
if err != nil {
return nil, err
}
if err := os.RemoveAll(etcdDataDir); err != nil {
return nil, err
}
f, err := openToAppend(t.req.DatabaseLogPath)
if err != nil {
return nil, err
}
t.logfile = f
// generate flags from etcd server name
clusterN := len(peerIPs)
names := make([]string, clusterN)
clientURLs := make([]string, clusterN)
peerURLs := make([]string, clusterN)
members := make([]string, clusterN)
for i, u := range peerIPs {
names[i] = fmt.Sprintf("etcd-%d", i+1)
clientURLs[i] = fmt.Sprintf("http://%s:2379", u)
peerURLs[i] = fmt.Sprintf("http://%s:2380", u)
members[i] = fmt.Sprintf("%s=%s", names[i], peerURLs[i])
}
clusterStr := strings.Join(members, ",")
flags := []string{
"--name", fmt.Sprintf("etcd-%d", t.req.EtcdServerIndex),
"--data-dir", etcdDataDir,
"--listen-client-urls", clientURLs[t.req.EtcdServerIndex],
"--advertise-client-urls", clientURLs[t.req.EtcdServerIndex],
"--listen-peer-urls", peerURLs[t.req.EtcdServerIndex],
"--initial-advertise-peer-urls", peerURLs[t.req.EtcdServerIndex],
"--initial-cluster-token", etcdToken,
"--initial-cluster", clusterStr,
"--initial-cluster-state", "new",
}
flagString := strings.Join(flags, " ")
cmd := exec.Command(etcdBinaryPath, flags...)
cmd.Stdout = f
cmd.Stderr = f
log.Printf("Starting: %s %s", cmd.Path, flagString)
if err := cmd.Start(); err != nil {
return nil, err
}
t.cmd = cmd
t.pid = cmd.Process.Pid
log.Printf("Started: %s [PID: %d]", cmd.Path, t.pid)
processPID = t.pid
go func() {
if err := cmd.Wait(); err != nil {
log.Printf("Start(%s) cmd.Wait returned %v", cmd.Path, err)
return
}
log.Printf("Exiting %s", cmd.Path)
}()
case Request_ZooKeeper:
_, err := os.Stat("/usr/bin/java")
if err != nil {

View File

@ -63,16 +63,19 @@ type Request_Database int32
const (
Request_etcd Request_Database = 0
Request_ZooKeeper Request_Database = 1
Request_etcd2 Request_Database = 1
Request_ZooKeeper Request_Database = 2
)
var Request_Database_name = map[int32]string{
0: "etcd",
1: "ZooKeeper",
1: "etcd2",
2: "ZooKeeper",
}
var Request_Database_value = map[string]int32{
"etcd": 0,
"ZooKeeper": 1,
"etcd2": 1,
"ZooKeeper": 2,
}
func (x Request_Database) String() string {
@ -954,37 +957,37 @@ var (
)
var fileDescriptorMessage = []byte{
// 505 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x7c, 0x93, 0xcf, 0x72, 0x12, 0x41,
0x10, 0xc6, 0x21, 0x09, 0x61, 0xb7, 0x91, 0x04, 0xa7, 0xd4, 0x8c, 0xa9, 0x14, 0x65, 0x61, 0x0e,
0x39, 0x08, 0xa9, 0x4a, 0x2c, 0xcb, 0x43, 0x2e, 0x42, 0x2e, 0x31, 0x9a, 0x50, 0x8b, 0x27, 0x6f,
0xcb, 0xd2, 0x6c, 0xd6, 0x2c, 0xdb, 0x38, 0x3b, 0x6b, 0x91, 0x3c, 0x89, 0x8f, 0x94, 0xa3, 0x07,
0x1f, 0xc0, 0x3f, 0x2f, 0x62, 0xd3, 0x04, 0x30, 0x44, 0x3c, 0x4c, 0xd5, 0xf4, 0xf7, 0xfd, 0xbe,
0x86, 0xd9, 0xe9, 0x81, 0xf2, 0x00, 0xd3, 0xd4, 0x0f, 0xb1, 0x31, 0x34, 0x64, 0x49, 0x15, 0x78,
0x9b, 0xd8, 0xed, 0x7a, 0x18, 0xd9, 0x8b, 0xac, 0xdb, 0x08, 0x68, 0xb0, 0x1f, 0x52, 0x48, 0xfb,
0xe2, 0x76, 0xb3, 0xbe, 0x54, 0x52, 0xc8, 0x6e, 0x92, 0xaa, 0x7d, 0x2f, 0x40, 0xd1, 0xc3, 0xcf,
0x19, 0xa6, 0x56, 0xbd, 0x02, 0x97, 0x86, 0x68, 0x7c, 0x1b, 0x51, 0xa2, 0xf3, 0xcf, 0xf2, 0x7b,
0x1b, 0x07, 0xba, 0x21, 0x5d, 0x1b, 0xb7, 0x48, 0xe3, 0x7c, 0xea, 0x7b, 0x73, 0x54, 0x1d, 0x82,
0xd3, 0xf3, 0xad, 0xdf, 0xf5, 0x53, 0xd4, 0x2b, 0x12, 0xdb, 0x5a, 0x88, 0x1d, 0xdf, 0xda, 0xde,
0x0c, 0x54, 0x1a, 0x8a, 0x43, 0x44, 0x73, 0xd2, 0x4e, 0xf5, 0x2a, 0x67, 0x5c, 0x6f, 0x5a, 0xaa,
0x3d, 0xd8, 0x44, 0x1b, 0xf4, 0x3a, 0x68, 0xbe, 0xb0, 0x90, 0xf4, 0x70, 0xa4, 0xd7, 0x98, 0x28,
0x7b, 0x8b, 0xb2, 0xda, 0x85, 0xf2, 0x35, 0xd1, 0x25, 0x22, 0xff, 0x95, 0xf7, 0x57, 0x27, 0xc7,
0xba, 0x20, 0xdc, 0x5d, 0x51, 0xbd, 0x84, 0xc7, 0x33, 0xa1, 0x6d, 0xf0, 0x4d, 0x1c, 0x53, 0xd0,
0x89, 0xae, 0x51, 0xaf, 0x33, 0xbd, 0xea, 0xfd, 0xdb, 0x54, 0xaf, 0x61, 0x6b, 0xde, 0xc6, 0x1f,
0xb5, 0xe2, 0x88, 0x0f, 0xd4, 0x4a, 0x46, 0x49, 0xaa, 0x8b, 0x92, 0x5b, 0x66, 0xab, 0x1d, 0x70,
0x63, 0x0a, 0xb9, 0x59, 0x3f, 0x1a, 0x69, 0x47, 0xce, 0x36, 0x17, 0xc6, 0xa7, 0x9b, 0x7e, 0x83,
0x77, 0x2c, 0xfa, 0xf6, 0x42, 0xbb, 0xc2, 0x2c, 0xca, 0xea, 0x05, 0x3c, 0x1c, 0x50, 0x12, 0x59,
0x32, 0x1e, 0xa6, 0x59, 0x6c, 0x85, 0x05, 0x61, 0xef, 0x1b, 0x7c, 0x79, 0x4f, 0x42, 0xa2, 0x30,
0xc6, 0x56, 0x4c, 0x59, 0xaf, 0x6d, 0xe8, 0x13, 0x06, 0xf6, 0xcc, 0x1f, 0xa0, 0x2e, 0x49, 0x64,
0x89, 0xab, 0x8e, 0xe0, 0xe9, 0x5f, 0x4e, 0x87, 0x9b, 0xf2, 0xd5, 0xbd, 0xed, 0x9c, 0x9f, 0x9d,
0xe2, 0x95, 0x7e, 0x20, 0xd1, 0xe5, 0x80, 0x6a, 0xc2, 0xce, 0x7d, 0xb3, 0x99, 0x05, 0x97, 0x38,
0xf9, 0xed, 0xb2, 0x34, 0xf8, 0x2f, 0x53, 0xab, 0x83, 0x3b, 0x1b, 0x2b, 0xe5, 0x42, 0xa1, 0x63,
0x7d, 0x63, 0x2b, 0x39, 0xe5, 0xc0, 0x1a, 0xc3, 0xc3, 0x4a, 0x5e, 0x95, 0xc6, 0x33, 0x9a, 0x8a,
0xbc, 0x52, 0x7b, 0x0e, 0xce, 0x74, 0x9c, 0xc6, 0xc8, 0x78, 0x26, 0x18, 0x2e, 0x83, 0xfb, 0x91,
0xe8, 0x54, 0xee, 0xa3, 0x92, 0xaf, 0xed, 0x82, 0xc3, 0x89, 0x21, 0x25, 0x93, 0x49, 0x4b, 0xb3,
0x20, 0xe0, 0xc7, 0x22, 0x43, 0xed, 0x78, 0xd3, 0xf2, 0xe0, 0x08, 0x4a, 0x1f, 0x8c, 0x9f, 0x30,
0x67, 0x2c, 0x1a, 0x55, 0x07, 0x47, 0xca, 0x3e, 0xef, 0x37, 0xee, 0x4e, 0xf0, 0xf6, 0xe6, 0xac,
0x9e, 0x74, 0xad, 0xe5, 0x9a, 0x8f, 0x6e, 0x7e, 0x56, 0x73, 0x37, 0xbf, 0xaa, 0xf9, 0x6f, 0xbc,
0x7e, 0xf0, 0xfa, 0xfa, 0xbb, 0x9a, 0xeb, 0xae, 0xcb, 0xbb, 0x3a, 0xfc, 0x13, 0x00, 0x00, 0xff,
0xff, 0xb2, 0xf8, 0x8a, 0x3b, 0x9e, 0x03, 0x00, 0x00,
// 508 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x7c, 0x93, 0xcf, 0x4e, 0xdb, 0x40,
0x10, 0xc6, 0x13, 0x20, 0xc4, 0x9e, 0x34, 0x40, 0x57, 0x6d, 0xd9, 0x22, 0x14, 0x55, 0x11, 0x07,
0x0e, 0x8d, 0x91, 0x42, 0x55, 0xf5, 0xc0, 0xa5, 0x09, 0x17, 0x4a, 0x0b, 0x91, 0xd3, 0x53, 0x6f,
0x1b, 0x67, 0x62, 0x5c, 0x1c, 0x6f, 0xba, 0x5e, 0x57, 0x81, 0x27, 0xe9, 0x23, 0x71, 0xec, 0x1b,
0xf4, 0xdf, 0x8b, 0x74, 0x32, 0xc1, 0x49, 0x09, 0x0d, 0x07, 0x4b, 0x33, 0xdf, 0xf7, 0xfb, 0x26,
0x59, 0xef, 0x18, 0xaa, 0x43, 0x4c, 0x53, 0x15, 0xa2, 0x37, 0x32, 0xda, 0x6a, 0x51, 0xa2, 0x32,
0xb1, 0x3b, 0x8d, 0x30, 0xb2, 0x17, 0x59, 0xcf, 0x0b, 0xf4, 0xf0, 0x20, 0xd4, 0xa1, 0x3e, 0x60,
0xb7, 0x97, 0x0d, 0xb8, 0xe3, 0x86, 0xab, 0x69, 0xaa, 0xfe, 0xa3, 0x04, 0x65, 0x1f, 0xbf, 0x64,
0x98, 0x5a, 0xf1, 0x1a, 0x5c, 0x3d, 0x42, 0xa3, 0x6c, 0xa4, 0x13, 0x59, 0x7c, 0x51, 0xdc, 0xdf,
0x68, 0x4a, 0x8f, 0xa7, 0x7a, 0xb7, 0x88, 0x77, 0x9e, 0xfb, 0xfe, 0x1c, 0x15, 0x87, 0xe0, 0xf4,
0x95, 0x55, 0x3d, 0x95, 0xa2, 0x5c, 0xe1, 0xd8, 0xf6, 0x42, 0xec, 0xf8, 0xd6, 0xf6, 0x67, 0xa0,
0x90, 0x50, 0x1e, 0x21, 0x9a, 0x93, 0x4e, 0x2a, 0x57, 0x29, 0xe3, 0xfa, 0x79, 0x2b, 0xf6, 0x61,
0x13, 0x6d, 0xd0, 0xef, 0xa2, 0xf9, 0x4a, 0x42, 0xd2, 0xc7, 0xb1, 0x5c, 0x23, 0xa2, 0xea, 0x2f,
0xca, 0x62, 0x0f, 0xaa, 0xd7, 0x5a, 0x5f, 0x22, 0xd2, 0x5f, 0xf9, 0x70, 0x75, 0x72, 0x2c, 0x4b,
0xcc, 0xdd, 0x15, 0xc5, 0x2b, 0x78, 0x3a, 0x13, 0x3a, 0x06, 0xdf, 0xc6, 0xb1, 0x0e, 0xba, 0xd1,
0x35, 0xca, 0x75, 0xa2, 0x57, 0xfd, 0xff, 0x9b, 0xe2, 0x0d, 0x6c, 0xcf, 0xc7, 0xa8, 0x71, 0x3b,
0x8e, 0xe8, 0x40, 0xed, 0x64, 0x9c, 0xa4, 0xb2, 0xcc, 0xb9, 0x65, 0xb6, 0xd8, 0x05, 0x37, 0xd6,
0x21, 0x0d, 0x1b, 0x44, 0x63, 0xe9, 0xf0, 0xd9, 0xe6, 0xc2, 0xe4, 0x74, 0xf9, 0x3b, 0x78, 0x4f,
0xa2, 0xb2, 0x17, 0xd2, 0x65, 0x66, 0x51, 0x16, 0x2f, 0xe1, 0xf1, 0x50, 0x27, 0x91, 0xd5, 0xc6,
0xc7, 0x34, 0x8b, 0x2d, 0xb3, 0xc0, 0xec, 0x7d, 0x83, 0x2e, 0xef, 0x59, 0xa8, 0x75, 0x18, 0x63,
0x3b, 0xd6, 0x59, 0xbf, 0x63, 0xf4, 0x67, 0x0c, 0xec, 0x99, 0x1a, 0xa2, 0xac, 0x70, 0x64, 0x89,
0x2b, 0x8e, 0xe0, 0xf9, 0x3f, 0x4e, 0x97, 0x86, 0xd2, 0xd5, 0xbd, 0xeb, 0x9e, 0x9f, 0x9d, 0xe2,
0x95, 0x7c, 0xc4, 0xd1, 0xe5, 0x80, 0x68, 0xc1, 0xee, 0x7d, 0xb3, 0x95, 0x05, 0x97, 0x38, 0xfd,
0xed, 0x2a, 0x0f, 0x78, 0x90, 0xa9, 0x37, 0xc0, 0x9d, 0xad, 0x95, 0x70, 0xa1, 0xd4, 0xb5, 0xca,
0xd8, 0xad, 0x82, 0x70, 0x60, 0x8d, 0xe0, 0xd1, 0x56, 0x51, 0x54, 0x26, 0x3b, 0x9a, 0xb2, 0xbc,
0x52, 0xf7, 0xc0, 0xc9, 0xd7, 0x69, 0x82, 0x4c, 0x76, 0x82, 0x60, 0xca, 0x4d, 0xaa, 0x26, 0xd1,
0x55, 0x70, 0x3f, 0x69, 0x7d, 0xca, 0x57, 0x43, 0xfc, 0x1e, 0x38, 0x14, 0x1e, 0xe9, 0x64, 0xba,
0x74, 0x69, 0x16, 0x04, 0xf4, 0xdd, 0xf0, 0x7e, 0x3b, 0x7e, 0xde, 0x36, 0x8f, 0xa0, 0xf2, 0xd1,
0xa8, 0x84, 0x38, 0x63, 0xd1, 0x88, 0x06, 0x38, 0xdc, 0x0e, 0xa8, 0xde, 0xb8, 0xbb, 0xcc, 0x3b,
0x9b, 0xb3, 0x7e, 0x3a, 0xb5, 0x5e, 0x68, 0x3d, 0xb9, 0xf9, 0x55, 0x2b, 0xdc, 0xfc, 0xae, 0x15,
0xbf, 0xd3, 0xf3, 0x93, 0x9e, 0x6f, 0x7f, 0x6a, 0x85, 0xde, 0x3a, 0x7f, 0x62, 0x87, 0x7f, 0x03,
0x00, 0x00, 0xff, 0xff, 0x67, 0xc7, 0x60, 0xa8, 0xa9, 0x03, 0x00, 0x00,
}

View File

@ -20,7 +20,8 @@ message Request {
}
enum Database {
etcd = 0;
ZooKeeper = 1;
etcd2 = 1;
ZooKeeper = 2;
}
Operation operation = 1;

View File

@ -52,7 +52,7 @@ func init() {
}
func init() {
Command.PersistentFlags().StringVarP(&database, "database", "d", "etcd", "'etcd', 'zk'(zookeeper)")
Command.PersistentFlags().StringVarP(&database, "database", "d", "etcd", "etcd, etcd2, zk, consul")
Command.PersistentFlags().StringSliceVar(&endpoints, "endpoints", []string{"10.240.0.9:2181", "10.240.0.10:2181", "10.240.0.14:2181"}, "gRPC endpoints")
Command.PersistentFlags().UintVar(&totalConns, "conns", 1, "Total number of gRPC connections or Zookeeper connections")
Command.PersistentFlags().UintVar(&totalClients, "clients", 1, "Total number of gRPC clients (only for etcd)")

View File

@ -22,7 +22,8 @@ import (
"time"
"github.com/cheggaaa/pb"
v3 "github.com/coreos/etcd/clientv3"
clientv2 "github.com/coreos/etcd/client"
"github.com/coreos/etcd/clientv3"
"github.com/samuel/go-zookeeper/zk"
"github.com/spf13/cobra"
"golang.org/x/net/context"
@ -62,8 +63,14 @@ func init() {
}
type request struct {
etcdOp v3.Op
zkOp zkOp
etcdOp clientv3.Op
etcd2Op etcd2Op
zkOp zkOp
}
type etcd2Op struct {
key string
value string
}
type zkOp struct {
@ -87,7 +94,7 @@ func putFunc(cmd *cobra.Command, args []string) {
bar.Format("Bom !")
bar.Start()
var etcdClients []*v3.Client
var etcdClients []*clientv3.Client
switch database {
case "etcd":
etcdClients = mustCreateClients(totalClients, totalConns)
@ -100,6 +107,14 @@ func putFunc(cmd *cobra.Command, args []string) {
etcdClients[i].Close()
}
}()
case "etcd2":
conns := mustCreateClientsEtcd2(totalConns)
for i := range conns {
wg.Add(1)
go doPutEtcd2(context.Background(), conns[i], requests)
}
case "zk":
conns := mustCreateConnsZk(totalConns)
defer func() {
@ -111,6 +126,7 @@ func putFunc(cmd *cobra.Command, args []string) {
wg.Add(1)
go doPutZk(context.Background(), conns[i], requests)
}
default:
log.Fatalf("unknown database %s", database)
}
@ -131,7 +147,7 @@ func putFunc(cmd *cobra.Command, args []string) {
}
switch database {
case "etcd":
requests <- request{etcdOp: v3.OpPut(string(k), v)}
requests <- request{etcdOp: clientv3.OpPut(string(k), v)}
case "zk":
requests <- request{zkOp: zkOp{key: "/" + string(k), value: []byte(v)}}
}
@ -147,7 +163,7 @@ func putFunc(cmd *cobra.Command, args []string) {
<-pdoneC
}
func doPut(ctx context.Context, client v3.KV, requests <-chan request) {
func doPut(ctx context.Context, client clientv3.KV, requests <-chan request) {
defer wg.Done()
for req := range requests {
@ -164,6 +180,24 @@ func doPut(ctx context.Context, client v3.KV, requests <-chan request) {
}
}
func doPutEtcd2(ctx context.Context, conn clientv2.KeysAPI, requests <-chan request) {
defer wg.Done()
for req := range requests {
op := req.etcd2Op
st := time.Now()
_, err := conn.Set(context.Background(), op.key, op.value, nil)
var errStr string
if err != nil {
errStr = err.Error()
}
results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()}
bar.Increment()
}
}
func doPutZk(ctx context.Context, conn *zk.Conn, requests <-chan request) {
defer wg.Done()
@ -182,7 +216,7 @@ func doPutZk(ctx context.Context, conn *zk.Conn, requests <-chan request) {
}
}
func compactKV(clients []*v3.Client) {
func compactKV(clients []*clientv3.Client) {
var curRev int64
for _, c := range clients {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)

View File

@ -21,7 +21,8 @@ import (
"time"
"github.com/cheggaaa/pb"
v3 "github.com/coreos/etcd/clientv3"
clientv2 "github.com/coreos/etcd/client"
"github.com/coreos/etcd/clientv3"
"github.com/samuel/go-zookeeper/zk"
"github.com/spf13/cobra"
"golang.org/x/net/context"
@ -61,7 +62,7 @@ func rangeFunc(cmd *cobra.Command, args []string) {
var err error
for i := 0; i < 5; i++ {
clients := mustCreateClients(1, 1)
_, err = clients[0].Do(context.Background(), v3.OpPut(k, string(v)))
_, err = clients[0].Do(context.Background(), clientv3.OpPut(k, string(v)))
if err != nil {
continue
}
@ -73,6 +74,23 @@ func rangeFunc(cmd *cobra.Command, args []string) {
os.Exit(1)
}
case "etcd2":
fmt.Printf("PUT '%s' to etcd2\n", k)
var err error
for i := 0; i < 5; i++ {
clients := mustCreateClientsEtcd2(totalConns)
_, err = clients[0].Set(context.Background(), k, string(v), nil)
if err != nil {
continue
}
fmt.Printf("Done with PUT '%s' to etcd2\n", k)
break
}
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
case "zk":
k = "/" + k
fmt.Printf("PUT '%s' to Zookeeper\n", k)
@ -105,7 +123,7 @@ func rangeFunc(cmd *cobra.Command, args []string) {
}
}
if database == "etcd" {
if database == "etcd" { // etcd2 quorum false by default
if rangeConsistency == "l" {
fmt.Println("bench with linearizable range")
} else if rangeConsistency == "s" {
@ -137,6 +155,14 @@ func rangeFunc(cmd *cobra.Command, args []string) {
clients[i].Close()
}
}()
case "etcd2":
conns := mustCreateClientsEtcd2(totalConns)
for i := range conns {
wg.Add(1)
go doRangeEtcd2(conns[i], requests)
}
case "zk":
conns := mustCreateConnsZk(totalConns)
defer func() {
@ -148,6 +174,7 @@ func rangeFunc(cmd *cobra.Command, args []string) {
wg.Add(1)
go doRangeZk(conns[i], requests)
}
default:
log.Fatalf("unknown database %s", database)
}
@ -157,11 +184,15 @@ func rangeFunc(cmd *cobra.Command, args []string) {
for i := 0; i < rangeTotal; i++ {
switch database {
case "etcd":
opts := []v3.OpOption{v3.WithRange(end)}
opts := []clientv3.OpOption{clientv3.WithRange(end)}
if rangeConsistency == "s" {
opts = append(opts, v3.WithSerializable())
opts = append(opts, clientv3.WithSerializable())
}
requests <- request{etcdOp: v3.OpGet(k, opts...)}
requests <- request{etcdOp: clientv3.OpGet(k, opts...)}
case "etcd2":
requests <- request{etcd2Op: etcd2Op{key: k}}
case "zk":
requests <- request{zkOp: zkOp{key: k}}
}
@ -177,11 +208,12 @@ func rangeFunc(cmd *cobra.Command, args []string) {
<-pdoneC
}
func doRange(client v3.KV, requests <-chan request) {
func doRange(client clientv3.KV, requests <-chan request) {
defer wg.Done()
for req := range requests {
op := req.etcdOp
st := time.Now()
_, err := client.Do(context.Background(), op)
@ -194,11 +226,30 @@ func doRange(client v3.KV, requests <-chan request) {
}
}
func doRangeEtcd2(conn clientv2.KeysAPI, requests <-chan request) {
defer wg.Done()
for req := range requests {
op := req.etcd2Op
st := time.Now()
_, err := conn.Get(context.Background(), op.key, nil)
var errStr string
if err != nil {
errStr = err.Error()
}
results <- result{errStr: errStr, duration: time.Since(st), happened: time.Now()}
bar.Increment()
}
}
func doRangeZk(conn *zk.Conn, requests <-chan request) {
defer wg.Done()
for req := range requests {
op := req.zkOp
st := time.Now()
_, _, err := conn.Get(op.key)

View File

@ -23,6 +23,7 @@ import (
mrand "math/rand"
clientv2 "github.com/coreos/etcd/client"
"github.com/coreos/etcd/clientv3"
"github.com/samuel/go-zookeeper/zk"
)
@ -33,20 +34,6 @@ var (
dialTotal int
)
func mustCreateConnsZk(total uint) []*zk.Conn {
zks := make([]*zk.Conn, total)
for i := range zks {
endpoint := endpoints[dialTotal%len(endpoints)]
dialTotal++
conn, _, err := zk.Connect([]string{endpoint}, time.Second)
if err != nil {
log.Fatal(err)
}
zks[i] = conn
}
return zks
}
func mustCreateConn() *clientv3.Client {
endpoint := endpoints[dialTotal%len(endpoints)]
dialTotal++
@ -72,6 +59,42 @@ func mustCreateClients(totalClients, totalConns uint) []*clientv3.Client {
return clients
}
func mustCreateClientsEtcd2(total uint) []clientv2.KeysAPI {
cks := make([]clientv2.KeysAPI, total)
for i := range cks {
endpoint := endpoints[dialTotal%len(endpoints)]
dialTotal++
cfg := clientv2.Config{
Endpoints: []string{endpoint},
Transport: clientv2.DefaultTransport,
HeaderTimeoutPerRequest: time.Second,
}
c, err := clientv2.New(cfg)
if err != nil {
log.Fatal(err)
}
kapi := clientv2.NewKeysAPI(c)
cks[i] = kapi
}
return cks
}
func mustCreateConnsZk(total uint) []*zk.Conn {
zks := make([]*zk.Conn, total)
for i := range zks {
endpoint := endpoints[dialTotal%len(endpoints)]
dialTotal++
conn, _, err := zk.Connect([]string{endpoint}, time.Second)
if err != nil {
log.Fatal(err)
}
zks[i] = conn
}
return zks
}
func mustRandBytes(n int) []byte {
rb := make([]byte, n)
_, err := rand.Read(rb)

View File

@ -63,7 +63,7 @@ var (
)
func init() {
StartCommand.PersistentFlags().StringVar(&globalFlags.Database, "database", "", "etcd, zookeeper, zk, etcd2, consul.")
StartCommand.PersistentFlags().StringVar(&globalFlags.Database, "database", "", "etcd, etcd2, zookeeper, zk, consul.")
StartCommand.PersistentFlags().StringSliceVar(&globalFlags.AgentEndpoints, "agent-endpoints", []string{""}, "Endpoints to send client requests to, then it automatically configures.")
StartCommand.PersistentFlags().Int64Var(&globalFlags.ZookeeperPreAllocSize, "zk-pre-alloc-size", 65536*1024, "Disk pre-allocation size in bytes.")
StartCommand.PersistentFlags().Int64Var(&globalFlags.ZookeeperMaxClientCnxns, "zk-max-client-conns", 5000, "Maximum number of concurrent Zookeeper connection.")
@ -101,6 +101,8 @@ func CommandFunc(cmd *cobra.Command, args []string) {
switch globalFlags.Database {
case "etcd":
req.Database = agent.Request_etcd
case "etcd2":
req.Database = agent.Request_etcd2
case "zookeeper":
req.Database = agent.Request_ZooKeeper
default: