package org.apache.hadoop.tools.fedbalance;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.tools.fedbalance.DistCpProcedure;
import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs;
import org.apache.hadoop.tools.fedbalance.FedBalanceContext;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceJob;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/* loaded from: input_file:org/apache/hadoop/tools/fedbalance/TestDistCpProcedure.class */
public class TestDistCpProcedure {
    /** Mini HDFS cluster built in {@code beforeClass()} and shut down in {@code afterClass()}. */
    private static MiniDFSCluster cluster;
    /** Configuration used to build the cluster; also handed to the scheduler and to every balance context. */
    private static Configuration conf;
    /** Mount point string passed to every {@code FedBalanceContext} built by these tests. */
    static final String MOUNT = "mock_mount_point";
    /** Name of the source directory created under each test root. */
    private static final String SRCDAT = "srcdat";
    /** Name of the destination directory under each test root. */
    private static final String DSTDAT = "dstdat";
    /** Value used for both "dfs.blocksize" and the namenode minimum block size, and for created files. */
    private static final long BLOCK_SIZE = 1024;
    /** Length in bytes of every regular file written by {@code createFiles}. */
    private static final long FILE_SIZE = 102400;
    /** String form of the default file system URI of {@link #conf}, captured in {@code beforeClass()}. */
    private static String nnUri;
    /** Source tree layout: directory srcdat, file srcdat/a, directory srcdat/b, file srcdat/b/c. */
    private FileEntry[] srcfiles = {new FileEntry(SRCDAT, true), new FileEntry("srcdat/a", false), new FileEntry("srcdat/b", true), new FileEntry("srcdat/b/c", false)};

    /** Fails any single test that runs longer than 180000 milliseconds. */
    @Rule
    public Timeout globalTimeout = new Timeout(180000, TimeUnit.MILLISECONDS);

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hadoop/tools/fedbalance/TestDistCpProcedure$Call.class */
    /**
     * One invocation of a procedure step, as passed to {@code executeProcedure}.
     * Implementations in this class are lambdas that call a single
     * {@code DistCpProcedure} method.
     */
    @FunctionalInterface
    public interface Call {
        /**
         * Runs the step once.
         *
         * @throws IOException if the step fails.
         * @throws BalanceProcedure.RetryException if the step asks to be invoked again.
         */
        void execute() throws IOException, BalanceProcedure.RetryException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/tools/fedbalance/TestDistCpProcedure$FileEntry.class */
    /**
     * Immutable description of one entry of the test source tree: a path
     * (relative to the test root) and whether it is a directory.
     */
    public static class FileEntry {
        private final String path;
        private final boolean isDir;

        /**
         * @param entryPath path of the entry, relative to the test root.
         * @param directory {@code true} for a directory, {@code false} for a regular file.
         */
        FileEntry(String entryPath, boolean directory) {
            path = entryPath;
            isDir = directory;
        }

        /** @return the path given at construction. */
        String getPath() {
            return path;
        }

        /** @return {@code true} if this entry was declared as a directory. */
        boolean isDirectory() {
            return isDir;
        }
    }

    /**
     * Enables the DistCp procedure test mode, starts a two-datanode mini
     * cluster with a 1024-byte block size, and points the procedure
     * scheduler journal at a "/procedure" directory on that cluster.
     *
     * @throws IOException if the cluster cannot be built or does not become active.
     */
    @BeforeClass
    public static void beforeClass() throws IOException {
        DistCpProcedure.enableForTest();

        conf = new Configuration();
        conf.setLong("dfs.namenode.fs-limits.min-block-size", BLOCK_SIZE);
        conf.setLong("dfs.blocksize", BLOCK_SIZE);

        cluster = new MiniDFSCluster.Builder(conf)
                .numDataNodes(2)
                .build();
        cluster.waitActive();

        // The journal URI can only be computed once the namenode address is known.
        String journalUri = "hdfs://" + cluster.getNameNode().getHostAndPort() + "/procedure";
        conf.set("hdfs.fedbalance.procedure.scheduler.journal.uri", journalUri);

        nnUri = FileSystem.getDefaultUri(conf).toString();
    }

    /**
     * Disables the DistCp procedure test mode and shuts the mini cluster
     * down if it was created.
     */
    @AfterClass
    public static void afterClass() {
        DistCpProcedure.disableForTest();
        if (cluster == null) {
            // Cluster construction never completed; nothing to stop.
            return;
        }
        cluster.shutdown();
    }

