
Commit

Added API to pass custom Prometheus configuration file contained in a job image; automatically truncate the volume name to fit 63 chars
yuchaoran2011 committed Feb 15, 2019
1 parent 11f80c4 commit 8d6a5d9
Showing 4 changed files with 63 additions and 42 deletions.
11 changes: 4 additions & 7 deletions docs/api.md
@@ -1,11 +1,7 @@
 # SparkApplication API
 
-The Kubernetes Operator for Apache Spark uses [CustomResourceDefinitions](https://kubernetes.io/docs/concepts/api-extension/custom-resources/)
-named `SparkApplication` and `ScheduledSparkApplication` for specifying one-time Spark applications and Spark applications
-that are supposed to run on a standard [cron](https://en.wikipedia.org/wiki/Cron) schedule. Similarly to other kinds of
-Kubernetes resources, they consist of a specification in a `Spec` field and a `Status` field. The definitions are organized
-in the following structure. The v1beta1 version of the API definition is implemented
-[here](../pkg/apis/sparkoperator.k8s.io/v1beta1/types.go).
+The Kubernetes Operator for Apache Spark uses [CustomResourceDefinitions](https://kubernetes.io/docs/concepts/api-extension/custom-resources/) named `SparkApplication` and `ScheduledSparkApplication` for specifying one-time Spark applications and Spark applications
+that are supposed to run on a standard [cron](https://en.wikipedia.org/wiki/Cron) schedule. Similarly to other kinds of Kubernetes resources, they consist of a specification in a `Spec` field and a `Status` field. The definitions are organized in the following structure. The v1beta1 version of the API definition is implemented [here](../pkg/apis/sparkoperator.k8s.io/v1beta1/types.go).
 
 ```
 ScheduledSparkApplication
@@ -124,7 +120,8 @@ A `PrometheusSpec` configures how metrics are exposed to Prometheus.
 | ------------- | ------------- | ------------- |
 | `JmxExporterJar` | N/A | This specifies the path to the [Prometheus JMX exporter](https://github.com/prometheus/jmx_exporter) jar. |
 | `Port` | N/A | If specified, the value will be used in the Java agent configuration for the Prometheus JMX exporter. The Java agent gets bound to the specified port if specified or `8090` otherwise by default. |
-| `Configuration` | N/A | If specified, this contains the content of a custom Prometheus configuration used by the Prometheus JMX exporter. Otherwise, the content of `spark-docker/conf/prometheus.yaml` will be used. |
+| `ConfigFile` | N/A | This specifies the full path of the Prometheus configuration file in the Spark job image. If specified, it overrides the default configuration and takes precedence over `Configuration` below. |
+| `Configuration` | N/A | If specified, this contains the contents of a custom Prometheus configuration used by the Prometheus JMX exporter. Otherwise, the contents of `spark-docker/conf/prometheus.yaml` will be used, unless `ConfigFile` is specified. |
 
 ### `SparkApplicationStatus`
 
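For illustration, here is a minimal sketch of how the two fields surface in a `SparkApplication` manifest. The struct below only mirrors the JSON tags from `types.go` for demonstration; the `jmxExporterJar` tag and both file paths are placeholder assumptions, not values taken from this commit:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// prometheusSpec mirrors the JSON tags of v1beta1.PrometheusSpec purely for
// demonstration; it is not the operator's actual type.
type prometheusSpec struct {
	JmxExporterJar string  `json:"jmxExporterJar"` // assumed tag; not shown in this diff
	Port           *int32  `json:"port"`
	ConfigFile     string  `json:"configFile"`
	Configuration  *string `json:"configuration"`
}

func main() {
	// With configFile set, the operator bypasses the generated ConfigMap and
	// points the JMX exporter at the file baked into the job image.
	spec := prometheusSpec{
		JmxExporterJar: "/prometheus/jmx_prometheus_javaagent.jar", // placeholder path
		ConfigFile:     "/etc/metrics/conf/prometheus.yaml",        // placeholder path
	}
	out, _ := json.MarshalIndent(spec, "", "  ")
	fmt.Println(string(out)) // prints the camelCase keys a manifest would use
}
```

The printed keys (`configFile`, `configuration`) are what would go under the `prometheus` block of the monitoring spec in a manifest.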
4 changes: 4 additions & 0 deletions pkg/apis/sparkoperator.k8s.io/v1beta1/types.go
@@ -478,8 +478,12 @@ type PrometheusSpec struct {
 	// Optional.
 	// If not specified, 8090 will be used as the default.
 	Port *int32 `json:"port"`
+	// ConfigFile is the path to the custom Prometheus configuration file provided in the Spark job image.
+	// ConfigFile takes precedence over Configuration, which is shown below.
+	ConfigFile string `json:"configFile"`
 	// Configuration is the content of the Prometheus configuration needed by the Prometheus JMX exporter.
 	// Optional.
 	// If not specified, the content in spark-docker/conf/prometheus.yaml will be used.
+	// Configuration has no effect if ConfigFile is set.
 	Configuration *string `json:"configuration"`
 }
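One detail worth noting in the types: `ConfigFile` is a plain `string`, so "unset" is the empty string, while `Configuration` remains a `*string`, so "unset" is `nil`. A hedged sketch of the resulting precedence follows; the helper name is our own invention, not a function in the operator:

```go
package main

import "fmt"

// configSource reports which Prometheus configuration source applies, following
// the precedence documented on PrometheusSpec: ConfigFile wins over
// Configuration, which wins over the default. Illustrative only.
func configSource(configFile string, configuration *string) string {
	switch {
	case configFile != "":
		return "config file in the job image: " + configFile
	case configuration != nil:
		return "inline Configuration content via the generated ConfigMap"
	default:
		return "default spark-docker/conf/prometheus.yaml"
	}
}

func main() {
	inline := "rules: []"
	fmt.Println(configSource("", &inline))                                  // inline Configuration applies
	fmt.Println(configSource("/etc/metrics/conf/prometheus.yaml", &inline)) // ConfigFile wins
}
```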
83 changes: 48 additions & 35 deletions pkg/controller/sparkapplication/monitoring_config.go
@@ -18,6 +18,7 @@ package sparkapplication
 
 import (
 	"fmt"
+	"github.com/golang/glog"
 	corev1 "k8s.io/api/core/v1"
 	apiErrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -39,26 +40,55 @@ const (
 )
 
 func configPrometheusMonitoring(app *v1beta1.SparkApplication, kubeClient clientset.Interface) error {
-	prometheusConfigMapName := fmt.Sprintf("%s-%s", app.Name, prometheusConfigMapNameSuffix)
-	configMap := buildPrometheusConfigMap(app, prometheusConfigMapName)
-	retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
-		cm, err := kubeClient.CoreV1().ConfigMaps(app.Namespace).Get(prometheusConfigMapName, metav1.GetOptions{})
-
-		if apiErrors.IsNotFound(err) {
-			_, createErr := kubeClient.CoreV1().ConfigMaps(app.Namespace).Create(configMap)
-			return createErr
-		}
-		if err != nil {
-			return err
-		}
-
-		cm.Data = configMap.Data
-		_, updateErr := kubeClient.CoreV1().ConfigMaps(app.Namespace).Update(cm)
-		return updateErr
-	})
-
-	if retryErr != nil {
-		return fmt.Errorf("failed to apply %s in namespace %s: %v", prometheusConfigMapName, app.Namespace, retryErr)
-	}
+	port := config.DefaultPrometheusJavaAgentPort
+	if app.Spec.Monitoring.Prometheus.Port != nil {
+		port = *app.Spec.Monitoring.Prometheus.Port
+	}
+
+	configFile := app.Spec.Monitoring.Prometheus.ConfigFile
+	var javaOption string
+	if configFile != "" {
+		glog.V(2).Infof("Overriding the default Prometheus configuration with config file %s in the Spark job image.", configFile)
+		javaOption = fmt.Sprintf("-javaagent:%s=%d:%s", app.Spec.Monitoring.Prometheus.JmxExporterJar,
+			port, configFile)
+	} else {
+		glog.V(2).Infof("Using the default Prometheus configuration.")
+		prometheusConfigMapName := fmt.Sprintf("%s-%s", app.Name, prometheusConfigMapNameSuffix)
+		configMap := buildPrometheusConfigMap(app, prometheusConfigMapName)
+		retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+			cm, err := kubeClient.CoreV1().ConfigMaps(app.Namespace).Get(prometheusConfigMapName, metav1.GetOptions{})
+
+			if apiErrors.IsNotFound(err) {
+				_, createErr := kubeClient.CoreV1().ConfigMaps(app.Namespace).Create(configMap)
+				return createErr
+			}
+			if err != nil {
+				return err
+			}
+
+			cm.Data = configMap.Data
+			_, updateErr := kubeClient.CoreV1().ConfigMaps(app.Namespace).Update(cm)
+			return updateErr
+		})
+
+		if retryErr != nil {
+			return fmt.Errorf("failed to apply %s in namespace %s: %v", prometheusConfigMapName, app.Namespace, retryErr)
+		}
+		javaOption = fmt.Sprintf("-javaagent:%s=%d:%s/%s", app.Spec.Monitoring.Prometheus.JmxExporterJar,
+			port, prometheusConfigMapMountPath, prometheusConfigKey)
+
+		if app.Spec.Monitoring.ExposeDriverMetrics {
+			app.Spec.Driver.ConfigMaps = append(app.Spec.Driver.ConfigMaps, v1beta1.NamePath{
+				Name: prometheusConfigMapName,
+				Path: prometheusConfigMapMountPath,
+			})
+		}
+		if app.Spec.Monitoring.ExposeExecutorMetrics {
+			app.Spec.Executor.ConfigMaps = append(app.Spec.Executor.ConfigMaps, v1beta1.NamePath{
+				Name: prometheusConfigMapName,
+				Path: prometheusConfigMapMountPath,
+			})
+		}
+	}
 
 	/* work around for push gateway issue: https://github.com/prometheus/pushgateway/issues/97 */
@@ -70,19 +100,7 @@ func configPrometheusMonitoring(app *v1beta1.SparkApplication, kubeClient clientset.Interface) error {
 	app.Spec.SparkConf["spark.metrics.namespace"] = metricNamespace
 	app.Spec.SparkConf["spark.metrics.conf"] = metricConf
 
-	port := config.DefaultPrometheusJavaAgentPort
-	if app.Spec.Monitoring.Prometheus.Port != nil {
-		port = *app.Spec.Monitoring.Prometheus.Port
-	}
-	javaOption := fmt.Sprintf("-javaagent:%s=%d:%s/%s", app.Spec.Monitoring.Prometheus.JmxExporterJar,
-		port, prometheusConfigMapMountPath, prometheusConfigKey)
-
 	if app.Spec.Monitoring.ExposeDriverMetrics {
-		app.Spec.Driver.ConfigMaps = append(app.Spec.Driver.ConfigMaps, v1beta1.NamePath{
-			Name: prometheusConfigMapName,
-			Path: prometheusConfigMapMountPath,
-		})
-
 		if app.Spec.Driver.Annotations == nil {
 			app.Spec.Driver.Annotations = make(map[string]string)
 		}
@@ -97,11 +115,6 @@ func configPrometheusMonitoring(app *v1beta1.SparkApplication, kubeClient clientset.Interface) error {
 		}
 	}
 	if app.Spec.Monitoring.ExposeExecutorMetrics {
-		app.Spec.Executor.ConfigMaps = append(app.Spec.Executor.ConfigMaps, v1beta1.NamePath{
-			Name: prometheusConfigMapName,
-			Path: prometheusConfigMapMountPath,
-		})
-
 		if app.Spec.Executor.Annotations == nil {
 			app.Spec.Executor.Annotations = make(map[string]string)
 		}
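To make the two branches concrete, the sketch below prints the `-javaagent` option each one builds. The jar path is a placeholder, and the values used for `prometheusConfigMapMountPath` and `prometheusConfigKey` are assumptions; the diff references those constants only by name:

```go
package main

import "fmt"

func main() {
	jar := "/prometheus/jmx_prometheus_javaagent.jar" // placeholder jar path
	port := int32(8090)                               // documented default when Port is unset

	// ConfigFile branch: the agent reads the file straight from the job image.
	configFile := "/etc/metrics/conf/prometheus.yaml" // placeholder path
	fmt.Printf("-javaagent:%s=%d:%s\n", jar, port, configFile)

	// ConfigMap branch: the agent reads the key mounted from the generated ConfigMap.
	mountPath, configKey := "/etc/metrics/conf", "prometheus.yaml" // assumed constant values
	fmt.Printf("-javaagent:%s=%d:%s/%s\n", jar, port, mountPath, configKey)
}
```

Either way, the JMX exporter agent format is `-javaagent:<jar>=<port>:<config path>`; only the source of the config path changes.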
7 changes: 7 additions & 0 deletions pkg/webhook/patch.go
@@ -18,6 +18,7 @@ package webhook
 
 import (
 	"fmt"
+	"github.com/golang/glog"
 
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -29,6 +30,7 @@ import (
 const (
 	sparkDriverContainerName   = "spark-kubernetes-driver"
 	sparkExecutorContainerName = "executor"
+	maxNameLength              = 63
 )
 
 // patchOperation represents a RFC6902 JSON patch operation.
@@ -206,6 +208,11 @@ func addGeneralConfigMaps(pod *corev1.Pod) []*patchOperation {
 	namesToMountPaths := config.FindGeneralConfigMaps(pod.Annotations)
 	for name, mountPath := range namesToMountPaths {
 		volumeName := name + "-vol"
+		if len(volumeName) > maxNameLength {
+			glog.V(2).Infof("ConfigMap volume name %s is too long. Truncating to length %d.", volumeName, maxNameLength)
+			volumeName = volumeName[0:maxNameLength]
+			glog.V(2).Infof("Truncated volume name: %s.", volumeName)
+		}
 		patchOps = append(patchOps, addConfigMapVolume(pod, name, volumeName))
 		patchOps = append(patchOps, addConfigMapVolumeMount(pod, volumeName, mountPath))
 	}
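The 63-character cap corresponds to the DNS-1123 label length limit that Kubernetes enforces on volume names. A minimal sketch of the truncation behavior; the helper name is ours, since the webhook inlines this logic:

```go
package main

import (
	"fmt"
	"strings"
)

const maxNameLength = 63 // DNS-1123 label limit enforced on volume names

// truncateVolumeName mirrors the webhook's hard cap on generated
// "<configmap-name>-vol" volume names. Illustrative sketch only.
func truncateVolumeName(name string) string {
	if len(name) <= maxNameLength {
		return name
	}
	return name[:maxNameLength]
}

func main() {
	long := strings.Repeat("a", 70) + "-vol"
	fmt.Println(len(truncateVolumeName(long))) // 63
}
```

Note that two ConfigMap names agreeing on their first 63 characters would truncate to the same volume name; the patch logs the truncation but does not disambiguate such collisions.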
