Update pipeline state on server as a whole on approval (#3504)

We can not just update some records for steps, as we want the pipeline
engine as single source of truth but not manage the state.
And the server should only manage the state but not how pipelines work.

We can match the pipeline but neither workflows or steps 1:1, so we
"update" them as a whole by deleting existing workflow and step data and
insert the new info from engine.

close   #3494
close  #3472

---------
*Sponsored by Kithara Software GmbH*

---------

Co-authored-by: Robert Kaussow <xoxys@rknet.org>
This commit is contained in:
6543 2024-03-18 20:07:45 +01:00 committed by GitHub
parent 43264a5f8e
commit e57a09a404
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 78 additions and 230 deletions

View file

@ -1,4 +1,4 @@
// Code generated by mockery v2.40.3. DO NOT EDIT. // Code generated by mockery v2.42.1. DO NOT EDIT.
package mocks package mocks

View file

@ -79,25 +79,11 @@ func Approve(ctx context.Context, store store.Store, currentPipeline *model.Pipe
return nil, fmt.Errorf(msg) return nil, fmt.Errorf(msg)
} }
// TODO improve this // we have no way to link old workflows and steps in database to new engine generated steps,
for _, item := range pipelineItems { // so we just delete the old and insert the new ones
for _, wf := range currentPipeline.Workflows { if err := store.WorkflowsReplace(currentPipeline, currentPipeline.Workflows); err != nil {
if item.Workflow.Name == wf.Name { log.Error().Err(err).Str("repo", repo.FullName).Msgf("error persisting new steps for %s#%d after approval", repo.FullName, currentPipeline.Number)
item.Workflow = wf return nil, err
for _, stage := range item.Config.Stages {
for _, step := range stage.Steps {
for _, storeStep := range wf.Children {
if storeStep.Name == step.Name {
step.UUID = storeStep.UUID
break
}
}
}
}
break
}
}
} }
publishPipeline(ctx, currentPipeline, repo, user) publishPipeline(ctx, currentPipeline, repo, user)

View file

