1
0
mirror of https://github.com/gogf/gf.git synced 2025-04-05 11:18:50 +08:00

improve package gtimer for priority checks

This commit is contained in:
John Guo 2021-08-06 12:08:49 +08:00
parent 214d0513e5
commit 0e158903c2
10 changed files with 85 additions and 51 deletions

View File

@ -156,9 +156,7 @@ func (a *SortedArray) Append(values ...interface{}) *SortedArray {
if cmp > 0 {
index++
}
rear := append([]interface{}{}, a.array[index:]...)
a.array = append(a.array[0:index], value)
a.array = append(a.array, rear...)
a.array = append(a.array[:index], append([]interface{}{value}, a.array[index:]...)...)
}
return a
}

View File

@ -0,0 +1,39 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package garray_test
import (
"github.com/gogf/gf/container/garray"
"testing"
)
type anySortedArrayItem struct {
priority int64
value interface{}
}
var (
anyArray = garray.NewArray()
anySortedArray = garray.NewSortedArray(func(a, b interface{}) int {
return int(a.(anySortedArrayItem).priority - b.(anySortedArrayItem).priority)
})
)
func Benchmark_AnyArray_Add(b *testing.B) {
for i := 0; i < b.N; i++ {
anyArray.Append(i)
}
}
func Benchmark_AnySortedArray_Add(b *testing.B) {
for i := 0; i < b.N; i++ {
anySortedArray.Add(anySortedArrayItem{
priority: int64(i),
value: i,
})
}
}

View File

@ -139,7 +139,7 @@ func Database(name ...string) gdb.DB {
}
return db
} else {
// It panics often because it dose not find its configuration for given group.
// If panics, often because it does not find its configuration for given group.
panic(err)
}
return nil

View File

