From 79a0fc59e4d5012095282344f5fdbca1492de771 Mon Sep 17 00:00:00 2001 From: ekexium Date: Thu, 30 Jun 2022 15:19:23 +0800 Subject: [PATCH] Add a callback when MemDB grows (#520) * implement the footprint chagne hook Signed-off-by: ekexium * simplify the signature Signed-off-by: ekexium * also call the callback when arena shrinks Signed-off-by: ekexium * also callback on reset Signed-off-by: ekexium Co-authored-by: disksing --- internal/unionstore/memdb.go | 14 ++++++++++++++ internal/unionstore/memdb_arena.go | 20 ++++++++++++++++++++ tikv/kv.go | 8 ++++++-- txnkv/transaction/txn.go | 15 +++++++++++++-- 4 files changed, 53 insertions(+), 4 deletions(-) diff --git a/internal/unionstore/memdb.go b/internal/unionstore/memdb.go index 69fa6a58..3f743feb 100644 --- a/internal/unionstore/memdb.go +++ b/internal/unionstore/memdb.go @@ -857,3 +857,17 @@ func (db *MemDB) RemoveFromBuffer(key []byte) { db.size -= len(db.vlog.getValue(x.vptr)) db.deleteNode(x) } + +// SetMemoryFootprintChangeHook sets the hook function that is triggered when memdb grows. +func (db *MemDB) SetMemoryFootprintChangeHook(hook func(uint64)) { + innerHook := func() { + hook(db.allocator.capacity + db.vlog.capacity) + } + db.allocator.memChangeHook = innerHook + db.vlog.memChangeHook = innerHook +} + +// Mem returns the current memory footprint +func (db *MemDB) Mem() uint64 { + return db.allocator.capacity + db.vlog.capacity +} diff --git a/internal/unionstore/memdb_arena.go b/internal/unionstore/memdb_arena.go index ef7dd906..adaca8d3 100644 --- a/internal/unionstore/memdb_arena.go +++ b/internal/unionstore/memdb_arena.go @@ -91,6 +91,10 @@ func (addr *memdbArenaAddr) load(src []byte) { type memdbArena struct { blockSize int blocks []memdbArenaBlock + // the total size of all blocks, also the approximate memory footprint of the arena. + capacity uint64 + // when it enlarges or shrinks, call this function with the current memory footprint (in bytes) + memChangeHook func() } func (a *memdbArena) alloc(size int, align bool) (memdbArenaAddr, []byte) { @@ -123,6 +127,14 @@ func (a *memdbArena) enlarge(allocSize, blockSize int) { a.blocks = append(a.blocks, memdbArenaBlock{ buf: make([]byte, a.blockSize), }) + a.capacity += uint64(a.blockSize) + a.onMemChange() +} + +func (a *memdbArena) onMemChange() { + if a.memChangeHook != nil { + a.memChangeHook() + } } func (a *memdbArena) allocInLastBlock(size int, align bool) (memdbArenaAddr, []byte) { @@ -140,6 +152,8 @@ func (a *memdbArena) reset() { } a.blocks = a.blocks[:0] a.blockSize = 0 + a.capacity = 0 + a.onMemChange() } type memdbArenaBlock struct { @@ -198,6 +212,12 @@ func (a *memdbArena) truncate(snap *MemDBCheckpoint) { a.blocks[len(a.blocks)-1].length = snap.offsetInBlock } a.blockSize = snap.blockSize + + a.capacity = 0 + for _, block := range a.blocks { + a.capacity += uint64(block.length) + } + a.onMemChange() } type nodeAllocator struct { diff --git a/tikv/kv.go b/tikv/kv.go index 476eb5a5..977e83ec 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -250,19 +250,23 @@ func (s *KVStore) runSafePointChecker() { } // Begin a global transaction. -func (s *KVStore) Begin(opts ...TxnOption) (*transaction.KVTxn, error) { +func (s *KVStore) Begin(opts ...TxnOption) (txn *transaction.KVTxn, err error) { options := &transaction.TxnOptions{} // Inject the options for _, opt := range opts { opt(options) } + defer func() { + if err == nil && txn != nil && options.MemoryFootprintChangeHook != nil { + txn.SetMemoryFootprintChangeHook(options.MemoryFootprintChangeHook) + } + }() if options.TxnScope == "" { options.TxnScope = oracle.GlobalTxnScope } var ( startTS uint64 - err error ) if options.StartTS != nil { startTS = *options.StartTS diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 62fea02d..c5b638a2 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -80,8 +80,9 @@ type SchemaAmender interface { // TxnOptions indicates the option when beginning a transaction. // TxnOptions are set by the TxnOption values passed to Begin type TxnOptions struct { - TxnScope string - StartTS *uint64 + TxnScope string + StartTS *uint64 + MemoryFootprintChangeHook func(uint64) } // KVTxn contains methods to interact with a TiKV transaction. @@ -884,6 +885,16 @@ func (txn *KVTxn) GetClusterID() uint64 { return txn.store.GetClusterID() } +// SetMemoryFootprintChangeHook sets the hook function that is triggered when memdb grows +func (txn *KVTxn) SetMemoryFootprintChangeHook(hook func(uint64)) { + txn.us.GetMemBuffer().SetMemoryFootprintChangeHook(hook) +} + +// Mem returns the current memory footprint +func (txn *KVTxn) Mem() uint64 { + return txn.us.GetMemBuffer().Mem() +} + // SetRequestSourceInternal sets the scope of the request source. func (txn *KVTxn) SetRequestSourceInternal(internal bool) { txn.RequestSource.SetRequestSourceInternal(internal)