Handling canceled, skipped builds

This commit is contained in:
Laszlo Fogas 2019-06-19 08:36:13 +02:00
parent 1d47ba8a32
commit 69de8face1
5 changed files with 84 additions and 68 deletions

View file

@ -103,6 +103,8 @@ func (q *fifo) Error(c context.Context, id string, err error) error {
taskEntry.error = err
close(taskEntry.done)
delete(q.running, id)
} else {
q.removeFromPending(id)
}
q.Unlock()
return nil
@ -268,3 +270,17 @@ func (q *fifo) updateDepStatusInQueue(taskID string, success bool) {
}
}
}
func (q *fifo) removeFromPending(taskID string) {
logrus.Debugf("queue: trying to remove %s", taskID)
var next *list.Element
for e := q.pending.Front(); e != nil; e = next {
next = e.Next()
task := e.Value.(*Task)
if task.ID == taskID {
logrus.Debugf("queue: %s is removed from pending", taskID)
q.pending.Remove(e)
return
}
}
}

View file

@ -203,6 +203,41 @@ func TestFifoErrors(t *testing.T) {
}
}
func TestFifoCancel(t *testing.T) {
task1 := &Task{
ID: "1",
}
task2 := &Task{
ID: "2",
Dependencies: []string{"1"},
DepStatus: make(map[string]bool),
}
task3 := &Task{
ID: "3",
Dependencies: []string{"1"},
DepStatus: make(map[string]bool),
RunOn: []string{"success", "failure"},
}
q := New().(*fifo)
q.Push(noContext, task2)
q.Push(noContext, task3)
q.Push(noContext, task1)
_, _ = q.Poll(noContext, func(*Task) bool { return true })
q.Error(noContext, task1.ID, fmt.Errorf("cancelled"))
q.Error(noContext, task2.ID, fmt.Errorf("cancelled"))
q.Error(noContext, task3.ID, fmt.Errorf("cancelled"))
info := q.Info(noContext)
if len(info.Pending) != 0 {
t.Errorf("All pipelines should be cancelled")
return
}
}
func TestShouldRun(t *testing.T) {
task := &Task{
ID: "2",

View file

@ -51,7 +51,7 @@ const (
// GitHub commit status.
func convertStatus(status string) string {
switch status {
case model.StatusPending, model.StatusRunning, model.StatusBlocked:
case model.StatusPending, model.StatusRunning, model.StatusBlocked, model.StatusSkipped:
return statusPending
case model.StatusFailure, model.StatusDeclined:
return statusFailure

View file

@ -156,13 +156,12 @@ func GetProcLogs(c *gin.Context) {
io.Copy(c.Writer, rc)
}
// DeleteBuild cancels a build
func DeleteBuild(c *gin.Context) {
repo := session.Repo(c)
// parse the build number and job sequence number from
// the repquest parameter.
// parse the build number from the request parameter.
num, _ := strconv.Atoi(c.Params.ByName("number"))
seq, _ := strconv.Atoi(c.Params.ByName("job"))
build, err := store.GetBuildNumber(c, repo, num)
if err != nil {
@ -170,27 +169,40 @@ func DeleteBuild(c *gin.Context) {
return
}
proc, err := store.FromContext(c).ProcFind(build, seq)
procs, err := store.FromContext(c).ProcList(build)
if err != nil {
c.AbortWithError(404, err)
return
}
if proc.State != model.StatusRunning {
cancelled := false
for _, proc := range procs {
if proc.PPID != 0 {
continue
}
if proc.State != model.StatusRunning && proc.State != model.StatusPending {
continue
}
proc.State = model.StatusKilled
proc.Stopped = time.Now().Unix()
if proc.Started == 0 {
proc.Started = proc.Stopped
}
proc.ExitCode = 137
// TODO cancel child procs
store.FromContext(c).ProcUpdate(proc)
Config.Services.Queue.Error(context.Background(), fmt.Sprint(proc.ID), queue.ErrCancel)
cancelled = true
}
if !cancelled {
c.String(400, "Cannot cancel a non-running build")
return
}
proc.State = model.StatusKilled
proc.Stopped = time.Now().Unix()
if proc.Started == 0 {
proc.Started = proc.Stopped
}
proc.ExitCode = 137
// TODO cancel child procs
store.FromContext(c).ProcUpdate(proc)
Config.Services.Queue.Error(context.Background(), fmt.Sprint(proc.ID), queue.ErrCancel)
c.String(204, "")
}

View file

@ -441,7 +441,11 @@ func (s *RPC) updateProcState(proc *model.Proc, state rpc.State) {
proc.Stopped = state.Finished
proc.Error = state.Error
proc.ExitCode = state.ExitCode
proc.State = model.StatusSuccess
if state.Started == 0 {
proc.State = model.StatusSkipped
} else {
proc.State = model.StatusSuccess
}
if proc.ExitCode != 0 || proc.Error != "" {
proc.State = model.StatusFailure
}
@ -522,21 +526,6 @@ func (s *RPC) notify(c context.Context, repo *model.Repo, build *model.Build, pr
s.pubsub.Publish(c, "topic/events", message)
}
func (s *RPC) checkCancelled(pipeline *rpc.Pipeline) (bool, error) {
pid, err := strconv.ParseInt(pipeline.ID, 10, 64)
if err != nil {
return false, err
}
proc, err := s.store.ProcLoad(pid)
if err != nil {
return false, err
}
if proc.State == model.StatusKilled {
return true, nil
}
return false, err
}
func createFilterFunc(filter rpc.Filter) (queue.Filter, error) {
var st *expr.Selector
var err error
@ -606,42 +595,6 @@ func (s *DroneServer) Next(c oldcontext.Context, req *proto.NextRequest) (*proto
res.Pipeline.Payload, _ = json.Marshal(pipeline.Config)
return res, err
// fn := func(task *queue.Task) bool {
// for k, v := range req.GetFilter().Labels {
// if task.Labels[k] != v {
// return false
// }
// }
// return true
// }
// task, err := s.Queue.Poll(c, fn)
// if err != nil {
// return nil, err
// } else if task == nil {
// return nil, nil
// }
//
// pipeline := new(rpc.Pipeline)
// json.Unmarshal(task.Data, pipeline)
//
// res := new(proto.NextReply)
// res.Pipeline = new(proto.Pipeline)
// res.Pipeline.Id = pipeline.ID
// res.Pipeline.Timeout = pipeline.Timeout
// res.Pipeline.Payload, _ = json.Marshal(pipeline.Config)
//
// // check if the process was previously cancelled
// // cancelled, _ := s.checkCancelled(pipeline)
// // if cancelled {
// // logrus.Debugf("ignore pid %v: cancelled by user", pipeline.ID)
// // if derr := s.queue.Done(c, pipeline.ID); derr != nil {
// // logrus.Errorf("error: done: cannot ack proc_id %v: %s", pipeline.ID, err)
// // }
// // return nil, nil
// // }
//
// return res, nil
}
func (s *DroneServer) Init(c oldcontext.Context, req *proto.InitRequest) (*proto.Empty, error) {