package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.AssumptionViolatedException;
import org.junit.Before;

/* loaded from: input_file:org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.class */
public class ContinuousFileProcessingCheckpointITCase extends StreamFaultToleranceTestBase {
    /** Number of files the {@link FileCreator} thread writes (loop bound of its {@code run()}). */
    private static final int NO_OF_FILES = 5;
    /** Number of lines written into every generated file (loop bound in {@code fillWithData}). */
    private static final int LINES_PER_FILE = 150;
    /** Restart attempts handed to the fixed-delay restart strategy in {@code testProgram}. */
    private static final int NO_OF_RETRIES = 3;
    /** Last argument passed to {@code readFile}; presumably the directory scan interval in milliseconds - TODO confirm. */
    private static final long INTERVAL = 100;
    /** Scratch directory; assigned and wiped in {@code createHDFS}, wiped again in {@code destroyHDFS}. */
    private static File baseDir;
    /** Hadoop file system resolved from {@link #localFsURI} in {@code createHDFS}. */
    private static FileSystem localFs;
    /** {@code "file:///" + baseDir + "/"}; the directory the job reads from and the files are written to. */
    private static String localFsURI;
    /** Writer thread; created and started in {@code testProgram}, joined in {@code postSubmit}. */
    private FileCreator fc;
    /** File index -> lines seen by the sink; published by the sink right before it throws {@code SuccessException}. */
    private static Map<Integer, Set<String>> actualCollectedContent;
    /** Desugared {@code assert} switch (decompiler output); assigned in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase$FileCreator.class */
    /**
     * Background thread that writes {@code NO_OF_FILES} files into the directory denoted by
     * {@code localFsURI}.
     *
     * <p>Each file is first written under a dot-prefixed temporary name by {@code fillWithData}
     * and then renamed to {@code file<i>}. A file is re-created until its modification time is
     * strictly greater than that of the previously created file.
     *
     * <p>The bookkeeping collections are only written by this thread; readers are expected to
     * {@code join()} the thread before calling {@link #getFileContent()} or {@link #clean()}.
     */
    private class FileCreator extends Thread {
        /** Final (renamed) paths of all files created so far. */
        private final Set<Path> filesCreated = new HashSet<>();
        /** File index -> full expected content of that file. */
        private final Map<Integer, String> fileContents = new HashMap<>();
        /** Modification time of the most recently accepted file. */
        private long lastCreatedModTime = Long.MIN_VALUE;

        private FileCreator() {
        }

