package org.apache.flink.runtime.io.network.partition;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.net.SSLUtilsTest;
import org.apache.flink.testutils.serialization.types.ByteArrayType;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/FileBufferReaderITCase.class */
public class FileBufferReaderITCase extends TestLogger {
    private static final int parallelism = 8;
    private static final int numRecords = 100000;
    private static final int bufferSize = 4096;
    private static final int headerSize = 8;
    private static final int recordSize = 4088;
    private static final byte[] dataSource = new byte[recordSize];

    @Parameterized.Parameter
    public boolean sslEnabled;

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/FileBufferReaderITCase$TestSinkInvokable.class */
    public static final class TestSinkInvokable extends AbstractInvokable {
        private int numReceived;

        public TestSinkInvokable(Environment environment) {
            super(environment);
            this.numReceived = 0;
        }

        public void invoke() throws Exception {
            RecordReader recordReader = new RecordReader(getEnvironment().getInputGate(0), ByteArrayType.class, getEnvironment().getTaskManagerInfo().getTmpDirectories());
            while (recordReader.hasNext()) {
                recordReader.next();
                this.numReceived++;
            }
            Assert.assertThat(Integer.valueOf(this.numReceived), Matchers.is(Integer.valueOf(FileBufferReaderITCase.numRecords)));
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/FileBufferReaderITCase$TestSourceInvokable.class */
    public static final class TestSourceInvokable extends AbstractInvokable {
        public TestSourceInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            RecordWriter build = new RecordWriterBuilder().build(getEnvironment().getWriter(0));
            ByteArrayType byteArrayType = new ByteArrayType(FileBufferReaderITCase.dataSource);
            int i = 0;
            while (true) {
                int i2 = i;
                i++;
                if (i2 >= FileBufferReaderITCase.numRecords) {
                    return;
                }
                build.emit(byteArrayType);
                build.flushAll();
            }
        }
    }

    @Parameterized.Parameters(name = "SSL Enabled = {0}")
    public static List<Boolean> paras() {
        return Arrays.asList(true, false);
    }

    @BeforeClass
    public static void setup() {
        for (int i = 0; i < dataSource.length; i++) {
            dataSource[i] = 0;
        }
    }

    @Test
    public void testSequentialReading() throws Exception {
        Configuration createInternalSslConfigWithKeyAndTrustStores = this.sslEnabled ? SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores("JDK") : new Configuration();
        createInternalSslConfigWithKeyAndTrustStores.setString(RestOptions.BIND_PORT, "0");
        createInternalSslConfigWithKeyAndTrustStores.setString(NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE, "file");
        createInternalSslConfigWithKeyAndTrustStores.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse("1g"));
        createInternalSslConfigWithKeyAndTrustStores.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4096b"));
        MiniCluster miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().setConfiguration(createInternalSslConfigWithKeyAndTrustStores).setNumTaskManagers(8).setNumSlotsPerTaskManager(1).build());
        Throwable th = null;
        try {
            try {
                miniCluster.start();
                miniCluster.executeJobBlocking(createJobGraph());
                if (miniCluster != null) {
                    if (0 == 0) {
                        miniCluster.close();
                        return;
                    }
                    try {
                        miniCluster.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (miniCluster != null) {
                if (th != null) {
                    try {
                        miniCluster.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    miniCluster.close();
                }
            }
            throw th4;
        }
    }

    private static JobGraph createJobGraph() {
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        SlotSharingGroup slotSharingGroup2 = new SlotSharingGroup();
        JobVertex jobVertex = new JobVertex("source");
        jobVertex.setInvokableClass(TestSourceInvokable.class);
        jobVertex.setParallelism(8);
        jobVertex.setSlotSharingGroup(slotSharingGroup);
        JobVertex jobVertex2 = new JobVertex("sink");
        jobVertex2.setInvokableClass(TestSinkInvokable.class);
        jobVertex2.setParallelism(8);
        jobVertex2.setSlotSharingGroup(slotSharingGroup2);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        return JobGraphTestUtils.batchJobGraph(jobVertex, jobVertex2);
    }
}
