package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.runtime.util.NoOpGroupCache;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.function.FunctionUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

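/**
 * Tests for {@link DefaultExecutionGraph} covering task deployment descriptors, bookkeeping of
 * registered executions, accumulator and I/O metrics propagation, deployment ordering and the
 * retained-checkpoints setting.
 */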
class DefaultExecutionGraphDeploymentTest {

    /** JUnit extension whose executor is handed to the scheduler and execution graph builders. */
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();
    /** Blob writer passed to every builder in this class; defaults to the {@link VoidBlobWriter}. */
    protected BlobWriter blobWriter = VoidBlobWriter.getInstance();
    /**
     * Blob service passed to {@code loadBigData} when a submitted descriptor is inspected;
     * {@code null} by default. NOTE(review): protected, presumably so subclasses can supply a real
     * service - confirm against subclasses.
     */
    protected PermanentBlobService blobCache = null;

    /**
     * Asserts that the serialized job information exposed by the graph's task deployment
     * descriptor factory is a {@link TaskDeploymentDescriptor.NonOffloaded} instance.
     *
     * <p>NOTE(review): the method is {@code protected}, presumably so that subclasses using a
     * different {@link BlobWriter} can replace the expectation - confirm against subclasses.
     *
     * @param defaultExecutionGraph execution graph whose job information is inspected
     * @throws Exception part of the overridable signature
     */
    protected void checkJobOffloaded(DefaultExecutionGraph defaultExecutionGraph) throws Exception {
        final Class<?> expectedType = TaskDeploymentDescriptor.NonOffloaded.class;
        Assertions.assertThat(
                        defaultExecutionGraph
                                .getTaskDeploymentDescriptorFactory()
                                .getSerializedJobInformation())
                .isInstanceOf(expectedType);
    }

    /**
     * Asserts that the "task information or blob key" value of the given job vertex holds its
     * left alternative.
     *
     * @param executionGraph graph that owns the vertex
     * @param jobVertexID id of the job vertex to inspect
     * @throws Exception if the task information cannot be obtained
     */
    protected void checkTaskOffloaded(ExecutionGraph executionGraph, JobVertexID jobVertexID) throws Exception {
        final boolean holdsLeftValue =
                executionGraph.getJobVertex(jobVertexID).getTaskInformationOrBlobKey().isLeft();
        Assertions.assertThat(holdsLeftValue).isTrue();
    }

    /**
     * Deploys one subtask of the second vertex of a four-vertex batch job (v1 -> v2 -> {v3, v4},
     * all-to-all, pipelined) to a testing slot and verifies the {@link TaskDeploymentDescriptor}
     * that is submitted to the task manager gateway: job and task identity, subtask index,
     * produced partitions, input gates and the order of the shuffle descriptors.
     */
    @Test
    void testBuildDeploymentDescriptor() throws Exception {
        final int parallelism = 10;
        final int deployedSubtaskIndex = 3;

        final JobVertexID jid1 = new JobVertexID();
        final JobVertexID jid2 = new JobVertexID();
        final JobVertexID jid3 = new JobVertexID();
        final JobVertexID jid4 = new JobVertexID();

        final JobVertex v1 = new JobVertex("v1", jid1);
        final JobVertex v2 = new JobVertex("v2", jid2);
        final JobVertex v3 = new JobVertex("v3", jid3);
        final JobVertex v4 = new JobVertex("v4", jid4);

        for (JobVertex jobVertex : Arrays.asList(v1, v2, v3, v4)) {
            jobVertex.setParallelism(parallelism);
            jobVertex.setInvokableClass(BatchTask.class);
        }

        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(v1, v2, v3, v4);
        final JobID jobId = jobGraph.getJobID();

        final DefaultExecutionGraph executionGraph =
                TestingDefaultExecutionGraphBuilder.newBuilder()
                        .setJobGraph(jobGraph)
                        .setBlobWriter(this.blobWriter)
                        .build(new DirectScheduledExecutorService());
        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        checkJobOffloaded(executionGraph);

        final ExecutionJobVertex executionJobVertex = executionGraph.getAllVertices().get(jid2);
        final ExecutionVertex vertex = executionJobVertex.getTaskVertices()[deployedSubtaskIndex];

        // Capture the descriptor handed to the gateway so it can be inspected after deployment.
        final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        final CompletableFuture<TaskDeploymentDescriptor> submittedDescriptor = new CompletableFuture<>();
        taskManagerGateway.setSubmitConsumer(
                FunctionUtils.uncheckedConsumer(
                        taskDeploymentDescriptor -> {
                            taskDeploymentDescriptor.loadBigData(
                                    this.blobCache,
                                    new NoOpGroupCache<>(),
                                    new NoOpGroupCache<>(),
                                    new NoOpGroupCache<>());
                            submittedDescriptor.complete(taskDeploymentDescriptor);
                        }));

        final TestingLogicalSlot slot =
                new TestingLogicalSlotBuilder()
                        .setTaskManagerGateway(taskManagerGateway)
                        .createTestingLogicalSlot();

        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CREATED);
        vertex.getCurrentExecutionAttempt().transitionState(ExecutionState.SCHEDULED);
        vertex.getCurrentExecutionAttempt()
                .registerProducedPartitions(slot.getTaskManagerLocation())
                .get();
        vertex.deployToSlot(slot);
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.DEPLOYING);
        checkTaskOffloaded(executionGraph, vertex.getJobvertexId());

        final TaskDeploymentDescriptor descriptor = submittedDescriptor.get();
        Assertions.assertThat(descriptor).isNotNull();

        final JobInformation jobInformation = descriptor.getJobInformation();
        final TaskInformation taskInformation = descriptor.getTaskInformation();

        Assertions.assertThat(descriptor.getJobId()).isEqualTo(jobId);
        Assertions.assertThat(jobInformation.getJobId()).isEqualTo(jobId);
        Assertions.assertThat(taskInformation.getJobVertexId()).isEqualTo(jid2);
        Assertions.assertThat(descriptor.getSubtaskIndex()).isEqualTo(deployedSubtaskIndex);
        Assertions.assertThat(taskInformation.getNumberOfSubtasks()).isEqualTo(parallelism);
        Assertions.assertThat(taskInformation.getInvokableClassName()).isEqualTo(BatchTask.class.getName());
        Assertions.assertThat(taskInformation.getTaskName()).isEqualTo("v2");

        final List<ResultPartitionDeploymentDescriptor> producedPartitions = descriptor.getProducedPartitions();
        final List<InputGateDeploymentDescriptor> inputGates = descriptor.getInputGates();

        // v2 feeds v3 and v4 (two produced partitions) and reads only from v1 (one input gate).
        Assertions.assertThat(producedPartitions).hasSize(2);
        Assertions.assertThat(inputGates).hasSize(1);

        for (ResultPartitionDeploymentDescriptor producedPartition : producedPartitions) {
            Assertions.assertThat(producedPartition.getNumberOfSubpartitions()).isEqualTo(parallelism);
        }

        final ShuffleDescriptor[] shuffleDescriptors = inputGates.iterator().next().getShuffleDescriptors();
        Assertions.assertThat(shuffleDescriptors.length).isEqualTo(parallelism);

        // Shuffle descriptors must appear in the iteration order of the consumed partition group.
        final ConsumedPartitionGroup consumedPartitionGroup =
                vertex.getAllConsumedPartitionGroups().iterator().next();
        int index = 0;
        for (IntermediateResultPartitionID partitionId : consumedPartitionGroup) {
            Assertions.assertThat(shuffleDescriptors[index++].getResultPartitionID().getPartitionId())
                    .isEqualTo(partitionId);
        }
    }

    /**
     * Marks every registered execution of a scheduled job as finished and verifies that the
     * execution graph no longer has any registered executions afterwards.
     */
    @Test
    void testRegistrationOfExecutionsFinishing() throws Exception {
        final SchedulerBase scheduler =
                setupScheduler(
                        new JobVertex("v1", new JobVertexID()),
                        7650,
                        new JobVertex("v2", new JobVertexID()),
                        2350);

        // Iterate over a snapshot: the registry is expected to shrink while the loop runs
        // (it is asserted to be empty below).
        final Collection<Execution> executions =
                new ArrayList<>(scheduler.getExecutionGraph().getRegisteredExecutions().values());
        for (Execution execution : executions) {
            execution.markFinished();
        }

        Assertions.assertThat(scheduler.getExecutionGraph().getRegisteredExecutions()).isEmpty();
    }

    /**
     * Marks every registered execution of a scheduled job as failed (with a {@code null} cause)
     * and verifies that the execution graph no longer has any registered executions afterwards.
     */
    @Test
    void testRegistrationOfExecutionsFailing() throws Exception {
        final SchedulerBase scheduler =
                setupScheduler(
                        new JobVertex("v1", new JobVertexID()),
                        7,
                        new JobVertex("v2", new JobVertexID()),
                        6);

        // Iterate over a snapshot: the registry is expected to shrink while the loop runs
        // (it is asserted to be empty below).
        final Collection<Execution> executions =
                new ArrayList<>(scheduler.getExecutionGraph().getRegisteredExecutions().values());
        for (Execution execution : executions) {
            execution.markFailed((Throwable) null);
        }

        Assertions.assertThat(scheduler.getExecutionGraph().getRegisteredExecutions()).isEmpty();
    }

    /**
     * Calls {@code fail} (with a {@code null} cause) on every registered execution of a scheduled
     * job and verifies that the execution graph no longer has any registered executions
     * afterwards.
     */
    @Test
    void testRegistrationOfExecutionsFailedExternally() throws Exception {
        final SchedulerBase scheduler =
                setupScheduler(
                        new JobVertex("v1", new JobVertexID()),
                        7,
                        new JobVertex("v2", new JobVertexID()),
                        6);

        // Iterate over a snapshot: the registry is expected to shrink while the loop runs
        // (it is asserted to be empty below).
        final Collection<Execution> executions =
                new ArrayList<>(scheduler.getExecutionGraph().getRegisteredExecutions().values());
        for (Execution execution : executions) {
            execution.fail((Throwable) null);
        }

        Assertions.assertThat(scheduler.getExecutionGraph().getRegisteredExecutions()).isEmpty();
    }

    /**
     * Reports a CANCELED and then a FAILED task state through the scheduler, each carrying an
     * accumulator snapshot and I/O metrics, and verifies that both end up on the corresponding
     * {@link Execution}.
     */
    @Test
    void testAccumulatorsAndMetricsForwarding() throws Exception {
        final SchedulerBase scheduler =
                setupScheduler(
                        new JobVertex("v1", new JobVertexID()),
                        1,
                        new JobVertex("v2", new JobVertexID()),
                        1);
        final ExecutionGraph graph = scheduler.getExecutionGraph();
        final Map<ExecutionAttemptID, Execution> executions = graph.getRegisteredExecutions();

        // Report the first registered execution as CANCELED.
        final Execution canceledExecution = executions.values().iterator().next();
        final IOMetrics canceledMetrics = new IOMetrics(0L, 0L, 0L, 0L, 0L, 0.0d, 0L);
        final Map<String, Accumulator<?, ?>> canceledAccumulators = new HashMap<>();
        canceledAccumulators.put("acc", new IntCounter(4));
        scheduler.updateTaskExecutionState(
                new TaskExecutionState(
                        canceledExecution.getAttemptId(),
                        ExecutionState.CANCELED,
                        (Throwable) null,
                        new AccumulatorSnapshot(
                                graph.getJobID(),
                                canceledExecution.getAttemptId(),
                                canceledAccumulators),
                        canceledMetrics));

        assertIOMetricsEqual(canceledExecution.getIOMetrics(), canceledMetrics);
        Assertions.assertThat(canceledExecution.getUserAccumulators()).isNotNull();
        Assertions.assertThat(canceledExecution.getUserAccumulators().get("acc").getLocalValue())
                .isEqualTo(4);

        // Fetch the first entry of the live registry again and report it as FAILED.
        final Execution failedExecution = executions.values().iterator().next();
        final IOMetrics failedMetrics = new IOMetrics(0L, 0L, 0L, 0L, 0L, 0.0d, 0L);
        final Map<String, Accumulator<?, ?>> failedAccumulators = new HashMap<>();
        failedAccumulators.put("acc", new IntCounter(8));
        scheduler.updateTaskExecutionState(
                new TaskExecutionState(
                        failedExecution.getAttemptId(),
                        ExecutionState.FAILED,
                        (Throwable) null,
                        new AccumulatorSnapshot(
                                graph.getJobID(),
                                failedExecution.getAttemptId(),
                                failedAccumulators),
                        failedMetrics));

        assertIOMetricsEqual(failedExecution.getIOMetrics(), failedMetrics);
        Assertions.assertThat(failedExecution.getUserAccumulators()).isNotNull();
        Assertions.assertThat(failedExecution.getUserAccumulators().get("acc").getLocalValue())
                .isEqualTo(8);
    }

    /**
     * Completes the cancellation of one execution and fails another directly on the
     * {@link Execution}, passing accumulators and I/O metrics, and verifies that each execution
     * stores what it was given.
     */
    @Test
    void testAccumulatorsAndMetricsStorage() throws Exception {
        final Map<ExecutionAttemptID, Execution> executions =
                setupScheduler(
                                new JobVertex("v1", new JobVertexID()),
                                1,
                                new JobVertex("v2", new JobVertexID()),
                                1)
                        .getExecutionGraph()
                        .getRegisteredExecutions();

        final IOMetrics ioMetrics = new IOMetrics(0L, 0L, 0L, 0L, 0L, 0.0d, 0L);
        final Map<String, Accumulator<?, ?>> accumulators = Collections.emptyMap();

        final Execution canceledExecution = executions.values().iterator().next();
        canceledExecution.cancel();
        canceledExecution.completeCancelling(accumulators, ioMetrics, false);

        assertIOMetricsEqual(canceledExecution.getIOMetrics(), ioMetrics);
        Assertions.assertThat(canceledExecution.getUserAccumulators()).isEqualTo(accumulators);

        // Fetch the first entry of the live registry again for the failure path.
        final Execution failedExecution = executions.values().iterator().next();
        failedExecution.markFailed(new Throwable(), false, accumulators, ioMetrics, false, true);

        assertIOMetricsEqual(failedExecution.getIOMetrics(), ioMetrics);
        Assertions.assertThat(failedExecution.getUserAccumulators()).isEqualTo(accumulators);
    }

    /**
     * Cancels every registered execution of a scheduled job, completes each cancellation and
     * verifies that the execution graph no longer has any registered executions afterwards.
     */
    @Test
    void testRegistrationOfExecutionsCanceled() throws Exception {
        final SchedulerBase scheduler =
                setupScheduler(
                        new JobVertex("v1", new JobVertexID()),
                        19,
                        new JobVertex("v2", new JobVertexID()),
                        37);

        // A typed snapshot is required: a raw ArrayList yields Object elements, and the registry
        // is expected to shrink while the loop runs (it is asserted to be empty below).
        final Collection<Execution> executions =
                new ArrayList<>(scheduler.getExecutionGraph().getRegisteredExecutions().values());
        for (Execution execution : executions) {
            execution.cancel();
            execution.completeCancelling();
        }

        Assertions.assertThat(scheduler.getExecutionGraph().getRegisteredExecutions()).isEmpty();
    }

    /**
     * Schedules a two-vertex batch job (source -> sink, pointwise, blocking, parallelism 2 each)
     * against a slot provider limited to a single physical slot, drives the first source subtask
     * to FINISHED and expects the job to end up in {@link JobStatus#FAILED}.
     */
    @Test
    void testNoResourceAvailableFailure() throws Exception {
        final JobVertex source = new JobVertex("source");
        source.setParallelism(2);
        source.setInvokableClass(BatchTask.class);

        final JobVertex sink = new JobVertex("sink");
        sink.setParallelism(2);
        sink.setInvokableClass(BatchTask.class);

        sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);

        final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(source, sink);
        final DefaultScheduler scheduler =
                new DefaultSchedulerBuilder(
                                jobGraph,
                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                EXECUTOR_EXTENSION.getExecutor())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1)))
                        .setFutureExecutor(new DirectScheduledExecutorService())
                        .setBlobWriter(this.blobWriter)
                        .build();

        final ExecutionGraph graph = scheduler.getExecutionGraph();
        checkJobOffloaded((DefaultExecutionGraph) graph);

        scheduler.startScheduling();

        final ExecutionAttemptID firstSourceAttempt =
                graph.getJobVertex(source.getID())
                        .getTaskVertices()[0]
                        .getCurrentExecutionAttempt()
                        .getAttemptId();
        scheduler.updateTaskExecutionState(
                new TaskExecutionState(firstSourceAttempt, ExecutionState.RUNNING));
        scheduler.updateTaskExecutionState(
                new TaskExecutionState(firstSourceAttempt, ExecutionState.FINISHED, (Throwable) null));

        Assertions.assertThat(graph.getState()).isEqualTo(JobStatus.FAILED);
    }

    /**
     * With an empty configuration, the checkpoint store must retain the default number of
     * checkpoints defined by {@link CheckpointingOptions#MAX_RETAINED_CHECKPOINTS}.
     */
    @Test
    void testSettingDefaultMaxNumberOfCheckpointsToRetain() throws Exception {
        final ExecutionGraph graph = createExecutionGraph(new Configuration());

        final int retained =
                graph.getCheckpointCoordinator()
                        .getCheckpointStore()
                        .getMaxNumberOfRetainedCheckpoints();
        final int expected = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();

        Assertions.assertThat(retained).isEqualTo(expected);
    }

    /**
     * Builds a scheduler for a streaming job graph made of the two given vertices, starts
     * scheduling and checks that one execution per subtask has been registered.
     *
     * @param jobVertex first vertex of the job
     * @param i parallelism applied to {@code jobVertex}
     * @param jobVertex2 second vertex of the job
     * @param i2 parallelism applied to {@code jobVertex2}
     * @return the scheduler, after scheduling has been started
     * @throws Exception if building the scheduler or the offloading check fails
     */
    private SchedulerBase setupScheduler(JobVertex jobVertex, int i, JobVertex jobVertex2, int i2) throws Exception {
        jobVertex.setParallelism(i);
        jobVertex.setInvokableClass(BatchTask.class);
        jobVertex2.setParallelism(i2);
        jobVertex2.setInvokableClass(BatchTask.class);

        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertex, jobVertex2);
        final DefaultScheduler scheduler =
                new DefaultSchedulerBuilder(
                                jobGraph,
                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                EXECUTOR_EXTENSION.getExecutor())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory())
                        .setFutureExecutor(new DirectScheduledExecutorService())
                        .setBlobWriter(this.blobWriter)
                        .build();

        final ExecutionGraph graph = scheduler.getExecutionGraph();
        checkJobOffloaded((DefaultExecutionGraph) graph);

        scheduler.startScheduling();

        final int expectedExecutions = i + i2;
        Assertions.assertThat(graph.getRegisteredExecutions()).hasSize(expectedExecutions);
        return scheduler;
    }

    /**
     * A negative value for {@link CheckpointingOptions#MAX_RETAINED_CHECKPOINTS} must not be used
     * by the checkpoint store; the option's default value is expected instead.
     */
    @Test
    void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception {
        final int illegalValue = -10;

        final Configuration jobManagerConfig = new Configuration();
        jobManagerConfig.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, illegalValue);

        final ExecutionGraph graph = createExecutionGraph(jobManagerConfig);

        Assertions.assertThat(
                        graph.getCheckpointCoordinator()
                                .getCheckpointStore()
                                .getMaxNumberOfRetainedCheckpoints())
                .isNotEqualTo(illegalValue);

        final int expectedDefault = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
        Assertions.assertThat(
                        graph.getCheckpointCoordinator()
                                .getCheckpointStore()
                                .getMaxNumberOfRetainedCheckpoints())
                .isEqualTo(expectedDefault);
    }

    /**
     * Schedules a source (parallelism 2) -> sink (parallelism 1) streaming job whose slot requests
     * are fulfilled in random order, records the order in which tasks are submitted to the task
     * executor gateway and verifies that all source subtasks are submitted before the sink.
     */
    @Test
    void testExecutionGraphIsDeployedInTopologicalOrder() throws Exception {
        final int sourceParallelism = 2;
        final int sinkParallelism = 1;
        final int totalTasks = sourceParallelism + sinkParallelism;

        final JobVertex source = new JobVertex("source");
        source.setInvokableClass(NoOpInvokable.class);
        source.setParallelism(sourceParallelism);

        final JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(NoOpInvokable.class);
        sink.setParallelism(sinkParallelism);

        sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        // Records attempt ids in the order the gateway receives the submissions.
        final ArrayBlockingQueue<ExecutionAttemptID> submittedTasks = new ArrayBlockingQueue<>(totalTasks);
        final TestingTaskExecutorGatewayBuilder gatewayBuilder = new TestingTaskExecutorGatewayBuilder();
        gatewayBuilder.setSubmitTaskConsumer(
                (taskDeploymentDescriptor, jobMasterId) -> {
                    submittedTasks.offer(taskDeploymentDescriptor.getExecutionAttemptId());
                    return CompletableFuture.completedFuture(Acknowledge.get());
                });

        final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        final TaskManagerGateway taskManagerGateway =
                new RpcTaskManagerGateway(
                        gatewayBuilder.createTestingTaskExecutorGateway(), JobMasterId.generate());

        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(source, sink);
        final TestingPhysicalSlotProvider physicalSlotProvider =
                TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation();
        final DefaultScheduler scheduler =
                new DefaultSchedulerBuilder(
                                jobGraph,
                                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                                EXECUTOR_EXTENSION.getExecutor())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        physicalSlotProvider))
                        .setFutureExecutor(new DirectScheduledExecutorService())
                        .build();
        final ExecutionGraph executionGraph = scheduler.getExecutionGraph();

        scheduler.startScheduling();

        // Fulfil the pending slot requests in random order so that the deployment order cannot
        // simply mirror the order in which slots become available.
        final List<CompletableFuture<TestingPhysicalSlot>> slotFutures =
                new ArrayList<>(physicalSlotProvider.getResponses().values());
        Collections.shuffle(slotFutures);
        for (CompletableFuture<TestingPhysicalSlot> slotFuture : slotFutures) {
            slotFuture.complete(
                    TestingPhysicalSlot.builder()
                            .withTaskManagerLocation(taskManagerLocation)
                            .withTaskManagerGateway(taskManagerGateway)
                            .build());
        }

        final List<ExecutionAttemptID> submissionOrder = new ArrayList<>(totalTasks);
        for (int i = 0; i < totalTasks; i++) {
            submissionOrder.add(submittedTasks.take());
        }

        final Collection<ExecutionAttemptID> sourceAttempts = new ArrayList<>(sourceParallelism);
        for (ExecutionVertex sourceVertex : executionGraph.getJobVertex(source.getID()).getTaskVertices()) {
            sourceAttempts.add(sourceVertex.getCurrentExecutionAttempt().getAttemptId());
        }

        final Collection<ExecutionAttemptID> sinkAttempts = new ArrayList<>(sinkParallelism);
        for (ExecutionVertex sinkVertex : executionGraph.getJobVertex(sink.getID()).getTaskVertices()) {
            sinkAttempts.add(sinkVertex.getCurrentExecutionAttempt().getAttemptId());
        }

        Assertions.assertThat(
                        isDeployedInTopologicalOrder(
                                submissionOrder, Arrays.asList(sourceAttempts, sinkAttempts)))
                .isTrue();
    }

    /**
     * Creates an execution graph for an empty job graph that has checkpointing settings attached,
     * using the given configuration as the job master configuration.
     *
     * @param configuration job master configuration passed to the graph builder
     * @return the newly built execution graph
     * @throws Exception if the graph cannot be built
     */
    private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
        final CheckpointCoordinatorConfiguration coordinatorConfiguration =
                new CheckpointCoordinatorConfiguration(
                        100L,
                        600000L,
                        0L,
                        1,
                        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                        false,
                        false,
                        0,
                        0L);
        final JobCheckpointingSettings checkpointingSettings =
                new JobCheckpointingSettings(coordinatorConfiguration, (SerializedValue) null);

        final JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
        jobGraph.setSnapshotSettings(checkpointingSettings);

        return TestingDefaultExecutionGraphBuilder.newBuilder()
                .setJobGraph(jobGraph)
                .setJobMasterConfig(configuration)
                .setBlobWriter(this.blobWriter)
                .build((ScheduledExecutorService) EXECUTOR_EXTENSION.getExecutor());
    }

    /**
     * Checks that the submission order respects the given stage order: all attempts of a stage
     * must be submitted (in any order within the stage) before any attempt of the next stage, and
     * no submission may be left over once every stage has been matched.
     *
     * @param list attempt ids in the order in which they were submitted
     * @param list2 execution stages in the order in which they are expected to be deployed
     * @return {@code true} if the submissions cover exactly the stages, stage by stage
     */
    private static boolean isDeployedInTopologicalOrder(List<ExecutionAttemptID> list, List<Collection<ExecutionAttemptID>> list2) {
        final Iterator<ExecutionAttemptID> submissions = list.iterator();

        for (Collection<ExecutionAttemptID> stage : list2) {
            // Work on a copy so the caller's stage collection is left untouched.
            final List<ExecutionAttemptID> pending = new ArrayList<>(stage);

            while (!pending.isEmpty() && submissions.hasNext()) {
                if (!pending.remove(submissions.next())) {
                    // The submitted attempt does not belong to the stage currently being drained.
                    return false;
                }
            }

            if (!pending.isEmpty()) {
                // Submissions ran out before the stage was fully deployed.
                return false;
            }
        }

        return !submissions.hasNext();
    }

    /**
     * Compares two {@link IOMetrics} instances field by field.
     *
     * @param iOMetrics the metrics under test
     * @param iOMetrics2 the metrics holding the expected values
     */
    private void assertIOMetricsEqual(IOMetrics iOMetrics, IOMetrics iOMetrics2) {
        final IOMetrics actual = iOMetrics;
        final IOMetrics expected = iOMetrics2;

        // Byte and record counters.
        Assertions.assertThat(actual.numBytesIn).isEqualTo(expected.numBytesIn);
        Assertions.assertThat(actual.numBytesOut).isEqualTo(expected.numBytesOut);
        Assertions.assertThat(actual.numRecordsIn).isEqualTo(expected.numRecordsIn);
        Assertions.assertThat(actual.numRecordsOut).isEqualTo(expected.numRecordsOut);

        // Accumulated idle / busy / back-pressured times.
        Assertions.assertThat(actual.accumulateIdleTime).isEqualTo(expected.accumulateIdleTime);
        Assertions.assertThat(actual.accumulateBusyTime).isEqualTo(expected.accumulateBusyTime);
        Assertions.assertThat(actual.accumulateBackPressuredTime)
                .isEqualTo(expected.accumulateBackPressuredTime);

        // Result partition bytes.
        Assertions.assertThat(actual.resultPartitionBytes).isEqualTo(expected.resultPartitionBytes);
    }
}
