package org.apache.flink.kubernetes.operator.controller;

import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.EventBuilder;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
import io.fabric8.kubernetes.api.model.networking.v1.IngressRule;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import io.fabric8.mockwebserver.dsl.DelayPathable;
import io.fabric8.mockwebserver.dsl.ReturnOrWebsocketable;
import io.fabric8.mockwebserver.dsl.TimesOnceableOrHttpHeaderable;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.TaskManagerInfo;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

@EnableKubernetesMockClient(crud = true)
/* loaded from: input_file:org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.class */
public class FlinkDeploymentControllerTest {
    /** Testing Flink service handed to the controller; re-created for every test in {@link #setup()}. */
    private TestingFlinkService flinkService;
    /** Reconcile context obtained from {@link #flinkService} in {@link #setup()}. */
    private Context<FlinkDeployment> context;
    /** Controller wrapper under test; re-created for every test in {@link #setup()}. */
    private TestingFlinkDeploymentController testController;
    // NOTE(review): not assigned anywhere in the visible part of this class; presumably injected by
    // the @EnableKubernetesMockClient extension -- confirm.
    private KubernetesMockServer mockServer;
    // NOTE(review): same as mockServer -- presumably injected by the mock-client extension; confirm.
    private KubernetesClient kubernetesClient;
    /** Configuration manager built from an empty Flink {@link Configuration}. */
    private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
    /** Canned event used as the reply payload when tests mock POSTs to the events endpoint. */
    Event mockedEvent = ((EventBuilder) new EventBuilder().withNewMetadata().withName("name").endMetadata()).withType("type").withReason("reason").build();

    /**
     * Rebuilds the per-test fixtures: a testing Flink service bound to the Kubernetes client, the
     * context it exposes, and the controller under test. Finally creates (or replaces) a default
     * application cluster resource through the Kubernetes client.
     */
    @BeforeEach
    public void setup() {
        flinkService = new TestingFlinkService(kubernetesClient);
        context = flinkService.getContext();
        testController =
                new TestingFlinkDeploymentController(configManager, kubernetesClient, flinkService);

        FlinkDeployment defaultCluster = TestUtils.buildApplicationCluster();
        kubernetesClient.resource(defaultCluster).createOrReplace();
    }

