package org.apache.flink.streaming.api.operators;

import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/** Unit tests for the watermark alignment behaviour of {@link SourceOperator}. */
class SourceOperatorAlignmentTest {

    /** Test context created in {@code setup()} and reset to {@code null} in {@code tearDown()}. */
    @Nullable
    private SourceOperatorTestContext context;

    /** Operator under test, obtained from {@link #context} in {@code setup()}. */
    @Nullable
    private SourceOperator<Integer, MockSourceSplit> operator;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.streaming.api.operators.SourceOperatorAlignmentTest$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$api$operators$SourceOperatorAlignmentTest$PunctuatedGenerator$GenerationMode = new int[PunctuatedGenerator.GenerationMode.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$streaming$api$operators$SourceOperatorAlignmentTest$PunctuatedGenerator$GenerationMode[PunctuatedGenerator.GenerationMode.ALL.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$operators$SourceOperatorAlignmentTest$PunctuatedGenerator$GenerationMode[PunctuatedGenerator.GenerationMode.ODD.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest$PunctuatedGenerator.class */
    public static class PunctuatedGenerator implements WatermarkGenerator<Integer> {
        private GenerationMode mode;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/flink/streaming/api/operators/SourceOperatorAlignmentTest$PunctuatedGenerator$GenerationMode.class */
        public enum GenerationMode {
            ALL,
            ODD
        }

        public PunctuatedGenerator() {
            this(GenerationMode.ALL);
        }

        public PunctuatedGenerator(GenerationMode generationMode) {
            this.mode = generationMode;
        }

        public void onEvent(Integer num, long j, WatermarkOutput watermarkOutput) {
            boolean z;
            switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$api$operators$SourceOperatorAlignmentTest$PunctuatedGenerator$GenerationMode[this.mode.ordinal()]) {
                case SourceOperatorTestContext.SUBTASK_INDEX /* 1 */:
                    z = true;
                    break;
                case 2:
                    z = j % 2 == 1;
                    break;
                default:
                    throw new IllegalArgumentException("Unknown mode: " + this.mode);
            }
            if (z) {
                watermarkOutput.emitWatermark(new Watermark(j));
            }
        }

        public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        }
    }

    /** Package-private no-argument constructor; performs no initialization. */
    SourceOperatorAlignmentTest() {
    }

    /**
     * Creates the shared test context and operator before each test.
     *
     * <p>The watermark strategy uses a default {@link PunctuatedGenerator}, takes the record value
     * as its timestamp, and enables watermark alignment for group {@code "group1"} with durations
     * of 100 ms and 1 ms.
     */
    @BeforeEach
    void setup() throws Exception {
        WatermarkStrategy<Integer> alignedStrategy =
                WatermarkStrategy.<Integer>forGenerator(ctx -> new PunctuatedGenerator())
                        .withTimestampAssigner((element, recordTimestamp) -> element.intValue())
                        .withWatermarkAlignment(
                                "group1", Duration.ofMillis(100L), Duration.ofMillis(1L));
        context = new SourceOperatorTestContext(false, alignedStrategy);
        operator = context.getOperator();
    }

    /**
     * Closes the shared test context and clears the per-test fields.
     *
     * <p>The context is only closed when it was actually created, so that a failure inside
     * {@code setup()} is not hidden behind a {@link NullPointerException} raised here. The fields
     * are reset in a {@code finally} block so they are cleared even if closing fails.
     */
    @AfterEach
    void tearDown() throws Exception {
        try {
            if (context != null) {
                context.close();
            }
        } finally {
            context = null;
            operator = null;
        }
    }

    /**
     * Checks that the operator becomes unavailable when the alignment limit is below its reported
     * watermark and resumes once the limit is raised.
     *
     * <p>This method is also invoked by {@code testStopWhileWaitingForWatermarkAlignment()}, which
     * relies on the operator being unavailable when this method returns.
     */
    @Test
    void testWatermarkAlignment() throws Exception {
        operator.initializeState(context.createStateContext());
        operator.open();

        final int firstRecord = 1000;
        final int secondRecord = 2000;
        final int thirdRecord = 3000;
        MockSourceSplit split = new MockSourceSplit(2);
        split.addRecord(firstRecord);
        split.addRecord(secondRecord);
        split.addRecord(thirdRecord);
        operator.handleOperatorEvent(
                new AddSplitEvent(Collections.singletonList(split), new MockSourceSplitSerializer()));

        CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<>();
        List<Integer> expectedOutput = new ArrayList<>();

        // The first record is emitted and its timestamp is reported as the watermark.
        Assertions.assertThat(operator.emitNext(actualOutput))
                .isEqualTo(DataInputStatus.MORE_AVAILABLE);
        expectedOutput.add(firstRecord);
        context.getTimeService().advance(1L);
        assertLatestReportedWatermarkEvent(firstRecord);
        assertOutput(actualOutput, expectedOutput);
        Assertions.assertThat(operator.isAvailable()).isTrue();

        // An alignment limit just below the reported watermark must block the operator.
        operator.handleOperatorEvent(new WatermarkAlignmentEvent(firstRecord - 1));
        Assertions.assertThat(operator.isAvailable()).isFalse();
        Assertions.assertThat(operator.emitNext(actualOutput))
                .isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
        assertLatestReportedWatermarkEvent(firstRecord);
        assertOutput(actualOutput, expectedOutput);
        Assertions.assertThat(operator.isAvailable()).isFalse();

        // Raising the limit just above the reported watermark makes the operator available again.
        operator.handleOperatorEvent(new WatermarkAlignmentEvent(firstRecord + 1));
        Assertions.assertThat(operator.isAvailable()).isTrue();
        operator.emitNext(actualOutput);
        // Only the status of the second poll is asserted; the status of the first one is
        // deliberately left unchecked.
        Assertions.assertThat(operator.emitNext(actualOutput))
                .isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
        expectedOutput.add(secondRecord);
        context.getTimeService().advance(1L);
        assertLatestReportedWatermarkEvent(secondRecord);
        assertOutput(actualOutput, expectedOutput);
        Assertions.assertThat(operator.isAvailable()).isFalse();
    }

    /**
     * Checks the reported watermark when the source runs out of records and later receives a new
     * split, using a generator that only emits watermarks for odd timestamps.
     *
     * @param z when {@code true}, the alignment limit sent after the operator reported
     *     {@code Long.MAX_VALUE} is {@code Watermark.MAX_WATERMARK}; otherwise it is a finite value
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testWatermarkAlignmentWithIdleness(boolean z) throws Exception {
        WatermarkStrategy<Integer> oddOnlyStrategy =
                WatermarkStrategy.<Integer>forGenerator(
                                ctx ->
                                        new PunctuatedGenerator(
                                                PunctuatedGenerator.GenerationMode.ODD))
                        .withWatermarkAlignment(
                                "group1", Duration.ofMillis(100L), Duration.ofMillis(1L))
                        .withTimestampAssigner((element, recordTimestamp) -> element.intValue());

        // NOTE(review): the first constructor argument is presumably an "idle" flag - confirm
        // against SourceOperatorTestContext.
        try (SourceOperatorTestContext idleContext =
                new SourceOperatorTestContext(true, oddOnlyStrategy)) {
            SourceOperator<Integer, MockSourceSplit> sourceOperator = idleContext.getOperator();
            sourceOperator.initializeState(idleContext.createStateContext());
            sourceOperator.open();

            final int oddRecord = 1;
            MockSourceSplit firstSplit = new MockSourceSplit(2);
            firstSplit.addRecord(oddRecord);
            sourceOperator.handleOperatorEvent(
                    new AddSplitEvent(
                            Collections.singletonList(firstSplit), new MockSourceSplitSerializer()));

            CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<>();
            List<Integer> expectedOutput = new ArrayList<>();

            // Odd timestamp: a watermark is generated and reported.
            Assertions.assertThat(sourceOperator.emitNext(actualOutput))
                    .isEqualTo(DataInputStatus.MORE_AVAILABLE);
            expectedOutput.add(oddRecord);
            idleContext.getTimeService().advance(1L);
            assertLatestReportedWatermarkEvent(idleContext, oddRecord);
            sourceOperator.handleOperatorEvent(new WatermarkAlignmentEvent(oddRecord + 100));
            assertOutput(actualOutput, expectedOutput);
            Assertions.assertThat(sourceOperator.isAvailable()).isTrue();

            // Nothing left to read: the next report is expected to carry Long.MAX_VALUE.
            Assertions.assertThat(sourceOperator.emitNext(actualOutput))
                    .isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
            idleContext.getTimeService().advance(1L);
            assertLatestReportedWatermarkEvent(idleContext, Long.MAX_VALUE);

            if (z) {
                sourceOperator.handleOperatorEvent(
                        new WatermarkAlignmentEvent(Watermark.MAX_WATERMARK.getTimestamp()));
            } else {
                sourceOperator.handleOperatorEvent(new WatermarkAlignmentEvent(oddRecord + 150));
            }

            // A second split supplies one more record. Its timestamp is even, so the ODD-mode
            // generator emits no watermark for it.
            final int evenRecord = 2;
            MockSourceSplit secondSplit = new MockSourceSplit(3);
            secondSplit.addRecord(evenRecord);
            sourceOperator.handleOperatorEvent(
                    new AddSplitEvent(
                            Collections.singletonList(secondSplit),
                            new MockSourceSplitSerializer()));

            Assertions.assertThat(sourceOperator.emitNext(actualOutput))
                    .isEqualTo(DataInputStatus.MORE_AVAILABLE);
            expectedOutput.add(evenRecord);
            idleContext.getTimeService().advance(1L);
            // The reported watermark is expected to go back to the one produced by the odd record.
            assertLatestReportedWatermarkEvent(idleContext, oddRecord);
            sourceOperator.handleOperatorEvent(new WatermarkAlignmentEvent(oddRecord + 100));
            assertOutput(actualOutput, expectedOutput);
            Assertions.assertThat(sourceOperator.isAvailable()).isTrue();
        }
    }

    /**
     * Checks that no watermark is reported while the operator has no split, and that reporting
     * starts once a split with a record has been added and read.
     */
    @Test
    void testWatermarkAlignmentWithoutSplit() throws Exception {
        operator.initializeState(context.createStateContext());
        operator.open();

        CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<>();

        // Without any split there is nothing to emit and nothing to report, even as time advances.
        Assertions.assertThat(operator.emitNext(actualOutput))
                .isEqualTo(DataInputStatus.NOTHING_AVAILABLE);
        context.getTimeService().advance(1L);
        assertNoReportedWatermarkEvent(context);
        context.getTimeService().advance(1L);
        assertNoReportedWatermarkEvent(context);
        Assertions.assertThat(operator.emitNext(actualOutput))
                .isEqualTo(DataInputStatus.NOTHING_AVAILABLE);

        final int record = 10;
        MockSourceSplit split = new MockSourceSplit(2);
        split.addRecord(record);
        operator.handleOperatorEvent(
                new AddSplitEvent(Collections.singletonList(split), new MockSourceSplitSerializer()));

        List<Integer> expectedOutput = new ArrayList<>();
        Assertions.assertThat(operator.emitNext(actualOutput))
                .isEqualTo(DataInputStatus.MORE_AVAILABLE);
        expectedOutput.add(record);
        context.getTimeService().advance(1L);
        assertLatestReportedWatermarkEvent(record);
        assertOutput(actualOutput, expectedOutput);
    }

    /**
     * Checks that stopping the operator while it is unavailable completes its availability future.
     *
     * <p>{@code testWatermarkAlignment()} is reused as a fixture: it ends with the operator
     * asserted to be unavailable.
     */
    @Test
    void testStopWhileWaitingForWatermarkAlignment() throws Exception {
        testWatermarkAlignment();

        CompletableFuture<?> availableFuture = operator.getAvailableFuture();
        Assertions.assertThat(availableFuture).isNotDone();

        operator.stop(StopMode.NO_DRAIN);

        Assertions.assertThat(availableFuture).isDone();
        Assertions.assertThat(operator.isAvailable()).isTrue();
    }

    /**
     * Checks that the reported watermark stays at 2000 after a second split carrying a record
     * with the lower timestamp 1000 has been added and polled.
     */
    @Test
    void testReportedWatermarkDoNotDecrease() throws Exception {
        operator.initializeState(context.createStateContext());
        operator.open();

        final int laterRecord = 2000;
        final int earlierRecord = 1000;
        MockSourceSplit laterSplit = new MockSourceSplit(2);
        MockSourceSplit earlierSplit = new MockSourceSplit(3);
        laterSplit.addRecord(laterRecord);
        earlierSplit.addRecord(earlierRecord);

        operator.handleOperatorEvent(
                new AddSplitEvent(
                        Collections.singletonList(laterSplit), new MockSourceSplitSerializer()));
        CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<>();
        operator.emitNext(actualOutput);
        context.getTimeService().advance(1L);
        assertLatestReportedWatermarkEvent(laterRecord);

        // Adding the split with the smaller timestamp must not lower the reported watermark.
        operator.handleOperatorEvent(
                new AddSplitEvent(
                        Collections.singletonList(earlierSplit), new MockSourceSplitSerializer()));
        operator.emitNext(actualOutput);
        context.getTimeService().advance(1L);
        assertLatestReportedWatermarkEvent(laterRecord);
    }

    /**
     * Checks that after the operator has reached {@code END_OF_INPUT} the latest reported
     * watermark is {@code Watermark.MAX_WATERMARK}.
     */
    @Test
    void testWatermarkAlignmentWhileSubtaskFinished() throws Exception {
        operator.initializeState(context.createStateContext());
        // Start from an empty reader state before opening the operator.
        operator.getReaderState().clear();
        operator.open();

        final int record = 1;
        MockSourceSplit split = new MockSourceSplit(1, 0, 1);
        split.addRecord(record);
        operator.handleOperatorEvent(
                new AddSplitEvent(Collections.singletonList(split), new MockSourceSplitSerializer()));

        CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<>();
        List<Integer> expectedOutput = new ArrayList<>();
        Assertions.assertThat(operator.emitNext(actualOutput))
                .isEqualTo(DataInputStatus.MORE_AVAILABLE);
        expectedOutput.add(record);
        assertOutput(actualOutput, expectedOutput);

        // Once no more splits will arrive, the operator walks through END_OF_DATA to END_OF_INPUT.
        operator.handleOperatorEvent(new NoMoreSplitsEvent());
        Assertions.assertThat(operator.emitNext(actualOutput))
                .isEqualTo(DataInputStatus.END_OF_DATA);
        Assertions.assertThat(operator.emitNext(actualOutput))
                .isEqualTo(DataInputStatus.END_OF_INPUT);

        context.getTimeService().advance(1L);
        assertLatestReportedWatermarkEvent(Watermark.MAX_WATERMARK.getTimestamp());
    }

    /**
     * Asserts that the values of all {@link StreamRecord}s collected so far equal the expected
     * values, in order. Collected events that are not stream records are ignored.
     *
     * @param collectingDataOutput output that captured the operator's emissions
     * @param list expected record values, in emission order
     */
    private void assertOutput(CollectingDataOutput<Integer> collectingDataOutput, List<Integer> list) {
        List<Integer> emittedValues =
                collectingDataOutput.getEvents().stream()
                        .filter(StreamRecord.class::isInstance)
                        .mapToInt(event -> (Integer) ((StreamRecord<?>) event).getValue())
                        .boxed()
                        .collect(Collectors.toList());
        Assertions.assertThat(emittedValues).containsExactly(list.toArray(new Integer[0]));
    }

    /**
     * Asserts the latest reported watermark against the shared {@link #context}.
     *
     * @param j expected watermark carried by the most recent {@link ReportedWatermarkEvent}
     */
    private void assertLatestReportedWatermarkEvent(long j) {
        assertLatestReportedWatermarkEvent(this.context, j);
    }

    /**
     * Asserts that at least one {@link ReportedWatermarkEvent} was sent through the context's
     * gateway and that the most recent one equals {@code new ReportedWatermarkEvent(j)}.
     *
     * @param sourceOperatorTestContext context whose gateway recorded the sent events
     * @param j expected watermark of the most recent reported event
     */
    private void assertLatestReportedWatermarkEvent(SourceOperatorTestContext sourceOperatorTestContext, long j) {
        List<ReportedWatermarkEvent> reportedEvents =
                sourceOperatorTestContext.getGateway().getEventsSent().stream()
                        .filter(ReportedWatermarkEvent.class::isInstance)
                        .map(ReportedWatermarkEvent.class::cast)
                        .collect(Collectors.toList());
        Assertions.assertThat(reportedEvents).isNotEmpty();
        Assertions.assertThat(reportedEvents.get(reportedEvents.size() - 1))
                .isEqualTo(new ReportedWatermarkEvent(j));
    }

    /**
     * Asserts that no {@link ReportedWatermarkEvent} has been sent through the context's gateway.
     *
     * @param sourceOperatorTestContext context whose gateway recorded the sent events
     */
    private void assertNoReportedWatermarkEvent(SourceOperatorTestContext sourceOperatorTestContext) {
        List<ReportedWatermarkEvent> reportedEvents =
                sourceOperatorTestContext.getGateway().getEventsSent().stream()
                        .filter(ReportedWatermarkEvent.class::isInstance)
                        .map(ReportedWatermarkEvent.class::cast)
                        .collect(Collectors.toList());
        Assertions.assertThat(reportedEvents).isEmpty();
    }

    // NOTE: the decompiled $deserializeLambda$(SerializedLambda) method was removed on purpose.
    // The Java compiler synthesizes that method itself for classes containing serializable
    // lambdas, so it must not be declared in source. The recovered copy was also not valid Java:
    // it assigned -1, 2 and 3 to a boolean selector and repeated the `case true` label.
}
