diff --git a/cncd/queue/fifo.go b/cncd/queue/fifo.go index 18cf328f5..f6c064b2a 100644 --- a/cncd/queue/fifo.go +++ b/cncd/queue/fifo.go @@ -99,41 +99,57 @@ func (q *fifo) Poll(c context.Context, f Filter) (*Task, error) { // Done signals that the item is done executing. func (q *fifo) Done(c context.Context, id string, exitStatus string) error { - return q.finished(id, exitStatus, nil) + return q.finished([]string{id}, exitStatus, nil) } // Error signals that the item is done executing with error. func (q *fifo) Error(c context.Context, id string, err error) error { + return q.finished([]string{id}, StatusFailure, err) +} + +// Error signals that the item is done executing with error. +func (q *fifo) ErrorAtOnce(c context.Context, id []string, err error) error { return q.finished(id, StatusFailure, err) } -func (q *fifo) finished(id string, exitStatus string, err error) error { +func (q *fifo) finished(ids []string, exitStatus string, err error) error { q.Lock() - taskEntry, ok := q.running[id] - if ok { - taskEntry.error = err - close(taskEntry.done) - delete(q.running, id) - } else { - q.removeFromPending(id) + + for _, id := range ids { + taskEntry, ok := q.running[id] + if ok { + taskEntry.error = err + close(taskEntry.done) + delete(q.running, id) + } else { + q.removeFromPending(id) + } + q.updateDepStatusInQueue(id, exitStatus) } - q.updateDepStatusInQueue(id, exitStatus) + q.Unlock() return nil } // Evict removes a pending task from the queue. func (q *fifo) Evict(c context.Context, id string) error { + return q.EvictAtOnce(c, []string{id}) +} + +// Evict removes a pending task from the queue. +func (q *fifo) EvictAtOnce(c context.Context, ids []string) error { q.Lock() defer q.Unlock() - var next *list.Element - for e := q.pending.Front(); e != nil; e = next { - next = e.Next() - task, ok := e.Value.(*Task) - if ok && task.ID == id { - q.pending.Remove(e) - return nil + for _, id := range ids { + var next *list.Element + for e := q.pending.Front(); e != nil; e = next { + next = e.Next() + task, ok := e.Value.(*Task) + if ok && task.ID == id { + q.pending.Remove(e) + return nil + } } } return ErrNotFound diff --git a/cncd/queue/queue.go b/cncd/queue/queue.go index 67ffe3ee5..29bccb20e 100644 --- a/cncd/queue/queue.go +++ b/cncd/queue/queue.go @@ -123,9 +123,15 @@ type Queue interface { // Error signals the task is complete with errors. Error(c context.Context, id string, err error) error + // Error signals the task is complete with errors. + ErrorAtOnce(c context.Context, id []string, err error) error + // Evict removes a pending task from the queue. Evict(c context.Context, id string) error + // Evict removes a pending task from the queue. + EvictAtOnce(c context.Context, id []string) error + // Wait waits until the task is complete. Wait(c context.Context, id string) error diff --git a/model/queue.go b/model/queue.go index 6e5148692..d47e7a034 100644 --- a/model/queue.go +++ b/model/queue.go @@ -119,3 +119,14 @@ func (q *persistentQueue) Evict(c context.Context, id string) error { } return err } + +// Evict removes a pending task from the queue. +func (q *persistentQueue) EvictAtOnce(c context.Context, ids []string) error { + err := q.Queue.EvictAtOnce(c, ids) + if err == nil { + for _, id := range ids { + q.store.TaskDelete(id) + } + } + return err +} diff --git a/router/router.go b/router/router.go index 82bcaab8e..7e2125270 100644 --- a/router/router.go +++ b/router/router.go @@ -113,7 +113,7 @@ func Load(mux *httptreemux.ContextMux, middleware ...gin.HandlerFunc) http.Handl repo.POST("/move", session.MustRepoAdmin(), server.MoveRepo) repo.POST("/builds/:number", session.MustPush, server.PostBuild) - repo.DELETE("/builds/:number", session.MustRepoAdmin(), server.ZombieKill) + repo.DELETE("/builds/:number", session.MustPush, server.DeleteBuild) repo.POST("/builds/:number/approve", session.MustPush, server.PostApproval) repo.POST("/builds/:number/decline", session.MustPush, server.PostDecline) repo.DELETE("/builds/:number/:job", session.MustPush, server.DeleteBuild) diff --git a/server/build.go b/server/build.go index a1501484d..b650dbcca 100644 --- a/server/build.go +++ b/server/build.go @@ -160,8 +160,6 @@ func GetProcLogs(c *gin.Context) { // DeleteBuild cancels a build func DeleteBuild(c *gin.Context) { repo := session.Repo(c) - - // parse the build number from the request parameter. num, _ := strconv.Atoi(c.Params.ByName("number")) build, err := store.GetBuildNumber(c, repo, num) @@ -176,76 +174,63 @@ func DeleteBuild(c *gin.Context) { return } - cancelled := false + if build.Status != model.StatusRunning && build.Status != model.StatusPending { + c.String(400, "Cannot cancel a non-running or non-pending build") + return + } + + // First cancel/evict procs in the queue in one go + procToCancel := []string{} + procToEvict := []string{} for _, proc := range procs { if proc.PPID != 0 { continue } - - if proc.State != model.StatusRunning && proc.State != model.StatusPending { - continue + if proc.State == model.StatusRunning { + procToCancel = append(procToCancel, fmt.Sprint(proc.ID)) } - - // TODO cancel child procs - if _, err = UpdateProcToStatusKilled(store.FromContext(c), *proc); err != nil { - log.Printf("error: done: cannot update proc_id %d state: %s", proc.ID, err) + if proc.State == model.StatusPending { + procToEvict = append(procToEvict, fmt.Sprint(proc.ID)) } - - Config.Services.Queue.Error(context.Background(), fmt.Sprint(proc.ID), queue.ErrCancel) - cancelled = true - } - - if !cancelled { - c.String(400, "Cannot cancel a non-running build") - return - } - - c.String(204, "") -} - -// ZombieKill kills zombie processes stuck in an infinite pending -// or running state. This can only be invoked by administrators and -// may have negative effects. -func ZombieKill(c *gin.Context) { - repo := session.Repo(c) - - // parse the build number and job sequence number from - // the repquest parameter. - num, _ := strconv.Atoi(c.Params.ByName("number")) - - build, err := store.GetBuildNumber(c, repo, num) - if err != nil { - c.AbortWithError(404, err) - return - } - - procs, err := store.FromContext(c).ProcList(build) - if err != nil { - c.AbortWithError(404, err) - return - } - - if build.Status != model.StatusRunning { - c.String(400, "Cannot force cancel a non-running build") - return } + Config.Services.Queue.EvictAtOnce(context.Background(), procToEvict) + Config.Services.Queue.ErrorAtOnce(context.Background(), procToEvict, queue.ErrCancel) + Config.Services.Queue.ErrorAtOnce(context.Background(), procToCancel, queue.ErrCancel) + // Then update the DB status for pending builds + // Running ones will be set when the agents stop on the cancel signal for _, proc := range procs { - if proc.Running() { - if _, err := UpdateProcToStatusKilled(store.FromContext(c), *proc); err != nil { - log.Printf("error: done: cannot update proc_id %d state: %s", proc.ID, err) + if proc.State == model.StatusPending { + if proc.PPID != 0 { + if _, err = UpdateProcToStatusSkipped(store.FromContext(c), *proc, 0); err != nil { + log.Printf("error: done: cannot update proc_id %d state: %s", proc.ID, err) + } + } else { + if _, err = UpdateProcToStatusKilled(store.FromContext(c), *proc); err != nil { + log.Printf("error: done: cannot update proc_id %d state: %s", proc.ID, err) + } } - } else { - store.FromContext(c).ProcUpdate(proc) } - Config.Services.Queue.Error(context.Background(), fmt.Sprint(proc.ID), queue.ErrCancel) } - if _, err := UpdateToStatusKilled(store.FromContext(c), *build); err != nil { + killedBuild, err := UpdateToStatusKilled(store.FromContext(c), *build) + if err != nil { c.AbortWithError(500, err) return } + // For pending builds, we stream the UI the latest state. + // For running builds, the UI will be updated when the agents acknowledge the cancel + if build.Status == model.StatusPending { + procs, err = store.FromContext(c).ProcList(killedBuild) + if err != nil { + c.AbortWithError(404, err) + return + } + killedBuild.Procs = model.Tree(procs) + publishToTopic(c, killedBuild, repo, model.Cancelled) + } + c.String(204, "") } @@ -353,7 +338,7 @@ func PostApproval(c *gin.Context) { } }() - publishToTopic(c, build, repo) + publishToTopic(c, build, repo, model.Enqueued) queueBuild(build, repo, buildItems) } @@ -557,7 +542,7 @@ func PostBuild(c *gin.Context) { } c.JSON(202, build) - publishToTopic(c, build, repo) + publishToTopic(c, build, repo, model.Enqueued) queueBuild(build, repo, buildItems) } diff --git a/server/hook.go b/server/hook.go index fdbe8f8dd..eee048430 100644 --- a/server/hook.go +++ b/server/hook.go @@ -285,7 +285,7 @@ func PostHook(c *gin.Context) { } }() - publishToTopic(c, build, repo) + publishToTopic(c, build, repo, model.Enqueued) queueBuild(build, repo, buildItems) } @@ -360,7 +360,7 @@ func findOrPersistPipelineConfig(build *model.Build, remoteYamlConfig *remote.Fi } // publishes message to UI clients -func publishToTopic(c *gin.Context, build *model.Build, repo *model.Repo) { +func publishToTopic(c *gin.Context, build *model.Build, repo *model.Repo, event model.EventType) { message := pubsub.Message{ Labels: map[string]string{ "repo": repo.FullName,