package org.apache.flink.kubernetes.operator.service;

import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingClusterClient;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

@EnableKubernetesMockClient(crud = true)
/* loaded from: input_file:org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.class */
public class NativeFlinkServiceTest {
    // Kubernetes client used by the tests; it is never assigned in the visible code.
    // NOTE(review): presumably injected by the @EnableKubernetesMockClient extension — confirm.
    KubernetesClient client;
    // Flink configuration shared by the tests; setup() fills in cluster id, namespace and version.
    private final Configuration configuration = new Configuration();
    // Config manager constructed from the shared configuration above.
    private final FlinkConfigManager configManager = new FlinkConfigManager(this.configuration);

    /* loaded from: input_file:org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest$TestingNativeFlinkService.class */
    /**
     * {@link NativeFlinkService} subclass for tests: it stores the configuration passed to
     * {@code deployApplicationCluster} / {@code submitClusterInternal} so tests can inspect it.
     */
    class TestingNativeFlinkService extends NativeFlinkService {
        /** Configuration most recently handed to one of the two methods below. */
        private Configuration runtimeConfig;

        /**
         * Creates the test service from the Kubernetes client and config manager of an existing
         * service instance.
         *
         * @param nativeFlinkService service whose client and config manager are reused
         */
        public TestingNativeFlinkService(NativeFlinkService nativeFlinkService) {
            super(nativeFlinkService.kubernetesClient, nativeFlinkService.configManager);
        }

        /** Returns the configuration captured by the last recorded call, if any. */
        public Configuration getRuntimeConfig() {
            return runtimeConfig;
        }

        /** Records the given configuration; the job spec is not used. */
        protected void deployApplicationCluster(JobSpec jobSpec, Configuration configuration) {
            runtimeConfig = configuration;
        }

        /** Records the given configuration. */
        protected void submitClusterInternal(Configuration configuration) throws Exception {
            runtimeConfig = configuration;
        }
    }

    /** Sets the cluster id, namespace and Flink version on the shared configuration before each test. */
    @BeforeEach
    public void setup() {
        configuration
                .set(KubernetesConfigOptions.CLUSTER_ID, "test-cluster")
                .set(KubernetesConfigOptions.NAMESPACE, "flink-operator-test")
                .set(FlinkConfigBuilder.FLINK_VERSION, FlinkVersion.v1_15);
    }

