Populate Spark Application labels to Driver and Executors (kubeflow#940)
gongx authored Jun 3, 2020
1 parent 72f6893 commit 9d4592a
Showing 2 changed files with 138 additions and 2 deletions.
17 changes: 17 additions & 0 deletions pkg/controller/sparkapplication/submission.go
@@ -267,7 +267,16 @@ func addDriverConfOptions(app *v1beta2.SparkApplication, submissionID string) ([
fmt.Sprintf("%s=%s", config.SparkDriverServiceAccountName, *app.Spec.Driver.ServiceAccount))
}

// Populate SparkApplication labels to the driver.
driverLabels := make(map[string]string)
for key, value := range app.Labels {
driverLabels[key] = value
}
for key, value := range app.Spec.Driver.Labels {
driverLabels[key] = value
}

for key, value := range driverLabels {
driverConfOptions = append(driverConfOptions,
fmt.Sprintf("%s%s=%s", config.SparkDriverLabelKeyPrefix, key, value))
}
@@ -340,7 +349,15 @@ func addExecutorConfOptions(app *v1beta2.SparkApplication, submissionID string)
fmt.Sprintf("%s=%t", config.SparkExecutorDeleteOnTermination, *app.Spec.Executor.DeleteOnTermination))
}

// Populate SparkApplication labels to the executors.
executorLabels := make(map[string]string)
for key, value := range app.Labels {
executorLabels[key] = value
}
for key, value := range app.Spec.Executor.Labels {
executorLabels[key] = value
}
for key, value := range executorLabels {
executorConfOptions = append(executorConfOptions,
fmt.Sprintf("%s%s=%s", config.SparkExecutorLabelKeyPrefix, key, value))
}
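The driver and executor hunks share one pattern: copy the SparkApplication's own labels into a map first, then the role-specific pod labels, so a pod-level label wins on a key collision; every merged entry is then emitted as a spark.kubernetes.<role>.label.<key>=<value> conf option. Below is a minimal standalone sketch of that merge-and-prefix step, assuming config.SparkDriverLabelKeyPrefix resolves to "spark.kubernetes.driver.label." as the test templates in submission_test.go suggest; mergeLabels and the sample labels are illustrative only, not operator API.

package main

import "fmt"

// mergeLabels copies app-level labels first, then pod-level labels, so a
// pod-level label with the same key overrides the application-level value.
func mergeLabels(appLabels, podLabels map[string]string) map[string]string {
    merged := make(map[string]string, len(appLabels)+len(podLabels))
    for k, v := range appLabels {
        merged[k] = v
    }
    for k, v := range podLabels {
        merged[k] = v
    }
    return merged
}

func main() {
    appLabels := map[string]string{"team": "data", "env": "staging"}
    driverLabels := map[string]string{"env": "prod"} // overrides the app-level "env"

    for k, v := range mergeLabels(appLabels, driverLabels) {
        // Each merged label becomes one spark-submit conf option,
        // e.g. spark.kubernetes.driver.label.team=data.
        fmt.Printf("spark.kubernetes.driver.label.%s=%s\n", k, v)
    }
}

Running the sketch prints one conf line per merged label, with env=prod (the driver-level value) rather than the application-level env=staging, which is the precedence the two hunks above implement.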
123 changes: 121 additions & 2 deletions pkg/controller/sparkapplication/submission_test.go
@@ -18,6 +18,9 @@ package sparkapplication

import (
"fmt"
"github.com/google/uuid"
"sort"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
@@ -29,8 +32,12 @@ import (
)

const (
VolumeMountPathTemplate = "spark.kubernetes.%s.volumes.%s.%s.mount.path=%s"
VolumeMountOptionPathTemplate = "spark.kubernetes.%s.volumes.%s.%s.options.%s=%s"
VolumeMountPathTemplate = "spark.kubernetes.%s.volumes.%s.%s.mount.path=%s"
VolumeMountOptionPathTemplate = "spark.kubernetes.%s.volumes.%s.%s.options.%s=%s"
SparkDriverLabelAnnotationTemplate = "spark.kubernetes.driver.label.sparkoperator.k8s.io/%s=%s"
SparkDriverLabelTemplate = "spark.kubernetes.driver.label.%s=%s"
SparkExecutorLabelAnnotationTemplate = "spark.kubernetes.executor.label.sparkoperator.k8s.io/%s=%s"
SparkExecutorLabelTemplate = "spark.kubernetes.executor.label.%s=%s"
)

func TestAddLocalDir_HostPath(t *testing.T) {
@@ -354,3 +361,115 @@ func TestAddLocalDir_Driver_Executor(t *testing.T) {
assert.Equal(t, fmt.Sprintf(VolumeMountPathTemplate, "executor", "hostPath", volumes[0].Name, volumeMounts[0].MountPath), localDirOptions[2])
assert.Equal(t, fmt.Sprintf(VolumeMountOptionPathTemplate, "executor", "hostPath", volumes[0].Name, "path", volumes[0].HostPath.Path), localDirOptions[3])
}

func TestPopulateLabels_Driver_Executor(t *testing.T) {
const (
AppLabelKey = "app-label-key"
AppLabelValue = "app-label-value"
DriverLabelKey = "driver-label-key"
DriverLabelValue = "driver-label-value"
ExecutorLabelKey = "executor-label-key"
ExecutorLabelValue = "executor-label-value"
)

app := &v1beta2.SparkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: "spark-test",
UID: "spark-test-1",
Labels: map[string]string{AppLabelKey: AppLabelValue},
},
Spec: v1beta2.SparkApplicationSpec{
Driver: v1beta2.DriverSpec{
SparkPodSpec: v1beta2.SparkPodSpec{
Labels: map[string]string{DriverLabelKey: DriverLabelValue},
},
},
Executor: v1beta2.ExecutorSpec{
SparkPodSpec: v1beta2.SparkPodSpec{
Labels: map[string]string{ExecutorLabelKey: ExecutorLabelValue},
},
},
},
}

submissionID := uuid.New().String()
driverOptions, err := addDriverConfOptions(app, submissionID)
if err != nil {
t.Fatal(err)
}
// The custom label options come from ranging over a map, so sort before asserting by index.
sort.Strings(driverOptions)
assert.Equal(t, 5, len(driverOptions))
assert.Equal(t, fmt.Sprintf(SparkDriverLabelTemplate, AppLabelKey, AppLabelValue), driverOptions[0])
assert.Equal(t, fmt.Sprintf(SparkDriverLabelTemplate, DriverLabelKey, DriverLabelValue), driverOptions[1])
assert.Equal(t, fmt.Sprintf(SparkDriverLabelAnnotationTemplate, "app-name", "spark-test"), driverOptions[2])
assert.Equal(t, fmt.Sprintf(SparkDriverLabelAnnotationTemplate, "launched-by-spark-operator", strconv.FormatBool(true)), driverOptions[3])
assert.Equal(t, fmt.Sprintf(SparkDriverLabelAnnotationTemplate, "submission-id", submissionID), driverOptions[4])

executorOptions, err := addExecutorConfOptions(app, submissionID)
if err != nil {
t.Fatal(err)
}
// Same as above: sort so the map-derived label options have a stable order.
sort.Strings(executorOptions)
assert.Equal(t, 5, len(executorOptions))
assert.Equal(t, fmt.Sprintf(SparkExecutorLabelTemplate, AppLabelKey, AppLabelValue), executorOptions[0])
assert.Equal(t, fmt.Sprintf(SparkExecutorLabelTemplate, ExecutorLabelKey, ExecutorLabelValue), executorOptions[1])
assert.Equal(t, fmt.Sprintf(SparkExecutorLabelAnnotationTemplate, "app-name", "spark-test"), executorOptions[2])
assert.Equal(t, fmt.Sprintf(SparkExecutorLabelAnnotationTemplate, "launched-by-spark-operator", strconv.FormatBool(true)), executorOptions[3])
assert.Equal(t, fmt.Sprintf(SparkExecutorLabelAnnotationTemplate, "submission-id", submissionID), executorOptions[4])
}

func TestPopulateLabelsOverride_Driver_Executor(t *testing.T) {
const (
AppLabelKey = "app-label-key"
AppLabelValue = "app-label-value"
DriverLabelKey = "driver-label-key"
DriverLabelValue = "driver-label-value"
DriverAppLabelOverride = "driver-app-label-override"
ExecutorLabelKey = "executor-label-key"
ExecutorLabelValue = "executor-label-value"
ExecutorAppLabelOverride = "executor-app-label-override"
)

app := &v1beta2.SparkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: "spark-test",
UID: "spark-test-1",
Labels: map[string]string{AppLabelKey: AppLabelValue},
},
Spec: v1beta2.SparkApplicationSpec{
Driver: v1beta2.DriverSpec{
SparkPodSpec: v1beta2.SparkPodSpec{
Labels: map[string]string{DriverLabelKey: DriverLabelValue, AppLabelKey: DriverAppLabelOverride},
},
},
Executor: v1beta2.ExecutorSpec{
SparkPodSpec: v1beta2.SparkPodSpec{
Labels: map[string]string{ExecutorLabelKey: ExecutorLabelValue, AppLabelKey: ExecutorAppLabelOverride},
},
},
},
}

submissionID := uuid.New().String()
driverOptions, err := addDriverConfOptions(app, submissionID)
if err != nil {
t.Fatal(err)
}
sort.Strings(driverOptions)
assert.Equal(t, 5, len(driverOptions))
assert.Equal(t, fmt.Sprintf(SparkDriverLabelTemplate, AppLabelKey, DriverAppLabelOverride), driverOptions[0])
assert.Equal(t, fmt.Sprintf(SparkDriverLabelTemplate, DriverLabelKey, DriverLabelValue), driverOptions[1])
assert.Equal(t, fmt.Sprintf(SparkDriverLabelAnnotationTemplate, "app-name", "spark-test"), driverOptions[2])
assert.Equal(t, fmt.Sprintf(SparkDriverLabelAnnotationTemplate, "launched-by-spark-operator", strconv.FormatBool(true)), driverOptions[3])
assert.Equal(t, fmt.Sprintf(SparkDriverLabelAnnotationTemplate, "submission-id", submissionID), driverOptions[4])

executorOptions, err := addExecutorConfOptions(app, submissionID)
if err != nil {
t.Fatal(err)
}
sort.Strings(executorOptions)
assert.Equal(t, 5, len(executorOptions))
assert.Equal(t, fmt.Sprintf(SparkExecutorLabelTemplate, AppLabelKey, ExecutorAppLabelOverride), executorOptions[0])
assert.Equal(t, fmt.Sprintf(SparkExecutorLabelTemplate, ExecutorLabelKey, ExecutorLabelValue), executorOptions[1])
assert.Equal(t, fmt.Sprintf(SparkExecutorLabelAnnotationTemplate, "app-name", "spark-test"), executorOptions[2])
assert.Equal(t, fmt.Sprintf(SparkExecutorLabelAnnotationTemplate, "launched-by-spark-operator", strconv.FormatBool(true)), executorOptions[3])
assert.Equal(t, fmt.Sprintf(SparkExecutorLabelAnnotationTemplate, "submission-id", submissionID), executorOptions[4])
}
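One detail worth calling out: the custom label options are produced by ranging over a Go map, and map iteration order is unspecified and randomized by the runtime, which is why the tests sort the returned options before comparing by index. A tiny self-contained illustration (the label keys and values here are arbitrary examples):

package main

import (
    "fmt"
    "sort"
)

func main() {
    labels := map[string]string{"team": "data", "env": "prod"}

    // Ranging over a map yields keys in an unspecified, runtime-randomized
    // order, so these conf strings can come out in a different order on
    // every run.
    var opts []string
    for k, v := range labels {
        opts = append(opts, fmt.Sprintf("spark.kubernetes.driver.label.%s=%s", k, v))
    }

    // Sorting gives the slice a stable order, which makes positional
    // assertions such as driverOptions[0] safe in the tests above.
    sort.Strings(opts)
    fmt.Println(opts)
}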
