package org.apache.flink.streaming.runtime.operators.windowing;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/windowing/ContinuousEventTimeTriggerTest.class */
/**
 * Tests for {@link ContinuousEventTimeTrigger}, driven through a {@link TriggerTestHarness}.
 *
 * <p>Each test builds its own harness, feeds elements / watermarks into it and asserts on the
 * returned {@link TriggerResult}s and on the number of state entries and timers the harness
 * reports.
 */
class ContinuousEventTimeTriggerTest {

    /**
     * Processes an element after the watermark has already been advanced to 10 and then invokes
     * the event-time callback for the same window.
     *
     * <p>No result is asserted for the final {@code invokeOnEventTime} call, so this test only
     * checks that the call completes without throwing.
     */
    @Test
    void testTriggerHandlesAllOnTimerCalls() throws Exception {
        TriggerTestHarness<Object, TimeWindow> harness = newHarness(Time.milliseconds(5));
        TimeWindow window = new TimeWindow(0, 2);

        // A fresh harness is expected to hold neither state nor timers.
        Assertions.assertThat(harness.numStateEntries()).isZero();
        Assertions.assertThat(harness.numProcessingTimeTimers()).isZero();
        Assertions.assertThat(harness.numEventTimeTimers()).isZero();

        // Move the watermark past the end of the window before any element arrives.
        harness.advanceWatermark(10);

        Assertions.assertThat(harness.processElement(new StreamRecord<Object>(1), window))
                .isEqualTo(TriggerResult.FIRE);

        // NOTE(review): presumably simulates a timer firing while the trigger holds no state
        // for the window - confirm against the trigger implementation.
        harness.invokeOnEventTime(20, window);
    }

    /**
     * Feeds elements into two distinct windows and checks that state entries and event-time
     * timers are counted per window, and that advancing the watermark to a window's end yields
     * {@link TriggerResult#FIRE} for that window.
     */
    @Test
    void testWindowSeparationAndFiring() throws Exception {
        TriggerTestHarness<Object, TimeWindow> harness = newHarness(Time.hours(1));
        TimeWindow first = new TimeWindow(0, 2);
        TimeWindow second = new TimeWindow(2, 4);

        // Three elements for the first window, two for the second; none of them fires.
        for (int i = 0; i < 3; i++) {
            Assertions.assertThat(harness.processElement(new StreamRecord<Object>(1), first))
                    .isEqualTo(TriggerResult.CONTINUE);
        }
        for (int i = 0; i < 2; i++) {
            Assertions.assertThat(harness.processElement(new StreamRecord<Object>(1), second))
                    .isEqualTo(TriggerResult.CONTINUE);
        }

        Assertions.assertThat(harness.numStateEntries()).isEqualTo(2);
        Assertions.assertThat(harness.numProcessingTimeTimers()).isZero();
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(4);
        Assertions.assertThat(harness.numEventTimeTimers(first)).isEqualTo(2);
        Assertions.assertThat(harness.numEventTimeTimers(second)).isEqualTo(2);

        assertFired(harness.advanceWatermark(2), first);

        // Only the first window loses a timer; the second window is untouched.
        Assertions.assertThat(harness.numStateEntries()).isEqualTo(2);
        Assertions.assertThat(harness.numProcessingTimeTimers()).isZero();
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(3);
        Assertions.assertThat(harness.numEventTimeTimers(first)).isOne();
        Assertions.assertThat(harness.numEventTimeTimers(second)).isEqualTo(2);

        assertFired(harness.advanceWatermark(4), second);

        Assertions.assertThat(harness.numStateEntries()).isEqualTo(2);
        Assertions.assertThat(harness.numProcessingTimeTimers()).isZero();
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(2);
    }

    /**
     * Checks that an element whose window ends at the current watermark makes the trigger return
     * {@link TriggerResult#FIRE} right away, leaving no state entries or timers behind.
     */
    @Test
    void testLateElementTriggersImmediately() throws Exception {
        TriggerTestHarness<Object, TimeWindow> harness = newHarness(Time.hours(1));

        harness.advanceWatermark(2);

        Assertions.assertThat(
                        harness.processElement(new StreamRecord<Object>(1), new TimeWindow(0, 2)))
                .isEqualTo(TriggerResult.FIRE);

        Assertions.assertThat(harness.numStateEntries()).isZero();
        Assertions.assertThat(harness.numProcessingTimeTimers()).isZero();
        Assertions.assertThat(harness.numEventTimeTimers()).isZero();
    }

