package org.apache.flink.runtime.scheduler;

import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.NoRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.TestRestartBackoffTimeStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.JobMasterPartitionTracker;
import org.apache.flink.runtime.io.network.partition.NoOpJobMasterPartitionTracker;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.DefaultExecutionDeploymentTracker;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotProvider;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.scheduler.LocalInputPreferredSlotSharingStrategy;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/SchedulerTestingUtils.class */
public class SchedulerTestingUtils {
    // Logger of this utility class; also the default logger of DefaultSchedulerBuilder.
    // Blank final: assigned in the static initializer at the bottom of the class.
    private static final Logger LOG;
    // Value handed to the CheckpointCoordinatorConfiguration built in enableCheckpointing
    // (600000 ms = 10 minutes).
    private static final long DEFAULT_CHECKPOINT_TIMEOUT_MS = 600000;
    // Default timeout used for the builder's rpcTimeout and for the slot sharing allocator factory.
    // Blank final: assigned in the static initializer (Time.seconds(300)).
    private static final Time DEFAULT_TIMEOUT;
    // Compiler-generated flag exposed by decompilation: true when assertions are disabled for
    // this class. It backs the explicit null check in getAttemptId.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/SchedulerTestingUtils$DefaultSchedulerBuilder.class */
    /**
     * Fluent builder for {@link DefaultScheduler} instances used in tests.
     *
     * <p>Only the job graph and the main thread executor are mandatory; every other collaborator
     * starts with the default assigned in its field initializer and can be replaced through the
     * matching setter. All setters return {@code this} so calls can be chained.
     */
    public static class DefaultSchedulerBuilder {
        // Mandatory inputs, fixed at construction time.
        private final JobGraph jobGraph;
        private final ComponentMainThreadExecutor mainThreadExecutor;
        // Optional collaborators with their defaults.
        private SchedulingStrategyFactory schedulingStrategyFactory = new PipelinedRegionSchedulingStrategy.Factory();
        private Logger log = SchedulerTestingUtils.LOG;
        private Executor ioExecutor = TestingUtils.defaultExecutor();
        private Configuration jobMasterConfiguration = new Configuration();
        private ScheduledExecutorService futureExecutor = TestingUtils.defaultExecutor();
        // NOTE: wraps the futureExecutor value present at construction time. Calling
        // setFutureExecutor later does NOT re-wrap; use setDelayExecutor as well if both
        // executors must be replaced. This initializer must stay after futureExecutor.
        private ScheduledExecutor delayExecutor = new ScheduledExecutorServiceAdapter(this.futureExecutor);
        private ClassLoader userCodeLoader = ClassLoader.getSystemClassLoader();
        private CheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
        private Time rpcTimeout = SchedulerTestingUtils.DEFAULT_TIMEOUT;
        private BlobWriter blobWriter = VoidBlobWriter.getInstance();
        private JobManagerJobMetricGroup jobManagerJobMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
        private ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE;
        private JobMasterPartitionTracker partitionTracker = NoOpJobMasterPartitionTracker.INSTANCE;
        private FailoverStrategy.Factory failoverStrategyFactory = new RestartPipelinedRegionFailoverStrategy.Factory();
        private RestartBackoffTimeStrategy restartBackoffTimeStrategy = NoRestartBackoffTimeStrategy.INSTANCE;
        private ExecutionVertexOperations executionVertexOperations = new DefaultExecutionVertexOperations();
        private ExecutionVertexVersioner executionVertexVersioner = new ExecutionVertexVersioner();
        private ExecutionSlotAllocatorFactory executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory();
        // Default listener ignores every job status notification.
        private JobStatusListener jobStatusListener = (jobID, jobStatus, j, th) -> {
        };

        /**
         * Creates a builder for the given job.
         *
         * @param jobGraph job graph the scheduler will be built for
         * @param componentMainThreadExecutor main thread executor passed to the scheduler
         */
        public DefaultSchedulerBuilder(JobGraph jobGraph, ComponentMainThreadExecutor componentMainThreadExecutor) {
            this.jobGraph = jobGraph;
            this.mainThreadExecutor = componentMainThreadExecutor;
        }

        /** Replaces the logger handed to the scheduler. */
        public DefaultSchedulerBuilder setLogger(Logger logger) {
            this.log = logger;
            return this;
        }

