Skip to content

Commit

Permalink
update code to fix error on termination time when using sidecar (kubeflow#867)
Browse files Browse the repository at this point in the history

* update code to fix error on termination time when using sidecar

* fix and add new tests

* fixup! update code to fix error on termination time when using sidecar
  • Loading branch information
ImpSy authored Apr 11, 2020
1 parent 2e4559e commit 2bd7a00
Show file tree
Hide file tree
Showing 4 changed files with 307 additions and 32 deletions.
12 changes: 12 additions & 0 deletions pkg/apis/sparkoperator.k8s.io/v1beta2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,18 @@ type ApplicationState struct {
ErrorMessage string `json:"errorMessage,omitempty"`
}

// DriverState is the state of a Spark driver, expressed as a string.
type DriverState string

// The states a Spark driver can be reported in.
const (
	// DriverPendingState marks a driver that is pending.
	DriverPendingState = DriverState("PENDING")
	// DriverRunningState marks a driver that is running.
	DriverRunningState = DriverState("RUNNING")
	// DriverCompletedState marks a driver that has completed.
	DriverCompletedState = DriverState("COMPLETED")
	// DriverFailedState marks a driver that has failed.
	DriverFailedState = DriverState("FAILED")
	// DriverUnknownState marks a driver whose state is unknown.
	DriverUnknownState = DriverState("UNKNOWN")
)

// ExecutorState tells the current state of an executor.
type ExecutorState string

Expand Down
31 changes: 16 additions & 15 deletions pkg/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,29 +324,30 @@ func (c *Controller) getAndUpdateDriverState(app *v1beta2.SparkApplication) erro
}

app.Status.SparkApplicationID = getSparkApplicationID(driverPod)
driverState := podStatusToDriverState(driverPod.Status)

