support filtering resources on custom labels (kubeflow#952)
add doc
AliGouta authored Jun 26, 2020
1 parent 07402b0 commit f62d8e3
Showing 2 changed files with 42 additions and 0 deletions.
26 changes: 26 additions & 0 deletions docs/user-guide.md
@@ -44,6 +44,7 @@ The Kubernetes Operator for Apache Spark ships with a command-line tool called `sparkctl`
* [Running Spark Applications on a Schedule using a ScheduledSparkApplication](#running-spark-applications-on-a-schedule-using-a-scheduledsparkapplication)
* [Enabling Leader Election for High Availability](#enabling-leader-election-for-high-availability)
* [Enabling Resource Quota Enforcement](#enabling-resource-quota-enforcement)
* [Running Multiple Instances Of The Operator Within The Same K8s Cluster](#running-multiple-instances-of-the-operator-within-the-same-k8s-cluster)
* [Customizing the Operator](#customizing-the-operator)

## Using a SparkApplication
@@ -752,6 +753,31 @@ The Spark Operator provides limited support for resource quota enforcement using

If you are running Spark applications in namespaces that are subject to resource quota constraints, consider enabling this feature to avoid driver resource starvation. Quota enforcement can be enabled with the command line arguments `-enable-resource-quota-enforcement=true`. It is recommended to also set `-webhook-fail-on-error=true`.
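In a Deployment manifest this typically means adding the flags to the operator container's args. A minimal sketch (names and image are illustrative; the flags are the ones documented above):

```
# excerpt from the operator Deployment spec (illustrative)
args:
  - -enable-webhook=true                      # quota enforcement requires the webhook
  - -enable-resource-quota-enforcement=true
  - -webhook-fail-on-error=true
```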

## Running Multiple Instances Of The Operator Within The Same K8s Cluster

If you need to run multiple instances of the operator within the same k8s cluster, you must make sure that the running instances do not compete for the same custom resources or pods. You can achieve this:

Either:
* By specifying a different `namespace` flag for each instance of the operator.

Or if you want your operator to watch specific resources that may exist in different namespaces:

* Add custom labels to resources and define, for each instance of the operator, a different set of labels in `-label-selector-filter` (e.g. `env=dev,app-type=spark`).
* Run different `webhook` instances by specifying a different `-webhook-config-name` flag for each deployment of the operator.
* Specify a different `-webhook-svc-name` and/or `-webhook-svc-namespace` for each instance of the operator.
* Edit the `webhook-init` job that generates the certificates, specifying the namespace and the service name of each instance of the operator, e.g. `command: ["/usr/bin/gencerts.sh", "-n", "ns-op1", "-s", "spark-op1-webhook", "-p"]`, where `spark-op1-webhook` should match what you have specified in `-webhook-svc-name`. For instance, if you use the following [helm chart](https://github.com/helm/charts/tree/master/incubator/sparkoperator) to deploy the operator, you may specify a different `--namespace` and `--name-template` argument for each instance to make sure a different certificate is generated for each one, e.g.:
```
helm install spark-op1 incubator/sparkoperator --namespace ns-op1
helm install spark-op2 incubator/sparkoperator --namespace ns-op2
```
This will run two `webhook-init` jobs, which execute, respectively, the commands:
```
command: ["/usr/bin/gencerts.sh", "-n", "ns-op1", "-s", "spark-op1-webhook", "-p"]
command: ["/usr/bin/gencerts.sh", "-n", "ns-op2", "-s", "spark-op2-webhook", "-p"]
```
* Although resources are already filtered by the labels specified on them, you may also specify different labels in `-webhook-namespace-selector` and attach these labels to the namespaces on which you want the webhook to listen.

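As a sketch, attaching such a label to a namespace can be done with `kubectl` (the namespace name and label here are illustrative):

```
# label the namespace the webhook of this instance should watch
kubectl label namespace spark-jobs-dev env=dev
# then start that operator instance with:
#   -webhook-namespace-selector=env=dev
```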
## Customizing the Operator
To customize the operator, you can follow the steps below:
16 changes: 16 additions & 0 deletions main.go
@@ -56,6 +56,7 @@ var (
controllerThreads = flag.Int("controller-threads", 10, "Number of worker threads used by the SparkApplication controller.")
resyncInterval = flag.Int("resync-interval", 30, "Informer resync interval in seconds.")
namespace = flag.String("namespace", apiv1.NamespaceAll, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset.")
labelSelectorFilter = flag.String("label-selector-filter", "", "A comma-separated list of key=value, or key labels to filter resources during watch and list based on the specified labels.")
enableWebhook = flag.Bool("enable-webhook", false, "Whether to enable the mutating admission webhook for admitting and patching Spark pods.")
enableResourceQuotaEnforcement = flag.Bool("enable-resource-quota-enforcement", false, "Whether to enable ResourceQuota enforcement for SparkApplication resources. Requires the webhook to be enabled.")
ingressURLFormat = flag.String("ingress-url-format", "", "Ingress URL format.")
@@ -251,6 +252,12 @@ func buildCustomResourceInformerFactory(crClient crclientset.Interface) crinform
	if *namespace != apiv1.NamespaceAll {
		factoryOpts = append(factoryOpts, crinformers.WithNamespace(*namespace))
	}
	if len(*labelSelectorFilter) > 0 {
		tweakListOptionsFunc := func(options *metav1.ListOptions) {
			options.LabelSelector = *labelSelectorFilter
		}
		factoryOpts = append(factoryOpts, crinformers.WithTweakListOptions(tweakListOptionsFunc))
	}
	return crinformers.NewSharedInformerFactoryWithOptions(
		crClient,
		// resyncPeriod. Every resyncPeriod, all resources in the cache will re-trigger events.
@@ -265,6 +272,9 @@ func buildPodInformerFactory(kubeClient clientset.Interface) informers.SharedInf
	}
	tweakListOptionsFunc := func(options *metav1.ListOptions) {
		options.LabelSelector = fmt.Sprintf("%s,%s", operatorConfig.SparkRoleLabel, operatorConfig.LaunchedBySparkOperatorLabel)
		if len(*labelSelectorFilter) > 0 {
			options.LabelSelector = options.LabelSelector + "," + *labelSelectorFilter
		}
	}
	podFactoryOpts = append(podFactoryOpts, informers.WithTweakListOptions(tweakListOptionsFunc))
	return informers.NewSharedInformerFactoryWithOptions(kubeClient, time.Duration(*resyncInterval)*time.Second, podFactoryOpts...)
@@ -275,5 +285,11 @@ func buildCoreV1InformerFactory(kubeClient clientset.Interface) informers.Shared
	if *namespace != apiv1.NamespaceAll {
		coreV1FactoryOpts = append(coreV1FactoryOpts, informers.WithNamespace(*namespace))
	}
	if len(*labelSelectorFilter) > 0 {
		tweakListOptionsFunc := func(options *metav1.ListOptions) {
			options.LabelSelector = *labelSelectorFilter
		}
		coreV1FactoryOpts = append(coreV1FactoryOpts, informers.WithTweakListOptions(tweakListOptionsFunc))
	}
	return informers.NewSharedInformerFactoryWithOptions(kubeClient, time.Duration(*resyncInterval)*time.Second, coreV1FactoryOpts...)
}
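The selector-composition logic of `buildPodInformerFactory` can be sketched standalone. The `ListOptions` struct below is a stand-in for `metav1.ListOptions`, and the label strings are placeholders for the values of `operatorConfig.SparkRoleLabel` and friends:

```go
package main

import "fmt"

// ListOptions stands in for metav1.ListOptions; only the field used by the
// tweak function is reproduced here.
type ListOptions struct {
	LabelSelector string
}

// tweak mirrors the pod-informer logic: start from the operator's fixed pod
// selector and append the user-supplied -label-selector-filter value, if any.
func tweak(fixed, filter string) func(*ListOptions) {
	return func(o *ListOptions) {
		o.LabelSelector = fixed
		if len(filter) > 0 {
			o.LabelSelector = o.LabelSelector + "," + filter
		}
	}
}

func main() {
	// Placeholder label names; the real values come from operatorConfig.
	opts := &ListOptions{}
	tweak("spark-role,launched-by-spark-operator", "env=dev,app-type=spark")(opts)
	fmt.Println(opts.LabelSelector)
	// spark-role,launched-by-spark-operator,env=dev,app-type=spark
}
```

Because the filter is appended with a comma, it composes with the fixed selector under the standard Kubernetes label-selector syntax (comma means logical AND).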