        /** Replaces the I/O executor. */
        public DefaultSchedulerBuilder setIoExecutor(Executor executor) {
            this.ioExecutor = executor;
            return this;
        }

        /** Replaces the job master configuration. */
        public DefaultSchedulerBuilder setJobMasterConfiguration(Configuration configuration) {
            this.jobMasterConfiguration = configuration;
            return this;
        }

        /**
         * Replaces the future executor. The delay executor is left untouched, so it keeps wrapping
         * whatever executor it was created with unless {@link #setDelayExecutor} is also called.
         */
        public DefaultSchedulerBuilder setFutureExecutor(ScheduledExecutorService scheduledExecutorService) {
            this.futureExecutor = scheduledExecutorService;
            return this;
        }

        /** Replaces the delay executor. */
        public DefaultSchedulerBuilder setDelayExecutor(ScheduledExecutor scheduledExecutor) {
            this.delayExecutor = scheduledExecutor;
            return this;
        }

        /** Replaces the user code class loader (default: the system class loader). */
        public DefaultSchedulerBuilder setUserCodeLoader(ClassLoader classLoader) {
            this.userCodeLoader = classLoader;
            return this;
        }

        /** Replaces the checkpoint recovery factory. */
        public DefaultSchedulerBuilder setCheckpointRecoveryFactory(CheckpointRecoveryFactory checkpointRecoveryFactory) {
            this.checkpointRecoveryFactory = checkpointRecoveryFactory;
            return this;
        }

        /** Replaces the RPC timeout. */
        public DefaultSchedulerBuilder setRpcTimeout(Time time) {
            this.rpcTimeout = time;
            return this;
        }

        /** Replaces the blob writer. */
        public DefaultSchedulerBuilder setBlobWriter(BlobWriter blobWriter) {
            this.blobWriter = blobWriter;
            return this;
        }

        /** Replaces the job manager job metric group. */
        public DefaultSchedulerBuilder setJobManagerJobMetricGroup(JobManagerJobMetricGroup jobManagerJobMetricGroup) {
            this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
            return this;
        }

        /** Replaces the shuffle master. */
        public DefaultSchedulerBuilder setShuffleMaster(ShuffleMaster<?> shuffleMaster) {
            this.shuffleMaster = shuffleMaster;
            return this;
        }

        /** Replaces the partition tracker. */
        public DefaultSchedulerBuilder setPartitionTracker(JobMasterPartitionTracker jobMasterPartitionTracker) {
            this.partitionTracker = jobMasterPartitionTracker;
            return this;
        }

        /** Replaces the scheduling strategy factory. */
        public DefaultSchedulerBuilder setSchedulingStrategyFactory(SchedulingStrategyFactory schedulingStrategyFactory) {
            this.schedulingStrategyFactory = schedulingStrategyFactory;
            return this;
        }

        /** Replaces the failover strategy factory. */
        public DefaultSchedulerBuilder setFailoverStrategyFactory(FailoverStrategy.Factory factory) {
            this.failoverStrategyFactory = factory;
            return this;
        }

        /** Replaces the restart backoff time strategy. */
        public DefaultSchedulerBuilder setRestartBackoffTimeStrategy(RestartBackoffTimeStrategy restartBackoffTimeStrategy) {
            this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
            return this;
        }

        /** Replaces the execution vertex operations. */
        public DefaultSchedulerBuilder setExecutionVertexOperations(ExecutionVertexOperations executionVertexOperations) {
            this.executionVertexOperations = executionVertexOperations;
            return this;
        }

        /** Replaces the execution vertex versioner. */
        public DefaultSchedulerBuilder setExecutionVertexVersioner(ExecutionVertexVersioner executionVertexVersioner) {
            this.executionVertexVersioner = executionVertexVersioner;
            return this;
        }

        /** Replaces the execution slot allocator factory. */
        public DefaultSchedulerBuilder setExecutionSlotAllocatorFactory(ExecutionSlotAllocatorFactory executionSlotAllocatorFactory) {
            this.executionSlotAllocatorFactory = executionSlotAllocatorFactory;
            return this;
        }

        /** Replaces the job status listener. */
        public DefaultSchedulerBuilder setJobStatusListener(JobStatusListener jobStatusListener) {
            this.jobStatusListener = jobStatusListener;
            return this;
        }

