package org.apache.flink.state.changelog.restore;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.state.changelog.ChangelogKeyGroupedPriorityQueue;
import org.apache.flink.state.changelog.ChangelogState;
import org.apache.flink.state.changelog.ChangelogStateFactory;
import org.apache.flink.state.changelog.KvStateChangeLogger;
import org.apache.flink.state.changelog.StateChangeLogger;
import org.apache.flink.util.function.ThrowingConsumer;

/* loaded from: input_file:org/apache/flink/state/changelog/restore/ChangelogMigrationRestoreTarget.class */
/**
 * A {@link ChangelogRestoreTarget} that restores state directly into a wrapped
 * {@link AbstractKeyedStateBackend}.
 *
 * <p>Every state created through this target is a changelog state produced by the supplied
 * {@link ChangelogStateFactory} and wired to a {@link VoidStateChangeLogger}, so changes applied
 * through it are not written to any changelog. Once restore has finished,
 * {@link #getRestoredKeyedStateBackend()} exposes the wrapped backend through a thin delegating
 * wrapper.
 *
 * @param <K> type of the keys handled by the wrapped backend
 */
public class ChangelogMigrationRestoreTarget<K> implements ChangelogRestoreTarget<K> {

    /** Backend that physically holds the restored state. */
    private final AbstractKeyedStateBackend<K> keyedStateBackend;

    /** Creates, caches and disposes the changelog state wrappers handed out by this target. */
    private final ChangelogStateFactory changelogStateFactory;

    /** Receives every state descriptor that is created or looked up through this target. */
    private final FunctionDelegationHelper functionDelegationHelper = new FunctionDelegationHelper();

    /**
     * Creates a restore target on top of the given backend.
     *
     * @param abstractKeyedStateBackend backend that receives the restored state
     * @param changelogStateFactory factory used to create or look up changelog state wrappers
     */
    public ChangelogMigrationRestoreTarget(
            AbstractKeyedStateBackend<K> abstractKeyedStateBackend,
            ChangelogStateFactory changelogStateFactory) {
        this.keyedStateBackend = abstractKeyedStateBackend;
        this.changelogStateFactory = changelogStateFactory;
    }

    /**
     * Returns the key-group range of the wrapped backend.
     *
     * @return the range reported by the wrapped backend
     */
    @Override
    public KeyGroupRange getKeyGroupRange() {
        return keyedStateBackend.getKeyGroupRange();
    }

