package org.apache.flink.kubernetes.operator.service;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentList;
import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

@EnableKubernetesMockClient(crud = true)
/* loaded from: input_file:org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.class */
public class StandaloneFlinkServiceTest {
    KubernetesMockServer mockServer;
    private NamespacedKubernetesClient kubernetesClient;
    StandaloneFlinkService flinkStandaloneService;
    Configuration configuration = new Configuration();

    /* loaded from: input_file:org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest$TestingStandaloneFlinkService.class */
    class TestingStandaloneFlinkService extends StandaloneFlinkService {
        private Configuration runtimeConfig;

        public TestingStandaloneFlinkService(StandaloneFlinkService standaloneFlinkService) {
            super(standaloneFlinkService.kubernetesClient, standaloneFlinkService.configManager);
        }

        public Configuration getRuntimeConfig() {
            return this.runtimeConfig;
        }

        protected void submitClusterInternal(Configuration configuration, Mode mode) {
            this.runtimeConfig = configuration;
        }
    }

    @BeforeEach
    public void setup() {
        this.configuration.set(KubernetesConfigOptions.CLUSTER_ID, "test-cluster");
        this.configuration.set(KubernetesConfigOptions.NAMESPACE, "flink-operator-test");
        this.configuration.set(KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT, 80);
        this.kubernetesClient = this.mockServer.createClient().inAnyNamespace();
        this.flinkStandaloneService = new StandaloneFlinkService(this.kubernetesClient, new FlinkConfigManager(this.configuration));
    }

    @Test
    public void testDeleteClusterDeployment() throws Exception {
        FlinkDeployment buildSessionCluster = TestUtils.buildSessionCluster();
        this.configuration = buildConfig(buildSessionCluster, this.configuration);
        createDeployments(buildSessionCluster);
        Assertions.assertEquals(2, ((DeploymentList) this.kubernetesClient.apps().deployments().list()).getItems().size());
        int requestCount = this.mockServer.getRequestCount();
        this.flinkStandaloneService.deleteClusterDeployment(buildSessionCluster.getMetadata(), (FlinkDeploymentStatus) buildSessionCluster.getStatus(), this.configuration, false);
        Assertions.assertEquals(2, this.mockServer.getRequestCount() - requestCount);
        Assertions.assertTrue(this.mockServer.getLastRequest().getPath().contains("taskmanager"));
        Assertions.assertEquals(0, ((DeploymentList) this.kubernetesClient.apps().deployments().list()).getItems().size());
    }

    @Test
    public void testDeleteClusterDeploymentWithHADelete() throws Exception {
        FlinkDeployment buildSessionCluster = TestUtils.buildSessionCluster();
        this.configuration = buildConfig(buildSessionCluster, this.configuration);
        createDeployments(buildSessionCluster);
        Assertions.assertEquals(2, ((DeploymentList) this.kubernetesClient.apps().deployments().list()).getItems().size());
        this.flinkStandaloneService.deleteClusterDeployment(buildSessionCluster.getMetadata(), (FlinkDeploymentStatus) buildSessionCluster.getStatus(), this.configuration, true);
        Assertions.assertEquals(0, ((DeploymentList) this.kubernetesClient.apps().deployments().list()).getItems().size());
    }

