feat: replace sortedList with sortedUniqueList (#793)

* feat: replace sortedList with sortedUniqueList

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2021-11-15 10:42:45 +08:00
parent c92787d0f7
commit f310425962
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
22 changed files with 2718 additions and 558 deletions

View File

@ -51,6 +51,6 @@ type GetCDNClustersQuery struct {
} }
type CDNClusterConfig struct { type CDNClusterConfig struct {
LoadLimit uint `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1"` LoadLimit uint `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1,lte=5000"`
NetTopology string `yaml:"netTopology" mapstructure:"netTopology" json:"net_topology"` NetTopology string `yaml:"netTopology" mapstructure:"netTopology" json:"net_topology"`
} }

View File

@ -57,7 +57,7 @@ type SchedulerClusterConfig struct {
} }
type SchedulerClusterClientConfig struct { type SchedulerClusterClientConfig struct {
LoadLimit uint `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1"` LoadLimit uint `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit" binding:"omitempty,gte=1,lte=5000"`
} }
type SchedulerClusterScopes struct { type SchedulerClusterScopes struct {

View File

@ -0,0 +1,48 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: d7y.io/dragonfly/v2/pkg/container/list (interfaces: Item)
// Package mocks is a generated GoMock package.
package mocks
import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
)
// MockItem is a mock of Item interface.
type MockItem struct {
ctrl *gomock.Controller
recorder *MockItemMockRecorder
}
// MockItemMockRecorder is the mock recorder for MockItem.
type MockItemMockRecorder struct {
mock *MockItem
}
// NewMockItem creates a new mock instance.
func NewMockItem(ctrl *gomock.Controller) *MockItem {
mock := &MockItem{ctrl: ctrl}
mock.recorder = &MockItemMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockItem) EXPECT() *MockItemMockRecorder {
return m.recorder
}
// SortedValue mocks base method.
func (m *MockItem) SortedValue() int {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SortedValue")
ret0, _ := ret[0].(int)
return ret0
}
// SortedValue indicates an expected call of SortedValue.
func (mr *MockItemMockRecorder) SortedValue() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SortedValue", reflect.TypeOf((*MockItem)(nil).SortedValue))
}

View File

@ -0,0 +1,137 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//go:generate mockgen -destination ./mocks/list_mock.go -package mocks d7y.io/dragonfly/v2/pkg/container/list Item
package list
import (
"container/list"
"sync"
)
type Item interface {
SortedValue() int
}
type SortedList interface {
Len() int
Insert(Item)
Remove(Item)
Contains(Item) bool
Range(func(Item) bool)
ReverseRange(fn func(Item) bool)
}
type sortedList struct {
mu *sync.RWMutex
container *list.List
}
func NewSortedList() SortedList {
return &sortedList{
mu: &sync.RWMutex{},
container: list.New(),
}
}
func (l *sortedList) Len() int {
l.mu.RLock()
defer l.mu.RUnlock()
return l.container.Len()
}
func (l *sortedList) Insert(item Item) {
l.mu.Lock()
defer l.mu.Unlock()
for e := l.container.Front(); e != nil; e = e.Next() {
v, ok := e.Value.(Item)
if !ok {
continue
}
if v.SortedValue() >= item.SortedValue() {
l.container.InsertBefore(item, e)
return
}
}
l.container.PushBack(item)
}
func (l *sortedList) Remove(item Item) {
l.mu.Lock()
defer l.mu.Unlock()
for e := l.container.Front(); e != nil; e = e.Next() {
v, ok := e.Value.(Item)
if !ok {
continue
}
if v == item {
l.container.Remove(e)
return
}
}
}
func (l *sortedList) Contains(item Item) bool {
l.mu.RLock()
defer l.mu.RUnlock()
for e := l.container.Front(); e != nil; e = e.Next() {
if v, ok := e.Value.(Item); ok && v == item {
return true
}
}
return false
}
func (l *sortedList) Range(fn func(Item) bool) {
l.mu.RLock()
defer l.mu.RUnlock()
for e := l.container.Front(); e != nil; e = e.Next() {
v, ok := e.Value.(Item)
if !ok {
continue
}
if !fn(v) {
return
}
}
}
func (l *sortedList) ReverseRange(fn func(Item) bool) {
l.mu.RLock()
defer l.mu.RUnlock()
for e := l.container.Back(); e != nil; e = e.Prev() {
v, ok := e.Value.(Item)
if !ok {
continue
}
if !fn(v) {
return
}
}
}

View File

