package org.apache.flink.contrib.streaming.state;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.CheckedSupplier;

/* loaded from: input_file:org/apache/flink/contrib/streaming/state/RocksDBStateUploader.class */
public class RocksDBStateUploader implements Closeable {
    private static final int READ_BUFFER_SIZE = 16384;
    private final RocksDBStateDataTransferHelper transfer;

    @VisibleForTesting
    public RocksDBStateUploader(int i) {
        this(RocksDBStateDataTransferHelper.forThreadNum(i));
    }

    public RocksDBStateUploader(RocksDBStateDataTransferHelper rocksDBStateDataTransferHelper) {
        this.transfer = rocksDBStateDataTransferHelper;
    }

    public List<IncrementalKeyedStateHandle.HandleAndLocalPath> uploadFilesToCheckpointFs(@Nonnull List<Path> list, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope checkpointedStateScope, CloseableRegistry closeableRegistry, CloseableRegistry closeableRegistry2) throws Exception {
        List<CompletableFuture<IncrementalKeyedStateHandle.HandleAndLocalPath>> createUploadFutures = createUploadFutures(list, checkpointStreamFactory, checkpointedStateScope, closeableRegistry, closeableRegistry2);
        ArrayList arrayList = new ArrayList(list.size());
        try {
            FutureUtils.waitForAll(createUploadFutures).get();
            Iterator<CompletableFuture<IncrementalKeyedStateHandle.HandleAndLocalPath>> it = createUploadFutures.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next().get());
            }
            return arrayList;
        } catch (ExecutionException e) {
            Throwable stripException = ExceptionUtils.stripException(ExceptionUtils.stripExecutionException(e), RuntimeException.class);
            if (stripException instanceof IOException) {
                throw ((IOException) stripException);
            }
            throw new FlinkRuntimeException("Failed to upload data for state handles.", e);
        }
    }

    private List<CompletableFuture<IncrementalKeyedStateHandle.HandleAndLocalPath>> createUploadFutures(List<Path> list, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope checkpointedStateScope, CloseableRegistry closeableRegistry, CloseableRegistry closeableRegistry2) {
        return (List) list.stream().map(path -> {
            return CompletableFuture.supplyAsync(CheckedSupplier.unchecked(() -> {
                return uploadLocalFileToCheckpointFs(path, checkpointStreamFactory, checkpointedStateScope, closeableRegistry, closeableRegistry2);
            }), this.transfer.getExecutorService());
        }).collect(Collectors.toList());
    }

    private IncrementalKeyedStateHandle.HandleAndLocalPath uploadLocalFileToCheckpointFs(Path path, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope checkpointedStateScope, CloseableRegistry closeableRegistry, CloseableRegistry closeableRegistry2) throws IOException {
        StreamStateHandle streamStateHandle;
        InputStream inputStream = null;
        AutoCloseable autoCloseable = null;
        try {
            byte[] bArr = new byte[READ_BUFFER_SIZE];
            inputStream = Files.newInputStream(path, new OpenOption[0]);
            closeableRegistry.registerCloseable(inputStream);
            autoCloseable = checkpointStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);
            closeableRegistry.registerCloseable(autoCloseable);
            while (true) {
                int read = inputStream.read(bArr);
                if (read == -1) {
                    break;
                }
                autoCloseable.write(bArr, 0, read);
            }
            if (closeableRegistry.unregisterCloseable(autoCloseable)) {
                streamStateHandle = autoCloseable.closeAndGetHandle();
                autoCloseable = null;
            } else {
                streamStateHandle = null;
            }
            StreamStateHandle streamStateHandle2 = streamStateHandle;
            closeableRegistry2.registerCloseable(() -> {
                StateUtil.discardStateObjectQuietly(streamStateHandle2);
            });
            IncrementalKeyedStateHandle.HandleAndLocalPath of = IncrementalKeyedStateHandle.HandleAndLocalPath.of(streamStateHandle, path.getFileName().toString());
            if (closeableRegistry.unregisterCloseable(inputStream)) {
                IOUtils.closeQuietly(inputStream);
            }
            if (closeableRegistry.unregisterCloseable(autoCloseable)) {
                IOUtils.closeQuietly(autoCloseable);
            }
            return of;
        } catch (Throwable th) {
            if (closeableRegistry.unregisterCloseable(inputStream)) {
                IOUtils.closeQuietly(inputStream);
            }
            if (closeableRegistry.unregisterCloseable(autoCloseable)) {
                IOUtils.closeQuietly(autoCloseable);
            }
            throw th;
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.transfer.close();
    }
}