    /**
     * Walks an application deployment through successive reconcile calls and checks, after each
     * step, the JobManager deployment status, the reported job state, the controller's internal
     * status update count and the requested reschedule delay. It then removes the job from the
     * spec and expects the change to be rejected with an error while the last reconciled spec
     * still carries a job.
     *
     * <p>Status and spec are re-read from the resource after every reconcile call instead of being
     * cached, because the controller wrapper may replace them.
     */
    @ParameterizedTest
    @MethodSource({"org.apache.flink.kubernetes.operator.TestUtils#flinkVersions"})
    public void verifyBasicReconcileLoop(FlinkVersion flinkVersion) throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);

        // Freshly built resource: nothing deployed, no job state recorded.
        Assertions.assertEquals(
                JobManagerDeploymentStatus.MISSING,
                deployment.getStatus().getJobManagerDeploymentStatus());
        Assertions.assertNull(deployment.getStatus().getJobStatus().getState());

        // Reconcile #1: deployment is submitted.
        UpdateControl<FlinkDeployment> control = testController.reconcile(deployment, context);
        Assertions.assertEquals(
                JobManagerDeploymentStatus.DEPLOYING,
                deployment.getStatus().getJobManagerDeploymentStatus());
        Assertions.assertEquals(
                JobStatus.RECONCILING.name(), deployment.getStatus().getJobStatus().getState());
        Assertions.assertEquals(4, testController.getInternalStatusUpdateCount());
        Assertions.assertFalse(control.isUpdateStatus());
        long progressCheckMillis =
                configManager.getOperatorConfiguration().getProgressCheckInterval().toMillis();
        Assertions.assertEquals(Optional.of(progressCheckMillis), control.getScheduleDelay());

        FlinkDeploymentReconciliationStatus firstReconciliation =
                deployment.getStatus().getReconciliationStatus();
        Assertions.assertNull(deployment.getStatus().getError());
        Assertions.assertEquals(
                deployment.getSpec(), firstReconciliation.deserializeLastReconciledSpec());
        Assertions.assertNull(
                deployment.getStatus().getReconciliationStatus().getLastStableSpec());

        // Reconcile #2: deployed, REST endpoint not ready yet.
        control = testController.reconcile(deployment, context);
        Assertions.assertEquals(
                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
                deployment.getStatus().getJobManagerDeploymentStatus());
        Assertions.assertEquals(
                JobStatus.RECONCILING.name(), deployment.getStatus().getJobStatus().getState());
        Assertions.assertEquals(5, testController.getInternalStatusUpdateCount());
        Assertions.assertFalse(control.isUpdateStatus());
        long restReadyMillis =
                configManager.getOperatorConfiguration().getRestApiReadyDelay().toMillis();
        Assertions.assertEquals(Optional.of(restReadyMillis), control.getScheduleDelay());

        // Reconcile #3: cluster ready, job running.
        control = testController.reconcile(deployment, context);
        Assertions.assertEquals(
                JobManagerDeploymentStatus.READY,
                deployment.getStatus().getJobManagerDeploymentStatus());
        Assertions.assertEquals(
                JobStatus.RUNNING.name(), deployment.getStatus().getJobStatus().getState());
        Assertions.assertEquals(6, testController.getInternalStatusUpdateCount());
        Assertions.assertFalse(control.isUpdateStatus());
        long reconcileMillis =
                configManager.getOperatorConfiguration().getReconcileInterval().toMillis();
        Assertions.assertEquals(Optional.of(reconcileMillis), control.getScheduleDelay());

        // Reconcile #4: steady state, the status update count must not move.
        control = testController.reconcile(deployment, context);
        Assertions.assertEquals(
                JobManagerDeploymentStatus.READY,
                deployment.getStatus().getJobManagerDeploymentStatus());
        Assertions.assertEquals(
                JobStatus.RUNNING.name(), deployment.getStatus().getJobStatus().getState());
        Assertions.assertEquals(6, testController.getInternalStatusUpdateCount());
        Assertions.assertFalse(control.isUpdateStatus());
        reconcileMillis =
                configManager.getOperatorConfiguration().getReconcileInterval().toMillis();
        Assertions.assertEquals(Optional.of(reconcileMillis), control.getScheduleDelay());

        // The recorded job status must mirror what the Flink service reports.
        org.apache.flink.kubernetes.operator.api.status.JobStatus recordedJob =
                deployment.getStatus().getJobStatus();
        JobStatusMessage reportedJob = flinkService.listJobs().get(0).f1;
        Assertions.assertEquals(reportedJob.getJobId().toHexString(), recordedJob.getJobId());
        Assertions.assertEquals(reportedJob.getJobName(), recordedJob.getJobName());
        Assertions.assertEquals(reportedJob.getJobState().toString(), recordedJob.getState());
        Assertions.assertEquals(
                deployment.getStatus().getReconciliationStatus().getLastReconciledSpec(),
                deployment.getStatus().getReconciliationStatus().getLastStableSpec());

        // Dropping the job from the spec must be rejected.
        deployment.getSpec().setJob(null);
        control = testController.reconcile(deployment, context);
        Assertions.assertEquals(
                JobManagerDeploymentStatus.READY,
                deployment.getStatus().getJobManagerDeploymentStatus());
        Assertions.assertEquals(
                JobStatus.RUNNING.name(), deployment.getStatus().getJobStatus().getState());
        Assertions.assertEquals(7, testController.getInternalStatusUpdateCount());
        Assertions.assertFalse(control.isUpdateStatus());

        FlinkDeploymentReconciliationStatus afterRejectedChange =
                deployment.getStatus().getReconciliationStatus();
        Assertions.assertTrue(
                deployment
                        .getStatus()
                        .getError()
                        .contains("Cannot switch from job to session cluster"));
        Assertions.assertNotNull(afterRejectedChange.deserializeLastReconciledSpec().getJob());

        // Job status still mirrors the Flink service after the rejected change.
        recordedJob = deployment.getStatus().getJobStatus();
        reportedJob = flinkService.listJobs().get(0).f1;
        Assertions.assertEquals(reportedJob.getJobId().toHexString(), recordedJob.getJobId());
        Assertions.assertEquals(reportedJob.getJobName(), recordedJob.getJobName());
        Assertions.assertEquals(reportedJob.getJobState().toString(), recordedJob.getState());
        Assertions.assertEquals(
                deployment.getStatus().getReconciliationStatus().getLastReconciledSpec(),
                deployment.getStatus().getReconciliationStatus().getLastStableSpec());
    }

    /**
     * Reconciles an application cluster once with the regular context and then with a context
     * reporting a failed JobManager deployment. Expects the deployment status to become
     * {@code ERROR}, an error to be recorded, and both mocked event POSTs (one mentioning
     * "Starting deployment", one mentioning the deployment error) to be validated.
     */
    @Test
    public void verifyFailedDeployment() throws Exception {
        String eventsPath = "/api/v1/namespaces/flink-operator-test/events";

        // Reply handler for the event whose body must mention the deployment start.
        TestUtils.ValidatingResponseProvider startEventReply =
                new TestUtils.ValidatingResponseProvider(
                        mockedEvent,
                        request ->
                                Assertions.assertTrue(
                                        request.getBody()
                                                .readUtf8()
                                                .contains("Starting deployment")));
        mockServer.expect().post().withPath(eventsPath).andReply(startEventReply).once();

        // Reply handler for the event whose body must mention the deployment error.
        TestUtils.ValidatingResponseProvider errorEventReply =
                new TestUtils.ValidatingResponseProvider(
                        mockedEvent,
                        request ->
                                Assertions.assertTrue(
                                        request.getBody()
                                                .readUtf8()
                                                .contains(TestUtils.DEPLOYMENT_ERROR)));
        mockServer.expect().post().withPath(eventsPath).andReply(errorEventReply).once();

        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        testController.reconcile(deployment, context);

        UpdateControl<FlinkDeployment> control =
                testController.reconcile(
                        deployment, TestUtils.createContextWithFailedJobManagerDeployment());
        startEventReply.assertValidated();
        Assertions.assertFalse(control.isUpdateStatus());
        long reconcileMillis =
                configManager.getOperatorConfiguration().getReconcileInterval().toMillis();
        Assertions.assertEquals(Optional.of(reconcileMillis), control.getScheduleDelay());
        Assertions.assertEquals(
                JobManagerDeploymentStatus.ERROR,
                deployment.getStatus().getJobManagerDeploymentStatus());
        errorEventReply.assertValidated();
        Assertions.assertNotNull(deployment.getStatus().getError());

        // A further reconcile against the failed deployment keeps the ERROR status.
        control =
                testController.reconcile(
                        deployment, TestUtils.createContextWithFailedJobManagerDeployment());
        Assertions.assertEquals(
                JobManagerDeploymentStatus.ERROR,
                deployment.getStatus().getJobManagerDeploymentStatus());
        Assertions.assertFalse(control.isUpdateStatus());
        long expectedDelay =
                ReconciliationUtils.rescheduleAfter(
                                JobManagerDeploymentStatus.ERROR,
                                deployment,
                                configManager.getOperatorConfiguration())
                        .toMillis();
        Assertions.assertEquals(expectedDelay, control.getScheduleDelay().get());
    }

    /**
     * Simulates an in-progress deployment whose pod reports a container failure with the given
     * waiting reason and verifies that the deployment is marked {@code ERROR}, that an error is
     * recorded, and that the posted event mentions both the reason and the failure message.
     *
     * @param str container waiting reason reported by the failed pod
     */
    @ParameterizedTest
    @ValueSource(strings = {"CrashLoopBackOff", "ImagePullBackOff", "ErrImagePull"})
    public void verifyInProgressDeploymentWithError(String str) throws Exception {
        String crashLoopMessage = "container fails";
        String eventsPath = "/api/v1/namespaces/flink-operator-test/events";

        // Reply handler for the event whose body must mention the deployment start.
        TestUtils.ValidatingResponseProvider startEventReply =
                new TestUtils.ValidatingResponseProvider(
                        mockedEvent,
                        request ->
                                Assertions.assertTrue(
                                        request.getBody()
                                                .readUtf8()
                                                .contains("Starting deployment")));
        mockServer.expect().post().withPath(eventsPath).andReply(startEventReply).once();

        // Reply handler for the event that must carry both the reason and the failure message.
        TestUtils.ValidatingResponseProvider errorEventReply =
                new TestUtils.ValidatingResponseProvider(
                        mockedEvent,
                        request -> {
                            String body = request.getBody().readUtf8();
                            Assertions.assertTrue(body.contains(str));
                            Assertions.assertTrue(body.contains(crashLoopMessage));
                        });
        mockServer.expect().post().withPath(eventsPath).andReply(errorEventReply).once();

        // Use the same message the event validator checks for instead of a duplicated literal.
        flinkService.setPodList(TestUtils.createFailedPodList(crashLoopMessage, str));

        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        testController.reconcile(deployment, context);

        UpdateControl<FlinkDeployment> control =
                testController.reconcile(
                        deployment, TestUtils.createContextWithInProgressDeployment());
        startEventReply.assertValidated();
        Assertions.assertFalse(control.isUpdateStatus());
        long reconcileMillis =
                configManager.getOperatorConfiguration().getReconcileInterval().toMillis();
        Assertions.assertEquals(Optional.of(reconcileMillis), control.getScheduleDelay());
        Assertions.assertEquals(
                JobManagerDeploymentStatus.ERROR,
                deployment.getStatus().getJobManagerDeploymentStatus());
        Assertions.assertEquals(
                JobStatus.RECONCILING.name(), deployment.getStatus().getJobStatus().getState());
        Assertions.assertNotNull(deployment.getStatus().getError());

        control =
                testController.reconcile(
                        deployment, TestUtils.createContextWithFailedJobManagerDeployment());
        Assertions.assertEquals(
                JobManagerDeploymentStatus.ERROR,
                deployment.getStatus().getJobManagerDeploymentStatus());
        Assertions.assertFalse(control.isUpdateStatus());
        // The expected delay is derived from the status asserted just above (ERROR), matching
        // verifyFailedDeployment, rather than from an unrelated READY status.
        long expectedDelay =
                ReconciliationUtils.rescheduleAfter(
                                JobManagerDeploymentStatus.ERROR,
                                deployment,
                                configManager.getOperatorConfiguration())
                        .toMillis();
        Assertions.assertEquals(expectedDelay, control.getScheduleDelay().get());
        errorEventReply.assertValidated();
    }

    /**
     * Exercises the SAVEPOINT upgrade mode: the job first starts from the initial savepoint path,
     * a later change of only that path leaves the running jobs untouched, and a parallelism change
     * suspends the job and restarts it from a newly recorded savepoint. Also covers an explicit
     * suspend/resume cycle and the final cleanup.
     */
    @ParameterizedTest
    @MethodSource({"org.apache.flink.kubernetes.operator.TestUtils#flinkVersions"})
    public void verifyUpgradeFromSavepoint(FlinkVersion flinkVersion) throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        deployment.getSpec().getJob().setInitialSavepointPath("s0");
        deployment
                .getSpec()
                .getFlinkConfiguration()
                .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), "file:///flink-data/savepoints");

        // Initial submission starts from the configured initial savepoint.
        testController.reconcile(deployment, context);
        List<Tuple3<String, JobStatusMessage, Configuration>> jobs = flinkService.listJobs();
        Assertions.assertEquals(1, jobs.size());
        Assertions.assertEquals("s0", jobs.get(0).f0);
        Assertions.assertEquals(
                new TaskManagerInfo(
                        "component=taskmanager,app=" + deployment.getMetadata().getName(), 1),
                deployment.getStatus().getTaskManager());

        // Changing only the initial savepoint path must leave the job list as it was.
        List<Tuple3<String, JobStatusMessage, Configuration>> jobsBeforePathChange =
                new ArrayList<>(jobs);
        deployment.getSpec().getJob().setInitialSavepointPath("s1");
        testController.reconcile(deployment, context);
        Assertions.assertEquals(jobsBeforePathChange, new ArrayList<>(flinkService.listJobs()));

        // A parallelism change suspends the job and records one savepoint.
        deployment.getSpec().getJob().setParallelism(100);
        Assertions.assertTrue(
                deployment
                        .getStatus()
                        .getJobStatus()
                        .getSavepointInfo()
                        .getSavepointHistory()
                        .isEmpty());
        UpdateControl<FlinkDeployment> suspendControl =
                testController.reconcile(deployment, context);
        Assertions.assertEquals(0L, suspendControl.getScheduleDelay().get());
        Assertions.assertEquals(
                JobState.SUSPENDED,
                deployment
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getState());
        Assertions.assertEquals(
                1,
                deployment
                        .getStatus()
                        .getJobStatus()
                        .getSavepointInfo()
                        .getSavepointHistory()
                        .size());
        Assertions.assertEquals(new TaskManagerInfo("", 0), deployment.getStatus().getTaskManager());

        // The next reconcile resubmits the job from the recorded savepoint.
        testController.reconcile(deployment, context);
        jobs = flinkService.listJobs();
        Assertions.assertEquals(1, jobs.size());
        Assertions.assertEquals("savepoint_0", jobs.get(0).f0);

        testController.reconcile(deployment, context);
        Assertions.assertEquals(
                1,
                deployment
                        .getStatus()
                        .getJobStatus()
                        .getSavepointInfo()
                        .getSavepointHistory()
                        .size());

        // Explicit suspend: the expected JobManager status depends on the Flink version.
        deployment.getSpec().getJob().setState(JobState.SUSPENDED);
        testController.reconcile(deployment, context);
        JobManagerDeploymentStatus expectedAfterSuspend =
                flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)
                        ? JobManagerDeploymentStatus.READY
                        : JobManagerDeploymentStatus.MISSING;
        Assertions.assertEquals(
                expectedAfterSuspend, deployment.getStatus().getJobManagerDeploymentStatus());

        // Resume: the job restarts from the next savepoint.
        deployment.getSpec().getJob().setState(JobState.RUNNING);
        testController.reconcile(deployment, context);
        jobs = flinkService.listJobs();
        Assertions.assertEquals(1, jobs.size());
        Assertions.assertEquals("savepoint_1", jobs.get(0).f0);

        // Cleanup leaves no jobs behind.
        testController.reconcile(deployment, context);
        testController.cleanup(deployment, context);
        Assertions.assertEquals(0, flinkService.listJobs().size());
    }

    /**
     * Exercises the STATELESS upgrade mode and checks, step by step, which events the controller
     * records (Submit, JobStatusChanged, SpecChanged, Suspended) for the initial submission, a
     * parallelism change, an explicit suspend/resume, a restart nonce change and a spec change
     * carrying an invalid log configuration. After the initial submission the job is expected to
     * restart without a savepoint path.
     */
    @ParameterizedTest
    @MethodSource({"org.apache.flink.kubernetes.operator.TestUtils#flinkVersions"})
    public void verifyStatelessUpgrade(FlinkVersion flinkVersion) throws Exception {
        testController.events().clear();
        FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
        deployment.getSpec().getJob().setInitialSavepointPath("s0");

        // Initial submission uses the initial savepoint and records a Submit event.
        testController.reconcile(deployment, context);
        List<Tuple3<String, JobStatusMessage, Configuration>> jobs = flinkService.listJobs();
        Assertions.assertEquals(1, jobs.size());
        Assertions.assertEquals("s0", jobs.get(0).f0);
        Assertions.assertEquals(1, testController.events().size());
        Assertions.assertEquals(
                EventRecorder.Reason.Submit,
                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));

        testController.reconcile(deployment, context);
        testController.reconcile(deployment, context);
        Assertions.assertEquals(1, testController.events().size());
        Assertions.assertEquals(
                EventRecorder.Reason.JobStatusChanged,
                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));

        // Parallelism change: spec change detected, job suspended, immediate reschedule.
        deployment.getSpec().getJob().setParallelism(100);
        UpdateControl<FlinkDeployment> control = testController.reconcile(deployment, context);
        Assertions.assertEquals(2, testController.events().size());
        Assertions.assertEquals(
                EventRecorder.Reason.SpecChanged,
                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
        Assertions.assertEquals(
                EventRecorder.Reason.Suspended,
                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
        Assertions.assertEquals(0L, control.getScheduleDelay().get());
        Assertions.assertEquals(
                JobState.SUSPENDED,
                deployment
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getState());

        // Resubmission after the suspend.
        control = testController.reconcile(deployment, context);
        Assertions.assertEquals(1, testController.events().size());
        Assertions.assertEquals(
                EventRecorder.Reason.Submit,
                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
        long deployingDelay =
                ReconciliationUtils.rescheduleAfter(
                                JobManagerDeploymentStatus.DEPLOYING,
                                deployment,
                                configManager.getOperatorConfiguration())
                        .toMillis();
        Assertions.assertEquals(deployingDelay, control.getScheduleDelay().get());

        // Stateless restart: no savepoint path on the resubmitted job.
        testController.reconcile(deployment, context);
        jobs = flinkService.listJobs();
        Assertions.assertEquals(1, jobs.size());
        Assertions.assertNull(jobs.get(0).f0);

        testController.reconcile(deployment, context);
        testController.reconcile(deployment, context);
        Assertions.assertEquals(1, testController.events().size());
        Assertions.assertEquals(
                EventRecorder.Reason.JobStatusChanged,
                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));

        // Explicit suspend.
        deployment.getSpec().getJob().setState(JobState.SUSPENDED);
        testController.reconcile(deployment, context);
        Assertions.assertEquals(2, testController.events().size());
        Assertions.assertEquals(
                EventRecorder.Reason.SpecChanged,
                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
        Assertions.assertEquals(
                EventRecorder.Reason.Suspended,
                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));

        // Explicit resume.
        deployment.getSpec().getJob().setState(JobState.RUNNING);
        testController.reconcile(deployment, context);
        Assertions.assertEquals(2, testController.events().size());
        Assertions.assertEquals(
                EventRecorder.Reason.SpecChanged,
                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
        Assertions.assertEquals(
                EventRecorder.Reason.Submit,
                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
        jobs = flinkService.listJobs();
        Assertions.assertEquals(1, jobs.size());
        Assertions.assertNull(jobs.get(0).f0);

        // Restart nonce change suspends the job again.
        deployment.getSpec().setRestartNonce(123L);
        testController.reconcile(deployment, context);
        Assertions.assertEquals(2, testController.events().size());
        Assertions.assertEquals(
                EventRecorder.Reason.SpecChanged,
                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
        Assertions.assertEquals(
                EventRecorder.Reason.Suspended,
                EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
        Assertions.assertEquals(
                JobState.SUSPENDED,
                deployment
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getState());

        // Spec change with an invalid log configuration, reconciled against an empty context:
        // two events are recorded, the second of which must be a Submit.
        deployment.getSpec().setLogConfiguration(Map.of("invalid", "conf"));
        testController.reconcile(deployment, TestUtils.createEmptyContext());
        Assertions.assertEquals(2, testController.events().size());
        testController.events().remove();
        Assertions.assertEquals(
                EventRecorder.Reason.Submit,
                EventRecorder.Reason.valueOf(testController.events().remove().getReason()));

        testController.reconcile(deployment, context);
        testController.reconcile(deployment, context);

        // Ignore validation errors; exactly one JobStatusChanged event must remain.
        List<Event> nonValidationEvents =
                testController.events().stream()
                        .filter(
                                event ->
                                        !event.getReason()
                                                .equals(
                                                        EventRecorder.Reason.ValidationError
                                                                .name()))
                        .collect(Collectors.toList());
        Assertions.assertEquals(1, nonValidationEvents.size());
        Assertions.assertEquals(
                EventRecorder.Reason.JobStatusChanged,
                EventRecorder.Reason.valueOf(nonValidationEvents.get(0).getReason()));

        Assertions.assertEquals(
                JobManagerDeploymentStatus.READY,
                deployment.getStatus().getJobManagerDeploymentStatus());
        Assertions.assertEquals(
                JobStatus.RUNNING.name(), deployment.getStatus().getJobStatus().getState());
        Assertions.assertEquals(
                JobState.RUNNING,
                deployment
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getState());
    }

    /**
     * Runs the shared not-ready-cluster upgrade scenario against a session cluster built for the
     * given Flink version.
     */
    @ParameterizedTest
    @MethodSource({"org.apache.flink.kubernetes.operator.TestUtils#flinkVersions"})
    public void testUpgradeNotReadyClusterSession(FlinkVersion flinkVersion) throws Exception {
        FlinkDeployment sessionCluster = TestUtils.buildSessionCluster(flinkVersion);
        testUpgradeNotReadyCluster(sessionCluster);
    }

    /**
     * Runs the shared not-ready-cluster upgrade scenario against a clone of an application cluster
     * configured with the given upgrade mode, then checks that the upgrade mode on the resource
     * built here is still the one that was set.
     */
    @ParameterizedTest
    @MethodSource({"org.apache.flink.kubernetes.operator.TestUtils#flinkVersionsAndUpgradeModes"})
    public void testUpgradeNotReadyClusterApplication(FlinkVersion flinkVersion, UpgradeMode upgradeMode) throws Exception {
        FlinkDeployment appCluster = TestUtils.buildApplicationCluster(flinkVersion);
        appCluster.getSpec().getJob().setUpgradeMode(upgradeMode);

        // The scenario operates on a copy of the resource.
        FlinkDeployment copy = (FlinkDeployment) ReconciliationUtils.clone(appCluster);
        testUpgradeNotReadyCluster(copy);

        UpgradeMode modeAfterScenario = appCluster.getSpec().getJob().getUpgradeMode();
        Assertions.assertEquals(upgradeMode, modeAfterScenario);
    }

    /**
     * Deploys a cluster with REST port 8088, then submits invalid spec changes (negative
     * JobManager replicas, later zero parallelism together with a different REST port). Expects
     * the validation error to be recorded, the deployment status to keep progressing, and job
     * listing to still receive a configuration carrying port 8088.
     */
    @Test
    public void verifyReconcileWithBadConfig() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        deployment.getSpec().getFlinkConfiguration().put(RestOptions.PORT.key(), "8088");

        UpdateControl<FlinkDeployment> control = testController.reconcile(deployment, context);
        Assertions.assertFalse(control.isUpdateStatus());
        Assertions.assertEquals(
                JobManagerDeploymentStatus.DEPLOYING,
                deployment.getStatus().getJobManagerDeploymentStatus());

        // Invalid replica count: the error is reported, the deployment status still advances.
        deployment.getSpec().getJobManager().setReplicas(-1);
        control = testController.reconcile(deployment, context);
        Assertions.assertTrue(
                deployment
                        .getStatus()
                        .getError()
                        .contains("JobManager replicas should not be configured less than one."));
        Assertions.assertFalse(control.isUpdateStatus());
        Assertions.assertEquals(
                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
                deployment.getStatus().getJobManagerDeploymentStatus());

        // Restore the replicas, set parallelism to zero and change the REST port.
        deployment.getSpec().getJobManager().setReplicas(1);
        deployment.getSpec().getJob().setParallelism(0);
        deployment.getSpec().getFlinkConfiguration().put(RestOptions.PORT.key(), "12345");

        // Job listing must be invoked with the originally deployed port, not the new one.
        flinkService.setListJobConsumer(
                conf -> Assertions.assertEquals(8088, conf.get(RestOptions.PORT)));
        testController.reconcile(deployment, context);
        Assertions.assertEquals(
                JobManagerDeploymentStatus.READY,
                deployment.getStatus().getJobManagerDeploymentStatus());
    }

    /**
     * Starts an application cluster and, while it is still coming up, removes the job from the
     * spec. Expects the switch to be rejected with an error, the deployed spec to keep its job,
     * and the recorded job status to mirror what the Flink service reports.
     */
    @Test
    public void verifyReconcileWithAChangedOperatorModeToSession() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();

        UpdateControl<FlinkDeployment> control = testController.reconcile(deployment, context);
        Assertions.assertFalse(control.isUpdateStatus());
        Assertions.assertEquals(
                JobManagerDeploymentStatus.DEPLOYING,
                deployment.getStatus().getJobManagerDeploymentStatus());

        control = testController.reconcile(deployment, context);
        org.apache.flink.kubernetes.operator.api.status.JobStatus statusWhileStarting =
                deployment.getStatus().getJobStatus();
        Assertions.assertFalse(control.isUpdateStatus());
        Assertions.assertEquals(
                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
                deployment.getStatus().getJobManagerDeploymentStatus());
        Assertions.assertEquals(JobStatus.RECONCILING.name(), statusWhileStarting.getState());

        // Dropping the job spec must be rejected.
        deployment.getSpec().setJob(null);
        control = testController.reconcile(deployment, context);
        Assertions.assertFalse(control.isUpdateStatus());
        Assertions.assertEquals(
                JobManagerDeploymentStatus.READY,
                deployment.getStatus().getJobManagerDeploymentStatus());
        Assertions.assertTrue(
                deployment
                        .getStatus()
                        .getError()
                        .contains("Cannot switch from job to session cluster"));
        Assertions.assertNotNull(ReconciliationUtils.getDeployedSpec(deployment).getJob());

        // The recorded job status matches the first job listed by the Flink service.
        org.apache.flink.kubernetes.operator.api.status.JobStatus recordedJob =
                deployment.getStatus().getJobStatus();
        JobStatusMessage reportedJob = flinkService.listJobs().get(0).f1;
        Assertions.assertEquals(reportedJob.getJobId().toHexString(), recordedJob.getJobId());
        Assertions.assertEquals(reportedJob.getJobName(), recordedJob.getJobName());
        Assertions.assertEquals(reportedJob.getJobState().toString(), recordedJob.getState());
    }

    /**
     * Checks that adding a job to an already reconciled session deployment is refused: the
     * status carries the "session to job" error and the deployed spec still has no job.
     */
    @Test
    public void verifyReconcileWithAChangedOperatorModeToApplication() throws Exception {
        FlinkDeployment sessionCluster = TestUtils.buildSessionCluster();

        // First pass: the deployment is only being rolled out.
        UpdateControl<FlinkDeployment> firstControl =
                this.testController.reconcile(sessionCluster, this.context);
        Assertions.assertFalse(firstControl.isUpdateStatus());
        Assertions.assertEquals(
                JobManagerDeploymentStatus.DEPLOYING,
                sessionCluster.getStatus().getJobManagerDeploymentStatus());

        // Second pass: deployed but not ready yet; no job state is reported.
        UpdateControl<FlinkDeployment> secondControl =
                this.testController.reconcile(sessionCluster, this.context);
        org.apache.flink.kubernetes.operator.api.status.JobStatus observedJobStatus =
                sessionCluster.getStatus().getJobStatus();
        Assertions.assertFalse(secondControl.isUpdateStatus());
        Assertions.assertEquals(
                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
                sessionCluster.getStatus().getJobManagerDeploymentStatus());
        Assertions.assertNull(observedJobStatus.getState());

        // Put a job into the spec, i.e. try to turn the deployment into an application cluster.
        sessionCluster.getSpec().setJob(TestUtils.buildSessionJob().getSpec().getJob());
        UpdateControl<FlinkDeployment> thirdControl =
                this.testController.reconcile(sessionCluster, this.context);
        Assertions.assertFalse(thirdControl.isUpdateStatus());
        Assertions.assertEquals(
                JobManagerDeploymentStatus.READY,
                sessionCluster.getStatus().getJobManagerDeploymentStatus());
        Assertions.assertTrue(
                sessionCluster
                        .getStatus()
                        .getError()
                        .contains("Cannot switch from session to job cluster"));
        Assertions.assertNull(ReconciliationUtils.getDeployedSpec(sessionCluster).getJob());
    }

    /**
     * Drives the given deployment through a sequence of spec changes (service-account
     * updates) while toggling the testing Flink service's port readiness and HA-data
     * availability, and asserts the job manager deployment status, the job state and the last
     * reconciled spec after each phase.
     *
     * <p>Deployments without a job in their spec only run the first phases and return early.
     *
     * @param flinkDeployment the deployment under test; its spec is mutated by this method
     * @throws Exception if any reconcile call fails
     */
    private void testUpgradeNotReadyCluster(FlinkDeployment flinkDeployment) throws Exception {
        // Phase 1: initial reconcile against a cleared testing service.
        this.flinkService.clear();
        this.testController.reconcile(flinkDeployment, this.context);
        // Expected last reconciled spec: the current spec, with the job (if any) recorded as STATELESS.
        FlinkDeploymentSpec flinkDeploymentSpec = (FlinkDeploymentSpec) ReconciliationUtils.clone((FlinkDeploymentSpec) flinkDeployment.getSpec());
        if (flinkDeploymentSpec.getJob() != null) {
            flinkDeploymentSpec.getJob().setUpgradeMode(UpgradeMode.STATELESS);
        }
        Assertions.assertEquals(flinkDeploymentSpec, ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec());
        // Phase 2: with the port not ready the deployment must stay in DEPLOYING.
        this.flinkService.setPortReady(false);
        this.testController.reconcile(flinkDeployment, this.context);
        Assertions.assertEquals(JobManagerDeploymentStatus.DEPLOYING, ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getJobManagerDeploymentStatus());
        // Phase 3: change the spec (service account "-2") while the cluster is still not ready.
        ((FlinkDeploymentSpec) flinkDeployment.getSpec()).setServiceAccount(((FlinkDeploymentSpec) flinkDeployment.getSpec()).getServiceAccount() + "-2");
        this.testController.reconcile(flinkDeployment, this.context);
        this.testController.reconcile(flinkDeployment, this.context);
        Assertions.assertEquals(JobManagerDeploymentStatus.DEPLOYING, ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getJobManagerDeploymentStatus());
        // Expected last reconciled spec: the new spec, recorded as LAST_STATE unless the job is STATELESS.
        FlinkDeploymentSpec flinkDeploymentSpec2 = (FlinkDeploymentSpec) ReconciliationUtils.clone((FlinkDeploymentSpec) flinkDeployment.getSpec());
        if (flinkDeploymentSpec2.getJob() != null && flinkDeploymentSpec2.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
            flinkDeploymentSpec2.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        }
        Assertions.assertEquals(flinkDeploymentSpec2, ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec());
        // Phase 4: once the port is ready, two reconciles bring the deployment to READY.
        this.flinkService.setPortReady(true);
        this.testController.reconcile(flinkDeployment, this.context);
        this.testController.reconcile(flinkDeployment, this.context);
        // With a job in the spec the job state is RUNNING; without one it is FINISHED.
        if (((FlinkDeploymentSpec) flinkDeployment.getSpec()).getJob() != null) {
            Assertions.assertEquals(JobStatus.RUNNING.name(), ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getJobStatus().getState());
        } else {
            Assertions.assertEquals(JobStatus.FINISHED.name(), ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getJobStatus().getState());
        }
        Assertions.assertEquals(JobManagerDeploymentStatus.READY, ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getJobManagerDeploymentStatus());
        // The remaining phases only apply to deployments that have a job.
        if (((FlinkDeploymentSpec) flinkDeployment.getSpec()).getJob() == null) {
            return;
        }
        // Phase 5: another spec change ("-3") while everything is available.
        ((FlinkDeploymentSpec) flinkDeployment.getSpec()).setServiceAccount(((FlinkDeploymentSpec) flinkDeployment.getSpec()).getServiceAccount() + "-3");
        this.testController.reconcile(flinkDeployment, this.context);
        this.testController.reconcile(flinkDeployment, this.context);
        // Phase 6: spec change ("-4") while the port is not ready and HA data is unavailable.
        this.flinkService.setPortReady(false);
        this.flinkService.setHaDataAvailable(false);
        ((FlinkDeploymentSpec) flinkDeployment.getSpec()).setServiceAccount(((FlinkDeploymentSpec) flinkDeployment.getSpec()).getServiceAccount() + "-4");
        this.testController.reconcile(flinkDeployment, this.context);
        this.testController.reconcile(flinkDeployment, this.context);
        this.testController.reconcile(flinkDeployment, this.context);
        // STATELESS jobs are expected to have the new spec reconciled already; nothing more to check.
        if (((FlinkDeploymentSpec) flinkDeployment.getSpec()).getJob().getUpgradeMode() == UpgradeMode.STATELESS) {
            Assertions.assertEquals(flinkDeployment.getSpec(), ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec());
            return;
        }
        // Other upgrade modes: still DEPLOYING and the new spec has not been reconciled.
        Assertions.assertEquals(JobManagerDeploymentStatus.DEPLOYING, ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getJobManagerDeploymentStatus());
        Assertions.assertNotEquals(flinkDeployment.getSpec(), ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec());
        // Phase 7: HA data becomes available again; one reconcile leaves the job manager MISSING
        // with LAST_STATE recorded as the upgrade mode of the last reconciled spec.
        this.flinkService.setHaDataAvailable(true);
        this.testController.reconcile(flinkDeployment, this.context);
        Assertions.assertEquals(JobManagerDeploymentStatus.MISSING, ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getJobManagerDeploymentStatus());
        Assertions.assertEquals(UpgradeMode.LAST_STATE, ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getJob().getUpgradeMode());
        // Phase 8: with the port ready, three reconciles bring the job back to RUNNING / READY.
        this.flinkService.setPortReady(true);
        this.testController.reconcile(flinkDeployment, this.context);
        this.testController.reconcile(flinkDeployment, this.context);
        this.testController.reconcile(flinkDeployment, this.context);
        Assertions.assertEquals(JobStatus.RUNNING.name(), ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getJobStatus().getState());
        Assertions.assertEquals(JobManagerDeploymentStatus.READY, ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getJobManagerDeploymentStatus());
        // Phase 9: disable the last-state fallback option and change the spec ("-5") while the
        // port is not ready; a single reconcile must not reconcile the new spec.
        this.flinkService.setPortReady(false);
        ((FlinkDeploymentSpec) flinkDeployment.getSpec()).getFlinkConfiguration().put(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED.key(), "false");
        ((FlinkDeploymentSpec) flinkDeployment.getSpec()).setServiceAccount(((FlinkDeploymentSpec) flinkDeployment.getSpec()).getServiceAccount() + "-5");
        this.testController.reconcile(flinkDeployment, this.context);
        Assertions.assertNotEquals(flinkDeployment.getSpec(), ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec());
        // Phase 10: with the port ready again, five reconciles end in RUNNING / READY.
        this.flinkService.setPortReady(true);
        this.testController.reconcile(flinkDeployment, this.context);
        this.testController.reconcile(flinkDeployment, this.context);
        this.testController.reconcile(flinkDeployment, this.context);
        this.testController.reconcile(flinkDeployment, this.context);
        this.testController.reconcile(flinkDeployment, this.context);
        Assertions.assertEquals(JobStatus.RUNNING.name(), ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getJobStatus().getState());
        Assertions.assertEquals(JobManagerDeploymentStatus.READY, ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getJobManagerDeploymentStatus());
    }

    /**
     * With a failed pod list configured, a reconcile against an in-progress-deployment context
     * puts the deployment into ERROR with the pod's message. Two further reconciles with the
     * regular context must end in READY, clear the error and make the last reconciled spec the
     * last stable spec.
     */
    @Test
    public void testSuccessfulObservationShouldClearErrors() throws Exception {
        this.flinkService.setPodList(
                TestUtils.createFailedPodList("deploy errors", "CrashLoopBackOff"));
        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();

        // Submit, then observe while the deployment is still reported as in progress.
        this.testController.reconcile(appCluster, this.context);
        this.testController.reconcile(appCluster, TestUtils.createContextWithInProgressDeployment());
        Assertions.assertNull(
                appCluster.getStatus().getReconciliationStatus().getLastStableSpec());
        Assertions.assertEquals(
                JobManagerDeploymentStatus.ERROR,
                appCluster.getStatus().getJobManagerDeploymentStatus());
        Assertions.assertTrue(appCluster.getStatus().getError().contains("deploy errors"));

        // One reconcile with the regular context is not enough to mark the spec stable.
        this.testController.reconcile(appCluster, this.context);
        Assertions.assertNull(
                appCluster.getStatus().getReconciliationStatus().getLastStableSpec());

        // After the next one the deployment is READY and the error is gone.
        this.testController.reconcile(appCluster, this.context);
        FlinkDeploymentStatus finalStatus = appCluster.getStatus();
        Assertions.assertEquals(
                JobManagerDeploymentStatus.READY, finalStatus.getJobManagerDeploymentStatus());
        Assertions.assertNull(finalStatus.getError());
        Assertions.assertEquals(
                finalStatus.getReconciliationStatus().getLastReconciledSpec(),
                finalStatus.getReconciliationStatus().getLastStableSpec());
    }

    /**
     * Reconciling a deployment with a job parallelism of -1 must emit exactly one Warning
     * event with reason "ValidationError" and move the resource to the FAILED lifecycle state.
     */
    @Test
    public void testValidationError() throws Exception {
        Assertions.assertTrue(this.testController.events().isEmpty());

        FlinkDeployment invalidCluster = TestUtils.buildApplicationCluster();
        invalidCluster.getSpec().getJob().setParallelism(-1);
        this.testController.reconcile(invalidCluster, this.context);

        Assertions.assertEquals(1, this.testController.events().size());
        Assertions.assertEquals(
                ResourceLifecycleState.FAILED, invalidCluster.getStatus().getLifecycleState());

        Event validationEvent = this.testController.events().remove();
        Assertions.assertEquals("Warning", validationEvent.getType());
        Assertions.assertEquals("ValidationError", validationEvent.getReason());
        Assertions.assertTrue(validationEvent.getMessage().startsWith("Job parallelism "));
    }

    /**
     * When the Flink service is configured to fail deployments, reconcile must throw and two
     * events must have been recorded: a "Submit" event followed by a
     * "ClusterDeploymentException" event carrying the message "Deployment failure".
     */
    @Test
    public void testEventOfNonDeploymentFailedException() throws Exception {
        Assertions.assertTrue(this.testController.events().isEmpty());
        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
        this.flinkService.setDeployFailure(true);

        // assertThrows states the expectation explicitly instead of a try / fail() / empty
        // catch block that silently swallowed the exception.
        Assertions.assertThrows(
                Exception.class, () -> this.testController.reconcile(appCluster, this.context));

        Assertions.assertEquals(2, this.testController.events().size());
        Assertions.assertEquals("Submit", this.testController.events().remove().getReason());
        Event failureEvent = this.testController.events().remove();
        Assertions.assertEquals("ClusterDeploymentException", failureEvent.getReason());
        Assertions.assertEquals("Deployment failure", failureEvent.getMessage());
    }

    /**
     * Cleaning up a deployment that was never reconciled must return a non-null result, close
     * the resource's metric group and leave the context factory without any metric groups.
     */
    @Test
    public void cleanUpNewDeployment() {
        FlinkDeployment freshCluster = TestUtils.buildApplicationCluster();

        // Fetch the metric group up front so that it can be inspected after cleanup.
        KubernetesResourceMetricGroup metricGroup =
                this.testController
                        .getContextFactory()
                        .getResourceContext(freshCluster, this.context)
                        .getResourceMetricGroup();

        Assertions.assertNotNull(this.testController.cleanup(freshCluster, this.context));
        Assertions.assertTrue(metricGroup.isClosed());
        Assertions.assertTrue(this.testController.getContextFactory().getMetricGroups().isEmpty());
    }

    /**
     * Covers the ingress life cycle of a deployment: no ingress exists without an ingress
     * spec, an ingress with the templated host is present after reconciling with an ingress
     * spec, and its host follows a later change of the template.
     */
    @Test
    public void testIngressLifeCycle() throws Exception {
        // Without an ingress spec nothing must be created. The lookup goes through getIngress
        // so that the same API group (v1 or v1beta1) is checked as in the rest of this test.
        FlinkDeployment plainCluster = TestUtils.buildApplicationCluster();
        this.testController.reconcile(plainCluster, this.context);
        Assertions.assertNull(getIngress(plainCluster));

        // Reconcile a deployment with an ingress template and check the resulting host.
        FlinkDeployment ingressCluster = TestUtils.buildApplicationCluster();
        String clusterName = ingressCluster.getMetadata().getName();
        String clusterNamespace = ingressCluster.getMetadata().getNamespace();
        IngressSpec.IngressSpecBuilder builder = IngressSpec.builder();
        builder.template("{{name}}.{{namespace}}.example.com");
        ingressCluster.getSpec().setIngress(builder.build());
        this.testController.reconcile(ingressCluster, this.context);
        this.testController.reconcile(ingressCluster, this.context);
        HasMetadata ingress = getIngress(ingressCluster);
        Assertions.assertNotNull(ingress);
        // JUnit's assertEquals takes (expected, actual); the expected host comes from the template.
        Assertions.assertEquals(
                IngressUtils.getIngressUrl(
                                "{{name}}.{{namespace}}.example.com", clusterName, clusterNamespace)
                        .getHost(),
                getIngressHost(ingress));

        // Change the template (now with a scheme prefix) and check that the host follows.
        builder.template("http://{{name}}.{{namespace}}.foo.bar");
        ingressCluster.getSpec().setIngress(builder.build());
        this.testController.reconcile(ingressCluster, this.context);
        this.testController.reconcile(ingressCluster, this.context);
        Assertions.assertEquals(
                IngressUtils.getIngressUrl(
                                "{{name}}.{{namespace}}.foo.bar", clusterName, clusterNamespace)
                        .getHost(),
                getIngressHost(getIngress(ingressCluster)));
    }

    /**
     * A deployment with an initial savepoint path whose first submission fails must still be
     * submitted with that savepoint path once deployments succeed again.
     */
    @Test
    public void testInitialSavepointOnError() throws Exception {
        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
        appCluster.getSpec().getJob().setInitialSavepointPath("msp");

        // First attempt: the service rejects the deployment, so reconcile must throw.
        // assertThrows replaces a try / fail() / empty catch block that swallowed the exception.
        this.flinkService.setDeployFailure(true);
        Assertions.assertThrows(
                Exception.class, () -> this.testController.reconcile(appCluster, this.context));

        // Second attempt succeeds; the first listed job must carry the initial savepoint path.
        this.flinkService.setDeployFailure(false);
        this.testController.reconcile(appCluster, this.context);
        Assertions.assertEquals("msp", this.flinkService.listJobs().get(0).f0);
    }

    /**
     * Suspends a LAST_STATE job, makes HA data unavailable and resumes the job. Each reconcile
     * in that state must report the missing HA metadata through an event; once HA data is
     * available again, three reconciles must end with a READY job manager and a RUNNING job.
     */
    @Test
    public void testInitialHaError() throws Exception {
        final String haErrorMessage = "HA metadata not available to restore from last state.";

        FlinkDeployment appCluster = TestUtils.buildApplicationCluster(FlinkVersion.v1_15);
        appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        this.testController.reconcile(appCluster, this.context);
        this.testController.reconcile(appCluster, this.context);

        // Suspend the job.
        appCluster.getSpec().getJob().setState(JobState.SUSPENDED);
        this.testController.reconcile(appCluster, this.context);

        // Resume it while HA data is unavailable.
        this.flinkService.setHaDataAvailable(false);
        appCluster.getSpec().getJob().setState(JobState.RUNNING);
        this.testController.events().clear();
        this.testController.reconcile(appCluster, this.context);

        // Expected events, in order: SpecChanged, Submit, then the HA error.
        Assertions.assertEquals(3, this.testController.events().size());
        Event specChangedEvent = this.testController.events().poll();
        Assertions.assertEquals(
                EventRecorder.Reason.SpecChanged,
                EventRecorder.Reason.valueOf(specChangedEvent.getReason()));
        Event firstSubmitEvent = this.testController.events().poll();
        Assertions.assertEquals(
                EventRecorder.Reason.Submit,
                EventRecorder.Reason.valueOf(firstSubmitEvent.getReason()));
        Event firstHaErrorEvent = this.testController.events().poll();
        Assertions.assertTrue(firstHaErrorEvent.getMessage().contains(haErrorMessage));

        // Reconciling again in the same state repeats the Submit and HA error events.
        this.testController.events().clear();
        this.testController.reconcile(appCluster, this.context);
        Event secondSubmitEvent = this.testController.events().poll();
        Assertions.assertEquals(
                EventRecorder.Reason.Submit,
                EventRecorder.Reason.valueOf(secondSubmitEvent.getReason()));
        Event secondHaErrorEvent = this.testController.events().poll();
        Assertions.assertTrue(secondHaErrorEvent.getMessage().contains(haErrorMessage));

        // With HA data back, three reconciles bring the job up.
        this.flinkService.setHaDataAvailable(true);
        this.testController.events().clear();
        this.testController.reconcile(appCluster, this.context);
        this.testController.reconcile(appCluster, this.context);
        this.testController.reconcile(appCluster, this.context);
        Assertions.assertEquals(
                JobManagerDeploymentStatus.READY,
                appCluster.getStatus().getJobManagerDeploymentStatus());
        Assertions.assertEquals(
                JobStatus.RUNNING.name(), appCluster.getStatus().getJobStatus().getState());
    }

    /**
     * A canary deployment must be tracked by the canary resource manager while it exists,
     * must not cause any internal status update, and must no longer be tracked after cleanup.
     */
    @Test
    public void verifyCanaryHandling() throws Exception {
        FlinkDeployment canary = TestUtils.createCanaryDeployment();
        this.kubernetesClient.resource(canary).create();

        // Reconciling the canary yields no update and registers one active canary.
        UpdateControl<FlinkDeployment> control = this.testController.reconcile(canary, this.context);
        Assertions.assertTrue(control.isNoUpdate());
        Assertions.assertEquals(0, this.testController.getInternalStatusUpdateCount());
        Assertions.assertEquals(
                1, this.testController.getCanaryResourceManager().getNumberOfActiveCanaries());

        // Cleanup removes the canary, still without any status update.
        this.testController.cleanup(canary, this.context);
        Assertions.assertEquals(0, this.testController.getInternalStatusUpdateCount());
        Assertions.assertEquals(
                0, this.testController.getCanaryResourceManager().getNumberOfActiveCanaries());
    }

    /**
     * Looks up the ingress named after the given deployment in the deployment's namespace,
     * using the networking v1 API when {@link IngressUtils#ingressInNetworkingV1} reports it
     * for the client and the v1beta1 API otherwise.
     *
     * @param flinkDeployment deployment whose name and namespace identify the ingress
     * @return whatever the client's {@code get()} returns for that ingress
     */
    private HasMetadata getIngress(FlinkDeployment flinkDeployment) {
        if (IngressUtils.ingressInNetworkingV1(this.kubernetesClient)) {
            return this.kubernetesClient
                    .network()
                    .v1()
                    .ingresses()
                    .inNamespace(flinkDeployment.getMetadata().getNamespace())
                    .withName(flinkDeployment.getMetadata().getName())
                    .get();
        }
        return this.kubernetesClient
                .network()
                .v1beta1()
                .ingresses()
                .inNamespace(flinkDeployment.getMetadata().getNamespace())
                .withName(flinkDeployment.getMetadata().getName())
                .get();
    }

    /**
     * Returns the host of the first rule of the given ingress, treating it as a networking v1
     * or v1beta1 ingress depending on {@link IngressUtils#ingressInNetworkingV1}.
     *
     * @param hasMetadata the ingress resource, as returned by {@code getIngress}
     * @return the host of the first ingress rule
     */
    private String getIngressHost(HasMetadata hasMetadata) {
        if (!IngressUtils.ingressInNetworkingV1(this.kubernetesClient)) {
            io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress betaIngress =
                    (io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress) hasMetadata;
            io.fabric8.kubernetes.api.model.networking.v1beta1.IngressRule betaRule =
                    betaIngress.getSpec().getRules().stream().findFirst().get();
            Assertions.assertNotNull(betaRule);
            return betaRule.getHost();
        }
        Ingress v1Ingress = (Ingress) hasMetadata;
        IngressRule firstRule = v1Ingress.getSpec().getRules().stream().findFirst().get();
        Assertions.assertNotNull(firstRule);
        return firstRule.getHost();
    }
}
