package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.TestingJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.DefaultExecutionDeployer;
import org.apache.flink.runtime.shuffle.TestingShuffleMaster;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.IterableUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultExecutionDeployerTest.class */
class DefaultExecutionDeployerTest {
    private ScheduledExecutorService executor;
    private ComponentMainThreadExecutor mainThreadExecutor;
    private TestExecutionOperationsDecorator testExecutionOperations;
    private ExecutionVertexVersioner executionVertexVersioner;
    private TestExecutionSlotAllocator testExecutionSlotAllocator;
    private TestingShuffleMaster shuffleMaster;
    private TestingJobMasterPartitionTracker partitionTracker;
    private Time partitionRegistrationTimeout;

    DefaultExecutionDeployerTest() {
    }

    @BeforeEach
    void setUp() {
        this.executor = Executors.newSingleThreadScheduledExecutor();
        this.mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
        this.testExecutionOperations = new TestExecutionOperationsDecorator(new ExecutionOperations() { // from class: org.apache.flink.runtime.scheduler.DefaultExecutionDeployerTest.1
            public void deploy(Execution execution) {
            }

            public CompletableFuture<?> cancel(Execution execution) {
                return null;
            }

            public void markFailed(Execution execution, Throwable th) {
            }
        });
        this.executionVertexVersioner = new ExecutionVertexVersioner();
        this.testExecutionSlotAllocator = new TestExecutionSlotAllocator();
        this.shuffleMaster = new TestingShuffleMaster();
        this.partitionTracker = new TestingJobMasterPartitionTracker();
        this.partitionRegistrationTimeout = Time.milliseconds(5000L);
    }

    @AfterEach
    void tearDown() {
        if (this.executor != null) {
            ExecutorUtils.gracefulShutdown(5L, TimeUnit.SECONDS, new ExecutorService[]{this.executor});
        }
    }

    @Test
    void testDeployTasks() throws Exception {
        ExecutionGraph createExecutionGraph = createExecutionGraph(singleNonParallelJobVertexJobGraph());
        deployTasks(createExecutionDeployer(), createExecutionGraph);
        Assertions.assertThat(this.testExecutionOperations.getDeployedExecutions()).containsExactly(new ExecutionAttemptID[]{getAnyExecution(createExecutionGraph).getAttemptId()});
    }

    @Test
    void testDeployTasksOnlyIfAllSlotRequestsAreFulfilled() throws Exception {
        ExecutionGraph createExecutionGraph = createExecutionGraph(singleJobVertexJobGraph(4));
        ExecutionDeployer createExecutionDeployer = createExecutionDeployer();
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        deployTasks(createExecutionDeployer, createExecutionGraph);
        Assertions.assertThat(this.testExecutionOperations.getDeployedExecutions()).isEmpty();
        this.testExecutionSlotAllocator.completePendingRequest(getAnyExecution(createExecutionGraph).getAttemptId());
        Assertions.assertThat(this.testExecutionOperations.getDeployedExecutions()).isEmpty();
        this.testExecutionSlotAllocator.completePendingRequests();
        Assertions.assertThat(this.testExecutionOperations.getDeployedExecutions()).hasSize(4);
    }

    @Test
    void testDeploymentFailures() throws Exception {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        this.testExecutionOperations.enableFailDeploy();
        ExecutionGraph createExecutionGraph = createExecutionGraph(singleNonParallelJobVertexJobGraph);
        deployTasks(createExecutionDeployer(), createExecutionGraph);
        Assertions.assertThat(this.testExecutionOperations.getFailedExecutions()).containsExactly(new ExecutionAttemptID[]{getAnyExecution(createExecutionGraph).getAttemptId()});
    }

    @Test
    void testSlotAllocationTimeout() throws Exception {
        JobGraph singleJobVertexJobGraph = singleJobVertexJobGraph(2);
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        ExecutionGraph createExecutionGraph = createExecutionGraph(singleJobVertexJobGraph);
        deployTasks(createExecutionDeployer(), createExecutionGraph);
        Assertions.assertThat(this.testExecutionSlotAllocator.getPendingRequests()).hasSize(2);
        ExecutionAttemptID attemptId = getAnyExecution(createExecutionGraph).getAttemptId();
        this.testExecutionSlotAllocator.timeoutPendingRequest(attemptId);
        Assertions.assertThat(this.testExecutionOperations.getFailedExecutions()).containsExactly(new ExecutionAttemptID[]{attemptId});
    }

    @Test
    void testSkipDeploymentIfVertexVersionOutdated() throws Exception {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        ExecutionGraph createExecutionGraph = createExecutionGraph(singleNonParallelJobVertexJobGraph);
        deployTasks(createExecutionDeployer(), createExecutionGraph);
        this.executionVertexVersioner.recordModification(getAnyExecution(createExecutionGraph).getAttemptId().getExecutionVertexId());
        this.testExecutionSlotAllocator.completePendingRequests();
        Assertions.assertThat(this.testExecutionOperations.getDeployedVertices()).isEmpty();
    }

    @Test
    void testReleaseSlotIfVertexVersionOutdated() throws Exception {
        JobGraph singleNonParallelJobVertexJobGraph = singleNonParallelJobVertexJobGraph();
        this.testExecutionSlotAllocator.disableAutoCompletePendingRequests();
        ExecutionGraph createExecutionGraph = createExecutionGraph(singleNonParallelJobVertexJobGraph);
        deployTasks(createExecutionDeployer(), createExecutionGraph);
        this.executionVertexVersioner.recordModification(getAnyExecution(createExecutionGraph).getAttemptId().getExecutionVertexId());
        this.testExecutionSlotAllocator.completePendingRequests();
        Assertions.assertThat(this.testExecutionSlotAllocator.getReturnedSlots()).hasSize(1);
    }

