/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.AdminStatesBaseTest;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.hdfs.util.HostsFileWriter;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

public class TestDecommissioningStatus {
    /** Seed value (0xDEADBEEF) passed to {@code DFSTestUtil.createFile} by the tests. */
    private static final long seed = 3735928559L;
    /** Block size, in bytes, used when creating test files. */
    private static final int blockSize = 8192;
    /** Length, in bytes, of the test files (two blocks of {@link #blockSize}). */
    private static final int fileSize = 16384;
    /** Number of datanodes the mini cluster is started with in {@code setUp()}. */
    private static final int numDatanodes = 2;
    /** Mini cluster under test; rebuilt by {@code setUp()} for every test method. */
    private static MiniDFSCluster cluster;
    /** File system handle obtained from {@link #cluster}. */
    private static FileSystem fileSys;
    /** Writer for the exclude-hosts file that the tests rewrite to request decommissioning. */
    private static HostsFileWriter hostsFileWriter;
    /** Configuration shared by the cluster and by {@code refreshNodes} calls. */
    private static Configuration conf;
    /** Class logger; a blank final that is assigned in the class's static initializer. */
    private static final Logger LOG;
    /**
     * Transfer addresses of nodes already put into decommissioning by the current test.
     * Typed with the diamond operator instead of the raw {@code ArrayList} the decompiler emitted.
     */
    final ArrayList<String> decommissionedNodes = new ArrayList<>(numDatanodes);

    /**
     * Builds a fresh configuration, exclude-hosts writer and two-datanode mini cluster
     * before every test, and raises the {@link DatanodeAdminManager} logger to DEBUG.
     *
     * @throws Exception if the hosts file or the cluster cannot be set up
     */
    @Before
    public void setUp() throws Exception {
        conf = new HdfsConfiguration();
        conf.setBoolean("dfs.namenode.redundancy.considerLoad", false);

        // The exclude file is rewritten later by decommissionNode(...).
        hostsFileWriter = new HostsFileWriter();
        hostsFileWriter.initialize(conf, "work-dir/decommission");

        // Short heartbeat / redundancy / decommission intervals so state changes show up quickly.
        conf.setInt("dfs.namenode.heartbeat.recheck-interval", 1000);
        conf.setInt("dfs.heartbeat.interval", 1);
        conf.setInt("dfs.namenode.reconstruction.pending.timeout-sec", 4);
        conf.setInt("dfs.namenode.redundancy.interval.seconds", 1);
        conf.setInt("dfs.namenode.decommission.interval", 1);
        // NOTE(review): presumably throttles block transfers so decommissioning stays in progress - confirm.
        conf.setLong("dfs.datanode.balance.bandwidthPerSec", 1L);

        cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).build();
        cluster.waitActive();
        fileSys = cluster.getFileSystem();

        final DatanodeManager datanodeManager =
                cluster.getNamesystem().getBlockManager().getDatanodeManager();
        datanodeManager.setHeartbeatExpireInterval(3000L);

