package org.apache.flink.streaming.runtime.operators.sink;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/GlobalStreamingCommitterHandlerTest.class */
public class GlobalStreamingCommitterHandlerTest extends TestLogger {
    @Test(expected = IllegalStateException.class)
    public void throwExceptionWithoutSerializer() throws Exception {
        OneInputStreamOperatorTestHarness<byte[], byte[]> createTestHarness = createTestHarness(new TestSink.DefaultGlobalCommitter(), null);
        createTestHarness.initializeEmptyState();
        createTestHarness.open();
    }

    @Test(expected = IllegalStateException.class)
    public void throwExceptionWithoutCommitter() throws Exception {
        OneInputStreamOperatorTestHarness<byte[], byte[]> createTestHarness = createTestHarness(null, TestSink.StringCommittableSerializer.INSTANCE);
        createTestHarness.initializeEmptyState();
        createTestHarness.open();
    }

    @Test
    public void supportRetryInNextCommit() throws Exception {
        List asList = Arrays.asList("lazy", "leaf");
        TestSink.RetryOnceGlobalCommitter retryOnceGlobalCommitter = new TestSink.RetryOnceGlobalCommitter();
        OneInputStreamOperatorTestHarness<byte[], byte[]> createTestHarness = createTestHarness(retryOnceGlobalCommitter);
        createTestHarness.initializeEmptyState();
        createTestHarness.open();
        createTestHarness.processElements(SinkTestUtil.committableRecords(asList));
        createTestHarness.snapshot(1L, 1L);
        createTestHarness.notifyOfCompletedCheckpoint(1L);
        MatcherAssert.assertThat(retryOnceGlobalCommitter.getCommittedData(), Matchers.hasSize(0));
        createTestHarness.snapshot(2L, 2L);
        createTestHarness.notifyOfCompletedCheckpoint(2L);
        MatcherAssert.assertThat(retryOnceGlobalCommitter.getCommittedData(), Matchers.contains(new String[]{"lazy|leaf"}));
        createTestHarness.close();
    }

    @Test
    public void supportRetryByTime() throws Exception {
        List asList = Arrays.asList("lazy", "leaf");
        TestSink.RetryOnceGlobalCommitter retryOnceGlobalCommitter = new TestSink.RetryOnceGlobalCommitter();
        OneInputStreamOperatorTestHarness<byte[], byte[]> createTestHarness = createTestHarness(retryOnceGlobalCommitter);
        createTestHarness.initializeEmptyState();
        createTestHarness.open();
        createTestHarness.processElements(SinkTestUtil.committableRecords(asList));
        createTestHarness.snapshot(1L, 1L);
        createTestHarness.notifyOfCompletedCheckpoint(1L);
        MatcherAssert.assertThat(retryOnceGlobalCommitter.getCommittedData(), Matchers.hasSize(0));
        createTestHarness.getProcessingTimeService().setCurrentTime(Long.MAX_VALUE);
        MatcherAssert.assertThat(retryOnceGlobalCommitter.getCommittedData(), Matchers.contains(new String[]{"lazy|leaf"}));
        createTestHarness.close();
    }

    @Test
    public void closeCommitter() throws Exception {
        TestSink.DefaultGlobalCommitter defaultGlobalCommitter = new TestSink.DefaultGlobalCommitter();
        OneInputStreamOperatorTestHarness<byte[], byte[]> createTestHarness = createTestHarness(defaultGlobalCommitter);
        createTestHarness.initializeEmptyState();
        createTestHarness.open();
        createTestHarness.close();
        MatcherAssert.assertThat(Boolean.valueOf(defaultGlobalCommitter.isClosed()), Matchers.is(true));
    }

    @Test
    public void restoredFromMergedState() throws Exception {
        List<String> asList = Arrays.asList("host", "drop");
        OperatorSubtaskState buildSubtaskState = TestHarnessUtil.buildSubtaskState(createTestHarness(), SinkTestUtil.toBytes(asList));
        List<String> asList2 = Arrays.asList("future", "evil", "how");
        OperatorSubtaskState buildSubtaskState2 = TestHarnessUtil.buildSubtaskState(createTestHarness(), SinkTestUtil.toBytes(asList2));
        TestSink.DefaultGlobalCommitter defaultGlobalCommitter = new TestSink.DefaultGlobalCommitter();
        OneInputStreamOperatorTestHarness<byte[], byte[]> createTestHarness = createTestHarness(defaultGlobalCommitter);
        createTestHarness.initializeState(OneInputStreamOperatorTestHarness.repartitionOperatorState(OneInputStreamOperatorTestHarness.repackageState(buildSubtaskState, buildSubtaskState2), 2, 2, 1, 0));
        createTestHarness.open();
        ArrayList arrayList = new ArrayList();
        arrayList.add(TestSink.DefaultGlobalCommitter.COMBINER.apply(asList));
        arrayList.add(TestSink.DefaultGlobalCommitter.COMBINER.apply(asList2));
        createTestHarness.snapshot(1L, 1L);
        createTestHarness.notifyOfCompletedCheckpoint(1L);
        createTestHarness.close();
        MatcherAssert.assertThat(defaultGlobalCommitter.getCommittedData(), Matchers.containsInAnyOrder(arrayList.toArray()));
    }

