package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.util.ResourceCounter;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator.class */
public class SlotSharingSlotAllocator implements SlotAllocator<VertexParallelismWithSlotSharing> {
    private final ReserveSlotFunction reserveSlotFunction;
    private final FreeSlotFunction freeSlotFunction;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator$ExecutionSlotSharingGroup.class */
    /**
     * Holder for the set of execution vertices that are grouped together.
     *
     * <p>The set handed to the constructor is stored as-is (no copy is made) and the very same
     * instance is returned by {@link #getContainedExecutionVertices()}.
     */
    public static class ExecutionSlotSharingGroup {
        /** Execution vertices belonging to this group. */
        private final Set<ExecutionVertexID> containedExecutionVertices;

        /**
         * Creates a group wrapping the given set.
         *
         * @param containedExecutionVertices execution vertices of this group; kept by reference
         */
        public ExecutionSlotSharingGroup(final Set<ExecutionVertexID> containedExecutionVertices) {
            this.containedExecutionVertices = containedExecutionVertices;
        }

        /**
         * Returns the execution vertices of this group.
         *
         * @return the set that was passed to the constructor
         */
        public Collection<ExecutionVertexID> getContainedExecutionVertices() {
            return containedExecutionVertices;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/adaptive/allocator/SlotSharingSlotAllocator$ExecutionSlotSharingGroupAndSlot.class */
    /** Immutable pair of an {@link ExecutionSlotSharingGroup} and the {@link SlotInfo} paired with it. */
    public static class ExecutionSlotSharingGroupAndSlot {
        /** The group of execution vertices. */
        private final ExecutionSlotSharingGroup executionSlotSharingGroup;

        /** The slot associated with the group. */
        private final SlotInfo slotInfo;

        /**
         * Creates the pair.
         *
         * @param executionSlotSharingGroup group of execution vertices
         * @param slotInfo slot associated with that group
         */
        public ExecutionSlotSharingGroupAndSlot(
                final ExecutionSlotSharingGroup executionSlotSharingGroup, final SlotInfo slotInfo) {
            this.executionSlotSharingGroup = executionSlotSharingGroup;
            this.slotInfo = slotInfo;
        }

        /**
         * Returns the group half of the pair.
         *
         * @return the group passed to the constructor
         */
        public ExecutionSlotSharingGroup getExecutionSlotSharingGroup() {
            return executionSlotSharingGroup;
        }

        /**
         * Returns the slot half of the pair.
         *
         * @return the slot info passed to the constructor
         */
        public SlotInfo getSlotInfo() {
            return slotInfo;
        }
    }

    /**
     * Creates an allocator that uses the given callbacks to reserve and to free slots.
     *
     * @param reserveSlotFunction invoked when a slot is reserved for a shared slot
     * @param freeSlotFunction invoked from the release callback handed to each shared slot
     */
    public SlotSharingSlotAllocator(
            final ReserveSlotFunction reserveSlotFunction, final FreeSlotFunction freeSlotFunction) {
        this.reserveSlotFunction = reserveSlotFunction;
        this.freeSlotFunction = freeSlotFunction;
    }

    /**
     * Computes the number of slots required for the given vertices.
     *
     * <p>The result is the sum, over all slot sharing groups, of the largest parallelism found
     * among the vertices of each group. It is reported against {@link ResourceProfile#UNKNOWN}.
     *
     * @param iterable the vertices to account for
     * @return a counter holding the summed slot count for {@link ResourceProfile#UNKNOWN}
     */
    @Override // org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator
    public ResourceCounter calculateRequiredSlots(Iterable<JobInformation.VertexInformation> iterable) {
        final int requiredSlots =
                getMaxParallelismForSlotSharingGroups(iterable).values().stream()
                        .mapToInt(Integer::intValue)
                        .sum();
        return ResourceCounter.withResource(ResourceProfile.UNKNOWN, requiredSlots);
    }

    /**
     * Determines, for every slot sharing group referenced by the given vertices, the largest
     * parallelism of any vertex in that group.
     *
     * @param iterable the vertices to inspect
     * @return map from slot sharing group id to the maximum vertex parallelism seen for that group
     */
    private static Map<SlotSharingGroupId, Integer> getMaxParallelismForSlotSharingGroups(Iterable<JobInformation.VertexInformation> iterable) {
        // Typed map instead of a raw HashMap so the remapping function operates on Integer values.
        final Map<SlotSharingGroupId, Integer> maxParallelismPerGroup = new HashMap<>();
        for (JobInformation.VertexInformation vertexInformation : iterable) {
            // merge() stores the parallelism on first sight of a group and keeps the larger value afterwards.
            maxParallelismPerGroup.merge(
                    vertexInformation.getSlotSharingGroup().getSlotSharingGroupId(),
                    vertexInformation.getParallelism(),
                    Integer::max);
        }
        return maxParallelismPerGroup;
    }

    /**
     * Distributes the given slots evenly over the job's slot sharing groups and derives the
     * resulting vertex parallelism together with the group-to-slot assignments.
     *
     * <p>Each slot sharing group is given {@code collection.size() / numberOfSlotSharingGroups}
     * slots (integer division). The parallelism of every vertex is capped at that number, and each
     * resulting execution slot sharing group is paired with the next slot from {@code collection}
     * in iteration order.
     *
     * @param jobInformation source of the slot sharing groups and the per-vertex information
     * @param collection the slots that may be used
     * @return the parallelism and assignments, or {@link Optional#empty()} if there are fewer slots
     *     than slot sharing groups
     * @throws ArithmeticException if {@code jobInformation} reports no slot sharing groups
     *     (division by zero)
     */
    @Override // org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator
    public Optional<VertexParallelismWithSlotSharing> determineParallelism(JobInformation jobInformation, Collection<? extends SlotInfo> collection) {
        final int slotsPerSlotSharingGroup = collection.size() / jobInformation.getSlotSharingGroups().size();
        if (slotsPerSlotSharingGroup == 0) {
            // Fewer slots than slot sharing groups: nothing can be scheduled.
            return Optional.empty();
        }

        final Iterator<? extends SlotInfo> slotIterator = collection.iterator();
        final List<ExecutionSlotSharingGroupAndSlot> assignments = new ArrayList<>();
        final Map<JobVertexID, Integer> allVertexParallelism = new HashMap<>();

        for (SlotSharingGroup slotSharingGroup : jobInformation.getSlotSharingGroups()) {
            final List<JobInformation.VertexInformation> containedVertices =
                    slotSharingGroup.getJobVertexIds().stream()
                            .map(jobInformation::getVertexInformation)
                            .collect(Collectors.toList());

            final Map<JobVertexID, Integer> vertexParallelism =
                    determineParallelism(containedVertices, slotsPerSlotSharingGroup);

            // A group yields at most slotsPerSlotSharingGroup execution groups, so the iterator
            // cannot run dry: groups * slotsPerSlotSharingGroup <= collection.size().
            for (ExecutionSlotSharingGroup executionSlotSharingGroup : createExecutionSlotSharingGroups(vertexParallelism)) {
                assignments.add(new ExecutionSlotSharingGroupAndSlot(executionSlotSharingGroup, slotIterator.next()));
            }
            allVertexParallelism.putAll(vertexParallelism);
        }
        return Optional.of(new VertexParallelismWithSlotSharing(allVertexParallelism, assignments));
    }

    /**
     * Computes the parallelism of each given vertex, capped at the supplied upper bound.
     *
     * @param collection the vertices to compute a parallelism for
     * @param i upper bound applied to every vertex's parallelism
     * @return map from vertex id to {@code min(vertex parallelism, i)}; if the same id occurs more
     *     than once the last occurrence wins
     */
    private static Map<JobVertexID, Integer> determineParallelism(Collection<JobInformation.VertexInformation> collection, int i) {
        // Typed map instead of a raw HashMap so the return needs no unchecked conversion.
        final Map<JobVertexID, Integer> vertexParallelism = new HashMap<>();
        for (JobInformation.VertexInformation vertexInformation : collection) {
            final JobVertexID jobVertexId = vertexInformation.getJobVertexID();
            final int cappedParallelism = Math.min(vertexInformation.getParallelism(), i);
            vertexParallelism.put(jobVertexId, cappedParallelism);
        }
        return vertexParallelism;
    }

    /**
     * Groups the subtasks of the given vertices by subtask index.
     *
     * <p>For every vertex with parallelism {@code p}, execution vertices with indices
     * {@code 0..p-1} are created; all execution vertices sharing the same index end up in the same
     * {@link ExecutionSlotSharingGroup}.
     *
     * @param map parallelism per job vertex
     * @return one group per distinct subtask index
     */
    private static Iterable<ExecutionSlotSharingGroup> createExecutionSlotSharingGroups(Map<JobVertexID, Integer> map) {
        // Keyed by subtask index; typed so that no raw casts are needed below.
        final Map<Integer, Set<ExecutionVertexID>> verticesBySubtaskIndex = new HashMap<>();
        for (Map.Entry<JobVertexID, Integer> vertexAndParallelism : map.entrySet()) {
            final JobVertexID jobVertexId = vertexAndParallelism.getKey();
            final int parallelism = vertexAndParallelism.getValue();
            for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
                verticesBySubtaskIndex
                        .computeIfAbsent(subtaskIndex, ignored -> new HashSet<>())
                        .add(new ExecutionVertexID(jobVertexId, subtaskIndex));
            }
        }
        return verticesBySubtaskIndex.values().stream()
                .map(ExecutionSlotSharingGroup::new)
                .collect(Collectors.toList());
    }