@ -0,0 +1,766 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package list
import (
"math/rand"
"runtime"
"sync"
"testing"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"d7y.io/dragonfly/v2/pkg/container/list/mocks"
)
const N = 1000
func TestSortedListInsert(t *testing.T) {
tests := []struct {
name string
mock func(m ...*mocks.MockItemMockRecorder)
expect func(t *testing.T, l SortedList, items ...Item)
}{
{
name: "insert values succeeded",
mock: func(m ...*mocks.MockItemMockRecorder) {},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
l.Insert(items[0])
assert.Equal(l.Contains(items[0]), true)
assert.Equal(l.Len(), 1)
},
},
{
name: "insert multi value succeeded",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).Times(1),
m[1].SortedValue().Return(1).Times(1),
)
},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
l.Insert(items[0])
l.Insert(items[1])
assert.Equal(l.Contains(items[0]), true)
assert.Equal(l.Contains(items[1]), true)
assert.Equal(l.Len(), 2)
},
},
{
name: "insert same values",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).Times(2),
)
},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
l.Insert(items[0])
l.Insert(items[0])
assert.Equal(l.Contains(items[0]), true)
assert.Equal(l.Len(), 2)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
mockItems := []*mocks.MockItem{mocks.NewMockItem(ctl), mocks.NewMockItem(ctl)}
tc.mock(mockItems[0].EXPECT(), mockItems[1].EXPECT())
tc.expect(t, NewSortedList(), mockItems[0], mockItems[1])
})
}
}
func TestSortedListInsert_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)
ctl := gomock.NewController(t)
defer ctl.Finish()
mockItem := mocks.NewMockItem(ctl)
mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return rand.Intn(N) }).AnyTimes()
l := NewSortedList()
nums := rand.Perm(N)
var wg sync.WaitGroup
wg.Add(len(nums))
for i := 0; i < len(nums); i++ {
go func(i int) {
l.Insert(mockItem)
wg.Done()
}(i)
}
wg.Wait()
count := 0
l.Range(func(item Item) bool {
count++
return true
})
if count != len(nums) {
t.Errorf("SortedList is missing element")
}
}
func TestSortedListRemove(t *testing.T) {
tests := []struct {
name string
mock func(m ...*mocks.MockItemMockRecorder)
expect func(t *testing.T, l SortedList, items ...Item)
}{
{
name: "remove values succeeded",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).Times(1),
m[1].SortedValue().Return(1).Times(1),
)
},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
l.Insert(items[0])
l.Insert(items[1])
assert.Equal(l.Contains(items[0]), true)
assert.Equal(l.Contains(items[1]), true)
assert.Equal(l.Len(), 2)
l.Remove(items[0])
assert.Equal(l.Contains(items[0]), false)
assert.Equal(l.Len(), 1)
l.Remove(items[1])
assert.Equal(l.Contains(items[1]), false)
assert.Equal(l.Len(), 0)
},
},
{
name: "remove value dost not exits",
mock: func(m ...*mocks.MockItemMockRecorder) {},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
l.Insert(items[0])
assert.Equal(l.Contains(items[0]), true)
assert.Equal(l.Len(), 1)
l.Remove(items[1])
assert.Equal(l.Contains(items[0]), true)
assert.Equal(l.Len(), 1)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
mockItems := []*mocks.MockItem{mocks.NewMockItem(ctl), mocks.NewMockItem(ctl)}
tc.mock(mockItems[0].EXPECT(), mockItems[1].EXPECT())
tc.expect(t, NewSortedList(), mockItems[0], mockItems[1])
})
}
}
func TestSortedListRemove_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)
ctl := gomock.NewController(t)
defer ctl.Finish()
mockItem := mocks.NewMockItem(ctl)
mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return rand.Intn(N) }).AnyTimes()
l := NewSortedList()
nums := rand.Perm(N)
for i := 0; i < len(nums); i++ {
l.Insert(mockItem)
}
var wg sync.WaitGroup
wg.Add(len(nums))
for i := 0; i < len(nums); i++ {
go func(i int) {
l.Remove(mockItem)
wg.Done()
}(i)
}
wg.Wait()
if l.Len() != 0 {
t.Errorf("SortedList is redundant elements")
}
}
func TestSortedListContains(t *testing.T) {
tests := []struct {
name string
mock func(m ...*mocks.MockItemMockRecorder)
expect func(t *testing.T, l SortedList, items ...Item)
}{
{
name: "contains values succeeded",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).Times(1),
m[1].SortedValue().Return(1).Times(1),
)
},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
l.Insert(items[0])
l.Insert(items[1])
assert.Equal(l.Contains(items[0]), true)
assert.Equal(l.Contains(items[1]), true)
},
},
{
name: "contains value dost not exits",
mock: func(m ...*mocks.MockItemMockRecorder) {},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
l.Insert(items[0])
assert.Equal(l.Contains(items[1]), false)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
mockItems := []*mocks.MockItem{mocks.NewMockItem(ctl), mocks.NewMockItem(ctl)}
tc.mock(mockItems[0].EXPECT(), mockItems[1].EXPECT())
tc.expect(t, NewSortedList(), mockItems[0], mockItems[1])
})
}
}
func TestSortedListContains_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)
ctl := gomock.NewController(t)
defer ctl.Finish()
mockItem := mocks.NewMockItem(ctl)
mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return rand.Intn(N) }).AnyTimes()
l := NewSortedList()
nums := rand.Perm(N)
for range nums {
l.Insert(mockItem)
}
var wg sync.WaitGroup
for range nums {
wg.Add(1)
go func() {
if ok := l.Contains(mockItem); !ok {
t.Error("SortedList contains error")
}
wg.Done()
}()
}
wg.Wait()
}
func TestSortedListLen(t *testing.T) {
tests := []struct {
name string
mock func(m ...*mocks.MockItemMockRecorder)
expect func(t *testing.T, l SortedList, items ...Item)
}{
{
name: "get length succeeded",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).Times(1),
m[1].SortedValue().Return(1).Times(1),
)
},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
l.Insert(items[0])
l.Insert(items[1])
assert.Equal(l.Len(), 2)
},
},
{
name: "get empty list length",
mock: func(m ...*mocks.MockItemMockRecorder) {},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
assert.Equal(l.Len(), 0)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
mockItems := []*mocks.MockItem{mocks.NewMockItem(ctl), mocks.NewMockItem(ctl)}
tc.mock(mockItems[0].EXPECT(), mockItems[1].EXPECT())
tc.expect(t, NewSortedList(), mockItems[0], mockItems[1])
})
}
}
func TestSortedListLen_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)
ctl := gomock.NewController(t)
defer ctl.Finish()
mockItem := mocks.NewMockItem(ctl)
mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return rand.Intn(N) }).AnyTimes()
l := NewSortedList()
var wg sync.WaitGroup
wg.Add(1)
go func() {
elems := l.Len()
for i := 0; i < N; i++ {
newElems := l.Len()
if newElems < elems {
t.Errorf("Len shrunk from %v to %v", elems, newElems)
}
}
wg.Done()
}()
for i := 0; i < N; i++ {
l.Insert(mockItem)
}
wg.Wait()
}
func TestSortedListRange(t *testing.T) {
tests := []struct {
name string
mock func(m ...*mocks.MockItemMockRecorder)
expect func(t *testing.T, l SortedList, items ...Item)
}{
{
name: "range succeeded",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).Times(1),
m[1].SortedValue().Return(1).Times(1),
)
},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
l.Insert(items[0])
l.Insert(items[1])
assert.Equal(l.Len(), 2)
i := 0
l.Range(func(item Item) bool {
assert.Equal(item, items[i])
i++
return true
})
},
},
{
name: "range multi values succeeded",
mock: func(m ...*mocks.MockItemMockRecorder) {
for i := range m {
m[i].SortedValue().Return(i).AnyTimes()
}
},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
for _, item := range items {
l.Insert(item)
}
assert.Equal(l.Len(), 10)
i := 0
l.Range(func(item Item) bool {
assert.Equal(item, items[i])
i++
return true
})
},
},
{
name: "range stoped",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).Times(1),
m[1].SortedValue().Return(1).Times(1),
)
},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
l.Insert(items[0])
l.Insert(items[1])
assert.Equal(l.Len(), 2)
l.Range(func(item Item) bool {
assert.Equal(item, items[0])
return false
})
},
},
{
name: "range same values",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).AnyTimes(),
)
},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
l.Insert(items[0])
l.Insert(items[0])
l.Insert(items[0])
assert.Equal(l.Len(), 3)
count := 0
l.Range(func(item Item) bool {
assert.Equal(item, items[0])
count++
return true
})
assert.Equal(count, 3)
},
},
{
name: "range empty list",
mock: func(m ...*mocks.MockItemMockRecorder) {
},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
count := 0
l.Range(func(item Item) bool {
count++
return true
})
assert.Equal(count, 0)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
var mockItems []Item
var mockItemRecorders []*mocks.MockItemMockRecorder
for i := 0; i < 10; i++ {
mockItem := mocks.NewMockItem(ctl)
mockItemRecorders = append(mockItemRecorders, mockItem.EXPECT())
mockItems = append(mockItems, mockItem)
}
tc.mock(mockItemRecorders...)
tc.expect(t, NewSortedList(), mockItems...)
})
}
}
func TestSortedListRange_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)
ctl := gomock.NewController(t)
defer ctl.Finish()
mockItem := mocks.NewMockItem(ctl)
mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return rand.Intn(N) }).AnyTimes()
l := NewSortedList()
var wg sync.WaitGroup
wg.Add(1)
go func() {
i := 0
l.Range(func(_ Item) bool {
i++
return true
})
j := 0
l.Range(func(_ Item) bool {
j++
return true
})
if j < i {
t.Errorf("Values shrunk from %v to %v", i, j)
}
wg.Done()
}()
for i := 0; i < N; i++ {
l.Insert(mockItem)
}
wg.Wait()
}
func TestSortedListReverseRange(t *testing.T) {
tests := []struct {
name string
mock func(m ...*mocks.MockItemMockRecorder)
expect func(t *testing.T, l SortedList, items ...Item)
}{
{
name: "reverse range succeeded",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).Times(1),
m[1].SortedValue().Return(1).Times(1),
)
},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
l.Insert(items[0])
l.Insert(items[1])
assert.Equal(l.Len(), 2)
i := 0
l.ReverseRange(func(item Item) bool {
assert.Equal(item, items[i])
i++
return true
})
},
},
{
name: "reverse range multi values succeeded",
mock: func(m ...*mocks.MockItemMockRecorder) {
for i := range m {
m[i].SortedValue().Return(i).AnyTimes()
}
},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
for _, item := range items {
l.Insert(item)
}
assert.Equal(l.Len(), 10)
i := 9
l.ReverseRange(func(item Item) bool {
assert.Equal(item, items[i])
i--
return true
})
},
},
{
name: "reverse range stoped",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).Times(1),
m[1].SortedValue().Return(1).Times(1),
)
},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
l.Insert(items[0])
l.Insert(items[1])
assert.Equal(l.Len(), 2)
l.ReverseRange(func(item Item) bool {
assert.Equal(item, items[1])
return false
})
},
},
{
name: "reverse range same values",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).AnyTimes(),
)
},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
l.Insert(items[0])
l.Insert(items[0])
l.Insert(items[0])
assert.Equal(l.Len(), 3)
count := 0
l.ReverseRange(func(item Item) bool {
assert.Equal(item, items[0])
count++
return true
})
assert.Equal(count, 3)
},
},
{
name: "reverse range empty list",
mock: func(m ...*mocks.MockItemMockRecorder) {
},
expect: func(t *testing.T, l SortedList, items ...Item) {
assert := assert.New(t)
count := 0
l.ReverseRange(func(item Item) bool {
count++
return true
})
assert.Equal(count, 0)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
var mockItems []Item
var mockItemRecorders []*mocks.MockItemMockRecorder
for i := 0; i < 10; i++ {
mockItem := mocks.NewMockItem(ctl)
mockItemRecorders = append(mockItemRecorders, mockItem.EXPECT())
mockItems = append(mockItems, mockItem)
}
tc.mock(mockItemRecorders...)
tc.expect(t, NewSortedList(), mockItems...)
})
}
}
func TestSortedListReverseRange_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)
ctl := gomock.NewController(t)
defer ctl.Finish()
mockItem := mocks.NewMockItem(ctl)
mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return rand.Intn(N) }).AnyTimes()
l := NewSortedList()
var wg sync.WaitGroup
wg.Add(1)
go func() {
i := 0
l.ReverseRange(func(_ Item) bool {
i++
return true
})
j := 0
l.ReverseRange(func(_ Item) bool {
j++
return true
})
if j < i {
t.Errorf("Values shrunk from %v to %v", i, j)
}
wg.Done()
}()
for i := 0; i < N; i++ {
l.Insert(mockItem)
}
wg.Wait()
}
type item struct{ id int }
func (i *item) SortedValue() int { return rand.Intn(1000) }
func BenchmarkSortedListInsert(b *testing.B) {
l := NewSortedList()
var mockItems []*item
for i := 0; i < b.N; i++ {
mockItems = append(mockItems, &item{id: i})
}
b.ResetTimer()
for _, mockItem := range mockItems {
l.Insert(mockItem)
}
}
func BenchmarkSortedListRemove(b *testing.B) {
l := NewSortedList()
var mockItems []*item
for i := 0; i < b.N; i++ {
mockItems = append(mockItems, &item{id: i})
}
for _, mockItem := range mockItems {
l.Insert(mockItem)
}
b.ResetTimer()
for _, mockItem := range mockItems {
l.Remove(mockItem)
}
}
func BenchmarkSortedListContains(b *testing.B) {
l := NewSortedList()
var mockItems []*item
for i := 0; i < b.N; i++ {
mockItems = append(mockItems, &item{id: i})
}
for _, mockItem := range mockItems {
l.Insert(mockItem)
}
b.ResetTimer()
for _, mockItem := range mockItems {
l.Contains(mockItem)
}
}
func BenchmarkSortedListRange(b *testing.B) {
l := NewSortedList()
var mockItems []*item
for i := 0; i < b.N; i++ {
mockItems = append(mockItems, &item{id: i})
}
for _, mockItem := range mockItems {
l.Insert(mockItem)
}
b.ResetTimer()
l.Range(func(_ Item) bool { return true })
}
func BenchmarkSortedListReverseRange(b *testing.B) {
l := NewSortedList()
var mockItems []*item
for i := 0; i < b.N; i++ {
mockItems = append(mockItems, &item{id: i})
}
for _, mockItem := range mockItems {
l.Insert(mockItem)
}
b.ResetTimer()
l.ReverseRange(func(_ Item) bool { return true })
}

View File

