package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

@EnableKubernetesMockClient(crud = true)
/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.class */
public class ApplicationReconcilerUpgradeModeTest extends OperatorTestBase {
    // Kubernetes client passed to the ApplicationReconciler in setup() and returned by getKubernetesClient().
    // NOTE(review): never assigned in this class; presumably injected by the
    // @EnableKubernetesMockClient extension declared on the class. Confirm.
    private KubernetesClient kubernetesClient;
    // Adapter around the ApplicationReconciler under test; assigned in setup().
    private TestReconcilerAdapter<FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> reconciler;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconcilerUpgradeModeTest$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest$1.class */
    /**
     * Lookup table used by the {@code switch} in {@code buildApplicationCluster(FlinkVersion, UpgradeMode)}.
     *
     * <p>The array is indexed by {@code UpgradeMode.ordinal()} and holds the case label for that
     * constant: STATELESS is 1, SAVEPOINT is 2, LAST_STATE is 3. Entries that are never assigned
     * keep the {@code int} default of 0, which matches none of the labels and therefore reaches
     * the {@code default} branch of the switch.
     *
     * <p>NOTE(review): this has the shape of a compiler-generated switch map recovered by a
     * decompiler; it is not hand-written test logic.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // One slot per UpgradeMode constant, all initially 0.
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$kubernetes$operator$api$spec$UpgradeMode = new int[UpgradeMode.values().length];

        static {
            // Each constant is registered in its own try block so that a NoSuchFieldError raised
            // for one constant is ignored and does not prevent the others from being registered.
            try {
                $SwitchMap$org$apache$flink$kubernetes$operator$api$spec$UpgradeMode[UpgradeMode.STATELESS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$kubernetes$operator$api$spec$UpgradeMode[UpgradeMode.SAVEPOINT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$kubernetes$operator$api$spec$UpgradeMode[UpgradeMode.LAST_STATE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /**
     * Creates the {@link ApplicationReconciler} under test from the mock Kubernetes client, the
     * event/status recorders of the base class and a no-op autoscaler factory, then wraps it in
     * the {@link TestReconcilerAdapter} used by every test in this class.
     */
    @Override // org.apache.flink.kubernetes.operator.OperatorTestBase
    public void setup() {
        ApplicationReconciler applicationReconciler =
                new ApplicationReconciler(
                        kubernetesClient, eventRecorder, statusRecorder, new NoopJobAutoscalerFactory());
        reconciler = new TestReconcilerAdapter<>(this, applicationReconciler);
    }

    /**
     * Runs the "upgrade to STATELESS" scenario for a job initially deployed in STATELESS mode,
     * once per value supplied by {@code TestUtils#flinkVersions}.
     */
    @ParameterizedTest
    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
    public void testUpgradeFromStatelessToStateless(FlinkVersion flinkVersion) throws Exception {
        testUpgradeToStateless(flinkVersion, UpgradeMode.STATELESS);
    }

    /**
     * Runs the "upgrade to STATELESS" scenario for a job initially deployed in SAVEPOINT mode,
     * once per value supplied by {@code TestUtils#flinkVersions}.
     */
    @ParameterizedTest
    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
    public void testUpgradeFromSavepointToStateless(FlinkVersion flinkVersion) throws Exception {
        testUpgradeToStateless(flinkVersion, UpgradeMode.SAVEPOINT);
    }

    /**
     * Runs the "upgrade to STATELESS" scenario for a job initially deployed in LAST_STATE mode,
     * once per value supplied by {@code TestUtils#flinkVersions}.
     */
    @ParameterizedTest
    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
    public void testUpgradeFromLastStateToStateless(FlinkVersion flinkVersion) throws Exception {
        testUpgradeToStateless(flinkVersion, UpgradeMode.LAST_STATE);
    }

    /**
     * Shared scenario: deploy a job in {@code upgradeMode}, mark it running, then reconcile a
     * clone of the deployment whose upgrade mode is STATELESS.
     *
     * <p>Expects no running job after the first reconcile of the clone, and exactly one running
     * job whose first tuple field is {@code null} after the second.
     *
     * @param flinkVersion Flink version used to build the deployment
     * @param upgradeMode upgrade mode of the initial deployment
     */
    private void testUpgradeToStateless(FlinkVersion flinkVersion, UpgradeMode upgradeMode) throws Exception {
        // Initial deployment, brought to RUNNING/READY.
        FlinkDeployment initial = buildApplicationCluster(flinkVersion, upgradeMode);
        reconciler.reconcile(initial, context);
        verifyAndSetRunningJobsToStatus(initial, flinkService.listJobs());

        // First reconcile of the changed spec leaves nothing running.
        FlinkDeployment upgraded = cloneDeploymentWithUpgradeMode(initial, UpgradeMode.STATELESS);
        reconciler.reconcile(upgraded, context);
        Assertions.assertEquals(0L, flinkService.getRunningCount());

        // Second reconcile brings one job back.
        reconciler.reconcile(upgraded, context);
        List<Tuple3<String, JobStatusMessage, Configuration>> jobsAfterUpgrade = flinkService.listJobs();
        Assertions.assertEquals(1L, flinkService.getRunningCount());
        Assertions.assertNull(jobsAfterUpgrade.get(0).f0);
    }

