package org.apache.flink.runtime.resourcemanager;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.ThrowingConsumer;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/ResourceManagerTest.class */
/**
 * Tests for the resource manager, exercised through a {@link TestingResourceManager}.
 *
 * <p>The tests cover querying the {@link TaskManagerInfo} of a registered task executor and the
 * notifications sent to a job master / task executor once their heartbeat with the resource
 * manager times out.
 */
public class ResourceManagerTest extends TestLogger {
    /** Timeout used for RPC calls and for shutting down endpoints/services. */
    private static final Time TIMEOUT = Time.minutes(2);

    /** Heartbeat settings for tests that must not run into a heartbeat timeout. */
    // NOTE(review): arguments are presumably (interval, timeout) in milliseconds - confirm.
    private static final HeartbeatServices heartbeatServices = new HeartbeatServices(1000, 10000);

    /** Heartbeat settings with minimal values, used by the heartbeat timeout tests. */
    private static final HeartbeatServices fastHeartbeatServices = new HeartbeatServices(1, 1);

    /** Hardware description reported by the task executor registered in the tests. */
    private static final HardwareDescription hardwareDescription = new HardwareDescription(42, 1337, 1337, 0);

    /** Data port reported by the task executor registered in the tests. */
    private static final int dataPort = 1234;

    /** JMX port reported by the task executor registered in the tests. */
    private static final int jmxPort = 23456;

    /** RPC service shared by all tests of this class. */
    private static TestingRpcService rpcService;

    private TestingHighAvailabilityServices highAvailabilityServices;
    private TestingLeaderElectionService resourceManagerLeaderElectionService;
    private TestingFatalErrorHandler testingFatalErrorHandler;
    private ResourceID resourceManagerResourceId;

    /** Resource manager under test; {@code null} until a test starts one. */
    private TestingResourceManager resourceManager;

    /** Leader session id granted to {@link #resourceManager} when it is started. */
    private ResourceManagerId resourceManagerId;

