package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.io.InputStream;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.state.AbstractChannelStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/SequentialChannelStateReaderImpl.class */
public class SequentialChannelStateReaderImpl implements SequentialChannelStateReader {
    private final TaskStateSnapshot taskStateSnapshot;
    private final ChannelStateSerializer serializer = new ChannelStateSerializerImpl();
    private final ChannelStateChunkReader chunkReader = new ChannelStateChunkReader(this.serializer);

    public SequentialChannelStateReaderImpl(TaskStateSnapshot taskStateSnapshot) {
        this.taskStateSnapshot = taskStateSnapshot;
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader
    public void readInputData(InputGate[] inputGateArr) throws IOException, InterruptedException {
        InputChannelRecoveredStateHandler inputChannelRecoveredStateHandler = new InputChannelRecoveredStateHandler(inputGateArr);
        Throwable th = null;
        try {
            try {
                read(inputChannelRecoveredStateHandler, groupByDelegate(streamSubtaskStates(), (v0) -> {
                    return v0.getInputChannelState();
                }));
                if (inputChannelRecoveredStateHandler != null) {
                    if (0 == 0) {
                        inputChannelRecoveredStateHandler.close();
                        return;
                    }
                    try {
                        inputChannelRecoveredStateHandler.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (inputChannelRecoveredStateHandler != null) {
                if (th != null) {
                    try {
                        inputChannelRecoveredStateHandler.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    inputChannelRecoveredStateHandler.close();
                }
            }
            throw th4;
        }
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader
    public void readOutputData(ResultPartitionWriter[] resultPartitionWriterArr, boolean z) throws IOException, InterruptedException {
        ResultSubpartitionRecoveredStateHandler resultSubpartitionRecoveredStateHandler = new ResultSubpartitionRecoveredStateHandler(resultPartitionWriterArr, z);
        Throwable th = null;
        try {
            try {
                read(resultSubpartitionRecoveredStateHandler, groupByDelegate(streamSubtaskStates(), (v0) -> {
                    return v0.getResultSubpartitionState();
                }));
                if (resultSubpartitionRecoveredStateHandler != null) {
                    if (0 == 0) {
                        resultSubpartitionRecoveredStateHandler.close();
                        return;
                    }
                    try {
                        resultSubpartitionRecoveredStateHandler.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (resultSubpartitionRecoveredStateHandler != null) {
                if (th != null) {
                    try {
                        resultSubpartitionRecoveredStateHandler.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    resultSubpartitionRecoveredStateHandler.close();
                }
            }
            throw th4;
        }
    }

    private <Info, Context, Handle extends AbstractChannelStateHandle<Info>> void read(RecoveredChannelStateHandler<Info, Context> recoveredChannelStateHandler, Map<StreamStateHandle, List<Handle>> map) throws IOException, InterruptedException {
        for (Map.Entry<StreamStateHandle, List<Handle>> entry : map.entrySet()) {
            readSequentially(entry.getKey(), entry.getValue(), recoveredChannelStateHandler);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private <Info, Context, Handle extends AbstractChannelStateHandle<Info>> void readSequentially(StreamStateHandle streamStateHandle, List<Handle> list, RecoveredChannelStateHandler<Info, Context> recoveredChannelStateHandler) throws IOException, InterruptedException {
        InputStream openInputStream = streamStateHandle.openInputStream();
        Throwable th = null;
        try {
            try {
                this.serializer.readHeader(openInputStream);
                for (Tuple2 tuple2 : extractOffsetsSorted(list)) {
                    this.chunkReader.readChunk(openInputStream, ((Long) tuple2.f0).longValue(), recoveredChannelStateHandler, tuple2.f1);
                }
                if (openInputStream != null) {
                    if (0 == 0) {
                        openInputStream.close();
                        return;
                    }
                    try {
                        openInputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (openInputStream != null) {
                if (th != null) {
                    try {
                        openInputStream.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    openInputStream.close();
                }
            }
            throw th4;
        }
    }

    private Stream<OperatorSubtaskState> streamSubtaskStates() {
        return this.taskStateSnapshot.getSubtaskStateMappings().stream().map((v0) -> {
            return v0.getValue();
        });
    }

    private static <Info, Handle extends AbstractChannelStateHandle<Info>> Map<StreamStateHandle, List<Handle>> groupByDelegate(Stream<OperatorSubtaskState> stream, Function<OperatorSubtaskState, StateObjectCollection<Handle>> function) {
        return (Map) stream.map(function).flatMap((v0) -> {
            return v0.stream();
        }).peek(validate()).collect(Collectors.groupingBy((v0) -> {
            return v0.getDelegate();
        }));
    }

    private static <Info, Handle extends AbstractChannelStateHandle<Info>> Consumer<Handle> validate() {
        HashSet hashSet = new HashSet();
        return abstractChannelStateHandle -> {
            Preconditions.checkState(hashSet.add(abstractChannelStateHandle.getInfo()), "duplicate channel info: %s");
        };
    }

    private static <Info, Handle extends AbstractChannelStateHandle<Info>> List<Tuple2<Long, Info>> extractOffsetsSorted(List<Handle> list) {
        return (List) list.stream().flatMap(SequentialChannelStateReaderImpl::extractOffsets).sorted(Comparator.comparingLong(tuple2 -> {
            return ((Long) tuple2.f0).longValue();
        })).collect(Collectors.toList());
    }

    private static <Info, Handle extends AbstractChannelStateHandle<Info>> Stream<Tuple2<Long, Info>> extractOffsets(Handle handle) {
        return (Stream<Tuple2<Long, Info>>) handle.getOffsets().stream().map(l -> {
            return Tuple2.of(l, handle.getInfo());
        });
    }

    @Override // org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader, java.lang.AutoCloseable
    public void close() throws Exception {
    }
}
