package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequest;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecker;
import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.class */
class SlotSharingExecutionSlotAllocator implements ExecutionSlotAllocator {
    private static final Logger LOG = LoggerFactory.getLogger(SlotSharingExecutionSlotAllocator.class);
    private final PhysicalSlotProvider slotProvider;
    private final boolean slotWillBeOccupiedIndefinitely;
    private final SlotSharingStrategy slotSharingStrategy;
    private final Map<ExecutionSlotSharingGroup, SharedSlot> sharedSlots = new IdentityHashMap();
    private final SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory;
    private final PhysicalSlotRequestBulkChecker bulkChecker;
    private final Time allocationTimeout;
    private final Function<ExecutionVertexID, ResourceProfile> resourceProfileRetriever;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator$SlotExecutionVertexAssignment.class */
    public static class SlotExecutionVertexAssignment {
        private final ExecutionVertexID executionVertexId;
        private final CompletableFuture<LogicalSlot> logicalSlotFuture;

        SlotExecutionVertexAssignment(ExecutionVertexID executionVertexID, CompletableFuture<LogicalSlot> completableFuture) {
            this.executionVertexId = (ExecutionVertexID) Preconditions.checkNotNull(executionVertexID);
            this.logicalSlotFuture = (CompletableFuture) Preconditions.checkNotNull(completableFuture);
        }

        ExecutionVertexID getExecutionVertexId() {
            return this.executionVertexId;
        }

