package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobInitializationException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterEntryPointExceptionUtils;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.factories.DefaultJobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.PermanentlyFencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.flink.util.function.ThrowingConsumer;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/Dispatcher.class */
public abstract class Dispatcher extends PermanentlyFencedRpcEndpoint<DispatcherId> implements DispatcherGateway {
    public static final String DISPATCHER_NAME = "dispatcher";
    private final Configuration configuration;
    private final JobGraphWriter jobGraphWriter;
    private final RunningJobsRegistry runningJobsRegistry;
    private final HighAvailabilityServices highAvailabilityServices;
    private final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
    private final JobManagerSharedServices jobManagerSharedServices;
    private final HeartbeatServices heartbeatServices;
    private final BlobServer blobServer;
    private final FatalErrorHandler fatalErrorHandler;
    private final Map<JobID, DispatcherJob> runningJobs;
    private final Collection<JobGraph> recoveredJobs;
    private final DispatcherBootstrapFactory dispatcherBootstrapFactory;
    private final ExecutionGraphInfoStore executionGraphInfoStore;
    private final JobManagerRunnerFactory jobManagerRunnerFactory;
    private final JobManagerMetricGroup jobManagerMetricGroup;
    private final HistoryServerArchivist historyServerArchivist;
    private final Executor ioExecutor;

    @Nullable
    private final String metricServiceQueryAddress;
    private final Map<JobID, CompletableFuture<Void>> dispatcherJobTerminationFutures;
    protected final CompletableFuture<ApplicationStatus> shutDownFuture;
    private DispatcherBootstrap dispatcherBootstrap;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/Dispatcher$CleanupJobState.class */
    public enum CleanupJobState {
        LOCAL(false),
        GLOBAL(true);

        final boolean cleanupHAData;

