package org.apache.flink.kubernetes.operator.reconciler.sessionjob;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

@EnableKubernetesMockClient(crud = true)
/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.class */
public class SessionJobReconcilerTest extends OperatorTestBase {
    private KubernetesClient kubernetesClient;
    private TestReconcilerAdapter<FlinkSessionJob, FlinkSessionJobSpec, FlinkSessionJobStatus> reconciler;

    @Override // org.apache.flink.kubernetes.operator.OperatorTestBase
    public void setup() {
        Configuration configuration = new Configuration();
        configuration.set(KubernetesOperatorConfigOptions.OPERATOR_JOB_RESTART_FAILED, true);
        this.configManager = new FlinkConfigManager(configuration);
        this.reconciler = new TestReconcilerAdapter<>(this, new SessionJobReconciler(this.kubernetesClient, this.eventRecorder, this.statusRecorder, this.configManager));
    }

    @Test
    public void testSubmitAndCleanUp() throws Exception {
        FlinkSessionJob buildSessionJob = TestUtils.buildSessionJob();
        this.reconciler.reconcile(buildSessionJob, TestUtils.createEmptyContext());
        Assertions.assertEquals(0, this.flinkService.listJobs().size());
        this.reconciler.reconcile(buildSessionJob, TestUtils.createContextWithNotReadyFlinkDeployment());
        Assertions.assertEquals(0, this.flinkService.listJobs().size());
        this.reconciler.reconcile(buildSessionJob, TestUtils.createContextWithReadyFlinkDeployment());
        Assertions.assertEquals(1, this.flinkService.listJobs().size());
        verifyAndSetRunningJobsToStatus(buildSessionJob, JobState.RUNNING, JobStatus.RECONCILING.name(), null, this.flinkService.listJobs());
        this.reconciler.cleanup(buildSessionJob, TestUtils.createContextWithReadyFlinkDeployment());
        Assertions.assertEquals(JobStatus.FINISHED, ((JobStatusMessage) this.flinkService.listJobs().get(0).f1).getJobState());
    }

    @Test
    public void testCancelJobRescheduled() throws Exception {
        FlinkSessionJob buildSessionJob = TestUtils.buildSessionJob();
        this.reconciler.reconcile(buildSessionJob, TestUtils.createContextWithReadyFlinkDeployment());
        Assertions.assertEquals(1, this.flinkService.listJobs().size());
        verifyAndSetRunningJobsToStatus(buildSessionJob, JobState.RUNNING, JobStatus.RECONCILING.name(), null, this.flinkService.listJobs());
        this.flinkService.setPortReady(false);
        Assertions.assertEquals(10000L, (Long) this.reconciler.cleanup(buildSessionJob, TestUtils.createContextWithReadyFlinkDeployment()).getScheduleDelay().get());
        Assertions.assertEquals(JobStatus.RUNNING, ((JobStatusMessage) this.flinkService.listJobs().get(0).f1).getJobState());
        this.flinkService.setPortReady(true);
        Assertions.assertEquals(true, Boolean.valueOf(this.reconciler.cleanup(buildSessionJob, TestUtils.createContextWithReadyFlinkDeployment()).isRemoveFinalizer()));
        Assertions.assertEquals(JobStatus.FINISHED, ((JobStatusMessage) this.flinkService.listJobs().get(0).f1).getJobState());
    }

    @Test
    public void testCancelJobNotFound() throws Exception {
        FlinkSessionJob buildSessionJob = TestUtils.buildSessionJob();
        this.reconciler.reconcile(buildSessionJob, TestUtils.createContextWithReadyFlinkDeployment());
        Assertions.assertEquals(1, this.flinkService.listJobs().size());
        verifyAndSetRunningJobsToStatus(buildSessionJob, JobState.RUNNING, JobStatus.RECONCILING.name(), null, this.flinkService.listJobs());
        this.flinkService.setFlinkJobNotFound(true);
        this.reconciler.cleanup(buildSessionJob, TestUtils.createContextWithReadyFlinkDeployment());
        Assertions.assertEquals(true, Boolean.valueOf(this.reconciler.cleanup(buildSessionJob, TestUtils.createContextWithReadyFlinkDeployment()).isRemoveFinalizer()));
    }

