From bdcee93f799ccb3a97f75a9da05302687e365027 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Wed, 15 Jun 2022 21:33:29 +0200 Subject: [PATCH] Move Server Pipeline Build code out of API into own package (#949) - refactor - create new errors to handle on them - dedup code - split server pipeline functionality's into dedicated functions - add code comments to document what goes on - add TODOs for next refactor --- server/api/build.go | 469 ++----------------------------------- server/api/helper.go | 57 +++++ server/api/hook.go | 285 +--------------------- server/pipeline/approve.go | 68 ++++++ server/pipeline/cancel.go | 176 ++++++++++++++ server/pipeline/config.go | 56 +++++ server/pipeline/create.go | 135 +++++++++++ server/pipeline/decline.go | 54 +++++ server/pipeline/errors.go | 54 +++++ server/pipeline/filter.go | 73 ++++++ server/pipeline/helper.go | 41 ++++ server/pipeline/items.go | 85 +++++++ server/pipeline/queue.go | 70 ++++++ server/pipeline/restart.go | 128 ++++++++++ server/pipeline/start.go | 54 +++++ server/pipeline/topic.go | 45 ++++ 16 files changed, 1134 insertions(+), 716 deletions(-) create mode 100644 server/api/helper.go create mode 100644 server/pipeline/approve.go create mode 100644 server/pipeline/cancel.go create mode 100644 server/pipeline/config.go create mode 100644 server/pipeline/create.go create mode 100644 server/pipeline/decline.go create mode 100644 server/pipeline/errors.go create mode 100644 server/pipeline/filter.go create mode 100644 server/pipeline/helper.go create mode 100644 server/pipeline/items.go create mode 100644 server/pipeline/queue.go create mode 100644 server/pipeline/restart.go create mode 100644 server/pipeline/start.go create mode 100644 server/pipeline/topic.go diff --git a/server/api/build.go b/server/api/build.go index 447cbca01..7d7c0139e 100644 --- a/server/api/build.go +++ b/server/api/build.go @@ -19,9 +19,6 @@ package api import ( "bytes" - "context" - "database/sql" - "errors" "fmt" "io" "net/http" @@ -31,12 +28,9 @@ import ( "github.com/gin-gonic/gin" "github.com/rs/zerolog/log" - "github.com/woodpecker-ci/woodpecker/server" "github.com/woodpecker-ci/woodpecker/server/model" - "github.com/woodpecker-ci/woodpecker/server/queue" - "github.com/woodpecker-ci/woodpecker/server/remote" + "github.com/woodpecker-ci/woodpecker/server/pipeline" "github.com/woodpecker-ci/woodpecker/server/router/middleware/session" - "github.com/woodpecker-ci/woodpecker/server/shared" "github.com/woodpecker-ci/woodpecker/server/store" ) @@ -216,99 +210,14 @@ func DeleteBuild(c *gin.Context) { return } - if build.Status != model.StatusRunning && build.Status != model.StatusPending { - c.String(http.StatusBadRequest, "Cannot cancel a non-running or non-pending build") - return + if err := pipeline.Cancel(c, _store, repo, build); err != nil { + handlePipelineErr(c, err) + } else { + c.Status(http.StatusNoContent) } - - code, err := cancelBuild(c, _store, repo, build) - if err != nil { - _ = c.AbortWithError(code, err) - return - } - - c.String(code, "") -} - -// Cancel the build and returns the status. -func cancelBuild( - ctx context.Context, - _store store.Store, - repo *model.Repo, - build *model.Build, -) (int, error) { - procs, err := _store.ProcList(build) - if err != nil { - return http.StatusNotFound, err - } - - // First cancel/evict procs in the queue in one go - var ( - procsToCancel []string - procsToEvict []string - ) - for _, proc := range procs { - if proc.PPID != 0 { - continue - } - if proc.State == model.StatusRunning { - procsToCancel = append(procsToCancel, fmt.Sprint(proc.ID)) - } - if proc.State == model.StatusPending { - procsToEvict = append(procsToEvict, fmt.Sprint(proc.ID)) - } - } - - if len(procsToEvict) != 0 { - if err := server.Config.Services.Queue.EvictAtOnce(ctx, procsToEvict); err != nil { - log.Error().Err(err).Msgf("queue: evict_at_once: %v", procsToEvict) - } - if err := server.Config.Services.Queue.ErrorAtOnce(ctx, procsToEvict, queue.ErrCancel); err != nil { - log.Error().Err(err).Msgf("queue: evict_at_once: %v", procsToEvict) - } - } - if len(procsToCancel) != 0 { - if err := server.Config.Services.Queue.ErrorAtOnce(ctx, procsToCancel, queue.ErrCancel); err != nil { - log.Error().Err(err).Msgf("queue: evict_at_once: %v", procsToCancel) - } - } - - // Then update the DB status for pending builds - // Running ones will be set when the agents stop on the cancel signal - for _, proc := range procs { - if proc.State == model.StatusPending { - if proc.PPID != 0 { - if _, err = shared.UpdateProcToStatusSkipped(_store, *proc, 0); err != nil { - log.Error().Msgf("error: done: cannot update proc_id %d state: %s", proc.ID, err) - } - } else { - if _, err = shared.UpdateProcToStatusKilled(_store, *proc); err != nil { - log.Error().Msgf("error: done: cannot update proc_id %d state: %s", proc.ID, err) - } - } - } - } - - killedBuild, err := shared.UpdateToStatusKilled(_store, *build) - if err != nil { - log.Error().Err(err).Msgf("UpdateToStatusKilled: %v", build) - return http.StatusInternalServerError, err - } - - procs, err = _store.ProcList(killedBuild) - if err != nil { - return http.StatusNotFound, err - } - if killedBuild.Procs, err = model.Tree(procs); err != nil { - return http.StatusInternalServerError, err - } - if err := publishToTopic(ctx, killedBuild, repo); err != nil { - log.Error().Err(err).Msg("publishToTopic") - } - - return http.StatusNoContent, nil } +// PostApproval start pipelines in gated repos func PostApproval(c *gin.Context) { var ( _store = store.FromContext(c) @@ -322,48 +231,16 @@ func PostApproval(c *gin.Context) { _ = c.AbortWithError(404, err) return } - if build.Status != model.StatusBlocked { - c.String(http.StatusBadRequest, "cannot decline a build with status %s", build.Status) - return - } - // fetch the build file from the database - configs, err := _store.ConfigsForBuild(build.ID) + newBuild, err := pipeline.Approve(c, _store, build, user, repo) if err != nil { - log.Error().Msgf("failure to get build config for %s. %s", repo.FullName, err) - _ = c.AbortWithError(404, err) - return + handlePipelineErr(c, err) + } else { + c.JSON(200, newBuild) } - - if build, err = shared.UpdateToStatusPending(_store, *build, user.Login); err != nil { - c.String(http.StatusInternalServerError, "error updating build. %s", err) - return - } - - var yamls []*remote.FileMeta - for _, y := range configs { - yamls = append(yamls, &remote.FileMeta{Data: y.Data, Name: y.Name}) - } - - build, buildItems, err := createBuildItems(c, _store, build, user, repo, yamls, nil) - if err != nil { - msg := fmt.Sprintf("failure to createBuildItems for %s", repo.FullName) - log.Error().Err(err).Msg(msg) - c.String(http.StatusInternalServerError, msg) - return - } - - build, err = startBuild(c, _store, build, user, repo, buildItems) - if err != nil { - msg := fmt.Sprintf("failure to start build for %s", repo.FullName) - log.Error().Err(err).Msg(msg) - c.String(http.StatusInternalServerError, msg) - return - } - - c.JSON(200, build) } +// PostDecline decline pipelines in gated repos func PostDecline(c *gin.Context) { var ( _store = store.FromContext(c) @@ -374,35 +251,16 @@ func PostDecline(c *gin.Context) { build, err := _store.GetBuildNumber(repo, num) if err != nil { - _ = c.AbortWithError(404, err) - return - } - if build.Status != model.StatusBlocked { - c.String(500, "cannot decline a build with status %s", build.Status) + c.String(http.StatusNotFound, "%v", err) return } - if _, err = shared.UpdateToStatusDeclined(_store, *build, user.Login); err != nil { - c.String(500, "error updating build. %s", err) - return + build, err = pipeline.Decline(c, _store, build, user, repo) + if err != nil { + handlePipelineErr(c, err) + } else { + c.JSON(200, build) } - - if build.Procs, err = _store.ProcList(build); err != nil { - log.Error().Err(err).Msg("can not get proc list from store") - } - if build.Procs, err = model.Tree(build.Procs); err != nil { - log.Error().Err(err).Msg("can not build tree from proc list") - } - - if err := updateBuildStatus(c, build, repo, user); err != nil { - log.Error().Err(err).Msg("updateBuildStatus") - } - - if err := publishToTopic(c, build, repo); err != nil { - log.Error().Err(err).Msg("publishToTopic") - } - - c.JSON(200, build) } func GetBuildQueue(c *gin.Context) { @@ -414,9 +272,8 @@ func GetBuildQueue(c *gin.Context) { c.JSON(200, out) } -// PostBuild restarts a build +// PostBuild restarts a build optional with altered event, deploy or environment func PostBuild(c *gin.Context) { - _remote := server.Config.Services.Remote _store := store.FromContext(c) repo := session.Repo(c) @@ -440,69 +297,13 @@ func PostBuild(c *gin.Context) { return } - switch build.Status { - case model.StatusDeclined, - model.StatusBlocked: - c.String(500, "cannot restart a build with status %s", build.Status) - return - } + // refresh the token to make sure, pipeline.ReStart can still obtain the pipeline config if nessessary again + refreshUserToken(c, user) - // if the remote has a refresh token, the current access token - // may be stale. Therefore, we should refresh prior to dispatching - // the job. - if refresher, ok := _remote.(remote.Refresher); ok { - ok, err := refresher.Refresh(c, user) - if err != nil { - log.Error().Err(err).Msgf("refresh oauth token of user '%s' failed", user.Login) - } else if ok { - if err := _store.UpdateUser(user); err != nil { - log.Error().Err(err).Msg("fail to save user to store after refresh oauth token") - } - } - } - - var pipelineFiles []*remote.FileMeta - - // fetch the old pipeline config from database - configs, err := _store.ConfigsForBuild(build.ID) - if err != nil { - log.Error().Msgf("failure to get build config for %s. %s", repo.FullName, err) - _ = c.AbortWithError(404, err) - return - } - - for _, y := range configs { - pipelineFiles = append(pipelineFiles, &remote.FileMeta{Data: y.Data, Name: y.Name}) - } - - // If config extension is active we should refetch the config in case something changed - if server.Config.Services.ConfigService != nil && server.Config.Services.ConfigService.IsConfigured() { - currentFileMeta := make([]*remote.FileMeta, len(configs)) - for i, cfg := range configs { - currentFileMeta[i] = &remote.FileMeta{Name: cfg.Name, Data: cfg.Data} - } - - newConfig, useOld, err := server.Config.Services.ConfigService.FetchConfig(c, repo, build, currentFileMeta) - if err != nil { - msg := fmt.Sprintf("On fetching external build config: %s", err) - c.String(http.StatusBadRequest, msg) - return - } - if !useOld { - pipelineFiles = newConfig - } - } - - build.ID = 0 - build.Number = 0 - build.Parent = num - build.Status = model.StatusPending - build.Started = 0 - build.Finished = 0 - build.Enqueued = time.Now().UTC().Unix() - build.Error = "" + // make Deploy overridable build.Deploy = c.DefaultQuery("deploy_to", build.Deploy) + // make Event overridable if event, ok := c.GetQuery("event"); ok { build.Event = model.WebhookEvent(event) @@ -513,21 +314,6 @@ func PostBuild(c *gin.Context) { } } - err = _store.CreateBuild(build) - if err != nil { - msg := fmt.Sprintf("failure to save build for %s", repo.FullName) - log.Error().Err(err).Msg(msg) - c.String(http.StatusInternalServerError, msg) - return - } - - if err := persistBuildConfigs(_store, configs, build.ID); err != nil { - msg := fmt.Sprintf("failure to persist build config for %s.", repo.FullName) - log.Error().Err(err).Msg(msg) - c.String(http.StatusInternalServerError, msg) - return - } - // Read query string parameters into buildParams, exclude reserved params envs := map[string]string{} for key, val := range c.Request.URL.Query() { @@ -543,23 +329,12 @@ func PostBuild(c *gin.Context) { } } - build, buildItems, err := createBuildItems(c, _store, build, user, repo, pipelineFiles, envs) + newBuild, err := pipeline.Restart(c, _store, build, user, repo, envs) if err != nil { - msg := fmt.Sprintf("failure to createBuildItems for %s", repo.FullName) - log.Error().Err(err).Msg(msg) - c.String(http.StatusInternalServerError, msg) - return + handlePipelineErr(c, err) + } else { + c.JSON(200, newBuild) } - - build, err = startBuild(c, _store, build, user, repo, buildItems) - if err != nil { - msg := fmt.Sprintf("failure to start build for %s", repo.FullName) - log.Error().Err(err).Msg(msg) - c.String(http.StatusInternalServerError, msg) - return - } - - c.JSON(200, build) } func DeleteBuildLogs(c *gin.Context) { @@ -603,198 +378,6 @@ func DeleteBuildLogs(c *gin.Context) { c.String(204, "") } -func createBuildItems(ctx context.Context, store store.Store, build *model.Build, user *model.User, repo *model.Repo, yamls []*remote.FileMeta, envs map[string]string) (*model.Build, []*shared.BuildItem, error) { - netrc, err := server.Config.Services.Remote.Netrc(user, repo) - if err != nil { - log.Error().Err(err).Msg("Failed to generate netrc file") - } - - // get the previous build so that we can send status change notifications - last, err := store.GetBuildLastBefore(repo, build.Branch, build.ID) - if err != nil && !errors.Is(err, sql.ErrNoRows) { - log.Error().Err(err).Str("repo", repo.FullName).Msgf("Error getting last build before build number '%d'", build.Number) - } - - secs, err := server.Config.Services.Secrets.SecretListBuild(repo, build) - if err != nil { - log.Error().Err(err).Msgf("Error getting secrets for %s#%d", repo.FullName, build.Number) - } - - regs, err := server.Config.Services.Registries.RegistryList(repo) - if err != nil { - log.Error().Err(err).Msgf("Error getting registry credentials for %s#%d", repo.FullName, build.Number) - } - - if envs == nil { - envs = map[string]string{} - } - if server.Config.Services.Environ != nil { - globals, _ := server.Config.Services.Environ.EnvironList(repo) - for _, global := range globals { - envs[global.Name] = global.Value - } - } - - b := shared.ProcBuilder{ - Repo: repo, - Curr: build, - Last: last, - Netrc: netrc, - Secs: secs, - Regs: regs, - Envs: envs, - Link: server.Config.Server.Host, - Yamls: yamls, - } - buildItems, err := b.Build() - if err != nil { - if _, err := shared.UpdateToStatusError(store, *build, err); err != nil { - log.Error().Err(err).Msgf("Error setting error status of build for %s#%d", repo.FullName, build.Number) - } - return nil, nil, err - } - - build = shared.SetBuildStepsOnBuild(b.Curr, buildItems) - - return build, buildItems, nil -} - -func cancelPreviousPipelines( - ctx context.Context, - _store store.Store, - build *model.Build, - user *model.User, - repo *model.Repo, -) error { - // check this event should cancel previous pipelines - eventIncluded := false - for _, ev := range repo.CancelPreviousPipelineEvents { - if ev == build.Event { - eventIncluded = true - break - } - } - if !eventIncluded { - return nil - } - - // get all active activeBuilds - activeBuilds, err := _store.GetActiveBuildList(repo, -1) - if err != nil { - return err - } - - buildNeedsCancel := func(active *model.Build) (bool, error) { - // always filter on same event - if active.Event != build.Event { - return false, nil - } - - // find events for the same context - switch build.Event { - case model.EventPush: - return build.Branch == active.Branch, nil - default: - return build.Refspec == active.Refspec, nil - } - } - - for _, active := range activeBuilds { - if active.ID == build.ID { - // same build. e.g. self - continue - } - - cancel, err := buildNeedsCancel(active) - if err != nil { - log.Error(). - Err(err). - Str("Ref", active.Ref). - Msg("Error while trying to cancel build, skipping") - continue - } - if !cancel { - continue - } - _, err = cancelBuild(ctx, _store, repo, active) - if err != nil { - log.Error(). - Err(err). - Str("Ref", active.Ref). - Int64("ID", active.ID). - Msg("Failed to cancel build") - } - } - - return nil -} - -func startBuild( - ctx context.Context, - store store.Store, - activeBuild *model.Build, - user *model.User, - repo *model.Repo, - buildItems []*shared.BuildItem, -) (*model.Build, error) { - // call to cancel previous builds if needed - if err := cancelPreviousPipelines(ctx, store, activeBuild, user, repo); err != nil { - // should be not breaking - log.Error().Err(err).Msg("Failed to cancel previous builds") - } - - if err := store.ProcCreate(activeBuild.Procs); err != nil { - log.Error().Err(err).Str("repo", repo.FullName).Msgf("error persisting procs for %s#%d", repo.FullName, activeBuild.Number) - return nil, err - } - - if err := publishToTopic(ctx, activeBuild, repo); err != nil { - log.Error().Err(err).Msg("publishToTopic") - } - - if err := queueBuild(activeBuild, repo, buildItems); err != nil { - log.Error().Err(err).Msg("queueBuild") - return nil, err - } - - if err := updateBuildStatus(ctx, activeBuild, repo, user); err != nil { - log.Error().Err(err).Msg("updateBuildStatus") - } - - return activeBuild, nil -} - -func updateBuildStatus(ctx context.Context, build *model.Build, repo *model.Repo, user *model.User) error { - for _, proc := range build.Procs { - // skip child procs - if !proc.IsParent() { - continue - } - - err := server.Config.Services.Remote.Status(ctx, user, repo, build, proc) - if err != nil { - log.Error().Err(err).Msgf("error setting commit status for %s/%d", repo.FullName, build.Number) - return err - } - } - - return nil -} - -func persistBuildConfigs(store store.Store, configs []*model.Config, buildID int64) error { - for _, conf := range configs { - buildConfig := &model.BuildConfig{ - ConfigID: conf.ID, - BuildID: buildID, - } - err := store.BuildConfigCreate(buildConfig) - if err != nil { - return err - } - } - return nil -} - var deleteStr = `[ { "proc": %q, diff --git a/server/api/helper.go b/server/api/helper.go new file mode 100644 index 000000000..4826cc5c8 --- /dev/null +++ b/server/api/helper.go @@ -0,0 +1,57 @@ +// Copyright 2022 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "net/http" + + "github.com/gin-gonic/gin" + "github.com/rs/zerolog/log" + + "github.com/woodpecker-ci/woodpecker/server" + "github.com/woodpecker-ci/woodpecker/server/model" + "github.com/woodpecker-ci/woodpecker/server/pipeline" + "github.com/woodpecker-ci/woodpecker/server/remote" + "github.com/woodpecker-ci/woodpecker/server/store" +) + +func handlePipelineErr(c *gin.Context, err error) { + if pipeline.IsErrNotFound(err) { + c.String(http.StatusNotFound, "%v", err) + } else if pipeline.IsErrBadRequest(err) { + c.String(http.StatusBadRequest, "%v", err) + } else if pipeline.IsErrFiltered(err) { + c.String(http.StatusNoContent, "%v", err) + } else { + _ = c.AbortWithError(http.StatusInternalServerError, err) + } +} + +// if the remote has a refresh token, the current access token may be stale. +// Therefore, we should refresh prior to dispatching the job. +func refreshUserToken(c *gin.Context, user *model.User) { + _remote := server.Config.Services.Remote + _store := store.FromContext(c) + if refresher, ok := _remote.(remote.Refresher); ok { + ok, err := refresher.Refresh(c, user) + if err != nil { + log.Error().Err(err).Msgf("refresh oauth token of user '%s' failed", user.Login) + } else if ok { + if err := _store.UpdateUser(user); err != nil { + log.Error().Err(err).Msg("fail to save user to store after refresh oauth token") + } + } + } +} diff --git a/server/api/hook.go b/server/api/hook.go index b38d23cb2..548dde88a 100644 --- a/server/api/hook.go +++ b/server/api/hook.go @@ -18,27 +18,18 @@ package api import ( - "context" - "crypto/sha256" - "encoding/json" "fmt" "math/rand" "net/http" "regexp" - "strconv" "time" "github.com/gin-gonic/gin" "github.com/rs/zerolog/log" - "github.com/woodpecker-ci/woodpecker/pipeline/frontend/yaml" - "github.com/woodpecker-ci/woodpecker/pipeline/rpc" "github.com/woodpecker-ci/woodpecker/server" "github.com/woodpecker-ci/woodpecker/server/model" - "github.com/woodpecker-ci/woodpecker/server/pubsub" - "github.com/woodpecker-ci/woodpecker/server/queue" - "github.com/woodpecker-ci/woodpecker/server/remote" - "github.com/woodpecker-ci/woodpecker/server/shared" + "github.com/woodpecker-ci/woodpecker/server/pipeline" "github.com/woodpecker-ci/woodpecker/server/store" "github.com/woodpecker-ci/woodpecker/shared/token" ) @@ -75,17 +66,18 @@ func BlockTilQueueHasRunningItem(c *gin.Context) { c.Status(http.StatusOK) } +// PostHook start a pipeline triggered by a forges post webhook func PostHook(c *gin.Context) { _store := store.FromContext(c) - tmpRepo, build, err := server.Config.Services.Remote.Hook(c, c.Request) + tmpRepo, tmpBuild, err := server.Config.Services.Remote.Hook(c, c.Request) if err != nil { msg := "failure to parse hook" log.Debug().Err(err).Msg(msg) c.String(http.StatusBadRequest, msg) return } - if build == nil { + if tmpBuild == nil { msg := "ignoring hook: hook parsing resulted in empty build" log.Debug().Msg(msg) c.String(http.StatusOK, msg) @@ -98,11 +90,11 @@ func PostHook(c *gin.Context) { return } - // skip the build if any case-insensitive combination of the words "skip" and "ci" + // skip the tmpBuild if any case-insensitive combination of the words "skip" and "ci" // wrapped in square brackets appear in the commit message - skipMatch := skipRe.FindString(build.Message) + skipMatch := skipRe.FindString(tmpBuild.Message) if len(skipMatch) > 0 { - msg := fmt.Sprintf("ignoring hook: %s found in %s", skipMatch, build.Commit) + msg := fmt.Sprintf("ignoring hook: %s found in %s", skipMatch, tmpBuild.Commit) log.Debug().Msg(msg) c.String(http.StatusNoContent, msg) return @@ -146,270 +138,17 @@ func PostHook(c *gin.Context) { return } - if build.Event == model.EventPull && !repo.AllowPull { + if tmpBuild.Event == model.EventPull && !repo.AllowPull { msg := "ignoring hook: pull requests are disabled for this repo in woodpecker" log.Debug().Str("repo", repo.FullName).Msg(msg) c.String(http.StatusNoContent, msg) return } - repoUser, err := _store.GetUser(repo.UserID) + build, err := pipeline.Create(c, _store, repo, tmpBuild) if err != nil { - msg := fmt.Sprintf("failure to find repo owner via id '%d'", repo.UserID) - log.Error().Err(err).Str("repo", repo.FullName).Msg(msg) - c.String(http.StatusInternalServerError, msg) - return + handlePipelineErr(c, err) + } else { + c.JSON(200, build) } - - // if the remote has a refresh token, the current access token - // may be stale. Therefore, we should refresh prior to dispatching - // the build. - if refresher, ok := server.Config.Services.Remote.(remote.Refresher); ok { - refreshed, err := refresher.Refresh(c, repoUser) - if err != nil { - log.Error().Err(err).Msgf("failed to refresh oauth2 token for repoUser: %s", repoUser.Login) - } else if refreshed { - if err := _store.UpdateUser(repoUser); err != nil { - log.Error().Err(err).Msgf("error while updating repoUser: %s", repoUser.Login) - // move forward - } - } - } - - // fetch the build file from the remote - configFetcher := shared.NewConfigFetcher(server.Config.Services.Remote, server.Config.Services.ConfigService, repoUser, repo, build) - remoteYamlConfigs, err := configFetcher.Fetch(c) - if err != nil { - msg := fmt.Sprintf("cannot find config '%s' in '%s' with user: '%s'", repo.Config, build.Ref, repoUser.Login) - log.Debug().Err(err).Str("repo", repo.FullName).Msg(msg) - c.String(http.StatusNotFound, msg) - return - } - - filtered, err := branchFiltered(build, remoteYamlConfigs) - if err != nil { - msg := "failure to parse yaml from hook" - log.Debug().Err(err).Str("repo", repo.FullName).Msg(msg) - c.String(http.StatusBadRequest, msg) - return - } - if filtered { - msg := "ignoring hook: branch does not match restrictions defined in yaml" - log.Debug().Str("repo", repo.FullName).Msg(msg) - c.String(http.StatusOK, msg) - return - } - - if zeroSteps(build, remoteYamlConfigs) { - msg := "ignoring hook: step conditions yield zero runnable steps" - log.Debug().Str("repo", repo.FullName).Msg(msg) - c.String(http.StatusOK, msg) - return - } - - // update some build fields - build.RepoID = repo.ID - build.Verified = true - build.Status = model.StatusPending - - // TODO(336) extend gated feature with an allow/block List - if repo.IsGated { - build.Status = model.StatusBlocked - } - - err = _store.CreateBuild(build, build.Procs...) - if err != nil { - msg := fmt.Sprintf("failure to save build for %s", repo.FullName) - log.Error().Err(err).Msg(msg) - c.String(http.StatusInternalServerError, msg) - return - } - - // persist the build config for historical correctness, restarts, etc - for _, remoteYamlConfig := range remoteYamlConfigs { - _, err := findOrPersistPipelineConfig(_store, build, remoteYamlConfig) - if err != nil { - msg := fmt.Sprintf("failure to find or persist pipeline config for %s", repo.FullName) - log.Error().Err(err).Msg(msg) - c.String(http.StatusInternalServerError, msg) - return - } - } - - build, buildItems, err := createBuildItems(c, _store, build, repoUser, repo, remoteYamlConfigs, nil) - if err != nil { - msg := fmt.Sprintf("failure to createBuildItems for %s", repo.FullName) - log.Error().Err(err).Msg(msg) - c.String(http.StatusInternalServerError, msg) - return - } - - if build.Status == model.StatusBlocked { - if err := publishToTopic(c, build, repo); err != nil { - log.Error().Err(err).Msg("publishToTopic") - } - - if err := updateBuildStatus(c, build, repo, repoUser); err != nil { - log.Error().Err(err).Msg("updateBuildStatus") - } - - c.JSON(http.StatusOK, build) - return - } - - build, err = startBuild(c, _store, build, repoUser, repo, buildItems) - if err != nil { - msg := fmt.Sprintf("failure to start build for %s", repo.FullName) - log.Error().Err(err).Msg(msg) - c.String(http.StatusInternalServerError, msg) - return - } - - c.JSON(http.StatusOK, build) -} - -// TODO: parse yaml once and not for each filter function -func branchFiltered(build *model.Build, remoteYamlConfigs []*remote.FileMeta) (bool, error) { - log.Trace().Msgf("hook.branchFiltered(): build branch: '%s' build event: '%s' config count: %d", build.Branch, build.Event, len(remoteYamlConfigs)) - - if build.Event == model.EventTag || build.Event == model.EventDeploy { - return false, nil - } - - for _, remoteYamlConfig := range remoteYamlConfigs { - parsedPipelineConfig, err := yaml.ParseBytes(remoteYamlConfig.Data) - if err != nil { - log.Trace().Msgf("parse config '%s': %s", remoteYamlConfig.Name, err) - return false, err - } - log.Trace().Msgf("config '%s': %#v", remoteYamlConfig.Name, parsedPipelineConfig) - - if parsedPipelineConfig.Branches.Match(build.Branch) { - return false, nil - } - } - - return true, nil -} - -func zeroSteps(build *model.Build, remoteYamlConfigs []*remote.FileMeta) bool { - b := shared.ProcBuilder{ - Repo: &model.Repo{}, - Curr: build, - Last: &model.Build{}, - Netrc: &model.Netrc{}, - Secs: []*model.Secret{}, - Regs: []*model.Registry{}, - Link: "", - Yamls: remoteYamlConfigs, - } - - buildItems, err := b.Build() - if err != nil { - return false - } - if len(buildItems) == 0 { - return true - } - - return false -} - -func findOrPersistPipelineConfig(store store.Store, build *model.Build, remoteYamlConfig *remote.FileMeta) (*model.Config, error) { - sha := shasum(remoteYamlConfig.Data) - conf, err := store.ConfigFindIdentical(build.RepoID, sha) - if err != nil { - conf = &model.Config{ - RepoID: build.RepoID, - Data: remoteYamlConfig.Data, - Hash: sha, - Name: shared.SanitizePath(remoteYamlConfig.Name), - } - err = store.ConfigCreate(conf) - if err != nil { - // retry in case we receive two hooks at the same time - conf, err = store.ConfigFindIdentical(build.RepoID, sha) - if err != nil { - return nil, err - } - } - } - - buildConfig := &model.BuildConfig{ - ConfigID: conf.ID, - BuildID: build.ID, - } - if err := store.BuildConfigCreate(buildConfig); err != nil { - return nil, err - } - - return conf, nil -} - -// publishes message to UI clients -func publishToTopic(c context.Context, build *model.Build, repo *model.Repo) (err error) { - message := pubsub.Message{ - Labels: map[string]string{ - "repo": repo.FullName, - "private": strconv.FormatBool(repo.IsSCMPrivate), - }, - } - buildCopy := *build - if buildCopy.Procs, err = model.Tree(buildCopy.Procs); err != nil { - return err - } - - message.Data, _ = json.Marshal(model.Event{ - Repo: *repo, - Build: buildCopy, - }) - return server.Config.Services.Pubsub.Publish(c, "topic/events", message) -} - -func queueBuild(build *model.Build, repo *model.Repo, buildItems []*shared.BuildItem) error { - var tasks []*queue.Task - for _, item := range buildItems { - if item.Proc.State == model.StatusSkipped { - continue - } - task := new(queue.Task) - task.ID = fmt.Sprint(item.Proc.ID) - task.Labels = map[string]string{} - for k, v := range item.Labels { - task.Labels[k] = v - } - task.Labels["platform"] = item.Platform - task.Labels["repo"] = repo.FullName - task.Dependencies = taskIds(item.DependsOn, buildItems) - task.RunOn = item.RunsOn - task.DepStatus = make(map[string]string) - - task.Data, _ = json.Marshal(rpc.Pipeline{ - ID: fmt.Sprint(item.Proc.ID), - Config: item.Config, - Timeout: repo.Timeout, - }) - - if err := server.Config.Services.Logs.Open(context.Background(), task.ID); err != nil { - return err - } - tasks = append(tasks, task) - } - return server.Config.Services.Queue.PushAtOnce(context.Background(), tasks) -} - -func taskIds(dependsOn []string, buildItems []*shared.BuildItem) (taskIds []string) { - for _, dep := range dependsOn { - for _, buildItem := range buildItems { - if buildItem.Proc.Name == dep { - taskIds = append(taskIds, fmt.Sprint(buildItem.Proc.ID)) - } - } - } - return -} - -func shasum(raw []byte) string { - sum := sha256.Sum256(raw) - return fmt.Sprintf("%x", sum) } diff --git a/server/pipeline/approve.go b/server/pipeline/approve.go new file mode 100644 index 000000000..c56c18c23 --- /dev/null +++ b/server/pipeline/approve.go @@ -0,0 +1,68 @@ +// Copyright 2022 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "context" + "fmt" + + "github.com/rs/zerolog/log" + + "github.com/woodpecker-ci/woodpecker/server/model" + "github.com/woodpecker-ci/woodpecker/server/remote" + "github.com/woodpecker-ci/woodpecker/server/shared" + "github.com/woodpecker-ci/woodpecker/server/store" +) + +// Approve update the status to pending for blocked build because of a gated repo +// and start them afterwards +func Approve(ctx context.Context, store store.Store, build *model.Build, user *model.User, repo *model.Repo) (*model.Build, error) { + if build.Status != model.StatusBlocked { + return nil, ErrBadRequest{Msg: fmt.Sprintf("cannot decline a build with status %s", build.Status)} + } + + // fetch the build file from the database + configs, err := store.ConfigsForBuild(build.ID) + if err != nil { + msg := fmt.Sprintf("failure to get build config for %s. %s", repo.FullName, err) + log.Error().Msg(msg) + return nil, ErrNotFound{Msg: msg} + } + + if build, err = shared.UpdateToStatusPending(store, *build, user.Login); err != nil { + return nil, fmt.Errorf("error updating build. %s", err) + } + + var yamls []*remote.FileMeta + for _, y := range configs { + yamls = append(yamls, &remote.FileMeta{Data: y.Data, Name: y.Name}) + } + + build, buildItems, err := createBuildItems(ctx, store, build, user, repo, yamls, nil) + if err != nil { + msg := fmt.Sprintf("failure to createBuildItems for %s", repo.FullName) + log.Error().Err(err).Msg(msg) + return nil, err + } + + build, err = start(ctx, store, build, user, repo, buildItems) + if err != nil { + msg := fmt.Sprintf("failure to start build for %s: %v", repo.FullName, err) + log.Error().Err(err).Msg(msg) + return nil, fmt.Errorf(msg) + } + + return build, nil +} diff --git a/server/pipeline/cancel.go b/server/pipeline/cancel.go new file mode 100644 index 000000000..a6e3cf39c --- /dev/null +++ b/server/pipeline/cancel.go @@ -0,0 +1,176 @@ +// Copyright 2022 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "context" + "fmt" + + "github.com/rs/zerolog/log" + + "github.com/woodpecker-ci/woodpecker/server" + "github.com/woodpecker-ci/woodpecker/server/model" + "github.com/woodpecker-ci/woodpecker/server/queue" + "github.com/woodpecker-ci/woodpecker/server/shared" + "github.com/woodpecker-ci/woodpecker/server/store" +) + +// Cancel the build and returns the status. +func Cancel(ctx context.Context, store store.Store, repo *model.Repo, build *model.Build) error { + if build.Status != model.StatusRunning && build.Status != model.StatusPending { + return ErrBadRequest{Msg: "Cannot cancel a non-running or non-pending build"} + } + + procs, err := store.ProcList(build) + if err != nil { + return ErrNotFound{Msg: err.Error()} + } + + // First cancel/evict procs in the queue in one go + var ( + procsToCancel []string + procsToEvict []string + ) + for _, proc := range procs { + if proc.PPID != 0 { + continue + } + if proc.State == model.StatusRunning { + procsToCancel = append(procsToCancel, fmt.Sprint(proc.ID)) + } + if proc.State == model.StatusPending { + procsToEvict = append(procsToEvict, fmt.Sprint(proc.ID)) + } + } + + if len(procsToEvict) != 0 { + if err := server.Config.Services.Queue.EvictAtOnce(ctx, procsToEvict); err != nil { + log.Error().Err(err).Msgf("queue: evict_at_once: %v", procsToEvict) + } + if err := server.Config.Services.Queue.ErrorAtOnce(ctx, procsToEvict, queue.ErrCancel); err != nil { + log.Error().Err(err).Msgf("queue: evict_at_once: %v", procsToEvict) + } + } + if len(procsToCancel) != 0 { + if err := server.Config.Services.Queue.ErrorAtOnce(ctx, procsToCancel, queue.ErrCancel); err != nil { + log.Error().Err(err).Msgf("queue: evict_at_once: %v", procsToCancel) + } + } + + // Then update the DB status for pending builds + // Running ones will be set when the agents stop on the cancel signal + for _, proc := range procs { + if proc.State == model.StatusPending { + if proc.PPID != 0 { + if _, err = shared.UpdateProcToStatusSkipped(store, *proc, 0); err != nil { + log.Error().Msgf("error: done: cannot update proc_id %d state: %s", proc.ID, err) + } + } else { + if _, err = shared.UpdateProcToStatusKilled(store, *proc); err != nil { + log.Error().Msgf("error: done: cannot update proc_id %d state: %s", proc.ID, err) + } + } + } + } + + killedBuild, err := shared.UpdateToStatusKilled(store, *build) + if err != nil { + log.Error().Err(err).Msgf("UpdateToStatusKilled: %v", build) + return err + } + + procs, err = store.ProcList(killedBuild) + if err != nil { + return ErrNotFound{Msg: err.Error()} + } + if killedBuild.Procs, err = model.Tree(procs); err != nil { + return err + } + if err := publishToTopic(ctx, killedBuild, repo); err != nil { + log.Error().Err(err).Msg("publishToTopic") + } + + return nil +} + +func cancelPreviousPipelines( + ctx context.Context, + _store store.Store, + build *model.Build, + repo *model.Repo, +) error { + // check this event should cancel previous pipelines + eventIncluded := false + for _, ev := range repo.CancelPreviousPipelineEvents { + if ev == build.Event { + eventIncluded = true + break + } + } + if !eventIncluded { + return nil + } + + // get all active activeBuilds + activeBuilds, err := _store.GetActiveBuildList(repo, -1) + if err != nil { + return err + } + + buildNeedsCancel := func(active *model.Build) (bool, error) { + // always filter on same event + if active.Event != build.Event { + return false, nil + } + + // find events for the same context + switch build.Event { + case model.EventPush: + return build.Branch == active.Branch, nil + default: + return build.Refspec == active.Refspec, nil + } + } + + for _, active := range activeBuilds { + if active.ID == build.ID { + // same build. e.g. self + continue + } + + cancel, err := buildNeedsCancel(active) + if err != nil { + log.Error(). + Err(err). + Str("Ref", active.Ref). + Msg("Error while trying to cancel build, skipping") + continue + } + + if !cancel { + continue + } + + if err = Cancel(ctx, _store, repo, active); err != nil { + log.Error(). + Err(err). + Str("Ref", active.Ref). + Int64("ID", active.ID). + Msg("Failed to cancel build") + } + } + + return nil +} diff --git a/server/pipeline/config.go b/server/pipeline/config.go new file mode 100644 index 000000000..b2d0bfc70 --- /dev/null +++ b/server/pipeline/config.go @@ -0,0 +1,56 @@ +// Copyright 2022 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "crypto/sha256" + "fmt" + + "github.com/woodpecker-ci/woodpecker/server/model" + "github.com/woodpecker-ci/woodpecker/server/remote" + "github.com/woodpecker-ci/woodpecker/server/shared" + "github.com/woodpecker-ci/woodpecker/server/store" +) + +func findOrPersistPipelineConfig(store store.Store, build *model.Build, remoteYamlConfig *remote.FileMeta) (*model.Config, error) { + sha := fmt.Sprintf("%x", sha256.Sum256(remoteYamlConfig.Data)) + conf, err := store.ConfigFindIdentical(build.RepoID, sha) + if err != nil { + conf = &model.Config{ + RepoID: build.RepoID, + Data: remoteYamlConfig.Data, + Hash: sha, + Name: shared.SanitizePath(remoteYamlConfig.Name), + } + err = store.ConfigCreate(conf) + if err != nil { + // retry in case we receive two hooks at the same time + conf, err = store.ConfigFindIdentical(build.RepoID, sha) + if err != nil { + return nil, err + } + } + } + + buildConfig := &model.BuildConfig{ + ConfigID: conf.ID, + BuildID: build.ID, + } + if err := store.BuildConfigCreate(buildConfig); err != nil { + return nil, err + } + + return conf, nil +} diff --git a/server/pipeline/create.go b/server/pipeline/create.go new file mode 100644 index 000000000..c13b50e48 --- /dev/null +++ b/server/pipeline/create.go @@ -0,0 +1,135 @@ +// Copyright 2022 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "context" + "fmt" + + "github.com/rs/zerolog/log" + + "github.com/woodpecker-ci/woodpecker/server" + "github.com/woodpecker-ci/woodpecker/server/model" + "github.com/woodpecker-ci/woodpecker/server/remote" + "github.com/woodpecker-ci/woodpecker/server/shared" + "github.com/woodpecker-ci/woodpecker/server/store" +) + +// Create a new build and start it +func Create(ctx context.Context, _store store.Store, repo *model.Repo, build *model.Build) (*model.Build, error) { + repoUser, err := _store.GetUser(repo.UserID) + if err != nil { + msg := fmt.Sprintf("failure to find repo owner via id '%d'", repo.UserID) + log.Error().Err(err).Str("repo", repo.FullName).Msg(msg) + return nil, fmt.Errorf(msg) + } + + // if the remote has a refresh token, the current access token + // may be stale. Therefore, we should refresh prior to dispatching + // the build. + if refresher, ok := server.Config.Services.Remote.(remote.Refresher); ok { + refreshed, err := refresher.Refresh(ctx, repoUser) + if err != nil { + log.Error().Err(err).Msgf("failed to refresh oauth2 token for repoUser: %s", repoUser.Login) + } else if refreshed { + if err := _store.UpdateUser(repoUser); err != nil { + log.Error().Err(err).Msgf("error while updating repoUser: %s", repoUser.Login) + // move forward + } + } + } + + // fetch the build file from the remote + configFetcher := shared.NewConfigFetcher(server.Config.Services.Remote, server.Config.Services.ConfigService, repoUser, repo, build) + remoteYamlConfigs, err := configFetcher.Fetch(ctx) + if err != nil { + msg := fmt.Sprintf("cannot find config '%s' in '%s' with user: '%s'", repo.Config, build.Ref, repoUser.Login) + log.Debug().Err(err).Str("repo", repo.FullName).Msg(msg) + return nil, ErrNotFound{Msg: msg} + } + + filtered, err := branchFiltered(build, remoteYamlConfigs) + if err != nil { + msg := "failure to parse yaml from hook" + log.Debug().Err(err).Str("repo", repo.FullName).Msg(msg) + return nil, ErrBadRequest{Msg: msg} + } + if filtered { + err := ErrFiltered{Msg: "branch does not match restrictions defined in yaml"} + log.Debug().Str("repo", repo.FullName).Msgf("%v", err) + return nil, err + } + + if zeroSteps(build, remoteYamlConfigs) { + err := ErrFiltered{Msg: "step conditions yield zero runnable steps"} + log.Debug().Str("repo", repo.FullName).Msgf("%v", err) + return nil, err + } + + // update some build fields + build.RepoID = repo.ID + build.Verified = true + build.Status = model.StatusPending + + // TODO(336) extend gated feature with an allow/block List + if repo.IsGated { + build.Status = model.StatusBlocked + } + + err = _store.CreateBuild(build, build.Procs...) + if err != nil { + msg := fmt.Sprintf("failure to save build for %s", repo.FullName) + log.Error().Err(err).Msg(msg) + return nil, fmt.Errorf(msg) + } + + // persist the build config for historical correctness, restarts, etc + for _, remoteYamlConfig := range remoteYamlConfigs { + _, err := findOrPersistPipelineConfig(_store, build, remoteYamlConfig) + if err != nil { + msg := fmt.Sprintf("failure to find or persist pipeline config for %s", repo.FullName) + log.Error().Err(err).Msg(msg) + return nil, fmt.Errorf(msg) + } + } + + build, buildItems, err := createBuildItems(ctx, _store, build, repoUser, repo, remoteYamlConfigs, nil) + if err != nil { + msg := fmt.Sprintf("failure to createBuildItems for %s", repo.FullName) + log.Error().Err(err).Msg(msg) + return nil, fmt.Errorf(msg) + } + + if build.Status == model.StatusBlocked { + if err := publishToTopic(ctx, build, repo); err != nil { + log.Error().Err(err).Msg("publishToTopic") + } + + if err := updateBuildStatus(ctx, build, repo, repoUser); err != nil { + log.Error().Err(err).Msg("updateBuildStatus") + } + + return build, nil + } + + build, err = start(ctx, _store, build, repoUser, repo, buildItems) + if err != nil { + msg := fmt.Sprintf("failure to start build for %s", repo.FullName) + log.Error().Err(err).Msg(msg) + return nil, fmt.Errorf(msg) + } + + return build, nil +} diff --git a/server/pipeline/decline.go b/server/pipeline/decline.go new file mode 100644 index 000000000..3744daa33 --- /dev/null +++ b/server/pipeline/decline.go @@ -0,0 +1,54 @@ +// Copyright 2022 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "context" + "fmt" + + "github.com/rs/zerolog/log" + "github.com/woodpecker-ci/woodpecker/server/model" + "github.com/woodpecker-ci/woodpecker/server/shared" + "github.com/woodpecker-ci/woodpecker/server/store" +) + +// Decline update the status to declined for blocked build because of a gated repo +func Decline(ctx context.Context, store store.Store, build *model.Build, user *model.User, repo *model.Repo) (*model.Build, error) { + if build.Status != model.StatusBlocked { + return nil, fmt.Errorf("cannot decline a build with status %s", build.Status) + } + + _, err := shared.UpdateToStatusDeclined(store, *build, user.Login) + if err != nil { + return nil, fmt.Errorf("error updating build. %s", err) + } + + if build.Procs, err = store.ProcList(build); err != nil { + log.Error().Err(err).Msg("can not get proc list from store") + } + if build.Procs, err = model.Tree(build.Procs); err != nil { + log.Error().Err(err).Msg("can not build tree from proc list") + } + + if err := updateBuildStatus(ctx, build, repo, user); err != nil { + log.Error().Err(err).Msg("updateBuildStatus") + } + + if err := publishToTopic(ctx, build, repo); err != nil { + log.Error().Err(err).Msg("publishToTopic") + } + + return build, nil +} diff --git a/server/pipeline/errors.go b/server/pipeline/errors.go new file mode 100644 index 000000000..f18c7b899 --- /dev/null +++ b/server/pipeline/errors.go @@ -0,0 +1,54 @@ +// Copyright 2022 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +type ErrNotFound struct { + Msg string +} + +func (e ErrNotFound) Error() string { + return e.Msg +} + +func IsErrNotFound(err error) bool { + _, ok := err.(ErrNotFound) + return ok +} + +type ErrBadRequest struct { + Msg string +} + +func (e ErrBadRequest) Error() string { + return e.Msg +} + +func IsErrBadRequest(err error) bool { + _, ok := err.(ErrBadRequest) + return ok +} + +type ErrFiltered struct { + Msg string +} + +func (e ErrFiltered) Error() string { + return "ignoring hook: " + e.Msg +} + +func IsErrFiltered(err error) bool { + _, ok := err.(ErrFiltered) + return ok +} diff --git a/server/pipeline/filter.go b/server/pipeline/filter.go new file mode 100644 index 000000000..6d6cbffe5 --- /dev/null +++ b/server/pipeline/filter.go @@ -0,0 +1,73 @@ +// Copyright 2022 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +// TODO(770): pipeline filter should not belong here + +import ( + "github.com/rs/zerolog/log" + + "github.com/woodpecker-ci/woodpecker/pipeline/frontend/yaml" + "github.com/woodpecker-ci/woodpecker/server/model" + "github.com/woodpecker-ci/woodpecker/server/remote" + "github.com/woodpecker-ci/woodpecker/server/shared" +) + +func zeroSteps(build *model.Build, remoteYamlConfigs []*remote.FileMeta) bool { + b := shared.ProcBuilder{ + Repo: &model.Repo{}, + Curr: build, + Last: &model.Build{}, + Netrc: &model.Netrc{}, + Secs: []*model.Secret{}, + Regs: []*model.Registry{}, + Link: "", + Yamls: remoteYamlConfigs, + } + + buildItems, err := b.Build() + if err != nil { + return false + } + if len(buildItems) == 0 { + return true + } + + return false +} + +// TODO: parse yaml once and not for each filter function +func branchFiltered(build *model.Build, remoteYamlConfigs []*remote.FileMeta) (bool, error) { + log.Trace().Msgf("hook.branchFiltered(): build branch: '%s' build event: '%s' config count: %d", build.Branch, build.Event, len(remoteYamlConfigs)) + + if build.Event == model.EventTag || build.Event == model.EventDeploy { + return false, nil + } + + for _, remoteYamlConfig := range remoteYamlConfigs { + parsedPipelineConfig, err := yaml.ParseBytes(remoteYamlConfig.Data) + if err != nil { + log.Trace().Msgf("parse config '%s': %s", remoteYamlConfig.Name, err) + return false, err + } + log.Trace().Msgf("config '%s': %#v", remoteYamlConfig.Name, parsedPipelineConfig) + + if parsedPipelineConfig.Branches.Match(build.Branch) { + return false, nil + } + } + + return true, nil +} diff --git a/server/pipeline/helper.go b/server/pipeline/helper.go new file mode 100644 index 000000000..0dcf41d4d --- /dev/null +++ b/server/pipeline/helper.go @@ -0,0 +1,41 @@ +// Copyright 2022 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "context" + + "github.com/rs/zerolog/log" + + "github.com/woodpecker-ci/woodpecker/server" + "github.com/woodpecker-ci/woodpecker/server/model" +) + +func updateBuildStatus(ctx context.Context, build *model.Build, repo *model.Repo, user *model.User) error { + for _, proc := range build.Procs { + // skip child procs + if !proc.IsParent() { + continue + } + + err := server.Config.Services.Remote.Status(ctx, user, repo, build, proc) + if err != nil { + log.Error().Err(err).Msgf("error setting commit status for %s/%d", repo.FullName, build.Number) + return err + } + } + + return nil +} diff --git a/server/pipeline/items.go b/server/pipeline/items.go new file mode 100644 index 000000000..ae56352fc --- /dev/null +++ b/server/pipeline/items.go @@ -0,0 +1,85 @@ +// Copyright 2022 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "context" + "database/sql" + "errors" + + "github.com/rs/zerolog/log" + + "github.com/woodpecker-ci/woodpecker/server" + "github.com/woodpecker-ci/woodpecker/server/model" + "github.com/woodpecker-ci/woodpecker/server/remote" + "github.com/woodpecker-ci/woodpecker/server/shared" + "github.com/woodpecker-ci/woodpecker/server/store" +) + +func createBuildItems(ctx context.Context, store store.Store, build *model.Build, user *model.User, repo *model.Repo, yamls []*remote.FileMeta, envs map[string]string) (*model.Build, []*shared.BuildItem, error) { + netrc, err := server.Config.Services.Remote.Netrc(user, repo) + if err != nil { + log.Error().Err(err).Msg("Failed to generate netrc file") + } + + // get the previous build so that we can send status change notifications + last, err := store.GetBuildLastBefore(repo, build.Branch, build.ID) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + log.Error().Err(err).Str("repo", repo.FullName).Msgf("Error getting last build before build number '%d'", build.Number) + } + + secs, err := server.Config.Services.Secrets.SecretListBuild(repo, build) + if err != nil { + log.Error().Err(err).Msgf("Error getting secrets for %s#%d", repo.FullName, build.Number) + } + + regs, err := server.Config.Services.Registries.RegistryList(repo) + if err != nil { + log.Error().Err(err).Msgf("Error getting registry credentials for %s#%d", repo.FullName, build.Number) + } + + if envs == nil { + envs = map[string]string{} + } + if server.Config.Services.Environ != nil { + globals, _ := server.Config.Services.Environ.EnvironList(repo) + for _, global := range globals { + envs[global.Name] = global.Value + } + } + + b := shared.ProcBuilder{ + Repo: repo, + Curr: build, + Last: last, + Netrc: netrc, + Secs: secs, + Regs: regs, + Envs: envs, + Link: server.Config.Server.Host, + Yamls: yamls, + } + buildItems, err := b.Build() + if err != nil { + if _, err := shared.UpdateToStatusError(store, *build, err); err != nil { + log.Error().Err(err).Msgf("Error setting error status of build for %s#%d", repo.FullName, build.Number) + } + return nil, nil, err + } + + build = shared.SetBuildStepsOnBuild(b.Curr, buildItems) + + return build, buildItems, nil +} diff --git a/server/pipeline/queue.go b/server/pipeline/queue.go new file mode 100644 index 000000000..d8c0cf855 --- /dev/null +++ b/server/pipeline/queue.go @@ -0,0 +1,70 @@ +// Copyright 2022 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/woodpecker-ci/woodpecker/pipeline/rpc" + "github.com/woodpecker-ci/woodpecker/server" + "github.com/woodpecker-ci/woodpecker/server/model" + "github.com/woodpecker-ci/woodpecker/server/queue" + "github.com/woodpecker-ci/woodpecker/server/shared" +) + +func queueBuild(build *model.Build, repo *model.Repo, buildItems []*shared.BuildItem) error { + var tasks []*queue.Task + for _, item := range buildItems { + if item.Proc.State == model.StatusSkipped { + continue + } + task := new(queue.Task) + task.ID = fmt.Sprint(item.Proc.ID) + task.Labels = map[string]string{} + for k, v := range item.Labels { + task.Labels[k] = v + } + task.Labels["platform"] = item.Platform + task.Labels["repo"] = repo.FullName + task.Dependencies = taskIds(item.DependsOn, buildItems) + task.RunOn = item.RunsOn + task.DepStatus = make(map[string]string) + + task.Data, _ = json.Marshal(rpc.Pipeline{ + ID: fmt.Sprint(item.Proc.ID), + Config: item.Config, + Timeout: repo.Timeout, + }) + + if err := server.Config.Services.Logs.Open(context.Background(), task.ID); err != nil { + return err + } + tasks = append(tasks, task) + } + return server.Config.Services.Queue.PushAtOnce(context.Background(), tasks) +} + +func taskIds(dependsOn []string, buildItems []*shared.BuildItem) (taskIds []string) { + for _, dep := range dependsOn { + for _, buildItem := range buildItems { + if buildItem.Proc.Name == dep { + taskIds = append(taskIds, fmt.Sprint(buildItem.Proc.ID)) + } + } + } + return +} diff --git a/server/pipeline/restart.go b/server/pipeline/restart.go new file mode 100644 index 000000000..02f43ae46 --- /dev/null +++ b/server/pipeline/restart.go @@ -0,0 +1,128 @@ +// Copyright 2022 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "context" + "fmt" + "time" + + "github.com/rs/zerolog/log" + + "github.com/woodpecker-ci/woodpecker/server" + "github.com/woodpecker-ci/woodpecker/server/model" + "github.com/woodpecker-ci/woodpecker/server/remote" + "github.com/woodpecker-ci/woodpecker/server/store" +) + +// Restart a build by creating a new one out of the old and start it +func Restart(ctx context.Context, store store.Store, lastBuild *model.Build, user *model.User, repo *model.Repo, envs map[string]string) (*model.Build, error) { + switch lastBuild.Status { + case model.StatusDeclined, + model.StatusBlocked: + return nil, ErrBadRequest{Msg: fmt.Sprintf("cannot restart a build with status %s", lastBuild.Status)} + } + + var pipelineFiles []*remote.FileMeta + + // fetch the old pipeline config from database + configs, err := store.ConfigsForBuild(lastBuild.ID) + if err != nil { + msg := fmt.Sprintf("failure to get build config for %s. %s", repo.FullName, err) + log.Error().Msgf(msg) + return nil, ErrNotFound{Msg: msg} + } + + for _, y := range configs { + pipelineFiles = append(pipelineFiles, &remote.FileMeta{Data: y.Data, Name: y.Name}) + } + + // If config extension is active we should refetch the config in case something changed + if server.Config.Services.ConfigService != nil && server.Config.Services.ConfigService.IsConfigured() { + currentFileMeta := make([]*remote.FileMeta, len(configs)) + for i, cfg := range configs { + currentFileMeta[i] = &remote.FileMeta{Name: cfg.Name, Data: cfg.Data} + } + + newConfig, useOld, err := server.Config.Services.ConfigService.FetchConfig(ctx, repo, lastBuild, currentFileMeta) + if err != nil { + return nil, ErrBadRequest{ + Msg: fmt.Sprintf("On fetching external build config: %s", err), + } + } + if !useOld { + pipelineFiles = newConfig + } + } + + newBuild := createNewBuildOutOfOld(lastBuild) + newBuild.Parent = lastBuild.ID + + err = store.CreateBuild(newBuild) + if err != nil { + msg := fmt.Sprintf("failure to save build for %s", repo.FullName) + log.Error().Err(err).Msg(msg) + return nil, fmt.Errorf(msg) + } + + if err := persistBuildConfigs(store, configs, newBuild.ID); err != nil { + msg := fmt.Sprintf("failure to persist build config for %s.", repo.FullName) + log.Error().Err(err).Msg(msg) + return nil, fmt.Errorf(msg) + } + + newBuild, buildItems, err := createBuildItems(ctx, store, newBuild, user, repo, pipelineFiles, envs) + if err != nil { + msg := fmt.Sprintf("failure to createBuildItems for %s", repo.FullName) + log.Error().Err(err).Msg(msg) + return nil, fmt.Errorf(msg) + } + + newBuild, err = start(ctx, store, newBuild, user, repo, buildItems) + if err != nil { + msg := fmt.Sprintf("failure to start build for %s", repo.FullName) + log.Error().Err(err).Msg(msg) + return nil, fmt.Errorf(msg) + } + + return newBuild, nil +} + +// TODO: reuse at create.go too +func persistBuildConfigs(store store.Store, configs []*model.Config, buildID int64) error { + for _, conf := range configs { + buildConfig := &model.BuildConfig{ + ConfigID: conf.ID, + BuildID: buildID, + } + err := store.BuildConfigCreate(buildConfig) + if err != nil { + return err + } + } + return nil +} + +func createNewBuildOutOfOld(old *model.Build) *model.Build { + new := *old + new.ID = 0 + new.Number = 0 + new.Status = model.StatusPending + new.Started = 0 + new.Finished = 0 + new.Enqueued = time.Now().UTC().Unix() + new.Error = "" + return &new +} diff --git a/server/pipeline/start.go b/server/pipeline/start.go new file mode 100644 index 000000000..9af2011bd --- /dev/null +++ b/server/pipeline/start.go @@ -0,0 +1,54 @@ +// Copyright 2022 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "context" + + "github.com/rs/zerolog/log" + + "github.com/woodpecker-ci/woodpecker/server/model" + "github.com/woodpecker-ci/woodpecker/server/shared" + "github.com/woodpecker-ci/woodpecker/server/store" +) + +// start a build, make sure it was stored persistent in the store before +func start(ctx context.Context, store store.Store, activeBuild *model.Build, user *model.User, repo *model.Repo, buildItems []*shared.BuildItem) (*model.Build, error) { + // call to cancel previous builds if needed + if err := cancelPreviousPipelines(ctx, store, activeBuild, repo); err != nil { + // should be not breaking + log.Error().Err(err).Msg("Failed to cancel previous builds") + } + + if err := store.ProcCreate(activeBuild.Procs); err != nil { + log.Error().Err(err).Str("repo", repo.FullName).Msgf("error persisting procs for %s#%d", repo.FullName, activeBuild.Number) + return nil, err + } + + if err := publishToTopic(ctx, activeBuild, repo); err != nil { + log.Error().Err(err).Msg("publishToTopic") + } + + if err := queueBuild(activeBuild, repo, buildItems); err != nil { + log.Error().Err(err).Msg("queueBuild") + return nil, err + } + + if err := updateBuildStatus(ctx, activeBuild, repo, user); err != nil { + log.Error().Err(err).Msg("updateBuildStatus") + } + + return activeBuild, nil +} diff --git a/server/pipeline/topic.go b/server/pipeline/topic.go new file mode 100644 index 000000000..006752609 --- /dev/null +++ b/server/pipeline/topic.go @@ -0,0 +1,45 @@ +// Copyright 2022 Woodpecker Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pipeline + +import ( + "context" + "encoding/json" + "strconv" + + "github.com/woodpecker-ci/woodpecker/server" + "github.com/woodpecker-ci/woodpecker/server/model" + "github.com/woodpecker-ci/woodpecker/server/pubsub" +) + +// publishToTopic publishes message to UI clients +func publishToTopic(c context.Context, build *model.Build, repo *model.Repo) (err error) { + message := pubsub.Message{ + Labels: map[string]string{ + "repo": repo.FullName, + "private": strconv.FormatBool(repo.IsSCMPrivate), + }, + } + buildCopy := *build + if buildCopy.Procs, err = model.Tree(buildCopy.Procs); err != nil { + return err + } + + message.Data, _ = json.Marshal(model.Event{ + Repo: *repo, + Build: buildCopy, + }) + return server.Config.Services.Pubsub.Publish(c, "topic/events", message) +}