package org.apache.flink.streaming.runtime.operators.sink;

import java.util.List;
import java.util.function.IntSupplier;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase.class */
abstract class CommitterOperatorTestBase {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/CommitterOperatorTestBase$SinkAndCounters.class */
    static class SinkAndCounters {
        SupportsCommitter<String> sink;
        IntSupplier commitCounter;

        public SinkAndCounters(SupportsCommitter<String> supportsCommitter, IntSupplier intSupplier) {
            this.sink = supportsCommitter;
            this.commitCounter = intSupplier;
        }
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testEmitCommittables(boolean z) throws Exception {
        SinkAndCounters sinkWithPostCommit = z ? sinkWithPostCommit() : sinkWithoutPostCommit();
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory) new CommitterOperatorFactory(sinkWithPostCommit.sink, false, true));
        oneInputStreamOperatorTestHarness.open();
        CommittableSummary committableSummary = new CommittableSummary(1, 1, 1L, 1, 1, 0);
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(committableSummary));
        CommittableWithLineage<?> committableWithLineage = new CommittableWithLineage<>("1", 1L, 1);
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(committableWithLineage));
        oneInputStreamOperatorTestHarness.notifyOfCompletedCheckpoint(1L);
        Assertions.assertThat(sinkWithPostCommit.commitCounter.getAsInt()).isEqualTo(1);
        if (z) {
            List<StreamElement> fromOutput = SinkTestUtil.fromOutput(oneInputStreamOperatorTestHarness.getOutput());
            SinkV2Assertions.assertThat(SinkTestUtil.toCommittableSummary(fromOutput.get(0))).hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()).hasOverallCommittables(committableSummary.getNumberOfCommittables()).hasPendingCommittables(0);
            SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(fromOutput.get(1))).isEqualTo(copyCommittableWithDifferentOrigin(committableWithLineage, 0));
        } else {
            Assertions.assertThat(oneInputStreamOperatorTestHarness.getOutput()).isEmpty();
        }
        oneInputStreamOperatorTestHarness.close();
    }

    @Test
    void testWaitForCommittablesOfLatestCheckpointBeforeCommitting() throws Exception {
        SinkAndCounters sinkWithPostCommit = sinkWithPostCommit();
        OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>> createTestHarness = createTestHarness(sinkWithPostCommit.sink, false, true);
        createTestHarness.open();
        createTestHarness.setProcessingTime(0L);
        CommittableSummary committableSummary = new CommittableSummary(1, 1, 1L, 2, 2, 0);
        createTestHarness.processElement(new StreamRecord<>(committableSummary));
        CommittableWithLineage<?> committableWithLineage = new CommittableWithLineage<>("1", 1L, 1);
        createTestHarness.processElement(new StreamRecord<>(committableWithLineage));
        createTestHarness.notifyOfCompletedCheckpoint(1L);
        Assertions.assertThat(createTestHarness.getOutput()).isEmpty();
        Assertions.assertThat(sinkWithPostCommit.commitCounter.getAsInt()).isZero();
        CommittableWithLineage<?> committableWithLineage2 = new CommittableWithLineage<>("2", 1L, 1);
        createTestHarness.processElement(new StreamRecord<>(committableWithLineage2));
        createTestHarness.getProcessingTimeService().setCurrentTime(2000L);
        List<StreamElement> fromOutput = SinkTestUtil.fromOutput(createTestHarness.getOutput());
        Assertions.assertThat(fromOutput).hasSize(3);
        Assertions.assertThat(sinkWithPostCommit.commitCounter.getAsInt()).isEqualTo(2);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableSummary(fromOutput.get(0))).hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()).hasOverallCommittables(committableSummary.getNumberOfCommittables()).hasPendingCommittables(0);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(fromOutput.get(1))).isEqualTo(copyCommittableWithDifferentOrigin(committableWithLineage, 0));
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(fromOutput.get(2))).isEqualTo(copyCommittableWithDifferentOrigin(committableWithLineage2, 0));
        createTestHarness.close();
    }

    @Test
    void testImmediatelyCommitLateCommittables() throws Exception {
        SinkAndCounters sinkWithPostCommit = sinkWithPostCommit();
        OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>> createTestHarness = createTestHarness(sinkWithPostCommit.sink, false, true);
        createTestHarness.open();
        CommittableSummary committableSummary = new CommittableSummary(1, 1, 1L, 1, 1, 0);
        createTestHarness.processElement(new StreamRecord<>(committableSummary));
        createTestHarness.notifyOfCompletedCheckpoint(1L);
        Assertions.assertThat(createTestHarness.getOutput()).isEmpty();
        CommittableWithLineage<?> committableWithLineage = new CommittableWithLineage<>("1", 1L, 1);
        createTestHarness.processElement(new StreamRecord<>(committableWithLineage));
        List<StreamElement> fromOutput = SinkTestUtil.fromOutput(createTestHarness.getOutput());
        Assertions.assertThat(fromOutput).hasSize(2);
        Assertions.assertThat(sinkWithPostCommit.commitCounter.getAsInt()).isEqualTo(1);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableSummary(fromOutput.get(0))).hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()).hasOverallCommittables(committableSummary.getNumberOfCommittables()).hasPendingCommittables(0);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(fromOutput.get(1))).isEqualTo(copyCommittableWithDifferentOrigin(committableWithLineage, 0));
        createTestHarness.close();
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testEmitAllCommittablesOnEndOfInput(boolean z) throws Exception {
        OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>> createTestHarness = createTestHarness(sinkWithPostCommit().sink, z, !z);
        createTestHarness.open();
        createTestHarness.processElement(new StreamRecord<>(new CommittableSummary(1, 2, (Long) null, 1, 1, 0)));
        createTestHarness.processElement(new StreamRecord<>(new CommittableSummary(2, 2, (Long) null, 1, 1, 0)));
        CommittableWithLineage<?> committableWithLineage = new CommittableWithLineage<>("1", (Long) null, 1);
        createTestHarness.processElement(new StreamRecord<>(committableWithLineage));
        CommittableWithLineage<?> committableWithLineage2 = new CommittableWithLineage<>("1", (Long) null, 2);
        createTestHarness.processElement(new StreamRecord<>(committableWithLineage2));
        createTestHarness.endInput();
        if (!z) {
            Assertions.assertThat(createTestHarness.getOutput()).isEmpty();
            createTestHarness.notifyOfCompletedCheckpoint(1L);
        }
        List<StreamElement> fromOutput = SinkTestUtil.fromOutput(createTestHarness.getOutput());
        Assertions.assertThat(fromOutput).hasSize(3);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableSummary(fromOutput.get(0))).hasFailedCommittables(0).hasOverallCommittables(2).hasPendingCommittables(0);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(fromOutput.get(1))).isEqualTo(copyCommittableWithDifferentOrigin(committableWithLineage, 0));
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(fromOutput.get(2))).isEqualTo(copyCommittableWithDifferentOrigin(committableWithLineage2, 0));
        createTestHarness.close();
    }

    @Test
    void testStateRestore() throws Exception {
        OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>> createTestHarness = createTestHarness(sinkWithPostCommitWithRetry().sink, false, true, 1, 1, 0);
        createTestHarness.open();
        createTestHarness.processElement(new StreamRecord<>(new CommittableSummary(0, 1, 0L, 1, 1, 0)));
        CommittableWithLineage committableWithLineage = new CommittableWithLineage("1", 0L, 0);
        createTestHarness.processElement(new StreamRecord<>(committableWithLineage));
        createTestHarness.processElement(new StreamRecord<>(new CommittableSummary(1, 1, 0L, 1, 1, 0)));
        CommittableWithLineage committableWithLineage2 = new CommittableWithLineage("2", 0L, 1);
        createTestHarness.processElement(new StreamRecord<>(committableWithLineage2));
        OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, 2L);
        createTestHarness.notifyOfCompletedCheckpoint(0L);
        Assertions.assertThat(createTestHarness.getOutput()).isEmpty();
        createTestHarness.close();
        SinkAndCounters sinkWithPostCommit = sinkWithPostCommit();
        OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>> createTestHarness2 = createTestHarness(sinkWithPostCommit.sink, false, true, 10, 10, 9);
        createTestHarness2.initializeState(snapshot);
        createTestHarness2.open();
        List<StreamElement> fromOutput = SinkTestUtil.fromOutput(createTestHarness2.getOutput());
        Assertions.assertThat(fromOutput).hasSize(3);
        Assertions.assertThat(sinkWithPostCommit.commitCounter.getAsInt()).isEqualTo(2);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableSummary(fromOutput.get(0))).hasCheckpointId(0L).hasFailedCommittables(0).hasOverallCommittables(2).hasPendingCommittables(0);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(fromOutput.get(1))).isEqualTo(new CommittableWithLineage<>(committableWithLineage.getCommittable(), 0L, 9));
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(fromOutput.get(2))).isEqualTo(new CommittableWithLineage<>(committableWithLineage2.getCommittable(), 0L, 9));
        createTestHarness2.close();
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testHandleEndInputInStreamingMode(boolean z) throws Exception {
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory) new CommitterOperatorFactory(sinkWithPostCommit().sink, false, z));
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(new CommittableSummary(1, 1, 1L, 1, 1, 0)));
        CommittableWithLineage<?> committableWithLineage = new CommittableWithLineage<>("1", 1L, 1);
        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(committableWithLineage));
        oneInputStreamOperatorTestHarness.endInput();
        if (z) {
            oneInputStreamOperatorTestHarness.notifyOfCompletedCheckpoint(1L);
        }
        List<StreamElement> fromOutput = SinkTestUtil.fromOutput(oneInputStreamOperatorTestHarness.getOutput());
        Assertions.assertThat(fromOutput).hasSize(2);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableSummary(fromOutput.get(0))).hasCheckpointId(1L).hasPendingCommittables(0).hasOverallCommittables(1).hasFailedCommittables(0);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(fromOutput.get(1))).isEqualTo(copyCommittableWithDifferentOrigin(committableWithLineage, 0));
        oneInputStreamOperatorTestHarness.notifyOfCompletedCheckpoint(2L);
        oneInputStreamOperatorTestHarness.endInput();
        Assertions.assertThat(oneInputStreamOperatorTestHarness.getOutput()).hasSize(2);
    }

    CommittableWithLineage<?> copyCommittableWithDifferentOrigin(CommittableWithLineage<?> committableWithLineage, int i) {
        return new CommittableWithLineage<>(committableWithLineage.getCommittable(), committableWithLineage.getCheckpointId().isPresent() ? Long.valueOf(committableWithLineage.getCheckpointId().getAsLong()) : null, i);
    }

    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>> createTestHarness(SupportsCommitter<String> supportsCommitter, boolean z, boolean z2) throws Exception {
        return new OneInputStreamOperatorTestHarness<>((OneInputStreamOperatorFactory) new CommitterOperatorFactory(supportsCommitter, z, z2));
    }

    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>> createTestHarness(SupportsCommitter<String> supportsCommitter, boolean z, boolean z2, int i, int i2, int i3) throws Exception {
        return new OneInputStreamOperatorTestHarness<>((OneInputStreamOperatorFactory) new CommitterOperatorFactory(supportsCommitter, z, z2), i, i2, i3);
    }

    abstract SinkAndCounters sinkWithPostCommit();

    abstract SinkAndCounters sinkWithPostCommitWithRetry();

    abstract SinkAndCounters sinkWithoutPostCommit();
}