        final Logger adminManagerLog = LoggerFactory.getLogger(DatanodeAdminManager.class);
        GenericTestUtils.setLogLevel(adminManagerLog, Level.DEBUG);
    }

    /**
     * Releases the per-test resources created by {@code setUp()}.
     *
     * <p>The three clean-up steps are chained with {@code finally} so that a failure in an
     * earlier step (for example while deleting the hosts files) can no longer prevent the
     * file system from being closed or the mini cluster from being shut down. Each static
     * reference is cleared afterwards so a later {@code tearDown()} never touches a stale,
     * already-released object.
     *
     * @throws Exception the first failure raised by any of the clean-up steps
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (hostsFileWriter != null) {
                hostsFileWriter.cleanup();
            }
        } finally {
            hostsFileWriter = null;
            try {
                if (fileSys != null) {
                    fileSys.close();
                }
            } finally {
                fileSys = null;
                if (cluster != null) {
                    try {
                        cluster.shutdown();
                    } finally {
                        cluster = null;
                    }
                }
            }
        }
    }

    /**
     * Requests decommissioning of the live datanode at the given position of the
     * client's LIVE datanode report.
     *
     * @param client    client used to fetch the LIVE datanode report
     * @param nodeIndex index into that report of the node to decommission
     * @return the transfer address of the selected node
     * @throws IOException if the report cannot be fetched or the exclude file cannot be written
     */
    private String decommissionNode(DFSClient client, int nodeIndex) throws IOException {
        final DatanodeInfo[] liveNodes = client.datanodeReport(HdfsConstants.DatanodeReportType.LIVE);
        final String xferAddr = liveNodes[nodeIndex].getXferAddr();
        decommissionNode(xferAddr);
        return xferAddr;
    }

    /**
     * Rewrites the exclude-hosts file so it lists every node already recorded in
     * {@code decommissionedNodes} plus {@code dnName}.
     *
     * <p>This method does not add {@code dnName} to {@code decommissionedNodes} and does not
     * ask the NameNode to re-read the file; callers do both themselves.
     *
     * @param dnName transfer address of the datanode to exclude
     * @throws IOException if the exclude file cannot be written
     */
    private void decommissionNode(String dnName) throws IOException {
        System.out.println("Decommissioning node: " + dnName);
        final List<String> excluded = new ArrayList<>(decommissionedNodes.size() + 1);
        excluded.addAll(decommissionedNodes);
        excluded.add(dnName);
        hostsFileWriter.initExcludeHosts(excluded);
    }

    /**
     * Waits, one counter after another, until the leaving-service status of
     * {@code decommNode} reports the expected values.
     *
     * <p>Each counter is polled every second for up to ten seconds. Note that the
     * "Actual" value embedded in each error message is sampled once, just before the
     * corresponding wait starts.
     *
     * @param decommNode                  datanode whose leaving-service status is inspected
     * @param expectedUnderRep            expected under-replicated block count
     * @param expectedDecommissionOnly    expected out-of-service-only replica count
     * @param expectedUnderRepInOpenFiles expected under-replicated count in open files
     * @throws TimeoutException     if a counter does not reach its expected value in time
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private void checkDecommissionStatus(DatanodeDescriptor decommNode, int expectedUnderRep, int expectedDecommissionOnly, int expectedUnderRepInOpenFiles) throws TimeoutException, InterruptedException {
        final long pollIntervalMs = 1000L;
        final long timeoutMs = TimeUnit.SECONDS.toMillis(10L);

        final String underRepMsg = "Under replicated blocks. Expected: " + expectedUnderRep
                + " , Actual: " + decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks();
        GenericTestUtils.waitFor(
                () -> decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks() == expectedUnderRep,
                pollIntervalMs, timeoutMs, underRepMsg);

        final String outOfServiceMsg = "OutOfService only replicas. Expected: " + expectedDecommissionOnly
                + " , Actual: " + decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas();
        GenericTestUtils.waitFor(
                () -> decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas() == expectedDecommissionOnly,
                pollIntervalMs, timeoutMs, outOfServiceMsg);

        final String openFilesMsg = "UnderReplicated in open files. Expected: " + expectedUnderRepInOpenFiles
                + " , Actual: " + decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles();
        GenericTestUtils.waitFor(
                () -> decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles() == expectedUnderRepInOpenFiles,
                pollIntervalMs, timeoutMs, openFilesMsg);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * WARNING - void declaration
     */
    private void checkDFSAdminDecommissionStatus(List<DatanodeDescriptor> expectedDecomm, DistributedFileSystem dfs, DFSAdmin admin) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        PrintStream ps = new PrintStream((OutputStream)baos);
        PrintStream oldOut = System.out;
        System.setOut(ps);
        try {
            void var12_14;
            admin.report(new String[]{"-decommissioning"}, 0);
            String[] lines = baos.toString().split("\n");
            Integer num = null;
            int count = 0;
            String[] stringArray = lines;
            int n = stringArray.length;
            boolean bl = false;
            while (var12_14 < n) {
                String line = stringArray[var12_14];
                if (line.startsWith("Decommissioning datanodes")) {
                    String temp = line.split(" ")[2];
                    num = Integer.parseInt((String)temp.subSequence(1, temp.length() - 2));
                }
                if (line.contains("Decommission in progress")) {
                    ++count;
                }
                ++var12_14;
            }
            Assert.assertTrue((String)"No decommissioning output", (num != null ? 1 : 0) != 0);
            Assert.assertEquals((String)"Unexpected number of decomming DNs", (long)expectedDecomm.size(), (long)num.intValue());
            Assert.assertEquals((String)"Unexpected number of decomming DNs", (long)expectedDecomm.size(), (long)count);
            ArrayList<DatanodeInfo> decomming = new ArrayList<DatanodeInfo>(Arrays.asList(dfs.getDataNodeStats(HdfsConstants.DatanodeReportType.DECOMMISSIONING)));
            Assert.assertEquals((String)"Unexpected number of decomming DNs", (long)expectedDecomm.size(), (long)decomming.size());
            for (DatanodeID datanodeID : expectedDecomm) {
                Assert.assertTrue((String)("Did not find expected decomming DN " + datanodeID), (boolean)decomming.contains(datanodeID));
            }
        }
        finally {
            System.setOut(oldOut);
        }
    }

    /**
     * Waits until the admin manager tracks exactly {@code trackedNumber} nodes,
     * polling every 100 ms for at most two seconds.
     *
     * @param dnAdminMgr    admin manager whose tracked-node count is polled
     * @param trackedNumber tracked-node count to wait for
     * @throws TimeoutException     if the count is not reached within the timeout
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private void waitForDecommissionedNodes(DatanodeAdminManager dnAdminMgr, int trackedNumber) throws TimeoutException, InterruptedException {
        final long pollIntervalMs = 100L;
        final long timeoutMs = 2000L;
        GenericTestUtils.waitFor(() -> trackedNumber == dnAdminMgr.getNumTrackedNodes(), pollIntervalMs, timeoutMs);
    }

    /**
     * Decommissions the two datanodes one after the other and, after each step, checks the
     * per-node leaving-service counters as well as the dfsadmin / Java API view of the
     * decommissioning nodes.
     *
     * <p>The test writes one complete file and one file left open for write, so the counters
     * include blocks belonging to an open file.
     *
     * @throws Exception on any cluster, I/O or assertion failure
     */
    @Test
    public void testDecommissionStatus() throws Exception {
        InetSocketAddress addr = new InetSocketAddress("localhost", cluster.getNameNodePort());
        // try-with-resources: the dedicated client was previously never closed.
        try (DFSClient client = new DFSClient(addr, conf)) {
            DatanodeInfo[] info = client.datanodeReport(HdfsConstants.DatanodeReportType.LIVE);
            Assert.assertEquals("Number of Datanodes ", numDatanodes, info.length);
            // Named 'dfs' so it no longer shadows the static fileSys field.
            DistributedFileSystem dfs = cluster.getFileSystem();
            DFSAdmin admin = new DFSAdmin(cluster.getConfiguration(0));

            short replicas = numDatanodes;
            // One complete two-block file ...
            Path file1 = new Path("decommission.dat");
            DFSTestUtil.createFile(dfs, file1, fileSize, fileSize, blockSize, replicas, seed);
            // ... and one file that stays open until the end of the test.
            Path file2 = new Path("decommission1.dat");
            FSDataOutputStream st1 = AdminStatesBaseTest.writeIncompleteFile(dfs, file2, replicas, (short) (fileSize / blockSize));
            for (DataNode d : cluster.getDataNodes()) {
                DataNodeTestUtils.triggerBlockReport(d);
            }

            FSNamesystem fsn = cluster.getNamesystem();
            DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
            verifyInitialState(fsn, dm);

            for (int iteration = 0; iteration < numDatanodes; ++iteration) {
                String downnode = decommissionNode(client, iteration);
                dm.refreshNodes(conf);
                decommissionedNodes.add(downnode);
                BlockManagerTestUtil.recheckDecommissionState(dm);
                // Block until the admin manager has picked up the newly excluded node.
                waitForDecommissionedNodes(dm.getDatanodeAdminManager(), iteration + 1);

                List<DatanodeDescriptor> decommissioningNodes = dm.getDecommissioningNodes();
                // Expected value first, so a failure message reads correctly.
                Assert.assertEquals(iteration + 1, decommissioningNodes.size());

                if (iteration == 0) {
                    DatanodeDescriptor decommNode = decommissioningNodes.get(0);
                    checkDecommissionStatus(decommNode, 3, 0, 1);
                    checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 1), dfs, admin);
                } else {
                    DatanodeDescriptor decommNode1 = decommissioningNodes.get(0);
                    DatanodeDescriptor decommNode2 = decommissioningNodes.get(1);
                    checkDecommissionStatus(decommNode1, 3, 3, 1);
                    checkDecommissionStatus(decommNode2, 4, 4, 2);
                    checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 2), dfs, admin);
                }
            }

            // Clear the exclude file and let the NameNode re-read it.
            hostsFileWriter.initExcludeHost("");
            dm.refreshNodes(conf);
            // The open stream must be closed before its file is deleted.
            st1.close();
            AdminStatesBaseTest.cleanupFile(dfs, file1);
            AdminStatesBaseTest.cleanupFile(dfs, file2);
        }
    }

    /**
     * Checks the cluster state before any node is decommissioned.
     *
     * <p>Three conditions are verified in order: every datanode reports zero for all three
     * leaving-service counters; the block manager reports 4 total and 4 replicated blocks;
     * and each of the 2 datanodes holds 4 blocks (8 overall). The two block-count checks
     * are retried up to five times with a three-second pause between attempts.
     *
     * @param fsn namesystem whose block manager supplies the block totals
     * @param dm  datanode manager supplying the datanode descriptors
     * @throws InterruptedException if the thread is interrupted while sleeping between attempts
     * @throws AssertionError       if any of the conditions is not met
     */
    protected void verifyInitialState(FSNamesystem fsn, DatanodeManager dm) throws InterruptedException {
        for (DatanodeDescriptor node : dm.getDatanodes()) {
            try {
                checkDecommissionStatus(node, 0, 0, 0);
            } catch (InterruptedException | TimeoutException e) {
                throw new AssertionError("Datanode not in good state.", e);
            }
        }

        // NameNode-side totals: attempts 0..4, failing on the last unsuccessful one.
        for (int attempt = 0; ; attempt++) {
            final int totalBlocks = fsn.getBlockManager().getTotalBlocks();
            final long totalReplicatedBlocks = fsn.getBlockManager().getTotalReplicatedBlocks();
            if (totalBlocks == 4 && totalReplicatedBlocks == 4L) {
                break;
            }
            if (attempt == 4) {
                throw new AssertionError("Unexpected Total blocks " + totalBlocks + " and replicated blocks " + totalReplicatedBlocks);
            }
            Thread.sleep(3000L);
        }

        // Per-datanode block counts, with the same retry budget.
        for (int attempt = 0; ; attempt++) {
            int total = 0;
            int nodesWithAllBlocks = 0;
            for (DatanodeDescriptor node : dm.getDatanodes()) {
                total += node.numBlocks();
                if (node.numBlocks() == 4) {
                    nodesWithAllBlocks++;
                }
            }
            if (total == 8 && nodesWithAllBlocks == 2) {
                break;
            }
            if (attempt == 4) {
                throw new AssertionError("Unexpected Total blocks " + total + " from Datanode Storage. 4 blocks per Datanode Storage expected from each DataNode");
            }
            Thread.sleep(3000L);
        }
    }

    /**
     * Excludes the single datanode holding a replication-1 file, stops it, and checks that
     * it is reported as decommission-in-progress while the file exists and as decommissioned
     * once the file has been deleted.
     *
     * @throws Exception on any cluster, I/O or assertion failure
     */
    @Test(timeout=120000L)
    public void testDecommissionStatusAfterDNRestart() throws Exception {
        final DistributedFileSystem dfs = cluster.getFileSystem();

        // Single-block file with one replica, so exactly one datanode holds it.
        final Path f = new Path("decommission.dat");
        DFSTestUtil.createFile(dfs, f, fileSize, fileSize, fileSize, (short) 1, seed);

        // Find the datanode that stores the file's first block.
        final RemoteIterator<LocatedFileStatus> fileList = dfs.listLocatedStatus(f);
        final BlockLocation[] blockLocations = fileList.next().getBlockLocations();
        final String dnName = blockLocations[0].getNames()[0];

        final FSNamesystem fsn = cluster.getNamesystem();
        final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();

        // Exclude that node, then stop it.
        decommissionNode(dnName);
        dm.refreshNodes(conf);
        final MiniDFSCluster.DataNodeProperties dataNodeProperties = cluster.stopDataNode(dnName);

        // Poll once per second until exactly one dead node is listed.
        final List<DatanodeDescriptor> dead = new ArrayList<>();
        dm.fetchDatanodes(null, dead, false);
        while (dead.size() != 1) {
            Thread.sleep(1000L);
            dm.fetchDatanodes(null, dead, false);
        }

        BlockManagerTestUtil.checkHeartbeat(fsn.getBlockManager());
        BlockManagerTestUtil.recheckDecommissionState(dm);
        waitForDecommissionedNodes(dm.getDatanodeAdminManager(), 1);

        final DatanodeDescriptor deadNode = dead.get(0);
        Assert.assertTrue("the node should be DECOMMISSION_IN_PROGRESSS", deadNode.isDecommissionInProgress());
        final List<DatanodeDescriptor> decomlist = dm.getDecommissioningNodes();
        Assert.assertTrue("The node should be be decommissioning", decomlist.size() == 1);

        // After the file is deleted the node is expected to finish decommissioning.
        AdminStatesBaseTest.cleanupFile(dfs, f);
        BlockManagerTestUtil.recheckDecommissionState(dm);
        waitForDecommissionedNodes(dm.getDatanodeAdminManager(), 0);
        Assert.assertTrue("the node should be decommissioned", deadNode.isDecommissioned());

        // Restart the datanode and clear the exclude file.
        cluster.restartDataNode(dataNodeProperties, true);
        cluster.waitActive();
        hostsFileWriter.initExcludeHost("");
        dm.refreshNodes(conf);
    }

    /**
     * Stops the first datanode, waits for it to be seen as down, excludes it, and checks
     * that it ends up decommissioned with no node left tracked by the admin manager.
     *
     * @throws Exception on any cluster, I/O or assertion failure
     */
    @Test(timeout=120000L)
    public void testDecommissionDeadDN() throws Exception {
        final Logger adminManagerLog = LoggerFactory.getLogger(DatanodeAdminManager.class);
        GenericTestUtils.setLogLevel(adminManagerLog, Level.DEBUG);

        // Capture identity before stopping the node.
        final DataNode firstDataNode = cluster.getDataNodes().get(0);
        final DatanodeID dnID = firstDataNode.getDatanodeId();
        final String dnName = dnID.getXferAddr();

        final MiniDFSCluster.DataNodeProperties stoppedDN = cluster.stopDataNode(0);
        DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), false, 30000);

        final DatanodeManager dm = cluster.getNamesystem().getBlockManager().getDatanodeManager();
        final DatanodeDescriptor dnDescriptor = dm.getDatanode(dnID);

        // Exclude the stopped node and have the NameNode re-evaluate it.
        decommissionNode(dnName);
        dm.refreshNodes(conf);
        BlockManagerTestUtil.recheckDecommissionState(dm);
        waitForDecommissionedNodes(dm.getDatanodeAdminManager(), 0);
        Assert.assertTrue(dnDescriptor.isDecommissioned());

        // Restart the datanode and clear the exclude file.
        cluster.restartDataNode(stoppedDN, true);
        cluster.waitActive();
        hostsFileWriter.initExcludeHost("");
        dm.refreshNodes(conf);
    }

    /**
     * Stops both datanodes, decommissions them while they are down, and checks how the
     * missing / low-redundancy block counts evolve: they are non-zero after decommissioning,
     * the low-redundancy count is unchanged once the two decommissioned nodes are back, and
     * all counts reach zero after two additional datanodes are started.
     *
     * <p>The waits for a restarted datanode's ID now sleep between polls instead of
     * spinning on an empty loop body.
     *
     * @throws Exception on any cluster, I/O or assertion failure
     */
    @Test(timeout=120000L)
    public void testDecommissionLosingData() throws Exception {
        List<String> nodes = new ArrayList<>(numDatanodes);
        FSNamesystem fsn = cluster.getNamesystem();
        BlockManager bm = fsn.getBlockManager();
        DatanodeManager dm = bm.getDatanodeManager();
        Path file1 = new Path("decommissionLosingData.dat");
        DFSTestUtil.createFile(fileSys, file1, fileSize, fileSize, blockSize, (short) 2, seed);
        Thread.sleep(1000L);

        // Stop both datanodes, remembering their addresses and descriptors.
        LOG.info("Shutdown dn1");
        DatanodeID dnID = cluster.getDataNodes().get(1).getDatanodeId();
        String dnName = dnID.getXferAddr();
        DatanodeDescriptor dnDescriptor1 = dm.getDatanode(dnID);
        nodes.add(dnName);
        MiniDFSCluster.DataNodeProperties stoppedDN1 = cluster.stopDataNode(1);
        DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), false, 30000);

        LOG.info("Shutdown dn0");
        dnID = cluster.getDataNodes().get(0).getDatanodeId();
        dnName = dnID.getXferAddr();
        DatanodeDescriptor dnDescriptor0 = dm.getDatanode(dnID);
        nodes.add(dnName);
        MiniDFSCluster.DataNodeProperties stoppedDN0 = cluster.stopDataNode(0);
        DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), false, 30000);

        // Exclude both stopped nodes.
        LOG.info("Decommissioning nodes");
        hostsFileWriter.initExcludeHosts(nodes);
        dm.refreshNodes(conf);
        BlockManagerTestUtil.recheckDecommissionState(dm);
        waitForDecommissionedNodes(dm.getDatanodeAdminManager(), 0);
        Assert.assertTrue(dnDescriptor0.isDecommissioned());
        Assert.assertTrue(dnDescriptor1.isDecommissioned());

        // With every replica holder gone, blocks are both missing and low-redundancy.
        long missingBlocks = bm.getMissingBlocksCount();
        long underreplicated = bm.getLowRedundancyBlocksCount();
        Assert.assertTrue(missingBlocks > 0L);
        Assert.assertTrue(underreplicated > 0L);

        LOG.info("Bring back dn0");
        cluster.restartDataNode(stoppedDN0, true);
        // Poll (rather than spin) until the restarted datanode exposes its ID.
        while ((dnID = cluster.getDataNodes().get(0).getDatanodeId()) == null) {
            Thread.sleep(100L);
        }
        dnDescriptor0 = dm.getDatanode(dnID);
        // Wait until the NameNode-side descriptor lists at least one block.
        while (dnDescriptor0.numBlocks() == 0) {
            Thread.sleep(100L);
        }

        LOG.info("Bring back dn1");
        cluster.restartDataNode(stoppedDN1, true);
        while ((dnID = cluster.getDataNodes().get(1).getDatanodeId()) == null) {
            Thread.sleep(100L);
        }
        dnDescriptor1 = dm.getDatanode(dnID);
        while (dnDescriptor1.numBlocks() == 0) {
            Thread.sleep(100L);
        }

        // The low-redundancy count must be unchanged with only the decommissioned nodes back.
        Thread.sleep(2000L);
        Assert.assertEquals(underreplicated, bm.getLowRedundancyBlocksCount());

        LOG.info("Starting two more nodes");
        cluster.startDataNodes(conf, 2, true, null, null);
        cluster.waitActive();
        // Give reconstruction up to roughly ten seconds to drain.
        int count = 0;
        while ((bm.getLowRedundancyBlocksCount() > 0L || bm.getPendingReconstructionBlocksCount() > 0L) && count++ < 10) {
            Thread.sleep(1000L);
        }
        Assert.assertEquals(0L, bm.getLowRedundancyBlocksCount());
        Assert.assertEquals(0L, bm.getPendingReconstructionBlocksCount());
        Assert.assertEquals(0L, bm.getMissingBlocksCount());

        // Stop the two extra datanodes again, highest index first.
        dnID = cluster.getDataNodes().get(3).getDatanodeId();
        cluster.stopDataNode(3);
        DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), false, 30000);
        dnID = cluster.getDataNodes().get(2).getDatanodeId();
        cluster.stopDataNode(2);
        DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(), false, 30000);

        // Clear the exclude file and remove the test file.
        hostsFileWriter.initExcludeHost("");
        dm.refreshNodes(conf);
        fileSys.delete(file1, false);
    }

    // Assigns the blank-final LOG field (declared with the other class fields)
    // to the SLF4J logger for this test class.
    static {
        LOG = LoggerFactory.getLogger(TestDecommissioningStatus.class);
    }
}

