/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.tools.mapred.lib;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.mapred.lib.DynamicInputChunk;
import org.apache.hadoop.tools.mapred.lib.DynamicRecordReader;
import org.apache.hadoop.tools.util.DistCpUtils;

/**
 * {@link InputFormat} that partitions the copy listing into a set of chunk
 * files and hands one chunk to each map task as its initial split.
 *
 * <p>The listing is read once, and its records are distributed round-robin
 * over a bounded set of simultaneously open chunk files. The number of
 * chunks is derived from the number of maps and a "split ratio" (chunks per
 * map), both read from the job configuration.
 *
 * <p>NOTE(review): chunks beyond the first {@code numMaps} are not assigned
 * here; presumably {@code DynamicRecordReader} acquires them at run time —
 * confirm against that class.
 *
 * @param <K> key type produced by the record reader
 * @param <V> value type produced by the record reader
 */
public class DynamicInputFormat<K, V> extends InputFormat<K, V> {
    private static final Log LOG = LogFactory.getLog(DynamicInputFormat.class);
    private static final String CONF_LABEL_LISTING_SPLIT_RATIO = "mapred.listing.split.ratio";
    private static final String CONF_LABEL_NUM_SPLITS = "mapred.num.splits";
    private static final String CONF_LABEL_NUM_ENTRIES_PER_CHUNK = "mapred.num.entries.per.chunk";
    /** Upper bound on the number of chunk files opened for writing in one batch. */
    private static final int N_CHUNKS_OPEN_AT_ONCE_DEFAULT = 16;

