Skip to content

Commit

Permalink
add test execution synchronization and argument passing, and simplify…
Browse files Browse the repository at this point in the history
… code
  • Loading branch information
simskij committed Oct 25, 2020
1 parent 3ab2056 commit 5ed3ff3
Show file tree
Hide file tree
Showing 13 changed files with 423 additions and 100 deletions.
20 changes: 16 additions & 4 deletions api/v1alpha1/k6_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,25 @@ import (

// K6Spec defines the desired state of K6
type K6Spec struct {
Script string `json:"script"`
Parallelism int32 `json:"parallelism"`
Separate bool `json:"separate,omitempty"`
Script string `json:"script"`
Parallelism int32 `json:"parallelism"`
Separate bool `json:"separate,omitempty"`
// Cleanup Cleanup `json:"cleanup,omitempty"` // TODO
Arguments string `json:"arguments,omitempty"`
}

// Cleanup allows for automatic cleanup of resources pre or post execution
// +kubebuilder:validation:Enum=pre;post
// type Cleanup string

// Stage describes which stage of the test execution lifecycle our runners are in
// +kubebuilder:validation:Enum=created;started
type Stage string

// K6Status defines the observed state of K6
type K6Status struct{}
type K6Status struct {
Stage Stage `json:"stage,omitempty"`
}

// K6 is the Schema for the k6s API
// +kubebuilder:object:root=true
Expand Down
12 changes: 12 additions & 0 deletions config/crd/bases/k6.io_k6s.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ spec:
spec:
description: K6Spec defines the desired state of K6
properties:
arguments:
description: "\tCleanup Cleanup `json:\"cleanup,omitempty\"` //
TODO"
type: string
parallelism:
format: int32
type: integer
Expand All @@ -49,6 +53,14 @@ spec:
type: object
status:
description: K6Status defines the observed state of K6
properties:
stage:
description: Stage describes which stage of the test execution lifecycle
our runners are in
enum:
- created
- started
type: string
type: object
type: object
version: v1alpha1
Expand Down
13 changes: 2 additions & 11 deletions config/samples/k6_v1alpha1_configmap.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,9 @@ data:
import { Rate } from 'k6/metrics';
import { check, sleep } from 'k6';
import {
getExecutionSegments,
logSegment,
} from 'https://jslib.k6.io/k8s-distributed-execution/0.0.1/index.js';
const failRate = new Rate('failed_requests');
export function setup() {
logSegment();
}
export let options = Object.assign({
export let options = {
stages: [
{ target: 200, duration: '30s' },
{ target: 0, duration: '30s' },
Expand All @@ -28,7 +19,7 @@ data:
failed_requests: ['rate<=0'],
http_req_duration: ['p(95)<500'],
},
}, getExecutionSegments());
};
export default function () {
const result = http.get('https://test-api.k6.io/public/crocodiles/');
Expand Down
3 changes: 1 addition & 2 deletions config/samples/k6_v1alpha1_k6.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@ metadata:
name: k6-sample
spec:
parallelism: 4
script: k6-test
separate: false
script: k6-test
68 changes: 20 additions & 48 deletions controllers/k6_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,12 @@ package controllers
import (
"context"
"fmt"
"github.com/k6io/operator/pkg/resources"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"

"github.com/go-logr/logr"
"github.com/k6io/operator/api/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/k6io/operator/api/v1alpha1"
)

// K6Reconciler reconciles a K6 object
Expand All @@ -47,61 +42,38 @@ func (r *K6Reconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
ctx := context.Background()
log := r.Log.WithValues("k6", req.NamespacedName)

result := ctrl.Result{}

// Fetch the CRD
k6 := &v1alpha1.K6{}
err := r.Get(ctx, req.NamespacedName, k6)
if err != nil {
if errors.IsNotFound(err) {
log.Info("Request deleted. Skipping requeuing.")
return result, nil
log.Info("Request deleted. Nothing to reconcile.")
return ctrl.Result{}, nil
}
log.Error(err, "Could not fetch request")
return ctrl.Result{Requeue: true}, err
}

// Check for previous jobs
found := &batchv1.Job{}
namespacedName := types.NamespacedName{
Name: fmt.Sprintf("%s-1", k6.Name),
Namespace: k6.Namespace,
}

if err = r.Get(ctx, namespacedName, found); err == nil || !errors.IsNotFound(err) {
log.Info("Could not start a new test, Make sure you've deleted your previous run.")
return result, err
}

// Create jobs
for i := 1; i <= int(k6.Spec.Parallelism); i++ {
if err = r.LaunchTest(ctx, k6, i, log); err != nil {
return ctrl.Result{}, err
}
}

return result, nil
}

func (r *K6Reconciler) LaunchTest(ctx context.Context, k6 *v1alpha1.K6, index int, log logr.Logger) error {
msg := fmt.Sprintf("Launching k6 test #%d", index)
log.Info(msg)

job := resources.NewJob(k6, index)

if err := ctrl.SetControllerReference(k6, job, r.Scheme); err != nil {
log.Error(err, "Failed to set controller reference for job")
return err
}

if err := r.Create(ctx, job); err != nil {
log.Error(err, "Failed to launch k6 test")
return err
switch k6.Status.Stage {
case "":
return CreateJobs(ctx, log, k6, r)
case "created":
return StartJobs(ctx, log, k6, r)
case "started":
// wait for test to finish and then mark as finished
return ctrl.Result{}, nil
case "finished":
// delete if configured
// notify if configured
return ctrl.Result{}, nil
}

return nil
err = fmt.Errorf("invalid status")
log.Error(err, "Invalid status for the k6 resource.")
return ctrl.Result{}, err
}

// SetupWithManager sets up a managed controller that will reconcile all events for the K6 CRD
func (r *K6Reconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.K6{}).
Expand Down
79 changes: 79 additions & 0 deletions controllers/k6_create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package controllers

import (
"context"
"fmt"
"github.com/go-logr/logr"
"github.com/k6io/operator/api/v1alpha1"
"github.com/k6io/operator/pkg/resources/jobs"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
)

// CreateJobs that will spawn k6 pods, running distributed tests
func CreateJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (ctrl.Result, error) {

var err error
var res ctrl.Result

log.Info("Creating test jobs")

if res, err = createJobSpecs(ctx, log, k6, r); err != nil {
return res, err
}

k6.Status.Stage = "created"
if err = r.Client.Status().Update(ctx, k6); err != nil {
log.Error(err, "Could not update status of custom resource")
return ctrl.Result{}, nil
}

return ctrl.Result{}, nil
}

func createJobSpecs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (ctrl.Result, error) {
found := &batchv1.Job{}
namespacedName := types.NamespacedName{
Name: fmt.Sprintf("%s-1", k6.Name),
Namespace: k6.Namespace,
}

if err := r.Get(ctx, namespacedName, found); err == nil || !errors.IsNotFound(err) {
log.Info("Could not start a new test, Make sure you've deleted your previous run.")
return ctrl.Result{}, err
}

for i := 1; i <= int(k6.Spec.Parallelism); i++ {
if err := launchTest(ctx, k6, i, log, r); err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{}, nil
}

func launchTest(ctx context.Context, k6 *v1alpha1.K6, index int, log logr.Logger, r *K6Reconciler) error {
var job *batchv1.Job
var err error

msg := fmt.Sprintf("Launching k6 test #%d", index)
log.Info(msg)

if job, err = jobs.NewRunnerJob(k6, index); err != nil {
log.Error(err, "Failed to generate k6 test job")
return err
}

if err = ctrl.SetControllerReference(k6, job, r.Scheme); err != nil {
log.Error(err, "Failed to set controller reference for job")
return err
}

if err = r.Create(ctx, job); err != nil {
log.Error(err, "Failed to launch k6 test")
return err
}

return nil
}
81 changes: 81 additions & 0 deletions controllers/k6_start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package controllers

import (
"context"
"fmt"
"github.com/go-logr/logr"
"github.com/k6io/operator/api/v1alpha1"
"github.com/k6io/operator/pkg/resources/jobs"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"time"
)

// StartJobs in the Ready phase using a curl container
func StartJobs(ctx context.Context, log logr.Logger, k6 *v1alpha1.K6, r *K6Reconciler) (ctrl.Result, error) {
log.Info("Waiting for pods to get ready")

err := wait.PollImmediate(time.Second*5, time.Second*60, func() (done bool, err error) {
selector := labels.SelectorFromSet(map[string]string{
"app": "k6",
"k6_cr": k6.Name,
})

opts := &client.ListOptions{LabelSelector: selector}
pl := &v1.PodList{}

if e := r.List(ctx, pl, opts); e != nil {
log.Error(e, "Could not list pods")
return false, e
}

var count int
for _, pod := range pl.Items {
if pod.Status.Phase != "Running" {
continue
}
count++
}

log.Info(fmt.Sprintf("%d/%d pods ready", count, k6.Spec.Parallelism))

if count != int(k6.Spec.Parallelism) {
return false, nil
}

var ips []string

for _, pod := range pl.Items {
ips = append(ips, pod.Status.PodIP)
}

starter := jobs.NewStarterJob(k6, ips)

if err = ctrl.SetControllerReference(k6, starter, r.Scheme); err != nil {
log.Error(err, "Failed to set controller reference for job")
}

if err = r.Create(ctx, starter); err != nil {
log.Error(err, "Failed to launch k6 test starter")
return true, err
}

return true, nil
})

if err != nil {
log.Error(err, "Failed to start all jobs")
return ctrl.Result{}, err
}

k6.Status.Stage = "started"
if err = r.Client.Status().Update(ctx, k6); err != nil {
log.Error(err, "Could not update status of custom resource")
return ctrl.Result{}, err
}

return ctrl.Result{}, nil
}
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package main

import (
"flag"
"github.com/k6io/operator/controllers"
"os"

"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -28,7 +29,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log/zap"

k6v1alpha1 "github.com/k6io/operator/api/v1alpha1"
"github.com/k6io/operator/controllers"
// +kubebuilder:scaffold:imports
)

Expand Down
Loading

0 comments on commit 5ed3ff3

Please sign in to comment.