        /**
         * Creates the scheduler from the current builder state.
         *
         * <p>The DefaultScheduler constructor takes its arguments positionally; keep the order
         * below in sync with that constructor. Besides the configured fields it receives a no-op
         * lambda (fifth argument), a fresh {@link DefaultExecutionDeploymentTracker} and the
         * current wall-clock time in milliseconds.
         *
         * @return a new scheduler instance
         * @throws Exception if the DefaultScheduler constructor fails
         */
        public DefaultScheduler build() throws Exception {
            return new DefaultScheduler(this.log, this.jobGraph, this.ioExecutor, this.jobMasterConfiguration, componentMainThreadExecutor -> {
            }, this.futureExecutor, this.delayExecutor, this.userCodeLoader, this.checkpointRecoveryFactory, this.rpcTimeout, this.blobWriter, this.jobManagerJobMetricGroup, this.shuffleMaster, this.partitionTracker, this.schedulingStrategyFactory, this.failoverStrategyFactory, this.restartBackoffTimeStrategy, this.executionVertexOperations, this.executionVertexVersioner, this.executionSlotAllocatorFactory, new DefaultExecutionDeploymentTracker(), System.currentTimeMillis(), this.mainThreadExecutor, this.jobStatusListener);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/scheduler/SchedulerTestingUtils$TaskExecutorOperatorEventGatewayAdapter.class */
    /**
     * Adapts a {@link TaskExecutorOperatorEventGateway} to the task manager gateway type expected
     * by the slot allocator. It extends {@link SimpleAckingTaskManagerGateway} and overrides only
     * the delivery of operator events, which is forwarded to the wrapped gateway.
     */
    private static final class TaskExecutorOperatorEventGatewayAdapter extends SimpleAckingTaskManagerGateway {
        /** Gateway that receives every operator event sent through this adapter. */
        private final TaskExecutorOperatorEventGateway delegate;

        /**
         * @param taskExecutorOperatorEventGateway gateway to forward operator events to
         */
        TaskExecutorOperatorEventGatewayAdapter(TaskExecutorOperatorEventGateway taskExecutorOperatorEventGateway) {
            super();
            delegate = taskExecutorOperatorEventGateway;
        }

        /** Forwards the event unchanged to the wrapped gateway and returns its future. */
        @Override
        public CompletableFuture<Acknowledge> sendOperatorEventToTask(
                ExecutionAttemptID attemptId,
                OperatorID operatorId,
                SerializedValue<OperatorEvent> event) {
            final CompletableFuture<Acknowledge> acknowledgement =
                    delegate.sendOperatorEventToTask(attemptId, operatorId, event);
            return acknowledgement;
        }
    }

    /** Static utility holder; not meant to be instantiated. */
    private SchedulerTestingUtils() {}

    /**
     * Creates a builder that carries only the defaults of {@link DefaultSchedulerBuilder}.
     *
     * @param jobGraph job graph to schedule
     * @param componentMainThreadExecutor main thread executor for the scheduler
     * @return a fresh builder
     */
    public static DefaultSchedulerBuilder newSchedulerBuilder(JobGraph jobGraph, ComponentMainThreadExecutor componentMainThreadExecutor) {
        final DefaultSchedulerBuilder builder = new DefaultSchedulerBuilder(jobGraph, componentMainThreadExecutor);
        return builder;
    }

    /**
     * Builds a scheduler using only the builder defaults.
     *
     * @param jobGraph job graph to schedule
     * @param componentMainThreadExecutor main thread executor for the scheduler
     * @return the built scheduler
     * @throws Exception if building the scheduler fails
     */
    public static DefaultScheduler createScheduler(JobGraph jobGraph, ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {
        final DefaultSchedulerBuilder defaultsOnly = newSchedulerBuilder(jobGraph, componentMainThreadExecutor);
        return defaultsOnly.build();
    }

    /**
     * Creates a pre-configured builder whose slot allocator uses a new
     * {@link SimpleAckingTaskManagerGateway}.
     *
     * @param jobGraph job graph to schedule
     * @param componentMainThreadExecutor main thread executor for the scheduler
     * @return the pre-configured builder
     */
    public static DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph, ComponentMainThreadExecutor componentMainThreadExecutor) {
        // Typed as TaskManagerGateway so the call below binds to the TaskManagerGateway overload.
        final TaskManagerGateway defaultGateway = new SimpleAckingTaskManagerGateway();
        return createSchedulerBuilder(jobGraph, componentMainThreadExecutor, defaultGateway);
    }

    /**
     * Creates a pre-configured builder for tests that only care about operator events.
     *
     * <p>If the given gateway already is a {@link TaskManagerGateway} it is used directly;
     * otherwise it is wrapped in an adapter that forwards operator events to it.
     *
     * @param jobGraph job graph to schedule
     * @param componentMainThreadExecutor main thread executor for the scheduler
     * @param taskExecutorOperatorEventGateway receiver of operator events
     * @return the pre-configured builder
     */
    public static DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph, ComponentMainThreadExecutor componentMainThreadExecutor, TaskExecutorOperatorEventGateway taskExecutorOperatorEventGateway) {
        final TaskManagerGateway effectiveGateway;
        if (taskExecutorOperatorEventGateway instanceof TaskManagerGateway) {
            effectiveGateway = (TaskManagerGateway) taskExecutorOperatorEventGateway;
        } else {
            effectiveGateway = new TaskExecutorOperatorEventGatewayAdapter(taskExecutorOperatorEventGateway);
        }
        return createSchedulerBuilder(jobGraph, componentMainThreadExecutor, effectiveGateway);
    }

