Skip to content

Commit

Permalink
Merge pull request kubeflow#446 from liyinan926/master
Browse files Browse the repository at this point in the history
Added a submissionID to status and use it to group pods of the same run
  • Loading branch information
liyinan926 authored Mar 20, 2019
2 parents c105018 + 61a9a75 commit 1bbf0d0
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 26 deletions.
4 changes: 3 additions & 1 deletion pkg/apis/sparkoperator.k8s.io/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ const (
// ApplicationState captures the current state of the application, together
// with an error message when the application is in a failure state.
type ApplicationState struct {
	State        ApplicationStateType `json:"state"`
	ErrorMessage string               `json:"errorMessage,omitempty"`
}

// ExecutorState tells the current state of an executor.
Expand All @@ -281,6 +281,8 @@ const (
type SparkApplicationStatus struct {
// SparkApplicationID is set by the spark-distribution(via spark.app.id config) on the driver and executor pods
SparkApplicationID string `json:"sparkApplicationId,omitempty"`
// SubmissionID is a unique ID of the current submission of the application.
SubmissionID string `json:"submissionID,omitempty"`
// LastSubmissionAttemptTime is the time for the last application submission attempt.
LastSubmissionAttemptTime metav1.Time `json:"lastSubmissionAttemptTime,omitempty"`
// CompletionTime is the time when the application runs to completion if it does.
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ const (
SparkDriverRole = "driver"
// SparkExecutorRole is the value of the spark-role label for the executors.
SparkExecutorRole = "executor"
// SubmissionIDLabel is the label that records the submission ID of the current run of an application.
SubmissionIDLabel = LabelAnnotationPrefix + "submission-id"
)

const (
Expand Down
17 changes: 11 additions & 6 deletions pkg/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -129,7 +128,7 @@ func newSparkApplicationController(
controller.applicationLister = crdInformer.Lister()

podsInformer := podInformerFactory.Core().V1().Pods()
sparkPodEventHandler := newSparkPodEventHandler(controller.queue.AddRateLimited)
sparkPodEventHandler := newSparkPodEventHandler(controller.queue.AddRateLimited, controller.applicationLister)
podsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sparkPodEventHandler.onPodAdded,
UpdateFunc: sparkPodEventHandler.onPodUpdated,
Expand Down Expand Up @@ -271,9 +270,13 @@ type driverState struct {
// getAndUpdateAppState lists the driver and executor pods of the application
// and updates the application state based on the current phase of the pods.
func (c *Controller) getAndUpdateAppState(app *v1beta1.SparkApplication) error {
// Fetch all the pods for the application.
selector, _ := labels.NewRequirement(config.SparkAppNameLabel, selection.Equals, []string{app.Name})
pods, err := c.podLister.Pods(app.Namespace).List(labels.NewSelector().Add(*selector))
// Fetch all the pods for the current run of the application.
labelMap := map[string]string{config.SparkAppNameLabel: app.Name}
if app.Status.SubmissionID != "" {
labelMap[config.SubmissionIDLabel] = app.Status.SubmissionID
}
selector := labels.SelectorFromSet(labels.Set(labelMap))
pods, err := c.podLister.Pods(app.Namespace).List(selector)
if err != nil {
return fmt.Errorf("failed to get pods for SparkApplication %s/%s: %v", app.Namespace, app.Name, err)
}
Expand Down Expand Up @@ -554,7 +557,8 @@ func (c *Controller) submitSparkApplication(app *v1beta1.SparkApplication) *v1be
}
}

submissionCmdArgs, err := buildSubmissionCommandArgs(appToSubmit)
submissionID := fmt.Sprintf("%s-%d", app.Name, time.Now().UnixNano())
submissionCmdArgs, err := buildSubmissionCommandArgs(appToSubmit, submissionID)
if err != nil {
app.Status = v1beta1.SparkApplicationStatus{
AppState: v1beta1.ApplicationState{
Expand Down Expand Up @@ -591,6 +595,7 @@ func (c *Controller) submitSparkApplication(app *v1beta1.SparkApplication) *v1be

glog.Infof("SparkApplication %s/%s has been submitted", app.Namespace, app.Name)
app.Status = v1beta1.SparkApplicationStatus{
SubmissionID: submissionID,
AppState: v1beta1.ApplicationState{
State: v1beta1.SubmittedState,
},
Expand Down
33 changes: 25 additions & 8 deletions pkg/controller/sparkapplication/spark_pod_eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,33 @@ package sparkapplication

import (
"github.com/golang/glog"

apiv1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"

crdlisters "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/listers/sparkoperator.k8s.io/v1beta1"
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/config"
)

// sparkPodEventHandler monitors Spark pods and updates the SparkApplication objects accordingly.
type sparkPodEventHandler struct {
// Lister used to look up the owning SparkApplication so that pod events from a
// stale run (mismatched submission ID) can be dropped.
applicationLister crdlisters.SparkApplicationLister
// call-back function to enqueue SparkApp key for processing.
enqueueFunc func(appKey interface{})
}

// newSparkPodEventHandler creates a new sparkPodEventHandler instance.
func newSparkPodEventHandler(enqueueFunc func(appKey interface{})) *sparkPodEventHandler {
func newSparkPodEventHandler(enqueueFunc func(appKey interface{}), lister crdlisters.SparkApplicationLister) *sparkPodEventHandler {
monitor := &sparkPodEventHandler{
enqueueFunc: enqueueFunc,
enqueueFunc: enqueueFunc,
applicationLister: lister,
}
return monitor
}

// onPodAdded is the pod informer add callback; it queues the pod's owning
// SparkApplication for processing.
func (s *sparkPodEventHandler) onPodAdded(obj interface{}) {
	addedPod := obj.(*apiv1.Pod)
	glog.V(2).Infof("Pod %s added in namespace %s.", addedPod.GetName(), addedPod.GetNamespace())
	s.enqueueSparkAppForUpdate(addedPod)
}

Expand All @@ -49,7 +55,7 @@ func (s *sparkPodEventHandler) onPodUpdated(old, updated interface{}) {
if updatedPod.ResourceVersion == oldPod.ResourceVersion {
return
}
glog.V(2).Infof("Pod %s updated in namespace %s.", updatedPod.GetObjectMeta().GetName(), updatedPod.GetObjectMeta().GetNamespace())
glog.V(2).Infof("Pod %s updated in namespace %s.", updatedPod.GetName(), updatedPod.GetNamespace())
s.enqueueSparkAppForUpdate(updatedPod)

}
Expand All @@ -68,13 +74,24 @@ func (s *sparkPodEventHandler) onPodDeleted(obj interface{}) {
if deletedPod == nil {
return
}
glog.V(2).Infof("Pod %s deleted in namespace %s.", deletedPod.GetObjectMeta().GetName(), deletedPod.GetObjectMeta().GetNamespace())
glog.V(2).Infof("Pod %s deleted in namespace %s.", deletedPod.GetName(), deletedPod.GetNamespace())
s.enqueueSparkAppForUpdate(deletedPod)
}

func (s *sparkPodEventHandler) enqueueSparkAppForUpdate(pod *apiv1.Pod) {
if appKey, ok := createMetaNamespaceKey(pod); ok {
glog.V(2).Infof("Enqueuing SparkApplication %s for app update processing.", appKey)
s.enqueueFunc(appKey)
appName, exists := getAppName(pod)
if !exists {
return
}

if submissionID, exists := pod.Labels[config.SubmissionIDLabel]; exists {
app, err := s.applicationLister.SparkApplications(pod.GetNamespace()).Get(appName)
if err != nil || app.Status.SubmissionID != submissionID {
return
}
}

appKey := createMetaNamespaceKey(pod.GetNamespace(), appName)
glog.V(2).Infof("Enqueuing SparkApplication %s for app update processing.", appKey)
s.enqueueFunc(appKey)
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,6 @@ func TestOnPodDeleted(t *testing.T) {
// newMonitor builds a sparkPodEventHandler wired to a fresh rate-limited work
// queue for use in tests; the handler is created without an application lister.
func newMonitor() (*sparkPodEventHandler, workqueue.RateLimitingInterface) {
	queue := workqueue.NewNamedRateLimitingQueue(
		workqueue.DefaultControllerRateLimiter(), "spark-application-controller-test")
	handler := newSparkPodEventHandler(queue.AddRateLimited, nil)
	return handler, queue
}
7 changes: 2 additions & 5 deletions pkg/controller/sparkapplication/sparkapp_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@ import (
)

// createMetaNamespaceKey builds the "<namespace>/<name>" key that identifies a
// SparkApplication on the work queue.
func createMetaNamespaceKey(namespace, name string) string {
	// All operands are strings, so fmt.Sprint concatenates them without spaces.
	return fmt.Sprint(namespace, "/", name)
}

func getAppName(pod *apiv1.Pod) (string, bool) {
Expand Down
14 changes: 9 additions & 5 deletions pkg/controller/sparkapplication/submission.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func runSparkSubmit(submission *submission) (bool, error) {
return true, nil
}

func buildSubmissionCommandArgs(app *v1beta1.SparkApplication) ([]string, error) {
func buildSubmissionCommandArgs(app *v1beta1.SparkApplication, submissionID string) ([]string, error) {
var args []string
if app.Spec.MainClass != nil {
args = append(args, "--class", *app.Spec.MainClass)
Expand Down Expand Up @@ -161,14 +161,14 @@ func buildSubmissionCommandArgs(app *v1beta1.SparkApplication) ([]string, error)
// Add the driver and executor configuration options.
// Note that when the controller submits the application, it expects that all dependencies are local
// so init-container is not needed and therefore no init-container image needs to be specified.
options, err := addDriverConfOptions(app)
options, err := addDriverConfOptions(app, submissionID)
if err != nil {
return nil, err
}
for _, option := range options {
args = append(args, "--conf", option)
}
options, err = addExecutorConfOptions(app)
options, err = addExecutorConfOptions(app, submissionID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -248,13 +248,15 @@ func addDependenciesConfOptions(app *v1beta1.SparkApplication) []string {
return depsConfOptions
}

func addDriverConfOptions(app *v1beta1.SparkApplication) ([]string, error) {
func addDriverConfOptions(app *v1beta1.SparkApplication, submissionID string) ([]string, error) {
var driverConfOptions []string

driverConfOptions = append(driverConfOptions,
fmt.Sprintf("%s%s=%s", config.SparkDriverLabelKeyPrefix, config.SparkAppNameLabel, app.Name))
driverConfOptions = append(driverConfOptions,
fmt.Sprintf("%s%s=%s", config.SparkDriverLabelKeyPrefix, config.LaunchedBySparkOperatorLabel, "true"))
driverConfOptions = append(driverConfOptions,
fmt.Sprintf("%s%s=%s", config.SparkDriverLabelKeyPrefix, config.SubmissionIDLabel, submissionID))

driverPodName := fmt.Sprintf("%s-driver", app.GetName())
if app.Spec.Driver.PodName != nil {
Expand Down Expand Up @@ -316,13 +318,15 @@ func addDriverConfOptions(app *v1beta1.SparkApplication) ([]string, error) {
return driverConfOptions, nil
}

func addExecutorConfOptions(app *v1beta1.SparkApplication) ([]string, error) {
func addExecutorConfOptions(app *v1beta1.SparkApplication, submissionID string) ([]string, error) {
var executorConfOptions []string

executorConfOptions = append(executorConfOptions,
fmt.Sprintf("%s%s=%s", config.SparkExecutorLabelKeyPrefix, config.SparkAppNameLabel, app.Name))
executorConfOptions = append(executorConfOptions,
fmt.Sprintf("%s%s=%s", config.SparkExecutorLabelKeyPrefix, config.LaunchedBySparkOperatorLabel, "true"))
executorConfOptions = append(executorConfOptions,
fmt.Sprintf("%s%s=%s", config.SparkExecutorLabelKeyPrefix, config.SubmissionIDLabel, submissionID))

if app.Spec.Executor.Instances != nil {
conf := fmt.Sprintf("spark.executor.instances=%d", *app.Spec.Executor.Instances)
Expand Down

0 comments on commit 1bbf0d0

Please sign in to comment.