    /**
     * Runs the "upgrade to SAVEPOINT" scenario for a job initially deployed in STATELESS mode,
     * once per value supplied by {@code TestUtils#flinkVersions}.
     */
    @ParameterizedTest
    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
    public void testUpgradeFromStatelessToSavepoint(FlinkVersion flinkVersion) throws Exception {
        testUpgradeToSavepoint(flinkVersion, UpgradeMode.STATELESS);
    }

    /**
     * Runs the "upgrade to SAVEPOINT" scenario for a job initially deployed in SAVEPOINT mode,
     * once per value supplied by {@code TestUtils#flinkVersions}.
     */
    @ParameterizedTest
    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
    public void testUpgradeFromSavepointToSavepoint(FlinkVersion flinkVersion) throws Exception {
        testUpgradeToSavepoint(flinkVersion, UpgradeMode.SAVEPOINT);
    }

    /**
     * Runs the "upgrade to SAVEPOINT" scenario for a job initially deployed in LAST_STATE mode,
     * once per value supplied by {@code TestUtils#flinkVersions}.
     */
    @ParameterizedTest
    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
    public void testUpgradeFromLastStateToSavepoint(FlinkVersion flinkVersion) throws Exception {
        testUpgradeToSavepoint(flinkVersion, UpgradeMode.LAST_STATE);
    }