        /**
         * Creates the files one after another. Any I/O error or interruption aborts the whole
         * run; files registered up to that point stay in the bookkeeping collections.
         */
        @Override
        public void run() {
            try {
                for (int i = 0; i < NO_OF_FILES; i++) {
                    Tuple2<Path, String> tmpFile;
                    long modTime;
                    do {
                        // Pause so that consecutive files get distinct modification timestamps.
                        Thread.sleep(50L);
                        tmpFile = fillWithData(localFsURI, "file", i, "This is test line.");
                        modTime = localFs.getFileStatus(tmpFile.f0).getModificationTime();
                        if (modTime <= lastCreatedModTime) {
                            // Timestamp did not advance: drop the file and write it again.
                            localFs.delete(tmpFile.f0, false);
                        }
                    } while (modTime <= lastCreatedModTime);
                    lastCreatedModTime = modTime;

                    // Move the temporary file to its final name.
                    Path file = new Path(localFsURI + "/file" + i);
                    localFs.rename(tmpFile.f0, file);
                    Assert.assertTrue(localFs.exists(file));

                    filesCreated.add(file);
                    fileContents.put(i, tmpFile.f1);
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever inspects this thread afterwards.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Deletes every file this thread created and forgets the recorded contents.
         *
         * @throws IOException if the file system fails to delete a file
         */
        void clean() throws IOException {
            assert localFs != null;
            for (Path file : filesCreated) {
                localFs.delete(file, false);
            }
            fileContents.clear();
        }

        /**
         * @return the live map from file index to the content written for that file
         */
        Map<Integer, String> getFileContent() {
            return fileContents;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase$TestingSinkFunction.class */
    /**
     * Sink that groups the received lines by file index, rejects duplicates, and fails exactly
     * once to force a recovery.
     *
     * <p>The induced failure is thrown only while no state has been restored, after at least two
     * checkpoints were reported complete and at least {@code elementsToFailure} elements were
     * seen. Once {@link #EXPECTED_ELEMENTS} elements have arrived the collected content is
     * published to the enclosing test and a {@code SuccessException} ends the job.
     */
    private static class TestingSinkFunction extends RichSinkFunction<String> implements ListCheckpointed<Tuple2<Long, Map<Integer, Set<String>>>>, CheckpointListener {
        /** Total number of lines expected: 5 files x 150 lines (keep in sync with the test constants). */
        private static final long EXPECTED_ELEMENTS = 750L;
        /** Inclusive lower bound of the induced-failure position (40% of the 150 lines per file). */
        private static final int FAILURE_POS_MIN = 60;
        /** Exclusive upper bound of the induced-failure position (70% of the 150 lines per file). */
        private static final int FAILURE_POS_MAX = 105;
        /** Checkpoints that must complete before the induced failure may fire. */
        private static final int CHECKPOINTS_BEFORE_FAILURE = 2;

        private long elementsToFailure;
        /** File index -> lines received for that file (each stored with a trailing newline). */
        private Map<Integer, Set<String>> actualContent = new HashMap<>();
        private boolean hasRestoredAfterFailure = false;
        private long elementCounter = 0;
        private volatile int successfulCheckpoints = 0;

        TestingSinkFunction() {
        }

        /**
         * Verifies that the sink runs with parallelism 1 and draws the failure position.
         *
         * <p>The position is drawn from {@code [FAILURE_POS_MIN, FAILURE_POS_MAX)}. The former
         * {@code nextLong() % range + min} could go below the minimum because {@code %} keeps
         * the sign of a negative dividend.
         */
        public void open(Configuration configuration) throws Exception {
            Assert.assertEquals(1L, getRuntimeContext().getNumberOfParallelSubtasks());
            this.elementsToFailure = FAILURE_POS_MIN + new Random().nextInt(FAILURE_POS_MAX - FAILURE_POS_MIN);
        }

        /**
         * Records one line, then either finishes the test, throttles, or triggers the failure.
         *
         * @param str a line of the form {@code "<fileIdx>: ..."}
         * @throws Exception {@code SuccessException} once all lines were seen, or a plain
         *     {@code Exception} for the induced task failure
         */
        public void invoke(String str) throws Exception {
            int fileIdx = getFileIdx(str);
            Set<String> lines = this.actualContent.get(fileIdx);
            if (lines == null) {
                lines = new HashSet<>();
                this.actualContent.put(fileIdx, lines);
            }
            if (!lines.add(str + "\n")) {
                // Assert.fail always throws, so nothing may follow it in this branch.
                Assert.fail("Duplicate line: " + str);
            }

            this.elementCounter++;
            if (this.elementCounter >= EXPECTED_ELEMENTS) {
                // Hand the result to the test before terminating the job.
                ContinuousFileProcessingCheckpointITCase.actualCollectedContent = this.actualContent;
                throw new SuccessException();
            }

            // Slow down until enough checkpoints have completed.
            if (!this.hasRestoredAfterFailure && this.successfulCheckpoints < CHECKPOINTS_BEFORE_FAILURE) {
                Thread.sleep(5L);
            }
            // Fail once, and only before any state has been restored.
            if (!this.hasRestoredAfterFailure && this.successfulCheckpoints >= CHECKPOINTS_BEFORE_FAILURE && this.elementCounter >= this.elementsToFailure) {
                throw new Exception("Task Failure @ elem: " + this.elementCounter + " / " + this.elementsToFailure);
            }
        }

        /** Delegates to {@code super.close()}; a failure there is printed, not propagated. */
        public void close() {
            try {
                super.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * @return a single-element list holding the element counter and the collected content
         */
        public List<Tuple2<Long, Map<Integer, Set<String>>>> snapshotState(long j, long j2) throws Exception {
            Tuple2<Long, Map<Integer, Set<String>>> state = new Tuple2<>(this.elementCounter, this.actualContent);
            return Collections.singletonList(state);
        }

        /**
         * Restores counter and content from the first list entry. A non-zero counter marks the
         * sink as restored, which disables the induced failure.
         */
        public void restoreState(List<Tuple2<Long, Map<Integer, Set<String>>>> list) throws Exception {
            Tuple2<Long, Map<Integer, Set<String>>> state = list.get(0);
            this.elementCounter = state.f0;
            this.actualContent = state.f1;
            this.hasRestoredAfterFailure = this.elementCounter != 0;
        }

        /** Counts the checkpoints reported as complete. */
        public void notifyCheckpointComplete(long j) throws Exception {
            this.successfulCheckpoints++;
        }

        /** Parses the file index, i.e. everything before the first {@code ':'} of the line. */
        private int getFileIdx(String str) {
            return Integer.parseInt(str.split(":")[0]);
        }
    }

    /**
     * Prepares an empty scratch directory on the local file system and resolves the Hadoop
     * {@code FileSystem} for it.
     *
     * <p>The test is skipped (via {@code AssumptionViolatedException}) when the base class is
     * configured with {@code RestartPipelinedRegionStrategy}.
     *
     * @throws AssertionError if the directory or file system cannot be set up; the original
     *     failure is attached as the cause
     */
    @Before
    public void createHDFS() {
        if (this.failoverStrategy.equals(StreamFaultToleranceTestBase.FailoverStrategy.RestartPipelinedRegionStrategy)) {
            throw new AssumptionViolatedException("ignored ContinuousFileProcessingCheckpointITCase when using RestartPipelinedRegionStrategy");
        }
        try {
            baseDir = new File("./target/localfs/fs_tests").getAbsoluteFile();
            FileUtil.fullyDelete(baseDir);
            org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
            localFsURI = "file:///" + baseDir + "/";
            localFs = new Path(localFsURI).getFileSystem(configuration);
        } catch (Throwable th) {
            // Same error type and message as Assert.fail, but keeps the root cause in the report.
            throw new AssertionError("Test failed " + th.getMessage(), th);
        }
    }

    /**
     * Removes the scratch directory after each test, if one was set up.
     * Any failure while deleting is rethrown wrapped in a {@code RuntimeException}.
     */
    @After
    public void destroyHDFS() {
        if (baseDir == null) {
            // Setup never got as far as choosing a directory; nothing to remove.
            return;
        }
        try {
            FileUtil.fullyDelete(baseDir);
        } catch (Throwable failure) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Builds the job under test: a continuously monitored text source over {@code localFsURI},
     * an identity flat map, and a {@code TestingSinkFunction} with parallelism 1.
     *
     * <p>Also starts the {@code FileCreator} thread that produces the input files while the job
     * is running.
     *
     * @param streamExecutionEnvironment environment the topology is added to
     */
    @Override
    public void testProgram(StreamExecutionEnvironment streamExecutionEnvironment) {
        streamExecutionEnvironment.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(NO_OF_RETRIES, 0L));
        streamExecutionEnvironment.enableCheckpointing(10L);

        // Produce the input files concurrently with the running job.
        this.fc = new FileCreator();
        this.fc.start();

        TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(localFsURI));
        format.setFilesFilter(FilePathFilter.createDefaultFilter());

        DataStreamSource<String> lines = streamExecutionEnvironment.readFile(format, localFsURI, FileProcessingMode.PROCESS_CONTINUOUSLY, INTERVAL);
        lines.flatMap(new FlatMapFunction<String, String>() {
            // Identity mapping: forwards every line unchanged.
            public void flatMap(String line, Collector<String> out) throws Exception {
                out.collect(line);
            }
        }).addSink(new TestingSinkFunction()).setParallelism(1);
    }

    /**
     * Verifies the job output against what the {@code FileCreator} wrote, then cleans up.
     *
     * <p>For every file index the lines collected by the sink are sorted by their trailing line
     * number, concatenated, and compared with the content recorded at creation time.
     *
     * @throws Exception if waiting for the writer thread is interrupted or cleanup fails
     */
    @Override
    public void postSubmit() throws Exception {
        // Wait for the writer so its bookkeeping is complete before it is read.
        this.fc.join();

        Map<Integer, Set<String>> collected = actualCollectedContent;
        Map<Integer, String> expectedContent = this.fc.getFileContent();
        Assert.assertEquals(expectedContent.size(), collected.size());

        for (Map.Entry<Integer, String> expected : expectedContent.entrySet()) {
            Integer fileIdx = expected.getKey();
            Assert.assertTrue(collected.containsKey(fileIdx));

            // The sink stores lines in a set, so restore file order before comparing.
            List<String> lines = new ArrayList<>(collected.get(fileIdx));
            Collections.sort(lines, new Comparator<String>() {
                @Override
                public int compare(String first, String second) {
                    return Integer.compare(getLineNo(first), getLineNo(second));
                }
            });

            StringBuilder content = new StringBuilder();
            for (String line : lines) {
                content.append(line);
            }
            Assert.assertEquals(expected.getValue(), content.toString());
        }

        // Reset the shared static result and remove the generated files.
        actualCollectedContent.clear();
        this.fc.clean();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Extracts the line number from a generated line, i.e. the last whitespace-separated token.
     *
     * @param str a line such as {@code "3: This is test line. 42\n"}
     * @return the numeric value of the final token
     * @throws NumberFormatException if the final token is not an integer
     */
    public int getLineNo(String str) {
        final String[] tokens = str.split("\\s");
        final String lastToken = tokens[tokens.length - 1];
        return Integer.parseInt(lastToken);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes {@code LINES_PER_FILE} lines of the form {@code "<i>: <str3> <lineNo>\n"} into the
     * dot-prefixed file {@code <str>/.<str2><i>}.
     *
     * @param str base location the file is created under
     * @param str2 file name prefix (a leading {@code '.'} is added)
     * @param i file index, used both in the file name and as the prefix of every line
     * @param str3 text placed between the file index and the line number
     * @return the path of the written file together with its complete content
     * @throws IOException if the file cannot be created or written
     * @throws InterruptedException declared for callers; not thrown by this implementation
     */
    public Tuple2<Path, String> fillWithData(String str, String str2, int i, String str3) throws IOException, InterruptedException {
        // Desugared form of "assert localFs != null" (kept because the flag is an explicit field here).
        if (!$assertionsDisabled && localFs == null) {
            throw new AssertionError();
        }
        Path path = new Path(str + "/." + str2 + i);
        StringBuilder content = new StringBuilder();
        // try-with-resources closes the stream even when a write fails midway.
        try (FSDataOutputStream stream = localFs.create(path)) {
            for (int lineNo = 0; lineNo < LINES_PER_FILE; lineNo++) {
                String line = i + ": " + str3 + " " + lineNo + "\n";
                content.append(line);
                stream.write(line.getBytes(ConfigConstants.DEFAULT_CHARSET));
            }
        }
        return new Tuple2<>(path, content.toString());
    }

    static {
        $assertionsDisabled = !ContinuousFileProcessingCheckpointITCase.class.desiredAssertionStatus();
        actualCollectedContent = new HashMap();
    }
}
