package org.apache.flink.streaming.runtime.tasks;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper.class */
public class StreamOperatorWrapper<OUT, OP extends StreamOperator<OUT>> {
    private final OP wrapped;
    private final Optional<ProcessingTimeService> processingTimeService;
    private final MailboxExecutor mailboxExecutor;
    private final boolean isHead;
    private StreamOperatorWrapper<?, ?> previous;
    private StreamOperatorWrapper<?, ?> next;
    private boolean closed;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamOperatorWrapper$ReadIterator.class */
    static class ReadIterator implements Iterator<StreamOperatorWrapper<?, ?>>, Iterable<StreamOperatorWrapper<?, ?>> {
        private final boolean reverse;
        private StreamOperatorWrapper<?, ?> current;

        /* JADX INFO: Access modifiers changed from: package-private */
        public ReadIterator(StreamOperatorWrapper<?, ?> streamOperatorWrapper, boolean z) {
            this.current = streamOperatorWrapper;
            this.reverse = z;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.current != null;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public StreamOperatorWrapper<?, ?> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            StreamOperatorWrapper<?, ?> streamOperatorWrapper = this.current;
            this.current = this.reverse ? ((StreamOperatorWrapper) this.current).previous : ((StreamOperatorWrapper) this.current).next;
            return streamOperatorWrapper;
        }

        @Override // java.lang.Iterable
        @Nonnull
        public Iterator<StreamOperatorWrapper<?, ?>> iterator() {
            return this;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamOperatorWrapper(OP op, Optional<ProcessingTimeService> optional, MailboxExecutor mailboxExecutor, boolean z) {
        this.wrapped = (OP) Preconditions.checkNotNull(op);
        this.processingTimeService = (Optional) Preconditions.checkNotNull(optional);
        this.mailboxExecutor = (MailboxExecutor) Preconditions.checkNotNull(mailboxExecutor);
        this.isHead = z;
    }

    public boolean isClosed() {
        return this.closed;
    }

    public void endOperatorInput(int i) throws Exception {
        if (this.wrapped instanceof BoundedOneInput) {
            ((BoundedOneInput) this.wrapped).endInput();
        } else if (this.wrapped instanceof BoundedMultiInput) {
            ((BoundedMultiInput) this.wrapped).endInput(i);
        }
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        if (this.closed) {
            return;
        }
        this.wrapped.notifyCheckpointComplete(j);
    }

    public OP getStreamOperator() {
        return this.wrapped;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setPrevious(StreamOperatorWrapper streamOperatorWrapper) {
        this.previous = streamOperatorWrapper;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setNext(StreamOperatorWrapper streamOperatorWrapper) {
        this.next = streamOperatorWrapper;
    }

    public void finish(StreamTaskActionExecutor streamTaskActionExecutor) throws Exception {
        if (!this.isHead) {
            streamTaskActionExecutor.runThrowing(() -> {
                endOperatorInput(1);
            });
        }
        quiesceTimeServiceAndFinishOperator(streamTaskActionExecutor);
        if (this.next != null) {
            this.next.finish(streamTaskActionExecutor);
        }
    }

    public void close() throws Exception {
        this.closed = true;
        this.wrapped.close();
    }

    private void quiesceTimeServiceAndFinishOperator(StreamTaskActionExecutor streamTaskActionExecutor) throws InterruptedException, ExecutionException {
        CompletableFuture thenCompose = quiesceProcessingTimeService().thenCompose(r5 -> {
            return deferFinishOperatorToMailbox(streamTaskActionExecutor);
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) r3 -> {
            return sendFinishedMail();
        });
        while (!thenCompose.isDone()) {
            do {
            } while (this.mailboxExecutor.tryYield());
            thenCompose.get(1L, TimeUnit.MILLISECONDS);
        }
        thenCompose.get();
    }

    private CompletableFuture<Void> deferFinishOperatorToMailbox(StreamTaskActionExecutor streamTaskActionExecutor) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.mailboxExecutor.execute(() -> {
            try {
                finishOperator(streamTaskActionExecutor);
                completableFuture.complete(null);
            } catch (Throwable th) {
                completableFuture.completeExceptionally(th);
            }
        }, "StreamOperatorWrapper#finishOperator for " + this.wrapped);
        return completableFuture;
    }

    private CompletableFuture<Void> quiesceProcessingTimeService() {
        return (CompletableFuture) this.processingTimeService.map((v0) -> {
            return v0.quiesce();
        }).orElse(CompletableFuture.completedFuture(null));
    }

    private CompletableFuture<Void> sendFinishedMail() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.mailboxExecutor.execute(() -> {
            completableFuture.complete(null);
        }, "StreamOperatorWrapper#sendFinishedMail for " + this.wrapped);
        return completableFuture;
    }

    private void finishOperator(StreamTaskActionExecutor streamTaskActionExecutor) throws Exception {
        OP op = this.wrapped;
        op.getClass();
        streamTaskActionExecutor.runThrowing(op::finish);
    }
}
