Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds provisioner limits based on cpu and mem #817

Merged
merged 27 commits into from
Nov 24, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
632e737
Limit provisioner by cpu and memory
suket22 Nov 19, 2021
db8dbb1
Update pkg/apis/provisioning/v1alpha5/provisioner.go
suket22 Nov 19, 2021
5a7759a
Update pkg/controllers/provisioning/provisioner.go
suket22 Nov 19, 2021
5c5b3cf
Update pkg/controllers/provisioning/resourcecounter.go
suket22 Nov 19, 2021
e00778a
Separate controller for counting resources
suket22 Nov 22, 2021
4a6cd5e
Separate controller for counting resources
suket22 Nov 22, 2021
7633fc9
Etarn fixes
ellistarn Nov 23, 2021
9310ccd
Merge pull request #1 from ellistarn/limitsImpl
suket22 Nov 23, 2021
217942f
Some more fixes - don't default the status
suket22 Nov 23, 2021
2f291aa
Remove extra logging statement
suket22 Nov 23, 2021
9d67667
Fix defaults, fix binding errors
suket22 Nov 23, 2021
3285786
More minor fixes
suket22 Nov 23, 2021
4531d68
Remove extra patch from rbac
suket22 Nov 23, 2021
00a10d2
Addressing comments on the PR
suket22 Nov 23, 2021
2e20056
Compare provisionerSpecs before stopping existing Provisioners
suket22 Nov 23, 2021
7134769
Fix failing build!
suket22 Nov 23, 2021
64c1714
Merge branch 'main' into limitsImpl
suket22 Nov 23, 2021
555d18d
Don't reassign the provisioner in the launcher
suket22 Nov 24, 2021
f866907
Addressing more comments on the PR
suket22 Nov 24, 2021
74d6bd9
Adds basic unit test
suket22 Nov 24, 2021
afa1796
Update pkg/controllers/counter/controller.go
suket22 Nov 24, 2021
9974847
Update pkg/apis/provisioning/v1alpha5/provisioner.go
suket22 Nov 24, 2021
dfbc043
Update pkg/controllers/counter/controller.go
suket22 Nov 24, 2021
3036eea
More refactoring
suket22 Nov 24, 2021
ca36e42
Merge branch 'main' into limitsImpl
suket22 Nov 24, 2021
26c1831
Update pkg/controllers/provisioning/suite_test.go
suket22 Nov 24, 2021
2591f0a
Apply suggestions from code review
suket22 Nov 24, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Don't reassign the provisioner in the launcher
  • Loading branch information
suket22 committed Nov 24, 2021
commit 555d18de96ab07c6cbc3a684fc33d79058d2b041
13 changes: 0 additions & 13 deletions pkg/apis/provisioning/v1alpha5/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ limitations under the License.
package v1alpha5

import (
"fmt"

v1 "k8s.io/api/core/v1"
)

Expand All @@ -25,14 +23,3 @@ type Limits struct {
// Resources contains all the allocatable resources that Karpenter supports for limiting.
Resources v1.ResourceList `json:"resources,omitempty"`
}

// HasExceededResources reports, as an error, the first resource whose tracked
// usage in the provisioner status has reached or passed the limit configured
// in the provisioner spec. It returns nil when no configured limit is exceeded.
func (p *Provisioner) HasExceededResources() error {
	for name, used := range p.Status.Resources {
		limit, limited := p.Spec.Limits.Resources[name]
		if !limited {
			// No limit configured for this resource; nothing to enforce.
			continue
		}
		// Cmp >= 0 means usage has reached (or passed) the limit.
		if used.Cmp(limit) >= 0 {
			return fmt.Errorf("resource limit %s exceeded", name)
		}
	}
	return nil
}
6 changes: 0 additions & 6 deletions pkg/controllers/counter/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,15 @@ func (c *Controller) Reconcile(ctx context.Context, req reconcile.Request) (reco
return reconcile.Result{}, nil
}
persisted := provisioner.DeepCopy()

// Determine resource usage and update provisioner.status.resources
resourceCounts, err := c.resourceCountsFor(ctx, provisioner.Name)
if err != nil {
return reconcile.Result{}, err
}

provisioner.Status.Resources = resourceCounts
if err := c.kubeClient.Status().Patch(ctx, provisioner, client.MergeFrom(persisted)); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to persist changes to %s, %w", req.NamespacedName, err)
}

return reconcile.Result{}, nil
}

