package org.apache.flink.runtime.operators.coordination;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.OperatorCoordinatorCheckpointContext;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.class */
public class OperatorCoordinatorHolder implements OperatorCoordinator, OperatorCoordinatorCheckpointContext {
    /** The wrapped coordinator; the holder's coordinator methods delegate to it. */
    private final OperatorCoordinator coordinator;
    /** ID of the operator this holder was created for. */
    private final OperatorID operatorId;
    /** Context kept alongside the coordinator; initialized in {@code lazyInitialize} and un-initialized in {@code close}. */
    private final LazyInitializedCoordinatorContext context;
    /** Event valve that the checkpoint and failure handling methods mark, shut, open and reset. */
    private final OperatorEventValve eventValve;
    /** Value returned by {@code currentParallelism()}. */
    private final int operatorParallelism;
    /** Value returned by {@code maxParallelism()}. */
    private final int operatorMaxParallelism;
    /** Assigned in {@code lazyInitialize}; receives the error when taking a coordinator checkpoint throws. */
    private Consumer<Throwable> globalFailureHandler;
    /** Assigned in {@code lazyInitialize}; used for main-thread assertions and for dispatching checkpoint calls. */
    private ComponentMainThreadExecutor mainThreadExecutor;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.class */
    /**
     * {@link OperatorCoordinator.Context} implementation whose failure handler and executor are
     * supplied after construction through {@link #lazyInitialize(Consumer, Executor)}.
     *
     * <p>{@link #sendEvent(OperatorEvent, int)} and {@link #failJob(Throwable)} throw an
     * {@link IllegalStateException} while the context is not initialized.
     */
    public static final class LazyInitializedCoordinatorContext implements OperatorCoordinator.Context {
        private static final Logger LOG = LoggerFactory.getLogger(LazyInitializedCoordinatorContext.class);

        // Fixed at construction time.
        private final OperatorID operatorId;
        private final OperatorEventValve eventValve;
        private final String operatorName;
        private final ClassLoader userCodeClassLoader;
        private final int operatorParallelism;

        // Set by lazyInitialize(...) and cleared again by unInitialize().
        private Consumer<Throwable> globalFailureHandler;
        private Executor schedulerExecutor;

        // True between the first accepted failJob(...) call and the next resetFailed().
        private volatile boolean failed;

        /**
         * Creates an uninitialized context.
         *
         * @param operatorID ID of the operator; must not be null
         * @param operatorEventValve valve through which events are sent; must not be null
         * @param str operator name used in the failure message; must not be null
         * @param classLoader class loader returned by {@link #getUserCodeClassloader()}; must not be null
         * @param i parallelism reported by {@link #currentParallelism()}
         */
        public LazyInitializedCoordinatorContext(
                OperatorID operatorID,
                OperatorEventValve operatorEventValve,
                String str,
                ClassLoader classLoader,
                int i) {
            this.operatorId = Preconditions.checkNotNull(operatorID);
            this.eventValve = Preconditions.checkNotNull(operatorEventValve);
            this.operatorName = Preconditions.checkNotNull(str);
            this.userCodeClassLoader = Preconditions.checkNotNull(classLoader);
            this.operatorParallelism = i;
        }

        /**
         * Supplies the failure handler and the executor on which the handler is invoked.
         *
         * @param consumer handler that receives global failures; must not be null
         * @param executor executor used to run the handler; must not be null
         */
        void lazyInitialize(Consumer<Throwable> consumer, Executor executor) {
            globalFailureHandler = Preconditions.checkNotNull(consumer);
            schedulerExecutor = Preconditions.checkNotNull(executor);
        }

        /** Drops the failure handler and the executor, returning the context to the uninitialized state. */
        void unInitialize() {
            globalFailureHandler = null;
            schedulerExecutor = null;
        }

        /** Returns whether an executor is currently set. */
        boolean isInitialized() {
            return schedulerExecutor != null;
        }

        /** Throws an {@link IllegalStateException} unless {@link #isInitialized()} holds. */
        private void checkInitialized() {
            Preconditions.checkState(isInitialized(), "Context was not yet initialized");
        }

        /** Clears the failed flag so that the next {@link #failJob(Throwable)} call is accepted again. */
        void resetFailed() {
            failed = false;
        }

        @Override
        public OperatorID getOperatorId() {
            return operatorId;
        }

