Added support for retrying failed submissions
liyinan926 committed Feb 12, 2018
1 parent f6fad48 commit 2cf8975
Showing 14 changed files with 289 additions and 116 deletions.
1 change: 1 addition & 0 deletions examples/spark-pi.yaml
@@ -31,6 +31,7 @@ spec:
memory: "512m"
labels:
version: 2.3.0
serviceAccount: spark
executor:
cores: 1
instances: 1
2 changes: 1 addition & 1 deletion hack/update-codegen.sh
@@ -31,4 +31,4 @@ ${CODEGEN_PKG}/generate-groups.sh "all" \
--output-base "$(dirname ${BASH_SOURCE})/../../.."

# To use your own boilerplate text append:
# --go-header-file ${SCRIPT_ROOT}/hack/custom-boilerplate.go.txt
# --go-header-file ${SCRIPT_ROOT}/hack/custom-boilerplate.go.txt
2 changes: 1 addition & 1 deletion hack/verify-codegen.sh
@@ -45,4 +45,4 @@ then
else
echo "${DIFFROOT} is out of date. Please run hack/update-codegen.sh"
exit 1
fi
fi
3 changes: 1 addition & 2 deletions manifest/spark-operator-rbac.yaml
@@ -1,4 +1,3 @@

#
# Copyright 2017 Google LLC
#
@@ -49,4 +48,4 @@ subjects:
roleRef:
kind: ClusterRole
name: sparkoperator
apiGroup: rbac.authorization.k8s.io
apiGroup: rbac.authorization.k8s.io
2 changes: 1 addition & 1 deletion manifest/spark-operator.yaml
@@ -40,4 +40,4 @@ spec:
imagePullPolicy: Always
command: ["/usr/bin/spark-operator"]
args:
- -logtostderr
- -logtostderr
54 changes: 54 additions & 0 deletions manifest/spark-rbac.yaml
@@ -0,0 +1,54 @@
#
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark
  namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: Role
metadata:
  namespace: default
  name: spark-role
rules:
- apiGroups:
  - "" # "" indicates the core API group
  resources:
  - "pods"
  verbs:
  - "*"
- apiGroups:
  - "" # "" indicates the core API group
  resources:
  - "services"
  verbs:
  - "*"
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: RoleBinding
metadata:
  name: spark-role-binding
  namespace: default
subjects:
- kind: ServiceAccount
  name: spark
  namespace: default
roleRef:
  kind: Role
  name: spark-role
  apiGroup: rbac.authorization.k8s.io
4 changes: 4 additions & 0 deletions pkg/apis/sparkoperator.k8s.io/v1alpha1/types.go
@@ -118,6 +118,8 @@ type SparkApplicationSpec struct {
RestartPolicy RestartPolicy `json:"restartPolicy,omitempty"`
// NodeSelector is the Kubernetes node selector to be added to the driver and executor pods.
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
// MaxSubmissionRetries is the maximum number of times to retry a failed submission.
MaxSubmissionRetries int32 `json:"maxSubmissionRetries,omitempty"`
}

// ApplicationStateType represents the type of the current state of an application.
@@ -167,6 +169,8 @@ type SparkApplicationStatus struct {
AppState ApplicationState `json:"applicationState"`
// ExecutorState records the state of executors by executor Pod names.
ExecutorState map[string]ExecutorState `json:"executorState,omitempty"`
// SubmissionRetries is the number of retries attempted for a failed submission.
SubmissionRetries int32 `json:"submissionRetries,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
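For context, the two new fields above surface in the SparkApplication CRD as maxSubmissionRetries (set by the user in the spec) and submissionRetries (maintained by the controller in the status). Below is a minimal sketch of how a spec might set the new field, modeled on examples/spark-pi.yaml; the apiVersion string is assumed from the pkg/apis/sparkoperator.k8s.io/v1alpha1 package path, and the retry count of 3 is purely illustrative, not a default introduced by this commit.

apiVersion: sparkoperator.k8s.io/v1alpha1  # assumed from the API package path
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  maxSubmissionRetries: 3   # illustrative value; retry a failed spark-submit up to 3 times
  driver:
    memory: "512m"
    labels:
      version: 2.3.0
    serviceAccount: spark
  executor:
    cores: 1
    instances: 1

The status-side submissionRetries counter is updated by the controller and is not meant to be set by users.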
168 changes: 105 additions & 63 deletions pkg/controller/controller.go
@@ -64,7 +64,7 @@ type SparkApplicationController struct {
recorder record.EventRecorder
runner *sparkSubmitRunner
sparkPodMonitor *sparkPodMonitor
appStateReportingChan <-chan appStateUpdate
appStateReportingChan <-chan *appStateUpdate
podStateReportingChan <-chan interface{}
}

@@ -94,7 +94,7 @@ func newSparkApplicationController(
queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(),
"spark-application-controller")

appStateReportingChan := make(chan appStateUpdate, submissionRunnerWorkers)
appStateReportingChan := make(chan *appStateUpdate, submissionRunnerWorkers)
podStateReportingChan := make(chan interface{})

runner := newSparkSubmitRunner(submissionRunnerWorkers, appStateReportingChan)
@@ -238,21 +238,28 @@ func (s *SparkApplicationController) syncSparkApplication(key string) error {
if err != nil {
return err
}
s.submitApp(app, false)
s.submitApp(app)
return nil
}

func (s *SparkApplicationController) submitApp(app *v1alpha1.SparkApplication, resubmission bool) {
updatedApp := s.updateSparkApplicationWithRetries(app, app.DeepCopy(), func(toUpdate *v1alpha1.SparkApplication) {
if resubmission {
// Clear the Status field if it's a resubmission.
toUpdate.Status = v1alpha1.SparkApplicationStatus{}
}
toUpdate.Status.AppID = buildAppID(toUpdate)
toUpdate.Status.AppState.State = v1alpha1.NewState
createSparkUIService(toUpdate, s.kubeClient)
})
func (s *SparkApplicationController) submitApp(app *v1alpha1.SparkApplication) {
appStatus := v1alpha1.SparkApplicationStatus{
AppID: buildAppID(app),
AppState: v1alpha1.ApplicationState{
State: v1alpha1.NewState,
},
}
name, port, err := createSparkUIService(app, appStatus.AppID, s.kubeClient)
if err != nil {
glog.Errorf("failed to create a UI service for SparkApplication %s: %v", app.Name, err)
} else {
appStatus.DriverInfo.WebUIServiceName = name
appStatus.DriverInfo.WebUIPort = port
}

updatedApp := s.updateSparkApplicationStatusWithRetries(app, func(status *v1alpha1.SparkApplicationStatus) {
*status = appStatus
})
if updatedApp == nil {
return
}
@@ -264,7 +271,7 @@ func (s *SparkApplicationController) submitApp(app *v1alpha1.SparkApplication, r
updatedApp.Name,
err)
}

s.runner.submit(newSubmission(submissionCmdArgs, updatedApp))
}

@@ -301,34 +308,34 @@ func (s *SparkApplicationController) processSingleDriverStateUpdate(
return nil
}

updated := s.updateSparkApplicationWithRetries(app, app.DeepCopy(), func(toUpdate *v1alpha1.SparkApplication) {
toUpdate.Status.DriverInfo.PodName = update.podName
// The application state is solely based on the driver pod phase except when submission fails and
// no driver pod is launched.
appState := driverPodPhaseToApplicationState(update.podPhase)
if isAppTerminated(appState) {
s.recorder.Eventf(
app,
apiv1.EventTypeNormal,
"SparkApplicationTermination",
"SparkApplication %s terminated with state: %v",
update.appName,
appState)
}

updated := s.updateSparkApplicationStatusWithRetries(app, func(status *v1alpha1.SparkApplicationStatus) {
status.DriverInfo.PodName = update.podName
if update.nodeName != "" {
if nodeIP := s.getNodeExternalIP(update.nodeName); nodeIP != "" {
toUpdate.Status.DriverInfo.WebUIAddress = fmt.Sprintf("%s:%d", nodeIP,
toUpdate.Status.DriverInfo.WebUIPort)
status.DriverInfo.WebUIAddress = fmt.Sprintf("%s:%d", nodeIP,
status.DriverInfo.WebUIPort)
}
}

// Update the application based on the driver pod phase.
// The application state is solely based on the driver pod phase except when submission fails and
// no driver pod is launched.
toUpdate.Status.AppState.State = driverPodPhaseToApplicationState(update.podPhase)
status.AppState.State = appState
if !update.completionTime.IsZero() {
toUpdate.Status.CompletionTime = update.completionTime
status.CompletionTime = update.completionTime
}
})

if updated != nil && isAppTerminated(updated.Status.AppState.State) {
s.recorder.Eventf(
updated,
apiv1.EventTypeNormal,
"SparkApplicationTermination",
"SparkApplication %s terminated with state: %v",
updated.Name,
updated.Status.AppState)
}

return updated
}

@@ -338,7 +345,7 @@ func (s *SparkApplicationController) processAppStateUpdates() {
}
}

func (s *SparkApplicationController) processSingleAppStateUpdate(update appStateUpdate) {
func (s *SparkApplicationController) processSingleAppStateUpdate(update *appStateUpdate) {
key := getApplicationKey(update.namespace, update.name)
app, err := s.getSparkApplicationFromStore(key)
if err != nil {
@@ -347,22 +354,40 @@ func (s *SparkApplicationController) processSingleAppStateUpdate(update appState
return
}

updated := s.updateSparkApplicationWithRetries(app, app.DeepCopy(), func(toUpdate *v1alpha1.SparkApplication) {
toUpdate.Status.AppState.State = update.state
toUpdate.Status.AppState.ErrorMessage = update.errorMessage
if !update.submissionTime.IsZero() {
toUpdate.Status.SubmissionTime = update.submissionTime
}
})

if updated != nil && updated.Status.AppState.State == v1alpha1.FailedSubmissionState {
submissionRetries := app.Status.SubmissionRetries
if update.state == v1alpha1.FailedSubmissionState {
s.recorder.Eventf(
updated,
app,
apiv1.EventTypeNormal,
"SparkApplicationSubmissionFailure",
"SparkApplication %s failed submission",
updated.Name)
update.name)

if submissionRetries < app.Spec.MaxSubmissionRetries {
glog.Infof("Retrying submission of SparkApplication %s", update.name)
key := getApplicationKey(update.namespace, update.name)
s.queue.AddRateLimited(key)
submissionRetries++
s.recorder.Eventf(
app,
apiv1.EventTypeNormal,
"SparkApplicationSubmissionRetry",
"Retried submission of SparkApplication %s",
update.name)
} else {
glog.Errorf("maximum number of submission retries of SparkApplication %s has been reached, not "+
"attempting more retries", update.name)
}
}

s.updateSparkApplicationStatusWithRetries(app, func(status *v1alpha1.SparkApplicationStatus) {
status.AppState.State = update.state
status.AppState.ErrorMessage = update.errorMessage
status.SubmissionRetries = submissionRetries
if !update.submissionTime.IsZero() {
status.SubmissionTime = update.submissionTime
}
})
}

func (s *SparkApplicationController) processSingleExecutorStateUpdate(update *executorStateUpdate) {
@@ -382,23 +407,24 @@ func (s *SparkApplicationController) processSingleExecutorStateUpdate(update *ex
return
}

s.updateSparkApplicationWithRetries(app, app.DeepCopy(), func(toUpdate *v1alpha1.SparkApplication) {
if toUpdate.Status.ExecutorState == nil {
toUpdate.Status.ExecutorState = make(map[string]v1alpha1.ExecutorState)
s.updateSparkApplicationStatusWithRetries(app, func(status *v1alpha1.SparkApplicationStatus) {
if status.ExecutorState == nil {
status.ExecutorState = make(map[string]v1alpha1.ExecutorState)
}
if update.state != v1alpha1.ExecutorPendingState {
toUpdate.Status.ExecutorState[update.podName] = update.state
status.ExecutorState[update.podName] = update.state
}
})
}

func (s *SparkApplicationController) updateSparkApplicationWithRetries(
func (s *SparkApplicationController) updateSparkApplicationStatusWithRetries(
original *v1alpha1.SparkApplication,
toUpdate *v1alpha1.SparkApplication,
updateFunc func(*v1alpha1.SparkApplication)) *v1alpha1.SparkApplication {
updateFunc func(*v1alpha1.SparkApplicationStatus)) *v1alpha1.SparkApplication {
toUpdate := original.DeepCopy()

var lastUpdateErr error
for i := 0; i < maximumUpdateRetries; i++ {
updated, err := s.tryUpdate(original, toUpdate, updateFunc)
updated, err := s.tryUpdateStatus(original, toUpdate, updateFunc)
if err == nil {
return updated
}
Expand All @@ -422,11 +448,11 @@ func (s *SparkApplicationController) updateSparkApplicationWithRetries(
return nil
}

func (s *SparkApplicationController) tryUpdate(
func (s *SparkApplicationController) tryUpdateStatus(
original *v1alpha1.SparkApplication,
toUpdate *v1alpha1.SparkApplication,
updateFunc func(*v1alpha1.SparkApplication)) (*v1alpha1.SparkApplication, error) {
updateFunc(toUpdate)
updateFunc func(*v1alpha1.SparkApplicationStatus)) (*v1alpha1.SparkApplication, error) {
updateFunc(&toUpdate.Status)
if reflect.DeepEqual(original.Status, toUpdate.Status) {
return nil, nil
}
Expand Down Expand Up @@ -473,22 +499,38 @@ func (s *SparkApplicationController) getNodeExternalIP(nodeName string) string {
}

func (s *SparkApplicationController) handleRestart(app *v1alpha1.SparkApplication) {
if app.Spec.RestartPolicy == v1alpha1.Never || app.Spec.RestartPolicy == v1alpha1.Undefined {
return
}

if (app.Status.AppState.State == v1alpha1.FailedState && app.Spec.RestartPolicy == v1alpha1.OnFailure) ||
app.Spec.RestartPolicy == v1alpha1.Always {
glog.Infof("SparkApplication %s failed or terminated, restarting it with RestartPolicy %s",
app.Name, app.Spec.RestartPolicy)
s.recorder.Eventf(
app,
apiv1.EventTypeNormal,
"SparkApplicationResubmission",
"Re-submitting SparkApplication: %s",
"SparkApplicationRestart",
"Re-starting SparkApplication: %s",
app.Name)

s.submitApp(app, true)
// Cleanup old driver pod and UI service if necessary.
if app.Status.DriverInfo.PodName != "" {
err := s.kubeClient.CoreV1().Pods(app.Namespace).Delete(app.Status.DriverInfo.PodName,
&metav1.DeleteOptions{})
if err != nil {
glog.Errorf("failed to delete old driver pod %s of SparkApplication %s: %v",
app.Status.DriverInfo.PodName, app.Name, err)
}
}
if app.Status.DriverInfo.WebUIServiceName != "" {
err := s.kubeClient.CoreV1().Services(app.Namespace).Delete(app.Status.DriverInfo.WebUIServiceName,
&metav1.DeleteOptions{})
if err != nil {
glog.Errorf("failed to delete old web UI service %s of SparkApplication %s: %v",
app.Status.DriverInfo.WebUIServiceName, app.Name, err)
}
}

// Add the application key to the queue for re-submission.
key := getApplicationKey(app.Namespace, app.Name)
s.queue.AddRateLimited(key)
}
}

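To tie the restart and submission-retry paths together, the sketch below shows a spec that opts into both. It assumes the RestartPolicy constants serialize to their Go identifier names (e.g. OnFailure), which this diff does not show, and the comments summarize the behavior of handleRestart and processSingleAppStateUpdate above; the values are illustrative only.

spec:
  # Assumed serialized value. On restart the operator deletes the old driver pod and
  # web UI service (if any), then re-adds the application key to the work queue.
  restartPolicy: OnFailure
  # A failed submission is re-queued (rate-limited) until status.submissionRetries
  # reaches this limit; 3 is an illustrative value.
  maxSubmissionRetries: 3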
(6 more changed files not shown)
