package org.apache.flink.streaming.runtime.tasks;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.util.concurrent.NeverCompleteFuture;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImplTest.class */
/**
 * Tests for {@link ProcessingTimeServiceImpl} running on top of a real
 * {@link SystemProcessingTimeService}.
 *
 * <p>A fresh timer service is created before every test and shut down afterwards.
 */
class ProcessingTimeServiceImplTest {

    /** Upper bound for every blocking wait in this class, so a broken timer fails fast instead of hanging. */
    private static final Duration TESTING_TIMEOUT = Duration.ofSeconds(10);

    /** Timer service backing the {@link ProcessingTimeServiceImpl} under test; recreated per test. */
    private SystemProcessingTimeService timerService;

    /** Creates the timer service whose callback completes a local future that no test inspects. */
    @BeforeEach
    void setup() {
        // Sink for whatever the timer service hands to its callback (presumably asynchronous
        // errors -- confirm against SystemProcessingTimeService). Nothing reads this future.
        final CompletableFuture<Throwable> errorSink = new CompletableFuture<>();
        this.timerService = new SystemProcessingTimeService(errorSink::complete);
    }

    /** Shuts the timer service down after each test. */
    @AfterEach
    void teardown() {
        this.timerService.shutdownService();
    }

    /**
     * Exercises {@code registerTimer} and {@code scheduleAtFixedRate}: a timer far in the future can
     * be cancelled before it fires, a timer at timestamp 0 fires and is not reported as cancelled,
     * and a periodic timer can be cancelled after its first invocation.
     */
    @Test
    void testTimerRegistrationAndCancellation()
            throws TimeoutException, InterruptedException, ExecutionException {
        final ProcessingTimeServiceImpl processingTimeService = newProcessingTimeService();

        // A timer registered for Long.MAX_VALUE is scheduled on the underlying service and
        // can be cancelled.
        final ScheduledFuture<?> neverFiredTimer =
                processingTimeService.registerTimer(Long.MAX_VALUE, timestamp -> {});
        Assertions.assertThat(this.timerService.getNumTasksScheduled()).isOne();
        Assertions.assertThat(neverFiredTimer.cancel(false)).isTrue();
        Assertions.assertThat(neverFiredTimer).isDone().isCancelled();

        // A timer registered for timestamp 0 runs its callback and completes normally.
        final CompletableFuture<Void> firedTimerFuture = new CompletableFuture<>();
        final ScheduledFuture<?> firedTimer =
                processingTimeService.registerTimer(
                        0L, timestamp -> firedTimerFuture.complete(null));
        firedTimer.get(TESTING_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        Assertions.assertThat(firedTimerFuture).isDone();
        Assertions.assertThat(firedTimer).isNotCancelled();

        // A fixed-rate timer is cancellable once its callback has been invoked.
        final CompletableFuture<Void> periodicTimerFuture = new CompletableFuture<>();
        final ScheduledFuture<?> periodicTimer =
                processingTimeService.scheduleAtFixedRate(
                        timestamp -> periodicTimerFuture.complete(null), 0L, Long.MAX_VALUE);
        periodicTimerFuture.get(TESTING_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        Assertions.assertThat(periodicTimer.cancel(false)).isTrue();
        Assertions.assertThat(periodicTimer).isDone().isCancelled();
    }

    /**
     * Verifies {@code quiesce()} while a timer callback is still running: new registrations return
     * a {@link NeverCompleteFuture}, and the quiesce future is only done after the in-flight
     * callback has finished.
     */
    @Test
    void testQuiesce() throws Exception {
        final ProcessingTimeServiceImpl processingTimeService = newProcessingTimeService();

        final CompletableFuture<Void> timerRunFuture = new CompletableFuture<>();
        final OneShotLatch timerWaitLatch = new OneShotLatch();

        // The callback signals that it started, then blocks until the test releases the latch.
        final ScheduledFuture<?> timer =
                processingTimeService.registerTimer(
                        0L,
                        timestamp -> {
                            timerRunFuture.complete(null);
                            timerWaitLatch.await();
                        });

        // Quiesce only once the callback is known to be executing.
        timerRunFuture.get(TESTING_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        final CompletableFuture<?> quiesceCompletedFuture = processingTimeService.quiesce();

        // After quiesce(), both registration methods hand back a NeverCompleteFuture.
        Assertions.assertThat(processingTimeService.registerTimer(0L, timestamp -> {}))
                .isInstanceOf(NeverCompleteFuture.class);
        Assertions.assertThat(
                        processingTimeService.scheduleAtFixedRate(
                                timestamp -> {}, 0L, Long.MAX_VALUE))
                .isInstanceOf(NeverCompleteFuture.class);

        // The quiesce future stays pending until the blocked callback is released and done.
        Assertions.assertThat(quiesceCompletedFuture).isNotDone();
        timerWaitLatch.trigger();
        timer.get(TESTING_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        Assertions.assertThat(quiesceCompletedFuture).isDone();
    }

    /** With no timer registered, the future returned by {@code quiesce()} is already done. */
    @Test
    void testQuiesceWhenNoRunningTimers() {
        Assertions.assertThat(newProcessingTimeService().quiesce()).isDone();
    }

    /** Builds the service under test on top of {@link #timerService}, passing callbacks through unchanged. */
    private ProcessingTimeServiceImpl newProcessingTimeService() {
        return new ProcessingTimeServiceImpl(this.timerService, callback -> callback);
    }
}
