package org.apache.flink.runtime.executiongraph.failover;

import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TraceOptions;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.scheduler.adaptive.JobFailureMetricReporter;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.throwable.ThrowableClassifier;
import org.apache.flink.runtime.throwable.ThrowableType;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/ExecutionFailureHandler.class */
/**
 * Decides how a task-level or global failure is handled: either the affected vertices are
 * restarted after a backoff, or the failure is reported as unrecoverable.
 *
 * <p>Each failure is labelled through the configured {@link FailureEnricher}s and, when
 * {@link TraceOptions#REPORT_EVENTS_AS_SPANS} is enabled, handed to a
 * {@link JobFailureMetricReporter} once the labels are available.
 */
public class ExecutionFailureHandler {
    public static final String FAILURE_LABEL_ATTRIBUTE_PREFIX = "failureLabel.";

    private final SchedulingTopology schedulingTopology;
    private final FailoverStrategy failoverStrategy;
    private final RestartBackoffTimeStrategy restartBackoffTimeStrategy;

    /** Incremented in {@code handleFailure} for restartable failures the strategy flagged. */
    private long numberOfRestarts;

    private final FailureEnricher.Context taskFailureCtx;
    private final FailureEnricher.Context globalFailureCtx;
    private final Collection<FailureEnricher> failureEnrichers;
    private final ComponentMainThreadExecutor mainThreadExecutor;
    private final MetricGroup metricGroup;
    private final boolean reportEventsAsSpans;
    private final JobFailureMetricReporter jobFailureMetricReporter;

    /**
     * Creates the handler.
     *
     * @param configuration source of the {@link TraceOptions#REPORT_EVENTS_AS_SPANS} flag
     * @param schedulingTopology topology whose vertices are restarted on a global failure; must
     *     not be null
     * @param failoverStrategy computes the tasks to restart for a task failure; must not be null
     * @param restartBackoffTimeStrategy decides whether and when a restart may happen; must not
     *     be null
     * @param componentMainThreadExecutor executor passed to failure labelling and used for the
     *     reporting callback; must not be null
     * @param collection the failure enrichers used for labelling; must not be null
     * @param context enricher context used for task (non-global) failures
     * @param context2 enricher context used for global failures
     * @param metricGroup metric group handed to the {@link JobFailureMetricReporter}
     */
    public ExecutionFailureHandler(Configuration configuration, SchedulingTopology schedulingTopology, FailoverStrategy failoverStrategy, RestartBackoffTimeStrategy restartBackoffTimeStrategy, ComponentMainThreadExecutor componentMainThreadExecutor, Collection<FailureEnricher> collection, FailureEnricher.Context context, FailureEnricher.Context context2, MetricGroup metricGroup) {
        this.schedulingTopology = Preconditions.checkNotNull(schedulingTopology);
        this.failoverStrategy = Preconditions.checkNotNull(failoverStrategy);
        this.restartBackoffTimeStrategy = Preconditions.checkNotNull(restartBackoffTimeStrategy);
        this.mainThreadExecutor = Preconditions.checkNotNull(componentMainThreadExecutor);
        this.failureEnrichers = Preconditions.checkNotNull(collection);
        this.taskFailureCtx = context;
        this.globalFailureCtx = context2;
        this.metricGroup = metricGroup;
        this.reportEventsAsSpans = configuration.get(TraceOptions.REPORT_EVENTS_AS_SPANS);
        this.jobFailureMetricReporter = new JobFailureMetricReporter(metricGroup);
    }

    /**
     * Handles the failure of a single execution.
     *
     * @param execution the failed execution; its vertex ID is passed to the failover strategy
     * @param th the failure cause
     * @param j value forwarded unchanged to the {@link FailureHandlingResult} (presumably the
     *     failure timestamp; confirm against callers)
     * @return the handling result for this task failure
     */
    public FailureHandlingResult getFailureHandlingResult(Execution execution, Throwable th, long j) {
        Set<ExecutionVertexID> tasksToRestart =
                failoverStrategy.getTasksNeedingRestart(execution.getVertex().getID(), th);
        return handleFailureAndReport(execution, th, j, tasksToRestart, false);
    }

