package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.class */
/**
 * Producer-side entry point that routes records into a {@link BufferAccumulator} and hands the
 * buffers the accumulator emits to one of the configured {@link TierProducerAgent}s.
 *
 * <p>For every subpartition the client remembers which agent currently accepts its buffers and
 * the id of the segment most recently started for it. Whenever the remembered agent refuses a
 * buffer, the agents are asked, in list order, to start the next segment, and the first one that
 * agrees becomes the new target.
 */
public class TieredStorageProducerClient {
    /** When {@code true}, broadcast records are not fanned out per subpartition in {@link #write}. */
    private final boolean isBroadcastOnly;

    /** Number of subpartitions; sizes the per-subpartition arrays and bounds the broadcast loop. */
    private final int numSubpartitions;

    /** Receives incoming records and calls back into this client with finished buffers. */
    private final BufferAccumulator bufferAccumulator;

    /** Optional compressor kept from the constructor; not referenced elsewhere in this class. */
    private final BufferCompressor bufferCompressor;

    /** Candidate tiers, consulted in iteration order when a new segment has to be started. */
    private final List<TierProducerAgent> tierProducerAgents;

    /** Last started segment id per subpartition; {@code -1} until the first segment is started. */
    private final int[] currentSubpartitionSegmentId;

    /** Agent currently written to per subpartition; {@code null} until a segment is started. */
    private final TierProducerAgent[] currentSubpartitionTierAgent;

    /** Sink for write metrics; must be installed before the first buffer is written. */
    @Nullable
    private Consumer<TieredStorageProducerMetricUpdate> metricStatisticsUpdater;

    /**
     * Creates the client and registers it as the flush callback of the accumulator.
     *
     * @param i number of subpartitions
     * @param z whether the producer is broadcast-only
     * @param bufferAccumulator accumulator that receives records and later flushes buffers back
     * @param bufferCompressor optional compressor, may be {@code null}
     * @param list tier agents, in the order they are asked to start new segments
     */
    public TieredStorageProducerClient(int i, boolean z, BufferAccumulator bufferAccumulator, @Nullable BufferCompressor bufferCompressor, List<TierProducerAgent> list) {
        this.numSubpartitions = i;
        this.isBroadcastOnly = z;
        this.bufferAccumulator = bufferAccumulator;
        this.bufferCompressor = bufferCompressor;
        this.tierProducerAgents = list;

        this.currentSubpartitionTierAgent = new TierProducerAgent[i];
        this.currentSubpartitionSegmentId = new int[i];
        // -1 marks "no segment yet", so the first started segment gets id 0.
        Arrays.fill(this.currentSubpartitionSegmentId, -1);

        bufferAccumulator.setup(this::writeAccumulatedBuffer);
    }

    /**
     * Passes a record to the accumulator.
     *
     * <p>A broadcast record on a producer that is not broadcast-only is delivered once per
     * subpartition, each time as an independent {@link ByteBuffer#duplicate() duplicate} of the
     * given buffer. In every other case the record is delivered once, to the given subpartition.
     *
     * @param byteBuffer the record payload
     * @param tieredStorageSubpartitionId target subpartition; ignored when the record is fanned out
     * @param dataType data type forwarded to the accumulator
     * @param z whether the record is a broadcast record
     * @throws IOException declared by this method's signature
     */
    public void write(ByteBuffer byteBuffer, TieredStorageSubpartitionId tieredStorageSubpartitionId, Buffer.DataType dataType, boolean z) throws IOException {
        final boolean fanOutToAllSubpartitions = z && !this.isBroadcastOnly;
        if (fanOutToAllSubpartitions) {
            for (int subpartition = 0; subpartition < this.numSubpartitions; subpartition++) {
                this.bufferAccumulator.receive(byteBuffer.duplicate(), new TieredStorageSubpartitionId(subpartition), dataType, z);
            }
            return;
        }
        this.bufferAccumulator.receive(byteBuffer, tieredStorageSubpartitionId, dataType, z);
    }

