Tasks should not run on error, unless specified

This commit is contained in:
Laszlo Fogas 2019-06-16 15:26:45 +02:00
parent 6a2c3f129c
commit a37866179f
3 changed files with 111 additions and 47 deletions

View file

@ -97,10 +97,11 @@ func (q *fifo) Done(c context.Context, id string) error {
// Error signals that the item is done executing with error.
func (q *fifo) Error(c context.Context, id string, err error) error {
q.Lock()
state, ok := q.running[id]
taskEntry, ok := q.running[id]
if ok {
state.error = err
close(state.done)
q.updateDepStatusInQueue(id, err == nil)
taskEntry.error = err
close(taskEntry.done)
delete(q.running, id)
}
q.Unlock()
@ -247,3 +248,23 @@ func (q *fifo) depsInQueue(task *Task) bool {
}
return false
}
func (q *fifo) updateDepStatusInQueue(taskID string, success bool) {
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
pending, ok := e.Value.(*Task)
for _, dep := range pending.Dependencies {
if ok && taskID == dep {
pending.DepStatus[dep] = success
}
}
}
for _, running := range q.running {
for _, dep := range running.item.Dependencies {
if taskID == dep {
running.item.DepStatus[dep] = success
}
}
}
}

View file

@ -2,6 +2,7 @@ package queue
import (
"context"
"fmt"
"sync"
"testing"
"time"
@ -126,6 +127,7 @@ func TestFifoDependencies(t *testing.T) {
task2 := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: make(map[string]bool),
}
q := New().(*fifo)
@ -146,3 +148,57 @@ func TestFifoDependencies(t *testing.T) {
return
}
}
func TestFifoErrors(t *testing.T) {
task1 := &Task{
ID: "1",
}
task2 := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: make(map[string]bool),
}
task3 := &Task{
ID: "3",
Dependencies: []string{"1"},
DepStatus: make(map[string]bool),
RunOn: []string{"success", "failure"},
}
q := New().(*fifo)
q.Push(noContext, task2)
q.Push(noContext, task3)
q.Push(noContext, task1)
got, _ := q.Poll(noContext, func(*Task) bool { return true })
if got != task1 {
t.Errorf("expect task1 returned from queue as task2 depends on it")
return
}
q.Error(noContext, got.ID, fmt.Errorf("exitcode 1, there was an error"))
got, _ = q.Poll(noContext, func(*Task) bool { return true })
if got != task2 {
t.Errorf("expect task2 returned from queue")
return
}
if got.ShouldRun() {
t.Errorf("expect task2 should not run, since task1 failed")
return
}
got, _ = q.Poll(noContext, func(*Task) bool { return true })
if got != task3 {
t.Errorf("expect task3 returned from queue")
return
}
if !got.ShouldRun() {
t.Errorf("expect task3 should run, task1 failed, but task3 runs on failure too")
return
}
}

View file

@ -24,8 +24,38 @@ type Task struct {
// Labels represents the key-value pairs the entry is lebeled with.
Labels map[string]string `json:"labels,omitempty"`
// Task IDs this task depend on
// Task IDs this task depend
Dependencies []string
// If dep finished sucessfully
DepStatus map[string]bool
// RunOn failure or success
RunOn []string
}
// ShouldRun tells if a task should be run or skipped, based on dependencies
func (t *Task) ShouldRun() bool {
if runsOnFailure(t.RunOn) {
return true
}
for _, success := range t.DepStatus {
if !success {
return false
}
}
return true
}
func runsOnFailure(runsOn []string) bool {
for _, status := range runsOn {
if status == "failure" {
return true
}
}
return false
}
// InfoT provides runtime information.
@ -74,46 +104,3 @@ type Queue interface {
// Info returns internal queue information.
Info(c context.Context) InfoT
}
// // global instance of the queue.
// var global = New()
//
// // Set sets the global queue.
// func Set(queue Queue) {
// global = queue
// }
//
// // Push pushes an task to the tail of the global queue.
// func Push(c context.Context, task *Task) error {
// return global.Push(c, task)
// }
//
// // Poll retrieves and removes a task head of the global queue.
// func Poll(c context.Context, f Filter) (*Task, error) {
// return global.Poll(c, f)
// }
//
// // Extend extends the deadline for a task.
// func Extend(c context.Context, id string) error {
// return global.Extend(c, id)
// }
//
// // Done signals the task is complete.
// func Done(c context.Context, id string) error {
// return global.Done(c, id)
// }
//
// // Error signals the task is complete with errors.
// func Error(c context.Context, id string, err error) {
// global.Error(c, id, err)
// }
//
// // Wait waits until the task is complete.
// func Wait(c context.Context, id string) error {
// return global.Wait(c, id)
// }
//
// // Info returns internal queue information.
// func Info(c context.Context) InfoT {
// return global.Info(c)
// }