@ -0,0 +1,106 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package list
import (
"sync"
"d7y.io/dragonfly/v2/pkg/container/set"
)
type SortedUniqueList interface {
Len() int
Insert(Item)
Remove(Item)
Contains(Item) bool
Range(func(Item) bool)
ReverseRange(fn func(Item) bool)
}
type sortedUniqueList struct {
mu *sync.RWMutex
container SortedList
data set.Set
}
func NewSortedUniqueList() SortedUniqueList {
return &sortedUniqueList{
mu: &sync.RWMutex{},
container: NewSortedList(),
data: set.New(),
}
}
func (ul *sortedUniqueList) Len() int {
ul.mu.RLock()
defer ul.mu.RUnlock()
return ul.container.Len()
}
func (ul *sortedUniqueList) Insert(item Item) {
ul.mu.Lock()
defer ul.mu.Unlock()
if ok := ul.data.Contains(item); ok {
ul.container.Remove(item)
ul.container.Insert(item)
return
}
ul.data.Add(item)
ul.container.Insert(item)
}
func (ul *sortedUniqueList) Remove(item Item) {
ul.mu.Lock()
defer ul.mu.Unlock()
ul.data.Delete(item)
ul.container.Remove(item)
}
func (ul *sortedUniqueList) Contains(item Item) bool {
ul.mu.RLock()
defer ul.mu.RUnlock()
return ul.data.Contains(item)
}
func (ul *sortedUniqueList) Range(fn func(item Item) bool) {
ul.mu.RLock()
defer ul.mu.RUnlock()
ul.container.Range(func(item Item) bool {
if !fn(item) {
return false
}
return true
})
}
func (ul *sortedUniqueList) ReverseRange(fn func(item Item) bool) {
ul.mu.RLock()
defer ul.mu.RUnlock()
ul.container.ReverseRange(func(item Item) bool {
if !fn(item) {
return false
}
return true
})
}

View File

