/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.SocketException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.CheckedOutputStream;
import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapred.TestShuffleHandler;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.test.MockitoMaker;
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.jboss.netty.channel.ChannelFuture;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;

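/*
 * Tests for ShuffleHandler.
 *
 * NOTE: this source was produced by a decompiler, which reported
 * "Exception performing whole class analysis ignored." for this class.
 * The recovered code may therefore be incomplete or inexact; in particular,
 * the bodies of anonymous inner classes are missing.
 */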
public class TestShuffleHandler {
    static final long MiB = 0x100000L;
    private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class);

    /**
     * Checks that an integer survives a round trip through
     * {@code ShuffleHandler.serializeMetaData} followed by
     * {@code ShuffleHandler.deserializeMetaData}.
     */
    @Test(timeout=10000L)
    public void testSerializeMeta() throws Exception {
        // Sample values are checked in this order: positive, negative, typical port.
        final int[] samples = {1, -1, 8080};
        for (int sample : samples) {
            ByteBuffer encoded = ShuffleHandler.serializeMetaData(sample);
            Assert.assertEquals((long)sample, (long)ShuffleHandler.deserializeMetaData(encoded));
        }
    }

    /**
     * Drives the handler's metrics object directly and checks the values it
     * publishes: first after two connection/byte increments, then after two
     * completed operations (one reported as successful, one as failed).
     */
    @Test(timeout=10000L)
    public void testShuffleMetrics() throws Exception {
        final long oneMiB = 0x100000L;
        MetricsSystem metricsSystem = new MetricsSystemImpl();
        ShuffleHandler handler = new ShuffleHandler(metricsSystem);

        // Mocked future whose isSuccess() answers true on the first call and
        // false afterwards.
        ChannelFuture stubbed = (ChannelFuture)MockitoMaker.stub(ChannelFuture.class).returning((Object)Boolean.TRUE, new Object[]{Boolean.FALSE}).from;
        ChannelFuture future = (ChannelFuture)MockitoMaker.make(stubbed.isSuccess());

        // Two connections transferring 1 MiB and 2 MiB respectively.
        handler.metrics.shuffleConnections.incr();
        handler.metrics.shuffleOutputBytes.incr(oneMiB);
        handler.metrics.shuffleConnections.incr();
        handler.metrics.shuffleOutputBytes.incr(2 * oneMiB);
        TestShuffleHandler.checkShuffleMetrics(metricsSystem, 3 * oneMiB, 0, 0, 2);

        // Completing both operations is expected to leave one failure, one
        // success and no open connections.
        handler.metrics.operationComplete(future);
        handler.metrics.operationComplete(future);
        TestShuffleHandler.checkShuffleMetrics(metricsSystem, 3 * oneMiB, 1, 1, 0);
    }

    /**
     * Asserts the values currently reported by the "ShuffleMetrics" source.
     *
     * @param ms          metrics system in which the source is looked up
     * @param bytes       expected value of the "ShuffleOutputBytes" counter
     * @param failed      expected value of the "ShuffleOutputsFailed" counter
     * @param succeeded   expected value of the "ShuffleOutputsOK" counter
     * @param connections expected value of the "ShuffleConnections" gauge
     */
    static void checkShuffleMetrics(MetricsSystem ms, long bytes, int failed, int succeeded, int connections) {
        MetricsRecordBuilder snapshot = MetricsAsserts.getMetrics(ms.getSource("ShuffleMetrics"));

        // Counters first, then the gauge.
        MetricsAsserts.assertCounter("ShuffleOutputBytes", bytes, snapshot);
        MetricsAsserts.assertCounter("ShuffleOutputsFailed", failed, snapshot);
        MetricsAsserts.assertCounter("ShuffleOutputsOK", succeeded, snapshot);
        MetricsAsserts.assertGauge("ShuffleConnections", connections, snapshot);
    }

    /**
     * Starts a shuffle handler, requests a map output over HTTP, reads only
     * the shuffle header, closes the stream and then asserts that no failure
     * was recorded.
     *
     * NOTE(review): the handler under test is an anonymous subclass whose body
     * the decompiler could not recover, so this method does not compile as
     * shown. Presumably that subclass records sendError calls in
     * {@code failures} - confirm against the original source.
     */
    @Test(timeout=10000L)
    public void testClientClosesConnection() throws Exception {
        // Never written to in the visible code; presumably populated by the
        // missing anonymous handler - TODO confirm.
        ArrayList failures = new ArrayList(1);
        Configuration conf = new Configuration();
        // Port 0 is configured here; the actual port is read back from the
        // handler's configuration when the URL is built below.
        conf.setInt("mapreduce.shuffle.port", 0);
        1 shuffleHandler = new /* Unavailable Anonymous Inner Class!! */;
        shuffleHandler.init(conf);
        shuffleHandler.start();
        URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("mapreduce.shuffle.port") + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0");
        HttpURLConnection conn = (HttpURLConnection)url.openConnection();
        // Same name/version header pair that the other tests in this class
        // send when they expect a 200 response.
        conn.setRequestProperty("name", "mapreduce");
        conn.setRequestProperty("version", "1.0.0");
        conn.connect();
        DataInputStream input = new DataInputStream(conn.getInputStream());
        Assert.assertEquals((long)200L, (long)conn.getResponseCode());
        // Read just the header, then close the stream without consuming the
        // rest of the response.
        ShuffleHeader header = new ShuffleHeader();
        header.readFields((DataInput)input);
        input.close();
        // NOTE(review): stop() is skipped if any statement above throws.
        shuffleHandler.stop();
        Assert.assertTrue((String)"sendError called when client closed connection", (failures.size() == 0 ? 1 : 0) != 0);
    }

    /**
     * Verifies that the handler answers HTTP 400 (Bad Request) when the
     * request's "name"/"version" header pair is not the accepted one.
     *
     * Three header pairs are tried, one per iteration:
     * ("mapreduce", "1.0.1"), ("other", "1.0.0") and ("other", "1.0.1").
     * The pair ("mapreduce", "1.0.0"), which other tests in this class send
     * when they expect a 200, is deliberately never sent.
     *
     * The handler is stopped and closed in a {@code finally} block so that a
     * failing assertion does not leave the server running.
     *
     * @throws Exception if the handler cannot be started or the HTTP request fails
     */
    @Test(timeout=10000L)
    public void testIncompatibleShuffleVersion() throws Exception {
        final int failureNum = 3;
        Configuration conf = new Configuration();
        // Port 0 is configured here; the actual port is read back from the
        // handler's configuration when the URL is built below.
        conf.setInt("mapreduce.shuffle.port", 0);
        ShuffleHandler shuffleHandler = new ShuffleHandler();
        shuffleHandler.init(conf);
        try {
            shuffleHandler.start();
            URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("mapreduce.shuffle.port") + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0");
            for (int i = 0; i < failureNum; ++i) {
                HttpURLConnection conn = (HttpURLConnection)url.openConnection();
                // Each iteration gets at least one header wrong.
                conn.setRequestProperty("name", i == 0 ? "mapreduce" : "other");
                conn.setRequestProperty("version", i == 1 ? "1.0.0" : "1.0.1");
                conn.connect();
                Assert.assertEquals((long)HttpURLConnection.HTTP_BAD_REQUEST, (long)conn.getResponseCode());
            }
        }
        finally {
            shuffleHandler.stop();
            shuffleHandler.close();
        }
    }

    /**
     * Opens three HTTP connections against a handler configured with
     * "mapreduce.shuffle.max.connections" = 3 and expects the first two to
     * return 200 while reading from the third raises a SocketException.
     *
     * NOTE(review): the handler under test is an anonymous subclass whose body
     * the decompiler could not recover, so this method does not compile as
     * shown. Why a configured limit of 3 causes the third connection to be
     * refused is not visible here - confirm against ShuffleHandler.
     */
    @Test(timeout=10000L)
    public void testMaxConnections() throws Exception {
        int i;
        Configuration conf = new Configuration();
        // Port 0 is configured here; the actual port is read back from the
        // handler's configuration when the URLs are built below.
        conf.setInt("mapreduce.shuffle.port", 0);
        conf.setInt("mapreduce.shuffle.max.connections", 3);
        2 shuffleHandler = new /* Unavailable Anonymous Inner Class!! */;
        shuffleHandler.init(conf);
        shuffleHandler.start();
        int connAttempts = 3;
        HttpURLConnection[] conns = new HttpURLConnection[connAttempts];
        // Prepare all connections first; each one asks for a different map
        // attempt (the loop index is embedded in the attempt id).
        for (i = 0; i < connAttempts; ++i) {
            String URLstring = "http://127.0.0.1:" + shuffleHandler.getConfig().get("mapreduce.shuffle.port") + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_" + i + "_0";
            URL url = new URL(URLstring);
            conns[i] = (HttpURLConnection)url.openConnection();
            conns[i].setRequestProperty("name", "mapreduce");
            conns[i].setRequestProperty("version", "1.0.0");
        }
        // Then connect them all before reading from any of them.
        for (i = 0; i < connAttempts; ++i) {
            conns[i].connect();
        }
        // The first two connections must be served normally.
        conns[0].getInputStream();
        int rc = conns[0].getResponseCode();
        Assert.assertEquals((long)200L, (long)rc);
        conns[1].getInputStream();
        rc = conns[1].getResponseCode();
        Assert.assertEquals((long)200L, (long)rc);
        // The third connection is expected to fail with a SocketException.
        try {
            conns[2].getInputStream();
            rc = conns[2].getResponseCode();
            // Assert.fail throws an AssertionError, which is an Error and is
            // therefore not intercepted by either catch clause below.
            Assert.fail((String)"Expected a SocketException");
        }
        catch (SocketException se) {
            LOG.info((Object)"Expected - connection should not be open");
        }
        catch (Exception e) {
            // Any other exception type counts as a test failure.
            Assert.fail((String)"Expected a SocketException");
        }
        // NOTE(review): stop() is skipped if any assertion above fails.
        shuffleHandler.stop();
    }

    /*
     * Decompiler warning: "Removed try catching itself - possible behaviour
     * change." The try/finally structure below may therefore differ from the
     * original source.
     *
     * Creates dummy map output files for user "randomUser", requests them over
     * HTTP from a handler running with Kerberos authentication configured, and
     * asserts that the response body contains an owner-mismatch message for
     * the index file. The test is skipped when NativeIO is not available.
     *
     * NOTE(review): the handler under test is an anonymous subclass whose body
     * the decompiler could not recover, so this method does not compile as
     * shown.
     */
    @Test(timeout=100000L)
    public void testMapFileAccess() throws IOException {
        // Skip (rather than fail) when NativeIO is not available; it is
        // needed for the fstat call further down.
        Assume.assumeTrue((boolean)NativeIO.isAvailable());
        Configuration conf = new Configuration();
        conf.setInt("mapreduce.shuffle.port", 0);
        conf.setInt("mapreduce.shuffle.max.connections", 3);
        conf.set("hadoop.security.authentication", "kerberos");
        // NOTE(review): this sets the configuration on UserGroupInformation
        // statically and is not undone in this method.
        UserGroupInformation.setConfiguration((Configuration)conf);
        // Local dir for the test files: target/TestShuffleHandlerLocDir.
        File absLogDir = new File("target", TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
        conf.set("yarn.nodemanager.local-dirs", absLogDir.getAbsolutePath());
        ApplicationId appId = ApplicationId.newInstance((long)12345L, (int)1);
        System.out.println(appId.toString());
        String appAttemptId = "attempt_12345_1_m_1_0";
        String user = "randomUser";
        String reducerId = "0";
        // Filled by createShuffleHandlerFiles: index 0 is the index file,
        // index 1 the map output file.
        ArrayList fileMap = new ArrayList();
        TestShuffleHandler.createShuffleHandlerFiles((File)absLogDir, (String)user, (String)appId.toString(), (String)appAttemptId, (Configuration)conf, fileMap);
        3 shuffleHandler = new /* Unavailable Anonymous Inner Class!! */;
        shuffleHandler.init(conf);
        try {
            InputStream is;
            shuffleHandler.start();
            // Serialize a token for the user and hand it to the handler as
            // the application's initialization data.
            DataOutputBuffer outputBuffer = new DataOutputBuffer();
            outputBuffer.reset();
            Token jt = new Token("identifier".getBytes(), "password".getBytes(), new Text(user), new Text("shuffleService"));
            jt.write((DataOutput)outputBuffer);
            shuffleHandler.initializeApplication(new ApplicationInitializationContext(user, appId, ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength())));
            URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("mapreduce.shuffle.port") + "/mapOutput?job=job_12345_0001&reduce=" + reducerId + "&map=attempt_12345_1_m_1_0");
            HttpURLConnection conn = (HttpURLConnection)url.openConnection();
            conn.setRequestProperty("name", "mapreduce");
            conn.setRequestProperty("version", "1.0.0");
            conn.connect();
            // Fixed-size buffer for the response; bytes that are not filled
            // stay zero.
            byte[] byteArr = new byte[10000];
            try {
                is = new DataInputStream(conn.getInputStream());
                ((DataInputStream)is).readFully(byteArr);
            }
            catch (EOFException e) {
                // Presumably expected: the response is shorter than the
                // buffer, so readFully reaches end of stream - TODO confirm.
            }
            // Look up the on-disk owner of the index file (first entry of
            // fileMap) to build the expected error message.
            is = new FileInputStream((File)fileMap.get(0));
            String owner = NativeIO.POSIX.getFstat((FileDescriptor)((FileInputStream)is).getFD()).getOwner();
            ((FileInputStream)is).close();
            String message = "Owner '" + owner + "' for path " + ((File)fileMap.get(0)).getAbsolutePath() + " did not match expected owner '" + user + "'";
            // The response bytes are decoded with the platform default
            // charset before searching for the message.
            Assert.assertTrue((boolean)new String(byteArr).contains(message));
        }
        finally {
            shuffleHandler.stop();
        }
    }

    /**
     * Creates the directory
     * {@code <logDir>/usercache/<user>/appcache/<appId>/output/<appAttemptId>}
     * and writes a dummy index file ("file.out.index") and a dummy map output
     * file ("file.out") into it.
     *
     * @param logDir       base directory under which the tree is created
     * @param user         user name used as a path component
     * @param appId        application id used as a path component
     * @param appAttemptId attempt id used as the innermost directory name
     * @param conf         configuration passed on to the file-creating helpers
     * @param fileMap      receives the index file first, then the map output file
     * @throws IOException if one of the files cannot be written
     */
    public static void createShuffleHandlerFiles(File logDir, String user, String appId, String appAttemptId, Configuration conf, List<File> fileMap) throws IOException {
        List<String> pathSegments = Arrays.asList(logDir.getAbsolutePath(), "usercache", user, "appcache", appId, "output", appAttemptId);
        File outputDir = new File(StringUtils.join("/", pathSegments));
        // The result of mkdirs() is not checked.
        outputDir.mkdirs();
        System.out.println(outputDir.getAbsolutePath());

        // Each file is registered in fileMap before it is written.
        File index = new File(outputDir, "file.out.index");
        fileMap.add(index);
        TestShuffleHandler.createIndexFile(index, conf);

        File mapOutput = new File(outputDir, "file.out");
        fileMap.add(mapOutput);
        TestShuffleHandler.createMapOutputFile(mapOutput, conf);
    }

    /**
     * Writes a fixed dummy text to {@code mapOutputFile}, creating the file or
     * replacing its content.
     *
     * The stream is closed in a {@code finally} block so the file handle is
     * released even when writing fails.
     *
     * @param mapOutputFile file to write
     * @param conf          not used by this method
     * @throws IOException if the file cannot be opened or written
     */
    public static void createMapOutputFile(File mapOutputFile, Configuration conf) throws IOException {
        FileOutputStream out = new FileOutputStream(mapOutputFile);
        try {
            out.write("Creating new dummy map output file. Used only for testing".getBytes());
            out.flush();
        }
        finally {
            out.close();
        }
    }

    /**
     * (Re)creates {@code indexFile} with dummy content: the first 24 bytes of
     * a fixed message followed by an 8-byte checksum of those bytes.
     *
     * An existing file is deleted first. The output stream is closed in a
     * {@code finally} block so it is released even when writing fails.
     *
     * @param indexFile file to create
     * @param conf      configuration used to obtain the local file system
     * @throws IOException if the file cannot be created or written
     */
    public static void createIndexFile(File indexFile, Configuration conf) throws IOException {
        // Presumably the size of one index record - confirm against the
        // shuffle index format.
        final int payloadLength = 24;
        if (indexFile.exists()) {
            System.out.println("Deleting existing file");
            indexFile.delete();
        }
        indexFile.createNewFile();
        FSDataOutputStream output = FileSystem.getLocal(conf).getRaw().append(new Path(indexFile.getAbsolutePath()));
        try {
            PureJavaCrc32 crc = new PureJavaCrc32();
            crc.reset();
            CheckedOutputStream chk = new CheckedOutputStream(output, crc);
            String msg = "Writing new index file. This file will be used only for the testing.";
            // Only the payload goes through the checked stream, so the
            // checksum covers exactly these bytes.
            chk.write(Arrays.copyOf(msg.getBytes(), payloadLength));
            // The checksum itself is written straight to the underlying stream.
            output.writeLong(chk.getChecksum().getValue());
        }
        finally {
            output.close();
        }
    }
}

