package org.apache.flink.kubernetes.operator.autoscaler;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import org.apache.commons.math3.stat.StatUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.class */
public class ScalingMetricEvaluator {
    private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricEvaluator.class);
    private Clock clock = Clock.systemDefaultZone();

    public Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluate(Configuration configuration, CollectedMetrics collectedMetrics) {
        HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> hashMap = new HashMap<>();
        SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> metricHistory = collectedMetrics.getMetricHistory();
        JobTopology jobTopology = collectedMetrics.getJobTopology();
        for (JobVertexID jobVertexID : jobTopology.getVerticesInTopologicalOrder()) {
            hashMap.put(jobVertexID, computeVertexScalingSummary(configuration, hashMap, metricHistory, jobTopology, jobVertexID));
        }
        return hashMap;
    }

    @NotNull
    private Map<ScalingMetric, EvaluatedScalingMetric> computeVertexScalingSummary(Configuration configuration, HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> hashMap, SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> sortedMap, JobTopology jobTopology, JobVertexID jobVertexID) {
        Map<ScalingMetric, Double> map = sortedMap.get(sortedMap.lastKey()).get(jobVertexID);
        HashMap hashMap2 = new HashMap();
        computeTargetDataRate(jobTopology, jobVertexID, configuration, hashMap, sortedMap, map, hashMap2);
        hashMap2.put(ScalingMetric.TRUE_PROCESSING_RATE, new EvaluatedScalingMetric(map.get(ScalingMetric.TRUE_PROCESSING_RATE).doubleValue(), getAverage(ScalingMetric.TRUE_PROCESSING_RATE, jobVertexID, sortedMap)));
        hashMap2.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(((Integer) jobTopology.getParallelisms().get(jobVertexID)).intValue()));
        hashMap2.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(jobTopology.getMaxParallelisms().get(jobVertexID).intValue()));
        computeProcessingRateThresholds(hashMap2, configuration);
        if (!((Set) jobTopology.getOutputs().get(jobVertexID)).isEmpty()) {
            hashMap2.put(ScalingMetric.TRUE_OUTPUT_RATE, new EvaluatedScalingMetric(map.get(ScalingMetric.TRUE_OUTPUT_RATE).doubleValue(), getAverage(ScalingMetric.TRUE_OUTPUT_RATE, jobVertexID, sortedMap)));
            hashMap2.put(ScalingMetric.OUTPUT_RATIO, new EvaluatedScalingMetric(map.get(ScalingMetric.OUTPUT_RATIO).doubleValue(), getAverage(ScalingMetric.OUTPUT_RATIO, jobVertexID, sortedMap)));
        }
        return hashMap2;
    }

    @VisibleForTesting
    protected static void computeProcessingRateThresholds(Map<ScalingMetric, EvaluatedScalingMetric> map, Configuration configuration) {
        double d = configuration.getDouble(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY);
        double targetProcessingCapacity = AutoScalerUtils.getTargetProcessingCapacity(map, configuration, ((Double) configuration.get(AutoScalerOptions.TARGET_UTILIZATION)).doubleValue() + d, false);
        double targetProcessingCapacity2 = AutoScalerUtils.getTargetProcessingCapacity(map, configuration, ((Double) configuration.get(AutoScalerOptions.TARGET_UTILIZATION)).doubleValue() - d, true);
        map.put(ScalingMetric.SCALE_UP_RATE_THRESHOLD, EvaluatedScalingMetric.of(targetProcessingCapacity));
        map.put(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD, EvaluatedScalingMetric.of(targetProcessingCapacity2));
    }

    private void computeTargetDataRate(JobTopology jobTopology, JobVertexID jobVertexID, Configuration configuration, HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> hashMap, SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> sortedMap, Map<ScalingMetric, Double> map, Map<ScalingMetric, EvaluatedScalingMetric> map2) {
        if (((Set) jobTopology.getInputs().get(jobVertexID)).isEmpty()) {
            double seconds = ((Duration) configuration.get(AutoScalerOptions.CATCH_UP_DURATION)).toSeconds();
            ScalingMetric scalingMetric = map.containsKey(ScalingMetric.TARGET_DATA_RATE) ? ScalingMetric.TARGET_DATA_RATE : ScalingMetric.SOURCE_DATA_RATE;
            if (!map.containsKey(scalingMetric)) {
                throw new RuntimeException("Cannot evaluate metrics without source target rate information");
            }
            map2.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(map.get(scalingMetric).doubleValue(), getAverage(scalingMetric, jobVertexID, sortedMap)));
            double doubleValue = seconds == 0.0d ? 0.0d : map.getOrDefault(ScalingMetric.LAG, Double.valueOf(0.0d)).doubleValue() / seconds;
            if (doubleValue > 0.0d) {
                LOG.debug("Extra backlog processing input rate for {} is {}", jobVertexID, Double.valueOf(doubleValue));
            }
            map2.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(doubleValue));
            return;
        }
        double d = 0.0d;
        double d2 = 0.0d;
        double d3 = 0.0d;
        Iterator it = ((Set) jobTopology.getInputs().get(jobVertexID)).iterator();
        while (it.hasNext()) {
            Map<ScalingMetric, EvaluatedScalingMetric> map3 = hashMap.get((JobVertexID) it.next());
            EvaluatedScalingMetric evaluatedScalingMetric = map3.get(ScalingMetric.TARGET_DATA_RATE);
            double average = map3.get(ScalingMetric.OUTPUT_RATIO).getAverage();
            d += evaluatedScalingMetric.getCurrent() * average;
            d2 += evaluatedScalingMetric.getAverage() * average;
            d3 += map3.get(ScalingMetric.CATCH_UP_DATA_RATE).getCurrent() * average;
        }
        map2.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(d, d2));
        map2.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(d3));
    }

    private double getAverage(ScalingMetric scalingMetric, JobVertexID jobVertexID, SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> sortedMap) {
        return StatUtils.mean(sortedMap.values().stream().map(map -> {
            return (Map) map.get(jobVertexID);
        }).filter(map2 -> {
            return map2.containsKey(scalingMetric);
        }).mapToDouble(map3 -> {
            return ((Double) map3.get(scalingMetric)).doubleValue();
        }).filter(d -> {
            return !Double.isNaN(d);
        }).toArray());
    }

    @VisibleForTesting
    protected void setClock(Clock clock) {
        this.clock = (Clock) Preconditions.checkNotNull(clock);
    }
}