        CompletableFuture<LogicalSlot> getLogicalSlotFuture() {
            return this.logicalSlotFuture;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SlotSharingExecutionSlotAllocator(PhysicalSlotProvider physicalSlotProvider, boolean z, SlotSharingStrategy slotSharingStrategy, SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory, PhysicalSlotRequestBulkChecker physicalSlotRequestBulkChecker, Time time, Function<ExecutionVertexID, ResourceProfile> function) {
        this.slotProvider = (PhysicalSlotProvider) Preconditions.checkNotNull(physicalSlotProvider);
        this.slotWillBeOccupiedIndefinitely = z;
        this.slotSharingStrategy = (SlotSharingStrategy) Preconditions.checkNotNull(slotSharingStrategy);
        this.sharedSlotProfileRetrieverFactory = (SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory) Preconditions.checkNotNull(sharedSlotProfileRetrieverFactory);
        this.bulkChecker = (PhysicalSlotRequestBulkChecker) Preconditions.checkNotNull(physicalSlotRequestBulkChecker);
        this.allocationTimeout = (Time) Preconditions.checkNotNull(time);
        this.resourceProfileRetriever = (Function) Preconditions.checkNotNull(function);
        this.slotProvider.disableBatchSlotRequestTimeoutCheck();
    }

    @Override // org.apache.flink.runtime.scheduler.ExecutionSlotAllocator
    public Map<ExecutionAttemptID, ExecutionSlotAssignment> allocateSlotsFor(List<ExecutionAttemptID> list) {
        HashMap hashMap = new HashMap();
        list.forEach(executionAttemptID -> {
        });
        Preconditions.checkState(hashMap.size() == list.size(), "SlotSharingExecutionSlotAllocator does not support one execution vertex to have multiple concurrent executions");
        return (Map) allocateSlotsForVertices((List) list.stream().map((v0) -> {
            return v0.getExecutionVertexId();
        }).collect(Collectors.toList())).stream().collect(Collectors.toMap(slotExecutionVertexAssignment -> {
            return (ExecutionAttemptID) hashMap.get(slotExecutionVertexAssignment.getExecutionVertexId());
        }, slotExecutionVertexAssignment2 -> {
            return new ExecutionSlotAssignment((ExecutionAttemptID) hashMap.get(slotExecutionVertexAssignment2.getExecutionVertexId()), slotExecutionVertexAssignment2.getLogicalSlotFuture());
        }));
    }

    private List<SlotExecutionVertexAssignment> allocateSlotsForVertices(List<ExecutionVertexID> list) {
        SharedSlotProfileRetriever createFromBulk = this.sharedSlotProfileRetrieverFactory.createFromBulk(new HashSet(list));
        Stream<ExecutionVertexID> stream = list.stream();
        SlotSharingStrategy slotSharingStrategy = this.slotSharingStrategy;
        slotSharingStrategy.getClass();
        Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> map = (Map) stream.collect(Collectors.groupingBy(slotSharingStrategy::getExecutionSlotSharingGroup));
        HashMap hashMap = new HashMap(map.size());
        HashSet hashSet = new HashSet(map.keySet());
        Map<ExecutionSlotSharingGroup, SharedSlot> tryAssignExistingSharedSlots = tryAssignExistingSharedSlots(hashSet);
        hashMap.putAll(tryAssignExistingSharedSlots);
        hashSet.removeAll(tryAssignExistingSharedSlots.keySet());
        if (!hashSet.isEmpty()) {
            Map<ExecutionSlotSharingGroup, SharedSlot> allocateSharedSlots = allocateSharedSlots(hashSet, createFromBulk);
            hashMap.putAll(allocateSharedSlots);
            hashSet.removeAll(allocateSharedSlots.keySet());
            Preconditions.checkState(hashSet.isEmpty());
        }
        Map<ExecutionVertexID, SlotExecutionVertexAssignment> allocateLogicalSlotsFromSharedSlots = allocateLogicalSlotsFromSharedSlots(hashMap, map);
        this.bulkChecker.schedulePendingRequestBulkTimeoutCheck(createBulk(hashMap, map), this.allocationTimeout);
        Stream<ExecutionVertexID> stream2 = list.stream();
        allocateLogicalSlotsFromSharedSlots.getClass();
        return (List) stream2.map((v1) -> {
            return r1.get(v1);
        }).collect(Collectors.toList());
    }

    @Override // org.apache.flink.runtime.scheduler.ExecutionSlotAllocator
    public void cancel(ExecutionAttemptID executionAttemptID) {
        cancelLogicalSlotRequest(executionAttemptID.getExecutionVertexId(), null);
    }

    private void cancelLogicalSlotRequest(ExecutionVertexID executionVertexID, Throwable th) {
        ExecutionSlotSharingGroup executionSlotSharingGroup = this.slotSharingStrategy.getExecutionSlotSharingGroup(executionVertexID);
        Preconditions.checkNotNull(executionSlotSharingGroup, "There is no ExecutionSlotSharingGroup for ExecutionVertexID " + executionVertexID);
        SharedSlot sharedSlot = this.sharedSlots.get(executionSlotSharingGroup);
        if (sharedSlot != null) {
            sharedSlot.cancelLogicalSlotRequest(executionVertexID, th);
        } else {
            LOG.debug("There is no SharedSlot for ExecutionSlotSharingGroup of ExecutionVertexID {}", executionVertexID);
        }
    }

    private static Map<ExecutionVertexID, SlotExecutionVertexAssignment> allocateLogicalSlotsFromSharedSlots(Map<ExecutionSlotSharingGroup, SharedSlot> map, Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> map2) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<ExecutionSlotSharingGroup, List<ExecutionVertexID>> entry : map2.entrySet()) {
            ExecutionSlotSharingGroup key = entry.getKey();
            for (ExecutionVertexID executionVertexID : entry.getValue()) {
                hashMap.put(executionVertexID, new SlotExecutionVertexAssignment(executionVertexID, map.get(key).allocateLogicalSlot(executionVertexID)));
            }
        }
        return hashMap;
    }

    private Map<ExecutionSlotSharingGroup, SharedSlot> tryAssignExistingSharedSlots(Set<ExecutionSlotSharingGroup> set) {
        HashMap hashMap = new HashMap(set.size());
        for (ExecutionSlotSharingGroup executionSlotSharingGroup : set) {
            SharedSlot sharedSlot = this.sharedSlots.get(executionSlotSharingGroup);
            if (sharedSlot != null) {
                hashMap.put(executionSlotSharingGroup, sharedSlot);
            }
        }
        return hashMap;
    }

