package org.apache.flink.kubernetes.operator;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.service.AbstractFlinkService;
import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
import org.apache.flink.util.SerializedThrowable;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/TestingFlinkService.class */
public class TestingFlinkService extends AbstractFlinkService {
    /** Fixed cluster-info map returned as-is by {@code getClusterInfo}. */
    public static final Map<String, String> CLUSTER_INFO = Map.of("flink-version", "15.0.0", "flink-revision", "1234567 @ 1970-01-01T00:00:00+00:00");
    // Next suffix for generated "savepoint_N" names (used by fetchSavepointInfo and the private cancelJob).
    private int savepointCounter;
    // Next suffix for generated "trigger_N" ids (used by triggerSavepoint).
    private int triggerCounter;
    // Submitted jobs as (f0, f1, f2) = (savepoint path/name or null, job status message, configuration passed at submission).
    private final List<Tuple3<String, JobStatusMessage, Configuration>> jobs;
    // Error message per job id, recorded by markApplicationJobFailedWithError and attached to the job result by the cluster client.
    private final Map<JobID, String> jobErrors;
    // Cluster ids added by submitSessionCluster.
    private final Set<String> sessions;
    // Value returned by isJobManagerPortReady; when false, listJobs(Configuration) throws a TimeoutException.
    private boolean isPortReady;
    // Second operand of the isHaMetadataAvailable check.
    private boolean haDataAvailable;
    // Passed to TestUtils.createDeployment by the context built in getContext.
    private boolean jobManagerReady;
    // When true, the submit/deploy methods throw "Deployment failure".
    private boolean deployFailure;
    // Optional hook run after submitJobToSessionCluster has recorded the job; null means no hook.
    private Runnable sessionJobSubmittedCallback;
    // Pod list returned by both getJmPodList overloads.
    private PodList podList;
    // Invoked with the configuration each time the testing cluster client lists jobs.
    private Consumer<Configuration> listJobConsumer;
    // Savepoint strings passed to disposeSavepoint, in call order.
    private final List<String> disposedSavepoints;
    // Trigger id -> whether fetchSavepointInfo has already been called once for it.
    private final Map<String, Boolean> savepointTriggers;
    // Replica count captured by scale.
    private int desiredReplicas;
    // Number of calls to the private cancelJob(FlinkVersion, JobID, boolean).
    private int cancelJobCallCount;
    // Map returned by getMetrics and filled by setMetricValue.
    private Map<String, String> metricsValues;

    /* loaded from: input_file:org/apache/flink/kubernetes/operator/TestingFlinkService$SubmittedJobInfo.class */
    /** Simple holder that groups a savepoint path, a job status message and a configuration. */
    public static class SubmittedJobInfo {
        /** Savepoint path supplied at construction. */
        public final String savepointPath;
        /** Job status message supplied at construction. */
        public final JobStatusMessage jobStatusMessage;
        /** Configuration supplied at construction. */
        public final Configuration effectiveConfig;

        /**
         * Stores the three values in the corresponding public fields without copying or validation.
         *
         * @param savepointPath value for {@link #savepointPath}
         * @param statusMessage value for {@link #jobStatusMessage}
         * @param effectiveConfig value for {@link #effectiveConfig}
         */
        public SubmittedJobInfo(String savepointPath, JobStatusMessage statusMessage, Configuration effectiveConfig) {
            this.effectiveConfig = effectiveConfig;
            this.jobStatusMessage = statusMessage;
            this.savepointPath = savepointPath;
        }
    }

    /**
     * Creates a testing service without a Kubernetes client.
     *
     * <p>Delegates to {@link #TestingFlinkService(KubernetesClient)} with a {@code null} client so
     * the default state is initialized in a single place instead of being duplicated here.
     */
    public TestingFlinkService() {
        this((KubernetesClient) null);
    }