@ -0,0 +1,786 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package list
import (
"math/rand"
"runtime"
"sync"
"testing"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"d7y.io/dragonfly/v2/pkg/container/list/mocks"
)
func TestSortedUniqueListInsert(t *testing.T) {
tests := []struct {
name string
mock func(m ...*mocks.MockItemMockRecorder)
expect func(t *testing.T, ul SortedUniqueList, items ...Item)
}{
{
name: "insert values succeeded",
mock: func(m ...*mocks.MockItemMockRecorder) {},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
ul.Insert(items[0])
assert.Equal(ul.Contains(items[0]), true)
assert.Equal(ul.Len(), 1)
},
},
{
name: "insert multi value succeeded",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).Times(1),
m[1].SortedValue().Return(1).Times(1),
)
},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
ul.Insert(items[0])
ul.Insert(items[1])
assert.Equal(ul.Contains(items[0]), true)
assert.Equal(ul.Contains(items[1]), true)
assert.Equal(ul.Len(), 2)
},
},
{
name: "insert same values",
mock: func(m ...*mocks.MockItemMockRecorder) {},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
ul.Insert(items[0])
ul.Insert(items[0])
assert.Equal(ul.Contains(items[0]), true)
assert.Equal(ul.Len(), 1)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
mockItems := []*mocks.MockItem{mocks.NewMockItem(ctl), mocks.NewMockItem(ctl)}
tc.mock(mockItems[0].EXPECT(), mockItems[1].EXPECT())
tc.expect(t, NewSortedUniqueList(), mockItems[0], mockItems[1])
})
}
}
func TestSortedUniqueListInsert_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)
ctl := gomock.NewController(t)
defer ctl.Finish()
ul := NewSortedUniqueList()
nums := rand.Perm(N)
var mockItems []Item
for _, v := range nums {
mockItem := mocks.NewMockItem(ctl)
mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return v }).AnyTimes()
mockItems = append(mockItems, mockItem)
}
var wg sync.WaitGroup
wg.Add(len(mockItems))
for _, mockItem := range mockItems {
go func(item Item) {
ul.Insert(item)
wg.Done()
}(mockItem)
}
wg.Wait()
count := 0
ul.Range(func(item Item) bool {
count++
return true
})
if count != len(nums) {
t.Errorf("SortedUniqueList is missing element")
}
}
func TestSortedUniqueListRemove(t *testing.T) {
tests := []struct {
name string
mock func(m ...*mocks.MockItemMockRecorder)
expect func(t *testing.T, ul SortedUniqueList, items ...Item)
}{
{
name: "remove values succeeded",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).Times(1),
m[1].SortedValue().Return(1).Times(1),
)
},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
ul.Insert(items[0])
ul.Insert(items[1])
assert.Equal(ul.Contains(items[0]), true)
assert.Equal(ul.Contains(items[1]), true)
assert.Equal(ul.Len(), 2)
ul.Remove(items[0])
assert.Equal(ul.Contains(items[0]), false)
assert.Equal(ul.Len(), 1)
ul.Remove(items[1])
assert.Equal(ul.Contains(items[1]), false)
assert.Equal(ul.Len(), 0)
},
},
{
name: "remove value dost not exits",
mock: func(m ...*mocks.MockItemMockRecorder) {},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
ul.Insert(items[0])
assert.Equal(ul.Contains(items[0]), true)
assert.Equal(ul.Len(), 1)
ul.Remove(items[1])
assert.Equal(ul.Contains(items[0]), true)
assert.Equal(ul.Len(), 1)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
mockItems := []*mocks.MockItem{mocks.NewMockItem(ctl), mocks.NewMockItem(ctl)}
tc.mock(mockItems[0].EXPECT(), mockItems[1].EXPECT())
tc.expect(t, NewSortedUniqueList(), mockItems[0], mockItems[1])
})
}
}
func TestSortedUniqueListRemove_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)
ctl := gomock.NewController(t)
defer ctl.Finish()
ul := NewSortedUniqueList()
nums := rand.Perm(N)
var mockItems []Item
for _, v := range nums {
mockItem := mocks.NewMockItem(ctl)
mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return v }).AnyTimes()
mockItems = append(mockItems, mockItem)
ul.Insert(mockItem)
}
var wg sync.WaitGroup
wg.Add(len(mockItems))
for _, mockItem := range mockItems {
go func(item Item) {
ul.Remove(item)
wg.Done()
}(mockItem)
}
wg.Wait()
if ul.Len() != 0 {
t.Errorf("SortedUniqueList is redundant elements")
}
}
func TestSortedUniqueListContains(t *testing.T) {
tests := []struct {
name string
mock func(m ...*mocks.MockItemMockRecorder)
expect func(t *testing.T, ul SortedUniqueList, items ...Item)
}{
{
name: "contains values succeeded",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).Times(1),
m[1].SortedValue().Return(1).Times(1),
)
},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
ul.Insert(items[0])
ul.Insert(items[1])
assert.Equal(ul.Contains(items[0]), true)
assert.Equal(ul.Contains(items[1]), true)
},
},
{
name: "contains value dost not exits",
mock: func(m ...*mocks.MockItemMockRecorder) {},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
ul.Insert(items[0])
assert.Equal(ul.Contains(items[1]), false)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
mockItems := []*mocks.MockItem{mocks.NewMockItem(ctl), mocks.NewMockItem(ctl)}
tc.mock(mockItems[0].EXPECT(), mockItems[1].EXPECT())
tc.expect(t, NewSortedUniqueList(), mockItems[0], mockItems[1])
})
}
}
func TestSortedUniqueListContains_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)
ctl := gomock.NewController(t)
defer ctl.Finish()
ul := NewSortedUniqueList()
nums := rand.Perm(N)
var mockItems []Item
for _, v := range nums {
mockItem := mocks.NewMockItem(ctl)
mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return v }).AnyTimes()
mockItems = append(mockItems, mockItem)
ul.Insert(mockItem)
}
var wg sync.WaitGroup
wg.Add(len(mockItems))
for _, mockItem := range mockItems {
go func(item Item) {
if ok := ul.Contains(item); !ok {
t.Error("SortedUniqueList contains error")
}
wg.Done()
}(mockItem)
}
wg.Wait()
}
func TestSortedUniqueListLen(t *testing.T) {
tests := []struct {
name string
mock func(m ...*mocks.MockItemMockRecorder)
expect func(t *testing.T, ul SortedUniqueList, items ...Item)
}{
{
name: "get length succeeded",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).Times(1),
m[1].SortedValue().Return(1).Times(1),
)
},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
ul.Insert(items[0])
ul.Insert(items[1])
assert.Equal(ul.Len(), 2)
},
},
{
name: "get empty list length",
mock: func(m ...*mocks.MockItemMockRecorder) {},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
assert.Equal(ul.Len(), 0)
},
},
{
name: "get same values length",
mock: func(m ...*mocks.MockItemMockRecorder) {},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
ul.Insert(items[0])
ul.Insert(items[0])
assert.Equal(ul.Len(), 1)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
mockItems := []*mocks.MockItem{mocks.NewMockItem(ctl), mocks.NewMockItem(ctl)}
tc.mock(mockItems[0].EXPECT(), mockItems[1].EXPECT())
tc.expect(t, NewSortedUniqueList(), mockItems[0], mockItems[1])
})
}
}
func TestSortedUniqueListLen_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)
ctl := gomock.NewController(t)
defer ctl.Finish()
ul := NewSortedUniqueList()
nums := rand.Perm(N)
var mockItems []Item
for _, v := range nums {
mockItem := mocks.NewMockItem(ctl)
mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return v }).AnyTimes()
mockItems = append(mockItems, mockItem)
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
elems := ul.Len()
for i := 0; i < N; i++ {
newElems := ul.Len()
if newElems < elems {
t.Errorf("Len shrunk from %v to %v", elems, newElems)
}
}
wg.Done()
}()
for _, mockItem := range mockItems {
ul.Insert(mockItem)
}
wg.Wait()
}
func TestSortedUniqueListRange(t *testing.T) {
tests := []struct {
name string
mock func(m ...*mocks.MockItemMockRecorder)
expect func(t *testing.T, ul SortedUniqueList, items ...Item)
}{
{
name: "range succeeded",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).Times(1),
m[1].SortedValue().Return(1).Times(1),
)
},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
ul.Insert(items[0])
ul.Insert(items[1])
assert.Equal(ul.Len(), 2)
i := 0
ul.Range(func(item Item) bool {
assert.Equal(item, items[i])
i++
return true
})
},
},
{
name: "range multi values succeeded",
mock: func(m ...*mocks.MockItemMockRecorder) {
for i := range m {
m[i].SortedValue().Return(i).AnyTimes()
}
},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
for _, item := range items {
ul.Insert(item)
}
ul.Insert(items[1])
assert.Equal(ul.Len(), 10)
i := 0
ul.Range(func(item Item) bool {
assert.Equal(item, items[i])
i++
return true
})
},
},
{
name: "range stoped",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).Times(1),
m[1].SortedValue().Return(1).Times(1),
)
},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
ul.Insert(items[0])
ul.Insert(items[1])
assert.Equal(ul.Len(), 2)
ul.Range(func(item Item) bool {
assert.Equal(item, items[0])
return false
})
},
},
{
name: "range same values",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).AnyTimes(),
)
},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
ul.Insert(items[0])
ul.Insert(items[0])
ul.Insert(items[0])
assert.Equal(ul.Len(), 1)
count := 0
ul.Range(func(item Item) bool {
assert.Equal(item, items[0])
count++
return true
})
assert.Equal(count, 1)
},
},
{
name: "range empty list",
mock: func(m ...*mocks.MockItemMockRecorder) {
},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
count := 0
ul.Range(func(item Item) bool {
count++
return true
})
assert.Equal(count, 0)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
var mockItems []Item
var mockItemRecorders []*mocks.MockItemMockRecorder
for i := 0; i < 10; i++ {
mockItem := mocks.NewMockItem(ctl)
mockItemRecorders = append(mockItemRecorders, mockItem.EXPECT())
mockItems = append(mockItems, mockItem)
}
tc.mock(mockItemRecorders...)
tc.expect(t, NewSortedUniqueList(), mockItems...)
})
}
}
func TestSortedUniqueListRange_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)
ctl := gomock.NewController(t)
defer ctl.Finish()
ul := NewSortedUniqueList()
nums := rand.Perm(N)
var mockItems []Item
for _, v := range nums {
mockItem := mocks.NewMockItem(ctl)
mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return v }).AnyTimes()
mockItems = append(mockItems, mockItem)
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
i := 0
ul.Range(func(_ Item) bool {
i++
return true
})
j := 0
ul.Range(func(_ Item) bool {
j++
return true
})
if j < i {
t.Errorf("Values shrunk from %v to %v", i, j)
}
wg.Done()
}()
for _, mockItem := range mockItems {
ul.Insert(mockItem)
}
wg.Wait()
}
func TestSortedUniqueListReverseRange(t *testing.T) {
tests := []struct {
name string
mock func(m ...*mocks.MockItemMockRecorder)
expect func(t *testing.T, ul SortedUniqueList, items ...Item)
}{
{
name: "reverse range succeeded",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).Times(1),
m[1].SortedValue().Return(1).Times(1),
)
},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
ul.Insert(items[0])
ul.Insert(items[1])
assert.Equal(ul.Len(), 2)
i := 1
ul.ReverseRange(func(item Item) bool {
assert.Equal(item, items[i])
i--
return true
})
},
},
{
name: "reverse range multi values succeeded",
mock: func(m ...*mocks.MockItemMockRecorder) {
for i := range m {
m[i].SortedValue().Return(i).AnyTimes()
}
},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
for _, item := range items {
ul.Insert(item)
}
ul.Insert(items[1])
assert.Equal(ul.Len(), 10)
i := 9
ul.Range(func(item Item) bool {
assert.Equal(item, items[i])
i--
return true
})
},
},
{
name: "reverse range stoped",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).Times(1),
m[1].SortedValue().Return(1).Times(1),
)
},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
ul.Insert(items[0])
ul.Insert(items[1])
assert.Equal(ul.Len(), 2)
ul.ReverseRange(func(item Item) bool {
assert.Equal(item, items[0])
return false
})
},
},
{
name: "reverse range same values",
mock: func(m ...*mocks.MockItemMockRecorder) {
gomock.InOrder(
m[0].SortedValue().Return(0).AnyTimes(),
)
},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
ul.Insert(items[0])
ul.Insert(items[0])
ul.Insert(items[0])
assert.Equal(ul.Len(), 1)
count := 0
ul.ReverseRange(func(item Item) bool {
assert.Equal(item, items[0])
count++
return true
})
assert.Equal(count, 1)
},
},
{
name: "reverse range empty list",
mock: func(m ...*mocks.MockItemMockRecorder) {
},
expect: func(t *testing.T, ul SortedUniqueList, items ...Item) {
assert := assert.New(t)
count := 0
ul.ReverseRange(func(item Item) bool {
count++
return true
})
assert.Equal(count, 0)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
var mockItems []Item
var mockItemRecorders []*mocks.MockItemMockRecorder
for i := 0; i < 10; i++ {
mockItem := mocks.NewMockItem(ctl)
mockItemRecorders = append(mockItemRecorders, mockItem.EXPECT())
mockItems = append(mockItems, mockItem)
}
tc.mock(mockItemRecorders...)
tc.expect(t, NewSortedUniqueList(), mockItems...)
})
}
}
func TestSortedUniqueListReverseRange_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)
ctl := gomock.NewController(t)
defer ctl.Finish()
ul := NewSortedUniqueList()
nums := rand.Perm(N)
var mockItems []Item
for _, v := range nums {
mockItem := mocks.NewMockItem(ctl)
mockItem.EXPECT().SortedValue().DoAndReturn(func() int { return v }).AnyTimes()
mockItems = append(mockItems, mockItem)
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
i := 0
ul.ReverseRange(func(_ Item) bool {
i++
return true
})
j := 0
ul.ReverseRange(func(_ Item) bool {
j++
return true
})
if j < i {
t.Errorf("Values shrunk from %v to %v", i, j)
}
wg.Done()
}()
for _, mockItem := range mockItems {
ul.Insert(mockItem)
}
wg.Wait()
}
func BenchmarkSortedUniqueListInsert(b *testing.B) {
ul := NewSortedUniqueList()
var mockItems []*item
for i := 0; i < b.N; i++ {
mockItems = append(mockItems, &item{id: i})
}
b.ResetTimer()
for _, mockItem := range mockItems {
ul.Insert(mockItem)
}
}
func BenchmarkSortedUniqueListRemove(b *testing.B) {
ul := NewSortedUniqueList()
var mockItems []*item
for i := 0; i < b.N; i++ {
mockItem := &item{id: i}
ul.Insert(mockItem)
mockItems = append(mockItems, mockItem)
}
b.ResetTimer()
for _, mockItem := range mockItems {
ul.Remove(mockItem)
}
}
func BenchmarkSortedUniqueListContains(b *testing.B) {
ul := NewSortedUniqueList()
var mockItems []*item
for i := 0; i < b.N; i++ {
mockItem := &item{id: i}
ul.Insert(mockItem)
mockItems = append(mockItems, mockItem)
}
b.ResetTimer()
for _, mockItem := range mockItems {
ul.Contains(mockItem)
}
}
func BenchmarkSortedUniqueListRange(b *testing.B) {
ul := NewSortedUniqueList()
for i := 0; i < b.N; i++ {
mockItem := item{id: i}
ul.Insert(&mockItem)
}
b.ResetTimer()
ul.Range(func(_ Item) bool { return true })
}
func BenchmarkSortedUniqueListReverseRange(b *testing.B) {
ul := NewSortedUniqueList()
for i := 0; i < b.N; i++ {
mockItem := item{id: i}
ul.Insert(&mockItem)
}
b.ResetTimer()
ul.ReverseRange(func(item Item) bool { return true })
}

View File

@ -0,0 +1,101 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package set
import (
"sync"
)
type SafeSet interface {
Values() []interface{}
Add(interface{}) bool
Delete(interface{})
Contains(...interface{}) bool
Len() uint
Range(func(interface{}) bool)
}
type safeSet struct {
mu *sync.RWMutex
data map[interface{}]struct{}
}
func NewSafeSet() SafeSet {
return &safeSet{
mu: &sync.RWMutex{},
data: make(map[interface{}]struct{}),
}
}
func (s *safeSet) Values() []interface{} {
var result []interface{}
s.Range(func(v interface{}) bool {
result = append(result, v)
return true
})
return result
}
func (s *safeSet) Add(v interface{}) bool {
s.mu.RLock()
_, found := s.data[v]
if found {
s.mu.RUnlock()
return false
}
s.mu.RUnlock()
s.mu.Lock()
defer s.mu.Unlock()
s.data[v] = struct{}{}
return true
}
func (s *safeSet) Delete(v interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.data, v)
}
func (s *safeSet) Contains(vals ...interface{}) bool {
s.mu.RLock()
defer s.mu.RUnlock()
for _, v := range vals {
if _, ok := s.data[v]; !ok {
return false
}
}
return true
}
func (s *safeSet) Len() uint {
s.mu.RLock()
defer s.mu.RUnlock()
return uint(len(s.data))
}
func (s *safeSet) Range(fn func(interface{}) bool) {
s.mu.RLock()
defer s.mu.RUnlock()
for v := range s.data {
if !fn(v) {
break
}
}
}