    private Map<ExecutionSlotSharingGroup, SharedSlot> allocateSharedSlots(Set<ExecutionSlotSharingGroup> set, SharedSlotProfileRetriever sharedSlotProfileRetriever) {
        ArrayList arrayList = new ArrayList();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        HashMap hashMap3 = new HashMap();
        for (ExecutionSlotSharingGroup executionSlotSharingGroup : set) {
            SlotRequestId slotRequestId = new SlotRequestId();
            ResourceProfile physicalSlotResourceProfile = getPhysicalSlotResourceProfile(executionSlotSharingGroup);
            arrayList.add(new PhysicalSlotRequest(slotRequestId, sharedSlotProfileRetriever.getSlotProfile(executionSlotSharingGroup, physicalSlotResourceProfile), this.slotWillBeOccupiedIndefinitely));
            hashMap2.put(slotRequestId, executionSlotSharingGroup);
            hashMap3.put(slotRequestId, physicalSlotResourceProfile);
        }
        this.slotProvider.allocatePhysicalSlots(arrayList).forEach((slotRequestId2, completableFuture) -> {
            ExecutionSlotSharingGroup executionSlotSharingGroup2 = (ExecutionSlotSharingGroup) hashMap2.get(slotRequestId2);
            SharedSlot sharedSlot = new SharedSlot(slotRequestId2, (ResourceProfile) hashMap3.get(slotRequestId2), executionSlotSharingGroup2, completableFuture.thenApply((v0) -> {
                return v0.getPhysicalSlot();
            }), this.slotWillBeOccupiedIndefinitely, this::releaseSharedSlot);
            hashMap.put(executionSlotSharingGroup2, sharedSlot);
            Preconditions.checkState(!this.sharedSlots.containsKey(executionSlotSharingGroup2));
            this.sharedSlots.put(executionSlotSharingGroup2, sharedSlot);
        });
        return hashMap;
    }

    private void releaseSharedSlot(ExecutionSlotSharingGroup executionSlotSharingGroup) {
        SharedSlot remove = this.sharedSlots.remove(executionSlotSharingGroup);
        Preconditions.checkNotNull(remove);
        Preconditions.checkState(remove.isEmpty(), "Trying to remove a shared slot with physical request id %s which has assigned logical slots", new Object[]{remove.getPhysicalSlotRequestId()});
        this.slotProvider.cancelSlotRequest(remove.getPhysicalSlotRequestId(), new FlinkException("Slot is being returned from SlotSharingExecutionSlotAllocator."));
    }

    private ResourceProfile getPhysicalSlotResourceProfile(ExecutionSlotSharingGroup executionSlotSharingGroup) {
        return !executionSlotSharingGroup.getResourceProfile().equals(ResourceProfile.UNKNOWN) ? executionSlotSharingGroup.getResourceProfile() : (ResourceProfile) executionSlotSharingGroup.getExecutionVertexIds().stream().reduce(ResourceProfile.ZERO, (resourceProfile, executionVertexID) -> {
            return resourceProfile.merge(this.resourceProfileRetriever.apply(executionVertexID));
        }, (v0, v1) -> {
            return v0.merge(v1);
        });
    }

    private SharingPhysicalSlotRequestBulk createBulk(Map<ExecutionSlotSharingGroup, SharedSlot> map, Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> map2) {
        SharingPhysicalSlotRequestBulk sharingPhysicalSlotRequestBulk = new SharingPhysicalSlotRequestBulk(map2, (Map) map2.keySet().stream().collect(Collectors.toMap(executionSlotSharingGroup -> {
            return executionSlotSharingGroup;
        }, executionSlotSharingGroup2 -> {
            return ((SharedSlot) map.get(executionSlotSharingGroup2)).getPhysicalSlotResourceProfile();
        })), this::cancelLogicalSlotRequest);
        registerPhysicalSlotRequestBulkCallbacks(map, map2.keySet(), sharingPhysicalSlotRequestBulk);
        return sharingPhysicalSlotRequestBulk;
    }

    private static void registerPhysicalSlotRequestBulkCallbacks(Map<ExecutionSlotSharingGroup, SharedSlot> map, Iterable<ExecutionSlotSharingGroup> iterable, SharingPhysicalSlotRequestBulk sharingPhysicalSlotRequestBulk) {
        for (ExecutionSlotSharingGroup executionSlotSharingGroup : iterable) {
            CompletableFuture<PhysicalSlot> slotContextFuture = map.get(executionSlotSharingGroup).getSlotContextFuture();
            slotContextFuture.thenAccept(physicalSlot -> {
                sharingPhysicalSlotRequestBulk.markFulfilled(executionSlotSharingGroup, physicalSlot.getAllocationId());
            });
            slotContextFuture.exceptionally(th -> {
                sharingPhysicalSlotRequestBulk.clearPendingRequests();
                return null;
            });
        }
    }
}
