/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcScheduler;
import org.apache.hadoop.ipc.Schedulable;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestConsistentReadsObserver {
    public static final Logger LOG = LoggerFactory.getLogger((String)TestConsistentReadsObserver.class.getName());
    private static Configuration conf;
    private static MiniQJMHACluster qjmhaCluster;
    private static MiniDFSCluster dfsCluster;
    private DistributedFileSystem dfs;
    private final Path testPath = new Path("/TestConsistentReadsObserver");

    @BeforeClass
    public static void startUpCluster() throws Exception {
        conf = new Configuration();
        conf.setBoolean("dfs.namenode.state.context.enabled", true);
        qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 0, false);
        dfsCluster = qjmhaCluster.getDfsCluster();
    }

    @Before
    public void setUp() throws Exception {
        this.dfs = this.setObserverRead(true);
    }

    @After
    public void cleanUp() throws IOException {
        this.dfs.delete(this.testPath, true);
    }

    @AfterClass
    public static void shutDownCluster() throws IOException {
        if (qjmhaCluster != null) {
            qjmhaCluster.shutdown();
        }
    }

    @Test
    public void testRequeueCall() throws Exception {
        this.setObserverRead(true);
        int observerIdx = 2;
        NameNode nn = dfsCluster.getNameNode(2);
        int port = nn.getNameNodeAddress().getPort();
        Configuration configuration = dfsCluster.getConfiguration(2);
        String prefix = "ipc." + port + ".";
        configuration.set(prefix + "scheduler.impl", TestRpcScheduler.class.getName());
        configuration.setBoolean(prefix + "backoff.enable", true);
        NameNodeAdapter.getRpcServer(nn).refreshCallQueue(configuration);
        this.dfs.create(this.testPath, (short)1).close();
        this.assertSentTo(0);
        this.dfs.getFileStatus(this.testPath);
        this.assertSentTo(0);
    }

    @Test
    public void testMsyncSimple() throws Exception {
        AtomicInteger readStatus = new AtomicInteger(0);
        this.dfs.getClient().getHAServiceState();
        this.dfs.mkdir(this.testPath, FsPermission.getDefault());
        this.assertSentTo(0);
        Thread reader = new Thread(() -> {
            try {
                this.dfs.getFileStatus(this.testPath);
                readStatus.set(1);
            }
            catch (IOException e) {
                e.printStackTrace();
                readStatus.set(-1);
            }
        });
        reader.start();
        Assert.assertEquals((long)0L, (long)readStatus.get());
        dfsCluster.rollEditLogAndTail(0);
        GenericTestUtils.waitFor(() -> readStatus.get() != 0, (long)100L, (long)10000L);
        Assert.assertEquals((long)1L, (long)readStatus.get());
    }

    private void testMsync(boolean autoMsync, long autoMsyncPeriodMs) throws Exception {
        AtomicInteger readStatus = new AtomicInteger(0);
        Configuration conf2 = new Configuration(conf);
        conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
        if (autoMsync) {
            conf2.setTimeDuration("dfs.client.failover.observer.auto-msync-period." + this.dfs.getUri().getHost(), autoMsyncPeriodMs, TimeUnit.MILLISECONDS);
        }
        DistributedFileSystem dfs2 = (DistributedFileSystem)FileSystem.get((Configuration)conf2);
        this.dfs.getClient().getHAServiceState();
        dfs2.getClient().getHAServiceState();
        this.dfs.mkdir(new Path("/test"), FsPermission.getDefault());
        dfsCluster.rollEditLogAndTail(0);
        this.dfs.mkdir(this.testPath, FsPermission.getDefault());
        this.assertSentTo(0);
        Thread reader = new Thread(() -> {
            try {
                if (!autoMsync) {
                    dfs2.getClient().msync();
                } else if (autoMsyncPeriodMs > 0L) {
                    Thread.sleep(autoMsyncPeriodMs);
                }
                dfs2.getFileStatus(this.testPath);
                if (HATestUtil.isSentToAnyOfNameNodes(dfs2, dfsCluster, 2)) {
                    readStatus.set(1);
                } else {
                    readStatus.set(-1);
                }
            }
            catch (Exception e) {
                e.printStackTrace();
                readStatus.set(-1);
            }
        });
        reader.start();
        Thread.sleep(100L);
        Assert.assertEquals((long)0L, (long)readStatus.get());
        dfsCluster.rollEditLogAndTail(0);
        GenericTestUtils.waitFor(() -> readStatus.get() != 0, (long)100L, (long)3000L);
        Assert.assertEquals((long)1L, (long)readStatus.get());
    }

    @Test
    public void testExplicitMsync() throws Exception {
        this.testMsync(false, -1L);
    }

    @Test
    public void testAutoMsyncPeriod0() throws Exception {
        this.testMsync(true, 0L);
    }

    @Test
    public void testAutoMsyncPeriod5() throws Exception {
        this.testMsync(true, 5L);
    }

    @Test(expected=TimeoutException.class)
    public void testAutoMsyncLongPeriod() throws Exception {
        this.testMsync(true, Long.MAX_VALUE);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCallFromNewClient() throws Exception {
        dfsCluster.transitionToStandby(0);
        dfsCluster.transitionToObserver(0);
        dfsCluster.transitionToStandby(2);
        dfsCluster.transitionToActive(2);
        try {
            AtomicInteger readStatus = new AtomicInteger(0);
            this.dfs.getClient().getHAServiceState();
            this.dfs.mkdir(new Path("/test"), FsPermission.getDefault());
            dfsCluster.getNameNode(2).getRpcServer().rollEditLog();
            dfsCluster.getNameNode(0).getNamesystem().getEditLogTailer().doTailEdits();
            this.dfs.mkdir(this.testPath, FsPermission.getDefault());
            this.assertSentTo(2);
            Configuration conf2 = new Configuration(conf);
            conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
            DistributedFileSystem dfs2 = (DistributedFileSystem)FileSystem.get((Configuration)conf2);
            dfs2.getClient().getHAServiceState();
            Thread reader = new Thread(() -> {
                try {
                    dfs2.getFileStatus(this.testPath);
                    readStatus.set(1);
                }
                catch (Exception e) {
                    e.printStackTrace();
                    readStatus.set(-1);
                }
            });
            reader.start();
            Thread.sleep(100L);
            Assert.assertEquals((long)0L, (long)readStatus.get());
            dfsCluster.getNameNode(2).getRpcServer().rollEditLog();
            dfsCluster.getNameNode(0).getNamesystem().getEditLogTailer().doTailEdits();
            GenericTestUtils.waitFor(() -> readStatus.get() != 0, (long)100L, (long)10000L);
            Assert.assertEquals((long)1L, (long)readStatus.get());
        }
        finally {
            dfsCluster.transitionToStandby(2);
            dfsCluster.transitionToObserver(2);
            dfsCluster.transitionToStandby(0);
            dfsCluster.transitionToActive(0);
        }
    }

    @Test
    public void testUncoordinatedCall() throws Exception {
        this.dfs.mkdir(this.testPath, FsPermission.getDefault());
        AtomicInteger readStatus = new AtomicInteger(0);
        Thread reader = new Thread(() -> {
            try {
                this.dfs.getClient().getFileInfo("/");
                readStatus.set(1);
                Assert.fail((String)"Should have been interrupted before getting here.");
            }
            catch (IOException e) {
                e.printStackTrace();
                readStatus.set(-1);
            }
        });
        reader.start();
        long before = Time.now();
        this.dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.ALL);
        long after = Time.now();
        Assert.assertTrue((after - before < 200L ? 1 : 0) != 0);
        Assert.assertEquals((long)0L, (long)readStatus.get());
        Thread.sleep(5000L);
        Assert.assertEquals((long)0L, (long)readStatus.get());
        Assert.assertEquals((Object)((Object)Thread.State.WAITING), (Object)((Object)reader.getState()));
        reader.interrupt();
    }

    @Test
    public void testRequestFromNonObserverProxyProvider() throws Exception {
        Configuration conf2 = new Configuration(conf);
        HATestUtil.setFailoverConfigurations(conf2, HATestUtil.getLogicalHostname(dfsCluster), Collections.singletonList(dfsCluster.getNameNode(2).getNameNodeAddress()), ConfiguredFailoverProxyProvider.class);
        conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
        conf2.setInt("dfs.client.retry.max.attempts", 1);
        conf2.setInt("dfs.client.failover.max.attempts", 1);
        FileSystem dfs2 = FileSystem.get((Configuration)conf2);
        this.dfs.mkdir(this.testPath, FsPermission.getDefault());
        dfsCluster.rollEditLogAndTail(0);
        try {
            dfs2.listStatus(this.testPath);
            Assert.fail((String)"listStatus should have thrown exception");
        }
        catch (RemoteException re) {
            IOException e = re.unwrapRemoteException();
            Assert.assertTrue((String)("should have thrown StandbyException but got " + e.getClass().getSimpleName()), (boolean)(e instanceof StandbyException));
        }
    }

    private void assertSentTo(int nnIdx) throws IOException {
        Assert.assertTrue((String)("Request was not sent to the expected namenode " + nnIdx), (boolean)HATestUtil.isSentToAnyOfNameNodes(this.dfs, dfsCluster, nnIdx));
    }

    private DistributedFileSystem setObserverRead(boolean flag) throws Exception {
        return HATestUtil.configureObserverReadFs(dfsCluster, conf, ObserverReadProxyProvider.class, flag);
    }

    public static class TestRpcScheduler
    implements RpcScheduler {
        private int allowed = 10;

        public int getPriorityLevel(Schedulable obj) {
            return 0;
        }

        public boolean shouldBackOff(Schedulable obj) {
            return --this.allowed < 0;
        }

        public void stop() {
        }
    }
}

