package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.ResultPartitionBytes;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingBufferAccumulator;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingTierProducerAgent;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingTieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyServiceImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageProducerClient;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
import org.apache.flink.runtime.util.NoOpTierShuffleDescriptor;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.IgnoreShutdownRejectedExecutionHandler;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.class */
/**
 * Tests for {@link TieredResultPartition}.
 *
 * <p>Each test gets a fresh {@link NetworkBufferPool}, {@link BatchShuffleReadBufferPool}, {@link
 * FileChannelManager} (rooted at a JUnit-managed temporary directory) and IO executor; all of them
 * are torn down again in {@link #after()}.
 */
class TieredResultPartitionTest {

    /** Number of threads of the executor handed to the partition factory. */
    private static final int NUM_THREADS = 4;

    /** Size in bytes of a single network buffer; also used as the record size in most tests. */
    private static final int NETWORK_BUFFER_SIZE = 1024;

    /** Number of memory segments owned by the global {@link NetworkBufferPool}. */
    private static final int NUM_TOTAL_BUFFERS = 1000;

    /** Total number of bytes of the {@link BatchShuffleReadBufferPool}. */
    private static final int NUM_TOTAL_BYTES_IN_READ_POOL = 32 * 1024 * 1024;

    /** Upper bound for waiting until the data directory has been emptied after a release. */
    private static final long FILE_CLEANUP_TIMEOUT_MILLIS = 30_000L;

    private FileChannelManager fileChannelManager;
    private NetworkBufferPool globalPool;
    private BatchShuffleReadBufferPool readBufferPool;
    private ScheduledExecutorService readIOExecutor;

    /** Metric group of the most recently created partition; read by the byte verification. */
    private TaskIOMetricGroup taskIOMetricGroup;

    @TempDir
    public Path tempDataPath;