    /**
     * Creates a testing service that hands the given client and a {@link FlinkConfigManager}
     * built from an empty {@link Configuration} to the superclass, then sets the default state.
     *
     * <p>Defaults: no jobs, sessions, errors, triggers or disposed savepoints; counters at zero;
     * port, HA data and JobManager flagged as ready/available; no deployment failure; an empty
     * {@link PodList}; a no-op list-jobs consumer; an empty metrics map.
     *
     * @param kubernetesClient client passed to the superclass constructor; {@code null} is
     *     accepted here (the no-argument constructor passes {@code null})
     */
    public TestingFlinkService(KubernetesClient kubernetesClient) {
        super(kubernetesClient, new FlinkConfigManager(new Configuration()));
        // Counters that number generated savepoint and trigger names.
        this.savepointCounter = 0;
        this.triggerCounter = 0;
        // Diamond operator instead of raw types: element types come from the field declarations.
        this.jobs = new ArrayList<>();
        this.jobErrors = new HashMap<>();
        this.sessions = new HashSet<>();
        this.isPortReady = true;
        this.haDataAvailable = true;
        this.jobManagerReady = true;
        this.deployFailure = false;
        this.podList = new PodList();
        // Default consumer ignores the configuration it is given.
        this.listJobConsumer = ignored -> {
        };
        this.disposedSavepoints = new ArrayList<>();
        this.savepointTriggers = new HashMap<>();
        this.desiredReplicas = 0;
        this.cancelJobCallCount = 0;
        this.metricsValues = new HashMap<>();
    }

    /**
     * Builds a {@link Context} based on {@link TestUtils.TestingContext} whose secondary-resource
     * lookup reflects the current state of this service.
     *
     * <p>The lookup ignores both of its arguments. It yields an empty {@link Optional} while no
     * job and no session has been recorded; otherwise it yields the object produced by
     * {@code TestUtils.createDeployment} for the current JobManager-ready flag.
     *
     * @param <T> resource type of the returned context
     * @return a new context instance on every call
     */
    public <T extends HasMetadata> Context<T> getContext() {
        return new TestUtils.TestingContext<T>() {
            @Override
            public Optional<T> getSecondaryResource(Class expectedType, String eventSourceName) {
                // Outer-instance qualification is kept so inherited members cannot shadow these fields.
                boolean nothingSubmitted = TestingFlinkService.this.jobs.isEmpty() && TestingFlinkService.this.sessions.isEmpty();
                if (nothingSubmitted) {
                    return Optional.empty();
                }
                return Optional.of(TestUtils.createDeployment(TestingFlinkService.this.jobManagerReady));
            }
        };
    }

    /**
     * Resets the savepoint and trigger counters to zero and forgets all recorded jobs and sessions.
     * Other recorded state (job errors, savepoint triggers, disposed savepoints, metrics, cancel
     * count) is left untouched.
     */
    public void clear() {
        savepointCounter = 0;
        triggerCounter = 0;
        sessions.clear();
        jobs.clear();
    }

    /** Removes every recorded job whose job state reports {@code isTerminalState()}. */
    public void clearJobsInTerminalState() {
        jobs.removeIf(job -> job.f1.getJobState().isTerminalState());
    }

    /**
     * Returns the set of session cluster ids recorded so far.
     *
     * @return the live internal set (not a copy)
     */
    public Set<String> getSessions() {
        return sessions;
    }

    /**
     * Records an application cluster submission.
     *
     * <p>When {@code requireHaMetadata} is set, HA metadata availability is validated first. The
     * configuration is then passed through {@code removeOperatorConfigs} (defined outside this
     * class body) and the result is handed to
     * {@link #deployApplicationCluster(JobSpec, Configuration)}.
     *
     * @param jobSpec job spec forwarded to the deploy step
     * @param configuration configuration of the submission
     * @param requireHaMetadata whether to validate HA metadata before deploying
     * @throws Exception if validation or deployment fails
     */
    public void submitApplicationCluster(JobSpec jobSpec, Configuration configuration, boolean requireHaMetadata) throws Exception {
        if (requireHaMetadata) {
            validateHaMetadataExists(configuration);
        }
        Configuration deployConfig = removeOperatorConfigs(configuration);
        deployApplicationCluster(jobSpec, deployConfig);
    }

