package org.apache.flink.kubernetes.operator.autoscaler;

import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.TemporalAmount;
import java.util.Map;
import java.util.SortedMap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.class */
public class JobVertexScaler {
    private static final Logger LOG = LoggerFactory.getLogger(JobVertexScaler.class);

    @VisibleForTesting
    public static final String INNEFFECTIVE_MESSAGE_FORMAT = "Skipping further scale up after ineffective previous scale up for %s";
    private Clock clock = Clock.system(ZoneId.systemDefault());
    private EventRecorder eventRecorder;

    public JobVertexScaler(EventRecorder eventRecorder) {
        this.eventRecorder = eventRecorder;
    }

    public int computeScaleTargetParallelism(AbstractFlinkResource<?, ?> abstractFlinkResource, Configuration configuration, JobVertexID jobVertexID, Map<ScalingMetric, EvaluatedScalingMetric> map, SortedMap<Instant, ScalingSummary> sortedMap) {
        int current = (int) map.get(ScalingMetric.PARALLELISM).getCurrent();
        double average = map.get(ScalingMetric.TRUE_PROCESSING_RATE).getAverage();
        if (Double.isNaN(average)) {
            LOG.warn("True processing rate is not available for {}, cannot compute new parallelism", jobVertexID);
            return current;
        }
        double targetProcessingCapacity = AutoScalerUtils.getTargetProcessingCapacity(map, configuration, ((Double) configuration.get(AutoScalerOptions.TARGET_UTILIZATION)).doubleValue(), true);
        if (Double.isNaN(targetProcessingCapacity)) {
            LOG.warn("Target data rate is not available for {}, cannot compute new parallelism", jobVertexID);
            return current;
        }
        LOG.debug("Target processing capacity for {} is {}", jobVertexID, Double.valueOf(targetProcessingCapacity));
        double d = targetProcessingCapacity / average;
        double doubleValue = 1.0d - ((Double) configuration.get(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR)).doubleValue();
        if (d < doubleValue) {
            LOG.debug("Computed scale factor of {} for {} is capped by maximum scale down factor to {}", new Object[]{Double.valueOf(d), jobVertexID, Double.valueOf(doubleValue)});
            d = doubleValue;
        }
        int scale = scale(current, (int) map.get(ScalingMetric.MAX_PARALLELISM).getCurrent(), d, configuration.getInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM), configuration.getInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM));
        if (scale == current || blockScalingBasedOnPastActions(abstractFlinkResource, jobVertexID, configuration, map, sortedMap, current, scale)) {
            return current;
        }
        map.put(ScalingMetric.EXPECTED_PROCESSING_RATE, EvaluatedScalingMetric.of(targetProcessingCapacity));
        return scale;
    }

    private boolean blockScalingBasedOnPastActions(AbstractFlinkResource<?, ?> abstractFlinkResource, JobVertexID jobVertexID, Configuration configuration, Map<ScalingMetric, EvaluatedScalingMetric> map, SortedMap<Instant, ScalingSummary> sortedMap, int i, int i2) {
        if (sortedMap.isEmpty()) {
            return false;
        }
        boolean z = i < i2;
        Instant lastKey = sortedMap.lastKey();
        ScalingSummary scalingSummary = sortedMap.get(lastKey);
        if (i == scalingSummary.getNewParallelism() && scalingSummary.isScaledUp()) {
            return z ? detectIneffectiveScaleUp(abstractFlinkResource, jobVertexID, configuration, map, scalingSummary) : detectImmediateScaleDownAfterScaleUp(jobVertexID, configuration, lastKey);
        }
        return false;
    }

    private boolean detectImmediateScaleDownAfterScaleUp(JobVertexID jobVertexID, Configuration configuration, Instant instant) {
        if (!instant.plus((TemporalAmount) configuration.get(AutoScalerOptions.SCALE_UP_GRACE_PERIOD)).isAfter(this.clock.instant())) {
            return false;
        }
        LOG.info("Skipping immediate scale down after scale up within grace period for {}", jobVertexID);
        return true;
    }

    private boolean detectIneffectiveScaleUp(AbstractFlinkResource<?, ?> abstractFlinkResource, JobVertexID jobVertexID, Configuration configuration, Map<ScalingMetric, EvaluatedScalingMetric> map, ScalingSummary scalingSummary) {
        double average = scalingSummary.getMetrics().get(ScalingMetric.TRUE_PROCESSING_RATE).getAverage();
        if ((map.get(ScalingMetric.TRUE_PROCESSING_RATE).getAverage() - average) / (scalingSummary.getMetrics().get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent() - average) >= ((Double) configuration.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD)).doubleValue()) {
            return false;
        }
        String format = String.format(INNEFFECTIVE_MESSAGE_FORMAT, jobVertexID);
        this.eventRecorder.triggerEvent(abstractFlinkResource, EventRecorder.Type.Normal, EventRecorder.Reason.IneffectiveScaling, EventRecorder.Component.Operator, format);
        if (!((Boolean) configuration.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)).booleanValue()) {
            return false;
        }
        LOG.info(format);
        return true;
    }

    @VisibleForTesting
    protected static int scale(int i, int i2, double d, int i3, int i4) {
        Preconditions.checkArgument(i3 <= i4, "The minimum parallelism must not be greater than the maximum parallelism.");
        if (i3 > i2) {
            LOG.warn("Specified autoscaler minimum parallelism {} is greater than the operator max parallelism {}. The min parallelism will be set to the operator max parallelism.", Integer.valueOf(i3), Integer.valueOf(i2));
        }
        if (i2 < i4 && i4 != Integer.MAX_VALUE) {
            LOG.debug("Specified autoscaler maximum parallelism {} is greater than the operator max parallelism {}. This means the operator max parallelism can never be reached.", Integer.valueOf(i4), Integer.valueOf(i2));
        }
        int min = (int) Math.min(Math.ceil(d * i), 2.147483647E9d);
        int min2 = Math.min(i2, i4);
        int min3 = Math.min(Math.max(i3, min), min2);
        for (int i5 = min3; i5 <= i2 / 2 && i5 <= min2; i5++) {
            if (i2 % i5 == 0) {
                return i5;
            }
        }
        return min3;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @VisibleForTesting
    public void setClock(Clock clock) {
        this.clock = (Clock) Preconditions.checkNotNull(clock);
    }
}
