+
image
string
@@ -2746,5 +2787,5 @@ map[string]string
Generated with gen-crd-api-reference-docs
-on git commit 555c27a .
+on git commit bc7bbd0 .
diff --git a/manifest/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml b/manifest/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml
index f1b68cd85..23b6d4c21 100644
--- a/manifest/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml
+++ b/manifest/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml
@@ -3613,6 +3613,8 @@ spec:
additionalProperties:
type: string
type: object
+ proxyUser:
+ type: string
pythonVersion:
enum:
- "2"
diff --git a/manifest/crds/sparkoperator.k8s.io_sparkapplications.yaml b/manifest/crds/sparkoperator.k8s.io_sparkapplications.yaml
index 04765f541..e61ac1142 100644
--- a/manifest/crds/sparkoperator.k8s.io_sparkapplications.yaml
+++ b/manifest/crds/sparkoperator.k8s.io_sparkapplications.yaml
@@ -3599,6 +3599,8 @@ spec:
additionalProperties:
type: string
type: object
+ proxyUser:
+ type: string
pythonVersion:
enum:
- "2"
diff --git a/pkg/apis/sparkoperator.k8s.io/v1beta2/types.go b/pkg/apis/sparkoperator.k8s.io/v1beta2/types.go
index 032e215eb..30b2637dd 100644
--- a/pkg/apis/sparkoperator.k8s.io/v1beta2/types.go
+++ b/pkg/apis/sparkoperator.k8s.io/v1beta2/types.go
@@ -187,6 +187,10 @@ type SparkApplicationSpec struct {
// Mode is the deployment mode of the Spark application.
// +kubebuilder:validation:Enum={cluster,client}
Mode DeployMode `json:"mode,omitempty"`
+ // ProxyUser specifies the user to impersonate when submitting the application.
+ // It maps to the command-line flag "--proxy-user" in spark-submit.
+ // +optional
+ ProxyUser *string `json:"proxyUser,omitempty"`
// Image is the container image for the driver, executor, and init-container. Any custom container images for the
// driver, executor, or init-container takes precedence over this.
// +optional
diff --git a/pkg/apis/sparkoperator.k8s.io/v1beta2/zz_generated.deepcopy.go b/pkg/apis/sparkoperator.k8s.io/v1beta2/zz_generated.deepcopy.go
index c0e014d33..eb3d8c89e 100644
--- a/pkg/apis/sparkoperator.k8s.io/v1beta2/zz_generated.deepcopy.go
+++ b/pkg/apis/sparkoperator.k8s.io/v1beta2/zz_generated.deepcopy.go
@@ -593,6 +593,11 @@ func (in *SparkApplicationList) DeepCopyObject() runtime.Object {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SparkApplicationSpec) DeepCopyInto(out *SparkApplicationSpec) {
*out = *in
+ if in.ProxyUser != nil {
+ in, out := &in.ProxyUser, &out.ProxyUser
+ *out = new(string)
+ **out = **in
+ }
if in.Image != nil {
in, out := &in.Image, &out.Image
*out = new(string)
diff --git a/pkg/controller/sparkapplication/submission.go b/pkg/controller/sparkapplication/submission.go
index cebf3b000..5531b43ad 100644
--- a/pkg/controller/sparkapplication/submission.go
+++ b/pkg/controller/sparkapplication/submission.go
@@ -96,6 +96,12 @@ func buildSubmissionCommandArgs(app *v1beta2.SparkApplication, driverPodName str
args = append(args, "--master", masterURL)
args = append(args, "--deploy-mode", string(app.Spec.Mode))
+
+ // Add proxy user
+ if app.Spec.ProxyUser != nil {
+ args = append(args, "--proxy-user", *app.Spec.ProxyUser)
+ }
+
args = append(args, "--conf", fmt.Sprintf("%s=%s", config.SparkAppNamespaceKey, app.Namespace))
args = append(args, "--conf", fmt.Sprintf("%s=%s", config.SparkAppNameKey, app.Name))
args = append(args, "--conf", fmt.Sprintf("%s=%s", config.SparkDriverPodNameKey, driverPodName))
diff --git a/pkg/controller/sparkapplication/submission_test.go b/pkg/controller/sparkapplication/submission_test.go
index e00015b4b..9f32bbb15 100644
--- a/pkg/controller/sparkapplication/submission_test.go
+++ b/pkg/controller/sparkapplication/submission_test.go
@@ -18,6 +18,7 @@ package sparkapplication
import (
"fmt"
+ "os"
"reflect"
"sort"
"strconv"
@@ -543,3 +544,42 @@ func TestDynamicAllocationOptions(t *testing.T) {
assert.Equal(t, fmt.Sprintf("%s=10", config.SparkDynamicAllocationMaxExecutors), options[4])
assert.Equal(t, fmt.Sprintf("%s=6000000", config.SparkDynamicAllocationShuffleTrackingTimeout), options[5])
}
+
+func TestProxyUserArg(t *testing.T) {
+ const (
+ host = "localhost"
+ port = "6443"
+ )
+
+ if err := os.Setenv(kubernetesServiceHostEnvVar, host); err != nil {
+ t.Fatal(err)
+ }
+ if err := os.Setenv(kubernetesServicePortEnvVar, port); err != nil {
+ t.Fatal(err)
+ }
+
+ app := &v1beta2.SparkApplication{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "spark-test",
+ UID: "spark-test-1",
+ },
+ Spec: v1beta2.SparkApplicationSpec{
+ Mode: v1beta2.ClusterMode,
+ ProxyUser: stringptr("foo"),
+ },
+ }
+
+ submissionID := uuid.New().String()
+ driverPodName := getDriverPodName(app)
+ args, err := buildSubmissionCommandArgs(app, driverPodName, submissionID)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ assert.Equal(t, "--master", args[0])
+ assert.Equal(t, fmt.Sprintf("k8s://https://%s:%s", host, port), args[1])
+ assert.Equal(t, "--deploy-mode", args[2])
+ assert.Equal(t, string(v1beta2.ClusterMode), args[3])
+ assert.Equal(t, "--proxy-user", args[4])
+ assert.Equal(t, "foo", args[5])
+}
|