package org.apache.flink.kubernetes.operator.autoscaler;

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.SortedMap;
import java.util.stream.Stream;
import org.apache.commons.math3.stat.StatUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.class */
/**
 * Turns the collected metric history of a job into per-vertex evaluated scaling metrics.
 *
 * <p>Vertices are processed in topological order, so that the target data rate of a
 * non-source vertex can be derived from the already evaluated metrics of its inputs.
 */
public class ScalingMetricEvaluator {
    private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricEvaluator.class);

    /**
     * Evaluates the scaling metrics of every vertex in the job topology.
     *
     * @param configuration autoscaler configuration to read thresholds and durations from
     * @param collectedMetricHistory metric history together with the job topology
     * @return evaluated metrics keyed by vertex id
     * @throws RuntimeException if a source vertex has no {@code SOURCE_DATA_RATE} in its latest
     *     metrics
     */
    public Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluate(Configuration configuration, CollectedMetricHistory collectedMetricHistory) {
        SortedMap<Instant, CollectedMetrics> metricHistory = collectedMetricHistory.getMetricHistory();
        JobTopology jobTopology = collectedMetricHistory.getJobTopology();
        boolean processingBacklog = isProcessingBacklog(jobTopology, metricHistory, configuration);

        Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluated = new HashMap<>();
        // Topological order guarantees that all inputs of a vertex are already in `evaluated`.
        for (JobVertexID vertex : jobTopology.getVerticesInTopologicalOrder()) {
            evaluated.put(vertex, evaluateMetrics(configuration, evaluated, metricHistory, jobTopology, vertex, processingBacklog));
        }
        return evaluated;
    }

    /**
     * Decides whether any source vertex is currently working through a backlog.
     *
     * <p>A source counts as processing backlog when its latest {@code LAG} divided by its average
     * {@code CURRENT_PROCESSING_RATE} is strictly greater than the configured
     * {@code BACKLOG_PROCESSING_LAG_THRESHOLD} in seconds.
     *
     * @param jobTopology topology used to enumerate and identify source vertices
     * @param sortedMap metric history ordered by collection time; the last entry supplies the
     *     latest lag
     * @param configuration autoscaler configuration
     * @return {@code true} if at least one source exceeds the lag threshold
     */
    @VisibleForTesting
    protected static boolean isProcessingBacklog(JobTopology jobTopology, SortedMap<Instant, CollectedMetrics> sortedMap, Configuration configuration) {
        Map<JobVertexID, Map<ScalingMetric, Double>> latestVertexMetrics = sortedMap.get(sortedMap.lastKey()).getVertexMetrics();
        long lagThresholdSeconds = ((Duration) configuration.get(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD)).toSeconds();

        return jobTopology.getVerticesInTopologicalOrder().stream()
                .filter(jobTopology::isSource)
                .anyMatch(source -> {
                    double lag = latestVertexMetrics.get(source).getOrDefault(ScalingMetric.LAG, 0.0d);
                    double averageProcessingRate = getAverage(ScalingMetric.CURRENT_PROCESSING_RATE, source, sortedMap);
                    if (Double.isNaN(averageProcessingRate)) {
                        return false;
                    }
                    double lagSeconds = lag / averageProcessingRate;
                    // Use a positive '>' test: a NaN quotient (e.g. zero lag over a zero rate)
                    // compares false and therefore must not be reported as backlog.
                    if (lagSeconds > lagThresholdSeconds) {
                        LOG.info("Currently processing backlog at source {}", source);
                        return true;
                    }
                    return false;
                });
    }

    /**
     * Evaluates all scaling metrics of a single vertex.
     *
     * @param configuration autoscaler configuration
     * @param alreadyEvaluated metrics of the vertices evaluated so far (must contain all inputs)
     * @param metricHistory metric history ordered by collection time
     * @param jobTopology topology of the job
     * @param vertex vertex to evaluate
     * @param processingBacklog whether the job is currently processing backlog
     * @return evaluated metrics of {@code vertex}
     */
    @NotNull
    private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(Configuration configuration, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> alreadyEvaluated, SortedMap<Instant, CollectedMetrics> metricHistory, JobTopology jobTopology, JobVertexID vertex, boolean processingBacklog) {
        Map<ScalingMetric, Double> latest = metricHistory.get(metricHistory.lastKey()).getVertexMetrics().get(vertex);
        Map<ScalingMetric, EvaluatedScalingMetric> result = new HashMap<>();

        computeTargetDataRate(jobTopology, vertex, configuration, alreadyEvaluated, metricHistory, latest, result);

        double currentTrueProcessingRate = latest.get(ScalingMetric.TRUE_PROCESSING_RATE);
        result.put(ScalingMetric.TRUE_PROCESSING_RATE, new EvaluatedScalingMetric(currentTrueProcessingRate, getAverage(ScalingMetric.TRUE_PROCESSING_RATE, vertex, metricHistory)));

        double currentLoad = latest.get(ScalingMetric.LOAD);
        result.put(ScalingMetric.LOAD, new EvaluatedScalingMetric(currentLoad, getAverage(ScalingMetric.LOAD, vertex, metricHistory)));

        result.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(((Integer) jobTopology.getParallelisms().get(vertex)).intValue()));
        result.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(jobTopology.getMaxParallelisms().get(vertex).intValue()));

        // Must run last: the thresholds are derived from the metrics put into `result` above.
        computeProcessingRateThresholds(result, configuration, processingBacklog);
        return result;
    }

    /**
     * Adds {@code SCALE_UP_RATE_THRESHOLD} and {@code SCALE_DOWN_RATE_THRESHOLD} to {@code map}.
     *
     * <p>Normally the thresholds are computed for the utilizations
     * {@code TARGET_UTILIZATION + TARGET_UTILIZATION_BOUNDARY} and
     * {@code TARGET_UTILIZATION - TARGET_UTILIZATION_BOUNDARY}. While processing backlog the
     * utilizations {@code 1.0} and {@code 0.0} are used instead.
     *
     * @param map evaluated metrics of one vertex; updated in place
     * @param configuration autoscaler configuration
     * @param z whether the job is currently processing backlog
     */
    @VisibleForTesting
    protected static void computeProcessingRateThresholds(Map<ScalingMetric, EvaluatedScalingMetric> map, Configuration configuration, boolean z) {
        double utilizationBoundary = configuration.getDouble(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY);
        double targetUtilization = ((Double) configuration.get(AutoScalerOptions.TARGET_UTILIZATION)).doubleValue();

        double upperUtilization = z ? 1.0d : targetUtilization + utilizationBoundary;
        double lowerUtilization = z ? 0.0d : targetUtilization - utilizationBoundary;

        // NOTE(review): the meaning of the trailing boolean of getTargetProcessingCapacity is not
        // visible from this file; the values are kept exactly as before.
        double scaleUpThreshold = AutoScalerUtils.getTargetProcessingCapacity(map, configuration, upperUtilization, false);
        double scaleDownThreshold = AutoScalerUtils.getTargetProcessingCapacity(map, configuration, lowerUtilization, true);

        map.put(ScalingMetric.SCALE_UP_RATE_THRESHOLD, EvaluatedScalingMetric.of(scaleUpThreshold));
        map.put(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD, EvaluatedScalingMetric.of(scaleDownThreshold));
    }

    /**
     * Puts {@code TARGET_DATA_RATE} and {@code CATCH_UP_DATA_RATE} of {@code vertex} into
     * {@code out}.
     *
     * <p>For a source the target rate is its {@code SOURCE_DATA_RATE} and the catch-up rate is the
     * latest {@code LAG} spread over {@code CATCH_UP_DURATION} (zero when the duration is zero).
     * For any other vertex both rates are the sums of the corresponding input rates, each
     * weighted by the average output ratio of the connecting edge.
     *
     * @throws RuntimeException if {@code vertex} is a source without {@code SOURCE_DATA_RATE}
     */
    private void computeTargetDataRate(JobTopology jobTopology, JobVertexID vertex, Configuration configuration, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> alreadyEvaluated, SortedMap<Instant, CollectedMetrics> metricHistory, Map<ScalingMetric, Double> latest, Map<ScalingMetric, EvaluatedScalingMetric> out) {
        if (jobTopology.isSource(vertex)) {
            double catchUpSeconds = ((Duration) configuration.get(AutoScalerOptions.CATCH_UP_DURATION)).toSeconds();
            if (!latest.containsKey(ScalingMetric.SOURCE_DATA_RATE)) {
                throw new RuntimeException("Cannot evaluate metrics without source target rate information");
            }
            double currentSourceRate = latest.get(ScalingMetric.SOURCE_DATA_RATE);
            out.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(currentSourceRate, getAverage(ScalingMetric.SOURCE_DATA_RATE, vertex, metricHistory)));

            // A zero catch-up duration means no extra rate is requested (and avoids dividing by 0).
            double catchUpRate = catchUpSeconds == 0.0d ? 0.0d : latest.getOrDefault(ScalingMetric.LAG, 0.0d) / catchUpSeconds;
            if (catchUpRate > 0.0d) {
                LOG.debug("Extra backlog processing input rate for {} is {}", vertex, catchUpRate);
            }
            out.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpRate));
            return;
        }

        // Typed cast instead of a raw Set so the loop variable can be a JobVertexID.
        // NOTE(review): assumes the input sets hold JobVertexID elements - confirm against JobTopology.
        @SuppressWarnings("unchecked")
        Set<JobVertexID> inputs = (Set<JobVertexID>) jobTopology.getInputs().get(vertex);

        double currentTargetRate = 0.0d;
        double averageTargetRate = 0.0d;
        double catchUpRate = 0.0d;
        for (JobVertexID input : inputs) {
            Map<ScalingMetric, EvaluatedScalingMetric> inputMetrics = alreadyEvaluated.get(input);
            EvaluatedScalingMetric inputTargetRate = inputMetrics.get(ScalingMetric.TARGET_DATA_RATE);
            double outputRatio = getAverageOutputRatio(new Edge(input, vertex), metricHistory);
            currentTargetRate += inputTargetRate.getCurrent() * outputRatio;
            averageTargetRate += inputTargetRate.getAverage() * outputRatio;
            catchUpRate += inputMetrics.get(ScalingMetric.CATCH_UP_DATA_RATE).getCurrent() * outputRatio;
        }
        out.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(currentTargetRate, averageTargetRate));
        out.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpRate));
    }

    /**
     * Averages {@code metric} of {@code vertex} over the metric history, ignoring entries that
     * lack the metric or hold NaN.
     *
     * @return the first infinite value if any is present, otherwise the arithmetic mean
     *     ({@code NaN} when no usable value exists, per {@code StatUtils.mean})
     */
    private static double getAverage(ScalingMetric metric, JobVertexID vertex, SortedMap<Instant, CollectedMetrics> metricHistory) {
        double[] values = metricHistory.values().stream()
                .map(collected -> collected.getVertexMetrics().get(vertex))
                .filter(vertexMetrics -> vertexMetrics.containsKey(metric))
                .mapToDouble(vertexMetrics -> vertexMetrics.get(metric))
                .filter(value -> !Double.isNaN(value))
                .toArray();
        return meanUnlessInfinite(values);
    }

    /**
     * Averages the output ratio of {@code edge} over the metric history, ignoring entries that
     * lack the edge or hold NaN.
     *
     * @return the first infinite value if any is present, otherwise the arithmetic mean
     *     ({@code NaN} when no usable value exists, per {@code StatUtils.mean})
     */
    private static double getAverageOutputRatio(Edge edge, SortedMap<Instant, CollectedMetrics> metricHistory) {
        double[] values = metricHistory.values().stream()
                .map(collected -> collected.getOutputRatios())
                .filter(ratios -> ratios.containsKey(edge))
                .mapToDouble(ratios -> (Double) ratios.get(edge))
                .filter(value -> !Double.isNaN(value))
                .toArray();
        return meanUnlessInfinite(values);
    }

    /**
     * Returns the first infinite element of {@code values} if there is one, otherwise their mean.
     */
    private static double meanUnlessInfinite(double[] values) {
        for (double value : values) {
            if (Double.isInfinite(value)) {
                return value;
            }
        }
        return StatUtils.mean(values);
    }
}
