package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.class */
class BoundedBlockingSubpartitionAvailabilityTest {

    @TempDir
    private static Path tmpFolder;
    private static final int BUFFER_SIZE = 32768;

    BoundedBlockingSubpartitionAvailabilityTest() {
    }

    @Test
    void testInitiallyNotAvailable() throws Exception {
        ResultSubpartition createPartitionWithData = createPartitionWithData(10);
        CountingAvailabilityListener countingAvailabilityListener = new CountingAvailabilityListener();
        ResultSubpartitionView createView = PartitionTestUtils.createView(createPartitionWithData, countingAvailabilityListener);
        Assertions.assertThat(countingAvailabilityListener.numNotifications).isZero();
        createView.releaseAllResources();
        createPartitionWithData.release();
    }

    @Test
    void testUnavailableWhenBuffersExhausted() throws Exception {
        ResultSubpartition createPartitionWithData = createPartitionWithData(100000);
        ResultSubpartitionView createView = PartitionTestUtils.createView(createPartitionWithData, new CountingAvailabilityListener());
        List<ResultSubpartition.BufferAndBacklog> drainAvailableData = drainAvailableData(createView);
        Assertions.assertThat(createView.getAvailabilityAndBacklog(true).isAvailable()).isFalse();
        Assertions.assertThat(drainAvailableData.get(drainAvailableData.size() - 1).isDataAvailable()).isFalse();
        createView.releaseAllResources();
        createPartitionWithData.release();
    }

    @Test
    void testAvailabilityNotificationWhenBuffersReturn() throws Exception {
        ResultSubpartition createPartitionWithData = createPartitionWithData(100000);
        CountingAvailabilityListener countingAvailabilityListener = new CountingAvailabilityListener();
        ResultSubpartitionView createView = PartitionTestUtils.createView(createPartitionWithData, countingAvailabilityListener);
        List<ResultSubpartition.BufferAndBacklog> drainAvailableData = drainAvailableData(createView);
        drainAvailableData.get(0).buffer().recycleBuffer();
        drainAvailableData.get(1).buffer().recycleBuffer();
        Assertions.assertThat(createView.getAvailabilityAndBacklog(true).isAvailable()).isTrue();
        Assertions.assertThat(countingAvailabilityListener.numNotifications).isOne();
        createView.releaseAllResources();
        createPartitionWithData.release();
    }

    @Test
    void testNotAvailableWhenEmpty() throws Exception {
        ResultSubpartition createPartitionWithData = createPartitionWithData(100000);
        ResultSubpartitionView createReadView = createPartitionWithData.createReadView(new NoOpBufferAvailablityListener());
        drainAllData(createReadView);
        Assertions.assertThat(createReadView.getAvailabilityAndBacklog(true).isAvailable()).isFalse();
        createReadView.releaseAllResources();
        createPartitionWithData.release();
    }

    private static ResultSubpartition createPartitionWithData(int i) throws IOException {
        ResultSubpartition resultSubpartition = new ResultPartitionBuilder().setResultPartitionType(ResultPartitionType.BLOCKING_PERSISTENT).setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType.FILE).setSSLEnabled(true).setFileChannelManager(new FileChannelManagerImpl(new String[]{TempDirUtils.newFolder(tmpFolder).getAbsolutePath()}, "data")).setNetworkBufferSize(32768).build().getAllPartitions()[0];
        writeBuffers(resultSubpartition, i);
        resultSubpartition.finish();
        return resultSubpartition;
    }

    private static void writeBuffers(ResultSubpartition resultSubpartition, int i) throws IOException {
        for (int i2 = 0; i2 < i; i2++) {
            resultSubpartition.add(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(32768));
        }
    }

    private static List<ResultSubpartition.BufferAndBacklog> drainAvailableData(ResultSubpartitionView resultSubpartitionView) throws Exception {
        ArrayList arrayList = new ArrayList();
        while (true) {
            ResultSubpartition.BufferAndBacklog nextBuffer = resultSubpartitionView.getNextBuffer();
            if (nextBuffer == null) {
                return arrayList;
            }
            arrayList.add(nextBuffer);
        }
    }

    private static void drainAllData(ResultSubpartitionView resultSubpartitionView) throws Exception {
        while (true) {
            ResultSubpartition.BufferAndBacklog nextBuffer = resultSubpartitionView.getNextBuffer();
            if (nextBuffer == null) {
                return;
            } else {
                nextBuffer.buffer().recycleBuffer();
            }
        }
    }
}
