Skip to content

Commit 53a3b00

Browse files
authored
ScheduledSparkApplications: NextRun should be recalculated if desired schedule changes (kubeflow#857)
* scheduledSparkApplications: NextRun should be recalculated whenever schedule changes * updatedScheduleRuntime -> updatedNextRunTime
1 parent 8a4a991 commit 53a3b00

File tree

2 files changed

+35
-5
lines changed

2 files changed

+35
-5
lines changed

pkg/controller/scheduledsparkapplication/controller.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -172,9 +172,11 @@ func (c *Controller) syncScheduledSparkApplication(key string) error {
172172
status.ScheduleState = v1beta2.ScheduledState
173173
now := c.clock.Now()
174174
nextRunTime := status.NextRun.Time
175-
if nextRunTime.IsZero() {
175+
// if we updated the schedule for an earlier execution - those changes need to be reflected
176+
updatedNextRunTime := schedule.Next(now)
177+
if nextRunTime.IsZero() || updatedNextRunTime.Before(nextRunTime) {
176178
// The first run of the application.
177-
nextRunTime = schedule.Next(now)
179+
nextRunTime = updatedNextRunTime
178180
status.NextRun = metav1.NewTime(nextRunTime)
179181
}
180182
if nextRunTime.Before(now) {

pkg/controller/scheduledsparkapplication/controller_test.go

+31-3
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestSyncScheduledSparkApplication_Allow(t *testing.T) {
4343
Name: "test-app-allow",
4444
},
4545
Spec: v1beta2.ScheduledSparkApplicationSpec{
46-
Schedule: "@every 1m",
46+
Schedule: "@every 10m",
4747
ConcurrencyPolicy: v1beta2.ConcurrencyAllow,
4848
},
4949
}
@@ -61,8 +61,8 @@ func TestSyncScheduledSparkApplication_Allow(t *testing.T) {
6161
// The first run should not have been triggered.
6262
assert.True(t, app.Status.LastRunName == "")
6363

64-
// Advance the clock by 1 minute.
65-
clk.Step(1 * time.Minute)
64+
// Advance the clock by 10 minutes.
65+
clk.Step(10 * time.Minute)
6666
if err := c.syncScheduledSparkApplication(key); err != nil {
6767
t.Fatal(err)
6868
}
@@ -145,6 +145,34 @@ func TestSyncScheduledSparkApplication_Allow(t *testing.T) {
145145
assert.Equal(t, 1, len(app.Status.PastSuccessfulRunNames))
146146
run, _ = c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(secondRunName, options)
147147
assert.NotNil(t, run)
148+
149+
// Test the case where we update the schedule to be more frequent
150+
app.Spec.Schedule = "@every 2m"
151+
recentRunName := app.Status.LastRunName
152+
recentRunTime := app.Status.LastRun.Time
153+
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Update(app)
154+
// sync our update
155+
if err := c.syncScheduledSparkApplication(key); err != nil {
156+
t.Fatal(err)
157+
}
158+
// Advance the clock by 3 minutes.
159+
clk.Step(3 * time.Minute)
160+
if err := c.syncScheduledSparkApplication(key); err != nil {
161+
t.Fatal(err)
162+
}
163+
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Get(app.Name, options)
164+
// A run should have been triggered
165+
assert.NotEqual(t, recentRunName, app.Status.LastRunName)
166+
assert.True(t, recentRunTime.Before(app.Status.LastRun.Time))
167+
run, _ = c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(app.Status.LastRunName, options)
168+
assert.NotNil(t, run)
169+
// Simulate completion of the last run
170+
run.Status.AppState.State = v1beta2.CompletedState
171+
c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Update(run)
172+
// This sync should not start any new run, but update Status.PastSuccessfulRunNames.
173+
if err := c.syncScheduledSparkApplication(key); err != nil {
174+
t.Fatal(err)
175+
}
148176
}
149177

150178
func TestSyncScheduledSparkApplication_Forbid(t *testing.T) {

0 commit comments

Comments
 (0)