    /**
     * Creates (or re-binds) the changelog state for the given descriptor.
     *
     * <p>The internal state is always created or updated in the wrapped backend first. If the
     * factory already knows a key/value changelog state with the descriptor's name, that instance
     * is re-pointed at the fresh internal state; otherwise a new changelog state is created with a
     * no-op change logger. In both cases the descriptor is registered with the function delegation
     * helper afterwards.
     *
     * @param typeSerializer serializer for the state's namespace
     * @param stateDescriptor descriptor of the state to create
     * @return the changelog state, cast to the descriptor's state type
     * @throws Exception if the wrapped backend or the factory fails to create the state
     */
    @Override
    @SuppressWarnings("unchecked") // state types are only known through the descriptor at runtime
    public <N, S extends State, V> S createKeyedState(
            TypeSerializer<N> typeSerializer, StateDescriptor<S, V> stateDescriptor)
            throws Exception {
        // NOTE(review): the trailing `true` is forwarded verbatim to the backend; its exact meaning
        // is defined by AbstractKeyedStateBackend and is not visible here.
        final InternalKvState<K, N, V> kvState =
                (InternalKvState<K, N, V>)
                        keyedStateBackend.createOrUpdateInternalState(
                                typeSerializer,
                                stateDescriptor,
                                StateSnapshotTransformer.StateSnapshotTransformFactory.noTransform(),
                                true);

        ChangelogState changelogState =
                changelogStateFactory.getExistingState(
                        stateDescriptor.getName(),
                        StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
        if (changelogState == null) {
            // The wrapped backend doubles as the key context of the new changelog state.
            changelogState =
                    changelogStateFactory.create(
                            stateDescriptor,
                            kvState,
                            VoidStateChangeLogger.getInstance(),
                            keyedStateBackend);
        } else {
            changelogState.setDelegatedState(kvState);
        }

        functionDelegationHelper.addOrUpdate(stateDescriptor);
        return (S) changelogState;
    }

    /**
     * Creates (or re-binds) the changelog priority queue with the given name.
     *
     * <p>The underlying queue is always created in the wrapped backend first. An already known
     * changelog queue of that name is re-pointed at it; otherwise a new changelog queue with a
     * no-op change logger is created.
     *
     * @param str name of the priority-queue state
     * @param typeSerializer serializer for the queue elements
     * @return the changelog priority queue backed by the wrapped backend's queue
     */
    @Override
    @Nonnull
    @SuppressWarnings("unchecked") // element type of a cached queue is only known by name
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
            KeyGroupedInternalPriorityQueue<T> createPqState(
                    @Nonnull String str, @Nonnull TypeSerializer<T> typeSerializer) {
        // NOTE(review): the trailing `true` is forwarded verbatim to the backend; see its contract.
        final KeyGroupedInternalPriorityQueue<T> internalQueue =
                keyedStateBackend.create(str, typeSerializer, true);

        ChangelogKeyGroupedPriorityQueue<T> changelogQueue =
                (ChangelogKeyGroupedPriorityQueue<T>)
                        changelogStateFactory.getExistingState(
                                str, StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
        if (changelogQueue == null) {
            changelogQueue =
                    changelogStateFactory.create(
                            str, internalQueue, VoidStateChangeLogger.getInstance(), typeSerializer);
        } else {
            changelogQueue.setDelegatedState(internalQueue);
        }
        return changelogQueue;
    }

    /**
     * Looks up a changelog state previously registered with the factory.
     *
     * @param str name of the state
     * @param backendStateType kind of state to look for
     * @return whatever the factory returns for that name and type; callers in this class treat
     *     {@code null} as "not present"
     */
    @Override
    public ChangelogState getExistingState(
            String str, StateMetaInfoSnapshot.BackendStateType backendStateType) {
        return changelogStateFactory.getExistingState(str, backendStateType);
    }

    /**
     * Returns a backend view over the wrapped backend for use after restore.
     *
     * <p>A new wrapper instance is created on every call.
     *
     * @return a delegating wrapper around the wrapped backend
     */
    @Override
    public CheckpointableKeyedStateBackend<K> getRestoredKeyedStateBackend() {
        return wrapKeyedStateBackend(
                keyedStateBackend, changelogStateFactory, functionDelegationHelper);
    }

    /**
     * Builds an {@link AbstractKeyedStateBackend} that is constructed from {@code delegate} and
     * forwards its operations to it.
     *
     * <p>Beyond plain forwarding, the wrapper registers every descriptor passing through
     * {@code getPartitionedState} / {@code getOrCreateKeyedState} with {@code delegationHelper},
     * and disposes {@code stateFactory} together with the delegate.
     *
     * @param delegate backend that performs the actual work
     * @param stateFactory factory to dispose alongside the delegate
     * @param delegationHelper helper notified about state descriptors
     * @return the delegating backend
     */
    private static <K> AbstractKeyedStateBackend<K> wrapKeyedStateBackend(
            final AbstractKeyedStateBackend<K> delegate,
            final ChangelogStateFactory stateFactory,
            final FunctionDelegationHelper delegationHelper) {
        return new AbstractKeyedStateBackend<K>(delegate) {
            @Override
            public void setCurrentKey(K newKey) {
                delegate.setCurrentKey(newKey);
            }

            @Override
            public void notifyCheckpointComplete(long checkpointId) throws Exception {
                delegate.notifyCheckpointComplete(checkpointId);
            }

            @Override
            @Nonnull
            public SavepointResources<K> savepoint() throws Exception {
                return delegate.savepoint();
            }

            @Override
            public int numKeyValueStateEntries() {
                return delegate.numKeyValueStateEntries();
            }

            @Override
            public <N> Stream<K> getKeys(String state, N namespace) {
                return delegate.getKeys(state, namespace);
            }

            @Override
            public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
                return delegate.getKeysAndNamespaces(state);
            }

            @Override
            @Nonnull
            public <N, SV, SEV, S extends State, IS extends S> IS createOrUpdateInternalState(
                    @Nonnull TypeSerializer<N> namespaceSerializer,
                    @Nonnull StateDescriptor<S, SV> stateDescriptor,
                    @Nonnull
                            StateSnapshotTransformer.StateSnapshotTransformFactory<SEV>
                                    snapshotTransformFactory)
                    throws Exception {
                return delegate.createOrUpdateInternalState(
                        namespaceSerializer, stateDescriptor, snapshotTransformFactory);
            }

            @Override
            public <N, S extends State> S getPartitionedState(
                    N namespace,
                    TypeSerializer<N> namespaceSerializer,
                    StateDescriptor<S, ?> stateDescriptor)
                    throws Exception {
                // Resolved by the superclass implementation, not forwarded to the delegate.
                final S partitionedState =
                        super.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
                delegationHelper.addOrUpdate(stateDescriptor);
                return partitionedState;
            }

            @Override
            public <N, S extends State, V> S getOrCreateKeyedState(
                    TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor)
                    throws Exception {
                // Resolved by the superclass implementation, not forwarded to the delegate.
                final S keyedState =
                        super.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
                delegationHelper.addOrUpdate(stateDescriptor);
                return keyedState;
            }

            @Override
            @Nonnull
            public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
                    KeyGroupedInternalPriorityQueue<T> create(
                            @Nonnull String stateName,
                            @Nonnull TypeSerializer<T> elementSerializer) {
                return delegate.create(stateName, elementSerializer);
            }

            @Override
            @Nonnull
            public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
                    long checkpointId,
                    long timestamp,
                    @Nonnull CheckpointStreamFactory streamFactory,
                    @Nonnull CheckpointOptions checkpointOptions)
                    throws Exception {
                return delegate.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
            }

            @Override
            public void dispose() {
                // Delegate first, then the factory that owns the changelog state wrappers.
                delegate.dispose();
                stateFactory.dispose();
            }
        };
    }

