Differentiating between waiting on dependencies and workers

This commit is contained in:
Laszlo Fogas 2019-07-09 16:23:56 +02:00
parent 3c50918eb5
commit bb941c8b83
5 changed files with 117 additions and 36 deletions

View file

@ -27,21 +27,23 @@ type worker struct {
type fifo struct {
sync.Mutex
workers map[*worker]struct{}
running map[string]*entry
pending *list.List
extension time.Duration
paused bool
workers map[*worker]struct{}
running map[string]*entry
pending *list.List
waitingOnDeps *list.List
extension time.Duration
paused bool
}
// New returns a new fifo queue.
func New() Queue {
return &fifo{
workers: map[*worker]struct{}{},
running: map[string]*entry{},
pending: list.New(),
extension: time.Minute * 10,
paused: false,
workers: map[*worker]struct{}{},
running: map[string]*entry{},
pending: list.New(),
waitingOnDeps: list.New(),
extension: time.Minute * 10,
paused: false,
}
}
@ -161,11 +163,15 @@ func (q *fifo) Info(c context.Context) InfoT {
stats := InfoT{}
stats.Stats.Workers = len(q.workers)
stats.Stats.Pending = q.pending.Len()
stats.Stats.WaitingOnDeps = q.waitingOnDeps.Len()
stats.Stats.Running = len(q.running)
for e := q.pending.Front(); e != nil; e = e.Next() {
stats.Pending = append(stats.Pending, e.Value.(*Task))
}
for e := q.waitingOnDeps.Front(); e != nil; e = e.Next() {
stats.WaitingOnDeps = append(stats.WaitingOnDeps, e.Value.(*Task))
}
for _, entry := range q.running {
stats.Running = append(stats.Running, entry.item)
}
@ -210,7 +216,7 @@ func (q *fifo) process() {
defer q.Unlock()
q.resubmitExpiredBuilds()
q.filterWaiting()
for pending, worker := q.assignToWorker(); pending != nil && worker != nil; pending, worker = q.assignToWorker() {
task := pending.Value.(*Task)
delete(q.workers, worker)
@ -224,16 +230,41 @@ func (q *fifo) process() {
}
}
func (q *fifo) filterWaiting() {
// resubmits all waiting tasks to pending, deps may have cleared
var nextWaiting *list.Element
for e := q.waitingOnDeps.Front(); e != nil; e = nextWaiting {
nextWaiting = e.Next()
task := e.Value.(*Task)
q.pending.PushBack(task)
}
// rebuild waitingDeps
q.waitingOnDeps = list.New()
filtered := []*list.Element{}
var nextPending *list.Element
for e := q.pending.Front(); e != nil; e = nextPending {
nextPending = e.Next()
task := e.Value.(*Task)
if q.depsInQueue(task) {
logrus.Debugf("queue: waiting due to unmet dependencies %v", task.ID)
q.waitingOnDeps.PushBack(task)
filtered = append(filtered, e)
}
}
// filter waiting tasks
for _, f := range filtered {
q.pending.Remove(f)
}
}
func (q *fifo) assignToWorker() (*list.Element, *worker) {
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
task := e.Value.(*Task)
logrus.Debugf("queue: trying to assign task: %v with deps %v", task.ID, task.Dependencies)
if q.depsInQueue(task) {
logrus.Debugf("queue: skipping due to unmet dependencies %v", task.ID)
continue
}
for w := range q.workers {
if w.filter(task) {
@ -290,6 +321,7 @@ func (q *fifo) updateDepStatusInQueue(taskID string, success bool) {
}
}
}
for _, running := range q.running {
for _, dep := range running.item.Dependencies {
if taskID == dep {
@ -297,6 +329,17 @@ func (q *fifo) updateDepStatusInQueue(taskID string, success bool) {
}
}
}
var n *list.Element
for e := q.waitingOnDeps.Front(); e != nil; e = n {
next = e.Next()
waiting, ok := e.Value.(*Task)
for _, dep := range waiting.Dependencies {
if ok && taskID == dep {
waiting.DepStatus[dep] = success
}
}
}
}
func (q *fifo) removeFromPending(taskID string) {

View file

@ -131,8 +131,7 @@ func TestFifoDependencies(t *testing.T) {
}
q := New().(*fifo)
q.Push(noContext, task2)
q.Push(noContext, task1)
q.PushAtOnce(noContext, []*Task{task2, task1})
got, _ := q.Poll(noContext, func(*Task) bool { return true })
if got != task1 {
@ -168,9 +167,7 @@ func TestFifoErrors(t *testing.T) {
}
q := New().(*fifo)
q.Push(noContext, task2)
q.Push(noContext, task3)
q.Push(noContext, task1)
q.PushAtOnce(noContext, []*Task{task2, task3, task1})
got, _ := q.Poll(noContext, func(*Task) bool { return true })
if got != task1 {
@ -222,9 +219,7 @@ func TestFifoCancel(t *testing.T) {
}
q := New().(*fifo)
q.Push(noContext, task2)
q.Push(noContext, task3)
q.Push(noContext, task1)
q.PushAtOnce(noContext, []*Task{task2, task3, task1})
_, _ = q.Poll(noContext, func(*Task) bool { return true })
q.Error(noContext, task1.ID, fmt.Errorf("cancelled"))
@ -251,7 +246,6 @@ func TestFifoPause(t *testing.T) {
wg.Done()
}()
q.Pause()
t0 := time.Now()
q.Push(noContext, task1)
@ -261,7 +255,7 @@ func TestFifoPause(t *testing.T) {
wg.Wait()
t1 := time.Now()
if t1.Sub(t0) < 20 * time.Millisecond {
if t1.Sub(t0) < 20*time.Millisecond {
t.Errorf("Should have waited til resume")
}
@ -284,6 +278,46 @@ func TestFifoPauseResume(t *testing.T) {
_, _ = q.Poll(noContext, func(*Task) bool { return true })
}
func TestWaitingVsPending(t *testing.T) {
task1 := &Task{
ID: "1",
}
task2 := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: make(map[string]bool),
}
task3 := &Task{
ID: "3",
Dependencies: []string{"1"},
DepStatus: make(map[string]bool),
RunOn: []string{"success", "failure"},
}
q := New().(*fifo)
q.PushAtOnce(noContext, []*Task{task2, task3, task1})
got, _ := q.Poll(noContext, func(*Task) bool { return true })
info := q.Info(noContext)
if info.Stats.WaitingOnDeps != 2 {
t.Errorf("2 should wait on deps")
}
q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))
got, _ = q.Poll(noContext, func(*Task) bool { return true })
info = q.Info(noContext)
if info.Stats.WaitingOnDeps != 0 {
t.Errorf("0 should wait on deps")
}
if info.Stats.Pending != 1 {
t.Errorf("1 should wait for worker")
}
}
func TestShouldRun(t *testing.T) {
task := &Task{
ID: "2",

View file

@ -85,13 +85,15 @@ func runsOnSuccess(runsOn []string) bool {
// InfoT provides runtime information.
type InfoT struct {
Pending []*Task `json:"pending"`
Running []*Task `json:"running"`
Stats struct {
Workers int `json:"worker_count"`
Pending int `json:"pending_count"`
Running int `json:"running_count"`
Complete int `json:"completed_count"`
Pending []*Task `json:"pending"`
WaitingOnDeps []*Task `json:"waiting_on_deps"`
Running []*Task `json:"running"`
Stats struct {
Workers int `json:"worker_count"`
Pending int `json:"pending_count"`
WaitingOnDeps int `json:"waiting_on_deps_count"`
Running int `json:"running_count"`
Complete int `json:"completed_count"`
} `json:"stats"`
Paused bool
}

View file

@ -27,6 +27,7 @@ func Test_QueueInfo(t *testing.T) {
"stats": {
"worker_count": 3,
"pending_count": 0,
"waiting_on_deps_count": 0,
"running_count": 1,
"completed_count": 0
},

View file

@ -150,10 +150,11 @@ type (
// Info provides queue stats.
Info struct {
Stats struct {
Workers int `json:"worker_count"`
Pending int `json:"pending_count"`
Running int `json:"running_count"`
Complete int `json:"completed_count"`
Workers int `json:"worker_count"`
Pending int `json:"pending_count"`
WaitingOnDeps int `json:"waiting_on_deps_count"`
Running int `json:"running_count"`
Complete int `json:"completed_count"`
} `json:"stats"`
Paused bool `json:"paused,omitempty"`
}