package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory;

import java.io.IOException;
import java.util.Arrays;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.api.EndOfSegmentEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionWriter;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyServiceProducer;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.class */
public class MemoryTierProducerAgent implements TierProducerAgent, NettyServiceProducer {
    private final int numBuffersPerSegment;
    private final int subpartitionMaxQueuedBuffers;
    private final TieredStorageMemoryManager memoryManager;
    private final int[] currentSubpartitionWriteBuffers;
    private final boolean[] nettyConnectionEstablished;
    private final MemoryTierSubpartitionProducerAgent[] subpartitionProducerAgents;
    private final BufferCompressor bufferCompressor;

    public MemoryTierProducerAgent(TieredStoragePartitionId tieredStoragePartitionId, int i, int i2, int i3, int i4, boolean z, TieredStorageMemoryManager tieredStorageMemoryManager, TieredStorageNettyService tieredStorageNettyService, TieredStorageResourceRegistry tieredStorageResourceRegistry, BufferCompressor bufferCompressor) {
        Preconditions.checkArgument(i3 >= i2, "One segment should contain at least one buffer.");
        Preconditions.checkArgument(!z, "Broadcast only partition is not allowed to use the memory tier.");
        this.numBuffersPerSegment = i3 / i2;
        this.subpartitionMaxQueuedBuffers = i4;
        this.memoryManager = tieredStorageMemoryManager;
        this.currentSubpartitionWriteBuffers = new int[i];
        this.nettyConnectionEstablished = new boolean[i];
        this.subpartitionProducerAgents = new MemoryTierSubpartitionProducerAgent[i];
        this.bufferCompressor = bufferCompressor;
        Arrays.fill(this.currentSubpartitionWriteBuffers, 0);
        tieredStorageNettyService.registerProducer(tieredStoragePartitionId, this);
        for (int i5 = 0; i5 < i; i5++) {
            this.subpartitionProducerAgents[i5] = new MemoryTierSubpartitionProducerAgent(i5);
        }
        tieredStorageResourceRegistry.registerResource(tieredStoragePartitionId, this::releaseResources);
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent
    public boolean tryStartNewSegment(TieredStorageSubpartitionId tieredStorageSubpartitionId, int i, int i2) {
        boolean z = this.nettyConnectionEstablished[tieredStorageSubpartitionId.getSubpartitionId()] && this.subpartitionProducerAgents[tieredStorageSubpartitionId.getSubpartitionId()].numQueuedBuffers() < this.subpartitionMaxQueuedBuffers && this.memoryManager.getMaxNonReclaimableBuffers(TieredStorageUtils.getMemoryTierName()) - this.memoryManager.numOwnerRequestedBuffer(TieredStorageUtils.getMemoryTierName()) > Math.max(this.numBuffersPerSegment, i2) && this.memoryManager.ensureCapacity(Math.max(this.numBuffersPerSegment, i2));
        if (z) {
            this.subpartitionProducerAgents[tieredStorageSubpartitionId.getSubpartitionId()].updateSegmentId(i);
        }
        return z;
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent
    public boolean tryWrite(TieredStorageSubpartitionId tieredStorageSubpartitionId, Buffer buffer, Object obj, int i) {
        int subpartitionId = tieredStorageSubpartitionId.getSubpartitionId();
        if (this.currentSubpartitionWriteBuffers[subpartitionId] != 0 && this.currentSubpartitionWriteBuffers[subpartitionId] + 1 + i > this.numBuffersPerSegment) {
            appendEndOfSegmentEvent(subpartitionId);
            this.currentSubpartitionWriteBuffers[subpartitionId] = 0;
            return false;
        }
        Buffer compressBufferIfPossible = TieredStorageUtils.compressBufferIfPossible(buffer, this.bufferCompressor);
        if (compressBufferIfPossible.isBuffer()) {
            this.memoryManager.transferBufferOwnership(obj, TieredStorageUtils.getMemoryTierName(), compressBufferIfPossible);
        }
        int[] iArr = this.currentSubpartitionWriteBuffers;
        iArr[subpartitionId] = iArr[subpartitionId] + 1;
        addFinishedBuffer(compressBufferIfPossible, subpartitionId);
        return true;
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyServiceProducer
    public void connectionEstablished(TieredStorageSubpartitionId tieredStorageSubpartitionId, NettyConnectionWriter nettyConnectionWriter) {
        this.subpartitionProducerAgents[tieredStorageSubpartitionId.getSubpartitionId()].connectionEstablished(nettyConnectionWriter);
        this.nettyConnectionEstablished[tieredStorageSubpartitionId.getSubpartitionId()] = true;
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyServiceProducer
    public void connectionBroken(NettyConnectionId nettyConnectionId) {
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent, java.lang.AutoCloseable
    public void close() {
    }

    private void releaseResources() {
        Arrays.stream(this.subpartitionProducerAgents).forEach((v0) -> {
            v0.release();
        });
    }

    private void appendEndOfSegmentEvent(int i) {
        try {
            MemorySegment wrap = MemorySegmentFactory.wrap(EventSerializer.toSerializedEvent(EndOfSegmentEvent.INSTANCE).array());
            addFinishedBuffer(new NetworkBuffer(wrap, FreeingBufferRecycler.INSTANCE, Buffer.DataType.END_OF_SEGMENT, wrap.size()), i);
        } catch (IOException e) {
            ExceptionUtils.rethrow(e, "Failed to append end of segment event,");
        }
    }

    private void addFinishedBuffer(Buffer buffer, int i) {
        this.subpartitionProducerAgents[i].addFinishedBuffer(buffer);
    }
}