        /**
         * Serializes the event and sends it to the given subtask through the event valve.
         *
         * @param operatorEvent event to send
         * @param i index of the target subtask, in {@code [0, currentParallelism())}
         * @return the future returned by the event valve
         * @throws IllegalStateException if the context is not initialized
         * @throws IllegalArgumentException if the subtask index is out of bounds
         * @throws FlinkRuntimeException if an {@link IOException} occurs while serializing/sending
         */
        @Override
        public CompletableFuture<Acknowledge> sendEvent(OperatorEvent operatorEvent, int i) {
            checkInitialized();

            final int parallelism = currentParallelism();
            final boolean inBounds = i >= 0 && i < parallelism;
            if (!inBounds) {
                throw new IllegalArgumentException(
                        String.format("subtask index %d out of bounds [0, %d).", i, parallelism));
            }

            try {
                final SerializedValue<OperatorEvent> payload = new SerializedValue<>(operatorEvent);
                return eventValve.sendEvent(payload, i);
            } catch (IOException e) {
                // Surface the checked exception as an unchecked one.
                throw new FlinkRuntimeException("Cannot serialize operator event", e);
            }
        }

        /**
         * Reports a global failure. The first call after construction or {@link #resetFailed()}
         * wraps the cause and hands it to the failure handler on the scheduler executor; further
         * calls are only logged.
         *
         * @param th cause of the failure
         * @throws IllegalStateException if the context is not initialized
         */
        @Override
        public void failJob(Throwable th) {
            checkInitialized();

            if (!failed) {
                failed = true;
                final FlinkException wrapped = new FlinkException("Global failure triggered by OperatorCoordinator for '" + operatorName + "' (operator " + operatorId + ").", th);
                schedulerExecutor.execute(() -> globalFailureHandler.accept(wrapped));
            } else {
                LOG.warn("Ignoring the request to fail job because the job is already failing. The ignored failure cause is", th);
            }
        }

        @Override
        public int currentParallelism() {
            return operatorParallelism;
        }

        @Override
        public ClassLoader getUserCodeClassloader() {
            return userCodeClassLoader;
        }
    }

    /**
     * Private constructor; instances are obtained through the static {@code create} methods.
     *
     * @param operatorID ID of the operator; must not be null
     * @param operatorCoordinator coordinator to wrap; must not be null
     * @param lazyInitializedCoordinatorContext context associated with the coordinator; must not be null
     * @param operatorEventValve event valve used by this holder; must not be null
     * @param i value reported by {@code currentParallelism()}
     * @param i2 value reported by {@code maxParallelism()}
     */
    private OperatorCoordinatorHolder(
            OperatorID operatorID,
            OperatorCoordinator operatorCoordinator,
            LazyInitializedCoordinatorContext lazyInitializedCoordinatorContext,
            OperatorEventValve operatorEventValve,
            int i,
            int i2) {
        this.operatorId = Preconditions.checkNotNull(operatorID);
        this.coordinator = Preconditions.checkNotNull(operatorCoordinator);
        this.context = Preconditions.checkNotNull(lazyInitializedCoordinatorContext);
        this.eventValve = Preconditions.checkNotNull(operatorEventValve);
        this.operatorParallelism = i;
        this.operatorMaxParallelism = i2;
    }

    /**
     * Supplies the failure handler and main thread executor and initializes the context with them.
     *
     * <p>Both arguments are validated before any field is written, so a rejected call leaves the
     * holder's previous handler and executor untouched.
     *
     * @param consumer handler for global failures; must not be null
     * @param componentMainThreadExecutor main thread executor; must not be null
     * @throws NullPointerException if either argument is null
     */
    public void lazyInitialize(Consumer<Throwable> consumer, ComponentMainThreadExecutor componentMainThreadExecutor) {
        // Validate up front: the context performs the same checks, but only after the
        // fields of this holder would already have been overwritten.
        Preconditions.checkNotNull(consumer);
        Preconditions.checkNotNull(componentMainThreadExecutor);

        this.globalFailureHandler = consumer;
        this.mainThreadExecutor = componentMainThreadExecutor;
        this.context.lazyInitialize(consumer, componentMainThreadExecutor);
    }

    /** Returns the coordinator wrapped by this holder. */
    public OperatorCoordinator coordinator() {
        return coordinator;
    }

    /** Returns the ID of the operator this holder was created for. */
    @Override
    public OperatorID operatorId() {
        return operatorId;
    }

    /** Returns the max parallelism value given to this holder at construction. */
    @Override
    public int maxParallelism() {
        return operatorMaxParallelism;
    }

    /** Returns the parallelism value given to this holder at construction. */
    @Override
    public int currentParallelism() {
        return operatorParallelism;
    }

