package org.apache.flink.runtime.checkpoint;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest.class */
class StateAssignmentOperationTest {

    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();
    private static final int MAX_P = 256;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/StateAssignmentOperationTest$OperatorIdWithParallelism.class */
    public static class OperatorIdWithParallelism {
        private final OperatorID operatorID;
        private final int parallelism;

        public OperatorID getOperatorID() {
            return this.operatorID;
        }

        public int getParallelism() {
            return this.parallelism;
        }

        public OperatorIdWithParallelism(OperatorID operatorID, int i) {
            this.operatorID = operatorID;
            this.parallelism = i;
        }
    }

    StateAssignmentOperationTest() {
    }

    @Test
    void testRepartitionSplitDistributeStates() {
        OperatorID operatorID = new OperatorID();
        OperatorState operatorState = new OperatorState(operatorID, 2, 4);
        HashMap hashMap = new HashMap(1);
        hashMap.put("t-1", new OperatorStateHandle.StateMetaInfo(new long[]{0, 10}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
        operatorState.putState(0, OperatorSubtaskState.builder().setManagedOperatorState(new OperatorStreamStateHandle(hashMap, new ByteStreamStateHandle("test1", new byte[30]))).build());
        HashMap hashMap2 = new HashMap(1);
        hashMap2.put("t-2", new OperatorStateHandle.StateMetaInfo(new long[]{0, 15}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
        operatorState.putState(1, OperatorSubtaskState.builder().setManagedOperatorState(new OperatorStreamStateHandle(hashMap2, new ByteStreamStateHandle("test2", new byte[40]))).build());
        verifyOneKindPartitionableStateRescale(operatorState, operatorID);
    }

    @Test
    void testRepartitionUnionState() {
        OperatorID operatorID = new OperatorID();
        OperatorState operatorState = new OperatorState(operatorID, 2, 4);
        HashMap hashMap = new HashMap(2);
        hashMap.put("t-3", new OperatorStateHandle.StateMetaInfo(new long[]{0}, OperatorStateHandle.Mode.UNION));
        hashMap.put("t-4", new OperatorStateHandle.StateMetaInfo(new long[]{22, 44}, OperatorStateHandle.Mode.UNION));
        operatorState.putState(0, OperatorSubtaskState.builder().setManagedOperatorState(new OperatorStreamStateHandle(hashMap, new ByteStreamStateHandle("test1", new byte[50]))).build());
        HashMap hashMap2 = new HashMap(1);
        hashMap2.put("t-3", new OperatorStateHandle.StateMetaInfo(new long[]{0}, OperatorStateHandle.Mode.UNION));
        operatorState.putState(1, OperatorSubtaskState.builder().setManagedOperatorState(new OperatorStreamStateHandle(hashMap2, new ByteStreamStateHandle("test2", new byte[20]))).build());
        verifyOneKindPartitionableStateRescale(operatorState, operatorID);
    }

    @Test
    void testRepartitionBroadcastState() {
        OperatorID operatorID = new OperatorID();
        OperatorState operatorState = new OperatorState(operatorID, 2, 4);
        HashMap hashMap = new HashMap(2);
        hashMap.put("t-5", new OperatorStateHandle.StateMetaInfo(new long[]{0, 10, 20}, OperatorStateHandle.Mode.BROADCAST));
        hashMap.put("t-6", new OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, OperatorStateHandle.Mode.BROADCAST));
        operatorState.putState(0, OperatorSubtaskState.builder().setManagedOperatorState(new OperatorStreamStateHandle(hashMap, new ByteStreamStateHandle("test1", new byte[60]))).build());
        HashMap hashMap2 = new HashMap(2);
        hashMap2.put("t-5", new OperatorStateHandle.StateMetaInfo(new long[]{0, 10, 20}, OperatorStateHandle.Mode.BROADCAST));
        hashMap2.put("t-6", new OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, OperatorStateHandle.Mode.BROADCAST));
        operatorState.putState(1, OperatorSubtaskState.builder().setManagedOperatorState(new OperatorStreamStateHandle(hashMap2, new ByteStreamStateHandle("test2", new byte[60]))).build());
        verifyOneKindPartitionableStateRescale(operatorState, operatorID);
    }

    @Test
    void testRepartitionBroadcastStateWithNullSubtaskState() {
        OperatorID operatorID = new OperatorID();
        OperatorState operatorState = new OperatorState(operatorID, 2, 4);
        HashMap hashMap = new HashMap(2);
        hashMap.put("t-5", new OperatorStateHandle.StateMetaInfo(new long[]{0, 10, 20}, OperatorStateHandle.Mode.BROADCAST));
        hashMap.put("t-6", new OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, OperatorStateHandle.Mode.BROADCAST));
        operatorState.putState(0, OperatorSubtaskState.builder().setManagedOperatorState(new OperatorStreamStateHandle(hashMap, new ByteStreamStateHandle("test1", new byte[60]))).build());
        verifyOneKindPartitionableStateRescale(operatorState, operatorID);
    }

    @Test
    void testRepartitionBroadcastStateWithEmptySubtaskState() {
        OperatorID operatorID = new OperatorID();
        OperatorState operatorState = new OperatorState(operatorID, 2, 4);
        HashMap hashMap = new HashMap(2);
        hashMap.put("t-5", new OperatorStateHandle.StateMetaInfo(new long[]{0, 10, 20}, OperatorStateHandle.Mode.BROADCAST));
        hashMap.put("t-6", new OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, OperatorStateHandle.Mode.BROADCAST));
        operatorState.putState(0, OperatorSubtaskState.builder().setManagedOperatorState(new OperatorStreamStateHandle(hashMap, new ByteStreamStateHandle("test1", new byte[60]))).build());
        operatorState.putState(1, OperatorSubtaskState.builder().build());
        verifyOneKindPartitionableStateRescale(operatorState, operatorID);
    }

