package org.apache.flink.kubernetes.operator.observer.deployment;

import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.util.HashMap;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.utils.SpecWithMeta;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.observer.TestObserverAdapter;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link SessionObserver}, driven through a {@link TestObserverAdapter}.
 *
 * <p>The tests build a session cluster resource with {@link TestUtils}, run the observer against
 * different contexts and assert on the resulting {@link JobManagerDeploymentStatus} and
 * reconciliation status.
 */
@EnableKubernetesMockClient(crud = true)
public class SessionObserverTest extends OperatorTestBase {

    /** Annotation key used by the test to stamp a generation on the Kubernetes deployment. */
    private static final String GENERATION_ANNOTATION =
            "flinkdeployment.flink.apache.org/generation";

    // Never assigned in this class; presumably injected by the mock-client extension
    // enabled on the class (TODO confirm). Must therefore stay non-final.
    private KubernetesClient kubernetesClient;

    private final Context<FlinkDeployment> readyContext =
            TestUtils.createContextWithReadyJobManagerDeployment();

    private TestObserverAdapter<FlinkDeployment> observer;

    /** Creates the observer under test, wired to the config manager and event recorder. */
    @Override
    public void setup() {
        observer =
                new TestObserverAdapter<>(this, new SessionObserver(configManager, eventRecorder));
    }

    /**
     * Walks a deployed session cluster through consecutive observations and checks the JobManager
     * status after each one, including a port outage and the subsequent recovery.
     */
    @Test
    public void observeSessionCluster() {
        FlinkDeployment deployment = TestUtils.buildSessionCluster();
        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());

        // First observation: no stable spec is recorded yet and the JobManager is not ready.
        observer.observe(deployment, readyContext);
        Assertions.assertNull(
                deployment.getStatus().getReconciliationStatus().getLastStableSpec());
        Assertions.assertEquals(
                JobManagerDeploymentStatus.DEPLOYED_NOT_READY, jobManagerStatusOf(deployment));

        // Second observation: JobManager becomes READY and the reconciled spec is marked stable.
        observer.observe(deployment, readyContext);
        Assertions.assertEquals(JobManagerDeploymentStatus.READY, jobManagerStatusOf(deployment));
        FlinkDeploymentReconciliationStatus reconciliationStatus =
                deployment.getStatus().getReconciliationStatus();
        Assertions.assertEquals(
                reconciliationStatus.getLastReconciledSpec(),
                reconciliationStatus.getLastStableSpec());

        // Observing again keeps the READY status.
        observer.observe(deployment, readyContext);
        Assertions.assertEquals(JobManagerDeploymentStatus.READY, jobManagerStatusOf(deployment));

        // With the port reported as not ready, the status falls back to DEPLOYING.
        flinkService.setPortReady(false);
        observer.observe(deployment, readyContext);
        Assertions.assertEquals(
                JobManagerDeploymentStatus.DEPLOYING, jobManagerStatusOf(deployment));

        // Once the port is ready again, the status goes through DEPLOYED_NOT_READY to READY.
        flinkService.setPortReady(true);
        observer.observe(deployment, readyContext);
        Assertions.assertEquals(
                JobManagerDeploymentStatus.DEPLOYED_NOT_READY, jobManagerStatusOf(deployment));
        observer.observe(deployment, readyContext);
        Assertions.assertEquals(JobManagerDeploymentStatus.READY, jobManagerStatusOf(deployment));
    }

    /**
     * Checks that an upgrade whose target generation is already stamped on the Kubernetes
     * deployment is recognised as deployed, and that the recorded last-reconciled spec is the one
     * from the deployment attempt rather than a later spec change.
     */
    @Test
    public void observeAlreadyUpgraded() {
        Deployment kubernetesDeployment = TestUtils.createDeployment(true);
        kubernetesDeployment.getMetadata().setAnnotations(new HashMap<>());
        Context<?> deploymentContext = TestUtils.createContextWithDeployment(kubernetesDeployment);

        // Generation 123 is recorded as the deployed spec.
        FlinkDeployment deployment = TestUtils.buildSessionCluster();
        deployment.getMetadata().setGeneration(123L);
        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());

        // Generation 321 (k=1) is recorded as an in-flight deployment attempt.
        deployment.getSpec().getFlinkConfiguration().put("k", "1");
        deployment.getMetadata().setGeneration(321L);
        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(
                deployment,
                new FlinkConfigManager(new Configuration())
                        .getDeployConfig(deployment.getMetadata(), deployment.getSpec()));

        FlinkDeploymentStatus status = deployment.getStatus();
        FlinkDeploymentReconciliationStatus reconciliationStatus =
                status.getReconciliationStatus();
        Assertions.assertEquals(ReconciliationState.UPGRADING, reconciliationStatus.getState());
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);

        // An empty context leaves the MISSING status untouched.
        observer.observe(deployment, TestUtils.createEmptyContext());
        Assertions.assertEquals(
                JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus());

        // A deployment without the generation annotation leaves it untouched as well.
        observer.observe(deployment, deploymentContext);
        Assertions.assertEquals(
                JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus());

        // Stamp the attempted generation on the deployment, then move the resource on to
        // generation 322 (k=2) before observing.
        kubernetesDeployment.getMetadata().getAnnotations().put(GENERATION_ANNOTATION, "321");
        deployment.getMetadata().setGeneration(322L);
        deployment.getSpec().getFlinkConfiguration().put("k", "2");
        observer.observe(deployment, deploymentContext);

        Assertions.assertEquals(ReconciliationState.DEPLOYED, reconciliationStatus.getState());
        Assertions.assertEquals(
                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
                status.getJobManagerDeploymentStatus());

        // The last reconciled spec must still be the attempted one (generation 321, k=1).
        SpecWithMeta<?> lastReconciled =
                status.getReconciliationStatus().deserializeLastReconciledSpecWithMeta();
        Assertions.assertEquals(321L, lastReconciled.getMeta().getMetadata().getGeneration());
        Assertions.assertEquals("1", lastReconciled.getSpec().getFlinkConfiguration().get("k"));
    }

    /** Returns the Kubernetes client held by this test. */
    @Override
    public KubernetesClient getKubernetesClient() {
        return kubernetesClient;
    }

    /** Reads the current JobManager deployment status from the resource's status. */
    private static JobManagerDeploymentStatus jobManagerStatusOf(FlinkDeployment deployment) {
        return deployment.getStatus().getJobManagerDeploymentStatus();
    }
}
