/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs;

import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSStripedOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DataStreamer;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.StripedDataStreamer;
import org.apache.hadoop.hdfs.StripedFileTestUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.server.datanode.BlockRecoveryWorker;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.Whitebox;
import org.apache.hadoop.util.StringUtils;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises lease recovery on a file written under the default erasure coding
 * policy after its internal block streams have been stopped at different
 * lengths.
 *
 * <p>Each case writes {@link #testFileLength} bytes while stopping individual
 * block streamers at pre-computed file offsets, aborts the output stream,
 * recovers the lease as a different user and then verifies the file contents up
 * to the safe length, both before and after a NameNode restart.
 */
public class TestLeaseRecoveryStriped {
    public static final Logger LOG = LoggerFactory.getLogger(TestLeaseRecoveryStriped.class);

    static {
        // Verbose logging for the components involved in writing and recovery.
        GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
        GenericTestUtils.setLogLevel(DFSStripedOutputStream.LOG, Level.DEBUG);
        GenericTestUtils.setLogLevel(BlockRecoveryWorker.LOG, Level.DEBUG);
        GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.DEBUG);
    }

    private final ErasureCodingPolicy ecPolicy = StripedFileTestUtil.getDefaultECPolicy();
    private final int dataBlocks = ecPolicy.getNumDataUnits();
    private final int parityBlocks = ecPolicy.getNumParityUnits();
    private final int cellSize = ecPolicy.getCellSize();
    private final int stripeSize = dataBlocks * cellSize;
    private final int stripesPerBlock = 4;
    private final int blockSize = cellSize * stripesPerBlock;
    private final int blockGroupSize = blockSize * dataBlocks;
    private static final int bytesPerChecksum = 512;
    private static final String fakeUsername = "fakeUser1";
    private static final String fakeGroup = "supergroup";
    private MiniDFSCluster cluster;
    private DistributedFileSystem dfs;
    private Configuration conf;
    private final Path dir = new Path("/" + getClass().getSimpleName());
    final Path p = new Path(dir, "testfile");
    private final int testFileLength = 3 * stripeSize;
    // Must stay last: the generator reads the sizing fields declared above.
    private final BlockLengths[] blockLengthsSuite = getBlockLengthsSuite();

    /**
     * Starts a mini cluster with one DataNode per internal block of the policy
     * and creates an erasure-coded test directory.
     *
     * @throws IOException if the cluster or the directory cannot be set up
     */
    @Before
    public void setup() throws IOException {
        conf = new HdfsConfiguration();
        conf.setLong("dfs.blocksize", blockSize);
        conf.setLong("dfs.client.socket-timeout", 60000L);
        conf.setBoolean("dfs.namenode.redundancy.considerLoad", false);
        conf.setInt("dfs.heartbeat.interval", 1);
        conf.setInt("dfs.namenode.replication.max-streams", 0);
        int numDNs = dataBlocks + parityBlocks;
        cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
        cluster.waitActive();
        dfs = cluster.getFileSystem();
        dfs.enableErasureCodingPolicy(ecPolicy.getName());
        dfs.mkdirs(dir);
        dfs.setErasureCodingPolicy(dir, ecPolicy.getName());
    }

    /** Shuts the mini cluster down if it was started. */
    @After
    public void tearDown() {
        if (cluster != null) {
            cluster.shutdown();
        }
    }

    /**
     * Builds the randomized set of per-block lengths that the test iterates
     * over.
     *
     * <p>Every internal block gets a whole number of cells between
     * {@code minNumCell} and {@code maxNumCell}. In the last group only, the
     * data blocks are additionally shifted by a random multiple of
     * {@link #bytesPerChecksum} so that they are not cell-aligned.
     *
     * @return one {@link BlockLengths} per generated group
     */
    private BlockLengths[] getBlockLengthsSuite() {
        final int groups = 4;
        final int minNumCell = 1;
        final int maxNumCell = stripesPerBlock;
        final int minNumDelta = -4;
        final int maxNumDelta = 2;
        BlockLengths[] suite = new BlockLengths[groups];
        Random random = ThreadLocalRandom.current();
        for (int i = 0; i < groups; i++) {
            int[] blockLengths = new int[dataBlocks + parityBlocks];
            for (int j = 0; j < blockLengths.length; j++) {
                // Random number of whole cells for this block.
                int numCell = random.nextInt(maxNumCell - minNumCell + 1) + minNumCell;
                // Only data blocks of the last group get a sub-cell offset;
                // parity blocks stay cell-aligned (getPosToKill requires it).
                int numDelta = 0;
                if (i == groups - 1 && j < dataBlocks) {
                    numDelta = random.nextInt(maxNumDelta - minNumDelta + 1) + minNumDelta;
                }
                blockLengths[j] = cellSize * numCell + bytesPerChecksum * numDelta;
            }
            suite[i] = new BlockLengths(ecPolicy, blockLengths);
        }
        return suite;
    }

    /**
     * Runs {@link #runTest(int[], long)} for every generated set of block
     * lengths and fails with the offending case when one of them throws.
     *
     * @throws Exception declared for parity with the helpers it calls
     */
    @Test
    public void testLeaseRecovery() throws Exception {
        LOG.info("blockLengthsSuite: {}", Arrays.toString(blockLengthsSuite));
        for (int i = 0; i < blockLengthsSuite.length; i++) {
            BlockLengths blockLengths = blockLengthsSuite[i];
            try {
                runTest(blockLengths.getBlockLengths(), blockLengths.getSafeLength());
            } catch (Throwable e) {
                String msg = "failed testCase at i=" + i + ", blockLengths=" + blockLengths + "\n"
                    + StringUtils.stringifyException(e);
                // Same AssertionError as Assert.fail(msg), but keeps the cause.
                throw new AssertionError(msg, e);
            }
        }
    }

    /**
     * Writes the partial blocks, recovers the lease and verifies the file
     * content, then verifies it again after a NameNode restart.
     *
     * @param blockLengths target length of each internal block
     * @param safeLength safe length computed for {@code blockLengths}
     * @throws Exception if writing, recovery or verification fails
     */
    private void runTest(int[] blockLengths, long safeLength) throws Exception {
        writePartialBlocks(blockLengths);
        // Take the minimum as a long first so a large safeLength cannot wrap
        // when narrowed; the result never exceeds testFileLength.
        int checkDataLength = (int) Math.min(testFileLength, safeLength);
        recoverLease();
        List<Long> oldGS = new ArrayList<>();
        // NOTE(review): 1001 is presumably the generation stamp of the block
        // group before recovery -- confirm against StripedFileTestUtil.checkData.
        oldGS.add(1001L);
        StripedFileTestUtil.checkData(dfs, p, checkDataLength,
            new ArrayList<DatanodeInfo>(), oldGS, blockGroupSize);
        // Check once more after the NameNode restarted and the first block
        // report of DataNode 0 completed.
        cluster.restartNameNode(true);
        cluster.waitFirstBRCompleted(0, 10000);
        StripedFileTestUtil.checkData(dfs, p, checkDataLength,
            new ArrayList<DatanodeInfo>(), oldGS, blockGroupSize);
    }

    /**
     * Writes {@link #testFileLength} bytes to {@link #p} one byte at a time,
     * stopping each block streamer when the write position reaches that
     * streamer's kill position, and finally aborts the stream.
     *
     * @param blockLengths target length of each internal block
     * @throws Exception if writing or waiting on a streamer fails
     */
    private void writePartialBlocks(int[] blockLengths) throws Exception {
        FSDataOutputStream out = dfs.create(p);
        DFSStripedOutputStream stripedOut = (DFSStripedOutputStream) out.getWrappedStream();
        int[] posToKill = getPosToKill(blockLengths);
        int checkingPos = nextCheckingPos(posToKill, 0);
        HashSet<Integer> stoppedStreamerIndexes = new HashSet<>();
        try {
            for (int pos = 0; pos < testFileLength; pos++) {
                out.write(StripedFileTestUtil.getByte(pos));
                if (pos == checkingPos) {
                    for (int index : getIndexToStop(posToKill, pos)) {
                        out.flush();
                        stripedOut.enqueueAllCurrentPackets();
                        LOG.info("Stopping block stream idx {} at file offset {} block length {}",
                            index, pos, blockLengths[index]);
                        StripedDataStreamer s = stripedOut.getStripedDataStreamer(index);
                        waitStreamerAllAcked(s);
                        waitByteSent(s, blockLengths[index]);
                        stopBlockStream(s);
                        stoppedStreamerIndexes.add(index);
                    }
                    checkingPos = nextCheckingPos(posToKill, pos);
                }
            }
        } finally {
            try {
                // Flush everything that is still buffered.
                out.flush();
                stripedOut.enqueueAllCurrentPackets();
                // Wait for the streamers that were not stopped above.
                for (int i = 0; i < blockLengths.length; i++) {
                    if (stoppedStreamerIndexes.contains(i)) {
                        continue;
                    }
                    StripedDataStreamer s = stripedOut.getStripedDataStreamer(i);
                    LOG.info("Waiting for block stream idx {} to reach length {}", i, blockLengths[i]);
                    waitStreamerAllAcked(s);
                }
            } finally {
                // Abort even when the flush or the wait above throws, so the
                // stream is never left open for the following steps.
                DFSTestUtil.abortStream(stripedOut);
            }
        }
    }

    /**
     * Finds the smallest kill position strictly greater than {@code curPos}.
     *
     * @param posToKill kill position of each internal block
     * @param curPos current write position
     * @return the next position to check, or {@link Integer#MAX_VALUE} when no
     *     kill position lies beyond {@code curPos}
     */
    private int nextCheckingPos(int[] posToKill, int curPos) {
        int checkingPos = Integer.MAX_VALUE;
        for (int candidate : posToKill) {
            if (candidate > curPos) {
                checkingPos = Math.min(checkingPos, candidate);
            }
        }
        return checkingPos;
    }

    /**
     * Computes, for every internal block, the file offset at which its
     * streamer is to be stopped.
     *
     * @param blockLengths target length of each internal block; the parity
     *     entries must be multiples of the cell size
     * @return kill position per internal block, data blocks first
     * @throws IllegalArgumentException if a parity length is not cell-aligned
     */
    private int[] getPosToKill(int[] blockLengths) {
        int[] posToKill = new int[dataBlocks + parityBlocks];
        for (int i = 0; i < dataBlocks; i++) {
            int remainder = blockLengths[i] % cellSize;
            // Number of stripes before the one holding the block's last byte.
            int numStripe = (blockLengths[i] - 1) / cellSize;
            posToKill[i] = numStripe * stripeSize + i * cellSize + remainder;
            if (remainder == 0) {
                // A cell-aligned length ends at the end of the cell.
                posToKill[i] += cellSize;
            }
        }
        for (int i = dataBlocks; i < dataBlocks + parityBlocks; i++) {
            Preconditions.checkArgument(blockLengths[i] % cellSize == 0);
            int numStripe = blockLengths[i] / cellSize;
            posToKill[i] = numStripe * stripeSize;
        }
        return posToKill;
    }

    /**
     * Collects the indexes of all internal blocks whose kill position equals
     * {@code pos}.
     *
     * @param posToKill kill position of each internal block
     * @param pos current write position
     * @return matching block indexes in ascending order, possibly empty
     */
    private List<Integer> getIndexToStop(int[] posToKill, int pos) {
        List<Integer> indices = new ArrayList<>();
        for (int i = 0; i < posToKill.length; i++) {
            if (posToKill[i] == pos) {
                indices.add(i);
            }
        }
        return indices;
    }

    /**
     * Polls every 100 ms, for at most 30 s, until the streamer reports at
     * least {@code byteSent} bytes sent.
     *
     * @param s streamer to observe
     * @param byteSent number of sent bytes to wait for
     * @throws IOException if the bytes were not sent within the timeout; the
     *     {@link TimeoutException} is attached as the cause
     * @throws Exception if waiting fails for another reason
     */
    private void waitByteSent(StripedDataStreamer s, long byteSent) throws Exception {
        try {
            GenericTestUtils.waitFor(() -> s.bytesSent >= byteSent, 100, 30000);
        } catch (TimeoutException e) {
            throw new IOException("Timeout waiting for streamer " + s + ". Sent=" + s.bytesSent
                + ", expected=" + byteSent, e);
        }
    }

    /**
     * Replaces the streamer's internal {@code blockStream} field with a stream
     * wrapping {@link IOUtils.NullOutputStream}, so that later writes of this
     * streamer presumably go nowhere.
     *
     * @param s streamer whose block stream is replaced
     * @throws Exception declared for callers; kept from the original signature
     */
    private void stopBlockStream(StripedDataStreamer s) throws Exception {
        IOUtils.NullOutputStream nullOutputStream = new IOUtils.NullOutputStream();
        Whitebox.setInternalState(s, "blockStream", new DataOutputStream(nullOutputStream));
    }

    /**
     * Calls {@code recoverLease} on {@link #p} as another user every 5 s, for
     * at most 24 s, until it returns {@code true}.
     *
     * @throws IOException if the lease was not recovered within the timeout;
     *     the {@link TimeoutException} is attached as the cause
     * @throws Exception if the second file system cannot be obtained or
     *     waiting fails for another reason
     */
    private void recoverLease() throws Exception {
        DistributedFileSystem dfs2 = (DistributedFileSystem) getFSAsAnotherUser(conf);
        try {
            GenericTestUtils.waitFor(() -> {
                try {
                    return dfs2.recoverLease(p);
                } catch (IOException e) {
                    // Best effort: a failed attempt is retried on the next poll.
                    LOG.warn("recoverLease() attempt failed, will retry", e);
                    return false;
                }
            }, 5000, 24000);
        } catch (TimeoutException e) {
            throw new IOException("Timeout waiting for recoverLease()", e);
        }
    }

    /**
     * Obtains a file system for the default URI of {@code c} under the test
     * user {@link #fakeUsername} in group {@link #fakeGroup}.
     *
     * @param c configuration naming the default file system
     * @return file system handle for the test user
     * @throws IOException if the file system cannot be obtained
     * @throws InterruptedException if interrupted while obtaining it
     */
    private FileSystem getFSAsAnotherUser(Configuration c) throws IOException, InterruptedException {
        String userName = UserGroupInformation
            .createUserForTesting(fakeUsername, new String[] {fakeGroup})
            .getUserName();
        return FileSystem.get(FileSystem.getDefaultUri(c), c, userName);
    }

    /**
     * Blocks until the streamer has received the ack for its last queued
     * sequence number.
     *
     * @param s streamer to wait on
     * @throws IOException if waiting for the ack fails
     */
    public static void waitStreamerAllAcked(DataStreamer s) throws IOException {
        long toWaitFor = s.getLastQueuedSeqno();
        s.waitForAckedSeqno(toWaitFor);
    }

    /**
     * Lengths of the internal blocks of one block group, together with the
     * safe length computed for them by {@link StripedBlockUtil#getSafeLength}.
     */
    private static class BlockLengths {
        private final int[] blockLengths;
        private final long safeLength;

        /**
         * @param policy erasure coding policy used to compute the safe length
         * @param blockLengths length of each internal block; stored as given,
         *     not copied
         */
        BlockLengths(ErasureCodingPolicy policy, int[] blockLengths) {
            this.blockLengths = blockLengths;
            long[] longArray = Arrays.stream(blockLengths).asLongStream().toArray();
            this.safeLength = StripedBlockUtil.getSafeLength(policy, longArray);
        }

        @Override
        public String toString() {
            return new ToStringBuilder(this)
                .append("blockLengths", getBlockLengths())
                .append("safeLength", getSafeLength())
                .toString();
        }

        /** @return the stored block lengths array (not a copy) */
        public int[] getBlockLengths() {
            return blockLengths;
        }

        /** @return the safe length computed at construction time */
        public long getSafeLength() {
            return safeLength;
        }
    }
}