    /**
     * Starts the wrapped coordinator after asserting the main thread and verifying that the
     * context has been initialized.
     *
     * @throws IllegalStateException if the context is not initialized
     * @throws Exception whatever the wrapped coordinator's {@code start()} throws
     */
    @Override
    public void start() throws Exception {
        // NOTE(review): mainThreadExecutor is only assigned in lazyInitialize(...), so calling
        // start() before that fails on this line rather than on the state check below.
        mainThreadExecutor.assertRunningInMainThread();

        final boolean contextReady = context.isInitialized();
        Preconditions.checkState(contextReady, "Coordinator Context is not yet initialized");

        coordinator.start();
    }

    /**
     * Closes the wrapped coordinator and then un-initializes the context.
     *
     * @throws Exception whatever the wrapped coordinator's {@code close()} throws; in that case
     *     the context is left as it was
     */
    @Override
    public void close() throws Exception {
        coordinator.close();
        context.unInitialize();
    }

    /**
     * Asserts the main thread and forwards the event to the wrapped coordinator.
     *
     * @param i index of the subtask the event came from
     * @param operatorEvent the event
     * @throws Exception whatever the wrapped coordinator throws
     */
    @Override
    public void handleEventFromOperator(int i, OperatorEvent operatorEvent) throws Exception {
        mainThreadExecutor.assertRunningInMainThread();
        coordinator.handleEventFromOperator(i, operatorEvent);
    }

    /**
     * Asserts the main thread, notifies the wrapped coordinator of the subtask failure and then
     * resets the event valve for that subtask.
     *
     * @param i index of the failed subtask
     * @param th failure cause, may be null
     */
    @Override
    public void subtaskFailed(int i, @Nullable Throwable th) {
        mainThreadExecutor.assertRunningInMainThread();

        // The coordinator is informed first; the valve state for the subtask is reset afterwards.
        coordinator.subtaskFailed(i, th);
        eventValve.resetForTask(i);
    }

    /**
     * Asserts the main thread and forwards the subtask reset to the wrapped coordinator.
     *
     * @param i index of the subtask
     * @param j checkpoint ID passed through to the coordinator
     */
    @Override
    public void subtaskReset(int i, long j) {
        mainThreadExecutor.assertRunningInMainThread();
        coordinator.subtaskReset(i, j);
    }

    /**
     * Schedules the coordinator checkpoint on the main thread executor; the actual work happens
     * in {@code checkpointCoordinatorInternal}.
     *
     * @param j checkpoint ID
     * @param completableFuture future that receives the checkpoint result
     */
    @Override
    public void checkpointCoordinator(long j, CompletableFuture<byte[]> completableFuture) {
        mainThreadExecutor.execute(() -> checkpointCoordinatorInternal(j, completableFuture));
    }

    /**
     * Schedules the checkpoint-complete notification for the wrapped coordinator on the main
     * thread executor.
     *
     * @param j checkpoint ID
     */
    @Override
    public void notifyCheckpointComplete(long j) {
        mainThreadExecutor.execute(() -> coordinator.notifyCheckpointComplete(j));
    }

    /**
     * Schedules the checkpoint-aborted notification for the wrapped coordinator on the main
     * thread executor.
     *
     * @param j checkpoint ID
     */
    @Override
    public void notifyCheckpointAborted(long j) {
        mainThreadExecutor.execute(() -> coordinator.notifyCheckpointAborted(j));
    }

    /**
     * Resets the event valve, clears the context's failed flag and then resets the wrapped
     * coordinator to the given checkpoint. Unlike most other methods of this class, no
     * main-thread assertion is made here.
     *
     * @param j checkpoint ID
     * @param bArr checkpoint data handed to the coordinator, may be null
     * @throws Exception whatever the wrapped coordinator throws
     */
    @Override
    public void resetToCheckpoint(long j, @Nullable byte[] bArr) throws Exception {
        eventValve.reset();
        // 'context' is final and null-checked in the constructor, so no guard is needed.
        context.resetFailed();
        coordinator.resetToCheckpoint(j, bArr);
    }

