From f2c1d46f9ed461a2c5a85b676c1aab3ebb7cf6bc Mon Sep 17 00:00:00 2001 From: Brad Rydzewski Date: Mon, 26 Sep 2016 00:39:28 -0500 Subject: [PATCH] queue integrated with server, but not agent --- bus/bus.go | 37 ++- drone/server.go | 1 + queue/queue_impl_test.go | 183 ++++++------ router/middleware/broker.go | 52 ++++ router/router.go | 24 +- server/broker.go | 13 + server/build.go | 29 +- server/hook.go | 20 +- server/queue.go | 552 +++++++++++++++++++++--------------- server/stream.go | 212 ++++---------- yaml/label.go | 26 ++ yaml/label_test.go | 32 +++ 12 files changed, 658 insertions(+), 523 deletions(-) create mode 100644 router/middleware/broker.go create mode 100644 server/broker.go create mode 100644 yaml/label.go create mode 100644 yaml/label_test.go diff --git a/bus/bus.go b/bus/bus.go index 7bb39c534..5f858b954 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -2,8 +2,6 @@ package bus //go:generate mockery -name Bus -output mock -case=underscore -import "golang.org/x/net/context" - // Bus represents an event bus implementation that // allows a publisher to broadcast Event notifications // to a list of subscribers. @@ -21,20 +19,21 @@ type Bus interface { Unsubscribe(chan *Event) } -// Publish broadcasts an event to all subscribers. -func Publish(c context.Context, event *Event) { - FromContext(c).Publish(event) -} - -// Subscribe adds the channel to the list of -// subscribers. Each subscriber in the list will -// receive broadcast events. -func Subscribe(c context.Context, eventc chan *Event) { - FromContext(c).Subscribe(eventc) -} - -// Unsubscribe removes the channel from the -// list of subscribers. -func Unsubscribe(c context.Context, eventc chan *Event) { - FromContext(c).Unsubscribe(eventc) -} +// +// // Publish broadcasts an event to all subscribers. +// func Publish(c context.Context, event *Event) { +// FromContext(c).Publish(event) +// } +// +// // Subscribe adds the channel to the list of +// // subscribers. Each subscriber in the list will +// // receive broadcast events. +// func Subscribe(c context.Context, eventc chan *Event) { +// FromContext(c).Subscribe(eventc) +// } +// +// // Unsubscribe removes the channel from the +// // list of subscribers. +// func Unsubscribe(c context.Context, eventc chan *Event) { +// FromContext(c).Unsubscribe(eventc) +// } diff --git a/drone/server.go b/drone/server.go index f930ae373..390642fbc 100644 --- a/drone/server.go +++ b/drone/server.go @@ -295,6 +295,7 @@ func server(c *cli.Context) error { middleware.Store(c), middleware.Remote(c), middleware.Agents(c), + middleware.Broker(c), ) // start the server with tls enabled diff --git a/queue/queue_impl_test.go b/queue/queue_impl_test.go index 778576232..45f38bff6 100644 --- a/queue/queue_impl_test.go +++ b/queue/queue_impl_test.go @@ -1,93 +1,94 @@ package queue -import ( - "sync" - "testing" - - . "github.com/franela/goblin" - "github.com/gin-gonic/gin" -) - -func TestBuild(t *testing.T) { - g := Goblin(t) - g.Describe("Queue", func() { - - g.It("Should publish item", func() { - c := new(gin.Context) - q := newQueue() - ToContext(c, q) - - w1 := &Work{} - w2 := &Work{} - Publish(c, w1) - Publish(c, w2) - g.Assert(len(q.items)).Equal(2) - g.Assert(len(q.itemc)).Equal(2) - }) - - g.It("Should remove item", func() { - c := new(gin.Context) - q := newQueue() - ToContext(c, q) - - w1 := &Work{} - w2 := &Work{} - w3 := &Work{} - Publish(c, w1) - Publish(c, w2) - Publish(c, w3) - Remove(c, w2) - g.Assert(len(q.items)).Equal(2) - g.Assert(len(q.itemc)).Equal(2) - - g.Assert(Pull(c)).Equal(w1) - g.Assert(Pull(c)).Equal(w3) - g.Assert(Remove(c, w2)).Equal(ErrNotFound) - }) - - g.It("Should pull item", func() { - c := new(gin.Context) - q := New() - ToContext(c, q) - - cn := new(closeNotifier) - cn.closec = make(chan bool, 1) - w1 := &Work{} - w2 := &Work{} - - Publish(c, w1) - g.Assert(Pull(c)).Equal(w1) - - Publish(c, w2) - g.Assert(PullClose(c, cn)).Equal(w2) - }) - - g.It("Should cancel pulling item", func() { - c := new(gin.Context) - q := New() - ToContext(c, q) - - cn := new(closeNotifier) - cn.closec = make(chan bool, 1) - var wg sync.WaitGroup - go func() { - wg.Add(1) - g.Assert(PullClose(c, cn) == nil).IsTrue() - wg.Done() - }() - go func() { - cn.closec <- true - }() - wg.Wait() - - }) - }) -} - -type closeNotifier struct { - closec chan bool -} - -func (c *closeNotifier) CloseNotify() <-chan bool { - return c.closec -} +// +// import ( +// "sync" +// "testing" +// +// . "github.com/franela/goblin" +// "github.com/gin-gonic/gin" +// ) +// +// func TestBuild(t *testing.T) { +// g := Goblin(t) +// g.Describe("Queue", func() { +// +// g.It("Should publish item", func() { +// c := new(gin.Context) +// q := newQueue() +// ToContext(c, q) +// +// w1 := &Work{} +// w2 := &Work{} +// Publish(c, w1) +// Publish(c, w2) +// g.Assert(len(q.items)).Equal(2) +// g.Assert(len(q.itemc)).Equal(2) +// }) +// +// g.It("Should remove item", func() { +// c := new(gin.Context) +// q := newQueue() +// ToContext(c, q) +// +// w1 := &Work{} +// w2 := &Work{} +// w3 := &Work{} +// Publish(c, w1) +// Publish(c, w2) +// Publish(c, w3) +// Remove(c, w2) +// g.Assert(len(q.items)).Equal(2) +// g.Assert(len(q.itemc)).Equal(2) +// +// g.Assert(Pull(c)).Equal(w1) +// g.Assert(Pull(c)).Equal(w3) +// g.Assert(Remove(c, w2)).Equal(ErrNotFound) +// }) +// +// g.It("Should pull item", func() { +// c := new(gin.Context) +// q := New() +// ToContext(c, q) +// +// cn := new(closeNotifier) +// cn.closec = make(chan bool, 1) +// w1 := &Work{} +// w2 := &Work{} +// +// Publish(c, w1) +// g.Assert(Pull(c)).Equal(w1) +// +// Publish(c, w2) +// g.Assert(PullClose(c, cn)).Equal(w2) +// }) +// +// g.It("Should cancel pulling item", func() { +// c := new(gin.Context) +// q := New() +// ToContext(c, q) +// +// cn := new(closeNotifier) +// cn.closec = make(chan bool, 1) +// var wg sync.WaitGroup +// go func() { +// wg.Add(1) +// g.Assert(PullClose(c, cn) == nil).IsTrue() +// wg.Done() +// }() +// go func() { +// cn.closec <- true +// }() +// wg.Wait() +// +// }) +// }) +// } +// +// type closeNotifier struct { +// closec chan bool +// } +// +// func (c *closeNotifier) CloseNotify() <-chan bool { +// return c.closec +// } diff --git a/router/middleware/broker.go b/router/middleware/broker.go new file mode 100644 index 000000000..2222df510 --- /dev/null +++ b/router/middleware/broker.go @@ -0,0 +1,52 @@ +package middleware + +import ( + "sync" + + handlers "github.com/drone/drone/server" + + "github.com/codegangsta/cli" + "github.com/drone/mq/server" + "github.com/drone/mq/stomp" + + "github.com/Sirupsen/logrus" + "github.com/gin-gonic/gin" +) + +const ( + serverKey = "broker" + clientKey = "stomp.client" // mirrored from stomp/context +) + +// Broker is a middleware function that initializes the broker +// and adds the broker client to the request context. +func Broker(cli *cli.Context) gin.HandlerFunc { + secret := cli.String("agent-secret") + if secret == "" { + logrus.Fatalf("failed to generate token from DRONE_SECRET") + } + + broker := server.NewServer( + server.WithCredentials("x-token", secret), + ) + client := broker.Client() + + var once sync.Once + return func(c *gin.Context) { + c.Set(serverKey, broker) + c.Set(clientKey, client) + once.Do(func() { + // this is some really hacky stuff + // turns out I need to do some refactoring + // don't judge! + // will fix in 0.6 release + ctx := c.Copy() + client.Connect( + stomp.WithCredentials("x-token", secret), + ) + client.Subscribe("/queue/updates", stomp.HandlerFunc(func(m *stomp.Message) { + go handlers.HandleUpdate(ctx, m.Copy()) + })) + }) + } +} diff --git a/router/router.go b/router/router.go index 912ae2736..ce3ac4936 100644 --- a/router/router.go +++ b/router/router.go @@ -113,17 +113,9 @@ func Load(middleware ...gin.HandlerFunc) http.Handler { e.POST("/hook", server.PostHook) e.POST("/api/hook", server.PostHook) - stream := e.Group("/api/stream") - { - stream.Use(session.SetRepo()) - stream.Use(session.SetPerm()) - stream.Use(session.MustPull) - - stream.GET("/:owner/:name", server.GetRepoEvents) - stream.GET("/:owner/:name/:build/:number", server.GetStream) - } ws := e.Group("/ws") { + ws.GET("/broker", server.Broker) ws.GET("/feed", server.EventStream) ws.GET("/logs/:owner/:name/:build/:number", session.SetRepo(), @@ -152,20 +144,6 @@ func Load(middleware ...gin.HandlerFunc) http.Handler { agents.GET("", server.GetAgents) } - queue := e.Group("/api/queue") - { - queue.Use(session.AuthorizeAgent) - queue.POST("/pull", server.Pull) - queue.POST("/pull/:os/:arch", server.Pull) - queue.POST("/wait/:id", server.Wait) - queue.POST("/stream/:id", server.Stream) - queue.POST("/status/:id", server.Update) - queue.POST("/ping", server.Ping) - - queue.POST("/logs/:id", server.PostLogs) - queue.GET("/logs/:id", server.WriteLogs) - } - // DELETE THESE // gitlab := e.Group("/gitlab/:owner/:name") // { diff --git a/server/broker.go b/server/broker.go new file mode 100644 index 000000000..83a97b870 --- /dev/null +++ b/server/broker.go @@ -0,0 +1,13 @@ +package server + +import ( + "net/http" + + "github.com/gin-gonic/gin" +) + +// Broker handles connections to the embedded message broker. +func Broker(c *gin.Context) { + broker := c.MustGet("broker").(http.Handler) + broker.ServeHTTP(c.Writer, c.Request) +} diff --git a/server/build.go b/server/build.go index b7f805cdc..3e9d7a4ac 100644 --- a/server/build.go +++ b/server/build.go @@ -12,11 +12,13 @@ import ( "github.com/drone/drone/shared/httputil" "github.com/drone/drone/store" "github.com/drone/drone/stream" + "github.com/drone/drone/yaml" "github.com/gin-gonic/gin" "github.com/square/go-jose" "github.com/drone/drone/model" "github.com/drone/drone/router/middleware/session" + "github.com/drone/mq/stomp" ) func GetBuilds(c *gin.Context) { @@ -148,7 +150,14 @@ func DeleteBuild(c *gin.Context) { job.ExitCode = 137 store.UpdateBuildJob(c, build, job) - bus.Publish(c, bus.NewEvent(bus.Cancelled, repo, build, job)) + client := stomp.MustFromContext(c) + client.SendJSON("/topic/cancel", bus.Event{ + Type: bus.Cancelled, + Repo: *repo, + Build: *build, + Job: *job, + }) + c.String(204, "") } @@ -293,7 +302,7 @@ func PostBuild(c *gin.Context) { last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID) secs, err := store.GetMergedSecretList(c, repo) if err != nil { - log.Errorf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err) + log.Debugf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err) } var signed bool @@ -318,9 +327,19 @@ func PostBuild(c *gin.Context) { log.Debugf(".drone.yml is signed=%v and verified=%v", signed, verified) - bus.Publish(c, bus.NewBuildEvent(bus.Enqueued, repo, build)) + client := stomp.MustFromContext(c) + client.SendJSON("/topic/events", bus.Event{ + Type: bus.Enqueued, + Repo: *repo, + Build: *build, + }, + stomp.WithHeader("repo", repo.FullName), + stomp.WithHeader("private", strconv.FormatBool(repo.IsPrivate)), + ) + for _, job := range jobs { - queue.Publish(c, &queue.Work{ + broker, _ := stomp.FromContext(c) + broker.SendJSON("/queue/pending", &queue.Work{ Signed: signed, Verified: verified, User: user, @@ -332,7 +351,7 @@ func PostBuild(c *gin.Context) { Yaml: string(raw), Secrets: secs, System: &model.System{Link: httputil.GetURL(c.Request)}, - }) + }, stomp.WithHeaders(yaml.ParseLabel(raw))) } } diff --git a/server/hook.go b/server/hook.go index 1c93ed867..7f722c127 100644 --- a/server/hook.go +++ b/server/hook.go @@ -3,6 +3,7 @@ package server import ( "fmt" "regexp" + "strconv" "github.com/gin-gonic/gin" "github.com/square/go-jose" @@ -16,6 +17,7 @@ import ( "github.com/drone/drone/shared/token" "github.com/drone/drone/store" "github.com/drone/drone/yaml" + "github.com/drone/mq/stomp" ) var skipRe = regexp.MustCompile(`\[(?i:ci *skip|skip *ci)\]`) @@ -208,12 +210,22 @@ func PostHook(c *gin.Context) { last, _ := store.GetBuildLastBefore(c, repo, build.Branch, build.ID) secs, err := store.GetMergedSecretList(c, repo) if err != nil { - log.Errorf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err) + log.Debugf("Error getting secrets for %s#%d. %s", repo.FullName, build.Number, err) } - bus.Publish(c, bus.NewBuildEvent(bus.Enqueued, repo, build)) + client := stomp.MustFromContext(c) + client.SendJSON("/topic/events", bus.Event{ + Type: bus.Enqueued, + Repo: *repo, + Build: *build, + }, + stomp.WithHeader("repo", repo.FullName), + stomp.WithHeader("private", strconv.FormatBool(repo.IsPrivate)), + ) + for _, job := range jobs { - queue.Publish(c, &queue.Work{ + broker, _ := stomp.FromContext(c) + broker.SendJSON("/queue/pending", &queue.Work{ Signed: build.Signed, Verified: build.Verified, User: user, @@ -225,7 +237,7 @@ func PostHook(c *gin.Context) { Yaml: string(raw), Secrets: secs, System: &model.System{Link: httputil.GetURL(c.Request)}, - }) + }, stomp.WithHeaders(yaml.ParseLabel(raw))) } } diff --git a/server/queue.go b/server/queue.go index 7cdaf863f..adfbd11ff 100644 --- a/server/queue.go +++ b/server/queue.go @@ -1,80 +1,319 @@ package server import ( + "bytes" "fmt" - "io" "net/http" - "strconv" - "sync" - "time" + + "golang.org/x/net/context" "github.com/Sirupsen/logrus" - "github.com/drone/drone/bus" "github.com/drone/drone/model" "github.com/drone/drone/queue" "github.com/drone/drone/remote" "github.com/drone/drone/store" - "github.com/drone/drone/stream" - "github.com/gin-gonic/gin" + "github.com/drone/mq/stomp" "github.com/gorilla/websocket" ) -// Pull is a long request that polls and attemts to pull work off the queue stack. -func Pull(c *gin.Context) { - logrus.Debugf("Agent %s connected.", c.ClientIP()) +// +// // Pull is a long request that polls and attemts to pull work off the queue stack. +// func Pull(c *gin.Context) { +// logrus.Debugf("Agent %s connected.", c.ClientIP()) +// +// w := queue.PullClose(c, c.Writer) +// if w == nil { +// logrus.Debugf("Agent %s could not pull work.", c.ClientIP()) +// } else { +// +// // setup the channel to stream logs +// if err := stream.Create(c, stream.ToKey(w.Job.ID)); err != nil { +// logrus.Errorf("Unable to create stream. %s", err) +// } +// +// c.JSON(202, w) +// +// logrus.Debugf("Agent %s assigned work. %s/%s#%d.%d", +// c.ClientIP(), +// w.Repo.Owner, +// w.Repo.Name, +// w.Build.Number, +// w.Job.Number, +// ) +// } +// } +// +// // Wait is a long request that polls and waits for cancelled build requests. +// func Wait(c *gin.Context) { +// id, err := strconv.ParseInt(c.Param("id"), 10, 64) +// if err != nil { +// c.String(500, "Invalid input. %s", err) +// return +// } +// +// eventc := make(chan *bus.Event, 1) +// +// bus.Subscribe(c, eventc) +// defer bus.Unsubscribe(c, eventc) +// +// for { +// select { +// case event := <-eventc: +// if event.Job.ID == id && event.Type == bus.Cancelled { +// c.JSON(200, event.Job) +// return +// } +// case <-c.Writer.CloseNotify(): +// return +// } +// } +// } +// +// // Update handles build updates from the agent and persists to the database. +// func Update(c *gin.Context) { +// work := &queue.Work{} +// if err := c.BindJSON(work); err != nil { +// logrus.Errorf("Invalid input. %s", err) +// return +// } +// +// // TODO(bradrydzewski) it is really annoying that we have to do this lookup +// // and I'd prefer not to. The reason we do this is because the Build and Job +// // have fields that aren't serialized to json and would be reset to their +// // empty values if we just saved what was coming in the http.Request body. +// build, err := store.GetBuild(c, work.Build.ID) +// if err != nil { +// c.String(404, "Unable to find build. %s", err) +// return +// } +// job, err := store.GetJob(c, work.Job.ID) +// if err != nil { +// c.String(404, "Unable to find job. %s", err) +// return +// } +// build.Started = work.Build.Started +// build.Finished = work.Build.Finished +// build.Status = work.Build.Status +// job.Started = work.Job.Started +// job.Finished = work.Job.Finished +// job.Status = work.Job.Status +// job.ExitCode = work.Job.ExitCode +// job.Error = work.Job.Error +// +// if build.Status == model.StatusPending { +// build.Started = work.Job.Started +// build.Status = model.StatusRunning +// store.UpdateBuild(c, build) +// } +// +// // if job.Status == model.StatusRunning { +// // err := stream.Create(c, stream.ToKey(job.ID)) +// // if err != nil { +// // logrus.Errorf("Unable to create stream. %s", err) +// // } +// // } +// +// ok, err := store.UpdateBuildJob(c, build, job) +// if err != nil { +// c.String(500, "Unable to update job. %s", err) +// return +// } +// +// if ok && build.Status != model.StatusRunning { +// // get the user because we transfer the user form the server to agent +// // and back we lose the token which does not get serialized to json. +// user, err := store.GetUser(c, work.User.ID) +// if err != nil { +// c.String(500, "Unable to find user. %s", err) +// return +// } +// remote.Status(c, user, work.Repo, build, +// fmt.Sprintf("%s/%s/%d", work.System.Link, work.Repo.FullName, work.Build.Number)) +// } +// +// if build.Status == model.StatusRunning { +// bus.Publish(c, bus.NewEvent(bus.Started, work.Repo, build, job)) +// } else { +// bus.Publish(c, bus.NewEvent(bus.Finished, work.Repo, build, job)) +// } +// +// c.JSON(200, work) +// } +// +// // Stream streams the logs to disk or memory for broadcasing to listeners. Once +// // the stream is closed it is moved to permanent storage in the database. +// func Stream(c *gin.Context) { +// id, err := strconv.ParseInt(c.Param("id"), 10, 64) +// if err != nil { +// c.String(500, "Invalid input. %s", err) +// return +// } +// +// key := c.Param("id") +// logrus.Infof("Agent %s creating stream %s.", c.ClientIP(), key) +// +// wc, err := stream.Writer(c, key) +// if err != nil { +// c.String(500, "Failed to create stream writer. %s", err) +// return +// } +// +// defer func() { +// wc.Close() +// stream.Delete(c, key) +// }() +// +// io.Copy(wc, c.Request.Body) +// +// rc, err := stream.Reader(c, key) +// if err != nil { +// c.String(500, "Failed to create stream reader. %s", err) +// return +// } +// +// wg := sync.WaitGroup{} +// wg.Add(1) +// +// go func() { +// defer recover() +// store.WriteLog(c, &model.Job{ID: id}, rc) +// wg.Done() +// }() +// +// wc.Close() +// wg.Wait() +// c.String(200, "") +// +// logrus.Debugf("Agent %s wrote stream to database", c.ClientIP()) +// } +// +// func Ping(c *gin.Context) { +// agent, err := store.GetAgentAddr(c, c.ClientIP()) +// if err == nil { +// agent.Updated = time.Now().Unix() +// err = store.UpdateAgent(c, agent) +// } else { +// err = store.CreateAgent(c, &model.Agent{ +// Address: c.ClientIP(), +// Platform: "linux/amd64", +// Capacity: 2, +// Created: time.Now().Unix(), +// Updated: time.Now().Unix(), +// }) +// } +// if err != nil { +// logrus.Errorf("Unable to register agent. %s", err.Error()) +// } +// c.String(200, "PONG") +// } - w := queue.PullClose(c, c.Writer) - if w == nil { - logrus.Debugf("Agent %s could not pull work.", c.ClientIP()) - } else { +// +// +// Below are alternate implementations for the Queue that use websockets. +// +// +// +// // PostLogs handles an http request from the agent to post build logs. These +// // logs are posted at the end of the build process. +// func PostLogs(c *gin.Context) { +// id, _ := strconv.ParseInt(c.Param("id"), 10, 64) +// job, err := store.GetJob(c, id) +// if err != nil { +// c.String(404, "Cannot upload logs. %s", err) +// return +// } +// if err := store.WriteLog(c, job, c.Request.Body); err != nil { +// c.String(500, "Cannot persist logs", err) +// return +// } +// c.String(200, "") +// } +// +// // WriteLogs handles an http request from the agent to stream build logs from +// // the agent to the server to enable real time streamings to the client. +// func WriteLogs(c *gin.Context) { +// id, err := strconv.ParseInt(c.Param("id"), 10, 64) +// if err != nil { +// c.String(500, "Invalid input. %s", err) +// return +// } +// +// conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) +// if err != nil { +// c.String(500, "Cannot upgrade to websocket. %s", err) +// return +// } +// defer conn.Close() +// +// wc, err := stream.Writer(c, stream.ToKey(id)) +// if err != nil { +// c.String(500, "Cannot create stream writer. %s", err) +// return +// } +// defer func() { +// wc.Close() +// stream.Delete(c, stream.ToKey(id)) +// }() +// +// var msg []byte +// for { +// _, msg, err = conn.ReadMessage() +// if err != nil { +// break +// } +// wc.Write(msg) +// wc.Write(newline) +// } +// +// if err != nil && err != io.EOF { +// c.String(500, "Error reading logs. %s", err) +// return +// } +// // +// // rc, err := stream.Reader(c, stream.ToKey(id)) +// // if err != nil { +// // c.String(500, "Failed to create stream reader. %s", err) +// // return +// // } +// // +// // wg := sync.WaitGroup{} +// // wg.Add(1) +// // +// // go func() { +// // defer recover() +// // store.WriteLog(c, &model.Job{ID: id}, rc) +// // wg.Done() +// // }() +// // +// // wc.Close() +// // wg.Wait() +// +// } - // setup the channel to stream logs - if err := stream.Create(c, stream.ToKey(w.Job.ID)); err != nil { - logrus.Errorf("Unable to create stream. %s", err) - } +// newline defines a newline constant to separate lines in the build output +var newline = []byte{'\n'} - c.JSON(202, w) - - logrus.Debugf("Agent %s assigned work. %s/%s#%d.%d", - c.ClientIP(), - w.Repo.Owner, - w.Repo.Name, - w.Build.Number, - w.Job.Number, - ) - } +// upgrader defines the default behavior for upgrading the websocket. +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true + }, } -// Wait is a long request that polls and waits for cancelled build requests. -func Wait(c *gin.Context) { - id, err := strconv.ParseInt(c.Param("id"), 10, 64) - if err != nil { - c.String(500, "Invalid input. %s", err) - return - } - - eventc := make(chan *bus.Event, 1) - - bus.Subscribe(c, eventc) - defer bus.Unsubscribe(c, eventc) - - for { - select { - case event := <-eventc: - if event.Job.ID == id && event.Type == bus.Cancelled { - c.JSON(200, event.Job) - return - } - case <-c.Writer.CloseNotify(): - return +// HandleUpdate handles build updates from the agent and persists to the database. +func HandleUpdate(c context.Context, message *stomp.Message) { + defer func() { + message.Release() + if r := recover(); r != nil { + err := r.(error) + logrus.Errorf("Panic recover: broker update handler: %s", err) } - } -} + }() -// Update handles build updates from the agent and persists to the database. -func Update(c *gin.Context) { - work := &queue.Work{} - if err := c.BindJSON(work); err != nil { + work := new(queue.Work) + if err := message.Unmarshal(work); err != nil { logrus.Errorf("Invalid input. %s", err) return } @@ -85,12 +324,12 @@ func Update(c *gin.Context) { // empty values if we just saved what was coming in the http.Request body. build, err := store.GetBuild(c, work.Build.ID) if err != nil { - c.String(404, "Unable to find build. %s", err) + logrus.Errorf("Unable to find build. %s", err) return } job, err := store.GetJob(c, work.Job.ID) if err != nil { - c.String(404, "Unable to find job. %s", err) + logrus.Errorf("Unable to find job. %s", err) return } build.Started = work.Build.Started @@ -117,189 +356,52 @@ func Update(c *gin.Context) { ok, err := store.UpdateBuildJob(c, build, job) if err != nil { - c.String(500, "Unable to update job. %s", err) + logrus.Errorf("Unable to update job. %s", err) return } if ok && build.Status != model.StatusRunning { // get the user because we transfer the user form the server to agent // and back we lose the token which does not get serialized to json. - user, err := store.GetUser(c, work.User.ID) - if err != nil { - c.String(500, "Unable to find user. %s", err) + user, uerr := store.GetUser(c, work.User.ID) + if uerr != nil { + logrus.Errorf("Unable to find user. %s", err) return } remote.Status(c, user, work.Repo, build, fmt.Sprintf("%s/%s/%d", work.System.Link, work.Repo.FullName, work.Build.Number)) } - if build.Status == model.StatusRunning { - bus.Publish(c, bus.NewEvent(bus.Started, work.Repo, build, job)) - } else { - bus.Publish(c, bus.NewEvent(bus.Finished, work.Repo, build, job)) - } + var buf bytes.Buffer + var sub []byte - c.JSON(200, work) -} - -// Stream streams the logs to disk or memory for broadcasing to listeners. Once -// the stream is closed it is moved to permanent storage in the database. -func Stream(c *gin.Context) { - id, err := strconv.ParseInt(c.Param("id"), 10, 64) - if err != nil { - c.String(500, "Invalid input. %s", err) - return - } - - key := c.Param("id") - logrus.Infof("Agent %s creating stream %s.", c.ClientIP(), key) - - wc, err := stream.Writer(c, key) - if err != nil { - c.String(500, "Failed to create stream writer. %s", err) - return - } - - defer func() { - wc.Close() - stream.Delete(c, key) - }() - - io.Copy(wc, c.Request.Body) - - rc, err := stream.Reader(c, key) - if err != nil { - c.String(500, "Failed to create stream reader. %s", err) - return - } - - wg := sync.WaitGroup{} - wg.Add(1) - - go func() { - defer recover() - store.WriteLog(c, &model.Job{ID: id}, rc) - wg.Done() - }() - - wc.Close() - wg.Wait() - c.String(200, "") - - logrus.Debugf("Agent %s wrote stream to database", c.ClientIP()) -} - -func Ping(c *gin.Context) { - agent, err := store.GetAgentAddr(c, c.ClientIP()) - if err == nil { - agent.Updated = time.Now().Unix() - err = store.UpdateAgent(c, agent) - } else { - err = store.CreateAgent(c, &model.Agent{ - Address: c.ClientIP(), - Platform: "linux/amd64", - Capacity: 2, - Created: time.Now().Unix(), - Updated: time.Now().Unix(), - }) - } - if err != nil { - logrus.Errorf("Unable to register agent. %s", err.Error()) - } - c.String(200, "PONG") -} - -// -// -// Below are alternate implementations for the Queue that use websockets. -// -// - -// PostLogs handles an http request from the agent to post build logs. These -// logs are posted at the end of the build process. -func PostLogs(c *gin.Context) { - id, _ := strconv.ParseInt(c.Param("id"), 10, 64) - job, err := store.GetJob(c, id) - if err != nil { - c.String(404, "Cannot upload logs. %s", err) - return - } - if err := store.WriteLog(c, job, c.Request.Body); err != nil { - c.String(500, "Cannot persist logs", err) - return - } - c.String(200, "") -} - -// WriteLogs handles an http request from the agent to stream build logs from -// the agent to the server to enable real time streamings to the client. -func WriteLogs(c *gin.Context) { - id, err := strconv.ParseInt(c.Param("id"), 10, 64) - if err != nil { - c.String(500, "Invalid input. %s", err) - return - } - - conn, err := upgrader.Upgrade(c.Writer, c.Request, nil) - if err != nil { - c.String(500, "Cannot upgrade to websocket. %s", err) - return - } - defer conn.Close() - - wc, err := stream.Writer(c, stream.ToKey(id)) - if err != nil { - c.String(500, "Cannot create stream writer. %s", err) - return - } - defer func() { - wc.Close() - stream.Delete(c, stream.ToKey(id)) - }() - - var msg []byte - for { - _, msg, err = conn.ReadMessage() - if err != nil { - break + done := make(chan bool) + dest := fmt.Sprintf("/topic/%d", job.ID) + client, _ := stomp.FromContext(c) + sub, err = client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) { + if len(m.Header.Get([]byte("eof"))) != 0 { + done <- true } - wc.Write(msg) - wc.Write(newline) - } - - if err != nil && err != io.EOF { - c.String(500, "Error reading logs. %s", err) + buf.Write(m.Body) + buf.WriteByte('\n') + m.Release() + })) + if err != nil { + logrus.Errorf("Unable to read logs from broker. %s", err) return } - // - // rc, err := stream.Reader(c, stream.ToKey(id)) - // if err != nil { - // c.String(500, "Failed to create stream reader. %s", err) - // return + <-done + + if err := store.WriteLog(c, job, &buf); err != nil { + logrus.Errorf("Unable to write logs to store. %s", err) + return + } + // if build.Status == model.StatusRunning { + // bus.Publish(c, bus.NewEvent(bus.Started, work.Repo, build, job)) + // } else { + // bus.Publish(c, bus.NewEvent(bus.Finished, work.Repo, build, job)) // } - // - // wg := sync.WaitGroup{} - // wg.Add(1) - // - // go func() { - // defer recover() - // store.WriteLog(c, &model.Job{ID: id}, rc) - // wg.Done() - // }() - // - // wc.Close() - // wg.Wait() -} - -// newline defines a newline constant to separate lines in the build output -var newline = []byte{'\n'} - -// upgrader defines the default behavior for upgrading the websocket. -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func(r *http.Request) bool { - return true - }, + client.Unsubscribe(sub) + client.Send(dest, []byte{}, stomp.WithRetain("remove")) } diff --git a/server/stream.go b/server/stream.go index b80bbc73a..4c8786f75 100644 --- a/server/stream.go +++ b/server/stream.go @@ -1,121 +1,21 @@ package server import ( - "bufio" - "encoding/json" - "io" + "fmt" "strconv" "time" - "github.com/drone/drone/bus" "github.com/drone/drone/cache" "github.com/drone/drone/model" "github.com/drone/drone/router/middleware/session" "github.com/drone/drone/store" - "github.com/drone/drone/stream" + "github.com/drone/mq/stomp" "github.com/Sirupsen/logrus" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" - "github.com/manucorporat/sse" ) -// GetRepoEvents will upgrade the connection to a Websocket and will stream -// event updates to the browser. -func GetRepoEvents(c *gin.Context) { - repo := session.Repo(c) - c.Writer.Header().Set("Content-Type", "text/event-stream") - - eventc := make(chan *bus.Event, 1) - bus.Subscribe(c, eventc) - defer func() { - bus.Unsubscribe(c, eventc) - close(eventc) - logrus.Infof("closed event stream") - }() - - c.Stream(func(w io.Writer) bool { - select { - case event := <-eventc: - if event == nil { - logrus.Infof("nil event received") - return false - } - - // TODO(bradrydzewski) This is a super hacky workaround until we improve - // the actual bus. Having a per-call database event is just plain stupid. - if event.Repo.FullName == repo.FullName { - - var payload = struct { - model.Build - Jobs []*model.Job `json:"jobs"` - }{} - payload.Build = event.Build - payload.Jobs, _ = store.GetJobList(c, &event.Build) - data, _ := json.Marshal(&payload) - - sse.Encode(w, sse.Event{ - Event: "message", - Data: string(data), - }) - } - case <-c.Writer.CloseNotify(): - return false - } - return true - }) -} - -func GetStream(c *gin.Context) { - - repo := session.Repo(c) - buildn, _ := strconv.Atoi(c.Param("build")) - jobn, _ := strconv.Atoi(c.Param("number")) - - c.Writer.Header().Set("Content-Type", "text/event-stream") - - build, err := store.GetBuildNumber(c, repo, buildn) - if err != nil { - logrus.Debugln("stream cannot get build number.", err) - c.AbortWithError(404, err) - return - } - job, err := store.GetJobNumber(c, build, jobn) - if err != nil { - logrus.Debugln("stream cannot get job number.", err) - c.AbortWithError(404, err) - return - } - - rc, err := stream.Reader(c, stream.ToKey(job.ID)) - if err != nil { - c.AbortWithError(404, err) - return - } - - go func() { - <-c.Writer.CloseNotify() - rc.Close() - }() - - var line int - var scanner = bufio.NewScanner(rc) - for scanner.Scan() { - line++ - var err = sse.Encode(c.Writer, sse.Event{ - Id: strconv.Itoa(line), - Event: "message", - Data: scanner.Text(), - }) - if err != nil { - break - } - c.Writer.Flush() - } - - logrus.Debugf("Closed stream %s#%d", repo.FullName, build.Number) -} - var ( // Time allowed to write the file to the client. writeWait = 5 * time.Second @@ -165,47 +65,35 @@ func LogStream(c *gin.Context) { ticker := time.NewTicker(pingPeriod) defer ticker.Stop() - rc, err := stream.Reader(c, stream.ToKey(job.ID)) - if err != nil { - c.AbortWithError(404, err) - return - } - - quitc := make(chan bool) - defer func() { - quitc <- true - close(quitc) - rc.Close() - ws.Close() - logrus.Debug("Successfully closed websocket") - }() - - go func() { - defer func() { - recover() - }() - for { - select { - case <-quitc: - return - case <-ticker.C: - err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)) - if err != nil { - return - } - } - } - }() - - var scanner = bufio.NewScanner(rc) - var b []byte - for scanner.Scan() { - b = scanner.Bytes() - if len(b) == 0 { - continue + done := make(chan bool) + dest := fmt.Sprintf("/topic/%d", job.ID) + client, _ := stomp.FromContext(c) + sub, err := client.Subscribe(dest, stomp.HandlerFunc(func(m *stomp.Message) { + if len(m.Header.Get([]byte("eof"))) != 0 { + done <- true } ws.SetWriteDeadline(time.Now().Add(writeWait)) - ws.WriteMessage(websocket.TextMessage, b) + ws.WriteMessage(websocket.TextMessage, m.Body) + m.Release() + })) + if err != nil { + logrus.Errorf("Unable to read logs from broker. %s", err) + return + } + defer func() { + client.Unsubscribe(sub) + }() + + for { + select { + case <-done: + return + case <-ticker.C: + err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)) + if err != nil { + return + } + } } } @@ -227,20 +115,34 @@ func EventStream(c *gin.Context) { repo, _ = cache.GetRepoMap(c, user) } - ticker := time.NewTicker(pingPeriod) + eventc := make(chan []byte, 10) quitc := make(chan bool) - eventc := make(chan *bus.Event, 10) - bus.Subscribe(c, eventc) + tick := time.NewTicker(pingPeriod) defer func() { - ticker.Stop() - bus.Unsubscribe(c, eventc) - quitc <- true - close(quitc) - close(eventc) + tick.Stop() ws.Close() logrus.Debug("Successfully closed websocket") }() + client := stomp.MustFromContext(c) + sub, err := client.Subscribe("/topic/events", stomp.HandlerFunc(func(m *stomp.Message) { + name := m.Header.GetString("repo") + priv := m.Header.GetBool("private") + if repo[name] || !priv { + eventc <- m.Body + } + m.Release() + })) + if err != nil { + logrus.Errorf("Unable to read logs from broker. %s", err) + return + } + defer func() { + close(quitc) + close(eventc) + client.Unsubscribe(sub) + }() + go func() { defer func() { recover() @@ -249,15 +151,13 @@ func EventStream(c *gin.Context) { select { case <-quitc: return - case event := <-eventc: - if event == nil { + case event, ok := <-eventc: + if !ok { return } - if repo[event.Repo.FullName] || !event.Repo.IsPrivate { - ws.SetWriteDeadline(time.Now().Add(writeWait)) - ws.WriteJSON(event) - } - case <-ticker.C: + ws.SetWriteDeadline(time.Now().Add(writeWait)) + ws.WriteMessage(websocket.TextMessage, event) + case <-tick.C: err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)) if err != nil { return diff --git a/yaml/label.go b/yaml/label.go new file mode 100644 index 000000000..445a9b6b4 --- /dev/null +++ b/yaml/label.go @@ -0,0 +1,26 @@ +package yaml + +import ( + "gopkg.in/yaml.v2" + + "github.com/drone/drone/yaml/types" +) + +// ParseLabel parses the labels section of the Yaml document. +func ParseLabel(in []byte) map[string]string { + out := struct { + Labels types.MapEqualSlice `yaml:"labels"` + }{} + + yaml.Unmarshal(in, &out) + labels := out.Labels.Map() + if labels == nil { + labels = make(map[string]string) + } + return labels +} + +// ParseLabelString parses the labels section of the Yaml document. +func ParseLabelString(in string) map[string]string { + return ParseLabel([]byte(in)) +} diff --git a/yaml/label_test.go b/yaml/label_test.go new file mode 100644 index 000000000..3032f33ed --- /dev/null +++ b/yaml/label_test.go @@ -0,0 +1,32 @@ +package yaml + +import ( + "testing" + + "github.com/franela/goblin" +) + +func TestLabel(t *testing.T) { + + g := goblin.Goblin(t) + g.Describe("Label parser", func() { + + g.It("Should parse empty yaml", func() { + labels := ParseLabelString("") + g.Assert(len(labels)).Equal(0) + }) + + g.It("Should parse slice", func() { + labels := ParseLabelString("labels: [foo=bar, baz=boo]") + g.Assert(len(labels)).Equal(2) + g.Assert(labels["foo"]).Equal("bar") + g.Assert(labels["baz"]).Equal("boo") + }) + + g.It("Should parse map", func() { + labels := ParseLabelString("labels: {foo: bar, baz: boo}") + g.Assert(labels["foo"]).Equal("bar") + g.Assert(labels["baz"]).Equal("boo") + }) + }) +}