package org.apache.flink.kubernetes.operator.autoscaler;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetricHistory;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.deployment.JobAutoScaler;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.class */
public class JobAutoScalerImpl implements JobAutoScaler {
    private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class);
    private final KubernetesClient kubernetesClient;
    private final ScalingMetricCollector metricsCollector;
    private final ScalingMetricEvaluator evaluator;
    private final ScalingExecutor scalingExecutor;
    private final EventRecorder eventRecorder;

    @VisibleForTesting
    final Map<ResourceID, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>> lastEvaluatedMetrics = new ConcurrentHashMap();

    @VisibleForTesting
    final Map<ResourceID, AutoscalerFlinkMetrics> flinkMetrics = new ConcurrentHashMap();

    @VisibleForTesting
    final AutoscalerInfoManager infoManager;

    public JobAutoScalerImpl(KubernetesClient kubernetesClient, ScalingMetricCollector scalingMetricCollector, ScalingMetricEvaluator scalingMetricEvaluator, ScalingExecutor scalingExecutor, EventRecorder eventRecorder) {
        this.kubernetesClient = kubernetesClient;
        this.metricsCollector = scalingMetricCollector;
        this.evaluator = scalingMetricEvaluator;
        this.scalingExecutor = scalingExecutor;
        this.eventRecorder = eventRecorder;
        this.infoManager = new AutoscalerInfoManager(kubernetesClient);
    }

    public void cleanup(FlinkResourceContext<?> flinkResourceContext) {
        LOG.info("Cleaning up autoscaling meta data");
        AbstractFlinkResource<?, ?> resource = flinkResourceContext.getResource();
        this.metricsCollector.cleanup(resource);
        ResourceID fromResource = ResourceID.fromResource(resource);
        this.lastEvaluatedMetrics.remove(fromResource);
        this.flinkMetrics.remove(fromResource);
        this.infoManager.removeInfoFromCache(resource);
    }

    public Map<String, String> getParallelismOverrides(FlinkResourceContext<?> flinkResourceContext) {
        Configuration observeConfig = flinkResourceContext.getObserveConfig();
        try {
            Optional<AutoScalerInfo> info = this.infoManager.getInfo(flinkResourceContext.getResource());
            if (info.isPresent()) {
                AutoScalerInfo autoScalerInfo = info.get();
                if (observeConfig.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED) || autoScalerInfo.getCurrentOverrides().isEmpty()) {
                    return autoScalerInfo.getCurrentOverrides();
                }
                autoScalerInfo.removeCurrentOverrides();
                autoScalerInfo.replaceInKubernetes(this.kubernetesClient);
            }
        } catch (Exception e) {
            LOG.error("Error while getting parallelism overrides", e);
        }
        return Map.of();
    }

    public boolean scale(FlinkResourceContext<?> flinkResourceContext) {
        Configuration observeConfig = flinkResourceContext.getObserveConfig();
        AbstractFlinkResource<?, ?> resource = flinkResourceContext.getResource();
        ResourceID fromResource = ResourceID.fromResource(resource);
        AutoscalerFlinkMetrics orInitAutoscalerFlinkMetrics = getOrInitAutoscalerFlinkMetrics(flinkResourceContext, fromResource);
        try {
            try {
                if (((AbstractFlinkSpec) resource.getSpec()).getJob() == null || !observeConfig.getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
                    LOG.debug("Job autoscaler is disabled");
                    if (0 != 0) {
                        this.lastEvaluatedMetrics.put(fromResource, null);
                        orInitAutoscalerFlinkMetrics.registerScalingMetrics(() -> {
                            return this.lastEvaluatedMetrics.get(fromResource);
                        });
                    }
                    return false;
                }
                CommonStatus commonStatus = (CommonStatus) resource.getStatus();
                if (commonStatus.getLifecycleState() != ResourceLifecycleState.STABLE || !commonStatus.getJobStatus().getState().equals(JobStatus.RUNNING.name())) {
                    LOG.info("Job autoscaler is waiting for RUNNING job state");
                    this.lastEvaluatedMetrics.remove(fromResource);
                    if (0 != 0) {
                        this.lastEvaluatedMetrics.put(fromResource, null);
                        orInitAutoscalerFlinkMetrics.registerScalingMetrics(() -> {
                            return this.lastEvaluatedMetrics.get(fromResource);
                        });
                    }
                    return false;
                }
                AutoScalerInfo orCreateInfo = this.infoManager.getOrCreateInfo(resource);
                CollectedMetricHistory updateMetrics = this.metricsCollector.updateMetrics(resource, orCreateInfo, flinkResourceContext.getFlinkService(), observeConfig);
                if (updateMetrics.getMetricHistory().isEmpty()) {
                    orCreateInfo.replaceInKubernetes(this.kubernetesClient);
                    if (0 != 0) {
                        this.lastEvaluatedMetrics.put(fromResource, null);
                        orInitAutoscalerFlinkMetrics.registerScalingMetrics(() -> {
                            return this.lastEvaluatedMetrics.get(fromResource);
                        });
                    }
                    return false;
                }
                LOG.debug("Collected metrics: {}", updateMetrics);
                Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluate = this.evaluator.evaluate(observeConfig, updateMetrics);
                LOG.debug("Evaluated metrics: {}", evaluate);
                initRecommendedParallelism(evaluate);
                if (!updateMetrics.isFullyCollected()) {
                    resetRecommendedParallelism(evaluate);
                    orCreateInfo.replaceInKubernetes(this.kubernetesClient);
                    if (evaluate != null) {
                        this.lastEvaluatedMetrics.put(fromResource, evaluate);
                        orInitAutoscalerFlinkMetrics.registerScalingMetrics(() -> {
                            return this.lastEvaluatedMetrics.get(fromResource);
                        });
                    }
                    return false;
                }
                boolean scaleResource = this.scalingExecutor.scaleResource(resource, orCreateInfo, observeConfig, evaluate);
                if (scaleResource) {
                    orInitAutoscalerFlinkMetrics.numScalings.inc();
                } else {
                    orInitAutoscalerFlinkMetrics.numBalanced.inc();
                }
                orCreateInfo.replaceInKubernetes(this.kubernetesClient);
                if (evaluate != null) {
                    this.lastEvaluatedMetrics.put(fromResource, evaluate);
                    orInitAutoscalerFlinkMetrics.registerScalingMetrics(() -> {
                        return this.lastEvaluatedMetrics.get(fromResource);
                    });
                }
                return scaleResource;
            } catch (Throwable th) {
                LOG.error("Error while scaling resource", th);
                orInitAutoscalerFlinkMetrics.numErrors.inc();
                this.eventRecorder.triggerEvent(resource, EventRecorder.Type.Warning, EventRecorder.Reason.AutoscalerError, EventRecorder.Component.Operator, th.getMessage());
                if (0 != 0) {
                    this.lastEvaluatedMetrics.put(fromResource, null);
                    orInitAutoscalerFlinkMetrics.registerScalingMetrics(() -> {
                        return this.lastEvaluatedMetrics.get(fromResource);
                    });
                }
                return false;
            }
        } catch (Throwable th2) {
            if (0 != 0) {
                this.lastEvaluatedMetrics.put(fromResource, null);
                orInitAutoscalerFlinkMetrics.registerScalingMetrics(() -> {
                    return this.lastEvaluatedMetrics.get(fromResource);
                });
            }
            throw th2;
        }
    }

    private AutoscalerFlinkMetrics getOrInitAutoscalerFlinkMetrics(FlinkResourceContext<? extends AbstractFlinkResource<?, ?>> flinkResourceContext, ResourceID resourceID) {
        return this.flinkMetrics.computeIfAbsent(resourceID, resourceID2 -> {
            return new AutoscalerFlinkMetrics(flinkResourceContext.getResourceMetricGroup().addGroup("AutoScaler"));
        });
    }

    private void initRecommendedParallelism(Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> map) {
        map.forEach((jobVertexID, map2) -> {
            map2.put(ScalingMetric.RECOMMENDED_PARALLELISM, (EvaluatedScalingMetric) map2.get(ScalingMetric.PARALLELISM));
        });
    }

    private void resetRecommendedParallelism(Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> map) {
        map.forEach((jobVertexID, map2) -> {
            map2.put(ScalingMetric.RECOMMENDED_PARALLELISM, null);
        });
    }
}
