Refactor agent (#2021)

- code cleanup
- init backend engine only once
- pass a taskUUID to the backend

---
*Sponsored by Kithara Software GmbH*
This commit is contained in:
6543 2023-07-20 20:39:20 +02:00 committed by GitHub
parent f464156917
commit 3cd78c9409
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 110 additions and 72 deletions

View file

@ -15,7 +15,6 @@
package agent
import (
"context"
"io"
"sync"
@ -28,7 +27,7 @@ import (
"github.com/woodpecker-ci/woodpecker/pipeline/rpc"
)
func (r *Runner) createLogger(_ context.Context, logger zerolog.Logger, uploads *sync.WaitGroup, work *rpc.Pipeline) pipeline.LogFunc {
func (r *Runner) createLogger(logger zerolog.Logger, uploads *sync.WaitGroup, work *rpc.Pipeline) pipeline.LogFunc {
return func(step *backend.Step, rc multipart.Reader) error {
loglogger := logger.With().
Str("image", step.Image).
@ -55,12 +54,8 @@ func (r *Runner) createLogger(_ context.Context, logger zerolog.Logger, uploads
log.Error().Err(err).Msg("copy limited logStream part")
}
loglogger.Debug().Msg("log stream copied")
defer func() {
loglogger.Debug().Msg("log stream closed")
uploads.Done()
}()
loglogger.Debug().Msg("log stream copied, close ...")
uploads.Done()
return nil
}

View file

@ -18,6 +18,7 @@ package agent
import (
"context"
"errors"
"fmt"
"sync"
"time"
@ -140,7 +141,8 @@ func (r *Runner) Run(runnerCtx context.Context) error {
var uploads sync.WaitGroup
err = pipeline.New(work.Config,
pipeline.WithContext(workflowCtx),
pipeline.WithLogger(r.createLogger(ctxmeta, logger, &uploads, work)),
pipeline.WithTaskUUID(fmt.Sprint(work.ID)),
pipeline.WithLogger(r.createLogger(logger, &uploads, work)),
pipeline.WithTracer(r.createTracer(ctxmeta, logger, work)),
pipeline.WithEngine(*r.engine),
pipeline.WithDescription(map[string]string{

View file

@ -215,20 +215,20 @@ func run(c *cli.Context) error {
}
}()
// load engine (e.g. init api client)
if err := engine.Load(backendCtx); err != nil {
log.Error().Err(err).Msg("cannot load backend engine")
return err
}
log.Debug().Msgf("loaded %s backend engine", engine.Name())
for i := 0; i < parallel; i++ {
i := i
go func() {
defer wg.Done()
// load engine (e.g. init api client)
err = engine.Load(backendCtx)
if err != nil {
log.Error().Err(err).Msg("cannot load backend engine")
return
}
r := agent.NewRunner(client, filter, hostname, counter, &engine)
log.Debug().Msgf("loaded %s backend engine", engine.Name())
log.Debug().Msgf("created new runner %d", i)
for {
if sigterm.IsSet() {

View file

@ -101,7 +101,9 @@ func (e *docker) Load(ctx context.Context) error {
return nil
}
func (e *docker) Setup(_ context.Context, conf *backend.Config) error {
func (e *docker) SetupWorkflow(_ context.Context, conf *backend.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msg("create workflow environment")
for _, vol := range conf.Volumes {
_, err := e.client.VolumeCreate(noContext, volume.VolumeCreateBody{
Name: vol.Name,
@ -128,7 +130,9 @@ func (e *docker) Setup(_ context.Context, conf *backend.Config) error {
return nil
}
func (e *docker) Exec(ctx context.Context, step *backend.Step) error {
func (e *docker) StartStep(ctx context.Context, step *backend.Step, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("start step %s", step.Name)
config := toConfig(step)
hostConfig := toHostConfig(step)
containerName := toContainerName(step)
@ -204,7 +208,9 @@ func (e *docker) Exec(ctx context.Context, step *backend.Step) error {
return e.client.ContainerStart(ctx, containerName, startOpts)
}
func (e *docker) Wait(ctx context.Context, step *backend.Step) (*backend.State, error) {
func (e *docker) WaitStep(ctx context.Context, step *backend.Step, taskUUID string) (*backend.State, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("wait for step %s", step.Name)
containerName := toContainerName(step)
wait, errc := e.client.ContainerWait(ctx, containerName, "")
@ -228,7 +234,9 @@ func (e *docker) Wait(ctx context.Context, step *backend.Step) (*backend.State,
}, nil
}
func (e *docker) Tail(ctx context.Context, step *backend.Step) (io.ReadCloser, error) {
func (e *docker) TailStep(ctx context.Context, step *backend.Step, taskUUID string) (io.ReadCloser, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("tail logs of step %s", step.Name)
logs, err := e.client.ContainerLogs(ctx, toContainerName(step), logsOpts)
if err != nil {
return nil, err
@ -244,7 +252,9 @@ func (e *docker) Tail(ctx context.Context, step *backend.Step) (io.ReadCloser, e
return rc, nil
}
func (e *docker) Destroy(_ context.Context, conf *backend.Config) error {
func (e *docker) DestroyWorkflow(_ context.Context, conf *backend.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("delete workflow environment")
for _, stage := range conf.Stages {
for _, step := range stage.Steps {
containerName := toContainerName(step)

View file

@ -115,8 +115,8 @@ func (e *kube) Load(context.Context) error {
}
// Setup the pipeline environment.
func (e *kube) Setup(ctx context.Context, conf *types.Config) error {
log.Trace().Msgf("Setting up Kubernetes primitives")
func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("Setting up Kubernetes primitives")
for _, vol := range conf.Volumes {
pvc, err := PersistentVolumeClaim(e.config.Namespace, vol.Name, e.config.StorageClass, e.config.VolumeSize, e.config.StorageRwx)
@ -168,25 +168,27 @@ func (e *kube) Setup(ctx context.Context, conf *types.Config) error {
}
// Start the pipeline step.
func (e *kube) Exec(ctx context.Context, step *types.Step) error {
func (e *kube) StartStep(ctx context.Context, step *types.Step, taskUUID string) error {
pod, err := Pod(e.config.Namespace, step, e.config.PodLabels, e.config.PodAnnotations)
if err != nil {
return err
}
log.Trace().Msgf("Creating pod: %s", pod.Name)
log.Trace().Str("taskUUID", taskUUID).Msgf("Creating pod: %s", pod.Name)
_, err = e.client.CoreV1().Pods(e.config.Namespace).Create(ctx, pod, metav1.CreateOptions{})
return err
}
// Wait for the pipeline step to complete and returns
// the completion results.
func (e *kube) Wait(ctx context.Context, step *types.Step) (*types.State, error) {
func (e *kube) WaitStep(ctx context.Context, step *types.Step, taskUUID string) (*types.State, error) {
podName, err := dnsName(step.Name)
if err != nil {
return nil, err
}
log.Trace().Str("taskUUID", taskUUID).Msgf("Waiting for pod: %s", podName)
finished := make(chan bool)
podUpdated := func(old, new interface{}) {
@ -239,12 +241,14 @@ func (e *kube) Wait(ctx context.Context, step *types.Step) (*types.State, error)
}
// Tail the pipeline step logs.
func (e *kube) Tail(ctx context.Context, step *types.Step) (io.ReadCloser, error) {
func (e *kube) TailStep(ctx context.Context, step *types.Step, taskUUID string) (io.ReadCloser, error) {
podName, err := dnsName(step.Name)
if err != nil {
return nil, err
}
log.Trace().Str("taskUUID", taskUUID).Msgf("Tail logs of pod: %s", podName)
up := make(chan bool)
podUpdated := func(old, new interface{}) {
@ -308,7 +312,9 @@ func (e *kube) Tail(ctx context.Context, step *types.Step) (io.ReadCloser, error
}
// Destroy the pipeline environment.
func (e *kube) Destroy(_ context.Context, conf *types.Config) error {
func (e *kube) DestroyWorkflow(_ context.Context, conf *types.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msg("Deleting Kubernetes primitives")
gracePeriodSeconds := int64(0) // immediately
dpb := metav1.DeletePropagationBackground

View file

@ -25,6 +25,7 @@ import (
"strings"
"github.com/alessio/shellescape"
"github.com/rs/zerolog/log"
"golang.org/x/exp/slices"
"github.com/woodpecker-ci/woodpecker/pipeline/backend/types"
@ -74,8 +75,10 @@ func (e *local) Load(context.Context) error {
return nil
}
// Setup the pipeline environment.
func (e *local) Setup(_ context.Context, c *types.Config) error {
// SetupWorkflow the pipeline environment.
func (e *local) SetupWorkflow(_ context.Context, conf *types.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msg("create workflow environment")
baseDir, err := os.MkdirTemp("", "woodpecker-local-*")
if err != nil {
return err
@ -98,7 +101,7 @@ func (e *local) Setup(_ context.Context, c *types.Config) error {
// TODO: copy plugin-git binary to homeDir and set PATH
workflowID, err := e.getWorkflowIDFromConfig(c)
workflowID, err := e.getWorkflowIDFromConfig(conf)
if err != nil {
return err
}
@ -108,8 +111,10 @@ func (e *local) Setup(_ context.Context, c *types.Config) error {
return nil
}
// Exec the pipeline step.
func (e *local) Exec(ctx context.Context, step *types.Step) error {
// StartStep the pipeline step.
func (e *local) StartStep(ctx context.Context, step *types.Step, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("start step %s", step.Name)
state, err := e.getWorkflowStateFromStep(step)
if err != nil {
return err
@ -163,9 +168,11 @@ func (e *local) Exec(ctx context.Context, step *types.Step) error {
return cmd.Start()
}
// Wait for the pipeline step to complete and returns
// WaitStep for the pipeline step to complete and returns
// the completion results.
func (e *local) Wait(_ context.Context, step *types.Step) (*types.State, error) {
func (e *local) WaitStep(_ context.Context, step *types.Step, taskUUID string) (*types.State, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("wait for step %s", step.Name)
state, err := e.getWorkflowStateFromStep(step)
if err != nil {
return nil, err
@ -192,14 +199,17 @@ func (e *local) Wait(_ context.Context, step *types.Step) (*types.State, error)
}, err
}
// Tail the pipeline step logs.
func (e *local) Tail(context.Context, *types.Step) (io.ReadCloser, error) {
// TailStep the pipeline step logs.
func (e *local) TailStep(_ context.Context, step *types.Step, taskUUID string) (io.ReadCloser, error) {
log.Trace().Str("taskUUID", taskUUID).Msgf("tail logs of step %s", step.Name)
return e.output, nil
}
// Destroy the pipeline environment.
func (e *local) Destroy(_ context.Context, c *types.Config) error {
state, err := e.getWorkflowStateFromConfig(c)
// DestroyWorkflow the pipeline environment.
func (e *local) DestroyWorkflow(_ context.Context, conf *types.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("delete workflow environment")
state, err := e.getWorkflowStateFromConfig(conf)
if err != nil {
return err
}
@ -209,7 +219,7 @@ func (e *local) Destroy(_ context.Context, c *types.Config) error {
return err
}
workflowID, err := e.getWorkflowIDFromConfig(c)
workflowID, err := e.getWorkflowIDFromConfig(conf)
if err != nil {
return err
}

View file

@ -6,6 +6,7 @@ import (
"strings"
"github.com/melbahja/goph"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v2"
"github.com/woodpecker-ci/woodpecker/pipeline/backend/common"
@ -79,13 +80,15 @@ func (e *ssh) Load(ctx context.Context) error {
return nil
}
// Setup the pipeline environment.
func (e *ssh) Setup(_ context.Context, _ *types.Config) error {
// SetupWorkflow create the workflow environment.
func (e *ssh) SetupWorkflow(context.Context, *types.Config, string) error {
return nil
}
// Exec the pipeline step.
func (e *ssh) Exec(ctx context.Context, step *types.Step) error {
// StartStep start the step.
func (e *ssh) StartStep(ctx context.Context, step *types.Step, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("Start step %s", step.Name)
// Get environment variables
var command []string
for a, b := range step.Environment {
@ -124,21 +127,21 @@ func (e *ssh) Exec(ctx context.Context, step *types.Step) error {
return e.cmd.Start()
}
// Wait for the pipeline step to complete and returns
// WaitStep for the pipeline step to complete and returns
// the completion results.
func (e *ssh) Wait(context.Context, *types.Step) (*types.State, error) {
func (e *ssh) WaitStep(context.Context, *types.Step, string) (*types.State, error) {
return &types.State{
Exited: true,
}, e.cmd.Wait()
}
// Tail the pipeline step logs.
func (e *ssh) Tail(context.Context, *types.Step) (io.ReadCloser, error) {
// TailStep the pipeline step logs.
func (e *ssh) TailStep(context.Context, *types.Step, string) (io.ReadCloser, error) {
return e.output, nil
}
// Destroy the pipeline environment.
func (e *ssh) Destroy(context.Context, *types.Config) error {
// DestroyWorkflow delete the workflow environment.
func (e *ssh) DestroyWorkflow(context.Context, *types.Config, string) error {
e.client.Close()
sftp, err := e.client.NewSftp()
if err != nil {

View file

@ -11,25 +11,25 @@ type Engine interface {
// Name returns the name of the backend.
Name() string
// Check if the backend is available.
IsAvailable(context.Context) bool
// IsAvailable check if the backend is available.
IsAvailable(ctx context.Context) bool
// Load the backend engine.
Load(context.Context) error
Load(ctx context.Context) error
// Setup the pipeline environment.
Setup(context.Context, *Config) error
// SetupWorkflow the workflow environment.
SetupWorkflow(ctx context.Context, conf *Config, taskUUID string) error
// Exec start the pipeline step.
Exec(context.Context, *Step) error
// StartStep start the workflow step.
StartStep(ctx context.Context, step *Step, taskUUID string) error
// Wait for the pipeline step to complete and returns
// WaitStep for the workflow step to complete and returns
// the completion results.
Wait(context.Context, *Step) (*State, error)
WaitStep(ctx context.Context, step *Step, taskUUID string) (*State, error)
// Tail the pipeline step logs.
Tail(context.Context, *Step) (io.ReadCloser, error)
// TailStep the workflow step logs.
TailStep(ctx context.Context, step *Step, taskUUID string) (io.ReadCloser, error)
// Destroy the pipeline environment.
Destroy(context.Context, *Config) error
// DestroyWorkflow the workflow environment.
DestroyWorkflow(ctx context.Context, conf *Config, taskUUID string) error
}

View file

@ -42,3 +42,9 @@ func WithDescription(desc map[string]string) Option {
r.Description = desc
}
}
func WithTaskUUID(uuid string) Option {
return func(r *Runtime) {
r.taskUUID = uuid
}
}

View file

@ -7,6 +7,7 @@ import (
"sync"
"time"
"github.com/google/uuid"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"
@ -16,6 +17,8 @@ import (
"github.com/woodpecker-ci/woodpecker/pipeline/multipart"
)
// TODO: move runtime into "runtime" subpackage
type (
// State defines the pipeline and process state.
State struct {
@ -45,6 +48,8 @@ type Runtime struct {
tracer Tracer
logger Logger
taskUUID string
Description map[string]string // The runtime descriptors.
}
@ -55,6 +60,7 @@ func New(spec *backend.Config, opts ...Option) *Runtime {
r.Description = map[string]string{}
r.spec = spec
r.ctx = context.Background()
r.taskUUID = uuid.New().String()
for _, opts := range opts {
opts(r)
}
@ -69,7 +75,7 @@ func (r *Runtime) MakeLogger() zerolog.Logger {
return logCtx.Logger()
}
// Starts the execution of the pipeline and waits for it to complete
// Starts the execution of an workflow and waits for it to complete
func (r *Runtime) Run(runnerCtx context.Context) error {
logger := r.MakeLogger()
logger.Debug().Msgf("Executing %d stages, in order of:", len(r.spec.Stages))
@ -86,13 +92,13 @@ func (r *Runtime) Run(runnerCtx context.Context) error {
}
defer func() {
if err := r.engine.Destroy(runnerCtx, r.spec); err != nil {
if err := r.engine.DestroyWorkflow(runnerCtx, r.spec, r.taskUUID); err != nil {
logger.Error().Err(err).Msg("could not destroy engine")
}
}()
r.started = time.Now().Unix()
if err := r.engine.Setup(r.ctx, r.spec); err != nil {
if err := r.engine.SetupWorkflow(r.ctx, r.spec, r.taskUUID); err != nil {
return err
}
@ -215,13 +221,13 @@ func (r *Runtime) execAll(steps []*backend.Step) <-chan error {
// Executes the step and returns the state and error.
func (r *Runtime) exec(step *backend.Step) (*backend.State, error) {
if err := r.engine.Exec(r.ctx, step); err != nil {
if err := r.engine.StartStep(r.ctx, step, r.taskUUID); err != nil {
return nil, err
}
var wg sync.WaitGroup
if r.logger != nil {
rc, err := r.engine.Tail(r.ctx, step)
rc, err := r.engine.TailStep(r.ctx, step, r.taskUUID)
if err != nil {
return nil, err
}
@ -246,7 +252,7 @@ func (r *Runtime) exec(step *backend.Step) (*backend.State, error) {
// Some pipeline backends, such as local, will close the pipe from Tail on Wait,
// so first make sure all reading has finished.
wg.Wait()
waitState, err := r.engine.Wait(r.ctx, step)
waitState, err := r.engine.WaitStep(r.ctx, step, r.taskUUID)
if err != nil {
if errors.Is(err, context.Canceled) {
return waitState, ErrCancel