package org.apache.flink.kubernetes.operator.autoscaler;

import io.javaoperatorsdk.operator.processing.event.ResourceID;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.Edge;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetrics;
import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
import org.apache.flink.shaded.guava30.com.google.common.collect.UnmodifiableIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.class */
/**
 * Collects scaling metrics for Flink jobs and keeps a per-resource, time-ordered history of them.
 *
 * <p>Two caches are kept per {@link ResourceID}: the metric history and the metric names that were
 * discovered for each job vertex. Both are dropped by {@link #cleanup(AbstractFlinkResource)}.
 * Subclasses supply the actual metric values through {@link #queryAllAggregatedMetrics}.
 */
public abstract class ScalingMetricCollector {
    private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricCollector.class);

    /** Metric names of the form {@code <anything>.partition.<number>.currentOffset}. */
    private static final Pattern PARTITION_OFFSET_METRIC =
            Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");

    /** Wrapper text stripped from both ends of {@link JobDetailsInfo#getJsonPlan()}. */
    private static final String RAW_JSON_PREFIX = "RawJson{json='";

    private static final String RAW_JSON_SUFFIX = "'}";

    /** Resource -> vertex -> (metric name reported by the job -> metric it stands for). */
    private final Map<ResourceID, Map<JobVertexID, Map<String, FlinkMetric>>>
            availableVertexMetricNames = new ConcurrentHashMap<>();

    /** Resource -> collected metrics keyed by collection time. */
    private final Map<ResourceID, SortedMap<Instant, CollectedMetrics>> histories =
            new ConcurrentHashMap<>();

    private Clock clock = Clock.systemDefaultZone();

    /**
     * Collects a new metric sample for the job of the given resource and returns the history.
     *
     * <p>The history is reset when the job reports a timestamp newer than the oldest stored
     * sample, entries older than the metric window are evicted, and no sample is collected while
     * the job is still inside the configured stabilization interval.
     *
     * @param abstractFlinkResource resource whose job is sampled
     * @param autoScalerInfo persistent autoscaler state; seeds and receives the metric history
     * @param flinkService service used to query the job
     * @param configuration configuration providing the window and stabilization settings
     * @return the job topology together with the collected history; the history is empty during
     *     the stabilization interval
     * @throws Exception if querying the job or updating the autoscaler state fails
     */
    public CollectedMetricHistory updateMetrics(
            AbstractFlinkResource<?, ?> abstractFlinkResource,
            AutoScalerInfo autoScalerInfo,
            FlinkService flinkService,
            Configuration configuration)
            throws Exception {
        ResourceID resourceId = ResourceID.fromResource(abstractFlinkResource);
        JobID jobId =
                JobID.fromHexString(abstractFlinkResource.getStatus().getJobStatus().getJobId());
        Instant now = clock.instant();

        // On first sight of a resource the history is seeded from the autoscaler state.
        SortedMap<Instant, CollectedMetrics> metricHistory =
                histories.computeIfAbsent(resourceId, id -> autoScalerInfo.getMetricHistory());
        Instant collectionStart = metricHistory.isEmpty() ? now : metricHistory.firstKey();

        JobDetailsInfo jobDetailsInfo = flinkService.getJobDetailsInfo(jobId, configuration);
        Instant jobUpdateTs = getJobUpdateTs(jobDetailsInfo);
        if (jobUpdateTs.isAfter(collectionStart)) {
            LOG.info("Job updated at {}. Clearing metrics.", jobUpdateTs);
            autoScalerInfo.clearMetricHistory();
            // cleanup() removes this resource's entries from both caches, so for the rest of this
            // call metricHistory is no longer reachable through `histories`.
            cleanup(abstractFlinkResource);
            metricHistory.clear();
            collectionStart = now;
        }

        JobTopology topology =
                getJobTopology(flinkService, configuration, autoScalerInfo, jobDetailsInfo);

        // Evict everything that fell out of the metric window.
        Duration windowSize = getMetricWindowSize(configuration);
        metricHistory.headMap(now.minus(windowSize)).clear();

        Instant stableTime =
                jobUpdateTs.plus(configuration.get(AutoScalerOptions.STABILIZATION_INTERVAL));
        if (now.isBefore(stableTime)) {
            LOG.info("Skipping metric collection during stabilization period until {}", stableTime);
            return new CollectedMetricHistory(topology, Collections.emptySortedMap());
        }

        Map<JobVertexID, Map<String, FlinkMetric>> metricNames =
                queryFilteredMetricNames(
                        flinkService, abstractFlinkResource, configuration, topology);
        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> aggregatedMetrics =
                queryAllAggregatedMetrics(
                        abstractFlinkResource, flinkService, configuration, metricNames);
        metricHistory.put(
                now,
                convertToScalingMetrics(resourceId, aggregatedMetrics, topology, configuration));
        autoScalerInfo.updateMetricHistory(metricHistory);

        CollectedMetricHistory result = new CollectedMetricHistory(topology, metricHistory);
        // The window counts as full once a whole window has elapsed since the first sample.
        Instant windowFullTime = collectionStart.plus(windowSize);
        result.setFullyCollected(!now.isBefore(windowFullTime));
        if (!result.isFullyCollected()) {
            LOG.info("Metric window not full until {}", windowFullTime);
        }
        return result;
    }

    /**
     * Returns the metric window size read from {@link AutoScalerOptions#METRICS_WINDOW}.
     *
     * @param configuration configuration to read from
     * @return the configured window size
     */
    protected Duration getMetricWindowSize(Configuration configuration) {
        return configuration.get(AutoScalerOptions.METRICS_WINDOW);
    }

    /**
     * Returns the most recent of the timestamps reported in the job details.
     *
     * @param jobDetailsInfo job details to inspect
     * @return the largest reported timestamp as an instant
     * @throws java.util.NoSuchElementException if the job details contain no timestamps
     */
    @VisibleForTesting
    protected Instant getJobUpdateTs(JobDetailsInfo jobDetailsInfo) {
        return Instant.ofEpochMilli(
                jobDetailsInfo.getTimestamps().values().stream()
                        .max(Long::compare)
                        .orElseThrow());
    }

    /**
     * Builds the job topology and applies the adjustments that depend on external state: the
     * vertex list in the autoscaler state is refreshed, source max parallelisms are updated from
     * the partition metrics, and finished vertices are excluded from scaling.
     *
     * @param flinkService service used to obtain a cluster client
     * @param configuration configuration of the job
     * @param autoScalerInfo autoscaler state whose vertex list is updated
     * @param jobDetailsInfo job details the topology is derived from
     * @return the resulting topology
     * @throws Exception if updating the state or querying the cluster fails
     */
    protected JobTopology getJobTopology(
            FlinkService flinkService,
            Configuration configuration,
            AutoScalerInfo autoScalerInfo,
            JobDetailsInfo jobDetailsInfo)
            throws Exception {
        JobTopology topology = getJobTopology(jobDetailsInfo);
        autoScalerInfo.updateVertexList(
                topology.getVerticesInTopologicalOrder(), clock.instant(), configuration);
        updateKafkaSourceMaxParallelisms(
                flinkService, configuration, jobDetailsInfo.getJobId(), topology);
        AutoScalerUtils.excludeVerticesFromScaling(configuration, topology.getFinishedVertices());
        return topology;
    }

    /**
     * Builds the job topology from the JSON plan, the per-vertex max parallelisms and the set of
     * vertices in state {@link ExecutionState#FINISHED}.
     *
     * @param jobDetailsInfo job details to convert
     * @return the topology described by the job details
     */
    @VisibleForTesting
    protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) {
        Collection<JobDetailsInfo.JobVertexDetailsInfo> vertexInfos =
                jobDetailsInfo.getJobVertexInfos();

        Map<JobVertexID, Integer> maxParallelisms =
                vertexInfos.stream()
                        .collect(
                                Collectors.toMap(
                                        JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID,
                                        JobDetailsInfo.JobVertexDetailsInfo::getMaxParallelism));

        Set<JobVertexID> finishedVertices =
                vertexInfos.stream()
                        .filter(info -> info.getExecutionState() == ExecutionState.FINISHED)
                        .map(JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID)
                        .collect(Collectors.toSet());

        // The plan is expected to be wrapped as RawJson{json='...'}; only the payload is parsed.
        String wrappedPlan = jobDetailsInfo.getJsonPlan();
        String jsonPlan =
                wrappedPlan.substring(
                        RAW_JSON_PREFIX.length(),
                        wrappedPlan.length() - RAW_JSON_SUFFIX.length());

        return JobTopology.fromJsonPlan(jsonPlan, maxParallelisms, finishedVertices);
    }

    /**
     * For every vertex without inputs, counts the metric names matching
     * {@link #PARTITION_OFFSET_METRIC} and, if there are any, uses that count as the vertex's max
     * parallelism.
     *
     * @throws Exception if the cluster client cannot be created, queried or closed
     */
    private void updateKafkaSourceMaxParallelisms(
            FlinkService flinkService,
            Configuration configuration,
            JobID jobID,
            JobTopology jobTopology)
            throws Exception {
        try (RestClusterClient<?> clusterClient = flinkService.getClusterClient(configuration)) {
            for (Map.Entry<JobVertexID, Set<JobVertexID>> input :
                    jobTopology.getInputs().entrySet()) {
                if (!input.getValue().isEmpty()) {
                    continue;
                }
                JobVertexID sourceVertex = input.getKey();
                long partitionCount =
                        queryAggregatedMetricNames(clusterClient, jobID, sourceVertex).stream()
                                .map(AggregatedMetric::getId)
                                .filter(PARTITION_OFFSET_METRIC.asMatchPredicate())
                                .count();
                if (partitionCount > 0) {
                    LOG.debug(
                            "Updating source {} max parallelism based on available partitions to {}",
                            sourceVertex,
                            partitionCount);
                    jobTopology.updateMaxParallelism(sourceVertex, (int) partitionCount);
                }
            }
        }
    }

    /**
     * Converts the aggregated Flink metrics of every vertex into scaling metrics and computes the
     * output ratios of the topology. Finished vertices get {@link FlinkMetric#FINISHED_METRICS}
     * in place of queried values.
     */
    private CollectedMetrics convertToScalingMetrics(
            ResourceID resourceID,
            Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> map,
            JobTopology jobTopology,
            Configuration configuration) {
        Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> flinkMetrics = map;
        Set<JobVertexID> finishedVertices = jobTopology.getFinishedVertices();
        if (!finishedVertices.isEmpty()) {
            // Work on a copy so the caller's map is not modified.
            flinkMetrics = new HashMap<>(map);
            for (JobVertexID finished : finishedVertices) {
                flinkMetrics.put(finished, FlinkMetric.FINISHED_METRICS);
            }
        }

        Map<JobVertexID, Map<ScalingMetric, Double>> scalingMetrics = new HashMap<>();
        flinkMetrics.forEach(
                (vertex, vertexFlinkMetrics) -> {
                    LOG.debug(
                            "Calculating vertex scaling metrics for {} from {}",
                            vertex,
                            vertexFlinkMetrics);
                    Map<ScalingMetric, Double> vertexScalingMetrics = new HashMap<>();
                    scalingMetrics.put(vertex, vertexScalingMetrics);

                    if (jobTopology.isSource(vertex)) {
                        ScalingMetrics.computeLagMetrics(vertexFlinkMetrics, vertexScalingMetrics);
                    }
                    ScalingMetrics.computeLoadMetrics(
                            vertex, vertexFlinkMetrics, vertexScalingMetrics, configuration);

                    double lagGrowthRate =
                            computeLagGrowthRate(
                                    resourceID,
                                    vertex,
                                    vertexScalingMetrics.get(ScalingMetric.LAG));
                    ScalingMetrics.computeDataRateMetrics(
                            vertex,
                            vertexFlinkMetrics,
                            vertexScalingMetrics,
                            jobTopology,
                            lagGrowthRate,
                            configuration);

                    vertexScalingMetrics.replaceAll(
                            (metric, value) -> ScalingMetrics.roundMetric(value));
                    LOG.debug("Vertex scaling metrics for {}: {}", vertex, vertexScalingMetrics);
                });

        Map<Edge, Double> outputRatios =
                ScalingMetrics.computeOutputRatios(flinkMetrics, jobTopology);
        LOG.debug("Output ratios: {}", outputRatios);
        return new CollectedMetrics(scalingMetrics, outputRatios);
    }

    /**
     * Computes how fast the lag of a vertex changed since the last stored sample.
     *
     * @param resourceID resource whose history is consulted
     * @param jobVertexID vertex to compute the rate for
     * @param d current lag of the vertex, may be {@code null}
     * @return the lag difference divided by the whole seconds elapsed since the last sample, or
     *     {@link Double#NaN} if there is no previous lag or no current lag
     */
    private double computeLagGrowthRate(ResourceID resourceID, JobVertexID jobVertexID, Double d) {
        SortedMap<Instant, CollectedMetrics> metricHistory = histories.get(resourceID);
        if (metricHistory == null || metricHistory.isEmpty()) {
            return Double.NaN;
        }

        Instant lastCollectionTime = metricHistory.lastKey();
        Map<ScalingMetric, Double> lastVertexMetrics =
                metricHistory.get(lastCollectionTime).getVertexMetrics().get(jobVertexID);
        if (lastVertexMetrics == null) {
            return Double.NaN;
        }

        Double lastLag = lastVertexMetrics.get(ScalingMetric.LAG);
        if (lastLag == null || d == null) {
            return Double.NaN;
        }

        // Floating-point division: an elapsed time below one second yields an infinite or NaN
        // rate rather than an ArithmeticException.
        long elapsedSeconds = Duration.between(lastCollectionTime, clock.instant()).toSeconds();
        return (d - lastLag) / elapsedSeconds;
    }

    /**
     * Returns, per job vertex, the metric names to query and the metric each one represents.
     *
     * <p>The result is cached per resource and reused while the cached vertex set equals the
     * vertex set of the topology's parallelism map. Finished vertices are removed from the result.
     *
     * @param flinkService service used to obtain a cluster client
     * @param abstractFlinkResource resource owning the job
     * @param configuration configuration of the job
     * @param jobTopology current topology of the job
     * @return vertex -> (metric name -> metric); this is the cached map itself, not a copy
     * @throws RuntimeException if the metric names cannot be queried or a required metric is
     *     missing
     */
    protected Map<JobVertexID, Map<String, FlinkMetric>> queryFilteredMetricNames(
            FlinkService flinkService,
            AbstractFlinkResource<?, ?> abstractFlinkResource,
            Configuration configuration,
            JobTopology jobTopology) {
        JobID jobId =
                JobID.fromHexString(abstractFlinkResource.getStatus().getJobStatus().getJobId());
        List<JobVertexID> vertices = jobTopology.getVerticesInTopologicalOrder();

        Map<JobVertexID, Map<String, FlinkMetric>> names =
                availableVertexMetricNames.compute(
                        ResourceID.fromResource(abstractFlinkResource),
                        (resourceId, cachedNames) -> {
                            if (cachedNames != null
                                    && cachedNames
                                            .keySet()
                                            .equals(jobTopology.getParallelisms().keySet())) {
                                return cachedNames;
                            }
                            // try-with-resources closes the client on the failure path as well.
                            try (RestClusterClient<?> clusterClient =
                                    flinkService.getClusterClient(configuration)) {
                                return vertices.stream()
                                        .filter(
                                                vertex ->
                                                        !jobTopology
                                                                .getFinishedVertices()
                                                                .contains(vertex))
                                        .collect(
                                                Collectors.toMap(
                                                        vertex -> vertex,
                                                        vertex ->
                                                                getFilteredVertexMetricNames(
                                                                        clusterClient,
                                                                        jobId,
                                                                        vertex,
                                                                        jobTopology)));
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        });
        names.keySet().removeAll(jobTopology.getFinishedVertices());
        return names;
    }

    /**
     * Determines which of the metric names reported for a vertex are needed for scaling.
     *
     * <p>Busy time is always required. Sources additionally require the source input rate and
     * optionally contribute pending-records and source output-rate metrics; other vertices require
     * the input rate. Vertices with outputs require the output rate.
     *
     * @param restClusterClient client used to list the vertex's metric names
     * @param jobID id of the job
     * @param jobVertexID vertex to inspect
     * @param jobTopology topology used to classify the vertex
     * @return metric name -> metric it represents
     * @throws RuntimeException if the metric names cannot be queried or a required metric is not
     *     reported for the vertex
     */
    protected Map<String, FlinkMetric> getFilteredVertexMetricNames(
            RestClusterClient<?> restClusterClient,
            JobID jobID,
            JobVertexID jobVertexID,
            JobTopology jobTopology) {
        Collection<AggregatedMetric> allMetricNames;
        try {
            allMetricNames = queryAggregatedMetricNames(restClusterClient, jobID, jobVertexID);
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            // Keeps this method usable from lambdas that cannot throw checked exceptions.
            throw new RuntimeException("Could not query metric names for " + jobVertexID, e);
        }

        Map<String, FlinkMetric> filteredMetrics = new HashMap<>();
        Set<FlinkMetric> requiredMetrics = new HashSet<>();
        requiredMetrics.add(FlinkMetric.BUSY_TIME_PER_SEC);

        if (jobTopology.isSource(jobVertexID)) {
            requiredMetrics.add(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);

            // Pending records are optional: their absence is logged, not treated as an error.
            List<String> pendingRecordsMetrics =
                    FlinkMetric.PENDING_RECORDS.findAll(allMetricNames);
            if (pendingRecordsMetrics.isEmpty()) {
                LOG.warn(
                        "pendingRecords metric for {} could not be found. Either a legacy source or an idle source. Assuming no pending records.",
                        jobVertexID);
            }
            pendingRecordsMetrics.forEach(
                    name -> filteredMetrics.put(name, FlinkMetric.PENDING_RECORDS));

            FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC
                    .findAny(allMetricNames)
                    .ifPresent(
                            name ->
                                    filteredMetrics.put(
                                            name,
                                            FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC));
        } else {
            requiredMetrics.add(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
        }

        if (!jobTopology.getOutputs().get(jobVertexID).isEmpty()) {
            requiredMetrics.add(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
        }

        for (FlinkMetric required : requiredMetrics) {
            String metricName =
                    required.findAny(allMetricNames)
                            .orElseThrow(
                                    () ->
                                            new RuntimeException(
                                                    "Could not find required metric "
                                                            + required
                                                            + " for "
                                                            + jobVertexID));
            filteredMetrics.put(metricName, required);
        }
        return filteredMetrics;
    }

    /**
     * Queries the aggregated subtask metrics endpoint for a job vertex without further request
     * parameters and returns the metrics contained in the response.
     *
     * @param restClusterClient client used to send the request
     * @param jobID id of the job
     * @param jobVertexID vertex to query
     * @return the metrics contained in the response
     * @throws Exception if the request fails or waiting for the response is interrupted
     */
    @VisibleForTesting
    protected Collection<AggregatedMetric> queryAggregatedMetricNames(
            RestClusterClient<?> restClusterClient, JobID jobID, JobVertexID jobVertexID)
            throws Exception {
        AggregatedSubtaskMetricsParameters parameters = new AggregatedSubtaskMetricsParameters();
        // The path parameters are resolved positionally: job id first, then vertex id.
        Iterator<?> pathParameters = parameters.getPathParameters().iterator();
        ((JobIDPathParameter) pathParameters.next()).resolve(jobID);
        ((JobVertexIdPathParameter) pathParameters.next()).resolve(jobVertexID);

        return restClusterClient
                .sendRequest(
                        AggregatedSubtaskMetricsHeaders.getInstance(),
                        parameters,
                        EmptyRequestBody.getInstance())
                .get()
                .getMetrics();
    }

    /**
     * Supplies the aggregated metric values for the job of the given resource.
     *
     * @param abstractFlinkResource resource owning the job
     * @param flinkService service used to reach the job
     * @param configuration configuration of the job
     * @param map vertex -> (metric name -> metric), as returned by
     *     {@link #queryFilteredMetricNames}
     * @return vertex -> (metric -> aggregated value)
     */
    protected abstract Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>>
            queryAllAggregatedMetrics(
                    AbstractFlinkResource<?, ?> abstractFlinkResource,
                    FlinkService flinkService,
                    Configuration configuration,
                    Map<JobVertexID, Map<String, FlinkMetric>> map);

    /**
     * Drops the cached metric history and the cached metric names of the given resource.
     *
     * @param abstractFlinkResource resource whose cached state is removed
     */
    public void cleanup(AbstractFlinkResource<?, ?> abstractFlinkResource) {
        ResourceID resourceId = ResourceID.fromResource(abstractFlinkResource);
        histories.remove(resourceId);
        availableVertexMetricNames.remove(resourceId);
    }

    /**
     * Replaces the clock used to timestamp samples.
     *
     * @param clock the clock to use; must not be {@code null}
     */
    @VisibleForTesting
    protected void setClock(Clock clock) {
        this.clock = Preconditions.checkNotNull(clock);
    }

    /** Returns the live cache of metric names per resource and vertex. */
    @VisibleForTesting
    protected Map<ResourceID, Map<JobVertexID, Map<String, FlinkMetric>>>
            getAvailableVertexMetricNames() {
        return availableVertexMetricNames;
    }

    /** Returns the live cache of metric histories per resource. */
    @VisibleForTesting
    protected Map<ResourceID, SortedMap<Instant, CollectedMetrics>> getHistories() {
        return histories;
    }
}
