package org.apache.flink.migration.runtime.checkpoint.savepoint;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.migration.runtime.checkpoint.KeyGroupState;
import org.apache.flink.migration.runtime.checkpoint.SubtaskState;
import org.apache.flink.migration.runtime.checkpoint.TaskState;
import org.apache.flink.migration.runtime.state.AbstractStateBackend;
import org.apache.flink.migration.runtime.state.KvStateSnapshot;
import org.apache.flink.migration.runtime.state.StateHandle;
import org.apache.flink.migration.runtime.state.filesystem.AbstractFileStateHandle;
import org.apache.flink.migration.runtime.state.memory.SerializedStateHandle;
import org.apache.flink.migration.state.MigrationKeyGroupStateHandle;
import org.apache.flink.migration.state.MigrationStreamStateHandle;
import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskStateList;
import org.apache.flink.migration.util.SerializedValue;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.MultiStreamStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/migration/runtime/checkpoint/savepoint/SavepointV0Serializer.class */
public class SavepointV0Serializer implements SavepointSerializer<SavepointV2> {
    public static final SavepointV0Serializer INSTANCE = new SavepointV0Serializer();
    private static final StreamStateHandle SIGNAL_0 = new ByteStreamStateHandle("SIGNAL_0", new byte[]{0});
    private static final StreamStateHandle SIGNAL_1 = new ByteStreamStateHandle("SIGNAL_1", new byte[]{1});
    private static final int MAX_SIZE = 4194304;

    private SavepointV0Serializer() {
    }