    /**
     * Submits a DistCp procedure to a scheduler, waits for the job to finish
     * and verifies the result: the destination exists, neither side keeps a
     * ".snapshot" directory, the destination carries the permission that was
     * set on the source, the source permission is 0, and every copied file
     * has the expected length.
     *
     * @throws Exception the job error if the job failed, or any test failure.
     */
    @Test
    public void testSuccessfulDistCpProcedure() throws Exception {
        String testRoot = nnUri + "/user/foo/testdir." + GenericTestUtils.getMethodName();
        DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
        createFiles(fs, testRoot, this.srcfiles);

        Path src = new Path(testRoot, SRCDAT);
        Path dst = new Path(testRoot, DSTDAT);
        // NOTE(review): 777 is a decimal literal here, not octal 0777 — confirm the
        // intended mode. The test only checks that the same value reaches the destination.
        FsPermission originalPerm = new FsPermission(777);
        fs.setPermission(src, originalPerm);

        FedBalanceContext context = buildContext(src, dst, MOUNT);
        DistCpProcedure dcProcedure = new DistCpProcedure("distcp-procedure", (String) null, 1000L, context);

        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(conf);
        scheduler.init(true);
        try {
            BalanceJob balanceJob = new BalanceJob.Builder().nextProcedure(dcProcedure).build();
            scheduler.submit(balanceJob);
            scheduler.waitUntilDone(balanceJob);
            TestCase.assertTrue(balanceJob.isJobDone());
            // Surface the real failure cause instead of a bare "expected null" assertion.
            if (balanceJob.getError() != null) {
                throw balanceJob.getError();
            }
            Assert.assertNull(balanceJob.getError());

            TestCase.assertTrue(fs.exists(dst));
            Assert.assertFalse(fs.exists(new Path(context.getSrc(), ".snapshot")));
            Assert.assertFalse(fs.exists(new Path(context.getDst(), ".snapshot")));
            Assert.assertEquals(originalPerm, fs.getFileStatus(dst).getPermission());
            Assert.assertEquals(0L, fs.getFileStatus(src).getPermission().toShort());
            for (FileEntry entry : this.srcfiles) {
                if (!entry.isDirectory()) {
                    Path copied = new Path(testRoot, entry.getPath().replace(SRCDAT, DSTDAT));
                    Assert.assertEquals(FILE_SIZE, fs.getFileStatus(copied).getLen());
                }
            }
            cleanup(fs, new Path(testRoot));
        } finally {
            // Always stop the scheduler so it does not outlive this test,
            // even when an assertion above fails.
            scheduler.shutDown();
        }
    }

    /**
     * Starts the initial DistCp, deletes source file "a" after the first
     * invocation, then keeps invoking {@code initDistCp()} until the stage
     * reaches DIFF_DISTCP. Verifies that the destination and the destination
     * copy of "a" both exist afterwards.
     *
     * @throws Exception on any test failure.
     */
    @Test
    public void testInitDistCp() throws Exception {
        String testRoot = nnUri + "/user/foo/testdir." + GenericTestUtils.getMethodName();
        DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
        createFiles(fs, testRoot, this.srcfiles);

        Path src = new Path(testRoot, SRCDAT);
        Path dst = new Path(testRoot, DSTDAT);
        // Octal 020 (decimal 16).
        fs.setPermission(src, FsPermission.createImmutable((short) 020));

        FedBalanceContext context = buildContext(src, dst, MOUNT);
        DistCpProcedure dcProcedure = new DistCpProcedure("distcp-procedure", (String) null, 1000L, context);

        // First invocation only; a retry request is expected and deliberately ignored
        // because the loop below re-invokes initDistCp() until the stage advances.
        try {
            dcProcedure.initDistCp();
        } catch (BalanceProcedure.RetryException ignored) {
        }

        fs.delete(new Path(src, "a"), true);

        executeProcedure(dcProcedure, DistCpProcedure.Stage.DIFF_DISTCP, () -> dcProcedure.initDistCp());

        TestCase.assertTrue(fs.exists(dst));
        TestCase.assertTrue(fs.exists(new Path(dst, "a")));
        cleanup(fs, new Path(testRoot));
    }

