package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base implementation of {@link BufferStorage} that keeps track of one "current" sequence of
 * buffered data which is handed out via {@link #pollNext()}, plus a deque of sequences that
 * were pushed back because a newer roll-over happened before they were fully consumed.
 *
 * <p>Subclasses decide how a sequence is actually produced by implementing
 * {@link #rollOverReusingResources()} and {@link #rollOverWithoutReusingResources()}.
 */
@Internal
public abstract class AbstractBufferStorage implements BufferStorage {
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractBufferStorage.class);

    /** Sequences that were pushed back by {@link #rollOver()} and are consumed after the current one. */
    protected final ArrayDeque<BufferOrEventSequence> queuedBuffered = new ArrayDeque<>();

    /** Upper bound used by {@link #isFull()}; a non-positive value (only -1 is accepted) disables the check. */
    protected final long maxBufferedBytes;

    /** Task name, used as a prefix in log messages. */
    protected final String taskName;

    /** The sequence currently being served by {@link #pollNext()}, or {@code null} if there is none. */
    protected BufferOrEventSequence currentBuffered;

    /**
     * Byte accounting for the sequences in {@link #queuedBuffered}: increased by a sequence's
     * size when it is pushed back, decreased by its size when it becomes the current sequence.
     */
    protected long rolledBytes;

    /**
     * Creates the storage.
     *
     * @param maxBufferedBytes limit checked by {@link #isFull()}; must be positive, or -1 to
     *     disable the limit
     * @param taskName name used as a prefix in log messages
     * @throws IllegalArgumentException if {@code maxBufferedBytes} is neither -1 nor positive
     */
    public AbstractBufferStorage(long maxBufferedBytes, String taskName) {
        Preconditions.checkArgument(
                maxBufferedBytes == -1 || maxBufferedBytes > 0,
                "maxBufferedBytes must be -1 (no limit) or positive, but was %s",
                maxBufferedBytes);
        this.maxBufferedBytes = maxBufferedBytes;
        this.taskName = taskName;
    }

    /**
     * Tells whether the rolled plus pending bytes exceed the configured limit.
     *
     * @return {@code false} whenever no positive limit is configured; otherwise {@code true} if
     *     {@code getRolledBytes() + getPendingBytes()} is strictly greater than the limit
     */
    @Override
    public boolean isFull() {
        if (maxBufferedBytes <= 0) {
            return false;
        }
        // getPendingBytes() is not defined in this class; presumably it is declared by
        // BufferStorage and implemented by the concrete subclass.
        return getRolledBytes() + getPendingBytes() > maxBufferedBytes;
    }

    /**
     * Makes the data collected so far available for {@link #pollNext()}.
     *
     * <p>If no sequence is currently being served, the sequence obtained from
     * {@link #rollOverReusingResources()} (if any) is opened and becomes the current one.
     * Otherwise a sequence is obtained from {@link #rollOverWithoutReusingResources()}; if it is
     * non-null it is opened and becomes the current sequence, while the previous current
     * sequence is pushed to the front of the queue and its size is added to the rolled bytes.
     *
     * @throws IOException if producing the new sequence fails
     */
    @Override
    public void rollOver() throws IOException {
        if (currentBuffered == null) {
            // Nothing is being served right now.
            currentBuffered = rollOverReusingResources();
            if (currentBuffered != null) {
                currentBuffered.open();
            }
        } else {
            // The previous sequence has not been fully consumed yet.
            LOG.debug("{}: Checkpoint skipped via buffered data:Pushing back current alignment buffers and feeding back new alignment data first.", taskName);
            BufferOrEventSequence bufferedNow = rollOverWithoutReusingResources();
            if (bufferedNow != null) {
                bufferedNow.open();
                // The new data is served first; the unfinished sequence comes right after it.
                queuedBuffered.addFirst(currentBuffered);
                rolledBytes += currentBuffered.size();
                currentBuffered = bufferedNow;
            }
        }

        // Guarded so that size() is only evaluated when the message is actually logged.
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Size of buffered data: {} bytes", taskName, currentBuffered == null ? 0L : currentBuffered.size());
        }
    }

    /**
     * Produces the sequence for {@link #rollOver()} when a previous sequence is still being
     * served.
     *
     * @return the new sequence, or {@code null} if there is nothing to roll over
     * @throws IOException if the sequence cannot be produced
     */
    protected abstract BufferOrEventSequence rollOverWithoutReusingResources() throws IOException;

    /**
     * Produces the sequence for {@link #rollOver()} when no sequence is currently being served.
     *
     * @return the new sequence, or {@code null} if there is nothing to roll over
     * @throws IOException if the sequence cannot be produced
     */
    protected abstract BufferOrEventSequence rollOverReusingResources() throws IOException;

    /**
     * Cleans up the current sequence and every queued sequence and resets the byte accounting.
     *
     * <p>The method is idempotent: references are dropped before the corresponding cleanup is
     * attempted, so no sequence is cleaned up twice and the storage reports itself as empty
     * afterwards. All sequences are attempted even if one cleanup fails.
     *
     * @throws IOException the first cleanup failure; later failures are attached as suppressed
     */
    @Override
    public void close() throws IOException {
        IOException failure = null;

        // Detach first so that a repeated close() or a late pollNext() never touches a
        // sequence that has already been cleaned up.
        BufferOrEventSequence current = currentBuffered;
        currentBuffered = null;
        if (current != null) {
            try {
                current.cleanup();
            } catch (IOException e) {
                failure = e;
            }
        }

        // Drain in queue order; polling removes each sequence before it is cleaned up.
        BufferOrEventSequence queued;
        while ((queued = queuedBuffered.pollFirst()) != null) {
            try {
                queued.cleanup();
            } catch (IOException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        rolledBytes = 0L;

        if (failure != null) {
            throw failure;
        }
    }

    /** @return the bytes currently accounted for the queued (pushed-back) sequences */
    @Override
    public long getRolledBytes() {
        return rolledBytes;
    }

    /** @return {@code true} if no sequence is currently being served */
    @Override
    public boolean isEmpty() {
        return currentBuffered == null;
    }

    /**
     * Returns the next element of the current sequence.
     *
     * <p>When the current sequence yields {@code null}, it is completed via
     * {@link #completeBufferedSequence()} and an empty result is returned for this call, even
     * if a queued sequence has just become current.
     *
     * @return the next element, or empty if there is no current sequence or it is exhausted
     * @throws IOException if reading the next element or completing the sequence fails
     */
    @Override
    public Optional<BufferOrEvent> pollNext() throws IOException {
        if (currentBuffered == null) {
            return Optional.empty();
        }
        BufferOrEvent next = currentBuffered.getNext();
        if (next == null) {
            completeBufferedSequence();
        }
        return Optional.ofNullable(next);
    }

    /**
     * Cleans up the current sequence and replaces it with the head of the queue, if any. The
     * promoted sequence is opened and its size is subtracted from the rolled bytes.
     *
     * <p>Must only be called while a current sequence exists.
     *
     * @throws IOException if cleaning up the finished sequence fails
     */
    protected void completeBufferedSequence() throws IOException {
        LOG.debug("{}: Finished feeding back buffered data.", taskName);
        currentBuffered.cleanup();
        currentBuffered = queuedBuffered.pollFirst();
        if (currentBuffered != null) {
            currentBuffered.open();
            rolledBytes -= currentBuffered.size();
        }
    }

    /** @return the limit passed to the constructor (-1 if no limit is configured) */
    @Override
    public long getMaxBufferedBytes() {
        return maxBufferedBytes;
    }
}