    /**
     * Takes the coordinator checkpoint on the main thread and shuts the event valve before the
     * caller's future is completed.
     *
     * <p>The wrapped coordinator is handed an intermediate future rather than the caller's
     * future. Registering the valve-shutting callback directly on the caller's future cannot
     * work as intended: by the time that callback runs the future is already completed, so
     * completing it again has no effect and a failure from {@code shutValve} would be lost.
     * Other dependents of the caller's future could also observe the result before the valve is
     * shut. With the intermediate future the caller's future is completed only after the valve
     * was shut, or is failed if shutting the valve throws.
     *
     * @param j checkpoint ID
     * @param completableFuture the caller's future that receives the checkpoint bytes or the failure
     */
    private void checkpointCoordinatorInternal(long j, CompletableFuture<byte[]> completableFuture) {
        this.mainThreadExecutor.assertRunningInMainThread();

        // Future completed by the coordinator; its outcome is relayed to the caller's future.
        final CompletableFuture<byte[]> coordinatorCheckpoint = new CompletableFuture<>();

        // Registered before the coordinator sees the future, so it runs as part of completion.
        coordinatorCheckpoint.whenComplete((checkpointBytes, failure) -> {
            if (failure != null) {
                completableFuture.completeExceptionally(failure);
                return;
            }
            try {
                this.eventValve.shutValve(j);
                completableFuture.complete(checkpointBytes);
            } catch (Exception e) {
                completableFuture.completeExceptionally(e);
            }
        });

        try {
            this.eventValve.markForCheckpoint(j);
            this.coordinator.checkpointCoordinator(j, coordinatorCheckpoint);
        } catch (Throwable t) {
            ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
            // Fail the caller's future directly and escalate to the global failure handler.
            completableFuture.completeExceptionally(t);
            this.globalFailureHandler.accept(t);
        }
    }

    /**
     * Opens the event valve and unmarks the checkpoint.
     *
     * @param j checkpoint ID; not used by this implementation
     */
    @Override
    public void afterSourceBarrierInjection(long j) {
        eventValve.openValveAndUnmarkCheckpoint();
    }

    /** Opens the event valve and unmarks the checkpoint, same as after a barrier injection. */
    @Override
    public void abortCurrentTriggering() {
        eventValve.openValveAndUnmarkCheckpoint();
    }

    /**
     * Deserializes the coordinator provider and creates a holder for the given job vertex.
     *
     * <p>A {@link TemporaryClassLoaderContext} for {@code classLoader} is open for the whole
     * duration of the call and is closed on both the normal and the exceptional path. Should
     * closing fail while another exception is propagating, the close failure is attached as a
     * suppressed exception instead of replacing the original one.
     *
     * @param serializedValue serialized coordinator provider
     * @param executionJobVertex job vertex supplying the name, parallelism, max parallelism,
     *     user class loader and the task vertices that events are sent to
     * @param classLoader class loader used for deserializing the provider
     * @return the new holder
     * @throws Exception if deserialization or coordinator creation fails
     */
    public static OperatorCoordinatorHolder create(SerializedValue<OperatorCoordinator.Provider> serializedValue, ExecutionJobVertex executionJobVertex, ClassLoader classLoader) throws Exception {
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
            final OperatorCoordinator.Provider provider = serializedValue.deserializeValue(classLoader);
            final OperatorID operatorId = provider.getOperatorId();

            // Sends an event to the current execution attempt of the addressed subtask.
            final BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> eventSender =
                    (serializedEvent, subtask) ->
                            executionJobVertex.getTaskVertices()[subtask]
                                    .getCurrentExecutionAttempt()
                                    .sendOperatorEvent(operatorId, serializedEvent);

            return create(
                    operatorId,
                    provider,
                    eventSender,
                    executionJobVertex.getName(),
                    executionJobVertex.getGraph().getUserClassLoader(),
                    executionJobVertex.getParallelism(),
                    executionJobVertex.getMaxParallelism());
        }
    }

    /**
     * Builds the event valve, the context and the coordinator, and wraps them in a holder.
     *
     * @param operatorID ID of the operator
     * @param provider provider used to instantiate the coordinator with the new context
     * @param biFunction function the event valve is constructed with
     * @param str operator name given to the context
     * @param classLoader user code class loader given to the context
     * @param i parallelism
     * @param i2 max parallelism
     * @return the new holder
     * @throws Exception if creating the coordinator fails
     */
    @VisibleForTesting
    static OperatorCoordinatorHolder create(
            OperatorID operatorID,
            OperatorCoordinator.Provider provider,
            BiFunction<SerializedValue<OperatorEvent>, Integer, CompletableFuture<Acknowledge>> biFunction,
            String str,
            ClassLoader classLoader,
            int i,
            int i2) throws Exception {
        final OperatorEventValve valve = new OperatorEventValve(biFunction);
        final LazyInitializedCoordinatorContext coordinatorContext =
                new LazyInitializedCoordinatorContext(operatorID, valve, str, classLoader, i);
        final OperatorCoordinator newCoordinator = provider.create(coordinatorContext);
        return new OperatorCoordinatorHolder(operatorID, newCoordinator, coordinatorContext, valve, i, i2);
    }
}
