package org.apache.flink.state.changelog;

import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.delegate.DelegatingStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/state/changelog/ChangelogStateBackend.class */
public class ChangelogStateBackend implements DelegatingStateBackend, ConfigurableStateBackend {
    private static final long serialVersionUID = 1000;
    private static final Logger LOG = LoggerFactory.getLogger(ChangelogStateBackend.class);
    private final StateBackend delegatedStateBackend;

    ChangelogStateBackend(StateBackend stateBackend) {
        this.delegatedStateBackend = (StateBackend) Preconditions.checkNotNull(stateBackend);
        Preconditions.checkArgument(!(stateBackend instanceof DelegatingStateBackend), "Recursive Delegation is not supported.");
        LOG.info("ChangelogStateBackend is used, delegating {}.", this.delegatedStateBackend.getClass().getSimpleName());
    }

    public <K> ChangelogKeyedStateBackend<K> createKeyedStateBackend(Environment environment, JobID jobID, String str, TypeSerializer<K> typeSerializer, int i, KeyGroupRange keyGroupRange, TaskKvStateRegistry taskKvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> collection, CloseableRegistry closeableRegistry) throws Exception {
        return restore(environment, str, keyGroupRange, ttlTimeProvider, collection, collection2 -> {
            return this.delegatedStateBackend.createKeyedStateBackend(environment, jobID, str, typeSerializer, i, keyGroupRange, taskKvStateRegistry, ttlTimeProvider, metricGroup, collection2, closeableRegistry);
        });
    }

    public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(Environment environment, JobID jobID, String str, TypeSerializer<K> typeSerializer, int i, KeyGroupRange keyGroupRange, TaskKvStateRegistry taskKvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> collection, CloseableRegistry closeableRegistry, double d) throws Exception {
        return restore(environment, str, keyGroupRange, ttlTimeProvider, collection, collection2 -> {
            return this.delegatedStateBackend.createKeyedStateBackend(environment, jobID, str, typeSerializer, i, keyGroupRange, taskKvStateRegistry, ttlTimeProvider, metricGroup, collection2, closeableRegistry, d);
        });
    }

    public OperatorStateBackend createOperatorStateBackend(Environment environment, String str, @Nonnull Collection<OperatorStateHandle> collection, CloseableRegistry closeableRegistry) throws Exception {
        return this.delegatedStateBackend.createOperatorStateBackend(environment, str, collection, closeableRegistry);
    }

    public boolean useManagedMemory() {
        return this.delegatedStateBackend.useManagedMemory();
    }

    public StateBackend getDelegatedStateBackend() {
        return this.delegatedStateBackend;
    }

    public StateBackend configure(ReadableConfig readableConfig, ClassLoader classLoader) throws IllegalConfigurationException {
        return this.delegatedStateBackend instanceof ConfigurableStateBackend ? new ChangelogStateBackend(this.delegatedStateBackend.configure(readableConfig, classLoader)) : this;
    }

    private <K> ChangelogKeyedStateBackend<K> restore(Environment environment, String str, KeyGroupRange keyGroupRange, TtlTimeProvider ttlTimeProvider, Collection<KeyedStateHandle> collection, ChangelogBackendRestoreOperation.BaseBackendBuilder<K> baseBackendBuilder) throws Exception {
        StateChangelogStorage stateChangelogStorage = (StateChangelogStorage) Preconditions.checkNotNull(environment.getTaskStateManager().getStateChangelogStorage(), "Changelog storage is null when creating and restoring the ChangelogKeyedStateBackend.");
        return ChangelogBackendRestoreOperation.restore(stateChangelogStorage.createReader(), environment.getUserCodeClassLoader().asClassLoader(), castHandles(collection), baseBackendBuilder, (abstractKeyedStateBackend, collection2) -> {
            return new ChangelogKeyedStateBackend(abstractKeyedStateBackend, environment.getExecutionConfig(), ttlTimeProvider, stateChangelogStorage.createWriter(str, keyGroupRange), collection2, environment.getMainMailboxExecutor(), environment.getAsyncOperationsThreadPool());
        });
    }

    private Collection<ChangelogStateBackendHandle> castHandles(Collection<KeyedStateHandle> collection) {
        if (collection.stream().anyMatch(keyedStateHandle -> {
            return !(keyedStateHandle instanceof ChangelogStateBackendHandle);
        })) {
            LOG.warn("Some state handles do not contain changelog: {} (ok if recovery from a savepoint)", collection);
        }
        return (Collection) collection.stream().filter((v0) -> {
            return Objects.nonNull(v0);
        }).map(keyedStateHandle2 -> {
            return keyedStateHandle2 instanceof ChangelogStateBackendHandle ? (ChangelogStateBackendHandle) keyedStateHandle2 : new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(Collections.singletonList(keyedStateHandle2), Collections.emptyList(), keyedStateHandle2.getKeyGroupRange());
        }).collect(Collectors.toList());
    }

    /* renamed from: createKeyedStateBackend, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ CheckpointableKeyedStateBackend m5createKeyedStateBackend(Environment environment, JobID jobID, String str, TypeSerializer typeSerializer, int i, KeyGroupRange keyGroupRange, TaskKvStateRegistry taskKvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection collection, CloseableRegistry closeableRegistry) throws Exception {
        return createKeyedStateBackend(environment, jobID, str, typeSerializer, i, keyGroupRange, taskKvStateRegistry, ttlTimeProvider, metricGroup, (Collection<KeyedStateHandle>) collection, closeableRegistry);
    }
}
