Adds provisioner limits based on cpu and mem #817

Merged
merged 27 commits on Nov 24, 2021
Changes from 1 commit

Commits (27)
632e737
Limit provisioner by cpu and memory
suket22 Nov 19, 2021
db8dbb1
Update pkg/apis/provisioning/v1alpha5/provisioner.go
suket22 Nov 19, 2021
5a7759a
Update pkg/controllers/provisioning/provisioner.go
suket22 Nov 19, 2021
5c5b3cf
Update pkg/controllers/provisioning/resourcecounter.go
suket22 Nov 19, 2021
e00778a
Separate controller for counting resources
suket22 Nov 22, 2021
4a6cd5e
Separate controller for counting resources
suket22 Nov 22, 2021
7633fc9
Etarn fixes
ellistarn Nov 23, 2021
9310ccd
Merge pull request #1 from ellistarn/limitsImpl
suket22 Nov 23, 2021
217942f
Some more fixes - don't default the status
suket22 Nov 23, 2021
2f291aa
Remove extra logging statement
suket22 Nov 23, 2021
9d67667
Fix defaults, fix binding errors
suket22 Nov 23, 2021
3285786
More minor fixes
suket22 Nov 23, 2021
4531d68
Remove extra patch from rbac
suket22 Nov 23, 2021
00a10d2
Addressing comments on the PR
suket22 Nov 23, 2021
2e20056
Compare provisionerSpecs before stopping existing Provisioners
suket22 Nov 23, 2021
7134769
Fix failing build!
suket22 Nov 23, 2021
64c1714
Merge branch 'main' into limitsImpl
suket22 Nov 23, 2021
555d18d
Don't reassign the provisioner in the launcher
suket22 Nov 24, 2021
f866907
Addressing more comments on the PR
suket22 Nov 24, 2021
74d6bd9
Adds basic unit test
suket22 Nov 24, 2021
afa1796
Update pkg/controllers/counter/controller.go
suket22 Nov 24, 2021
9974847
Update pkg/apis/provisioning/v1alpha5/provisioner.go
suket22 Nov 24, 2021
dfbc043
Update pkg/controllers/counter/controller.go
suket22 Nov 24, 2021
3036eea
More refactoring
suket22 Nov 24, 2021
ca36e42
Merge branch 'main' into limitsImpl
suket22 Nov 24, 2021
26c1831
Update pkg/controllers/provisioning/suite_test.go
suket22 Nov 24, 2021
2591f0a
Apply suggestions from code review
suket22 Nov 24, 2021
More minor fixes
suket22 committed Nov 23, 2021
commit 3285786ab1b94a6c262bb98bc2fe14eabe7c714b
48 changes: 10 additions & 38 deletions pkg/apis/provisioning/v1alpha5/limits.go
@@ -15,57 +15,29 @@ limitations under the License.
package v1alpha5

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

const (
	// CPU limit, in cores. (500m = .5 cores)
	ResourceLimitsCPU v1.ResourceName = "cpu"
	// Memory limit, in bytes. (500Gi = 500GiB = 500 * 1024 * 1024 * 1024)
	ResourceLimitsMemory v1.ResourceName = "memory"
)

var DefaultCPULimits *resource.Quantity = resource.NewScaledQuantity(100, 0)
var DefaultMemoryLimits *resource.Quantity = resource.NewScaledQuantity(400, resource.Giga)

// Limits define bounds on the resources being provisioned by Karpenter
type Limits struct {
	// Resources contains all the allocatable resources that Karpenter supports for limiting.
	Resources v1.ResourceList `json:"resources,omitempty"`
}

func (l *Limits) Default(ctx context.Context) {
	if l.Resources == nil {
		l.Resources = v1.ResourceList{
			ResourceLimitsCPU:    *DefaultCPULimits,
			ResourceLimitsMemory: *DefaultMemoryLimits,
		}
		return
	}
	if _, ok := l.Resources[ResourceLimitsCPU]; !ok {
		l.Resources[ResourceLimitsCPU] = *DefaultCPULimits
	}
	if _, ok := l.Resources[ResourceLimitsMemory]; !ok {
		l.Resources[ResourceLimitsMemory] = *DefaultMemoryLimits
	}
}

func (p *Provisioner) HasExceededResources() (bool, error) {
func (p *Provisioner) HasExceededResources() error {
	var currentResource = p.Status.Resources
	var currentLimits = p.Spec.Limits.Resources

	var CPUUsage = currentResource[ResourceLimitsCPU]
	if CPUUsage.Cmp(currentLimits[ResourceLimitsCPU]) >= 0 {
		return true, fmt.Errorf("cpu limits exceeded")
	}

	var MemoryUsage = currentResource[ResourceLimitsCPU]
	if MemoryUsage.Cmp(currentLimits[ResourceLimitsCPU]) >= 0 {
		return true, fmt.Errorf("memory limits exceeded")
	for resourceName, usage := range currentResource {
		fmt.Printf("resourceName %v resourceUsage %v\n", resourceName, usage)
		if limit, ok := currentLimits[resourceName]; ok {
			fmt.Printf("limitName %v limitUsage %v\n", resourceName, limit)
			if usage.Cmp(limit) >= 0 {
				return fmt.Errorf("%v limits exceeded", resourceName)
			}
		}
	}
	return false, nil
	return nil
}
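To make the behavior of the new check concrete, here is a small, self-contained sketch of the same loop against plain v1.ResourceList values. The exceedsLimits helper and the example quantities are illustrative only and are not part of the PR:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// exceedsLimits mirrors the loop added in limits.go: every resource reported in
// usage is compared against its limit (when one is set), and any resource at or
// above its limit produces an error.
func exceedsLimits(usage, limits v1.ResourceList) error {
	for name, used := range usage {
		if limit, ok := limits[name]; ok {
			if used.Cmp(limit) >= 0 {
				return fmt.Errorf("%v limits exceeded", name)
			}
		}
	}
	return nil
}

func main() {
	limits := v1.ResourceList{
		v1.ResourceCPU:    resource.MustParse("100"),
		v1.ResourceMemory: resource.MustParse("400Gi"),
	}
	usage := v1.ResourceList{
		v1.ResourceCPU:    resource.MustParse("104"),  // already past the CPU limit
		v1.ResourceMemory: resource.MustParse("128Gi"), // still under the memory limit
	}
	fmt.Println(exceedsLimits(usage, limits)) // prints "cpu limits exceeded"
}

Note that the comparison uses >= 0, so provisioning stops once usage reaches the limit, not only after it is exceeded.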
1 change: 0 additions & 1 deletion pkg/apis/provisioning/v1alpha5/provisioner_defaults.go
@@ -21,7 +21,6 @@ import (
// SetDefaults for the provisioner
func (p *Provisioner) SetDefaults(ctx context.Context) {
	p.Spec.Constraints.Default(ctx)
	p.Spec.Limits.Default(ctx)
}

// Default the constraints
9 changes: 9 additions & 0 deletions pkg/apis/provisioning/v1alpha5/provisioner_validation.go
@@ -42,6 +42,7 @@ func (s *ProvisionerSpec) validate(ctx context.Context) (errs *apis.FieldError)
	return errs.Also(
		s.validateTTLSecondsUntilExpired(),
		s.validateTTLSecondsAfterEmpty(),
		s.validateResourceLimits(),
		s.Constraints.Validate(ctx),
	)
}
@@ -52,13 +53,21 @@ func (s *ProvisionerSpec) validateTTLSecondsUntilExpired() (errs *apis.FieldError)
	}
	return errs
}

func (s *ProvisionerSpec) validateTTLSecondsAfterEmpty() (errs *apis.FieldError) {
	if ptr.Int64Value(s.TTLSecondsAfterEmpty) < 0 {
		return errs.Also(apis.ErrInvalidValue("cannot be negative", "ttlSecondsAfterEmpty"))
	}
	return errs
}

func (s *ProvisionerSpec) validateResourceLimits() (errs *apis.FieldError) {
	if s.Limits.Resources == nil || len(s.Limits.Resources) == 0 {
		return errs.Also(apis.ErrInvalidValue("cannot be empty", "limits"))
	}
	return errs
}

// Validate the constraints
func (c *Constraints) Validate(ctx context.Context) (errs *apis.FieldError) {
	return errs.Also(
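Since the defaulting path above was removed, the webhook now insists that limits be set explicitly. A rough, self-contained sketch of that rule, using a stand-in spec type rather than Karpenter's actual ProvisionerSpec, looks like this:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"knative.dev/pkg/apis"
)

// specWithLimits stands in for the slice of ProvisionerSpec this rule inspects.
type specWithLimits struct {
	Resources v1.ResourceList
}

// validateResourceLimits mirrors the new rule: a provisioner must declare at
// least one resource limit, otherwise admission is rejected.
func (s specWithLimits) validateResourceLimits() (errs *apis.FieldError) {
	if len(s.Resources) == 0 {
		return errs.Also(apis.ErrInvalidValue("cannot be empty", "limits"))
	}
	return errs
}

func main() {
	empty := specWithLimits{}
	fmt.Println(empty.validateResourceLimits()) // => invalid value: cannot be empty: limits
}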
13 changes: 4 additions & 9 deletions pkg/controllers/counter/controller.go
@@ -16,13 +16,11 @@ package counter
import (
	"context"
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/types"
	"knative.dev/pkg/logging"
	controllerruntime "sigs.k8s.io/controller-runtime"

	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -53,7 +51,6 @@ func NewController(ctx context.Context, kubeClient client.Client) *Controller {

// Reconcile a control loop for the resource
func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	logging.FromContext(ctx).Infof("Updating resource counts on provisioner")
	// Retrieve the provisioner
	provisioner := &v1alpha5.Provisioner{}
	if err := c.kubeClient.Get(ctx, req.NamespacedName, provisioner); err != nil {
@@ -75,8 +72,7 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
		return reconcile.Result{}, fmt.Errorf("failed to persist changes to %s, %w", req.NamespacedName, err)
	}

	// Refresh the reconciler state values every 5 minutes irrespective of node events
	return reconcile.Result{RequeueAfter: 5 * time.Minute}, nil
	return reconcile.Result{}, nil
}

func (c *Controller) resourceCountsFor(ctx context.Context, provisionerName string) (v1.ResourceList, error) {
@@ -93,8 +89,8 @@ func (c *Controller) resourceCountsFor(ctx context.Context, provisionerName string) (v1.ResourceList, error) {
		memory.Add(*node.Status.Capacity.Memory())
	}
	return v1.ResourceList{
		v1alpha5.ResourceLimitsCPU:    *cpu,
		v1alpha5.ResourceLimitsMemory: *memory,
		v1.ResourceCPU:    *cpu,
		v1.ResourceMemory: *memory,
	}, nil

}
@@ -116,8 +112,7 @@ func (c *Controller) Register(ctx context.Context, m manager.Manager) error {
		&source.Kind{Type: &v1.Node{}},
		handler.EnqueueRequestsFromMapFunc(func(o client.Object) (requests []reconcile.Request) {
			if provisionerName, ok := o.GetLabels()[v1alpha5.ProvisionerNameLabelKey]; ok {
				requests = append(requests, reconcile.Request{NamespacedName: types.NamespacedName{Name: provisionerName}})
				return requests
				return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: provisionerName}}}
			}
			return nil
		}),
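For orientation, a rough, self-contained sketch of the counting side this controller implements: list the nodes labeled for a provisioner and sum their capacity into the ResourceList that the provisioner status (and therefore HasExceededResources) consumes. The package name, label key constant, and kubeClient wiring are assumptions for illustration, not code from the PR:

package countersketch

import (
	"context"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// provisionerNameLabelKey is an assumed stand-in for v1alpha5.ProvisionerNameLabelKey.
const provisionerNameLabelKey = "karpenter.sh/provisioner-name"

// resourceCountsFor sums the CPU and memory capacity of every node that carries
// the provisioner's name label; the counter controller then persists this total
// into the provisioner's status for the limit check to read.
func resourceCountsFor(ctx context.Context, kubeClient client.Client, provisionerName string) (v1.ResourceList, error) {
	nodes := &v1.NodeList{}
	if err := kubeClient.List(ctx, nodes, client.MatchingLabels{provisionerNameLabelKey: provisionerName}); err != nil {
		return nil, err
	}
	cpu := resource.NewQuantity(0, resource.DecimalSI)
	memory := resource.NewQuantity(0, resource.BinarySI)
	for _, node := range nodes.Items {
		cpu.Add(*node.Status.Capacity.Cpu())
		memory.Add(*node.Status.Capacity.Memory())
	}
	return v1.ResourceList{
		v1.ResourceCPU:    *cpu,
		v1.ResourceMemory: *memory,
	}, nil
}

Because this commit also drops the periodic five-minute requeue, the counted usage is driven purely by node events, so the status is eventually consistent with the cluster rather than strictly real time.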
15 changes: 14 additions & 1 deletion pkg/controllers/provisioning/launcher.go
@@ -30,6 +30,7 @@ import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/util/workqueue"
	"knative.dev/pkg/logging"
@@ -49,9 +50,13 @@ func (l *Launcher) Launch(ctx context.Context, provisioner *v1alpha5.Provisioner

	// Pack and bind pods
	errs := make([]error, len(schedules))
	provisioner, err := l.updateState(ctx, provisioner)
	if err != nil {
		return fmt.Errorf("unable to determine status of provisioner")
	}

Review comment from suket22 (PR author) on this hunk: I'm not sure why I didn't need this in my earlier revisions. Maybe I was doing something else that was forcing the provisioner to get updated or I wasn't testing my scale ups fast enough, but without this the state in this provisioner object is really off. Since it's hitting a cache, I don't mind moving this into the for loop below either so we reduce the chance of staleness.

	workqueue.ParallelizeUntil(ctx, len(schedules), len(schedules), func(index int) {
		for _, packing := range l.Packer.Pack(ctx, schedules[index], instanceTypes) {
			if ok, err := provisioner.HasExceededResources(); ok {
			if err := provisioner.HasExceededResources(); err != nil {
				errs[index] = multierr.Append(errs[index], err)
				continue
			}
@@ -116,6 +121,14 @@ func (l *Launcher) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err error)
	return nil
}

func (l *Launcher) updateState(ctx context.Context, provisioner *v1alpha5.Provisioner) (*v1alpha5.Provisioner, error) {
	provisionerCopy := &v1alpha5.Provisioner{}
	if err := l.KubeClient.Get(ctx, types.NamespacedName{Name: provisioner.Name, Namespace: provisioner.Namespace}, provisionerCopy); err != nil {
		return nil, err
	}
	return provisionerCopy, nil
}

var bindTimeHistogram = prometheus.NewHistogram(
	prometheus.HistogramOpts{
		Namespace: metrics.Namespace,
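Picking up the author's note about cache staleness, one way to reduce it is to re-read the provisioner immediately before each limit check instead of once per Launch call. The sketch below only illustrates that idea; the fetch and launchBatch callbacks are hypothetical stand-ins for the cache-backed client Get and the real pack-and-bind step, not code from the PR:

package launchsketch

import (
	"context"
	"fmt"
)

// limitedProvisioner is the minimal surface this sketch needs from the PR's
// Provisioner type: the HasExceededResources method introduced in limits.go.
type limitedProvisioner interface {
	HasExceededResources() error
}

// launchUnderLimits re-fetches the provisioner before every batch so each limit
// check runs against the counter controller's latest status, rather than the
// snapshot taken once when provisioning began.
func launchUnderLimits(
	ctx context.Context,
	fetch func(context.Context) (limitedProvisioner, error), // hypothetical: e.g. a cache-backed Get
	batches int,
	launchBatch func(context.Context, int) error, // hypothetical: packs and binds one batch of pods
) error {
	for i := 0; i < batches; i++ {
		provisioner, err := fetch(ctx)
		if err != nil {
			return fmt.Errorf("refreshing provisioner, %w", err)
		}
		if err := provisioner.HasExceededResources(); err != nil {
			return err // stop launching capacity once any limit is reached
		}
		if err := launchBatch(ctx, i); err != nil {
			return err
		}
	}
	return nil
}

Either way the gate stays best effort: usage is only as fresh as the counter controller's most recent reconcile of node events.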