package org.apache.flink.runtime.minicluster;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.class */
/**
 * Dispatcher that runs a single job at a time on one or more {@link JobManagerRunner}s.
 *
 * <p>A job can be started detached ({@link #runDetached(JobGraph)}) or the caller can block
 * until the job has completed ({@link #runJobBlocking(JobGraph)}). Only one job may be
 * running at any time; the {@code runners} field being non-null marks a job as in progress.
 */
public class MiniClusterJobDispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(MiniClusterJobDispatcher.class);

    /** Guards shutdown and the start of a new job. */
    private final Object lock;

    /** Configuration handed to every JobManager runner. */
    private final Configuration configuration;

    /** One RPC service per JobManager runner; its length equals {@link #numJobManagers}. */
    private final RpcService[] rpcServices;

    /** High-availability services, used for the runners and for the running-jobs registry. */
    private final HighAvailabilityServices haServices;

    /** Heartbeat services handed to every JobManager runner. */
    private final HeartbeatServices heartbeatServices;

    /** Services shared by all JobManager runners, created from the configuration. */
    private final JobManagerServices jobManagerServices;

    /** Metric registry handed to every JobManager runner. */
    private final MetricRegistry metricRegistry;

    /** Number of JobManager runners started per job. */
    private final int numJobManagers;

    /** Runners of the currently executing job, or {@code null} if no job is running. */
    private volatile JobManagerRunner[] runners;

    /** Set once {@link #shutdown()} has been called. */
    private volatile boolean shutdown;

    /**
     * Completion and fatal-error callback that lets a caller block until all JobManager
     * runners have reported back, and then exposes the job's result or failure.
     */
    private static class BlockingJobSync implements OnCompletionActions, FatalErrorHandler {
        private final JobID jobId;

        /** Counted down once per runner that reported a completion or a fatal error. */
        private final CountDownLatch jobMastersToWaitFor;

        private volatile Throwable jobException;
        private volatile Throwable runnerException;
        private volatile JobExecutionResult result;

        /**
         * @param jobID ID of the job, used when creating exceptions
         * @param i number of runner notifications to wait for
         */
        BlockingJobSync(JobID jobID, int i) {
            this.jobId = jobID;
            this.jobMastersToWaitFor = new CountDownLatch(i);
        }

        @Override
        public void jobFinished(JobExecutionResult jobExecutionResult) {
            this.result = jobExecutionResult;
            this.jobMastersToWaitFor.countDown();
        }

        @Override
        public void jobFailed(Throwable th) {
            this.jobException = th;
            this.jobMastersToWaitFor.countDown();
        }

        @Override
        public void jobFinishedByOther() {
            this.jobMastersToWaitFor.countDown();
        }

        @Override
        public void onFatalError(Throwable th) {
            // keep only the first fatal error that was reported
            if (this.runnerException == null) {
                this.runnerException = th;
            }
            // Release the waiting caller for this runner as well, mirroring DetachedFinalizer.
            // Without this, getResult() would block forever when runners only report fatal
            // errors, and its "all JobManagers encountered fatal errors" branch would be
            // unreachable from that path.
            // NOTE(review): assumes a runner that reports a fatal error does not additionally
            // invoke one of the completion callbacks - confirm against JobManagerRunner.
            this.jobMastersToWaitFor.countDown();
        }

        /**
         * Blocks until every runner has reported back and returns the job's result.
         *
         * @return the result passed to {@link #jobFinished(JobExecutionResult)}
         * @throws JobExecutionException if a job failure was reported, or if no result exists
         *     and a fatal runner error was reported
         * @throws InterruptedException if the thread is interrupted while waiting
         * @throws IllegalStateException if neither a result nor any error was reported
         */
        public JobExecutionResult getResult() throws JobExecutionException, InterruptedException {
            this.jobMastersToWaitFor.await();

            // copy the volatile fields once so all checks below see one consistent value each
            final Throwable jobFailure = this.jobException;
            final Throwable runnerFailure = this.runnerException;
            final JobExecutionResult jobResult = this.result;

            // a reported job failure takes precedence over a result
            if (jobFailure != null) {
                if (jobFailure instanceof JobExecutionException) {
                    throw (JobExecutionException) jobFailure;
                }
                throw new JobExecutionException(this.jobId, "The job execution failed", jobFailure);
            }
            if (jobResult != null) {
                return jobResult;
            }
            if (runnerFailure != null) {
                throw new JobExecutionException(this.jobId, "The job execution failed because all JobManagers encountered fatal errors", runnerFailure);
            }
            throw new IllegalStateException("Bug: Job finished with neither error nor result.");
        }
    }

    /**
     * Completion and fatal-error callback for detached jobs. Once every runner has reported
     * back (in any way), it resets the dispatcher's runners and clears the job's state in the
     * running-jobs registry.
     */
    private class DetachedFinalizer implements OnCompletionActions, FatalErrorHandler {
        private final JobID jobID;

        /** Number of runner notifications still outstanding. */
        private final AtomicInteger numJobManagersToWaitFor;

        private DetachedFinalizer(JobID jobID, int i) {
            this.jobID = jobID;
            this.numJobManagersToWaitFor = new AtomicInteger(i);
        }

        @Override
        public void jobFinished(JobExecutionResult jobExecutionResult) {
            decrementCheckAndCleanup();
        }

        @Override
        public void jobFailed(Throwable th) {
            decrementCheckAndCleanup();
        }

        @Override
        public void jobFinishedByOther() {
            decrementCheckAndCleanup();
        }

        @Override
        public void onFatalError(Throwable th) {
            decrementCheckAndCleanup();
        }

        /** Decrements the outstanding counter and cleans up when it reaches zero. */
        private void decrementCheckAndCleanup() {
            if (this.numJobManagersToWaitFor.decrementAndGet() == 0) {
                MiniClusterJobDispatcher.this.runners = null;
                MiniClusterJobDispatcher.this.clearJobRunningState(this.jobID);
            }
        }
    }

    /**
     * Creates a dispatcher that starts exactly one JobManager runner per job.
     *
     * @param configuration configuration for the JobManager services and runners
     * @param rpcService RPC service used by the single runner
     * @param highAvailabilityServices high-availability services
     * @param heartbeatServices heartbeat services
     * @param metricRegistry metric registry
     * @throws Exception if the JobManager services cannot be created
     */
    public MiniClusterJobDispatcher(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception {
        this(configuration, highAvailabilityServices, heartbeatServices, metricRegistry, 1, new RpcService[]{rpcService});
    }

    /**
     * Creates a dispatcher that starts {@code i} JobManager runners per job.
     *
     * @param configuration configuration for the JobManager services and runners
     * @param highAvailabilityServices high-availability services
     * @param heartbeatServices heartbeat services
     * @param metricRegistry metric registry
     * @param i number of JobManager runners per job; must be at least 1
     * @param rpcServiceArr one RPC service per runner; its length must equal {@code i}
     * @throws Exception if the JobManager services cannot be created
     */
    public MiniClusterJobDispatcher(Configuration configuration, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, int i, RpcService[] rpcServiceArr) throws Exception {
        this.lock = new Object();
        Preconditions.checkArgument(i >= 1);
        Preconditions.checkArgument(rpcServiceArr.length == i);
        this.configuration = (Configuration) Preconditions.checkNotNull(configuration);
        this.rpcServices = rpcServiceArr;
        this.haServices = (HighAvailabilityServices) Preconditions.checkNotNull(highAvailabilityServices);
        this.heartbeatServices = (HeartbeatServices) Preconditions.checkNotNull(heartbeatServices);
        this.metricRegistry = (MetricRegistry) Preconditions.checkNotNull(metricRegistry);
        this.numJobManagers = i;
        LOG.info("Creating JobMaster services");
        this.jobManagerServices = JobManagerServices.fromConfiguration(configuration, highAvailabilityServices);
    }

    /**
     * Marks the dispatcher as shut down and shuts down the runners of a currently running
     * job, if any. Calling this method more than once has no further effect.
     */
    public void shutdown() {
        synchronized (this.lock) {
            if (this.shutdown) {
                return;
            }
            this.shutdown = true;
            LOG.info("Shutting down the job dispatcher");

            // copy the reference first: the field may be reset concurrently by a finished job
            final JobManagerRunner[] currentRunners = this.runners;
            if (currentRunners != null) {
                this.runners = null;
                for (JobManagerRunner runner : currentRunners) {
                    runner.shutdown();
                }
            }
        }
    }

    /**
     * Starts the given job without waiting for its completion.
     *
     * <p>A state precondition fails if the dispatcher is shut down or another job is still
     * running.
     *
     * @param jobGraph the job to run; must not be null
     * @throws JobExecutionException if the JobManager runners could not be started
     */
    public void runDetached(JobGraph jobGraph) throws JobExecutionException {
        Preconditions.checkNotNull(jobGraph);
        LOG.info("Received job for detached execution: {} ({})", jobGraph.getName(), jobGraph.getJobID());
        synchronized (this.lock) {
            Preconditions.checkState(!this.shutdown, "mini cluster is shut down");
            Preconditions.checkState(this.runners == null, "mini cluster can only execute one job at a time");
            DetachedFinalizer finalizer = new DetachedFinalizer(jobGraph.getJobID(), this.numJobManagers);
            this.runners = startJobRunners(jobGraph, finalizer, finalizer);
        }
    }

    /**
     * Starts the given job and blocks until all runners have reported back.
     *
     * <p>A state precondition fails if the dispatcher is shut down or another job is still
     * running. Regardless of the outcome, the runners are reset and the job is cleared from
     * the running-jobs registry before this method returns or throws.
     *
     * @param jobGraph the job to run; must not be null
     * @return the result of the job execution
     * @throws JobExecutionException if the runners could not be started or the job failed
     * @throws InterruptedException if the thread is interrupted while waiting for the job
     */
    public JobExecutionResult runJobBlocking(JobGraph jobGraph) throws JobExecutionException, InterruptedException {
        Preconditions.checkNotNull(jobGraph);
        LOG.info("Received job for blocking execution: {} ({})", jobGraph.getName(), jobGraph.getJobID());
        BlockingJobSync sync = new BlockingJobSync(jobGraph.getJobID(), this.numJobManagers);
        synchronized (this.lock) {
            Preconditions.checkState(!this.shutdown, "mini cluster is shut down");
            Preconditions.checkState(this.runners == null, "mini cluster can only execute one job at a time");
            this.runners = startJobRunners(jobGraph, sync, sync);
        }
        try {
            return sync.getResult();
        } finally {
            // always reset the state so that the next job can be submitted
            this.runners = null;
            clearJobRunningState(jobGraph.getJobID());
        }
    }

    /**
     * Creates and starts {@link #numJobManagers} runners for the given job.
     *
     * <p>If creating or starting any runner fails, every runner created so far is shut down
     * (best effort), the job is marked as finished in the running-jobs registry, and a
     * {@link JobExecutionException} wrapping the original failure is thrown.
     *
     * @param jobGraph the job to run
     * @param onCompletionActions callback passed to every runner for job completion
     * @param fatalErrorHandler callback passed to every runner for fatal errors
     * @return the started runners
     * @throws JobExecutionException if any runner could not be created or started
     */
    private JobManagerRunner[] startJobRunners(JobGraph jobGraph, OnCompletionActions onCompletionActions, FatalErrorHandler fatalErrorHandler) throws JobExecutionException {
        LOG.info("Starting {} JobMaster(s) for job {} ({})", this.numJobManagers, jobGraph.getName(), jobGraph.getJobID());
        JobManagerRunner[] startedRunners = new JobManagerRunner[this.numJobManagers];
        for (int i = 0; i < this.numJobManagers; i++) {
            try {
                startedRunners[i] = new JobManagerRunner(ResourceID.generate(), jobGraph, this.configuration, this.rpcServices[i], this.haServices, this.heartbeatServices, this.jobManagerServices, this.metricRegistry, onCompletionActions, fatalErrorHandler);
                startedRunners[i].start();
            } catch (Throwable startFailure) {
                // Shut down all runners created so far. Index i is included because its
                // runner may have been constructed before start() failed.
                for (int k = 0; k <= i; k++) {
                    try {
                        if (startedRunners[k] != null) {
                            startedRunners[k].shutdown();
                        }
                    } catch (Throwable shutdownFailure) {
                        // best-effort cleanup: the start failure is what gets reported
                        LOG.debug("Could not shut down JobManager runner {} after failed start", k, shutdownFailure);
                    }
                }
                try {
                    this.haServices.getRunningJobsRegistry().setJobFinished(jobGraph.getJobID());
                } catch (Throwable registryFailure) {
                    LOG.warn("Could not properly unregister job from high-availability services", registryFailure);
                }
                throw new JobExecutionException(jobGraph.getJobID(), "Could not start the JobManager(s) for the job", startFailure);
            }
        }
        return startedRunners;
    }

    /**
     * Clears the given job from the running-jobs registry of the high-availability services.
     * Failures are logged as warnings and not propagated.
     *
     * @param jobID ID of the job to clear
     */
    public void clearJobRunningState(JobID jobID) {
        try {
            this.haServices.getRunningJobsRegistry().clearJob(jobID);
        } catch (Throwable th) {
            LOG.warn("Could not clear job {} at the status registry of the high-availability services", jobID, th);
        }
    }
}
