package org.apache.flink.connector.file.src.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SupportsHandleExecutionAttemptSourceEvent;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.PendingSplitsCheckpoint;
import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
import org.apache.flink.connector.file.src.enumerate.DynamicFileEnumerator;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.connector.source.DynamicFilteringData;
import org.apache.flink.table.connector.source.DynamicFilteringEvent;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/connector/file/src/impl/DynamicFileSplitEnumerator.class */
public class DynamicFileSplitEnumerator<SplitT extends FileSourceSplit> implements SplitEnumerator<SplitT, PendingSplitsCheckpoint<SplitT>>, SupportsHandleExecutionAttemptSourceEvent {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicFileSplitEnumerator.class);
    private final SplitEnumeratorContext<SplitT> context;
    private final DynamicFileEnumerator.Provider fileEnumeratorFactory;
    private final FileSplitAssigner.Provider splitAssignerFactory;
    private final Set<String> assignedSplits = new HashSet();
    private transient Set<String> allEnumeratingSplits;
    private transient FileSplitAssigner splitAssigner;

    public DynamicFileSplitEnumerator(SplitEnumeratorContext<SplitT> splitEnumeratorContext, DynamicFileEnumerator.Provider provider, FileSplitAssigner.Provider provider2) {
        this.context = (SplitEnumeratorContext) Preconditions.checkNotNull(splitEnumeratorContext);
        this.splitAssignerFactory = (FileSplitAssigner.Provider) Preconditions.checkNotNull(provider2);
        this.fileEnumeratorFactory = (DynamicFileEnumerator.Provider) Preconditions.checkNotNull(provider);
    }

    public void start() {
    }

    public void close() throws IOException {
    }

    public void addReader(int i) {
    }

    public void handleSplitRequest(int i, @Nullable String str) {
        if (this.context.registeredReaders().containsKey(Integer.valueOf(i))) {
            if (this.splitAssigner == null) {
                createSplitAssigner(null);
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Subtask {} {} is requesting a file source split", Integer.valueOf(i), str == null ? "(no host locality info)" : "(on host '" + str + "')");
            }
            Optional<FileSourceSplit> nextUnassignedSplit = getNextUnassignedSplit(str);
            if (!nextUnassignedSplit.isPresent()) {
                this.context.signalNoMoreSplits(i);
                LOG.info("No more splits available for subtask {}", Integer.valueOf(i));
            } else {
                FileSourceSplit fileSourceSplit = nextUnassignedSplit.get();
                this.context.assignSplit(fileSourceSplit, i);
                this.assignedSplits.add(fileSourceSplit.splitId());
                LOG.debug("Assigned split to subtask {} : {}", Integer.valueOf(i), fileSourceSplit);
            }
        }
    }

    private Optional<FileSourceSplit> getNextUnassignedSplit(String str) {
        Optional<FileSourceSplit> next = this.splitAssigner.getNext(str);
        while (true) {
            Optional<FileSourceSplit> optional = next;
            if (!optional.isPresent()) {
                return optional;
            }
            if (!this.assignedSplits.contains(optional.get().splitId())) {
                return optional;
            }
            next = this.splitAssigner.getNext(str);
        }
    }

    public void handleSourceEvent(int i, SourceEvent sourceEvent) {
        if (!(sourceEvent instanceof DynamicFilteringEvent)) {
            LOG.error("Received unrecognized event: {}", sourceEvent);
        } else {
            LOG.warn("Received DynamicFilteringEvent: {}", Integer.valueOf(i));
            createSplitAssigner(((DynamicFilteringEvent) sourceEvent).getData());
        }
    }

    private void createSplitAssigner(@Nullable DynamicFilteringData dynamicFilteringData) {
        DynamicFileEnumerator create = this.fileEnumeratorFactory.create();
        if (dynamicFilteringData != null) {
            create.setDynamicFilteringData(dynamicFilteringData);
        }
        try {
            Collection<FileSourceSplit> enumerateSplits = create.enumerateSplits(new Path[1], this.context.currentParallelism());
            this.allEnumeratingSplits = (Set) enumerateSplits.stream().map((v0) -> {
                return v0.splitId();
            }).collect(Collectors.toSet());
            this.splitAssigner = this.splitAssignerFactory.create(enumerateSplits);
        } catch (IOException e) {
            throw new FlinkRuntimeException("Could not enumerate file splits", e);
        }
    }

    public void addSplitsBack(List<SplitT> list, int i) {
        LOG.debug("Dynamic File Source Enumerator adds splits back: {}", list);
        if (this.splitAssigner != null) {
            ArrayList arrayList = new ArrayList(list);
            arrayList.removeIf(fileSourceSplit -> {
                return !this.allEnumeratingSplits.contains(fileSourceSplit.splitId());
            });
            arrayList.forEach(fileSourceSplit2 -> {
                this.assignedSplits.remove(fileSourceSplit2.splitId());
            });
            this.splitAssigner.addSplits(arrayList);
        }
    }

    /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
    public PendingSplitsCheckpoint<SplitT> m43snapshotState(long j) {
        throw new UnsupportedOperationException("DynamicFileSplitEnumerator only supports batch execution.");
    }

    public void handleSourceEvent(int i, int i2, SourceEvent sourceEvent) {
        handleSourceEvent(i, sourceEvent);
    }
}
