feat: record flush_wait_ms in TxnInfo (#1342)

* feat: record flush_wait_ms in TxnInfo

Signed-off-by: ekexium <eke@fastmail.com>

* refactor: change FlushWaitDuration() to GetFlushMetrics()

Signed-off-by: ekexium <eke@fastmail.com>

---------

Signed-off-by: ekexium <eke@fastmail.com>
This commit is contained in:
ekexium 2024-05-15 11:13:15 +08:00 committed by GitHub
parent e1ca512cca
commit c40432e3ab
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 24 additions and 0 deletions

View File

@ -55,6 +55,9 @@ type PipelinedMemDB struct {
// Some([]) -> delete
batchGetCache map[string]util.Option[[]byte]
memChangeHook func(uint64)
// metrics
flushWaitDuration time.Duration
}
const (
@ -358,12 +361,14 @@ func (p *PipelinedMemDB) needFlush() bool {
// FlushWait will wait for all flushing tasks are done and return the error if there is a failure.
func (p *PipelinedMemDB) FlushWait() error {
if p.flushingMemDB != nil {
now := time.Now()
err := <-p.errCh
if err != nil {
err = p.handleAlreadyExistErr(err)
}
// cleanup the flushingMemDB so the next call of FlushWait will not wait for the error channel.
p.flushingMemDB = nil
p.flushWaitDuration += time.Since(now)
return err
}
return nil
@ -513,3 +518,10 @@ func (p *PipelinedMemDB) Checkpoint() *MemDBCheckpoint {
func (p *PipelinedMemDB) RevertToCheckpoint(*MemDBCheckpoint) {
panic("RevertToCheckpoint is not supported for PipelinedMemDB")
}
// GetFlushMetrics implements MemBuffer interface.
func (p *PipelinedMemDB) GetFlushMetrics() FlushMetrics {
return FlushMetrics{
WaitDuration: p.flushWaitDuration,
}
}

View File

@ -37,6 +37,7 @@ package unionstore
import (
"context"
"math"
"time"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/kv"
@ -236,6 +237,12 @@ type MemBuffer interface {
Flush(force bool) (bool, error)
// FlushWait waits for the flushing task done and return error.
FlushWait() error
// GetFlushDetails returns the metrics related to flushing
GetFlushMetrics() FlushMetrics
}
type FlushMetrics struct {
WaitDuration time.Duration
}
var (
@ -287,3 +294,6 @@ func (db *MemDBWithContext) BatchGet(ctx context.Context, keys [][]byte) (map[st
}
return m, nil
}
// GetFlushMetrisc implements the MemBuffer interface.
func (db *MemDBWithContext) GetFlushMetrics() FlushMetrics { return FlushMetrics{} }

View File

@ -853,6 +853,7 @@ type TxnInfo struct {
OnePCFallback bool `json:"one_pc_fallback"`
ErrMsg string `json:"error,omitempty"`
Pipelined bool `json:"pipelined"`
FlushWaitMs int64 `json:"flush_wait_ms"`
}
func (txn *KVTxn) onCommitted(err error) {
@ -875,6 +876,7 @@ func (txn *KVTxn) onCommitted(err error) {
AsyncCommitFallback: txn.committer.hasTriedAsyncCommit && !isAsyncCommit,
OnePCFallback: txn.committer.hasTriedOnePC && !isOnePC,
Pipelined: txn.IsPipelined(),
FlushWaitMs: txn.GetMemBuffer().GetFlushMetrics().WaitDuration.Milliseconds(),
}
if err != nil {
info.ErrMsg = err.Error()