    /**
     * Creates a builder configured with a pipelined-region scheduling strategy, a
     * {@code TestRestartBackoffTimeStrategy(true, 0L)} and a test slot allocator bound to the
     * given task manager gateway.
     *
     * @param jobGraph job graph to schedule
     * @param componentMainThreadExecutor main thread executor for the scheduler
     * @param taskManagerGateway gateway handed to the test slot allocator factory
     * @return the pre-configured builder
     */
    public static DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph, ComponentMainThreadExecutor componentMainThreadExecutor, TaskManagerGateway taskManagerGateway) {
        final DefaultSchedulerBuilder builder = newSchedulerBuilder(jobGraph, componentMainThreadExecutor);
        builder.setSchedulingStrategyFactory(new PipelinedRegionSchedulingStrategy.Factory());
        builder.setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0L));
        builder.setExecutionSlotAllocatorFactory(new TestExecutionSlotAllocatorFactory(taskManagerGateway));
        return builder;
    }

    /**
     * Enables checkpointing on the job graph without supplying a state backend or a checkpoint
     * storage.
     *
     * @param jobGraph job graph to attach snapshot settings to
     */
    public static void enableCheckpointing(JobGraph jobGraph) {
        final StateBackend noStateBackend = null;
        final CheckpointStorage noCheckpointStorage = null;
        enableCheckpointing(jobGraph, noStateBackend, noCheckpointStorage);
    }

    /**
     * Attaches checkpointing settings to the job graph.
     *
     * <p>The state backend and checkpoint storage are serialized when present and passed as
     * {@code null} otherwise.
     *
     * @param jobGraph job graph to attach snapshot settings to
     * @param stateBackend optional state backend to serialize into the settings
     * @param checkpointStorage optional checkpoint storage to serialize into the settings
     * @throws RuntimeException if serializing the state backend or checkpoint storage fails
     */
    public static void enableCheckpointing(JobGraph jobGraph, @Nullable StateBackend stateBackend, @Nullable CheckpointStorage checkpointStorage) {
        final CheckpointCoordinatorConfiguration coordinatorConfig =
                new CheckpointCoordinatorConfiguration(
                        Long.MAX_VALUE,
                        DEFAULT_CHECKPOINT_TIMEOUT_MS,
                        0L,
                        1,
                        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                        false,
                        false,
                        false,
                        0);

        final SerializedValue<StateBackend> serializedBackend =
                serializeIfPresent(stateBackend, "could not serialize state backend");
        final SerializedValue<CheckpointStorage> serializedStorage =
                serializeIfPresent(checkpointStorage, "could not serialize checkpoint storage");

        final JobCheckpointingSettings settings =
                new JobCheckpointingSettings(coordinatorConfig, serializedBackend, serializedStorage, (SerializedValue) null);
        jobGraph.setSnapshotSettings(settings);
    }

    /** Serializes {@code value} unless it is null; wraps I/O failures in a RuntimeException. */
    private static <T> SerializedValue<T> serializeIfPresent(@Nullable T value, String failureMessage) {
        if (value == null) {
            return null;
        }
        try {
            return new SerializedValue<>(value);
        } catch (IOException e) {
            throw new RuntimeException(failureMessage, e);
        }
    }

    /**
     * Collects the attempt id of the current execution attempt of every execution vertex found in
     * the archived execution graph returned by {@code requestJob()}.
     *
     * @param defaultScheduler scheduler to inspect
     * @return attempt ids, one per execution vertex
     */
    public static Collection<ExecutionAttemptID> getAllCurrentExecutionAttempts(DefaultScheduler defaultScheduler) {
        return StreamSupport.stream(
                        defaultScheduler
                                .requestJob()
                                .getArchivedExecutionGraph()
                                .getAllExecutionVertices()
                                .spliterator(),
                        false)
                .map(vertex -> vertex.getCurrentExecutionAttempt().getAttemptId())
                .collect(Collectors.toList());
    }

    /**
     * Returns the state of the current execution attempt of one subtask.
     *
     * @param defaultScheduler scheduler to inspect
     * @param jobVertexID vertex the subtask belongs to
     * @param i subtask index within the vertex
     * @return state of the subtask's current attempt
     */
    public static ExecutionState getExecutionState(DefaultScheduler defaultScheduler, JobVertexID jobVertexID, int i) {
        final ExecutionJobVertex jobVertex = getJobVertex(defaultScheduler, jobVertexID);
        return jobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getState();
    }

    /**
     * Reports the current attempt of the given subtask as FAILED, carrying a synthetic exception.
     *
     * @param defaultScheduler scheduler to notify
     * @param jobVertexID vertex the subtask belongs to
     * @param i subtask index within the vertex
     */
    public static void failExecution(DefaultScheduler defaultScheduler, JobVertexID jobVertexID, int i) {
        final ExecutionAttemptID attemptId = getAttemptId(defaultScheduler, jobVertexID, i);
        final TaskExecutionState failedState =
                new TaskExecutionState(attemptId, ExecutionState.FAILED, new Exception("test task failure"));
        defaultScheduler.updateTaskExecutionState(failedState);
    }

    /**
     * Reports the current attempt of the given subtask as CANCELED, carrying a synthetic
     * exception.
     *
     * @param defaultScheduler scheduler to notify
     * @param jobVertexID vertex the subtask belongs to
     * @param i subtask index within the vertex
     */
    public static void canceledExecution(DefaultScheduler defaultScheduler, JobVertexID jobVertexID, int i) {
        final ExecutionAttemptID attemptId = getAttemptId(defaultScheduler, jobVertexID, i);
        final TaskExecutionState canceledState =
                new TaskExecutionState(attemptId, ExecutionState.CANCELED, new Exception("test task failure"));
        defaultScheduler.updateTaskExecutionState(canceledState);
    }

    /**
     * Reports the current attempt of the given subtask as RUNNING.
     *
     * @param defaultScheduler scheduler to notify
     * @param jobVertexID vertex the subtask belongs to
     * @param i subtask index within the vertex
     */
    public static void setExecutionToRunning(DefaultScheduler defaultScheduler, JobVertexID jobVertexID, int i) {
        final ExecutionAttemptID attemptId = getAttemptId(defaultScheduler, jobVertexID, i);
        final TaskExecutionState runningState = new TaskExecutionState(attemptId, ExecutionState.RUNNING);
        defaultScheduler.updateTaskExecutionState(runningState);
    }

    /**
     * Reports every current execution attempt as RUNNING. The result of each state update is
     * ignored.
     *
     * @param defaultScheduler scheduler to notify
     */
    public static void setAllExecutionsToRunning(DefaultScheduler defaultScheduler) {
        for (ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(defaultScheduler)) {
            final TaskExecutionState runningState = new TaskExecutionState(attemptId, ExecutionState.RUNNING);
            defaultScheduler.updateTaskExecutionState(runningState);
        }
    }

    /**
     * Reports every current execution attempt as CANCELED and asserts that the scheduler accepted
     * each state update.
     *
     * @param defaultScheduler scheduler to notify
     * @throws AssertionError if any state update is rejected by the scheduler
     */
    public static void setAllExecutionsToCancelled(DefaultScheduler defaultScheduler) {
        for (ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(defaultScheduler)) {
            final boolean switchedToCanceled =
                    defaultScheduler.updateTaskExecutionState(
                            new TaskExecutionState(attemptId, ExecutionState.CANCELED));
            // The message previously claimed RUNNING (copy-paste slip); the target state is CANCELED.
            Assert.assertTrue("could not switch task to CANCELED", switchedToCanceled);
        }
    }

    /**
     * Sends an acknowledge message for the given checkpoint id to the checkpoint coordinator, once
     * per current execution attempt.
     *
     * @param defaultScheduler scheduler owning the checkpoint coordinator
     * @param j id of the checkpoint to acknowledge
     * @throws CheckpointException if the coordinator rejects an acknowledge message
     */
    public static void acknowledgePendingCheckpoint(DefaultScheduler defaultScheduler, long j) throws CheckpointException {
        final CheckpointCoordinator coordinator = getCheckpointCoordinator(defaultScheduler);
        final JobID jobId = defaultScheduler.getJobId();

        for (ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(defaultScheduler)) {
            final AcknowledgeCheckpoint ack = new AcknowledgeCheckpoint(jobId, attemptId, j);
            coordinator.receiveAcknowledgeMessage(ack, "Unknown location");
        }
    }

    /**
     * Triggers a checkpoint on the scheduler's checkpoint coordinator by calling
     * {@code triggerCheckpoint(false)}.
     *
     * @param defaultScheduler scheduler owning the checkpoint coordinator
     * @return future of the triggered checkpoint
     * @throws Exception declared for callers; propagated from the coordinator
     */
    public static CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler defaultScheduler) throws Exception {
        final CheckpointCoordinator coordinator = getCheckpointCoordinator(defaultScheduler);
        return coordinator.triggerCheckpoint(false);
    }

    /**
     * Acknowledges the single pending checkpoint for every current execution attempt.
     *
     * <p>The method first asserts that exactly one checkpoint is pending, then polls (sleeping 1 ms
     * per round) until that checkpoint reports no non-acknowledged operator coordinators, and
     * finally acknowledges it through the scheduler with empty metrics and no task state.
     *
     * @param defaultScheduler scheduler owning the checkpoint coordinator
     * @throws AssertionError if the number of pending checkpoints is not one, or if the waiting
     *     thread is interrupted (the interrupt flag is restored first)
     */
    public static void acknowledgeCurrentCheckpoint(DefaultScheduler defaultScheduler) {
        final CheckpointCoordinator coordinator = getCheckpointCoordinator(defaultScheduler);
        // The original message was the truncated fragment "Coordinator has not ".
        Assert.assertEquals(
                "Coordinator does not have exactly one pending checkpoint",
                1L,
                coordinator.getNumberOfPendingCheckpoints());

        final PendingCheckpoint pending =
                (PendingCheckpoint) coordinator.getPendingCheckpoints().values().iterator().next();

        // Operator coordinators acknowledge on their own; wait until none is outstanding.
        while (pending.getNumberOfNonAcknowledgedOperatorCoordinators() > 0) {
            try {
                Thread.sleep(1L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // Assert.fail throws, which also terminates the polling loop.
                Assert.fail("interrupted");
            }
        }

        for (ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(defaultScheduler)) {
            defaultScheduler.acknowledgeCheckpoint(
                    pending.getJobId(),
                    attemptId,
                    pending.getCheckpointId(),
                    new CheckpointMetrics(),
                    (TaskStateSnapshot) null);
        }
    }

    /**
     * Triggers a checkpoint, acknowledges it for all current execution attempts and returns the
     * completed checkpoint.
     *
     * <p>The completion future is read with {@code getNow(null)}, so the checkpoint must already
     * be complete once the acknowledgements have been delivered.
     *
     * @param defaultScheduler scheduler owning the checkpoint coordinator
     * @return the completed checkpoint
     * @throws Exception if triggering or acknowledging the checkpoint fails
     * @throws AssertionError if there is not exactly one pending checkpoint after triggering, or
     *     if the checkpoint is not complete after acknowledgement
     */
    public static CompletedCheckpoint takeCheckpoint(DefaultScheduler defaultScheduler) throws Exception {
        final CheckpointCoordinator coordinator = getCheckpointCoordinator(defaultScheduler);
        coordinator.triggerCheckpoint(false);

        Assert.assertEquals("test setup inconsistent", 1L, coordinator.getNumberOfPendingCheckpoints());
        final PendingCheckpoint pending =
                (PendingCheckpoint) coordinator.getPendingCheckpoints().values().iterator().next();

        // Grab the future before acknowledging, as the original did.
        final CompletableFuture<CompletedCheckpoint> completion = pending.getCompletionFuture();
        acknowledgePendingCheckpoint(defaultScheduler, pending.getCheckpointId());

        final CompletedCheckpoint completed = completion.getNow(null);
        Assert.assertNotNull("checkpoint not complete", completed);
        return completed;
    }

    /**
     * Exposes the checkpoint coordinator of the given scheduler.
     *
     * @param schedulerBase scheduler to query
     * @return whatever {@code schedulerBase.getCheckpointCoordinator()} returns
     */
    public static CheckpointCoordinator getCheckpointCoordinator(SchedulerBase schedulerBase) {
        final CheckpointCoordinator coordinator = schedulerBase.getCheckpointCoordinator();
        return coordinator;
    }

    /**
     * Resolves the execution job vertex of a job vertex by looking up its subtask with index 0
     * and asking that execution vertex for its job vertex.
     */
    private static ExecutionJobVertex getJobVertex(DefaultScheduler defaultScheduler, JobVertexID jobVertexID) {
        final ExecutionVertexID firstSubtask = new ExecutionVertexID(jobVertexID, 0);
        return defaultScheduler.getExecutionVertex(firstSubtask).getJobVertex();
    }

    /**
     * Returns the attempt id of the current execution attempt of one subtask.
     *
     * @param defaultScheduler scheduler to inspect
     * @param jobVertexID vertex the subtask belongs to
     * @param i subtask index within the vertex
     * @return attempt id of the subtask's current attempt
     * @throws AssertionError if assertions are enabled and the job vertex resolves to null
     */
    public static ExecutionAttemptID getAttemptId(DefaultScheduler defaultScheduler, JobVertexID jobVertexID, int i) {
        final ExecutionJobVertex executionJobVertex = getJobVertex(defaultScheduler, jobVertexID);
        // Explicit form of "assert executionJobVertex != null" using the class's assertion flag.
        if (!$assertionsDisabled && executionJobVertex == null) {
            throw new AssertionError();
        }
        return executionJobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getAttemptId();
    }

    /**
     * Creates a slot sharing allocator factory backed by
     * {@code TestingPhysicalSlotProvider.createWithInfiniteSlotCreation()}.
     *
     * @return the allocator factory
     */
    public static SlotSharingExecutionSlotAllocatorFactory newSlotSharingExecutionSlotAllocatorFactory() {
        final PhysicalSlotProvider slotProvider = TestingPhysicalSlotProvider.createWithInfiniteSlotCreation();
        return newSlotSharingExecutionSlotAllocatorFactory(slotProvider);
    }

    /**
     * Creates a slot sharing allocator factory for the given slot provider, using the class-wide
     * default timeout.
     *
     * @param physicalSlotProvider provider of physical slots
     * @return the allocator factory
     */
    public static SlotSharingExecutionSlotAllocatorFactory newSlotSharingExecutionSlotAllocatorFactory(PhysicalSlotProvider physicalSlotProvider) {
        final Time defaultTimeout = DEFAULT_TIMEOUT;
        return newSlotSharingExecutionSlotAllocatorFactory(physicalSlotProvider, defaultTimeout);
    }

    /**
     * Creates a slot sharing allocator factory from the given slot provider and timeout, combined
     * with a new {@code TestingPhysicalSlotRequestBulkChecker} and a
     * {@code LocalInputPreferredSlotSharingStrategy.Factory}. The boolean constructor flag is
     * fixed to {@code true}.
     *
     * @param physicalSlotProvider provider of physical slots
     * @param time timeout handed to the factory
     * @return the allocator factory
     */
    public static SlotSharingExecutionSlotAllocatorFactory newSlotSharingExecutionSlotAllocatorFactory(PhysicalSlotProvider physicalSlotProvider, Time time) {
        final TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker();
        final LocalInputPreferredSlotSharingStrategy.Factory sharingStrategyFactory =
                new LocalInputPreferredSlotSharingStrategy.Factory();
        return new SlotSharingExecutionSlotAllocatorFactory(
                physicalSlotProvider,
                true,
                bulkChecker,
                time,
                sharingStrategyFactory);
    }

    // Static initializer: assigns the blank final static fields declared at the top of the class.
    static {
        // Mirrors what the compiler generates for classes containing assert statements:
        // the flag is true when assertions are NOT enabled for this class.
        $assertionsDisabled = !SchedulerTestingUtils.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(SchedulerTestingUtils.class);
        // 300 seconds (5 minutes).
        DEFAULT_TIMEOUT = Time.seconds(300L);
    }
}
