package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl;
import org.apache.flink.util.clock.ManualClock;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/PhysicalSlotRequestBulkCheckerImplTest.class */
/**
 * Tests for {@link PhysicalSlotRequestBulkCheckerImpl}.
 *
 * <p>The checker under test is created with a {@link ManualClock} and a slot supplier backed by
 * {@link #slots}, and is started on a single-threaded main thread executor that is shared by all
 * tests of this class.
 */
class PhysicalSlotRequestBulkCheckerImplTest {
    /** Timeout handed to the checker whenever a bulk is scheduled or checked. */
    private static final Time TIMEOUT = Time.milliseconds(50);

    private static ScheduledExecutorService singleThreadScheduledExecutorService;
    private static ComponentMainThreadExecutor mainThreadExecutor;

    /** Clock handed to the checker; it is only moved explicitly via {@code advanceTime}. */
    private final ManualClock clock = new ManualClock();

    private PhysicalSlotRequestBulkCheckerImpl bulkChecker;

    /** Slots that are visible to the checker through {@link #slotsRetriever}. */
    private Set<PhysicalSlot> slots;

    /** Supplies a fresh copy of {@link #slots} on every call. */
    private Supplier<Set<SlotInfo>> slotsRetriever;

    /** Creates the shared single-threaded executor and wraps it as the main thread executor. */
    @BeforeAll
    static void setupClass() {
        singleThreadScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        mainThreadExecutor =
                ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
                        singleThreadScheduledExecutorService);
    }

    /** Shuts the shared executor down, if it was created. */
    @AfterAll
    static void teardownClass() {
        if (singleThreadScheduledExecutorService != null) {
            singleThreadScheduledExecutorService.shutdownNow();
        }
    }

    /** Resets the slot set and creates and starts a fresh checker for every test. */
    @BeforeEach
    void setup() {
        slots = new HashSet<>();
        // Hand out a copy so that the checker never iterates over the live, mutable set.
        slotsRetriever = () -> new HashSet<>(slots);
        bulkChecker = new PhysicalSlotRequestBulkCheckerImpl(slotsRetriever, clock);
        bulkChecker.start(mainThreadExecutor);
    }

    /** A bulk with a pending request must not be cancelled while the clock is not advanced. */
    @Test
    void testPendingBulkIsNotCancelled() throws InterruptedException, ExecutionException {
        final CompletableFuture<SlotRequestId> cancellationFuture = new CompletableFuture<>();
        final PhysicalSlotRequestBulk bulk =
                createPhysicalSlotRequestBulkWithCancellationFuture(
                        cancellationFuture, new SlotRequestId());

        bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, TIMEOUT);

        checkNotCancelledAfter(cancellationFuture, 2 * TIMEOUT.toMilliseconds());
    }

    /**
     * A bulk whose only request has been fulfilled must not be cancelled, even when the clock has
     * moved past the timeout.
     */
    @Test
    void testFulfilledBulkIsNotCancelled() throws InterruptedException, ExecutionException {
        final CompletableFuture<SlotRequestId> cancellationFuture = new CompletableFuture<>();
        final SlotRequestId requestId = new SlotRequestId();
        final PhysicalSlotRequestBulkImpl bulk =
                createPhysicalSlotRequestBulkWithCancellationFuture(cancellationFuture, requestId);
        // Fulfil the only request so that this test differs from the pending case above.
        bulk.markRequestFulfilled(requestId, new AllocationID());

        bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, TIMEOUT);
        // Move past the timeout: an unfulfilled bulk would be cancelled here (see
        // testUnfulfillableBulkIsCancelled), a fulfilled one is expected to be left alone.
        clock.advanceTime(TIMEOUT.toMilliseconds() + 1, TimeUnit.MILLISECONDS);

        checkNotCancelledAfter(cancellationFuture, 2 * TIMEOUT.toMilliseconds());
    }

    /**
     * Asserts that the given future has not been completed after the given delay.
     *
     * <p>First waits for a no-op task, scheduled on the main thread executor with the given
     * delay, to finish. Then asserts that waiting on the future for the same amount of time ends
     * in a {@link TimeoutException} and that the future is still not done afterwards.
     *
     * @param completableFuture future that is completed by the bulk's canceller
     * @param j delay in milliseconds, used both for the no-op task and for the timed wait
     * @throws ExecutionException if waiting for the no-op task fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void checkNotCancelledAfter(CompletableFuture<?> completableFuture, long j)
            throws ExecutionException, InterruptedException {
        mainThreadExecutor.schedule(() -> {}, j, TimeUnit.MILLISECONDS).get();
        Assertions.assertThatThrownBy(
                        () -> {
                            FlinkAssertions.assertThatFuture(completableFuture).isNotDone();
                            completableFuture.get(j, TimeUnit.MILLISECONDS);
                        })
                .withFailMessage("The future must not have been cancelled")
                .isInstanceOf(TimeoutException.class);
        FlinkAssertions.assertThatFuture(completableFuture).isNotDone();
    }

    /** A pending bulk without any available slot is cancelled once the timeout has elapsed. */
    @Test
    void testUnfulfillableBulkIsCancelled() {
        final CompletableFuture<SlotRequestId> cancellationFuture = new CompletableFuture<>();
        final SlotRequestId requestId = new SlotRequestId();
        final PhysicalSlotRequestBulk bulk =
                createPhysicalSlotRequestBulkWithCancellationFuture(cancellationFuture, requestId);

        bulkChecker.schedulePendingRequestBulkTimeoutCheck(bulk, TIMEOUT);
        clock.advanceTime(TIMEOUT.toMilliseconds() + 1, TimeUnit.MILLISECONDS);

        // The canceller completes the future with the id of the cancelled request.
        Assertions.assertThat(cancellationFuture.join()).isEqualTo(requestId);
    }

    /** The timeout check reports FULFILLED for a bulk whose only request was fulfilled. */
    @Test
    void testBulkFulfilledOnCheck() {
        final SlotRequestId requestId = new SlotRequestId();
        final PhysicalSlotRequestBulkImpl bulk = createPhysicalSlotRequestBulk(requestId);
        bulk.markRequestFulfilled(requestId, new AllocationID());

        final PhysicalSlotRequestBulkWithTimestamp bulkWithTimestamp =
                new PhysicalSlotRequestBulkWithTimestamp(bulk);

        Assertions.assertThat(checkBulkTimeout(bulkWithTimestamp))
                .isEqualTo(PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.FULFILLED);
    }

    /** The timeout check reports TIMEOUT once an unfulfillable bulk exceeded the timeout. */
    @Test
    void testBulkTimeoutOnCheck() {
        final PhysicalSlotRequestBulkWithTimestamp bulk =
                createPhysicalSlotRequestBulkWithTimestamp(new SlotRequestId());

        clock.advanceTime(TIMEOUT.toMilliseconds() + 1, TimeUnit.MILLISECONDS);

        Assertions.assertThat(checkBulkTimeout(bulk))
                .isEqualTo(PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.TIMEOUT);
    }

    /** The timeout check reports PENDING while a temporarily occupied slot is available. */
    @Test
    void testBulkPendingOnCheckIfFulfillable() {
        final PhysicalSlotRequestBulkWithTimestamp bulk =
                createPhysicalSlotRequestBulkWithTimestamp(new SlotRequestId());

        final PhysicalSlot slot = addOneSlot();
        PhysicalSlotTestUtils.occupyPhysicalSlot(slot, false);

        Assertions.assertThat(checkBulkTimeout(bulk))
                .isEqualTo(PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.PENDING);
    }

    /** The timeout check reports PENDING for an unfulfillable bulk before the timeout elapsed. */
    @Test
    void testBulkPendingOnCheckIfUnfulfillableButNotTimedOut() {
        final PhysicalSlotRequestBulkWithTimestamp bulk =
                createPhysicalSlotRequestBulkWithTimestamp(new SlotRequestId());

        Assertions.assertThat(checkBulkTimeout(bulk))
                .isEqualTo(PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult.PENDING);
    }

    /** One pending request and one slot: the bulk is fulfillable. */
    @Test
    void testBulkFulfillable() {
        final PhysicalSlotRequestBulkImpl bulk = createPhysicalSlotRequestBulk(new SlotRequestId());

        addOneSlot();

        Assertions.assertThat(isFulfillable(bulk)).isTrue();
    }

    /** Two pending requests but only one slot: the bulk is not fulfillable. */
    @Test
    void testBulkUnfulfillableWithInsufficientSlots() {
        final PhysicalSlotRequestBulkImpl bulk =
                createPhysicalSlotRequestBulk(new SlotRequestId(), new SlotRequestId());

        addOneSlot();

        Assertions.assertThat(isFulfillable(bulk)).isFalse();
    }

    /**
     * The only slot is already assigned to one request of the bulk, so the remaining pending
     * request cannot be fulfilled.
     */
    @Test
    void testBulkUnfulfillableWithSlotAlreadyAssignedToBulk() {
        final SlotRequestId fulfilledRequestId = new SlotRequestId();
        final PhysicalSlotRequestBulkImpl bulk =
                createPhysicalSlotRequestBulk(fulfilledRequestId, new SlotRequestId());

        final PhysicalSlot assignedSlot = addOneSlot();
        bulk.markRequestFulfilled(fulfilledRequestId, assignedSlot.getAllocationId());

        Assertions.assertThat(isFulfillable(bulk)).isFalse();
    }

    /** Two requests and two slots, one of which is occupied indefinitely: not fulfillable. */
    @Test
    void testBulkUnfulfillableWithSlotOccupiedIndefinitely() {
        final PhysicalSlotRequestBulkImpl bulk =
                createPhysicalSlotRequestBulk(new SlotRequestId(), new SlotRequestId());

        final PhysicalSlot occupiedSlot = addOneSlot();
        addOneSlot();
        PhysicalSlotTestUtils.occupyPhysicalSlot(occupiedSlot, true);

        Assertions.assertThat(isFulfillable(bulk)).isFalse();
    }

    /** Two requests and two slots, one of which is occupied only temporarily: fulfillable. */
    @Test
    void testBulkFulfillableWithSlotOccupiedTemporarily() {
        final PhysicalSlotRequestBulkImpl bulk =
                createPhysicalSlotRequestBulk(new SlotRequestId(), new SlotRequestId());

        final PhysicalSlot occupiedSlot = addOneSlot();
        addOneSlot();
        PhysicalSlotTestUtils.occupyPhysicalSlot(occupiedSlot, false);

        Assertions.assertThat(isFulfillable(bulk)).isTrue();
    }

    /**
     * Creates a bulk with one pending request per given id, wraps it with a timestamp and marks
     * it as unfulfillable as of the current clock time.
     *
     * @param slotRequestIdArr ids of the pending requests of the bulk
     * @return the timestamped bulk
     */
    private PhysicalSlotRequestBulkWithTimestamp createPhysicalSlotRequestBulkWithTimestamp(
            SlotRequestId... slotRequestIdArr) {
        final PhysicalSlotRequestBulkWithTimestamp bulkWithTimestamp =
                new PhysicalSlotRequestBulkWithTimestamp(
                        createPhysicalSlotRequestBulk(slotRequestIdArr));
        bulkWithTimestamp.markUnfulfillable(clock.relativeTimeMillis());
        return bulkWithTimestamp;
    }

    /**
     * Creates a bulk with one pending request of {@link ResourceProfile#UNKNOWN} per given id.
     *
     * @param slotRequestIdArr ids of the pending requests of the bulk
     * @return the created bulk
     */
    private static PhysicalSlotRequestBulkImpl createPhysicalSlotRequestBulk(
            SlotRequestId... slotRequestIdArr) {
        final TestingPhysicalSlotRequestBulkBuilder builder =
                TestingPhysicalSlotRequestBulkBuilder.newBuilder();
        for (SlotRequestId requestId : slotRequestIdArr) {
            builder.addPendingRequest(requestId, ResourceProfile.UNKNOWN);
        }
        return builder.buildPhysicalSlotRequestBulkImpl();
    }

    /**
     * Creates a bulk with a single pending request whose canceller completes the given future
     * with the id it is invoked with.
     *
     * @param completableFuture future to complete when the canceller is invoked
     * @param slotRequestId id of the single pending request
     * @return the created bulk
     */
    private static PhysicalSlotRequestBulkImpl createPhysicalSlotRequestBulkWithCancellationFuture(
            CompletableFuture<SlotRequestId> completableFuture, SlotRequestId slotRequestId) {
        return TestingPhysicalSlotRequestBulkBuilder.newBuilder()
                .addPendingRequest(slotRequestId, ResourceProfile.UNKNOWN)
                .setCanceller((cancelledRequestId, cause) -> completableFuture.complete(cancelledRequestId))
                .buildPhysicalSlotRequestBulkImpl();
    }

    /**
     * Creates a new physical slot and adds it to {@link #slots}.
     *
     * <p>The set is modified in a task run on the main thread executor, and this method blocks
     * until that task has finished.
     *
     * @return the newly added slot
     */
    private PhysicalSlot addOneSlot() {
        final PhysicalSlot slot = PhysicalSlotTestUtils.createPhysicalSlot();
        CompletableFuture.runAsync(() -> slots.add(slot), mainThreadExecutor).join();
        return slot;
    }

    /** Runs the checker's timeout check for the given bulk with {@link #TIMEOUT}. */
    private PhysicalSlotRequestBulkCheckerImpl.TimeoutCheckResult checkBulkTimeout(
            PhysicalSlotRequestBulkWithTimestamp physicalSlotRequestBulkWithTimestamp) {
        return bulkChecker.checkPhysicalSlotRequestBulkTimeout(
                physicalSlotRequestBulkWithTimestamp, TIMEOUT);
    }

    /** Asks the checker whether the given bulk is fulfillable with the currently added slots. */
    private boolean isFulfillable(PhysicalSlotRequestBulk physicalSlotRequestBulk) {
        return PhysicalSlotRequestBulkCheckerImpl.isSlotRequestBulkFulfillable(
                physicalSlotRequestBulk, slotsRetriever);
    }
}