    @Test
    void testDeployOnlyIfVertexIsCreated() throws Exception {
        ExecutionGraph createExecutionGraph = createExecutionGraph(singleNonParallelJobVertexJobGraph());
        ExecutionDeployer createExecutionDeployer = createExecutionDeployer();
        deployTasks(createExecutionDeployer, createExecutionGraph);
        Assertions.assertThatThrownBy(() -> {
            deployTasks(createExecutionDeployer, createExecutionGraph);
        }).as("IllegalStateException should happen", new Object[0]).isInstanceOf(IllegalStateException.class);
    }

    @Test
    void testDeploymentWaitForProducedPartitionRegistration() throws Exception {
        this.shuffleMaster.setAutoCompleteRegistration(false);
        ArrayList arrayList = new ArrayList();
        this.partitionTracker.setStartTrackingPartitionsConsumer((resourceID, resultPartitionDeploymentDescriptor) -> {
            arrayList.add(resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID());
        });
        deployTasks(createExecutionDeployer(), createExecutionGraph(nonParallelSourceSinkJobGraph()));
        Assertions.assertThat(arrayList).isEmpty();
        Assertions.assertThat(this.testExecutionOperations.getDeployedExecutions()).isEmpty();
        this.shuffleMaster.completeAllPendingRegistrations();
        Assertions.assertThat(arrayList).hasSize(1);
        Assertions.assertThat(this.testExecutionOperations.getDeployedExecutions()).hasSize(2);
    }

    @Test
    void testFailedProducedPartitionRegistration() throws Exception {
        this.shuffleMaster.setAutoCompleteRegistration(false);
        deployTasks(createExecutionDeployer(), createExecutionGraph(nonParallelSourceSinkJobGraph()));
        Assertions.assertThat(this.testExecutionOperations.getFailedExecutions()).isEmpty();
        this.shuffleMaster.failAllPendingRegistrations();
        Assertions.assertThat(this.testExecutionOperations.getFailedExecutions()).hasSize(1);
    }

    @Test
    void testDirectExceptionOnProducedPartitionRegistration() throws Exception {
        this.shuffleMaster.setThrowExceptionalOnRegistration(true);
        deployTasks(createExecutionDeployer(), createExecutionGraph(nonParallelSourceSinkJobGraph()));
        Assertions.assertThat(this.testExecutionOperations.getFailedExecutions()).hasSize(1);
    }

    @Test
    void testProducedPartitionRegistrationTimeout() throws Exception {
        ScheduledExecutorService scheduledExecutorService = null;
        try {
            this.partitionRegistrationTimeout = Time.milliseconds(1L);
            scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
            this.mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(scheduledExecutorService);
            this.shuffleMaster.setAutoCompleteRegistration(false);
            deployTasks(createExecutionDeployer(), createExecutionGraph(nonParallelSourceSinkJobGraph()));
            this.testExecutionOperations.awaitFailedExecutions(1);
            if (scheduledExecutorService != null) {
                scheduledExecutorService.shutdown();
            }
        } catch (Throwable th) {
            if (scheduledExecutorService != null) {
                scheduledExecutorService.shutdown();
            }
            throw th;
        }
    }

    private static JobGraph singleNonParallelJobVertexJobGraph() {
        return singleJobVertexJobGraph(1);
    }

    private static JobGraph singleJobVertexJobGraph(int i) {
        JobVertex jobVertex = new JobVertex("source");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(i);
        return JobGraphTestUtils.streamingJobGraph(jobVertex);
    }

    private static JobGraph nonParallelSourceSinkJobGraph() {
        JobVertex jobVertex = new JobVertex("source");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        JobVertex jobVertex2 = new JobVertex("sink");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        return JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex2);
    }

    private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws Exception {
        DefaultExecutionGraph build = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).setShuffleMaster(this.shuffleMaster).setPartitionTracker(this.partitionTracker).build(this.executor);
        build.setInternalTaskFailuresListener(new TestingInternalFailuresListener());
        build.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        return build;
    }

    private ExecutionDeployer createExecutionDeployer() {
        return new DefaultExecutionDeployer.Factory().createInstance(LoggerFactory.getLogger(DefaultExecutionDeployer.class), this.testExecutionSlotAllocator, this.testExecutionOperations, this.executionVertexVersioner, this.partitionRegistrationTimeout, (executionVertexID, allocationID) -> {
        }, this.mainThreadExecutor);
    }

    private void deployTasks(ExecutionDeployer executionDeployer, ExecutionGraph executionGraph) {
        deployTasks(executionDeployer, (List<Execution>) IterableUtils.toStream(executionGraph.getAllExecutionVertices()).map((v0) -> {
            return v0.getCurrentExecutionAttempt();
        }).collect(Collectors.toList()));
    }

    private void deployTasks(ExecutionDeployer executionDeployer, List<Execution> list) {
        executionDeployer.allocateSlotsAndDeploy(list, this.executionVertexVersioner.recordVertexModifications((Set) list.stream().map((v0) -> {
            return v0.getAttemptId();
        }).map((v0) -> {
            return v0.getExecutionVertexId();
        }).collect(Collectors.toSet())));
    }

    private static Execution getAnyExecution(ExecutionGraph executionGraph) {
        return (Execution) executionGraph.getRegisteredExecutions().values().iterator().next();
    }
}
