client: replace array by priority queue to support cache queue (#1095)

* impl priority queue

Signed-off-by: bufferflies <1045931706@qq.com>

* replace priority queue

Signed-off-by: bufferflies <1045931706@qq.com>

* fix gosimple

Signed-off-by: bufferflies <1045931706@qq.com>

* remove chinese comment

Signed-off-by: bufferflies <1045931706@qq.com>

---------

Signed-off-by: bufferflies <1045931706@qq.com>
This commit is contained in:
tongjian 2024-01-03 15:32:51 +08:00 committed by GitHub
parent dbea404a84
commit c7d29aafa7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 172 additions and 13 deletions

View File

@ -76,6 +76,11 @@ func (b *batchCommandsEntry) isCanceled() bool {
return atomic.LoadInt32(&b.canceled) == 1
}
// TODO: implement by the request priority.
func (b *batchCommandsEntry) priority() int {
return 0
}
func (b *batchCommandsEntry) error(err error) {
b.err = err
close(b.res)
@ -87,7 +92,7 @@ type batchCommandsBuilder struct {
// Each BatchCommandsRequest_Request sent to a store has a unique identity to
// distinguish its response.
idAlloc uint64
entries []*batchCommandsEntry
entries *PriorityQueue
requests []*tikvpb.BatchCommandsRequest_Request
requestIDs []uint64
// In most cases, there isn't any forwardingReq.
@ -95,11 +100,11 @@ type batchCommandsBuilder struct {
}
func (b *batchCommandsBuilder) len() int {
return len(b.entries)
return b.entries.Len()
}
func (b *batchCommandsBuilder) push(entry *batchCommandsEntry) {
b.entries = append(b.entries, entry)
b.entries.Push(entry)
}
// build builds BatchCommandsRequests and calls collect() for each valid entry.
@ -108,7 +113,8 @@ func (b *batchCommandsBuilder) push(entry *batchCommandsEntry) {
func (b *batchCommandsBuilder) build(
collect func(id uint64, e *batchCommandsEntry),
) (*tikvpb.BatchCommandsRequest, map[string]*tikvpb.BatchCommandsRequest) {
for _, e := range b.entries {
for _, entry := range b.entries.All() {
e := entry.(*batchCommandsEntry)
if e.isCanceled() {
continue
}
@ -140,8 +146,8 @@ func (b *batchCommandsBuilder) build(
}
func (b *batchCommandsBuilder) cancel(e error) {
for _, entry := range b.entries {
entry.error(e)
for _, entry := range b.entries.All() {
entry.(*batchCommandsEntry).error(e)
}
}
@ -152,10 +158,7 @@ func (b *batchCommandsBuilder) reset() {
// The data in the cap part of the slice would reference the prewrite keys whose
// underlying memory is borrowed from memdb. The reference cause GC can't release
// the memdb, leading to serious memory leak problems in the large transaction case.
for i := 0; i < len(b.entries); i++ {
b.entries[i] = nil
}
b.entries = b.entries[:0]
b.entries.Reset()
for i := 0; i < len(b.requests); i++ {
b.requests[i] = nil
}
@ -170,7 +173,7 @@ func (b *batchCommandsBuilder) reset() {
func newBatchCommandsBuilder(maxBatchSize uint) *batchCommandsBuilder {
return &batchCommandsBuilder{
idAlloc: 0,
entries: make([]*batchCommandsEntry, 0, maxBatchSize),
entries: NewPriorityQueue(),
requests: make([]*tikvpb.BatchCommandsRequest_Request, 0, maxBatchSize),
requestIDs: make([]uint64, 0, maxBatchSize),
forwardingReqs: make(map[string]*tikvpb.BatchCommandsRequest),

View File

@ -475,7 +475,7 @@ func TestBatchCommandsBuilder(t *testing.T) {
// Test reset
builder.reset()
assert.Equal(t, builder.len(), 0)
assert.Equal(t, len(builder.entries), 0)
assert.Equal(t, builder.entries.Len(), 0)
assert.Equal(t, len(builder.requests), 0)
assert.Equal(t, len(builder.requestIDs), 0)
assert.Equal(t, len(builder.forwardingReqs), 0)
@ -483,7 +483,6 @@ func TestBatchCommandsBuilder(t *testing.T) {
}
func TestTraceExecDetails(t *testing.T) {
assert.Nil(t, buildSpanInfoFromResp(nil))
assert.Nil(t, buildSpanInfoFromResp(&tikvrpc.Response{}))
assert.Nil(t, buildSpanInfoFromResp(&tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}))

View File

@ -0,0 +1,108 @@
// Copyright 2023 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import "container/heap"
// Item is the interface that all entries in a priority queue must implement.
type Item interface {
priority() int
}
// entry is an entry in a priority queue.
type entry struct {
entry Item
index int
}
// prioritySlice implements heap.Interface and holds Entries.
type prioritySlice []entry
// Len returns the length of the priority queue.
func (ps prioritySlice) Len() int {
return len(ps)
}
// Less compares two entries in the priority queue.
// The higher priority entry is the one with the lower value.
func (ps prioritySlice) Less(i, j int) bool {
return ps[i].entry.priority() > ps[j].entry.priority()
}
// Swap swaps two entries in the priority queue.
func (ps prioritySlice) Swap(i, j int) {
ps[i], ps[j] = ps[j], ps[i]
ps[i].index = i
ps[j].index = j
}
// Push adds an entry to the priority queue.
func (ps *prioritySlice) Push(x interface{}) {
item := x.(entry)
item.index = len(*ps)
*ps = append(*ps, item)
}
// Pop removes the highest priority entry from the priority queue.
func (ps *prioritySlice) Pop() interface{} {
old := *ps
n := len(old)
item := old[n-1]
item.index = -1
*ps = old[0 : n-1]
return item
}
// PriorityQueue is a priority queue.
type PriorityQueue struct {
ps prioritySlice
}
// NewPriorityQueue creates a new priority queue.
func NewPriorityQueue() *PriorityQueue {
return &PriorityQueue{}
}
// Len returns the length of the priority queue.
func (pq *PriorityQueue) Len() int {
return pq.ps.Len()
}
// Push adds an entry to the priority queue.
func (pq *PriorityQueue) Push(item Item) {
heap.Push(&pq.ps, entry{entry: item})
}
// Pop removes the highest priority entry from the priority queue.
func (pq *PriorityQueue) Pop() Item {
return heap.Pop(&pq.ps).(entry).entry
}
// All returns all entries in the priority queue not ensure the priority.
func (pq *PriorityQueue) All() []Item {
items := make([]Item, 0, pq.Len())
for i := 0; i < pq.Len(); i++ {
items = append(items, pq.ps[i].entry)
}
return items
}
// Reset resets the priority queue.
func (pq *PriorityQueue) Reset() {
for i := 0; i < pq.Len(); i++ {
pq.ps[i].entry = nil
}
pq.ps = pq.ps[:0]
}

View File

@ -0,0 +1,49 @@
// Copyright 2023 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package client
import (
"testing"
"github.com/stretchr/testify/require"
)
type FakeItem struct {
pri int
value int
}
func (f *FakeItem) priority() int {
return f.pri
}
func TestPriority(t *testing.T) {
re := require.New(t)
pq := NewPriorityQueue()
for i := 1; i <= 5; i++ {
pq.Push(&FakeItem{value: i, pri: i})
}
re.Equal(5, pq.Len())
arr := pq.All()
re.Len(arr, 5)
pq.Reset()
re.Equal(0, pq.Len())
for i := 1; i <= 5; i++ {
pq.Push(&FakeItem{value: i, pri: i})
}
for i := pq.Len(); i > 0; i-- {
re.Equal(i, pq.Pop().(*FakeItem).value)
}
}