package org.apache.flink.connector.base.source.reader.fetcher;

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.class */
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
    /** Class logger; assigned in the static initializer. */
    private static final Logger LOG;

    /** Numeric identifier of this fetcher, used in log and error messages. */
    private final int id;

    /** Queue handed to the fetch task; also notified when this fetcher has nothing left to do. */
    private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;

    /** Reader that the fetcher tasks created by this class operate on. */
    private final SplitReader<E, SplitT> splitReader;

    /** Callback that receives errors raised while the fetcher is running. */
    private final Consumer<Throwable> errorHandler;

    /** Callback executed when the fetcher exits. */
    private final Runnable shutdownHook;

    /** Set once by {@link #shutdown()}; makes {@link #runOnce()} return {@code false}. */
    @GuardedBy("lock")
    private boolean closed;

    /** While {@code true}, the fetcher waits on {@link #resumed} instead of picking a task. */
    @GuardedBy("lock")
    private boolean paused;

    /** Fallback task that is run whenever splits are assigned and no other task is queued. */
    private final FetchTask<E, SplitT> fetchTask;

    /** Flag forwarded to every {@code PauseOrResumeSplitsTask} created by this fetcher. */
    private final boolean allowUnalignedSourceSplits;

    /** Callback that receives the ids of splits that finished or were removed. */
    private final Consumer<Collection<String>> splitFinishedHook;

    /** Decompiled form of the compiler-generated assertion switch; assigned in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /** Pending tasks; explicitly enqueued tasks take precedence over the fetch task. */
    @GuardedBy("lock")
    private final Deque<SplitFetcherTask> taskQueue = new ArrayDeque<>();

    /** Splits currently assigned to this fetcher, keyed by a string id. */
    private final Map<String, SplitT> assignedSplits = new HashMap<>();

    /** The task currently being executed, or {@code null} if none is running. */
    @GuardedBy("lock")
    @Nullable
    private SplitFetcherTask runningTask = null;

    /** Lock protecting the task queue, the running task and the closed/paused flags. */
    private final ReentrantLock lock = new ReentrantLock();

    /** Signalled when a task is enqueued or when the fetcher thread has to be woken up. */
    @GuardedBy("lock")
    private final Condition nonEmpty = this.lock.newCondition();

    /** Signalled when the fetcher is resumed or shut down while paused. */
    @GuardedBy("lock")
    private final Condition resumed = this.lock.newCondition();

    /**
     * Creates a split fetcher.
     *
     * <p>NOTE(review): the decompiler reports that the access modifier of this constructor was
     * widened from package-private; it is kept public here so existing callers keep compiling.
     *
     * @param i identifier of this fetcher, used in log and error messages
     * @param futureCompletingBlockingQueue queue handed to the fetch task; must not be null
     * @param splitReader reader the fetcher tasks operate on; must not be null
     * @param consumer error handler receiving throwables raised by the fetcher; must not be null
     * @param runnable hook executed when the fetcher exits; must not be null
     * @param consumer2 hook receiving the ids of splits that finished or were removed
     * @param z whether unaligned source splits are allowed; forwarded to pause/resume tasks
     */
    public SplitFetcher(int i, FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> futureCompletingBlockingQueue, SplitReader<E, SplitT> splitReader, Consumer<Throwable> consumer, Runnable runnable, Consumer<Collection<String>> consumer2, boolean z) {
        this.id = i;
        this.elementsQueue = Preconditions.checkNotNull(futureCompletingBlockingQueue);
        this.splitReader = Preconditions.checkNotNull(splitReader);
        this.errorHandler = Preconditions.checkNotNull(consumer);
        this.shutdownHook = Preconditions.checkNotNull(runnable);
        this.allowUnalignedSourceSplits = z;
        this.splitFinishedHook = consumer2;
        this.fetchTask = new FetchTask<>(splitReader, futureCompletingBlockingQueue, finishedSplitIds -> {
            // Drop the finished splits from the assignment before telling the hook about them.
            finishedSplitIds.forEach(this.assignedSplits::remove);
            consumer2.accept(finishedSplitIds);
            LOG.info("Finished reading from splits {}", finishedSplitIds);
        }, i);
    }

    /**
     * Fetcher thread entry point: repeatedly calls {@link #runOnce()} until it returns
     * {@code false}.
     *
     * <p>Any throwable escaping the loop is passed to the error handler. On exit the fetcher logs
     * that it exited and runs the shutdown hook, after the error handler has been invoked.
     *
     * <p>NOTE(review): the decompiler could not restructure this method; the body was rebuilt from
     * the surviving fragments (error handler call, exit log line, shutdown hook call). The start-up
     * log line and the {@code splitReader.close()} call are not visible in those fragments and
     * should be confirmed against the original bytecode.
     */
    @Override // java.lang.Runnable
    public void run() {
        LOG.info("Starting split fetcher {}", this.id);
        try {
            while (runOnce()) {
                // All work happens inside runOnce().
            }
        } catch (Throwable t) {
            this.errorHandler.accept(t);
        } finally {
            try {
                this.splitReader.close();
            } catch (Exception e) {
                this.errorHandler.accept(e);
            } finally {
                LOG.info("Split fetcher {} exited.", this.id);
                // Runs after any errorHandler.accept(...) above, so observers of the shutdown
                // hook see errors that were reported first.
                this.shutdownHook.run();
            }
        }
    }

    /**
     * Executes a single iteration of the fetcher loop: picks the next task under the lock, runs it
     * without holding the lock, then re-acquires the lock to post-process the result.
     *
     * <p>The task must run outside the lock so that other threads can take the lock and wake the
     * running task up. Every {@code lock()} is paired with exactly one {@code unlock()}; the lock
     * is reentrant, so an unbalanced second acquisition would leave it held by this thread.
     *
     * @return {@code false} if the fetcher has been closed and the loop should stop, {@code true}
     *     if the loop should continue
     * @throws RuntimeException wrapping any exception thrown by the executed task
     */
    boolean runOnce() {
        SplitFetcherTask task;
        this.lock.lock();
        try {
            if (this.closed) {
                return false;
            }
            task = getNextTaskUnsafe();
            if (task == null) {
                // Woken up without a task to run; let the caller loop again.
                return true;
            }
            LOG.debug("Prepare to run {}", task);
            // Publish the task so that wake-up calls can reach it.
            this.runningTask = task;
        } finally {
            this.lock.unlock();
        }

        // Run the task without holding the lock so it can be woken up from other threads.
        boolean taskFinished;
        try {
            taskFinished = task.run();
        } catch (Exception e) {
            throw new RuntimeException(String.format("SplitFetcher thread %d received unexpected exception while polling the records", Integer.valueOf(this.id)), e);
        }

        // Post-processing touches lock-guarded state, so take the lock again.
        this.lock.lock();
        try {
            this.runningTask = null;
            processTaskResultUnsafe(task, taskFinished);
        } finally {
            this.lock.unlock();
        }
        return true;
    }

    /**
     * Post-processes the outcome of a task run. Must be called with the lock held.
     *
     * <p>A finished task is logged, and the element queue is notified when no splits are assigned
     * and no tasks are queued. An unfinished task is put back at the head of the task queue,
     * unless it is the fetch task, which is never re-enqueued.
     *
     * @param splitFetcherTask the task that was just run
     * @param z {@code true} if the task reported that it finished
     */
    private void processTaskResultUnsafe(SplitFetcherTask splitFetcherTask, boolean z) {
        if (!($assertionsDisabled || this.lock.isHeldByCurrentThread())) {
            throw new AssertionError();
        }
        if (z) {
            LOG.debug("Finished running task {}", splitFetcherTask);
            final boolean nothingLeft = this.assignedSplits.isEmpty() && this.taskQueue.isEmpty();
            if (nothingLeft) {
                this.elementsQueue.notifyAvailable();
            }
            return;
        }
        if (splitFetcherTask == this.fetchTask) {
            // The fetch task is picked again automatically while splits are assigned.
            return;
        }
        this.taskQueue.addFirst(splitFetcherTask);
        LOG.debug("Reenqueuing woken task {}", splitFetcherTask);
    }

    /**
     * Selects the next task to run. Must be called with the lock held; may block on one of the
     * lock's conditions.
     *
     * <ul>
     *   <li>While paused, waits on {@code resumed} and then returns {@code null}, so the caller
     *       re-checks its state before picking a task.
     *   <li>Otherwise queued tasks are taken first, in queue order.
     *   <li>With an empty queue and at least one assigned split, the fetch task is returned.
     *   <li>With nothing to do, waits on {@code nonEmpty} and returns whatever is queued
     *       afterwards, which may be {@code null}.
     * </ul>
     *
     * @return the task to run, or {@code null} if the wait ended without a task being available
     * @throws RuntimeException if the thread is interrupted while waiting; the interrupt flag is
     *     restored and the {@link InterruptedException} is attached as the cause
     */
    @Nullable
    private SplitFetcherTask getNextTaskUnsafe() {
        if (!$assertionsDisabled && !this.lock.isHeldByCurrentThread()) {
            throw new AssertionError();
        }
        try {
            if (this.paused) {
                this.resumed.await();
                // Return to the caller so it can re-check the closed flag.
                return null;
            }
            if (!this.taskQueue.isEmpty()) {
                return this.taskQueue.poll();
            }
            if (!this.assignedSplits.isEmpty()) {
                return this.fetchTask;
            }
            // Nothing to do: wait until a task is enqueued or the fetcher is woken up.
            this.nonEmpty.await();
            return this.taskQueue.poll();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Keep the original exception as the cause so the stack trace is not lost.
            throw new RuntimeException("The thread was interrupted while waiting for a fetcher task.", e);
        }
    }

    /**
     * Enqueues a task that adds the given splits and wakes up the currently running task, if any.
     *
     * @param list the splits to add
     */
    public void addSplits(List<SplitT> list) {
        final ReentrantLock fetcherLock = this.lock;
        fetcherLock.lock();
        try {
            final SplitFetcherTask addTask = new AddSplitsTask(this.splitReader, list, this.assignedSplits);
            enqueueTaskUnsafe(addTask);
            // true: only a running task is woken; enqueueing already signalled the fetcher thread.
            wakeUpUnsafe(true);
        } finally {
            fetcherLock.unlock();
        }
    }

    /**
     * Enqueues a task that removes the given splits and wakes up the currently running task, if
     * any. The split-finished hook is handed to the removal task.
     *
     * @param list the splits to remove
     */
    public void removeSplits(List<SplitT> list) {
        final ReentrantLock fetcherLock = this.lock;
        fetcherLock.lock();
        try {
            final SplitFetcherTask removeTask = new RemoveSplitsTask(this.splitReader, list, this.assignedSplits, this.splitFinishedHook);
            enqueueTaskUnsafe(removeTask);
            // true: only a running task is woken; enqueueing already signalled the fetcher thread.
            wakeUpUnsafe(true);
        } finally {
            fetcherLock.unlock();
        }
    }

    /**
     * Enqueues a task that is given both split collections and wakes up the currently running
     * task, if any.
     *
     * @param collection first collection passed to the task (presumably the splits to pause —
     *     confirm against {@code PauseOrResumeSplitsTask})
     * @param collection2 second collection passed to the task (presumably the splits to resume —
     *     confirm against {@code PauseOrResumeSplitsTask})
     */
    public void pauseOrResumeSplits(Collection<SplitT> collection, Collection<SplitT> collection2) {
        final ReentrantLock fetcherLock = this.lock;
        fetcherLock.lock();
        try {
            final SplitFetcherTask pauseResumeTask = new PauseOrResumeSplitsTask(this.splitReader, collection, collection2, this.allowUnalignedSourceSplits);
            enqueueTaskUnsafe(pauseResumeTask);
            // true: only a running task is woken; enqueueing already signalled the fetcher thread.
            wakeUpUnsafe(true);
        } finally {
            fetcherLock.unlock();
        }
    }

    /**
     * Appends a task to the task queue while holding the lock. Unlike the split-related methods,
     * this does not wake up a running task.
     *
     * @param splitFetcherTask the task to enqueue
     */
    public void enqueueTask(SplitFetcherTask splitFetcherTask) {
        final ReentrantLock fetcherLock = this.lock;
        fetcherLock.lock();
        try {
            enqueueTaskUnsafe(splitFetcherTask);
        } finally {
            fetcherLock.unlock();
        }
    }

    /**
     * Appends a task to the end of the task queue and signals the {@code nonEmpty} condition.
     * Must be called with the lock held.
     *
     * @param splitFetcherTask the task to enqueue
     */
    private void enqueueTaskUnsafe(SplitFetcherTask splitFetcherTask) {
        if (!($assertionsDisabled || this.lock.isHeldByCurrentThread())) {
            throw new AssertionError();
        }
        taskQueue.add(splitFetcherTask);
        // Wake the fetcher thread in case it is waiting for work.
        nonEmpty.signal();
    }

    /**
     * Returns the split reader this fetcher was constructed with.
     *
     * @return the split reader
     */
    public SplitReader<E, SplitT> getSplitReader() {
        return splitReader;
    }

    /**
     * Returns the identifier this fetcher was constructed with.
     *
     * @return the fetcher id
     */
    public int fetcherId() {
        return id;
    }

    /**
     * Marks the fetcher as closed, clears the paused flag and wakes up either the running task or
     * the waiting fetcher thread. Calling it again after the fetcher is closed has no effect.
     */
    public void shutdown() {
        this.lock.lock();
        try {
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.paused = false;
            LOG.info("Shutting down split fetcher {}", Integer.valueOf(this.id));
            // false: also signal the fetcher thread if no task is currently running.
            wakeUpUnsafe(false);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Returns the map of splits currently assigned to this fetcher. The internal map itself is
     * returned, not a copy.
     *
     * <p>NOTE(review): the decompiler reports that the access modifier was widened from
     * package-private.
     *
     * @return the assigned splits keyed by a string id
     */
    public Map<String, SplitT> assignedSplits() {
        return assignedSplits;
    }

    /**
     * Tells whether the fetcher has nothing to do: no assigned splits, no queued tasks and no
     * running task. The check is made while holding the lock.
     *
     * <p>NOTE(review): the decompiler reports that the access modifier was widened from
     * package-private.
     *
     * @return {@code true} if the fetcher is idle
     */
    public boolean isIdle() {
        this.lock.lock();
        try {
            return this.assignedSplits.isEmpty()
                    && this.taskQueue.isEmpty()
                    && this.runningTask == null;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Acquires the lock and delegates to {@code wakeUpUnsafe}.
     *
     * @param z if {@code true}, only a currently running task is woken up; if {@code false}, the
     *     fetcher thread is signalled when no task is running
     */
    void wakeUp(boolean z) {
        final ReentrantLock fetcherLock = this.lock;
        fetcherLock.lock();
        try {
            wakeUpUnsafe(z);
        } finally {
            fetcherLock.unlock();
        }
    }

    /**
     * Wakes up the running task if there is one; otherwise, unless {@code z} is set, signals both
     * conditions the fetcher thread may be waiting on. Must be called with the lock held.
     *
     * @param z if {@code true}, do nothing when no task is running
     */
    private void wakeUpUnsafe(boolean z) {
        if (!($assertionsDisabled || this.lock.isHeldByCurrentThread())) {
            throw new AssertionError();
        }
        final SplitFetcherTask current = this.runningTask;
        if (current == null) {
            if (!z) {
                // No task is running, so the fetcher thread itself may be blocked waiting.
                LOG.debug("Waking up fetcher thread.");
                this.nonEmpty.signal();
                this.resumed.signal();
            }
            return;
        }
        LOG.debug("Waking up running task {}", current);
        current.wakeUp();
    }

    /** Sets the paused flag while holding the lock. */
    public void pause() {
        final ReentrantLock fetcherLock = this.lock;
        fetcherLock.lock();
        try {
            paused = true;
        } finally {
            fetcherLock.unlock();
        }
    }

    /** Clears the paused flag and signals the {@code resumed} condition while holding the lock. */
    public void resume() {
        final ReentrantLock fetcherLock = this.lock;
        fetcherLock.lock();
        try {
            paused = false;
            // Release a fetcher thread that is waiting because the fetcher was paused.
            resumed.signal();
        } finally {
            fetcherLock.unlock();
        }
    }

    // Static initializer: assigns the two blank-final static fields of this class.
    static {
        // True when assertions are NOT enabled for this class; the lock-ownership checks in the
        // *Unsafe methods are skipped in that case.
        $assertionsDisabled = !SplitFetcher.class.desiredAssertionStatus();
        // Logger named after this class.
        LOG = LoggerFactory.getLogger(SplitFetcher.class);
    }
}
