Skip to content

Commit

Permalink
Apply code reivew remarks:
Browse files Browse the repository at this point in the history
* Rename scaleup.Manager to scaleup.Orchestrator
* Remove factory and add Initialize function
* Rename the wrpapper package to orchestrator
* Rename NewOrchestrator func to just New
  • Loading branch information
kisieland committed Mar 20, 2023
1 parent 9cd44fd commit 5b6c50e
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 88 deletions.
4 changes: 2 additions & 2 deletions cluster-autoscaler/core/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type AutoscalerOptions struct {
Backoff backoff.Backoff
DebuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
RemainingPdbTracker pdb.RemainingPdbTracker
ScaleUpManagerFactory scaleup.ManagerFactory
ScaleUpOrchestrator scaleup.Orchestrator
}

// Autoscaler is the main component of CA which scales up/down node groups according to its configuration
Expand Down Expand Up @@ -85,7 +85,7 @@ func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError)
opts.Backoff,
opts.DebuggingSnapshotter,
opts.RemainingPdbTracker,
opts.ScaleUpManagerFactory), nil
opts.ScaleUpOrchestrator), nil
}

// Initialize default options if not provided.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package wrapper
package orchestrator

import (
"strings"
Expand Down Expand Up @@ -43,50 +43,51 @@ import (
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
)

// ScaleUpManagerFactory implements scaleup.ManagerFactory interface.
type ScaleUpManagerFactory struct {
// ScaleUpOrchestrator implements scaleup.Orchestrator interface.
type ScaleUpOrchestrator struct {
autoscalingContext *context.AutoscalingContext
processors *ca_processors.AutoscalingProcessors
resourceManager *resource.Manager
clusterStateRegistry *clusterstate.ClusterStateRegistry
ignoredTaints taints.TaintKeySet
initialized bool
}

// NewManagerFactory returns new instance of scale up manager factory.
func NewManagerFactory() *ScaleUpManagerFactory {
return &ScaleUpManagerFactory{}
// New returns new instance of scale up wrapper.
func New() scaleup.Orchestrator {
return &ScaleUpOrchestrator{
initialized: false,
}
}

// NewManager returns new instance of scale up wrapper.
func (f *ScaleUpManagerFactory) NewManager(
// Initialize initializes the orchestrator object with required fields.
func (w *ScaleUpOrchestrator) Initialize(
autoscalingContext *context.AutoscalingContext,
processors *ca_processors.AutoscalingProcessors,
clusterStateRegistry *clusterstate.ClusterStateRegistry,
ignoredTaints taints.TaintKeySet,
) scaleup.Manager {
resourceManager := resource.NewManager(processors.CustomResourcesProcessor)
return &ScaleUpManager{
autoscalingContext: autoscalingContext,
processors: processors,
resourceManager: resourceManager,
clusterStateRegistry: clusterStateRegistry,
ignoredTaints: ignoredTaints,
}
}

// ScaleUpManager implements scaleup.Manager interface.
type ScaleUpManager struct {
autoscalingContext *context.AutoscalingContext
processors *ca_processors.AutoscalingProcessors
resourceManager *resource.Manager
clusterStateRegistry *clusterstate.ClusterStateRegistry
ignoredTaints taints.TaintKeySet
) {
w.autoscalingContext = autoscalingContext
w.processors = processors
w.clusterStateRegistry = clusterStateRegistry
w.ignoredTaints = ignoredTaints
w.resourceManager = resource.NewManager(processors.CustomResourcesProcessor)
w.initialized = true
}

// ScaleUp tries to scale the cluster up. Returns appropriate status or error if
// an unexpected error occurred. Assumes that all nodes in the cluster are ready
// and in sync with instance groups.
func (w *ScaleUpManager) ScaleUp(
func (w *ScaleUpOrchestrator) ScaleUp(
unschedulablePods []*apiv1.Pod,
nodes []*apiv1.Node,
daemonSets []*appsv1.DaemonSet,
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
if !w.initialized {
scaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
}

// From now on we only care about unschedulable pods that were marked after the newest
// node became available for the scheduler.
if len(unschedulablePods) == 0 {
Expand Down Expand Up @@ -344,10 +345,14 @@ func (w *ScaleUpManager) ScaleUp(
// than the configured min size. The source of truth for the current node group
// size is the TargetSize queried directly from cloud providers. Returns
// appropriate status or error if an unexpected error occurred.
func (w *ScaleUpManager) ScaleUpToNodeGroupMinSize(
func (w *ScaleUpOrchestrator) ScaleUpToNodeGroupMinSize(
nodes []*apiv1.Node,
nodeInfos map[string]*schedulerframework.NodeInfo,
) (*status.ScaleUpStatus, errors.AutoscalerError) {
if !w.initialized {
scaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "ScaleUpOrchestrator is not initialized"))
}

now := time.Now()
nodeGroups := w.autoscalingContext.CloudProvider.NodeGroups()
scaleUpInfos := make([]nodegroupset.ScaleUpInfo, 0)
Expand Down Expand Up @@ -436,7 +441,7 @@ func (w *ScaleUpManager) ScaleUpToNodeGroupMinSize(
}

// ComputeExpansionOption computes expansion option based on pending pods and cluster state.
func (w *ScaleUpManager) ComputeExpansionOption(
func (w *ScaleUpOrchestrator) ComputeExpansionOption(
podEquivalenceGroups []*equivalence.PodGroup,
nodeGroup cloudprovider.NodeGroup,
nodeInfo *schedulerframework.NodeInfo,
Expand Down Expand Up @@ -487,7 +492,7 @@ func (w *ScaleUpManager) ComputeExpansionOption(
}

// IsNodeGroupReadyToScaleUp returns nil if node group is ready to be scaled up, otherwise a reason is provided.
func (w *ScaleUpManager) IsNodeGroupReadyToScaleUp(nodeGroup cloudprovider.NodeGroup, now time.Time) *SkippedReasons {
func (w *ScaleUpOrchestrator) IsNodeGroupReadyToScaleUp(nodeGroup cloudprovider.NodeGroup, now time.Time) *SkippedReasons {
// Autoprovisioned node groups without nodes are created later so skip check for them.
if nodeGroup.Exist() && !w.clusterStateRegistry.IsNodeGroupSafeToScaleUp(nodeGroup, now) {
// Hack that depends on internals of IsNodeGroupSafeToScaleUp.
Expand All @@ -502,7 +507,7 @@ func (w *ScaleUpManager) IsNodeGroupReadyToScaleUp(nodeGroup cloudprovider.NodeG
}

// IsNodeGroupResourceExceeded returns nil if node group resource limits are not exceeded, otherwise a reason is provided.
func (w *ScaleUpManager) IsNodeGroupResourceExceeded(resourcesLeft resource.Limits, nodeGroup cloudprovider.NodeGroup, nodeInfo *schedulerframework.NodeInfo) *SkippedReasons {
func (w *ScaleUpOrchestrator) IsNodeGroupResourceExceeded(resourcesLeft resource.Limits, nodeGroup cloudprovider.NodeGroup, nodeInfo *schedulerframework.NodeInfo) *SkippedReasons {
resourcesDelta, err := w.resourceManager.DeltaForNode(w.autoscalingContext, nodeInfo, nodeGroup)
if err != nil {
klog.Errorf("Skipping node group %s; error getting node group resources: %v", nodeGroup.Id(), err)
Expand All @@ -528,7 +533,7 @@ func (w *ScaleUpManager) IsNodeGroupResourceExceeded(resourcesLeft resource.Limi
}

// GetCappedNewNodeCount caps resize according to cluster wide node count limit.
func (w *ScaleUpManager) GetCappedNewNodeCount(newNodeCount, currentNodeCount int) (int, errors.AutoscalerError) {
func (w *ScaleUpOrchestrator) GetCappedNewNodeCount(newNodeCount, currentNodeCount int) (int, errors.AutoscalerError) {
if w.autoscalingContext.MaxNodesTotal > 0 && newNodeCount+currentNodeCount > w.autoscalingContext.MaxNodesTotal {
klog.V(1).Infof("Capping size to max cluster total size (%d)", w.autoscalingContext.MaxNodesTotal)
newNodeCount = w.autoscalingContext.MaxNodesTotal - currentNodeCount
Expand All @@ -542,7 +547,7 @@ func (w *ScaleUpManager) GetCappedNewNodeCount(newNodeCount, currentNodeCount in

// ExecuteScaleUps executes the scale ups, based on the provided scale up infos.
// In case of issues returns an error and a scale up info which failed to execute.
func (w *ScaleUpManager) ExecuteScaleUps(
func (w *ScaleUpOrchestrator) ExecuteScaleUps(
scaleUpInfos []nodegroupset.ScaleUpInfo,
nodeInfos map[string]*schedulerframework.NodeInfo,
now time.Time,
Expand All @@ -563,7 +568,7 @@ func (w *ScaleUpManager) ExecuteScaleUps(
return nil, nil
}

func (w *ScaleUpManager) executeScaleUp(
func (w *ScaleUpOrchestrator) executeScaleUp(
info nodegroupset.ScaleUpInfo,
gpuResourceName, gpuType string,
now time.Time,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package wrapper
package orchestrator

import (
"fmt"
Expand Down Expand Up @@ -544,9 +544,9 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleTestConfig) *ScaleTestResul
}

processors := NewTestProcessors(&context)
suManagerFactory := NewManagerFactory()
scaleUpWrapper := suManagerFactory.NewManager(&context, processors, clusterState, nil)
scaleUpStatus, err := scaleUpWrapper.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, nil)
scaleUpStatus, err := suOrchestrator.ScaleUp(extraPods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)

assert.NoError(t, err)
Expand Down Expand Up @@ -701,9 +701,9 @@ func TestScaleUpUnhealthy(t *testing.T) {
p3 := BuildTestPod("p-new", 550, 0)

processors := NewTestProcessors(&context)
suManagerFactory := NewManagerFactory()
scaleUpWrapper := suManagerFactory.NewManager(&context, processors, clusterState, nil)
scaleUpStatus, err := scaleUpWrapper.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, nil)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)

assert.NoError(t, err)
// Node group is unhealthy.
Expand Down Expand Up @@ -744,9 +744,9 @@ func TestScaleUpNoHelp(t *testing.T) {
p3 := BuildTestPod("p-new", 500, 0)

processors := NewTestProcessors(&context)
suManagerFactory := NewManagerFactory()
scaleUpWrapper := suManagerFactory.NewManager(&context, processors, clusterState, nil)
scaleUpStatus, err := scaleUpWrapper.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, nil)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
processors.ScaleUpStatusProcessor.Process(&context, scaleUpStatus)

assert.NoError(t, err)
Expand Down Expand Up @@ -817,9 +817,9 @@ func TestScaleUpBalanceGroups(t *testing.T) {
}

processors := NewTestProcessors(&context)
suManagerFactory := NewManagerFactory()
scaleUpWrapper := suManagerFactory.NewManager(&context, processors, clusterState, nil)
scaleUpStatus, typedErr := scaleUpWrapper.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, nil)
scaleUpStatus, typedErr := suOrchestrator.ScaleUp(pods, nodes, []*appsv1.DaemonSet{}, nodeInfos)

assert.NoError(t, typedErr)
assert.True(t, scaleUpStatus.WasSuccessful())
Expand Down Expand Up @@ -879,9 +879,9 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) {
nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())

suManagerFactory := NewManagerFactory()
scaleUpWrapper := suManagerFactory.NewManager(&context, processors, clusterState, nil)
scaleUpStatus, err := scaleUpWrapper.ScaleUp([]*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, nil)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups))
Expand Down Expand Up @@ -934,9 +934,9 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) {
nodes := []*apiv1.Node{}
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, nil, time.Now())

suManagerFactory := NewManagerFactory()
scaleUpWrapper := suManagerFactory.NewManager(&context, processors, clusterState, nil)
scaleUpStatus, err := scaleUpWrapper.ScaleUp([]*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, nil)
scaleUpStatus, err := suOrchestrator.ScaleUp([]*apiv1.Pod{p1, p2, p3}, nodes, []*appsv1.DaemonSet{}, nodeInfos)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
assert.Equal(t, "autoprovisioned-T1", utils.GetStringFromChan(createdGroups))
Expand Down Expand Up @@ -989,9 +989,9 @@ func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) {
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
clusterState.UpdateNodes(nodes, nodeInfos, time.Now())

suManagerFactory := NewManagerFactory()
scaleUpWrapper := suManagerFactory.NewManager(&context, processors, clusterState, nil)
scaleUpStatus, err := scaleUpWrapper.ScaleUpToNodeGroupMinSize(nodes, nodeInfos)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterState, nil)
scaleUpStatus, err := suOrchestrator.ScaleUpToNodeGroupMinSize(nodes, nodeInfos)
assert.NoError(t, err)
assert.True(t, scaleUpStatus.WasSuccessful())
assert.Equal(t, 1, len(scaleUpStatus.ScaleUpInfos))
Expand Down Expand Up @@ -1060,9 +1060,9 @@ func TestAuthError(t *testing.T) {

processors := NewTestProcessors(&context)
clusterStateRegistry := clusterstate.NewClusterStateRegistry(nil, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff())
suManagerFactory := NewManagerFactory()
suManager := suManagerFactory.NewManager(&context, processors, clusterStateRegistry, nil)
scaleUpWrapper := suManager.(*ScaleUpManager)
suOrchestrator := New()
suOrchestrator.Initialize(&context, processors, clusterStateRegistry, nil)
scaleUpWrapper := suOrchestrator.(*ScaleUpOrchestrator)
aerr := scaleUpWrapper.executeScaleUp(info, "", "", time.Now())
assert.Error(t, aerr)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package wrapper
package orchestrator

import (
"fmt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package wrapper
package orchestrator

import (
"reflect"
Expand Down
13 changes: 10 additions & 3 deletions cluster-autoscaler/core/scaleup/scaleup.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,19 @@ type ManagerFactory interface {
processors *ca_processors.AutoscalingProcessors,
clusterStateRegistry *clusterstate.ClusterStateRegistry,
ignoredTaints taints.TaintKeySet,
) Manager
) Orchestrator
}

// Manager is a component that picks the node group to resize and triggers
// Orchestrator is a component that picks the node group to resize and triggers
// creation of needed instances.
type Manager interface {
type Orchestrator interface {
// Initialize initializes the orchestrator object with required fields.
Initialize(
autoscalingContext *context.AutoscalingContext,
processors *ca_processors.AutoscalingProcessors,
clusterStateRegistry *clusterstate.ClusterStateRegistry,
ignoredTaints taints.TaintKeySet,
)
// ScaleUp tries to scale the cluster up. Returns appropriate status or error if
// an unexpected error occurred. Assumes that all nodes in the cluster are ready
// and in sync with instance groups.
Expand Down
18 changes: 9 additions & 9 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/planner"
scaledownstatus "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup"
scaleup_wrapper "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/wrapper"
orchestrator "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
Expand Down Expand Up @@ -86,7 +86,7 @@ type StaticAutoscaler struct {
lastScaleDownFailTime time.Time
scaleDownPlanner scaledown.Planner
scaleDownActuator scaledown.Actuator
scaleUpManager scaleup.Manager
scaleUpOrchestrator scaleup.Orchestrator
processors *ca_processors.AutoscalingProcessors
processorCallbacks *staticAutoscalerProcessorCallbacks
initialized bool
Expand Down Expand Up @@ -140,7 +140,7 @@ func NewStaticAutoscaler(
backoff backoff.Backoff,
debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter,
remainingPdbTracker pdb.RemainingPdbTracker,
scaleUpManagerFactory scaleup.ManagerFactory) *StaticAutoscaler {
scaleUpOrchestrator scaleup.Orchestrator) *StaticAutoscaler {

processorCallbacks := newStaticAutoscalerProcessorCallbacks()
autoscalingContext := context.NewAutoscalingContext(
Expand Down Expand Up @@ -196,10 +196,10 @@ func NewStaticAutoscaler(
}
processorCallbacks.scaleDownPlanner = scaleDownPlanner

if scaleUpManagerFactory == nil {
scaleUpManagerFactory = scaleup_wrapper.NewManagerFactory()
if scaleUpOrchestrator == nil {
scaleUpOrchestrator = orchestrator.New()
}
scaleUpManager := scaleUpManagerFactory.NewManager(autoscalingContext, processors, clusterStateRegistry, ignoredTaints)
scaleUpOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, ignoredTaints)

// Set the initial scale times to be less than the start time so as to
// not start in cooldown mode.
Expand All @@ -211,7 +211,7 @@ func NewStaticAutoscaler(
lastScaleDownFailTime: initialScaleTime,
scaleDownPlanner: scaleDownPlanner,
scaleDownActuator: scaleDownActuator,
scaleUpManager: scaleUpManager,
scaleUpOrchestrator: scaleUpOrchestrator,
processors: processors,
processorCallbacks: processorCallbacks,
clusterStateRegistry: clusterStateRegistry,
Expand Down Expand Up @@ -573,7 +573,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
klog.V(1).Info("Unschedulable pods are very new, waiting one iteration for more")
} else {
scaleUpStart := preScaleUp()
scaleUpStatus, typedErr = a.scaleUpManager.ScaleUp(unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups)
scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUp(unschedulablePodsToHelp, readyNodes, daemonsets, nodeInfosForGroups)
if exit, err := postScaleUp(scaleUpStart); exit {
return err
}
Expand Down Expand Up @@ -692,7 +692,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr

if a.EnforceNodeGroupMinSize {
scaleUpStart := preScaleUp()
scaleUpStatus, typedErr = a.scaleUpManager.ScaleUpToNodeGroupMinSize(readyNodes, nodeInfosForGroups)
scaleUpStatus, typedErr = a.scaleUpOrchestrator.ScaleUpToNodeGroupMinSize(readyNodes, nodeInfosForGroups)
if exit, err := postScaleUp(scaleUpStart); exit {
return err
}
Expand Down
Loading

0 comments on commit 5b6c50e

Please sign in to comment.