    /**
     * Simulates deploying an application cluster by recording one RUNNING job.
     *
     * <p>The job id is taken from {@code PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID} when the
     * configuration contains it, otherwise a fresh {@link JobID} is created. The job name is the
     * configured cluster id, and the first tuple slot holds the configured savepoint path.
     *
     * @param jobSpec not consulted by this implementation
     * @param configuration configuration stored with the recorded job
     * @throws Exception if the deploy-failure flag is set or a job is already recorded
     */
    protected void deployApplicationCluster(JobSpec jobSpec, Configuration configuration) throws Exception {
        if (deployFailure) {
            throw new Exception("Deployment failure");
        }
        if (!jobs.isEmpty()) {
            throw new Exception("Cannot submit 2 application clusters at the same time");
        }

        final JobID jobId;
        if (configuration.contains(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)) {
            jobId = JobID.fromHexString(configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
        } else {
            jobId = new JobID();
        }

        String savepointPath = configuration.get(SavepointConfigOptions.SAVEPOINT_PATH);
        String clusterId = configuration.getString(KubernetesConfigOptions.CLUSTER_ID);
        JobStatusMessage running = new JobStatusMessage(jobId, clusterId, JobStatus.RUNNING, System.currentTimeMillis());
        jobs.add(Tuple3.of(savepointPath, running, configuration));
    }

    /**
     * Verifies that {@link #isHaMetadataAvailable(Configuration)} holds for the configuration.
     *
     * @param configuration configuration to check
     * @throws RecoveryFailureException with reason {@code "RestoreFailed"} when HA metadata is
     *     reported as unavailable
     */
    protected void validateHaMetadataExists(Configuration configuration) {
        if (isHaMetadataAvailable(configuration)) {
            return;
        }
        throw new RecoveryFailureException("HA metadata not available to restore from last state. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.", "RestoreFailed");
    }

    /**
     * Reports whether HA metadata counts as available for the given configuration.
     *
     * @param configuration configuration checked with
     *     {@code HighAvailabilityMode.isHighAvailabilityModeActivated}
     * @return {@code true} only when HA mode is activated in the configuration and the
     *     HA-data-available flag of this service is set
     */
    public boolean isHaMetadataAvailable(Configuration configuration) {
        boolean haActivated = HighAvailabilityMode.isHighAvailabilityModeActivated(configuration);
        return haActivated && haDataAvailable;
    }

    /**
     * Sets the flag consulted by {@link #isHaMetadataAvailable(Configuration)}.
     *
     * @param available new value of the HA-data-available flag
     */
    public void setHaDataAvailable(boolean available) {
        haDataAvailable = available;
    }

    /**
     * Sets the flag that the context from {@link #getContext()} passes to
     * {@code TestUtils.createDeployment}.
     *
     * @param ready new value of the JobManager-ready flag
     */
    public void setJobManagerReady(boolean ready) {
        jobManagerReady = ready;
    }

    /**
     * Controls whether the submit/deploy methods of this service throw "Deployment failure".
     *
     * @param shouldFail {@code true} to make subsequent submissions fail
     */
    public void setDeployFailure(boolean shouldFail) {
        deployFailure = shouldFail;
    }

    /**
     * Registers a hook that is run after each session job submission has been recorded.
     *
     * @param callback hook to run; {@code null} disables it
     */
    public void setSessionJobSubmittedCallback(Runnable callback) {
        sessionJobSubmittedCallback = callback;
    }

    /**
     * Simulates submitting a session cluster by remembering its configured cluster id.
     *
     * @param configuration configuration providing {@code KubernetesConfigOptions.CLUSTER_ID}
     * @throws Exception if the deploy-failure flag is set
     */
    public void submitSessionCluster(Configuration configuration) throws Exception {
        if (deployFailure) {
            throw new Exception("Deployment failure");
        }
        String clusterId = configuration.get(KubernetesConfigOptions.CLUSTER_ID);
        sessions.add(clusterId);
    }

    /**
     * Simulates submitting a job to a session cluster by recording one RUNNING job.
     *
     * <p>The job id comes from {@code FlinkUtils.generateSessionJobFixedJobID} for the given
     * metadata and the job name is the configured cluster id. After the job is recorded the
     * session-job-submitted callback is run, if one is set.
     *
     * @param objectMeta metadata used to derive the job id
     * @param flinkSessionJobSpec not consulted by this implementation
     * @param configuration configuration stored with the recorded job
     * @param savepoint value stored in the first slot of the recorded tuple; may be {@code null}
     * @return the generated job id
     * @throws Exception if the deploy-failure flag is set
     */
    public JobID submitJobToSessionCluster(ObjectMeta objectMeta, FlinkSessionJobSpec flinkSessionJobSpec, Configuration configuration, @Nullable String savepoint) throws Exception {
        if (deployFailure) {
            throw new Exception("Deployment failure");
        }
        JobID jobId = FlinkUtils.generateSessionJobFixedJobID(objectMeta);
        String clusterId = configuration.getString(KubernetesConfigOptions.CLUSTER_ID);
        JobStatusMessage running = new JobStatusMessage(jobId, clusterId, JobStatus.RUNNING, System.currentTimeMillis());
        jobs.add(Tuple3.of(savepoint, running, configuration));

        Runnable callback = sessionJobSubmittedCallback;
        if (callback != null) {
            callback.run();
        }
        return jobId;
    }

    /**
     * Lists jobs through the superclass implementation, unless the port is flagged as not ready.
     *
     * @param configuration configuration forwarded to {@code super.listJobs}
     * @return whatever the superclass implementation returns
     * @throws TimeoutException if the port-ready flag is {@code false}
     * @throws Exception if the superclass call fails
     */
    public Collection<JobStatusMessage> listJobs(Configuration configuration) throws Exception {
        if (!isPortReady) {
            throw new TimeoutException("JM port is unavailable");
        }
        return super.listJobs(configuration);
    }

    /**
     * Replaces the consumer that is invoked with the configuration whenever the testing cluster
     * client lists jobs.
     *
     * @param consumer new consumer; it is invoked without a null check, so it must not be
     *     {@code null}
     */
    public void setListJobConsumer(Consumer<Configuration> consumer) {
        listJobConsumer = consumer;
    }

    /**
     * Returns the recorded jobs as (savepoint path/name, status message, configuration) tuples.
     *
     * @return the live internal list (not a copy)
     */
    public List<Tuple3<String, JobStatusMessage, Configuration>> listJobs() {
        return jobs;
    }

    /**
     * Counts the recorded jobs whose job state is not a terminal state.
     *
     * @return number of non-terminal jobs
     */
    public long getRunningCount() {
        return jobs.stream()
                .map(job -> job.f1.getJobState())
                .filter(state -> !state.isTerminalState())
                .count();
    }

    /**
     * Simulates triggering a savepoint.
     *
     * <p>Generates the next {@code "trigger_N"} id, sets it on the given {@link SavepointInfo}
     * together with the trigger type and the savepoint format type resolved from the
     * configuration, and registers the trigger as not yet fetched.
     *
     * @param jobId not consulted by this implementation
     * @param savepointTriggerType trigger type stored on {@code savepointInfo}
     * @param savepointInfo object that receives the new trigger
     * @param configuration configuration from which the savepoint format type is resolved
     */
    public void triggerSavepoint(String jobId, SavepointTriggerType savepointTriggerType, SavepointInfo savepointInfo, Configuration configuration) {
        String triggerId = "trigger_" + triggerCounter++;
        SavepointFormatType formatType = SavepointFormatType.valueOf(SavepointUtils.getSavepointFormatType(configuration).name());
        savepointInfo.setTrigger(triggerId, savepointTriggerType, formatType);
        savepointTriggers.put(triggerId, false);
    }

    /**
     * Simulates polling a savepoint trigger.
     *
     * <p>An unknown trigger id yields an error result. The first poll of a known trigger marks it
     * as fetched and yields a pending result; every later poll yields a completed result with the
     * next {@code "savepoint_N"} name.
     *
     * @param triggerId id previously produced by {@code triggerSavepoint}
     * @param jobId not consulted by this implementation
     * @param configuration not consulted by this implementation
     * @return error, pending or completed result as described above
     */
    public SavepointFetchResult fetchSavepointInfo(String triggerId, String jobId, Configuration configuration) {
        // Values in this map are only ever true/false, so a null lookup means "unknown trigger".
        Boolean fetchedBefore = savepointTriggers.get(triggerId);
        if (fetchedBefore == null) {
            return SavepointFetchResult.error("Failed");
        }
        if (fetchedBefore) {
            return SavepointFetchResult.completed("savepoint_" + savepointCounter++);
        }
        savepointTriggers.put(triggerId, true);
        return SavepointFetchResult.pending();
    }

    /**
     * Builds a {@code TestingClusterClient} whose callbacks are backed by the state of this
     * service.
     *
     * <ul>
     *   <li>list jobs: notifies the list-jobs consumer, then returns the status messages of all
     *       recorded jobs; fails if no job is recorded, at least one session exists and the
     *       configured deployment target is the application target</li>
     *   <li>stop with savepoint / cancel: delegate to the private {@code cancelJob}, with and
     *       without a savepoint respectively; failures are returned as failed futures</li>
     *   <li>request result: builds a {@code JobResult} and attaches the recorded error, if any</li>
     *   <li>request processor: answers {@code JobsOverviewHeaders} with the job details and any
     *       other request with an empty response body</li>
     * </ul>
     *
     * @param configuration configuration used to create the client and read by the callbacks
     * @return the configured testing client
     * @throws Exception declared for interface compatibility
     */
    public ClusterClient<String> getClusterClient(Configuration configuration) throws Exception {
        // NOTE(review): declared as a raw type in this source; confirm the generic signature of
        // TestingClusterClient before parameterizing it.
        TestingClusterClient testingClusterClient = new TestingClusterClient(configuration);
        FlinkVersion flinkVersion = (FlinkVersion) configuration.get(FlinkConfigBuilder.FLINK_VERSION);
        testingClusterClient.setListJobsFunction(() -> {
            this.listJobConsumer.accept(configuration);
            // Constant-first equals: an unset target no longer causes a NullPointerException.
            if (this.jobs.isEmpty() && !this.sessions.isEmpty() && KubernetesDeploymentTarget.APPLICATION.getName().equals(configuration.get(DeploymentOptions.TARGET))) {
                throw new RuntimeException("Trying to list a job without submitting it");
            }
            return CompletableFuture.completedFuture(this.jobs.stream().map(job -> job.f1).collect(Collectors.toList()));
        });
        testingClusterClient.setStopWithSavepointFunction((jobID, bool, str) -> {
            try {
                return CompletableFuture.completedFuture(cancelJob(flinkVersion, jobID, true));
            } catch (Exception e) {
                return CompletableFuture.failedFuture(e);
            }
        });
        testingClusterClient.setCancelFunction(jobID2 -> {
            try {
                cancelJob(flinkVersion, jobID2, false);
                return CompletableFuture.completedFuture(Acknowledge.get());
            } catch (Exception e) {
                return CompletableFuture.failedFuture(e);
            }
        });
        testingClusterClient.setRequestResultFunction(jobID3 -> {
            JobResult.Builder resultBuilder = new JobResult.Builder().jobId(jobID3).netRuntime(1L);
            if (this.jobErrors.containsKey(jobID3)) {
                resultBuilder.serializedThrowable(new SerializedThrowable(new RuntimeException(this.jobErrors.get(jobID3))));
            }
            return CompletableFuture.completedFuture(resultBuilder.build());
        });
        testingClusterClient.setRequestProcessor((messageHeaders, messageParameters, requestBody) -> {
            return messageHeaders instanceof JobsOverviewHeaders ? CompletableFuture.completedFuture(getMultipleJobsDetails()) : CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
        });
        return testingClusterClient;
    }

    /**
     * Converts the status message of every recorded job into {@link JobDetails} and wraps the
     * result in a {@link MultipleJobsDetails}.
     */
    private MultipleJobsDetails getMultipleJobsDetails() {
        List<JobDetails> details = jobs.stream()
                .map(job -> toJobDetails(job.f1))
                .collect(Collectors.toList());
        return new MultipleJobsDetails(details);
    }

    /**
     * Builds {@link JobDetails} from a status message.
     *
     * <p>Job id, name, start time and state are copied from the message. The fourth constructor
     * argument is {@code -1}, the fifth is the time elapsed since the start time, the seventh is
     * the current time, followed by a zero-filled array with one slot per {@link ExecutionState}
     * and a final {@code 0}.
     */
    private static JobDetails toJobDetails(JobStatusMessage jobStatusMessage) {
        long startTime = jobStatusMessage.getStartTime();
        long elapsed = System.currentTimeMillis() - startTime;
        JobStatus state = jobStatusMessage.getJobState();
        long now = System.currentTimeMillis();
        int[] perStateCounts = new int[ExecutionState.values().length];
        return new JobDetails(jobStatusMessage.getJobId(), jobStatusMessage.getJobName(), startTime, -1L, elapsed, state, now, perStateCounts, 0);
    }

    /**
     * Cancels the job of the given deployment by forwarding to the four-argument
     * {@code cancelJob} overload with {@code false} as the last argument.
     *
     * <p>NOTE(review): the four-argument overload is not declared in this file (presumably
     * inherited from the superclass) — confirm what the trailing flag controls.
     *
     * @throws Exception if the delegated call fails
     */
    public void cancelJob(FlinkDeployment flinkDeployment, UpgradeMode upgradeMode, Configuration configuration) throws Exception {
        cancelJob(flinkDeployment, upgradeMode, configuration, false);
    }

    /**
     * Simulates cancelling a recorded job, optionally producing a savepoint name.
     *
     * <p>The call counter is incremented first, even when the job is unknown. For versions newer
     * than {@code FlinkVersion.v1_14} the job stays recorded with state FINISHED and its first
     * tuple slot is set to the savepoint name (or {@code null}); for other versions the job is
     * removed.
     *
     * @param flinkVersion version that selects between "mark finished" and "remove"
     * @param jobID id of the job to cancel
     * @param withSavepoint whether to generate the next {@code "savepoint_N"} name
     * @return the generated savepoint name, or {@code null} when none was requested
     * @throws Exception with message "Job not found" if no recorded job has the given id
     */
    private String cancelJob(FlinkVersion flinkVersion, JobID jobID, boolean withSavepoint) throws Exception {
        cancelJobCallCount++;
        Tuple3<String, JobStatusMessage, Configuration> job = jobs.stream()
                .filter(candidate -> candidate.f1.getJobId().equals(jobID))
                .findAny()
                .orElseThrow(() -> new Exception("Job not found"));

        // The counter advances only after the job has been found, and only when a savepoint is requested.
        String savepoint = withSavepoint ? "savepoint_" + savepointCounter++ : null;

        if (!flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)) {
            jobs.removeIf(candidate -> candidate.f1.getJobId().equals(jobID));
            return savepoint;
        }

        JobStatusMessage previous = job.f1;
        job.f1 = new JobStatusMessage(previous.getJobId(), previous.getJobName(), JobStatus.FINISHED, previous.getStartTime());
        job.f0 = savepoint;
        return savepoint;
    }

