package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.class */
public class SlotManager implements AutoCloseable {
    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class);
    /** Scheduler on which {@code start} registers the two periodic timeout checks. */
    private final ScheduledExecutor scheduledExecutor;
    /** Timeout handed to {@code TaskExecutorGateway.requestSlot} when a slot is requested from a task executor. */
    private final Time taskManagerRequestTimeout;
    /** Age after which a pending slot request is failed; also the period of the slot request timeout check. */
    private final Time slotRequestTimeout;
    /** Idle time after which a task manager is released; also the period of the task manager timeout check. */
    private final Time taskManagerTimeout;
    /** All currently registered slots, keyed by slot id. */
    private final HashMap<SlotID, TaskManagerSlot> slots = new HashMap<>(16);
    /** Registered slots that are free and not handed to any request, kept in insertion order. */
    private final LinkedHashMap<SlotID, TaskManagerSlot> freeSlots = new LinkedHashMap<>(16);
    /** Registrations of the known task managers, keyed by instance id. */
    private final HashMap<InstanceID, TaskManagerRegistration> taskManagerRegistrations = new HashMap<>(4);
    /** Maps allocation ids of allocated slots to the slot id; consulted when checking for duplicate requests. */
    private final HashMap<AllocationID, SlotID> fulfilledSlotRequests = new HashMap<>(16);
    /** Registered slot requests that have not yet been fulfilled, failed or cancelled, keyed by allocation id. */
    private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests = new HashMap<>(16);
    /** Leader id sent along with every slot request to a task executor; set by {@code start}, cleared by {@code suspend}. */
    private UUID leaderId = null;
    /** Callbacks used to allocate/release resources and to report allocation failures; set by {@code start}, cleared by {@code suspend}. */
    private ResourceManagerActions resourceManagerActions = null;
    /** Executor on which the timeout checks and the slot request callbacks are executed; set by {@code start}. */
    private Executor mainThreadExecutor = null;
    /** Handle of the periodic task manager timeout check, kept so that {@code suspend} can cancel it. */
    private ScheduledFuture<?> taskManagerTimeoutCheck = null;
    /** Handle of the periodic slot request timeout check, kept so that {@code suspend} can cancel it. */
    private ScheduledFuture<?> slotRequestTimeoutCheck = null;
    /** Set to true by {@code start} and back to false by {@code suspend}; guarded by {@code checkInit}. */
    private boolean started = false;

    /**
     * Creates a slot manager that is not yet started.
     *
     * @param scheduledExecutor scheduler used for the periodic timeout checks
     * @param time timeout applied to slot requests sent to task executors
     * @param time2 timeout after which a pending slot request is failed
     * @param time3 idle timeout after which a task manager is released
     * @throws NullPointerException if any argument is null
     */
    public SlotManager(ScheduledExecutor scheduledExecutor, Time time, Time time2, Time time3) {
        // Validate everything up front, in declaration order, before touching any field.
        Preconditions.checkNotNull(scheduledExecutor);
        Preconditions.checkNotNull(time);
        Preconditions.checkNotNull(time2);
        Preconditions.checkNotNull(time3);

        this.scheduledExecutor = scheduledExecutor;
        this.taskManagerRequestTimeout = time;
        this.slotRequestTimeout = time2;
        this.taskManagerTimeout = time3;
    }

    /**
     * Starts the slot manager: stores the leader id, main thread executor and resource manager
     * callbacks, marks the manager as started and schedules the two periodic timeout checks.
     *
     * <p>Both checks are scheduled with zero initial delay and a fixed delay equal to the
     * respective timeout. The scheduler thread only forwards the check to the main thread
     * executor; the check itself runs there.
     *
     * @param uuid leader id to use for slot requests
     * @param executor executor on which state-changing callbacks are run
     * @param resourceManagerActions callbacks into the resource manager
     * @throws NullPointerException if any argument is null
     */
    public void start(UUID uuid, Executor executor, ResourceManagerActions resourceManagerActions) {
        LOG.info("Starting the SlotManager.");

        this.leaderId = Preconditions.checkNotNull(uuid);
        this.mainThreadExecutor = Preconditions.checkNotNull(executor);
        this.resourceManagerActions = Preconditions.checkNotNull(resourceManagerActions);
        this.started = true;

        // Task manager idle check.
        final Runnable taskManagerCheck = new Runnable() {
            @Override
            public void run() {
                SlotManager.this.checkTaskManagerTimeouts();
            }
        };
        final Runnable taskManagerCheckTrigger = new Runnable() {
            @Override
            public void run() {
                // The executor field is read on every tick, exactly when the tick fires.
                SlotManager.this.mainThreadExecutor.execute(taskManagerCheck);
            }
        };
        final long taskManagerCheckPeriod = this.taskManagerTimeout.toMilliseconds();
        this.taskManagerTimeoutCheck = this.scheduledExecutor.scheduleWithFixedDelay(
                taskManagerCheckTrigger, 0L, taskManagerCheckPeriod, TimeUnit.MILLISECONDS);

        // Pending slot request timeout check.
        final Runnable slotRequestCheck = new Runnable() {
            @Override
            public void run() {
                SlotManager.this.checkSlotRequestTimeouts();
            }
        };
        final Runnable slotRequestCheckTrigger = new Runnable() {
            @Override
            public void run() {
                SlotManager.this.mainThreadExecutor.execute(slotRequestCheck);
            }
        };
        final long slotRequestCheckPeriod = this.slotRequestTimeout.toMilliseconds();
        this.slotRequestTimeoutCheck = this.scheduledExecutor.scheduleWithFixedDelay(
                slotRequestCheckTrigger, 0L, slotRequestCheckPeriod, TimeUnit.MILLISECONDS);
    }

    /**
     * Suspends the slot manager: stops the periodic timeout checks, cancels and drops all
     * pending slot requests, unregisters every task manager and clears the leader id and the
     * resource manager callbacks.
     *
     * <p>Safe to call when the manager was never started or has already been suspended
     * (for example {@code suspend()} followed by {@code close()}); in that case the timeout
     * check handles are null and are simply skipped.
     */
    public void suspend() {
        LOG.info("Suspending the SlotManager.");

        // The handles are only set by start(); guard against suspend()/close() without a
        // preceding start() and against repeated suspension.
        if (this.taskManagerTimeoutCheck != null) {
            this.taskManagerTimeoutCheck.cancel(false);
            this.taskManagerTimeoutCheck = null;
        }
        if (this.slotRequestTimeoutCheck != null) {
            this.slotRequestTimeoutCheck.cancel(false);
            this.slotRequestTimeoutCheck = null;
        }

        for (PendingSlotRequest pendingSlotRequest : this.pendingSlotRequests.values()) {
            cancelPendingSlotRequest(pendingSlotRequest);
        }
        this.pendingSlotRequests.clear();

        // Iterate over a snapshot: unregisterTaskManager removes entries from the map.
        final ArrayList<InstanceID> registeredTaskManagers =
                new ArrayList<InstanceID>(this.taskManagerRegistrations.keySet());
        for (InstanceID instanceId : registeredTaskManagers) {
            unregisterTaskManager(instanceId);
        }

        this.leaderId = null;
        this.resourceManagerActions = null;
        this.started = false;
    }

    /**
     * Closes the slot manager by suspending it.
     *
     * @throws Exception declared by {@link AutoCloseable#close()}
     */
    @Override
    public void close() throws Exception {
        LOG.info("Closing the SlotManager.");

        this.suspend();
    }

    /**
     * Registers a new slot request and immediately tries to serve it, either from a free slot
     * or by asking the resource manager for a new resource.
     *
     * @param slotRequest the request to register
     * @return true if the request was registered, false if a request with the same allocation
     *     id is already pending or fulfilled
     * @throws SlotManagerException if the request could not be served; the request is removed
     *     from the pending requests again in that case
     * @throws IllegalStateException if the slot manager has not been started
     */
    public boolean registerSlotRequest(SlotRequest slotRequest) throws SlotManagerException {
        checkInit();

        final AllocationID allocationId = slotRequest.getAllocationId();
        if (checkDuplicateRequest(allocationId)) {
            LOG.debug("Ignoring a duplicate slot request with allocation id {}.", allocationId);
            return false;
        }

        final PendingSlotRequest request = new PendingSlotRequest(slotRequest);
        this.pendingSlotRequests.put(allocationId, request);

        try {
            internalRequestSlot(request);
        } catch (ResourceManagerException cause) {
            // Roll back the registration so the failed request does not linger as pending.
            this.pendingSlotRequests.remove(allocationId);
            throw new SlotManagerException("Could not fulfill slot request " + allocationId + '.', cause);
        }
        return true;
    }

    /**
     * Removes a pending slot request and cancels its outstanding request future, if any.
     *
     * @param allocationID allocation id of the request to remove
     * @return true if a pending request was found and cancelled, false otherwise
     * @throws IllegalStateException if the slot manager has not been started
     */
    public boolean unregisterSlotRequest(AllocationID allocationID) {
        checkInit();

        final PendingSlotRequest removed = this.pendingSlotRequests.remove(allocationID);
        if (removed == null) {
            LOG.debug("No pending slot request with allocation id {} found.", allocationID);
            return false;
        }

        cancelPendingSlotRequest(removed);
        return true;
    }

    /**
     * Registers a task manager together with the slots listed in its slot report.
     *
     * <p>If the task manager is already registered, the report is treated as a regular slot
     * status report. Otherwise a registration is created, each reported slot is registered,
     * and the task manager is marked used or idle depending on {@code anySlotUsed}.
     *
     * @param taskExecutorConnection connection to the task manager
     * @param slotReport the slots the task manager offers and their current allocation
     * @throws IllegalStateException if the slot manager has not been started
     */
    public void registerTaskManager(TaskExecutorConnection taskExecutorConnection, SlotReport slotReport) {
        checkInit();

        final InstanceID instanceId = taskExecutorConnection.getInstanceID();
        LOG.info("Register TaskManager {} at the SlotManager.", instanceId);

        if (this.taskManagerRegistrations.containsKey(instanceId)) {
            reportSlotStatus(instanceId, slotReport);
            return;
        }

        // First pass: collect the slot ids so the registration knows its slots.
        final ArrayList<SlotID> reportedSlotIds = new ArrayList<SlotID>();
        for (SlotStatus status : slotReport) {
            reportedSlotIds.add(status.getSlotID());
        }

        final TaskManagerRegistration registration =
                new TaskManagerRegistration(taskExecutorConnection, reportedSlotIds);
        this.taskManagerRegistrations.put(instanceId, registration);

        // Second pass: register the slots; the registration must already be in place because
        // registering a free slot may immediately hand it to a pending request.
        for (SlotStatus status : slotReport) {
            registerSlot(
                    status.getSlotID(),
                    status.getAllocationID(),
                    status.getResourceProfile(),
                    taskExecutorConnection);
        }

        if (anySlotUsed(registration.getSlots())) {
            registration.markUsed();
        } else {
            registration.markIdle();
        }
    }

    /**
     * Unregisters a task manager and removes all of its slots.
     *
     * @param instanceID instance id of the task manager
     * @return true if the task manager was registered, false otherwise
     * @throws IllegalStateException if the slot manager has not been started
     */
    public boolean unregisterTaskManager(InstanceID instanceID) {
        checkInit();

        final TaskManagerRegistration registration = this.taskManagerRegistrations.remove(instanceID);
        if (registration == null) {
            LOG.debug("There is no task manager registered with instance ID {}. Ignoring this message.", instanceID);
            return false;
        }

        internalUnregisterTaskManager(registration);
        return true;
    }

    /**
     * Applies a slot report from a registered task manager to the known slots and marks the
     * task manager idle if every reported slot is free afterwards, used otherwise.
     *
     * @param instanceID instance id of the reporting task manager
     * @param slotReport reported slot statuses
     * @return true if the report was processed, false if the task manager is unknown
     * @throws IllegalStateException if the slot manager has not been started, or if the report
     *     contains a slot that does not belong to the task manager or is not registered
     */
    public boolean reportSlotStatus(InstanceID instanceID, SlotReport slotReport) {
        checkInit();

        final TaskManagerRegistration registration = this.taskManagerRegistrations.get(instanceID);
        if (registration == null) {
            LOG.debug("Received slot report for unknown task manager with instance id {}. Ignoring this report.", instanceID);
            return false;
        }

        boolean allSlotsFree = true;
        for (SlotStatus status : slotReport) {
            // updateSlot is only attempted for slots that belong to this task manager.
            final boolean updated = registration.containsSlot(status.getSlotID())
                    && updateSlot(status.getSlotID(), status.getAllocationID());
            if (!updated) {
                throw new IllegalStateException("Reported a slot status for slot " + status.getSlotID() + " which has not been registered.");
            }
            allSlotsFree &= this.slots.get(status.getSlotID()).isFree();
        }

        if (allSlotsFree) {
            registration.markIdle();
        } else {
            registration.markUsed();
        }
        return true;
    }

    /**
     * Frees a slot that is currently allocated under the given allocation id.
     *
     * <p>The call is ignored (with a debug log) if the slot is unknown, is not allocated, or
     * is allocated under a different allocation id. Otherwise the allocation is cleared, the
     * fulfilled-request entry is dropped, the slot is offered to a waiting request or returned
     * to the free pool, and the owning task manager is marked used or idle.
     *
     * @param slotID id of the slot to free
     * @param allocationID allocation id the caller expects the slot to hold
     * @throws IllegalStateException if the slot manager has not been started
     */
    public void freeSlot(SlotID slotID, AllocationID allocationID) {
        checkInit();

        final TaskManagerSlot slot = this.slots.get(slotID);
        if (slot == null) {
            LOG.debug("Trying to free a slot {} which has not been registered. Ignoring this message.", slotID);
            return;
        }
        if (!slot.isAllocated()) {
            // The message talks about the slot, so log the slot id (not the allocation id).
            LOG.debug("Slot {} has not been allocated.", slotID);
            return;
        }
        if (!Objects.equals(allocationID, slot.getAllocationId())) {
            LOG.debug("Received request to free slot {} with expected allocation id {}, but actual allocation id {} differs. Ignoring the request.", new Object[]{slotID, allocationID, slot.getAllocationId()});
            return;
        }

        slot.setAllocationId(null);
        this.fulfilledSlotRequests.remove(allocationID);

        if (slot.isFree()) {
            handleFreeSlot(slot);
        }

        final TaskManagerRegistration registration = this.taskManagerRegistrations.get(slot.getInstanceId());
        if (registration != null) {
            if (anySlotUsed(registration.getSlots())) {
                registration.markUsed();
            } else {
                registration.markIdle();
            }
        }
    }

    /**
     * Looks for a pending slot request that is not yet assigned to a slot and whose resource
     * profile is matched by the given profile.
     *
     * @param resourceProfile profile of the slot that could serve a request
     * @return the first matching unassigned request in iteration order, or null if there is none
     */
    protected PendingSlotRequest findMatchingRequest(ResourceProfile resourceProfile) {
        final Iterator<PendingSlotRequest> candidates = this.pendingSlotRequests.values().iterator();
        while (candidates.hasNext()) {
            final PendingSlotRequest candidate = candidates.next();
            if (candidate.isAssigned()) {
                continue;
            }
            if (resourceProfile.isMatching(candidate.getResourceProfile())) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Finds a free slot whose resource profile matches the requested profile and removes it
     * from the free slot pool.
     *
     * @param resourceProfile the requested resource profile
     * @return the first matching free slot in insertion order, or null if none matches
     * @throws IllegalStateException if a slot in the free pool is not actually free
     */
    protected TaskManagerSlot findMatchingSlot(ResourceProfile resourceProfile) {
        for (Iterator<TaskManagerSlot> candidates = this.freeSlots.values().iterator(); candidates.hasNext(); ) {
            final TaskManagerSlot candidate = candidates.next();
            Preconditions.checkState(candidate.isFree());

            if (!candidate.getResourceProfile().isMatching(resourceProfile)) {
                continue;
            }

            // Removing through the values iterator drops the mapping from the free pool.
            candidates.remove();
            return candidate;
        }
        return null;
    }

    /**
     * Registers a slot, replacing any slot previously registered under the same id.
     *
     * <p>A free slot is offered to a waiting request or added to the free pool; an allocated
     * slot is recorded in the fulfilled slot requests.
     *
     * @param slotID id of the slot
     * @param allocationID allocation id the slot currently holds, or null
     * @param resourceProfile resource profile of the slot
     * @param taskExecutorConnection connection to the task manager owning the slot
     */
    private void registerSlot(SlotID slotID, AllocationID allocationID, ResourceProfile resourceProfile, TaskExecutorConnection taskExecutorConnection) {
        final boolean alreadyRegistered = this.slots.containsKey(slotID);
        if (alreadyRegistered) {
            // Drop the stale entry (and everything hanging off it) before re-registering.
            removeSlot(slotID);
        }

        final TaskManagerSlot slot =
                new TaskManagerSlot(slotID, resourceProfile, taskExecutorConnection, allocationID);
        this.slots.put(slotID, slot);

        if (slot.isFree()) {
            handleFreeSlot(slot);
        }

        if (slot.isAllocated()) {
            this.fulfilledSlotRequests.put(slot.getAllocationId(), slotID);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Updates a registered slot with the allocation id reported for it, which is taken as the
     * new truth for that slot.
     *
     * <p>Bookkeeping performed on top of storing the new allocation id:
     * <ul>
     *   <li>If the slot previously held a different allocation id, the stale entry for that id
     *       is removed from the fulfilled slot requests so the id is no longer treated as a
     *       duplicate.</li>
     *   <li>If the slot was allocated and is now reported without an allocation, it is offered
     *       to a waiting request or returned to the free pool instead of being left in limbo.</li>
     *   <li>If the slot is now allocated and still has a request assigned, that request is
     *       cancelled and removed when its allocation id matches, or rejected otherwise; the
     *       assignment is cleared, the allocation is recorded as fulfilled and the owning task
     *       manager is marked used.</li>
     * </ul>
     *
     * @param slotID id of the slot to update
     * @param allocationID reported allocation id, or null if the slot is reported unallocated
     * @return true if the slot is registered and was updated, false if the slot is unknown
     */
    public boolean updateSlot(SlotID slotID, AllocationID allocationID) {
        final TaskManagerSlot slot = this.slots.get(slotID);
        if (slot == null) {
            LOG.debug("Trying to update unknown slot with slot id {}.", slotID);
            return false;
        }

        final AllocationID previousAllocationId = slot.getAllocationId();
        slot.setAllocationId(allocationID);

        // Drop the fulfilled entry of an allocation this slot no longer holds, but only if
        // that entry still points at this slot.
        final boolean allocationReplaced =
                previousAllocationId != null && !previousAllocationId.equals(allocationID);
        if (allocationReplaced && slotID.equals(this.fulfilledSlotRequests.get(previousAllocationId))) {
            this.fulfilledSlotRequests.remove(previousAllocationId);
        }

        if (allocationID == null) {
            // Only a slot that just lost its allocation needs to be re-offered; a slot that
            // was unallocated before is already in the free pool or has a request assigned.
            if (previousAllocationId != null && slot.isFree()) {
                handleFreeSlot(slot);
            }
            return true;
        }

        if (slot.hasPendingSlotRequest()) {
            final PendingSlotRequest assignedRequest = slot.getAssignedSlotRequest();
            if (Objects.equals(assignedRequest.getAllocationId(), allocationID)) {
                // The assigned request is the one that got the slot: it is done.
                cancelPendingSlotRequest(assignedRequest);
                this.pendingSlotRequests.remove(assignedRequest.getAllocationId());
            } else {
                // Somebody else holds the slot: fail the in-flight request.
                rejectPendingSlotRequest(assignedRequest, new Exception("Task manager reported slot " + slotID + " being already allocated."));
            }
            slot.setAssignedSlotRequest(null);
        }

        this.fulfilledSlotRequests.put(allocationID, slotID);

        final TaskManagerRegistration registration = this.taskManagerRegistrations.get(slot.getInstanceId());
        if (registration != null) {
            registration.markUsed();
        }
        return true;
    }

    /**
     * Tries to serve a pending slot request: a matching free slot is allocated to it, and if
     * there is none the resource manager is asked to allocate a new resource.
     *
     * @param pendingSlotRequest the request to serve
     * @throws ResourceManagerException if asking for a new resource fails
     */
    private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
        final ResourceProfile requestedProfile = pendingSlotRequest.getResourceProfile();
        final TaskManagerSlot freeSlot = findMatchingSlot(requestedProfile);

        if (freeSlot == null) {
            this.resourceManagerActions.allocateResource(pendingSlotRequest.getResourceProfile());
            return;
        }
        allocateSlot(freeSlot, pendingSlotRequest);
    }

    /**
     * Assigns the pending request to the slot and sends the slot request to the owning task
     * executor.
     *
     * <p>The task executor's answer is first copied into the request future stored on the
     * pending request, so that cancelling or rejecting that future has the same effect as an
     * answer. The outcome is then handled on the main thread executor:
     * <ul>
     *   <li>acknowledged: the slot is updated with the request's allocation id;</li>
     *   <li>{@code SlotOccupiedException}: the slot is updated with the occupying allocation id;</li>
     *   <li>any other failure: the request is detached from the slot;</li>
     *   <li>unless the failure is a {@code CancellationException}, the request is retried via
     *       {@code handleFailedSlotRequest}.</li>
     * </ul>
     *
     * @param taskManagerSlot the slot to allocate
     * @param pendingSlotRequest the request to allocate the slot for
     * @throws IllegalStateException if the slot's task manager is not registered
     */
    private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
        final TaskExecutorGateway gateway = taskManagerSlot.getTaskManagerConnection().getTaskExecutorGateway();
        final FlinkCompletableFuture<Acknowledge> requestFuture = new FlinkCompletableFuture<Acknowledge>();
        final AllocationID allocationId = pendingSlotRequest.getAllocationId();
        final SlotID slotId = taskManagerSlot.getSlotId();

        taskManagerSlot.setAssignedSlotRequest(pendingSlotRequest);
        pendingSlotRequest.setRequestFuture(requestFuture);

        final TaskManagerRegistration registration = this.taskManagerRegistrations.get(taskManagerSlot.getInstanceId());
        if (registration == null) {
            throw new IllegalStateException("Could not find a registered task manager for instance id " + taskManagerSlot.getInstanceId() + '.');
        }
        registration.markUsed();

        // Forward the RPC result into the request future.
        final BiFunction<Acknowledge, Throwable, Void> forwardResult = new BiFunction<Acknowledge, Throwable, Void>() {
            @Override
            public Void apply(Acknowledge acknowledge, Throwable th) {
                if (acknowledge == null) {
                    requestFuture.completeExceptionally(th);
                } else {
                    requestFuture.complete(acknowledge);
                }
                return null;
            }
        };
        gateway
                .requestSlot(
                        slotId,
                        pendingSlotRequest.getJobId(),
                        allocationId,
                        pendingSlotRequest.getTargetAddress(),
                        this.leaderId,
                        this.taskManagerRequestTimeout)
                .handle(forwardResult);

        // React to the outcome on the main thread.
        final BiFunction<Acknowledge, Throwable, Void> handleOutcome = new BiFunction<Acknowledge, Throwable, Void>() {
            @Override
            public Void apply(Acknowledge acknowledge, Throwable th) {
                if (acknowledge != null) {
                    SlotManager.this.updateSlot(slotId, allocationId);
                } else {
                    if (th instanceof SlotOccupiedException) {
                        SlotManager.this.updateSlot(slotId, ((SlotOccupiedException) th).getAllocationId());
                    } else {
                        SlotManager.this.removeSlotRequestFromSlot(slotId, allocationId);
                    }

                    if (th instanceof CancellationException) {
                        SlotManager.LOG.debug("Slot allocation request {} has been cancelled.", allocationId, th);
                    } else {
                        SlotManager.this.handleFailedSlotRequest(slotId, allocationId, th);
                    }
                }
                return null;
            }
        };
        requestFuture.handleAsync(handleOutcome, this.mainThreadExecutor);
    }

    /**
     * Handles a slot that has become free: it is allocated to a matching unassigned pending
     * request if there is one, otherwise it is put into the free slot pool.
     *
     * @param taskManagerSlot the free slot
     */
    private void handleFreeSlot(TaskManagerSlot taskManagerSlot) {
        final PendingSlotRequest waitingRequest = findMatchingRequest(taskManagerSlot.getResourceProfile());

        if (waitingRequest == null) {
            this.freeSlots.put(taskManagerSlot.getSlotId(), taskManagerSlot);
            return;
        }
        allocateSlot(taskManagerSlot, waitingRequest);
    }

    /**
     * Removes every slot in the given collection of slot ids.
     *
     * @param iterable ids of the slots to remove
     */
    private void removeSlots(Iterable<SlotID> iterable) {
        for (SlotID slotId : iterable) {
            removeSlot(slotId);
        }
    }

    /**
     * Removes a slot from the slot manager: it is dropped from the registered and free slots,
     * a request assigned to it is rejected, and its allocation id is removed from the
     * fulfilled slot requests.
     *
     * @param slotID id of the slot to remove
     */
    private void removeSlot(SlotID slotID) {
        final TaskManagerSlot removedSlot = this.slots.remove(slotID);
        if (removedSlot != null) {
            this.freeSlots.remove(slotID);

            if (removedSlot.hasPendingSlotRequest()) {
                final Exception cause = new Exception("The assigned slot " + removedSlot.getSlotId() + " was removed.");
                rejectPendingSlotRequest(removedSlot.getAssignedSlotRequest(), cause);
            }

            this.fulfilledSlotRequests.remove(removedSlot.getAllocationId());
        } else {
            LOG.debug("There was no slot registered with slot id {}.", slotID);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Detaches a slot request from the slot it was assigned to, after the request towards the
     * task executor failed or was cancelled.
     *
     * <p>Nothing is changed unless the slot is still registered and the request currently
     * assigned to it carries the given allocation id. In particular a slot that is already in
     * the free pool, or that has meanwhile been handed to a different request, is left alone;
     * re-offering such a slot would hand it out while it is still listed as free.
     *
     * <p>After detaching, a slot that is free is offered to a waiting request or returned to
     * the free pool. The owning task manager is marked idle only if that did not put a new
     * request on the slot and none of its slots is in use.
     *
     * @param slotID id of the slot the request was assigned to
     * @param allocationID allocation id of the request to detach
     */
    public void removeSlotRequestFromSlot(SlotID slotID, AllocationID allocationID) {
        final TaskManagerSlot slot = this.slots.get(slotID);
        if (slot == null) {
            LOG.debug("There was no slot with {} registered. Probably this slot has been already freed.", slotID);
            return;
        }

        final boolean requestIsAssigned = slot.hasPendingSlotRequest()
                && Objects.equals(allocationID, slot.getAssignedSlotRequest().getAllocationId());
        if (!requestIsAssigned) {
            LOG.debug("Ignore slot request removal for slot {}.", slotID);
            return;
        }

        slot.setAssignedSlotRequest(null);

        if (slot.isFree()) {
            handleFreeSlot(slot);
        }

        final TaskManagerRegistration registration = this.taskManagerRegistrations.get(slot.getInstanceId());
        // handleFreeSlot may have assigned a new request to this very slot; the task manager
        // is busy then and must not be marked idle.
        if (registration != null
                && !slot.hasPendingSlotRequest()
                && !anySlotUsed(registration.getSlots())) {
            registration.markIdle();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Handles a failed slot request towards a task executor by retrying the request.
     *
     * <p>If the request is no longer pending nothing is done. Otherwise its request future is
     * cleared and the request is served again; if that fails, the request is removed and the
     * resource manager is notified about the allocation failure.
     *
     * @param slotID id of the slot the failed request was sent for
     * @param allocationID allocation id of the failed request
     * @param th the failure cause
     */
    public void handleFailedSlotRequest(SlotID slotID, AllocationID allocationID, Throwable th) {
        final PendingSlotRequest failedRequest = this.pendingSlotRequests.get(allocationID);

        LOG.debug("Slot request with allocation id {} failed for slot {}.", new Object[]{allocationID, slotID, th});

        if (failedRequest != null) {
            failedRequest.setRequestFuture(null);

            try {
                internalRequestSlot(failedRequest);
            } catch (ResourceManagerException retryFailure) {
                // The retry could not even be issued: give up on this request.
                this.pendingSlotRequests.remove(allocationID);
                this.resourceManagerActions.notifyAllocationFailure(failedRequest.getJobId(), allocationID, retryFailure);
            }
        } else {
            LOG.debug("There was not pending slot request with allocation id {}. Probably the request has been fulfilled or cancelled.", allocationID);
        }
    }

    /**
     * Rejects a pending slot request by failing its request future with a
     * {@code SlotAllocationException} wrapping the given cause. Requests without a request
     * future are only logged.
     *
     * @param pendingSlotRequest the request to reject
     * @param exc the reason for the rejection
     */
    private void rejectPendingSlotRequest(PendingSlotRequest pendingSlotRequest, Exception exc) {
        final CompletableFuture<Acknowledge> inFlight = pendingSlotRequest.getRequestFuture();

        if (inFlight == null) {
            LOG.debug("Cannot reject pending slot request {}, since no request has been sent.", pendingSlotRequest.getAllocationId());
            return;
        }
        inFlight.completeExceptionally(new SlotAllocationException(exc));
    }

    /**
     * Cancels the request future of a pending slot request, if it has one.
     *
     * @param pendingSlotRequest the request to cancel
     */
    private void cancelPendingSlotRequest(PendingSlotRequest pendingSlotRequest) {
        final CompletableFuture<Acknowledge> inFlight = pendingSlotRequest.getRequestFuture();
        if (inFlight == null) {
            return;
        }
        inFlight.cancel(false);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Periodic check that releases task managers which have been idle for at least the task
     * manager timeout.
     *
     * <p>Task managers with a used slot are (re-)marked as used. A timed-out task manager is
     * removed from the registrations, its slots are removed and the resource manager is asked
     * to release the resource.
     */
    public void checkTaskManagerTimeouts() {
        if (this.taskManagerRegistrations.isEmpty()) {
            return;
        }

        final long now = System.currentTimeMillis();
        final long idleTimeoutMillis = this.taskManagerTimeout.toMilliseconds();

        for (Iterator<TaskManagerRegistration> registrations = this.taskManagerRegistrations.values().iterator(); registrations.hasNext(); ) {
            final TaskManagerRegistration registration = registrations.next();

            if (anySlotUsed(registration.getSlots())) {
                registration.markUsed();
                continue;
            }

            if (now - registration.getIdleSince() < idleTimeoutMillis) {
                continue;
            }

            // Remove through the iterator so the map can be modified while iterating.
            registrations.remove();
            internalUnregisterTaskManager(registration);
            this.resourceManagerActions.releaseResource(registration.getInstanceId());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Periodic check that fails pending slot requests which are at least as old as the slot
     * request timeout.
     *
     * <p>A timed-out request is removed, its in-flight request is cancelled if it was
     * assigned, and the resource manager is notified with a {@code TimeoutException}.
     */
    public void checkSlotRequestTimeouts() {
        if (this.pendingSlotRequests.isEmpty()) {
            return;
        }

        final long now = System.currentTimeMillis();
        final long requestTimeoutMillis = this.slotRequestTimeout.toMilliseconds();

        for (Iterator<PendingSlotRequest> requests = this.pendingSlotRequests.values().iterator(); requests.hasNext(); ) {
            final PendingSlotRequest request = requests.next();

            if (now - request.getCreationTimestamp() < requestTimeoutMillis) {
                continue;
            }

            // Remove through the iterator so the map can be modified while iterating.
            requests.remove();

            if (request.isAssigned()) {
                cancelPendingSlotRequest(request);
            }

            this.resourceManagerActions.notifyAllocationFailure(request.getJobId(), request.getAllocationId(), new TimeoutException("The allocation could not be fulfilled in time."));
        }
    }

    /**
     * Removes all slots belonging to the given task manager registration. The registration
     * itself must already have been removed from the registrations by the caller.
     *
     * @param taskManagerRegistration registration whose slots are removed
     * @throws NullPointerException if the registration is null
     */
    private void internalUnregisterTaskManager(TaskManagerRegistration taskManagerRegistration) {
        Preconditions.checkNotNull(taskManagerRegistration);

        final Iterable<SlotID> ownedSlots = taskManagerRegistration.getSlots();
        removeSlots(ownedSlots);
    }

    /**
     * Checks whether a slot request with the given allocation id is already known.
     *
     * @param allocationID allocation id to check
     * @return true if the id belongs to a pending or a fulfilled slot request
     */
    private boolean checkDuplicateRequest(AllocationID allocationID) {
        if (this.pendingSlotRequests.containsKey(allocationID)) {
            return true;
        }
        return this.fulfilledSlotRequests.containsKey(allocationID);
    }

    /**
     * Checks whether any of the given slots is in use.
     *
     * <p>A slot counts as used when it is allocated or when a slot request is currently
     * assigned to it. Counting assigned requests keeps a task manager with an in-flight slot
     * request from being marked idle (and later released) while the request is outstanding.
     * Slot ids that are not registered are ignored.
     *
     * @param iterable ids of the slots to check; may be null
     * @return true if at least one registered slot is allocated or has an assigned request,
     *     false otherwise (including for a null argument)
     */
    private boolean anySlotUsed(Iterable<SlotID> iterable) {
        if (iterable == null) {
            return false;
        }

        for (SlotID slotId : iterable) {
            final TaskManagerSlot slot = this.slots.get(slotId);
            if (slot == null) {
                continue;
            }
            if (slot.isAllocated() || slot.hasPendingSlotRequest()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Verifies that the slot manager has been started.
     *
     * @throws IllegalStateException if the slot manager has not been started
     */
    private void checkInit() {
        final boolean isStarted = this.started;
        Preconditions.checkState(isStarted, "The slot manager has not been started.");
    }

    /**
     * Returns the registered slot with the given id.
     *
     * @param slotID id of the slot
     * @return the slot, or null if no slot is registered under that id
     */
    @VisibleForTesting
    TaskManagerSlot getSlot(SlotID slotID) {
        final TaskManagerSlot registeredSlot = this.slots.get(slotID);
        return registeredSlot;
    }

    /**
     * Returns the number of slots currently registered.
     *
     * @return number of registered slots
     */
    @VisibleForTesting
    int getNumberRegisteredSlots() {
        final int registeredSlotCount = this.slots.size();
        return registeredSlotCount;
    }

    /**
     * Returns the pending slot request with the given allocation id.
     *
     * @param allocationID allocation id of the request
     * @return the pending request, or null if there is none with that id
     */
    @VisibleForTesting
    PendingSlotRequest getSlotRequest(AllocationID allocationID) {
        final PendingSlotRequest pendingRequest = this.pendingSlotRequests.get(allocationID);
        return pendingRequest;
    }

    /**
     * Tells whether the task manager with the given instance id is registered and idle.
     *
     * @param instanceID instance id of the task manager
     * @return the registration's idle flag, or false if the task manager is not registered
     */
    @VisibleForTesting
    boolean isTaskManagerIdle(InstanceID instanceID) {
        final TaskManagerRegistration registration = this.taskManagerRegistrations.get(instanceID);
        return registration != null && registration.isIdle();
    }
}
