package org.apache.flink.table.runtime.operators.wmassigners;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.dataformat.GenericRow;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/wmassigners/MiniBatchAssignerOperatorTest.class */
public class MiniBatchAssignerOperatorTest extends WatermarkAssignerOperatorTestBase {
    @Test
    public void testMiniBatchAssignerOperator() throws Exception {
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(new MiniBatchAssignerOperator(100L));
        long j = 0;
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(GenericRow.of(new Object[]{1L})));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(GenericRow.of(new Object[]{2L})));
        oneInputStreamOperatorTestHarness.processWatermark(new Watermark(2L));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(GenericRow.of(new Object[]{3L})));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(GenericRow.of(new Object[]{4L})));
        ConcurrentLinkedQueue output = oneInputStreamOperatorTestHarness.getOutput();
        long j2 = 1;
        long j3 = 0;
        while (true) {
            if (output.size() > 0) {
                Object poll = output.poll();
                Assert.assertNotNull(poll);
                Tuple2<Long, Long> validateElement = validateElement(poll, j2, j3);
                long longValue = ((Long) validateElement.f0).longValue();
                j3 = ((Long) validateElement.f1).longValue();
                if (poll instanceof Watermark) {
                    break;
                }
                Assert.assertEquals(j2, longValue - 1);
                j2++;
                Assert.assertEquals(0L, j3);
            } else {
                j += 10;
                oneInputStreamOperatorTestHarness.setProcessingTime(j);
            }
        }
        Assert.assertEquals(100L, j3);
        output.clear();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(GenericRow.of(new Object[]{4L})));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(GenericRow.of(new Object[]{5L})));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(GenericRow.of(new Object[]{6L})));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(GenericRow.of(new Object[]{7L})));
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(GenericRow.of(new Object[]{8L})));
        ConcurrentLinkedQueue output2 = oneInputStreamOperatorTestHarness.getOutput();
        long j4 = 4;
        long j5 = 100;
        while (true) {
            if (output2.size() > 0) {
                Object poll2 = output2.poll();
                Assert.assertNotNull(poll2);
                Tuple2<Long, Long> validateElement2 = validateElement(poll2, j4, j5);
                long longValue2 = ((Long) validateElement2.f0).longValue();
                j5 = ((Long) validateElement2.f1).longValue();
                if (poll2 instanceof Watermark) {
                    Assert.assertEquals(200L, j5);
                    output2.clear();
                    oneInputStreamOperatorTestHarness.processWatermark(new Watermark(Long.MAX_VALUE));
                    Assert.assertEquals(Long.MAX_VALUE, ((Watermark) oneInputStreamOperatorTestHarness.getOutput().poll()).getTimestamp());
                    return;
                }
                Assert.assertEquals(j4, longValue2 - 1);
                j4++;
                Assert.assertEquals(100L, j5);
            } else {
                j += 10;
                oneInputStreamOperatorTestHarness.setProcessingTime(j);
            }
        }
    }
}