    @Test
    public void testCancelJobTerminatedWithoutCancellation() throws Exception {
        FlinkSessionJob buildSessionJob = TestUtils.buildSessionJob();
        this.reconciler.reconcile(buildSessionJob, TestUtils.createContextWithReadyFlinkDeployment());
        Assertions.assertEquals(1, this.flinkService.listJobs().size());
        verifyAndSetRunningJobsToStatus(buildSessionJob, JobState.RUNNING, JobStatus.RECONCILING.name(), null, this.flinkService.listJobs());
        this.flinkService.setFlinkJobTerminatedWithoutCancellation(true);
        this.reconciler.cleanup(buildSessionJob, TestUtils.createContextWithReadyFlinkDeployment());
        Assertions.assertEquals(true, Boolean.valueOf(this.reconciler.cleanup(buildSessionJob, TestUtils.createContextWithReadyFlinkDeployment()).isRemoveFinalizer()));
    }

    @Test
    public void testRestart() throws Exception {
        FlinkSessionJob buildSessionJob = TestUtils.buildSessionJob();
        this.reconciler.reconcile(buildSessionJob, TestUtils.createContextWithReadyFlinkDeployment());
        Assertions.assertEquals(1, this.flinkService.listJobs().size());
        verifyAndSetRunningJobsToStatus(buildSessionJob, JobState.RUNNING, JobStatus.RECONCILING.name(), null, this.flinkService.listJobs());
        ((FlinkSessionJobSpec) buildSessionJob.getSpec()).setRestartNonce(2L);
        this.reconciler.reconcile(buildSessionJob, TestUtils.createContextWithReadyFlinkDeployment());
        Assertions.assertEquals(JobStatus.FINISHED, ((JobStatusMessage) this.flinkService.listJobs().get(0).f1).getJobState());
        this.reconciler.reconcile(buildSessionJob, TestUtils.createContextWithReadyFlinkDeployment());
        verifyAndSetRunningJobsToStatus(buildSessionJob, JobState.RUNNING, JobStatus.RECONCILING.name(), null, this.flinkService.listJobs());
    }

    @Test
    public void testRestartWhenFailed() throws Exception {
        FlinkSessionJob buildSessionJob = TestUtils.buildSessionJob();
        Context<?> createContextWithReadyFlinkDeployment = TestUtils.createContextWithReadyFlinkDeployment();
        this.reconciler.reconcile(buildSessionJob, createContextWithReadyFlinkDeployment);
        Assertions.assertEquals(1, this.flinkService.listJobs().size());
        verifyAndSetRunningJobsToStatus(buildSessionJob, JobState.RUNNING, JobStatus.RECONCILING.name(), null, this.flinkService.listJobs());
        ((FlinkSessionJobStatus) buildSessionJob.getStatus()).getJobStatus().setState(JobStatus.FAILED.name());
        this.reconciler.reconcile(buildSessionJob, createContextWithReadyFlinkDeployment);
        Assertions.assertEquals(2, this.flinkService.listJobs().size());
        Assertions.assertEquals(JobStatus.RUNNING, ((JobStatusMessage) this.flinkService.listJobs().get(1).f1).getJobState());
    }

    @Test
    public void testSubmitWithInitialSavepointPath() throws Exception {
        FlinkSessionJob buildSessionJob = TestUtils.buildSessionJob();
        ((FlinkSessionJobSpec) buildSessionJob.getSpec()).getJob().setInitialSavepointPath("file:///init-sp");
        this.reconciler.reconcile(buildSessionJob, TestUtils.createContextWithReadyFlinkDeployment());
        verifyAndSetRunningJobsToStatus(buildSessionJob, JobState.RUNNING, JobStatus.RECONCILING.name(), "file:///init-sp", this.flinkService.listJobs());
    }