    @Test
    public void testTMReplicaScaleApplication() throws Exception {
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster();
        String name = buildApplicationCluster.getMetadata().getName();
        String namespace = buildApplicationCluster.getMetadata().getNamespace();
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).setMode(KubernetesDeploymentMode.STANDALONE);
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getFlinkConfiguration().put(JobManagerOptions.SCHEDULER_MODE.key(), SchedulerExecutionMode.REACTIVE.name());
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob().setParallelism(4);
        createDeployments(buildApplicationCluster);
        Assertions.assertTrue(this.flinkStandaloneService.scale(buildApplicationCluster.getMetadata(), ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob(), buildConfig(buildApplicationCluster, this.configuration)));
        Assertions.assertEquals(2, ((Deployment) ((RollableScalableResource) ((NonNamespaceOperation) this.kubernetesClient.apps().deployments().inNamespace(namespace)).withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(name))).get()).getSpec().getReplicas());
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob().setParallelism(100);
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getTaskManager().setReplicas(2);
        Assertions.assertTrue(this.flinkStandaloneService.scale(buildApplicationCluster.getMetadata(), ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob(), buildConfig(buildApplicationCluster, this.configuration)));
        Assertions.assertEquals(2, ((Deployment) ((RollableScalableResource) ((NonNamespaceOperation) this.kubernetesClient.apps().deployments().inNamespace(namespace)).withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(name))).get()).getSpec().getReplicas());
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob().setParallelism(100);
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getFlinkConfiguration().remove(JobManagerOptions.SCHEDULER_MODE.key());
        Assertions.assertFalse(this.flinkStandaloneService.scale(buildApplicationCluster.getMetadata(), ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob(), buildConfig(buildApplicationCluster, this.configuration)));
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getTaskManager().setReplicas(10);
        Assertions.assertFalse(this.flinkStandaloneService.scale(buildApplicationCluster.getMetadata(), ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob(), buildConfig(buildApplicationCluster, this.configuration)));
    }

    @Test
    public void testTMReplicaScaleSession() throws Exception {
        FlinkDeployment buildSessionCluster = TestUtils.buildSessionCluster();
        String name = buildSessionCluster.getMetadata().getName();
        String namespace = buildSessionCluster.getMetadata().getNamespace();
        ((FlinkDeploymentSpec) buildSessionCluster.getSpec()).setMode(KubernetesDeploymentMode.STANDALONE);
        ((FlinkDeploymentSpec) buildSessionCluster.getSpec()).getTaskManager().setReplicas(3);
        createDeployments(buildSessionCluster);
        Assertions.assertTrue(this.flinkStandaloneService.scale(buildSessionCluster.getMetadata(), ((FlinkDeploymentSpec) buildSessionCluster.getSpec()).getJob(), buildConfig(buildSessionCluster, this.configuration)));
        Assertions.assertEquals(3, ((Deployment) ((RollableScalableResource) ((NonNamespaceOperation) this.kubernetesClient.apps().deployments().inNamespace(namespace)).withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(name))).get()).getSpec().getReplicas());
        ((FlinkDeploymentSpec) buildSessionCluster.getSpec()).getTaskManager().setReplicas(10);
        createDeployments(buildSessionCluster);
        Assertions.assertTrue(this.flinkStandaloneService.scale(buildSessionCluster.getMetadata(), ((FlinkDeploymentSpec) buildSessionCluster.getSpec()).getJob(), buildConfig(buildSessionCluster, this.configuration)));
        Assertions.assertEquals(10, ((Deployment) ((RollableScalableResource) ((NonNamespaceOperation) this.kubernetesClient.apps().deployments().inNamespace(namespace)).withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(name))).get()).getSpec().getReplicas());
    }

    @Test
    public void testSubmitSessionClusterConfigRemoval() throws Exception {
        TestingStandaloneFlinkService testingStandaloneFlinkService = new TestingStandaloneFlinkService(this.flinkStandaloneService);
        testingStandaloneFlinkService.submitSessionCluster(this.configuration);
        Assertions.assertFalse(testingStandaloneFlinkService.getRuntimeConfig().containsKey(KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT.key()));
    }

    @Test
    public void testDeployApplicationClusterConfigRemoval() throws Exception {
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster();
        TestingStandaloneFlinkService testingStandaloneFlinkService = new TestingStandaloneFlinkService(this.flinkStandaloneService);
        testingStandaloneFlinkService.deployApplicationCluster(((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob(), this.configuration);
        Assertions.assertFalse(testingStandaloneFlinkService.getRuntimeConfig().containsKey(KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT.key()));
    }

    private Configuration buildConfig(FlinkDeployment flinkDeployment, Configuration configuration) throws Exception {
        return FlinkConfigBuilder.buildFrom(flinkDeployment.getMetadata().getNamespace(), flinkDeployment.getMetadata().getName(), (FlinkDeploymentSpec) flinkDeployment.getSpec(), configuration);
    }

    private void createDeployments(AbstractFlinkResource abstractFlinkResource) {
        Deployment deployment = new Deployment();
        ObjectMeta objectMeta = new ObjectMeta();
        objectMeta.setName(StandaloneKubernetesUtils.getJobManagerDeploymentName(abstractFlinkResource.getMetadata().getName()));
        deployment.setMetadata(objectMeta);
        this.kubernetesClient.resource(deployment).inNamespace(abstractFlinkResource.getMetadata().getNamespace()).createOrReplace();
        Deployment deployment2 = new Deployment();
        ObjectMeta objectMeta2 = new ObjectMeta();
        objectMeta2.setName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(abstractFlinkResource.getMetadata().getName()));
        deployment2.setMetadata(objectMeta2);
        deployment2.setSpec(new DeploymentSpec());
        this.kubernetesClient.resource(deployment2).inNamespace(abstractFlinkResource.getMetadata().getNamespace()).createOrReplace();
    }
}