    /**
     * Cancels a job with {@link UpgradeMode#STATELESS} and verifies that the cluster client's
     * cancel function received the job id and that no last savepoint was recorded.
     */
    @Test
    public void testCancelJobWithStatelessUpgradeMode() throws Exception {
        TestingClusterClient clusterClient = new TestingClusterClient(configuration, "test-cluster");
        // Completed with the job id the cancel function is invoked with.
        CompletableFuture cancelledJobId = new CompletableFuture();
        clusterClient.setCancelFunction(id -> {
            cancelledJobId.complete(id);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        FlinkService flinkService = createFlinkService(clusterClient);

        JobID expectedJobId = JobID.generate();
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        JobStatus jobStatus = ((FlinkDeploymentStatus) deployment.getStatus()).getJobStatus();
        jobStatus.setJobId(expectedJobId.toHexString());
        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());

        // Mark the deployment as ready and the job as running before cancelling.
        FlinkDeploymentStatus deploymentStatus = (FlinkDeploymentStatus) deployment.getStatus();
        deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        deploymentStatus.getJobStatus().setState("RUNNING");

        flinkService.cancelJob(deployment, UpgradeMode.STATELESS, configManager.getObserveConfig(deployment));

        Assertions.assertTrue(cancelledJobId.isDone());
        Assertions.assertEquals(expectedJobId, cancelledJobId.get());
        Assertions.assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
    }

    /**
     * Cancels a running job with {@link UpgradeMode#SAVEPOINT} for every Flink version supplied by
     * {@code TestUtils#flinkVersions} and verifies:
     * <ul>
     *   <li>the stop-with-savepoint function received the job id, a {@code false} flag and the
     *       configured savepoint directory;</li>
     *   <li>the returned savepoint location is recorded as the last savepoint;</li>
     *   <li>the job state becomes FINISHED;</li>
     *   <li>the JobManager deployment status is READY for versions newer than 1.14 and MISSING
     *       otherwise.</li>
     * </ul>
     *
     * @param flinkVersion Flink version set on the deployment spec before cancelling
     */
    @MethodSource({"org.apache.flink.kubernetes.operator.TestUtils#flinkVersions"})
    @ParameterizedTest
    public void testCancelJobWithSavepointUpgradeMode(FlinkVersion flinkVersion) throws Exception {
        final String savepointPath = "file:///path/of/svp-1";

        TestingClusterClient testingClusterClient = new TestingClusterClient(this.configuration, "test-cluster");
        // Completed with the arguments passed to the stop-with-savepoint function.
        CompletableFuture stopWithSavepointCall = new CompletableFuture();
        this.configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath);
        testingClusterClient.setStopWithSavepointFunction((jobID, bool, str) -> {
            stopWithSavepointCall.complete(new Tuple3(jobID, bool, str));
            return CompletableFuture.completedFuture(savepointPath);
        });
        FlinkService flinkService = createFlinkService(testingClusterClient);

        JobID jobId = JobID.generate();
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        ((FlinkDeploymentSpec) deployment.getSpec()).getFlinkConfiguration().put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), savepointPath);
        ((FlinkDeploymentStatus) deployment.getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        JobStatus jobStatus = ((FlinkDeploymentStatus) deployment.getStatus()).getJobStatus();
        jobStatus.setJobId(jobId.toHexString());
        jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
        // The version is set after the deployed spec has been recorded, as in the original test.
        ((FlinkDeploymentSpec) deployment.getSpec()).setFlinkVersion(flinkVersion);

        flinkService.cancelJob(deployment, UpgradeMode.SAVEPOINT, this.configManager.getObserveConfig(deployment));

        Assertions.assertTrue(stopWithSavepointCall.isDone());
        Tuple3 stopArgs = (Tuple3) stopWithSavepointCall.get();
        Assertions.assertEquals(jobId, stopArgs.f0);
        Assertions.assertFalse(((Boolean) stopArgs.f1).booleanValue());
        Assertions.assertEquals(savepointPath, stopArgs.f2);
        Assertions.assertEquals(savepointPath, jobStatus.getSavepointInfo().getLastSavepoint().getLocation());

        // Expected value goes first so assertion failure messages label the values correctly.
        Assertions.assertEquals(org.apache.flink.api.common.JobStatus.FINISHED.name(), jobStatus.getState());
        JobManagerDeploymentStatus expectedDeploymentStatus =
                flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)
                        ? JobManagerDeploymentStatus.READY
                        : JobManagerDeploymentStatus.MISSING;
        Assertions.assertEquals(
                expectedDeploymentStatus,
                ((FlinkDeploymentStatus) deployment.getStatus()).getJobManagerDeploymentStatus());
    }

    /**
     * Cancels a job with {@link UpgradeMode#LAST_STATE} and verifies that no last savepoint is
     * recorded and that the "test-cluster" Deployment created beforehand is gone afterwards.
     */
    @Test
    public void testCancelJobWithLastStateUpgradeMode() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
        TestingClusterClient clusterClient = new TestingClusterClient(configuration, "test-cluster");
        FlinkService flinkService = createFlinkService(clusterClient);

        // Create the Deployment and make sure it is visible through the client.
        client.resource(createTestingDeployment()).create();
        NonNamespaceOperation deploymentsBefore =
                (NonNamespaceOperation) client.apps().deployments().inNamespace("flink-operator-test");
        RollableScalableResource resourceBefore =
                (RollableScalableResource) deploymentsBefore.withName("test-cluster");
        Assertions.assertNotNull(resourceBefore.get());

        JobID jobId = JobID.generate();
        JobStatus jobStatus = ((FlinkDeploymentStatus) deployment.getStatus()).getJobStatus();
        jobStatus.setJobId(jobId.toHexString());

        flinkService.cancelJob(deployment, UpgradeMode.LAST_STATE, configManager.getObserveConfig(deployment));

        Assertions.assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
        // Look the Deployment up again: it must no longer exist.
        NonNamespaceOperation deploymentsAfter =
                (NonNamespaceOperation) client.apps().deployments().inNamespace("flink-operator-test");
        RollableScalableResource resourceAfter =
                (RollableScalableResource) deploymentsAfter.withName("test-cluster");
        Assertions.assertNull(resourceAfter.get());
    }

    /**
     * Triggers a manual savepoint and verifies that the request sent through the cluster client
     * carries the job id, the configured savepoint directory and {@code cancelJob == false}.
     */
    @Test
    public void testTriggerSavepoint() throws Exception {
        TestingClusterClient clusterClient = new TestingClusterClient(configuration, "test-cluster");
        // Completed with (job id, target directory, cancel-job flag) taken from the request.
        CompletableFuture capturedRequest = new CompletableFuture();
        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, "file:///path/of/svp");
        clusterClient.setRequestProcessor((headers, parameters, body) -> {
            SavepointTriggerMessageParameters triggerParameters = (SavepointTriggerMessageParameters) parameters;
            SavepointTriggerRequestBody triggerBody = (SavepointTriggerRequestBody) body;
            capturedRequest.complete(
                    new Tuple3(
                            (JobID) triggerParameters.jobID.getValue(),
                            (String) triggerBody.getTargetDirectory().get(),
                            Boolean.valueOf(triggerBody.isCancelJob())));
            return CompletableFuture.completedFuture(new TriggerResponse(new TriggerId()));
        });
        FlinkService flinkService = createFlinkService(clusterClient);

        JobID jobId = JobID.generate();
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
        JobStatus freshJobStatus = new JobStatus();
        freshJobStatus.setJobId(jobId.toString());
        FlinkDeploymentStatus deploymentStatus = (FlinkDeploymentStatus) deployment.getStatus();
        deploymentStatus.setJobStatus(freshJobStatus);

        flinkService.triggerSavepoint(
                deploymentStatus.getJobStatus().getJobId(),
                SavepointTriggerType.MANUAL,
                deploymentStatus.getJobStatus().getSavepointInfo(),
                configuration);

        Assertions.assertTrue(capturedRequest.isDone());
        Assertions.assertEquals(jobId, ((Tuple3) capturedRequest.get()).f0);
        Assertions.assertEquals("file:///path/of/svp", ((Tuple3) capturedRequest.get()).f1);
        Assertions.assertFalse(((Boolean) ((Tuple3) capturedRequest.get()).f2).booleanValue());
    }

    /**
     * Verifies which location {@code getLastCheckpoint} reports for three checkpoint-history
     * responses served by the testing cluster client:
     * <ol>
     *   <li>a latest completed checkpoint is present: its external path (chk-96) is returned;</li>
     *   <li>no completed checkpoint or savepoint, only a restored savepoint: the restored
     *       savepoint's external path is returned;</li>
     *   <li>the restored entry's external path is {@code <checkpoint-not-externally-addressable>}:
     *       a {@link RecoveryFailureException} is thrown.</li>
     * </ol>
     */
    @Test
    public void testGetLastCheckpoint() throws Exception {
        ObjectMapper strictObjectMapper = RestMapperUtils.getStrictObjectMapper();
        TestingClusterClient testingClusterClient = new TestingClusterClient(this.configuration, "test-cluster");
        // Single-element holder so the response can be swapped between the three scenarios.
        ArrayList arrayList = new ArrayList();
        testingClusterClient.setRequestProcessor((messageHeaders, messageParameters, requestBody) -> {
            if (messageHeaders instanceof CustomCheckpointingStatisticsHeaders) {
                return CompletableFuture.completedFuture((ResponseBody) arrayList.get(0));
            }
            Assertions.fail("unknown request");
            return null;
        });
        FlinkService createFlinkService = createFlinkService(testingClusterClient);
        // Scenario 1: latest completed checkpoint available.
        arrayList.add((CheckpointHistoryWrapper) strictObjectMapper.readValue("{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":117,\"p999\":117},\"alignment_buffered\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0},\"processed_data\":{\"min\":0,\"max\":1274,\"avg\":280,\"p50\":112,\"p90\":840,\"p95\":1071,\"p99\":1274,\"p999\":1274},\"persisted_data\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0}},\"latest\":{\"completed\":{\"className\":\"completed\",\"id\":96,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212837604,\"latest_ack_timestamp\":1653212837621,\"checkpointed_size\":28437,\"state_size\":28437,\"end_to_end_duration\":17,\"alignment_buffered\":0,\"processed_data\":560,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-96\",\"discarded\":false},\"savepoint\":{\"className\":\"completed\",\"id\":51,\"status\":\"COMPLETED\",\"is_savepoint\":true,\"trigger_timestamp\":1653212748176,\"latest_ack_timestamp\":1653212748233,\"checkpointed_size\":53670,\"state_size\":53670,\"end_to_end_duration\":57,\"alignment_buffered\":0,\"processed_data\":483,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"SAVEPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/savepoints/savepoint-000000-e8ea2482ce4f\",\"discarded\":false},\"failed\":null,\"restored\":{\"id\":2
7,\"restore_timestamp\":1653212683022,\"is_savepoint\":true,\"external_path\":\"file:/flink-data/savepoints/savepoint-000000-5930e5326ca7\"}},\"history\":[{\"className\":\"completed\",\"id\":96,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212837604,\"latest_ack_timestamp\":1653212837621,\"checkpointed_size\":28437,\"state_size\":28437,\"end_to_end_duration\":17,\"alignment_buffered\":0,\"processed_data\":560,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-96\",\"discarded\":false},{\"className\":\"completed\",\"id\":95,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212835603,\"latest_ack_timestamp\":1653212835622,\"checkpointed_size\":28473,\"state_size\":28473,\"end_to_end_duration\":19,\"alignment_buffered\":0,\"processed_data\":42,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-95\",\"discarded\":true},{\"className\":\"completed\",\"id\":94,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212833603,\"latest_ack_timestamp\":1653212833623,\"checkpointed_size\":27969,\"state_size\":27969,\"end_to_end_duration\":20,\"alignment_buffered\":0,\"processed_data\":28,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-94\",\"discarded\":true},{\"className\":\"completed\",\"id\":93,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212831603,\"latest_ack_timestamp\":1653212831621,\"checkpointed_size\":28113,\"state_size\":28113,\"end_to_end_duration\":18,\"alignment_buffered\":0,\"processed_data\":138,\"persist
ed_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-93\",\"discarded\":true},{\"className\":\"completed\",\"id\":92,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212829603,\"latest_ack_timestamp\":1653212829621,\"checkpointed_size\":28293,\"state_size\":28293,\"end_to_end_duration\":18,\"alignment_buffered\":0,\"processed_data\":196,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-92\",\"discarded\":true},{\"className\":\"completed\",\"id\":91,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212827603,\"latest_ack_timestamp\":1653212827629,\"checkpointed_size\":27969,\"state_size\":27969,\"end_to_end_duration\":26,\"alignment_buffered\":0,\"processed_data\":0,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-91\",\"discarded\":true},{\"className\":\"completed\",\"id\":90,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212825603,\"latest_ack_timestamp\":1653212825641,\"checkpointed_size\":27735,\"state_size\":27735,\"end_to_end_duration\":38,\"alignment_buffered\":0,\"processed_data\":0,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-90\",\"discarded\":true},{\"className\":\"completed\",\"id\":89,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212823603,\"latest_ack_timestamp\":1653212823618,\"checkpointed_size\":28545,\"state_size\":28545,\"end_to_end_duratio
n\":15,\"alignment_buffered\":0,\"processed_data\":364,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-89\",\"discarded\":true},{\"className\":\"completed\",\"id\":88,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212821603,\"latest_ack_timestamp\":1653212821619,\"checkpointed_size\":28275,\"state_size\":28275,\"end_to_end_duration\":16,\"alignment_buffered\":0,\"processed_data\":0,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-88\",\"discarded\":true},{\"className\":\"completed\",\"id\":87,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1653212819604,\"latest_ack_timestamp\":1653212819622,\"checkpointed_size\":28518,\"state_size\":28518,\"end_to_end_duration\":18,\"alignment_buffered\":0,\"processed_data\":0,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-87\",\"discarded\":true}]}", CheckpointHistoryWrapper.class));
        Assertions.assertEquals("file:/flink-data/checkpoints/00000000000000000000000000000000/chk-96", ((Savepoint) createFlinkService.getLastCheckpoint(new JobID(), new Configuration()).get()).getLocation());
        // Scenario 2: only a restored savepoint is known.
        arrayList.set(0, (CheckpointHistoryWrapper) strictObjectMapper.readValue("{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":117,\"p999\":117},\"alignment_buffered\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0},\"processed_data\":{\"min\":0,\"max\":1274,\"avg\":280,\"p50\":112,\"p90\":840,\"p95\":1071,\"p99\":1274,\"p999\":1274},\"persisted_data\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0}},\"latest\":{\"completed\":null,\"savepoint\":null,\"failed\":null,\"restored\":{\"id\":27,\"restore_timestamp\":1653212683022,\"is_savepoint\":true,\"external_path\":\"file:/flink-data/savepoints/savepoint-000000-5930e5326ca7\"}},\"history\":[]}", CheckpointHistoryWrapper.class));
        Assertions.assertEquals("file:/flink-data/savepoints/savepoint-000000-5930e5326ca7", ((Savepoint) createFlinkService.getLastCheckpoint(new JobID(), new Configuration()).get()).getLocation());
        // Scenario 3: the restored entry has no externally addressable path.
        arrayList.set(0, (CheckpointHistoryWrapper) strictObjectMapper.readValue("{\"counts\":{\"restored\":1,\"total\":79,\"in_progress\":0,\"completed\":69,\"failed\":10},\"summary\":{\"checkpointed_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"state_size\":{\"min\":23928,\"max\":53670,\"avg\":28551,\"p50\":28239,\"p90\":28563,\"p95\":28635,\"p99\":53670,\"p999\":53670},\"end_to_end_duration\":{\"min\":14,\"max\":117,\"avg\":24,\"p50\":22,\"p90\":32,\"p95\":40.5,\"p99\":117,\"p999\":117},\"alignment_buffered\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0},\"processed_data\":{\"min\":0,\"max\":1274,\"avg\":280,\"p50\":112,\"p90\":840,\"p95\":1071,\"p99\":1274,\"p999\":1274},\"persisted_data\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0}},\"latest\":{\"completed\":null,\"savepoint\":null,\"failed\":null,\"restored\":{\"id\":27,\"restore_timestamp\":1653212683022,\"is_savepoint\":true,\"external_path\":\"<checkpoint-not-externally-addressable>\"}},\"history\":[]}", CheckpointHistoryWrapper.class));
        // assertThrows replaces the former try / fail() / empty-catch construct.
        Assertions.assertThrows(
                RecoveryFailureException.class,
                () -> createFlinkService.getLastCheckpoint(new JobID(), new Configuration()));
    }

    /**
     * Checks that two differently shaped checkpoint-history JSON responses can both be
     * deserialized into {@code CheckpointHistoryWrapper} with the strict object mapper. The test
     * passes when neither {@code readValue} call throws; the parsed results are not inspected.
     *
     * @throws JsonProcessingException if either payload cannot be deserialized
     */
    @Test
    public void testGetLastSavepointRestCompatibility() throws JsonProcessingException {
        ObjectMapper strictObjectMapper = RestMapperUtils.getStrictObjectMapper();
        // Payload whose entries use "@class" and whose summary has only min/max/avg and no
        // "checkpointed_size". NOTE(review): presumably an older Flink REST format — confirm.
        strictObjectMapper.readValue("{\"counts\":{\"restored\":0,\"total\":2,\"in_progress\":0,\"completed\":2,\"failed\":0},\"summary\":{\"state_size\":{\"min\":8646,\"max\":25626,\"avg\":17136},\"end_to_end_duration\":{\"min\":95,\"max\":420,\"avg\":257},\"alignment_buffered\":{\"min\":0,\"max\":0,\"avg\":0},\"processed_data\":{\"min\":0,\"max\":70,\"avg\":35},\"persisted_data\":{\"min\":0,\"max\":0,\"avg\":0}},\"latest\":{\"completed\":{\"@class\":\"completed\",\"id\":1,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1652347653972,\"latest_ack_timestamp\":1652347654392,\"state_size\":8646,\"end_to_end_duration\":420,\"alignment_buffered\":0,\"processed_data\":0,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/opt/flink/volume/flink-cp/9f096f515d5d66dbda0d854b5d5a7af2/chk-1\",\"discarded\":true},\"savepoint\":{\"@class\":\"completed\",\"id\":2,\"status\":\"COMPLETED\",\"is_savepoint\":true,\"trigger_timestamp\":1652347655184,\"latest_ack_timestamp\":1652347655279,\"state_size\":25626,\"end_to_end_duration\":95,\"alignment_buffered\":0,\"processed_data\":70,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"SYNC_SAVEPOINT\",\"tasks\":{},\"external_path\":\"file:/opt/flink/volume/flink-sp/savepoint-9f096f-cebc9a861a41\",\"discarded\":false},\"failed\":null,\"restored\":null},\"history\":[{\"@class\":\"completed\",\"id\":2,\"status\":\"COMPLETED\",\"is_savepoint\":true,\"trigger_timestamp\":1652347655184,\"latest_ack_timestamp\":1652347655279,\"state_size\":25626,\"end_to_end_duration\":95,\"alignment_buffered\":0,\"processed_data\":70,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"SYNC_SAVEPOINT\",\"tasks\":{},\"external_path\":\"file:/opt/flink/volume/flink-sp/savepoint-9f096f-cebc9a861a41\",\"discarded\":false},{\"@class\":\"completed\",\"id\":1,\"status\":\
"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1652347653972,\"latest_ack_timestamp\":1652347654392,\"state_size\":8646,\"end_to_end_duration\":420,\"alignment_buffered\":0,\"processed_data\":0,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/opt/flink/volume/flink-cp/9f096f515d5d66dbda0d854b5d5a7af2/chk-1\",\"discarded\":true}]}", CheckpointHistoryWrapper.class);
        // Payload whose entries use "className" and whose summary adds "checkpointed_size" and
        // percentile fields (p50..p999). NOTE(review): presumably a newer Flink REST format — confirm.
        strictObjectMapper.readValue("{\"counts\":{\"restored\":0,\"total\":12,\"in_progress\":0,\"completed\":3,\"failed\":9},\"summary\":{\"checkpointed_size\":{\"min\":4308,\"max\":16053,\"avg\":11856,\"p50\":15207,\"p90\":16053,\"p95\":16053,\"p99\":16053,\"p999\":16053},\"state_size\":{\"min\":4308,\"max\":16053,\"avg\":11856,\"p50\":15207,\"p90\":16053,\"p95\":16053,\"p99\":16053,\"p999\":16053},\"end_to_end_duration\":{\"min\":31,\"max\":117,\"avg\":61,\"p50\":36,\"p90\":117,\"p95\":117,\"p99\":117,\"p999\":117},\"alignment_buffered\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0},\"processed_data\":{\"min\":0,\"max\":1232,\"avg\":448,\"p50\":112,\"p90\":1232,\"p95\":1232,\"p99\":1232,\"p999\":1232},\"persisted_data\":{\"min\":0,\"max\":0,\"avg\":0,\"p50\":0,\"p90\":0,\"p95\":0,\"p99\":0,\"p999\":0}},\"latest\":{\"completed\":{\"className\":\"completed\",\"id\":3,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1652348962111,\"latest_ack_timestamp\":1652348962142,\"checkpointed_size\":15207,\"state_size\":15207,\"end_to_end_duration\":31,\"alignment_buffered\":0,\"processed_data\":0,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-3\",\"discarded\":true},\"savepoint\":null,\"failed\":null,\"restored\":null},\"history\":[{\"className\":\"completed\",\"id\":3,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1652348962111,\"latest_ack_timestamp\":1652348962142,\"checkpointed_size\":15207,\"state_size\":15207,\"end_to_end_duration\":31,\"alignment_buffered\":0,\"processed_data\":0,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-3\",\"discarded\":true},{\"className\":\"completed
\",\"id\":2,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1652348960109,\"latest_ack_timestamp\":1652348960145,\"checkpointed_size\":16053,\"state_size\":16053,\"end_to_end_duration\":36,\"alignment_buffered\":0,\"processed_data\":1232,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-2\",\"discarded\":true},{\"className\":\"completed\",\"id\":1,\"status\":\"COMPLETED\",\"is_savepoint\":false,\"trigger_timestamp\":1652348958109,\"latest_ack_timestamp\":1652348958226,\"checkpointed_size\":4308,\"state_size\":4308,\"end_to_end_duration\":117,\"alignment_buffered\":0,\"processed_data\":112,\"persisted_data\":0,\"num_subtasks\":4,\"num_acknowledged_subtasks\":4,\"checkpoint_type\":\"CHECKPOINT\",\"tasks\":{},\"external_path\":\"file:/flink-data/checkpoints/00000000000000000000000000000000/chk-1\",\"discarded\":true}]}", CheckpointHistoryWrapper.class);
    }

    /**
     * Obtains the strict object mapper and does nothing else with it.
     *
     * <p>NOTE(review): as written this test deserializes nothing and makes no assertions, so it
     * only fails if {@code RestMapperUtils.getStrictObjectMapper()} itself throws. The name
     * suggests cluster-info payloads were meant to be parsed here — confirm and restore them.
     */
    @Test
    public void testClusterInfoRestCompatibility() throws JsonProcessingException {
        // Currently unused; see the review note above.
        ObjectMapper strictObjectMapper = RestMapperUtils.getStrictObjectMapper();
    }

    /**
     * Verifies that {@code AbstractFlinkService.removeOperatorConfigs} drops the operator
     * health-probe port option from the configuration produced by {@code createOperatorConfig()}.
     */
    @Test
    public void testRemoveOperatorConfig() {
        String healthProbePortKey = KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT.key();
        Assertions.assertFalse(
                AbstractFlinkService.removeOperatorConfigs(createOperatorConfig())
                        .containsKey(healthProbePortKey));
    }

    /**
     * Submits an application cluster through {@link TestingNativeFlinkService} and verifies that
     * the configuration it captured no longer contains the operator health-probe port option.
     */
    @Test
    public void testSubmitApplicationClusterConfigRemoval() throws Exception {
        TestingClusterClient clusterClient = new TestingClusterClient(configuration, "test-cluster");
        Configuration operatorConfig = createOperatorConfig();
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        TestingNativeFlinkService service = new TestingNativeFlinkService(createFlinkService(clusterClient));

        FlinkDeploymentSpec spec = (FlinkDeploymentSpec) deployment.getSpec();
        service.submitApplicationCluster(spec.getJob(), operatorConfig, false);

        String healthProbePortKey = KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT.key();
        Assertions.assertFalse(service.getRuntimeConfig().containsKey(healthProbePortKey));
    }

    /**
     * Submits a session cluster through {@link TestingNativeFlinkService} and verifies that the
     * configuration it captured no longer contains the operator health-probe port option.
     */
    @Test
    public void testSubmitSessionClusterConfigRemoval() throws Exception {
        TestingClusterClient clusterClient = new TestingClusterClient(configuration, "test-cluster");
        Configuration operatorConfig = createOperatorConfig();
        TestingNativeFlinkService service = new TestingNativeFlinkService(createFlinkService(clusterClient));

        service.submitSessionCluster(operatorConfig);

        String healthProbePortKey = KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT.key();
        Assertions.assertFalse(service.getRuntimeConfig().containsKey(healthProbePortKey));
    }

    /**
     * Checks the status returned by {@code AbstractFlinkService.getEffectiveStatus} for job
     * details built from a job status plus (execution state, number) pairs.
     */
    @Test
    public void testEffectiveStatus() {
        // RUNNING job, only RUNNING entries: RUNNING.
        Assertions.assertEquals(
                org.apache.flink.api.common.JobStatus.RUNNING,
                AbstractFlinkService.getEffectiveStatus(
                        getJobDetails(
                                org.apache.flink.api.common.JobStatus.RUNNING,
                                Tuple2.of(ExecutionState.RUNNING, 4))));

        // RUNNING job, RUNNING and FINISHED entries: still RUNNING.
        Assertions.assertEquals(
                org.apache.flink.api.common.JobStatus.RUNNING,
                AbstractFlinkService.getEffectiveStatus(
                        getJobDetails(
                                org.apache.flink.api.common.JobStatus.RUNNING,
                                Tuple2.of(ExecutionState.RUNNING, 2),
                                Tuple2.of(ExecutionState.FINISHED, 2))));

        // RUNNING job with SCHEDULED entries: reported as CREATED.
        Assertions.assertEquals(
                org.apache.flink.api.common.JobStatus.CREATED,
                AbstractFlinkService.getEffectiveStatus(
                        getJobDetails(
                                org.apache.flink.api.common.JobStatus.RUNNING,
                                Tuple2.of(ExecutionState.RUNNING, 2),
                                Tuple2.of(ExecutionState.SCHEDULED, 2))));

        // FINISHED job, only FINISHED entries: FINISHED.
        Assertions.assertEquals(
                org.apache.flink.api.common.JobStatus.FINISHED,
                AbstractFlinkService.getEffectiveStatus(
                        getJobDetails(
                                org.apache.flink.api.common.JobStatus.FINISHED,
                                Tuple2.of(ExecutionState.FINISHED, 4))));
    }

    /**
     * Verifies that the NATIVE savepoint format configured via
     * {@code KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE} reaches the cluster client
     * on both paths exercised here:
     *
     * <ul>
     *   <li>{@code triggerSavepoint}: the captured trigger request must carry the job id, the
     *       savepoint directory, {@code cancelJob == false} and the NATIVE format;
     *   <li>{@code cancelJob} with {@code UpgradeMode.SAVEPOINT}: the captured stop-with-savepoint
     *       call must carry the job id, the NATIVE format and the savepoint directory.
     * </ul>
     *
     * @throws Exception if the service calls or reading the captured futures fail
     */
    @Test
    public void testNativeSavepointFormat() throws Exception {
        final String savepointPath = "file:///path/of/svp";

        TestingClusterClient<String> clusterClient =
                new TestingClusterClient<>(this.configuration, "test-cluster");

        // Captures (jobId, targetDirectory, cancelJob, formatType) of the savepoint trigger request.
        CompletableFuture<Tuple4<JobID, String, Boolean, SavepointFormatType>> triggerSavepointFuture =
                new CompletableFuture<>();
        this.configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath);
        clusterClient.setRequestProcessor((headers, parameters, requestBody) -> {
            SavepointTriggerRequestBody triggerBody = (SavepointTriggerRequestBody) requestBody;
            triggerSavepointFuture.complete(
                    new Tuple4<>(
                            ((SavepointTriggerMessageParameters) parameters).jobID.getValue(),
                            triggerBody.getTargetDirectory().get(),
                            triggerBody.isCancelJob(),
                            triggerBody.getFormatType()));
            return CompletableFuture.completedFuture(new TriggerResponse(new TriggerId()));
        });

        // Captures (jobId, formatType, savepointDirectory) of the stop-with-savepoint call.
        CompletableFuture<Tuple3<JobID, SavepointFormatType, String>> stopWithSavepointFuture =
                new CompletableFuture<>();
        clusterClient.setStopWithSavepointFormat((id, formatType, savepointDir) -> {
            stopWithSavepointFuture.complete(new Tuple3<>(id, formatType, savepointDir));
            return CompletableFuture.completedFuture(savepointPath);
        });

        FlinkService flinkService = createFlinkService(clusterClient);
        JobID jobId = JobID.generate();

        // Application deployment that is READY with a RUNNING job and a configured savepoint directory.
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        deployment
                .getSpec()
                .getFlinkConfiguration()
                .put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), savepointPath);
        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        JobStatus jobStatus = deployment.getStatus().getJobStatus();
        jobStatus.setJobId(jobId.toHexString());
        jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
        ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());

        // NOTE(review): the job id and status object are applied a second time here; this looks
        // redundant with the assignments above but is kept as-is — confirm before removing.
        jobStatus.setJobId(jobId.toString());
        deployment.getStatus().setJobStatus(jobStatus);

        flinkService.triggerSavepoint(
                deployment.getStatus().getJobStatus().getJobId(),
                SavepointTriggerType.MANUAL,
                deployment.getStatus().getJobStatus().getSavepointInfo(),
                new Configuration(this.configuration)
                        .set(
                                KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE,
                                SavepointFormatType.NATIVE));

        Assertions.assertTrue(triggerSavepointFuture.isDone());
        Assertions.assertEquals(jobId, triggerSavepointFuture.get().f0);
        Assertions.assertEquals(savepointPath, triggerSavepointFuture.get().f1);
        Assertions.assertFalse(triggerSavepointFuture.get().f2);
        Assertions.assertEquals(SavepointFormatType.NATIVE, triggerSavepointFuture.get().f3);

        flinkService.cancelJob(
                deployment,
                UpgradeMode.SAVEPOINT,
                new Configuration(this.configManager.getObserveConfig(deployment))
                        .set(
                                KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE,
                                SavepointFormatType.NATIVE));

        Assertions.assertTrue(stopWithSavepointFuture.isDone());
        Assertions.assertEquals(jobId, stopWithSavepointFuture.get().f0);
        Assertions.assertEquals(SavepointFormatType.NATIVE, stopWithSavepointFuture.get().f1);
        Assertions.assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
    }

    /**
     * Builds a {@code JobDetails} fixture named "test-job" with a fresh {@code JobID}, the given job
     * status, and a per-state task count array indexed by {@code ExecutionState#ordinal()}.
     *
     * <p>States that are not listed keep a count of zero; if a state is listed more than once the
     * last count wins. The total task count passed to {@code JobDetails} is the sum of all counts.
     *
     * @param jobStatus the Flink job status to report
     * @param tuple2Arr pairs of (execution state, number of tasks in that state)
     * @return the assembled job details
     */
    @SafeVarargs // the varargs array is only read, never stored or exposed
    private JobDetails getJobDetails(org.apache.flink.api.common.JobStatus jobStatus, Tuple2<ExecutionState, Integer>... tuple2Arr) {
        int[] tasksPerState = new int[ExecutionState.values().length];
        for (Tuple2<ExecutionState, Integer> stateAndCount : tuple2Arr) {
            tasksPerState[stateAndCount.f0.ordinal()] = stateAndCount.f1;
        }
        int totalTasks = Arrays.stream(tasksPerState).sum();
        return new JobDetails(
                new JobID(),
                "test-job",
                System.currentTimeMillis(),
                -1L,
                0L,
                jobStatus,
                System.currentTimeMillis(),
                tasksPerState,
                totalTasks);
    }

    /**
     * Creates a {@code NativeFlinkService} from this test's {@code client} field and a new
     * {@code FlinkConfigManager} built on the test configuration, as an anonymous subclass whose
     * {@code getClusterClient} always returns the supplied client.
     *
     * @param clusterClient the cluster client the returned service should hand out
     * @return the service wired to {@code clusterClient}
     */
    private FlinkService createFlinkService(final ClusterClient<String> clusterClient) {
        final FlinkConfigManager flinkConfigManager = new FlinkConfigManager(this.configuration);
        return new NativeFlinkService(this.client, flinkConfigManager) {
            // NOTE(review): presumably overrides the service's own cluster-client lookup; the
            // superclass is not visible here, so confirm before adding @Override.
            public ClusterClient<String> getClusterClient(Configuration ignoredConfig) {
                return clusterClient;
            }
        };
    }

    /**
     * Builds a {@code Deployment} named "test-cluster" in namespace "flink-operator-test" with an
     * empty spec.
     *
     * @return the built deployment
     */
    private Deployment createTestingDeployment() {
        DeploymentBuilder deploymentBuilder = new DeploymentBuilder();
        // Metadata first, then an empty spec; each nested builder is closed before the next step.
        deploymentBuilder =
                (DeploymentBuilder)
                        deploymentBuilder
                                .withNewMetadata()
                                .withName("test-cluster")
                                .withNamespace("flink-operator-test")
                                .endMetadata();
        deploymentBuilder = (DeploymentBuilder) deploymentBuilder.withNewSpec().endSpec();
        return deploymentBuilder.build();
    }

    /**
     * Creates a configuration whose only entry sets the operator health probe port key to "80".
     *
     * @return the operator-level configuration used by the config-removal tests
     */
    private Configuration createOperatorConfig() {
        final String healthProbePortKey = KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT.key();
        final Map<String, String> operatorSettings = Map.of(healthProbePortKey, "80");
        return Configuration.fromMap(operatorSettings);
    }
}