@ -24,6 +24,7 @@ import (
const (
// Short writes for common usage durations.
D = 24 * time.Hour
H = time.Hour
M = time.Minute

View File

@ -15,6 +15,7 @@ var (
// locationMap is time zone name to its location object.
// Time zone name is like: Asia/Shanghai.
locationMap = make(map[string]*time.Location)
// locationMu is used for concurrent safety for `locationMap`.
locationMu = sync.RWMutex{}
)

View File

@ -33,12 +33,12 @@ func (entry *Entry) Status() int {
// Run runs the timer job asynchronously.
func (entry *Entry) Run() {
leftRunningTimes := entry.times.Add(-1)
// Running times exceeding checks.
// It checks its running times exceeding.
if leftRunningTimes < 0 {
entry.status.Set(StatusClosed)
return
}
// This means it does not limit the running times.
// This means it has no limit in running times.
if leftRunningTimes == math.MaxInt32-1 {
entry.times.Set(math.MaxInt32)
}

View File

@ -18,9 +18,9 @@ import (
// high priority is served before an element with low priority.
// priorityQueue is based on heap structure.
type priorityQueue struct {
mu sync.RWMutex
heap *priorityQueueHeap // the underlying queue items manager using heap.
latestPriority *gtype.Int64 // latestPriority stores the most priority value of the heap, which is used to check if necessary to call the Pop of heap by Timer.
mu sync.Mutex
heap *priorityQueueHeap // the underlying queue items manager using heap.
nextPriority *gtype.Int64 // nextPriority stores the next priority value of the heap, which is used to check if necessary to call the Pop of heap by Timer.
}
// priorityQueueHeap is a heap manager, of which the underlying `array` is a array implementing a heap structure.
@ -37,25 +37,16 @@ type priorityQueueItem struct {
// newPriorityQueue creates and returns a priority queue.
func newPriorityQueue() *priorityQueue {
queue := &priorityQueue{
heap: &priorityQueueHeap{
array: make([]priorityQueueItem, 0),
},
latestPriority: gtype.NewInt64(math.MaxInt64),
heap: &priorityQueueHeap{array: make([]priorityQueueItem, 0)},
nextPriority: gtype.NewInt64(math.MaxInt64),
}
heap.Init(queue.heap)
return queue
}
// Len retrieves and returns the length of the queue.
func (q *priorityQueue) Len() int {
q.mu.RLock()
defer q.mu.RUnlock()
return q.heap.Len()
}
// LatestPriority retrieves and returns the minimum and the most priority value of the queue.
func (q *priorityQueue) LatestPriority() int64 {
return q.latestPriority.Val()
// NextPriority retrieves and returns the minimum and the most priority value of the queue.
func (q *priorityQueue) NextPriority() int64 {
return q.nextPriority.Val()
}
// Push pushes a value to the queue.
@ -63,42 +54,30 @@ func (q *priorityQueue) LatestPriority() int64 {
// The lesser the `priority` value the higher priority of the `value`.
func (q *priorityQueue) Push(value interface{}, priority int64) {
q.mu.Lock()
defer q.mu.Unlock()
heap.Push(q.heap, priorityQueueItem{
value: value,
priority: priority,
})
q.mu.Unlock()
// Update the minimum priority using atomic operation.
for {
latestPriority := q.latestPriority.Val()
if priority >= latestPriority {
break
}
if q.latestPriority.Cas(latestPriority, priority) {
break
}
nextPriority := q.nextPriority.Val()
if priority >= nextPriority {
return
}
q.nextPriority.Set(priority)
}
// Pop retrieves, removes and returns the most high priority value from the queue.
func (q *priorityQueue) Pop() interface{} {
q.mu.Lock()
defer q.mu.Unlock()
if v := heap.Pop(q.heap); v != nil {
item := v.(priorityQueueItem)
q.mu.Unlock()
// Update the minimum priority using atomic operation.
for {
latestPriority := q.latestPriority.Val()
if item.priority >= latestPriority {
break
}
if q.latestPriority.Cas(latestPriority, item.priority) {
break
}
var nextPriority int64 = math.MaxInt64
if len(q.heap.array) > 0 {
nextPriority = q.heap.array[0].priority
}
return item.value
} else {
q.mu.Unlock()
q.nextPriority.Set(nextPriority)
return v.(priorityQueueItem).value
}
return nil
}

View File

@ -12,6 +12,7 @@ func (h *priorityQueueHeap) Len() int {
}
// Less is used to implement the interface of sort.Interface.
// The least one is placed to the top of the heap.
func (h *priorityQueueHeap) Less(i, j int) bool {
return h.array[i].priority < h.array[j].priority
}

View File

@ -23,8 +23,7 @@ func (t *Timer) loop() {
switch t.status.Val() {
case StatusRunning:
// Timer proceeding.
currentTimerTicks = t.ticks.Add(1)
if currentTimerTicks >= t.queue.LatestPriority() {
if currentTimerTicks = t.ticks.Add(1); currentTimerTicks >= t.queue.NextPriority() {
t.proceed(currentTimerTicks)
}
@ -40,7 +39,7 @@ func (t *Timer) loop() {
}()
}
// proceed proceeds the timer job checking and running logic.
// proceed function proceeds the timer job checking and running logic.
func (t *Timer) proceed(currentTimerTicks int64) {
var (
value interface{}
@ -51,9 +50,9 @@ func (t *Timer) proceed(currentTimerTicks int64) {
break
}
entry := value.(*Entry)
// It checks if it meets the ticks requirement.
// It checks if it meets the ticks' requirement.
if jobNextTicks := entry.nextTicks.Val(); currentTimerTicks < jobNextTicks {
// It push the job back if current ticks does not meet its running ticks requirement.
// It pushes the job back if current ticks does not meet its running ticks requirement.
t.queue.Push(entry, entry.nextTicks.Val())
break
}

View File

@ -46,3 +46,19 @@ func TestTimer_Proceed(t *testing.T) {
t.Assert(array.Len(), 2)
})
}
func TestTimer_PriorityQueue(t *testing.T) {
gtest.C(t, func(t *gtest.T) {
queue := newPriorityQueue()
queue.Push(1, 1)
queue.Push(4, 4)
queue.Push(5, 5)
queue.Push(2, 2)
queue.Push(3, 3)
t.Assert(queue.Pop(), 1)
t.Assert(queue.Pop(), 2)
t.Assert(queue.Pop(), 3)
t.Assert(queue.Pop(), 4)
t.Assert(queue.Pop(), 5)
})
}