From c7d29aafa7073f64555532d0d10f19714bc8e6c2 Mon Sep 17 00:00:00 2001 From: tongjian <1045931706@qq.com> Date: Wed, 3 Jan 2024 15:32:51 +0800 Subject: [PATCH] client: replace array by priority queue to support cache queue (#1095) * impl priority queue Signed-off-by: bufferflies <1045931706@qq.com> * replace priority queue Signed-off-by: bufferflies <1045931706@qq.com> * fix gosimple Signed-off-by: bufferflies <1045931706@qq.com> * remove chinese comment Signed-off-by: bufferflies <1045931706@qq.com> --------- Signed-off-by: bufferflies <1045931706@qq.com> --- internal/client/client_batch.go | 25 +++--- internal/client/client_test.go | 3 +- internal/client/priority_queue.go | 108 +++++++++++++++++++++++++ internal/client/priority_queue_test.go | 49 +++++++++++ 4 files changed, 172 insertions(+), 13 deletions(-) create mode 100644 internal/client/priority_queue.go create mode 100644 internal/client/priority_queue_test.go diff --git a/internal/client/client_batch.go b/internal/client/client_batch.go index 9802350b..a9956cd1 100644 --- a/internal/client/client_batch.go +++ b/internal/client/client_batch.go @@ -76,6 +76,11 @@ func (b *batchCommandsEntry) isCanceled() bool { return atomic.LoadInt32(&b.canceled) == 1 } +// TODO: implement by the request priority. +func (b *batchCommandsEntry) priority() int { + return 0 +} + func (b *batchCommandsEntry) error(err error) { b.err = err close(b.res) @@ -87,7 +92,7 @@ type batchCommandsBuilder struct { // Each BatchCommandsRequest_Request sent to a store has a unique identity to // distinguish its response. idAlloc uint64 - entries []*batchCommandsEntry + entries *PriorityQueue requests []*tikvpb.BatchCommandsRequest_Request requestIDs []uint64 // In most cases, there isn't any forwardingReq. @@ -95,11 +100,11 @@ type batchCommandsBuilder struct { } func (b *batchCommandsBuilder) len() int { - return len(b.entries) + return b.entries.Len() } func (b *batchCommandsBuilder) push(entry *batchCommandsEntry) { - b.entries = append(b.entries, entry) + b.entries.Push(entry) } // build builds BatchCommandsRequests and calls collect() for each valid entry. @@ -108,7 +113,8 @@ func (b *batchCommandsBuilder) push(entry *batchCommandsEntry) { func (b *batchCommandsBuilder) build( collect func(id uint64, e *batchCommandsEntry), ) (*tikvpb.BatchCommandsRequest, map[string]*tikvpb.BatchCommandsRequest) { - for _, e := range b.entries { + for _, entry := range b.entries.All() { + e := entry.(*batchCommandsEntry) if e.isCanceled() { continue } @@ -140,8 +146,8 @@ func (b *batchCommandsBuilder) build( } func (b *batchCommandsBuilder) cancel(e error) { - for _, entry := range b.entries { - entry.error(e) + for _, entry := range b.entries.All() { + entry.(*batchCommandsEntry).error(e) } } @@ -152,10 +158,7 @@ func (b *batchCommandsBuilder) reset() { // The data in the cap part of the slice would reference the prewrite keys whose // underlying memory is borrowed from memdb. The reference cause GC can't release // the memdb, leading to serious memory leak problems in the large transaction case. - for i := 0; i < len(b.entries); i++ { - b.entries[i] = nil - } - b.entries = b.entries[:0] + b.entries.Reset() for i := 0; i < len(b.requests); i++ { b.requests[i] = nil } @@ -170,7 +173,7 @@ func (b *batchCommandsBuilder) reset() { func newBatchCommandsBuilder(maxBatchSize uint) *batchCommandsBuilder { return &batchCommandsBuilder{ idAlloc: 0, - entries: make([]*batchCommandsEntry, 0, maxBatchSize), + entries: NewPriorityQueue(), requests: make([]*tikvpb.BatchCommandsRequest_Request, 0, maxBatchSize), requestIDs: make([]uint64, 0, maxBatchSize), forwardingReqs: make(map[string]*tikvpb.BatchCommandsRequest), diff --git a/internal/client/client_test.go b/internal/client/client_test.go index e107bb90..f5c26df3 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -475,7 +475,7 @@ func TestBatchCommandsBuilder(t *testing.T) { // Test reset builder.reset() assert.Equal(t, builder.len(), 0) - assert.Equal(t, len(builder.entries), 0) + assert.Equal(t, builder.entries.Len(), 0) assert.Equal(t, len(builder.requests), 0) assert.Equal(t, len(builder.requestIDs), 0) assert.Equal(t, len(builder.forwardingReqs), 0) @@ -483,7 +483,6 @@ func TestBatchCommandsBuilder(t *testing.T) { } func TestTraceExecDetails(t *testing.T) { - assert.Nil(t, buildSpanInfoFromResp(nil)) assert.Nil(t, buildSpanInfoFromResp(&tikvrpc.Response{})) assert.Nil(t, buildSpanInfoFromResp(&tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}})) diff --git a/internal/client/priority_queue.go b/internal/client/priority_queue.go new file mode 100644 index 00000000..f70afa10 --- /dev/null +++ b/internal/client/priority_queue.go @@ -0,0 +1,108 @@ +// Copyright 2023 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import "container/heap" + +// Item is the interface that all entries in a priority queue must implement. +type Item interface { + priority() int +} + +// entry is an entry in a priority queue. +type entry struct { + entry Item + index int +} + +// prioritySlice implements heap.Interface and holds Entries. +type prioritySlice []entry + +// Len returns the length of the priority queue. +func (ps prioritySlice) Len() int { + return len(ps) +} + +// Less compares two entries in the priority queue. +// The higher priority entry is the one with the lower value. +func (ps prioritySlice) Less(i, j int) bool { + return ps[i].entry.priority() > ps[j].entry.priority() +} + +// Swap swaps two entries in the priority queue. +func (ps prioritySlice) Swap(i, j int) { + ps[i], ps[j] = ps[j], ps[i] + ps[i].index = i + ps[j].index = j +} + +// Push adds an entry to the priority queue. +func (ps *prioritySlice) Push(x interface{}) { + item := x.(entry) + item.index = len(*ps) + *ps = append(*ps, item) +} + +// Pop removes the highest priority entry from the priority queue. +func (ps *prioritySlice) Pop() interface{} { + old := *ps + n := len(old) + item := old[n-1] + item.index = -1 + *ps = old[0 : n-1] + return item +} + +// PriorityQueue is a priority queue. +type PriorityQueue struct { + ps prioritySlice +} + +// NewPriorityQueue creates a new priority queue. +func NewPriorityQueue() *PriorityQueue { + return &PriorityQueue{} +} + +// Len returns the length of the priority queue. +func (pq *PriorityQueue) Len() int { + return pq.ps.Len() +} + +// Push adds an entry to the priority queue. +func (pq *PriorityQueue) Push(item Item) { + heap.Push(&pq.ps, entry{entry: item}) +} + +// Pop removes the highest priority entry from the priority queue. +func (pq *PriorityQueue) Pop() Item { + return heap.Pop(&pq.ps).(entry).entry +} + +// All returns all entries in the priority queue not ensure the priority. +func (pq *PriorityQueue) All() []Item { + items := make([]Item, 0, pq.Len()) + for i := 0; i < pq.Len(); i++ { + items = append(items, pq.ps[i].entry) + } + return items +} + +// Reset resets the priority queue. +func (pq *PriorityQueue) Reset() { + for i := 0; i < pq.Len(); i++ { + pq.ps[i].entry = nil + } + pq.ps = pq.ps[:0] +} diff --git a/internal/client/priority_queue_test.go b/internal/client/priority_queue_test.go new file mode 100644 index 00000000..e249155c --- /dev/null +++ b/internal/client/priority_queue_test.go @@ -0,0 +1,49 @@ +// Copyright 2023 TiKV Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +type FakeItem struct { + pri int + value int +} + +func (f *FakeItem) priority() int { + return f.pri +} + +func TestPriority(t *testing.T) { + re := require.New(t) + pq := NewPriorityQueue() + for i := 1; i <= 5; i++ { + pq.Push(&FakeItem{value: i, pri: i}) + } + re.Equal(5, pq.Len()) + arr := pq.All() + re.Len(arr, 5) + pq.Reset() + re.Equal(0, pq.Len()) + for i := 1; i <= 5; i++ { + pq.Push(&FakeItem{value: i, pri: i}) + } + for i := pq.Len(); i > 0; i-- { + re.Equal(i, pq.Pop().(*FakeItem).value) + } +}