package org.apache.flink.runtime.executiongraph.failover.flip1;

import java.util.Collections;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.core.IsSame;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.class */
public class ExecutionFailureHandlerTest extends TestLogger {
    private static final long RESTART_DELAY_MS = 1234;
    private SchedulingTopology schedulingTopology;
    private TestFailoverStrategy failoverStrategy;
    private TestRestartBackoffTimeStrategy backoffTimeStrategy;
    private ExecutionFailureHandler executionFailureHandler;

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest$TestFailoverStrategy.class */
    private static class TestFailoverStrategy implements FailoverStrategy {
        private Set<ExecutionVertexID> tasksToRestart;

        public void setTasksToRestart(Set<ExecutionVertexID> set) {
            this.tasksToRestart = set;
        }

        public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID executionVertexID, Throwable th) {
            return this.tasksToRestart;
        }
    }

    @Before
    public void setUp() {
        TestingSchedulingTopology testingSchedulingTopology = new TestingSchedulingTopology();
        testingSchedulingTopology.newExecutionVertex();
        this.schedulingTopology = testingSchedulingTopology;
        this.failoverStrategy = new TestFailoverStrategy();
        this.backoffTimeStrategy = new TestRestartBackoffTimeStrategy(true, RESTART_DELAY_MS);
        this.executionFailureHandler = new ExecutionFailureHandler(this.schedulingTopology, this.failoverStrategy, this.backoffTimeStrategy);
    }

    @Test
    public void testNormalFailureHandling() {
        Set<ExecutionVertexID> singleton = Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0));
        this.failoverStrategy.setTasksToRestart(singleton);
        Exception exc = new Exception("test failure");
        FailureHandlingResult failureHandlingResult = this.executionFailureHandler.getFailureHandlingResult(new ExecutionVertexID(new JobVertexID(), 0), exc);
        Assert.assertTrue(failureHandlingResult.canRestart());
        Assert.assertEquals(RESTART_DELAY_MS, failureHandlingResult.getRestartDelayMS());
        Assert.assertEquals(singleton, failureHandlingResult.getVerticesToRestart());
        Assert.assertThat(failureHandlingResult.getError(), IsSame.sameInstance(exc));
        Assert.assertEquals(1L, this.executionFailureHandler.getNumberOfRestarts());
    }

    @Test
    public void testRestartingSuppressedFailureHandlingResult() {
        this.backoffTimeStrategy.setCanRestart(false);
        FailureHandlingResult failureHandlingResult = this.executionFailureHandler.getFailureHandlingResult(new ExecutionVertexID(new JobVertexID(), 0), new Exception("test failure"));
        Assert.assertFalse(failureHandlingResult.canRestart());
        Assert.assertNotNull(failureHandlingResult.getError());
        Assert.assertFalse(ExecutionFailureHandler.isUnrecoverableError(failureHandlingResult.getError()));
        try {
            failureHandlingResult.getVerticesToRestart();
            Assert.fail("get tasks to restart is not allowed when restarting is suppressed");
        } catch (IllegalStateException e) {
        }
        try {
            failureHandlingResult.getRestartDelayMS();
            Assert.fail("get restart delay is not allowed when restarting is suppressed");
        } catch (IllegalStateException e2) {
        }
        Assert.assertEquals(0L, this.executionFailureHandler.getNumberOfRestarts());
    }

    @Test
    public void testNonRecoverableFailureHandlingResult() {
        FailureHandlingResult failureHandlingResult = this.executionFailureHandler.getFailureHandlingResult(new ExecutionVertexID(new JobVertexID(), 0), new Exception((Throwable) new SuppressRestartsException(new Exception("test failure"))));
        Assert.assertFalse(failureHandlingResult.canRestart());
        Assert.assertNotNull(failureHandlingResult.getError());
        Assert.assertTrue(ExecutionFailureHandler.isUnrecoverableError(failureHandlingResult.getError()));
        try {
            failureHandlingResult.getVerticesToRestart();
            Assert.fail("get tasks to restart is not allowed when restarting is suppressed");
        } catch (IllegalStateException e) {
        }
        try {
            failureHandlingResult.getRestartDelayMS();
            Assert.fail("get restart delay is not allowed when restarting is suppressed");
        } catch (IllegalStateException e2) {
        }
        Assert.assertEquals(0L, this.executionFailureHandler.getNumberOfRestarts());
    }

    @Test
    public void testUnrecoverableErrorCheck() {
        Assert.assertFalse(ExecutionFailureHandler.isUnrecoverableError(new Exception()));
        Assert.assertTrue(ExecutionFailureHandler.isUnrecoverableError(new SuppressRestartsException(new Exception())));
        Assert.assertTrue(ExecutionFailureHandler.isUnrecoverableError(new Exception((Throwable) new SuppressRestartsException(new Exception()))));
    }

    @Test
    public void testGlobalFailureHandling() {
        Assert.assertEquals(IterableUtils.toStream(this.schedulingTopology.getVertices()).map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toSet()), this.executionFailureHandler.getGlobalFailureHandlingResult(new Exception("test failure")).getVerticesToRestart());
    }
}
