package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailureHandlingResult;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder;
import org.apache.flink.runtime.scheduler.exceptionhistory.FailureHandlingResultSnapshot;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultScheduler.class */
public class DefaultScheduler extends SchedulerBase implements SchedulerOperations {
    private final Logger log;
    private final ClassLoader userCodeLoader;
    private final ExecutionSlotAllocator executionSlotAllocator;
    private final ExecutionFailureHandler executionFailureHandler;
    private final ScheduledExecutor delayExecutor;
    private final SchedulingStrategy schedulingStrategy;
    private final ExecutionVertexOperations executionVertexOperations;
    private final Set<ExecutionVertexID> verticesWaitingForRestart;

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultScheduler$DefaultExecutionSlotAllocationContext.class */
    /**
     * View onto the enclosing {@link DefaultScheduler} that is handed to the
     * {@link ExecutionSlotAllocatorFactory} when the slot allocator is created.
     *
     * <p>Every method is a plain delegation to the enclosing scheduler, its job graph, or the
     * location retrievers it holds; the class carries no state of its own.
     */
    private class DefaultExecutionSlotAllocationContext implements ExecutionSlotAllocationContext {
        private DefaultExecutionSlotAllocationContext() {
        }

        /** Returns the resource profile of the execution vertex with the given id. */
        @Override // org.apache.flink.runtime.scheduler.ExecutionSlotAllocationContext
        public ResourceProfile getResourceProfile(ExecutionVertexID executionVertexID) {
            final ExecutionVertex vertex = DefaultScheduler.this.getExecutionVertex(executionVertexID);
            return vertex.getResourceProfile();
        }

        /** Returns the latest prior allocation id recorded on the execution vertex with the given id. */
        @Override // org.apache.flink.runtime.scheduler.ExecutionSlotAllocationContext
        public AllocationID getPriorAllocationId(ExecutionVertexID executionVertexID) {
            final ExecutionVertex vertex = DefaultScheduler.this.getExecutionVertex(executionVertexID);
            return vertex.getLatestPriorAllocation();
        }

        /** Returns the scheduling topology of the enclosing scheduler. */
        @Override // org.apache.flink.runtime.scheduler.ExecutionSlotAllocationContext
        public SchedulingTopology getSchedulingTopology() {
            // Must stay qualified: an unqualified call would resolve to this very method.
            return DefaultScheduler.this.getSchedulingTopology();
        }

        /** Returns the slot sharing groups declared on the scheduler's job graph. */
        @Override // org.apache.flink.runtime.scheduler.ExecutionSlotAllocationContext
        public Set<SlotSharingGroup> getLogicalSlotSharingGroups() {
            final JobGraph graph = DefaultScheduler.this.getJobGraph();
            return graph.getSlotSharingGroups();
        }

        /** Returns the co-location groups declared on the scheduler's job graph. */
        @Override // org.apache.flink.runtime.scheduler.ExecutionSlotAllocationContext
        public Set<CoLocationGroup> getCoLocationGroups() {
            final JobGraph graph = DefaultScheduler.this.getJobGraph();
            return graph.getCoLocationGroups();
        }

        /** Delegates to the scheduler's inputs-locations retriever. */
        @Override // org.apache.flink.runtime.scheduler.InputsLocationsRetriever
        public Collection<Collection<ExecutionVertexID>> getConsumedResultPartitionsProducers(ExecutionVertexID executionVertexID) {
            return DefaultScheduler.this.inputsLocationsRetriever.getConsumedResultPartitionsProducers(executionVertexID);
        }

        /** Delegates to the scheduler's inputs-locations retriever. */
        @Override // org.apache.flink.runtime.scheduler.InputsLocationsRetriever
        public Optional<CompletableFuture<TaskManagerLocation>> getTaskManagerLocation(ExecutionVertexID executionVertexID) {
            return DefaultScheduler.this.inputsLocationsRetriever.getTaskManagerLocation(executionVertexID);
        }