    /**
     * Builds a context with a diff threshold of 10, completes the initial
     * copy, then renames source file "a" five times. After each rename the
     * test asserts that {@code diffDistCpStageDone()} returns true and drives
     * {@code diffDistCp()} until the stage is DISABLE_WRITE.
     *
     * @throws Exception on any test failure.
     */
    @Test
    public void testDiffThreshold() throws Exception {
        String testRoot = nnUri + "/user/foo/testdir." + GenericTestUtils.getMethodName();
        DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
        createFiles(fs, testRoot, this.srcfiles);

        Path src = new Path(testRoot, SRCDAT);
        FedBalanceContext context = buildContext(src, new Path(testRoot, DSTDAT), MOUNT, 10);
        DistCpProcedure dcProcedure = new DistCpProcedure("distcp-procedure", (String) null, 1000L, context);

        executeProcedure(dcProcedure, DistCpProcedure.Stage.DIFF_DISTCP, () -> dcProcedure.initDistCp());

        Path current = new Path(src, "a");
        for (int round = 0; round < 5; round++) {
            Path renamed = new Path(src, "a-" + round);
            fs.rename(current, renamed);
            current = renamed;

            TestCase.assertTrue(dcProcedure.diffDistCpStageDone());
            executeProcedure(dcProcedure, DistCpProcedure.Stage.DISABLE_WRITE, () -> dcProcedure.diffDistCp());
        }
        cleanup(fs, new Path(testRoot));
    }

    /**
     * After the initial copy, applies three changes to source file "a" —
     * moving it out of the source tree, moving it back, and appending to it —
     * running {@code finalDistCp()} up to stage FINISH after each change and
     * checking that the destination reflects it.
     *
     * @throws Exception on any test failure.
     */
    @Test
    public void testDiffDistCp() throws Exception {
        String testRoot = nnUri + "/user/foo/testdir." + GenericTestUtils.getMethodName();
        DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
        createFiles(fs, testRoot, this.srcfiles);

        Path src = new Path(testRoot, SRCDAT);
        Path dst = new Path(testRoot, DSTDAT);
        Path srcFile = new Path(src, "a");
        Path dstFile = new Path(dst, "a");
        Path outside = new Path("/a");

        FedBalanceContext context = buildContext(src, dst, MOUNT);
        DistCpProcedure dcProcedure = new DistCpProcedure("distcp-procedure", (String) null, 1000L, context);

        executeProcedure(dcProcedure, DistCpProcedure.Stage.DIFF_DISTCP, () -> dcProcedure.initDistCp());
        TestCase.assertTrue(fs.exists(dst));

        // Move the file out of the source tree: it must disappear from the destination.
        fs.rename(srcFile, outside);
        executeProcedure(dcProcedure, DistCpProcedure.Stage.FINISH, () -> dcProcedure.finalDistCp());
        Assert.assertFalse(fs.exists(dstFile));

        // Move it back: it must reappear in the destination.
        fs.rename(outside, srcFile);
        executeProcedure(dcProcedure, DistCpProcedure.Stage.FINISH, () -> dcProcedure.finalDistCp());
        TestCase.assertTrue(fs.exists(dstFile));

        // Append to it: the destination length must follow the source length.
        FSDataOutputStream out = fs.append(srcFile);
        out.write("hello".getBytes());
        out.close();
        long expectedLen = fs.getFileStatus(srcFile).getLen();
        executeProcedure(dcProcedure, DistCpProcedure.Stage.FINISH, () -> dcProcedure.finalDistCp());
        Assert.assertEquals(expectedLen, fs.getFileStatus(dstFile).getLen());

        cleanup(fs, new Path(testRoot));
    }

