package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.checkpointing.utils.AccumulatingIntegerSink;
import org.apache.flink.test.checkpointing.utils.CancellingIntegerSource;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/checkpointing/UnalignedCheckpointCompatibilityITCase.class */
public class UnalignedCheckpointCompatibilityITCase extends TestLogger {

    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    private static final int TOTAL_ELEMENTS = 20;
    private static final int FIRST_RUN_EL_COUNT = 10;
    private static final int FIRST_RUN_BACKPRESSURE_MS = 100;
    private static final int PARALLELISM = 1;
    private final boolean startAligned;
    private final CheckpointType type;
    private static MiniClusterWithClientResource miniCluster;

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @Parameterized.Parameters(name = "type: {0}, startAligned: {1}")
    public static Object[][] parameters() {
        return new Object[]{new Object[]{CheckpointType.CHECKPOINT, true}, new Object[]{CheckpointType.CHECKPOINT, false}, new Object[]{CheckpointType.SAVEPOINT, true}, new Object[]{CheckpointType.SAVEPOINT, false}};
    }

    public UnalignedCheckpointCompatibilityITCase(CheckpointType checkpointType, boolean z) {
        this.startAligned = z;
        this.type = checkpointType;
    }

    @BeforeClass
    public static void setupMiniCluster() throws Exception {
        File root = temporaryFolder.getRoot();
        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, root.toURI().toString());
        configuration.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, Integer.MAX_VALUE);
        miniCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).build());
        miniCluster.before();
    }

    @AfterClass
    public static void teardownMiniCluster() {
        miniCluster.after();
    }

    @Before
    public void cleanDirectory() throws IOException {
        FileUtils.cleanDirectory(temporaryFolder.getRoot());
    }

    @Test
    public void test() throws Exception {
        Tuple2<String, Map<String, Object>> runAndTakeSavepoint = this.type.isSavepoint() ? runAndTakeSavepoint() : runAndTakeExternalCheckpoint();
        String str = (String) runAndTakeSavepoint.f0;
        Map map = (Map) runAndTakeSavepoint.f1;
        Map<String, Object> runFromSavepoint = runFromSavepoint(str, !this.startAligned, TOTAL_ELEMENTS);
        if (this.type.isSavepoint()) {
            Assert.assertEquals(intRange(0, TOTAL_ELEMENTS), extractAndConcat(map, runFromSavepoint));
        }
    }

    private Tuple2<String, Map<String, Object>> runAndTakeSavepoint() throws Exception {
        JobClient submitJobInitially = submitJobInitially(env(this.startAligned, 0));
        CommonTestUtils.waitForAllTaskRunning(() -> {
            return (AccessExecutionGraph) miniCluster.getMiniCluster().getExecutionGraph(submitJobInitially.getJobID()).get();
        });
        Thread.sleep(100L);
        return new Tuple2<>(submitJobInitially.stopWithSavepoint(false, tempFolder().toURI().toString()).get(), submitJobInitially.getJobExecutionResult().thenApply((v0) -> {
            return v0.getAllAccumulatorResults();
        }).get());
    }

    private Tuple2<String, Map<String, Object>> runAndTakeExternalCheckpoint() throws Exception {
        JobClient submitJobInitially = submitJobInitially(env(this.startAligned, 100));
        File waitForChild = waitForChild(waitForChild(waitForChild(temporaryFolder.getRoot(), (file, str) -> {
            return true;
        }), (file2, str2) -> {
            return str2.startsWith("chk-");
        }), (file3, str3) -> {
            return str3.equals("_metadata");
        });
        cancelJob(submitJobInitially);
        return new Tuple2<>(waitForChild.getParentFile().toString(), Collections.emptyMap());
    }

    private static JobClient submitJobInitially(StreamExecutionEnvironment streamExecutionEnvironment) throws Exception {
        return streamExecutionEnvironment.executeAsync(dag(FIRST_RUN_EL_COUNT, true, 100, streamExecutionEnvironment));
    }

    private Map<String, Object> runFromSavepoint(String str, boolean z, int i) throws Exception {
        StreamExecutionEnvironment env = env(z, 50);
        StreamGraph dag = dag(i, false, 0, env);
        dag.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(str));
        return env.execute(dag).getJobExecutionResult().getAllAccumulatorResults();
    }

    private static File waitForChild(File file, FilenameFilter filenameFilter) throws InterruptedException {
        File[] listFiles = file.listFiles(filenameFilter);
        while (true) {
            File[] fileArr = listFiles;
            if (fileArr.length != 0) {
                return (File) Arrays.stream(fileArr).max(Comparator.naturalOrder()).get();
            }
            Thread.sleep(50L);
            listFiles = file.listFiles(filenameFilter);
        }
    }

    private void cancelJob(JobClient jobClient) throws InterruptedException, ExecutionException {
        jobClient.cancel().get();
        try {
            jobClient.getJobExecutionResult();
        } catch (Exception e) {
        }
    }

    private StreamExecutionEnvironment env(boolean z, int i) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(PARALLELISM);
        executionEnvironment.setRestartStrategy(new RestartStrategies.NoRestartStrategyConfiguration());
        executionEnvironment.getCheckpointConfig().enableUnalignedCheckpoints(!z);
        executionEnvironment.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
        executionEnvironment.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        if (i > 0) {
            executionEnvironment.enableCheckpointing(i);
        }
        return executionEnvironment;
    }

    private static StreamGraph dag(int i, boolean z, int i2, StreamExecutionEnvironment streamExecutionEnvironment) {
        streamExecutionEnvironment.addSource(CancellingIntegerSource.upTo(i, z)).addSink(new AccumulatingIntegerSink(i2));
        return streamExecutionEnvironment.getStreamGraph();
    }

    private static List<Integer> intRange(int i, int i2) {
        return (List) IntStream.range(i, i2).boxed().collect(Collectors.toList());
    }

    private static List<Integer> extractAndConcat(Map<String, Object>... mapArr) {
        return (List) Stream.of((Object[]) mapArr).map(AccumulatingIntegerSink::getOutput).peek(list -> {
            Preconditions.checkState(!list.isEmpty());
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
    }

    private File tempFolder() throws IOException {
        return temporaryFolder.newFolder();
    }
}