View File

@ -0,0 +1,382 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package set
import (
"math/rand"
"runtime"
"sync"
"testing"
"github.com/stretchr/testify/assert"
)
const N = 1000
func TestSafeSetAdd(t *testing.T) {
tests := []struct {
name string
value interface{}
expect func(t *testing.T, ok bool, s Set, value interface{})
}{
{
name: "add value succeeded",
value: "foo",
expect: func(t *testing.T, ok bool, s Set, value interface{}) {
assert := assert.New(t)
assert.Equal(ok, true)
assert.Equal(s.Values(), []interface{}{value})
},
},
{
name: "add value failed",
value: "foo",
expect: func(t *testing.T, _ bool, s Set, value interface{}) {
assert := assert.New(t)
ok := s.Add("foo")
assert.Equal(ok, false)
assert.Equal(s.Values(), []interface{}{value})
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s := NewSafeSet()
tc.expect(t, s.Add(tc.value), s, tc.value)
})
}
}
func TestSafeSetAdd_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)
s := NewSafeSet()
nums := rand.Perm(N)
var wg sync.WaitGroup
wg.Add(len(nums))
for i := 0; i < len(nums); i++ {
go func(i int) {
s.Add(i)
wg.Done()
}(i)
}
wg.Wait()
for _, n := range nums {
if !s.Contains(n) {
t.Errorf("Set is missing element: %v", n)
}
}
}
func TestSafeSetDelete(t *testing.T) {
tests := []struct {
name string
value interface{}
expect func(t *testing.T, s Set, value interface{})
}{
{
name: "delete value succeeded",
value: "foo",
expect: func(t *testing.T, s Set, value interface{}) {
assert := assert.New(t)
s.Delete(value)
assert.Equal(s.Len(), uint(0))
},
},
{
name: "delete value does not exist",
value: "foo",
expect: func(t *testing.T, s Set, _ interface{}) {
assert := assert.New(t)
s.Delete("bar")
assert.Equal(s.Len(), uint(1))
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s := NewSafeSet()
s.Add(tc.value)
tc.expect(t, s, tc.value)
})
}
}
func TestSafeSetDelete_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)
s := NewSafeSet()
nums := rand.Perm(N)
for _, v := range nums {
s.Add(v)
}
var wg sync.WaitGroup
wg.Add(len(nums))
for _, v := range nums {
go func(i int) {
s.Delete(i)
wg.Done()
}(v)
}
wg.Wait()
if s.Len() != 0 {
t.Errorf("Expected len 0; got %v", s.Len())
}
}
func TestSafeSetContains(t *testing.T) {
tests := []struct {
name string
value interface{}
expect func(t *testing.T, s Set, value interface{})
}{
{
name: "contains value succeeded",
value: "foo",
expect: func(t *testing.T, s Set, value interface{}) {
assert := assert.New(t)
assert.Equal(s.Contains(value), true)
},
},
{
name: "contains value does not exist",
value: "foo",
expect: func(t *testing.T, s Set, _ interface{}) {
assert := assert.New(t)
assert.Equal(s.Contains("bar"), false)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s := NewSafeSet()
s.Add(tc.value)
tc.expect(t, s, tc.value)
})
}
}
func TestSafeSetContains_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)
s := NewSafeSet()
nums := rand.Perm(N)
interfaces := make([]interface{}, 0)
for _, v := range nums {
s.Add(v)
interfaces = append(interfaces, v)
}
var wg sync.WaitGroup
for range nums {
wg.Add(1)
go func() {
s.Contains(interfaces...)
wg.Done()
}()
}
wg.Wait()
}
func TestSetSafeLen(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, s Set)
}{
{
name: "get length succeeded",
expect: func(t *testing.T, s Set) {
assert := assert.New(t)
s.Add("foo")
assert.Equal(s.Len(), uint(1))
},
},
{
name: "get empty set length",
expect: func(t *testing.T, s Set) {
assert := assert.New(t)
assert.Equal(s.Len(), uint(0))
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s := NewSafeSet()
tc.expect(t, s)
})
}
}
func TestSafeSetLen_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)
s := NewSafeSet()
var wg sync.WaitGroup
wg.Add(1)
go func() {
elems := s.Len()
for i := 0; i < N; i++ {
newElems := s.Len()
if newElems < elems {
t.Errorf("Len shrunk from %v to %v", elems, newElems)
}
}
wg.Done()
}()
for i := 0; i < N; i++ {
s.Add(rand.Int())
}
wg.Wait()
}
func TestSafeSetValues(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, s Set)
}{
{
name: "get values succeeded",
expect: func(t *testing.T, s Set) {
assert := assert.New(t)
s.Add("foo")
assert.Equal(s.Values(), []interface{}{"foo"})
},
},
{
name: "get empty values",
expect: func(t *testing.T, s Set) {
assert := assert.New(t)
assert.Equal(s.Values(), []interface{}(nil))
},
},
{
name: "get multi values succeeded",
expect: func(t *testing.T, s Set) {
assert := assert.New(t)
s.Add("foo")
s.Add("bar")
assert.Contains(s.Values(), "bar")
assert.Contains(s.Values(), "foo")
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s := NewSafeSet()
tc.expect(t, s)
})
}
}
func TestSafeSetValues_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)
s := NewSafeSet()
var wg sync.WaitGroup
wg.Add(1)
go func() {
elems := s.Values()
for i := 0; i < N; i++ {
newElems := s.Values()
if len(newElems) < len(elems) {
t.Errorf("Values shrunk from %v to %v", elems, newElems)
}
}
wg.Done()
}()
for i := 0; i < N; i++ {
s.Add(rand.Int())
}
wg.Wait()
}
func TestSafeSetRange(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, s Set)
}{
{
name: "range succeeded",
expect: func(t *testing.T, s Set) {
assert := assert.New(t)
s.Add("foo")
s.Range(func(v interface{}) bool {
assert.Equal(v, "foo")
return true
})
},
},
{
name: "range failed",
expect: func(t *testing.T, s Set) {
assert := assert.New(t)
s.Add("foo")
s.Add("bar")
s.Range(func(v interface{}) bool {
assert.Equal(s.Contains(v), true)
return false
})
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s := NewSafeSet()
tc.expect(t, s)
})
}
}
func TestSafeSetRange_Concurrent(t *testing.T) {
runtime.GOMAXPROCS(2)
s := NewSafeSet()
var wg sync.WaitGroup
wg.Add(1)
go func() {
elems := s.Values()
i := 0
s.Range(func(v interface{}) bool {
i++
return true
})
if i < len(elems) {
t.Errorf("Values shrunk from %v to %v", elems, i)
}
wg.Done()
}()
for i := 0; i < N; i++ {
s.Add(rand.Int())
}
wg.Wait()
}

78
pkg/container/set/set.go Normal file
View File

@ -0,0 +1,78 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package set
type Set interface {
Values() []interface{}
Add(interface{}) bool
Delete(interface{})
Contains(...interface{}) bool
Len() uint
Range(func(interface{}) bool)
}
type set map[interface{}]struct{}
func New() Set {
return &set{}
}
func (s *set) Values() []interface{} {
var result []interface{}
s.Range(func(v interface{}) bool {
result = append(result, v)
return true
})
return result
}
func (s *set) Add(v interface{}) bool {
_, found := (*s)[v]
if found {
return false
}
(*s)[v] = struct{}{}
return true
}
func (s *set) Delete(v interface{}) {
delete(*s, v)
}
func (s *set) Contains(vals ...interface{}) bool {
for _, v := range vals {
if _, ok := (*s)[v]; !ok {
return false
}
}
return true
}
func (s *set) Len() uint {
return uint(len(*s))
}
func (s *set) Range(fn func(interface{}) bool) {
for v := range *s {
if !fn(v) {
break
}
}
}

View File

