package org.apache.flink.runtime.io.network.api.writer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.class */
public class RecordWriterDelegateTest extends TestLogger {
    private static final int recordSize = 8;
    private static final int numberOfBuffers = 10;
    private static final int memorySegmentSize = 128;
    private NetworkBufferPool globalPool;

    @Before
    public void setup() {
        Assert.assertEquals("Illegal memory segment size,", 0L, 0L);
        this.globalPool = new NetworkBufferPool(10, 128);
    }

    @After
    public void teardown() {
        this.globalPool.destroyAllBufferPools();
        this.globalPool.destroy();
    }

    @Test
    public void testSingleRecordWriterAvailability() throws Exception {
        RecordWriter createRecordWriter = createRecordWriter(this.globalPool);
        SingleRecordWriter singleRecordWriter = new SingleRecordWriter(createRecordWriter);
        Assert.assertEquals(createRecordWriter, singleRecordWriter.getRecordWriter(0));
        verifyAvailability(singleRecordWriter);
    }

    @Test
    public void testMultipleRecordWritersAvailability() throws Exception {
        ArrayList arrayList = new ArrayList(2);
        for (int i = 0; i < 2; i++) {
            arrayList.add(createRecordWriter(this.globalPool));
        }
        MultipleRecordWriters multipleRecordWriters = new MultipleRecordWriters(arrayList);
        for (int i2 = 0; i2 < 2; i2++) {
            Assert.assertEquals(arrayList.get(i2), multipleRecordWriters.getRecordWriter(i2));
        }
        verifyAvailability(multipleRecordWriters);
    }

    @Test
    public void testSingleRecordWriterBroadcastEvent() throws Exception {
        ResultPartition createResultPartition = RecordWriterTest.createResultPartition(128, 2);
        verifyBroadcastEvent(new SingleRecordWriter(new RecordWriterBuilder().build(createResultPartition)), Collections.singletonList(createResultPartition));
    }

    @Test
    public void testMultipleRecordWritersBroadcastEvent() throws Exception {
        ArrayList arrayList = new ArrayList(2);
        ArrayList arrayList2 = new ArrayList(2);
        for (int i = 0; i < 2; i++) {
            ResultPartition createResultPartition = RecordWriterTest.createResultPartition(128, 2);
            arrayList2.add(createResultPartition);
            arrayList.add(new RecordWriterBuilder().build(createResultPartition));
        }
        verifyBroadcastEvent(new MultipleRecordWriters(arrayList), arrayList2);
    }

    private RecordWriter createRecordWriter(NetworkBufferPool networkBufferPool) throws Exception {
        BufferPool createBufferPool = networkBufferPool.createBufferPool(1, 1, 1, Integer.MAX_VALUE);
        ResultPartition build = new ResultPartitionBuilder().setBufferPoolFactory(() -> {
            return createBufferPool;
        }).build();
        build.setup();
        return new RecordWriterBuilder().build(build);
    }

    private void verifyAvailability(RecordWriterDelegate recordWriterDelegate) throws Exception {
        Assert.assertTrue(recordWriterDelegate.isAvailable());
        Assert.assertTrue(recordWriterDelegate.getAvailableFuture().isDone());
        RecordWriter recordWriter = recordWriterDelegate.getRecordWriter(0);
        for (int i = 0; i < 16; i++) {
            recordWriter.emit(new IntValue(i));
        }
        Assert.assertFalse(recordWriterDelegate.isAvailable());
        CompletableFuture availableFuture = recordWriterDelegate.getAvailableFuture();
        Assert.assertFalse(availableFuture.isDone());
        recordWriter.getTargetPartition().createSubpartitionView(0, new NoOpBufferAvailablityListener()).getNextBuffer().buffer().recycleBuffer();
        Assert.assertTrue(availableFuture.isDone());
        Assert.assertTrue(recordWriterDelegate.isAvailable());
        Assert.assertTrue(recordWriterDelegate.getAvailableFuture().isDone());
    }

    private void verifyBroadcastEvent(RecordWriterDelegate recordWriterDelegate, List<ResultPartition> list) throws Exception {
        CancelCheckpointMarker cancelCheckpointMarker = new CancelCheckpointMarker(1L);
        recordWriterDelegate.broadcastEvent(cancelCheckpointMarker);
        for (ResultPartition resultPartition : list) {
            for (int i = 0; i < resultPartition.getNumberOfSubpartitions(); i++) {
                Assert.assertEquals(1L, resultPartition.getNumberOfQueuedBuffers(i));
                BufferOrEvent parseBuffer = RecordWriterTest.parseBuffer(resultPartition.createSubpartitionView(i, new NoOpBufferAvailablityListener()).getNextBuffer().buffer(), i);
                Assert.assertTrue(parseBuffer.isEvent());
                Assert.assertEquals(cancelCheckpointMarker, parseBuffer.getEvent());
            }
        }
    }
}
