package org.apache.flink.runtime.scheduler;

import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridgeBuilder;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProviderImpl;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolUtils;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.function.CheckedSupplier;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests batch scheduling with a scheduler created through {@link DefaultSchedulerBuilder}.
 *
 * <p>The scheduler and the slot pool are both driven by one single-threaded main thread executor
 * that is created once for the whole class and shut down after the last test.
 */
class DefaultSchedulerBatchSchedulingTest {

    protected final Logger log = LoggerFactory.getLogger(getClass());

    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
            TestingUtils.defaultExecutorExtension();

    private static ScheduledExecutorService singleThreadScheduledExecutorService;
    private static ComponentMainThreadExecutor mainThreadExecutor;

    /** Creates the single-threaded executor that acts as the component main thread. */
    @BeforeAll
    static void setupClass() {
        singleThreadScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        mainThreadExecutor =
                ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
                        singleThreadScheduledExecutorService);
    }

    /** Stops the main thread executor; the null check covers a failed {@link #setupClass()}. */
    @AfterAll
    static void teardownClass() {
        if (singleThreadScheduledExecutorService != null) {
            singleThreadScheduledExecutorService.shutdownNow();
        }
    }

    /**
     * Schedules a batch job with parallelism 5 while offering only a single slot, finishes every
     * submitted execution as soon as it is deployed, and expects the job to end in {@link
     * JobStatus#FINISHED}.
     *
     * @throws Exception if creating the slot pool or scheduler fails, or if closing the slot pool
     *     fails
     */
    @Test
    void testSchedulingOfJobWithFewerSlotsThanParallelism() throws Exception {
        final int parallelism = 5;
        final Time batchSlotTimeout = Time.milliseconds(5L);
        final JobGraph jobGraph = createBatchJobGraph(parallelism);

        // try-with-resources closes the pool exactly once, whether or not the body throws.
        try (SlotPool slotPool = createSlotPool(mainThreadExecutor, batchSlotTimeout)) {
            // Records the attempt id of every task handed to the task executor gateway.
            final ArrayBlockingQueue<ExecutionAttemptID> submittedTasks =
                    new ArrayBlockingQueue<>(parallelism);
            final TestingTaskExecutorGateway taskExecutorGateway =
                    new TestingTaskExecutorGatewayBuilder()
                            .setSubmitTaskConsumer(
                                    (taskDeploymentDescriptor, jobMasterId) -> {
                                        submittedTasks.offer(
                                                taskDeploymentDescriptor.getExecutionAttemptId());
                                        return CompletableFuture.completedFuture(Acknowledge.get());
                                    })
                            .createTestingTaskExecutorGateway();

            final PhysicalSlotProvider slotProvider =
                    new PhysicalSlotProviderImpl(
                            LocationPreferenceSlotSelectionStrategy.createDefault(), slotPool);
            final GloballyTerminalJobStatusListener jobStatusListener =
                    new GloballyTerminalJobStatusListener();
            final SchedulerNG scheduler =
                    createScheduler(
                            jobGraph,
                            mainThreadExecutor,
                            slotProvider,
                            batchSlotTimeout,
                            jobStatusListener);

            // Scheduler calls are issued from the main thread executor.
            CompletableFuture.runAsync(scheduler::startScheduling, mainThreadExecutor).join();

            // Offer one slot only, i.e. fewer slots than the job's parallelism.
            SlotPoolUtils.offerSlots(
                    slotPool,
                    mainThreadExecutor,
                    Collections.singletonList(ResourceProfile.ANY),
                    new RpcTaskManagerGateway(taskExecutorGateway, JobMasterId.generate()));

            // Sleep for the duration of the batch slot timeout configured on the slot pool;
            // presumably so the timeout has elapsed before executions are finished - confirm.
            Thread.sleep(batchSlotTimeout.toMilliseconds());

            final CompletableFuture<JobStatus> terminationFuture =
                    jobStatusListener.getTerminationFuture();

            for (int finished = 0; finished < parallelism; finished++) {
                final CompletableFuture<ExecutionAttemptID> nextSubmittedTask =
                        CompletableFuture.supplyAsync(
                                CheckedSupplier.unchecked(submittedTasks::take));

                // Wake up on whichever happens first: the next deployment or job termination.
                CompletableFuture.anyOf(nextSubmittedTask, terminationFuture).join();

                if (nextSubmittedTask.isDone()) {
                    finishExecution(nextSubmittedTask.get(), scheduler, mainThreadExecutor);
                } else {
                    Assertions.fail(
                            String.format(
                                    "Job reached a globally terminal state %s before all executions were finished.",
                                    terminationFuture.get()));
                }
            }

            FlinkAssertions.assertThatFuture(terminationFuture)
                    .isCompletedWithValue(JobStatus.FINISHED);
        }
    }

    /**
     * Reports the given execution attempt as INITIALIZING, RUNNING and then FINISHED to the
     * scheduler, running all three updates on the supplied main thread executor and blocking
     * until they have been applied.
     *
     * @param attemptId the execution attempt to finish
     * @param scheduler the scheduler that receives the state updates
     * @param mainThread executor on which the updates are executed
     */
    private void finishExecution(
            ExecutionAttemptID attemptId,
            SchedulerNG scheduler,
            ComponentMainThreadExecutor mainThread) {
        CompletableFuture.runAsync(
                        () -> {
                            scheduler.updateTaskExecutionState(
                                    new TaskExecutionState(
                                            attemptId, ExecutionState.INITIALIZING));
                            scheduler.updateTaskExecutionState(
                                    new TaskExecutionState(attemptId, ExecutionState.RUNNING));
                            scheduler.updateTaskExecutionState(
                                    new TaskExecutionState(attemptId, ExecutionState.FINISHED));
                        },
                        mainThread)
                .join();
    }

    /**
     * Builds and starts a slot pool via {@link DeclarativeSlotPoolBridgeBuilder}.
     *
     * @param mainThread executor the slot pool is started with
     * @param batchSlotTimeout value passed to the builder's batch slot timeout setting
     * @return the started slot pool; the caller is responsible for closing it
     * @throws Exception if building or starting the slot pool fails
     */
    private SlotPool createSlotPool(ComponentMainThreadExecutor mainThread, Time batchSlotTimeout)
            throws Exception {
        return new DeclarativeSlotPoolBridgeBuilder()
                .setBatchSlotTimeout(batchSlotTimeout)
                .buildAndStart(mainThread);
    }

    /**
     * Creates a batch job graph consisting of a single {@link NoOpInvokable} vertex.
     *
     * @param parallelism parallelism of the single vertex
     * @return the batch job graph
     */
    private JobGraph createBatchJobGraph(int parallelism) {
        final JobVertex vertex = new JobVertex("testing task");
        vertex.setParallelism(parallelism);
        vertex.setInvokableClass(NoOpInvokable.class);
        return JobGraphTestUtils.batchJobGraph(vertex);
    }

    /**
     * Creates a scheduler for the given job graph that allocates slots through a slot sharing
     * execution slot allocator backed by the given physical slot provider.
     *
     * @param jobGraph the job to schedule
     * @param mainThread main thread executor handed to the scheduler builder
     * @param slotProvider provider the slot allocator factory is created with
     * @param slotRequestTimeout timeout passed to the slot allocator factory
     * @param jobStatusListener listener registered on the scheduler builder
     * @return the built scheduler
     * @throws Exception if the scheduler cannot be built
     */
    private SchedulerNG createScheduler(
            JobGraph jobGraph,
            ComponentMainThreadExecutor mainThread,
            PhysicalSlotProvider slotProvider,
            Time slotRequestTimeout,
            JobStatusListener jobStatusListener)
            throws Exception {
        return new DefaultSchedulerBuilder(
                        jobGraph, mainThread, EXECUTOR_EXTENSION.getExecutor())
                .setExecutionSlotAllocatorFactory(
                        SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                slotProvider, slotRequestTimeout))
                .setJobStatusListener(jobStatusListener)
                .build();
    }
}
