package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.HamcrestCondition;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/CheckpointStorageLoaderTest.class */
class CheckpointStorageLoaderTest {

    @TempDir
    private Path tmp;
    private final Logger LOG = LoggerFactory.getLogger(CheckpointStorageLoaderTest.class);
    private final ClassLoader cl = getClass().getClassLoader();

    /* loaded from: input_file:org/apache/flink/runtime/state/CheckpointStorageLoaderTest$FailingFactory.class */
    static final class FailingFactory implements CheckpointStorageFactory<CheckpointStorage> {
        FailingFactory() {
        }

        public CheckpointStorage createFromConfig(ReadableConfig readableConfig, ClassLoader classLoader) throws IllegalConfigurationException {
            throw new IllegalConfigurationException("fail!");
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/CheckpointStorageLoaderTest$LegacyStateBackend.class */
    static final class LegacyStateBackend implements StateBackend, CheckpointStorage {
        LegacyStateBackend() {
        }

        public CompletedCheckpointStorageLocation resolveCheckpoint(String str) throws IOException {
            return null;
        }

        public CheckpointStorageAccess createCheckpointStorage(JobID jobID) throws IOException {
            return null;
        }

        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> keyedStateBackendParameters) throws Exception {
            return null;
        }

        public OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters operatorStateBackendParameters) throws Exception {
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/CheckpointStorageLoaderTest$MockStorage.class */
    public static final class MockStorage implements CheckpointStorage {
        MockStorage() {
        }

        public CompletedCheckpointStorageLocation resolveCheckpoint(String str) throws IOException {
            return null;
        }

        public CheckpointStorageAccess createCheckpointStorage(JobID jobID) throws IOException {
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/CheckpointStorageLoaderTest$ModernStateBackend.class */
    public static final class ModernStateBackend implements StateBackend {
        ModernStateBackend() {
        }

        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(StateBackend.KeyedStateBackendParameters<K> keyedStateBackendParameters) throws Exception {
            return null;
        }

        public OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters operatorStateBackendParameters) throws Exception {
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/state/CheckpointStorageLoaderTest$NormalizedPathMatcher.class */
    public static class NormalizedPathMatcher extends TypeSafeMatcher<org.apache.flink.core.fs.Path> {
        private final org.apache.flink.core.fs.Path reNormalizedExpected;

        private NormalizedPathMatcher(org.apache.flink.core.fs.Path path) {
            this.reNormalizedExpected = path == null ? null : new org.apache.flink.core.fs.Path(path.toString());
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public boolean matchesSafely(org.apache.flink.core.fs.Path path) {
            if (this.reNormalizedExpected == null) {
                return path == null;
            }
            return this.reNormalizedExpected.equals(new org.apache.flink.core.fs.Path(path.toString()));
        }

        public void describeTo(Description description) {
            description.appendValue(this.reNormalizedExpected);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/CheckpointStorageLoaderTest$WorkingFactory.class */
    static final class WorkingFactory implements CheckpointStorageFactory<MockStorage> {
        WorkingFactory() {
        }

        /* renamed from: createFromConfig, reason: merged with bridge method [inline-methods] */
        public MockStorage m587createFromConfig(ReadableConfig readableConfig, ClassLoader classLoader) throws IllegalConfigurationException {
            return new MockStorage();
        }
    }

    CheckpointStorageLoaderTest() {
    }

    @Test
    void testNoCheckpointStorageDefined() throws Exception {
        Assertions.assertThat(CheckpointStorageLoader.fromConfig(new Configuration(), this.cl, (Logger) null)).isNotPresent();
    }

    @Test
    void testLegacyStateBackendTakesPrecedence() throws Exception {
        LegacyStateBackend legacyStateBackend = new LegacyStateBackend();
        Assertions.assertThat(CheckpointStorageLoader.load(new MockStorage(), legacyStateBackend, new Configuration(), new Configuration(), this.cl, this.LOG)).withFailMessage("Legacy state backends should always take precedence", new Object[0]).isEqualTo(legacyStateBackend);
    }

    @Test
    void testModernStateBackendDoesNotTakePrecedence() throws Exception {
        ModernStateBackend modernStateBackend = new ModernStateBackend();
        MockStorage mockStorage = new MockStorage();
        Assertions.assertThat(CheckpointStorageLoader.load(mockStorage, modernStateBackend, new Configuration(), new Configuration(), this.cl, this.LOG)).withFailMessage("Modern state backends should never take precedence", new Object[0]).isEqualTo(mockStorage);
    }

    @Test
    void testLoadingFromFactory() throws Exception {
        Configuration configuration = new Configuration();
        Configuration configuration2 = new Configuration();
        configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, WorkingFactory.class.getName());
        configuration2.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
        Assertions.assertThat(CheckpointStorageLoader.load((CheckpointStorage) null, new ModernStateBackend(), configuration, configuration2, this.cl, this.LOG)).isInstanceOf(MockStorage.class);
        Assertions.assertThat(CheckpointStorageLoader.load((CheckpointStorage) null, new ModernStateBackend(), new Configuration(), configuration2, this.cl, this.LOG)).isInstanceOf(JobManagerCheckpointStorage.class);
    }

    @Test
    void testDefaultCheckpointStorage() throws Exception {
        Assertions.assertThat(CheckpointStorageLoader.load((CheckpointStorage) null, new ModernStateBackend(), new Configuration(), new Configuration(), this.cl, this.LOG)).isInstanceOf(JobManagerCheckpointStorage.class);
        String path = new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.tmp).toURI()).toString();
        String path2 = new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.tmp).toURI()).toString();
        Configuration configuration = new Configuration();
        Configuration configuration2 = new Configuration();
        configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, path);
        configuration2.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, path2);
        Assertions.assertThat(CheckpointStorageLoader.load((CheckpointStorage) null, new ModernStateBackend(), configuration, configuration2, this.cl, this.LOG).getCheckpointPath()).isEqualTo(new org.apache.flink.core.fs.Path((String) configuration.get(CheckpointingOptions.CHECKPOINTS_DIRECTORY)));
        Assertions.assertThat(CheckpointStorageLoader.load((CheckpointStorage) null, new ModernStateBackend(), new Configuration(), configuration2, this.cl, this.LOG).getCheckpointPath()).isEqualTo(new org.apache.flink.core.fs.Path((String) configuration2.get(CheckpointingOptions.CHECKPOINTS_DIRECTORY)));
    }