        CleanupJobState(boolean z) {
            this.cleanupHAData = z;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/Dispatcher$ExecutionType.class */
    public enum ExecutionType {
        SUBMISSION,
        RECOVERY
    }

    /* JADX WARN: Multi-variable type inference failed */
    public Dispatcher(RpcService rpcService, DispatcherId dispatcherId, Collection<JobGraph> collection, DispatcherBootstrapFactory dispatcherBootstrapFactory, DispatcherServices dispatcherServices) throws Exception {
        super(rpcService, AkkaRpcServiceUtils.createRandomName(DISPATCHER_NAME), dispatcherId);
        Preconditions.checkNotNull(dispatcherServices);
        this.configuration = dispatcherServices.getConfiguration();
        this.highAvailabilityServices = dispatcherServices.getHighAvailabilityServices();
        this.resourceManagerGatewayRetriever = dispatcherServices.getResourceManagerGatewayRetriever();
        this.heartbeatServices = dispatcherServices.getHeartbeatServices();
        this.blobServer = dispatcherServices.getBlobServer();
        this.fatalErrorHandler = dispatcherServices.getFatalErrorHandler();
        this.jobGraphWriter = dispatcherServices.getJobGraphWriter();
        this.jobManagerMetricGroup = dispatcherServices.getJobManagerMetricGroup();
        this.metricServiceQueryAddress = dispatcherServices.getMetricQueryServiceAddress();
        this.ioExecutor = dispatcherServices.getIoExecutor();
        this.jobManagerSharedServices = JobManagerSharedServices.fromConfiguration(this.configuration, this.blobServer, this.fatalErrorHandler);
        this.runningJobsRegistry = this.highAvailabilityServices.getRunningJobsRegistry();
        this.runningJobs = new HashMap(16);
        this.historyServerArchivist = dispatcherServices.getHistoryServerArchivist();
        this.executionGraphInfoStore = dispatcherServices.getArchivedExecutionGraphStore();
        this.jobManagerRunnerFactory = dispatcherServices.getJobManagerRunnerFactory();
        this.dispatcherJobTerminationFutures = new HashMap(2);
        this.shutDownFuture = new CompletableFuture<>();
        this.dispatcherBootstrapFactory = (DispatcherBootstrapFactory) Preconditions.checkNotNull(dispatcherBootstrapFactory);
        this.recoveredJobs = new HashSet(collection);
    }

    public CompletableFuture<ApplicationStatus> getShutDownFuture() {
        return this.shutDownFuture;
    }

    @Override // org.apache.flink.runtime.rpc.RpcEndpoint
    public void onStart() throws Exception {
        try {
            startDispatcherServices();
            startRecoveredJobs();
            this.dispatcherBootstrap = this.dispatcherBootstrapFactory.create((DispatcherGateway) getSelfGateway(DispatcherGateway.class), getRpcService().getScheduledExecutor(), this::onFatalError);
        } catch (Throwable th) {
            FlinkException dispatcherException = new DispatcherException(String.format("Could not start the Dispatcher %s", getAddress()), th);
            onFatalError(dispatcherException);
            throw dispatcherException;
        }
    }

    private void startDispatcherServices() throws Exception {
        try {
            registerDispatcherMetrics(this.jobManagerMetricGroup);
        } catch (Exception e) {
            handleStartDispatcherServicesException(e);
        }
    }

    private void startRecoveredJobs() {
        Iterator<JobGraph> it = this.recoveredJobs.iterator();
        while (it.hasNext()) {
            runRecoveredJob(it.next());
        }
        this.recoveredJobs.clear();
    }

    private void runRecoveredJob(JobGraph jobGraph) {
        Preconditions.checkNotNull(jobGraph);
        try {
            runJob(jobGraph, ExecutionType.RECOVERY);
        } catch (Throwable th) {
            onFatalError(new DispatcherException(String.format("Could not start recovered job %s.", jobGraph.getJobID()), th));
        }
    }

    private void handleStartDispatcherServicesException(Exception exc) throws Exception {
        try {
            stopDispatcherServices();
        } catch (Exception e) {
            exc.addSuppressed(e);
        }
        throw exc;
    }

    @Override // org.apache.flink.runtime.rpc.RpcEndpoint
    public CompletableFuture<Void> onStop() {
        this.log.info("Stopping dispatcher {}.", getAddress());
        return FutureUtils.runAfterwards(terminateRunningJobsAndGetTerminationFuture(), () -> {
            this.dispatcherBootstrap.stop();
            stopDispatcherServices();
            this.log.info("Stopped dispatcher {}.", getAddress());
        });
    }

    private void stopDispatcherServices() throws Exception {
        Exception exc = null;
        try {
            this.jobManagerSharedServices.shutdown();
        } catch (Exception e) {
            exc = e;
        }
        this.jobManagerMetricGroup.close();
        ExceptionUtils.tryRethrowException(exc);
    }

    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time time) {
        this.log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());
        try {
            return isDuplicateJob(jobGraph.getJobID()) ? FutureUtils.completedExceptionally(new DuplicateJobSubmissionException(jobGraph.getJobID())) : isPartialResourceConfigured(jobGraph) ? FutureUtils.completedExceptionally(new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have resources configured. The limitation will be removed in future versions.")) : internalSubmitJob(jobGraph);
        } catch (FlinkException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    private boolean isDuplicateJob(JobID jobID) throws FlinkException {
        try {
            return this.runningJobsRegistry.getJobSchedulingStatus(jobID) == RunningJobsRegistry.JobSchedulingStatus.DONE || this.runningJobs.containsKey(jobID);
        } catch (IOException e) {
            throw new FlinkException(String.format("Failed to retrieve job scheduling status for job %s.", jobID), e);
        }
    }

    private boolean isPartialResourceConfigured(JobGraph jobGraph) {
        boolean z = false;
        boolean z2 = false;
        Iterator<JobVertex> it = jobGraph.getVertices().iterator();
        while (it.hasNext()) {
            if (it.next().getMinResources() == ResourceSpec.UNKNOWN) {
                z = true;
            } else {
                z2 = true;
            }
            if (z && z2) {
                return true;
            }
        }
        return false;
    }

    private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
        this.log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
        return waitForTerminatingJob(jobGraph.getJobID(), jobGraph, this::persistAndRunJob).thenApply(r2 -> {
            return Acknowledge.get();
        }).handleAsync((BiFunction<? super U, Throwable, ? extends U>) (acknowledge, th) -> {
            if (th == null) {
                return acknowledge;
            }
            cleanUpJobData(jobGraph.getJobID(), true);
            ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(th);
            Throwable stripCompletionException = ExceptionUtils.stripCompletionException(th);
            this.log.error("Failed to submit job {}.", jobGraph.getJobID(), stripCompletionException);
            throw new CompletionException((Throwable) new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", stripCompletionException));
        }, this.ioExecutor);
    }

    private void persistAndRunJob(JobGraph jobGraph) throws Exception {
        this.jobGraphWriter.putJobGraph(jobGraph);
        runJob(jobGraph, ExecutionType.SUBMISSION);
    }

    private void runJob(JobGraph jobGraph, ExecutionType executionType) {
        Preconditions.checkState(!this.runningJobs.containsKey(jobGraph.getJobID()));
        long currentTimeMillis = System.currentTimeMillis();
        DispatcherJob createFor = DispatcherJob.createFor(createJobManagerRunner(jobGraph, currentTimeMillis), jobGraph.getJobID(), jobGraph.getName(), currentTimeMillis);
        this.runningJobs.put(jobGraph.getJobID(), createFor);
        JobID jobID = jobGraph.getJobID();
        CompletableFuture<Void> thenCompose = createFor.getResultFuture().handleAsync((dispatcherJobResult, th) -> {
            Preconditions.checkState(this.runningJobs.get(jobID) == createFor, "The job entry in runningJobs must be bound to the lifetime of the DispatcherJob.");
            return dispatcherJobResult != null ? handleDispatcherJobResult(jobID, dispatcherJobResult, executionType) : dispatcherJobFailed(jobID, th);
        }, (Executor) getMainThreadExecutor()).thenApply((Function<? super U, ? extends U>) cleanupJobState -> {
            return removeJob(jobID, cleanupJobState);
        }).thenCompose(Function.identity());
        FutureUtils.assertNoException(thenCompose);
        registerDispatcherJobTerminationFuture(jobID, thenCompose);
    }

    private CleanupJobState handleDispatcherJobResult(JobID jobID, DispatcherJobResult dispatcherJobResult, ExecutionType executionType) {
        return (dispatcherJobResult.isInitializationFailure() && executionType == ExecutionType.RECOVERY) ? dispatcherJobFailed(jobID, dispatcherJobResult.getInitializationFailure()) : jobReachedGloballyTerminalState(dispatcherJobResult.getExecutionGraphInfo());
    }

    private CleanupJobState dispatcherJobFailed(JobID jobID, Throwable th) {
        if (th instanceof JobNotFinishedException) {
            jobNotFinished(jobID);
        } else {
            jobMasterFailed(jobID, th);
        }
        return CleanupJobState.LOCAL;
    }

    CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, long j) {
        RpcService rpcService = getRpcService();
        return CompletableFuture.supplyAsync(() -> {
            try {
                JobManagerRunner createJobManagerRunner = this.jobManagerRunnerFactory.createJobManagerRunner(jobGraph, this.configuration, rpcService, this.highAvailabilityServices, this.heartbeatServices, this.jobManagerSharedServices, new DefaultJobManagerJobMetricGroupFactory(this.jobManagerMetricGroup), this.fatalErrorHandler, j);
                createJobManagerRunner.start();
                return createJobManagerRunner;
            } catch (Exception e) {
                throw new CompletionException((Throwable) new JobInitializationException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
            }
        }, this.ioExecutor);
    }

    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<Collection<JobID>> listJobs(Time time) {
        return CompletableFuture.completedFuture(Collections.unmodifiableSet(new HashSet(this.runningJobs.keySet())));
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Acknowledge> disposeSavepoint(String str, Time time) {
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        return CompletableFuture.supplyAsync(() -> {
            this.log.info("Disposing savepoint {}.", str);
            try {
                Checkpoints.disposeSavepoint(str, this.configuration, contextClassLoader, this.log);
                return Acknowledge.get();
            } catch (IOException | FlinkException e) {
                throw new CompletionException((Throwable) new FlinkException(String.format("Could not dispose savepoint %s.", str), e));
            }
        }, this.jobManagerSharedServices.getScheduledExecutorService());
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Acknowledge> cancelJob(JobID jobID, Time time) {
        return (CompletableFuture) getDispatcherJob(jobID).map(dispatcherJob -> {
            return dispatcherJob.cancel(time);
        }).orElseGet(() -> {
            this.log.debug("Dispatcher is unable to cancel job {}: not found", jobID);
            return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID));
        });
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<ClusterOverview> requestClusterOverview(Time time) {
        CompletableFuture runResourceManagerCommand = runResourceManagerCommand(resourceManagerGateway -> {
            return resourceManagerGateway.requestResourceOverview(time);
        });
        CompletableFuture<U> thenApply = FutureUtils.combineAll(queryJobMastersForInformation(dispatcherJob -> {
            return dispatcherJob.requestJobStatus(time);
        })).thenApply(this::flattenOptionalCollection);
        JobsOverview storedJobsOverview = this.executionGraphInfoStore.getStoredJobsOverview();
        return thenApply.thenCombine((CompletionStage) runResourceManagerCommand, (BiFunction<? super U, ? super U, ? extends V>) (collection, resourceOverview) -> {
            return new ClusterOverview(resourceOverview, JobsOverview.create(collection).combine(storedJobsOverview));
        });
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time time) {
        CompletableFuture<U> thenApply = FutureUtils.combineAll(queryJobMastersForInformation(dispatcherJob -> {
            return dispatcherJob.requestJobDetails(time);
        })).thenApply(this::flattenOptionalCollection);
        Collection<JobDetails> availableJobDetails = this.executionGraphInfoStore.getAvailableJobDetails();
        return thenApply.thenApply((Function<? super U, ? extends U>) collection -> {
            ArrayList arrayList = new ArrayList(availableJobDetails.size() + collection.size());
            arrayList.addAll(collection);
            arrayList.addAll(availableJobDetails);
            return new MultipleJobsDetails(arrayList);
        });
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<JobStatus> requestJobStatus(JobID jobID, Time time) {
        return (CompletableFuture) getDispatcherJob(jobID).map(dispatcherJob -> {
            return dispatcherJob.requestJobStatus(time);
        }).orElseGet(() -> {
            JobDetails availableJobDetails = this.executionGraphInfoStore.getAvailableJobDetails(jobID);
            return availableJobDetails == null ? FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID)) : CompletableFuture.completedFuture(availableJobDetails.getStatus());
        });
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<ExecutionGraphInfo> requestExecutionGraphInfo(JobID jobID, Time time) {
        return ((CompletableFuture) getDispatcherJob(jobID).map(dispatcherJob -> {
            return dispatcherJob.requestJob(time);
        }).orElse(FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID)))).exceptionally(th -> {
            ExecutionGraphInfo executionGraphInfo = this.executionGraphInfoStore.get(jobID);
            if (executionGraphInfo == null) {
                throw new CompletionException(ExceptionUtils.stripCompletionException(th));
            }
            return executionGraphInfo;
        });
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<JobResult> requestJobResult(JobID jobID, Time time) {
        DispatcherJob dispatcherJob = this.runningJobs.get(jobID);
        if (dispatcherJob != null) {
            return dispatcherJob.getResultFuture().thenApply(dispatcherJobResult -> {
                return JobResult.createFrom(dispatcherJobResult.getExecutionGraphInfo().getArchivedExecutionGraph());
            });
        }
        ExecutionGraphInfo executionGraphInfo = this.executionGraphInfoStore.get(jobID);
        return executionGraphInfo == null ? FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID)) : CompletableFuture.completedFuture(JobResult.createFrom(executionGraphInfo.getArchivedExecutionGraph()));
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Collection<String>> requestMetricQueryServiceAddresses(Time time) {
        return this.metricServiceQueryAddress != null ? CompletableFuture.completedFuture(Collections.singleton(this.metricServiceQueryAddress)) : CompletableFuture.completedFuture(Collections.emptyList());
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServiceAddresses(Time time) {
        return runResourceManagerCommand(resourceManagerGateway -> {
            return resourceManagerGateway.requestTaskManagerMetricQueryServiceAddresses(time);
        });
    }

    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<Integer> getBlobServerPort(Time time) {
        return CompletableFuture.completedFuture(Integer.valueOf(this.blobServer.getPort()));
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<String> triggerSavepoint(JobID jobID, String str, boolean z, Time time) {
        return performOperationOnJobMasterGateway(jobID, jobMasterGateway -> {
            return jobMasterGateway.triggerSavepoint(str, z, time);
        });
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<String> stopWithSavepoint(JobID jobID, String str, boolean z, Time time) {
        return performOperationOnJobMasterGateway(jobID, jobMasterGateway -> {
            return jobMasterGateway.stopWithSavepoint(str, z, time);
        });
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<Acknowledge> shutDownCluster() {
        return shutDownCluster(ApplicationStatus.SUCCEEDED);
    }

    @Override // org.apache.flink.runtime.dispatcher.DispatcherGateway
    public CompletableFuture<Acknowledge> shutDownCluster(ApplicationStatus applicationStatus) {
        this.shutDownFuture.complete(applicationStatus);
        return CompletableFuture.completedFuture(Acknowledge.get());
    }

    @Override // org.apache.flink.runtime.webmonitor.RestfulGateway
    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(JobID jobID, OperatorID operatorID, SerializedValue<CoordinationRequest> serializedValue, Time time) {
        return performOperationOnJobMasterGateway(jobID, jobMasterGateway -> {
            return jobMasterGateway.deliverCoordinationRequestToCoordinator(operatorID, serializedValue, time);
        });
    }

    private void registerDispatcherJobTerminationFuture(JobID jobID, CompletableFuture<Void> completableFuture) {
        Preconditions.checkState(!this.dispatcherJobTerminationFutures.containsKey(jobID));
        this.dispatcherJobTerminationFutures.put(jobID, completableFuture);
        completableFuture.thenRunAsync(() -> {
            CompletableFuture<Void> remove = this.dispatcherJobTerminationFutures.remove(jobID);
            if (remove == null || remove == completableFuture) {
                return;
            }
            this.dispatcherJobTerminationFutures.put(jobID, remove);
        }, (Executor) getMainThreadExecutor());
    }

    private CompletableFuture<Void> removeJob(JobID jobID, CleanupJobState cleanupJobState) {
        return ((DispatcherJob) Preconditions.checkNotNull(this.runningJobs.remove(jobID))).closeAsync().thenRunAsync(() -> {
            cleanUpJobData(jobID, cleanupJobState.cleanupHAData);
        }, this.ioExecutor);
    }

    private void cleanUpJobData(JobID jobID, boolean z) {
        this.jobManagerMetricGroup.removeJob(jobID);
        boolean z2 = false;
        if (z) {
            try {
                this.jobGraphWriter.removeJobGraph(jobID);
                z2 = true;
            } catch (Exception e) {
                this.log.warn("Could not properly remove job {} from submitted job graph store.", jobID, e);
            }
            try {
                this.runningJobsRegistry.clearJob(jobID);
            } catch (IOException e2) {
                this.log.warn("Could not properly remove job {} from the running jobs registry.", jobID, e2);
            }
        } else {
            try {
                this.jobGraphWriter.releaseJobGraph(jobID);
            } catch (Exception e3) {
                this.log.warn("Could not properly release job {} from submitted job graph store.", jobID, e3);
            }
        }
        this.blobServer.cleanupJob(jobID, z2);
    }

    private void terminateRunningJobs() {
        this.log.info("Stopping all currently running jobs of dispatcher {}.", getAddress());
        Iterator it = new HashSet(this.runningJobs.keySet()).iterator();
        while (it.hasNext()) {
            terminateJob((JobID) it.next());
        }
    }

    private void terminateJob(JobID jobID) {
        DispatcherJob dispatcherJob = this.runningJobs.get(jobID);
        if (dispatcherJob != null) {
            dispatcherJob.closeAsync();
        }
    }

    private CompletableFuture<Void> terminateRunningJobsAndGetTerminationFuture() {
        terminateRunningJobs();
        return FutureUtils.completeAll(this.dispatcherJobTerminationFutures.values());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void onFatalError(Throwable th) {
        this.fatalErrorHandler.onFatalError(th);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CleanupJobState jobReachedGloballyTerminalState(ExecutionGraphInfo executionGraphInfo) {
        ArchivedExecutionGraph archivedExecutionGraph = executionGraphInfo.getArchivedExecutionGraph();
        Preconditions.checkArgument(archivedExecutionGraph.getState().isGloballyTerminalState(), "Job %s is in state %s which is not globally terminal.", new Object[]{archivedExecutionGraph.getJobID(), archivedExecutionGraph.getState()});
        this.log.info("Job {} reached globally terminal state {}.", archivedExecutionGraph.getJobID(), archivedExecutionGraph.getState());
        archiveExecutionGraph(executionGraphInfo);
        return CleanupJobState.GLOBAL;
    }

    private void archiveExecutionGraph(ExecutionGraphInfo executionGraphInfo) {
        try {
            this.executionGraphInfoStore.put(executionGraphInfo);
        } catch (IOException e) {
            this.log.info("Could not store completed job {}({}).", new Object[]{executionGraphInfo.getArchivedExecutionGraph().getJobName(), executionGraphInfo.getArchivedExecutionGraph().getJobID(), e});
        }
        this.historyServerArchivist.archiveExecutionGraph(executionGraphInfo).whenComplete((acknowledge, th) -> {
            if (th != null) {
                this.log.info("Could not archive completed job {}({}) to the history server.", new Object[]{executionGraphInfo.getArchivedExecutionGraph().getJobName(), executionGraphInfo.getArchivedExecutionGraph().getJobID(), th});
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void jobNotFinished(JobID jobID) {
        this.log.info("Job {} was not finished by JobManager.", jobID);
    }

    private void jobMasterFailed(JobID jobID, Throwable th) {
        onFatalError(new FlinkException(String.format("JobMaster for job %s failed.", jobID), th));
    }

    private CompletableFuture<JobMasterGateway> getJobMasterGateway(JobID jobID) {
        DispatcherJob dispatcherJob = this.runningJobs.get(jobID);
        return dispatcherJob == null ? FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobID)) : !dispatcherJob.isInitialized() ? FutureUtils.completedExceptionally(new UnavailableDispatcherOperationException("Unable to get JobMasterGateway for initializing job. The requested operation is not available while the JobManager is initializing.")) : dispatcherJob.getJobMasterGateway();
    }

    private <T> CompletableFuture<T> performOperationOnJobMasterGateway(JobID jobID, Function<JobMasterGateway, CompletableFuture<T>> function) {
        return (CompletableFuture<T>) getJobMasterGateway(jobID).thenCompose((Function<? super JobMasterGateway, ? extends CompletionStage<U>>) function);
    }

    private CompletableFuture<ResourceManagerGateway> getResourceManagerGateway() {
        return this.resourceManagerGatewayRetriever.getFuture();
    }

    private Optional<DispatcherJob> getDispatcherJob(JobID jobID) {
        return Optional.ofNullable(this.runningJobs.get(jobID));
    }

    private <T> CompletableFuture<T> runResourceManagerCommand(Function<ResourceManagerGateway, CompletableFuture<T>> function) {
        return getResourceManagerGateway().thenApply((Function<? super ResourceManagerGateway, ? extends U>) function).thenCompose((Function<? super U, ? extends CompletionStage<U>>) Function.identity());
    }

    private <T> List<T> flattenOptionalCollection(Collection<Optional<T>> collection) {
        return (List) collection.stream().filter((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        }).collect(Collectors.toList());
    }

    @Nonnull
    private <T> List<CompletableFuture<Optional<T>>> queryJobMastersForInformation(Function<DispatcherJob, CompletableFuture<T>> function) {
        ArrayList arrayList = new ArrayList(this.runningJobs.size());
        Iterator<DispatcherJob> it = this.runningJobs.values().iterator();
        while (it.hasNext()) {
            arrayList.add(function.apply(it.next()).handle((BiFunction) (obj, th) -> {
                return Optional.ofNullable(obj);
            }));
        }
        return arrayList;
    }

    private CompletableFuture<Void> waitForTerminatingJob(JobID jobID, JobGraph jobGraph, ThrowingConsumer<JobGraph, ?> throwingConsumer) {
        return getJobTerminationFuture(jobID).exceptionally(th -> {
            throw new CompletionException((Throwable) new DispatcherException(String.format("Termination of previous JobManager for job %s failed. Cannot submit job under the same job id.", jobID), th));
        }).thenAcceptAsync(FunctionUtils.uncheckedConsumer(r7 -> {
            this.dispatcherJobTerminationFutures.remove(jobID);
            throwingConsumer.accept(jobGraph);
        }), (Executor) getMainThreadExecutor());
    }

    CompletableFuture<Void> getJobTerminationFuture(JobID jobID) {
        return this.runningJobs.containsKey(jobID) ? FutureUtils.completedExceptionally(new DispatcherException(String.format("Job with job id %s is still running.", jobID))) : this.dispatcherJobTerminationFutures.getOrDefault(jobID, CompletableFuture.completedFuture(null));
    }

    private void registerDispatcherMetrics(MetricGroup metricGroup) {
        metricGroup.gauge(MetricNames.NUM_RUNNING_JOBS, () -> {
            return Long.valueOf(this.runningJobs.size());
        });
    }

    public CompletableFuture<Void> onRemovedJobGraph(JobID jobID) {
        return CompletableFuture.runAsync(() -> {
            terminateJob(jobID);
        }, getMainThreadExecutor());
    }
}
