package org.apache.flink.runtime.taskexecutor;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.class */
public class TaskExecutorITCase extends TestLogger {
    private static final Duration TESTING_TIMEOUT = Duration.ofMinutes(2);
    private static final int NUM_TMS = 2;
    private static final int SLOTS_PER_TM = 2;
    private static final int PARALLELISM = 4;
    private MiniCluster miniCluster;

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorITCase$BlockingOperator.class */
    public static class BlockingOperator extends TestingAbstractInvokables.Receiver {
        private static CountDownLatch countDownLatch = new CountDownLatch(1);

        public BlockingOperator(Environment environment) {
            super(environment);
        }

        @Override // org.apache.flink.runtime.jobmaster.TestingAbstractInvokables.Receiver
        public void invoke() throws Exception {
            countDownLatch.await();
            super.invoke();
        }

        public static void unblock() {
            countDownLatch.countDown();
        }

        public static void reset() {
            countDownLatch = new CountDownLatch(1);
        }
    }

    @Before
    public void setup() throws Exception {
        this.miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().setNumTaskManagers(2).setNumSlotsPerTaskManager(2).build());
        this.miniCluster.start();
    }

    @After
    public void teardown() throws Exception {
        if (this.miniCluster != null) {
            this.miniCluster.close();
        }
    }

    @Test
    public void testJobReExecutionAfterTaskExecutorTermination() throws Exception {
        CompletableFuture<JobResult> submitJobAndWaitUntilRunning = submitJobAndWaitUntilRunning(createJobGraph(4));
        this.miniCluster.terminateTaskManager(0);
        Assert.assertThat(Boolean.valueOf(submitJobAndWaitUntilRunning.get().isSuccess()), Matchers.is(false));
        this.miniCluster.startTaskManager();
        JobGraph createJobGraph = createJobGraph(4);
        BlockingOperator.unblock();
        this.miniCluster.submitJob(createJobGraph).get();
        this.miniCluster.requestJobResult(createJobGraph.getJobID()).get();
    }

    @Test
    public void testJobRecoveryWithFailingTaskExecutor() throws Exception {
        CompletableFuture<JobResult> submitJobAndWaitUntilRunning = submitJobAndWaitUntilRunning(createJobGraphWithRestartStrategy(4));
        this.miniCluster.startTaskManager();
        this.miniCluster.terminateTaskManager(0).get();
        BlockingOperator.unblock();
        Assert.assertThat(Boolean.valueOf(submitJobAndWaitUntilRunning.get().isSuccess()), Matchers.is(true));
    }

    private CompletableFuture<JobResult> submitJobAndWaitUntilRunning(JobGraph jobGraph) throws Exception {
        this.miniCluster.submitJob(jobGraph).get();
        CompletableFuture<JobResult> requestJobResult = this.miniCluster.requestJobResult(jobGraph.getJobID());
        Assert.assertThat(Boolean.valueOf(requestJobResult.isDone()), Matchers.is(false));
        CommonTestUtils.waitUntilCondition(jobIsRunning(() -> {
            return this.miniCluster.getExecutionGraph(jobGraph.getJobID());
        }), Deadline.fromNow(TESTING_TIMEOUT), 50L);
        return requestJobResult;
    }

    private SupplierWithException<Boolean, Exception> jobIsRunning(Supplier<CompletableFuture<? extends AccessExecutionGraph>> supplier) {
        Predicate<AccessExecutionGraph> allExecutionsPredicate = ExecutionGraphTestUtils.allExecutionsPredicate(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.RUNNING).or(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.FINISHED)));
        return () -> {
            AccessExecutionGraph accessExecutionGraph = (AccessExecutionGraph) ((CompletableFuture) supplier.get()).join();
            return Boolean.valueOf(allExecutionsPredicate.test(accessExecutionGraph) && accessExecutionGraph.getState() == JobStatus.RUNNING);
        };
    }

    private JobGraph createJobGraphWithRestartStrategy(int i) throws IOException {
        JobGraph createJobGraph = createJobGraph(i);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 0L));
        createJobGraph.setExecutionConfig(executionConfig);
        return createJobGraph;
    }

    private JobGraph createJobGraph(int i) {
        JobVertex jobVertex = new JobVertex("Sender");
        jobVertex.setParallelism(i);
        jobVertex.setInvokableClass(TestingAbstractInvokables.Sender.class);
        JobVertex jobVertex2 = new JobVertex("Blocking receiver");
        jobVertex2.setParallelism(i);
        jobVertex2.setInvokableClass(BlockingOperator.class);
        BlockingOperator.reset();
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        jobVertex.setSlotSharingGroup(slotSharingGroup);
        jobVertex2.setSlotSharingGroup(slotSharingGroup);
        return JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex2);
    }
}
