package org.apache.flink.streaming.api.functions.source.datagen;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.BlockingSourceContext;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/source/datagen/DataGeneratorSourceTest.class */
public class DataGeneratorSourceTest {
    @Test
    void testRandomGenerator() throws Exception {
        final DataGeneratorSource dataGeneratorSource = new DataGeneratorSource(RandomGenerator.longGenerator(10L, 20L));
        new AbstractStreamOperatorTestHarness((StreamOperator) new StreamSource(dataGeneratorSource), 1, 1, 0).open();
        final int i = 1000;
        final ArrayList arrayList = new ArrayList();
        dataGeneratorSource.run(new SourceFunction.SourceContext<Long>() { // from class: org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSourceTest.1
            private Object lock = new Object();
            private int emitNumber = 0;

            public void collect(Long l) {
                int i2 = this.emitNumber + 1;
                this.emitNumber = i2;
                if (i2 > i) {
                    dataGeneratorSource.isRunning = false;
                }
                arrayList.add(l);
            }

            public void collectWithTimestamp(Long l, long j) {
            }

            public void emitWatermark(Watermark watermark) {
            }

            public void markAsTemporarilyIdle() {
            }

            public Object getCheckpointLock() {
                return this.lock;
            }

            public void close() {
            }
        });
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Assertions.assertThat((Long) it.next()).isBetween(10L, 20L);
        }
    }

    @Test
    void testSequenceCheckpointRestore() throws Exception {
        HashSet hashSet = new HashSet();
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 > 100) {
                innerTestDataGenCheckpointRestore(() -> {
                    return new DataGeneratorSource(SequenceGenerator.longGenerator(0L, 100L));
                }, hashSet);
                return;
            } else {
                hashSet.add(Long.valueOf(j2));
                j = j2 + 1;
            }
        }
    }

    public static <T> void innerTestDataGenCheckpointRestore(Supplier<DataGeneratorSource<T>> supplier, Set<T> set) throws Exception {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        OneShotLatch oneShotLatch = new OneShotLatch();
        OneShotLatch oneShotLatch2 = new OneShotLatch();
        OneShotLatch oneShotLatch3 = new OneShotLatch();
        OneShotLatch oneShotLatch4 = new OneShotLatch();
        DataGeneratorSource<T> dataGeneratorSource = supplier.get();
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness((StreamOperator) new StreamSource(dataGeneratorSource), 2, 2, 0);
        abstractStreamOperatorTestHarness.open();
        DataGeneratorSource<T> dataGeneratorSource2 = supplier.get();
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness2 = new AbstractStreamOperatorTestHarness((StreamOperator) new StreamSource(dataGeneratorSource2), 2, 2, 1);
        abstractStreamOperatorTestHarness2.open();
        Thread thread = new Thread(() -> {
            try {
                dataGeneratorSource.run(new BlockingSourceContext("1", oneShotLatch, oneShotLatch2, concurrentHashMap, 21));
            } catch (Throwable th) {
                th.printStackTrace();
            }
        });
        Thread thread2 = new Thread(() -> {
            try {
                dataGeneratorSource2.run(new BlockingSourceContext("2", oneShotLatch3, oneShotLatch4, concurrentHashMap, 32));
            } catch (Throwable th) {
                th.printStackTrace();
            }
        });
        thread.start();
        thread2.start();
        if (!oneShotLatch.isTriggered()) {
            oneShotLatch.await();
        }
        if (!oneShotLatch3.isTriggered()) {
            oneShotLatch3.await();
        }
        OperatorSubtaskState repackageState = AbstractStreamOperatorTestHarness.repackageState(abstractStreamOperatorTestHarness.snapshot(0L, 0L), abstractStreamOperatorTestHarness2.snapshot(0L, 0L));
        DataGeneratorSource<T> dataGeneratorSource3 = supplier.get();
        StreamSource streamSource = new StreamSource(dataGeneratorSource3);
        OperatorSubtaskState repartitionOperatorState = AbstractStreamOperatorTestHarness.repartitionOperatorState(repackageState, 2, 2, 1, 0);
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness3 = new AbstractStreamOperatorTestHarness((StreamOperator) streamSource, 2, 1, 0);
        abstractStreamOperatorTestHarness3.setup();
        abstractStreamOperatorTestHarness3.initializeState(repartitionOperatorState);
        abstractStreamOperatorTestHarness3.open();
        OneShotLatch oneShotLatch5 = new OneShotLatch();
        OneShotLatch oneShotLatch6 = new OneShotLatch();
        oneShotLatch6.trigger();
        Thread thread3 = new Thread(() -> {
            try {
                dataGeneratorSource3.run(new BlockingSourceContext("3", oneShotLatch5, oneShotLatch6, concurrentHashMap, 3));
            } catch (Throwable th) {
                th.printStackTrace();
            }
        });
        thread3.start();
        thread3.join();
        Assertions.assertThat(concurrentHashMap).hasSize(3);
        HashSet hashSet = new HashSet(set.size());
        Iterator it = concurrentHashMap.entrySet().iterator();
        while (it.hasNext()) {
            List list = (List) concurrentHashMap.get((String) ((Map.Entry) it.next()).getKey());
            Assertions.assertThat(list).isNotEmpty();
            for (Object obj : list) {
                ((AbstractBooleanAssert) Assertions.assertThat(hashSet.add(obj)).as("Duplicate entry: " + obj, new Object[0])).isTrue();
                Assertions.assertThat(set).as("Unexpected element: " + obj, new Object[0]).contains(new Object[]{obj});
            }
        }
        Assertions.assertThat(hashSet).hasSameSizeAs(set);
        oneShotLatch2.trigger();
        oneShotLatch4.trigger();
        thread.join();
        thread2.join();
    }
}