    @Test
    public void testStatelessUpgrade() throws Exception {
        FlinkSessionJob buildSessionJob = TestUtils.buildSessionJob();
        Context<?> createContextWithReadyFlinkDeployment = TestUtils.createContextWithReadyFlinkDeployment();
        this.reconciler.reconcile(buildSessionJob, createContextWithReadyFlinkDeployment);
        Assertions.assertEquals(1, this.flinkService.listJobs().size());
        verifyAndSetRunningJobsToStatus(buildSessionJob, JobState.RUNNING, JobStatus.RECONCILING.name(), null, this.flinkService.listJobs());
        FlinkSessionJob flinkSessionJob = (FlinkSessionJob) ReconciliationUtils.clone(buildSessionJob);
        ((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getJob().setUpgradeMode(UpgradeMode.STATELESS);
        ((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getJob().setParallelism(2);
        this.reconciler.reconcile(flinkSessionJob, createContextWithReadyFlinkDeployment);
        Assertions.assertEquals(JobStatus.FINISHED, ((JobStatusMessage) this.flinkService.listJobs().get(0).f1).getJobState());
        verifyJobState(flinkSessionJob, JobState.SUSPENDED, "FINISHED");
        this.flinkService.clear();
        this.reconciler.reconcile(flinkSessionJob, createContextWithReadyFlinkDeployment);
        Assertions.assertEquals(1, this.flinkService.listJobs().size());
        verifyAndSetRunningJobsToStatus(flinkSessionJob, JobState.RUNNING, JobStatus.RECONCILING.name(), null, this.flinkService.listJobs());
    }

    @Test
    public void testSavepointUpgrade() throws Exception {
        FlinkSessionJob buildSessionJob = TestUtils.buildSessionJob();
        Context<?> createContextWithReadyFlinkDeployment = TestUtils.createContextWithReadyFlinkDeployment();
        this.reconciler.reconcile(buildSessionJob, createContextWithReadyFlinkDeployment);
        Assertions.assertEquals(1, this.flinkService.listJobs().size());
        Assertions.assertTrue(((FlinkSessionJobStatus) buildSessionJob.getStatus()).getJobStatus().getSavepointInfo().getSavepointHistory().isEmpty());
        FlinkSessionJob flinkSessionJob = (FlinkSessionJob) ReconciliationUtils.clone(buildSessionJob);
        ((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        ((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getJob().setParallelism(3);
        verifyAndSetRunningJobsToStatus(flinkSessionJob, JobState.RUNNING, JobStatus.RECONCILING.name(), null, this.flinkService.listJobs());
        this.reconciler.reconcile(flinkSessionJob, createContextWithReadyFlinkDeployment);
        Assertions.assertEquals(JobStatus.FINISHED, ((JobStatusMessage) this.flinkService.listJobs().get(0).f1).getJobState());
        verifyJobState(flinkSessionJob, JobState.SUSPENDED, "FINISHED");
        Assertions.assertEquals(1, ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getSavepointInfo().getSavepointHistory().size());
        Assertions.assertEquals(SavepointTriggerType.UPGRADE, ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getSavepointInfo().getLastSavepoint().getTriggerType());
        this.flinkService.clear();
        this.reconciler.reconcile(flinkSessionJob, createContextWithReadyFlinkDeployment);
        Assertions.assertEquals(1, this.flinkService.listJobs().size());
        verifyAndSetRunningJobsToStatus(flinkSessionJob, JobState.RUNNING, JobStatus.RECONCILING.name(), "savepoint_0", this.flinkService.listJobs());
    }

    @Test
    public void testTriggerSavepoint() throws Exception {
        FlinkSessionJob buildSessionJob = TestUtils.buildSessionJob();
        Assertions.assertFalse(SavepointUtils.savepointInProgress(((FlinkSessionJobStatus) buildSessionJob.getStatus()).getJobStatus()));
        Context<?> createContextWithReadyFlinkDeployment = TestUtils.createContextWithReadyFlinkDeployment();
        this.reconciler.reconcile(buildSessionJob, createContextWithReadyFlinkDeployment);
        verifyAndSetRunningJobsToStatus(buildSessionJob, JobState.RUNNING, JobStatus.RECONCILING.name(), null, this.flinkService.listJobs());
        Assertions.assertFalse(SavepointUtils.savepointInProgress(((FlinkSessionJobStatus) buildSessionJob.getStatus()).getJobStatus()));
        FlinkSessionJob flinkSessionJob = (FlinkSessionJob) ReconciliationUtils.clone(buildSessionJob);
        this.reconciler.reconcile(flinkSessionJob, createContextWithReadyFlinkDeployment);
        Assertions.assertFalse(SavepointUtils.savepointInProgress(((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus()));
        ((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getJob().setSavepointTriggerNonce(2L);
        ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().setState(JobStatus.CREATED.name());
        this.reconciler.reconcile(flinkSessionJob, createContextWithReadyFlinkDeployment);
        Assertions.assertFalse(SavepointUtils.savepointInProgress(((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus()));
        ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().setState(JobStatus.RUNNING.name());
        this.reconciler.reconcile(flinkSessionJob, createContextWithReadyFlinkDeployment);
        Assertions.assertTrue(SavepointUtils.savepointInProgress(((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus()));
        Assertions.assertNull(((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getJob().getSavepointTriggerNonce());
        ((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getJob().setSavepointTriggerNonce(3L);
        this.reconciler.reconcile(flinkSessionJob, createContextWithReadyFlinkDeployment);
        Assertions.assertEquals("trigger_0", ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getSavepointInfo().getTriggerId());
        Assertions.assertEquals(1, ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getJob().getParallelism());
        ((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getJob().setParallelism(100);
        this.reconciler.reconcile(flinkSessionJob, createContextWithReadyFlinkDeployment);
        Assertions.assertEquals("trigger_0", ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getSavepointInfo().getTriggerId());
        Assertions.assertEquals(SavepointTriggerType.MANUAL, ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getSavepointInfo().getTriggerType());
        Assertions.assertEquals(1, ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getJob().getParallelism());
        ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getSavepointInfo().resetTrigger();
        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getSavepointInfo(), flinkSessionJob);
        this.reconciler.reconcile(flinkSessionJob, createContextWithReadyFlinkDeployment);
        this.reconciler.reconcile(flinkSessionJob, createContextWithReadyFlinkDeployment);
        Assertions.assertEquals(100, ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getJob().getParallelism());
        verifyAndSetRunningJobsToStatus(flinkSessionJob, JobState.RUNNING, JobStatus.RECONCILING.name(), null, this.flinkService.listJobs());
        ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getSavepointInfo().resetTrigger();
        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getSavepointInfo(), flinkSessionJob);
        ((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getJob().setSavepointTriggerNonce(4L);
        this.reconciler.reconcile(flinkSessionJob, createContextWithReadyFlinkDeployment);
        Assertions.assertEquals("trigger_1", ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getSavepointInfo().getTriggerId());
        ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getSavepointInfo().resetTrigger();
        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getSavepointInfo(), flinkSessionJob);
        ((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getJob().setSavepointTriggerNonce((Long) null);
        this.reconciler.reconcile(flinkSessionJob, createContextWithReadyFlinkDeployment);
        Assertions.assertFalse(SavepointUtils.savepointInProgress(((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus()));
    }

    private static Stream<Arguments> cancelStatelessSessionJobParams() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{JobStatus.INITIALIZING, true}), Arguments.of(new Object[]{JobStatus.CREATED, true}), Arguments.of(new Object[]{JobStatus.RUNNING, true}), Arguments.of(new Object[]{JobStatus.FAILING, true}), Arguments.of(new Object[]{JobStatus.FAILED, false}), Arguments.of(new Object[]{JobStatus.CANCELLING, true}), Arguments.of(new Object[]{JobStatus.CANCELED, false}), Arguments.of(new Object[]{JobStatus.FINISHED, false}), Arguments.of(new Object[]{JobStatus.RESTARTING, true}), Arguments.of(new Object[]{JobStatus.SUSPENDED, true}), Arguments.of(new Object[]{JobStatus.RECONCILING, true})});
    }

    @Test
    public void testCancelStatelessSessionJobParams() {
        Assertions.assertEquals(JobStatus.values().length, cancelStatelessSessionJobParams().count());
    }

    @MethodSource({"cancelStatelessSessionJobParams"})
    @ParameterizedTest
    public void testCancelStatelessSessionJob(JobStatus jobStatus, boolean z) throws Exception {
        FlinkSessionJob buildSessionJob = TestUtils.buildSessionJob();
        this.reconciler.reconcile(buildSessionJob, TestUtils.createContextWithReadyFlinkDeployment());
        Assertions.assertEquals(1, this.flinkService.listJobs().size());
        verifyAndSetRunningJobsToStatus(buildSessionJob, JobState.RUNNING, JobStatus.RECONCILING.name(), null, this.flinkService.listJobs());
        Tuple3<String, JobStatusMessage, Configuration> tuple3 = this.flinkService.listJobs().get(0);
        JobStatusMessage jobStatusMessage = (JobStatusMessage) tuple3.f1;
        Configuration configuration = (Configuration) tuple3.f2;
        ((FlinkSessionJobSpec) buildSessionJob.getSpec()).getJob().setUpgradeMode(UpgradeMode.STATELESS);
        tuple3.f1 = new JobStatusMessage(jobStatusMessage.getJobId(), jobStatusMessage.getJobName(), jobStatus, jobStatusMessage.getStartTime());
        ((FlinkSessionJobStatus) buildSessionJob.getStatus()).getJobStatus().setState(jobStatus.name());
        this.flinkService.cancelSessionJob(buildSessionJob, UpgradeMode.STATELESS, configuration);
        if (z) {
            Assertions.assertEquals(1, this.flinkService.getCancelJobCallCount());
            Assertions.assertEquals(JobStatus.FINISHED, ((JobStatusMessage) tuple3.f1).getJobState());
        } else {
            Assertions.assertEquals(0, this.flinkService.getCancelJobCallCount());
            Assertions.assertEquals(jobStatus, ((JobStatusMessage) tuple3.f1).getJobState());
        }
        Assertions.assertEquals(JobStatus.FINISHED.name(), ((FlinkSessionJobStatus) buildSessionJob.getStatus()).getJobStatus().getState());
    }

    private static Stream<Arguments> cancelSavepointSessionJobParams() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{JobStatus.INITIALIZING, true, false}), Arguments.of(new Object[]{JobStatus.CREATED, true, false}), Arguments.of(new Object[]{JobStatus.RUNNING, false, true}), Arguments.of(new Object[]{JobStatus.FAILING, true, false}), Arguments.of(new Object[]{JobStatus.FAILED, false, false}), Arguments.of(new Object[]{JobStatus.CANCELLING, true, false}), Arguments.of(new Object[]{JobStatus.CANCELED, false, false}), Arguments.of(new Object[]{JobStatus.FINISHED, false, false}), Arguments.of(new Object[]{JobStatus.RESTARTING, true, false}), Arguments.of(new Object[]{JobStatus.SUSPENDED, true, false}), Arguments.of(new Object[]{JobStatus.RECONCILING, true, false})});
    }

    @Test
    public void testCancelSavepointSessionJobParams() {
        Assertions.assertEquals(JobStatus.values().length, cancelSavepointSessionJobParams().count());
    }

    @MethodSource({"cancelSavepointSessionJobParams"})
    @ParameterizedTest
    public void testCancelSavepointSessionJob(JobStatus jobStatus, boolean z, boolean z2) throws Exception {
        Assertions.assertTrue((z && z2) ? false : true, "Expecting an exception and cancel to be called is and oxymoron");
        FlinkSessionJob buildSessionJob = TestUtils.buildSessionJob();
        this.reconciler.reconcile(buildSessionJob, TestUtils.createContextWithReadyFlinkDeployment());
        Assertions.assertEquals(1, this.flinkService.listJobs().size());
        verifyAndSetRunningJobsToStatus(buildSessionJob, JobState.RUNNING, JobStatus.RECONCILING.name(), null, this.flinkService.listJobs());
        Tuple3<String, JobStatusMessage, Configuration> tuple3 = this.flinkService.listJobs().get(0);
        JobStatusMessage jobStatusMessage = (JobStatusMessage) tuple3.f1;
        Configuration configuration = (Configuration) tuple3.f2;
        ((FlinkSessionJobSpec) buildSessionJob.getSpec()).getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        tuple3.f1 = new JobStatusMessage(jobStatusMessage.getJobId(), jobStatusMessage.getJobName(), jobStatus, jobStatusMessage.getStartTime());
        ((FlinkSessionJobStatus) buildSessionJob.getStatus()).getJobStatus().setState(jobStatus.name());
        if (z) {
            Assertions.assertTrue(((RuntimeException) Assertions.assertThrows(RuntimeException.class, () -> {
                this.flinkService.cancelSessionJob(buildSessionJob, UpgradeMode.SAVEPOINT, configuration);
            })).getMessage().contains("Unexpected non-terminal status"));
        } else {
            this.flinkService.cancelSessionJob(buildSessionJob, UpgradeMode.SAVEPOINT, configuration);
        }
        if (z2) {
            Assertions.assertEquals(1, this.flinkService.getCancelJobCallCount());
            Assertions.assertEquals("savepoint_0", tuple3.f0);
            Assertions.assertEquals(JobStatus.FINISHED, ((JobStatusMessage) tuple3.f1).getJobState());
        } else {
            Assertions.assertEquals(0, this.flinkService.getCancelJobCallCount());
            Assertions.assertNull(tuple3.f0);
            Assertions.assertEquals(jobStatus, ((JobStatusMessage) tuple3.f1).getJobState());
        }
        if (z) {
            return;
        }
        Assertions.assertEquals(JobStatus.FINISHED.name(), ((FlinkSessionJobStatus) buildSessionJob.getStatus()).getJobStatus().getState());
    }

    private Tuple3<String, JobStatusMessage, Configuration> verifyAndReturnTheSubmittedJob(FlinkSessionJob flinkSessionJob, List<Tuple3<String, JobStatusMessage, Configuration>> list) {
        JobID fromHexString = JobID.fromHexString(((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getJobId());
        Tuple3<String, JobStatusMessage, Configuration> tuple3 = list.stream().filter(tuple32 -> {
            return ((JobStatusMessage) tuple32.f1).getJobId().equals(fromHexString);
        }).findAny().get();
        Assertions.assertNotNull(tuple3);
        return tuple3;
    }

    private void verifyAndSetRunningJobsToStatus(FlinkSessionJob flinkSessionJob, JobState jobState, String str, @Nullable String str2, List<Tuple3<String, JobStatusMessage, Configuration>> list) {
        Tuple3<String, JobStatusMessage, Configuration> verifyAndReturnTheSubmittedJob = verifyAndReturnTheSubmittedJob(flinkSessionJob, list);
        Assertions.assertEquals(str2, verifyAndReturnTheSubmittedJob.f0);
        verifyJobState(flinkSessionJob, jobState, str);
        org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus = ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus();
        jobStatus.setJobName(((JobStatusMessage) verifyAndReturnTheSubmittedJob.f1).getJobName());
        jobStatus.setState("RUNNING");
    }

    private void verifyJobState(FlinkSessionJob flinkSessionJob, JobState jobState, String str) {
        Assertions.assertEquals(jobState, ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getJob().getState());
        Assertions.assertEquals(str, ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getState());
    }

    @Test
    public void testJobUpgradeIgnorePendingSavepoint() throws Exception {
        Context<?> createContextWithReadyFlinkDeployment = TestUtils.createContextWithReadyFlinkDeployment();
        FlinkSessionJob buildSessionJob = TestUtils.buildSessionJob();
        this.reconciler.reconcile(buildSessionJob, createContextWithReadyFlinkDeployment);
        verifyAndSetRunningJobsToStatus(buildSessionJob, JobState.RUNNING, JobStatus.RECONCILING.name(), null, this.flinkService.listJobs());
        FlinkSessionJob flinkSessionJob = (FlinkSessionJob) ReconciliationUtils.clone(buildSessionJob);
        ((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getJob().setSavepointTriggerNonce(Long.valueOf(ThreadLocalRandom.current().nextLong()));
        this.reconciler.reconcile(flinkSessionJob, createContextWithReadyFlinkDeployment);
        Assertions.assertEquals("trigger_0", ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getSavepointInfo().getTriggerId());
        Assertions.assertEquals(JobState.RUNNING.name(), ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getState());
        this.configManager.updateDefaultConfig(Configuration.fromMap(Map.of(KubernetesOperatorConfigOptions.JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT.key(), "true")));
        ((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getJob().setParallelism(100);
        this.reconciler.reconcile(flinkSessionJob, createContextWithReadyFlinkDeployment);
        Assertions.assertEquals("trigger_0", ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getSavepointInfo().getTriggerId());
        Assertions.assertEquals("FINISHED", ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getState());
    }

    @Override // org.apache.flink.kubernetes.operator.OperatorTestBase
    public KubernetesClient getKubernetesClient() {
        return this.kubernetesClient;
    }
}