    /**
     * Opens source file "a" for append before the procedure runs, drives the
     * procedure through the initial and final copies, and then expects that
     * closing the still-open stream fails with a {@code RemoteException}
     * whose text contains "LeaseExpiredException".
     *
     * @throws Exception on any test failure.
     */
    @Test
    public void testStageFinalDistCp() throws Exception {
        String testRoot = nnUri + "/user/foo/testdir." + GenericTestUtils.getMethodName();
        DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
        createFiles(fs, testRoot, this.srcfiles);

        Path src = new Path(testRoot, SRCDAT);
        Path dst = new Path(testRoot, DSTDAT);
        // Deliberately left open across the procedure stages.
        FSDataOutputStream openStream = fs.append(new Path(src, "a"));

        FedBalanceContext context = buildContext(src, dst, MOUNT);
        DistCpProcedure dcProcedure = new DistCpProcedure("distcp-procedure", (String) null, 1000L, context);

        executeProcedure(dcProcedure, DistCpProcedure.Stage.DIFF_DISTCP, () -> dcProcedure.initDistCp());
        executeProcedure(dcProcedure, DistCpProcedure.Stage.FINISH, () -> dcProcedure.finalDistCp());

        LambdaTestUtils.intercept(
                RemoteException.class,
                "LeaseExpiredException",
                "Expect RemoteException(LeaseExpiredException).",
                () -> openStream.close());

        cleanup(fs, new Path(testRoot));
    }

    /**
     * Prepares source and destination directories with the
     * "DISTCP-BALANCE-CURRENT" / "DISTCP-BALANCE-NEXT" snapshots by hand,
     * then calls {@code disableWrite} and {@code finish} directly. Verifies
     * that the destination exists, no ".snapshot" directory remains on either
     * side, the destination has the permission set on the source, and the
     * source permission is 0.
     *
     * @throws Exception on any test failure.
     */
    @Test
    public void testStageFinish() throws Exception {
        final String currentSnapshot = "DISTCP-BALANCE-CURRENT";
        final String nextSnapshot = "DISTCP-BALANCE-NEXT";

        String testRoot = nnUri + "/user/foo/testdir." + GenericTestUtils.getMethodName();
        DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
        Path src = new Path(testRoot, SRCDAT);
        Path dst = new Path(testRoot, DSTDAT);

        fs.mkdirs(src);
        fs.mkdirs(dst);
        fs.allowSnapshot(src);
        fs.allowSnapshot(dst);
        fs.createSnapshot(src, currentSnapshot);
        fs.createSnapshot(src, nextSnapshot);
        fs.createSnapshot(dst, currentSnapshot);

        // NOTE(review): 777 is a decimal literal, not octal 0777 — confirm the intended mode.
        FsPermission originalPerm = new FsPermission(777);
        fs.setPermission(src, originalPerm);

        FedBalanceContext context = buildContext(src, dst, MOUNT);
        DistCpProcedure dcProcedure = new DistCpProcedure("distcp-procedure", (String) null, 1000L, context);
        dcProcedure.disableWrite(context);
        dcProcedure.finish();

        TestCase.assertTrue(fs.exists(dst));
        Assert.assertFalse(fs.exists(new Path(src, ".snapshot")));
        Assert.assertFalse(fs.exists(new Path(dst, ".snapshot")));
        Assert.assertEquals(originalPerm, fs.getFileStatus(dst).getPermission());
        Assert.assertEquals(0L, fs.getFileStatus(src).getPermission().toShort());
        cleanup(fs, new Path(testRoot));
    }

    /**
     * Drives the procedure stage by stage, round-tripping it through
     * {@code serializeProcedure} before every stage so that each stage runs
     * on a freshly deserialized instance. Between stages it deletes source
     * file "a" and opens "b/c" for append; closing that stream after the
     * final copy is expected to fail with a {@code RemoteException} containing
     * "LeaseExpiredException". Finally {@code execute()} must return true and
     * no ".snapshot" directory may remain.
     *
     * @throws Exception on any test failure.
     */
    @Test
    public void testRecoveryByStage() throws Exception {
        String testRoot = nnUri + "/user/foo/testdir." + GenericTestUtils.getMethodName();
        DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
        createFiles(fs, testRoot, this.srcfiles);

        Path src = new Path(testRoot, SRCDAT);
        Path dst = new Path(testRoot, DSTDAT);
        FedBalanceContext context = buildContext(src, dst, MOUNT);
        DistCpProcedure created = new DistCpProcedure("distcp-procedure", (String) null, 1000L, context);

        // Each stage gets its own effectively-final reference so the lambdas can capture it.
        DistCpProcedure preCheckStage = serializeProcedure(created);
        executeProcedure(preCheckStage, DistCpProcedure.Stage.INIT_DISTCP, () -> preCheckStage.preCheck());

        DistCpProcedure initStage = serializeProcedure(preCheckStage);
        executeProcedure(initStage, DistCpProcedure.Stage.DIFF_DISTCP, () -> initStage.initDistCp());

        fs.delete(new Path(src, "a"), true);

        DistCpProcedure diffStage = serializeProcedure(initStage);
        executeProcedure(diffStage, DistCpProcedure.Stage.DISABLE_WRITE, () -> diffStage.diffDistCp());

        DistCpProcedure disableWriteStage = serializeProcedure(diffStage);
        executeProcedure(disableWriteStage, DistCpProcedure.Stage.FINAL_DISTCP,
                () -> disableWriteStage.disableWrite(context));

        DistCpProcedure finalStage = serializeProcedure(disableWriteStage);
        // Deliberately left open while the final copy runs.
        FSDataOutputStream openStream = fs.append(new Path(src, "b/c"));
        executeProcedure(finalStage, DistCpProcedure.Stage.FINISH, () -> finalStage.finalDistCp());
        LambdaTestUtils.intercept(
                RemoteException.class,
                "LeaseExpiredException",
                "Expect RemoteException(LeaseExpiredException).",
                () -> openStream.close());

        DistCpProcedure finishStage = serializeProcedure(finalStage);
        TestCase.assertTrue(finishStage.execute());

        TestCase.assertTrue(fs.exists(dst));
        Assert.assertFalse(fs.exists(new Path(context.getSrc(), ".snapshot")));
        Assert.assertFalse(fs.exists(new Path(context.getDst(), ".snapshot")));
        cleanup(fs, new Path(testRoot));
    }

