package org.apache.flink.runtime.scheduler.adaptive;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testtasks.OnceBlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerClusterITCase.class */
/**
 * Integration tests that run jobs on a {@link MiniCluster} configured with the adaptive scheduler
 * and declarative resource management, and verify that a job still finishes successfully when
 * TaskManagers are removed from or added to the cluster while the job is running.
 */
public class AdaptiveSchedulerClusterITCase extends TestLogger {
    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
    private static final int NUMBER_TASK_MANAGERS = 2;

    /** Total number of slots offered by the initially started TaskManagers. */
    private static final int PARALLELISM = NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_TASK_MANAGERS;

    // Must be declared before miniClusterResource: field initializers run in textual order and
    // the rule below consumes this configuration.
    private final Configuration configuration = createConfiguration();

    @Rule
    public final MiniClusterResource miniClusterResource =
            new MiniClusterResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(configuration)
                            .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
                            .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
                            .build());

    /**
     * Builds the cluster configuration used by all tests in this class.
     *
     * @return a configuration selecting the adaptive scheduler with declarative resource
     *     management enabled
     */
    private static Configuration createConfiguration() {
        final Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);
        configuration.set(ClusterOptions.ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT, true);
        return configuration;
    }

    /**
     * Submits a job whose parallelism equals the number of initially available slots, terminates
     * one TaskManager once all subtasks are running, and asserts that the job nevertheless
     * completes successfully.
     */
    @Test
    public void testAutomaticScaleDownInCaseOfLostSlots() throws InterruptedException, IOException {
        Assume.assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));

        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
        final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM);

        miniCluster.submitJob(jobGraph).join();
        final CompletableFuture<JobResult> resultFuture =
                miniCluster.requestJobResult(jobGraph.getJobID());

        // Only remove slots after every subtask has actually been deployed and is running.
        OnceBlockingNoOpInvokable.waitUntilOpsAreRunning();

        miniCluster.terminateTaskManager(0);

        final JobResult jobResult = resultFuture.join();
        Assert.assertTrue(jobResult.isSuccess());
    }

    /**
     * Submits a job whose parallelism exceeds the number of initially available slots, starts an
     * additional TaskManager once the job is running, and asserts that the invokable is then
     * instantiated once per requested subtask and that the job completes successfully.
     */
    @Test
    public void testAutomaticScaleUp() throws Exception {
        Assume.assumeTrue(ClusterOptions.isDeclarativeResourceManagementEnabled(configuration));

        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
        // Parallelism that needs one TaskManager more than the cluster initially provides.
        final int targetInstanceCount = NUMBER_SLOTS_PER_TASK_MANAGER * (NUMBER_TASK_MANAGERS + 1);
        final JobGraph jobGraph = createBlockingJobGraph(targetInstanceCount);

        // createBlockingJobGraph prepared the invokable for the target parallelism; initially
        // only as many subtasks as there are slots in the cluster are expected.
        OnceBlockingNoOpInvokable.resetFor(PARALLELISM);

        log.info(
                "Submitting job with parallelism of "
                        + targetInstanceCount
                        + ", to a cluster with only "
                        + NUMBER_TASK_MANAGERS
                        + " TMs.");
        miniCluster.submitJob(jobGraph).join();
        final CompletableFuture<JobResult> resultFuture =
                miniCluster.requestJobResult(jobGraph.getJobID());

        OnceBlockingNoOpInvokable.waitUntilOpsAreRunning();

        log.info("Start additional TaskManager to scale up to the full parallelism.");
        // Clear the instance counter and the expected count before adding resources, so that the
        // assertion below only sees instances created after the new TaskManager was started.
        OnceBlockingNoOpInvokable.resetInstanceCount();
        OnceBlockingNoOpInvokable.resetFor(targetInstanceCount);
        miniCluster.startTaskManager();

        log.info("Waiting until Invokable is running with higher parallelism");
        OnceBlockingNoOpInvokable.waitUntilOpsAreRunning();

        Assert.assertEquals(targetInstanceCount, OnceBlockingNoOpInvokable.getInstanceCount());

        final JobResult jobResult = resultFuture.join();
        Assert.assertTrue(jobResult.isSuccess());
    }

    /**
     * Creates a streaming job graph consisting of a single {@link OnceBlockingNoOpInvokable}
     * vertex and prepares the invokable's static state for the given parallelism.
     *
     * @param parallelism parallelism of the single job vertex; also passed to {@link
     *     OnceBlockingNoOpInvokable#resetFor(int)}
     * @return the job graph, configured with a fixed-delay restart strategy created via {@code
     *     fixedDelayRestart(1, 0L)}
     * @throws IOException declared because setting the execution config on the job graph may
     *     throw it
     */
    private JobGraph createBlockingJobGraph(int parallelism) throws IOException {
        final JobVertex blockingVertex = new JobVertex("Blocking operator");

        OnceBlockingNoOpInvokable.resetFor(parallelism);
        blockingVertex.setInvokableClass(OnceBlockingNoOpInvokable.class);
        blockingVertex.setParallelism(parallelism);

        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(blockingVertex);

        final ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        jobGraph.setExecutionConfig(executionConfig);

        return jobGraph;
    }
}
