Switched to use auto-generated clientset
liyinan926 committed Jan 14, 2018
1 parent d36b30c commit e021f07
Showing 7 changed files with 110 additions and 267 deletions.
25 changes: 20 additions & 5 deletions README.md
@@ -2,6 +2,20 @@

**This is not an official Google product.**

## Project Status

**Project status:** *alpha*

Spark Operator is still under active development and has not been extensively tested yet. Use it at your own risk. Backward compatibility is not guaranteed for alpha releases.

## Prerequisites

* Version >= 1.8 of Kubernetes.

Spark Operator relies on [Initializers](https://kubernetes.io/docs/admin/extensible-admission-controllers/#initializers)
and [garbage collection](https://kubernetes.io/docs/concepts/workloads/controllers/garbage-collection/) support for
custom resources, both of which are available only in Kubernetes 1.8 and above.
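
As a quick sanity check (a sketch, not taken from the project's own docs), you can confirm the server version and verify that the Initializers alpha feature is enabled in the API server. The exact flags depend on how your cluster was provisioned, so treat the ones below as assumptions and consult the linked Initializers documentation for the authoritative setup:

```bash
# Confirm the API server reports Kubernetes 1.8 or newer.
kubectl version --short

# Initializers are an alpha feature; the API server typically needs to be started
# with flags along these lines (assumed here, not prescribed by this project):
#   --admission-control=...,Initializers
#   --runtime-config=admissionregistration.k8s.io/v1alpha1
```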

# Spark Operator

Spark Operator is an experimental project aiming to make specifying and running [Spark](https://github.com/apache/spark)
@@ -20,7 +34,8 @@ cluster, without needing to use API server proxy or port forwarding.
* Automatically creating namespaces and setting up RBAC roles and quotas, and running users' applications in separate
namespaces for better resource isolation and quota management.

To make such automation possible, Spark Operator uses the Kubernetes
[CustomResourceDefinition (CRD)](https://kubernetes.io/docs/tasks/access-kubernetes-api/extend-api-custom-resource-definitions/) and a corresponding CRD controller, as well as an [initializer](https://kubernetes.io/docs/admin/extensible-admission-controllers/#initializers). The CRD controller sets up the environment for an application and submits the application to run on behalf of the user, whereas the initializer handles customization of the Spark Pods.
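
Once the operator has registered its CRD with the API server, the definition can be inspected directly. This is a sketch only; the fully qualified CRD name is assumed here from the `sparkoperator.k8s.io` API group used in the code and the `sparkapplications` plural shown later in this README:

```bash
# List registered CustomResourceDefinitions and look for the SparkApplication one.
kubectl get crd

# Assumed full name follows the <plural>.<group> convention.
kubectl describe crd sparkapplications.sparkoperator.k8s.io
```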

This approach is completely different from one in which the submission client itself creates the CRD object. Having externally
created and managed CRD objects offers the following benefits:
@@ -97,16 +112,16 @@ respectively. The default values for both flags are 10 and 3, respectively.

## Running the Example Spark Application

To run the example Spark application, run the following command:
To run the Spark Pi example, run the following command:

```bash
kubectl create -f manifest/spark-application-example.yaml
kubectl create -f examples/spark-pi.yaml
```

This will create a `SparkApplication` object named `spark-app-example`. Check the object by running the following command:
This will create a `SparkApplication` object named `spark-pi`. Check the object by running the following command:

```bash
kubectl get sparkapplications spark-app-example -o=yaml
kubectl get sparkapplications spark-pi -o=yaml
```

This will show something similar to the following:
48 changes: 48 additions & 0 deletions hack/verify-codegen.sh
@@ -0,0 +1,48 @@
#!/bin/bash

# Copyright 2017 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -o errexit
set -o nounset
set -o pipefail

SCRIPT_ROOT=$(dirname "${BASH_SOURCE}")/..

DIFFROOT="${SCRIPT_ROOT}/pkg"
TMP_DIFFROOT="${SCRIPT_ROOT}/_tmp/pkg"
_tmp="${SCRIPT_ROOT}/_tmp"

cleanup() {
  rm -rf "${_tmp}"
}
trap "cleanup" EXIT SIGINT

cleanup

mkdir -p "${TMP_DIFFROOT}"
cp -a "${DIFFROOT}"/* "${TMP_DIFFROOT}"

"${SCRIPT_ROOT}/hack/update-codegen.sh"
echo "diffing ${DIFFROOT} against freshly generated codegen"
ret=0
diff -Naupr "${DIFFROOT}" "${TMP_DIFFROOT}" || ret=$?
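# Restore the original (pre-regeneration) sources saved in ${TMP_DIFFROOT} so the
# verification run leaves the working tree unchanged.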
cp -a "${TMP_DIFFROOT}"/* "${DIFFROOT}"
if [[ $ret -eq 0 ]]
then
  echo "${DIFFROOT} up to date."
else
  echo "${DIFFROOT} is out of date. Please run hack/update-codegen.sh"
  exit 1
fi
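
A usage sketch (assuming the script is executed from a checkout of the repository and that `hack/update-codegen.sh` exists alongside it, as this script itself expects):

```bash
# Run from the repository root; the script exits non-zero if pkg/ no longer
# matches freshly generated code.
./hack/verify-codegen.sh

# If it reports that pkg/ is out of date, regenerate the code as the message suggests:
./hack/update-codegen.sh
```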
24 changes: 16 additions & 8 deletions main.go
@@ -26,8 +26,8 @@ import (
"github.com/golang/glog"

"k8s.io/spark-on-k8s-operator/pkg/controller"
"k8s.io/spark-on-k8s-operator/pkg/crd"
"k8s.io/spark-on-k8s-operator/pkg/initializer"
crdclientset "k8s.io/spark-on-k8s-operator/pkg/client/clientset/versioned"

apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/errors"
@@ -38,14 +38,22 @@ import (
"k8s.io/client-go/tools/clientcmd"
)

var (
    master = flag.String("master", "", "The address of the Kubernetes API server. "+
        "Overrides any value in kubeconfig. Only required if out-of-cluster.")
    kubeConfig = flag.String("kubeConfig", "", "Path to a kube config. Only required if "+
        "out-of-cluster.")
    initializerThreads = flag.Int("initializer-threads", 10, "Number of worker threads "+
        "used by the initializer controller.")
    submissionRunnerThreads = flag.Int("submission-threads", 3, "Number of worker threads "+
        "used by the submission runner.")
)

func main() {
kubeConfig := flag.String("kubeConfig", "", "Path to a kube config. Only required if out-of-cluster.")
initializerThreads := flag.Int("initializer-threads", 10, "Number of worker threads used by the initializer controller.")
submissionRunnerThreads := flag.Int("submission-threads", 3, "Number of worker threads used by the submission runner.")
flag.Parse()

// Create the client config. Use kubeConfig if given, otherwise assume in-cluster.
config, err := buildConfig(*kubeConfig)
config, err := buildConfig(*master, *kubeConfig)
if err != nil {
glog.Fatal(err)
}
@@ -63,7 +71,7 @@ func main() {

stopCh := make(chan struct{})

crdClient, err := crd.NewClient(config)
crdClient, err := crdclientset.NewForConfig(config)
if err != nil {
glog.Fatal(err)
}
@@ -92,9 +100,9 @@ func main() {
close(stopCh)
}

func buildConfig(kubeConfig string) (*rest.Config, error) {
func buildConfig(masterUrl string, kubeConfig string) (*rest.Config, error) {
if kubeConfig != "" {
return clientcmd.BuildConfigFromFlags("", kubeConfig)
return clientcmd.BuildConfigFromFlags(masterUrl, kubeConfig)
}
return rest.InClusterConfig()
}
44 changes: 19 additions & 25 deletions pkg/controller/controller.go
@@ -26,12 +26,13 @@ import (
"k8s.io/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1alpha1"
"k8s.io/spark-on-k8s-operator/pkg/crd"
"k8s.io/spark-on-k8s-operator/pkg/util"
crdclientset "k8s.io/spark-on-k8s-operator/pkg/client/clientset/versioned"
crdinformers "k8s.io/spark-on-k8s-operator/pkg/client/informers/externalversions"

apiv1 "k8s.io/api/core/v1"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
@@ -49,7 +50,7 @@ const (

// SparkApplicationController manages instances of SparkApplication.
type SparkApplicationController struct {
crdClient crd.ClientInterface
crdClient crdclientset.Interface
kubeClient clientset.Interface
extensionsClient apiextensionsclient.Interface
recorder record.EventRecorder
@@ -64,7 +65,7 @@ type SparkApplicationController struct {

// New creates a new SparkApplicationController.
func New(
crdClient crd.ClientInterface,
crdClient crdclientset.Interface,
kubeClient clientset.Interface,
extensionsClient apiextensionsclient.Interface,
submissionRunnerWorkers int) *SparkApplicationController {
@@ -80,7 +81,7 @@ func New(
}

func newSparkApplicationController(
crdClient crd.ClientInterface,
crdClient crdclientset.Interface,
kubeClient clientset.Interface,
extensionsClient apiextensionsclient.Interface,
eventRecorder record.EventRecorder,
@@ -133,27 +134,19 @@ func (s *SparkApplicationController) Start(stopCh <-chan struct{}) error {
}

func (s *SparkApplicationController) startSparkApplicationInformer(stopCh <-chan struct{}) error {
    listerWatcher := cache.NewListWatchFromClient(
        s.crdClient.RESTClient(),
        crd.Plural,
        apiv1.NamespaceAll,
        fields.Everything())

    _, sparkApplicationInformer := cache.NewInformer(
        listerWatcher,
        &v1alpha1.SparkApplication{},
        // resyncPeriod. Every resyncPeriod, all resources in the cache will retrigger events.
        0*time.Second,
        // SparkApplication resource event handlers.
        cache.ResourceEventHandlerFuncs{
            AddFunc: s.onAdd,
            DeleteFunc: s.onDelete,
        })

    go sparkApplicationInformer.Run(stopCh)

    informerFactory := crdinformers.NewSharedInformerFactory(
        s.crdClient,
        // resyncPeriod. Every resyncPeriod, all resources in the cache will re-trigger events.
        // Set to 0 to disable the resync.
        0*time.Second)
    informer := informerFactory.Sparkoperator().V1alpha1().SparkApplications().Informer()
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: s.onAdd,
        DeleteFunc: s.onDelete,
    })
    go informer.Run(stopCh)

if !cache.WaitForCacheSync(stopCh, sparkApplicationInformer.HasSynced) {
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
return fmt.Errorf("timed out waiting for cache to sync")
}

@@ -354,15 +347,16 @@ func (s *SparkApplicationController) updateSparkApplicationWithRetries(
return nil
}

updated, err := s.crdClient.Update(toUpdate)
client := s.crdClient.SparkoperatorV1alpha1().SparkApplications(toUpdate.Namespace)
updated, err := client.Update(toUpdate)
if err == nil {
return updated
}

// Failed update to the API server.
// Get the latest version from the API server first and re-apply the update.
name := toUpdate.Name
toUpdate, err = s.crdClient.Get(name, toUpdate.Namespace)
toUpdate, err = client.Get(name, metav1.GetOptions{})
if err != nil {
glog.Errorf("failed to get SparkApplication %s: %v", name, err)
return nil
9 changes: 7 additions & 2 deletions pkg/controller/controller_test.go
@@ -21,7 +21,7 @@ import (
"testing"

"k8s.io/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1alpha1"
"k8s.io/spark-on-k8s-operator/pkg/crd"
crdfake "k8s.io/spark-on-k8s-operator/pkg/client/clientset/versioned/fake"

"github.com/stretchr/testify/assert"

@@ -33,7 +33,7 @@ import (
)

func newFakeController() *SparkApplicationController {
crdClient := crd.NewFakeClient()
crdClient := crdfake.NewSimpleClientset()
kubeClient := kubeclientfake.NewSimpleClientset()
kubeClient.CoreV1().Nodes().Create(&apiv1.Node{
ObjectMeta: metav1.ObjectMeta{
@@ -64,6 +64,7 @@ func TestOnAdd(t *testing.T) {
Namespace: "default",
},
}
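// Register the application with the fake clientset so that clientset-based lookups and updates in the controller can find it.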
ctrl.crdClient.SparkoperatorV1alpha1().SparkApplications(app.Namespace).Create(app)

go ctrl.onAdd(app)
submission := <-ctrl.runner.queue
@@ -85,6 +86,7 @@ func TestOnDelete(t *testing.T) {
AppID: "foo-123",
},
}
ctrl.crdClient.SparkoperatorV1alpha1().SparkApplications(app.Namespace).Create(app)

driverPod := &apiv1.Pod{
ObjectMeta: metav1.ObjectMeta{
@@ -130,6 +132,7 @@ func TestProcessSingleDriverStateUpdate(t *testing.T) {
},
},
}
ctrl.crdClient.SparkoperatorV1alpha1().SparkApplications(app.Namespace).Create(app)
ctrl.runningApps[app.Status.AppID] = app

testcases := []testcase{
@@ -210,6 +213,7 @@ func TestProcessSingleAppStateUpdate(t *testing.T) {
},
},
}
ctrl.crdClient.SparkoperatorV1alpha1().SparkApplications(app.Namespace).Create(app)
ctrl.runningApps[app.Status.AppID] = app

testcases := []testcase{
@@ -310,6 +314,7 @@ func TestProcessSingleExecutorStateUpdate(t *testing.T) {
ExecutorState: make(map[string]v1alpha1.ExecutorState),
},
}
ctrl.crdClient.SparkoperatorV1alpha1().SparkApplications(app.Namespace).Create(app)
ctrl.runningApps[app.Status.AppID] = app

testcases := []testcase{