    /**
     * Simulates deleting a cluster: forgets all recorded jobs and removes the session whose id
     * equals the metadata name.
     *
     * @param objectMeta metadata whose name identifies the session to remove
     * @param configuration not consulted by this implementation
     * @param deleteHaData not consulted by this implementation
     */
    protected void deleteClusterInternal(ObjectMeta objectMeta, Configuration configuration, boolean deleteHaData) {
        jobs.clear();
        String clusterName = objectMeta.getName();
        sessions.remove(clusterName);
    }

    /** No-op in this testing implementation: returns immediately without waiting for anything. */
    public void waitForClusterShutdown(Configuration configuration) {
    }

    /**
     * Records the given savepoint as disposed; nothing is actually deleted.
     *
     * @param savepointPath value appended to the disposed-savepoints list
     * @param configuration not consulted by this implementation
     */
    public void disposeSavepoint(String savepointPath, Configuration configuration) {
        disposedSavepoints.add(savepointPath);
    }

    /**
     * Returns the savepoints passed to {@code disposeSavepoint}, in call order.
     *
     * @return the live internal list (not a copy)
     */
    public List<String> getDisposedSavepoints() {
        return disposedSavepoints;
    }

    /**
     * Returns the savepoint recorded for a job that is in a globally terminal state.
     *
     * @param jobID id of the job to look up
     * @param configuration not consulted by this implementation
     * @return a {@link Savepoint} with trigger type UNKNOWN built from the first tuple slot of
     *     the job, or an empty {@link Optional} when that slot is {@code null}
     * @throws Exception if no recorded job has the given id, or the job is not in a globally
     *     terminal state
     */
    public Optional<Savepoint> getLastCheckpoint(JobID jobID, Configuration configuration) throws Exception {
        Tuple3<String, JobStatusMessage, Configuration> job = jobs.stream()
                .filter(candidate -> candidate.f1.getJobId().equals(jobID))
                .findAny()
                .orElseThrow(() -> new Exception("Job not found"));

        if (!job.f1.getJobState().isGloballyTerminalState()) {
            throw new Exception("Checkpoint should not be queried if job is not in terminal state");
        }
        if (job.f0 == null) {
            return Optional.empty();
        }
        return Optional.of(Savepoint.of(job.f0, SavepointTriggerType.UNKNOWN));
    }