    /**
     * Shared scenario: deploy a job in {@code upgradeMode}, mark it running, then reconcile a
     * clone of the deployment switched to SAVEPOINT mode with a savepoint directory configured.
     *
     * <p>Expects no running job after the first reconcile of the clone. After the second, expects
     * one running job whose first tuple field is {@code "savepoint_0"} and a last savepoint in
     * the status with trigger type UPGRADE.
     *
     * @param flinkVersion Flink version used to build the deployment
     * @param upgradeMode upgrade mode of the initial deployment
     */
    private void testUpgradeToSavepoint(FlinkVersion flinkVersion, UpgradeMode upgradeMode) throws Exception {
        // Initial deployment, brought to RUNNING/READY.
        FlinkDeployment initial = buildApplicationCluster(flinkVersion, upgradeMode);
        reconciler.reconcile(initial, context);
        verifyAndSetRunningJobsToStatus(initial, flinkService.listJobs());

        // Switch the clone to SAVEPOINT mode and give it a savepoint directory.
        FlinkDeployment upgraded = cloneDeploymentWithUpgradeMode(initial, UpgradeMode.SAVEPOINT);
        FlinkDeploymentSpec upgradedSpec = (FlinkDeploymentSpec) upgraded.getSpec();
        upgradedSpec.getFlinkConfiguration().put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), "test-savepoint-dir");

        reconciler.reconcile(upgraded, context);
        Assertions.assertEquals(0L, flinkService.getRunningCount());

        reconciler.reconcile(upgraded, context);
        List<Tuple3<String, JobStatusMessage, Configuration>> jobsAfterUpgrade = flinkService.listJobs();
        Assertions.assertEquals(1L, flinkService.getRunningCount());
        Assertions.assertEquals("savepoint_0", jobsAfterUpgrade.get(0).f0);

        // Status is read only after the last reconcile.
        FlinkDeploymentStatus upgradedStatus = (FlinkDeploymentStatus) upgraded.getStatus();
        Assertions.assertEquals(
                SavepointTriggerType.UPGRADE,
                upgradedStatus.getJobStatus().getSavepointInfo().getLastSavepoint().getTriggerType());
    }

    /**
     * Runs the "upgrade to LAST_STATE" scenario for a job initially deployed in STATELESS mode,
     * once per value supplied by {@code TestUtils#flinkVersions}.
     */
    @ParameterizedTest
    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
    public void testUpgradeFromStatelessToLastState(FlinkVersion flinkVersion) throws Exception {
        testUpgradeToLastState(flinkVersion, UpgradeMode.STATELESS);
    }

    /**
     * Runs the "upgrade to LAST_STATE" scenario for a job initially deployed in SAVEPOINT mode,
     * once per value supplied by {@code TestUtils#flinkVersions}.
     */
    @ParameterizedTest
    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
    public void testUpgradeFromSavepointToLastState(FlinkVersion flinkVersion) throws Exception {
        testUpgradeToLastState(flinkVersion, UpgradeMode.SAVEPOINT);
    }

    /**
     * Runs the "upgrade to LAST_STATE" scenario for a job initially deployed in LAST_STATE mode,
     * once per value supplied by {@code TestUtils#flinkVersions}.
     */
    @ParameterizedTest
    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
    public void testUpgradeFromLastStateToLastState(FlinkVersion flinkVersion) throws Exception {
        testUpgradeToLastState(flinkVersion, UpgradeMode.LAST_STATE);
    }

    /**
     * Shared scenario: deploy a job in {@code upgradeMode}, mark it running, then request a
     * LAST_STATE upgrade while the testing service reports no HA data.
     *
     * <p>Part 1: with the job state RECONCILING, reconciling must throw
     * {@link RecoveryFailureException} both when the JobManager deployment is MISSING and when it
     * is ERROR.
     *
     * <p>Part 2: after clearing the testing service, with the job FINISHED, the JobManager READY
     * and a last savepoint {@code "finished_sp"} recorded in the status, two reconciles must leave
     * exactly one running job whose first tuple field is {@code "finished_sp"}.
     *
     * @param flinkVersion Flink version used to build the deployment
     * @param upgradeMode upgrade mode of the initial deployment
     */
    private void testUpgradeToLastState(FlinkVersion flinkVersion, UpgradeMode upgradeMode) throws Exception {
        FlinkDeployment deployment = buildApplicationCluster(flinkVersion, upgradeMode);
        this.reconciler.reconcile(deployment, this.context);
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());

        // Part 1: LAST_STATE upgrade requested, no HA data, job still RECONCILING.
        FlinkDeploymentSpec spec = (FlinkDeploymentSpec) deployment.getSpec();
        spec.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        spec.setRestartNonce(100L);
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
        status.getReconciliationStatus().setLastStableSpec(status.getReconciliationStatus().getLastReconciledSpec());
        this.flinkService.setHaDataAvailable(false);
        status.getJobStatus().setState("RECONCILING");

        // assertThrows already fails when nothing is thrown, so no explicit fail() is needed.
        Assertions.assertThrows(RecoveryFailureException.class, () -> {
            ((FlinkDeploymentStatus) deployment.getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
            this.reconciler.reconcile(deployment, this.context);
        });
        Assertions.assertThrows(RecoveryFailureException.class, () -> {
            ((FlinkDeploymentStatus) deployment.getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
            this.reconciler.reconcile(deployment, this.context);
        });

        // Part 2: job FINISHED with a known savepoint. Spec and status are re-read after the
        // reconcile attempts above.
        this.flinkService.clear();
        spec = (FlinkDeploymentSpec) deployment.getSpec();
        spec.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        spec.setRestartNonce(200L);
        this.flinkService.setHaDataAvailable(false);
        status = (FlinkDeploymentStatus) deployment.getStatus();
        status.getJobStatus().getSavepointInfo().setLastSavepoint(Savepoint.of("finished_sp", SavepointTriggerType.UPGRADE));
        status.getJobStatus().setState("FINISHED");
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        spec.getFlinkConfiguration().put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), "test-savepoint-dir");
        this.reconciler.reconcile(deployment, this.context);
        this.reconciler.reconcile(deployment, this.context);

        Assertions.assertEquals(1L, this.flinkService.getRunningCount());
        // Query the service again rather than reuse a list obtained before clear(): the old
        // reference is only valid if listJobs() hands out its live internal list.
        Assertions.assertEquals("finished_sp", this.flinkService.listJobs().get(0).f0);
    }

    /**
     * Returns a clone of {@code flinkDeployment} with its job upgrade mode set to
     * {@code upgradeMode} and an extra {@code "new" -> "conf"} entry in the Flink configuration.
     * The input deployment itself is only passed to {@code ReconciliationUtils.clone}.
     *
     * @param flinkDeployment deployment to copy
     * @param upgradeMode upgrade mode to set on the copy
     * @return the modified copy
     */
    private FlinkDeployment cloneDeploymentWithUpgradeMode(FlinkDeployment flinkDeployment, UpgradeMode upgradeMode) {
        FlinkDeployment copy = (FlinkDeployment) ReconciliationUtils.clone(flinkDeployment);
        FlinkDeploymentSpec copySpec = (FlinkDeploymentSpec) copy.getSpec();
        copySpec.getJob().setUpgradeMode(upgradeMode);
        copySpec.getFlinkConfiguration().put("new", "conf");
        return copy;
    }

    /**
     * Checks the recorded last-reconciled spec through a suspend, resume and further upgrade,
     * where the final upgrade happens while the JobManager is not ready and no HA data exists.
     *
     * <p>Runs for every pair produced by {@code testUpgradeJmDeployCannotStartParams}.
     *
     * @param upgradeMode upgrade mode of the initial deployment and of the suspend
     * @param upgradeMode2 upgrade mode requested when the job is resumed
     */
    @ParameterizedTest
    @MethodSource("testUpgradeJmDeployCannotStartParams")
    public void testUpgradeJmDeployCannotStart(UpgradeMode upgradeMode, UpgradeMode upgradeMode2) throws Exception {
        final boolean resumeStateless = upgradeMode2 == UpgradeMode.STATELESS;

        // Initial healthy deployment.
        flinkService.setHaDataAvailable(true);
        flinkService.setJobManagerReady(true);
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        JobSpec jobSpec = ((FlinkDeploymentSpec) deployment.getSpec()).getJob();
        jobSpec.setUpgradeMode(upgradeMode);
        reconciler.reconcile(deployment, context);
        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());

        // Suspend: the recorded spec keeps the initial upgrade mode.
        jobSpec.setState(JobState.SUSPENDED);
        reconciler.reconcile(deployment, context);
        JobSpec suspendedJob = ((FlinkDeploymentStatus) deployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getJob();
        Assertions.assertEquals(JobState.SUSPENDED, suspendedJob.getState());
        Assertions.assertEquals(upgradeMode, suspendedJob.getUpgradeMode());

        // Resume with the second upgrade mode.
        jobSpec.setState(JobState.RUNNING);
        jobSpec.setUpgradeMode(upgradeMode2);
        reconciler.reconcile(deployment, context);
        JobSpec resumedJob = ((FlinkDeploymentStatus) deployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getJob();
        UpgradeMode expectedAfterResume = resumeStateless ? UpgradeMode.STATELESS : upgradeMode;
        Assertions.assertEquals(JobState.RUNNING, resumedJob.getState());
        Assertions.assertEquals(expectedAfterResume, resumedJob.getUpgradeMode());

        // Another spec change while the JobManager is still DEPLOYING and HA data is gone.
        flinkService.setJobManagerReady(false);
        flinkService.setHaDataAvailable(false);
        ((FlinkDeploymentStatus) deployment.getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
        jobSpec.setState(JobState.RUNNING);
        jobSpec.setEntryClass("newClass");
        reconciler.reconcile(deployment, context);
        FlinkDeploymentSpec afterFailedStart = ((FlinkDeploymentStatus) deployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec();

        if (upgradeMode == UpgradeMode.LAST_STATE && !resumeStateless) {
            // In this combination the deployment status and recorded job state are unchanged.
            Assertions.assertEquals(JobManagerDeploymentStatus.DEPLOYING, ((FlinkDeploymentStatus) deployment.getStatus()).getJobManagerDeploymentStatus());
            Assertions.assertEquals(JobState.RUNNING, afterFailedStart.getJob().getState());
            return;
        }

        // Otherwise the recorded spec is SUSPENDED and the JobManager deployment is MISSING ...
        UpgradeMode expectedAfterSuspend = resumeStateless ? UpgradeMode.STATELESS : UpgradeMode.SAVEPOINT;
        Assertions.assertEquals(JobManagerDeploymentStatus.MISSING, ((FlinkDeploymentStatus) deployment.getStatus()).getJobManagerDeploymentStatus());
        Assertions.assertEquals(JobState.SUSPENDED, afterFailedStart.getJob().getState());
        Assertions.assertEquals(expectedAfterSuspend, afterFailedStart.getJob().getUpgradeMode());

        // ... and the next reconcile starts exactly one job again.
        reconciler.reconcile(deployment, context);
        Assertions.assertEquals(JobState.RUNNING, ((FlinkDeploymentStatus) deployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getJob().getState());
        Assertions.assertEquals(1, flinkService.listJobs().size());
        boolean expectNoSavepoint = upgradeMode == UpgradeMode.STATELESS || resumeStateless;
        if (expectNoSavepoint) {
            Assertions.assertNull(flinkService.listJobs().get(0).f0);
        } else {
            Assertions.assertEquals("savepoint_0", flinkService.listJobs().get(0).f0);
        }
    }

    /**
     * Checks an upgrade requested while the very first deployment never became ready (JobManager
     * not ready, no HA data).
     *
     * <p>Runs for every combination produced by {@code testInitialJmDeployCannotStartParams}.
     *
     * @param upgradeMode upgrade mode requested together with the image change
     * @param z whether the initial spec carries the initial savepoint path {@code "init-sp"}
     */
    @ParameterizedTest
    @MethodSource("testInitialJmDeployCannotStartParams")
    public void testInitialJmDeployCannotStart(UpgradeMode upgradeMode, boolean z) throws Exception {
        flinkService.setHaDataAvailable(false);
        flinkService.setJobManagerReady(false);

        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        if (z) {
            ((FlinkDeploymentSpec) deployment.getSpec()).getJob().setInitialSavepointPath("init-sp");
        }

        // First deployment attempt: stays in DEPLOYING.
        reconciler.reconcile(deployment, context);
        Assertions.assertEquals(JobManagerDeploymentStatus.DEPLOYING, ((FlinkDeploymentStatus) deployment.getStatus()).getJobManagerDeploymentStatus());
        FlinkDeploymentSpec firstReconciled = ((FlinkDeploymentStatus) deployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec();
        if (!z) {
            Assertions.assertNull(flinkService.listJobs().get(0).f0);
            Assertions.assertNull(((FlinkDeploymentStatus) deployment.getStatus()).getJobStatus().getSavepointInfo().getLastSavepoint());
            Assertions.assertEquals(UpgradeMode.STATELESS, firstReconciled.getJob().getUpgradeMode());
        } else {
            Assertions.assertEquals("init-sp", flinkService.listJobs().get(0).f0);
            Assertions.assertEquals("init-sp", ((FlinkDeploymentStatus) deployment.getStatus()).getJobStatus().getSavepointInfo().getLastSavepoint().getLocation());
            Assertions.assertEquals(UpgradeMode.SAVEPOINT, firstReconciled.getJob().getUpgradeMode());
        }

        // Request an upgrade (new image and upgrade mode) on the never-started deployment.
        FlinkDeploymentSpec spec = (FlinkDeploymentSpec) deployment.getSpec();
        spec.getJob().setUpgradeMode(upgradeMode);
        spec.setImage("new-image-1");
        final UpgradeMode expectedRecordedMode;
        if (upgradeMode == UpgradeMode.STATELESS) {
            expectedRecordedMode = UpgradeMode.STATELESS;
        } else {
            expectedRecordedMode = UpgradeMode.SAVEPOINT;
        }

        reconciler.reconcile(deployment, context);
        Assertions.assertEquals(ReconciliationState.UPGRADING, ((FlinkDeploymentStatus) deployment.getStatus()).getReconciliationStatus().getState());
        Assertions.assertEquals(expectedRecordedMode, ((FlinkDeploymentStatus) deployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getJob().getUpgradeMode());

        reconciler.reconcile(deployment, context);
        FlinkDeploymentSpec secondReconciled = ((FlinkDeploymentStatus) deployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec();
        Assertions.assertEquals("new-image-1", secondReconciled.getImage());
        Assertions.assertEquals(expectedRecordedMode, secondReconciled.getJob().getUpgradeMode());
        Assertions.assertEquals(1, flinkService.listJobs().size());
        // The initial savepoint is only expected when one was set and the upgrade is not STATELESS.
        String expectedRestorePath = (z && upgradeMode != UpgradeMode.STATELESS) ? "init-sp" : null;
        Assertions.assertEquals(expectedRestorePath, flinkService.listJobs().get(0).f0);
    }

    /**
     * Exercises {@code getAvailableUpgradeMode} for a running LAST_STATE job under different job
     * start times and checkpoint information, before and after a one-minute
     * {@code OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE} is configured.
     *
     * <p>Timestamps use two offsets from "now": 30 seconds (inside the one-minute limit) and
     * 61 seconds (outside it).
     */
    @Test
    public void testLastStateMaxCheckpointAge() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        ((FlinkDeploymentSpec) deployment.getSpec()).getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());

        JobStatus jobStatus = ((FlinkDeploymentStatus) deployment.getStatus()).getJobStatus();
        long now = System.currentTimeMillis();
        long withinMaxAge = now - 30000;
        long beyondMaxAge = now - 61000;
        jobStatus.setState("RUNNING");
        jobStatus.setStartTime(Long.toString(now));
        jobStatus.setJobId(new JobID().toString());

        ApplicationReconciler appReconciler = this.reconciler.getReconciler();
        FlinkResourceContext resourceContext = getResourceContext(deployment);
        Configuration deployConfig = resourceContext.getDeployConfig((AbstractFlinkSpec) deployment.getSpec());

        // No max age configured yet: LAST_STATE.
        Assertions.assertEquals(AbstractJobReconciler.AvailableUpgradeMode.of(UpgradeMode.LAST_STATE), appReconciler.getAvailableUpgradeMode(resourceContext, deployConfig));

        // Max age of one minute, no checkpoint information at all.
        deployConfig.set(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE, Duration.ofMinutes(1L));
        flinkService.setCheckpointInfo(Tuple2.of(Optional.empty(), Optional.empty()));
        jobStatus.setStartTime(Long.toString(now));
        Assertions.assertEquals(AbstractJobReconciler.AvailableUpgradeMode.of(UpgradeMode.LAST_STATE), appReconciler.getAvailableUpgradeMode(resourceContext, deployConfig));
        jobStatus.setStartTime(Long.toString(beyondMaxAge));
        Assertions.assertEquals(AbstractJobReconciler.AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT), appReconciler.getAvailableUpgradeMode(resourceContext, deployConfig));

        // Only a pending checkpoint: a recent one yields a pending upgrade, an old one SAVEPOINT.
        flinkService.setCheckpointInfo(Tuple2.of(Optional.empty(), Optional.of(new CheckpointHistoryWrapper.PendingCheckpointInfo(0L, withinMaxAge))));
        Assertions.assertEquals(AbstractJobReconciler.AvailableUpgradeMode.pendingUpgrade(), appReconciler.getAvailableUpgradeMode(resourceContext, deployConfig));
        flinkService.setCheckpointInfo(Tuple2.of(Optional.empty(), Optional.of(new CheckpointHistoryWrapper.PendingCheckpointInfo(0L, beyondMaxAge))));
        Assertions.assertEquals(AbstractJobReconciler.AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT), appReconciler.getAvailableUpgradeMode(resourceContext, deployConfig));

        // Same old pending checkpoint, but the job itself started recently: LAST_STATE.
        jobStatus.setStartTime(Long.toString(withinMaxAge));
        Assertions.assertEquals(AbstractJobReconciler.AvailableUpgradeMode.of(UpgradeMode.LAST_STATE), appReconciler.getAvailableUpgradeMode(resourceContext, deployConfig));

        // Old job with a recent completed checkpoint: LAST_STATE.
        jobStatus.setStartTime(Long.toString(beyondMaxAge));
        flinkService.setCheckpointInfo(Tuple2.of(Optional.of(new CheckpointHistoryWrapper.CompletedCheckpointInfo(0L, "s", withinMaxAge)), Optional.of(new CheckpointHistoryWrapper.PendingCheckpointInfo(0L, beyondMaxAge))));
        Assertions.assertEquals(AbstractJobReconciler.AvailableUpgradeMode.of(UpgradeMode.LAST_STATE), appReconciler.getAvailableUpgradeMode(resourceContext, deployConfig));

        // Old job whose completed checkpoint is also too old: SAVEPOINT.
        jobStatus.setStartTime(Long.toString(beyondMaxAge));
        flinkService.setCheckpointInfo(Tuple2.of(Optional.of(new CheckpointHistoryWrapper.CompletedCheckpointInfo(0L, "s", beyondMaxAge)), Optional.of(new CheckpointHistoryWrapper.PendingCheckpointInfo(0L, beyondMaxAge))));
        Assertions.assertEquals(AbstractJobReconciler.AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT), appReconciler.getAvailableUpgradeMode(resourceContext, deployConfig));
    }

    /**
     * Argument source for {@code testInitialJmDeployCannotStart}: every upgrade mode paired with
     * both values of the "initial savepoint set" flag.
     *
     * @return six {@code (UpgradeMode, boolean)} argument sets, in the order LAST_STATE, SAVEPOINT,
     *     STATELESS with {@code true} before {@code false}
     */
    private static Stream<Arguments> testInitialJmDeployCannotStartParams() {
        // Plain varargs: wrapping the elements in an Object[] would not type-check as Stream<Arguments>.
        return Stream.of(
                Arguments.of(UpgradeMode.LAST_STATE, true),
                Arguments.of(UpgradeMode.LAST_STATE, false),
                Arguments.of(UpgradeMode.SAVEPOINT, true),
                Arguments.of(UpgradeMode.SAVEPOINT, false),
                Arguments.of(UpgradeMode.STATELESS, true),
                Arguments.of(UpgradeMode.STATELESS, false));
    }

    /**
     * Argument source for {@code testUpgradeJmDeployCannotStart}: the full cross product of
     * {@link UpgradeMode} values as {@code (initial mode, resume mode)} pairs.
     *
     * @return one argument set per ordered pair, iterating the first mode in the outer loop
     */
    private static Stream<Arguments> testUpgradeJmDeployCannotStartParams() {
        // Typed list so the resulting stream is a Stream<Arguments> without unchecked conversion.
        List<Arguments> params = new ArrayList<>();
        for (UpgradeMode fromMode : UpgradeMode.values()) {
            for (UpgradeMode toMode : UpgradeMode.values()) {
                params.add(Arguments.of(fromMode, toMode));
            }
        }
        return params.stream();
    }

    /**
     * A SAVEPOINT-mode job whose cluster deployment was deleted, with HA data available: a
     * restart-nonce change must be recorded as a LAST_STATE upgrade with the job SUSPENDED.
     */
    @Test
    public void testLastStateOnDeletedDeployment() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        ((FlinkDeploymentSpec) deployment.getSpec()).getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        reconciler.reconcile(deployment, context);
        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());

        // Delete the cluster deployment through the testing service.
        flinkService.deleteClusterDeployment(
                deployment.getMetadata(),
                (FlinkDeploymentStatus) deployment.getStatus(),
                Configuration.fromMap(((FlinkDeploymentSpec) deployment.getSpec()).getFlinkConfiguration()),
                false);
        flinkService.setHaDataAvailable(true);

        // Trigger an upgrade via the restart nonce.
        ((FlinkDeploymentSpec) deployment.getSpec()).setRestartNonce(123L);
        reconciler.reconcile(deployment, context);

        JobSpec recordedJob = ((FlinkDeploymentStatus) deployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getJob();
        Assertions.assertEquals(UpgradeMode.LAST_STATE, recordedJob.getUpgradeMode());
        Assertions.assertEquals(JobState.SUSPENDED, recordedJob.getState());
    }

    /**
     * Suspends a running job in SAVEPOINT mode, then resumes it in LAST_STATE mode: the resumed
     * job must carry the savepoint {@code "savepoint_0"} taken during the suspend.
     */
    @Test
    public void testUpgradeModeChangeFromSavepointToLastState() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        reconciler.reconcile(deployment, context);
        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());

        // Suspend with SAVEPOINT mode.
        FlinkDeploymentSpec spec = (FlinkDeploymentSpec) deployment.getSpec();
        spec.getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        spec.getJob().setState(JobState.SUSPENDED);
        spec.setImage("new-image-1");
        reconciler.reconcile(deployment, context);
        Assertions.assertEquals(0L, flinkService.getRunningCount());
        JobStatus suspendedStatus = ((FlinkDeploymentStatus) deployment.getStatus()).getJobStatus();
        Assertions.assertEquals(org.apache.flink.api.common.JobStatus.FINISHED.name(), suspendedStatus.getState());
        Assertions.assertEquals("savepoint_0", suspendedStatus.getSavepointInfo().getLastSavepoint().getLocation());

        // Resume with LAST_STATE mode.
        spec = (FlinkDeploymentSpec) deployment.getSpec();
        spec.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        spec.getJob().setState(JobState.RUNNING);
        spec.setImage("new-image-2");
        reconciler.reconcile(deployment, context);
        List<Tuple3<String, JobStatusMessage, Configuration>> resumedJobs = flinkService.listJobs();
        Assertions.assertEquals(1L, flinkService.getRunningCount());
        Assertions.assertEquals("savepoint_0", resumedJobs.get(0).f0);
    }

    /**
     * With the HA mode removed from the configuration and no HA data, a switch to LAST_STATE is
     * not applied while the job has not been marked running (the recorded image stays unchanged).
     * Once it is marked running, the upgrade goes through and the restarted job carries
     * {@code "savepoint_0"}.
     */
    @Test
    public void testUpgradeModeChangedToLastStateShouldTriggerSavepointWhileHADisabled() throws Exception {
        flinkService.setHaDataAvailable(false);
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        ((FlinkDeploymentSpec) deployment.getSpec()).getFlinkConfiguration().remove(HighAvailabilityOptions.HA_MODE.key());
        reconciler.reconcile(deployment, context);
        Assertions.assertEquals(JobManagerDeploymentStatus.DEPLOYING, ((FlinkDeploymentStatus) deployment.getStatus()).getJobManagerDeploymentStatus());

        // Request the upgrade while the job has not been marked running yet.
        FlinkDeploymentSpec spec = (FlinkDeploymentSpec) deployment.getSpec();
        spec.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        spec.setImage("new-image-1");
        reconciler.reconcile(deployment, context);
        reconciler.reconcile(deployment, context);
        Assertions.assertNull(flinkService.listJobs().get(0).f0);
        String imageBeforeRunning = ((FlinkDeploymentStatus) deployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getImage();
        Assertions.assertNotEquals("new-image-1", imageBeforeRunning);

        // Once the job is running the upgrade is applied.
        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
        reconciler.reconcile(deployment, context);
        reconciler.reconcile(deployment, context);
        String imageAfterRunning = ((FlinkDeploymentStatus) deployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getImage();
        Assertions.assertEquals("new-image-1", imageAfterRunning);
        Assertions.assertEquals("savepoint_0", flinkService.listJobs().get(0).f0);
    }

    /**
     * With the default test deployment (HA configuration left untouched), switching a running job
     * to LAST_STATE applies the new image and restarts the job with a {@code null} first tuple
     * field, i.e. without a savepoint path such as {@code "savepoint_0"}.
     */
    @Test
    public void testUpgradeModeChangedToLastStateShouldNotTriggerSavepointWhileHAEnabled() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        reconciler.reconcile(deployment, context);
        UpgradeMode initiallyRecordedMode = ((FlinkDeploymentStatus) deployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getJob().getUpgradeMode();
        Assertions.assertNotEquals(UpgradeMode.LAST_STATE, initiallyRecordedMode);

        // Request LAST_STATE with a new image and mark the job as running.
        FlinkDeploymentSpec spec = (FlinkDeploymentSpec) deployment.getSpec();
        spec.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        spec.setImage("new-image-1");
        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());

        reconciler.reconcile(deployment, context);
        reconciler.reconcile(deployment, context);
        String recordedImage = ((FlinkDeploymentStatus) deployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getImage();
        Assertions.assertEquals("new-image-1", recordedImage);
        Assertions.assertNull(flinkService.listJobs().get(0).f0);
    }

    /**
     * Builds an application-cluster deployment for {@code flinkVersion} with the given upgrade
     * mode and trims its Flink configuration to match that mode:
     *
     * <ul>
     *   <li>STATELESS: removes the HA mode, HA storage path, savepoint directory and checkpoints
     *       directory entries;
     *   <li>SAVEPOINT: removes the HA mode and HA storage path entries;
     *   <li>LAST_STATE: removes the savepoint directory entry.
     * </ul>
     *
     * @param flinkVersion Flink version passed to {@code TestUtils.buildApplicationCluster}
     * @param upgradeMode upgrade mode to set on the job spec
     * @return the configured deployment
     * @throws RuntimeException if {@code upgradeMode} is not one of the three modes above
     */
    public static FlinkDeployment buildApplicationCluster(FlinkVersion flinkVersion, UpgradeMode upgradeMode) {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
        FlinkDeploymentSpec spec = (FlinkDeploymentSpec) deployment.getSpec();
        spec.getJob().setUpgradeMode(upgradeMode);
        Map<String, String> flinkConfiguration = spec.getFlinkConfiguration();
        // Switch on the enum itself instead of an ordinal-indexed lookup table.
        switch (upgradeMode) {
            case STATELESS:
                flinkConfiguration.remove(HighAvailabilityOptions.HA_MODE.key());
                flinkConfiguration.remove(HighAvailabilityOptions.HA_STORAGE_PATH.key());
                flinkConfiguration.remove(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
                flinkConfiguration.remove(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key());
                break;
            case SAVEPOINT:
                flinkConfiguration.remove(HighAvailabilityOptions.HA_MODE.key());
                flinkConfiguration.remove(HighAvailabilityOptions.HA_STORAGE_PATH.key());
                break;
            case LAST_STATE:
                flinkConfiguration.remove(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
                break;
            default:
                throw new RuntimeException("Unsupported upgrade mode " + upgradeMode);
        }
        return deployment;
    }

    /**
     * Asserts that {@code list} holds exactly one job whose first tuple field is {@code null},
     * then writes that job into the deployment status as RUNNING (id, name, current time as start
     * time) and marks the JobManager deployment READY.
     *
     * @param flinkDeployment deployment whose status is updated
     * @param list jobs as returned by the testing Flink service
     */
    private void verifyAndSetRunningJobsToStatus(FlinkDeployment flinkDeployment, List<Tuple3<String, JobStatusMessage, Configuration>> list) {
        Assertions.assertEquals(1, list.size());
        Assertions.assertNull(list.get(0).f0);

        JobStatusMessage runningJob = (JobStatusMessage) list.get(0).f1;
        JobStatus runningStatus =
                new JobStatus()
                        .toBuilder()
                        .jobId(runningJob.getJobId().toHexString())
                        .jobName(runningJob.getJobName())
                        .startTime(Long.toString(System.currentTimeMillis()))
                        .state("RUNNING")
                        .build();

        FlinkDeploymentStatus status = (FlinkDeploymentStatus) flinkDeployment.getStatus();
        status.setJobStatus(runningStatus);
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
    }

    /** Returns the Kubernetes client held in this test's {@code kubernetesClient} field. */
    @Override // org.apache.flink.kubernetes.operator.OperatorTestBase
    public KubernetesClient getKubernetesClient() {
        return kubernetesClient;
    }
}
