/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.master.procedure;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={LargeTests.class})
public class TestWALProcedureStoreOnHDFS {
    private static final Log LOG = LogFactory.getLog(TestWALProcedureStoreOnHDFS.class);
    protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
    private WALProcedureStore store;
    private ProcedureStore.ProcedureStoreListener stopProcedureListener = new ProcedureStore.ProcedureStoreListener(){

        public void postSync() {
        }

        public void abortProcess() {
            LOG.fatal((Object)"Abort the Procedure Store");
            TestWALProcedureStoreOnHDFS.this.store.stop(true);
        }
    };

    @Before
    public void initConfig() {
        Configuration conf = UTIL.getConfiguration();
        conf.setInt("dfs.replication", 3);
        conf.setInt("dfs.namenode.replication.min", 3);
        conf.setInt("hbase.procedure.store.wal.wait.before.roll", 1000);
        conf.setInt("hbase.procedure.store.wal.max.roll.retries", 10);
        conf.setInt("hbase.procedure.store.wal.sync.failure.roll.max", 10);
    }

    private void setup() throws Exception {
        MiniDFSCluster dfs = UTIL.startMiniDFSCluster(3);
        Path logDir = new Path(new Path(dfs.getFileSystem().getUri()), "/test-logs");
        this.store = ProcedureTestingUtility.createWalStore((Configuration)UTIL.getConfiguration(), (Path)logDir);
        this.store.registerListener(this.stopProcedureListener);
        this.store.start(8);
        this.store.recoverLease();
    }

    public void tearDown() throws Exception {
        this.store.stop(false);
        UTIL.getDFSCluster().getFileSystem().delete(this.store.getWALDir(), true);
        try {
            UTIL.shutdownMiniCluster();
        }
        catch (Exception e) {
            LOG.warn((Object)"failure shutting down cluster", (Throwable)e);
        }
    }

    @Test(timeout=60000L, expected=RuntimeException.class)
    public void testWalAbortOnLowReplication() throws Exception {
        this.setup();
        Assert.assertEquals((long)3L, (long)UTIL.getDFSCluster().getDataNodes().size());
        LOG.info((Object)"Stop DataNode");
        UTIL.getDFSCluster().stopDataNode(0);
        Assert.assertEquals((long)2L, (long)UTIL.getDFSCluster().getDataNodes().size());
        this.store.insert((Procedure)new ProcedureTestingUtility.TestProcedure(1L, -1L), null);
        long i = 2L;
        while (this.store.isRunning()) {
            Assert.assertEquals((long)2L, (long)UTIL.getDFSCluster().getDataNodes().size());
            this.store.insert((Procedure)new ProcedureTestingUtility.TestProcedure(i, -1L), null);
            Thread.sleep(100L);
            ++i;
        }
        Assert.assertFalse((boolean)this.store.isRunning());
        Assert.fail((String)"The store.insert() should throw an exeption");
    }

    @Test(timeout=60000L)
    public void testWalAbortOnLowReplicationWithQueuedWriters() throws Exception {
        int i;
        this.setup();
        Assert.assertEquals((long)3L, (long)UTIL.getDFSCluster().getDataNodes().size());
        this.store.registerListener(new ProcedureStore.ProcedureStoreListener(){

            public void postSync() {
                Threads.sleepWithoutInterrupt((long)2000L);
            }

            public void abortProcess() {
            }
        });
        final AtomicInteger reCount = new AtomicInteger(0);
        Thread[] thread = new Thread[this.store.getNumThreads() * 2 + 1];
        for (i = 0; i < thread.length; ++i) {
            final long procId = i + 1;
            thread[i] = new Thread(){

                @Override
                public void run() {
                    try {
                        LOG.debug((Object)("[S] INSERT " + procId));
                        TestWALProcedureStoreOnHDFS.this.store.insert((Procedure)new ProcedureTestingUtility.TestProcedure(procId, -1L), null);
                        LOG.debug((Object)("[E] INSERT " + procId));
                    }
                    catch (RuntimeException e) {
                        reCount.incrementAndGet();
                        LOG.debug((Object)("[F] INSERT " + procId + ": " + e.getMessage()));
                    }
                }
            };
            thread[i].start();
        }
        Thread.sleep(1000L);
        LOG.info((Object)"Stop DataNode");
        UTIL.getDFSCluster().stopDataNode(0);
        Assert.assertEquals((long)2L, (long)UTIL.getDFSCluster().getDataNodes().size());
        for (i = 0; i < thread.length; ++i) {
            thread[i].join();
        }
        Assert.assertFalse((boolean)this.store.isRunning());
        Assert.assertTrue((String)reCount.toString(), (reCount.get() >= this.store.getNumThreads() && reCount.get() < thread.length ? 1 : 0) != 0);
    }

    @Test(timeout=60000L)
    public void testWalRollOnLowReplication() throws Exception {
        UTIL.getConfiguration().setInt("dfs.namenode.replication.min", 1);
        this.setup();
        int dnCount = 0;
        this.store.insert((Procedure)new ProcedureTestingUtility.TestProcedure(1L, -1L), null);
        UTIL.getDFSCluster().restartDataNode(dnCount);
        for (long i = 2L; i < 100L; ++i) {
            this.store.insert((Procedure)new ProcedureTestingUtility.TestProcedure(i, -1L), null);
            this.waitForNumReplicas(3);
            Thread.sleep(100L);
            if (i % 30L != 0L) continue;
            LOG.info((Object)"Restart Data Node");
            UTIL.getDFSCluster().restartDataNode(++dnCount % 3);
        }
        Assert.assertTrue((boolean)this.store.isRunning());
    }

    public void waitForNumReplicas(int numReplicas) throws Exception {
        while (UTIL.getDFSCluster().getDataNodes().size() < numReplicas) {
            Thread.sleep(100L);
        }
        for (int i = 0; i < numReplicas; ++i) {
            for (DataNode dn : UTIL.getDFSCluster().getDataNodes()) {
                while (!dn.isDatanodeFullyStarted()) {
                    Thread.sleep(100L);
                }
            }
        }
    }
}