    @Test
    public void commitMultipleStagesTogether() throws Exception {
        TestSink.DefaultGlobalCommitter defaultGlobalCommitter = new TestSink.DefaultGlobalCommitter();
        List<String> asList = Arrays.asList("cautious", "nature");
        List<String> asList2 = Arrays.asList("count", "over");
        List<String> asList3 = Arrays.asList("lawyer", "grammar");
        ArrayList arrayList = new ArrayList();
        arrayList.add(TestSink.DefaultGlobalCommitter.COMBINER.apply(asList));
        arrayList.add(TestSink.DefaultGlobalCommitter.COMBINER.apply(asList2));
        arrayList.add(TestSink.DefaultGlobalCommitter.COMBINER.apply(asList3));
        OneInputStreamOperatorTestHarness<byte[], byte[]> createTestHarness = createTestHarness(defaultGlobalCommitter);
        createTestHarness.initializeEmptyState();
        createTestHarness.open();
        createTestHarness.processElements(SinkTestUtil.committableRecords(asList));
        createTestHarness.snapshot(1L, 1L);
        createTestHarness.processElements(SinkTestUtil.committableRecords(asList2));
        createTestHarness.snapshot(2L, 2L);
        createTestHarness.processElements(SinkTestUtil.committableRecords(asList3));
        createTestHarness.snapshot(3L, 3L);
        createTestHarness.notifyOfCompletedCheckpoint(3L);
        createTestHarness.close();
        MatcherAssert.assertThat(defaultGlobalCommitter.getCommittedData(), Matchers.containsInAnyOrder(arrayList.toArray()));
    }

    @Test
    public void filterRecoveredCommittables() throws Exception {
        List<String> asList = Arrays.asList("silent", "elder", "patience");
        String apply = TestSink.DefaultGlobalCommitter.COMBINER.apply(asList);
        OperatorSubtaskState buildSubtaskState = TestHarnessUtil.buildSubtaskState(createTestHarness(), SinkTestUtil.toBytes(asList));
        TestSink.DefaultGlobalCommitter defaultGlobalCommitter = new TestSink.DefaultGlobalCommitter(apply);
        OneInputStreamOperatorTestHarness<byte[], byte[]> createTestHarness = createTestHarness(defaultGlobalCommitter);
        createTestHarness.initializeState(buildSubtaskState);
        createTestHarness.open();
        createTestHarness.snapshot(1L, 1L);
        createTestHarness.notifyOfCompletedCheckpoint(1L);
        Assert.assertTrue(defaultGlobalCommitter.getCommittedData().isEmpty());
        createTestHarness.close();
    }

    @Test
    public void endOfInput() throws Exception {
        TestSink.DefaultGlobalCommitter defaultGlobalCommitter = new TestSink.DefaultGlobalCommitter();
        OneInputStreamOperatorTestHarness<byte[], byte[]> createTestHarness = createTestHarness(defaultGlobalCommitter);
        createTestHarness.initializeEmptyState();
        createTestHarness.open();
        createTestHarness.snapshot(1L, 1L);
        createTestHarness.endInput();
        createTestHarness.notifyOfCompletedCheckpoint(1L);
        createTestHarness.close();
        MatcherAssert.assertThat(defaultGlobalCommitter.getCommittedData(), Matchers.contains(new String[]{TestSink.END_OF_INPUT_STR}));
    }

    private OneInputStreamOperatorTestHarness<byte[], byte[]> createTestHarness() throws Exception {
        return createTestHarness(new TestSink.DefaultGlobalCommitter(), TestSink.StringCommittableSerializer.INSTANCE);
    }

    private OneInputStreamOperatorTestHarness<byte[], byte[]> createTestHarness(GlobalCommitter<String, String> globalCommitter) throws Exception {
        return createTestHarness(globalCommitter, TestSink.StringCommittableSerializer.INSTANCE);
    }

    private OneInputStreamOperatorTestHarness<byte[], byte[]> createTestHarness(GlobalCommitter<String, String> globalCommitter, SimpleVersionedSerializer<String> simpleVersionedSerializer) throws Exception {
        return new OneInputStreamOperatorTestHarness<>((OneInputStreamOperatorFactory) new CommitterOperatorFactory(TestSink.newBuilder().setCommittableSerializer(TestSink.StringCommittableSerializer.INSTANCE).setGlobalCommitter(globalCommitter).setGlobalCommittableSerializer(simpleVersionedSerializer).build(), false));
    }
}