    /**
     * Installs the consumer that receives metric updates for written buffers.
     *
     * @param consumer the metric sink; must not be {@code null}
     */
    public void setMetricStatisticsUpdater(Consumer<TieredStorageProducerMetricUpdate> consumer) {
        this.metricStatisticsUpdater = Preconditions.checkNotNull(consumer);
    }

    /** Closes the accumulator first and then every tier agent, in list order. */
    public void close() {
        this.bufferAccumulator.close();
        this.tierProducerAgents.forEach(TierProducerAgent::close);
    }

    /**
     * Flush callback registered with the accumulator: writes one buffer to the current tier of
     * its subpartition, switching to a freshly started segment when the current tier refuses it.
     *
     * <p>On an {@link IOException} the buffer is recycled and the exception is rethrown through
     * {@link ExceptionUtils#rethrow(Throwable)}. After a successful write a metric update of one
     * buffer and the buffer's readable byte count (sampled before writing) is reported.
     *
     * @param subpartitionId subpartition the buffer belongs to
     * @param buffer the buffer to write
     * @param hint value supplied by the accumulator; forwarded unchanged to {@code tryWrite} and
     *     as {@code hint + 1} when a new segment is started
     */
    private void writeAccumulatedBuffer(TieredStorageSubpartitionId subpartitionId, Buffer buffer, int hint) {
        // Sample the size up front; it is reported only after the write has gone through.
        final int writtenBytes = buffer.readableBytes();
        try {
            final int index = subpartitionId.getSubpartitionId();
            if (this.currentSubpartitionTierAgent[index] == null) {
                chooseStorageTierToStartSegment(subpartitionId, hint + 1);
            }

            final boolean accepted = this.currentSubpartitionTierAgent[index].tryWrite(subpartitionId, buffer, this.bufferAccumulator, hint);
            if (!accepted) {
                // The current tier refused the buffer: start the next segment and retry once.
                chooseStorageTierToStartSegment(subpartitionId, hint + 1);
                final boolean acceptedAfterSwitch = this.currentSubpartitionTierAgent[index].tryWrite(subpartitionId, buffer, this.bufferAccumulator, hint);
                // NOTE(review): a failed check throws an unchecked exception, which the catch
                // below does not cover, so the buffer is not recycled on that path - confirm
                // whether that is intended.
                Preconditions.checkState(acceptedAfterSwitch, "Failed to write the first buffer to the new segment");
            }
        } catch (IOException ioException) {
            buffer.recycleBuffer();
            ExceptionUtils.rethrow(ioException);
        }
        updateMetricStatistics(1, writtenBytes);
    }

    /**
     * Starts the next segment of a subpartition on the first agent willing to host it and records
     * that agent and segment id as current for the subpartition.
     *
     * @param subpartitionId subpartition that needs a new segment
     * @param startHint value forwarded to {@code tryStartNewSegment} as its last argument
     * @throws IOException if no agent accepts the new segment
     */
    private void chooseStorageTierToStartSegment(TieredStorageSubpartitionId subpartitionId, int startHint) throws IOException {
        final int index = subpartitionId.getSubpartitionId();
        final int nextSegmentId = this.currentSubpartitionSegmentId[index] + 1;

        for (TierProducerAgent candidate : this.tierProducerAgents) {
            if (!candidate.tryStartNewSegment(subpartitionId, nextSegmentId, startHint)) {
                continue;
            }
            this.currentSubpartitionSegmentId[index] = nextSegmentId;
            this.currentSubpartitionTierAgent[index] = candidate;
            return;
        }
        throw new IOException("Failed to choose a storage tier to start a new segment.");
    }

    /**
     * Publishes a metric update to the installed updater.
     *
     * @param bufferCount first constructor argument of the update (callers pass the number of
     *     buffers written)
     * @param byteCount second constructor argument of the update (callers pass the number of
     *     bytes written)
     * @throws NullPointerException if no updater has been installed yet
     */
    private void updateMetricStatistics(int bufferCount, int byteCount) {
        final Consumer<TieredStorageProducerMetricUpdate> updater = Preconditions.checkNotNull(this.metricStatisticsUpdater);
        updater.accept(new TieredStorageProducerMetricUpdate(bufferCount, byteCount));
    }
}
