package org.apache.flink.connector.file.table.stream;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.connector.file.table.stream.PartitionCommitPredicate;
import org.apache.flink.util.StringUtils;

/**
 * A {@link PartitionCommitTrigger} that decides which pending partitions can be committed by
 * handing each partition, together with the watermark recorded for a checkpoint, to a
 * {@link PartitionCommitPredicate}.
 *
 * <p>Two pieces of state are kept and written on every {@link #snapshotState(long, long)}:
 *
 * <ul>
 *   <li>the set of partitions that were added but not yet reported as committable, and
 *   <li>a mapping from checkpoint id to the watermark passed in when that checkpoint was
 *       snapshotted.
 * </ul>
 */
@Internal
public class PartitionTimeCommitTrigger implements PartitionCommitTrigger {

    private static final ListStateDescriptor<List<String>> PENDING_PARTITIONS_STATE_DESC =
            new ListStateDescriptor<>(
                    "pending-partitions", new ListSerializer<>(StringSerializer.INSTANCE));

    private static final ListStateDescriptor<Map<Long, Long>> WATERMARKS_STATE_DESC =
            new ListStateDescriptor<>(
                    "checkpoint-id-to-watermark",
                    new MapSerializer<>(LongSerializer.INSTANCE, LongSerializer.INSTANCE));

    private final ListState<List<String>> pendingPartitionsState;
    private final Set<String> pendingPartitions = new HashSet<>();

    private final ListState<Map<Long, Long>> watermarksState;
    private final TreeMap<Long, Long> watermarks = new TreeMap<>();

    private final PartitionCommitPredicate partitionCommitPredicate;

    /**
     * Creates the trigger and, when requested, reloads its bookkeeping from operator state.
     *
     * @param isRestored whether previously snapshotted state should be read back
     * @param stateStore store from which both list states are obtained
     * @param partitionCommitPredicate predicate consulted for every pending partition
     * @throws Exception if the state store or the state access fails
     */
    public PartitionTimeCommitTrigger(
            boolean isRestored,
            OperatorStateStore stateStore,
            PartitionCommitPredicate partitionCommitPredicate)
            throws Exception {
        this.pendingPartitionsState = stateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
        this.watermarksState = stateStore.getListState(WATERMARKS_STATE_DESC);
        this.partitionCommitPredicate = partitionCommitPredicate;

        if (isRestored) {
            // The restored list state is not guaranteed to hold exactly one element (for
            // example when state has been redistributed, or when nothing was stored for this
            // instance). Fold in every element instead of blindly taking the first one, which
            // would throw NoSuchElementException on empty state and drop data otherwise.
            Iterable<List<String>> restoredPartitions = this.pendingPartitionsState.get();
            if (restoredPartitions != null) {
                for (List<String> partitions : restoredPartitions) {
                    if (partitions != null) {
                        this.pendingPartitions.addAll(partitions);
                    }
                }
            }

            Iterable<Map<Long, Long>> restoredWatermarks = this.watermarksState.get();
            if (restoredWatermarks != null) {
                for (Map<Long, Long> checkpointToWatermark : restoredWatermarks) {
                    if (checkpointToWatermark == null) {
                        continue;
                    }
                    for (Map.Entry<Long, Long> entry : checkpointToWatermark.entrySet()) {
                        // If several restored maps disagree on a checkpoint, keep the smaller
                        // watermark: committing a partition later is the conservative choice.
                        this.watermarks.merge(entry.getKey(), entry.getValue(), Math::min);
                    }
                }
            }
        }
    }

    /**
     * Registers a partition as pending. Null, empty and whitespace-only names are ignored.
     *
     * @param partition name of the partition to track
     */
    @Override
    public void addPartition(String partition) {
        if (StringUtils.isNullOrWhitespaceOnly(partition)) {
            return;
        }
        this.pendingPartitions.add(partition);
    }

    /**
     * Returns the pending partitions the predicate accepts for the watermark that was recorded
     * for the given checkpoint, and stops tracking them.
     *
     * <p>As a side effect, the watermark entries of this checkpoint and of all checkpoints with
     * a smaller id are discarded.
     *
     * @param checkpointId id of a checkpoint that was previously passed to
     *     {@link #snapshotState(long, long)}
     * @return the partitions accepted by the predicate; possibly empty, never {@code null}
     * @throws IllegalArgumentException if no watermark is recorded for {@code checkpointId}
     */
    @Override
    public List<String> committablePartitions(long checkpointId) {
        Long recordedWatermark = this.watermarks.get(checkpointId);
        if (recordedWatermark == null) {
            throw new IllegalArgumentException(
                    String.format(
                            "Checkpoint(%d) has not been snapshot. The watermark information is: %s.",
                            checkpointId, this.watermarks));
        }
        long watermark = recordedWatermark;

        // Entries up to and including this checkpoint are no longer needed.
        this.watermarks.headMap(checkpointId, true).clear();

        List<String> committable = new ArrayList<>();
        Iterator<String> pending = this.pendingPartitions.iterator();
        while (pending.hasNext()) {
            String partition = pending.next();
            if (this.partitionCommitPredicate.isPartitionCommittable(
                    createPredicateContext(partition, watermark))) {
                committable.add(partition);
                // Remove through the iterator so the set can be modified while iterating.
                pending.remove();
            }
        }
        return committable;
    }

    /**
     * Builds the context handed to the predicate. Only the partition name and the watermark are
     * available; both processing-time accessors throw
     * {@link UnsupportedOperationException}.
     */
    private PartitionCommitPredicate.PredicateContext createPredicateContext(
            final String partition, final long watermark) {
        return new PartitionCommitPredicate.PredicateContext() {
            @Override
            public String partition() {
                return partition;
            }

            @Override
            public long createProcTime() {
                throw new UnsupportedOperationException(
                        "Method createProcTime isn't supported in PartitionTimeCommitTrigger.");
            }

            @Override
            public long currentProcTime() {
                throw new UnsupportedOperationException(
                        "Method currentProcTime isn't supported in PartitionTimeCommitTrigger.");
            }

            @Override
            public long currentWatermark() {
                return watermark;
            }
        };
    }

    /**
     * Records the watermark for the given checkpoint and writes both the pending partitions and
     * the checkpoint-to-watermark mapping into their list states, each as a single element.
     *
     * @param checkpointId id of the checkpoint being taken
     * @param watermark watermark to associate with {@code checkpointId}
     * @throws Exception if updating either state fails
     */
    @Override
    public void snapshotState(long checkpointId, long watermark) throws Exception {
        this.pendingPartitionsState.update(
                Collections.singletonList(new ArrayList<>(this.pendingPartitions)));
        this.watermarks.put(checkpointId, watermark);
        this.watermarksState.update(Collections.singletonList(new HashMap<>(this.watermarks)));
    }

    /**
     * Returns every partition that is still pending and clears the pending set, without
     * consulting the predicate.
     *
     * @return a copy of the partitions that were pending at the time of the call
     */
    @Override
    public List<String> endInput() {
        List<String> remaining = new ArrayList<>(this.pendingPartitions);
        this.pendingPartitions.clear();
        return remaining;
    }
}