@ -125,6 +125,9 @@ func setPipelineStepsOnPipeline(pipeline *model.Pipeline, pipelineItems []*stepb
} }
} }
// the workflows in the pipeline should be empty as only we do populate them,
// but if a pipeline was already loaded form database it might contain things, so we just clean it
pipeline.Workflows = nil
for _, item := range pipelineItems { for _, item := range pipelineItems {
for _, stage := range item.Config.Stages { for _, stage := range item.Config.Stages {
for _, step := range stage.Steps { for _, step := range stage.Steps {

View file

@ -126,24 +126,7 @@ func (s storage) DeletePipeline(pipeline *model.Pipeline) error {
} }
func (s storage) deletePipeline(sess *xorm.Session, pipelineID int64) error { func (s storage) deletePipeline(sess *xorm.Session, pipelineID int64) error {
// delete related steps if err := s.workflowsDelete(sess, pipelineID); err != nil {
for startSteps := 0; ; startSteps += perPage {
stepIDs := make([]int64, 0, perPage)
if err := sess.Limit(perPage, startSteps).Table("steps").Cols("step_id").Where("step_pipeline_id = ?", pipelineID).Find(&stepIDs); err != nil {
return err
}
if len(stepIDs) == 0 {
break
}
for i := range stepIDs {
if err := deleteStep(sess, stepIDs[i]); err != nil {
return err
}
}
}
if _, err := sess.Where("workflow_pipeline_id = ?", pipelineID).Delete(new(model.Workflow)); err != nil {
return err return err
} }

View file

@ -44,6 +44,14 @@ func (s storage) WorkflowsCreate(workflows []*model.Workflow) error {
return err return err
} }
if err := s.workflowsCreate(sess, workflows); err != nil {
return err
}
return sess.Commit()
}
func (s storage) workflowsCreate(sess *xorm.Session, workflows []*model.Workflow) error {
for i := range workflows { for i := range workflows {
// only Insert on single object ref set auto created ID back to object // only Insert on single object ref set auto created ID back to object
if err := s.stepCreate(sess, workflows[i].Children); err != nil { if err := s.stepCreate(sess, workflows[i].Children); err != nil {
@ -53,10 +61,50 @@ func (s storage) WorkflowsCreate(workflows []*model.Workflow) error {
return err return err
} }
} }
return nil
}
// WorkflowsReplace performs an atomic replacement of workflows and associated steps by deleting all existing workflows and steps and inserting the new ones
func (s storage) WorkflowsReplace(pipeline *model.Pipeline, workflows []*model.Workflow) error {
sess := s.engine.NewSession()
defer sess.Close()
if err := sess.Begin(); err != nil {
return err
}
if err := s.workflowsDelete(sess, pipeline.ID); err != nil {
return err
}
if err := s.workflowsCreate(sess, workflows); err != nil {
return err
}
return sess.Commit() return sess.Commit()
} }
func (s storage) workflowsDelete(sess *xorm.Session, pipelineID int64) error {
// delete related steps
for startSteps := 0; ; startSteps += perPage {
stepIDs := make([]int64, 0, perPage)
if err := sess.Limit(perPage, startSteps).Table("steps").Cols("step_id").Where("step_pipeline_id = ?", pipelineID).Find(&stepIDs); err != nil {
return err
}
if len(stepIDs) == 0 {
break
}
for i := range stepIDs {
if err := deleteStep(sess, stepIDs[i]); err != nil {
return err
}
}
}
_, err := sess.Where("workflow_pipeline_id = ?", pipelineID).Delete(new(model.Workflow))
return err
}
func (s storage) WorkflowList(pipeline *model.Pipeline) ([]*model.Workflow, error) { func (s storage) WorkflowList(pipeline *model.Pipeline) ([]*model.Workflow, error) {
return s.workflowList(s.engine.NewSession(), pipeline) return s.workflowList(s.engine.NewSession(), pipeline)
} }

View file

@ -1,4 +1,4 @@
// Code generated by mockery v2.40.3. DO NOT EDIT. // Code generated by mockery v2.42.1. DO NOT EDIT.
package mocks package mocks
@ -174,52 +174,6 @@ func (_m *Store) Close() error {
return r0 return r0
} }
// ConfigCreate provides a mock function with given fields: _a0
func (_m *Store) ConfigCreate(_a0 *model.Config) error {
ret := _m.Called(_a0)
if len(ret) == 0 {
panic("no return value specified for ConfigCreate")
}
var r0 error
if rf, ok := ret.Get(0).(func(*model.Config) error); ok {
r0 = rf(_a0)
} else {
r0 = ret.Error(0)
}
return r0
}
// ConfigFindApproved provides a mock function with given fields: _a0
func (_m *Store) ConfigFindApproved(_a0 *model.Config) (bool, error) {
ret := _m.Called(_a0)
if len(ret) == 0 {
panic("no return value specified for ConfigFindApproved")
}
var r0 bool
var r1 error
if rf, ok := ret.Get(0).(func(*model.Config) (bool, error)); ok {
return rf(_a0)
}
if rf, ok := ret.Get(0).(func(*model.Config) bool); ok {
r0 = rf(_a0)
} else {
r0 = ret.Get(0).(bool)
}
if rf, ok := ret.Get(1).(func(*model.Config) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ConfigPersist provides a mock function with given fields: _a0 // ConfigPersist provides a mock function with given fields: _a0
func (_m *Store) ConfigPersist(_a0 *model.Config) (*model.Config, error) { func (_m *Store) ConfigPersist(_a0 *model.Config) (*model.Config, error) {
ret := _m.Called(_a0) ret := _m.Called(_a0)
@ -645,36 +599,6 @@ func (_m *Store) GetPipeline(_a0 int64) (*model.Pipeline, error) {
return r0, r1 return r0, r1
} }
// GetPipelineCommit provides a mock function with given fields: _a0, _a1, _a2
func (_m *Store) GetPipelineCommit(_a0 *model.Repo, _a1 string, _a2 string) (*model.Pipeline, error) {
ret := _m.Called(_a0, _a1, _a2)
if len(ret) == 0 {
panic("no return value specified for GetPipelineCommit")
}
var r0 *model.Pipeline
var r1 error
if rf, ok := ret.Get(0).(func(*model.Repo, string, string) (*model.Pipeline, error)); ok {
return rf(_a0, _a1, _a2)
}
if rf, ok := ret.Get(0).(func(*model.Repo, string, string) *model.Pipeline); ok {
r0 = rf(_a0, _a1, _a2)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*model.Pipeline)
}
}
if rf, ok := ret.Get(1).(func(*model.Repo, string, string) error); ok {
r1 = rf(_a0, _a1, _a2)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetPipelineCount provides a mock function with given fields: // GetPipelineCount provides a mock function with given fields:
func (_m *Store) GetPipelineCount() (int64, error) { func (_m *Store) GetPipelineCount() (int64, error) {
ret := _m.Called() ret := _m.Called()
@ -853,66 +777,6 @@ func (_m *Store) GetPipelineQueue() ([]*model.Feed, error) {
return r0, r1 return r0, r1
} }
// GetPipelineRef provides a mock function with given fields: _a0, _a1
func (_m *Store) GetPipelineRef(_a0 *model.Repo, _a1 string) (*model.Pipeline, error) {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for GetPipelineRef")
}
var r0 *model.Pipeline
var r1 error
if rf, ok := ret.Get(0).(func(*model.Repo, string) (*model.Pipeline, error)); ok {
return rf(_a0, _a1)
}
if rf, ok := ret.Get(0).(func(*model.Repo, string) *model.Pipeline); ok {
r0 = rf(_a0, _a1)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*model.Pipeline)
}
}
if rf, ok := ret.Get(1).(func(*model.Repo, string) error); ok {
r1 = rf(_a0, _a1)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetRedirection provides a mock function with given fields: _a0
func (_m *Store) GetRedirection(_a0 string) (*model.Redirection, error) {
ret := _m.Called(_a0)
if len(ret) == 0 {
panic("no return value specified for GetRedirection")
}
var r0 *model.Redirection
var r1 error
if rf, ok := ret.Get(0).(func(string) (*model.Redirection, error)); ok {
return rf(_a0)
}
if rf, ok := ret.Get(0).(func(string) *model.Redirection); ok {
r0 = rf(_a0)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*model.Redirection)
}
}
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(_a0)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetRepo provides a mock function with given fields: _a0 // GetRepo provides a mock function with given fields: _a0
func (_m *Store) GetRepo(_a0 int64) (*model.Repo, error) { func (_m *Store) GetRepo(_a0 int64) (*model.Repo, error) {
ret := _m.Called(_a0) ret := _m.Called(_a0)
@ -1363,24 +1227,6 @@ func (_m *Store) LogFind(_a0 *model.Step) ([]*model.LogEntry, error) {
return r0, r1 return r0, r1
} }
// LogSave provides a mock function with given fields: _a0, _a1
func (_m *Store) LogSave(_a0 *model.Step, _a1 []*model.LogEntry) error {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for LogSave")
}
var r0 error
if rf, ok := ret.Get(0).(func(*model.Step, []*model.LogEntry) error); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Error(0)
}
return r0
}
// Migrate provides a mock function with given fields: _a0 // Migrate provides a mock function with given fields: _a0
func (_m *Store) Migrate(_a0 bool) error { func (_m *Store) Migrate(_a0 bool) error {
ret := _m.Called(_a0) ret := _m.Called(_a0)
@ -1633,24 +1479,6 @@ func (_m *Store) OrgUpdate(_a0 *model.Org) error {
return r0 return r0
} }
// PermDelete provides a mock function with given fields: perm
func (_m *Store) PermDelete(perm *model.Perm) error {
ret := _m.Called(perm)
if len(ret) == 0 {
panic("no return value specified for PermDelete")
}
var r0 error
if rf, ok := ret.Get(0).(func(*model.Perm) error); ok {
r0 = rf(perm)
} else {
r0 = ret.Error(0)
}
return r0
}
// PermFind provides a mock function with given fields: user, repo // PermFind provides a mock function with given fields: user, repo
func (_m *Store) PermFind(user *model.User, repo *model.Repo) (*model.Perm, error) { func (_m *Store) PermFind(user *model.User, repo *model.Repo) (*model.Perm, error) {
ret := _m.Called(user, repo) ret := _m.Called(user, repo)
@ -1681,24 +1509,6 @@ func (_m *Store) PermFind(user *model.User, repo *model.Repo) (*model.Perm, erro
return r0, r1 return r0, r1
} }
// PermFlush provides a mock function with given fields: user, before
func (_m *Store) PermFlush(user *model.User, before int64) error {
ret := _m.Called(user, before)
if len(ret) == 0 {
panic("no return value specified for PermFlush")
}
var r0 error
if rf, ok := ret.Get(0).(func(*model.User, int64) error); ok {
r0 = rf(user, before)
} else {
r0 = ret.Error(0)
}
return r0
}
// PermUpsert provides a mock function with given fields: perm // PermUpsert provides a mock function with given fields: perm
func (_m *Store) PermUpsert(perm *model.Perm) error { func (_m *Store) PermUpsert(perm *model.Perm) error {
ret := _m.Called(perm) ret := _m.Called(perm)
@ -2609,6 +2419,24 @@ func (_m *Store) WorkflowsCreate(_a0 []*model.Workflow) error {
return r0 return r0
} }
// WorkflowsReplace provides a mock function with given fields: _a0, _a1
func (_m *Store) WorkflowsReplace(_a0 *model.Pipeline, _a1 []*model.Workflow) error {
ret := _m.Called(_a0, _a1)
if len(ret) == 0 {
panic("no return value specified for WorkflowsReplace")
}
var r0 error
if rf, ok := ret.Get(0).(func(*model.Pipeline, []*model.Workflow) error); ok {
r0 = rf(_a0, _a1)
} else {
r0 = ret.Error(0)
}
return r0
}
// NewStore creates a new instance of Store. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // NewStore creates a new instance of Store. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value. // The first argument is typically a *testing.T value.
func NewStore(t interface { func NewStore(t interface {

View file

@ -171,6 +171,7 @@ type Store interface {
// Workflow // Workflow
WorkflowGetTree(*model.Pipeline) ([]*model.Workflow, error) WorkflowGetTree(*model.Pipeline) ([]*model.Workflow, error)
WorkflowsCreate([]*model.Workflow) error WorkflowsCreate([]*model.Workflow) error
WorkflowsReplace(*model.Pipeline, []*model.Workflow) error
WorkflowLoad(int64) (*model.Workflow, error) WorkflowLoad(int64) (*model.Workflow, error)
WorkflowUpdate(*model.Workflow) error WorkflowUpdate(*model.Workflow) error

1
web/components.d.ts vendored
View file

@ -109,7 +109,6 @@ declare module 'vue' {
Tab: typeof import('./src/components/layout/scaffold/Tab.vue')['default'] Tab: typeof import('./src/components/layout/scaffold/Tab.vue')['default']
Tabs: typeof import('./src/components/layout/scaffold/Tabs.vue')['default'] Tabs: typeof import('./src/components/layout/scaffold/Tabs.vue')['default']
TextField: typeof import('./src/components/form/TextField.vue')['default'] TextField: typeof import('./src/components/form/TextField.vue')['default']
UserAPITab: typeof import('./src/components/user/UserAPITab.vue')['default']
UserCLIAndAPITab: typeof import('./src/components/user/UserCLIAndAPITab.vue')['default'] UserCLIAndAPITab: typeof import('./src/components/user/UserCLIAndAPITab.vue')['default']
UserGeneralTab: typeof import('./src/components/user/UserGeneralTab.vue')['default'] UserGeneralTab: typeof import('./src/components/user/UserGeneralTab.vue')['default']
UserSecretsTab: typeof import('./src/components/user/UserSecretsTab.vue')['default'] UserSecretsTab: typeof import('./src/components/user/UserSecretsTab.vue')['default']