@ -0,0 +1,234 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package set
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestSetAdd(t *testing.T) {
tests := []struct {
name string
value interface{}
expect func(t *testing.T, ok bool, s Set, value interface{})
}{
{
name: "add value succeeded",
value: "foo",
expect: func(t *testing.T, ok bool, s Set, value interface{}) {
assert := assert.New(t)
assert.Equal(ok, true)
assert.Equal(s.Values(), []interface{}{value})
},
},
{
name: "add value failed",
value: "foo",
expect: func(t *testing.T, _ bool, s Set, value interface{}) {
assert := assert.New(t)
ok := s.Add("foo")
assert.Equal(ok, false)
assert.Equal(s.Values(), []interface{}{value})
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s := New()
tc.expect(t, s.Add(tc.value), s, tc.value)
})
}
}
func TestSetDelete(t *testing.T) {
tests := []struct {
name string
value interface{}
expect func(t *testing.T, s Set, value interface{})
}{
{
name: "delete value succeeded",
value: "foo",
expect: func(t *testing.T, s Set, value interface{}) {
assert := assert.New(t)
s.Delete(value)
assert.Equal(s.Len(), uint(0))
},
},
{
name: "delete value does not exist",
value: "foo",
expect: func(t *testing.T, s Set, _ interface{}) {
assert := assert.New(t)
s.Delete("bar")
assert.Equal(s.Len(), uint(1))
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s := New()
s.Add(tc.value)
tc.expect(t, s, tc.value)
})
}
}
func TestSetContains(t *testing.T) {
tests := []struct {
name string
value interface{}
expect func(t *testing.T, s Set, value interface{})
}{
{
name: "contains value succeeded",
value: "foo",
expect: func(t *testing.T, s Set, value interface{}) {
assert := assert.New(t)
assert.Equal(s.Contains(value), true)
},
},
{
name: "contains value does not exist",
value: "foo",
expect: func(t *testing.T, s Set, _ interface{}) {
assert := assert.New(t)
assert.Equal(s.Contains("bar"), false)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s := New()
s.Add(tc.value)
tc.expect(t, s, tc.value)
})
}
}
func TestSetLen(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, s Set)
}{
{
name: "get length succeeded",
expect: func(t *testing.T, s Set) {
assert := assert.New(t)
s.Add("foo")
assert.Equal(s.Len(), uint(1))
},
},
{
name: "get empty set length",
expect: func(t *testing.T, s Set) {
assert := assert.New(t)
assert.Equal(s.Len(), uint(0))
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s := New()
tc.expect(t, s)
})
}
}
func TestSetValues(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, s Set)
}{
{
name: "get values succeeded",
expect: func(t *testing.T, s Set) {
assert := assert.New(t)
s.Add("foo")
assert.Equal(s.Values(), []interface{}{"foo"})
},
},
{
name: "get empty values",
expect: func(t *testing.T, s Set) {
assert := assert.New(t)
assert.Equal(s.Values(), []interface{}(nil))
},
},
{
name: "get multi values succeeded",
expect: func(t *testing.T, s Set) {
assert := assert.New(t)
s.Add("foo")
s.Add("bar")
assert.Contains(s.Values(), "bar")
assert.Contains(s.Values(), "foo")
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s := New()
tc.expect(t, s)
})
}
}
func TestSetRange(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, s Set)
}{
{
name: "range succeeded",
expect: func(t *testing.T, s Set) {
assert := assert.New(t)
s.Add("foo")
s.Range(func(v interface{}) bool {
assert.Equal(v, "foo")
return true
})
},
},
{
name: "range failed",
expect: func(t *testing.T, s Set) {
assert := assert.New(t)
s.Add("foo")
s.Add("bar")
s.Range(func(v interface{}) bool {
assert.Equal(s.Contains(v), true)
return false
})
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s := New()
tc.expect(t, s)
})
}
}

View File

@ -1,49 +0,0 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sortedlist
type empty struct{}
type bucket struct {
count int
buckets []map[Item]empty
}
func (b *bucket) Size() int {
return b.count
}
func (b *bucket) Add(key int, data Item) {
for key >= len(b.buckets) {
b.buckets = append(b.buckets, make(map[Item]empty))
}
b.buckets[key][data] = empty{}
b.count++
return
}
func (b *bucket) Delete(key int, data Item) {
for key >= len(b.buckets) {
b.buckets = append(b.buckets, make(map[Item]empty))
}
_, ok := b.buckets[key][data]
if ok {
delete(b.buckets[key], data)
b.count--
}
return
}

View File

@ -1,237 +0,0 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sortedlist
import (
"fmt"
"sync"
)
const BucketMaxLength = 100000
const InnerBucketMaxLength = 10000
type Item interface {
GetSortKeys() (key1 int, key2 int)
}
type SortedList struct {
l sync.RWMutex
buckets []bucket
keyMap map[Item]int
left int
right int
}
func NewSortedList() *SortedList {
l := &SortedList{
left: 0,
right: 0,
keyMap: make(map[Item]int),
}
return l
}
func (l *SortedList) Add(data Item) (err error) {
key1, key2 := data.GetSortKeys()
if key1 > BucketMaxLength || key1 < 0 {
return fmt.Errorf("sorted list key1 out of range")
}
if key2 > InnerBucketMaxLength || key2 < 0 {
return fmt.Errorf("sorted list key2 out of range")
}
l.l.Lock()
defer l.l.Unlock()
l.addItem(key1, key2, data)
return
}
func (l *SortedList) Update(data Item) (err error) {
key1, key2 := data.GetSortKeys()
if key1 > BucketMaxLength || key1 < 0 {
return fmt.Errorf("sorted list key1 out of range")
}
if key2 > InnerBucketMaxLength || key2 < 0 {
return fmt.Errorf("sorted list key2 out of range")
}
l.l.Lock()
defer l.l.Unlock()
oldKey1, oldKey2, ok := l.getKeyMapKey(data)
if !ok {
return
}
if key1 == oldKey1 && key2 == oldKey2 {
return
}
l.deleteItem(oldKey1, oldKey2, data)
l.addItem(key1, key2, data)
return
}
func (l *SortedList) UpdateOrAdd(data Item) (err error) {
key1, key2 := data.GetSortKeys()
if key1 > BucketMaxLength || key1 < 0 {
return fmt.Errorf("sorted list key1 out of range")
}
if key2 > InnerBucketMaxLength || key2 < 0 {
return fmt.Errorf("sorted list key2 out of range")
}
l.l.Lock()
defer l.l.Unlock()
oldKey1, oldKey2, ok := l.getKeyMapKey(data)
if !ok {
l.addItem(key1, key2, data)
return
}
if key1 == oldKey1 && key2 == oldKey2 {
return
}
l.deleteItem(oldKey1, oldKey2, data)
l.addItem(key1, key2, data)
return
}
func (l *SortedList) Delete(data Item) (err error) {
l.l.Lock()
defer l.l.Unlock()
oldKey1, oldKey2, ok := l.getKeyMapKey(data)
if !ok {
return
}
l.deleteItem(oldKey1, oldKey2, data)
return
}
func (l *SortedList) Range(fn func(data Item) bool) {
l.RangeLimit(-1, fn)
}
func (l *SortedList) RangeLimit(limit int, fn func(Item) bool) {
if limit == 0 {
return
}
l.l.RLock()
defer l.l.RUnlock()
if len(l.buckets) == 0 {
return
}
count := 0
for i := l.left; i <= l.right; i++ {
buc := l.buckets[i]
for _, b := range buc.buckets {
for it := range b {
if !fn(it) {
return
}
count++
if limit > 0 && count >= limit {
return
}
}
}
}
}
func (l *SortedList) RangeReverse(fn func(data Item) bool) {
l.RangeReverseLimit(-1, fn)
}
func (l *SortedList) RangeReverseLimit(limit int, fn func(Item) bool) {
if limit == 0 {
return
}
l.l.RLock()
defer l.l.RUnlock()
if len(l.buckets) == 0 {
return
}
count := 0
for i := l.right; i >= l.left; i-- {
for j := len(l.buckets[i].buckets) - 1; j >= 0; j-- {
for it := range l.buckets[i].buckets[j] {
if !fn(it) {
return
}
count++
if limit > 0 && count >= limit {
return
}
}
}
}
}
func (l *SortedList) Size() int {
l.l.RLock()
defer l.l.RUnlock()
return len(l.keyMap)
}
func (l *SortedList) addItem(key1, key2 int, data Item) {
l.addKey(key1)
l.buckets[key1].Add(key2, data)
l.setKeyMapKey(key1, key2, data)
l.shrink()
}
func (l *SortedList) deleteItem(key1, key2 int, data Item) {
l.addKey(key1)
l.buckets[key1].Delete(key2, data)
l.deleteKeyMapKey(data)
l.shrink()
}
func (l *SortedList) addKey(key int) {
for key >= len(l.buckets) {
l.buckets = append(l.buckets, bucket{})
}
if l.right < key {
l.right = key
}
if l.left > key {
l.left = key
}
}
func (l *SortedList) shrink() {
for l.left < l.right && l.buckets[l.left].Size() == 0 {
l.left++
}
for l.left < l.right && l.buckets[l.right].Size() == 0 {
l.right--
}
}
func (l *SortedList) setKeyMapKey(key1, key2 int, data Item) {
l.keyMap[data] = key1*1000 + key2
}
func (l *SortedList) getKeyMapKey(data Item) (key1, key2 int, ok bool) {
key, ok := l.keyMap[data]
key1 = key / 1000
key2 = key % 1000
return
}
func (l *SortedList) deleteKeyMapKey(data Item) {
delete(l.keyMap, data)
}

View File

