package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EventAnnouncement;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister.class */
public final class ChannelStatePersister {
    private static final Logger LOG = LoggerFactory.getLogger(ChannelStatePersister.class);
    private final InputChannelInfo channelInfo;
    private CheckpointStatus checkpointStatus = CheckpointStatus.COMPLETED;
    private long lastSeenBarrier = -1;
    private final ChannelStateWriter channelStateWriter;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/ChannelStatePersister$CheckpointStatus.class */
    public enum CheckpointStatus {
        COMPLETED,
        BARRIER_PENDING,
        BARRIER_RECEIVED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChannelStatePersister(ChannelStateWriter channelStateWriter, InputChannelInfo inputChannelInfo) {
        this.channelStateWriter = (ChannelStateWriter) Preconditions.checkNotNull(channelStateWriter);
        this.channelInfo = (InputChannelInfo) Preconditions.checkNotNull(inputChannelInfo);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void startPersisting(long j, List<Buffer> list) {
        logEvent("startPersisting", j);
        if (this.checkpointStatus != CheckpointStatus.BARRIER_RECEIVED && this.lastSeenBarrier < j) {
            this.checkpointStatus = CheckpointStatus.BARRIER_PENDING;
            this.lastSeenBarrier = j;
        } else if (this.checkpointStatus == CheckpointStatus.BARRIER_RECEIVED) {
            Preconditions.checkState(this.lastSeenBarrier >= j, "Internal error, #stopPersisting for last checkpoint has not been called.");
        }
        if (list.size() > 0) {
            this.channelStateWriter.addInputData(j, this.channelInfo, -2, CloseableIterator.fromList(list, (v0) -> {
                v0.recycleBuffer();
            }));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void stopPersisting(long j) {
        logEvent("stopPersisting", j);
        if (j >= this.lastSeenBarrier) {
            this.checkpointStatus = CheckpointStatus.COMPLETED;
            this.lastSeenBarrier = j;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void maybePersist(Buffer buffer) {
        if (this.checkpointStatus == CheckpointStatus.BARRIER_PENDING && buffer.isBuffer()) {
            this.channelStateWriter.addInputData(this.lastSeenBarrier, this.channelInfo, -2, CloseableIterator.ofElement(buffer.retainBuffer(), (v0) -> {
                v0.recycleBuffer();
            }));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Optional<Long> checkForBarrier(Buffer buffer) throws IOException {
        AbstractEvent parseEvent = parseEvent(buffer);
        if (parseEvent instanceof CheckpointBarrier) {
            long id = ((CheckpointBarrier) parseEvent).getId();
            if (id >= (this.checkpointStatus == CheckpointStatus.COMPLETED ? this.lastSeenBarrier + 1 : this.lastSeenBarrier)) {
                logEvent("found barrier", id);
                this.checkpointStatus = CheckpointStatus.BARRIER_RECEIVED;
                this.lastSeenBarrier = id;
                return Optional.of(Long.valueOf(this.lastSeenBarrier));
            }
            logEvent("ignoring barrier", id);
        }
        if (parseEvent instanceof EventAnnouncement) {
            EventAnnouncement eventAnnouncement = (EventAnnouncement) parseEvent;
            if (eventAnnouncement.getAnnouncedEvent() instanceof CheckpointBarrier) {
                long id2 = ((CheckpointBarrier) eventAnnouncement.getAnnouncedEvent()).getId();
                logEvent("found announcement for barrier", id2);
                return Optional.of(Long.valueOf(id2));
            }
        }
        return Optional.empty();
    }

    private void logEvent(String str, long j) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("{} {}, lastSeenBarrier = {} ({}) @ {}", new Object[]{str, Long.valueOf(j), Long.valueOf(this.lastSeenBarrier), this.checkpointStatus, this.channelInfo});
        }
    }

    @Nullable
    protected AbstractEvent parseEvent(Buffer buffer) throws IOException {
        if (buffer.isBuffer()) {
            return null;
        }
        AbstractEvent fromBuffer = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
        buffer.setReaderIndex(0);
        return fromBuffer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean hasBarrierReceived() {
        return this.checkpointStatus == CheckpointStatus.BARRIER_RECEIVED;
    }

    public String toString() {
        return "ChannelStatePersister(lastSeenBarrier=" + this.lastSeenBarrier + " (" + this.checkpointStatus + ")}";
    }
}
