-
Notifications
You must be signed in to change notification settings - Fork 569
Expand file tree
/
Copy pathPipelineStatusTimelineRepository.go
More file actions
295 lines (269 loc) · 13.5 KB
/
PipelineStatusTimelineRepository.go
File metadata and controls
295 lines (269 loc) · 13.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
package pipelineConfig
import (
"github.com/devtron-labs/devtron/pkg/sql"
"github.com/go-pg/pg"
"go.uber.org/zap"
"time"
)
type TimelineStatus = string
var TimelineStatusDescription string
const (
TIMELINE_STATUS_DEPLOYMENT_INITIATED TimelineStatus = "DEPLOYMENT_INITIATED"
TIMELINE_STATUS_GIT_COMMIT TimelineStatus = "GIT_COMMIT"
TIMELINE_STATUS_GIT_COMMIT_FAILED TimelineStatus = "GIT_COMMIT_FAILED"
TIMELINE_STATUS_KUBECTL_APPLY_STARTED TimelineStatus = "KUBECTL_APPLY_STARTED"
TIMELINE_STATUS_KUBECTL_APPLY_SYNCED TimelineStatus = "KUBECTL_APPLY_SYNCED"
TIMELINE_STATUS_APP_HEALTHY TimelineStatus = "HEALTHY"
TIMELINE_STATUS_DEPLOYMENT_FAILED TimelineStatus = "FAILED"
TIMELINE_STATUS_FETCH_TIMED_OUT TimelineStatus = "TIMED_OUT"
TIMELINE_STATUS_UNABLE_TO_FETCH_STATUS TimelineStatus = "UNABLE_TO_FETCH_STATUS"
TIMELINE_STATUS_DEPLOYMENT_SUPERSEDED TimelineStatus = "DEPLOYMENT_SUPERSEDED"
TIMELINE_STATUS_MANIFEST_GENERATED TimelineStatus = "MANIFEST_GENERATED"
)
const (
TIMELINE_DESCRIPTION_DEPLOYMENT_INITIATED string = "Deployment initiated successfully."
TIMELINE_DESCRIPTION_VULNERABLE_IMAGE string = "Deployment failed: Vulnerability policy violated."
TIMELINE_DESCRIPTION_MANIFEST_GENERATED string = "HELM_PACKAGE_GENERATED"
)
type PipelineStatusTimelineRepository interface {
SaveTimelines(timelines []*PipelineStatusTimeline) error
SaveTimelinesWithTxn(timelines []*PipelineStatusTimeline, tx *pg.Tx) error
UpdateTimelines(timelines []*PipelineStatusTimeline) error
UpdateTimelinesWithTxn(timelines []*PipelineStatusTimeline, tx *pg.Tx) error
FetchTimelinesByPipelineId(pipelineId int) ([]*PipelineStatusTimeline, error)
FetchTimelinesByWfrId(wfrId int) ([]*PipelineStatusTimeline, error)
FetchTimelineByWfrIdAndStatus(wfrId int, status TimelineStatus) (*PipelineStatusTimeline, error)
FetchTimelineByInstalledAppVersionHistoryIdAndStatus(installedAppVersionHistoryId int, status TimelineStatus) (*PipelineStatusTimeline, error)
FetchTimelineByWfrIdAndStatuses(wfrId int, statuses []TimelineStatus) ([]*PipelineStatusTimeline, error)
FetchTimelineByInstalledAppVersionHistoryIdAndPipelineStatuses(installedAppVersionHistoryId int, statuses []TimelineStatus) ([]*PipelineStatusTimeline, error)
FetchLatestTimelineByWfrId(wfrId int) (*PipelineStatusTimeline, error)
CheckIfTerminalStatusTimelinePresentByWfrId(wfrId int) (bool, error)
CheckIfTerminalStatusTimelinePresentByInstalledAppVersionHistoryId(installedAppVersionHistoryId int) (bool, error)
FetchLatestTimelineByAppIdAndEnvId(appId, envId int) (*PipelineStatusTimeline, error)
DeleteByCdWfrIdAndTimelineStatuses(cdWfrId int, status []TimelineStatus) error
DeleteByCdWfrIdAndTimelineStatusesWithTxn(cdWfrId int, status []TimelineStatus, tx *pg.Tx) error
FetchTimelinesByInstalledAppVersionHistoryId(installedAppVersionHistoryId int) ([]*PipelineStatusTimeline, error)
FetchLatestTimelinesByInstalledAppVersionHistoryId(installedAppVersionHistoryId int) (*PipelineStatusTimeline, error)
}
type PipelineStatusTimelineRepositoryImpl struct {
dbConnection *pg.DB
logger *zap.SugaredLogger
}
func NewPipelineStatusTimelineRepositoryImpl(dbConnection *pg.DB,
logger *zap.SugaredLogger) *PipelineStatusTimelineRepositoryImpl {
return &PipelineStatusTimelineRepositoryImpl{
dbConnection: dbConnection,
logger: logger,
}
}
type PipelineStatusTimeline struct {
tableName struct{} `sql:"pipeline_status_timeline" pg:",discard_unknown_columns"`
Id int `sql:"id,pk"`
InstalledAppVersionHistoryId int `sql:"installed_app_version_history_id,type:integer"`
CdWorkflowRunnerId int `sql:"cd_workflow_runner_id,type:integer"`
Status TimelineStatus `sql:"status"`
StatusDetail string `sql:"status_detail"`
StatusTime time.Time `sql:"status_time"`
sql.AuditLog
}
func (impl *PipelineStatusTimelineRepositoryImpl) SaveTimelines(timelines []*PipelineStatusTimeline) error {
err := impl.dbConnection.Insert(&timelines)
if err != nil {
impl.logger.Errorw("error in saving timeline of cd pipeline status", "err", err, "timeline", timelines)
return err
}
return nil
}
func (impl *PipelineStatusTimelineRepositoryImpl) SaveTimelinesWithTxn(timelines []*PipelineStatusTimeline, tx *pg.Tx) error {
err := tx.Insert(&timelines)
if err != nil {
impl.logger.Errorw("error in saving timelines of cd pipeline status", "err", err, "timelines", timelines)
return err
}
return nil
}
func (impl *PipelineStatusTimelineRepositoryImpl) UpdateTimelines(timelines []*PipelineStatusTimeline) error {
_, err := impl.dbConnection.Model(&timelines).Update()
if err != nil {
impl.logger.Errorw("error in updating timeline of cd pipeline status", "err", err, "timeline", timelines)
return err
}
return nil
}
func (impl *PipelineStatusTimelineRepositoryImpl) UpdateTimelinesWithTxn(timelines []*PipelineStatusTimeline, tx *pg.Tx) error {
_, err := tx.Model(&timelines).Update()
if err != nil {
impl.logger.Errorw("error in updating timelines of cd pipeline status", "err", err, "timelines", timelines)
return err
}
return nil
}
func (impl *PipelineStatusTimelineRepositoryImpl) FetchTimelinesByPipelineId(pipelineId int) ([]*PipelineStatusTimeline, error) {
var timelines []*PipelineStatusTimeline
err := impl.dbConnection.Model(&timelines).
Join("INNER JOIN cd_workflow_runner wfr ON wfr.id = pipeline_status_timeline.cd_workflow_runner_id").
Join("INNER JOIN cd_workflow cw ON cw.id=wfr.cd_workflow_id").
Where("cw.pipelineId = ?", pipelineId).Select()
if err != nil {
impl.logger.Errorw("error in getting timelines by pipelineId", "err", err, "pipelineId", pipelineId)
return nil, err
}
return timelines, nil
}
func (impl *PipelineStatusTimelineRepositoryImpl) FetchTimelinesByWfrId(wfrId int) ([]*PipelineStatusTimeline, error) {
var timelines []*PipelineStatusTimeline
err := impl.dbConnection.Model(&timelines).
Where("cd_workflow_runner_id = ?", wfrId).
Order("status_time ASC").Select()
if err != nil {
impl.logger.Errorw("error in getting timelines by wfrId", "err", err, "wfrId", wfrId)
return nil, err
}
return timelines, nil
}
func (impl *PipelineStatusTimelineRepositoryImpl) FetchTimelineByWfrIdAndStatus(wfrId int, status TimelineStatus) (*PipelineStatusTimeline, error) {
timeline := &PipelineStatusTimeline{}
err := impl.dbConnection.Model(timeline).
Where("cd_workflow_runner_id = ?", wfrId).
Where("status = ?", status).
Limit(1).Select()
if err != nil {
impl.logger.Errorw("error in getting timeline of latest wf by wfrId and status", "err", err, "wfrId", wfrId, "status", status)
return nil, err
}
return timeline, nil
}
func (impl *PipelineStatusTimelineRepositoryImpl) FetchTimelineByInstalledAppVersionHistoryIdAndStatus(installedAppVersionHistoryId int, status TimelineStatus) (*PipelineStatusTimeline, error) {
timeline := &PipelineStatusTimeline{}
err := impl.dbConnection.Model(timeline).
Where("installed_app_version_history_id = ?", installedAppVersionHistoryId).
Where("status = ?", status).
Limit(1).Select()
if err != nil {
impl.logger.Errorw("error in getting timeline of latest installed app version history by installedAppVersionHistoryId and status", "err", err, "installedAppVersionHistoryId", installedAppVersionHistoryId, "status", status)
return nil, err
}
return timeline, nil
}
func (impl *PipelineStatusTimelineRepositoryImpl) FetchTimelineByWfrIdAndStatuses(wfrId int, statuses []TimelineStatus) ([]*PipelineStatusTimeline, error) {
var timelines []*PipelineStatusTimeline
err := impl.dbConnection.Model(&timelines).
Where("cd_workflow_runner_id = ?", wfrId).
Where("status in (?)", pg.In(statuses)).Select()
if err != nil {
impl.logger.Errorw("error in getting timeline of latest wf by wfrId and statuses", "err", err, "wfrId", wfrId, "statuses", statuses)
return nil, err
}
return timelines, nil
}
func (impl *PipelineStatusTimelineRepositoryImpl) FetchTimelineByInstalledAppVersionHistoryIdAndPipelineStatuses(installedAppVersionHistoryId int, statuses []TimelineStatus) ([]*PipelineStatusTimeline, error) {
var timelines []*PipelineStatusTimeline
err := impl.dbConnection.Model(&timelines).
Where("installed_app_version_history_id = ?", installedAppVersionHistoryId).
Where("status in (?)", pg.In(statuses)).Select()
if err != nil {
impl.logger.Errorw("error in getting timeline of latest wf by installedAppVersionHistoryId and statuses", "err", err, "installedAppVersionHistoryId", installedAppVersionHistoryId, "statuses", statuses)
return nil, err
}
return timelines, nil
}
func (impl *PipelineStatusTimelineRepositoryImpl) FetchLatestTimelineByWfrId(wfrId int) (*PipelineStatusTimeline, error) {
timeline := &PipelineStatusTimeline{}
err := impl.dbConnection.Model(timeline).
Where("cd_workflow_runner_id = ?", wfrId).
Order("status_time DESC").
Limit(1).Select()
if err != nil {
impl.logger.Errorw("error in getting timeline of latest wf by wfrId", "err", err, "wfrId", wfrId)
return nil, err
}
return timeline, nil
}
func (impl *PipelineStatusTimelineRepositoryImpl) CheckIfTerminalStatusTimelinePresentByWfrId(wfrId int) (bool, error) {
terminalStatus := []string{string(TIMELINE_STATUS_APP_HEALTHY), string(TIMELINE_STATUS_DEPLOYMENT_FAILED), string(TIMELINE_STATUS_GIT_COMMIT_FAILED), string(TIMELINE_STATUS_DEPLOYMENT_SUPERSEDED)}
timeline := &PipelineStatusTimeline{}
exists, err := impl.dbConnection.Model(timeline).
Where("cd_workflow_runner_id = ?", wfrId).
Where("status in (?)", pg.In(terminalStatus)).Exists()
if err != nil {
impl.logger.Errorw("error in checking if terminal timeline of latest wf by pipelineId and status", "err", err, "wfrId", wfrId)
return false, err
}
return exists, nil
}
func (impl *PipelineStatusTimelineRepositoryImpl) CheckIfTerminalStatusTimelinePresentByInstalledAppVersionHistoryId(installedAppVersionHistoryId int) (bool, error) {
terminalStatus := []string{string(TIMELINE_STATUS_APP_HEALTHY), string(TIMELINE_STATUS_DEPLOYMENT_FAILED), string(TIMELINE_STATUS_GIT_COMMIT_FAILED), string(TIMELINE_STATUS_DEPLOYMENT_SUPERSEDED)}
timeline := &PipelineStatusTimeline{}
exists, err := impl.dbConnection.Model(timeline).
Where("installed_app_version_history_id = ?", installedAppVersionHistoryId).
Where("status in (?)", pg.In(terminalStatus)).Exists()
if err != nil {
impl.logger.Errorw("error in checking if terminal timeline of latest installed app by installedAppVersionHistoryId and status", "err", err, "wfrId", installedAppVersionHistoryId)
return false, err
}
return exists, nil
}
func (impl *PipelineStatusTimelineRepositoryImpl) FetchLatestTimelineByAppIdAndEnvId(appId, envId int) (*PipelineStatusTimeline, error) {
var timeline PipelineStatusTimeline
err := impl.dbConnection.Model(&timeline).
Column("pipeline_status_timeline.*").
Join("INNER JOIN cd_workflow_runner wfr ON wfr.id = pipeline_status_timeline.cd_workflow_runner_id").
Join("INNER JOIN cd_workflow cw ON cw.id=wfr.cd_workflow_id").
Join("INNER JOIN pipeline p ON p.id=cw.pipeline_id").
Where("p.app_id = ?", appId).
Where("p.environment_id = ?", envId).
Where("p.deleted = false").
Order("pipeline_status_timeline.status_time DESC").
Limit(1).
Select()
if err != nil {
impl.logger.Errorw("error in getting timelines by pipelineId", "err", err, "appId", appId, "envId", envId)
return nil, err
}
return &timeline, nil
}
func (impl *PipelineStatusTimelineRepositoryImpl) DeleteByCdWfrIdAndTimelineStatuses(cdWfrId int, status []TimelineStatus) error {
var timeline PipelineStatusTimeline
_, err := impl.dbConnection.Model(&timeline).
Where("cd_workflow_runner_id = ?", cdWfrId).
Where("status in (?)", pg.In(status)).Delete()
if err != nil {
impl.logger.Errorw("error in deleting pipeline status timeline by cdWfrId and status", "err", err, "cdWfrId", cdWfrId, "status", status)
return err
}
return nil
}
func (impl *PipelineStatusTimelineRepositoryImpl) DeleteByCdWfrIdAndTimelineStatusesWithTxn(cdWfrId int, status []TimelineStatus, tx *pg.Tx) error {
var timeline PipelineStatusTimeline
_, err := tx.Model(&timeline).
Where("cd_workflow_runner_id = ?", cdWfrId).
Where("status in (?)", pg.In(status)).Delete()
if err != nil {
impl.logger.Errorw("error in deleting pipeline status timeline by cdWfrId and status", "err", err, "cdWfrId", cdWfrId, "status", status)
return err
}
return nil
}
func (impl *PipelineStatusTimelineRepositoryImpl) FetchTimelinesByInstalledAppVersionHistoryId(installedAppVersionHistoryId int) ([]*PipelineStatusTimeline, error) {
var timelines []*PipelineStatusTimeline
err := impl.dbConnection.Model(&timelines).
Where("installed_app_version_history_id = ?", installedAppVersionHistoryId).
Order("status_time ASC").Select()
if err != nil {
impl.logger.Errorw("error in getting timelines by installAppVersionHistoryId", "err", err, "wfrId", installedAppVersionHistoryId)
return nil, err
}
return timelines, nil
}
func (impl *PipelineStatusTimelineRepositoryImpl) FetchLatestTimelinesByInstalledAppVersionHistoryId(installedAppVersionHistoryId int) (*PipelineStatusTimeline, error) {
timeline := &PipelineStatusTimeline{}
err := impl.dbConnection.Model(timeline).
Where("installed_app_version_history_id = ?", installedAppVersionHistoryId).
Order("status_time DESC").
Limit(1).Select()
if err != nil {
impl.logger.Errorw("error in getting timeline of latest installed_app_version_history by installed_app_version_history_id", "err", err, "installed_app_version_history_id", installedAppVersionHistoryId)
return nil, err
}
return timeline, nil
}