    /**
     * Handles a global failure, for which every vertex of the scheduling topology is a restart
     * candidate and no single execution is associated with the failure.
     *
     * @param th the failure cause
     * @param j value forwarded unchanged to the {@link FailureHandlingResult} (presumably the
     *     failure timestamp; confirm against callers)
     * @return the handling result for this global failure
     */
    public FailureHandlingResult getGlobalFailureHandlingResult(Throwable th, long j) {
        Set<ExecutionVertexID> allVertexIds =
                IterableUtils.toStream(schedulingTopology.getVertices())
                        .map(vertex -> vertex.getId())
                        .collect(Collectors.toSet());
        return handleFailureAndReport(null, th, j, allVertexIds, true);
    }

    /**
     * Starts labelling of the given failure with the configured enrichers.
     *
     * @param cause the failure cause
     * @param globalFailure selects the global enricher context when true, the task one otherwise
     * @return future of the failure labels; the shared empty-labels future when there are no
     *     enrichers
     */
    private CompletableFuture<Map<String, String>> labelFailure(Throwable cause, boolean globalFailure) {
        if (failureEnrichers.isEmpty()) {
            return FailureEnricherUtils.EMPTY_FAILURE_LABELS;
        }
        final FailureEnricher.Context enricherContext;
        if (globalFailure) {
            enricherContext = globalFailureCtx;
        } else {
            enricherContext = taskFailureCtx;
        }
        return FailureEnricherUtils.labelFailure(cause, enricherContext, mainThreadExecutor, failureEnrichers);
    }

    /**
     * Computes the handling result and, if span reporting is enabled, registers a callback that
     * reports the failure once its labels have been computed.
     */
    private FailureHandlingResult handleFailureAndReport(@Nullable Execution failedExecution, Throwable cause, long timestamp, Set<ExecutionVertexID> verticesToRestart, boolean globalFailure) {
        final FailureHandlingResult result =
                handleFailure(failedExecution, cause, timestamp, verticesToRestart, globalFailure);
        if (!reportEventsAsSpans) {
            return result;
        }
        // The report is submitted to the main-thread executor after label completion.
        result.getFailureLabels()
                .thenAcceptAsync(
                        labels -> jobFailureMetricReporter.reportJobFailure(result, labels),
                        mainThreadExecutor);
        return result;
    }

    /**
     * Classifies the failure as unrecoverable or restartable.
     *
     * <p>A cause classified by {@link #isUnrecoverableError(Throwable)} short-circuits before the
     * restart strategy is notified. Otherwise the strategy is notified first and then asked
     * whether a restart is allowed.
     */
    private FailureHandlingResult handleFailure(@Nullable Execution failedExecution, Throwable cause, long timestamp, Set<ExecutionVertexID> verticesToRestart, boolean globalFailure) {
        final CompletableFuture<Map<String, String>> failureLabels = labelFailure(cause, globalFailure);

        if (isUnrecoverableError(cause)) {
            JobException notRecoverable = new JobException("The failure is not recoverable", cause);
            return FailureHandlingResult.unrecoverable(failedExecution, notRecoverable, timestamp, failureLabels, globalFailure, true);
        }

        // The strategy's answer is forwarded as the final flag of the result factories below.
        final boolean strategyFlag = restartBackoffTimeStrategy.notifyFailure(cause);

        if (restartBackoffTimeStrategy.canRestart()) {
            if (strategyFlag) {
                numberOfRestarts++;
            }
            long backoffTime = restartBackoffTimeStrategy.getBackoffTime();
            return FailureHandlingResult.restartable(failedExecution, cause, timestamp, failureLabels, verticesToRestart, backoffTime, globalFailure, strategyFlag);
        }

        JobException suppressed = new JobException("Recovery is suppressed by " + restartBackoffTimeStrategy, cause);
        return FailureHandlingResult.unrecoverable(failedExecution, suppressed, timestamp, failureLabels, globalFailure, strategyFlag);
    }

    /**
     * Tells whether the given throwable is classified as non-recoverable.
     *
     * @param th the throwable to classify
     * @return true if {@link ThrowableClassifier#findThrowableOfThrowableType} finds a throwable
     *     of type {@link ThrowableType#NonRecoverableError} for {@code th}
     */
    public static boolean isUnrecoverableError(Throwable th) {
        return ThrowableClassifier
                .findThrowableOfThrowableType(th, ThrowableType.NonRecoverableError)
                .isPresent();
    }

    /**
     * @return the current value of the restart counter maintained by this handler
     */
    public long getNumberOfRestarts() {
        return numberOfRestarts;
    }
}
