From 5ed3ff3dc4c5ea78457d1960230fe8c25ff5a175 Mon Sep 17 00:00:00 2001 From: Simon Aronsson Date: Sun, 25 Oct 2020 13:41:29 +0100 Subject: [PATCH] add test execution synchronization and argument passing, and simplify code --- api/v1alpha1/k6_types.go | 20 ++++- config/crd/bases/k6.io_k6s.yaml | 12 +++ config/samples/k6_v1alpha1_configmap.yml | 13 +-- config/samples/k6_v1alpha1_k6.yaml | 3 +- controllers/k6_controller.go | 68 +++++----------- controllers/k6_create.go | 79 ++++++++++++++++++ controllers/k6_start.go | 81 +++++++++++++++++++ main.go | 2 +- pkg/resources/containers/curl.go | 51 ++++++++++++ .../{resources.go => jobs/runner.go} | 80 ++++++++++-------- pkg/resources/jobs/starter.go | 36 +++++++++ pkg/segmentation/segmentation.go | 46 +++++++++++ pkg/segmentation/suite_test.go | 32 ++++++++ 13 files changed, 423 insertions(+), 100 deletions(-) create mode 100644 controllers/k6_create.go create mode 100644 controllers/k6_start.go create mode 100644 pkg/resources/containers/curl.go rename pkg/resources/{resources.go => jobs/runner.go} (67%) create mode 100644 pkg/resources/jobs/starter.go create mode 100644 pkg/segmentation/segmentation.go create mode 100644 pkg/segmentation/suite_test.go diff --git a/api/v1alpha1/k6_types.go b/api/v1alpha1/k6_types.go index b5e0da04..82581669 100644 --- a/api/v1alpha1/k6_types.go +++ b/api/v1alpha1/k6_types.go @@ -20,13 +20,25 @@ import ( // K6Spec defines the desired state of K6 type K6Spec struct { - Script string `json:"script"` - Parallelism int32 `json:"parallelism"` - Separate bool `json:"separate,omitempty"` + Script string `json:"script"` + Parallelism int32 `json:"parallelism"` + Separate bool `json:"separate,omitempty"` +// Cleanup Cleanup `json:"cleanup,omitempty"` // TODO + Arguments string `json:"arguments,omitempty"` } +// Cleanup allows for automatic cleanup of resources pre or post execution +// +kubebuilder:validation:Enum=pre;post +// type Cleanup string + +// Stage describes which stage of the test execution lifecycle our runners are in +// +kubebuilder:validation:Enum=created;started +type Stage string + // K6Status defines the observed state of K6 -type K6Status struct{} +type K6Status struct { + Stage Stage `json:"stage,omitempty"` +} // K6 is the Schema for the k6s API // +kubebuilder:object:root=true diff --git a/config/crd/bases/k6.io_k6s.yaml b/config/crd/bases/k6.io_k6s.yaml index be775d2b..a55b33a6 100644 --- a/config/crd/bases/k6.io_k6s.yaml +++ b/config/crd/bases/k6.io_k6s.yaml @@ -36,6 +36,10 @@ spec: spec: description: K6Spec defines the desired state of K6 properties: + arguments: + description: "\tCleanup Cleanup `json:\"cleanup,omitempty\"` // + TODO" + type: string parallelism: format: int32 type: integer @@ -49,6 +53,14 @@ spec: type: object status: description: K6Status defines the observed state of K6 + properties: + stage: + description: Stage describes which stage of the test execution lifecycle + our runners are in + enum: + - created + - started + type: string type: object type: object version: v1alpha1 diff --git a/config/samples/k6_v1alpha1_configmap.yml b/config/samples/k6_v1alpha1_configmap.yml index 0be61a14..51f2961f 100644 --- a/config/samples/k6_v1alpha1_configmap.yml +++ b/config/samples/k6_v1alpha1_configmap.yml @@ -8,18 +8,9 @@ data: import { Rate } from 'k6/metrics'; import { check, sleep } from 'k6'; - import { - getExecutionSegments, - logSegment, - } from 'https://jslib.k6.io/k8s-distributed-execution/0.0.1/index.js'; - const failRate = new Rate('failed_requests'); - export function setup() { - logSegment(); - } - - export let options = Object.assign({ + export let options = { stages: [ { target: 200, duration: '30s' }, { target: 0, duration: '30s' }, @@ -28,7 +19,7 @@ data: failed_requests: ['rate<=0'], http_req_duration: ['p(95)<500'], }, - }, getExecutionSegments()); + }; export default function () { const result = http.get('https://test-api.k6.io/public/crocodiles/'); diff --git a/config/samples/k6_v1alpha1_k6.yaml b/config/samples/k6_v1alpha1_k6.yaml index 9eca2ab7..3381fa53 100644 --- a/config/samples/k6_v1alpha1_k6.yaml +++ b/config/samples/k6_v1alpha1_k6.yaml @@ -4,5 +4,4 @@ metadata: name: k6-sample spec: parallelism: 4 - script: k6-test - separate: false \ No newline at end of file + script: k6-test \ No newline at end of file diff --git a/controllers/k6_controller.go b/controllers/k6_controller.go index 7dec8674..9eecff49 100644 --- a/controllers/k6_controller.go +++ b/controllers/k6_controller.go @@ -17,17 +17,12 @@ package controllers import ( "context" "fmt" - "github.com/k6io/operator/pkg/resources" - batchv1 "k8s.io/api/batch/v1" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/types" - "github.com/go-logr/logr" + "github.com/k6io/operator/api/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/k6io/operator/api/v1alpha1" ) // K6Reconciler reconciles a K6 object @@ -47,61 +42,38 @@ func (r *K6Reconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { ctx := context.Background() log := r.Log.WithValues("k6", req.NamespacedName) - result := ctrl.Result{} - // Fetch the CRD k6 := &v1alpha1.K6{} err := r.Get(ctx, req.NamespacedName, k6) if err != nil { if errors.IsNotFound(err) { - log.Info("Request deleted. Skipping requeuing.") - return result, nil + log.Info("Request deleted. Nothing to reconcile.") + return ctrl.Result{}, nil } log.Error(err, "Could not fetch request") return ctrl.Result{Requeue: true}, err } - // Check for previous jobs - found := &batchv1.Job{} - namespacedName := types.NamespacedName{ - Name: fmt.Sprintf("%s-1", k6.Name), - Namespace: k6.Namespace, - } - - if err = r.Get(ctx, namespacedName, found); err == nil || !errors.IsNotFound(err) { - log.Info("Could not start a new test, Make sure you've deleted your previous run.") - return result, err - } - - // Create jobs - for i := 1; i <= int(k6.Spec.Parallelism); i++ { - if err = r.LaunchTest(ctx, k6, i, log); err != nil { - return ctrl.Result{}, err - } - } - - return result, nil -} - -func (r *K6Reconciler) LaunchTest(ctx context.Context, k6 *v1alpha1.K6, index int, log logr.Logger) error { - msg := fmt.Sprintf("Launching k6 test #%d", index) - log.Info(msg) - - job := resources.NewJob(k6, index) - - if err := ctrl.SetControllerReference(k6, job, r.Scheme); err != nil { - log.Error(err, "Failed to set controller reference for job") - return err - } - - if err := r.Create(ctx, job); err != nil { - log.Error(err, "Failed to launch k6 test") - return err + switch k6.Status.Stage { + case "": + return CreateJobs(ctx, log, k6, r) + case "created": + return StartJobs(ctx, log, k6, r) + case "started": + // wait for test to finish and then mark as finished + return ctrl.Result{}, nil + case "finished": + // delete if configured + // notify if configured + return ctrl.Result{}, nil } - return nil + err = fmt.Errorf("invalid status") + log.Error(err, "Invalid status for the k6 resource.") + return ctrl.Result{}, err } +// SetupWithManager sets up a managed controller that will reconcile all events for the K6 CRD func (r *K6Reconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.K6{}). diff --git a/controllers/k6_create.go b/controllers/k6_create.go new file mode 100644 index 00000000..893cbc2a --- /dev/null +++ b/controllers/k6_create.go @@ -0,0 +1,79 @@ +package controllers + +import ( + "context" + "fmt" + "github.com/go-logr/logr" + "github.com/k6io/operator/api/v1alpha1" + "github.com/k6io/operator/pkg/resources/jobs" + batchv1 "k8s.io/api/batch/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" +) + +// CreateJobs that will spawn k6 pods, running distributed tests +func CreateJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (ctrl.Result, error) { + + var err error + var res ctrl.Result + + log.Info("Creating test jobs") + + if res, err = createJobSpecs(ctx, log, k6, r); err != nil { + return res, err + } + + k6.Status.Stage = "created" + if err = r.Client.Status().Update(ctx, k6); err != nil { + log.Error(err, "Could not update status of custom resource") + return ctrl.Result{}, nil + } + + return ctrl.Result{}, nil +} + +func createJobSpecs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (ctrl.Result, error) { + found := &batchv1.Job{} + namespacedName := types.NamespacedName{ + Name: fmt.Sprintf("%s-1", k6.Name), + Namespace: k6.Namespace, + } + + if err := r.Get(ctx, namespacedName, found); err == nil || !errors.IsNotFound(err) { + log.Info("Could not start a new test, Make sure you've deleted your previous run.") + return ctrl.Result{}, err + } + + for i := 1; i <= int(k6.Spec.Parallelism); i++ { + if err := launchTest(ctx, k6, i, log, r); err != nil { + return ctrl.Result{}, err + } + } + return ctrl.Result{}, nil +} + +func launchTest(ctx context.Context, k6 *v1alpha1.K6, index int, log logr.Logger, r *K6Reconciler) error { + var job *batchv1.Job + var err error + + msg := fmt.Sprintf("Launching k6 test #%d", index) + log.Info(msg) + + if job, err = jobs.NewRunnerJob(k6, index); err != nil { + log.Error(err, "Failed to generate k6 test job") + return err + } + + if err = ctrl.SetControllerReference(k6, job, r.Scheme); err != nil { + log.Error(err, "Failed to set controller reference for job") + return err + } + + if err = r.Create(ctx, job); err != nil { + log.Error(err, "Failed to launch k6 test") + return err + } + + return nil +} diff --git a/controllers/k6_start.go b/controllers/k6_start.go new file mode 100644 index 00000000..1b9e59b5 --- /dev/null +++ b/controllers/k6_start.go @@ -0,0 +1,81 @@ +package controllers + +import ( + "context" + "fmt" + "github.com/go-logr/logr" + "github.com/k6io/operator/api/v1alpha1" + "github.com/k6io/operator/pkg/resources/jobs" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "time" +) + +// StartJobs in the Ready phase using a curl container +func StartJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (ctrl.Result, error) { + log.Info("Waiting for pods to get ready") + + err := wait.PollImmediate(time.Second*5, time.Second*60, func() (done bool, err error) { + selector := labels.SelectorFromSet(map[string]string{ + "app": "k6", + "k6_cr": k6.Name, + }) + + opts := &client.ListOptions{LabelSelector: selector} + pl := &v1.PodList{} + + if e := r.List(ctx, pl, opts); e != nil { + log.Error(e, "Could not list pods") + return false, e + } + + var count int + for _, pod := range pl.Items { + if pod.Status.Phase != "Running" { + continue + } + count++ + } + + log.Info(fmt.Sprintf("%d/%d pods ready", count, k6.Spec.Parallelism)) + + if count != int(k6.Spec.Parallelism) { + return false, nil + } + + var ips []string + + for _, pod := range pl.Items { + ips = append(ips, pod.Status.PodIP) + } + + starter := jobs.NewStarterJob(k6, ips) + + if err = ctrl.SetControllerReference(k6, starter, r.Scheme); err != nil { + log.Error(err, "Failed to set controller reference for job") + } + + if err = r.Create(ctx, starter); err != nil { + log.Error(err, "Failed to launch k6 test starter") + return true, err + } + + return true, nil + }) + + if err != nil { + log.Error(err, "Failed to start all jobs") + return ctrl.Result{}, err + } + + k6.Status.Stage = "started" + if err = r.Client.Status().Update(ctx, k6); err != nil { + log.Error(err, "Could not update status of custom resource") + return ctrl.Result{}, err + } + + return ctrl.Result{}, nil +} diff --git a/main.go b/main.go index 1788aa3b..49582c0c 100644 --- a/main.go +++ b/main.go @@ -18,6 +18,7 @@ package main import ( "flag" + "github.com/k6io/operator/controllers" "os" "k8s.io/apimachinery/pkg/runtime" @@ -28,7 +29,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log/zap" k6v1alpha1 "github.com/k6io/operator/api/v1alpha1" - "github.com/k6io/operator/controllers" // +kubebuilder:scaffold:imports ) diff --git a/pkg/resources/containers/curl.go b/pkg/resources/containers/curl.go new file mode 100644 index 00000000..b8ff663a --- /dev/null +++ b/pkg/resources/containers/curl.go @@ -0,0 +1,51 @@ +package containers + +import ( + "encoding/json" + "fmt" + v1 "k8s.io/api/core/v1" + "strings" +) + +// NewCurlContainer is used to get a template for a new k6 starting curl container. +func NewCurlContainer(ips []string) v1.Container { + req, _ := json.Marshal( + statusAPIRequest{ + Data: statusAPIRequestData{ + Attributes: statusAPIRequestDataAttributes{ + Paused: false, + }, + ID: "default", + Type: "status", + }, + }) + + var parts []string + for _, ip := range ips { + parts = append(parts, fmt.Sprintf("curl -X PATCH -H 'Content-Type: application/json' http://%s:6565/v1/status -d '%s'", ip, req)) + } + + return v1.Container{ + Name: fmt.Sprintf("k6-curl"), + Image: "radial/busyboxplus:curl", + Command: []string{ + "sh", + "-c", + strings.Join(parts, ";"), + }, + } +} + +type statusAPIRequest struct { + Data statusAPIRequestData `json:"data"` +} + +type statusAPIRequestData struct { + Attributes statusAPIRequestDataAttributes `json:"attributes"` + ID string `json:"id"` + Type string `json:"type"` +} + +type statusAPIRequestDataAttributes struct { + Paused bool `json:"paused"` +} diff --git a/pkg/resources/resources.go b/pkg/resources/jobs/runner.go similarity index 67% rename from pkg/resources/resources.go rename to pkg/resources/jobs/runner.go index 864e7634..ab2c646c 100644 --- a/pkg/resources/resources.go +++ b/pkg/resources/jobs/runner.go @@ -1,16 +1,40 @@ -package resources +package jobs import ( "fmt" "github.com/k6io/operator/api/v1alpha1" + "github.com/k6io/operator/pkg/segmentation" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// NewJob creates a new k6 job from a CRD -func NewJob(k *v1alpha1.K6, index int) *batchv1.Job { +// NewRunnerJob creates a new k6 job from a CRD +func NewRunnerJob(k *v1alpha1.K6, index int) (*batchv1.Job, error) { name := fmt.Sprintf("%s-%d", k.Name, index) + command := []string{"k6", "run"} + + if k.Spec.Parallelism > 1 { + var args []string + var err error + + if args, err = segmentation.NewCommandFragments(index, int(k.Spec.Parallelism)); err != nil { + return nil, err + + } + command = append(command, args...) + } + + if k.Spec.Arguments != "" { + command = append(command, k.Spec.Arguments) + } + command = append( + command, + "/test/test.js", + "--address=0.0.0.0:6565", + "--paused") + + var zero int64 = 0 job := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ @@ -23,18 +47,20 @@ func NewJob(k *v1alpha1.K6, index int) *batchv1.Job { Labels: newLabels(k.Name), }, Spec: corev1.PodSpec{ + Hostname: name, RestartPolicy: corev1.RestartPolicyNever, Containers: []corev1.Container{{ Image: "loadimpact/k6:latest", Name: "k6", - Command: []string{"k6", "run", "/test/test.js"}, + Command: command, VolumeMounts: []corev1.VolumeMount{{ Name: "k6-test-volume", MountPath: "/test", }}, - Env: newEnvVars(k.Spec.Parallelism, index), + Ports: []corev1.ContainerPort{{ContainerPort: 6565}}, }}, - Volumes: newVolumeSpec(k.Spec.Script), + TerminationGracePeriodSeconds: &zero, + Volumes: newVolumeSpec(k.Spec.Script), }, }, }, @@ -43,34 +69,7 @@ func NewJob(k *v1alpha1.K6, index int) *batchv1.Job { if k.Spec.Separate { job.Spec.Template.Spec.Affinity = newAntiAffinity() } - - return job -} - -func newEnvVars(parallelism int32, index int) []corev1.EnvVar { - return []corev1.EnvVar{ - { - Name: "K6_INSTANCES_INDEX", - Value: fmt.Sprintf("%d", index), - }, - { - Name: "K6_INSTANCES_TOTAL", - Value: fmt.Sprintf("%d", parallelism), - }, - } -} - -func newVolumeSpec(script string) []corev1.Volume { - return []corev1.Volume{{ - Name: "k6-test-volume", - VolumeSource: corev1.VolumeSource{ - ConfigMap: &corev1.ConfigMapVolumeSource{ - LocalObjectReference: corev1.LocalObjectReference{ - Name: script, - }, - }, - }, - }} + return job, nil } func newAntiAffinity() *corev1.Affinity { @@ -96,6 +95,19 @@ func newAntiAffinity() *corev1.Affinity { } } +func newVolumeSpec(script string) []corev1.Volume { + return []corev1.Volume{{ + Name: "k6-test-volume", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: script, + }, + }, + }, + }} +} + func newLabels(name string) map[string]string { return map[string]string{ "app": "k6", diff --git a/pkg/resources/jobs/starter.go b/pkg/resources/jobs/starter.go new file mode 100644 index 00000000..c33a5a71 --- /dev/null +++ b/pkg/resources/jobs/starter.go @@ -0,0 +1,36 @@ +package jobs + +import ( + "fmt" + "github.com/k6io/operator/api/v1alpha1" + "github.com/k6io/operator/pkg/resources/containers" + batchv1 "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// NewStarterJob builds a template used for creating a starter job +func NewStarterJob(k6 *v1alpha1.K6, ips []string) *batchv1.Job { + return &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-starter", k6.Name), + Namespace: k6.Namespace, + }, + Spec: batchv1.JobSpec{ + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "k6", + "k6_cr": k6.Name, + }, + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + Containers: []v1.Container{ + containers.NewCurlContainer(ips), + }, + }, + }, + }, + } +} diff --git a/pkg/segmentation/segmentation.go b/pkg/segmentation/segmentation.go new file mode 100644 index 00000000..fe05b4e7 --- /dev/null +++ b/pkg/segmentation/segmentation.go @@ -0,0 +1,46 @@ +package segmentation + +import ( + "errors" + "fmt" + "strings" +) + +const ( + beginning = "0" + end = "1" +) + +// NewCommandFragments builds command fragments for starting k6 with execution segments. +func NewCommandFragments(index int, total int) ([]string, error) { + + if index > total { + return nil, errors.New("node index exceeds configured parallelism") + } + + parts := []string{beginning} + + for i := 1; i < total; i++ { + parts = append(parts, fmt.Sprintf("%d/%d", i, total)) + } + + parts = append(parts, end) + + getSegmentPart := func(index int, total int) string { + if index == 0 { + return "0" + } + if index == total { + return "1" + } + return fmt.Sprintf("%d/%d", index, total) + } + + segment := fmt.Sprintf("%s:%s", getSegmentPart(index-1, total), getSegmentPart(index, total)) + sequence := strings.Join(parts[:], ",") + + return []string{ + fmt.Sprintf("--execution-segment=%s", segment), + fmt.Sprintf("--execution-segment-sequence=%s", sequence), + }, nil +} diff --git a/pkg/segmentation/suite_test.go b/pkg/segmentation/suite_test.go new file mode 100644 index 00000000..a8c07d00 --- /dev/null +++ b/pkg/segmentation/suite_test.go @@ -0,0 +1,32 @@ +package segmentation_test + +import ( + "fmt" + "github.com/k6io/operator/pkg/segmentation" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "sigs.k8s.io/controller-runtime/pkg/envtest/printer" + "testing" +) + +func TestSegmentation(t *testing.T) { + RegisterFailHandler(Fail) + + RunSpecsWithDefaultAndCustomReporters(t, + "Segmentation Suite", + []Reporter{printer.NewlineReporter{}}) +} + +var _ = Describe("the execution segmentation string generator", func() { + When("given the index 1 and total 4", func() { + It("should return proper segmentation fragments", func() { + output, err := segmentation.NewCommandFragments(1, 4) + fmt.Print(output) + Expect(err).NotTo(HaveOccurred()) + Expect(output).To(Equal([]string{ + "--execution-segment=0:1/4", + "--execution-segment-sequence=0,1/4,2/4,3/4,1", + })) + }) + }) +})