Expose etcd client latency metrics
Kubernetes-commit: d5c9ad80499a9148a40b8a6c33c165cf12578649
This commit is contained in:
parent
d1e178ce07
commit
1fd6bb259f
|
|
@ -38,6 +38,7 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
"k8s.io/apiserver/pkg/storage/etcd"
|
"k8s.io/apiserver/pkg/storage/etcd"
|
||||||
|
"k8s.io/apiserver/pkg/storage/etcd/metrics"
|
||||||
"k8s.io/apiserver/pkg/storage/value"
|
"k8s.io/apiserver/pkg/storage/value"
|
||||||
utiltrace "k8s.io/utils/trace"
|
utiltrace "k8s.io/utils/trace"
|
||||||
)
|
)
|
||||||
|
|
@ -111,7 +112,9 @@ func (s *store) Versioner() storage.Versioner {
|
||||||
// Get implements storage.Interface.Get.
|
// Get implements storage.Interface.Get.
|
||||||
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
|
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
|
||||||
key = path.Join(s.pathPrefix, key)
|
key = path.Join(s.pathPrefix, key)
|
||||||
|
startTime := time.Now()
|
||||||
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
||||||
|
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -156,11 +159,13 @@ func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object,
|
||||||
return storage.NewInternalError(err.Error())
|
return storage.NewInternalError(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
startTime := time.Now()
|
||||||
txnResp, err := s.client.KV.Txn(ctx).If(
|
txnResp, err := s.client.KV.Txn(ctx).If(
|
||||||
notFound(key),
|
notFound(key),
|
||||||
).Then(
|
).Then(
|
||||||
clientv3.OpPut(key, string(newData), opts...),
|
clientv3.OpPut(key, string(newData), opts...),
|
||||||
).Commit()
|
).Commit()
|
||||||
|
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -191,10 +196,12 @@ func (s *store) Delete(ctx context.Context, key string, out runtime.Object, prec
|
||||||
func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime.Object) error {
|
func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime.Object) error {
|
||||||
// We need to do get and delete in single transaction in order to
|
// We need to do get and delete in single transaction in order to
|
||||||
// know the value and revision before deleting it.
|
// know the value and revision before deleting it.
|
||||||
|
startTime := time.Now()
|
||||||
txnResp, err := s.client.KV.Txn(ctx).If().Then(
|
txnResp, err := s.client.KV.Txn(ctx).If().Then(
|
||||||
clientv3.OpGet(key),
|
clientv3.OpGet(key),
|
||||||
clientv3.OpDelete(key),
|
clientv3.OpDelete(key),
|
||||||
).Commit()
|
).Commit()
|
||||||
|
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -212,7 +219,9 @@ func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions) error {
|
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions) error {
|
||||||
|
startTime := time.Now()
|
||||||
getResp, err := s.client.KV.Get(ctx, key)
|
getResp, err := s.client.KV.Get(ctx, key)
|
||||||
|
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -224,6 +233,7 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O
|
||||||
if err := preconditions.Check(key, origState.obj); err != nil {
|
if err := preconditions.Check(key, origState.obj); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
startTime := time.Now()
|
||||||
txnResp, err := s.client.KV.Txn(ctx).If(
|
txnResp, err := s.client.KV.Txn(ctx).If(
|
||||||
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
|
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
|
||||||
).Then(
|
).Then(
|
||||||
|
|
@ -231,6 +241,7 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O
|
||||||
).Else(
|
).Else(
|
||||||
clientv3.OpGet(key),
|
clientv3.OpGet(key),
|
||||||
).Commit()
|
).Commit()
|
||||||
|
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -247,7 +258,7 @@ func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.O
|
||||||
func (s *store) GuaranteedUpdate(
|
func (s *store) GuaranteedUpdate(
|
||||||
ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
|
ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
|
||||||
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
|
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
|
||||||
trace := utiltrace.New(fmt.Sprintf("GuaranteedUpdate etcd3: %s", reflect.TypeOf(out).String()))
|
trace := utiltrace.New(fmt.Sprintf("GuaranteedUpdate etcd3: %s", getTypeName(out)))
|
||||||
defer trace.LogIfLong(500 * time.Millisecond)
|
defer trace.LogIfLong(500 * time.Millisecond)
|
||||||
|
|
||||||
v, err := conversion.EnforcePtr(out)
|
v, err := conversion.EnforcePtr(out)
|
||||||
|
|
@ -257,7 +268,9 @@ func (s *store) GuaranteedUpdate(
|
||||||
key = path.Join(s.pathPrefix, key)
|
key = path.Join(s.pathPrefix, key)
|
||||||
|
|
||||||
getCurrentState := func() (*objState, error) {
|
getCurrentState := func() (*objState, error) {
|
||||||
|
startTime := time.Now()
|
||||||
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
||||||
|
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -339,6 +352,7 @@ func (s *store) GuaranteedUpdate(
|
||||||
}
|
}
|
||||||
trace.Step("Transaction prepared")
|
trace.Step("Transaction prepared")
|
||||||
|
|
||||||
|
startTime := time.Now()
|
||||||
txnResp, err := s.client.KV.Txn(ctx).If(
|
txnResp, err := s.client.KV.Txn(ctx).If(
|
||||||
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
|
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
|
||||||
).Then(
|
).Then(
|
||||||
|
|
@ -346,6 +360,7 @@ func (s *store) GuaranteedUpdate(
|
||||||
).Else(
|
).Else(
|
||||||
clientv3.OpGet(key),
|
clientv3.OpGet(key),
|
||||||
).Commit()
|
).Commit()
|
||||||
|
metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -379,7 +394,9 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
|
||||||
}
|
}
|
||||||
|
|
||||||
key = path.Join(s.pathPrefix, key)
|
key = path.Join(s.pathPrefix, key)
|
||||||
|
startTime := time.Now()
|
||||||
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
||||||
|
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -399,7 +416,9 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
|
||||||
|
|
||||||
func (s *store) Count(key string) (int64, error) {
|
func (s *store) Count(key string) (int64, error) {
|
||||||
key = path.Join(s.pathPrefix, key)
|
key = path.Join(s.pathPrefix, key)
|
||||||
|
startTime := time.Now()
|
||||||
getResp, err := s.client.KV.Get(context.Background(), key, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)), clientv3.WithCountOnly())
|
getResp, err := s.client.KV.Get(context.Background(), key, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)), clientv3.WithCountOnly())
|
||||||
|
metrics.RecordEtcdRequestLatency("listWithCount", key, startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
@ -554,7 +573,9 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
|
||||||
var lastKey []byte
|
var lastKey []byte
|
||||||
var hasMore bool
|
var hasMore bool
|
||||||
for {
|
for {
|
||||||
|
startTime := time.Now()
|
||||||
getResp, err := s.client.KV.Get(ctx, key, options...)
|
getResp, err := s.client.KV.Get(ctx, key, options...)
|
||||||
|
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
|
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
|
||||||
}
|
}
|
||||||
|
|
@ -786,3 +807,8 @@ func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.Selec
|
||||||
func notFound(key string) clientv3.Cmp {
|
func notFound(key string) clientv3.Cmp {
|
||||||
return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
|
return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getTypeName returns type name of an object for reporting purposes.
|
||||||
|
func getTypeName(obj interface{}) string {
|
||||||
|
return reflect.TypeOf(obj).String()
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue