From bb7453262ac4b3728322972a11219511471bb9a4 Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Sun, 5 Mar 2017 22:05:16 +1100 Subject: [PATCH] implement cancel function --- drone/agent/exp.go | 11 +++-- server/build.go | 29 +++++++----- server/rpc.go | 10 ++-- .../cncd/pipeline/pipeline/rpc/client.go | 22 ++++++--- .../cncd/pipeline/pipeline/rpc/peer.go | 11 +++-- .../cncd/pipeline/pipeline/rpc/server.go | 25 +++++++--- vendor/vendor.json | 46 +++++++++---------- 7 files changed, 95 insertions(+), 59 deletions(-) diff --git a/drone/agent/exp.go b/drone/agent/exp.go index 57e252cc7..39f2fde4c 100644 --- a/drone/agent/exp.go +++ b/drone/agent/exp.go @@ -103,10 +103,9 @@ func run(ctx context.Context, client rpc.Peer) error { cancelled := abool.New() go func() { - ok, _ := client.Notify(ctx, work.ID) - if ok { + if err := client.Wait(ctx, work.ID); err != nil { cancelled.SetTo(true) - log.Printf("pipeline: cancel signal received: %s", work.ID) + log.Printf("pipeline: cancel signal received: %s: %s", work.ID, err) cancel() } else { log.Printf("pipeline: cancel channel closed: %s", work.ID) @@ -133,16 +132,19 @@ func run(ctx context.Context, client rpc.Peer) error { log.Printf("pipeline: error updating pipeline status: %s: %s", work.ID, err) } + var uploads sync.WaitGroup defaultLogger := pipeline.LogFunc(func(proc *backend.Step, rc multipart.Reader) error { part, rerr := rc.NextPart() if rerr != nil { return rerr } + uploads.Add(1) writer := rpc.NewLineWriter(client, work.ID, proc.Alias) io.Copy(writer, part) defer func() { log.Printf("pipeline: finish uploading logs: %s: step %s", work.ID, proc.Alias) + uploads.Done() }() part, rerr = rc.NextPart() @@ -174,7 +176,7 @@ func run(ctx context.Context, client rpc.Peer) error { state.ExitCode = xerr.Code } if cancelled.IsSet() { - state.ExitCode = 130 + state.ExitCode = 137 } else if state.ExitCode == 0 { state.ExitCode = 1 } @@ -182,6 +184,7 @@ func run(ctx context.Context, client rpc.Peer) error { log.Printf("pipeline: execution complete: %s", work.ID) + uploads.Wait() err = client.Update(context.Background(), work.ID, state) if err != nil { log.Printf("Pipeine: error updating pipeline status: %s: %s", work.ID, err) diff --git a/server/build.go b/server/build.go index e6f71361a..c78f8b2d8 100644 --- a/server/build.go +++ b/server/build.go @@ -2,12 +2,16 @@ package server import ( "bufio" + "context" + "fmt" "io" "net/http" + "os" "strconv" "time" log "github.com/Sirupsen/logrus" + "github.com/cncd/queue" "github.com/drone/drone/remote" "github.com/drone/drone/shared/httputil" "github.com/drone/drone/store" @@ -149,14 +153,17 @@ func DeleteBuild(c *gin.Context) { job.ExitCode = 137 store.UpdateBuildJob(c, build, job) - client := stomp.MustFromContext(c) - client.SendJSON("/topic/cancel", model.Event{ - Type: model.Cancelled, - Repo: *repo, - Build: *build, - Job: *job, - }, stomp.WithHeader("job-id", strconv.FormatInt(job.ID, 10))) - + if os.Getenv("DRONE_CANARY") == "" { + client := stomp.MustFromContext(c) + client.SendJSON("/topic/cancel", model.Event{ + Type: model.Cancelled, + Repo: *repo, + Build: *build, + Job: *job, + }, stomp.WithHeader("job-id", strconv.FormatInt(job.ID, 10))) + } else { + config.queue.Error(context.Background(), fmt.Sprint(job.ID), queue.ErrCancel) + } c.String(204, "") } @@ -197,8 +204,8 @@ func PostBuild(c *gin.Context) { } // fetch the .drone.yml file from the database - config := ToConfig(c) - raw, err := remote_.File(user, repo, build, config.Yaml) + cfg := ToConfig(c) + raw, err := remote_.File(user, repo, build, cfg.Yaml) if err != nil { log.Errorf("failure to get build config for %s. %s", repo.FullName, err) c.AbortWithError(404, err) @@ -206,7 +213,7 @@ func PostBuild(c *gin.Context) { } // Fetch secrets file but don't exit on error as it's optional - sec, err := remote_.File(user, repo, build, config.Shasum) + sec, err := remote_.File(user, repo, build, cfg.Shasum) if err != nil { log.Debugf("cannot find build secrets for %s. %s", repo.FullName, err) } diff --git a/server/rpc.go b/server/rpc.go index f15107d54..26876bc66 100644 --- a/server/rpc.go +++ b/server/rpc.go @@ -91,10 +91,9 @@ func (s *RPC) Next(c context.Context) (*rpc.Pipeline, error) { return pipeline, err } -// Notify implements the rpc.Notify function -func (s *RPC) Notify(c context.Context, id string) (bool, error) { - err := s.queue.Wait(c, id) - return (err == queue.ErrCancel), nil +// Wait implements the rpc.Wait function +func (s *RPC) Wait(c context.Context, id string) error { + return s.queue.Wait(c, id) } // Extend implements the rpc.Extend function @@ -211,6 +210,9 @@ func (s *RPC) Update(c context.Context, id string, state rpc.State) error { // Save implements the rpc.Save function func (s *RPC) Save(c context.Context, id, mime string, file io.Reader) error { return nil } +// Done implements the rpc.Done function +func (s *RPC) Done(c context.Context, id string) error { return nil } + // Log implements the rpc.Log function func (s *RPC) Log(c context.Context, id string, line *rpc.Line) error { entry := new(logging.Entry) diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go index 35fd1bda0..b6f9b5403 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/client.go @@ -17,11 +17,12 @@ import ( const ( methodNext = "next" - methodNotify = "notify" + methodWait = "wait" + methodDone = "done" methodExtend = "extend" methodUpdate = "update" - methodLog = "log" methodSave = "save" + methodLog = "log" ) type ( @@ -80,11 +81,18 @@ func (t *Client) Next(c context.Context) (*Pipeline, error) { return res, err } -// Notify returns true if the pipeline should be cancelled. -func (t *Client) Notify(c context.Context, id string) (bool, error) { - out := false - err := t.call(c, methodNotify, id, &out) - return out, err +// Wait blocks until the pipeline is complete. +func (t *Client) Wait(c context.Context, id string) error { + // err := t.call(c, methodWait, id, nil) + // if err != nil && err.Error() == ErrCancelled.Error() { + // return ErrCancelled + // } + return t.call(c, methodWait, id, nil) +} + +// Done signals the pipeline is complete. +func (t *Client) Done(c context.Context, id string) error { + return t.call(c, methodDone, id, nil) } // Extend extends the pipeline deadline. diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go index 414591f81..4636f1d98 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/peer.go @@ -7,6 +7,9 @@ import ( "github.com/cncd/pipeline/pipeline/backend" ) +// ErrCancelled signals the pipeine is cancelled. +// var ErrCancelled = errors.New("cancelled") + type ( // Filter defines filters for fetching items from the queue. Filter struct { @@ -36,9 +39,11 @@ type Peer interface { // Next returns the next pipeline in the queue. Next(c context.Context) (*Pipeline, error) - // Notify returns true if the pipeline should be cancelled. - // TODO: rename to Done, Wait? - Notify(c context.Context, id string) (bool, error) + // Wait blocks untilthe pipeline is complete. + Wait(c context.Context, id string) error + + // Done signals the pipeline is complete. + Done(c context.Context, id string) error // Extend extends the pipeline deadline Extend(c context.Context, id string) error diff --git a/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go b/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go index 242c4ee9a..21e4507cb 100644 --- a/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go +++ b/vendor/github.com/cncd/pipeline/pipeline/rpc/server.go @@ -52,8 +52,10 @@ func (s *Server) router(ctx context.Context, conn *jsonrpc2.Conn, req *jsonrpc2. switch req.Method { case methodNext: return s.next(ctx, req) - case methodNotify: - return s.notify(ctx, req) + case methodWait: + return s.wait(ctx, req) + case methodDone: + return s.done(ctx, req) case methodExtend: return s.extend(ctx, req) case methodUpdate: @@ -73,15 +75,26 @@ func (s *Server) next(ctx context.Context, req *jsonrpc2.Request) (interface{}, return s.peer.Next(ctx) } -// notify unmarshals the rpc request parameters and invokes the peer.Notify +// wait unmarshals the rpc request parameters and invokes the peer.Wait // procedure. The results are retuned and written to the rpc response. -func (s *Server) notify(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) { +func (s *Server) wait(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) { var id string err := json.Unmarshal([]byte(*req.Params), &id) if err != nil { return nil, err } - return s.peer.Notify(ctx, id) + return nil, s.peer.Wait(ctx, id) +} + +// done unmarshals the rpc request parameters and invokes the peer.Done +// procedure. The results are retuned and written to the rpc response. +func (s *Server) done(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) { + var id string + err := json.Unmarshal([]byte(*req.Params), &id) + if err != nil { + return nil, err + } + return nil, s.peer.Done(ctx, id) } // extend unmarshals the rpc request parameters and invokes the peer.Extend @@ -115,8 +128,6 @@ func (s *Server) log(req *jsonrpc2.Request) (interface{}, error) { return nil, s.peer.Log(noContext, in.ID, in.Line) } -// save unmarshals the rpc request parameters and invokes the peer.Save -// procedure. The results are retuned and written to the rpc response. func (s *Server) save(req *jsonrpc2.Request) (interface{}, error) { in := new(saveReq) if err := json.Unmarshal([]byte(*req.Params), in); err != nil { diff --git a/vendor/vendor.json b/vendor/vendor.json index dabb2e53e..b46145009 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -33,68 +33,68 @@ { "checksumSHA1": "W3AuK8ocqHwlUajGmQLFvnRhTZE=", "path": "github.com/cncd/pipeline/pipeline", - "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", - "revisionTime": "2017-03-04T04:47:59Z" + "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", + "revisionTime": "2017-03-05T09:53:47Z" }, { "checksumSHA1": "PSzh0ix/rlMrS/Cl3aH6GHGrJuo=", "path": "github.com/cncd/pipeline/pipeline/backend", - "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", - "revisionTime": "2017-03-04T04:47:59Z" + "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", + "revisionTime": "2017-03-05T09:53:47Z" }, { "checksumSHA1": "0CGXRaYwZhJxGIrGhn8WGpkFqPo=", "path": "github.com/cncd/pipeline/pipeline/backend/docker", - "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", - "revisionTime": "2017-03-04T04:47:59Z" + "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", + "revisionTime": "2017-03-05T09:53:47Z" }, { "checksumSHA1": "uUagpzha5ah/a3RO6IImvzHYFlY=", "path": "github.com/cncd/pipeline/pipeline/frontend", - "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", - "revisionTime": "2017-03-04T04:47:59Z" + "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", + "revisionTime": "2017-03-05T09:53:47Z" }, { "checksumSHA1": "O0sulBQAHJeNLg3lO38Cq5uf/eg=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml", - "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", - "revisionTime": "2017-03-04T04:47:59Z" + "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", + "revisionTime": "2017-03-05T09:53:47Z" }, { "checksumSHA1": "e1lZWQdObXCKWqZOGlOeaeERQMc=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/compiler", - "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", - "revisionTime": "2017-03-04T04:47:59Z" + "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", + "revisionTime": "2017-03-05T09:53:47Z" }, { "checksumSHA1": "Q0GkNUFamVYIA1Fd8r0A5M6Gx54=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/linter", - "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", - "revisionTime": "2017-03-04T04:47:59Z" + "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", + "revisionTime": "2017-03-05T09:53:47Z" }, { "checksumSHA1": "kx2sPUIMozPC/g6E4w48h3FfH3k=", "path": "github.com/cncd/pipeline/pipeline/frontend/yaml/matrix", - "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", - "revisionTime": "2017-03-04T04:47:59Z" + "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", + "revisionTime": "2017-03-05T09:53:47Z" }, { "checksumSHA1": "2/3f3oNmxXy5kcrRLCFa24Oc9O4=", "path": "github.com/cncd/pipeline/pipeline/interrupt", - "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", - "revisionTime": "2017-03-04T04:47:59Z" + "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", + "revisionTime": "2017-03-05T09:53:47Z" }, { "checksumSHA1": "8eTwXZPM/Kp9uE/mnhpWDTiX7nY=", "path": "github.com/cncd/pipeline/pipeline/multipart", - "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", - "revisionTime": "2017-03-04T04:47:59Z" + "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", + "revisionTime": "2017-03-05T09:53:47Z" }, { - "checksumSHA1": "5axmtZsHaQ5uE/tuNQZygquNx8U=", + "checksumSHA1": "UUmeGDBdpk+UXtexFnNmbWIHgG8=", "path": "github.com/cncd/pipeline/pipeline/rpc", - "revision": "b5457789534af0da2a05bd9fd0c0afe6fea391c6", - "revisionTime": "2017-03-04T04:47:59Z" + "revision": "d4e09fd3021a16408bc3ebdd3500efd28f51e72c", + "revisionTime": "2017-03-05T09:53:47Z" }, { "checksumSHA1": "7Qj1DK0ceAXkYztW0l3+L6sn+V8=",