package org.apache.flink.streaming.api.operators;

import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.state.StateFutureImpl;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.MockStateExecutor;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest.class */
class InternalTimerServiceAsyncImplTest {
    private AsyncExecutionController asyncExecutionController;
    private TestKeyContext keyContext;
    private TestProcessingTimeService processingTimeService;
    private InternalTimerServiceAsyncImpl<Integer, String> service;
    private StateFutureImpl.AsyncFrameworkExceptionHandler exceptionHandler = new StateFutureImpl.AsyncFrameworkExceptionHandler() { // from class: org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImplTest.1
        public void handleException(String str, Throwable th) {
            throw new RuntimeException(str, th);
        }
    };

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest$TestKeyContext.class */
    private static class TestKeyContext implements KeyContext {
        private Object key;

        private TestKeyContext() {
        }

        public void setCurrentKey(Object obj) {
            this.key = obj;
        }

        public Object getCurrentKey() {
            return this.key;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/InternalTimerServiceAsyncImplTest$TestTriggerable.class */
    private static class TestTriggerable implements Triggerable<Integer, String> {
        private static int eventTriggerCount = 0;
        private static int processingTriggerCount = 0;

        private TestTriggerable() {
        }

        public void onEventTime(InternalTimer<Integer, String> internalTimer) throws Exception {
            eventTriggerCount++;
        }

        public void onProcessingTime(InternalTimer<Integer, String> internalTimer) throws Exception {
            processingTriggerCount++;
        }
    }

    InternalTimerServiceAsyncImplTest() {
    }

    @BeforeEach
    void setup() throws Exception {
        this.asyncExecutionController = new AsyncExecutionController(new SyncMailboxExecutor(), this.exceptionHandler, new MockStateExecutor(), 128, 2, 1000L, 10);
        KeyGroupRange keyGroupRange = new KeyGroupRange(0, 128 - 1);
        this.keyContext = new TestKeyContext();
        this.processingTimeService = new TestProcessingTimeService();
        this.processingTimeService.setCurrentTime(0L);
        this.service = createInternalTimerService(UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup(), keyGroupRange, this.keyContext, this.processingTimeService, IntSerializer.INSTANCE, StringSerializer.INSTANCE, new HeapPriorityQueueSetFactory(keyGroupRange, 128, 128), this.asyncExecutionController);
        int unused = TestTriggerable.processingTriggerCount = 0;
        int unused2 = TestTriggerable.eventTriggerCount = 0;
    }

    @Test
    void testTimerWithSameKey() throws Exception {
        this.keyContext.setCurrentKey("key-1");
        this.service.registerProcessingTimeTimer("processing-timer-1", 1L);
        this.service.registerProcessingTimeTimer("processing-timer-2", 1L);
        this.service.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, new TestTriggerable());
        Assertions.assertThat(TestTriggerable.processingTriggerCount).isEqualTo(0);
        this.processingTimeService.advance(1L);
        Assertions.assertThat(TestTriggerable.processingTriggerCount).isEqualTo(2);
    }

    @Test
    void testProcessingTimerFireOrder() throws Exception {
        this.keyContext.setCurrentKey("key-1");
        this.service.registerProcessingTimeTimer("processing-timer-1", 1L);
        this.service.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, new TestTriggerable());
        Assertions.assertThat(TestTriggerable.processingTriggerCount).isEqualTo(0);
        this.processingTimeService.advance(1L);
        Assertions.assertThat(TestTriggerable.processingTriggerCount).isEqualTo(1);
        this.keyContext.setCurrentKey("key-2");
        this.service.registerProcessingTimeTimer("processing-timer-2", 2L);
        Assertions.assertThat(TestTriggerable.processingTriggerCount).isEqualTo(1);
        RecordContext buildContext = this.asyncExecutionController.buildContext("record2", "key-2");
        this.asyncExecutionController.setCurrentContext(buildContext);
        this.asyncExecutionController.handleRequest((State) null, StateRequestType.VALUE_GET, (Object) null);
        this.processingTimeService.advance(1L);
        Assertions.assertThat(TestTriggerable.processingTriggerCount).isEqualTo(1);
        buildContext.release();
        this.processingTimeService.advance(1L);
        Assertions.assertThat(TestTriggerable.processingTriggerCount).isEqualTo(2);
    }

    @Test
    void testEventTimerFireOrder() throws Exception {
        this.keyContext.setCurrentKey("key-1");
        this.service.registerEventTimeTimer("event-timer-1", 1L);
        this.service.startTimerService(IntSerializer.INSTANCE, StringSerializer.INSTANCE, new TestTriggerable());
        Assertions.assertThat(TestTriggerable.eventTriggerCount).isEqualTo(0);
        this.service.advanceWatermark(1L);
        Assertions.assertThat(TestTriggerable.eventTriggerCount).isEqualTo(1);
        this.keyContext.setCurrentKey("key-2");
        this.service.registerEventTimeTimer("event-timer-2", 2L);
        Assertions.assertThat(TestTriggerable.eventTriggerCount).isEqualTo(1);
        RecordContext buildContext = this.asyncExecutionController.buildContext("record2", "key-2");
        this.asyncExecutionController.setCurrentContext(buildContext);
        this.asyncExecutionController.handleRequest((State) null, StateRequestType.VALUE_GET, (Object) null);
        this.service.advanceWatermark(2L);
        Assertions.assertThat(TestTriggerable.eventTriggerCount).isEqualTo(1);
        buildContext.release();
        this.service.advanceWatermark(3L);
        Assertions.assertThat(TestTriggerable.eventTriggerCount).isEqualTo(2);
    }

    private static <K, N> InternalTimerServiceAsyncImpl<K, N> createInternalTimerService(TaskIOMetricGroup taskIOMetricGroup, KeyGroupRange keyGroupRange, KeyContext keyContext, ProcessingTimeService processingTimeService, TypeSerializer<K> typeSerializer, TypeSerializer<N> typeSerializer2, PriorityQueueSetFactory priorityQueueSetFactory, AsyncExecutionController asyncExecutionController) {
        TimerSerializer timerSerializer = new TimerSerializer(typeSerializer, typeSerializer2);
        return new InternalTimerServiceAsyncImpl<>(taskIOMetricGroup, keyGroupRange, keyContext, processingTimeService, priorityQueueSetFactory.create("__async_processing_timers", timerSerializer), priorityQueueSetFactory.create("__async_event_timers", timerSerializer), StreamTaskCancellationContext.alwaysRunning(), asyncExecutionController);
    }
}
