package org.apache.flink.runtime.jobmaster;

import java.util.stream.Stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobmaster.slotpool.PreferredAllocationRequestSlotMatchingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.RequestSlotMatchingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SimpleRequestSlotMatchingStrategy;
import org.apache.flink.runtime.scheduler.DefaultSchedulerFactory;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerFactory;
import org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchSchedulerFactory;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactoryTest.class */
class DefaultSlotPoolServiceSchedulerFactoryTest {
    DefaultSlotPoolServiceSchedulerFactoryTest() {
    }

    @Test
    void testFallsBackToDefaultSchedulerIfAdaptiveSchedulerInBatchJob() {
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
        DefaultSlotPoolServiceSchedulerFactory fromConfiguration = DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(configuration, JobType.BATCH, true);
        Assertions.assertThat(fromConfiguration.getSchedulerNGFactory()).isInstanceOf(AdaptiveBatchSchedulerFactory.class);
        Assertions.assertThat(fromConfiguration.getSchedulerType()).isEqualTo(JobManagerOptions.SchedulerType.AdaptiveBatch);
    }

    @Test
    void testAdaptiveSchedulerForReactiveMode() {
        Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);
        DefaultSlotPoolServiceSchedulerFactory fromConfiguration = DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(configuration, JobType.STREAMING, false);
        Assertions.assertThat(fromConfiguration.getSchedulerNGFactory()).isInstanceOf(AdaptiveSchedulerFactory.class);
        Assertions.assertThat(fromConfiguration.getSchedulerType()).isEqualTo(JobManagerOptions.SchedulerType.Adaptive);
    }

    @Test
    void testFallBackSchedulerWithAdaptiveSchedulerTestProperty() {
        String saveAdaptiveSchedulerTestPropertyValue = saveAdaptiveSchedulerTestPropertyValue();
        System.setProperty("flink.tests.enable-adaptive-scheduler", "true");
        DefaultSlotPoolServiceSchedulerFactory fromConfiguration = DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(new Configuration(), JobType.BATCH, true);
        Assertions.assertThat(fromConfiguration.getSchedulerNGFactory()).isInstanceOf(AdaptiveBatchSchedulerFactory.class);
        Assertions.assertThat(fromConfiguration.getSchedulerType()).isEqualTo(JobManagerOptions.SchedulerType.AdaptiveBatch);
        DefaultSlotPoolServiceSchedulerFactory fromConfiguration2 = DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(new Configuration(), JobType.STREAMING, false);
        Assertions.assertThat(fromConfiguration2.getSchedulerNGFactory()).isInstanceOf(AdaptiveSchedulerFactory.class);
        Assertions.assertThat(fromConfiguration2.getSchedulerType()).isEqualTo(JobManagerOptions.SchedulerType.Adaptive);
        restoreAdaptiveSchedulerTestPropertiesValue(saveAdaptiveSchedulerTestPropertyValue);
    }

    @Test
    void testFallBackSchedulerWithoutAdaptiveSchedulerTestProperty() {
        String saveAdaptiveSchedulerTestPropertyValue = saveAdaptiveSchedulerTestPropertyValue();
        System.clearProperty("flink.tests.enable-adaptive-scheduler");
        DefaultSlotPoolServiceSchedulerFactory fromConfiguration = DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(new Configuration(), JobType.BATCH, true);
        Assertions.assertThat(fromConfiguration.getSchedulerNGFactory()).isInstanceOf(AdaptiveBatchSchedulerFactory.class);
        Assertions.assertThat(fromConfiguration.getSchedulerType()).isEqualTo(JobManagerOptions.SchedulerType.AdaptiveBatch);
        DefaultSlotPoolServiceSchedulerFactory fromConfiguration2 = DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(new Configuration(), JobType.STREAMING, false);
        Assertions.assertThat(fromConfiguration2.getSchedulerNGFactory()).isInstanceOf(DefaultSchedulerFactory.class);
        Assertions.assertThat(fromConfiguration2.getSchedulerType()).isEqualTo(JobManagerOptions.SchedulerType.Default);
        restoreAdaptiveSchedulerTestPropertiesValue(saveAdaptiveSchedulerTestPropertyValue);
    }

    @MethodSource({"testGetRequestSlotMatchingStrategy"})
    @ParameterizedTest
    public void testGetRequestSlotMatchingStrategy(boolean z, JobType jobType, RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
        Configuration configuration = new Configuration();
        configuration.set(StateRecoveryOptions.LOCAL_RECOVERY, Boolean.valueOf(z));
        Assertions.assertThat(DefaultSlotPoolServiceSchedulerFactory.getRequestSlotMatchingStrategy(configuration, jobType)).isSameAs(requestSlotMatchingStrategy);
    }

    private static Stream<Arguments> testGetRequestSlotMatchingStrategy() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{false, JobType.BATCH, SimpleRequestSlotMatchingStrategy.INSTANCE}), Arguments.of(new Object[]{false, JobType.STREAMING, SimpleRequestSlotMatchingStrategy.INSTANCE}), Arguments.of(new Object[]{true, JobType.BATCH, SimpleRequestSlotMatchingStrategy.INSTANCE}), Arguments.of(new Object[]{true, JobType.STREAMING, PreferredAllocationRequestSlotMatchingStrategy.INSTANCE})});
    }

    private String saveAdaptiveSchedulerTestPropertyValue() {
        return System.getProperty("flink.tests.enable-adaptive-scheduler");
    }

    private void restoreAdaptiveSchedulerTestPropertiesValue(String str) {
        if (str == null) {
            System.clearProperty("flink.tests.enable-adaptive-scheduler");
        } else {
            System.setProperty("flink.tests.enable-adaptive-scheduler", str);
        }
    }
}