    /** Creates the RPC service shared by all tests. */
    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
    }

    /** Creates fresh high availability services, error handler and resource id for each test. */
    @Before
    public void setup() throws Exception {
        this.highAvailabilityServices = new TestingHighAvailabilityServices();
        this.resourceManagerLeaderElectionService = new TestingLeaderElectionService();
        this.highAvailabilityServices.setResourceManagerLeaderElectionService(this.resourceManagerLeaderElectionService);
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
        this.resourceManagerResourceId = ResourceID.generate();
    }

    /**
     * Stops the resource manager (if one was started), cleans up the high availability services
     * and rethrows any error reported to the fatal error handler.
     */
    @After
    public void after() throws Exception {
        if (this.resourceManager != null) {
            RpcUtils.terminateRpcEndpoint(this.resourceManager, TIMEOUT);
        }
        if (this.highAvailabilityServices != null) {
            this.highAvailabilityServices.closeAndCleanupAllData();
        }
        // Guarded like the fields above: JUnit 4 runs @After even when @Before failed part-way,
        // and an NPE here would hide the original failure.
        if (this.testingFatalErrorHandler != null && this.testingFatalErrorHandler.hasExceptionOccurred()) {
            this.testingFatalErrorHandler.rethrowError();
        }
    }

    /** Shuts down the shared RPC service. */
    @AfterClass
    public static void tearDownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcServices(TIMEOUT, rpcService);
        }
    }

    /**
     * Registers a task executor at the resource manager and verifies that the
     * {@link TaskManagerInfo} returned for it reflects the registration data.
     */
    @Test
    public void testRequestTaskManagerInfo() throws Exception {
        final ResourceID taskManagerId = ResourceID.generate();
        final RpcGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                .setAddress(UUID.randomUUID().toString())
                .createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);

        this.resourceManager = createAndStartResourceManager(heartbeatServices);
        final ResourceManagerGateway resourceManagerGateway =
                this.resourceManager.getSelfGateway(ResourceManagerGateway.class);

        registerTaskExecutor(resourceManagerGateway, taskManagerId, taskExecutorGateway.getAddress());

        final TaskManagerInfo taskManagerInfo =
                resourceManagerGateway.requestTaskManagerInfo(taskManagerId, TestingUtils.TIMEOUT()).get();

        Assert.assertEquals(taskManagerId, taskManagerInfo.getResourceId());
        Assert.assertEquals(hardwareDescription, taskManagerInfo.getHardwareDescription());
        Assert.assertEquals(taskExecutorGateway.getAddress(), taskManagerInfo.getAddress());
        Assert.assertEquals(dataPort, taskManagerInfo.getDataPort());
        Assert.assertEquals(jmxPort, taskManagerInfo.getJmxPort());
        Assert.assertEquals(0, taskManagerInfo.getNumberSlots());
        Assert.assertEquals(0, taskManagerInfo.getNumberAvailableSlots());
    }

    /**
     * Registers a task executor at the given resource manager gateway and asserts that the
     * registration succeeded.
     *
     * @param resourceManagerGateway gateway of the resource manager to register at
     * @param resourceID resource id of the task executor
     * @param str RPC address of the task executor
     * @throws Exception if waiting for the registration response fails
     */
    private void registerTaskExecutor(ResourceManagerGateway resourceManagerGateway, ResourceID resourceID, String str) throws Exception {
        final TaskExecutorRegistration registration = new TaskExecutorRegistration(
                str,
                resourceID,
                dataPort,
                jmxPort,
                hardwareDescription,
                new TaskExecutorMemoryConfiguration(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L),
                ResourceProfile.ZERO,
                ResourceProfile.ZERO);

        final CompletableFuture<RegistrationResponse> registrationFuture =
                resourceManagerGateway.registerTaskExecutor(registration, TestingUtils.TIMEOUT());

        Assert.assertThat(registrationFuture.get(), Matchers.instanceOf(RegistrationResponse.Success.class));
    }

    /**
     * Verifies that a registered job master is disconnected from the resource manager, with the
     * resource manager's leader id, when the heartbeat times out.
     */
    @Test
    public void testHeartbeatTimeoutWithJobMaster() throws Exception {
        final CompletableFuture<ResourceID> heartbeatRequestFuture = new CompletableFuture<>();
        final CompletableFuture<ResourceManagerId> disconnectFuture = new CompletableFuture<>();
        final TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder()
                .setResourceManagerHeartbeatConsumer(heartbeatRequestFuture::complete)
                .setDisconnectResourceManagerConsumer(disconnectFuture::complete)
                .build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway);

        final JobID jobID = new JobID();
        final ResourceID jobMasterResourceId = ResourceID.generate();
        final SettableLeaderRetrievalService jobMasterLeaderRetrievalService = new SettableLeaderRetrievalService(
                jobMasterGateway.getAddress(),
                jobMasterGateway.getFencingToken().toUUID());

        this.highAvailabilityServices.setJobMasterLeaderRetrieverFunction(requestedJobId -> {
            Assert.assertThat(requestedJobId, Matchers.is(Matchers.equalTo(jobID)));
            return jobMasterLeaderRetrievalService;
        });

        runHeartbeatTimeoutTest(
                resourceManagerGateway -> {
                    final CompletableFuture<RegistrationResponse> registrationFuture =
                            resourceManagerGateway.registerJobManager(
                                    jobMasterGateway.getFencingToken(),
                                    jobMasterResourceId,
                                    jobMasterGateway.getAddress(),
                                    jobID,
                                    TIMEOUT);
                    Assert.assertThat(registrationFuture.get(), Matchers.instanceOf(RegistrationResponse.Success.class));
                },
                resourceID -> {
                    // The heartbeat request may or may not have arrived before the timeout fired,
                    // so both the resource manager's id and "not completed" (null) are accepted.
                    final ResourceID heartbeatRequestOrigin = heartbeatRequestFuture.getNow(null);
                    Assert.assertThat(
                            heartbeatRequestOrigin,
                            Matchers.anyOf(Matchers.is(resourceID), Matchers.is(Matchers.nullValue())));
                    Assert.assertThat(disconnectFuture.get(), Matchers.is(Matchers.equalTo(this.resourceManagerId)));
                });
    }

    /**
     * Verifies that a registered task executor is disconnected from the resource manager, with a
     * {@link TimeoutException} as the cause, when the heartbeat times out.
     */
    @Test
    public void testHeartbeatTimeoutWithTaskExecutor() throws Exception {
        final ResourceID taskExecutorId = ResourceID.generate();
        final CompletableFuture<ResourceID> heartbeatRequestFuture = new CompletableFuture<>();
        final CompletableFuture<Exception> disconnectFuture = new CompletableFuture<>();
        final RpcGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                .setDisconnectResourceManagerConsumer(disconnectFuture::complete)
                .setHeartbeatResourceManagerConsumer(heartbeatRequestFuture::complete)
                .createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), taskExecutorGateway);

        runHeartbeatTimeoutTest(
                resourceManagerGateway ->
                        registerTaskExecutor(resourceManagerGateway, taskExecutorId, taskExecutorGateway.getAddress()),
                resourceID -> {
                    // The heartbeat request may or may not have arrived before the timeout fired,
                    // so both the resource manager's id and "not completed" (null) are accepted.
                    final ResourceID heartbeatRequestOrigin = heartbeatRequestFuture.getNow(null);
                    Assert.assertThat(
                            heartbeatRequestOrigin,
                            Matchers.anyOf(Matchers.is(resourceID), Matchers.is(Matchers.nullValue())));
                    Assert.assertThat(disconnectFuture.get(), Matchers.instanceOf(TimeoutException.class));
                });
    }

    /**
     * Starts a resource manager with {@link #fastHeartbeatServices} and runs the two given steps
     * in order.
     *
     * @param throwingConsumer registers the component under test at the resource manager gateway
     * @param throwingConsumer2 verifies the outcome; receives the resource manager's resource id
     * @throws Exception if starting the resource manager or either step fails
     */
    private void runHeartbeatTimeoutTest(ThrowingConsumer<ResourceManagerGateway, Exception> throwingConsumer, ThrowingConsumer<ResourceID, Exception> throwingConsumer2) throws Exception {
        this.resourceManager = createAndStartResourceManager(fastHeartbeatServices);
        final ResourceManagerGateway resourceManagerGateway =
                this.resourceManager.getSelfGateway(ResourceManagerGateway.class);

        throwingConsumer.accept(resourceManagerGateway);
        throwingConsumer2.accept(this.resourceManagerResourceId);
    }

    /**
     * Creates and starts a {@link TestingResourceManager}, then grants it leadership under a
     * freshly generated {@link ResourceManagerId}, which is stored in {@link #resourceManagerId}.
     *
     * @param heartbeatServices2 heartbeat services the resource manager is created with
     * @return the started resource manager, after the leadership grant has completed
     * @throws Exception if starting the resource manager or granting leadership fails
     */
    private TestingResourceManager createAndStartResourceManager(HeartbeatServices heartbeatServices2) throws Exception {
        final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
                this.highAvailabilityServices,
                rpcService.getScheduledExecutor(),
                TestingUtils.infiniteTime());

        final TestingResourceManager testingResourceManager = new TestingResourceManager(
                rpcService,
                this.resourceManagerResourceId,
                this.highAvailabilityServices,
                heartbeatServices2,
                SlotManagerBuilder.newBuilder().setScheduledExecutor(rpcService.getScheduledExecutor()).build(),
                NoOpResourceManagerPartitionTracker::get,
                jobLeaderIdService,
                this.testingFatalErrorHandler,
                UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup());
        testingResourceManager.start();

        // Grant leadership and block until the grant has been processed.
        this.resourceManagerId = ResourceManagerId.generate();
        this.resourceManagerLeaderElectionService.isLeader(this.resourceManagerId.toUUID()).get();
        return testingResourceManager;
    }
}