@ -1,190 +0,0 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sortedlist
import (
"fmt"
"math/rand"
"testing"
)
type item struct {
key1 int
key2 int
}
func newItem(key1, key2 int) *item {
return &item{key1, key2}
}
func (i *item) GetSortKeys() (int, int) {
return i.key1, i.key2
}
func TestAdd(t *testing.T) {
l := NewSortedList()
l.Add(newItem(1, 2))
l.Add(newItem(2, 2))
l.Add(newItem(2, 3))
if l.Size() != 3 {
t.Errorf("TestAdd failed count required[3] but get [%d]", l.Size())
}
}
func TestDelete(t *testing.T) {
l := NewSortedList()
it := newItem(1, 3)
l.Add(newItem(1, 2))
l.Add(newItem(2, 2))
l.Add(it)
l.Add(newItem(2, 3))
l.Delete(it)
if l.Size() != 3 {
t.Errorf("TestDelete failed count required[3] but get [%d]", l.Size())
}
}
func TestUpdate(t *testing.T) {
l := NewSortedList()
it := newItem(1, 3)
l.Add(newItem(1, 2))
l.Add(newItem(2, 2))
l.Add(it)
it.key1 = 2
l.Update(it)
l.Add(newItem(2, 3))
key1, key2, ok := l.getKeyMapKey(it)
if l.Size() != 4 || key1 != 2 || key2 != 3 || !ok {
t.Errorf("TestUpdate failed count required[3] but get [%d]", l.Size())
}
}
func TestRange(t *testing.T) {
l := NewSortedList()
it := newItem(1, 3)
l.Add(newItem(1, 2))
l.Add(newItem(2, 2))
l.Add(it)
it.key1 = 2
l.Update(it)
l.Add(newItem(2, 3))
key1, key2, ok := l.getKeyMapKey(it)
if l.Size() != 4 || key1 != 2 || key2 != 3 || !ok {
t.Errorf("TestUpdate failed count required[4] but get [%d]", l.Size())
}
count := 0
l.Range(func(data Item) bool {
it := data.(*item)
fmt.Println(it.key1, it.key2)
count++
return true
})
if l.Size() != count {
t.Errorf("TestRange failed count required[4] but get [%d]", l.Size())
}
}
func TestRangeLimit(t *testing.T) {
l := NewSortedList()
it := newItem(1, 3)
l.Add(newItem(1, 2))
l.Add(newItem(2, 2))
l.Add(it)
it.key1 = 2
l.Update(it)
l.Add(newItem(2, 3))
key1, key2, ok := l.getKeyMapKey(it)
if l.Size() != 4 || key1 != 2 || key2 != 3 || !ok {
t.Errorf("TestUpdate failed count required[4] but get [%d]", l.Size())
}
count := 0
l.RangeLimit(2, func(data Item) bool {
it := data.(*item)
fmt.Println(it.key1, it.key2)
count++
return true
})
if 2 != count {
t.Errorf("TestRange failed count required[2] but get [%d]", count)
}
}
func TestRangeReverse(t *testing.T) {
l := NewSortedList()
it := newItem(1, 3)
l.Add(newItem(1, 2))
l.Add(newItem(2, 2))
l.Add(it)
it.key1 = 2
l.Update(it)
l.Add(newItem(2, 3))
key1, key2, ok := l.getKeyMapKey(it)
if l.Size() != 4 || key1 != 2 || key2 != 3 || !ok {
t.Errorf("TestRangeReverse failed count required[4] but get [%d]", l.Size())
}
count := 0
l.RangeReverse(func(data Item) bool {
it := data.(*item)
fmt.Println(it.key1, it.key2)
count++
return true
})
if l.Size() != count {
t.Errorf("TestRangeReverse failed count required[4] but get [%d]", l.Size())
}
}
func TestRangeReverseLimit(t *testing.T) {
l := NewSortedList()
it := newItem(1, 3)
l.Add(newItem(1, 2))
l.Add(newItem(2, 2))
l.Add(it)
it.key1 = 2
l.Update(it)
l.Add(newItem(2, 3))
key1, key2, ok := l.getKeyMapKey(it)
if l.Size() != 4 || key1 != 2 || key2 != 3 || !ok {
t.Errorf("TestRangeReverseLimit failed count required[4] but get [%d]", l.Size())
}
count := 0
l.RangeReverseLimit(2, func(data Item) bool {
it := data.(*item)
fmt.Println(it.key1, it.key2)
count++
return true
})
if 2 != count {
t.Errorf("TestRangeReverseLimit failed count required[4] but get [%d]", l.Size())
}
}
func BenchmarkAdd(b *testing.B) {
b.ResetTimer()
l := NewSortedList()
for i := 0; i < b.N; i++ {
l.Add(newItem(rand.Intn(BucketMaxLength), rand.Intn(InnerBucketMaxLength)))
}
if b.N != l.Size() {
b.Errorf("BenchmarkAdd failed count required[%d] but get [%d]", b.N, l.Size())
}
fmt.Println(l.Size(), b.N)
}

View File

