From 3cd78c9409d1e631fc2539f12137a2f454583d8d Mon Sep 17 00:00:00 2001 From: 6543 Date: Thu, 20 Jul 2023 20:39:20 +0200 Subject: [PATCH] Refactor agent (#2021) - code cleanup - init backend engine only once - pass a taskUUID to the backend --- *Sponsored by Kithara Software GmbH* --- agent/logger.go | 11 ++----- agent/runner.go | 4 ++- cmd/agent/agent.go | 18 ++++++------ pipeline/backend/docker/docker.go | 20 +++++++++---- pipeline/backend/kubernetes/kubernetes.go | 20 ++++++++----- pipeline/backend/local/local.go | 36 +++++++++++++++-------- pipeline/backend/ssh/ssh.go | 23 ++++++++------- pipeline/backend/types/engine.go | 26 ++++++++-------- pipeline/option.go | 6 ++++ pipeline/pipeline.go | 18 ++++++++---- 10 files changed, 110 insertions(+), 72 deletions(-) diff --git a/agent/logger.go b/agent/logger.go index ae9b3445a..b51d352e6 100644 --- a/agent/logger.go +++ b/agent/logger.go @@ -15,7 +15,6 @@ package agent import ( - "context" "io" "sync" @@ -28,7 +27,7 @@ import ( "github.com/woodpecker-ci/woodpecker/pipeline/rpc" ) -func (r *Runner) createLogger(_ context.Context, logger zerolog.Logger, uploads *sync.WaitGroup, work *rpc.Pipeline) pipeline.LogFunc { +func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, work *rpc.Pipeline) pipeline.LogFunc { return func(step *backend.Step, rc multipart.Reader) error { loglogger := logger.With(). Str("image", step.Image). @@ -55,12 +54,8 @@ func (r *Runner) createLogger(_ context.Context, logger zerolog.Logger, uploads log.Error().Err(err).Msg("copy limited logStream part") } - loglogger.Debug().Msg("log stream copied") - - defer func() { - loglogger.Debug().Msg("log stream closed") - uploads.Done() - }() + loglogger.Debug().Msg("log stream copied, close ...") + uploads.Done() return nil } diff --git a/agent/runner.go b/agent/runner.go index b553f0aea..300d3ad35 100644 --- a/agent/runner.go +++ b/agent/runner.go @@ -18,6 +18,7 @@ package agent import ( "context" "errors" + "fmt" "sync" "time" @@ -140,7 +141,8 @@ func (r *Runner) Run(runnerCtx context.Context) error { var uploads sync.WaitGroup err = pipeline.New(work.Config, pipeline.WithContext(workflowCtx), - pipeline.WithLogger(r.createLogger(ctxmeta, logger, &uploads, work)), + pipeline.WithTaskUUID(fmt.Sprint(work.ID)), + pipeline.WithLogger(r.createLogger(logger, &uploads, work)), pipeline.WithTracer(r.createTracer(ctxmeta, logger, work)), pipeline.WithEngine(*r.engine), pipeline.WithDescription(map[string]string{ diff --git a/cmd/agent/agent.go b/cmd/agent/agent.go index a83d5c627..246afec78 100644 --- a/cmd/agent/agent.go +++ b/cmd/agent/agent.go @@ -215,20 +215,20 @@ func run(c *cli.Context) error { } }() + // load engine (e.g. init api client) + if err := engine.Load(backendCtx); err != nil { + log.Error().Err(err).Msg("cannot load backend engine") + return err + } + log.Debug().Msgf("loaded %s backend engine", engine.Name()) + for i := 0; i < parallel; i++ { + i := i go func() { defer wg.Done() - // load engine (e.g. init api client) - err = engine.Load(backendCtx) - if err != nil { - log.Error().Err(err).Msg("cannot load backend engine") - return - } - r := agent.NewRunner(client, filter, hostname, counter, &engine) - - log.Debug().Msgf("loaded %s backend engine", engine.Name()) + log.Debug().Msgf("created new runner %d", i) for { if sigterm.IsSet() { diff --git a/pipeline/backend/docker/docker.go b/pipeline/backend/docker/docker.go index 30e74fcdf..a2d365bee 100644 --- a/pipeline/backend/docker/docker.go +++ b/pipeline/backend/docker/docker.go @@ -101,7 +101,9 @@ func (e *docker) Load(ctx context.Context) error { return nil } -func (e *docker) Setup(_ context.Context, conf *backend.Config) error { +func (e *docker) SetupWorkflow(_ context.Context, conf *backend.Config, taskUUID string) error { + log.Trace().Str("taskUUID", taskUUID).Msg("create workflow environment") + for _, vol := range conf.Volumes { _, err := e.client.VolumeCreate(noContext, volume.VolumeCreateBody{ Name: vol.Name, @@ -128,7 +130,9 @@ func (e *docker) Setup(_ context.Context, conf *backend.Config) error { return nil } -func (e *docker) Exec(ctx context.Context, step *backend.Step) error { +func (e *docker) StartStep(ctx context.Context, step *backend.Step, taskUUID string) error { + log.Trace().Str("taskUUID", taskUUID).Msgf("start step %s", step.Name) + config := toConfig(step) hostConfig := toHostConfig(step) containerName := toContainerName(step) @@ -204,7 +208,9 @@ func (e *docker) Exec(ctx context.Context, step *backend.Step) error { return e.client.ContainerStart(ctx, containerName, startOpts) } -func (e *docker) Wait(ctx context.Context, step *backend.Step) (*backend.State, error) { +func (e *docker) WaitStep(ctx context.Context, step *backend.Step, taskUUID string) (*backend.State, error) { + log.Trace().Str("taskUUID", taskUUID).Msgf("wait for step %s", step.Name) + containerName := toContainerName(step) wait, errc := e.client.ContainerWait(ctx, containerName, "") @@ -228,7 +234,9 @@ func (e *docker) Wait(ctx context.Context, step *backend.Step) (*backend.State, }, nil } -func (e *docker) Tail(ctx context.Context, step *backend.Step) (io.ReadCloser, error) { +func (e *docker) TailStep(ctx context.Context, step *backend.Step, taskUUID string) (io.ReadCloser, error) { + log.Trace().Str("taskUUID", taskUUID).Msgf("tail logs of step %s", step.Name) + logs, err := e.client.ContainerLogs(ctx, toContainerName(step), logsOpts) if err != nil { return nil, err @@ -244,7 +252,9 @@ func (e *docker) Tail(ctx context.Context, step *backend.Step) (io.ReadCloser, e return rc, nil } -func (e *docker) Destroy(_ context.Context, conf *backend.Config) error { +func (e *docker) DestroyWorkflow(_ context.Context, conf *backend.Config, taskUUID string) error { + log.Trace().Str("taskUUID", taskUUID).Msgf("delete workflow environment") + for _, stage := range conf.Stages { for _, step := range stage.Steps { containerName := toContainerName(step) diff --git a/pipeline/backend/kubernetes/kubernetes.go b/pipeline/backend/kubernetes/kubernetes.go index b4cb6dd85..aba979a52 100644 --- a/pipeline/backend/kubernetes/kubernetes.go +++ b/pipeline/backend/kubernetes/kubernetes.go @@ -115,8 +115,8 @@ func (e *kube) Load(context.Context) error { } // Setup the pipeline environment. -func (e *kube) Setup(ctx context.Context, conf *types.Config) error { - log.Trace().Msgf("Setting up Kubernetes primitives") +func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID string) error { + log.Trace().Str("taskUUID", taskUUID).Msgf("Setting up Kubernetes primitives") for _, vol := range conf.Volumes { pvc, err := PersistentVolumeClaim(e.config.Namespace, vol.Name, e.config.StorageClass, e.config.VolumeSize, e.config.StorageRwx) @@ -168,25 +168,27 @@ func (e *kube) Setup(ctx context.Context, conf *types.Config) error { } // Start the pipeline step. -func (e *kube) Exec(ctx context.Context, step *types.Step) error { +func (e *kube) StartStep(ctx context.Context, step *types.Step, taskUUID string) error { pod, err := Pod(e.config.Namespace, step, e.config.PodLabels, e.config.PodAnnotations) if err != nil { return err } - log.Trace().Msgf("Creating pod: %s", pod.Name) + log.Trace().Str("taskUUID", taskUUID).Msgf("Creating pod: %s", pod.Name) _, err = e.client.CoreV1().Pods(e.config.Namespace).Create(ctx, pod, metav1.CreateOptions{}) return err } // Wait for the pipeline step to complete and returns // the completion results. -func (e *kube) Wait(ctx context.Context, step *types.Step) (*types.State, error) { +func (e *kube) WaitStep(ctx context.Context, step *types.Step, taskUUID string) (*types.State, error) { podName, err := dnsName(step.Name) if err != nil { return nil, err } + log.Trace().Str("taskUUID", taskUUID).Msgf("Waiting for pod: %s", podName) + finished := make(chan bool) podUpdated := func(old, new interface{}) { @@ -239,12 +241,14 @@ func (e *kube) Wait(ctx context.Context, step *types.Step) (*types.State, error) } // Tail the pipeline step logs. -func (e *kube) Tail(ctx context.Context, step *types.Step) (io.ReadCloser, error) { +func (e *kube) TailStep(ctx context.Context, step *types.Step, taskUUID string) (io.ReadCloser, error) { podName, err := dnsName(step.Name) if err != nil { return nil, err } + log.Trace().Str("taskUUID", taskUUID).Msgf("Tail logs of pod: %s", podName) + up := make(chan bool) podUpdated := func(old, new interface{}) { @@ -308,7 +312,9 @@ func (e *kube) Tail(ctx context.Context, step *types.Step) (io.ReadCloser, error } // Destroy the pipeline environment. -func (e *kube) Destroy(_ context.Context, conf *types.Config) error { +func (e *kube) DestroyWorkflow(_ context.Context, conf *types.Config, taskUUID string) error { + log.Trace().Str("taskUUID", taskUUID).Msg("Deleting Kubernetes primitives") + gracePeriodSeconds := int64(0) // immediately dpb := metav1.DeletePropagationBackground diff --git a/pipeline/backend/local/local.go b/pipeline/backend/local/local.go index 0015c252e..2405c19bb 100644 --- a/pipeline/backend/local/local.go +++ b/pipeline/backend/local/local.go @@ -25,6 +25,7 @@ import ( "strings" "github.com/alessio/shellescape" + "github.com/rs/zerolog/log" "golang.org/x/exp/slices" "github.com/woodpecker-ci/woodpecker/pipeline/backend/types" @@ -74,8 +75,10 @@ func (e *local) Load(context.Context) error { return nil } -// Setup the pipeline environment. -func (e *local) Setup(_ context.Context, c *types.Config) error { +// SetupWorkflow the pipeline environment. +func (e *local) SetupWorkflow(_ context.Context, conf *types.Config, taskUUID string) error { + log.Trace().Str("taskUUID", taskUUID).Msg("create workflow environment") + baseDir, err := os.MkdirTemp("", "woodpecker-local-*") if err != nil { return err @@ -98,7 +101,7 @@ func (e *local) Setup(_ context.Context, c *types.Config) error { // TODO: copy plugin-git binary to homeDir and set PATH - workflowID, err := e.getWorkflowIDFromConfig(c) + workflowID, err := e.getWorkflowIDFromConfig(conf) if err != nil { return err } @@ -108,8 +111,10 @@ func (e *local) Setup(_ context.Context, c *types.Config) error { return nil } -// Exec the pipeline step. -func (e *local) Exec(ctx context.Context, step *types.Step) error { +// StartStep the pipeline step. +func (e *local) StartStep(ctx context.Context, step *types.Step, taskUUID string) error { + log.Trace().Str("taskUUID", taskUUID).Msgf("start step %s", step.Name) + state, err := e.getWorkflowStateFromStep(step) if err != nil { return err @@ -163,9 +168,11 @@ func (e *local) Exec(ctx context.Context, step *types.Step) error { return cmd.Start() } -// Wait for the pipeline step to complete and returns +// WaitStep for the pipeline step to complete and returns // the completion results. -func (e *local) Wait(_ context.Context, step *types.Step) (*types.State, error) { +func (e *local) WaitStep(_ context.Context, step *types.Step, taskUUID string) (*types.State, error) { + log.Trace().Str("taskUUID", taskUUID).Msgf("wait for step %s", step.Name) + state, err := e.getWorkflowStateFromStep(step) if err != nil { return nil, err @@ -192,14 +199,17 @@ func (e *local) Wait(_ context.Context, step *types.Step) (*types.State, error) }, err } -// Tail the pipeline step logs. -func (e *local) Tail(context.Context, *types.Step) (io.ReadCloser, error) { +// TailStep the pipeline step logs. +func (e *local) TailStep(_ context.Context, step *types.Step, taskUUID string) (io.ReadCloser, error) { + log.Trace().Str("taskUUID", taskUUID).Msgf("tail logs of step %s", step.Name) return e.output, nil } -// Destroy the pipeline environment. -func (e *local) Destroy(_ context.Context, c *types.Config) error { - state, err := e.getWorkflowStateFromConfig(c) +// DestroyWorkflow the pipeline environment. +func (e *local) DestroyWorkflow(_ context.Context, conf *types.Config, taskUUID string) error { + log.Trace().Str("taskUUID", taskUUID).Msgf("delete workflow environment") + + state, err := e.getWorkflowStateFromConfig(conf) if err != nil { return err } @@ -209,7 +219,7 @@ func (e *local) Destroy(_ context.Context, c *types.Config) error { return err } - workflowID, err := e.getWorkflowIDFromConfig(c) + workflowID, err := e.getWorkflowIDFromConfig(conf) if err != nil { return err } diff --git a/pipeline/backend/ssh/ssh.go b/pipeline/backend/ssh/ssh.go index 5e6c96efb..f22d7d7a1 100644 --- a/pipeline/backend/ssh/ssh.go +++ b/pipeline/backend/ssh/ssh.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/melbahja/goph" + "github.com/rs/zerolog/log" "github.com/urfave/cli/v2" "github.com/woodpecker-ci/woodpecker/pipeline/backend/common" @@ -79,13 +80,15 @@ func (e *ssh) Load(ctx context.Context) error { return nil } -// Setup the pipeline environment. -func (e *ssh) Setup(_ context.Context, _ *types.Config) error { +// SetupWorkflow create the workflow environment. +func (e *ssh) SetupWorkflow(context.Context, *types.Config, string) error { return nil } -// Exec the pipeline step. -func (e *ssh) Exec(ctx context.Context, step *types.Step) error { +// StartStep start the step. +func (e *ssh) StartStep(ctx context.Context, step *types.Step, taskUUID string) error { + log.Trace().Str("taskUUID", taskUUID).Msgf("Start step %s", step.Name) + // Get environment variables var command []string for a, b := range step.Environment { @@ -124,21 +127,21 @@ func (e *ssh) Exec(ctx context.Context, step *types.Step) error { return e.cmd.Start() } -// Wait for the pipeline step to complete and returns +// WaitStep for the pipeline step to complete and returns // the completion results. -func (e *ssh) Wait(context.Context, *types.Step) (*types.State, error) { +func (e *ssh) WaitStep(context.Context, *types.Step, string) (*types.State, error) { return &types.State{ Exited: true, }, e.cmd.Wait() } -// Tail the pipeline step logs. -func (e *ssh) Tail(context.Context, *types.Step) (io.ReadCloser, error) { +// TailStep the pipeline step logs. +func (e *ssh) TailStep(context.Context, *types.Step, string) (io.ReadCloser, error) { return e.output, nil } -// Destroy the pipeline environment. -func (e *ssh) Destroy(context.Context, *types.Config) error { +// DestroyWorkflow delete the workflow environment. +func (e *ssh) DestroyWorkflow(context.Context, *types.Config, string) error { e.client.Close() sftp, err := e.client.NewSftp() if err != nil { diff --git a/pipeline/backend/types/engine.go b/pipeline/backend/types/engine.go index 1bdea0e26..661d8e3e1 100644 --- a/pipeline/backend/types/engine.go +++ b/pipeline/backend/types/engine.go @@ -11,25 +11,25 @@ type Engine interface { // Name returns the name of the backend. Name() string - // Check if the backend is available. - IsAvailable(context.Context) bool + // IsAvailable check if the backend is available. + IsAvailable(ctx context.Context) bool // Load the backend engine. - Load(context.Context) error + Load(ctx context.Context) error - // Setup the pipeline environment. - Setup(context.Context, *Config) error + // SetupWorkflow the workflow environment. + SetupWorkflow(ctx context.Context, conf *Config, taskUUID string) error - // Exec start the pipeline step. - Exec(context.Context, *Step) error + // StartStep start the workflow step. + StartStep(ctx context.Context, step *Step, taskUUID string) error - // Wait for the pipeline step to complete and returns + // WaitStep for the workflow step to complete and returns // the completion results. - Wait(context.Context, *Step) (*State, error) + WaitStep(ctx context.Context, step *Step, taskUUID string) (*State, error) - // Tail the pipeline step logs. - Tail(context.Context, *Step) (io.ReadCloser, error) + // TailStep the workflow step logs. + TailStep(ctx context.Context, step *Step, taskUUID string) (io.ReadCloser, error) - // Destroy the pipeline environment. - Destroy(context.Context, *Config) error + // DestroyWorkflow the workflow environment. + DestroyWorkflow(ctx context.Context, conf *Config, taskUUID string) error } diff --git a/pipeline/option.go b/pipeline/option.go index 4d380af66..787d874e2 100644 --- a/pipeline/option.go +++ b/pipeline/option.go @@ -42,3 +42,9 @@ func WithDescription(desc map[string]string) Option { r.Description = desc } } + +func WithTaskUUID(uuid string) Option { + return func(r *Runtime) { + r.taskUUID = uuid + } +} diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index bab76520c..f1ba0b040 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "golang.org/x/sync/errgroup" @@ -16,6 +17,8 @@ import ( "github.com/woodpecker-ci/woodpecker/pipeline/multipart" ) +// TODO: move runtime into "runtime" subpackage + type ( // State defines the pipeline and process state. State struct { @@ -45,6 +48,8 @@ type Runtime struct { tracer Tracer logger Logger + taskUUID string + Description map[string]string // The runtime descriptors. } @@ -55,6 +60,7 @@ func New(spec *backend.Config, opts ...Option) *Runtime { r.Description = map[string]string{} r.spec = spec r.ctx = context.Background() + r.taskUUID = uuid.New().String() for _, opts := range opts { opts(r) } @@ -69,7 +75,7 @@ func (r *Runtime) MakeLogger() zerolog.Logger { return logCtx.Logger() } -// Starts the execution of the pipeline and waits for it to complete +// Starts the execution of an workflow and waits for it to complete func (r *Runtime) Run(runnerCtx context.Context) error { logger := r.MakeLogger() logger.Debug().Msgf("Executing %d stages, in order of:", len(r.spec.Stages)) @@ -86,13 +92,13 @@ func (r *Runtime) Run(runnerCtx context.Context) error { } defer func() { - if err := r.engine.Destroy(runnerCtx, r.spec); err != nil { + if err := r.engine.DestroyWorkflow(runnerCtx, r.spec, r.taskUUID); err != nil { logger.Error().Err(err).Msg("could not destroy engine") } }() r.started = time.Now().Unix() - if err := r.engine.Setup(r.ctx, r.spec); err != nil { + if err := r.engine.SetupWorkflow(r.ctx, r.spec, r.taskUUID); err != nil { return err } @@ -215,13 +221,13 @@ func (r *Runtime) execAll(steps []*backend.Step) <-chan error { // Executes the step and returns the state and error. func (r *Runtime) exec(step *backend.Step) (*backend.State, error) { - if err := r.engine.Exec(r.ctx, step); err != nil { + if err := r.engine.StartStep(r.ctx, step, r.taskUUID); err != nil { return nil, err } var wg sync.WaitGroup if r.logger != nil { - rc, err := r.engine.Tail(r.ctx, step) + rc, err := r.engine.TailStep(r.ctx, step, r.taskUUID) if err != nil { return nil, err } @@ -246,7 +252,7 @@ func (r *Runtime) exec(step *backend.Step) (*backend.State, error) { // Some pipeline backends, such as local, will close the pipe from Tail on Wait, // so first make sure all reading has finished. wg.Wait() - waitState, err := r.engine.Wait(r.ctx, step) + waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID) if err != nil { if errors.Is(err, context.Canceled) { return waitState, ErrCancel