From 2cf8975b2ab8c03bc92d2423ad221c8362503112 Mon Sep 17 00:00:00 2001 From: Yinan Li Date: Mon, 12 Feb 2018 09:32:24 -0800 Subject: [PATCH] Added support for retrying failed submissions --- examples/spark-pi.yaml | 1 + hack/update-codegen.sh | 2 +- hack/verify-codegen.sh | 2 +- manifest/spark-operator-rbac.yaml | 3 +- manifest/spark-operator.yaml | 2 +- manifest/spark-rbac.yaml | 54 ++++++ .../sparkoperator.k8s.io/v1alpha1/types.go | 4 + pkg/controller/controller.go | 168 +++++++++++------- pkg/controller/controller_test.go | 105 ++++++++--- pkg/controller/sparkui.go | 23 ++- pkg/controller/sparkui_test.go | 19 +- pkg/controller/submission_runner.go | 6 +- pkg/controller/submission_runner_test.go | 4 +- pkg/crd/crd.go | 12 +- 14 files changed, 289 insertions(+), 116 deletions(-) create mode 100644 manifest/spark-rbac.yaml diff --git a/examples/spark-pi.yaml b/examples/spark-pi.yaml index 8a932ce15..65920f0b0 100644 --- a/examples/spark-pi.yaml +++ b/examples/spark-pi.yaml @@ -31,6 +31,7 @@ spec: memory: "512m" labels: version: 2.3.0 + serviceAccount: spark executor: cores: 1 instances: 1 diff --git a/hack/update-codegen.sh b/hack/update-codegen.sh index 20cb0895c..c503e4d01 100755 --- a/hack/update-codegen.sh +++ b/hack/update-codegen.sh @@ -31,4 +31,4 @@ ${CODEGEN_PKG}/generate-groups.sh "all" \ --output-base "$(dirname ${BASH_SOURCE})/../../.." # To use your own boilerplate text append: -# --go-header-file ${SCRIPT_ROOT}/hack/custom-boilerplate.go.txt \ No newline at end of file +# --go-header-file ${SCRIPT_ROOT}/hack/custom-boilerplate.go.txt diff --git a/hack/verify-codegen.sh b/hack/verify-codegen.sh index 68bc85a4c..9cc02a5a4 100755 --- a/hack/verify-codegen.sh +++ b/hack/verify-codegen.sh @@ -45,4 +45,4 @@ then else echo "${DIFFROOT} is out of date. Please run hack/update-codegen.sh" exit 1 -fi \ No newline at end of file +fi diff --git a/manifest/spark-operator-rbac.yaml b/manifest/spark-operator-rbac.yaml index da5745481..bdf34bd97 100644 --- a/manifest/spark-operator-rbac.yaml +++ b/manifest/spark-operator-rbac.yaml @@ -1,4 +1,3 @@ - # # Copyright 2017 Google LLC # @@ -49,4 +48,4 @@ subjects: roleRef: kind: ClusterRole name: sparkoperator - apiGroup: rbac.authorization.k8s.io \ No newline at end of file + apiGroup: rbac.authorization.k8s.io diff --git a/manifest/spark-operator.yaml b/manifest/spark-operator.yaml index b3f81250e..4539a5d19 100644 --- a/manifest/spark-operator.yaml +++ b/manifest/spark-operator.yaml @@ -40,4 +40,4 @@ spec: imagePullPolicy: Always command: ["/usr/bin/spark-operator"] args: - - -logtostderr \ No newline at end of file + - -logtostderr diff --git a/manifest/spark-rbac.yaml b/manifest/spark-rbac.yaml new file mode 100644 index 000000000..f47cc0dcc --- /dev/null +++ b/manifest/spark-rbac.yaml @@ -0,0 +1,54 @@ +# +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +apiVersion: v1 +kind: ServiceAccount +metadata: + name: spark + namespace: default +--- +apiVersion: rbac.authorization.k8s.io/v1beta1 +kind: Role +metadata: + namespace: default + name: spark-role +rules: +- apiGroups: + - "" # "" indicates the core API group + resources: + - "pods" + verbs: + - "*" +- apiGroups: + - "" # "" indicates the core API group + resources: + - "services" + verbs: + - "*" +--- +apiVersion: rbac.authorization.k8s.io/v1beta1 +kind: RoleBinding +metadata: + name: spark-role-binding + namespace: default +subjects: +- kind: ServiceAccount + name: spark + namespace: default +roleRef: + kind: Role + name: spark-role + apiGroup: rbac.authorization.k8s.io diff --git a/pkg/apis/sparkoperator.k8s.io/v1alpha1/types.go b/pkg/apis/sparkoperator.k8s.io/v1alpha1/types.go index e70fd7a29..f1c02e0d7 100644 --- a/pkg/apis/sparkoperator.k8s.io/v1alpha1/types.go +++ b/pkg/apis/sparkoperator.k8s.io/v1alpha1/types.go @@ -118,6 +118,8 @@ type SparkApplicationSpec struct { RestartPolicy RestartPolicy `json:"restartPolicy,omitempty"` // NodeSelector is the Kubernetes node selector to be added to the driver and executor pods. NodeSelector map[string]string `json:"nodeSelector,omitempty"` + // MaxSubmissionRetries is the maximum number of times to retry a failed submission. + MaxSubmissionRetries int32 `json:"maxSubmissionRetries,omitempty"` } // ApplicationStateType represents the type of the current state of an application. @@ -167,6 +169,8 @@ type SparkApplicationStatus struct { AppState ApplicationState `json:"applicationState"` // ExecutorState records the state of executors by executor Pod names. ExecutorState map[string]ExecutorState `json:"executorState,omitempty"` + // SubmissionRetries is the number of retries attempted for a failed submission. + SubmissionRetries int32 `json:"submissionRetries,omitempty"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 02a870230..b8e11f0fb 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -64,7 +64,7 @@ type SparkApplicationController struct { recorder record.EventRecorder runner *sparkSubmitRunner sparkPodMonitor *sparkPodMonitor - appStateReportingChan <-chan appStateUpdate + appStateReportingChan <-chan *appStateUpdate podStateReportingChan <-chan interface{} } @@ -94,7 +94,7 @@ func newSparkApplicationController( queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "spark-application-controller") - appStateReportingChan := make(chan appStateUpdate, submissionRunnerWorkers) + appStateReportingChan := make(chan *appStateUpdate, submissionRunnerWorkers) podStateReportingChan := make(chan interface{}) runner := newSparkSubmitRunner(submissionRunnerWorkers, appStateReportingChan) @@ -238,21 +238,28 @@ func (s *SparkApplicationController) syncSparkApplication(key string) error { if err != nil { return err } - s.submitApp(app, false) + s.submitApp(app) return nil } -func (s *SparkApplicationController) submitApp(app *v1alpha1.SparkApplication, resubmission bool) { - updatedApp := s.updateSparkApplicationWithRetries(app, app.DeepCopy(), func(toUpdate *v1alpha1.SparkApplication) { - if resubmission { - // Clear the Status field if it's a resubmission. 
- toUpdate.Status = v1alpha1.SparkApplicationStatus{} - } - toUpdate.Status.AppID = buildAppID(toUpdate) - toUpdate.Status.AppState.State = v1alpha1.NewState - createSparkUIService(toUpdate, s.kubeClient) - }) +func (s *SparkApplicationController) submitApp(app *v1alpha1.SparkApplication) { + appStatus := v1alpha1.SparkApplicationStatus{ + AppID: buildAppID(app), + AppState: v1alpha1.ApplicationState{ + State: v1alpha1.NewState, + }, + } + name, port, err := createSparkUIService(app, appStatus.AppID, s.kubeClient) + if err != nil { + glog.Errorf("failed to create a UI service for SparkApplication %s: %v", app.Name, err) + } else { + appStatus.DriverInfo.WebUIServiceName = name + appStatus.DriverInfo.WebUIPort = port + } + updatedApp := s.updateSparkApplicationStatusWithRetries(app, func(status *v1alpha1.SparkApplicationStatus) { + *status = appStatus + }) if updatedApp == nil { return } @@ -264,7 +271,7 @@ func (s *SparkApplicationController) submitApp(app *v1alpha1.SparkApplication, r updatedApp.Name, err) } - + s.runner.submit(newSubmission(submissionCmdArgs, updatedApp)) } @@ -301,34 +308,34 @@ func (s *SparkApplicationController) processSingleDriverStateUpdate( return nil } - updated := s.updateSparkApplicationWithRetries(app, app.DeepCopy(), func(toUpdate *v1alpha1.SparkApplication) { - toUpdate.Status.DriverInfo.PodName = update.podName + // The application state is solely based on the driver pod phase except when submission fails and + // no driver pod is launched. + appState := driverPodPhaseToApplicationState(update.podPhase) + if isAppTerminated(appState) { + s.recorder.Eventf( + app, + apiv1.EventTypeNormal, + "SparkApplicationTermination", + "SparkApplication %s terminated with state: %v", + update.appName, + appState) + } + + updated := s.updateSparkApplicationStatusWithRetries(app, func(status *v1alpha1.SparkApplicationStatus) { + status.DriverInfo.PodName = update.podName if update.nodeName != "" { if nodeIP := s.getNodeExternalIP(update.nodeName); nodeIP != "" { - toUpdate.Status.DriverInfo.WebUIAddress = fmt.Sprintf("%s:%d", nodeIP, - toUpdate.Status.DriverInfo.WebUIPort) + status.DriverInfo.WebUIAddress = fmt.Sprintf("%s:%d", nodeIP, + status.DriverInfo.WebUIPort) } } - // Update the application based on the driver pod phase. - // The application state is solely based on the driver pod phase except when submission fails and - // no driver pod is launched. 
- toUpdate.Status.AppState.State = driverPodPhaseToApplicationState(update.podPhase) + status.AppState.State = appState if !update.completionTime.IsZero() { - toUpdate.Status.CompletionTime = update.completionTime + status.CompletionTime = update.completionTime } }) - if updated != nil && isAppTerminated(updated.Status.AppState.State) { - s.recorder.Eventf( - updated, - apiv1.EventTypeNormal, - "SparkApplicationTermination", - "SparkApplication %s terminated with state: %v", - updated.Name, - updated.Status.AppState) - } - return updated } @@ -338,7 +345,7 @@ func (s *SparkApplicationController) processAppStateUpdates() { } } -func (s *SparkApplicationController) processSingleAppStateUpdate(update appStateUpdate) { +func (s *SparkApplicationController) processSingleAppStateUpdate(update *appStateUpdate) { key := getApplicationKey(update.namespace, update.name) app, err := s.getSparkApplicationFromStore(key) if err != nil { @@ -347,22 +354,40 @@ func (s *SparkApplicationController) processSingleAppStateUpdate(update appState return } - updated := s.updateSparkApplicationWithRetries(app, app.DeepCopy(), func(toUpdate *v1alpha1.SparkApplication) { - toUpdate.Status.AppState.State = update.state - toUpdate.Status.AppState.ErrorMessage = update.errorMessage - if !update.submissionTime.IsZero() { - toUpdate.Status.SubmissionTime = update.submissionTime - } - }) - - if updated != nil && updated.Status.AppState.State == v1alpha1.FailedSubmissionState { + submissionRetries := app.Status.SubmissionRetries + if update.state == v1alpha1.FailedSubmissionState { s.recorder.Eventf( - updated, + app, apiv1.EventTypeNormal, "SparkApplicationSubmissionFailure", "SparkApplication %s failed submission", - updated.Name) + update.name) + + if submissionRetries < app.Spec.MaxSubmissionRetries { + glog.Infof("Retrying submission of SparkApplication %s", update.name) + key := getApplicationKey(update.namespace, update.name) + s.queue.AddRateLimited(key) + submissionRetries++ + s.recorder.Eventf( + app, + apiv1.EventTypeNormal, + "SparkApplicationSubmissionRetry", + "Retried submission of SparkApplication %s", + update.name) + } else { + glog.Errorf("maximum number of submission retries of SparkApplication %s has been reached, not "+ + "attempting more retries", update.name) + } } + + s.updateSparkApplicationStatusWithRetries(app, func(status *v1alpha1.SparkApplicationStatus) { + status.AppState.State = update.state + status.AppState.ErrorMessage = update.errorMessage + status.SubmissionRetries = submissionRetries + if !update.submissionTime.IsZero() { + status.SubmissionTime = update.submissionTime + } + }) } func (s *SparkApplicationController) processSingleExecutorStateUpdate(update *executorStateUpdate) { @@ -382,23 +407,24 @@ func (s *SparkApplicationController) processSingleExecutorStateUpdate(update *ex return } - s.updateSparkApplicationWithRetries(app, app.DeepCopy(), func(toUpdate *v1alpha1.SparkApplication) { - if toUpdate.Status.ExecutorState == nil { - toUpdate.Status.ExecutorState = make(map[string]v1alpha1.ExecutorState) + s.updateSparkApplicationStatusWithRetries(app, func(status *v1alpha1.SparkApplicationStatus) { + if status.ExecutorState == nil { + status.ExecutorState = make(map[string]v1alpha1.ExecutorState) } if update.state != v1alpha1.ExecutorPendingState { - toUpdate.Status.ExecutorState[update.podName] = update.state + status.ExecutorState[update.podName] = update.state } }) } -func (s *SparkApplicationController) updateSparkApplicationWithRetries( +func (s *SparkApplicationController) 
updateSparkApplicationStatusWithRetries( original *v1alpha1.SparkApplication, - toUpdate *v1alpha1.SparkApplication, - updateFunc func(*v1alpha1.SparkApplication)) *v1alpha1.SparkApplication { + updateFunc func(*v1alpha1.SparkApplicationStatus)) *v1alpha1.SparkApplication { + toUpdate := original.DeepCopy() + var lastUpdateErr error for i := 0; i < maximumUpdateRetries; i++ { - updated, err := s.tryUpdate(original, toUpdate, updateFunc) + updated, err := s.tryUpdateStatus(original, toUpdate, updateFunc) if err == nil { return updated } @@ -422,11 +448,11 @@ func (s *SparkApplicationController) updateSparkApplicationWithRetries( return nil } -func (s *SparkApplicationController) tryUpdate( +func (s *SparkApplicationController) tryUpdateStatus( original *v1alpha1.SparkApplication, toUpdate *v1alpha1.SparkApplication, - updateFunc func(*v1alpha1.SparkApplication)) (*v1alpha1.SparkApplication, error) { - updateFunc(toUpdate) + updateFunc func(*v1alpha1.SparkApplicationStatus)) (*v1alpha1.SparkApplication, error) { + updateFunc(&toUpdate.Status) if reflect.DeepEqual(original.Status, toUpdate.Status) { return nil, nil } @@ -473,10 +499,6 @@ func (s *SparkApplicationController) getNodeExternalIP(nodeName string) string { } func (s *SparkApplicationController) handleRestart(app *v1alpha1.SparkApplication) { - if app.Spec.RestartPolicy == v1alpha1.Never || app.Spec.RestartPolicy == v1alpha1.Undefined { - return - } - if (app.Status.AppState.State == v1alpha1.FailedState && app.Spec.RestartPolicy == v1alpha1.OnFailure) || app.Spec.RestartPolicy == v1alpha1.Always { glog.Infof("SparkApplication %s failed or terminated, restarting it with RestartPolicy %s", @@ -484,11 +506,31 @@ func (s *SparkApplicationController) handleRestart(app *v1alpha1.SparkApplicatio s.recorder.Eventf( app, apiv1.EventTypeNormal, - "SparkApplicationResubmission", - "Re-submitting SparkApplication: %s", + "SparkApplicationRestart", + "Re-starting SparkApplication: %s", app.Name) - s.submitApp(app, true) + // Cleanup old driver pod and UI service if necessary. + if app.Status.DriverInfo.PodName != "" { + err := s.kubeClient.CoreV1().Pods(app.Namespace).Delete(app.Status.DriverInfo.PodName, + &metav1.DeleteOptions{}) + if err != nil { + glog.Errorf("failed to delete old driver pod %s of SparkApplication %s: %v", + app.Status.DriverInfo.PodName, app.Name, err) + } + } + if app.Status.DriverInfo.WebUIServiceName != "" { + err := s.kubeClient.CoreV1().Services(app.Namespace).Delete(app.Status.DriverInfo.WebUIServiceName, + &metav1.DeleteOptions{}) + if err != nil { + glog.Errorf("failed to delete old web UI service %s of SparkApplication %s: %v", + app.Status.DriverInfo.WebUIServiceName, app.Name, err) + } + } + + // Add the application key to the queue for re-submission. 
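+		// Once re-queued, the key is picked up by syncSparkApplication, which re-submits the application.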
+ key := getApplicationKey(app.Namespace, app.Name) + s.queue.AddRateLimited(key) } } diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 47882ab44..4202a1bbf 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -70,7 +70,7 @@ func TestSubmitApp(t *testing.T) { } ctrl.crdClient.SparkoperatorV1alpha1().SparkApplications(app.Namespace).Create(app) - go ctrl.submitApp(app, false) + go ctrl.submitApp(app) submission := <-ctrl.runner.queue assert.Equal(t, app.Name, submission.name) assert.Equal(t, app.Namespace, submission.namespace) @@ -95,6 +95,7 @@ func TestOnAdd(t *testing.T) { key, ok := item.(string) assert.True(t, ok) assert.Equal(t, getApplicationKey(app.Namespace, app.Name), key) + ctrl.queue.Forget(item) assert.Equal(t, 1, len(recorder.Events)) event := <-recorder.Events @@ -120,9 +121,11 @@ func TestOnDelete(t *testing.T) { ctrl.onDelete(app) ctrl.queue.ShutDown() item, _ := ctrl.queue.Get() + defer ctrl.queue.Done(item) assert.True(t, item == nil) event := <-recorder.Events assert.True(t, strings.Contains(event, "SparkApplicationDeletion")) + ctrl.queue.Forget(item) } func TestProcessSingleDriverStateUpdate(t *testing.T) { @@ -288,7 +291,7 @@ func TestProcessSingleAppStateUpdate(t *testing.T) { } testFn := func(test testcase, t *testing.T) { - ctrl.processSingleAppStateUpdate(test.update) + ctrl.processSingleAppStateUpdate(&test.update) app, err := ctrl.crdClient.SparkoperatorV1alpha1().SparkApplications(app.Namespace).Get(app.Name, metav1.GetOptions{}) if err != nil { @@ -324,14 +327,13 @@ func TestProcessSingleExecutorStateUpdate(t *testing.T) { ctrl, _ := newFakeController() - appID := "foo-123" app := &v1alpha1.SparkApplication{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", Namespace: "default", }, Status: v1alpha1.SparkApplicationStatus{ - AppID: appID, + AppID: "foo-123", AppState: v1alpha1.ApplicationState{ State: v1alpha1.NewState, ErrorMessage: "", @@ -439,23 +441,23 @@ func TestHandleRestart(t *testing.T) { testFn := func(test testcase, t *testing.T) { ctrl.crdClient.SparkoperatorV1alpha1().SparkApplications(test.app.Namespace).Create(test.app) - defer ctrl.crdClient.SparkoperatorV1alpha1().SparkApplications(test.app.Namespace).Delete(test.app.Name, - &metav1.DeleteOptions{}) - go ctrl.handleRestart(test.app) + ctrl.store.Add(test.app) + ctrl.handleRestart(test.app) if test.expectRestart { + go ctrl.processNextItem() submission := <-ctrl.runner.queue assert.Equal(t, test.app.Name, submission.name) assert.Equal(t, test.app.Namespace, submission.namespace) event := <-recorder.Events - assert.True(t, strings.Contains(event, "SparkApplicationResubmission")) + assert.True(t, strings.Contains(event, "SparkApplicationRestart")) } } testcases := []testcase{ { - name: "completed application with restart policy never", + name: "completed application with restart policy Never", app: &v1alpha1.SparkApplication{ - ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, + ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: "default"}, Spec: v1alpha1.SparkApplicationSpec{RestartPolicy: v1alpha1.Never}, Status: v1alpha1.SparkApplicationStatus{ AppState: v1alpha1.ApplicationState{State: v1alpha1.CompletedState}, @@ -464,9 +466,9 @@ func TestHandleRestart(t *testing.T) { expectRestart: false, }, { - name: "completed application with restart policy never", + name: "completed application with restart policy OnFailure", app: &v1alpha1.SparkApplication{ - ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: 
"default"}, + ObjectMeta: metav1.ObjectMeta{Name: "foo2", Namespace: "default"}, Spec: v1alpha1.SparkApplicationSpec{RestartPolicy: v1alpha1.OnFailure}, Status: v1alpha1.SparkApplicationStatus{ AppState: v1alpha1.ApplicationState{State: v1alpha1.CompletedState}, @@ -475,9 +477,9 @@ func TestHandleRestart(t *testing.T) { expectRestart: false, }, { - name: "completed application with restart policy never", + name: "completed application with restart policy Always", app: &v1alpha1.SparkApplication{ - ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, + ObjectMeta: metav1.ObjectMeta{Name: "foo3", Namespace: "default"}, Spec: v1alpha1.SparkApplicationSpec{RestartPolicy: v1alpha1.Always}, Status: v1alpha1.SparkApplicationStatus{ AppState: v1alpha1.ApplicationState{State: v1alpha1.CompletedState}, @@ -486,9 +488,9 @@ func TestHandleRestart(t *testing.T) { expectRestart: true, }, { - name: "completed application with restart policy never", + name: "failed application with restart policy Never", app: &v1alpha1.SparkApplication{ - ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, + ObjectMeta: metav1.ObjectMeta{Name: "foo4", Namespace: "default"}, Spec: v1alpha1.SparkApplicationSpec{RestartPolicy: v1alpha1.Never}, Status: v1alpha1.SparkApplicationStatus{ AppState: v1alpha1.ApplicationState{State: v1alpha1.FailedState}, @@ -497,9 +499,9 @@ func TestHandleRestart(t *testing.T) { expectRestart: false, }, { - name: "completed application with restart policy never", + name: "failed application with restart policy OnFailure", app: &v1alpha1.SparkApplication{ - ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, + ObjectMeta: metav1.ObjectMeta{Name: "foo5", Namespace: "default"}, Spec: v1alpha1.SparkApplicationSpec{RestartPolicy: v1alpha1.OnFailure}, Status: v1alpha1.SparkApplicationStatus{ AppState: v1alpha1.ApplicationState{State: v1alpha1.FailedState}, @@ -508,9 +510,9 @@ func TestHandleRestart(t *testing.T) { expectRestart: true, }, { - name: "completed application with restart policy never", + name: "failed application with restart policy Always", app: &v1alpha1.SparkApplication{ - ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "default"}, + ObjectMeta: metav1.ObjectMeta{Name: "foo6", Namespace: "default"}, Spec: v1alpha1.SparkApplicationSpec{RestartPolicy: v1alpha1.Always}, Status: v1alpha1.SparkApplicationStatus{ AppState: v1alpha1.ApplicationState{State: v1alpha1.FailedState}, @@ -524,3 +526,66 @@ func TestHandleRestart(t *testing.T) { testFn(test, t) } } + +func TestResubmissionOnFailures(t *testing.T) { + ctrl, recorder := newFakeController() + + os.Setenv(kubernetesServiceHostEnvVar, "localhost") + os.Setenv(kubernetesServicePortEnvVar, "443") + + app := &v1alpha1.SparkApplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "default", + }, + Spec: v1alpha1.SparkApplicationSpec{ + MaxSubmissionRetries: 2, + }, + Status: v1alpha1.SparkApplicationStatus{ + AppID: "foo-123", + AppState: v1alpha1.ApplicationState{ + State: v1alpha1.NewState, + ErrorMessage: "", + }, + }, + } + + ctrl.crdClient.SparkoperatorV1alpha1().SparkApplications(app.Namespace).Create(app) + ctrl.store.Add(app) + + testFn := func(t *testing.T, update *appStateUpdate) { + ctrl.processSingleAppStateUpdate(update) + item, _ := ctrl.queue.Get() + key, ok := item.(string) + assert.True(t, ok) + assert.Equal(t, getApplicationKey(app.Namespace, app.Name), key) + ctrl.queue.Forget(item) + ctrl.queue.Done(item) + + updatedApp, err := 
ctrl.crdClient.SparkoperatorV1alpha1().SparkApplications(app.Namespace).Get(app.Name, + metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, int32(1), updatedApp.Status.SubmissionRetries) + + event := <-recorder.Events + assert.True(t, strings.Contains(event, "SparkApplicationSubmissionFailure")) + event = <-recorder.Events + assert.True(t, strings.Contains(event, "SparkApplicationSubmissionRetry")) + } + + update := &appStateUpdate{ + namespace: app.Namespace, + name: app.Name, + state: v1alpha1.FailedSubmissionState, + } + + // First 2 failed submissions should result in re-submission attempts. + testFn(t, update) + testFn(t, update) + + // The next failed submission should not cause a re-submission attempt. + ctrl.processSingleAppStateUpdate(update) + assert.Equal(t, 0, ctrl.queue.Len()) +} diff --git a/pkg/controller/sparkui.go b/pkg/controller/sparkui.go index 56aded4d8..2c765e9ab 100644 --- a/pkg/controller/sparkui.go +++ b/pkg/controller/sparkui.go @@ -25,6 +25,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" + "fmt" "k8s.io/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1alpha1" "k8s.io/spark-on-k8s-operator/pkg/config" ) @@ -34,20 +35,22 @@ const ( defaultSparkWebUIPort = "4040" ) -func createSparkUIService(app *v1alpha1.SparkApplication, kubeClient clientset.Interface) { +func createSparkUIService( + app *v1alpha1.SparkApplication, + appID string, + kubeClient clientset.Interface) (string, int32, error) { portStr := getUITargetPort(app) port, err := strconv.Atoi(portStr) if err != nil { - glog.Errorf("invalid Spark UI port: %s", portStr) - return + return "", -1, fmt.Errorf("invalid Spark UI port: %s", portStr) } service := &apiv1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: app.Status.AppID + "-ui-svc", + Name: appID + "-ui-svc", Namespace: app.Namespace, Labels: map[string]string{ - config.SparkAppIDLabel: app.Status.AppID, + config.SparkAppIDLabel: appID, }, OwnerReferences: []metav1.OwnerReference{getOwnerReference(app)}, }, @@ -59,7 +62,7 @@ func createSparkUIService(app *v1alpha1.SparkApplication, kubeClient clientset.I }, }, Selector: map[string]string{ - config.SparkAppIDLabel: app.Status.AppID, + config.SparkAppIDLabel: appID, sparkRoleLabel: sparkDriverRole, }, Type: apiv1.ServiceTypeNodePort, @@ -69,14 +72,10 @@ func createSparkUIService(app *v1alpha1.SparkApplication, kubeClient clientset.I glog.Infof("Creating a service %s for the Spark UI for application %s", service.Name, app.Name) service, err = kubeClient.CoreV1().Services(app.Namespace).Create(service) if err != nil { - glog.Errorf("failed to create a UI service for SparkApplication %s: %v", app.Name, err) - return + return "", -1, err } - app.Status.DriverInfo = v1alpha1.DriverInfo{ - WebUIServiceName: service.Name, - WebUIPort: service.Spec.Ports[0].NodePort, - } + return service.Name, service.Spec.Ports[0].NodePort, nil } // getWebUITargetPort attempts to get the Spark web UI port from configuration property spark.ui.port diff --git a/pkg/controller/sparkui_test.go b/pkg/controller/sparkui_test.go index 8717ebf01..bc9d143fd 100644 --- a/pkg/controller/sparkui_test.go +++ b/pkg/controller/sparkui_test.go @@ -40,7 +40,15 @@ func TestCreateSparkUIService(t *testing.T) { } testFn := func(test testcase, t *testing.T) { fakeClient := fake.NewSimpleClientset() - createSparkUIService(test.app, fakeClient) + uiServiceName, uiServicePort, err := createSparkUIService(test.app, test.app.Status.AppID, fakeClient) + if err != nil { + if 
test.expectError { + return + } + t.Fatal(err) + } + test.app.Status.DriverInfo.WebUIServiceName = uiServiceName + test.app.Status.DriverInfo.WebUIPort = uiServicePort if test.app.Status.DriverInfo.WebUIServiceName != test.expectedServiceName { t.Errorf("%s: for service name wanted %s got %s", test.name, test.expectedServiceName, test.app.Status.DriverInfo.WebUIServiceName) @@ -72,6 +80,11 @@ func TestCreateSparkUIService(t *testing.T) { } } + defaultPort, err := strconv.Atoi(defaultSparkWebUIPort) + if err != nil { + t.Fatal(err) + } + app1 := &v1alpha1.SparkApplication{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -97,10 +110,6 @@ func TestCreateSparkUIService(t *testing.T) { AppID: "foo-2", }, } - defaultPort, err := strconv.Atoi(defaultSparkWebUIPort) - if err != nil { - t.Fatal(err) - } app3 := &v1alpha1.SparkApplication{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", diff --git a/pkg/controller/submission_runner.go b/pkg/controller/submission_runner.go index 8f69d430f..4b0a3b46e 100644 --- a/pkg/controller/submission_runner.go +++ b/pkg/controller/submission_runner.go @@ -35,7 +35,7 @@ import ( type sparkSubmitRunner struct { workers int queue chan *submission - appStateReportingChan chan<- appStateUpdate + appStateReportingChan chan<- *appStateUpdate } // appStateUpdate encapsulates overall state update of a Spark application. @@ -47,7 +47,7 @@ type appStateUpdate struct { errorMessage string } -func newSparkSubmitRunner(workers int, appStateReportingChan chan<- appStateUpdate) *sparkSubmitRunner { +func newSparkSubmitRunner(workers int, appStateReportingChan chan<- *appStateUpdate) *sparkSubmitRunner { return &sparkSubmitRunner{ workers: workers, queue: make(chan *submission, workers), @@ -97,7 +97,7 @@ func (r *sparkSubmitRunner) runWorker() { stateUpdate.errorMessage = string(exitErr.Stderr) } - r.appStateReportingChan <- stateUpdate + r.appStateReportingChan <- &stateUpdate } else { glog.Infof("spark-submit completed for SparkApplication %s in namespace %s", s.name, s.namespace) } diff --git a/pkg/controller/submission_runner_test.go b/pkg/controller/submission_runner_test.go index 6e4428dea..7cd405fbf 100644 --- a/pkg/controller/submission_runner_test.go +++ b/pkg/controller/submission_runner_test.go @@ -27,14 +27,14 @@ import ( ) func TestNewRunner(t *testing.T) { - appStateReportingChan := make(chan<- appStateUpdate) + appStateReportingChan := make(chan<- *appStateUpdate) runner := newSparkSubmitRunner(3, appStateReportingChan) assert.Equal(t, 3, runner.workers, "number of workers should be 3") assert.Equal(t, 3, cap(runner.queue), "capacity of the work queue should be 3") } func TestSubmit(t *testing.T) { - appStateReportingChan := make(chan<- appStateUpdate) + appStateReportingChan := make(chan<- *appStateUpdate) runner := newSparkSubmitRunner(1, appStateReportingChan) app := &v1alpha1.SparkApplication{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "foo"}} submitCommandArgs := []string{"--master", "localhost", "-class", "foo"} diff --git a/pkg/crd/crd.go b/pkg/crd/crd.go index b8ea7f987..5edeaeb85 100644 --- a/pkg/crd/crd.go +++ b/pkg/crd/crd.go @@ -146,8 +146,8 @@ func getCustomResourceValidation() *apiextensionsv1beta1.CustomResourceValidatio "driver": { Properties: map[string]apiextensionsv1beta1.JSONSchemaProps{ "cores": { - Type: "number", - Minimum: float64Ptr(0), + Type: "number", + Minimum: float64Ptr(0), ExclusiveMinimum: true, }, "podName": { @@ -158,11 +158,11 @@ func getCustomResourceValidation() *apiextensionsv1beta1.CustomResourceValidatio 
"executor": { Properties: map[string]apiextensionsv1beta1.JSONSchemaProps{ "cores": { - Type: "integer", + Type: "integer", Minimum: float64Ptr(1), }, "instances": { - Type: "integer", + Type: "integer", Minimum: float64Ptr(1), }, }, @@ -170,11 +170,11 @@ func getCustomResourceValidation() *apiextensionsv1beta1.CustomResourceValidatio "deps": { Properties: map[string]apiextensionsv1beta1.JSONSchemaProps{ "downloadTimeout": { - Type: "integer", + Type: "integer", Minimum: float64Ptr(1), }, "maxSimultaneousDownloads": { - Type: "integer", + Type: "integer", Minimum: float64Ptr(1), }, },