mirror of https://github.com/grpc/grpc-go.git
				
				
				
			internal/wrr: introduce order offset to edf scheduler as a tiebreaker for entries with same deadline (#3800)
This commit is contained in:
		
							parent
							
								
									b54ea173dc
								
							
						
					
					
						commit
						7d8921505e
					
				|  | @ -24,8 +24,10 @@ import ( | |||
| 
 | ||||
| // edfWrr is a struct for EDF weighted round robin implementation.
 | ||||
| type edfWrr struct { | ||||
| 	lock  sync.Mutex | ||||
| 	items edfPriorityQueue | ||||
| 	lock               sync.Mutex | ||||
| 	items              edfPriorityQueue | ||||
| 	currentOrderOffset uint64 | ||||
| 	currentTime        float64 | ||||
| } | ||||
| 
 | ||||
| // NewEDF creates Earliest Deadline First (EDF)
 | ||||
|  | @ -38,17 +40,20 @@ func NewEDF() WRR { | |||
| 
 | ||||
| // edfEntry is an internal wrapper for item that also stores weight and relative position in the queue.
 | ||||
| type edfEntry struct { | ||||
| 	deadline float64 | ||||
| 	weight   int64 | ||||
| 	item     interface{} | ||||
| 	deadline    float64 | ||||
| 	weight      int64 | ||||
| 	orderOffset uint64 | ||||
| 	item        interface{} | ||||
| } | ||||
| 
 | ||||
| // edfPriorityQueue is a heap.Interface implementation for edfEntry elements.
 | ||||
| type edfPriorityQueue []*edfEntry | ||||
| 
 | ||||
| func (pq edfPriorityQueue) Len() int           { return len(pq) } | ||||
| func (pq edfPriorityQueue) Less(i, j int) bool { return pq[i].deadline < pq[j].deadline } | ||||
| func (pq edfPriorityQueue) Swap(i, j int)      { pq[i], pq[j] = pq[j], pq[i] } | ||||
| func (pq edfPriorityQueue) Len() int { return len(pq) } | ||||
| func (pq edfPriorityQueue) Less(i, j int) bool { | ||||
| 	return pq[i].deadline < pq[j].deadline || pq[i].deadline == pq[j].deadline && pq[i].orderOffset < pq[j].orderOffset | ||||
| } | ||||
| func (pq edfPriorityQueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] } | ||||
| 
 | ||||
| func (pq *edfPriorityQueue) Push(x interface{}) { | ||||
| 	*pq = append(*pq, x.(*edfEntry)) | ||||
|  | @ -60,22 +65,16 @@ func (pq *edfPriorityQueue) Pop() interface{} { | |||
| 	return old[len(old)-1] | ||||
| } | ||||
| 
 | ||||
| // Current time in EDF scheduler.
 | ||||
| func (edf *edfWrr) currentTime() float64 { | ||||
| 	if len(edf.items) == 0 { | ||||
| 		return 0.0 | ||||
| 	} | ||||
| 	return edf.items[0].deadline | ||||
| } | ||||
| 
 | ||||
| func (edf *edfWrr) Add(item interface{}, weight int64) { | ||||
| 	edf.lock.Lock() | ||||
| 	defer edf.lock.Unlock() | ||||
| 	entry := edfEntry{ | ||||
| 		deadline: edf.currentTime() + 1.0/float64(weight), | ||||
| 		weight:   weight, | ||||
| 		item:     item, | ||||
| 		deadline:    edf.currentTime + 1.0/float64(weight), | ||||
| 		weight:      weight, | ||||
| 		item:        item, | ||||
| 		orderOffset: edf.currentOrderOffset, | ||||
| 	} | ||||
| 	edf.currentOrderOffset++ | ||||
| 	heap.Push(&edf.items, &entry) | ||||
| } | ||||
| 
 | ||||
|  | @ -86,7 +85,8 @@ func (edf *edfWrr) Next() interface{} { | |||
| 		return nil | ||||
| 	} | ||||
| 	item := edf.items[0] | ||||
| 	item.deadline = edf.currentTime() + 1.0/float64(item.weight) | ||||
| 	edf.currentTime = item.deadline | ||||
| 	item.deadline = edf.currentTime + 1.0/float64(item.weight) | ||||
| 	heap.Fix(&edf.items, 0) | ||||
| 	return item.item | ||||
| } | ||||
|  |  | |||
|  | @ -0,0 +1,35 @@ | |||
| /* | ||||
|  * | ||||
|  * Copyright 2020 gRPC authors. | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *     http://www.apache.org/licenses/LICENSE-2.0
 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| package wrr | ||||
| 
 | ||||
| import ( | ||||
| 	"testing" | ||||
| ) | ||||
| 
 | ||||
| func (s) TestEDFOnEndpointsWithSameWeight(t *testing.T) { | ||||
| 	wrr := NewEDF() | ||||
| 	wrr.Add("1", 1) | ||||
| 	wrr.Add("2", 1) | ||||
| 	wrr.Add("3", 1) | ||||
| 	expected := []string{"1", "2", "3", "1", "2", "3", "1", "2", "3", "1", "2", "3"} | ||||
| 	for i := 0; i < len(expected); i++ { | ||||
| 		item := wrr.Next().(string) | ||||
| 		if item != expected[i] { | ||||
| 			t.Errorf("wrr Next=%s, want=%s", item, expected[i]) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
		Loading…
	
		Reference in New Issue