    @Test
    void testLoadingFails() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "does.not.exist");
        Assertions.assertThatThrownBy(() -> {
            CheckpointStorageLoader.load((CheckpointStorage) null, new ModernStateBackend(), new Configuration(), configuration, this.cl, this.LOG);
        }).isInstanceOf(DynamicCodeLoadingException.class);
        configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, File.class.getName());
        Assertions.assertThatThrownBy(() -> {
            CheckpointStorageLoader.load((CheckpointStorage) null, new ModernStateBackend(), new Configuration(), configuration, this.cl, this.LOG);
        }).isInstanceOf(DynamicCodeLoadingException.class);
        configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, FailingFactory.class.getName());
        Assertions.assertThatThrownBy(() -> {
            CheckpointStorageLoader.load((CheckpointStorage) null, new ModernStateBackend(), new Configuration(), configuration, this.cl, this.LOG);
        }).isInstanceOf(IllegalConfigurationException.class);
    }

    @Test
    void testLoadJobManagerStorageNoParameters() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
        Assertions.assertThat((CheckpointStorage) CheckpointStorageLoader.fromConfig(configuration, this.cl, (Logger) null).get()).isInstanceOf(JobManagerCheckpointStorage.class);
    }

    @Test
    void testLoadJobManagerStorageWithParameters() throws Exception {
        String path = new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.tmp).toURI()).toString();
        org.apache.flink.core.fs.Path path2 = new org.apache.flink.core.fs.Path(path);
        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, path);
        JobManagerCheckpointStorage jobManagerCheckpointStorage = (CheckpointStorage) CheckpointStorageLoader.fromConfig(configuration, this.cl, (Logger) null).get();
        Assertions.assertThat(jobManagerCheckpointStorage).isInstanceOf(JobManagerCheckpointStorage.class);
        Assertions.assertThat(jobManagerCheckpointStorage.getSavepointPath()).isEqualTo(path2);
    }

    @Test
    void testConfigureJobManagerStorage() throws Exception {
        String path = new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.tmp).toURI()).toString();
        org.apache.flink.core.fs.Path path2 = new org.apache.flink.core.fs.Path(path);
        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, path);
        JobManagerCheckpointStorage load = CheckpointStorageLoader.load(new JobManagerCheckpointStorage(100), new ModernStateBackend(), new Configuration(), configuration, this.cl, this.LOG);
        Assertions.assertThat(load).isInstanceOf(JobManagerCheckpointStorage.class);
        JobManagerCheckpointStorage jobManagerCheckpointStorage = load;
        Assertions.assertThat(jobManagerCheckpointStorage.getSavepointPath()).is(HamcrestCondition.matching(normalizedPath(path2)));
        Assertions.assertThat(jobManagerCheckpointStorage.getMaxStateSize()).isEqualTo(100);
    }

    @Test
    void testConfigureJobManagerStorageWithParameters() throws Exception {
        String path = new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.tmp).toURI()).toString();
        String path2 = new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.tmp).toURI()).toString();
        Configuration configuration = new Configuration();
        Configuration configuration2 = new Configuration();
        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, path);
        configuration2.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, path2);
        JobManagerCheckpointStorage load = CheckpointStorageLoader.load(new JobManagerCheckpointStorage(), new ModernStateBackend(), configuration2, configuration, this.cl, this.LOG);
        Assertions.assertThat(load).isInstanceOf(JobManagerCheckpointStorage.class);
        Assertions.assertThat(load.getSavepointPath()).isEqualTo(new org.apache.flink.core.fs.Path(path));
    }

    @Test
    void testLoadFileSystemCheckpointStorage() throws Exception {
        String path = new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.tmp).toURI()).toString();
        String path2 = new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.tmp).toURI()).toString();
        org.apache.flink.core.fs.Path path3 = new org.apache.flink.core.fs.Path(path);
        org.apache.flink.core.fs.Path path4 = new org.apache.flink.core.fs.Path(path2);
        MemorySize parse = MemorySize.parse("900kb");
        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
        configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, path);
        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, path2);
        configuration.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, parse);
        configuration.set(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, Integer.valueOf(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE));
        FileSystemCheckpointStorage fileSystemCheckpointStorage = (CheckpointStorage) CheckpointStorageLoader.fromConfig(configuration, this.cl, (Logger) null).get();
        Assertions.assertThat(fileSystemCheckpointStorage).isInstanceOf(FileSystemCheckpointStorage.class);
        FileSystemCheckpointStorage fileSystemCheckpointStorage2 = fileSystemCheckpointStorage;
        Assertions.assertThat(fileSystemCheckpointStorage2.getCheckpointPath()).is(HamcrestCondition.matching(normalizedPath(path3)));
        Assertions.assertThat(fileSystemCheckpointStorage2.getSavepointPath()).is(HamcrestCondition.matching(normalizedPath(path4)));
        Assertions.assertThat(fileSystemCheckpointStorage2.getMinFileSizeThreshold()).isEqualTo(parse.getBytes());
        Assertions.assertThat(fileSystemCheckpointStorage2.getWriteBufferSize()).isEqualTo(Math.max(parse.getBytes(), 1024L));
    }

    @Test
    void testLoadFileSystemCheckpointStorageMixed() throws Exception {
        org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.tmp).toURI());
        String path2 = new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.tmp).toURI()).toString();
        String path3 = new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.tmp).toURI()).toString();
        org.apache.flink.core.fs.Path path4 = new org.apache.flink.core.fs.Path(path3);
        FileSystemCheckpointStorage fileSystemCheckpointStorage = new FileSystemCheckpointStorage(path, 1000000, 4000000);
        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
        configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, path2);
        configuration.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, path3);
        configuration.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.parse("20"));
        configuration.set(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 3000000);
        FileSystemCheckpointStorage load = CheckpointStorageLoader.load(fileSystemCheckpointStorage, new ModernStateBackend(), new Configuration(), configuration, this.cl, this.LOG);
        Assertions.assertThat(load).isInstanceOf(FileSystemCheckpointStorage.class);
        FileSystemCheckpointStorage fileSystemCheckpointStorage2 = load;
        Assertions.assertThat(fileSystemCheckpointStorage2.getCheckpointPath()).is(HamcrestCondition.matching(normalizedPath(path)));
        Assertions.assertThat(fileSystemCheckpointStorage2.getSavepointPath()).is(HamcrestCondition.matching(normalizedPath(path4)));
        Assertions.assertThat(fileSystemCheckpointStorage2.getMinFileSizeThreshold()).isEqualTo(1000000);
        Assertions.assertThat(fileSystemCheckpointStorage2.getWriteBufferSize()).isEqualTo(4000000);
    }

    @Test
    void testHighAvailabilityDefault() throws Exception {
        String path = new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.tmp).toURI()).toString();
        testMemoryBackendHighAvailabilityDefault(path, null);
        testMemoryBackendHighAvailabilityDefault(path, new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.tmp).toURI().toString()));
    }

    @Test
    void testHighAvailabilityDefaultLocalPaths() throws Exception {
        String path = new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.tmp).getAbsolutePath()).toString();
        testMemoryBackendHighAvailabilityDefault(path, null);
        testMemoryBackendHighAvailabilityDefault(path, new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.tmp).toURI().toString()).makeQualified(FileSystem.getLocalFileSystem()));
    }

    private void testMemoryBackendHighAvailabilityDefault(String str, org.apache.flink.core.fs.Path path) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(HighAvailabilityOptions.HA_MODE, "zookeeper");
        configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, "myCluster");
        configuration.set(HighAvailabilityOptions.HA_STORAGE_PATH, str);
        Configuration configuration2 = new Configuration();
        configuration2.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
        configuration2.set(HighAvailabilityOptions.HA_MODE, "zookeeper");
        configuration2.set(HighAvailabilityOptions.HA_CLUSTER_ID, "myCluster");
        configuration2.set(HighAvailabilityOptions.HA_STORAGE_PATH, str);
        if (path != null) {
            configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, path.toUri().toString());
            configuration2.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, path.toUri().toString());
        }
        JobManagerCheckpointStorage load = CheckpointStorageLoader.load(new JobManagerCheckpointStorage(), new ModernStateBackend(), new Configuration(), configuration, this.cl, this.LOG);
        JobManagerCheckpointStorage load2 = CheckpointStorageLoader.load((CheckpointStorage) null, new ModernStateBackend(), new Configuration(), configuration2, this.cl, this.LOG);
        Assertions.assertThat(load).isInstanceOf(JobManagerCheckpointStorage.class);
        Assertions.assertThat(load2).isInstanceOf(JobManagerCheckpointStorage.class);
        JobManagerCheckpointStorage jobManagerCheckpointStorage = load;
        JobManagerCheckpointStorage jobManagerCheckpointStorage2 = load2;
        Assertions.assertThat(jobManagerCheckpointStorage.getSavepointPath()).isNull();
        Assertions.assertThat(jobManagerCheckpointStorage2.getSavepointPath()).isNull();
        if (path != null) {
            Assertions.assertThat(jobManagerCheckpointStorage.getCheckpointPath()).is(HamcrestCondition.matching(normalizedPath(path)));
            Assertions.assertThat(jobManagerCheckpointStorage2.getCheckpointPath()).is(HamcrestCondition.matching(normalizedPath(path)));
        } else {
            Assertions.assertThat(jobManagerCheckpointStorage.getCheckpointPath()).isNull();
            Assertions.assertThat(jobManagerCheckpointStorage2.getCheckpointPath()).isNull();
        }
    }

    private static Matcher<org.apache.flink.core.fs.Path> normalizedPath(org.apache.flink.core.fs.Path path) {
        return new NormalizedPathMatcher(path);
    }
}
