package org.apache.flink.runtime.dispatcher.runner;

import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.dispatcher.DispatcherBootstrapFactory;
import org.apache.flink.runtime.dispatcher.DispatcherFactory;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherServices;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServicesWithJobGraphStore;
import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
import org.apache.flink.runtime.dispatcher.SingleJobJobGraphStore;
import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
import org.apache.flink.runtime.dispatcher.TestingJobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.VoidHistoryServerArchivist;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.util.BlobServerResource;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase.class */
public class DefaultDispatcherRunnerITCase extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultDispatcherRunnerITCase.class);
    private static final Time TIMEOUT = Time.seconds(10);

    @ClassRule
    public static TestingRpcServiceResource rpcServiceResource = new TestingRpcServiceResource();

    @ClassRule
    public static BlobServerResource blobServerResource = new BlobServerResource();
    private JobGraph jobGraph;
    private TestingLeaderElectionService dispatcherLeaderElectionService;
    private TestingFatalErrorHandler fatalErrorHandler;
    private JobGraphStore jobGraphStore;
    private PartialDispatcherServices partialDispatcherServices;
    private DefaultDispatcherRunnerFactory dispatcherRunnerFactory;

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/runner/DefaultDispatcherRunnerITCase$TestingDispatcherFactory.class */
    private static class TestingDispatcherFactory implements DispatcherFactory {
        private final JobManagerRunnerFactory jobManagerRunnerFactory;

        private TestingDispatcherFactory(JobManagerRunnerFactory jobManagerRunnerFactory) {
            this.jobManagerRunnerFactory = jobManagerRunnerFactory;
        }

        public Dispatcher createDispatcher(RpcService rpcService, DispatcherId dispatcherId, Collection<JobGraph> collection, DispatcherBootstrapFactory dispatcherBootstrapFactory, PartialDispatcherServicesWithJobGraphStore partialDispatcherServicesWithJobGraphStore) throws Exception {
            return new StandaloneDispatcher(rpcService, dispatcherId, collection, dispatcherBootstrapFactory, DispatcherServices.from(partialDispatcherServicesWithJobGraphStore, this.jobManagerRunnerFactory));
        }
    }

    @Before
    public void setup() {
        this.dispatcherRunnerFactory = DefaultDispatcherRunnerFactory.createSessionRunner(SessionDispatcherFactory.INSTANCE);
        this.jobGraph = createJobGraph();
        this.dispatcherLeaderElectionService = new TestingLeaderElectionService();
        this.fatalErrorHandler = new TestingFatalErrorHandler();
        this.jobGraphStore = TestingJobGraphStore.newBuilder().build();
        this.partialDispatcherServices = new PartialDispatcherServices(new Configuration(), new TestingHighAvailabilityServicesBuilder().build(), CompletableFuture::new, blobServerResource.getBlobServer(), new TestingHeartbeatServices(), UnregisteredMetricGroups::createUnregisteredJobManagerMetricGroup, new MemoryExecutionGraphInfoStore(), this.fatalErrorHandler, VoidHistoryServerArchivist.INSTANCE, (String) null, ForkJoinPool.commonPool());
    }

    @After
    public void teardown() throws Exception {
        if (this.fatalErrorHandler != null) {
            this.fatalErrorHandler.rethrowError();
        }
    }

    @Test
    public void leaderChange_afterJobSubmission_recoversSubmittedJob() throws Exception {
        DispatcherRunner createDispatcherRunner = createDispatcherRunner();
        Throwable th = null;
        try {
            electLeaderAndRetrieveGateway(UUID.randomUUID()).submitJob(this.jobGraph, TIMEOUT).get();
            this.dispatcherLeaderElectionService.notLeader();
            Assert.assertThat((Collection) electLeaderAndRetrieveGateway(UUID.randomUUID()).listJobs(TIMEOUT).get(), Matchers.contains(new JobID[]{this.jobGraph.getJobID()}));
            if (createDispatcherRunner != null) {
                if (0 == 0) {
                    createDispatcherRunner.close();
                    return;
                }
                try {
                    createDispatcherRunner.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createDispatcherRunner != null) {
                if (0 != 0) {
                    try {
                        createDispatcherRunner.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createDispatcherRunner.close();
                }
            }
            throw th3;
        }
    }

    private DispatcherGateway electLeaderAndRetrieveGateway(UUID uuid) throws InterruptedException, ExecutionException {
        this.dispatcherLeaderElectionService.isLeader(uuid);
        LeaderConnectionInfo leaderConnectionInfo = this.dispatcherLeaderElectionService.getConfirmationFuture().get();
        return (DispatcherGateway) rpcServiceResource.getTestingRpcService().connect(leaderConnectionInfo.getAddress(), DispatcherId.fromUuid(leaderConnectionInfo.getLeaderSessionId()), DispatcherGateway.class).get();
    }

    @Test
    public void leaderChange_withBlockingJobManagerTermination_doesNotAffectNewLeader() throws Exception {
        TestingJobManagerRunnerFactory testingJobManagerRunnerFactory = new TestingJobManagerRunnerFactory(1);
        this.dispatcherRunnerFactory = DefaultDispatcherRunnerFactory.createSessionRunner(new TestingDispatcherFactory(testingJobManagerRunnerFactory));
        this.jobGraphStore = new SingleJobJobGraphStore(this.jobGraph);
        DispatcherRunner createDispatcherRunner = createDispatcherRunner();
        Throwable th = null;
        try {
            try {
                this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
                TestingJobManagerRunner takeCreatedJobManagerRunner = testingJobManagerRunnerFactory.takeCreatedJobManagerRunner();
                this.dispatcherLeaderElectionService.notLeader();
                LOG.info("Re-grant leadership first time.");
                this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID());
                Thread.sleep(1L);
                this.dispatcherLeaderElectionService.notLeader();
                LOG.info("Re-grant leadership second time.");
                UUID randomUUID = UUID.randomUUID();
                CompletableFuture<UUID> isLeader = this.dispatcherLeaderElectionService.isLeader(randomUUID);
                Assert.assertThat(Boolean.valueOf(isLeader.isDone()), Matchers.is(false));
                LOG.info("Complete the termination of the first job manager runner.");
                takeCreatedJobManagerRunner.completeTerminationFuture();
                Assert.assertThat(isLeader.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS), Matchers.is(Matchers.equalTo(randomUUID)));
                Assert.assertEquals(this.jobGraph.getJobID(), Iterables.getOnlyElement((Iterable) ((DispatcherGateway) rpcServiceResource.getTestingRpcService().connect(this.dispatcherLeaderElectionService.getAddress(), DispatcherId.fromUuid(randomUUID), DispatcherGateway.class).get()).listJobs(TIMEOUT).get()));
                if (createDispatcherRunner != null) {
                    if (0 == 0) {
                        createDispatcherRunner.close();
                        return;
                    }
                    try {
                        createDispatcherRunner.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createDispatcherRunner != null) {
                if (th != null) {
                    try {
                        createDispatcherRunner.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createDispatcherRunner.close();
                }
            }
            throw th4;
        }
    }

    private static JobGraph createJobGraph() {
        return JobGraphTestUtils.singleNoOpJobGraph();
    }

    private DispatcherRunner createDispatcherRunner() throws Exception {
        return this.dispatcherRunnerFactory.createDispatcherRunner(this.dispatcherLeaderElectionService, this.fatalErrorHandler, () -> {
            return this.jobGraphStore;
        }, TestingUtils.defaultExecutor(), rpcServiceResource.getTestingRpcService(), this.partialDispatcherServices);
    }
}
