Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds provisioner limits based on cpu and mem #817

Merged
merged 27 commits into from
Nov 24, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
632e737
Limit provisioner by cpu and memory
suket22 Nov 19, 2021
db8dbb1
Update pkg/apis/provisioning/v1alpha5/provisioner.go
suket22 Nov 19, 2021
5a7759a
Update pkg/controllers/provisioning/provisioner.go
suket22 Nov 19, 2021
5c5b3cf
Update pkg/controllers/provisioning/resourcecounter.go
suket22 Nov 19, 2021
e00778a
Separate controller for counting resources
suket22 Nov 22, 2021
4a6cd5e
Separate controller for counting resources
suket22 Nov 22, 2021
7633fc9
Etarn fixes
ellistarn Nov 23, 2021
9310ccd
Merge pull request #1 from ellistarn/limitsImpl
suket22 Nov 23, 2021
217942f
Some more fixes - don't default the status
suket22 Nov 23, 2021
2f291aa
Remove extra logging statement
suket22 Nov 23, 2021
9d67667
Fix defaults, fix binding errors
suket22 Nov 23, 2021
3285786
More minor fixes
suket22 Nov 23, 2021
4531d68
Remove extra patch from rbac
suket22 Nov 23, 2021
00a10d2
Addressing comments on the PR
suket22 Nov 23, 2021
2e20056
Compare provisionerSpecs before stopping existing Provisioners
suket22 Nov 23, 2021
7134769
Fix failing build!
suket22 Nov 23, 2021
64c1714
Merge branch 'main' into limitsImpl
suket22 Nov 23, 2021
555d18d
Don't reassign the provisioner in the launcher
suket22 Nov 24, 2021
f866907
Addressing more comments on the PR
suket22 Nov 24, 2021
74d6bd9
Adds basic unit test
suket22 Nov 24, 2021
afa1796
Update pkg/controllers/counter/controller.go
suket22 Nov 24, 2021
9974847
Update pkg/apis/provisioning/v1alpha5/provisioner.go
suket22 Nov 24, 2021
dfbc043
Update pkg/controllers/counter/controller.go
suket22 Nov 24, 2021
3036eea
More refactoring
suket22 Nov 24, 2021
ca36e42
Merge branch 'main' into limitsImpl
suket22 Nov 24, 2021
26c1831
Update pkg/controllers/provisioning/suite_test.go
suket22 Nov 24, 2021
2591f0a
Apply suggestions from code review
suket22 Nov 24, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions charts/karpenter/crds/karpenter.sh_provisioners.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,31 @@ spec:
description: Labels are layered with Requirements and applied to every
node.
type: object
limits:
description: Defines a set of bounds that Karpenter obeys when provisioning
capacity.
properties:
resources:
description: Resources contains all the allocatable resources
that Karpenter supports for limiting.
properties:
cpu:
anyOf:
- type: integer
- type: string
description: CPU, in cores. (500m = .5 cores)
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
memory:
anyOf:
- type: integer
- type: string
description: Memory, in bytes. (500Gi = 500GiB = 500 * 1024
* 1024 * 1024)
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
type: object
type: object
provider:
description: Provider contains fields specific to your cloudprovider.
type: object
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
go.uber.org/multierr v1.7.0
golang.org/x/time v0.0.0-20210611083556-38a9dc6acbc6
golang.org/x/tools v0.1.8-0.20211014194737-fc98fb2abd48 // indirect
gopkg.in/inf.v0 v0.9.1
k8s.io/api v0.20.7
k8s.io/apimachinery v0.20.7
k8s.io/client-go v0.20.7
Expand Down Expand Up @@ -85,7 +86,6 @@ require (
google.golang.org/grpc v1.38.0 // indirect
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/alecthomas/kingpin.v2 v2.2.6 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
Expand Down
37 changes: 37 additions & 0 deletions pkg/apis/provisioning/v1alpha5/limits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha5

import (
"context"
"math"

"k8s.io/apimachinery/pkg/api/resource"
)

// Limits define bounds on the resources being provisioned by Karpenter
type Limits struct {
// Resources contains all the allocatable resources that Karpenter supports for limiting.
Resources Resources `json:"resources,omitempty"`
}

func (l *Limits) Default(ctx context.Context) {
if l.Resources.CPU == nil {
l.Resources.CPU = resource.NewScaledQuantity(math.MaxInt64, 0)
suket22 marked this conversation as resolved.
Show resolved Hide resolved
}
if l.Resources.Memory == nil {
l.Resources.Memory = resource.NewScaledQuantity(math.MaxInt64, 0)
}
}
2 changes: 2 additions & 0 deletions pkg/apis/provisioning/v1alpha5/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type ProvisionerSpec struct {
// Termination due to expiration is disabled if this field is not set.
// +optional
TTLSecondsUntilExpired *int64 `json:"ttlSecondsUntilExpired,omitempty"`
// Defines a set of bounds that Karpenter obeys when provisioning capacity.
suket22 marked this conversation as resolved.
Show resolved Hide resolved
Limits `json:"limits,omitempty"`
suket22 marked this conversation as resolved.
Show resolved Hide resolved
}

// Provisioner is the Schema for the Provisioners API
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/provisioning/v1alpha5/provisioner_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
// SetDefaults for the provisioner
func (p *Provisioner) SetDefaults(ctx context.Context) {
p.Spec.Constraints.Default(ctx)
p.Spec.Limits.Default(ctx)
}

// Default the constraints
Expand Down
28 changes: 28 additions & 0 deletions pkg/apis/provisioning/v1alpha5/resources.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha5

import (
"k8s.io/apimachinery/pkg/api/resource"
)

// Resources contains a list of all the allocatable resources that can be used to define a bound on penter's
// scaling actions.
type Resources struct {
suket22 marked this conversation as resolved.
Show resolved Hide resolved
// CPU, in cores. (500m = .5 cores)
CPU *resource.Quantity `json:"cpu,omitempty"`
// Memory, in bytes. (500Gi = 500GiB = 500 * 1024 * 1024 * 1024)
Memory *resource.Quantity `json:"memory,omitempty"`
}
42 changes: 42 additions & 0 deletions pkg/apis/provisioning/v1alpha5/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions pkg/cloudprovider/aws/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ func (p *InstanceProvider) instanceToNode(instance *ec2.Instance, instanceTypes
v1.ResourceCPU: *instanceType.CPU(),
v1.ResourceMemory: *instanceType.Memory(),
},
Capacity: v1.ResourceList{
v1.ResourcePods: *instanceType.Pods(),
v1.ResourceCPU: *instanceType.CPU(),
v1.ResourceMemory: *instanceType.Memory(),
},
NodeInfo: v1.NodeSystemInfo{
Architecture: v1alpha1.AWSToKubeArchitectures[aws.StringValue(instance.Architecture)],
OSImage: aws.StringValue(instance.ImageId),
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewController(ctx context.Context, kubeClient client.Client, coreV1Client c
kubeClient: kubeClient,
cloudProvider: cloudProvider,
scheduler: scheduling.NewScheduler(kubeClient, cloudProvider),
launcher: &Launcher{KubeClient: kubeClient, CoreV1Client: coreV1Client, CloudProvider: cloudProvider, Packer: &binpacking.Packer{}},
launcher: &Launcher{KubeClient: kubeClient, CoreV1Client: coreV1Client, CloudProvider: cloudProvider, Packer: &binpacking.Packer{}, ResourceCounter: NewResourceCounter(kubeClient)},
}
}

Expand Down
22 changes: 17 additions & 5 deletions pkg/controllers/provisioning/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,36 @@ import (
)

type Launcher struct {
Packer *binpacking.Packer
KubeClient client.Client
CoreV1Client corev1.CoreV1Interface
CloudProvider cloudprovider.CloudProvider
Packer *binpacking.Packer
KubeClient client.Client
CoreV1Client corev1.CoreV1Interface
CloudProvider cloudprovider.CloudProvider
ResourceCounter *ResourceCounter
}

func (l *Launcher) Launch(ctx context.Context, schedules []*scheduling.Schedule, instanceTypes []cloudprovider.InstanceType) error {
func (l *Launcher) Launch(ctx context.Context, schedules []*scheduling.Schedule,
instanceTypes []cloudprovider.InstanceType, provisioner v1alpha5.Provisioner) error {

suket22 marked this conversation as resolved.
Show resolved Hide resolved
// Pack and bind pods
errs := make([]error, len(schedules))
remainingResources, err := l.ResourceCounter.remainingResourceCounts(ctx, provisioner.Spec.Limits, provisioner.Name)
suket22 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("cannot determine remaining resources, not launching instances: %w", err)
}
workqueue.ParallelizeUntil(ctx, len(schedules), len(schedules), func(index int) {
for _, packing := range l.Packer.Pack(ctx, schedules[index], instanceTypes) {
if remainingResources.isInsufficient(ctx) {
errs[index] = multierr.Append(errs[index], fmt.Errorf("provisioner limits exceeded"))
suket22 marked this conversation as resolved.
Show resolved Hide resolved
return
suket22 marked this conversation as resolved.
Show resolved Hide resolved
}
// Create thread safe channel to pop off packed pod slices
packedPods := make(chan []*v1.Pod, len(packing.Pods))
for _, pods := range packing.Pods {
packedPods <- pods
}
close(packedPods)
if err := <-l.CloudProvider.Create(ctx, packing.Constraints, packing.InstanceTypeOptions, packing.NodeQuantity, func(node *v1.Node) error {
remainingResources.updateCountsFor(node)
node.Labels = functional.UnionStringMaps(node.Labels, packing.Constraints.Labels)
node.Spec.Taints = append(node.Spec.Taints, packing.Constraints.Taints...)
return l.bind(ctx, node, <-packedPods)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (p *Provisioner) provision(ctx context.Context) (err error) {
return fmt.Errorf("getting instance types")
}
// Launch capacity and bind pods
if err := p.launcher.Launch(ctx, schedules, instanceTypes); err != nil {
if err := p.launcher.Launch(ctx, schedules, instanceTypes, *p.Provisioner); err != nil {
suket22 marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("launching capacity, %w", err)
}
return nil
Expand Down
85 changes: 85 additions & 0 deletions pkg/controllers/provisioning/resourcecounter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provisioning

import (
"context"

"github.com/awslabs/karpenter/pkg/apis/provisioning/v1alpha5"
"gopkg.in/inf.v0"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"knative.dev/pkg/logging"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type ResourceCount struct {
CPU *inf.Dec
Memory *inf.Dec
}

type ResourceCounter struct {
suket22 marked this conversation as resolved.
Show resolved Hide resolved
KubeClient client.Client
}

// Constructs a new instance of the resource counter
func NewResourceCounter(kubeClient client.Client) *ResourceCounter {
return &ResourceCounter{
KubeClient: kubeClient,
}
}

func (r *ResourceCounter) remainingResourceCounts(ctx context.Context, limits v1alpha5.Limits, provisionerName string) (*ResourceCount, error) {
// We recalculate remaining resources each provisioning loop, because the terminate controller could've freed up some capacity since
// we last provisioned worker nodes.
nodeList := v1.NodeList{}
withProvisionerName := client.MatchingLabels{v1alpha5.ProvisionerNameLabelKey: provisionerName}
if err := r.KubeClient.List(ctx, &nodeList, withProvisionerName); err != nil {
return nil, err
}
resourceCount := ResourceCount{
CPU: resource.Zero.AsDec(),
Memory: resource.Zero.AsDec(),
}
for _, node := range nodeList.Items {
resourceCount.CPU.Add(resourceCount.CPU, node.Status.Capacity.Cpu().AsDec())
resourceCount.Memory.Add(resourceCount.Memory, node.Status.Capacity.Memory().AsDec())
}
suket22 marked this conversation as resolved.
Show resolved Hide resolved

limits = *limits.DeepCopy()
remainingResources := &ResourceCount{
CPU: limits.Resources.CPU.AsDec().Sub(limits.Resources.CPU.AsDec(), resourceCount.CPU),
Memory: limits.Resources.Memory.AsDec().Sub(limits.Resources.Memory.AsDec(), resourceCount.Memory),
}
return remainingResources, nil
}

// Reduces the resource counts based on how many resources a node uses.
func (r *ResourceCount) updateCountsFor(node *v1.Node) *ResourceCount {
r.CPU.Sub(r.CPU, node.Status.Capacity.Cpu().AsDec())
r.Memory.Sub(r.Memory, node.Status.Capacity.Memory().AsDec())
return r
}

func (r *ResourceCount) isInsufficient(ctx context.Context) bool {
if r.CPU.Cmp(resource.Zero.AsDec()) <= 0 {
logging.FromContext(ctx).Errorf("cpu limits breached")
return true
} else if r.Memory.Cmp(resource.Zero.AsDec()) <= 0 {
logging.FromContext(ctx).Errorf("memory limits breached")
return true
}
return false
}
1 change: 1 addition & 0 deletions pkg/controllers/provisioning/scheduling/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ var _ = BeforeEach(func() {
ObjectMeta: metav1.ObjectMeta{Name: v1alpha5.DefaultProvisioner.Name},
Spec: v1alpha5.ProvisionerSpec{},
}
provisioner.SetDefaults(ctx)
suket22 marked this conversation as resolved.
Show resolved Hide resolved
})

var _ = AfterEach(func() {
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/provisioning/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ var _ = Describe("Provisioning", func() {
},
Spec: v1alpha5.ProvisionerSpec{},
}
provisioner.SetDefaults(ctx)
})

AfterEach(func() {
Expand Down