    /**
     * Returns the port-ready flag of this service.
     *
     * @param configuration not consulted by this implementation
     * @return the value last given to {@code setPortReady} ({@code true} by default)
     */
    public boolean isJobManagerPortReady(Configuration configuration) {
        return isPortReady;
    }

    /**
     * Sets the port-ready flag reported by {@code isJobManagerPortReady} and checked by
     * {@code listJobs(Configuration)}.
     *
     * @param ready new value of the flag
     */
    public void setPortReady(boolean ready) {
        isPortReady = ready;
    }

    /**
     * Returns the configured pod list; both arguments are ignored.
     *
     * @return the pod list last given to {@code setJmPodList}, or the initial empty one
     */
    public PodList getJmPodList(FlinkDeployment flinkDeployment, Configuration configuration) {
        return podList;
    }

    /**
     * Returns the configured pod list; both arguments are ignored.
     *
     * @param namespace presumably the namespace to search — not consulted here
     * @param clusterId presumably the cluster id to search for — not consulted here
     * @return the pod list last given to {@code setJmPodList}, or the initial empty one
     */
    protected PodList getJmPodList(String namespace, String clusterId) {
        return podList;
    }

    /**
     * Replaces the pod list returned by both {@code getJmPodList} overloads.
     *
     * @param jmPods pod list to return from now on
     */
    public void setJmPodList(PodList jmPods) {
        podList = jmPods;
    }