    @Override // org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer
    public void serialize(SavepointV2 savepointV2, DataOutputStream dataOutputStream) throws IOException {
        throw new UnsupportedOperationException("This serializer is read-only and only exists for backwards compatibility");
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer
    public SavepointV2 deserialize(DataInputStream dataInputStream, ClassLoader classLoader) throws IOException {
        long readLong = dataInputStream.readLong();
        int readInt = dataInputStream.readInt();
        ArrayList arrayList = new ArrayList(readInt);
        for (int i = 0; i < readInt; i++) {
            TaskState taskState = new TaskState(new JobVertexID(dataInputStream.readLong(), dataInputStream.readLong()), dataInputStream.readInt());
            arrayList.add(taskState);
            int readInt2 = dataInputStream.readInt();
            for (int i2 = 0; i2 < readInt2; i2++) {
                taskState.putState(dataInputStream.readInt(), new SubtaskState(readSerializedValueStateHandle(dataInputStream), dataInputStream.readLong(), dataInputStream.readLong()));
            }
            int readInt3 = dataInputStream.readInt();
            for (int i3 = 0; i3 < readInt3; i3++) {
                taskState.putKvState(dataInputStream.readInt(), new KeyGroupState(readSerializedValueStateHandle(dataInputStream), dataInputStream.readLong(), dataInputStream.readLong()));
            }
        }
        try {
            return convertSavepoint(arrayList, classLoader, readLong);
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    private static SerializedValue<StateHandle<?>> readSerializedValueStateHandle(DataInputStream dataInputStream) throws IOException {
        SerializedValue<StateHandle<?>> fromBytes;
        int readInt = dataInputStream.readInt();
        if (readInt == -1) {
            fromBytes = new SerializedValue<>((Object) null);
        } else {
            byte[] bArr = new byte[readInt];
            dataInputStream.readFully(bArr, 0, readInt);
            fromBytes = SerializedValue.fromBytes(bArr);
        }
        return fromBytes;
    }

    private SavepointV2 convertSavepoint(List<TaskState> list, ClassLoader classLoader, long j) throws Exception {
        ArrayList arrayList = new ArrayList(list.size());
        Iterator<TaskState> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(convertTaskState(it.next(), classLoader, j));
        }
        return new SavepointV2(j, arrayList);
    }

    private org.apache.flink.runtime.checkpoint.TaskState convertTaskState(TaskState taskState, ClassLoader classLoader, long j) throws Exception {
        JobVertexID jobVertexID = taskState.getJobVertexID();
        int parallelism = taskState.getParallelism();
        int determineOperatorChainLength = determineOperatorChainLength(taskState, classLoader);
        org.apache.flink.runtime.checkpoint.TaskState taskState2 = new org.apache.flink.runtime.checkpoint.TaskState(jobVertexID, parallelism, parallelism, determineOperatorChainLength);
        if (determineOperatorChainLength > 0) {
            for (Map.Entry<Integer, SubtaskState> entry : taskState.getSubtaskStatesById().entrySet()) {
                int intValue = entry.getKey().intValue();
                taskState2.putState(intValue, convertSubtaskState(entry.getValue(), intValue, classLoader, j));
            }
        }
        return taskState2;
    }

    private org.apache.flink.runtime.checkpoint.SubtaskState convertSubtaskState(SubtaskState subtaskState, int i, ClassLoader classLoader, long j) throws Exception {
        StreamTaskState[] state = ((StreamTaskStateList) subtaskState.getState().deserializeValue(classLoader)).getState(classLoader);
        List asList = Arrays.asList(new StreamStateHandle[state.length]);
        KeyGroupsStateHandle keyGroupsStateHandle = null;
        for (int i2 = 0; i2 < state.length; i2++) {
            StreamTaskState streamTaskState = state[i2];
            if (streamTaskState != null) {
                asList.set(i2, convertOperatorAndFunctionState(streamTaskState));
                HashMap<String, KvStateSnapshot<?, ?, ?, ?>> kvStates = streamTaskState.getKvStates();
                if (null != kvStates) {
                    Preconditions.checkState(null == keyGroupsStateHandle, "Found more than one keyed state in chain");
                    keyGroupsStateHandle = convertKeyedBackendState(kvStates, i, j);
                }
            }
        }
        ChainedStateHandle chainedStateHandle = new ChainedStateHandle(asList);
        ChainedStateHandle chainedStateHandle2 = new ChainedStateHandle(Arrays.asList(new OperatorStateHandle[chainedStateHandle.getLength()]));
        return new org.apache.flink.runtime.checkpoint.SubtaskState(chainedStateHandle, chainedStateHandle2, chainedStateHandle2, keyGroupsStateHandle, null);
    }

    public static StreamStateHandle convertOperatorAndFunctionState(StreamTaskState streamTaskState) throws Exception {
        ArrayList arrayList = new ArrayList(4);
        StateHandle<Serializable> functionState = streamTaskState.getFunctionState();
        StateHandle<?> operatorState = streamTaskState.getOperatorState();
        if (null != functionState) {
            arrayList.add(SIGNAL_1);
            arrayList.add(convertStateHandle(functionState));
        } else {
            arrayList.add(SIGNAL_0);
        }
        if (null != operatorState) {
            arrayList.add(convertStateHandle(operatorState));
        }
        return new MigrationStreamStateHandle(new MultiStreamStateHandle(arrayList));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r15v0, types: [java.io.OutputStream, org.apache.flink.runtime.state.CheckpointStreamFactory$CheckpointStateOutputStream] */
    /* JADX WARN: Type inference failed for: r15v1 */
    /* JADX WARN: Type inference failed for: r15v2 */
    public static KeyGroupsStateHandle convertKeyedBackendState(HashMap<String, KvStateSnapshot<?, ?, ?, ?>> hashMap, int i, long j) throws Exception {
        if (null == hashMap) {
            return null;
        }
        AutoCloseable createCheckpointStateOutputStream = new MemCheckpointStreamFactory(MAX_SIZE).createCheckpointStateOutputStream(j, 0L);
        try {
            long pos = createCheckpointStateOutputStream.getPos();
            InstantiationUtil.serializeObject((OutputStream) createCheckpointStateOutputStream, hashMap);
            StreamStateHandle closeAndGetHandle = createCheckpointStateOutputStream.closeAndGetHandle();
            createCheckpointStateOutputStream = 0;
            if (null == closeAndGetHandle) {
                IOUtils.closeQuietly((AutoCloseable) null);
                return null;
            }
            MigrationKeyGroupStateHandle migrationKeyGroupStateHandle = new MigrationKeyGroupStateHandle(new KeyGroupRangeOffsets(i, i, new long[]{pos}), closeAndGetHandle);
            IOUtils.closeQuietly((AutoCloseable) null);
            return migrationKeyGroupStateHandle;
        } catch (Throwable th) {
            IOUtils.closeQuietly(createCheckpointStateOutputStream);
            throw th;
        }
    }

    private int determineOperatorChainLength(TaskState taskState, ClassLoader classLoader) throws IOException, ClassNotFoundException {
        Collection<SubtaskState> states = taskState.getStates();
        if (states == null || states.isEmpty()) {
            return 0;
        }
        Object deserializeValue = states.iterator().next().getState().deserializeValue(classLoader);
        if (deserializeValue instanceof StreamTaskStateList) {
            return ((StreamTaskStateList) deserializeValue).getState(classLoader).length;
        }
        return 0;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public static StreamStateHandle convertStateHandle(StateHandle<?> stateHandle) throws Exception {
        if (stateHandle instanceof AbstractFileStateHandle) {
            return new FileStateHandle(((AbstractFileStateHandle) stateHandle).getFilePath(), stateHandle.getStateSize());
        }
        if (stateHandle instanceof SerializedStateHandle) {
            byte[] serializedData = ((SerializedStateHandle) stateHandle).getSerializedData();
            return new ByteStreamStateHandle(String.valueOf(System.identityHashCode(serializedData)), serializedData);
        }
        if (stateHandle instanceof org.apache.flink.migration.runtime.state.memory.ByteStreamStateHandle) {
            byte[] data = ((org.apache.flink.migration.runtime.state.memory.ByteStreamStateHandle) stateHandle).getData();
            return new ByteStreamStateHandle(String.valueOf(System.identityHashCode(data)), data);
        }
        if (stateHandle instanceof AbstractStateBackend.DataInputViewHandle) {
            return convertStateHandle(((AbstractStateBackend.DataInputViewHandle) stateHandle).getStreamStateHandle());
        }
        throw new IllegalArgumentException("Unknown state handle type: " + stateHandle);
    }

    @VisibleForTesting
    public void serializeOld(SavepointV0 savepointV0, DataOutputStream dataOutputStream) throws IOException {
        dataOutputStream.writeLong(savepointV0.getCheckpointId());
        dataOutputStream.writeInt(savepointV0.getOldTaskStates().size());
        for (TaskState taskState : savepointV0.getOldTaskStates()) {
            dataOutputStream.writeLong(taskState.getJobVertexID().getLowerPart());
            dataOutputStream.writeLong(taskState.getJobVertexID().getUpperPart());
            int parallelism = taskState.getParallelism();
            dataOutputStream.writeInt(parallelism);
            dataOutputStream.writeInt(taskState.getNumberCollectedStates());
            for (int i = 0; i < parallelism; i++) {
                SubtaskState state = taskState.getState(i);
                if (state != null) {
                    dataOutputStream.writeInt(i);
                    SerializedValue<StateHandle<?>> state2 = state.getState();
                    if (state2 == null) {
                        dataOutputStream.writeInt(-1);
                    } else {
                        byte[] byteArray = state2.getByteArray();
                        dataOutputStream.writeInt(byteArray.length);
                        dataOutputStream.write(byteArray, 0, byteArray.length);
                    }
                    dataOutputStream.writeLong(state.getStateSize());
                    dataOutputStream.writeLong(state.getDuration());
                }
            }
            dataOutputStream.writeInt(taskState.getNumberCollectedKvStates());
            for (int i2 = 0; i2 < parallelism; i2++) {
                KeyGroupState kvState = taskState.getKvState(i2);
                if (kvState != null) {
                    dataOutputStream.write(i2);
                    SerializedValue<StateHandle<?>> keyGroupState = kvState.getKeyGroupState();
                    if (keyGroupState == null) {
                        dataOutputStream.writeInt(-1);
                    } else {
                        byte[] byteArray2 = keyGroupState.getByteArray();
                        dataOutputStream.writeInt(byteArray2.length);
                        dataOutputStream.write(byteArray2, 0, byteArray2.length);
                    }
                    dataOutputStream.writeLong(kvState.getStateSize());
                    dataOutputStream.writeLong(kvState.getDuration());
                }
            }
        }
    }
}