@ -27,8 +27,8 @@ import (
"d7y.io/dragonfly/v2/internal/dfcodes" "d7y.io/dragonfly/v2/internal/dfcodes"
"d7y.io/dragonfly/v2/internal/dferrors" "d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog" logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/container/list"
schedulerRPC "d7y.io/dragonfly/v2/pkg/rpc/scheduler" schedulerRPC "d7y.io/dragonfly/v2/pkg/rpc/scheduler"
"d7y.io/dragonfly/v2/pkg/structure/sortedlist"
"d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/core/scheduler" "d7y.io/dragonfly/v2/scheduler/core/scheduler"
"d7y.io/dragonfly/v2/scheduler/supervisor" "d7y.io/dragonfly/v2/scheduler/supervisor"
@ -371,8 +371,12 @@ func constructSuccessPeerPacket(peer *supervisor.Peer, parent *supervisor.Peer,
func handleCDNSeedTaskFail(task *supervisor.Task) { func handleCDNSeedTaskFail(task *supervisor.Task) {
if task.CanBackToSource() { if task.CanBackToSource() {
task.GetPeers().Range(func(data sortedlist.Item) bool { task.GetPeers().Range(func(item list.Item) bool {
peer := data.(*supervisor.Peer) peer, ok := item.(*supervisor.Peer)
if !ok {
return true
}
if task.CanBackToSource() { if task.CanBackToSource() {
if !task.ContainsBackToSourcePeer(peer.ID) { if !task.ContainsBackToSourcePeer(peer.ID) {
if peer.CloseChannelWithError(dferrors.Newf(dfcodes.SchedNeedBackSource, "peer %s need back source because cdn seed task failed", peer.ID)) == nil { if peer.CloseChannelWithError(dferrors.Newf(dfcodes.SchedNeedBackSource, "peer %s need back source because cdn seed task failed", peer.ID)) == nil {
@ -381,12 +385,17 @@ func handleCDNSeedTaskFail(task *supervisor.Task) {
} }
return true return true
} }
return false return false
}) })
} else { } else {
task.SetStatus(supervisor.TaskStatusFail) task.SetStatus(supervisor.TaskStatusFail)
task.GetPeers().Range(func(data sortedlist.Item) bool { task.GetPeers().Range(func(item list.Item) bool {
peer := data.(*supervisor.Peer) peer, ok := item.(*supervisor.Peer)
if !ok {
return true
}
if err := peer.CloseChannelWithError(dferrors.New(dfcodes.SchedTaskStatusError, "schedule task status failed")); err != nil { if err := peer.CloseChannelWithError(dferrors.New(dfcodes.SchedTaskStatusError, "schedule task status failed")); err != nil {
peer.Log().Warnf("close peer conn channel failed: %v", err) peer.Log().Warnf("close peer conn channel failed: %v", err)
} }

View File

@ -142,7 +142,7 @@ func (s *Scheduler) ScheduleParent(peer *supervisor.Peer, blankParents sets.Stri
func (s *Scheduler) selectCandidateChildren(peer *supervisor.Peer, limit int, blankChildren sets.String) (candidateChildren []*supervisor.Peer) { func (s *Scheduler) selectCandidateChildren(peer *supervisor.Peer, limit int, blankChildren sets.String) (candidateChildren []*supervisor.Peer) {
peer.Log().Debug("start schedule children flow") peer.Log().Debug("start schedule children flow")
defer peer.Log().Debugf("finish schedule children flow, select num %d candidate children, "+ defer peer.Log().Debugf("finish schedule children flow, select num %d candidate children, "+
"current task tree node count %d, back source peers: %v", len(candidateChildren), peer.Task.GetPeers().Size(), peer.Task.GetBackToSourcePeers()) "current task tree node count %d, back source peers: %v", len(candidateChildren), peer.Task.GetPeers().Len(), peer.Task.GetBackToSourcePeers())
candidateChildren = peer.Task.Pick(limit, func(candidateNode *supervisor.Peer) bool { candidateChildren = peer.Task.Pick(limit, func(candidateNode *supervisor.Peer) bool {
if candidateNode == nil { if candidateNode == nil {
peer.Log().Debugf("******candidate child peer is not selected because it is nil******") peer.Log().Debugf("******candidate child peer is not selected because it is nil******")
@ -215,7 +215,7 @@ func (s *Scheduler) selectCandidateChildren(peer *supervisor.Peer, limit int, bl
func (s *Scheduler) selectCandidateParents(peer *supervisor.Peer, limit int, blankParents sets.String) (candidateParents []*supervisor.Peer) { func (s *Scheduler) selectCandidateParents(peer *supervisor.Peer, limit int, blankParents sets.String) (candidateParents []*supervisor.Peer) {
peer.Log().Debug("start schedule parent flow") peer.Log().Debug("start schedule parent flow")
defer peer.Log().Debugf("finish schedule parent flow, select num %d candidates parents, "+ defer peer.Log().Debugf("finish schedule parent flow, select num %d candidates parents, "+
"current task tree node count %d, back source peers: %v", len(candidateParents), peer.Task.GetPeers().Size(), peer.Task.GetBackToSourcePeers()) "current task tree node count %d, back source peers: %v", len(candidateParents), peer.Task.GetPeers().Len(), peer.Task.GetBackToSourcePeers())
if !peer.Task.CanSchedule() { if !peer.Task.CanSchedule() {
peer.Log().Debugf("++++++peer can not be scheduled because task cannot be scheduled at this timewaiting task status become seeding. "+ peer.Log().Debugf("++++++peer can not be scheduled because task cannot be scheduled at this timewaiting task status become seeding. "+
"it current status is %s++++++", peer.Task.GetStatus()) "it current status is %s++++++", peer.Task.GetStatus())

View File

@ -26,6 +26,11 @@ import (
logger "d7y.io/dragonfly/v2/internal/dflog" logger "d7y.io/dragonfly/v2/internal/dflog"
) )
const (
// When using the manager configuration parameter, limit the maximum load number to 5000
HostMaxLoad = 5 * 1000
)
type HostManager interface { type HostManager interface {
// Add host // Add host
Add(*Host) Add(*Host)

View File

@ -1,4 +1,3 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: d7y.io/dragonfly/v2/scheduler/supervisor (interfaces: HostManager) // Source: d7y.io/dragonfly/v2/scheduler/supervisor (interfaces: HostManager)
// Package mocks is a generated GoMock package. // Package mocks is a generated GoMock package.
@ -7,8 +6,9 @@ package mocks
import ( import (
reflect "reflect" reflect "reflect"
supervisor "d7y.io/dragonfly/v2/scheduler/supervisor"
gomock "github.com/golang/mock/gomock" gomock "github.com/golang/mock/gomock"
supervisor "d7y.io/dragonfly/v2/scheduler/supervisor"
) )
// MockHostManager is a mock of HostManager interface. // MockHostManager is a mock of HostManager interface.

View File

@ -116,7 +116,7 @@ func (m *peerManager) Delete(id string) {
func (m *peerManager) GetPeersByTask(taskID string) []*Peer { func (m *peerManager) GetPeersByTask(taskID string) []*Peer {
var peers []*Peer var peers []*Peer
m.peers.Range(func(key, value interface{}) bool { m.peers.Range(func(_, value interface{}) bool {
peer := value.(*Peer) peer := value.(*Peer)
if peer.Task.ID == taskID { if peer.Task.ID == taskID {
peers = append(peers, peer) peers = append(peers, peer)
@ -152,7 +152,7 @@ func (m *peerManager) RunGC() error {
if peer.Host.GetPeersLen() == 0 { if peer.Host.GetPeersLen() == 0 {
m.hostManager.Delete(peer.Host.UUID) m.hostManager.Delete(peer.Host.UUID)
} }
if peer.Task.GetPeers().Size() == 0 { if peer.Task.GetPeers().Len() == 0 {
peer.Task.Log().Info("peers is empty, task status become waiting") peer.Task.Log().Info("peers is empty, task status become waiting")
peer.Task.SetStatus(TaskStatusWaiting) peer.Task.SetStatus(TaskStatusWaiting)
} }
@ -396,22 +396,23 @@ func (peer *Peer) UpdateProgress(finishedCount int32, cost int) {
peer.Task.UpdatePeer(peer) peer.Task.UpdatePeer(peer)
return return
} }
} }
func (peer *Peer) GetSortKeys() (key1, key2 int) { func (peer *Peer) SortedValue() int {
peer.lock.RLock() peer.lock.RLock()
defer peer.lock.RUnlock() defer peer.lock.RUnlock()
key1 = int(peer.TotalPieceCount.Load()) pieceCount := peer.TotalPieceCount.Load()
key2 = peer.getFreeLoad() hostLoad := peer.getFreeLoad()
return return int(pieceCount*HostMaxLoad + hostLoad)
} }
func (peer *Peer) getFreeLoad() int { func (peer *Peer) getFreeLoad() int32 {
if peer.Host == nil { if peer.Host == nil {
return 0 return 0
} }
return int(peer.Host.GetFreeUploadLoad()) return peer.Host.GetFreeUploadLoad()
} }
func (peer *Peer) SetStatus(status PeerStatus) { func (peer *Peer) SetStatus(status PeerStatus) {

View File

@ -29,6 +29,10 @@ import (
"d7y.io/dragonfly/v2/scheduler/supervisor/mocks" "d7y.io/dragonfly/v2/scheduler/supervisor/mocks"
) )
const (
HostMaxLoad = 5 * 1000
)
func TestPeer_New(t *testing.T) { func TestPeer_New(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
@ -209,9 +213,7 @@ func TestPeer_Cost(t *testing.T) {
average, success := peer.GetPieceAverageCost() average, success := peer.GetPieceAverageCost()
assert.True(success) assert.True(success)
assert.Equal(4, average) assert.Equal(4, average)
finishedCountFetch, loadFetch := peer.GetSortKeys() assert.Equal(peer.SortedValue(), 4*HostMaxLoad+100)
assert.Equal(4, finishedCountFetch)
assert.Equal(100, loadFetch)
}, },
}, },
{ {

View File

@ -24,9 +24,9 @@ import (
"go.uber.org/atomic" "go.uber.org/atomic"
logger "d7y.io/dragonfly/v2/internal/dflog" logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/container/list"
gc "d7y.io/dragonfly/v2/pkg/gc" gc "d7y.io/dragonfly/v2/pkg/gc"
"d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/structure/sortedlist"
"d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/config"
) )
@ -109,7 +109,7 @@ func (m *taskManager) RunGC() error {
task.SetStatus(TaskStatusZombie) task.SetStatus(TaskStatusZombie)
} }
if task.GetPeers().Size() == 0 { if task.GetPeers().Len() == 0 {
task.Log().Info("peers is empty, task status become waiting") task.Log().Info("peers is empty, task status become waiting")
task.SetStatus(TaskStatusWaiting) task.SetStatus(TaskStatusWaiting)
} }
@ -178,8 +178,8 @@ type Task struct {
lastAccessAt *atomic.Time lastAccessAt *atomic.Time
// status is task status and type is TaskStatus // status is task status and type is TaskStatus
status atomic.Value status atomic.Value
// peers is peer list // peers is peer sorted unique list
peers *sortedlist.SortedList peers list.SortedUniqueList
// BackToSourceWeight is back-to-source peer weight // BackToSourceWeight is back-to-source peer weight
BackToSourceWeight atomic.Int32 BackToSourceWeight atomic.Int32
// backToSourcePeers is back-to-source peers list // backToSourcePeers is back-to-source peers list
@ -205,7 +205,7 @@ func NewTask(id, url string, meta *base.UrlMeta) *Task {
lastAccessAt: atomic.NewTime(now), lastAccessAt: atomic.NewTime(now),
backToSourcePeers: []string{}, backToSourcePeers: []string{},
pieces: &sync.Map{}, pieces: &sync.Map{},
peers: sortedlist.NewSortedList(), peers: list.NewSortedUniqueList(),
logger: logger.WithTaskID(id), logger: logger.WithTaskID(id),
} }
@ -263,30 +263,18 @@ func (task *Task) UpdateSuccess(pieceCount int32, contentLength int64) {
} }
func (task *Task) AddPeer(peer *Peer) { func (task *Task) AddPeer(peer *Peer) {
err := task.peers.UpdateOrAdd(peer) task.peers.Insert(peer)
if err != nil {
task.logger.Errorf("add peer %s failed: %v", peer.ID, err)
}
task.logger.Debugf("peer %s has been added, current total peer count is %d", peer.ID, task.peers.Size())
} }
func (task *Task) UpdatePeer(peer *Peer) { func (task *Task) UpdatePeer(peer *Peer) {
err := task.peers.Update(peer) task.peers.Insert(peer)
if err != nil {
task.logger.Errorf("update peer %s failed: %v", peer.ID, err)
}
task.logger.Debugf("peer %s has been updated, current total peer count is %d", peer.ID, task.peers.Size())
} }
func (task *Task) DeletePeer(peer *Peer) { func (task *Task) DeletePeer(peer *Peer) {
err := task.peers.Delete(peer) task.peers.Remove(peer)
if err != nil {
task.logger.Errorf("delete peer %s failed: %v", peer.ID, err)
}
task.logger.Debugf("peer %s has been deleted, current total peer count is %d", peer.ID, task.peers.Size())
} }
func (task *Task) GetPeers() *sortedlist.SortedList { func (task *Task) GetPeers() list.SortedUniqueList {
return task.peers return task.peers
} }
@ -343,45 +331,46 @@ func (task *Task) GetBackToSourcePeers() []string {
return task.backToSourcePeers return task.backToSourcePeers
} }
func (task *Task) Pick(limit int, pickFn func(peer *Peer) bool) (pickedPeers []*Peer) { func (task *Task) Pick(limit int, pickFn func(peer *Peer) bool) []*Peer {
return task.pick(limit, false, pickFn) var peers []*Peer
}
func (task *Task) PickReverse(limit int, pickFn func(peer *Peer) bool) (pickedPeers []*Peer) { task.GetPeers().Range(func(item list.Item) bool {
return task.pick(limit, true, pickFn) if len(peers) >= limit {
}
func (task *Task) pick(limit int, reverse bool, pickFn func(peer *Peer) bool) (pickedPeers []*Peer) {
if pickFn == nil {
return
}
if !reverse {
task.GetPeers().Range(func(data sortedlist.Item) bool {
if len(pickedPeers) >= limit {
return false return false
} }
peer := data.(*Peer) peer, ok := item.(*Peer)
if pickFn(peer) { if !ok {
pickedPeers = append(pickedPeers, peer)
}
return true return true
})
return
} }
task.GetPeers().RangeReverse(func(data sortedlist.Item) bool {
if len(pickedPeers) >= limit {
return false
}
peer := data.(*Peer)
if pickFn(peer) { if pickFn(peer) {
pickedPeers = append(pickedPeers, peer) peers = append(peers, peer)
} }
return true return true
}) })
return return peers
}
func (task *Task) PickReverse(limit int, pickFn func(peer *Peer) bool) []*Peer {
var peers []*Peer
task.GetPeers().ReverseRange(func(item list.Item) bool {
if len(peers) >= limit {
return false
}
peer, ok := item.(*Peer)
if !ok {
return true
}
if pickFn(peer) {
peers = append(peers, peer)
}
return true
})
return peers
} }
func (task *Task) Log() *logger.SugaredLoggerOnWith { func (task *Task) Log() *logger.SugaredLoggerOnWith {

View File

@ -322,14 +322,6 @@ func TestTask_Pick(t *testing.T) {
limit: 100, limit: 100,
answer: []string{}, answer: []string{},
}, },
{
name: "invalid pickFn",
number: 10,
pick: (func(peer *supervisor.Peer) bool)(nil),
reverse: false,
limit: 100,
answer: []string{},
},
} }
for _, tc := range tests { for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {