package org.apache.flink.kubernetes.operator.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.Pod;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.DeploymentOptionsInternal;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/config/FlinkConfigBuilderTest.class */
/**
 * Tests for the individual {@code apply*} steps of {@link FlinkConfigBuilder} and for {@link
 * FlinkConfigBuilder#buildFrom}.
 *
 * <p>The {@link #flinkDeployment} fixture is created once for the whole class. Tests must treat it
 * as read-only; any test that needs a modified deployment works on a copy obtained from {@link
 * #copyOfFixture()} so that the outcome does not depend on test execution order.
 */
public class FlinkConfigBuilderTest {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(new YAMLFactory());
    private static final String CUSTOM_LOG_CONFIG = "rootLogger.level = INFO";
    private static final String LOG_CONFIG_FILE_NAME = "log4j-console.properties";

    /** Shared, read-only fixture populated by {@link #prepareFlinkDeployment()}. */
    private static FlinkDeployment flinkDeployment;

    /**
     * Builds the shared application-cluster deployment with a common pod template containing one
     * container, an ingress template, two JobManager replicas, job parallelism 2 and a custom
     * log4j console configuration.
     */
    @BeforeAll
    public static void prepareFlinkDeployment() {
        flinkDeployment = TestUtils.buildApplicationCluster();

        Container container = new Container();
        container.setName("container0");

        FlinkDeploymentSpec spec = flinkDeployment.getSpec();
        spec.setPodTemplate(
                TestUtils.getTestPod("pod0 hostname", "pod0 api version", List.of(container)));
        spec.setIngress(IngressSpec.builder().template("test.com").build());
        spec.getJobManager().setReplicas(2);
        spec.getJob().setParallelism(2);
        spec.setLogConfiguration(Map.of(LOG_CONFIG_FILE_NAME, CUSTOM_LOG_CONFIG));
    }

    /** Creates a builder for the given deployment on top of an empty default configuration. */
    private static FlinkConfigBuilder builderFor(FlinkDeployment deployment) {
        return new FlinkConfigBuilder(deployment, new Configuration());
    }

    /** Returns an independent copy of the shared fixture that a test may freely modify. */
    private static FlinkDeployment copyOfFixture() {
        return ReconciliationUtils.clone(flinkDeployment);
    }

    /** Parses the YAML pod template file at the given path. */
    private static Pod readPodTemplate(String path) throws IOException {
        return OBJECT_MAPPER.readValue(new File(path), Pod.class);
    }

    /** The container image option must equal the image used by the test deployment. */
    @Test
    public void testApplyImage() {
        Configuration config = builderFor(flinkDeployment).applyImage().build();

        Assertions.assertEquals(
                BaseTestUtils.IMAGE, config.get(KubernetesConfigOptions.CONTAINER_IMAGE));
    }

    /** The image pull policy option must render as {@code IfNotPresent}. */
    @Test
    public void testApplyImagePolicy() {
        Configuration config = builderFor(flinkDeployment).applyImagePullPolicy().build();

        Assertions.assertEquals(
                "IfNotPresent",
                config.get(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY).toString());
    }

    /**
     * Checks the values produced by {@code applyFlinkConfiguration}: the values expected for the
     * unmodified fixture, a user-supplied REST service exposed type, the checkpointing interval
     * for {@link UpgradeMode#LAST_STATE} with Kubernetes HA configured, and the web cancel flag
     * for a session cluster.
     */
    @Test
    public void testApplyFlinkConfiguration() {
        Configuration config = builderFor(flinkDeployment).applyFlinkConfiguration().build();
        Assertions.assertEquals(2, config.get(TaskManagerOptions.NUM_TASK_SLOTS));
        Assertions.assertEquals(
                KubernetesConfigOptions.ServiceExposedType.ClusterIP,
                config.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
        Assertions.assertEquals(false, config.get(WebOptions.CANCEL_ENABLE));
        Assertions.assertEquals(
                flinkDeployment.getMetadata().getName(), config.get(PipelineOptions.NAME));

        // A REST service exposed type given in the spec's Flink configuration must be kept.
        FlinkDeployment deployment = copyOfFixture();
        deployment
                .getSpec()
                .setFlinkConfiguration(
                        Map.of(
                                KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(),
                                KubernetesConfigOptions.ServiceExposedType.LoadBalancer.name()));
        config = builderFor(deployment).applyFlinkConfiguration().build();
        Assertions.assertEquals(
                KubernetesConfigOptions.ServiceExposedType.LoadBalancer,
                config.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));

        // LAST_STATE upgrade mode with Kubernetes HA in the defaults: default interval expected.
        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        Configuration haDefaults =
                new Configuration()
                        .set(
                                HighAvailabilityOptions.HA_MODE,
                                KubernetesHaServicesFactory.class.getCanonicalName());
        config = new FlinkConfigBuilder(deployment, haDefaults).applyFlinkConfiguration().build();
        Assertions.assertEquals(
                FlinkConfigBuilder.DEFAULT_CHECKPOINTING_INTERVAL,
                config.get(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL));

        // Session clusters must have web cancel disabled as well.
        config = builderFor(TestUtils.buildSessionCluster()).applyFlinkConfiguration().build();
        Assertions.assertEquals(false, config.get(WebOptions.CANCEL_ENABLE));
    }

