package org.apache.flink.kubernetes.operator.validation;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.class */
public class DefaultValidatorTest {
    private final DefaultValidator validator = new DefaultValidator(new FlinkConfigManager(new Configuration()));

    /**
     * Exercises the validator created from an empty default configuration against a long list of
     * single-purpose mutations of the application cluster built by the {@code testSuccess} /
     * {@code testError} helpers.
     *
     * <p>Every {@code testSuccess} call expects no validation error; every {@code testError} call
     * expects an error whose message starts with the given text.
     */
    @Test
    public void testValidationWithoutDefaultConfig() {
        // --- Baseline and resource name ---
        testSuccess(dep -> {
        });
        testSuccess(dep -> {
            dep.getMetadata().setName("session-cluster");
        });
        testError(dep -> {
            dep.getMetadata().setName("session-cluster-1.13");
        }, "The FlinkDeployment name: session-cluster-1.13 is invalid, must consist of lower case alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name',  or 'abc-123'), and the length must be no more than 45 characters.");

        // --- Initial job state, parallelism and TaskManager replicas ---
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getJob().setState(JobState.SUSPENDED);
        }, "Job must start in running state");
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getJob().setParallelism(0);
        }, "Job parallelism must be larger than 0");
        testError(dep -> {
            TaskManagerSpec taskManagerSpec = new TaskManagerSpec();
            taskManagerSpec.setReplicas(0);
            ((FlinkDeploymentSpec) dep.getSpec()).setTaskManager(taskManagerSpec);
        }, "TaskManager replicas must be larger than 0");
        // Parallelism 0 is expected to pass once TaskManager replicas are set explicitly.
        testSuccess(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getTaskManager().setReplicas(1);
            ((FlinkDeploymentSpec) dep.getSpec()).getJob().setParallelism(0);
        });

        // --- Upgrade modes versus HA / savepoint / checkpoint configuration ---
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).setFlinkConfiguration(new HashMap<>());
            ((FlinkDeploymentSpec) dep.getSpec()).getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        }, "Job could not be upgraded with last-state while HA disabled");
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getFlinkConfiguration().remove(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
            ((FlinkDeploymentSpec) dep.getSpec()).getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        }, String.format("Job could not be upgraded with savepoint while config key[%s] is not set", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getFlinkConfiguration().remove(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key());
            ((FlinkDeploymentSpec) dep.getSpec()).getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        }, "Checkpoint directory");
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getFlinkConfiguration().remove(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key());
            ((FlinkDeploymentSpec) dep.getSpec()).getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        }, "Checkpoint directory");
        testSuccess(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getFlinkConfiguration().remove(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key());
            ((FlinkDeploymentSpec) dep.getSpec()).getJob().setUpgradeMode(UpgradeMode.STATELESS);
        });

        // --- Manual and periodic savepoints without a savepoint directory ---
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).setFlinkConfiguration(new HashMap<>());
            ((FlinkDeploymentSpec) dep.getSpec()).getJob().setSavepointTriggerNonce(Long.valueOf(ThreadLocalRandom.current().nextLong()));
        }, String.format("Savepoint could not be manually triggered for the running job while config key[%s] is not set", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).setFlinkConfiguration(Map.of(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL.key(), "1m"));
        }, String.format("Periodic savepoints cannot be enabled when config key[%s] is not set", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));

        // --- Arbitrary, forbidden and mutually dependent Flink config keys ---
        testSuccess(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).setFlinkConfiguration(Collections.singletonMap("random", "config"));
        });
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).setFlinkConfiguration(Collections.singletonMap(KubernetesConfigOptions.NAMESPACE.key(), "myns"));
        }, "Forbidden Flink config key");
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).setFlinkConfiguration(Collections.singletonMap(HighAvailabilityOptions.HA_CLUSTER_ID.key(), "my-cluster-id"));
        }, "Forbidden Flink config key");
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).setFlinkConfiguration(Map.of(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED.key(), "true", KubernetesOperatorConfigOptions.OPERATOR_JM_DEPLOYMENT_RECOVERY_ENABLED.key(), "false"));
        }, "Deployment recovery (" + KubernetesOperatorConfigOptions.OPERATOR_JM_DEPLOYMENT_RECOVERY_ENABLED.key() + ") must be enabled");

        // --- Log configuration and ingress template ---
        testSuccess(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).setLogConfiguration(Map.of("log4j-console.properties", "rootLogger.level = INFO"));
        });
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).setIngress(new IngressSpec());
        }, "Ingress template must be defined");
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).setIngress(IngressSpec.builder().template("example.com:port").build());
        }, "Unable to process the Ingress template(example.com:port). Error: Error at index 0 in: \"port\"");
        testSuccess(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).setIngress(IngressSpec.builder().template("example.com/{{namespace}}/{{name}}").build());
        });
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).setLogConfiguration(Map.of("random", "config"));
        }, "Invalid log config key");

        // --- JobManager replicas, HA and rollback ---
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).setFlinkConfiguration(new HashMap<>());
            ((FlinkDeploymentSpec) dep.getSpec()).getJobManager().setReplicas(2);
        }, "High availability should be enabled when starting standby JobManagers.");
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).setFlinkConfiguration(Map.of(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED.key(), "true"));
        }, "HA must be enabled for rollback support.");
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getJobManager().setReplicas(0);
        }, "JobManager replicas should not be configured less than one.");

        // --- JobManager / TaskManager memory settings ---
        testSuccess(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getTaskManager().getResource().setMemory("1G");
        });
        testSuccess(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getTaskManager().getResource().setMemory("100");
        });
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getTaskManager().getResource().setMemory("invalid");
        }, "TaskManager resource memory parse error");
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getJobManager().getResource().setMemory("invalid");
        }, "JobManager resource memory parse error");
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getTaskManager().getResource().setMemory((String) null);
        }, "TaskManager resource memory must be defined using `spec.taskManager.resource.memory`");
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getJobManager().getResource().setMemory((String) null);
        }, "JobManager resource memory must be defined using `spec.jobManager.resource.memory`");
        // Task heap alone is still expected to be rejected for the TaskManager ...
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getTaskManager().getResource().setMemory((String) null);
            ((FlinkDeploymentSpec) dep.getSpec()).setFlinkConfiguration(Map.of(TaskManagerOptions.TASK_HEAP_MEMORY.key(), "1024m"));
        }, "TaskManager resource memory must be defined using `spec.taskManager.resource.memory`");
        // ... whereas the following memory options in the Flink configuration are expected to be
        // accepted in place of the resource memory.
        testSuccess(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getJobManager().getResource().setMemory((String) null);
            ((FlinkDeploymentSpec) dep.getSpec()).setFlinkConfiguration(Map.of(JobManagerOptions.JVM_HEAP_MEMORY.key(), "2048m"));
        });
        testSuccess(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getTaskManager().getResource().setMemory((String) null);
            ((FlinkDeploymentSpec) dep.getSpec()).setFlinkConfiguration(Map.of(TaskManagerOptions.TOTAL_FLINK_MEMORY.key(), "2048m"));
        });
        testSuccess(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getTaskManager().getResource().setMemory((String) null);
            ((FlinkDeploymentSpec) dep.getSpec()).setFlinkConfiguration(Map.of(TaskManagerOptions.TASK_HEAP_MEMORY.key(), "1024m", TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), "1024m"));
        });

        // --- Changes relative to the last reconciled spec ---
        // Last reconciled spec is SUSPENDED and a last savepoint is recorded; moving to the
        // SAVEPOINT upgrade mode with a savepoint directory configured is expected to pass.
        testSuccess(dep -> {
            dep.setStatus(new FlinkDeploymentStatus());
            ((FlinkDeploymentStatus) dep.getStatus()).setJobStatus(new JobStatus());
            ((FlinkDeploymentStatus) dep.getStatus()).getJobStatus().getSavepointInfo().setLastSavepoint(Savepoint.of("sp", SavepointTriggerType.UPGRADE));
            ((FlinkDeploymentStatus) dep.getStatus()).setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
            FlinkDeploymentSpec lastReconciled = (FlinkDeploymentSpec) ReconciliationUtils.clone((FlinkDeploymentSpec) dep.getSpec());
            lastReconciled.getJob().setState(JobState.SUSPENDED);
            ((FlinkDeploymentStatus) dep.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec(lastReconciled, dep);
            ((FlinkDeploymentSpec) dep.getSpec()).getFlinkConfiguration().put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), "file:///flink-data/savepoints");
            ((FlinkDeploymentSpec) dep.getSpec()).getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        });
        // Job spec removed after it was reconciled.
        testError(dep -> {
            dep.setStatus(new FlinkDeploymentStatus());
            ((FlinkDeploymentStatus) dep.getStatus()).setJobStatus(new JobStatus());
            ((FlinkDeploymentStatus) dep.getStatus()).setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
            ((FlinkDeploymentStatus) dep.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec((FlinkDeploymentSpec) ReconciliationUtils.clone((FlinkDeploymentSpec) dep.getSpec()), dep);
            ((FlinkDeploymentSpec) dep.getSpec()).setJob((JobSpec) null);
        }, "Cannot switch from job to session cluster");
        // Job spec present although the last reconciled spec had none.
        testError(dep -> {
            dep.setStatus(new FlinkDeploymentStatus());
            ((FlinkDeploymentStatus) dep.getStatus()).setJobStatus(new JobStatus());
            ((FlinkDeploymentStatus) dep.getStatus()).setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
            FlinkDeploymentSpec lastReconciled = (FlinkDeploymentSpec) ReconciliationUtils.clone((FlinkDeploymentSpec) dep.getSpec());
            lastReconciled.setJob((JobSpec) null);
            ((FlinkDeploymentStatus) dep.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec(lastReconciled, dep);
        }, "Cannot switch from session to job cluster");
        // Deployment mode changes: NATIVE -> STANDALONE, null -> STANDALONE,
        // STANDALONE -> null and STANDALONE -> NATIVE (last reconciled -> current).
        testError(dep -> {
            dep.setStatus(new FlinkDeploymentStatus());
            ((FlinkDeploymentStatus) dep.getStatus()).setJobStatus(new JobStatus());
            ((FlinkDeploymentStatus) dep.getStatus()).setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
            ((FlinkDeploymentSpec) dep.getSpec()).setMode(KubernetesDeploymentMode.STANDALONE);
            FlinkDeploymentSpec lastReconciled = (FlinkDeploymentSpec) ReconciliationUtils.clone((FlinkDeploymentSpec) dep.getSpec());
            lastReconciled.setMode(KubernetesDeploymentMode.NATIVE);
            ((FlinkDeploymentStatus) dep.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec(lastReconciled, dep);
        }, "Cannot switch from native kubernetes to standalone kubernetes cluster");
        testError(dep -> {
            dep.setStatus(new FlinkDeploymentStatus());
            ((FlinkDeploymentStatus) dep.getStatus()).setJobStatus(new JobStatus());
            ((FlinkDeploymentStatus) dep.getStatus()).setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
            ((FlinkDeploymentSpec) dep.getSpec()).setMode(KubernetesDeploymentMode.STANDALONE);
            FlinkDeploymentSpec lastReconciled = (FlinkDeploymentSpec) ReconciliationUtils.clone((FlinkDeploymentSpec) dep.getSpec());
            lastReconciled.setMode((KubernetesDeploymentMode) null);
            ((FlinkDeploymentStatus) dep.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec(lastReconciled, dep);
        }, "Cannot switch from native kubernetes to standalone kubernetes cluster");
        testError(dep -> {
            dep.setStatus(new FlinkDeploymentStatus());
            ((FlinkDeploymentStatus) dep.getStatus()).setJobStatus(new JobStatus());
            ((FlinkDeploymentStatus) dep.getStatus()).setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
            ((FlinkDeploymentSpec) dep.getSpec()).setMode((KubernetesDeploymentMode) null);
            FlinkDeploymentSpec lastReconciled = (FlinkDeploymentSpec) ReconciliationUtils.clone((FlinkDeploymentSpec) dep.getSpec());
            lastReconciled.setMode(KubernetesDeploymentMode.STANDALONE);
            ((FlinkDeploymentStatus) dep.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec(lastReconciled, dep);
        }, "Cannot switch from standalone kubernetes to native kubernetes cluster");
        testError(dep -> {
            dep.setStatus(new FlinkDeploymentStatus());
            ((FlinkDeploymentStatus) dep.getStatus()).setJobStatus(new JobStatus());
            ((FlinkDeploymentStatus) dep.getStatus()).setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
            ((FlinkDeploymentSpec) dep.getSpec()).setMode(KubernetesDeploymentMode.NATIVE);
            FlinkDeploymentSpec lastReconciled = (FlinkDeploymentSpec) ReconciliationUtils.clone((FlinkDeploymentSpec) dep.getSpec());
            lastReconciled.setMode(KubernetesDeploymentMode.STANDALONE);
            ((FlinkDeploymentStatus) dep.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec(lastReconciled, dep);
        }, "Cannot switch from standalone kubernetes to native kubernetes cluster");
        // STATELESS (no HA mode key) -> LAST_STATE on a READY deployment without a savepoint directory.
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
            ((FlinkDeploymentSpec) dep.getSpec()).getFlinkConfiguration().remove(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
            dep.setStatus(new FlinkDeploymentStatus());
            ((FlinkDeploymentStatus) dep.getStatus()).setJobStatus(new JobStatus());
            ((FlinkDeploymentStatus) dep.getStatus()).setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
            FlinkDeploymentSpec lastReconciled = (FlinkDeploymentSpec) ReconciliationUtils.clone((FlinkDeploymentSpec) dep.getSpec());
            lastReconciled.getJob().setUpgradeMode(UpgradeMode.STATELESS);
            lastReconciled.getFlinkConfiguration().remove(HighAvailabilityOptions.HA_MODE.key());
            ((FlinkDeploymentStatus) dep.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec(lastReconciled, dep);
            ((FlinkDeploymentStatus) dep.getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        }, String.format("Job could not be upgraded to last-state while config key[%s] is not set", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));

        // --- Flink version and service account ---
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).setFlinkVersion((FlinkVersion) null);
        }, "Flink Version must be defined.");
        testSuccess(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).setFlinkVersion(FlinkVersion.v1_15);
        });
        testError(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).setServiceAccount((String) null);
        }, "spec.serviceAccount must be defined. If you use helm, its value should be the same with the name of jobServiceAccount.");
        testSuccess(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).setServiceAccount("flink");
        });

        // --- LAST_STATE with the HA mode set to the "kubernetes" shorthand ---
        testSuccess(dep -> {
            ((FlinkDeploymentSpec) dep.getSpec()).getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
            ((FlinkDeploymentSpec) dep.getSpec()).getFlinkConfiguration().put(HighAvailabilityOptions.HA_MODE.key(), "kubernetes");
        });
    }

    /**
     * Verifies that a deployment with an empty Flink configuration and the LAST_STATE upgrade
     * mode passes validation when the validator is built from a default configuration that sets
     * the HA mode and the checkpoint directory.
     */
    @Test
    public void testValidationWithDefaultConfig() {
        Configuration defaults = new Configuration();
        defaults.set(HighAvailabilityOptions.HA_MODE, KubernetesHaServicesFactory.class.getCanonicalName());
        defaults.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "cpdir");
        DefaultValidator validatorWithDefaults = new DefaultValidator(new FlinkConfigManager(defaults));
        testSuccess(dep -> {
            // Nothing is configured on the resource itself; only the defaults above apply.
            ((FlinkDeploymentSpec) dep.getSpec()).setFlinkConfiguration(new HashMap<>());
            ((FlinkDeploymentSpec) dep.getSpec()).getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        }, validatorWithDefaults);
    }

    /**
     * For each target upgrade mode, checks which Flink version changes (1.14 to 1.15, see
     * {@code createFlinkVersionChange}) are accepted depending on the previously reconciled
     * upgrade mode and job state.
     *
     * @param upgradeMode upgrade mode set on the new spec
     */
    @ParameterizedTest
    @EnumSource(UpgradeMode.class)
    public void testFlinkVersionChangeValidation(UpgradeMode upgradeMode) {
        // Previously suspended in LAST_STATE mode: only a STATELESS upgrade is expected to pass.
        Consumer<FlinkDeployment> afterLastStateSuspend = createFlinkVersionChange(UpgradeMode.LAST_STATE, upgradeMode, JobState.SUSPENDED);
        if (upgradeMode != UpgradeMode.STATELESS) {
            testError(afterLastStateSuspend, "Changing flinkVersion after last-state suspend is not allowed.");
        } else {
            testSuccess(afterLastStateSuspend);
        }

        // Previously RUNNING: every previous upgrade mode is expected to pass.
        UpgradeMode[] previousModes = UpgradeMode.values();
        for (int i = 0; i < previousModes.length; i++) {
            testSuccess(createFlinkVersionChange(previousModes[i], upgradeMode, JobState.RUNNING));
        }

        // Previously suspended in SAVEPOINT or STATELESS mode.
        testSuccess(createFlinkVersionChange(UpgradeMode.SAVEPOINT, upgradeMode, JobState.SUSPENDED));
        testSuccess(createFlinkVersionChange(UpgradeMode.STATELESS, upgradeMode, JobState.SUSPENDED));
    }

    /**
     * Creates a modifier that turns a deployment into a "Flink version changed" scenario: the
     * current spec uses version 1.15 while the last reconciled spec recorded in the status uses
     * version 1.14.
     *
     * <p>The modifier reads the reconciliation status from the deployment's existing status.
     *
     * @param upgradeMode upgrade mode stored in the last reconciled spec
     * @param upgradeMode2 upgrade mode set on the current spec
     * @param jobState job state stored in the last reconciled spec
     * @return the modifier to pass to {@code testSuccess} / {@code testError}
     */
    @NotNull
    private Consumer<FlinkDeployment> createFlinkVersionChange(UpgradeMode upgradeMode, UpgradeMode upgradeMode2, JobState jobState) {
        return deployment -> {
            // Current (desired) spec.
            FlinkDeploymentSpec current = (FlinkDeploymentSpec) deployment.getSpec();
            current.getJob().setUpgradeMode(upgradeMode2);
            current.setFlinkVersion(FlinkVersion.v1_15);

            // Last reconciled spec: a copy of the current one with the older values applied.
            FlinkDeploymentSpec previous = (FlinkDeploymentSpec) ReconciliationUtils.clone(current);
            previous.setFlinkVersion(FlinkVersion.v1_14);
            previous.getJob().setState(jobState);
            previous.getJob().setUpgradeMode(upgradeMode);

            FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
            status.getReconciliationStatus().serializeAndSetLastReconciledSpec(previous, deployment);
        };
    }

    /**
     * Asserts that the modified application cluster passes validation with this test's default
     * validator.
     *
     * @param consumer modification applied to the freshly built deployment
     */
    private void testSuccess(Consumer<FlinkDeployment> consumer) {
        DefaultValidator defaultValidator = this.validator;
        testSuccess(consumer, defaultValidator);
    }

    /**
     * Builds an application cluster, applies {@code consumer} to it and fails the test with the
     * validation message if {@code defaultValidator} reports an error.
     *
     * @param consumer modification applied to the freshly built deployment
     * @param defaultValidator validator under test
     */
    private void testSuccess(Consumer<FlinkDeployment> consumer, DefaultValidator defaultValidator) {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        consumer.accept(deployment);
        Optional<String> error = defaultValidator.validateDeployment(deployment);
        if (error.isPresent()) {
            Assertions.fail(error.get());
        }
    }

    /**
     * Builds an application cluster, applies {@code consumer} to it and asserts that this test's
     * default validator reports an error whose message starts with {@code str}.
     *
     * @param consumer modification applied to the freshly built deployment
     * @param str expected prefix of the validation error message
     */
    private void testError(Consumer<FlinkDeployment> consumer, String str) {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        consumer.accept(deployment);
        // Typed Optional instead of the raw type, so no unchecked casts are needed below.
        Optional<String> error = this.validator.validateDeployment(deployment);
        if (error.isPresent()) {
            String message = error.get();
            // The actual message doubles as the failure text to ease debugging.
            Assertions.assertTrue(message.startsWith(str), message);
        } else {
            Assertions.fail("Did not get expected error: " + str);
        }
    }

    /**
     * Validates session jobs against a session cluster (or no cluster at all) for a series of
     * session job / cluster modifications. A {@code null} expected message means validation must
     * pass; otherwise the error must start with the given text.
     */
    @Test
    public void testSessionJobWithSession() {
        // Leaves the session cluster exactly as the helper built it.
        Consumer<FlinkDeployment> untouchedCluster = cluster -> {
        };

        // Unmodified job and cluster.
        testSessionJobValidateWithModifier(job -> {
        }, untouchedCluster, null);

        // No session cluster supplied.
        testSessionJobValidate(TestUtils.buildSessionJob(), Optional.empty(), null);

        testSessionJobValidateWithModifier(
                job -> ((FlinkSessionJobSpec) job.getSpec()).setDeploymentName("not-match"),
                untouchedCluster,
                "The session job's cluster id is not match with the session cluster");

        // Target cluster carries a job spec.
        testSessionJobValidateWithModifier(
                job -> {
                },
                cluster -> ((FlinkDeploymentSpec) cluster.getSpec()).setJob(new JobSpec()),
                "Can not submit session job to application cluster");

        testSessionJobValidateWithModifier(
                job -> ((FlinkSessionJobSpec) job.getSpec()).getJob().setUpgradeMode(UpgradeMode.LAST_STATE),
                untouchedCluster,
                "The LAST_STATE upgrade mode is not supported in session job now.");

        testSessionJobValidateWithModifier(
                job -> ((FlinkSessionJobSpec) job.getSpec()).getJob().setState(JobState.SUSPENDED),
                untouchedCluster,
                "Job must start in running state");

        // Session-job level Flink configuration that is expected to be accepted.
        testSessionJobValidateWithModifier(
                job -> ((FlinkSessionJobSpec) job.getSpec()).setFlinkConfiguration(Map.of(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER.key(), "headerKey1:headerValue1,headerKey2:headerValue2")),
                untouchedCluster,
                null);
        testSessionJobValidateWithModifier(
                job -> ((FlinkSessionJobSpec) job.getSpec()).setFlinkConfiguration(Map.of(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL.key(), "1m")),
                untouchedCluster,
                null);

        // Deployment name changed after the spec has been recorded as last reconciled.
        testSessionJobValidateWithModifier(
                job -> {
                    FlinkSessionJobSpec spec = (FlinkSessionJobSpec) job.getSpec();
                    ((FlinkSessionJobStatus) job.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec(spec, job);
                    ((FlinkSessionJobSpec) job.getSpec()).setDeploymentName("new-deployment-name");
                },
                untouchedCluster,
                "The deploymentName can't be changed");
    }

    /**
     * Builds a session cluster and a session job, applies the given modifiers and validates the
     * job against that cluster.
     *
     * @param consumer modification applied to the session job
     * @param consumer2 modification applied to the session cluster
     * @param str expected prefix of the validation error, or {@code null} if validation must pass
     */
    private void testSessionJobValidateWithModifier(Consumer<FlinkSessionJob> consumer, Consumer<FlinkDeployment> consumer2, @Nullable String str) {
        FlinkDeployment sessionCluster = TestUtils.buildSessionCluster();
        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();

        // The cluster modifier runs before the job modifier.
        consumer2.accept(sessionCluster);
        consumer.accept(sessionJob);

        Optional<FlinkDeployment> cluster = Optional.of(sessionCluster);
        testSessionJobValidate(sessionJob, cluster, str);
    }

    /**
     * Validates {@code flinkSessionJob} against the optional session cluster with this test's
     * default validator and checks the outcome.
     *
     * @param flinkSessionJob session job to validate
     * @param optional session cluster the job targets, if any
     * @param str expected prefix of the validation error; {@code null} means validation must pass
     */
    private void testSessionJobValidate(FlinkSessionJob flinkSessionJob, Optional<FlinkDeployment> optional, @Nullable String str) {
        // Typed Optional: a raw Optional cannot take the Assertions::fail method reference
        // and would force unchecked casts on get().
        Optional<String> error = this.validator.validateSessionJob(flinkSessionJob, optional);
        if (str == null) {
            // Success expected: surface any validation message as the failure reason.
            error.ifPresent(Assertions::fail);
        } else if (error.isPresent()) {
            String message = error.get();
            Assertions.assertTrue(message.startsWith(str), message);
        } else {
            Assertions.fail("Did not get expected error: " + str);
        }
    }
}
