package org.apache.flink.runtime.io.network.api.reader;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.class */
abstract class AbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements ReaderBase {
    private final Map<InputChannelInfo, RecordDeserializer<T>> recordDeserializers;
    private RecordDeserializer<T> currentRecordDeserializer;
    private boolean finishedStateReading;
    private boolean requestedPartitions;
    private boolean isFinished;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractRecordReader(InputGate inputGate, String[] strArr) {
        super(inputGate);
        this.recordDeserializers = (Map) inputGate.getChannelInfos().stream().collect(Collectors.toMap(Function.identity(), inputChannelInfo -> {
            return new SpillingAdaptiveSpanningRecordDeserializer(strArr);
        }));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean getNextRecord(T t) throws IOException, InterruptedException {
        if (!this.finishedStateReading) {
            this.inputGate.finishReadRecoveredState();
            this.finishedStateReading = true;
        }
        if (!this.requestedPartitions) {
            CompletableFuture<Void> stateConsumedFuture = this.inputGate.getStateConsumedFuture();
            while (!stateConsumedFuture.isDone()) {
                Preconditions.checkState(!this.inputGate.pollNext().isPresent());
            }
            this.inputGate.setChannelStateWriter(ChannelStateWriter.NO_OP);
            this.inputGate.requestPartitions();
            this.requestedPartitions = true;
        }
        if (this.isFinished) {
            return false;
        }
        while (true) {
            if (this.currentRecordDeserializer != null) {
                RecordDeserializer.DeserializationResult nextRecord = this.currentRecordDeserializer.getNextRecord(t);
                if (nextRecord.isBufferConsumed()) {
                    this.currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    this.currentRecordDeserializer = null;
                }
                if (nextRecord.isFullRecord()) {
                    return true;
                }
            }
            BufferOrEvent orElseThrow = this.inputGate.getNext().orElseThrow(IllegalStateException::new);
            if (orElseThrow.isBuffer()) {
                this.currentRecordDeserializer = this.recordDeserializers.get(orElseThrow.getChannelInfo());
                this.currentRecordDeserializer.setNextBuffer(orElseThrow.getBuffer());
            } else {
                if (this.recordDeserializers.get(orElseThrow.getChannelInfo()).hasUnfinishedData()) {
                    throw new IOException("Received an event in channel " + orElseThrow.getChannelInfo() + " while still having data from a record. This indicates broken serialization logic. If you are using custom serialization code (Writable or Value types), check their serialization routines. In the case of Kryo, check the respective Kryo serializer.");
                }
                if (!handleEvent(orElseThrow.getEvent())) {
                    continue;
                } else {
                    if (this.inputGate.isFinished()) {
                        this.isFinished = true;
                        return false;
                    }
                    if (hasReachedEndOfSuperstep()) {
                        return false;
                    }
                }
            }
        }
    }

    public void clearBuffers() {
        for (RecordDeserializer<T> recordDeserializer : this.recordDeserializers.values()) {
            Buffer currentBuffer = recordDeserializer.getCurrentBuffer();
            if (currentBuffer != null && !currentBuffer.isRecycled()) {
                currentBuffer.recycleBuffer();
            }
            recordDeserializer.clear();
        }
    }
}