    /**
     * For Flink versions newer than 1.14 the shutdown-on-application-finish flag is expected to be
     * {@code false}; for all other versions it is expected to be {@code true}.
     *
     * @param flinkVersion the Flink version applied to a copy of the fixture
     */
    @ParameterizedTest
    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
    public void testApplyFlinkConfigurationShouldSetShutdownOnFinishBasedOnFlinkVersion(
            FlinkVersion flinkVersion) {
        // Work on a copy: the version must not leak into other tests via the shared fixture.
        FlinkDeployment deployment = copyOfFixture();
        deployment.getSpec().setFlinkVersion(flinkVersion);

        Configuration config = builderFor(deployment).applyFlinkConfiguration().build();
        boolean shutdownOnFinish =
                config.getBoolean(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH);

        if (flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)) {
            Assertions.assertFalse(shutdownOnFinish);
        } else {
            Assertions.assertTrue(shutdownOnFinish);
        }
    }

    /**
     * The configured conf dir must contain a readable {@code log4j-console.properties} file whose
     * content equals the custom log configuration from the spec.
     */
    @Test
    public void testApplyLogConfiguration() throws IOException {
        Configuration config = builderFor(flinkDeployment).applyLogConfiguration().build();

        File logConfigFile =
                new File(config.get(DeploymentOptionsInternal.CONF_DIR), LOG_CONFIG_FILE_NAME);
        Assertions.assertTrue(
                logConfigFile.exists() && logConfigFile.isFile() && logConfigFile.canRead());
        Assertions.assertEquals(CUSTOM_LOG_CONFIG, Files.readString(logConfigFile.toPath()));
    }

    /**
     * Both the JobManager and the TaskManager pod template files must contain the container
     * defined in the common pod template.
     */
    @Test
    public void testApplyCommonPodTemplate() throws Exception {
        Configuration config = builderFor(flinkDeployment).applyCommonPodTemplate().build();

        Pod jobManagerPod =
                readPodTemplate(config.getString(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE));
        Pod taskManagerPod =
                readPodTemplate(
                        config.getString(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE));

        Assertions.assertEquals(
                "container0", jobManagerPod.getSpec().getContainers().get(0).getName());
        Assertions.assertEquals(
                "container0", taskManagerPod.getSpec().getContainers().get(0).getName());
    }

    /** With an ingress template in the spec, the REST service must be exposed as ClusterIP. */
    @Test
    public void testApplyIngressDomain() {
        Configuration config = builderFor(flinkDeployment).applyIngressDomain().build();

        Assertions.assertEquals(
                KubernetesConfigOptions.ServiceExposedType.ClusterIP,
                config.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE));
    }

    /** The Kubernetes service account option must be {@code flink-operator}. */
    @Test
    public void testApplyServiceAccount() {
        Configuration config = builderFor(flinkDeployment).applyServiceAccount().build();

        Assertions.assertEquals(
                "flink-operator", config.get(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT));
    }

    /**
     * Checks the JobManager memory, CPU and replica options, and that a JobManager-specific pod
     * template is written to the configured template file once one is set.
     */
    @Test
    public void testApplyJobManagerSpec() throws Exception {
        // Work on a copy: the JobManager pod template set below must not leak into other tests.
        FlinkDeployment deployment = copyOfFixture();

        Configuration config = builderFor(deployment).applyJobManagerSpec().build();
        Assertions.assertNull(config.getString(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE));
        Assertions.assertEquals(
                MemorySize.parse("2048m"), config.get(JobManagerOptions.TOTAL_PROCESS_MEMORY));
        Assertions.assertEquals(
                Double.valueOf(1.0d), config.get(KubernetesConfigOptions.JOB_MANAGER_CPU));
        Assertions.assertEquals(
                2, config.get(KubernetesConfigOptions.KUBERNETES_JOBMANAGER_REPLICAS));

        deployment
                .getSpec()
                .getJobManager()
                .setPodTemplate(
                        TestUtils.getTestPod(
                                "pod1 hostname", "pod1 api version", new ArrayList<>()));
        config = builderFor(deployment).applyJobManagerSpec().build();
        Pod jobManagerPod =
                readPodTemplate(config.getString(KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE));
        Assertions.assertEquals("pod1 api version", jobManagerPod.getApiVersion());
    }

    /**
     * Checks the TaskManager memory and CPU options with no pod template at all, and that a
     * TaskManager-specific pod template is written to the configured template file once one is
     * set.
     */
    @Test
    public void testApplyTaskManagerSpec() throws Exception {
        FlinkDeployment deployment = copyOfFixture();
        deployment.getSpec().setPodTemplate(null);

        Configuration config = builderFor(deployment).applyTaskManagerSpec().build();
        Assertions.assertNull(config.getString(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE));
        Assertions.assertEquals(
                MemorySize.parse("2048m"), config.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY));
        Assertions.assertEquals(
                Double.valueOf(1.0d), config.get(KubernetesConfigOptions.TASK_MANAGER_CPU));

        deployment
                .getSpec()
                .getTaskManager()
                .setPodTemplate(
                        TestUtils.getTestPod(
                                "pod2 hostname", "pod2 api version", new ArrayList<>()));
        config = builderFor(deployment).applyTaskManagerSpec().build();
        Pod taskManagerPod =
                readPodTemplate(
                        config.getString(KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE));
        Assertions.assertEquals("pod2 api version", taskManagerPod.getApiVersion());
    }

    /**
     * Checks the options derived from the job spec (ignore-unclaimed-state flag, deployment
     * target, jar, parallelism, program arguments) and the default parallelism expected when the
     * TaskManager replicas (3) and task slots (4) are given explicitly.
     */
    @Test
    public void testApplyJobOrSessionSpec() throws Exception {
        FlinkDeployment deployment = copyOfFixture();
        deployment.getSpec().getJob().setAllowNonRestoredState(true);
        deployment.getSpec().getJob().setArgs(new String[] {"--test", "123"});

        Configuration config = builderFor(deployment).applyJobOrSessionSpec().build();
        Assertions.assertTrue(
                config.getBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE));
        Assertions.assertEquals(
                KubernetesDeploymentTarget.APPLICATION.getName(),
                config.get(DeploymentOptions.TARGET));
        Assertions.assertEquals("local:///tmp/sample.jar", config.get(PipelineOptions.JARS).get(0));
        Assertions.assertEquals(2, config.get(CoreOptions.DEFAULT_PARALLELISM));
        Assertions.assertEquals(
                List.of("--test", "123"), config.get(ApplicationConfiguration.APPLICATION_ARGS));

        FlinkDeployment withReplicas = ReconciliationUtils.clone(deployment);
        withReplicas.getSpec().setTaskManager(new TaskManagerSpec());
        withReplicas.getSpec().getTaskManager().setReplicas(3);
        withReplicas
                .getSpec()
                .getFlinkConfiguration()
                .put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "4");
        config = builderFor(withReplicas).applyFlinkConfiguration().applyJobOrSessionSpec().build();
        Assertions.assertEquals(12, config.get(CoreOptions.DEFAULT_PARALLELISM));
    }

    /** Without a jar URI in the job spec, no pipeline jars must be configured. */
    @Test
    public void testApplyJobOrSessionSpecWithNoJar() throws Exception {
        FlinkDeployment deployment = copyOfFixture();
        deployment.getSpec().getJob().setJarURI(null);

        Configuration config = builderFor(deployment).applyJobOrSessionSpec().build();

        Assertions.assertNull(config.get(PipelineOptions.JARS));
    }

    /**
     * The {@code allowNonRestoredState} value of the job spec must win over a contradicting
     * ignore-unclaimed-state entry in the spec's Flink configuration, in both directions.
     */
    @Test
    public void testAllowNonRestoredStateInSpecOverrideInFlinkConf() throws URISyntaxException {
        // Work on a copy: the job spec and Flink configuration changes must not leak out.
        FlinkDeployment deployment = copyOfFixture();
        String ignoreUnclaimedStateKey = SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE.key();

        deployment.getSpec().getJob().setAllowNonRestoredState(false);
        deployment.getSpec().getFlinkConfiguration().put(ignoreUnclaimedStateKey, "true");
        Configuration config = builderFor(deployment).applyJobOrSessionSpec().build();
        Assertions.assertFalse(
                config.getBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE));

        deployment.getSpec().getJob().setAllowNonRestoredState(true);
        deployment.getSpec().getFlinkConfiguration().put(ignoreUnclaimedStateKey, "false");
        config = builderFor(deployment).applyJobOrSessionSpec().build();
        Assertions.assertTrue(
                config.getBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE));
    }

    /**
     * Standalone application mode: checks deployment target, cluster mode, parallelism, entry
     * class, TaskManager replicas and classpath. Then checks the replica count expected (5) when
     * replicas are unset and only job parallelism (10) and task slots (2) are given.
     */
    @Test
    public void testApplyStandaloneApplicationSpec() throws URISyntaxException, IOException {
        FlinkDeployment deployment = copyOfFixture();
        FlinkDeploymentSpec spec = deployment.getSpec();
        spec.setMode(KubernetesDeploymentMode.STANDALONE);
        spec.getJob().setEntryClass("entry.class");
        spec.getJob().setJarURI("local:///flink/opt/StateMachine.jar");
        spec.setTaskManager(new TaskManagerSpec());
        spec.getTaskManager().setReplicas(3);
        spec.getFlinkConfiguration().put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2");

        Configuration config =
                builderFor(deployment)
                        .applyFlinkConfiguration()
                        .applyTaskManagerSpec()
                        .applyJobOrSessionSpec()
                        .build();
        Assertions.assertEquals("remote", config.getString(DeploymentOptions.TARGET));
        Assertions.assertEquals(
                StandaloneKubernetesConfigOptionsInternal.ClusterMode.APPLICATION,
                config.get(StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE));
        Assertions.assertEquals(6, config.getInteger(CoreOptions.DEFAULT_PARALLELISM));
        Assertions.assertEquals(
                "entry.class", config.getString(ApplicationConfiguration.APPLICATION_MAIN_CLASS));
        Assertions.assertEquals(
                3,
                config.get(
                        StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));

        List<String> classpaths =
                ConfigUtils.decodeListFromConfig(
                        config, PipelineOptions.CLASSPATHS, entry -> entry.toString());
        MatcherAssert.assertThat(
                classpaths, Matchers.containsInAnyOrder("file:///flink/opt/StateMachine.jar"));

        // No explicit replicas: the expected count for parallelism 10 with 2 slots is 5.
        spec.getTaskManager().setReplicas(null);
        spec.getJob().setParallelism(10);
        config =
                builderFor(deployment)
                        .applyFlinkConfiguration()
                        .applyTaskManagerSpec()
                        .applyJobOrSessionSpec()
                        .build();
        Assertions.assertEquals(
                5,
                config.get(
                        StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));
    }

    /**
     * Standalone session mode (no job spec): checks deployment target, cluster mode and that the
     * explicitly configured TaskManager replicas are kept.
     */
    @Test
    public void testApplyStandaloneSessionSpec() throws URISyntaxException, IOException {
        FlinkDeployment deployment = copyOfFixture();
        FlinkDeploymentSpec spec = deployment.getSpec();
        spec.setMode(KubernetesDeploymentMode.STANDALONE);
        spec.setJob(null);
        spec.setTaskManager(new TaskManagerSpec());
        spec.getTaskManager().setReplicas(5);
        spec.getFlinkConfiguration().put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2");

        Configuration config =
                builderFor(deployment)
                        .applyFlinkConfiguration()
                        .applyTaskManagerSpec()
                        .applyJobOrSessionSpec()
                        .build();

        Assertions.assertEquals("remote", config.getString(DeploymentOptions.TARGET));
        Assertions.assertEquals(
                StandaloneKubernetesConfigOptionsInternal.ClusterMode.SESSION,
                config.get(StandaloneKubernetesConfigOptionsInternal.CLUSTER_MODE));
        Assertions.assertEquals(
                5,
                config.get(
                        StandaloneKubernetesConfigOptionsInternal.KUBERNETES_TASKMANAGER_REPLICAS));
    }

    /**
     * {@code buildFrom} must carry the deployment's namespace and name into the Kubernetes
     * namespace and cluster id options.
     */
    @Test
    public void testBuildFrom() throws Exception {
        String namespace = flinkDeployment.getMetadata().getNamespace();
        String name = flinkDeployment.getMetadata().getName();

        Configuration config =
                FlinkConfigBuilder.buildFrom(
                        namespace, name, flinkDeployment.getSpec(), new Configuration());

        Assertions.assertEquals(namespace, config.get(KubernetesConfigOptions.NAMESPACE));
        Assertions.assertEquals(name, config.get(KubernetesConfigOptions.CLUSTER_ID));
    }
}