    @BeforeEach
    void before() {
        fileChannelManager =
                new FileChannelManagerImpl(new String[] {tempDataPath.toString()}, "testing");
        globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, NETWORK_BUFFER_SIZE);
        readBufferPool =
                new BatchShuffleReadBufferPool(NUM_TOTAL_BYTES_IN_READ_POOL, NETWORK_BUFFER_SIZE);
        readIOExecutor =
                new ScheduledThreadPoolExecutor(
                        NUM_THREADS,
                        new ExecutorThreadFactory("test-io-scheduler-thread"),
                        new IgnoreShutdownRejectedExecutionHandler());
    }

    /**
     * Tears down the per-test resources. The steps are chained with {@code finally} so that a
     * failure in one of them does not prevent the remaining resources from being cleaned up.
     */
    @AfterEach
    void after() throws Exception {
        try {
            fileChannelManager.close();
        } finally {
            try {
                globalPool.destroy();
            } finally {
                try {
                    readBufferPool.destroy();
                } finally {
                    readIOExecutor.shutdown();
                }
            }
        }
    }

    /** Closing the partition must destroy the buffer pool it was given. */
    @Test
    void testClose() throws Exception {
        BufferPool bufferPool = globalPool.createBufferPool(1, 1);
        TieredResultPartition partition = createTieredStoreResultPartition(1, bufferPool, false);

        partition.close();

        Assertions.assertThat(bufferPool.isDestroyed()).isTrue();
    }

    /**
     * After close and release, the data directory must become empty and every memory segment must
     * be back in the global pool.
     */
    @Test
    void testRelease() throws Exception {
        BufferPool bufferPool = globalPool.createBufferPool(10, 10);
        TieredResultPartition partition = createTieredStoreResultPartition(2, bufferPool, false);

        partition.emitRecord(ByteBuffer.allocate(5 * NETWORK_BUFFER_SIZE), 1);
        partition.close();
        Assertions.assertThat(bufferPool.isDestroyed()).isTrue();

        partition.release();
        waitUntilDataDirectoryIsEmpty();

        Assertions.assertThat(globalPool.getNumberOfAvailableMemorySegments())
                .isEqualTo(NUM_TOTAL_BUFFERS);
    }

    /** Checks the min/max buffer requirement computed for a hybrid selective result partition. */
    @Test
    void testMinMaxNetworkBuffersTieredResultPartition() {
        Pair<Integer, Integer> minMax =
                NettyShuffleUtils.getMinMaxNetworkBuffersPerResultPartition(
                        100, 5, 100, 10, 105, true, 103, ResultPartitionType.HYBRID_SELECTIVE);

        Assertions.assertThat(minMax.getLeft()).isEqualTo(103);
        Assertions.assertThat(minMax.getRight()).isEqualTo(Integer.MAX_VALUE);
    }

    /** Creating a subpartition view on a released partition must be rejected. */
    @Test
    void testCreateSubpartitionViewAfterRelease() throws Exception {
        TieredResultPartition partition =
                createTieredStoreResultPartition(2, globalPool.createBufferPool(10, 10), false);
        partition.release();

        Assertions.assertThatThrownBy(
                        () ->
                                partition.createSubpartitionView(
                                        new ResultSubpartitionIndexSet(0),
                                        new NoOpBufferAvailablityListener()))
                .isInstanceOf(IllegalStateException.class);
    }

    /**
     * One record emitted to subpartition 0 plus one broadcast record must be reflected in the
     * per-subpartition byte metrics.
     */
    @Test
    void testEmitRecords() throws Exception {
        BufferPool bufferPool = globalPool.createBufferPool(3, 3);
        try (TieredResultPartition partition =
                createTieredStoreResultPartition(2, bufferPool, false)) {
            partition.emitRecord(ByteBuffer.allocate(NETWORK_BUFFER_SIZE), 0);
            partition.broadcastRecord(ByteBuffer.allocate(NETWORK_BUFFER_SIZE));

            verifySubpartitionBytes(2 * NETWORK_BUFFER_SIZE, NETWORK_BUFFER_SIZE);
        }
    }

    /** A broadcast record on a broadcast-only partition must be counted for both subpartitions. */
    @Test
    void testMetricsUpdateForBroadcastOnlyResultPartition() throws Exception {
        BufferPool bufferPool = globalPool.createBufferPool(3, 3);
        try (TieredResultPartition partition =
                createTieredStoreResultPartition(2, bufferPool, true)) {
            partition.broadcastRecord(ByteBuffer.allocate(NETWORK_BUFFER_SIZE));

            verifySubpartitionBytes(NETWORK_BUFFER_SIZE, NETWORK_BUFFER_SIZE);
        }
    }

    /** Emitting a record must still succeed after the local buffer pool has been shrunk to one. */
    @Test
    void testRequestBuffersAfterPoolSizeDecreased() throws IOException {
        final int numRecords = 10;
        BufferPool bufferPool = globalPool.createBufferPool(1, 20);
        TieredResultPartition partition =
                createTieredStoreResultPartitionWithStorageManager(1, bufferPool, false);
        ResultSubpartitionView subpartitionView =
                partition.createSubpartitionView(0, new NoOpBufferAvailablityListener());

        for (int record = 0; record < numRecords; record++) {
            partition.emitRecord(ByteBuffer.allocate(NETWORK_BUFFER_SIZE), 0);
        }
        verifySubpartitionBytes(numRecords * NETWORK_BUFFER_SIZE);

        bufferPool.setNumBuffers(1);
        partition.emitRecord(ByteBuffer.allocate(NETWORK_BUFFER_SIZE), 0);
        verifySubpartitionBytes((numRecords + 1) * NETWORK_BUFFER_SIZE);

        subpartitionView.releaseAllResources();
        partition.release();
    }

    /**
     * Creates and sets up a {@link TieredResultPartition} whose producer client is wired with a
     * {@link TestingBufferAccumulator} and a single {@link TestingTierProducerAgent}. The metric
     * group of the new partition is stored in {@link #taskIOMetricGroup}.
     *
     * @param numSubpartitions passed as both the subpartition count and the following int argument
     *     of the partition constructor
     * @param bufferPool the pool returned by the partition's buffer pool factory
     * @param isBroadcastOnly broadcast-only flag forwarded to the producer client
     * @return the partition, already set up
     */
    private TieredResultPartition createTieredStoreResultPartition(
            int numSubpartitions, BufferPool bufferPool, boolean isBroadcastOnly)
            throws IOException {
        TestingTierProducerAgent tierProducerAgent = new TestingTierProducerAgent.Builder().build();
        TieredStorageResourceRegistry resourceRegistry = new TieredStorageResourceRegistry();
        TieredStorageProducerClient producerClient =
                new TieredStorageProducerClient(
                        numSubpartitions,
                        isBroadcastOnly,
                        new TestingBufferAccumulator(),
                        (BufferCompressor) null,
                        Collections.singletonList(tierProducerAgent));

        TieredResultPartition partition =
                new TieredResultPartition(
                        "TieredStoreResultPartitionTest",
                        0,
                        new ResultPartitionID(),
                        ResultPartitionType.HYBRID_SELECTIVE,
                        numSubpartitions,
                        numSubpartitions,
                        new ResultPartitionManager(),
                        new BufferCompressor(
                                NETWORK_BUFFER_SIZE,
                                NettyShuffleEnvironmentOptions.CompressionCodec.LZ4),
                        () -> bufferPool,
                        producerClient,
                        resourceRegistry,
                        new TieredStorageNettyServiceImpl(resourceRegistry),
                        Collections.emptyList(),
                        new TestingTieredStorageMemoryManager.Builder().build());

        taskIOMetricGroup =
                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup();
        partition.setup();
        partition.setMetricGroup(taskIOMetricGroup);
        return partition;
    }

    /**
     * Creates and sets up a {@link TieredResultPartition} through {@link
     * TieredResultPartitionFactory}, using a default {@link Configuration} together with this
     * test's file channel manager, read buffer pool and IO executor. The metric group of the new
     * partition is stored in {@link #taskIOMetricGroup}.
     *
     * @param numSubpartitions passed as both the subpartition count and the following int argument
     *     of the factory method
     * @param bufferPool the pool returned by the partition's buffer pool factory
     * @param isBroadcastOnly broadcast-only flag forwarded to the factory
     * @return the partition, already set up
     */
    private TieredResultPartition createTieredStoreResultPartitionWithStorageManager(
            int numSubpartitions, BufferPool bufferPool, boolean isBroadcastOnly)
            throws IOException {
        TieredStorageConfiguration storageConfiguration =
                TieredStorageConfiguration.fromConfiguration(new Configuration());
        TieredStorageResourceRegistry resourceRegistry = new TieredStorageResourceRegistry();
        TieredResultPartitionFactory partitionFactory =
                new TieredResultPartitionFactory(
                        storageConfiguration,
                        new TieredStorageNettyServiceImpl(resourceRegistry),
                        resourceRegistry);

        TieredResultPartition partition =
                partitionFactory.createTieredResultPartition(
                        "TieredStoreResultPartitionTest",
                        0,
                        new ResultPartitionID(),
                        ResultPartitionType.HYBRID_SELECTIVE,
                        numSubpartitions,
                        numSubpartitions,
                        Integer.MAX_VALUE,
                        NETWORK_BUFFER_SIZE,
                        isBroadcastOnly,
                        new ResultPartitionManager(),
                        new BufferCompressor(
                                NETWORK_BUFFER_SIZE,
                                NettyShuffleEnvironmentOptions.CompressionCodec.LZ4),
                        Arrays.asList(
                                NoOpTierShuffleDescriptor.INSTANCE,
                                NoOpTierShuffleDescriptor.INSTANCE),
                        () -> bufferPool,
                        fileChannelManager,
                        readBufferPool,
                        readIOExecutor,
                        false);

        taskIOMetricGroup =
                UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup();
        partition.setup();
        partition.setMetricGroup(taskIOMetricGroup);
        return partition;
    }

    /**
     * Polls the first directory of the file channel manager until it contains no files, failing
     * the test instead of hanging forever if that does not happen within {@link
     * #FILE_CLEANUP_TIMEOUT_MILLIS}.
     */
    private void waitUntilDataDirectoryIsEmpty() throws InterruptedException {
        File dataDirectory = fileChannelManager.getPaths()[0];
        long startNanos = System.nanoTime();
        while (Preconditions.checkNotNull(dataDirectory.listFiles()).length != 0) {
            long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
            Assertions.assertThat(elapsedMillis)
                    .as("milliseconds spent waiting for %s to be emptied", dataDirectory)
                    .isLessThan(FILE_CLEANUP_TIMEOUT_MILLIS);
            Thread.sleep(10L);
        }
    }

    /**
     * Asserts that the current metric snapshot reports exactly one result partition and that its
     * per-subpartition byte counts equal the given values, in order.
     *
     * @param expectedSubpartitionBytes expected byte count per subpartition
     */
    private void verifySubpartitionBytes(long... expectedSubpartitionBytes) {
        IOMetrics ioMetrics = taskIOMetricGroup.createSnapshot();
        Assertions.assertThat(ioMetrics.getResultPartitionBytes()).hasSize(1);

        ResultPartitionBytes partitionBytes =
                ioMetrics.getResultPartitionBytes().values().iterator().next();
        Assertions.assertThat(partitionBytes.getSubpartitionBytes())
                .containsExactly(expectedSubpartitionBytes);
    }
}