    @Test
    void testReDistributeCombinedPartitionableStates() {
        OperatorID operatorID = new OperatorID();
        OperatorState operatorState = new OperatorState(operatorID, 2, 4);
        HashMap hashMap = new HashMap(6);
        hashMap.put("t-1", new OperatorStateHandle.StateMetaInfo(new long[]{0}, OperatorStateHandle.Mode.UNION));
        hashMap.put("t-2", new OperatorStateHandle.StateMetaInfo(new long[]{22, 44}, OperatorStateHandle.Mode.UNION));
        hashMap.put("t-3", new OperatorStateHandle.StateMetaInfo(new long[]{52, 63}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
        hashMap.put("t-4", new OperatorStateHandle.StateMetaInfo(new long[]{67, 74, 75}, OperatorStateHandle.Mode.BROADCAST));
        hashMap.put("t-5", new OperatorStateHandle.StateMetaInfo(new long[]{77, 88, 92}, OperatorStateHandle.Mode.BROADCAST));
        hashMap.put("t-6", new OperatorStateHandle.StateMetaInfo(new long[]{101, 123, 127}, OperatorStateHandle.Mode.BROADCAST));
        operatorState.putState(0, OperatorSubtaskState.builder().setManagedOperatorState(new OperatorStreamStateHandle(hashMap, new ByteStreamStateHandle("test1", new byte[130]))).build());
        HashMap hashMap2 = new HashMap(3);
        hashMap2.put("t-1", new OperatorStateHandle.StateMetaInfo(new long[]{0}, OperatorStateHandle.Mode.UNION));
        hashMap2.put("t-4", new OperatorStateHandle.StateMetaInfo(new long[]{20, 27, 28}, OperatorStateHandle.Mode.BROADCAST));
        hashMap2.put("t-5", new OperatorStateHandle.StateMetaInfo(new long[]{30, 44, 48}, OperatorStateHandle.Mode.BROADCAST));
        hashMap2.put("t-6", new OperatorStateHandle.StateMetaInfo(new long[]{57, 79, 83}, OperatorStateHandle.Mode.BROADCAST));
        operatorState.putState(1, OperatorSubtaskState.builder().setManagedOperatorState(new OperatorStreamStateHandle(hashMap2, new ByteStreamStateHandle("test2", new byte[86]))).build());
        verifyCombinedPartitionableStateRescale(operatorState, operatorID, 2, 3);
        verifyCombinedPartitionableStateRescale(operatorState, operatorID, 2, 1);
        verifyCombinedPartitionableStateRescale(operatorState, operatorID, 2, 2);
    }

    private void verifyAndCollectStateInfo(OperatorState operatorState, OperatorID operatorID, int i, int i2, Map<String, Integer> map) {
        HashMap hashMap = new HashMap();
        StateAssignmentOperation.reDistributePartitionableStates(Collections.singletonMap(operatorID, operatorState), i2, (v0) -> {
            return v0.getManagedOperatorState();
        }, RoundRobinOperatorStateRepartitioner.INSTANCE, hashMap);
        for (List list : hashMap.values()) {
            EnumMap enumMap = new EnumMap(OperatorStateHandle.Mode.class);
            for (OperatorStateHandle.Mode mode : OperatorStateHandle.Mode.values()) {
                enumMap.put((EnumMap) mode, (OperatorStateHandle.Mode) new HashMap());
            }
            Iterator it = list.iterator();
            while (it.hasNext()) {
                for (Map.Entry entry : ((OperatorStateHandle) it.next()).getStateNameToPartitionOffsets().entrySet()) {
                    String str = (String) entry.getKey();
                    map.merge(str, 1, (v0, v1) -> {
                        return Integer.sum(v0, v1);
                    });
                    OperatorStateHandle.StateMetaInfo stateMetaInfo = (OperatorStateHandle.StateMetaInfo) entry.getValue();
                    ((Map) enumMap.get(stateMetaInfo.getDistributionMode())).merge(str, Integer.valueOf(stateMetaInfo.getOffsets().length), (v0, v1) -> {
                        return Integer.sum(v0, v1);
                    });
                }
            }
            for (Map.Entry entry2 : enumMap.entrySet()) {
                OperatorStateHandle.Mode mode2 = (OperatorStateHandle.Mode) entry2.getKey();
                Map map2 = (Map) entry2.getValue();
                if (OperatorStateHandle.Mode.SPLIT_DISTRIBUTE.equals(mode2)) {
                    if (i < i2) {
                        map2.values().forEach(num -> {
                            Assertions.assertThat(num.intValue()).isOne();
                        });
                    } else {
                        map2.values().forEach(num2 -> {
                            Assertions.assertThat(num2.intValue()).isEqualTo(2);
                        });
                    }
                } else if (OperatorStateHandle.Mode.UNION.equals(mode2)) {
                    map2.values().forEach(num3 -> {
                        Assertions.assertThat(num3.intValue()).isEqualTo(2);
                    });
                } else {
                    map2.values().forEach(num4 -> {
                        Assertions.assertThat(num4.intValue()).isEqualTo(3);
                    });
                }
            }
        }
    }

    private void verifyOneKindPartitionableStateRescale(OperatorState operatorState, OperatorID operatorID) {
        verifyOneKindPartitionableStateRescale(operatorState, operatorID, 2, 3);
        verifyOneKindPartitionableStateRescale(operatorState, operatorID, 2, 1);
        verifyOneKindPartitionableStateRescale(operatorState, operatorID, 2, 2);
    }

    private void verifyOneKindPartitionableStateRescale(OperatorState operatorState, OperatorID operatorID, int i, int i2) {
        HashMap hashMap = new HashMap();
        verifyAndCollectStateInfo(operatorState, operatorID, i, i2, hashMap);
        Assertions.assertThat(hashMap).hasSize(2);
        if (hashMap.containsKey("t-1")) {
            if (i < i2) {
                Assertions.assertThat(hashMap.get("t-1").intValue()).isEqualTo(2);
                Assertions.assertThat(hashMap.get("t-2").intValue()).isEqualTo(2);
            } else {
                Assertions.assertThat(hashMap.get("t-1").intValue()).isOne();
                Assertions.assertThat(hashMap.get("t-2").intValue()).isOne();
            }
        }
        if (hashMap.containsKey("t-3")) {
            Assertions.assertThat(hashMap.get("t-3").intValue()).isEqualTo(2 * i2);
            Assertions.assertThat(hashMap.get("t-4").intValue()).isEqualTo(i2);
        }
        if (hashMap.containsKey("t-5")) {
            Assertions.assertThat(hashMap.get("t-5").intValue()).isEqualTo(i2);
            Assertions.assertThat(hashMap.get("t-6").intValue()).isEqualTo(i2);
        }
    }

    private void verifyCombinedPartitionableStateRescale(OperatorState operatorState, OperatorID operatorID, int i, int i2) {
        HashMap hashMap = new HashMap();
        verifyAndCollectStateInfo(operatorState, operatorID, i, i2, hashMap);
        Assertions.assertThat(hashMap.size()).isEqualTo(6);
        Assertions.assertThat(hashMap.get("t-1").intValue()).isEqualTo(2 * i2);
        Assertions.assertThat(hashMap.get("t-2").intValue()).isEqualTo(i2);
        if (i < i2) {
            Assertions.assertThat(hashMap.get("t-3").intValue()).isEqualTo(2);
        } else {
            Assertions.assertThat(hashMap.get("t-3").intValue()).isOne();
        }
        Assertions.assertThat(hashMap.get("t-4").intValue()).isEqualTo(i2);
        Assertions.assertThat(hashMap.get("t-5").intValue()).isEqualTo(i2);
        Assertions.assertThat(hashMap.get("t-6").intValue()).isEqualTo(i2);
    }

    @Test
    void testChannelStateAssignmentStability() throws JobException, JobExecutionException {
        List<OperatorID> buildOperatorIds = buildOperatorIds(10);
        Map<OperatorID, ExecutionJobVertex> buildVertices = buildVertices(buildOperatorIds, 100, SubtaskStateMapper.RANGE, SubtaskStateMapper.ROUND_ROBIN);
        Map<OperatorID, OperatorState> buildOperatorStates = buildOperatorStates(buildOperatorIds, 100);
        new StateAssignmentOperation(0L, new HashSet(buildVertices.values()), buildOperatorStates, false).assignStates();
        for (OperatorID operatorID : buildOperatorIds) {
            for (int i = 0; i < 100; i++) {
                Assertions.assertThat(getAssignedState(buildVertices.get(operatorID), operatorID, i)).isEqualTo(buildOperatorStates.get(operatorID).getState(i));
            }
        }
    }

    /* JADX WARN: Type inference failed for: r8v13, types: [int[], int[][]] */
    /* JADX WARN: Type inference failed for: r8v18, types: [int[], int[][]] */
    /* JADX WARN: Type inference failed for: r8v3, types: [int[], int[][]] */
    /* JADX WARN: Type inference failed for: r8v8, types: [int[], int[][]] */
    @Test
    void testChannelStateAssignmentDownscalingTwoDifferentGates() throws JobException, JobExecutionException {
        JobVertex createJobVertex = createJobVertex(new OperatorID(), 2);
        JobVertex createJobVertex2 = createJobVertex(new OperatorID(), 2);
        JobVertex createJobVertex3 = createJobVertex(new OperatorID(), 2);
        List<OperatorID> list = (List) Stream.of((Object[]) new JobVertex[]{createJobVertex, createJobVertex2, createJobVertex3}).map(jobVertex -> {
            return ((OperatorIDPair) jobVertex.getOperatorIDs().get(0)).getGeneratedOperatorID();
        }).collect(Collectors.toList());
        Map<OperatorID, OperatorState> buildOperatorStates = buildOperatorStates(list, 3);
        connectVertices(createJobVertex, createJobVertex3, SubtaskStateMapper.ARBITRARY, SubtaskStateMapper.RANGE);
        connectVertices(createJobVertex2, createJobVertex3, SubtaskStateMapper.ROUND_ROBIN, SubtaskStateMapper.ROUND_ROBIN);
        Map<OperatorID, ExecutionJobVertex> executionVertices = toExecutionVertices(createJobVertex, createJobVertex2, createJobVertex3);
        new StateAssignmentOperation(0L, new HashSet(executionVertices.values()), buildOperatorStates, false).assignStates();
        Assertions.assertThat(getAssignedState(executionVertices.get(list.get(2)), list.get(2), 0).getInputRescalingDescriptor()).isEqualTo(new InflightDataRescalingDescriptor((InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor[]) InflightDataRescalingDescriptorUtil.array(gate(InflightDataRescalingDescriptorUtil.to(0, 1), InflightDataRescalingDescriptorUtil.mappings(new int[]{InflightDataRescalingDescriptorUtil.to(0, 2), InflightDataRescalingDescriptorUtil.to(1)}), InflightDataRescalingDescriptorUtil.set(1), InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor.MappingType.RESCALING), gate(InflightDataRescalingDescriptorUtil.to(0, 2), InflightDataRescalingDescriptorUtil.mappings(new int[]{InflightDataRescalingDescriptorUtil.to(0, 2), InflightDataRescalingDescriptorUtil.to(1)}), Collections.emptySet(), InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor.MappingType.RESCALING))));
        Assertions.assertThat(getAssignedState(executionVertices.get(list.get(2)), list.get(2), 0).getInputRescalingDescriptor()).isEqualTo(new InflightDataRescalingDescriptor((InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor[]) InflightDataRescalingDescriptorUtil.array(gate(InflightDataRescalingDescriptorUtil.to(0, 1), InflightDataRescalingDescriptorUtil.mappings(new int[]{InflightDataRescalingDescriptorUtil.to(0, 2), InflightDataRescalingDescriptorUtil.to(1)}), InflightDataRescalingDescriptorUtil.set(1), InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor.MappingType.RESCALING), gate(InflightDataRescalingDescriptorUtil.to(0, 2), InflightDataRescalingDescriptorUtil.mappings(new int[]{InflightDataRescalingDescriptorUtil.to(0, 2), InflightDataRescalingDescriptorUtil.to(1)}), Collections.emptySet(), InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor.MappingType.RESCALING))));
    }

    private InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor gate(int[] iArr, RescaleMappings rescaleMappings, Set<Integer> set, InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor.MappingType mappingType) {
        return new InflightDataRescalingDescriptor.InflightDataGateOrPartitionRescalingDescriptor(iArr, rescaleMappings, set, mappingType);
    }

    @Test
    public void testChannelStateAssignmentTwoGatesPartiallyDownscaling() throws JobException, JobExecutionException {
        JobVertex createJobVertex = createJobVertex(new OperatorID(), 2);
        JobVertex createJobVertex2 = createJobVertex(new OperatorID(), 2);
        JobVertex createJobVertex3 = createJobVertex(new OperatorID(), 3);
        List<OperatorID> list = (List) Stream.of((Object[]) new JobVertex[]{createJobVertex, createJobVertex2, createJobVertex3}).map(jobVertex -> {
            return ((OperatorIDPair) jobVertex.getOperatorIDs().get(0)).getGeneratedOperatorID();
        }).collect(Collectors.toList());
        Map<OperatorID, OperatorState> buildOperatorStates = buildOperatorStates(list, 3);
        connectVertices(createJobVertex, createJobVertex3, SubtaskStateMapper.ARBITRARY, SubtaskStateMapper.FULL);
        connectVertices(createJobVertex2, createJobVertex3, SubtaskStateMapper.ROUND_ROBIN, SubtaskStateMapper.ROUND_ROBIN);
        Map<OperatorID, ExecutionJobVertex> executionVertices = toExecutionVertices(createJobVertex, createJobVertex2, createJobVertex3);
        new StateAssignmentOperation(0L, new HashSet(executionVertices.values()), buildOperatorStates, false).assignStates();
        Assertions.assertThat(getAssignedState(executionVertices.get(list.get(2)), list.get(2), 0).getInputChannelState().size()).isEqualTo(6);
        Assertions.assertThat(getAssignedState(executionVertices.get(list.get(2)), list.get(2), 1).getInputChannelState().size()).isEqualTo(6);
        Assertions.assertThat(getAssignedState(executionVertices.get(list.get(2)), list.get(2), 2).getInputChannelState().size()).isEqualTo(6);
    }

    /* JADX WARN: Type inference failed for: r5v10, types: [int[], int[][]] */
    /* JADX WARN: Type inference failed for: r5v14, types: [int[], int[][]] */
    /* JADX WARN: Type inference failed for: r5v4, types: [int[], int[][]] */
    /* JADX WARN: Type inference failed for: r5v7, types: [int[], int[][]] */
    @Test
    void testChannelStateAssignmentDownscaling() throws JobException, JobExecutionException {
        List<OperatorID> buildOperatorIds = buildOperatorIds(2);
        Map<OperatorID, OperatorState> buildOperatorStates = buildOperatorStates(buildOperatorIds, 3);
        Map<OperatorID, ExecutionJobVertex> buildVertices = buildVertices(buildOperatorIds, 2, SubtaskStateMapper.RANGE, SubtaskStateMapper.ROUND_ROBIN);
        new StateAssignmentOperation(0L, new HashSet(buildVertices.values()), buildOperatorStates, false).assignStates();
        for (OperatorID operatorID : buildOperatorIds) {
            assertState(buildVertices, operatorID, buildOperatorStates, 0, (v0) -> {
                return v0.getInputChannelState();
            }, 0, 1);
            assertState(buildVertices, operatorID, buildOperatorStates, 1, (v0) -> {
                return v0.getInputChannelState();
            }, 1, 2);
            assertState(buildVertices, operatorID, buildOperatorStates, 0, (v0) -> {
                return v0.getResultSubpartitionState();
            }, 0, 2);
            assertState(buildVertices, operatorID, buildOperatorStates, 1, (v0) -> {
                return v0.getResultSubpartitionState();
            }, 1);
        }
        Assertions.assertThat(getAssignedState(buildVertices.get(buildOperatorIds.get(0)), buildOperatorIds.get(0), 0).getOutputRescalingDescriptor()).isEqualTo(InflightDataRescalingDescriptorUtil.rescalingDescriptor(InflightDataRescalingDescriptorUtil.to(0, 2), (RescaleMappings[]) InflightDataRescalingDescriptorUtil.array(InflightDataRescalingDescriptorUtil.mappings(new int[]{InflightDataRescalingDescriptorUtil.to(0, 1), InflightDataRescalingDescriptorUtil.to(1, 2)})), InflightDataRescalingDescriptorUtil.set(new Integer[0])));
        Assertions.assertThat(getAssignedState(buildVertices.get(buildOperatorIds.get(0)), buildOperatorIds.get(0), 1).getOutputRescalingDescriptor()).isEqualTo(InflightDataRescalingDescriptorUtil.rescalingDescriptor(InflightDataRescalingDescriptorUtil.to(1), (RescaleMappings[]) InflightDataRescalingDescriptorUtil.array(InflightDataRescalingDescriptorUtil.mappings(new int[]{InflightDataRescalingDescriptorUtil.to(0, 1), InflightDataRescalingDescriptorUtil.to(1, 2)})), InflightDataRescalingDescriptorUtil.set(new Integer[0])));
        Assertions.assertThat(getAssignedState(buildVertices.get(buildOperatorIds.get(1)), buildOperatorIds.get(1), 0).getInputRescalingDescriptor()).isEqualTo(InflightDataRescalingDescriptorUtil.rescalingDescriptor(InflightDataRescalingDescriptorUtil.to(0, 1), (RescaleMappings[]) InflightDataRescalingDescriptorUtil.array(InflightDataRescalingDescriptorUtil.mappings(new int[]{InflightDataRescalingDescriptorUtil.to(0, 2), InflightDataRescalingDescriptorUtil.to(1)})), InflightDataRescalingDescriptorUtil.set(1)));
        Assertions.assertThat(getAssignedState(buildVertices.get(buildOperatorIds.get(1)), buildOperatorIds.get(1), 1).getInputRescalingDescriptor()).isEqualTo(InflightDataRescalingDescriptorUtil.rescalingDescriptor(InflightDataRescalingDescriptorUtil.to(1, 2), (RescaleMappings[]) InflightDataRescalingDescriptorUtil.array(InflightDataRescalingDescriptorUtil.mappings(new int[]{InflightDataRescalingDescriptorUtil.to(0, 2), InflightDataRescalingDescriptorUtil.to(1)})), InflightDataRescalingDescriptorUtil.set(1)));
    }

    @Test
    void testChannelStateAssignmentNoRescale() throws JobException, JobExecutionException {
        List<OperatorID> buildOperatorIds = buildOperatorIds(2);
        Map<OperatorID, OperatorState> buildOperatorStates = buildOperatorStates(buildOperatorIds, 2);
        Map<OperatorID, ExecutionJobVertex> buildVertices = buildVertices(buildOperatorIds, 2, SubtaskStateMapper.RANGE, SubtaskStateMapper.ROUND_ROBIN);
        new StateAssignmentOperation(0L, new HashSet(buildVertices.values()), buildOperatorStates, false).assignStates();
        for (OperatorID operatorID : buildOperatorIds) {
            assertState(buildVertices, operatorID, buildOperatorStates, 0, (v0) -> {
                return v0.getInputChannelState();
            }, 0);
            assertState(buildVertices, operatorID, buildOperatorStates, 1, (v0) -> {
                return v0.getInputChannelState();
            }, 1);
            assertState(buildVertices, operatorID, buildOperatorStates, 0, (v0) -> {
                return v0.getResultSubpartitionState();
            }, 0);
            assertState(buildVertices, operatorID, buildOperatorStates, 1, (v0) -> {
                return v0.getResultSubpartitionState();
            }, 1);
        }
        Assertions.assertThat(getAssignedState(buildVertices.get(buildOperatorIds.get(0)), buildOperatorIds.get(0), 0).getOutputRescalingDescriptor()).isEqualTo(InflightDataRescalingDescriptor.NO_RESCALE);
        Assertions.assertThat(getAssignedState(buildVertices.get(buildOperatorIds.get(0)), buildOperatorIds.get(0), 1).getOutputRescalingDescriptor()).isEqualTo(InflightDataRescalingDescriptor.NO_RESCALE);
        Assertions.assertThat(getAssignedState(buildVertices.get(buildOperatorIds.get(1)), buildOperatorIds.get(1), 0).getInputRescalingDescriptor()).isEqualTo(InflightDataRescalingDescriptor.NO_RESCALE);
        Assertions.assertThat(getAssignedState(buildVertices.get(buildOperatorIds.get(1)), buildOperatorIds.get(1), 1).getInputRescalingDescriptor()).isEqualTo(InflightDataRescalingDescriptor.NO_RESCALE);
    }

    /* JADX WARN: Type inference failed for: r5v10, types: [int[], int[][]] */
    /* JADX WARN: Type inference failed for: r5v15, types: [int[], int[][]] */
    /* JADX WARN: Type inference failed for: r5v20, types: [int[], int[][]] */
    /* JADX WARN: Type inference failed for: r5v4, types: [int[], int[][]] */
    /* JADX WARN: Type inference failed for: r5v7, types: [int[], int[][]] */
    @Test
    void testChannelStateAssignmentUpscaling() throws JobException, JobExecutionException {
        List<OperatorID> buildOperatorIds = buildOperatorIds(2);
        Map<OperatorID, OperatorState> buildOperatorStates = buildOperatorStates(buildOperatorIds, 2);
        Map<OperatorID, ExecutionJobVertex> buildVertices = buildVertices(buildOperatorIds, 3, SubtaskStateMapper.RANGE, SubtaskStateMapper.ROUND_ROBIN);
        new StateAssignmentOperation(0L, new HashSet(buildVertices.values()), buildOperatorStates, false).assignStates();
        for (OperatorID operatorID : buildOperatorIds) {
            assertState(buildVertices, operatorID, buildOperatorStates, 0, (v0) -> {
                return v0.getInputChannelState();
            }, 0);
            assertState(buildVertices, operatorID, buildOperatorStates, 1, (v0) -> {
                return v0.getInputChannelState();
            }, 0, 1);
            assertState(buildVertices, operatorID, buildOperatorStates, 2, (v0) -> {
                return v0.getInputChannelState();
            }, 1);
            assertState(buildVertices, operatorID, buildOperatorStates, 0, (v0) -> {
                return v0.getResultSubpartitionState();
            }, 0);
            assertState(buildVertices, operatorID, buildOperatorStates, 1, (v0) -> {
                return v0.getResultSubpartitionState();
            }, 1);
            assertState(buildVertices, operatorID, buildOperatorStates, 2, (v0) -> {
                return v0.getResultSubpartitionState();
            }, new int[0]);
        }
        Assertions.assertThat(getAssignedState(buildVertices.get(buildOperatorIds.get(0)), buildOperatorIds.get(0), 0).getOutputRescalingDescriptor()).isEqualTo(InflightDataRescalingDescriptorUtil.rescalingDescriptor(InflightDataRescalingDescriptorUtil.to(0), (RescaleMappings[]) InflightDataRescalingDescriptorUtil.array(InflightDataRescalingDescriptorUtil.mappings(new int[]{InflightDataRescalingDescriptorUtil.to(0), InflightDataRescalingDescriptorUtil.to(0, 1), InflightDataRescalingDescriptorUtil.to(1)})), InflightDataRescalingDescriptorUtil.set(new Integer[0])));
        Assertions.assertThat(getAssignedState(buildVertices.get(buildOperatorIds.get(0)), buildOperatorIds.get(0), 1).getOutputRescalingDescriptor()).isEqualTo(InflightDataRescalingDescriptorUtil.rescalingDescriptor(InflightDataRescalingDescriptorUtil.to(1), (RescaleMappings[]) InflightDataRescalingDescriptorUtil.array(InflightDataRescalingDescriptorUtil.mappings(new int[]{InflightDataRescalingDescriptorUtil.to(0), InflightDataRescalingDescriptorUtil.to(0, 1), InflightDataRescalingDescriptorUtil.to(1)})), InflightDataRescalingDescriptorUtil.set(new Integer[0])));
        Assertions.assertThat(getAssignedState(buildVertices.get(buildOperatorIds.get(0)), buildOperatorIds.get(0), 2).getOutputRescalingDescriptor()).isEqualTo(InflightDataRescalingDescriptor.NO_RESCALE);
        Assertions.assertThat(getAssignedState(buildVertices.get(buildOperatorIds.get(1)), buildOperatorIds.get(1), 0).getInputRescalingDescriptor()).isEqualTo(InflightDataRescalingDescriptorUtil.rescalingDescriptor(InflightDataRescalingDescriptorUtil.to(0), (RescaleMappings[]) InflightDataRescalingDescriptorUtil.array(InflightDataRescalingDescriptorUtil.mappings(new int[]{InflightDataRescalingDescriptorUtil.to(0), InflightDataRescalingDescriptorUtil.to(1), InflightDataRescalingDescriptorUtil.to(new int[0])})), InflightDataRescalingDescriptorUtil.set(0, 1)));
        Assertions.assertThat(getAssignedState(buildVertices.get(buildOperatorIds.get(1)), buildOperatorIds.get(1), 1).getInputRescalingDescriptor()).isEqualTo(InflightDataRescalingDescriptorUtil.rescalingDescriptor(InflightDataRescalingDescriptorUtil.to(0, 1), (RescaleMappings[]) InflightDataRescalingDescriptorUtil.array(InflightDataRescalingDescriptorUtil.mappings(new int[]{InflightDataRescalingDescriptorUtil.to(0), InflightDataRescalingDescriptorUtil.to(1), InflightDataRescalingDescriptorUtil.to(new int[0])})), InflightDataRescalingDescriptorUtil.set(0, 1)));
        Assertions.assertThat(getAssignedState(buildVertices.get(buildOperatorIds.get(1)), buildOperatorIds.get(1), 2).getInputRescalingDescriptor()).isEqualTo(InflightDataRescalingDescriptorUtil.rescalingDescriptor(InflightDataRescalingDescriptorUtil.to(1), (RescaleMappings[]) InflightDataRescalingDescriptorUtil.array(InflightDataRescalingDescriptorUtil.mappings(new int[]{InflightDataRescalingDescriptorUtil.to(0), InflightDataRescalingDescriptorUtil.to(1), InflightDataRescalingDescriptorUtil.to(new int[0])})), InflightDataRescalingDescriptorUtil.set(0, 1)));
    }

    @Test
    void testOnlyUpstreamChannelStateAssignment() throws JobException, JobExecutionException {
        List<OperatorID> buildOperatorIds = buildOperatorIds(2);
        HashMap hashMap = new HashMap();
        Random random = new Random();
        OperatorState operatorState = new OperatorState(buildOperatorIds.get(0), 2, MAX_P);
        operatorState.putState(0, OperatorSubtaskState.builder().setResultSubpartitionState(new StateObjectCollection(Arrays.asList(StateHandleDummyUtil.createNewResultSubpartitionStateHandle(10, random), StateHandleDummyUtil.createNewResultSubpartitionStateHandle(10, random)))).build());
        hashMap.put(buildOperatorIds.get(0), operatorState);
        Map<OperatorID, ExecutionJobVertex> buildVertices = buildVertices(buildOperatorIds, 3, SubtaskStateMapper.RANGE, SubtaskStateMapper.ROUND_ROBIN);
        new StateAssignmentOperation(0L, new HashSet(buildVertices.values()), hashMap, false).assignStates();
        for (ExecutionVertex executionVertex : buildVertices.get(buildOperatorIds.get(0)).getTaskVertices()) {
            Assertions.assertThat(executionVertex.getCurrentExecutionAttempt().getTaskRestore()).isNotNull();
        }
        for (ExecutionVertex executionVertex2 : buildVertices.get(buildOperatorIds.get(1)).getTaskVertices()) {
            Assertions.assertThat(executionVertex2.getCurrentExecutionAttempt().getTaskRestore()).isNotNull();
        }
    }

    @Test
    void testOnlyUpstreamChannelRescaleStateAssignment() throws JobException, JobExecutionException {
        Random random = new Random();
        testOnlyUpstreamOrDownstreamRescalingInternal(OperatorSubtaskState.builder().setResultSubpartitionState(new StateObjectCollection(Arrays.asList(StateHandleDummyUtil.createNewResultSubpartitionStateHandle(10, random), StateHandleDummyUtil.createNewResultSubpartitionStateHandle(10, random)))).build(), null, 5, 7);
    }

    @Test
    void testOnlyDownstreamChannelRescaleStateAssignment() throws JobException, JobExecutionException {
        Random random = new Random();
        testOnlyUpstreamOrDownstreamRescalingInternal(null, OperatorSubtaskState.builder().setInputChannelState(new StateObjectCollection(Arrays.asList(StateHandleDummyUtil.createNewInputChannelStateHandle(10, random), StateHandleDummyUtil.createNewInputChannelStateHandle(10, random)))).build(), 5, 5);
    }

    private void testOnlyUpstreamOrDownstreamRescalingInternal(@Nullable OperatorSubtaskState operatorSubtaskState, @Nullable OperatorSubtaskState operatorSubtaskState2, int i, int i2) throws JobException, JobExecutionException {
        Preconditions.checkArgument(operatorSubtaskState != operatorSubtaskState2 && (operatorSubtaskState == null || operatorSubtaskState2 == null), "Either upstream or downstream state must exist, but not both");
        int i3 = 5;
        int i4 = 5;
        List<OperatorID> buildOperatorIds = buildOperatorIds(2);
        HashMap hashMap = new HashMap();
        OperatorState operatorState = new OperatorState(buildOperatorIds.get(0), 5, MAX_P);
        OperatorState operatorState2 = new OperatorState(buildOperatorIds.get(1), 5, MAX_P);
        hashMap.put(buildOperatorIds.get(0), operatorState);
        hashMap.put(buildOperatorIds.get(1), operatorState2);
        if (operatorSubtaskState != null) {
            operatorState.putState(0, operatorSubtaskState);
            i4 = 3;
        }
        if (operatorSubtaskState2 != null) {
            operatorState2.putState(0, operatorSubtaskState2);
            i3 = 3;
        }
        ArrayList arrayList = new ArrayList(2);
        arrayList.add(new OperatorIdWithParallelism(buildOperatorIds.get(0), i3));
        arrayList.add(new OperatorIdWithParallelism(buildOperatorIds.get(1), i4));
        Map<OperatorID, ExecutionJobVertex> buildVertices = buildVertices(arrayList, SubtaskStateMapper.RANGE, SubtaskStateMapper.ROUND_ROBIN);
        new StateAssignmentOperation(0L, new HashSet(buildVertices.values()), hashMap, false).assignStates();
        ExecutionJobVertex executionJobVertex = buildVertices.get(buildOperatorIds.get(0));
        ExecutionJobVertex executionJobVertex2 = buildVertices.get(buildOperatorIds.get(1));
        List<TaskStateSnapshot> taskStateSnapshotFromVertex = getTaskStateSnapshotFromVertex(executionJobVertex);
        List<TaskStateSnapshot> taskStateSnapshotFromVertex2 = getTaskStateSnapshotFromVertex(executionJobVertex2);
        checkMappings(taskStateSnapshotFromVertex, (v0) -> {
            return v0.getOutputRescalingDescriptor();
        }, i);
        checkMappings(taskStateSnapshotFromVertex2, (v0) -> {
            return v0.getInputRescalingDescriptor();
        }, i2);
    }

    private void checkMappings(List<TaskStateSnapshot> list, Function<TaskStateSnapshot, InflightDataRescalingDescriptor> function, int i) {
        Assertions.assertThat(list.stream().map(function).mapToInt(inflightDataRescalingDescriptor -> {
            int length = inflightDataRescalingDescriptor.getOldSubtaskIndexes(0).length;
            Assertions.assertThat(length).isGreaterThan(0);
            return length;
        }).sum()).isEqualTo(i);
    }

    @Test
    void testStateWithFullyFinishedOperators() throws JobException, JobExecutionException {
        List<OperatorID> buildOperatorIds = buildOperatorIds(2);
        Map<OperatorID, OperatorState> buildOperatorStates = buildOperatorStates(Collections.singletonList(buildOperatorIds.get(1)), 3);
        buildOperatorStates.put(buildOperatorIds.get(0), new FullyFinishedOperatorState(buildOperatorIds.get(0), 3, MAX_P));
        Map<OperatorID, ExecutionJobVertex> buildVertices = buildVertices(buildOperatorIds, 2, SubtaskStateMapper.RANGE, SubtaskStateMapper.ROUND_ROBIN);
        new StateAssignmentOperation(0L, new HashSet(buildVertices.values()), buildOperatorStates, false).assignStates();
        for (ExecutionVertex executionVertex : buildVertices.get(buildOperatorIds.get(0)).getTaskVertices()) {
            Assertions.assertThat(executionVertex.getCurrentExecutionAttempt().getTaskRestore().getTaskStateSnapshot().isTaskDeployedAsFinished()).isTrue();
        }
        for (ExecutionVertex executionVertex2 : buildVertices.get(buildOperatorIds.get(1)).getTaskVertices()) {
            Assertions.assertThat(executionVertex2.getCurrentExecutionAttempt().getTaskRestore().getTaskStateSnapshot().isTaskDeployedAsFinished()).isFalse();
        }
    }

    private void assertState(Map<OperatorID, ExecutionJobVertex> map, OperatorID operatorID, Map<OperatorID, OperatorState> map2, int i, Function<OperatorSubtaskState, StateObjectCollection<?>> function, int... iArr) {
        Assertions.assertThat(function.apply(getAssignedState(map.get(operatorID), operatorID, i)).containsAll((Collection) Arrays.stream(iArr).boxed().flatMap(num -> {
            return ((StateObjectCollection) function.apply(((OperatorState) map2.get(operatorID)).getState(num.intValue()))).stream();
        }).collect(Collectors.toList()))).isTrue();
    }

    @Test
    void assigningStatesShouldWorkWithUserDefinedOperatorIdsAsWell() {
        OperatorID operatorID = new OperatorID();
        OperatorID operatorID2 = new OperatorID();
        List<OperatorID> singletonList = Collections.singletonList(operatorID2);
        ExecutionJobVertex buildExecutionJobVertex = buildExecutionJobVertex(operatorID, operatorID2, 1);
        Map<OperatorID, OperatorState> buildOperatorStates = buildOperatorStates(singletonList, 1);
        new StateAssignmentOperation(0L, Collections.singleton(buildExecutionJobVertex), buildOperatorStates, false).assignStates();
        Assertions.assertThat(getAssignedState(buildExecutionJobVertex, operatorID, 0)).isEqualTo(buildOperatorStates.get(operatorID2).getState(0));
    }

    @Test
    void assigningStateHandlesCanNotBeNull() {
        OperatorState operatorState = new OperatorState(new OperatorID(), 1, MAX_P);
        List managedKeyedStateHandles = StateAssignmentOperation.getManagedKeyedStateHandles(operatorState, KeyGroupRange.of(0, 1));
        List rawKeyedStateHandles = StateAssignmentOperation.getRawKeyedStateHandles(operatorState, KeyGroupRange.of(0, 1));
        Assertions.assertThat(managedKeyedStateHandles).isEmpty();
        Assertions.assertThat(rawKeyedStateHandles).isEmpty();
    }

    private List<OperatorID> buildOperatorIds(int i) {
        return (List) IntStream.range(0, i).mapToObj(i2 -> {
            return new OperatorID();
        }).collect(Collectors.toList());
    }

    private Map<OperatorID, OperatorState> buildOperatorStates(List<OperatorID> list, int i) {
        Random random = new Random();
        OperatorID operatorID = list.get(list.size() - 1);
        return (Map) list.stream().collect(Collectors.toMap(Function.identity(), operatorID2 -> {
            OperatorState operatorState = new OperatorState(operatorID2, i, MAX_P);
            for (int i2 = 0; i2 < i; i2++) {
                operatorState.putState(i2, OperatorSubtaskState.builder().setManagedOperatorState(new StateObjectCollection(Arrays.asList(StateHandleDummyUtil.createNewOperatorStateHandle(10, random), StateHandleDummyUtil.createNewOperatorStateHandle(10, random)))).setRawOperatorState(new StateObjectCollection(Arrays.asList(StateHandleDummyUtil.createNewOperatorStateHandle(10, random), StateHandleDummyUtil.createNewOperatorStateHandle(10, random)))).setManagedKeyedState(StateObjectCollection.singleton(StateHandleDummyUtil.createNewKeyedStateHandle(KeyGroupRange.of(i2, i2)))).setRawKeyedState(StateObjectCollection.singleton(StateHandleDummyUtil.createNewKeyedStateHandle(KeyGroupRange.of(i2, i2)))).setInputChannelState(operatorID2 == list.get(0) ? StateObjectCollection.empty() : new StateObjectCollection(Arrays.asList(StateHandleDummyUtil.createNewInputChannelStateHandle(10, random), StateHandleDummyUtil.createNewInputChannelStateHandle(10, random)))).setResultSubpartitionState(operatorID2 == operatorID ? StateObjectCollection.empty() : new StateObjectCollection(Arrays.asList(StateHandleDummyUtil.createNewResultSubpartitionStateHandle(10, random), StateHandleDummyUtil.createNewResultSubpartitionStateHandle(10, random)))).build());
            }
            return operatorState;
        }));
    }

    private Map<OperatorID, ExecutionJobVertex> buildVertices(List<OperatorID> list, int i, SubtaskStateMapper subtaskStateMapper, SubtaskStateMapper subtaskStateMapper2) throws JobException, JobExecutionException {
        return buildVertices((List) list.stream().map(operatorID -> {
            return new OperatorIdWithParallelism(operatorID, i);
        }).collect(Collectors.toList()), subtaskStateMapper, subtaskStateMapper2);
    }

    private Map<OperatorID, ExecutionJobVertex> buildVertices(List<OperatorIdWithParallelism> list, SubtaskStateMapper subtaskStateMapper, SubtaskStateMapper subtaskStateMapper2) throws JobException, JobExecutionException {
        JobVertex[] jobVertexArr = (JobVertex[]) list.stream().map(operatorIdWithParallelism -> {
            return createJobVertex(operatorIdWithParallelism.getOperatorID(), operatorIdWithParallelism.getOperatorID(), operatorIdWithParallelism.getParallelism());
        }).toArray(i -> {
            return new JobVertex[i];
        });
        for (int i2 = 1; i2 < jobVertexArr.length; i2++) {
            connectVertices(jobVertexArr[i2 - 1], jobVertexArr[i2], subtaskStateMapper2, subtaskStateMapper);
        }
        return toExecutionVertices(jobVertexArr);
    }

    private Map<OperatorID, ExecutionJobVertex> toExecutionVertices(JobVertex... jobVertexArr) throws JobException, JobExecutionException {
        DefaultExecutionGraph build = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(JobGraphTestUtils.streamingJobGraph(jobVertexArr)).build((ScheduledExecutorService) EXECUTOR_EXTENSION.getExecutor());
        return (Map) Arrays.stream(jobVertexArr).collect(Collectors.toMap(jobVertex -> {
            return ((OperatorIDPair) jobVertex.getOperatorIDs().get(0)).getGeneratedOperatorID();
        }, jobVertex2 -> {
            try {
                return build.getJobVertex(jobVertex2.getID());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }));
    }

    private void connectVertices(JobVertex jobVertex, JobVertex jobVertex2, SubtaskStateMapper subtaskStateMapper, SubtaskStateMapper subtaskStateMapper2) {
        JobEdge connectNewDataSetAsInput = jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        connectNewDataSetAsInput.setDownstreamSubtaskStateMapper(subtaskStateMapper2);
        connectNewDataSetAsInput.setUpstreamSubtaskStateMapper(subtaskStateMapper);
    }

    private ExecutionJobVertex buildExecutionJobVertex(OperatorID operatorID, OperatorID operatorID2, int i) {
        try {
            return ExecutionGraphTestUtils.getExecutionJobVertex(createJobVertex(operatorID, operatorID2, i));
        } catch (Exception e) {
            throw new AssertionError("Cannot create ExecutionJobVertex", e);
        }
    }

    private JobVertex createJobVertex(OperatorID operatorID, int i) {
        return createJobVertex(operatorID, operatorID, i);
    }

    private JobVertex createJobVertex(OperatorID operatorID, OperatorID operatorID2, int i) {
        JobVertex jobVertex = new JobVertex(operatorID.toHexString(), new JobVertexID(), Collections.singletonList(OperatorIDPair.of(operatorID, operatorID2)));
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(i);
        return jobVertex;
    }

    private List<TaskStateSnapshot> getTaskStateSnapshotFromVertex(ExecutionJobVertex executionJobVertex) {
        return (List) Arrays.stream(executionJobVertex.getTaskVertices()).map((v0) -> {
            return v0.getCurrentExecutionAttempt();
        }).map((v0) -> {
            return v0.getTaskRestore();
        }).map((v0) -> {
            return v0.getTaskStateSnapshot();
        }).collect(Collectors.toList());
    }

    private OperatorSubtaskState getAssignedState(ExecutionJobVertex executionJobVertex, OperatorID operatorID, int i) {
        return executionJobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore().getTaskStateSnapshot().getSubtaskStateByOperatorID(operatorID);
    }
}