    /**
     * Change logger whose every operation is a no-op.
     *
     * <p>It implements both logger interfaces so the same instance can be handed to key/value
     * states and to priority queues.
     *
     * @param <Value> type of the logged values (ignored)
     * @param <Namespace> type of the namespaces (ignored)
     */
    private static class VoidStateChangeLogger<Value, Namespace>
            implements KvStateChangeLogger<Value, Namespace>, StateChangeLogger<Value, Namespace> {

        /** Single shared instance; the logger holds no state. */
        private static final VoidStateChangeLogger<Object, Object> INSTANCE =
                new VoidStateChangeLogger<>();

        private VoidStateChangeLogger() {
        }

        /**
         * Returns the shared no-op logger, typed as requested by the caller.
         *
         * @return the singleton instance
         */
        @SuppressWarnings("unchecked") // safe: the instance is stateless and ignores all arguments
        public static <Value, Namespace> VoidStateChangeLogger<Value, Namespace> getInstance() {
            return (VoidStateChangeLogger<Value, Namespace>) INSTANCE;
        }

        @Override
        public void namespacesMerged(Namespace namespace, Collection<Namespace> collection)
                throws IOException {
            // intentionally empty
        }

        @Override
        public void valueUpdated(Value value, Namespace namespace) throws IOException {
            // intentionally empty
        }

        @Override
        public void valueUpdatedInternal(Value value, Namespace namespace) throws IOException {
            // intentionally empty
        }

        @Override
        public void valueAdded(Value value, Namespace namespace) throws IOException {
            // intentionally empty
        }

        @Override
        public void valueCleared(Namespace namespace) throws IOException {
            // intentionally empty
        }

        @Override
        public void valueElementAdded(
                ThrowingConsumer<DataOutputView, IOException> throwingConsumer, Namespace namespace)
                throws IOException {
            // intentionally empty
        }

        @Override
        public void valueElementAddedOrUpdated(
                ThrowingConsumer<DataOutputView, IOException> throwingConsumer, Namespace namespace)
                throws IOException {
            // intentionally empty
        }

        @Override
        public void valueElementRemoved(
                ThrowingConsumer<DataOutputView, IOException> throwingConsumer, Namespace namespace)
                throws IOException {
            // intentionally empty
        }

        @Override
        public void resetWritingMetaFlag() {
            // intentionally empty
        }

        @Override
        public void close() throws IOException {
            // nothing to release
        }
    }
}
