package org.apache.flink.runtime.scheduler.adaptive;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/BackgroundTaskTest.class */
class BackgroundTaskTest {

    @RegisterExtension
    private static final TestExecutorExtension<ExecutorService> TEST_EXECUTOR_EXTENSION = new TestExecutorExtension<>(() -> {
        return Executors.newFixedThreadPool(2);
    });

    BackgroundTaskTest() {
    }

    @Test
    void testFinishedBackgroundTaskIsTerminated() {
        BackgroundTask finishedBackgroundTask = BackgroundTask.finishedBackgroundTask();
        FlinkAssertions.assertThatFuture(finishedBackgroundTask.getTerminationFuture()).isDone();
        finishedBackgroundTask.getTerminationFuture().join();
    }

    @Test
    void testFinishedBackgroundTaskDoesNotContainAResult() {
        FlinkAssertions.assertThatFuture(BackgroundTask.finishedBackgroundTask().getResultFuture()).isCompletedExceptionally();
    }

    @Test
    void testNormalCompletionOfBackgroundTask() {
        BackgroundTask runAfter = BackgroundTask.finishedBackgroundTask().runAfter(() -> {
            return "foobar";
        }, TEST_EXECUTOR_EXTENSION.getExecutor());
        Assertions.assertThat((String) runAfter.getResultFuture().join()).isEqualTo("foobar");
        runAfter.getTerminationFuture().join();
    }

    @Test
    void testExceptionalCompletionOfBackgroundTask() throws InterruptedException {
        Exception exc = new Exception("Test exception");
        BackgroundTask runAfter = BackgroundTask.finishedBackgroundTask().runAfter(() -> {
            throw exc;
        }, TEST_EXECUTOR_EXTENSION.getExecutor());
        Assertions.assertThatThrownBy(() -> {
        }).withFailMessage("Expected an exceptionally completed result future.", new Object[0]).isInstanceOf(ExecutionException.class).hasCause(exc);
        runAfter.getTerminationFuture().join();
    }

    @Test
    void testRunAfterExecutesBackgroundTaskAfterPreviousHasCompleted() {
        OneShotLatch oneShotLatch = new OneShotLatch();
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(2);
        BackgroundTask runAfter = BackgroundTask.initialBackgroundTask(() -> {
            oneShotLatch.await();
            arrayBlockingQueue.offer(1);
            return null;
        }, TEST_EXECUTOR_EXTENSION.getExecutor()).runAfter(() -> {
            arrayBlockingQueue.offer(2);
            return null;
        }, TEST_EXECUTOR_EXTENSION.getExecutor());
        oneShotLatch.trigger();
        runAfter.getTerminationFuture().join();
        Assertions.assertThat(arrayBlockingQueue).contains(new Integer[]{1, 2});
    }

    @Test
    void testAbortSkipsTasksWhichHaveNotBeenStarted() {
        OneShotLatch oneShotLatch = new OneShotLatch();
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(2);
        BackgroundTask runAfter = BackgroundTask.initialBackgroundTask(() -> {
            oneShotLatch.await();
            arrayBlockingQueue.offer(1);
            return null;
        }, TEST_EXECUTOR_EXTENSION.getExecutor()).runAfter(() -> {
            arrayBlockingQueue.offer(2);
            return null;
        }, TEST_EXECUTOR_EXTENSION.getExecutor());
        BackgroundTask runAfter2 = runAfter.runAfter(() -> {
            arrayBlockingQueue.offer(3);
            return null;
        }, TEST_EXECUTOR_EXTENSION.getExecutor());
        runAfter.abort();
        oneShotLatch.trigger();
        runAfter2.getTerminationFuture().join();
        Assertions.assertThat(arrayBlockingQueue).contains(new Integer[]{1, 3});
    }
}