    /**
     * Submits a DistCp job and shuts the scheduler down after a random delay
     * of less than ten seconds; the test passes if nothing throws.
     *
     * @throws Exception on any test failure.
     */
    @Test
    public void testShutdown() throws Exception {
        String testRoot = nnUri + "/user/foo/testdir." + GenericTestUtils.getMethodName();
        DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
        createFiles(fs, testRoot, this.srcfiles);

        Path src = new Path(testRoot, SRCDAT);
        Path dst = new Path(testRoot, DSTDAT);
        FedBalanceContext context = buildContext(src, dst, MOUNT);
        DistCpProcedure dcProcedure = new DistCpProcedure("distcp-procedure", (String) null, 1000L, context);

        BalanceProcedureScheduler scheduler = new BalanceProcedureScheduler(conf);
        scheduler.init(true);
        scheduler.submit(new BalanceJob.Builder().nextProcedure(dcProcedure).build());
        try {
            // nextInt(bound) is always in [0, bound). The previous
            // Math.abs(nextLong()) % 10000 is negative for Long.MIN_VALUE,
            // which makes Thread.sleep throw IllegalArgumentException.
            long delayMillis = new Random().nextInt(10000);
            Thread.sleep(delayMillis);
        } finally {
            // Shut down even if the sleep is interrupted, so the scheduler never outlives the test.
            scheduler.shutDown();
        }
        cleanup(fs, new Path(testRoot));
    }

    /**
     * Verifies that {@code disableWrite} clears the permission of the source
     * directory: the permission is non-zero before the call and 0 once the
     * procedure has reached stage FINAL_DISTCP.
     *
     * @throws Exception on any test failure.
     */
    @Test
    public void testDisableWrite() throws Exception {
        String testRoot = nnUri + "/user/foo/testdir." + GenericTestUtils.getMethodName();
        DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(URI.create(nnUri), conf);
        createFiles(fs, testRoot, this.srcfiles);

        // Kept as a named local: the permission checks below need the source path.
        // The previous code referenced an undefined identifier here and did not compile.
        Path src = new Path(testRoot, SRCDAT);
        Path dst = new Path(testRoot, DSTDAT);
        FedBalanceContext context = buildContext(src, dst, MOUNT);
        DistCpProcedure dcProcedure = new DistCpProcedure("distcp-procedure", (String) null, 1000L, context);

        Assert.assertNotEquals(0L, fs.getFileStatus(src).getPermission().toShort());
        executeProcedure(dcProcedure, DistCpProcedure.Stage.FINAL_DISTCP, () -> dcProcedure.disableWrite(context));
        Assert.assertEquals(0L, fs.getFileStatus(src).getPermission().toShort());

        cleanup(fs, new Path(testRoot));
    }

    /**
     * Builds a balance context with a diff threshold of 0.
     *
     * @param src source path.
     * @param dst destination path.
     * @param mount mount point string.
     * @return the context produced by the four-argument overload.
     */
    private FedBalanceContext buildContext(Path src, Path dst, String mount) {
        final int noThreshold = 0;
        return buildContext(src, dst, mount, noThreshold);
    }