    /**
     * Splits the copy listing into chunk files and returns one split per
     * initially assigned chunk.
     *
     * @param jobContext the job whose configuration describes the listing
     * @return the input splits, at most one per map task
     * @throws IOException if the listing cannot be read or chunked
     * @throws InterruptedException declared by the {@link InputFormat} contract
     */
    @Override
    public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
        LOG.info("DynamicInputFormat: Getting splits for job:" + jobContext.getJobID());
        return createSplits(jobContext, splitCopyListingIntoChunksWithShuffle(jobContext));
    }

    /**
     * Assigns the first {@code min(numMaps, chunks.size())} chunks to map
     * tasks and wraps each one in a {@link FileSplit}. The number of splits
     * created is published under {@code mapred.num.splits}.
     *
     * @param jobContext the job being set up
     * @param chunks all chunk files created for the job
     * @return one split per assigned chunk
     * @throws IOException if a chunk cannot be assigned to its task
     */
    private List<InputSplit> createSplits(JobContext jobContext, List<DynamicInputChunk> chunks) throws IOException {
        Configuration configuration = jobContext.getConfiguration();
        int numMaps = getNumMapTasks(configuration);
        int nSplits = Math.min(numMaps, chunks.size());
        List<InputSplit> splits = new ArrayList<InputSplit>(nSplits);
        for (int i = 0; i < nSplits; ++i) {
            DynamicInputChunk chunk = chunks.get(i);
            TaskID taskId = new TaskID(jobContext.getJobID(), TaskType.MAP, i);
            chunk.assignTo(taskId);
            // The split length is a nominal, always-positive value (the
            // minimum records per chunk), not the chunk file's byte size.
            // NOTE(review): presumably so the split is never seen as empty.
            splits.add(new FileSplit(chunk.getPath(), 0L, (long) getMinRecordsPerChunk(configuration), null));
        }
        DistCpUtils.publish(configuration, CONF_LABEL_NUM_SPLITS, splits.size());
        return splits;
    }

    /**
     * Reads the copy listing and distributes its records round-robin over
     * chunk files, opening the chunk files in bounded batches.
     *
     * <p>The computed number of entries per chunk is published under
     * {@code mapred.num.entries.per.chunk}.
     *
     * @param context the job whose configuration describes the listing
     * @return every chunk file created, in creation order
     * @throws IOException if the chunk count exceeds the tolerable maximum,
     *         if the listing holds more records than the configured record
     *         count allows for, or on any read/write failure
     */
    private List<DynamicInputChunk> splitCopyListingIntoChunksWithShuffle(JobContext context) throws IOException {
        Configuration configuration = context.getConfiguration();
        int numRecords = getNumberOfRecords(configuration);
        int numMaps = getNumMapTasks(configuration);
        int maxChunksTolerable = getMaxChunksTolerable(configuration);
        int splitRatio = getListingSplitRatio(configuration, numMaps, numRecords);
        validateNumChunksUsing(splitRatio, numMaps, maxChunksTolerable);

        // double (not float) so that record counts above 2^24 divide exactly.
        int numEntriesPerChunk = (int) Math.ceil((double) numRecords / (double) (splitRatio * numMaps));
        DistCpUtils.publish(configuration, CONF_LABEL_NUM_ENTRIES_PER_CHUNK, numEntriesPerChunk);
        int nChunksTotal = (int) Math.ceil((double) numRecords / (double) numEntriesPerChunk);
        int nChunksOpenAtOnce = Math.min(N_CHUNKS_OPEN_AT_ONCE_DEFAULT, nChunksTotal);

        Path listingPath = getListingFilePath(configuration);
        SequenceFile.Reader reader = new SequenceFile.Reader(configuration, SequenceFile.Reader.file(listingPath));
        List<DynamicInputChunk> openChunks = new ArrayList<DynamicInputChunk>();
        List<DynamicInputChunk> chunksFinal = new ArrayList<DynamicInputChunk>();
        CopyListingFileStatus fileStatus = new CopyListingFileStatus();
        Text relPath = new Text();
        int recordCounter = 0;
        int chunkCount = 0;
        try {
            while (reader.next(relPath, fileStatus)) {
                int openSetCapacity = nChunksOpenAtOnce * numEntriesPerChunk;
                if (openSetCapacity == 0 || recordCounter % openSetCapacity == 0) {
                    // Current batch is full (or none is open yet): finish it
                    // and open the next batch of chunk files.
                    closeAll(openChunks);
                    chunksFinal.addAll(openChunks);
                    openChunks = createChunks(configuration, chunkCount, nChunksTotal, nChunksOpenAtOnce);
                    if (openChunks.isEmpty()) {
                        // Without this check the modulo below would divide by zero.
                        throw new IOException("Listing file " + listingPath
                                + " contains more records than the " + numRecords
                                + " declared in the configuration; no chunk left to hold them.");
                    }
                    chunkCount += openChunks.size();
                    nChunksOpenAtOnce = openChunks.size();
                    recordCounter = 0;
                }
                // Round-robin the record into one of the open chunks.
                openChunks.get(recordCounter % nChunksOpenAtOnce).write(relPath, fileStatus);
                ++recordCounter;
            }
        } finally {
            try {
                closeAll(openChunks);
                chunksFinal.addAll(openChunks);
            } finally {
                // Always release the listing reader, even if closing chunks failed.
                IOUtils.closeStream(reader);
            }
        }
        LOG.info("Number of dynamic-chunk-files created: " + chunksFinal.size());
        return chunksFinal;
    }

    /**
     * Rejects configurations that would create more chunks than tolerated.
     *
     * @param splitRatio chunks per map
     * @param numMaps number of map tasks
     * @param maxChunksTolerable maximum number of chunks allowed
     * @throws IOException if {@code splitRatio * numMaps} exceeds the maximum
     */
    private static void validateNumChunksUsing(int splitRatio, int numMaps, int maxChunksTolerable) throws IOException {
        // Multiply as long: an int product could overflow to a negative
        // value and wrongly pass the check.
        if ((long) splitRatio * (long) numMaps > (long) maxChunksTolerable) {
            throw new IOException("Too many chunks created with splitRatio:" + splitRatio + ", numMaps:" + numMaps + ". Reduce numMaps or decrease split-ratio to proceed.");
        }
    }

    /** Closes every chunk in the given list. */
    private static void closeAll(List<DynamicInputChunk> chunks) {
        for (DynamicInputChunk chunk : chunks) {
            chunk.close();
        }
    }

    /**
     * Creates the next batch of chunk files, with ids starting at
     * {@code chunkCount}.
     *
     * <p>The batch normally holds {@code nChunksOpenAtOnce} chunks, capped at
     * {@code nChunksTotal}. If fewer than {@code nChunksOpenAtOnce} chunks
     * would remain after this batch, they are folded into it so that no
     * undersized trailing batch is created.
     *
     * @param config job configuration passed through to chunk creation
     * @param chunkCount number of chunks created so far (first id to use)
     * @param nChunksTotal total number of chunks to create for the job
     * @param nChunksOpenAtOnce preferred batch size
     * @return the newly created chunks; empty if none remain to be created
     * @throws IOException if a chunk file cannot be created
     */
    private static List<DynamicInputChunk> createChunks(Configuration config, int chunkCount, int nChunksTotal, int nChunksOpenAtOnce) throws IOException {
        List<DynamicInputChunk> chunks = new ArrayList<DynamicInputChunk>();
        int chunkIdUpperBound = Math.min(nChunksTotal, chunkCount + nChunksOpenAtOnce);
        if (nChunksTotal - chunkIdUpperBound < nChunksOpenAtOnce) {
            chunkIdUpperBound = nChunksTotal;
        }
        for (int i = chunkCount; i < chunkIdUpperBound; ++i) {
            chunks.add(createChunk(i, config));
        }
        return chunks;
    }

    /** Creates a writable chunk named by the zero-padded, five-digit chunk id. */
    private static DynamicInputChunk createChunk(int chunkId, Configuration config) throws IOException {
        return DynamicInputChunk.createChunkForWrite(String.format("%05d", chunkId), config);
    }

    /**
     * Returns the listing file path configured under
     * {@code distcp.listing.file.path}.
     *
     * <p>Presence and accessibility of the file are checked with
     * {@code assert} only, so they are enforced only when assertions are
     * enabled; otherwise an {@link IOException} from the existence check is
     * ignored.
     *
     * @param configuration job configuration
     * @return the configured listing file path
     */
    private static Path getListingFilePath(Configuration configuration) {
        String listingFilePathString = configuration.get("distcp.listing.file.path", "");
        assert !listingFilePathString.equals("") : "Listing file not found.";
        Path listingFilePath = new Path(listingFilePathString);
        try {
            assert listingFilePath.getFileSystem(configuration).exists(listingFilePath)
                    : "Listing file: " + listingFilePath + " not found.";
        } catch (IOException e) {
            assert false : "Listing file: " + listingFilePath + " couldn't be accessed. " + e.getMessage();
        }
        return listingFilePath;
    }

    /** Reads the record count from {@code mapred.number.of.records}. */
    private static int getNumberOfRecords(Configuration configuration) {
        return DistCpUtils.getInt(configuration, "mapred.number.of.records");
    }

    /** Reads the number of map tasks from {@code mapreduce.job.maps}. */
    private static int getNumMapTasks(Configuration configuration) {
        return DistCpUtils.getInt(configuration, "mapreduce.job.maps");
    }

    /**
     * Returns the split ratio set under {@code mapred.listing.split.ratio},
     * or, if unset, the ratio computed by
     * {@link #getSplitRatio(int, int, Configuration)}.
     */
    private static int getListingSplitRatio(Configuration configuration, int numMaps, int numPaths) {
        return configuration.getInt(CONF_LABEL_LISTING_SPLIT_RATIO, getSplitRatio(numMaps, numPaths, configuration));
    }

    /**
     * Reads {@code distcp.dynamic.max.chunks.tolerable} (default 400),
     * falling back to the default with a warning if it is not positive.
     */
    private static int getMaxChunksTolerable(Configuration conf) {
        int maxChunksTolerable = conf.getInt("distcp.dynamic.max.chunks.tolerable", 400);
        if (maxChunksTolerable <= 0) {
            LOG.warn("distcp.dynamic.max.chunks.tolerable should be positive. Fall back to default value: 400");
            maxChunksTolerable = 400;
        }
        return maxChunksTolerable;
    }

    /**
     * Reads {@code distcp.dynamic.max.chunks.ideal} (default 100), falling
     * back to the default with a warning if it is not positive.
     */
    private static int getMaxChunksIdeal(Configuration conf) {
        int maxChunksIdeal = conf.getInt("distcp.dynamic.max.chunks.ideal", 100);
        if (maxChunksIdeal <= 0) {
            LOG.warn("distcp.dynamic.max.chunks.ideal should be positive. Fall back to default value: 100");
            maxChunksIdeal = 100;
        }
        return maxChunksIdeal;
    }

    /**
     * Reads {@code distcp.dynamic.min.records_per_chunk} (default 5), falling
     * back to the default with a warning if it is not positive.
     */
    private static int getMinRecordsPerChunk(Configuration conf) {
        int minRecordsPerChunk = conf.getInt("distcp.dynamic.min.records_per_chunk", 5);
        if (minRecordsPerChunk <= 0) {
            LOG.warn("distcp.dynamic.min.records_per_chunk should be positive. Fall back to default value: 5");
            minRecordsPerChunk = 5;
        }
        return minRecordsPerChunk;
    }

    /**
     * Reads {@code distcp.dynamic.split.ratio} (default 2), falling back to
     * the default with a warning if it is not positive.
     */
    private static int getSplitRatio(Configuration conf) {
        int splitRatio = conf.getInt("distcp.dynamic.split.ratio", 2);
        if (splitRatio <= 0) {
            LOG.warn("distcp.dynamic.split.ratio should be positive. Fall back to default value: 2");
            splitRatio = 2;
        }
        return splitRatio;
    }

    /**
     * Computes the split ratio using a freshly constructed
     * {@link Configuration}.
     *
     * @see #getSplitRatio(int, int, Configuration)
     */
    static int getSplitRatio(int nMaps, int nRecords) {
        return getSplitRatio(nMaps, nRecords, new Configuration());
    }

    /**
     * Computes the number of chunks to create per map task.
     *
     * <ul>
     *   <li>With exactly one map, the ratio is 1.</li>
     *   <li>With more maps than the ideal maximum number of chunks, the
     *       configured split ratio is used.</li>
     *   <li>Otherwise the ratio is {@code ceil(maxChunksIdeal / nMaps)},
     *       unless that would leave fewer than the minimum records per
     *       chunk, in which case the configured split ratio is used.</li>
     * </ul>
     *
     * @param nMaps number of map tasks
     * @param nRecords number of records in the copy listing
     * @param conf configuration supplying the tuning values
     * @return chunks per map
     */
    static int getSplitRatio(int nMaps, int nRecords, Configuration conf) {
        int maxChunksIdeal = getMaxChunksIdeal(conf);
        int minRecordsPerChunk = getMinRecordsPerChunk(conf);
        int splitRatio = getSplitRatio(conf);
        if (nMaps == 1) {
            LOG.warn("nMaps == 1. Why use DynamicInputFormat?");
            return 1;
        }
        if (nMaps > maxChunksIdeal) {
            return splitRatio;
        }
        // double (not float) so that large record counts divide exactly.
        int nPickups = (int) Math.ceil((double) maxChunksIdeal / (double) nMaps);
        int nRecordsPerChunk = (int) Math.ceil((double) nRecords / (double) (nMaps * nPickups));
        return nRecordsPerChunk < minRecordsPerChunk ? splitRatio : nPickups;
    }

    /** Reads the entries-per-chunk value from {@code mapred.num.entries.per.chunk}. */
    static int getNumEntriesPerChunk(Configuration configuration) {
        return DistCpUtils.getInt(configuration, CONF_LABEL_NUM_ENTRIES_PER_CHUNK);
    }

    /**
     * Returns a new {@link DynamicRecordReader}; the split and task context
     * are not used here.
     *
     * @param inputSplit the split to read (unused)
     * @param taskAttemptContext the task attempt context (unused)
     * @return a new record reader
     * @throws IOException declared by the {@link InputFormat} contract
     * @throws InterruptedException declared by the {@link InputFormat} contract
     */
    @Override
    public RecordReader<K, V> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new DynamicRecordReader();
    }
}