        /** Delegates to the scheduler's state-location retriever. */
        @Override // org.apache.flink.runtime.scheduler.StateLocationRetriever
        public Optional<TaskManagerLocation> getStateLocation(ExecutionVertexID executionVertexID) {
            return DefaultScheduler.this.stateLocationRetriever.getStateLocation(executionVertexID);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the scheduler and wires up failure handling, the scheduling strategy and the slot
     * allocator.
     *
     * <p>Note that {@code this} is passed to {@code schedulingStrategyFactory} before construction
     * has finished; the slot allocator and {@code verticesWaitingForRestart} are not yet assigned
     * at that point.
     *
     * @param logger logger used by this scheduler; also passed to the base class
     * @param jobGraph job graph to schedule; passed to the base class and used for the startup log line
     * @param executor passed to the base-class constructor
     * @param configuration passed to the base-class constructor
     * @param consumer start-up callback, invoked with {@code componentMainThreadExecutor} as the
     *     last construction step
     * @param scheduledExecutor executor used to delay task restarts; must not be {@code null}
     * @param classLoader user-code class loader used to deserialize task errors; must not be {@code null}
     * @param checkpointRecoveryFactory passed to the base-class constructor
     * @param jobManagerJobMetricGroup passed to the base-class constructor
     * @param schedulingStrategyFactory creates the scheduling strategy for this scheduler's topology
     * @param factory creates the failover strategy
     * @param restartBackoffTimeStrategy passed to the {@link ExecutionFailureHandler}
     * @param executionVertexOperations deploy/cancel/fail operations on vertices; must not be {@code null}
     * @param executionVertexVersioner passed to the base-class constructor
     * @param executionSlotAllocatorFactory creates the slot allocator; must not be {@code null}
     * @param j passed to the base-class constructor (its meaning is not visible in this file)
     * @param componentMainThreadExecutor passed to the base class and to {@code consumer}
     * @param jobStatusListener passed to the base-class constructor
     * @param executionGraphFactory passed to the base-class constructor
     * @throws Exception as declared; presumably propagated from the base-class constructor
     */
    public DefaultScheduler(Logger logger, JobGraph jobGraph, Executor executor, Configuration configuration, Consumer<ComponentMainThreadExecutor> consumer, ScheduledExecutor scheduledExecutor, ClassLoader classLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, JobManagerJobMetricGroup jobManagerJobMetricGroup, SchedulingStrategyFactory schedulingStrategyFactory, FailoverStrategy.Factory factory, RestartBackoffTimeStrategy restartBackoffTimeStrategy, ExecutionVertexOperations executionVertexOperations, ExecutionVertexVersioner executionVertexVersioner, ExecutionSlotAllocatorFactory executionSlotAllocatorFactory, long j, ComponentMainThreadExecutor componentMainThreadExecutor, JobStatusListener jobStatusListener, ExecutionGraphFactory executionGraphFactory) throws Exception {
        super(logger, jobGraph, executor, configuration, classLoader, checkpointRecoveryFactory, jobManagerJobMetricGroup, executionVertexVersioner, j, componentMainThreadExecutor, jobStatusListener, executionGraphFactory);
        this.log = logger;
        // checkNotNull is generic and returns its argument, so no casts are needed.
        this.delayExecutor = Preconditions.checkNotNull(scheduledExecutor);
        this.userCodeLoader = Preconditions.checkNotNull(classLoader);
        this.executionVertexOperations = Preconditions.checkNotNull(executionVertexOperations);

        final FailoverStrategy failoverStrategy = factory.create(getSchedulingTopology(), getResultPartitionAvailabilityChecker());
        logger.info("Using failover strategy {} for {} ({}).", failoverStrategy, jobGraph.getName(), jobGraph.getJobID());
        this.executionFailureHandler = new ExecutionFailureHandler(getSchedulingTopology(), failoverStrategy, restartBackoffTimeStrategy);

        this.schedulingStrategy = schedulingStrategyFactory.createInstance(this, getSchedulingTopology());
        this.executionSlotAllocator = Preconditions.checkNotNull(executionSlotAllocatorFactory).createInstance(new DefaultExecutionSlotAllocationContext());
        this.verticesWaitingForRestart = new HashSet<>();

        // Run the start-up action last, once all fields are initialised.
        consumer.accept(componentMainThreadExecutor);
    }

    /**
     * Reports the restart count tracked by the execution failure handler.
     *
     * @return the value of {@code executionFailureHandler.getNumberOfRestarts()}
     */
    @Override // org.apache.flink.runtime.scheduler.SchedulerBase
    protected long getNumberOfRestarts() {
        final ExecutionFailureHandler failureHandler = this.executionFailureHandler;
        return failureHandler.getNumberOfRestarts();
    }

    /**
     * Cancels the slot request of every vertex in the scheduling topology by calling
     * {@code executionSlotAllocator.cancel} once per vertex id.
     */
    @Override // org.apache.flink.runtime.scheduler.SchedulerBase
    protected void cancelAllPendingSlotRequestsInternal() {
        // Keep the stream fully typed: with a raw Stream the method reference below cannot be
        // matched against cancel(ExecutionVertexID). The bound method reference already
        // null-checks its receiver, so no separate getClass() probe is required.
        IterableUtils.toStream(getSchedulingTopology().getVertices())
                .map(vertex -> vertex.getId())
                .forEach(this.executionSlotAllocator::cancel);
    }

    /**
     * Logs the scheduling strategy in use, transitions to running via
     * {@code transitionToRunning()}, and then asks the strategy to start scheduling.
     */
    @Override // org.apache.flink.runtime.scheduler.SchedulerBase
    protected void startSchedulingInternal() {
        final String strategyClassName = this.schedulingStrategy.getClass().getName();
        this.log.info("Starting scheduling with scheduling strategy [{}]", strategyClassName);

        transitionToRunning();
        this.schedulingStrategy.startScheduling();
    }

    /**
     * Forwards a task state change to the scheduling strategy and then triggers failure handling
     * if the new state is {@code FAILED}.
     *
     * @param executionVertexID vertex whose state changed
     * @param taskExecutionStateTransition the reported transition
     */
    @Override // org.apache.flink.runtime.scheduler.SchedulerBase
    protected void updateTaskExecutionStateInternal(ExecutionVertexID executionVertexID, TaskExecutionStateTransition taskExecutionStateTransition) {
        final ExecutionState newState = taskExecutionStateTransition.getExecutionState();
        this.schedulingStrategy.onExecutionStateChange(executionVertexID, newState);
        maybeHandleTaskFailure(taskExecutionStateTransition, executionVertexID);
    }

    /**
     * Starts task-failure handling when the transition's state is {@code FAILED}; does nothing
     * for any other state.
     *
     * @param taskExecutionStateTransition the reported transition
     * @param executionVertexID vertex the transition belongs to
     */
    private void maybeHandleTaskFailure(TaskExecutionStateTransition taskExecutionStateTransition, ExecutionVertexID executionVertexID) {
        if (taskExecutionStateTransition.getExecutionState() != ExecutionState.FAILED) {
            return;
        }
        // The error is resolved with the user-code class loader.
        final Throwable failureCause = taskExecutionStateTransition.getError(this.userCodeLoader);
        handleTaskFailure(executionVertexID, failureCause);
    }

    /**
     * Handles the failure of a single task: records the cause, informs the operator coordinators
     * of the failed vertex, and then restarts tasks or fails the job depending on the failure
     * handler's verdict.
     *
     * @param executionVertexID the vertex that failed
     * @param th the failure cause, possibly {@code null}
     */
    private void handleTaskFailure(ExecutionVertexID executionVertexID, @Nullable Throwable th) {
        // One timestamp is shared by the recorded cause and the failure-handling result.
        final long failureTimestamp = System.currentTimeMillis();
        setGlobalFailureCause(th, failureTimestamp);
        notifyCoordinatorsAboutTaskFailure(executionVertexID, th);

        final FailureHandlingResult handlingResult = this.executionFailureHandler.getFailureHandlingResult(executionVertexID, th, failureTimestamp);
        maybeRestartTasks(handlingResult);
    }

    /**
     * Tells every operator coordinator of the failed vertex's job vertex that the subtask failed.
     *
     * @param executionVertexID the vertex that failed
     * @param th the failure cause handed to each coordinator, possibly {@code null}
     */
    private void notifyCoordinatorsAboutTaskFailure(ExecutionVertexID executionVertexID, @Nullable Throwable th) {
        final ExecutionJobVertex jobVertex = getExecutionJobVertex(executionVertexID.getJobVertexId());
        final int failedSubtask = executionVertexID.getSubtaskIndex();

        for (OperatorCoordinatorHolder coordinator : jobVertex.getOperatorCoordinators()) {
            coordinator.subtaskFailed(failedSubtask, th);
        }
    }

    /**
     * Handles a failure that is not tied to a single task: records the cause, logs it, and then
     * restarts tasks or fails the job according to the failure handler's global verdict.
     *
     * @param th the failure cause
     */
    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void handleGlobalFailure(Throwable th) {
        final long failureTimestamp = System.currentTimeMillis();
        setGlobalFailureCause(th, failureTimestamp);
        this.log.info("Trying to recover from a global failure.", th);

        final FailureHandlingResult handlingResult = this.executionFailureHandler.getGlobalFailureHandlingResult(th, failureTimestamp);
        maybeRestartTasks(handlingResult);
    }

    /**
     * Acts on a failure-handling verdict: schedules a delayed restart when restarting is allowed,
     * otherwise fails the job with the result's error and timestamp.
     *
     * @param failureHandlingResult verdict produced by the execution failure handler
     */
    private void maybeRestartTasks(FailureHandlingResult failureHandlingResult) {
        if (!failureHandlingResult.canRestart()) {
            failJob(failureHandlingResult.getError(), failureHandlingResult.getTimestamp());
            return;
        }
        restartTasksWithDelay(failureHandlingResult);
    }

    /**
     * Cancels the affected vertices right away and schedules their restart after the delay
     * carried by the failure-handling result.
     *
     * <p>Vertex versions are recorded before cancellation, so that {@link #restartTasks} can later
     * restart only the vertices that were not modified again in the meantime. The restart itself
     * runs on the main thread executor once both the delay has elapsed and all cancellations
     * have completed.
     *
     * @param failureHandlingResult verdict that allows a restart
     */
    private void restartTasksWithDelay(FailureHandlingResult failureHandlingResult) {
        final Set<ExecutionVertexID> verticesToRestart = failureHandlingResult.getVerticesToRestart();
        // Typed set (rather than a raw HashSet) so the call to restartTasks is checked.
        final Set<ExecutionVertexVersion> recordedVersions = new HashSet<>(this.executionVertexVersioner.recordVertexModifications(verticesToRestart).values());
        final boolean globalFailure = failureHandlingResult.isGlobalFailure();

        addVerticesToRestartPending(verticesToRestart);
        final CompletableFuture<?> cancelFuture = cancelTasksAsync(verticesToRestart);

        // Capture the current execution attempts now, before the vertices are reset for restart.
        final FailureHandlingResultSnapshot snapshot = FailureHandlingResultSnapshot.create(failureHandlingResult, vertexId -> getExecutionVertex(vertexId).getCurrentExecutionAttempt());

        this.delayExecutor.schedule(() -> {
            FutureUtils.assertNoException(cancelFuture.thenRunAsync(() -> {
                archiveFromFailureHandlingResult(snapshot);
                restartTasks(recordedVersions, globalFailure);
            }, getMainThreadExecutor()));
        }, failureHandlingResult.getRestartDelayMS(), TimeUnit.MILLISECONDS);
    }

    /**
     * Marks the given vertices as waiting for restart and requests the
     * {@code RUNNING -> RESTARTING} job status transition.
     *
     * @param set vertices that are about to be restarted
     */
    private void addVerticesToRestartPending(Set<ExecutionVertexID> set) {
        final Set<ExecutionVertexID> pendingRestarts = this.verticesWaitingForRestart;
        pendingRestarts.addAll(set);
        transitionExecutionGraphState(JobStatus.RUNNING, JobStatus.RESTARTING);
    }

    /**
     * Clears the given vertices from the waiting-for-restart set and, once nothing is left
     * waiting, requests the {@code RESTARTING -> RUNNING} job status transition.
     *
     * @param set vertices that are no longer waiting for restart
     */
    private void removeVerticesFromRestartPending(Set<ExecutionVertexID> set) {
        final Set<ExecutionVertexID> pendingRestarts = this.verticesWaitingForRestart;
        pendingRestarts.removeAll(set);
        if (!pendingRestarts.isEmpty()) {
            // Other restarts are still outstanding; stay in the current job status.
            return;
        }
        transitionExecutionGraphState(JobStatus.RESTARTING, JobStatus.RUNNING);
    }

    /**
     * Restarts those vertices whose recorded version is still current.
     *
     * <p>The unmodified vertices are removed from the pending set and reset for new executions;
     * state restoration and the scheduling strategy's restart then run inside a try block, and
     * any {@link Throwable} thrown there is routed to {@link #handleGlobalFailure}.
     *
     * @param set vertex versions recorded when the restart was scheduled
     * @param z whether the restart was triggered by a global failure; passed to {@code restoreState}
     */
    private void restartTasks(Set<ExecutionVertexVersion> set, boolean z) {
        final Set<ExecutionVertexID> stillCurrent = this.executionVertexVersioner.getUnmodifiedExecutionVertices(set);

        removeVerticesFromRestartPending(stillCurrent);
        resetForNewExecutions(stillCurrent);

        try {
            restoreState(stillCurrent, z);
            this.schedulingStrategy.restartTasks(stillCurrent);
        } catch (Throwable failure) {
            handleGlobalFailure(failure);
        }
    }

    /**
     * Cancels every given vertex and returns a future combining all individual cancellations.
     *
     * @param set vertices to cancel
     * @return the result of {@code FutureUtils.combineAll} over the per-vertex cancel futures
     */
    private CompletableFuture<?> cancelTasksAsync(Set<ExecutionVertexID> set) {
        // Typed list instead of a raw (List) cast, so combineAll is checked by the compiler.
        final List<CompletableFuture<?>> cancelFutures = set.stream()
                .map(this::cancelExecutionVertex)
                .collect(Collectors.toList());
        return FutureUtils.combineAll(cancelFutures);
    }

    /**
     * Cancels a single vertex: notifies its operator coordinators (where applicable), cancels
     * its slot request, and then cancels the vertex itself.
     *
     * @param executionVertexID vertex to cancel
     * @return the future returned by {@code executionVertexOperations.cancel}
     */
    private CompletableFuture<?> cancelExecutionVertex(ExecutionVertexID executionVertexID) {
        final ExecutionVertex vertex = getExecutionVertex(executionVertexID);

        notifyCoordinatorOfCancellation(vertex);
        this.executionSlotAllocator.cancel(executionVertexID);

        final CompletableFuture<?> cancelFuture = this.executionVertexOperations.cancel(vertex);
        return cancelFuture;
    }

    /**
     * Forwards the "partition has data available" notification to the scheduling strategy.
     *
     * @param intermediateResultPartitionID the partition that became consumable
     */
    @Override // org.apache.flink.runtime.scheduler.SchedulerBase
    protected void notifyPartitionDataAvailableInternal(IntermediateResultPartitionID intermediateResultPartitionID) {
        final SchedulingStrategy strategy = this.schedulingStrategy;
        strategy.onPartitionConsumable(intermediateResultPartitionID);
    }

    /**
     * Allocates slots for the given vertices and deploys them once all slots are assigned.
     *
     * <p>Steps, in order: validate that every vertex is in {@code CREATED} state, record a new
     * version for each vertex, transition the vertices to scheduled, request slots, and finally
     * wait for the slots and deploy.
     *
     * @param list deployment options, one per vertex to deploy
     */
    @Override // org.apache.flink.runtime.scheduler.SchedulerOperations
    public void allocateSlotsAndDeploy(List<ExecutionVertexDeploymentOption> list) {
        validateDeploymentOptions(list);

        final Map<ExecutionVertexID, ExecutionVertexDeploymentOption> optionsByVertex = groupDeploymentOptionsByVertexId(list);
        final List<ExecutionVertexID> verticesToDeploy = list.stream()
                .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
                .collect(Collectors.toList());

        // Versions are recorded before slots are requested, so the asynchronous callbacks can
        // detect a deployment that has been superseded in the meantime.
        final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex = this.executionVertexVersioner.recordVertexModifications(verticesToDeploy);
        transitionToScheduled(verticesToDeploy);

        final List<SlotExecutionVertexAssignment> slotAssignments = allocateSlots(list);
        final List<DeploymentHandle> deploymentHandles = createDeploymentHandles(requiredVersionByVertex, optionsByVertex, slotAssignments);
        waitForAllSlotsAndDeploy(deploymentHandles);
    }

    /**
     * Checks that every vertex referenced by the deployment options is in {@code CREATED} state.
     *
     * @param collection deployment options to validate
     * @throws IllegalStateException presumably thrown by {@code Preconditions.checkState} when a
     *     vertex is in any other state
     */
    private void validateDeploymentOptions(Collection<ExecutionVertexDeploymentOption> collection) {
        for (ExecutionVertexDeploymentOption deploymentOption : collection) {
            final ExecutionVertex vertex = getExecutionVertex(deploymentOption.getExecutionVertexId());
            Preconditions.checkState(
                    vertex.getExecutionState() == ExecutionState.CREATED,
                    "expected vertex %s to be in CREATED state, was: %s",
                    vertex.getID(),
                    vertex.getExecutionState());
        }
    }

    /**
     * Indexes the deployment options by their execution vertex id.
     *
     * @param collection deployment options to index
     * @return map from vertex id to its deployment option
     * @throws IllegalStateException if two options refer to the same vertex id (the behaviour of
     *     {@link Collectors#toMap(Function, Function)} on duplicate keys)
     */
    private static Map<ExecutionVertexID, ExecutionVertexDeploymentOption> groupDeploymentOptionsByVertexId(Collection<ExecutionVertexDeploymentOption> collection) {
        // No raw (Map) cast: the collector already yields the precise map type.
        return collection.stream()
                .collect(Collectors.toMap(ExecutionVertexDeploymentOption::getExecutionVertexId, Function.identity()));
    }

    /**
     * Requests slots from the slot allocator for the vertices named in the deployment options.
     *
     * @param list deployment options whose vertices need slots
     * @return the assignments returned by {@code executionSlotAllocator.allocateSlotsFor}
     */
    private List<SlotExecutionVertexAssignment> allocateSlots(List<ExecutionVertexDeploymentOption> list) {
        // Typed id list instead of a raw (List) cast.
        final List<ExecutionVertexID> vertexIds = list.stream()
                .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
                .collect(Collectors.toList());
        return this.executionSlotAllocator.allocateSlotsFor(vertexIds);
    }

    /**
     * Builds one {@link DeploymentHandle} per slot assignment, pairing it with the required
     * version and the deployment option looked up by the assignment's vertex id.
     *
     * @param map required vertex version per vertex id
     * @param map2 deployment option per vertex id
     * @param list slot assignments, one per vertex
     * @return deployment handles in the iteration order of {@code list}
     */
    private static List<DeploymentHandle> createDeploymentHandles(Map<ExecutionVertexID, ExecutionVertexVersion> map, Map<ExecutionVertexID, ExecutionVertexDeploymentOption> map2, List<SlotExecutionVertexAssignment> list) {
        final List<DeploymentHandle> handles = new ArrayList<>();
        for (SlotExecutionVertexAssignment assignment : list) {
            final ExecutionVertexID vertexId = assignment.getExecutionVertexId();
            final ExecutionVertexVersion requiredVersion = map.get(vertexId);
            final ExecutionVertexDeploymentOption deploymentOption = map2.get(vertexId);
            handles.add(new DeploymentHandle(requiredVersion, deploymentOption, assignment));
        }
        return handles;
    }

    /**
     * Waits until resources have been assigned for all handles and then deploys all of them.
     * Any exception escaping the combined future is passed to
     * {@code FutureUtils.assertNoException}.
     *
     * @param list handles of the vertices to deploy
     */
    private void waitForAllSlotsAndDeploy(List<DeploymentHandle> list) {
        // deployAll already returns BiFunction<Void, Throwable, Void>, which handle(...) accepts
        // directly; the former cast named an undeclared type variable and is removed.
        final CompletableFuture<Void> allResourcesAssigned = assignAllResources(list);
        FutureUtils.assertNoException(allResourcesAssigned.handle(deployAll(list)));
    }

    /**
     * Attaches the resource-assignment step to every handle's slot future.
     *
     * @param list handles whose slot futures are awaited
     * @return the result of {@code FutureUtils.waitForAll} over all assignment steps
     */
    private CompletableFuture<Void> assignAllResources(List<DeploymentHandle> list) {
        // Typed list instead of a raw ArrayList; the per-handle callback needs no cast.
        final List<CompletableFuture<Void>> slotAssignedFutures = new ArrayList<>();
        for (DeploymentHandle deploymentHandle : list) {
            final CompletableFuture<Void> slotAssigned = deploymentHandle
                    .getSlotExecutionVertexAssignment()
                    .getLogicalSlotFuture()
                    .handle(assignResourceOrHandleError(deploymentHandle));
            slotAssignedFutures.add(slotAssigned);
        }
        return FutureUtils.waitForAll(slotAssignedFutures);
    }

    /**
     * Creates the callback that deploys all handles after the resource-assignment stage.
     *
     * <p>If that stage completed with an error, the callback rethrows it wrapped in a
     * {@link CompletionException}. Otherwise it checks that every slot future is done and
     * attaches the deploy-or-fail step to each of them.
     *
     * @param list handles of the vertices to deploy
     * @return callback suitable for {@link CompletableFuture#handle}; it always returns {@code null}
     */
    private BiFunction<Void, Throwable, Void> deployAll(List<DeploymentHandle> list) {
        return (ignored, throwable) -> {
            propagateIfNonNull(throwable);
            for (DeploymentHandle deploymentHandle : list) {
                final CompletableFuture<LogicalSlot> slotAssigned = deploymentHandle.getSlotExecutionVertexAssignment().getLogicalSlotFuture();
                // The callback runs after the assignment stage, so each slot future must be complete.
                Preconditions.checkState(slotAssigned.isDone());
                FutureUtils.assertNoException(slotAssigned.handle(deployOrHandleError(deploymentHandle)));
            }
            return null;
        };
    }

    /**
     * Rethrows the given throwable wrapped in a {@link CompletionException}; returns normally
     * when it is {@code null}.
     *
     * @param th throwable to propagate, or {@code null} for "no error"
     * @throws CompletionException wrapping {@code th} when {@code th} is non-null
     */
    private static void propagateIfNonNull(Throwable th) {
        if (th == null) {
            return;
        }
        throw new CompletionException(th);
    }

    /**
     * Creates the callback that runs when the slot future of one deployment completes.
     *
     * <p>The callback, in order of precedence:
     * <ul>
     *   <li>releases the slot (if one was obtained) when the vertex version has changed since
     *       the deployment was started;
     *   <li>marks the vertex as failed when the slot request failed, wrapping timeouts in a
     *       {@link NoResourceAvailableException};
     *   <li>otherwise registers the produced partitions and tries to assign the slot to the vertex.
     * </ul>
     *
     * @param deploymentHandle the deployment this callback belongs to
     * @return callback suitable for {@link CompletableFuture#handle}; it always returns {@code null}
     */
    private BiFunction<LogicalSlot, Throwable, Void> assignResourceOrHandleError(DeploymentHandle deploymentHandle) {
        final ExecutionVertexVersion expectedVersion = deploymentHandle.getRequiredVertexVersion();
        final ExecutionVertexID vertexId = deploymentHandle.getExecutionVertexId();

        return (slot, failure) -> {
            if (this.executionVertexVersioner.isModified(expectedVersion)) {
                this.log.debug("Refusing to assign slot to execution vertex {} because this deployment was superseded by another deployment", vertexId);
                releaseSlotIfPresent(slot);
            } else if (failure != null) {
                handleTaskDeploymentFailure(vertexId, maybeWrapWithNoResourceAvailableException(failure));
            } else {
                final ExecutionVertex vertex = getExecutionVertex(vertexId);
                vertex.getCurrentExecutionAttempt().registerProducedPartitions(slot.getTaskManagerLocation(), deploymentHandle.getDeploymentOption().notifyPartitionDataAvailable());
                vertex.tryAssignResource(slot);
            }
            return null;
        };
    }

    /**
     * Releases the slot, passing {@code null} as the release cause; does nothing when no slot
     * was obtained.
     *
     * @param logicalSlot slot to release, or {@code null}
     */
    private void releaseSlotIfPresent(@Nullable LogicalSlot logicalSlot) {
        if (logicalSlot == null) {
            return;
        }
        logicalSlot.releaseSlot(null);
    }

    /**
     * Marks the given vertex as failed through {@code executionVertexOperations.markFailed}.
     *
     * @param executionVertexID vertex whose deployment failed
     * @param th the failure cause
     */
    private void handleTaskDeploymentFailure(ExecutionVertexID executionVertexID, Throwable th) {
        final ExecutionVertex failedVertex = getExecutionVertex(executionVertexID);
        this.executionVertexOperations.markFailed(failedVertex, th);
    }

    /**
     * Turns a slot-request timeout into a {@link NoResourceAvailableException} with an
     * explanatory message; every other failure is returned unchanged.
     *
     * @param th failure reported by the slot future
     * @return a {@code NoResourceAvailableException} caused by {@code th} if {@code th}, after
     *     {@code ExceptionUtils.stripCompletionException}, is a {@link TimeoutException};
     *     otherwise {@code th} itself
     */
    private static Throwable maybeWrapWithNoResourceAvailableException(Throwable th) {
        final Throwable stripped = ExceptionUtils.stripCompletionException(th);
        if (stripped instanceof TimeoutException) {
            // The original (unstripped) throwable is kept as the cause.
            return new NoResourceAvailableException("Could not allocate the required slot within slot request timeout. Please make sure that the cluster has enough resources.", th);
        }
        return th;
    }

    /**
     * Creates the callback that deploys one vertex after its slot future has completed.
     *
     * <p>The callback skips the deployment when the vertex version has changed since the
     * deployment was started, marks the vertex as failed when the slot future completed
     * exceptionally, and deploys the vertex otherwise.
     *
     * @param deploymentHandle the deployment this callback belongs to
     * @return callback suitable for {@link CompletableFuture#handle}; it always returns {@code null}
     */
    private BiFunction<Object, Throwable, Void> deployOrHandleError(DeploymentHandle deploymentHandle) {
        final ExecutionVertexVersion expectedVersion = deploymentHandle.getRequiredVertexVersion();
        final ExecutionVertexID vertexId = expectedVersion.getExecutionVertexId();

        return (ignored, failure) -> {
            if (this.executionVertexVersioner.isModified(expectedVersion)) {
                this.log.debug("Refusing to deploy execution vertex {} because this deployment was superseded by another deployment", vertexId);
            } else if (failure != null) {
                handleTaskDeploymentFailure(vertexId, failure);
            } else {
                deployTaskSafe(vertexId);
            }
            return null;
        };
    }

    /**
     * Deploys the given vertex; any {@link Throwable} raised while looking up or deploying the
     * vertex is turned into a task deployment failure instead of being propagated.
     *
     * @param executionVertexID vertex to deploy
     */
    private void deployTaskSafe(ExecutionVertexID executionVertexID) {
        try {
            final ExecutionVertex vertex = getExecutionVertex(executionVertexID);
            this.executionVertexOperations.deploy(vertex);
        } catch (Throwable deploymentFailure) {
            handleTaskDeploymentFailure(executionVertexID, deploymentFailure);
        }
    }

    /**
     * Reports the vertex's subtask as failed (with a {@code null} cause) to every operator
     * coordinator of its job vertex, unless the vertex is already in {@code FAILED},
     * {@code CANCELING} or {@code CANCELED} state.
     *
     * @param executionVertex vertex that is about to be cancelled
     */
    private void notifyCoordinatorOfCancellation(ExecutionVertex executionVertex) {
        final ExecutionState currentState = executionVertex.getExecutionState();
        final boolean alreadyFailedOrCancelling =
                currentState == ExecutionState.FAILED
                        || currentState == ExecutionState.CANCELING
                        || currentState == ExecutionState.CANCELED;

        if (!alreadyFailedOrCancelling) {
            for (OperatorCoordinatorHolder coordinator : executionVertex.getJobVertex().getOperatorCoordinators()) {
                coordinator.subtaskFailed(executionVertex.getParallelSubtaskIndex(), null);
            }
        }
    }
}