    /**
     * Reserves the slots named in the given assignments and hands out logical slots for all
     * execution vertices.
     *
     * <p>One shared slot is reserved per assignment; every execution vertex contained in the
     * assignment's group receives its own logical slot allocated from that shared slot.
     *
     * @param vertexParallelismWithSlotSharing the assignments to realize
     * @return map from execution vertex to the logical slot allocated for it
     */
    @Override // org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator
    public Map<ExecutionVertexID, LogicalSlot> reserveResources(VertexParallelismWithSlotSharing vertexParallelismWithSlotSharing) {
        // Typed map instead of a raw HashMap so the return needs no unchecked conversion.
        final Map<ExecutionVertexID, LogicalSlot> assignedSlots = new HashMap<>();
        for (ExecutionSlotSharingGroupAndSlot assignment : vertexParallelismWithSlotSharing.getAssignments()) {
            final SharedSlot sharedSlot = reserveSharedSlot(assignment.getSlotInfo());
            for (ExecutionVertexID executionVertexId : assignment.getExecutionSlotSharingGroup().getContainedExecutionVertices()) {
                assignedSlots.put(executionVertexId, sharedSlot.allocateLogicalSlot());
            }
        }
        return assignedSlots;
    }

    /**
     * Reserves the slot described by {@code slotInfo} and wraps it in a new {@link SharedSlot}.
     *
     * <p>The reservation is made with {@link ResourceProfile#UNKNOWN}. The callback handed to the
     * shared slot frees the slot again through the free-slot function, passing {@code null} as the
     * cause and the current wall-clock time in milliseconds.
     *
     * @param slotInfo the slot to reserve
     * @return a shared slot backed by the reserved slot
     */
    private SharedSlot reserveSharedSlot(SlotInfo slotInfo) {
        final SlotRequestId slotRequestId = new SlotRequestId();
        return new SharedSlot(
                slotRequestId,
                reserveSlotFunction.reserveSlot(slotInfo.getAllocationId(), ResourceProfile.UNKNOWN),
                slotInfo.willBeOccupiedIndefinitely(),
                () -> freeSlotFunction.freeSlot(slotInfo.getAllocationId(), null, System.currentTimeMillis()));
    }
}