if driverPod.Status.Phase == apiv1.PodSucceeded || driverPod.Status.Phase == apiv1.PodFailed {
if hasDriverTerminated(driverState) {
if app.Status.TerminationTime.IsZero() {
app.Status.TerminationTime = metav1.Now()
}
if driverPod.Status.Phase == apiv1.PodFailed {
if len(driverPod.Status.ContainerStatuses) > 0 {
terminatedState := driverPod.Status.ContainerStatuses[0].State.Terminated
if terminatedState != nil {
app.Status.AppState.ErrorMessage = fmt.Sprintf("driver pod failed with ExitCode: %d, Reason: %s", terminatedState.ExitCode, terminatedState.Reason)
if driverState == v1beta2.DriverFailedState {
state := getDriverContainerTerminatedState(driverPod.Status)
if state != nil {
if state.ExitCode != 0 {
app.Status.AppState.ErrorMessage = fmt.Sprintf("driver container failed with ExitCode: %d, Reason: %s", state.ExitCode, state.Reason)
}
} else {
app.Status.AppState.ErrorMessage = "driver container status missing"
}
}
}

newState := driverStateToApplicationState(driverPod.Status)
newState := driverStateToApplicationState(driverState)
// Only record a driver event if the application state (derived from the driver state) has changed.
if newState != app.Status.AppState.State {
c.recordDriverEvent(app, driverPod.Status.Phase, driverPod.Name)
c.recordDriverEvent(app, driverState, driverPod.Name)
app.Status.AppState.State = newState
}
app.Status.AppState.State = newState

return nil
}
Expand Down Expand Up @@ -937,17 +938,17 @@ func (c *Controller) recordSparkApplicationEvent(app *v1beta2.SparkApplication)
}
}

func (c *Controller) recordDriverEvent(app *v1beta2.SparkApplication, phase apiv1.PodPhase, name string) {
func (c *Controller) recordDriverEvent(app *v1beta2.SparkApplication, phase v1beta2.DriverState, name string) {
switch phase {
case apiv1.PodSucceeded:
case v1beta2.DriverCompletedState:
c.recorder.Eventf(app, apiv1.EventTypeNormal, "SparkDriverCompleted", "Driver %s completed", name)
case apiv1.PodPending:
case v1beta2.DriverPendingState:
c.recorder.Eventf(app, apiv1.EventTypeNormal, "SparkDriverPending", "Driver %s is pending", name)
case apiv1.PodRunning:
case v1beta2.DriverRunningState:
c.recorder.Eventf(app, apiv1.EventTypeNormal, "SparkDriverRunning", "Driver %s is running", name)
case apiv1.PodFailed:
case v1beta2.DriverFailedState:
c.recorder.Eventf(app, apiv1.EventTypeWarning, "SparkDriverFailed", "Driver %s failed", name)
case apiv1.PodUnknown:
case v1beta2.DriverUnknownState:
c.recorder.Eventf(app, apiv1.EventTypeWarning, "SparkDriverUnknownState", "Driver %s in unknown state", name)
}
}
Expand Down
237 changes: 235 additions & 2 deletions pkg/controller/sparkapplication/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,176 @@ func TestSyncSparkApplication_ExecutingState(t *testing.T) {
successMetricCount: 1,
},
},
{
appName: appName,
oldAppStatus: v1beta2.RunningState,
oldExecutorStatus: map[string]v1beta2.ExecutorState{"exec-1": v1beta2.ExecutorRunningState},
driverPod: &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: driverPodName,
Namespace: "test",
Labels: map[string]string{
config.SparkRoleLabel: config.SparkDriverRole,
config.SparkAppNameLabel: appName,
},
ResourceVersion: "1",
},
Status: apiv1.PodStatus{
Phase: apiv1.PodRunning,
ContainerStatuses: []apiv1.ContainerStatus{
{
Name: config.SparkDriverContainerName,
State: apiv1.ContainerState{
Running: &apiv1.ContainerStateRunning{},
},
},
{
Name: "sidecar",
State: apiv1.ContainerState{
Terminated: &apiv1.ContainerStateTerminated{
ExitCode: 0,
},
},
},
},
},
},
executorPod: &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "exec-1",
Namespace: "test",
Labels: map[string]string{
config.SparkRoleLabel: config.SparkExecutorRole,
config.SparkAppNameLabel: appName,
},
ResourceVersion: "1",
},
Status: apiv1.PodStatus{
Phase: apiv1.PodSucceeded,
},
},
expectedAppState: v1beta2.RunningState,
expectedExecutorState: map[string]v1beta2.ExecutorState{"exec-1": v1beta2.ExecutorCompletedState},
expectedAppMetrics: metrics{},
expectedExecutorMetrics: executorMetrics{
successMetricCount: 1,
},
},
{
appName: appName,
oldAppStatus: v1beta2.RunningState,
oldExecutorStatus: map[string]v1beta2.ExecutorState{"exec-1": v1beta2.ExecutorRunningState},
driverPod: &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: driverPodName,
Namespace: "test",
Labels: map[string]string{
config.SparkRoleLabel: config.SparkDriverRole,
config.SparkAppNameLabel: appName,
},
ResourceVersion: "1",
},
Status: apiv1.PodStatus{
Phase: apiv1.PodRunning,
ContainerStatuses: []apiv1.ContainerStatus{
{
Name: config.SparkDriverContainerName,
State: apiv1.ContainerState{
Terminated: &apiv1.ContainerStateTerminated{
ExitCode: 0,
},
},
},
{
Name: "sidecar",
State: apiv1.ContainerState{
Running: &apiv1.ContainerStateRunning{},
},
},
},
},
},
executorPod: &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "exec-1",
Namespace: "test",
Labels: map[string]string{
config.SparkRoleLabel: config.SparkExecutorRole,
config.SparkAppNameLabel: appName,
},
ResourceVersion: "1",
},
Status: apiv1.PodStatus{
Phase: apiv1.PodSucceeded,
},
},
expectedAppState: v1beta2.SucceedingState,
expectedExecutorState: map[string]v1beta2.ExecutorState{"exec-1": v1beta2.ExecutorCompletedState},
expectedAppMetrics: metrics{
successMetricCount: 1,
},
expectedExecutorMetrics: executorMetrics{
successMetricCount: 1,
},
},
{
appName: appName,
oldAppStatus: v1beta2.RunningState,
oldExecutorStatus: map[string]v1beta2.ExecutorState{"exec-1": v1beta2.ExecutorRunningState},
driverPod: &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: driverPodName,
Namespace: "test",
Labels: map[string]string{
config.SparkRoleLabel: config.SparkDriverRole,
config.SparkAppNameLabel: appName,
},
ResourceVersion: "1",
},
Status: apiv1.PodStatus{
Phase: apiv1.PodRunning,
ContainerStatuses: []apiv1.ContainerStatus{
{
Name: config.SparkDriverContainerName,
State: apiv1.ContainerState{
Terminated: &apiv1.ContainerStateTerminated{
ExitCode: 137,
Reason: "OOMKilled",
},
},
},
{
Name: "sidecar",
State: apiv1.ContainerState{
Running: &apiv1.ContainerStateRunning{},
},
},
},
},
},
executorPod: &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "exec-1",
Namespace: "test",
Labels: map[string]string{
config.SparkRoleLabel: config.SparkExecutorRole,
config.SparkAppNameLabel: appName,
},
ResourceVersion: "1",
},
Status: apiv1.PodStatus{
Phase: apiv1.PodSucceeded,
},
},
expectedAppState: v1beta2.FailingState,
expectedExecutorState: map[string]v1beta2.ExecutorState{"exec-1": v1beta2.ExecutorCompletedState},
expectedAppMetrics: metrics{
failedMetricCount: 1,
},
expectedExecutorMetrics: executorMetrics{
successMetricCount: 1,
},
},
{
appName: appName,
oldAppStatus: v1beta2.RunningState,
Expand All @@ -1036,6 +1206,7 @@ func TestSyncSparkApplication_ExecutingState(t *testing.T) {
Phase: apiv1.PodFailed,
ContainerStatuses: []apiv1.ContainerStatus{
{
Name: config.SparkDriverContainerName,
State: apiv1.ContainerState{
Terminated: &apiv1.ContainerStateTerminated{
ExitCode: 137,
Expand Down Expand Up @@ -1069,6 +1240,66 @@ func TestSyncSparkApplication_ExecutingState(t *testing.T) {
failedMetricCount: 1,
},
},
{
appName: appName,
oldAppStatus: v1beta2.RunningState,
oldExecutorStatus: map[string]v1beta2.ExecutorState{"exec-1": v1beta2.ExecutorRunningState},
driverPod: &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: driverPodName,
Namespace: "test",
Labels: map[string]string{
config.SparkRoleLabel: config.SparkDriverRole,
config.SparkAppNameLabel: appName,
},
ResourceVersion: "1",
},
Status: apiv1.PodStatus{
Phase: apiv1.PodFailed,
ContainerStatuses: []apiv1.ContainerStatus{
{
Name: config.SparkDriverContainerName,
State: apiv1.ContainerState{
Terminated: &apiv1.ContainerStateTerminated{
ExitCode: 0,
},
},
},
{
Name: "sidecar",
State: apiv1.ContainerState{
Terminated: &apiv1.ContainerStateTerminated{
ExitCode: 137,
Reason: "OOMKilled",
},
},
},
},
},
},
executorPod: &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "exec-1",
Namespace: "test",
Labels: map[string]string{
config.SparkRoleLabel: config.SparkExecutorRole,
config.SparkAppNameLabel: appName,
},
ResourceVersion: "1",
},
Status: apiv1.PodStatus{
Phase: apiv1.PodSucceeded,
},
},
expectedAppState: v1beta2.SucceedingState,
expectedExecutorState: map[string]v1beta2.ExecutorState{"exec-1": v1beta2.ExecutorCompletedState},
expectedAppMetrics: metrics{
successMetricCount: 1,
},
expectedExecutorMetrics: executorMetrics{
successMetricCount: 1,
},
},
{
appName: appName,
oldAppStatus: v1beta2.FailingState,
Expand Down Expand Up @@ -1192,8 +1423,10 @@ func TestSyncSparkApplication_ExecutingState(t *testing.T) {
// Validate error message if the driver pod failed.
if test.driverPod != nil && test.driverPod.Status.Phase == apiv1.PodFailed {
if len(test.driverPod.Status.ContainerStatuses) > 0 && test.driverPod.Status.ContainerStatuses[0].State.Terminated != nil {
assert.Equal(t, updatedApp.Status.AppState.ErrorMessage,
fmt.Sprintf("driver pod failed with ExitCode: %d, Reason: %s", test.driverPod.Status.ContainerStatuses[0].State.Terminated.ExitCode, test.driverPod.Status.ContainerStatuses[0].State.Terminated.Reason))
if test.driverPod.Status.ContainerStatuses[0].State.Terminated.ExitCode != 0 {
assert.Equal(t, updatedApp.Status.AppState.ErrorMessage,
fmt.Sprintf("driver container failed with ExitCode: %d, Reason: %s", test.driverPod.Status.ContainerStatuses[0].State.Terminated.ExitCode, test.driverPod.Status.ContainerStatuses[0].State.Terminated.Reason))
}
} else {
assert.Equal(t, updatedApp.Status.AppState.ErrorMessage, "driver container status missing")
}
Expand Down
Loading

0 comments on commit 2bd7a00

Please sign in to comment.