package org.apache.flink.runtime.state.changelog;

import java.io.IOException;
import java.util.Iterator;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link StateChangelogHandleReader} for {@link ChangelogStateHandleStreamImpl} handles.
 *
 * <p>The changelog handle exposes a sequence of (stream handle, offset) pairs. This reader walks
 * that sequence and delegates the reading of each pair to the supplied {@link StateChangeIterator},
 * presenting the results as one continuous iterator of {@link StateChange}s.
 */
@Internal
public class StateChangelogHandleStreamHandleReader implements StateChangelogHandleReader<ChangelogStateHandleStreamImpl> {
    private static final Logger LOG = LoggerFactory.getLogger(StateChangelogHandleStreamHandleReader.class);

    /** Strategy used to read the changes stored in a single stream handle. */
    private final StateChangeIterator changeIterator;

    /** Reads the state changes of a single {@link StreamStateHandle}. */
    public interface StateChangeIterator {
        /**
         * Opens an iterator over the changes stored in the given handle.
         *
         * @param streamStateHandle handle to read from
         * @param offset the value paired with the handle by
         *     {@link ChangelogStateHandleStreamImpl#getHandlesAndOffsets()}
         * @return an iterator over the changes; the caller is responsible for closing it
         * @throws IOException if the changes cannot be read
         */
        CloseableIterator<StateChange> read(StreamStateHandle streamStateHandle, long offset) throws IOException;
    }

    /**
     * Creates a reader that delegates per-handle reading to the given iterator factory.
     *
     * @param stateChangeIterator strategy used to read each individual stream handle
     */
    public StateChangelogHandleStreamHandleReader(StateChangeIterator stateChangeIterator) {
        this.changeIterator = stateChangeIterator;
    }

    /**
     * Returns a lazy iterator over all state changes referenced by the given changelog handle.
     *
     * <p>Underlying per-handle iterators are opened one at a time, only when the previous one is
     * exhausted; the exhausted iterator is closed before the next one is opened.
     *
     * @param changelogStateHandleStreamImpl the changelog handle whose changes should be read
     * @return an iterator over the changes; closing it closes the currently open underlying iterator
     * @throws IOException declared by the interface; this implementation defers all reading to
     *     iteration time, where failures are rethrown via {@link ExceptionUtils#rethrow(Throwable)}
     */
    @Override // org.apache.flink.runtime.state.changelog.StateChangelogHandleReader
    public CloseableIterator<StateChange> getChanges(final ChangelogStateHandleStreamImpl changelogStateHandleStreamImpl) throws IOException {
        return new CloseableIterator<StateChange>() {
            /** Remaining (handle, offset) pairs that have not been opened yet. */
            private final Iterator<Tuple2<StreamStateHandle, Long>> handleIterator =
                    changelogStateHandleStreamImpl.getHandlesAndOffsets().iterator();

            /** Iterator over the handle currently being read; starts out empty. */
            private CloseableIterator<StateChange> current = CloseableIterator.empty();

            @Override
            public boolean hasNext() {
                advance();
                return current.hasNext();
            }

            @Override
            public StateChange next() {
                // Skip over exhausted/empty handles so that next() works without a prior hasNext().
                advance();
                return current.next();
            }

            /**
             * Moves {@code current} forward until it has an element or no handles remain,
             * closing each exhausted iterator before opening the next one.
             */
            private void advance() {
                while (!current.hasNext() && handleIterator.hasNext()) {
                    try {
                        current.close();
                        Tuple2<StreamStateHandle, Long> handleAndOffset = handleIterator.next();
                        LOG.debug("read at {} from {}", handleAndOffset.f1, handleAndOffset.f0);
                        current = changeIterator.read(handleAndOffset.f0, handleAndOffset.f1);
                    } catch (Exception e) {
                        // hasNext()/next() cannot declare checked exceptions.
                        ExceptionUtils.rethrow(e);
                    }
                }
            }

            @Override
            public void close() throws Exception {
                current.close();
            }
        };
    }
}
