Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 40 additions & 37 deletions cmd/pod-scaler/admission.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/openshift/ci-tools/pkg/steps"
)

func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, kubeClient kubernetes.Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, percentageMeasured float64, measuredPodCPUIncrease float64, systemReservedCPU int64, authoritativeCPU, authoritativeMemory, authoritativeCPUDryRun, authoritativeMemoryDryRun bool, reporter results.PodScalerReporter) {
func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Interface, kubeClient kubernetes.Interface, loaders map[string][]*cacheReloader, mutateResourceLimits bool, cpuCap int64, memoryCap string, cpuPriorityScheduling int64, percentageMeasured float64, measuredPodCPUIncrease float64, systemReservedCPU int64, authoritativeCPU, authoritativeMemory, authoritativeCPUDryRun, authoritativeMemoryDryRun bool, authoritativeCPUMaxReductionPercent, authoritativeMemoryMaxReductionPercent float64, reporter results.PodScalerReporter) {
logger := logrus.WithField("component", "pod-scaler admission")
logger.Infof("Initializing admission webhook server with %d loaders.", len(loaders))
if authoritativeCPUDryRun || authoritativeMemoryDryRun {
Expand All @@ -57,30 +57,32 @@ func admit(port, healthPort int, certDir string, client buildclientv1.BuildV1Int
Port: port,
CertDir: certDir,
})
server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, percentageMeasured: percentageMeasured, measuredPodCPUIncrease: measuredPodCPUIncrease, nodeCache: nodeCache, authoritativeCPU: authoritativeCPU, authoritativeMemory: authoritativeMemory, authoritativeCPUDryRun: authoritativeCPUDryRun, authoritativeMemoryDryRun: authoritativeMemoryDryRun, reporter: reporter}})
server.Register("/pods", &webhook.Admission{Handler: &podMutator{logger: logger, client: client, decoder: decoder, resources: resources, mutateResourceLimits: mutateResourceLimits, cpuCap: cpuCap, memoryCap: memoryCap, cpuPriorityScheduling: cpuPriorityScheduling, percentageMeasured: percentageMeasured, measuredPodCPUIncrease: measuredPodCPUIncrease, nodeCache: nodeCache, authoritativeCPU: authoritativeCPU, authoritativeMemory: authoritativeMemory, authoritativeCPUDryRun: authoritativeCPUDryRun, authoritativeMemoryDryRun: authoritativeMemoryDryRun, authoritativeCPUMaxReductionPercent: authoritativeCPUMaxReductionPercent, authoritativeMemoryMaxReductionPercent: authoritativeMemoryMaxReductionPercent, reporter: reporter}})
logger.Info("Serving admission webhooks.")
if err := server.Start(interrupts.Context()); err != nil {
logrus.WithError(err).Fatal("Failed to serve webhooks.")
}
}

type podMutator struct {
logger *logrus.Entry
client buildclientv1.BuildV1Interface
resources *resourceServer
mutateResourceLimits bool
decoder admission.Decoder
cpuCap int64
memoryCap string
cpuPriorityScheduling int64
percentageMeasured float64
measuredPodCPUIncrease float64
nodeCache *nodeAllocatableCache
authoritativeCPU bool
authoritativeMemory bool
authoritativeCPUDryRun bool
authoritativeMemoryDryRun bool
reporter results.PodScalerReporter
logger *logrus.Entry
client buildclientv1.BuildV1Interface
resources *resourceServer
mutateResourceLimits bool
decoder admission.Decoder
cpuCap int64
memoryCap string
cpuPriorityScheduling int64
percentageMeasured float64
measuredPodCPUIncrease float64
nodeCache *nodeAllocatableCache
authoritativeCPU bool
authoritativeMemory bool
authoritativeCPUDryRun bool
authoritativeMemoryDryRun bool
authoritativeCPUMaxReductionPercent float64
authoritativeMemoryMaxReductionPercent float64
reporter results.PodScalerReporter
}

func (m *podMutator) Handle(ctx context.Context, req admission.Request) admission.Response {
Expand Down Expand Up @@ -129,7 +131,7 @@ func (m *podMutator) Handle(ctx context.Context, req admission.Request) admissio
m.setMeasuredLabel(pod, false, logger)
}

mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, isMeasured, m.nodeCache, m.measuredPodCPUIncrease, m.authoritativeCPU, m.authoritativeMemory, m.authoritativeCPUDryRun, m.authoritativeMemoryDryRun, m.reporter, logger)
mutatePodResources(pod, m.resources, m.mutateResourceLimits, m.cpuCap, m.memoryCap, isMeasured, m.nodeCache, m.measuredPodCPUIncrease, m.authoritativeCPU, m.authoritativeMemory, m.authoritativeCPUDryRun, m.authoritativeMemoryDryRun, m.authoritativeCPUMaxReductionPercent, m.authoritativeMemoryMaxReductionPercent, m.reporter, logger)
m.addPriorityClass(pod)

marshaledPod, err := json.Marshal(pod)
Expand Down Expand Up @@ -228,12 +230,10 @@ func mutatePodLabels(pod *corev1.Pod, build *buildv1.Build) {
}
}

const authoritativeMaxReductionPercent = 0.25

var authoritativeMinCPURequest = resource.MustParse("10m")

// useOursIfLarger updates fields in theirs when ours are larger, or lowers them when authoritative mode allows.
func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, isMeasured bool, workloadClass string, authoritativeCPU, authoritativeMemory, authoritativeCPUDryRun, authoritativeMemoryDryRun bool, reporter results.PodScalerReporter, logger *logrus.Entry) {
func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, workloadName, workloadType string, isMeasured bool, workloadClass string, authoritativeCPU, authoritativeMemory, authoritativeCPUDryRun, authoritativeMemoryDryRun bool, authoritativeCPUMaxReductionPercent, authoritativeMemoryMaxReductionPercent float64, reporter results.PodScalerReporter, logger *logrus.Entry) {
for _, item := range []*corev1.ResourceRequirements{allOfOurs, allOfTheirs} {
if item.Requests == nil {
item.Requests = corev1.ResourceList{}
Expand Down Expand Up @@ -286,9 +286,11 @@ func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, worklo
}
authoritative := authoritativeMemory
dryRun := authoritativeMemoryDryRun
maxReductionPercent := authoritativeMemoryMaxReductionPercent
if field == corev1.ResourceCPU {
authoritative = authoritativeCPU
dryRun = authoritativeCPUDryRun
maxReductionPercent = authoritativeCPUMaxReductionPercent
}
if !authoritative && !dryRun {
continue
Expand Down Expand Up @@ -319,14 +321,15 @@ func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, worklo
if 1.0-(ourValue/theirValue) < 0.05 {
continue
}
capped := false
if 1.0-(ourValue/theirValue) > authoritativeMaxReductionPercent {
if field == corev1.ResourceCPU {
our.SetMilli(int64(float64(their.MilliValue()) * (1.0 - authoritativeMaxReductionPercent)))
} else {
our.Set(int64(theirValue * (1.0 - authoritativeMaxReductionPercent)))
reductionCapped := false
if 1.0-(ourValue/theirValue) > maxReductionPercent {
switch field {
case corev1.ResourceCPU:
our.SetMilli(int64(float64(their.MilliValue()) * (1.0 - maxReductionPercent)))
case corev1.ResourceMemory:
our.Set(int64(theirValue * (1.0 - maxReductionPercent)))
}
capped = true
reductionCapped = true
}
if field == corev1.ResourceCPU {
if our.Cmp(authoritativeMinCPURequest) < 0 {
Expand All @@ -338,12 +341,12 @@ func useOursIfLarger(allOfOurs, allOfTheirs *corev1.ResourceRequirements, worklo
}
if dryRun {
fieldLogger.WithFields(logrus.Fields{
"event": "authoritative_decrease_dry_run",
"workloadClass": workloadClass,
"would_set": our.String(),
"authoritative": authoritative,
"reduction_pct": (1.0 - our.AsApproximateFloat64()/theirValue) * 100,
"capped_25pct": capped,
"event": "authoritative_decrease_dry_run",
"workloadClass": workloadClass,
"would_set": our.String(),
"authoritative": authoritative,
"reduction_pct": (1.0 - our.AsApproximateFloat64()/theirValue) * 100,
"reduction_capped": reductionCapped,
}).Info("authoritative decrease dry-run")
continue
}
Expand Down Expand Up @@ -408,7 +411,7 @@ func preventUnschedulableWithCaps(resources *corev1.ResourceRequirements, cpuCap
}
}

func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, isMeasured bool, nodeCache *nodeAllocatableCache, measuredPodCPUIncrease float64, authoritativeCPU, authoritativeMemory, authoritativeCPUDryRun, authoritativeMemoryDryRun bool, reporter results.PodScalerReporter, logger *logrus.Entry) {
func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceLimits bool, cpuCap int64, memoryCap string, isMeasured bool, nodeCache *nodeAllocatableCache, measuredPodCPUIncrease float64, authoritativeCPU, authoritativeMemory, authoritativeCPUDryRun, authoritativeMemoryDryRun bool, authoritativeCPUMaxReductionPercent, authoritativeMemoryMaxReductionPercent float64, reporter results.PodScalerReporter, logger *logrus.Entry) {
workloadClass := pod.Labels[ciWorkloadLabel]

mutateResources := func(containers []corev1.Container) {
Expand Down Expand Up @@ -460,7 +463,7 @@ func mutatePodResources(pod *corev1.Pod, server *resourceServer, mutateResourceL
logger.Debugf("recommendation exists for: %s (using max of measured and unmeasured)", containers[i].Name)
workloadType := determineWorkloadType(pod.Annotations, pod.Labels)
workloadName := determineWorkloadName(pod.Name, containers[i].Name, workloadType, pod.Labels)
useOursIfLarger(&resources, &containers[i].Resources, workloadName, workloadType, isMeasured, workloadClass, authoritativeCPU, authoritativeMemory, authoritativeCPUDryRun, authoritativeMemoryDryRun, reporter, logger)
useOursIfLarger(&resources, &containers[i].Resources, workloadName, workloadType, isMeasured, workloadClass, authoritativeCPU, authoritativeMemory, authoritativeCPUDryRun, authoritativeMemoryDryRun, authoritativeCPUMaxReductionPercent, authoritativeMemoryMaxReductionPercent, reporter, logger)
if mutateResourceLimits {
reconcileLimits(&containers[i].Resources)
}
Expand Down
39 changes: 33 additions & 6 deletions cmd/pod-scaler/admission_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func TestMutatePodResources(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
original := testCase.pod.DeepCopy()
mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", false, nil, 50.0, false, false, false, false, &defaultReporter, logrus.WithField("test", testCase.name))
mutatePodResources(testCase.pod, testCase.server, testCase.mutateResourceLimits, 10, "20Gi", false, nil, 50.0, false, false, false, false, 0.25, 0.25, &defaultReporter, logrus.WithField("test", testCase.name))
diff := cmp.Diff(original, testCase.pod)
// In some cases, cmp.Diff decides to use non-breaking spaces, and it's not
// particularly deterministic about this. We don't care.
Expand Down Expand Up @@ -619,7 +619,7 @@ func TestMutatePodResources_ciWorkloadLabelDoesNotBreakCacheLookup(t *testing.T)
},
}

mutatePodResources(pod, server, false, 10, "20Gi", false, nil, 50.0, false, false, false, false, &defaultReporter, logger)
mutatePodResources(pod, server, false, 10, "20Gi", false, nil, 50.0, false, false, false, false, 0.25, 0.25, &defaultReporter, logger)

got := pod.Spec.Containers[0].Resources.Requests.Cpu().MilliValue()
const want = 6000 // 5000m recommendation inflated by 1.2 in useOursIfLarger
Expand Down Expand Up @@ -788,7 +788,7 @@ func TestUseOursIfLarger(t *testing.T) {
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
useOursIfLarger(&testCase.ours, &testCase.theirs, "test", "build", false, "", false, false, false, false, &defaultReporter, logrus.WithField("test", testCase.name))
useOursIfLarger(&testCase.ours, &testCase.theirs, "test", "build", false, "", false, false, false, false, 0.25, 0.25, &defaultReporter, logrus.WithField("test", testCase.name))
if diff := cmp.Diff(testCase.theirs, testCase.expected); diff != "" {
t.Errorf("%s: got incorrect resources after mutation: %v", testCase.name, diff)
}
Expand Down Expand Up @@ -913,14 +913,41 @@ func TestUseOursIfLarger_authoritative(t *testing.T) {
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
useOursIfLarger(&tc.ours, &tc.theirs, "test", "build", tc.isMeasured, "", tc.authoritativeCPU, tc.authoritativeMemory, false, false, &defaultReporter, logrus.WithField("test", tc.name))
useOursIfLarger(&tc.ours, &tc.theirs, "test", "build", tc.isMeasured, "", tc.authoritativeCPU, tc.authoritativeMemory, false, false, 0.25, 0.25, &defaultReporter, logrus.WithField("test", tc.name))
if diff := cmp.Diff(tc.theirs, tc.expected); diff != "" {
t.Errorf("unexpected resources: %s", diff)
}
})
}
}

// TestUseOursIfLarger_authoritativeUncappedMemory calls useOursIfLarger with
// both CPU and memory authoritative (no dry-run), a CPU max-reduction fraction
// of 0.25 and a memory max-reduction fraction of 1.0. It expects the configured
// CPU request of 100 to be lowered only to 75, while the configured memory
// request is lowered all the way down to 12.
func TestUseOursIfLarger_authoritativeUncappedMemory(t *testing.T) {
	const (
		cpuMaxReduction    = 0.25
		memoryMaxReduction = 1.0
	)

	// requests builds a request list with the quantity formats used throughout
	// this test: decimal for CPU, binary for memory.
	requests := func(cpu, memory int64) corev1.ResourceList {
		return corev1.ResourceList{
			corev1.ResourceCPU:    *resource.NewQuantity(cpu, resource.DecimalSI),
			corev1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI),
		}
	}

	recommended := corev1.ResourceRequirements{Requests: requests(10, 10)}
	configured := corev1.ResourceRequirements{Requests: requests(100, 2e10)}
	want := corev1.ResourceRequirements{
		Limits:   corev1.ResourceList{},
		Requests: requests(75, 12),
	}

	useOursIfLarger(&recommended, &configured, "test", "build", false, "", true, true, false, false, cpuMaxReduction, memoryMaxReduction, &defaultReporter, logrus.WithField("test", t.Name()))

	// The second argument is mutated in place, so it carries the result.
	if diff := cmp.Diff(configured, want); diff != "" {
		t.Errorf("unexpected resources: %s", diff)
	}
}

func TestUseOursIfLarger_authoritativeDryRun(t *testing.T) {
ours := corev1.ResourceRequirements{
Requests: corev1.ResourceList{
Expand All @@ -939,7 +966,7 @@ func TestUseOursIfLarger_authoritativeDryRun(t *testing.T) {
},
}

useOursIfLarger(&ours, &theirs, "test", "build", false, "", true, false, true, false, &defaultReporter, logrus.WithField("test", t.Name()))
useOursIfLarger(&ours, &theirs, "test", "build", false, "", true, false, true, false, 0.25, 0.25, &defaultReporter, logrus.WithField("test", t.Name()))
if diff := cmp.Diff(theirs, expected); diff != "" {
t.Errorf("dry-run should not mutate resources: %s", diff)
}
Expand Down Expand Up @@ -1022,7 +1049,7 @@ func TestUseOursIsLarger_ReporterReports(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
useOursIfLarger(&tc.ours, &tc.theirs, "test", "build", false, "", false, false, false, false, &tc.reporter, logrus.WithField("test", tc.name))
useOursIfLarger(&tc.ours, &tc.theirs, "test", "build", false, "", false, false, false, false, 0.25, 0.25, &tc.reporter, logrus.WithField("test", tc.name))

if diff := cmp.Diff(tc.reporter.called, tc.expected); diff != "" {
t.Errorf("actual and expected reporter states don't match, : %v", diff)
Expand Down
38 changes: 24 additions & 14 deletions cmd/pod-scaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,21 @@ type consumerOptions struct {
port int
uiPort int

dataDir string
certDir string
mutateResourceLimits bool
cpuCap int64
memoryCap string
cpuPriorityScheduling int64
percentageMeasured float64
measuredPodCPUIncrease float64
systemReservedCPU int64
authoritativeCPU bool
authoritativeMemory bool
authoritativeCPUDryRun bool
authoritativeMemoryDryRun bool
dataDir string
certDir string
mutateResourceLimits bool
cpuCap int64
memoryCap string
cpuPriorityScheduling int64
percentageMeasured float64
measuredPodCPUIncrease float64
systemReservedCPU int64
authoritativeCPU bool
authoritativeMemory bool
authoritativeCPUDryRun bool
authoritativeMemoryDryRun bool
authoritativeCPUMaxReductionPercent float64
authoritativeMemoryMaxReductionPercent float64
}

func bindOptions(fs *flag.FlagSet) *options {
Expand Down Expand Up @@ -106,6 +108,8 @@ func bindOptions(fs *flag.FlagSet) *options {
fs.BoolVar(&o.authoritativeMemory, "authoritative-memory", false, "Allow admission to decrease memory requests and limits based on measured usage.")
fs.BoolVar(&o.authoritativeCPUDryRun, "authoritative-cpu-dry-run", false, "Log CPU decreases that authoritative mode would apply without mutating pods.")
fs.BoolVar(&o.authoritativeMemoryDryRun, "authoritative-memory-dry-run", false, "Log memory decreases that authoritative mode would apply without mutating pods.")
fs.Float64Var(&o.authoritativeCPUMaxReductionPercent, "authoritative-cpu-max-reduction-percent", 1.0, "Maximum CPU request reduction per admission in authoritative mode, as a fraction (0.25 = 25%, 1.0 = no cap).")
fs.Float64Var(&o.authoritativeMemoryMaxReductionPercent, "authoritative-memory-max-reduction-percent", 1.0, "Maximum memory request reduction per admission in authoritative mode, as a fraction (0.25 = 25%, 1.0 = no cap).")
o.resultsOptions.Bind(fs)
return &o
}
Expand Down Expand Up @@ -145,6 +149,12 @@ func (o *options) validate() error {
if o.measuredPodCPUIncrease < 0 {
return errors.New("--measured-pod-cpu-increase must be >= 0")
}
if o.authoritativeCPUMaxReductionPercent < 0 || o.authoritativeCPUMaxReductionPercent > 1 {
return errors.New("--authoritative-cpu-max-reduction-percent must be between 0 and 1")
}
if o.authoritativeMemoryMaxReductionPercent < 0 || o.authoritativeMemoryMaxReductionPercent > 1 {
return errors.New("--authoritative-memory-max-reduction-percent must be between 0 and 1")
}
if err := o.resultsOptions.Validate(); err != nil {
return err
}
Expand Down Expand Up @@ -295,7 +305,7 @@ func mainAdmission(opts *options, cache Cache) {
logrus.WithError(err).Fatal("Failed to create pod-scaler reporter.")
}

go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, kubeClient, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, opts.percentageMeasured, opts.measuredPodCPUIncrease, opts.systemReservedCPU, opts.authoritativeCPU, opts.authoritativeMemory, opts.authoritativeCPUDryRun, opts.authoritativeMemoryDryRun, reporter)
go admit(opts.port, opts.instrumentationOptions.HealthPort, opts.certDir, client, kubeClient, loaders(cache), opts.mutateResourceLimits, opts.cpuCap, opts.memoryCap, opts.cpuPriorityScheduling, opts.percentageMeasured, opts.measuredPodCPUIncrease, opts.systemReservedCPU, opts.authoritativeCPU, opts.authoritativeMemory, opts.authoritativeCPUDryRun, opts.authoritativeMemoryDryRun, opts.authoritativeCPUMaxReductionPercent, opts.authoritativeMemoryMaxReductionPercent, reporter)
}

func loaders(cache Cache) map[string][]*cacheReloader {
Expand Down