    /**
     * Checks the effect of clearing trigger state window by window: each {@code clearTriggerState}
     * call is followed by assertions that the cleared window's state entry is gone and that one of
     * its two event-time timers has been removed, while the other window is unaffected.
     */
    @Test
    void testClear() throws Exception {
        TriggerTestHarness<Object, TimeWindow> harness = newHarness(Time.hours(1));
        TimeWindow first = new TimeWindow(0, 2);
        TimeWindow second = new TimeWindow(2, 4);

        Assertions.assertThat(harness.processElement(new StreamRecord<Object>(1), first))
                .isEqualTo(TriggerResult.CONTINUE);
        Assertions.assertThat(harness.processElement(new StreamRecord<Object>(1), second))
                .isEqualTo(TriggerResult.CONTINUE);

        Assertions.assertThat(harness.numStateEntries()).isEqualTo(2);
        Assertions.assertThat(harness.numProcessingTimeTimers()).isZero();
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(4);
        Assertions.assertThat(harness.numEventTimeTimers(first)).isEqualTo(2);
        Assertions.assertThat(harness.numEventTimeTimers(second)).isEqualTo(2);

        harness.clearTriggerState(second);

        Assertions.assertThat(harness.numStateEntries()).isOne();
        Assertions.assertThat(harness.numProcessingTimeTimers()).isZero();
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(3);
        Assertions.assertThat(harness.numEventTimeTimers(first)).isEqualTo(2);
        Assertions.assertThat(harness.numEventTimeTimers(second)).isOne();

        harness.clearTriggerState(first);

        Assertions.assertThat(harness.numStateEntries()).isZero();
        Assertions.assertThat(harness.numProcessingTimeTimers()).isZero();
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(2);
    }

    /**
     * Merges two windows into one and checks that the trigger reports it can merge, that a single
     * state entry and one additional timer exist for the merged window, and that advancing the
     * watermark to the merged window's end yields {@link TriggerResult#FIRE} for it.
     */
    @Test
    void testMergingWindows() throws Exception {
        TriggerTestHarness<Object, TimeWindow> harness = newHarness(Time.hours(1));
        TimeWindow first = new TimeWindow(0, 2);
        TimeWindow second = new TimeWindow(2, 4);
        TimeWindow merged = new TimeWindow(0, 4);

        Assertions.assertThat(ContinuousEventTimeTrigger.<TimeWindow>of(Time.hours(1)).canMerge())
                .isTrue();

        Assertions.assertThat(harness.processElement(new StreamRecord<Object>(1), first))
                .isEqualTo(TriggerResult.CONTINUE);
        Assertions.assertThat(harness.processElement(new StreamRecord<Object>(1), second))
                .isEqualTo(TriggerResult.CONTINUE);

        Assertions.assertThat(harness.numStateEntries()).isEqualTo(2);
        Assertions.assertThat(harness.numProcessingTimeTimers()).isZero();
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(4);
        Assertions.assertThat(harness.numEventTimeTimers(first)).isEqualTo(2);
        Assertions.assertThat(harness.numEventTimeTimers(second)).isEqualTo(2);

        harness.mergeWindows(merged, Lists.newArrayList(first, second));

        // The timers of the source windows are still counted after the merge; the merged
        // window contributes one more.
        Assertions.assertThat(harness.numStateEntries()).isOne();
        Assertions.assertThat(harness.numProcessingTimeTimers()).isZero();
        Assertions.assertThat(harness.numEventTimeTimers()).isEqualTo(5);
        Assertions.assertThat(harness.numEventTimeTimers(first)).isEqualTo(2);
        Assertions.assertThat(harness.numEventTimeTimers(second)).isEqualTo(2);
        Assertions.assertThat(harness.numEventTimeTimers(merged)).isOne();

        assertFired(harness.advanceWatermark(4), merged);

        Assertions.assertThat(harness.numStateEntries()).isOne();
        Assertions.assertThat(harness.numProcessingTimeTimers()).isZero();
        Assertions.assertThat(harness.numEventTimeTimers()).isOne();
    }

    /**
     * Creates a harness around a {@link ContinuousEventTimeTrigger} with the given interval,
     * using {@link TimeWindow.Serializer} as the window serializer.
     *
     * @param interval the interval handed to {@link ContinuousEventTimeTrigger#of}
     * @return a new harness for {@link TimeWindow}s
     * @throws Exception if the harness cannot be constructed
     */
    private static TriggerTestHarness<Object, TimeWindow> newHarness(Time interval)
            throws Exception {
        return new TriggerTestHarness<>(
                ContinuousEventTimeTrigger.<TimeWindow>of(interval), new TimeWindow.Serializer());
    }

    /**
     * Asserts that {@code results} contains at least one entry for {@code window} and that every
     * entry for that window carries {@link TriggerResult#FIRE}. Entries for other windows are
     * ignored.
     *
     * @param results the (window, result) pairs returned by advancing the watermark
     * @param window the window that is expected to have fired
     */
    private static void assertFired(
            Iterable<Tuple2<TimeWindow, TriggerResult>> results, TimeWindow window) {
        boolean sawFiring = false;
        for (Tuple2<TimeWindow, TriggerResult> result : results) {
            if (result.f0.equals(window)) {
                sawFiring = true;
                Assertions.assertThat(result.f1).isEqualTo(TriggerResult.FIRE);
            }
        }
        Assertions.assertThat(sawFiring).isTrue();
    }
}