    public void markApplicationJobFailedWithError(JobID jobID, String str) throws Exception {
        Optional<Tuple3<String, JobStatusMessage, Configuration>> findFirst = this.jobs.stream().filter(tuple3 -> {
            return ((JobStatusMessage) tuple3.f1).getJobId().equals(jobID);
        }).findFirst();
        if (findFirst.isEmpty()) {
            throw new Exception("The target job missed");
        }
        JobStatusMessage jobStatusMessage = (JobStatusMessage) findFirst.get().f1;
        findFirst.get().f1 = new JobStatusMessage(jobStatusMessage.getJobId(), jobStatusMessage.getJobName(), JobStatus.FAILED, jobStatusMessage.getStartTime());
        this.jobErrors.put(jobID, str);
    }

    /**
     * Returns the constant {@link #CLUSTER_INFO} map; the configuration argument is ignored.
     *
     * @return the shared immutable map created with {@code Map.of}
     */
    public Map<String, String> getClusterInfo(Configuration configuration) {
        return CLUSTER_INFO;
    }

    /**
     * Simulates a scale operation by capturing the configured TaskManager replica count.
     *
     * <p>The count is captured when the scheduler mode is REACTIVE or when no job spec is given;
     * in every other case nothing is changed.
     *
     * @param objectMeta not consulted by this implementation
     * @param jobSpec job spec; {@code null} lets the call proceed regardless of scheduler mode
     * @param configuration configuration providing the scheduler mode and the replica count
     * @return {@code true} if the replica count was captured, {@code false} otherwise
     */
    public boolean scale(ObjectMeta objectMeta, JobSpec jobSpec, Configuration configuration) {
        boolean reactive = configuration.get(JobManagerOptions.SCHEDULER_MODE) == SchedulerExecutionMode.REACTIVE;
        if (reactive || jobSpec == null) {
            desiredReplicas = configuration.get(StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS);
            return true;
        }
        return false;
    }

    /**
     * Returns the replica count captured by the last successful {@code scale} call.
     *
     * @return the captured count, {@code 0} if none was captured yet
     */
    public int getDesiredReplicas() {
        return desiredReplicas;
    }

    /**
     * Stores a value in the map returned by {@code getMetrics}, replacing any previous entry.
     *
     * @param metricName key of the entry
     * @param value value of the entry
     */
    public void setMetricValue(String metricName, String value) {
        metricsValues.put(metricName, value);
    }

    /**
     * Returns all values stored via {@code setMetricValue}; every argument is ignored.
     *
     * @param configuration not consulted by this implementation
     * @param jobId presumably the job to query — not consulted here
     * @param metricNames presumably the metrics to query — not consulted here, no filtering is done
     * @return the live internal map (not a copy)
     */
    public Map<String, String> getMetrics(Configuration configuration, String jobId, List<String> metricNames) {
        return metricsValues;
    }

    /**
     * Returns how many times the private {@code cancelJob(FlinkVersion, JobID, boolean)} has been
     * entered, including calls that failed because the job was unknown.
     *
     * @return the cancel call count
     */
    public int getCancelJobCallCount() {
        return cancelJobCallCount;
    }
}
