From 90280b34abb98fb44b208a55f43f29031971ee53 Mon Sep 17 00:00:00 2001 From: Deep Mistry Date: Tue, 16 Jun 2026 10:02:47 -0400 Subject: [PATCH] pod-scaler: add cpu/memory authoritative max reduction flags --- cmd/pod-scaler/admission.go | 77 +++++++++++++++++--------------- cmd/pod-scaler/admission_test.go | 39 +++++++++++++--- cmd/pod-scaler/main.go | 38 ++++++++++------ 3 files changed, 97 insertions(+), 57 deletions(-) diff --git a/cmd/pod-scaler/admission.go b/cmd/pod-scaler/admission.go index 916b534fb59..0a3eda85f3a 100644 --- a/cmd/pod-scaler/admission.go +++ b/cmd/pod-scaler/admission.go @@ -37,7 +37,7 @@ import ( "github.com/openshift/ci-tools/pkg/steps" ) -func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, kubeClient kubernetes.Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, percentageMeasured float64, measuredPodCPUIncrease float64, systemReservedCPU int64, authoritativeCPU, authoritativeMemory, authoritativeCPUDryRun, authoritativeMemoryDryRun bool, reporter results.PodScalerReporter) { +func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, kubeClient kubernetes.Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, percentageMeasured float64, measuredPodCPUIncrease float64, systemReservedCPU int64, authoritativeCPU, authoritativeMemory, authoritativeCPUDryRun, authoritativeMemoryDryRun bool, authoritativeCPUMaxReductionPercent, authoritativeMemoryMaxReductionPercent float64, reporter results.PodScalerReporter) { logger := logrus.WithField("component", "pod-scaler admission") logger.Infof("Initializing admission webhook server with %d loaders.", len(loaders)) if authoritativeCPUDryRun || authoritativeMemoryDryRun { @@ -57,7 +57,7 @@ func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Int Port: port, CertDir: certDir, }) - server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, percentageMeasured: percentageMeasured, measuredPodCPUIncrease: measuredPodCPUIncrease, nodeCache: nodeCache, authoritativeCPU: authoritativeCPU, authoritativeMemory: authoritativeMemory, authoritativeCPUDryRun: authoritativeCPUDryRun, authoritativeMemoryDryRun: authoritativeMemoryDryRun, reporter: reporter}}) + server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, percentageMeasured: percentageMeasured, measuredPodCPUIncrease: measuredPodCPUIncrease, nodeCache: nodeCache, authoritativeCPU: authoritativeCPU, authoritativeMemory: authoritativeMemory, authoritativeCPUDryRun: authoritativeCPUDryRun, authoritativeMemoryDryRun: authoritativeMemoryDryRun, authoritativeCPUMaxReductionPercent: authoritativeCPUMaxReductionPercent, authoritativeMemoryMaxReductionPercent: authoritativeMemoryMaxReductionPercent, reporter: reporter}}) logger.Info("Serving admission webhooks.") if err := server.Start(interrupts.Context()); err != nil { logrus.WithError(err).Fatal("Failed to serve webhooks.") @@ -65,22 +65,24 @@ func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Int } type podMutator struct { - logger *logrus.Entry - client buildclientv1.BuildV1Interface - resources *resourceServer - mutateResourceLimits bool - decoder admission.Decoder - cpuCap int64 - memoryCap string - cpuPriorityScheduling int64 - percentageMeasured float64 - measuredPodCPUIncrease float64 - nodeCache *nodeAllocatableCache - authoritativeCPU bool - authoritativeMemory bool - authoritativeCPUDryRun bool - authoritativeMemoryDryRun bool - reporter results.PodScalerReporter + logger *logrus.Entry + client buildclientv1.BuildV1Interface + resources *resourceServer + mutateResourceLimits bool + decoder admission.Decoder + cpuCap int64 + memoryCap string + cpuPriorityScheduling int64 + percentageMeasured float64 + measuredPodCPUIncrease float64 + nodeCache *nodeAllocatableCache + authoritativeCPU bool + authoritativeMemory bool + authoritativeCPUDryRun bool + authoritativeMemoryDryRun bool + authoritativeCPUMaxReductionPercent float64 + authoritativeMemoryMaxReductionPercent float64 + reporter results.PodScalerReporter } func (m *podMutator) Handle(ctx context.Context, req admission.Request) admission.Response { @@ -129,7 +131,7 @@ func (m *podMutator) Handle(ctx context.Context, req admission.Request) admissio m.setMeasuredLabel(pod, false, logger) } - mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, isMeasured, m.nodeCache, m.measuredPodCPUIncrease, m.authoritativeCPU, m.authoritativeMemory, m.authoritativeCPUDryRun, m.authoritativeMemoryDryRun, m.reporter, logger) + mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, isMeasured, m.nodeCache, m.measuredPodCPUIncrease, m.authoritativeCPU, m.authoritativeMemory, m.authoritativeCPUDryRun, m.authoritativeMemoryDryRun, m.authoritativeCPUMaxReductionPercent, m.authoritativeMemoryMaxReductionPercent, m.reporter, logger) m.addPriorityClass(pod) marshaledPod, err := json.Marshal(pod) @@ -228,12 +230,10 @@ func mutatePodLabels(pod *corev1.Pod, build *buildv1.Build) { } } -const authoritativeMaxReductionPercent = 0.25 - var authoritativeMinCPURequest = resource.MustParse("10m") // useOursIfLarger updates fields in theirs when ours are larger, or lowers them when authoritative mode allows. -func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, isMeasured bool, workloadClass string, authoritativeCPU, authoritativeMemory, authoritativeCPUDryRun, authoritativeMemoryDryRun bool, reporter results.PodScalerReporter, logger *logrus.Entry) { +func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, isMeasured bool, workloadClass string, authoritativeCPU, authoritativeMemory, authoritativeCPUDryRun, authoritativeMemoryDryRun bool, authoritativeCPUMaxReductionPercent, authoritativeMemoryMaxReductionPercent float64, reporter results.PodScalerReporter, logger *logrus.Entry) { for _, item := range []*corev1.ResourceRequirements{allOfOurs, allOfTheirs} { if item.Requests == nil { item.Requests = corev1.ResourceList{} @@ -286,9 +286,11 @@ func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, worklo } authoritative := authoritativeMemory dryRun := authoritativeMemoryDryRun + maxReductionPercent := authoritativeMemoryMaxReductionPercent if field == corev1.ResourceCPU { authoritative = authoritativeCPU dryRun = authoritativeCPUDryRun + maxReductionPercent = authoritativeCPUMaxReductionPercent } if !authoritative && !dryRun { continue @@ -319,14 +321,15 @@ func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, worklo if 1.0-(ourValue/theirValue) < 0.05 { continue } - capped := false - if 1.0-(ourValue/theirValue) > authoritativeMaxReductionPercent { - if field == corev1.ResourceCPU { - our.SetMilli(int64(float64(their.MilliValue()) * (1.0 - authoritativeMaxReductionPercent))) - } else { - our.Set(int64(theirValue * (1.0 - authoritativeMaxReductionPercent))) + reductionCapped := false + if 1.0-(ourValue/theirValue) > maxReductionPercent { + switch field { + case corev1.ResourceCPU: + our.SetMilli(int64(float64(their.MilliValue()) * (1.0 - maxReductionPercent))) + case corev1.ResourceMemory: + our.Set(int64(theirValue * (1.0 - maxReductionPercent))) } - capped = true + reductionCapped = true } if field == corev1.ResourceCPU { if our.Cmp(authoritativeMinCPURequest) < 0 { @@ -338,12 +341,12 @@ func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, worklo } if dryRun { fieldLogger.WithFields(logrus.Fields{ - "event": "authoritative_decrease_dry_run", - "workloadClass": workloadClass, - "would_set": our.String(), - "authoritative": authoritative, - "reduction_pct": (1.0 - our.AsApproximateFloat64()/theirValue) * 100, - "capped_25pct": capped, + "event": "authoritative_decrease_dry_run", + "workloadClass": workloadClass, + "would_set": our.String(), + "authoritative": authoritative, + "reduction_pct": (1.0 - our.AsApproximateFloat64()/theirValue) * 100, + "reduction_capped": reductionCapped, }).Info("authoritative decrease dry-run") continue } @@ -408,7 +411,7 @@ func preventUnschedulableWithCaps(resources *corev1.ResourceRequirements, cpuCap } } -func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, isMeasured bool, nodeCache *nodeAllocatableCache, measuredPodCPUIncrease float64, authoritativeCPU, authoritativeMemory, authoritativeCPUDryRun, authoritativeMemoryDryRun bool, reporter results.PodScalerReporter, logger *logrus.Entry) { +func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, isMeasured bool, nodeCache *nodeAllocatableCache, measuredPodCPUIncrease float64, authoritativeCPU, authoritativeMemory, authoritativeCPUDryRun, authoritativeMemoryDryRun bool, authoritativeCPUMaxReductionPercent, authoritativeMemoryMaxReductionPercent float64, reporter results.PodScalerReporter, logger *logrus.Entry) { workloadClass := pod.Labels[ciWorkloadLabel] mutateResources := func(containers []corev1.Container) { @@ -460,7 +463,7 @@ func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceL logger.Debugf("recommendation exists for: %s (using max of measured and unmeasured)", containers[i].Name) workloadType := determineWorkloadType(pod.Annotations, pod.Labels) workloadName := determineWorkloadName(pod.Name, containers[i].Name, workloadType, pod.Labels) - useOursIfLarger(&resources, &containers[i].Resources, workloadName, workloadType, isMeasured, workloadClass, authoritativeCPU, authoritativeMemory, authoritativeCPUDryRun, authoritativeMemoryDryRun, reporter, logger) + useOursIfLarger(&resources, &containers[i].Resources, workloadName, workloadType, isMeasured, workloadClass, authoritativeCPU, authoritativeMemory, authoritativeCPUDryRun, authoritativeMemoryDryRun, authoritativeCPUMaxReductionPercent, authoritativeMemoryMaxReductionPercent, reporter, logger) if mutateResourceLimits { reconcileLimits(&containers[i].Resources) } diff --git a/cmd/pod-scaler/admission_test.go b/cmd/pod-scaler/admission_test.go index 09ee1a89359..dbbe3370645 100644 --- a/cmd/pod-scaler/admission_test.go +++ b/cmd/pod-scaler/admission_test.go @@ -554,7 +554,7 @@ func TestMutatePodResources(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { original := testCase.pod.DeepCopy() - mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", false, nil, 50.0, false, false, false, false, &defaultReporter, logrus.WithField("test", testCase.name)) + mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", false, nil, 50.0, false, false, false, false, 0.25, 0.25, &defaultReporter, logrus.WithField("test", testCase.name)) diff := cmp.Diff(original, testCase.pod) // In some cases, cmp.Diff decides to use non-breaking spaces, and it's not // particularly deterministic about this. We don't care. @@ -619,7 +619,7 @@ func TestMutatePodResources_ciWorkloadLabelDoesNotBreakCacheLookup(t *testing.T) }, } - mutatePodResources(pod, server, false, 10, "20Gi", false, nil, 50.0, false, false, false, false, &defaultReporter, logger) + mutatePodResources(pod, server, false, 10, "20Gi", false, nil, 50.0, false, false, false, false, 0.25, 0.25, &defaultReporter, logger) got := pod.Spec.Containers[0].Resources.Requests.Cpu().MilliValue() const want = 6000 // 5000m recommendation inflated by 1.2 in useOursIfLarger @@ -788,7 +788,7 @@ func TestUseOursIfLarger(t *testing.T) { } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { - useOursIfLarger(&testCase.ours, &testCase.theirs, "test", "build", false, "", false, false, false, false, &defaultReporter, logrus.WithField("test", testCase.name)) + useOursIfLarger(&testCase.ours, &testCase.theirs, "test", "build", false, "", false, false, false, false, 0.25, 0.25, &defaultReporter, logrus.WithField("test", testCase.name)) if diff := cmp.Diff(testCase.theirs, testCase.expected); diff != "" { t.Errorf("%s: got incorrect resources after mutation: %v", testCase.name, diff) } @@ -913,7 +913,7 @@ func TestUseOursIfLarger_authoritative(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - useOursIfLarger(&tc.ours, &tc.theirs, "test", "build", tc.isMeasured, "", tc.authoritativeCPU, tc.authoritativeMemory, false, false, &defaultReporter, logrus.WithField("test", tc.name)) + useOursIfLarger(&tc.ours, &tc.theirs, "test", "build", tc.isMeasured, "", tc.authoritativeCPU, tc.authoritativeMemory, false, false, 0.25, 0.25, &defaultReporter, logrus.WithField("test", tc.name)) if diff := cmp.Diff(tc.theirs, tc.expected); diff != "" { t.Errorf("unexpected resources: %s", diff) } @@ -921,6 +921,33 @@ func TestUseOursIfLarger_authoritative(t *testing.T) { } } +func TestUseOursIfLarger_authoritativeUncappedMemory(t *testing.T) { + ours := corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewQuantity(10, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(1e1, resource.BinarySI), + }, + } + theirs := corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewQuantity(100, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(2e10, resource.BinarySI), + }, + } + expected := corev1.ResourceRequirements{ + Limits: corev1.ResourceList{}, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewQuantity(75, resource.DecimalSI), + corev1.ResourceMemory: *resource.NewQuantity(12, resource.BinarySI), + }, + } + + useOursIfLarger(&ours, &theirs, "test", "build", false, "", true, true, false, false, 0.25, 1.0, &defaultReporter, logrus.WithField("test", t.Name())) + if diff := cmp.Diff(theirs, expected); diff != "" { + t.Errorf("unexpected resources: %s", diff) + } +} + func TestUseOursIfLarger_authoritativeDryRun(t *testing.T) { ours := corev1.ResourceRequirements{ Requests: corev1.ResourceList{ @@ -939,7 +966,7 @@ func TestUseOursIfLarger_authoritativeDryRun(t *testing.T) { }, } - useOursIfLarger(&ours, &theirs, "test", "build", false, "", true, false, true, false, &defaultReporter, logrus.WithField("test", t.Name())) + useOursIfLarger(&ours, &theirs, "test", "build", false, "", true, false, true, false, 0.25, 0.25, &defaultReporter, logrus.WithField("test", t.Name())) if diff := cmp.Diff(theirs, expected); diff != "" { t.Errorf("dry-run should not mutate resources: %s", diff) } @@ -1022,7 +1049,7 @@ func TestUseOursIsLarger_ReporterReports(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - useOursIfLarger(&tc.ours, &tc.theirs, "test", "build", false, "", false, false, false, false, &tc.reporter, logrus.WithField("test", tc.name)) + useOursIfLarger(&tc.ours, &tc.theirs, "test", "build", false, "", false, false, false, false, 0.25, 0.25, &tc.reporter, logrus.WithField("test", tc.name)) if diff := cmp.Diff(tc.reporter.called, tc.expected); diff != "" { t.Errorf("actual and expected reporter states don't match, : %v", diff) diff --git a/cmd/pod-scaler/main.go b/cmd/pod-scaler/main.go index c0162420419..cc44fe13754 100644 --- a/cmd/pod-scaler/main.go +++ b/cmd/pod-scaler/main.go @@ -63,19 +63,21 @@ type consumerOptions struct { port int uiPort int - dataDir string - certDir string - mutateResourceLimits bool - cpuCap int64 - memoryCap string - cpuPriorityScheduling int64 - percentageMeasured float64 - measuredPodCPUIncrease float64 - systemReservedCPU int64 - authoritativeCPU bool - authoritativeMemory bool - authoritativeCPUDryRun bool - authoritativeMemoryDryRun bool + dataDir string + certDir string + mutateResourceLimits bool + cpuCap int64 + memoryCap string + cpuPriorityScheduling int64 + percentageMeasured float64 + measuredPodCPUIncrease float64 + systemReservedCPU int64 + authoritativeCPU bool + authoritativeMemory bool + authoritativeCPUDryRun bool + authoritativeMemoryDryRun bool + authoritativeCPUMaxReductionPercent float64 + authoritativeMemoryMaxReductionPercent float64 } func bindOptions(fs *flag.FlagSet) *options { @@ -106,6 +108,8 @@ func bindOptions(fs *flag.FlagSet) *options { fs.BoolVar(&o.authoritativeMemory, "authoritative-memory", false, "Allow admission to decrease memory requests and limits based on measured usage.") fs.BoolVar(&o.authoritativeCPUDryRun, "authoritative-cpu-dry-run", false, "Log CPU decreases that authoritative mode would apply without mutating pods.") fs.BoolVar(&o.authoritativeMemoryDryRun, "authoritative-memory-dry-run", false, "Log memory decreases that authoritative mode would apply without mutating pods.") + fs.Float64Var(&o.authoritativeCPUMaxReductionPercent, "authoritative-cpu-max-reduction-percent", 1.0, "Maximum CPU request reduction per admission in authoritative mode, as a fraction (0.25 = 25%, 1.0 = no cap).") + fs.Float64Var(&o.authoritativeMemoryMaxReductionPercent, "authoritative-memory-max-reduction-percent", 1.0, "Maximum memory request reduction per admission in authoritative mode, as a fraction (0.25 = 25%, 1.0 = no cap).") o.resultsOptions.Bind(fs) return &o } @@ -145,6 +149,12 @@ func (o *options) validate() error { if o.measuredPodCPUIncrease < 0 { return errors.New("--measured-pod-cpu-increase must be >= 0") } + if o.authoritativeCPUMaxReductionPercent < 0 || o.authoritativeCPUMaxReductionPercent > 1 { + return errors.New("--authoritative-cpu-max-reduction-percent must be between 0 and 1") + } + if o.authoritativeMemoryMaxReductionPercent < 0 || o.authoritativeMemoryMaxReductionPercent > 1 { + return errors.New("--authoritative-memory-max-reduction-percent must be between 0 and 1") + } if err := o.resultsOptions.Validate(); err != nil { return err } @@ -295,7 +305,7 @@ func mainAdmission(opts *options, cache Cache) { logrus.WithError(err).Fatal("Failed to create pod-scaler reporter.") } - go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, kubeClient, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, opts.percentageMeasured, opts.measuredPodCPUIncrease, opts.systemReservedCPU, opts.authoritativeCPU, opts.authoritativeMemory, opts.authoritativeCPUDryRun, opts.authoritativeMemoryDryRun, reporter) + go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, kubeClient, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, opts.percentageMeasured, opts.measuredPodCPUIncrease, opts.systemReservedCPU, opts.authoritativeCPU, opts.authoritativeMemory, opts.authoritativeCPUDryRun, opts.authoritativeMemoryDryRun, opts.authoritativeCPUMaxReductionPercent, opts.authoritativeMemoryMaxReductionPercent, reporter) } func loaders(cache Cache) map[string][]*cacheReloader {