package org.apache.flink.runtime.blob;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.PermanentBlobCache;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

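/**
 * Tests for the cleanup of BLOBs held by the permanent and transient BLOB caches.
 *
 * <p>Reconstructed from the compiled class
 * {@code org/apache/flink/runtime/blob/BlobCacheCleanupTest.class}.
 */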
class BlobCacheCleanupTest {

    // Temporary directory injected by JUnit (@TempDir); handed to the TestingBlobUtils /
    // TempDirUtils factory calls that create the servers and caches under test.
    @TempDir
    private Path tempDir;
    // Source of the random payload bytes used by the transient BLOB cleanup test.
    private final Random rnd = new Random();

    // Explicit no-arg constructor with an empty body; no additional setup happens here.
    BlobCacheCleanupTest() {
    }

    /**
     * Uploads two permanent BLOBs for one job, registers the job twice at the cache, fetches the
     * files, and then checks that the cached files survive the first {@code releaseJob} call and
     * are gone (as checked by {@link #verifyJobCleanup}) after the second one.
     *
     * <p>The server's file count for the job is expected to stay at 2 until the server itself is
     * closed.
     *
     * @throws IOException if a BLOB operation or closing a service fails
     * @throws InterruptedException if interrupted while waiting for the cleanup
     */
    @Test
    void testPermanentBlobCleanup() throws IOException, InterruptedException {
        final JobID jobID = new JobID();
        final List<PermanentBlobKey> keys = new ArrayList<>();
        final byte[] payload = new byte[128];

        BlobServer blobServer = null;
        // Needs the concrete cache type: registerJob/releaseJob/close and verifyJobCleanup
        // are used with PermanentBlobCache, not the PermanentBlobService interface.
        PermanentBlobCache cache = null;
        try {
            final Configuration configuration = new Configuration();
            configuration.set(BlobServerOptions.CLEANUP_INTERVAL, 1L);

            blobServer = TestingBlobUtils.createServer(this.tempDir, configuration);
            blobServer.start();
            cache = TestingBlobUtils.createPermanentCache(this.tempDir, configuration, blobServer);

            // upload two BLOBs whose contents differ in the first byte
            keys.add(blobServer.putPermanent(jobID, payload));
            payload[0] += 1;
            keys.add(blobServer.putPermanent(jobID, payload));

            TestingBlobHelpers.checkFileCountForJob(2, jobID, blobServer);
            TestingBlobHelpers.checkFileCountForJob(0, jobID, cache);

            // first registration: nothing is expected in the cache yet
            cache.registerJob(jobID);

            TestingBlobHelpers.checkFileCountForJob(2, jobID, blobServer);
            TestingBlobHelpers.checkFileCountForJob(0, jobID, cache);

            for (PermanentBlobKey key : keys) {
                cache.getFile(jobID, key);
            }

            // second registration of the same job, followed by another round of fetches
            cache.registerJob(jobID);
            for (PermanentBlobKey key : keys) {
                cache.getFile(jobID, key);
            }

            Assertions.assertThat(TestingBlobHelpers.checkFilesExist(jobID, keys, cache, true)).isEqualTo(2);
            TestingBlobHelpers.checkFileCountForJob(2, jobID, blobServer);
            TestingBlobHelpers.checkFileCountForJob(2, jobID, cache);

            // first release: one registration is still outstanding, nothing may change
            cache.releaseJob(jobID);

            Assertions.assertThat(TestingBlobHelpers.checkFilesExist(jobID, keys, cache, true)).isEqualTo(2);
            TestingBlobHelpers.checkFileCountForJob(2, jobID, blobServer);
            TestingBlobHelpers.checkFileCountForJob(2, jobID, cache);

            // second release: wait until the cached files have disappeared
            cache.releaseJob(jobID);
            verifyJobCleanup(cache, jobID, keys);

            // the server is expected to still hold both files
            TestingBlobHelpers.checkFileCountForJob(2, jobID, blobServer);
        } finally {
            // Single cleanup path: runs exactly once, closes the server even if closing the
            // cache throws, and skips the final check when the server was never created.
            try {
                if (cache != null) {
                    cache.close();
                }
            } finally {
                if (blobServer != null) {
                    blobServer.close();
                    // after closing, no files may remain for the job on the server
                    TestingBlobHelpers.checkFileCountForJob(0, jobID, blobServer);
                }
            }
        }
    }

    /**
     * Checks the per-job reference counter kept by {@link PermanentBlobCache} across a sequence
     * of {@code registerJob}/{@code releaseJob} calls.
     *
     * <p>While at least one reference is held, {@code keepUntil} is expected to be {@code -1};
     * once the count drops to zero, {@code keepUntil} must be at least "time before the release
     * + configured cleanup interval".
     *
     * @throws IOException if creating or closing the cache fails
     */
    @Test
    void testPermanentJobReferences() throws IOException {
        final JobID jobID = new JobID();

        final Configuration configuration = new Configuration();
        configuration.set(BlobServerOptions.CLEANUP_INTERVAL, 3600000L);
        final long cleanupInterval = configuration.get(BlobServerOptions.CLEANUP_INTERVAL);

        // No BLOB server is started in this test; the address is only handed to the cache factory.
        final InetSocketAddress serverAddress = new InetSocketAddress("localhost", 12345);

        try (PermanentBlobCache cache =
                TestingBlobUtils.createPermanentCache(this.tempDir, configuration, serverAddress)) {
            // register once
            cache.registerJob(jobID);
            assertReferenced(cache, jobID, 1);

            // register a second time
            cache.registerJob(jobID);
            assertReferenced(cache, jobID, 2);

            // release once: still referenced
            cache.releaseJob(jobID);
            assertReferenced(cache, jobID, 1);

            // release the last reference; take the lower bound BEFORE releasing
            long cleanupLowerBound = System.currentTimeMillis() + cleanupInterval;
            cache.releaseJob(jobID);
            assertUnreferenced(cache, jobID, cleanupLowerBound);

            // registering again resets keepUntil
            cache.registerJob(jobID);
            assertReferenced(cache, jobID, 1);

            // and releasing again sets a fresh keepUntil
            cleanupLowerBound = System.currentTimeMillis() + cleanupInterval;
            cache.releaseJob(jobID);
            assertUnreferenced(cache, jobID, cleanupLowerBound);
        }
    }

    /** Returns the cache's reference counter entry for the given job. */
    private static PermanentBlobCache.RefCount refCountOf(PermanentBlobCache cache, JobID jobID) {
        // Cast kept from the original: the declared value type of the map is not visible here.
        return (PermanentBlobCache.RefCount) cache.getJobRefCounters().get(jobID);
    }

    /** Asserts that the job holds {@code expectedReferences} references and {@code keepUntil} is -1. */
    private static void assertReferenced(PermanentBlobCache cache, JobID jobID, int expectedReferences) {
        Assertions.assertThat(refCountOf(cache, jobID).references).isEqualTo(expectedReferences);
        Assertions.assertThat(refCountOf(cache, jobID).keepUntil).isEqualTo(-1L);
    }

    /** Asserts that the job holds no references and {@code keepUntil} is at least the given bound. */
    private static void assertUnreferenced(PermanentBlobCache cache, JobID jobID, long keepUntilLowerBound) {
        Assertions.assertThat(refCountOf(cache, jobID).references).isZero();
        Assertions.assertThat(refCountOf(cache, jobID).keepUntil).isGreaterThanOrEqualTo(keepUntilLowerBound);
    }

    /**
     * Manual (disabled) variant of the permanent BLOB cleanup test that additionally tracks the
     * cache contents through a {@link BlobCacheSizeTracker} and checks that the cached files are
     * still present directly after the last {@code releaseJob} call and after a first short
     * sleep, before waiting for their removal via {@link #verifyJobCleanup}.
     *
     * @throws IOException if a BLOB operation or closing a service fails
     * @throws InterruptedException if interrupted while sleeping or waiting for the cleanup
     */
    @Disabled("manual test due to stalling: ensures a BLOB is retained first and only deleted after the (long) timeout ")
    @Test
    void testPermanentBlobDeferredCleanup() throws IOException, InterruptedException {
        // Value written to BlobServerOptions.CLEANUP_INTERVAL; the sleeps below are derived from it.
        final long cleanupInterval = 5L;

        final JobID jobID = new JobID();
        final List<PermanentBlobKey> keys = new ArrayList<>();
        final byte[] payload = new byte[128];

        BlobServer blobServer = null;
        // Needs the concrete cache type: registerJob/releaseJob/close and verifyJobCleanup
        // are used with PermanentBlobCache, not the PermanentBlobService interface.
        PermanentBlobCache cache = null;
        try {
            final Configuration configuration = new Configuration();
            configuration.set(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval);

            blobServer = new BlobServer(configuration, TempDirUtils.newFolder(this.tempDir), new VoidBlobStore());
            blobServer.start();

            final BlobCacheSizeTracker tracker = new BlobCacheSizeTracker(MemorySize.ofMebiBytes(100L).getBytes());
            cache = TestingBlobUtils.createPermanentCache(this.tempDir, configuration, blobServer, tracker);

            // upload two BLOBs whose contents differ in the first byte
            keys.add(blobServer.putPermanent(jobID, payload));
            payload[0] += 1;
            keys.add(blobServer.putPermanent(jobID, payload));

            TestingBlobHelpers.checkFileCountForJob(2, jobID, blobServer);
            TestingBlobHelpers.checkFileCountForJob(0, jobID, cache);
            checkBlobCacheSizeTracker(tracker, jobID, 0);

            // first registration: nothing is expected in the cache yet
            cache.registerJob(jobID);

            TestingBlobHelpers.checkFileCountForJob(2, jobID, blobServer);
            TestingBlobHelpers.checkFileCountForJob(0, jobID, cache);
            checkBlobCacheSizeTracker(tracker, jobID, 0);

            for (PermanentBlobKey key : keys) {
                cache.readFile(jobID, key);
            }

            // second registration of the same job, followed by another round of reads
            cache.registerJob(jobID);
            for (PermanentBlobKey key : keys) {
                cache.readFile(jobID, key);
            }

            Assertions.assertThat(TestingBlobHelpers.checkFilesExist(jobID, keys, cache, true)).isEqualTo(2);
            TestingBlobHelpers.checkFileCountForJob(2, jobID, blobServer);
            TestingBlobHelpers.checkFileCountForJob(2, jobID, cache);
            checkBlobCacheSizeTracker(tracker, jobID, 2);

            // first release: one registration is still outstanding, nothing may change
            cache.releaseJob(jobID);

            Assertions.assertThat(TestingBlobHelpers.checkFilesExist(jobID, keys, cache, true)).isEqualTo(2);
            TestingBlobHelpers.checkFileCountForJob(2, jobID, blobServer);
            TestingBlobHelpers.checkFileCountForJob(2, jobID, cache);
            checkBlobCacheSizeTracker(tracker, jobID, 2);

            // second release: files must still be there right away ...
            cache.releaseJob(jobID);

            Assertions.assertThat(TestingBlobHelpers.checkFilesExist(jobID, keys, cache, true)).isEqualTo(2);
            TestingBlobHelpers.checkFileCountForJob(2, jobID, cache);

            // ... and after the first fifth of the interval value
            Thread.sleep(cleanupInterval / 5);

            Assertions.assertThat(TestingBlobHelpers.checkFilesExist(jobID, keys, cache, true)).isEqualTo(2);
            TestingBlobHelpers.checkFileCountForJob(2, jobID, cache);

            // sleep the remaining four fifths, then wait for the files to disappear
            Thread.sleep((cleanupInterval * 4) / 5);

            verifyJobCleanup(cache, jobID, keys);
            checkBlobCacheSizeTracker(tracker, jobID, 0);

            // the server is expected to still hold both files
            TestingBlobHelpers.checkFileCountForJob(2, jobID, blobServer);
        } finally {
            // Single cleanup path: runs exactly once, closes the server even if closing the
            // cache throws, and skips the final check when the server was never created.
            try {
                if (cache != null) {
                    cache.close();
                }
            } finally {
                if (blobServer != null) {
                    blobServer.close();
                    // after closing, no files may remain for the job on the server
                    TestingBlobHelpers.checkFileCountForJob(0, jobID, blobServer);
                }
            }
        }
    }

    /** Runs the shared transient BLOB cleanup scenario without a job ID ({@code null}). */
    @Test
    void testTransientBlobNoJobCleanup() throws Exception {
        final JobID noJob = null;
        testTransientBlobCleanup(noJob);
    }

    /** Runs the shared transient BLOB cleanup scenario for a freshly created job ID. */
    @Test
    void testTransientBlobForJobCleanup() throws Exception {
        final JobID freshJob = new JobID();
        testTransientBlobCleanup(freshJob);
    }

    /**
     * Shared scenario for the transient BLOB cleanup tests.
     *
     * <p>Steps:
     *
     * <ol>
     *   <li>Put two transient BLOBs on the server and read each through the cache, asserting that
     *       an expiry time is recorded on access and that reading the second BLOB leaves the
     *       first BLOB's expiry time unchanged.
     *   <li>Remove the BLOBs from the server (global job cleanup if {@code jobID} is set,
     *       otherwise per-key deletion) and assert the server holds no files for the job.
     *   <li>Repeatedly fetch the first BLOB from several threads until a deadline passes.
     *   <li>Delegate the final check to {@code BlobCachePutTest.verifyDeletedEventually}.
     * </ol>
     *
     * @param jobID job the BLOBs belong to, or {@code null} for job-unrelated BLOBs
     * @throws Exception if any BLOB operation, future, or close call fails
     */
    private void testTransientBlobCleanup(@Nullable JobID jobID) throws Exception {
        // Value written to BlobServerOptions.CLEANUP_INTERVAL and added to the timestamps below.
        final long cleanupInterval = 1L;
        final int numberConcurrentGetOperations = 3;

        final List<CompletableFuture<Void>> getOperations = new ArrayList<>(numberConcurrentGetOperations);

        final byte[] data = new byte[2000000];
        this.rnd.nextBytes(data);
        final byte[] data2 = Arrays.copyOfRange(data, 10, 54);

        final Configuration configuration = new Configuration();
        configuration.set(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval);

        final Tuple2<BlobServer, BlobCacheService> serverAndCache =
                TestingBlobUtils.createServerAndCache(this.tempDir, configuration);
        final ExecutorService cleanupExecutor = Executors.newSingleThreadExecutor();

        // try-with-resources closes the cache first, then the server, on success and on failure.
        try (BlobServer blobServer = serverAndCache.f0;
                BlobCacheService blobCacheService = serverAndCache.f1) {
            final ConcurrentMap<Tuple2<JobID, TransientBlobKey>, Long> blobExpiryTimes =
                    blobCacheService.getTransientBlobService().getBlobExpiryTimes();

            blobServer.start();

            final TransientBlobKey key1 =
                    (TransientBlobKey) BlobServerPutTest.put(blobServer, jobID, data, BlobKey.BlobType.TRANSIENT_BLOB);
            final TransientBlobKey key2 =
                    (TransientBlobKey) BlobServerPutTest.put(blobServer, jobID, data2, BlobKey.BlobType.TRANSIENT_BLOB);

            // access key1 through the cache: only key1 gets an expiry time
            long cleanupLowerBound = System.currentTimeMillis() + cleanupInterval;
            BlobServerPutTest.verifyContents(blobCacheService, jobID, key1, data);
            final Long key1ExpiryFirstAccess = blobExpiryTimes.get(Tuple2.of(jobID, key1));
            Assertions.assertThat(key1ExpiryFirstAccess).isGreaterThanOrEqualTo(cleanupLowerBound);
            Assertions.assertThat(blobExpiryTimes.get(Tuple2.of(jobID, key2))).isNull();

            // wait at least 1ms so that an accidental refresh of key1's expiry would be visible
            Thread.sleep(1L);
            cleanupLowerBound = System.currentTimeMillis() + cleanupInterval;
            BlobServerPutTest.verifyContents(blobCacheService, jobID, key2, data2);
            Assertions.assertThat(key1ExpiryFirstAccess).isEqualTo(blobExpiryTimes.get(Tuple2.of(jobID, key1)));
            Assertions.assertThat(blobExpiryTimes.get(Tuple2.of(jobID, key2))).isGreaterThanOrEqualTo(cleanupLowerBound);

            // remove the BLOBs from the server (presumably so the cache cannot simply fetch
            // them again later)
            if (jobID != null) {
                blobServer.globalCleanupAsync(jobID, cleanupExecutor).join();
            } else {
                blobServer.deleteFromCache(key1);
                blobServer.deleteFromCache(key2);
            }
            TestingBlobHelpers.checkFileCountForJob(0, jobID, blobServer);

            final long finishTime = System.currentTimeMillis() + (3 * cleanupInterval);

            final ExecutorService getExecutor = Executors.newFixedThreadPool(numberConcurrentGetOperations);
            try {
                for (int i = 0; i < numberConcurrentGetOperations; i++) {
                    final CompletableFuture<Void> getOperation = CompletableFuture.supplyAsync(() -> {
                        try {
                            // keep fetching key1 until the deadline has passed
                            while (System.currentTimeMillis() < finishTime) {
                                BlobServerGetTest.get(blobCacheService, jobID, key1);
                            }
                            return null;
                        } catch (IOException e) {
                            throw new CompletionException(new FlinkException("Could not retrieve blob.", e));
                        }
                    }, getExecutor);
                    getOperations.add(getOperation);
                }

                FutureUtils.combineAll(getOperations).get();
            } finally {
                // Release the worker threads; the pool was previously never shut down.
                getExecutor.shutdownNow();
            }

            BlobCachePutTest.verifyDeletedEventually(blobServer, jobID, key1, key2);
        } finally {
            // no cleanup task may be left queued on the executor
            Assertions.assertThat(cleanupExecutor.shutdownNow()).isEmpty();
        }
    }

    /**
     * Polls every 100 ms, for up to roughly 30 seconds, until
     * {@code TestingBlobHelpers.checkFilesExist} reports 0 for the given keys, then checks that
     * the file count for the job in the cache is 0.
     *
     * <p>The first check happens only after an initial 100 ms sleep. If the deadline passes
     * first, the closing file-count check still runs.
     *
     * @param permanentBlobCache cache to inspect
     * @param jobID job whose files are expected to disappear
     * @param list keys of the BLOBs to look for
     * @throws InterruptedException if interrupted while sleeping between polls
     * @throws IOException if a file check fails with an I/O error
     */
    static void verifyJobCleanup(PermanentBlobCache permanentBlobCache, JobID jobID, List<? extends BlobKey> list) throws InterruptedException, IOException {
        final long deadline = System.currentTimeMillis() + 30000;
        boolean filesRemain;
        do {
            Thread.sleep(100L);
            filesRemain = TestingBlobHelpers.checkFilesExist(jobID, list, permanentBlobCache, false) != 0;
        } while (filesRemain && System.currentTimeMillis() < deadline);

        TestingBlobHelpers.checkFileCountForJob(0, jobID, permanentBlobCache);
    }

    /**
     * Asserts that the tracker reports exactly {@code i} BLOB keys for the given job.
     *
     * @param blobCacheSizeTracker tracker to query
     * @param jobID job whose tracked keys are counted
     * @param i expected number of tracked keys
     */
    private static void checkBlobCacheSizeTracker(BlobCacheSizeTracker blobCacheSizeTracker, JobID jobID, int i) {
        Assertions
                .assertThat(blobCacheSizeTracker.getBlobKeysByJobId(jobID))
                .hasSize(i);
    }
}
