
Commit 8c7fdbb

akhurana001 authored and liyinan926 committed
Operator State Management + Ingress Creation (kubeflow#291)
* SparkOperator: Prometheus Metrics Integration
* Prometheus Metric Update
* Spark Operator: Prometheus Metric Integration
* PositiveGauge rework
* remove unwanted dependencies
* Propagating ScheduledSpark App Labels
* Doc update
* Metric Description update
* fix app wait
* SparkOperator: Prometheus Metrics Integration
* Spark Operator metrics: PR Comments
* SparkOperator: Set completion time for Failed App
* Operator Metrics: PR comments
* Spark Operator: PR Comments
* Controller Update
* PR Comments
* Docs Update
* Driver State Transition Check Update
* Operator State Management
* Clean-up
* Exposing Spark Application Id in Operator
* SparkAppId updates
* Add Lyft as a user and contributor to operator
* Spark Operator Rework
* Reworking Restart-Policy
* Documentation update
* PR comments
* PR comments
* Ingress impl
* Ingress Tests + Updates
* go fmt
* PR Comments
* missing files
* AppId removal: Doc Update
* Doc update
* Delete UI/Ingress + Other minor changes
* Add PENDING_RETRY State
* PR comments
* PR comments
* Clean-up
* Update controller.go
* Add Terminal State
* Terminal State
* Spark improvements
* event type
* Events update
* Update controller.go
* Update controller.go
* PR Comments
* PR comments
* Support Best-effort Spec updates
* New State
* PR comments
* PR comments
* go fmt
* Docs update
* PR feedback
* PR Feedback
* PR comments
1 parent 4b77040 commit 8c7fdbb

33 files changed: +2284 -2200

docs/api.md

+9-7
@@ -52,10 +52,8 @@ A `SparkApplicationSpec` has the following top-level fields:
 | `Driver` | N/A | A [`DriverSpec`](#driverspec) field. |
 | `Executor` | N/A | An [`ExecutorSpec`](#executorspec) field. |
 | `Deps` | N/A | A [`Dependencies`](#dependencies) field. |
-| `RestartPolicy` | N/A | The policy regarding if and in which conditions the controller should restart a terminated application. Valid values are `Never`, `Always`, and `OnFailure`. |
+| `RestartPolicy` | N/A | The policy regarding if and in which conditions the controller should restart a terminated application. |
 | `NodeSelector` | `spark.kubernetes.node.selector.[labelKey]` | Node selector of the driver pod and executor pods, with key `labelKey` and value as the label's value. |
-| `MaxSubmissionRetries` | N/A | The maximum number of times to retry a failed submission. |
-| `SubmissionRetryInterval` | N/A | The unit of intervals in seconds between submission retries. Depending on the implementation, the actual interval between two submission retries may be a multiple of `SubmissionRetryInterval`, e.g., if linear or exponential backoff is used. |
 | `MemoryOverheadFactor` | `spark.kubernetes.memoryOverheadFactor` | This sets the Memory Overhead Factor that will allocate memory to non-JVM memory. For JVM-based jobs this value will default to 0.10, for non-JVM jobs 0.40. Value of this field will be overridden by `Spec.Driver.MemoryOverhead` and `Spec.Executor.MemoryOverhead` if they are set. |
 | `Monitoring` | N/A | This specifies how monitoring of the Spark application should be handled, e.g., how driver and executor metrics are to be exposed. Currently only exposing metrics to Prometheus is supported. |

@@ -135,12 +133,14 @@ A `SparkApplicationStatus` captures the status of a Spark application including
 | Field | Note |
 | ------------- | ------------- |
 | `AppID` | A randomly generated ID used to group all Kubernetes resources of an application. |
-| `SubmissionTime` | Time the application is submitted to run. |
+| `LastSubmissionAttemptTime` | Time of the last application submission attempt. |
 | `CompletionTime` | Time the application completes (if it does). |
 | `DriverInfo` | A [`DriverInfo`](#driverinfo) field. |
 | `AppState` | Current state of the application. |
 | `ExecutorState` | A map of executor pod names to executor state. |
-| `SubmissionRetries` | The number of submission retries for an application. |
+| `ExecutionAttempts` | The number of execution attempts made for an application. |
+| `SubmissionAttempts` | The number of submission attempts made for an application. |
 
 #### `DriverInfo`
 
@@ -149,8 +149,10 @@ A `DriverInfo` captures information about the driver pod and the Spark web UI ru
 | Field | Note |
 | ------------- | ------------- |
 | `WebUIServiceName` | Name of the service for the Spark web UI. |
-| `WebUIPort` | Port on which the Spark web UI runs. |
-| `WebUIAddress` | Address to access the web UI from outside the cluster. |
+| `WebUIPort` | Port on which the Spark web UI runs on the Node. |
+| `WebUIAddress` | Address to access the web UI from outside the cluster via the Node. |
+| `WebUIIngressName` | Name of the ingress for the Spark web UI. |
+| `WebUIIngressAddress` | Address to access the web UI via the Ingress. |
 | `PodName` | Name of the driver pod. |
 
 ### `ScheduledSparkApplicationSpec`

docs/design.md

+6-10
@@ -39,24 +39,20 @@ When a `SparkApplication` object gets updated (i.e., when the `UpdateFunc` callb
 
 The controller is also responsible for updating the status of a `SparkApplication` object with the help of the Spark pod monitor, which watches Spark pods and updates the `SparkApplicationStatus` field of corresponding `SparkApplication` objects based on the status of the pods. The Spark pod monitor watches events of creation, updates, and deletion of Spark pods, creates status update messages based on the status of the pods, and sends the messages to the controller to process. When the controller receives a status update message, it gets the corresponding `SparkApplication` object from the cache store and updates the `Status` accordingly.
 
-As described in [API Definition](api.md), the `Status` field (of type `SparkApplicationStatus`) records the overall state of the application as well as the state of each executor pod. Note that the overall state of an application is determined by the driver pod state, except when submission fails, in which case no driver pod gets launched. Particulrly, the final application state is set to the termination state of the driver pod when applicable, i.e., `COMPLETED` if the driver pod completed or `FAILED` if the driver pod failed. If the driver pod gets deleted while running, the final application state is set to `FAILED`. If submission fails, the application state is set to `FAILED_SUBMISSION`.
+As described in [API Definition](api.md), the `Status` field (of type `SparkApplicationStatus`) records the overall state of the application as well as the state of each executor pod. Note that the overall state of an application is determined by the driver pod state, except when submission fails, in which case no driver pod gets launched. Particularly, the final application state is set to the termination state of the driver pod when applicable, i.e., `COMPLETED` if the driver pod completed or `FAILED` if the driver pod failed. If the driver pod gets deleted while running, the final application state is set to `FAILED`. If submission fails, the application state is set to `FAILED_SUBMISSION`. There are two terminal states, `COMPLETED` and `FAILED`, which means that an application in either of these states will never be retried by the operator. All other states are non-terminal and, based on the state as well as the `RestartPolicy` (discussed below), can be retried.
 
 As part of preparing a submission for a newly created `SparkApplication` object, the controller parses the object and adds configuration options for adding certain annotations to the driver and executor pods of the application. The annotations are later used by the mutating admission webhook to configure the pods before they start to run. For example, if a Spark application needs a certain Kubernetes ConfigMap to be mounted into the driver and executor pods, the controller adds an annotation that specifies the name of the ConfigMap to mount. Later the mutating admission webhook sees the annotation on the pods and mounts the ConfigMap to the pods.
 
-## Handling Application Restart
+## Handling Application Restart And Failures
 
-The operator provides a configurable option through the `RestartPolicy` field of `SparkApplicationSpec` (see the [API Definition](api.md) for more details) for specifying the application restart policy. The operator determines if an application should be restarted based on its termination state and the restart policy. As discussed above, the termination state of an application is based on the termination state of the driver pod. So effectively the decision is based on the termination state of the driver pod and the restart policy. Specifically, one of the following conditions applies:
+The operator provides a configurable option through the `RestartPolicy` field of `SparkApplicationSpec` (see [Configuring Automatic Application Restart and Failure Handling](user-guide.md) for more details) for specifying the application restart policy. The operator determines if an application should be restarted based on its termination state and the restart policy. As discussed above, the termination state of an application is based on the termination state of the driver pod. So effectively the decision is based on the termination state of the driver pod and the restart policy. Specifically, one of the following conditions applies:
 
-* If the restart policy is `Never`, the application is not restarted upon terminating.
-* If the restart policy is `Always`, the application gets restarted regardless of the termination state of the application.
-* If the restart policy is `OnFailure`, the application gets restarted if and only if the application failed. Note that in case the driver pod gets deleted while running, the application is considered being failed as discussed above. In this case, the application gets restarted if the restart policy is `OnFailure`.
+* If the restart policy type is `Never`, the application is not restarted upon terminating.
+* If the restart policy type is `Always`, the application gets restarted regardless of the termination state of the application. Please note that such an application will never end up in the terminal state `COMPLETED` or `FAILED`.
+* If the restart policy type is `OnFailure`, the application gets restarted if and only if the application failed and the retry limit has not been reached. Note that if the driver pod gets deleted while running, the application is considered failed as discussed above. In this case, the application gets restarted if the restart policy is `OnFailure`.
 
 When the operator decides to restart an application, it cleans up the Kubernetes resources associated with the previous terminated run of the application and enqueues the `SparkApplication` object of the application into the internal work queue, from which it gets picked up by a worker who will handle the submission. Note that instead of restarting the driver pod, the operator simply re-submits the application and lets the submission client create a new driver pod.
 
-## Handling Retries of Failed Submissions
-
-The submission of an application may fail for various reasons. Sometimes a submission may fail due to transient errors and a retry may succeed. The operator supports retries of failed submissions through a combination of the `MaxSubmissionRetries` field of `SparkApplicationSpec` and the `SubmissionRetries` field of `SparkApplicationStatus` (see the [API Definition](api.md) for more details). When the operator decides to retry a failed submission, it simply enqueues the `SparkApplication` object of the application into the internal work queue, from which it gets picked up by a worker who will handle the submission.
-
 ## Mutating Admission Webhook
 
 The operator comes with an optional mutating admission webhook for customizing Spark driver and executor pods based on certain annotations on the pods added by the CRD controller. The annotations are set by the operator based on the application specifications. All Spark pod customization needs except for those natively supported by Spark on Kubernetes are handled by the mutating admission webhook.
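
To make the annotation hand-off described above concrete, here is a purely illustrative sketch of a driver pod carrying such an annotation. The annotation key, ConfigMap name, and image are hypothetical placeholders, not the operator's actual identifiers:

```yaml
# Purely illustrative: a driver pod stamped with a hypothetical annotation that
# the mutating admission webhook would read to mount a ConfigMap into the pod.
apiVersion: v1
kind: Pod
metadata:
  name: spark-pi-driver                                     # hypothetical pod name
  annotations:
    sparkoperator.k8s.io/configmap-to-mount: my-spark-conf  # hypothetical key and value
spec:
  containers:
    - name: spark-kubernetes-driver
      image: gcr.io/spark-operator/spark:v2.4.0             # placeholder image
```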

docs/quick-start-guide.md

+19-6
@@ -77,12 +77,18 @@ A note about `metrics-labels`: In `Prometheus`, every unique combination of key-
 Additionally, these metrics are best-effort for the current operator run and will be reset on an operator restart. Also, some of these metrics are generated by listening to pod state updates for the driver/executors,
 and deleting the pods outside the operator might lead to incorrect metric values for some of these metrics.
 
+## UI Access and Ingress
+The operator, by default, makes the Spark UI accessible by creating a service of type `NodePort` which exposes the UI via the node running the driver.
+The operator also supports creating an Ingress for the UI. This can be turned on by setting the `ingress-url-format` command-line flag. The `ingress-url-format`
+should be a template like `{{$appName}}.ingress.cluster.com` and the operator will replace `{{$appName}}` with the appropriate application name.
+
+The operator also sets both `WebUIAddress`, which uses the Node's public IP, and `WebUIIngressAddress` as part of the `DriverInfo` field of the `SparkApplication`.
+
 ## Configuration
 
 The operator is typically deployed and run using the Helm chart. However, users can still run it outside a Kubernetes cluster and make it talk to the Kubernetes API server of a cluster by specifying the path to `kubeconfig`, which can be done using the `-kubeconfig` flag.
 
-The operator uses multiple workers in the `SparkApplication` controller and and the submission runner.
-The number of worker threads to use in the three places are controlled using command-line flags `-controller-threads` and `-submission-threads`, respectively. The default values for the flags are 10 and 3, respectively.
+The operator uses multiple workers in the `SparkApplication` controller. The number of worker threads is controlled using the command-line flag `-controller-threads`, which has a default value of 10.
 
 The operator enables cache resynchronization so periodically the informers used by the operator will re-list existing objects it manages and re-trigger resource events. The resynchronization interval in seconds can be configured using the flag `-resync-interval`, with a default value of 30 seconds.
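
These options are plain command-line flags on the operator binary. Below is a minimal, hypothetical sketch of how they might be wired into the operator Deployment's container args; the flag names come from the docs above, while the image tag and ingress domain are placeholder assumptions, not values from this commit:

```yaml
# Hypothetical operator Deployment fragment; image tag and domain are placeholders.
spec:
  template:
    spec:
      containers:
        - name: sparkoperator
          image: gcr.io/spark-operator/spark-operator:latest    # placeholder tag
          args:
            - -controller-threads=10   # number of controller workers (default 10)
            - -resync-interval=30      # informer resync interval in seconds (default 30)
            - "-ingress-url-format={{$appName}}.ingress.cluster.com"  # enables UI Ingress creation
```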

@@ -150,10 +156,15 @@ spec:
   mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar
   mainClass: org.apache.spark.examples.SparkPi
   mode: cluster
-  restartPolicy: Never
+  restartPolicy:
+    type: OnFailure
+    onFailureRetries: 3
+    onFailureRetryInterval: 10
+    onSubmissionFailureRetries: 5
+    onSubmissionFailureRetryInterval: 20
   type: Scala
 status:
-  appId: spark-pi-2402118027
+  sparkApplicationId: spark-5f4ba921c85ff3f1cb04bef324f9154c9
   applicationState:
     state: COMPLETED
   completionTime: 2018-02-20T23:33:55Z
@@ -162,9 +173,11 @@ status:
     webUIAddress: 35.192.234.248:31064
     webUIPort: 31064
     webUIServiceName: spark-pi-2402118027-ui-svc
+    webUIIngressName: spark-pi-ui-ingress
+    webUIIngressAddress: spark-pi.ingress.cluster.com
   executorState:
     spark-pi-83ba921c85ff3f1cb04bef324f9154c9-exec-1: COMPLETED
-  submissionTime: 2018-02-20T23:32:27Z
+  LastSubmissionAttemptTime: 2018-02-20T23:32:27Z
 ```
 
 To check events for the `SparkApplication` object, run the following command:
@@ -200,4 +213,4 @@ $ kubectl apply -f manifest/spark-operator-with-webhook.yaml
 
 This will create a Deployment named `sparkoperator` and a Service named `spark-webhook` for the webhook in namespace `spark-operator`.
 
-If the operator is installed via the Helm chart using the default settings (i.e. with webhook enabled), the above steps are all automated for you.
\ No newline at end of file
+If the operator is installed via the Helm chart using the default settings (i.e. with webhook enabled), the above steps are all automated for you.

docs/user-guide.md

+25-11
@@ -357,15 +357,28 @@ A `SparkApplication` can be updated using the `kubectl apply -f <updated YAML fi
 
 A `SparkApplication` can be checked using the `kubectl describe sparkapplications <name>` command. The output of the command shows the specification and status of the `SparkApplication` as well as events associated with it. The events communicate the overall process and errors of the `SparkApplication`.
 
-### Configuring Automatic Application Restart
-
-The operator supports automatic application restart with a configurable `RestartPolicy` using the optional field `.spec.restartPolicy`, whose valid values include `Never`, `OnFailure`, and `Always`. Upon termination of an application, the operator determines if the application is subject to restart based on its termination state and the `RestartPolicy` in the specification. If the application is subject to restart, the operator restarts it by submitting a new run of it. The old driver pod is deleted if it still exists before submitting the new run, and a new driver pod is created by the submission client so effectively the driver gets restarted.
-
-### Configuring Automatic Application Re-submission on Submission Failures
-
-The operator supports automatically retrying failed submissions. When the operator failed to submit an
-application, it determines if the application is subject to a submission retry based on if the optional field
-`.spec.maxSubmissionRetries` is set and has a positive value and the number of times it has already retried. If the maximum submission retries has not been reached, the operator retries submitting the application using a linear backoff with the interval specified by `.spec.submissionRetryInterval`. If `.spec.submissionRetryInterval` is not set, the operator retries submitting the application immediately.
+### Configuring Automatic Application Restart and Failure Handling
+
+The operator supports automatic application restart with a configurable `RestartPolicy` using the optional field
+`.spec.restartPolicy`. The following is a sample `RestartPolicy`:
+
+```yaml
+restartPolicy:
+  type: OnFailure
+  onFailureRetries: 3
+  onFailureRetryInterval: 10
+  onSubmissionFailureRetries: 5
+  onSubmissionFailureRetryInterval: 20
+```
+The valid types of restartPolicy include `Never`, `OnFailure`, and `Always`. Upon termination of an application,
+the operator determines if the application is subject to restart based on its termination state and the
+`RestartPolicy` in the specification. If the application is subject to restart, the operator restarts it by
+submitting a new run of it. For `OnFailure`, the operator further supports setting limits on the number of retries
+via the `onFailureRetries` and `onSubmissionFailureRetries` fields. Additionally, if the number of submission retries has not been reached,
+the operator retries submitting the application using a linear backoff with the interval specified by
+`onFailureRetryInterval` and `onSubmissionFailureRetryInterval`, which are required for both the `OnFailure` and `Always` `RestartPolicy` types.
+Old resources like the driver pod, UI service/ingress, etc. are deleted if they still exist before submitting the new run, and a new driver pod is created by the submission
+client so effectively the driver gets restarted.
 
 ## Running Spark Applications on a Schedule using a ScheduledSparkApplication
 
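One more variant of the restart policy described above: since the retry interval fields are required for both `OnFailure` and `Always`, an `Always` policy might look like the minimal sketch below; the interval values are illustrative:

```yaml
# Illustrative sketch of an Always policy. An application with this policy is
# restarted regardless of termination state and never reaches COMPLETED or FAILED.
restartPolicy:
  type: Always
  onFailureRetryInterval: 10            # seconds between retries after a failure
  onSubmissionFailureRetryInterval: 20  # seconds between retries after a failed submission
```
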
@@ -395,7 +408,8 @@ spec:
     cores: 1
     instances: 1
     memory: 512m
-  restartPolicy: Never
+  restartPolicy:
+    type: Never
 ```
 
 The concurrency of runs of an application is controlled by `.spec.concurrencyPolicy`, whose valid values are `Allow`, `Forbid`, and `Replace`, with `Allow` being the default. The meaning of each value is described below:
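
The per-value descriptions are elided by this diff hunk, but as a concrete illustration of the field, a minimal `ScheduledSparkApplication` fragment might look like the following; the `schedule` value is a hypothetical example:

```yaml
# Hypothetical fragment: only the fields relevant to concurrency are shown.
spec:
  schedule: "@every 10m"      # example schedule; runs the application every 10 minutes
  concurrencyPolicy: Forbid   # do not start a new run while a previous run is still active
```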
@@ -417,4 +431,4 @@ To customize the operator, you can follow the steps below:
 2. Create docker images to be used for Spark with [docker-image tool](https://spark.apache.org/docs/latest/running-on-kubernetes.html#docker-images).
 3. Create a new operator image based on the above image. You need to modify the `FROM` tag in the [Dockerfile](https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/Dockerfile) with your Spark image.
 4. Build and push your operator image built above.
-5. Deploy the new image by modifying the [/manifest/spark-operator.yaml](https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/manifest/spark-operator.yaml) file and specfiying your operator image.
\ No newline at end of file
+5. Deploy the new image by modifying the [/manifest/spark-operator.yaml](https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/manifest/spark-operator.yaml) file and specifying your operator image.

docs/who-is-using.md

+1
@@ -5,3 +5,4 @@
 | Microsoft (MileIQ) |@dharmeshkakadia| Production | AI & Analytics |
 | CERN|@mrow4a| Evaluation | Data Mining & Analytics |
 | Lightbend |@yuchaoran2011| Evaluation | Data Infrastructure & Operations |
+| Lyft |@kumare3| Evaluation | ML & Data Infrastructure |

examples/spark-pi.yaml

-1
@@ -50,4 +50,3 @@ spec:
     volumeMounts:
       - name: "test-volume"
         mountPath: "/tmp"
-  restartPolicy: Never

examples/spark-pyfiles.yaml

+6-2
@@ -28,6 +28,12 @@ spec:
   image: "gcr.io/spark-operator/spark-py:v2.4.0"
   imagePullPolicy: Always
   mainApplicationFile: local:///opt/spark/examples/src/main/python/pyfiles.py
+  restartPolicy:
+    type: OnFailure
+    onFailureRetries: 3
+    onFailureRetryInterval: 10
+    onSubmissionFailureRetries: 5
+    onSubmissionFailureRetryInterval: 20
   arguments:
     - python2.7
   deps:
@@ -46,5 +52,3 @@ spec:
     memory: "512m"
     labels:
       version: 2.4.0
-  restartPolicy: Never
-