Expand All @@ -78,10 +75,8 @@ func (c *Controller) resourceCountsFor(ctx context.Context, provisionerName stri
if err := c.kubeClient.List(ctx, &nodes, client.MatchingLabels{v1alpha5.ProvisionerNameLabelKey: provisionerName}); err != nil {
return nil, err
}

var cpu = resource.NewScaledQuantity(0, 0)
var memory = resource.NewScaledQuantity(0, resource.Giga)

for _, node := range nodes.Items {
suket22 marked this conversation as resolved.
Show resolved Hide resolved
cpu.Add(*node.Status.Capacity.Cpu())
memory.Add(*node.Status.Capacity.Memory())
Expand All @@ -90,7 +85,6 @@ func (c *Controller) resourceCountsFor(ctx context.Context, provisionerName stri
v1.ResourceCPU: *cpu,
v1.ResourceMemory: *memory,
}, nil

}

// Register the controller to the manager
Expand Down
21 changes: 16 additions & 5 deletions pkg/controllers/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (c *Controller) Apply(ctx context.Context, provisioner *v1alpha5.Provisione
provisioner.Spec.Requirements = provisioner.Spec.Requirements.
With(requirements(instanceTypes)).
With(v1alpha5.LabelRequirements(provisioner.Spec.Labels))
if currentProvisioner, ok := c.provisioners.Load(provisioner.Name); ok && c.areEqual(currentProvisioner.(*Provisioner).Spec, provisioner.Spec) {
if !c.hasChanged(ctx, provisioner) {
// If the provisionerSpecs haven't changed, we don't need to stop and drain the current Provisioner.
return nil
}
Expand All @@ -122,10 +122,21 @@ func (c *Controller) Apply(ctx context.Context, provisioner *v1alpha5.Provisione
return nil
}

func (c *Controller) areEqual(provisionerSpecOld v1alpha5.ProvisionerSpec, provisionerSpecNew v1alpha5.ProvisionerSpec) bool {
hashKeyOld, _ := hashstructure.Hash(provisionerSpecOld, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
hashKeyNew, _ := hashstructure.Hash(provisionerSpecNew, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
return hashKeyOld == hashKeyNew
// Returns true if the new candidate provisioner is different than the provisioner in memory.
func (c *Controller) hasChanged(ctx context.Context, provisionerNew *v1alpha5.Provisioner) bool {
oldProvisioner, _ := c.provisioners.Load(provisionerNew.Name)
if oldProvisioner == nil {
suket22 marked this conversation as resolved.
Show resolved Hide resolved
return true
}
hashKeyOld, err := hashstructure.Hash(oldProvisioner.(*Provisioner).Spec, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
if err != nil {
logging.FromContext(ctx).Fatalf("Unable to hash old provisioner spec: %s", err.Error())
}
hashKeyNew, err := hashstructure.Hash(provisionerNew.Spec, hashstructure.FormatV2, &hashstructure.HashOptions{SlicesAsSets: true})
if err != nil {
logging.FromContext(ctx).Fatalf("Unable to hash new provisioner spec: %s", err.Error())
}
return hashKeyOld != hashKeyNew
}

// List the active provisioners
Expand Down
23 changes: 13 additions & 10 deletions pkg/controllers/provisioning/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,9 @@ func (l *Launcher) Launch(ctx context.Context, provisioner *v1alpha5.Provisioner

suket22 marked this conversation as resolved.
Show resolved Hide resolved
// Pack and bind pods
errs := make([]error, len(schedules))
provisioner, err := l.updateState(ctx, provisioner)
if err != nil {
return fmt.Errorf("unable to determine status of provisioner")
}
workqueue.ParallelizeUntil(ctx, len(schedules), len(schedules), func(index int) {
for _, packing := range l.Packer.Pack(ctx, schedules[index], instanceTypes) {
if err := provisioner.HasExceededResources(); err != nil {
if err := l.verifyResourceLimits(ctx, provisioner); err != nil {
errs[index] = multierr.Append(errs[index], err)
suket22 marked this conversation as resolved.
Show resolved Hide resolved
continue
}
Expand Down Expand Up @@ -122,12 +118,19 @@ func (l *Launcher) bind(ctx context.Context, node *v1.Node, pods []*v1.Pod) (err
return nil
}

func (l *Launcher) updateState(ctx context.Context, provisioner *v1alpha5.Provisioner) (*v1alpha5.Provisioner, error) {
provisionerCopy := &v1alpha5.Provisioner{}
if err := l.KubeClient.Get(ctx, types.NamespacedName{Name: provisioner.Name, Namespace: provisioner.Namespace}, provisionerCopy); err != nil {
return nil, err
func (l *Launcher) verifyResourceLimits(ctx context.Context, provisioner *v1alpha5.Provisioner) error {
provisionerLatest := &v1alpha5.Provisioner{}
if err := l.KubeClient.Get(ctx, types.NamespacedName{Name: provisioner.Name, Namespace: provisioner.Namespace}, provisionerLatest); err != nil {
return err
suket22 marked this conversation as resolved.
Show resolved Hide resolved
}
return provisionerCopy, nil
for resourceName, usage := range provisionerLatest.Status.Resources {
if limit, ok := provisionerLatest.Spec.Limits.Resources[resourceName]; ok {
suket22 marked this conversation as resolved.
Show resolved Hide resolved
if usage.Cmp(limit) >= 0 {
return fmt.Errorf("%s resource usage of %v exceeds limit of %v", resourceName, usage.AsDec(), limit.AsDec())
}
}
}
return nil
}

var bindTimeHistogram = prometheus.NewHistogramVec(
Expand Down