    /**
     * Builds a balance context using the shared test configuration, with
     * map number 10, bandwidth limit 1, the TRASH trash option, a delay
     * duration of 1000 and the given diff threshold.
     *
     * @param src source path.
     * @param dst destination path.
     * @param mount mount point string.
     * @param diffThreshold value passed to {@code setDiffThreshold}.
     * @return the built context.
     */
    private FedBalanceContext buildContext(Path src, Path dst, String mount, int diffThreshold) {
        return new FedBalanceContext.Builder(src, dst, mount, conf)
                .setMapNum(10)
                .setBandwidthLimit(1)
                .setTrash(FedBalanceConfigs.TrashOption.TRASH)
                .setDelayDuration(1000L)
                .setDiffThreshold(diffThreshold)
                .build();
    }

    /**
     * Resets the procedure to stage PRE_CHECK and then invokes {@code call}
     * repeatedly until the procedure reports the target stage. A
     * {@code RetryException} from the call is swallowed and the call is
     * simply invoked again; any other failure propagates to the caller.
     *
     * @param procedure the procedure whose stage is reset and polled.
     * @param target the stage at which the loop stops.
     * @param call the step to invoke on every iteration.
     * @throws IOException if the call fails with an I/O error.
     */
    protected static void executeProcedure(DistCpProcedure procedure, DistCpProcedure.Stage target, Call call) throws IOException {
        DistCpProcedure.Stage current = DistCpProcedure.Stage.PRE_CHECK;
        procedure.updateStage(current);
        while (current != target) {
            try {
                call.execute();
            } catch (BalanceProcedure.RetryException ignored) {
                // A retry request just means "invoke again"; the loop condition handles it.
            } finally {
                // Re-read the stage on every path, including when the call throws.
                current = procedure.getStage();
            }
        }
    }

    /**
     * Materializes the given entries under {@code topDir}: directories are
     * created with {@code mkdirs}, regular files are written with
     * {@code DFSTestUtil.createFile} using {@code FILE_SIZE},
     * {@code BLOCK_SIZE}, replication 2 and a seed that changes per entry.
     *
     * @param fs file system to create the entries on.
     * @param topDir parent under which each entry path is appended.
     * @param entries entries to create, in order.
     * @throws IOException if a directory or file cannot be created.
     */
    private void createFiles(DistributedFileSystem fs, String topDir, FileEntry[] entries) throws IOException {
        final int bufferSize = 128;
        final short replication = 2;

        long seed = System.currentTimeMillis();
        Random rand = new Random(seed);
        for (FileEntry entry : entries) {
            Path target = new Path(topDir + "/" + entry.getPath());
            if (!entry.isDirectory()) {
                DFSTestUtil.createFile(fs, target, bufferSize, FILE_SIZE, BLOCK_SIZE, replication, seed);
            } else {
                fs.mkdirs(target);
            }
            // Pick a fresh seed for the next entry.
            seed = System.currentTimeMillis() + rand.nextLong();
        }
    }

    /**
     * Round-trips a procedure through its {@code write}/{@code readFields}
     * methods using in-memory byte streams.
     *
     * @param procedure the procedure to serialize.
     * @return a new procedure instance populated from the serialized bytes.
     * @throws IOException if writing or reading the fields fails.
     */
    private DistCpProcedure serializeProcedure(DistCpProcedure procedure) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        procedure.write(out);

        DistCpProcedure restored = new DistCpProcedure();
        byte[] serialized = buffer.toByteArray();
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized));
        restored.readFields(in);
        return restored;
    }

    /**
     * Calls {@code DistCpProcedure.cleanupSnapshot} for the source and
     * destination directories under {@code root}, then recursively deletes
     * {@code root}.
     *
     * @param fs file system holding the test data.
     * @param root test root directory to remove.
     * @throws IOException if snapshot cleanup or deletion fails.
     */
    private void cleanup(DistributedFileSystem fs, Path root) throws IOException {
        DistCpProcedure.cleanupSnapshot(fs, new Path(root, SRCDAT));
        DistCpProcedure.cleanupSnapshot(fs, new Path(root, DSTDAT));
        fs.delete(root, true);
    }
}
