/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.mapreduce.examples;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.client.CallerContext;
import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.security.DAGAccessControls;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.hadoop.shim.HadoopShimsLoader;
import org.apache.tez.mapreduce.examples.ExampleDriver;
import org.apache.tez.mapreduce.examples.helpers.SplitsInClientOptionParser;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.output.MROutputLegacy;
import org.apache.tez.mapreduce.processor.map.MapProcessor;
import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestOrderedWordCount
extends Configured
implements Tool {
    private static Logger LOG = LoggerFactory.getLogger(TestOrderedWordCount.class);
    private static final String DAG_VIEW_ACLS = "tez.testorderedwordcount.view-acls";
    private static final String DAG_MODIFY_ACLS = "tez.testorderedwordcount.modify-acls";
    private static final String IS_MAX_IPC_DATA_SET_BY_USER = "tez.testorderedwordcount.is-max-ipc-set-by-user";
    private static final String MAX_IPC_DATA_LENGTH = "tez.testorderedwordcount.ipc.maximum.data.length";
    private static final String EXCEED_IPC_DATA_LIMIT = "tez.testorderedwordcount.exceed.ipc.limit";
    private static final String IPC_PAYLOAD = "tez.testorderedwordcount.ipc.payload";
    private static final int NO_OF_VERTICES = 3;
    private Credentials credentials = new Credentials();

    @VisibleForTesting
    public DAG createDAG(FileSystem fs, Configuration conf, Map<String, LocalResource> commonLocalResources, Path stagingDir, int dagIndex, String inputPath, String outputPath, boolean generateSplitsInClient, boolean useMRSettings, int intermediateNumReduceTasks, int maxDataLengthThroughIPC, int exceedDataLimit) throws Exception {
        Vertex finalReduceVertex;
        Vertex intermediateVertex;
        Vertex mapVertex;
        DataSourceDescriptor dsd;
        JobConf mapStageConf = new JobConf(conf);
        mapStageConf.set("mapreduce.job.map.class", TokenizerMapper.class.getName());
        MRHelpers.translateMRConfToTez((Configuration)mapStageConf, (!useMRSettings ? 1 : 0) != 0);
        JobConf iReduceStageConf = new JobConf(conf);
        iReduceStageConf.setInt("mapreduce.job.reduces", 2);
        iReduceStageConf.set("mapreduce.job.reduce.class", IntSumReducer.class.getName());
        iReduceStageConf.set("tez.runtime.key.class", Text.class.getName());
        iReduceStageConf.set("tez.runtime.value.class", IntWritable.class.getName());
        iReduceStageConf.setBoolean("mapred.mapper.new-api", true);
        MRHelpers.translateMRConfToTez((Configuration)iReduceStageConf, (!useMRSettings ? 1 : 0) != 0);
        JobConf finalReduceConf = new JobConf(conf);
        finalReduceConf.setInt("mapreduce.job.reduces", 1);
        finalReduceConf.set("mapreduce.job.reduce.class", MyOrderByNoOpReducer.class.getName());
        finalReduceConf.set("tez.runtime.key.class", IntWritable.class.getName());
        finalReduceConf.set("tez.runtime.value.class", Text.class.getName());
        MRHelpers.translateMRConfToTez((Configuration)finalReduceConf, (!useMRSettings ? 1 : 0) != 0);
        MRHelpers.configureMRApiUsage((Configuration)mapStageConf);
        MRHelpers.configureMRApiUsage((Configuration)iReduceStageConf);
        MRHelpers.configureMRApiUsage((Configuration)finalReduceConf);
        ArrayList<Vertex> vertices = new ArrayList<Vertex>();
        String mapStageHistoryText = TezUtils.convertToHistoryText((String)"Initial Tokenizer Vertex", (Configuration)mapStageConf);
        if (generateSplitsInClient) {
            mapStageConf.set("mapreduce.job.inputformat.class", TextInputFormat.class.getName());
            mapStageConf.set("mapreduce.input.fileinputformat.inputdir", inputPath);
            mapStageConf.setBoolean("mapred.mapper.new-api", true);
            dsd = MRInputHelpers.configureMRInputWithLegacySplitGeneration((Configuration)mapStageConf, (Path)stagingDir, (boolean)true);
        } else {
            dsd = MRInputLegacy.createConfigBuilder((Configuration)mapStageConf, TextInputFormat.class, (String)inputPath).build();
        }
        dsd.getInputDescriptor().setHistoryText(TezUtils.convertToHistoryText((String)("HDFS Input " + inputPath), (Configuration)mapStageConf));
        HashMap mapEnv = Maps.newHashMap();
        MRHelpers.updateEnvBasedOnMRTaskEnv((Configuration)mapStageConf, (Map)mapEnv, (boolean)true);
        HashMap reduceEnv = Maps.newHashMap();
        MRHelpers.updateEnvBasedOnMRTaskEnv((Configuration)mapStageConf, (Map)reduceEnv, (boolean)false);
        Configuration copyMapStageConf = new Configuration((Configuration)mapStageConf);
        this.setMaxDataLengthConf(copyMapStageConf, maxDataLengthThroughIPC, exceedDataLimit);
        ProcessorDescriptor mapProcessorDescriptor = (ProcessorDescriptor)((ProcessorDescriptor)ProcessorDescriptor.create((String)MapProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf((Configuration)copyMapStageConf))).setHistoryText(mapStageHistoryText);
        if (!useMRSettings) {
            mapVertex = Vertex.create((String)"initialmap", (ProcessorDescriptor)mapProcessorDescriptor);
        } else {
            mapVertex = Vertex.create((String)"initialmap", (ProcessorDescriptor)mapProcessorDescriptor, (int)-1, (Resource)MRHelpers.getResourceForMRMapper((Configuration)mapStageConf));
            mapVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRMapper((Configuration)mapStageConf));
            mapVertex.setTaskEnvironment((Map)mapEnv);
        }
        mapVertex.addTaskLocalFiles(commonLocalResources).addDataSource("MRInput", dsd);
        vertices.add(mapVertex);
        Configuration copyiReduceStageConf = new Configuration((Configuration)iReduceStageConf);
        this.setMaxDataLengthConf(copyiReduceStageConf, maxDataLengthThroughIPC, exceedDataLimit);
        String iReduceStageHistoryText = TezUtils.convertToHistoryText((String)"Intermediate Summation Vertex", (Configuration)iReduceStageConf);
        ProcessorDescriptor iReduceProcessorDescriptor = (ProcessorDescriptor)((ProcessorDescriptor)ProcessorDescriptor.create((String)ReduceProcessor.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf((Configuration)copyiReduceStageConf))).setHistoryText(iReduceStageHistoryText);
        if (!useMRSettings) {
            intermediateVertex = Vertex.create((String)"intermediate_reducer", (ProcessorDescriptor)iReduceProcessorDescriptor, (int)intermediateNumReduceTasks);
        } else {
            intermediateVertex = Vertex.create((String)"intermediate_reducer", (ProcessorDescriptor)iReduceProcessorDescriptor, (int)intermediateNumReduceTasks, (Resource)MRHelpers.getResourceForMRReducer((Configuration)iReduceStageConf));
            intermediateVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer((Configuration)iReduceStageConf));
            intermediateVertex.setTaskEnvironment((Map)reduceEnv);
        }
        intermediateVertex.addTaskLocalFiles(commonLocalResources);
        vertices.add(intermediateVertex);
        Configuration copyFinalReduceConf = new Configuration((Configuration)finalReduceConf);
        this.setMaxDataLengthConf(copyFinalReduceConf, maxDataLengthThroughIPC, exceedDataLimit);
        String finalReduceStageHistoryText = TezUtils.convertToHistoryText((String)"Final Sorter Vertex", (Configuration)finalReduceConf);
        UserPayload finalReducePayload = TezUtils.createUserPayloadFromConf((Configuration)copyFinalReduceConf);
        ProcessorDescriptor finalReduceProcessorDescriptor = (ProcessorDescriptor)((ProcessorDescriptor)ProcessorDescriptor.create((String)ReduceProcessor.class.getName()).setUserPayload(finalReducePayload)).setHistoryText(finalReduceStageHistoryText);
        if (!useMRSettings) {
            finalReduceVertex = Vertex.create((String)"finalreduce", (ProcessorDescriptor)finalReduceProcessorDescriptor, (int)1);
        } else {
            finalReduceVertex = Vertex.create((String)"finalreduce", (ProcessorDescriptor)finalReduceProcessorDescriptor, (int)1, (Resource)MRHelpers.getResourceForMRReducer((Configuration)finalReduceConf));
            finalReduceVertex.setTaskLaunchCmdOpts(MRHelpers.getJavaOptsForMRReducer((Configuration)finalReduceConf));
            finalReduceVertex.setTaskEnvironment((Map)reduceEnv);
        }
        finalReduceVertex.addTaskLocalFiles(commonLocalResources);
        finalReduceVertex.addDataSink("MROutput", MROutputLegacy.createConfigBuilder((Configuration)finalReduceConf, TextOutputFormat.class, (String)outputPath).build());
        ((DataSinkDescriptor)finalReduceVertex.getDataSinks().get(0)).getOutputDescriptor().setHistoryText(TezUtils.convertToHistoryText((String)("HDFS Output " + outputPath), (Configuration)finalReduceConf));
        vertices.add(finalReduceVertex);
        DAG dag = DAG.create((String)("OrderedWordCount" + dagIndex));
        for (int i = 0; i < vertices.size(); ++i) {
            dag.addVertex((Vertex)vertices.get(i));
        }
        OrderedPartitionedKVEdgeConfig edgeConf1 = ((OrderedPartitionedKVEdgeConfig.Builder)OrderedPartitionedKVEdgeConfig.newBuilder((String)Text.class.getName(), (String)IntWritable.class.getName(), (String)HashPartitioner.class.getName()).setFromConfiguration((Configuration)iReduceStageConf).configureInput().useLegacyInput().done()).build();
        dag.addEdge(Edge.create((Vertex)dag.getVertex("initialmap"), (Vertex)dag.getVertex("intermediate_reducer"), (EdgeProperty)edgeConf1.createDefaultEdgeProperty()));
        OrderedPartitionedKVEdgeConfig edgeConf2 = ((OrderedPartitionedKVEdgeConfig.Builder)OrderedPartitionedKVEdgeConfig.newBuilder((String)IntWritable.class.getName(), (String)Text.class.getName(), (String)HashPartitioner.class.getName()).setFromConfiguration((Configuration)finalReduceConf).configureInput().useLegacyInput().done()).build();
        dag.addEdge(Edge.create((Vertex)dag.getVertex("intermediate_reducer"), (Vertex)dag.getVertex("finalreduce"), (EdgeProperty)edgeConf2.createDefaultEdgeProperty()));
        this.updateDAGACls(conf, dag, dagIndex);
        return dag;
    }

    private void setMaxDataLengthConf(Configuration config, int maxDataLengthThroughIPC, int exceedDataLimit) {
        if (maxDataLengthThroughIPC > 0) {
            config.setBoolean(IS_MAX_IPC_DATA_SET_BY_USER, true);
            config.setInt(EXCEED_IPC_DATA_LIMIT, exceedDataLimit);
            int payloadSize = (maxDataLengthThroughIPC * 1024 * 1024 + exceedDataLimit * 1024 * 1024) / 3;
            String payload = RandomStringUtils.randomAlphanumeric((int)payloadSize);
            config.set(IPC_PAYLOAD, payload);
        }
    }

    private void updateDAGACls(Configuration conf, DAG dag, int dagIndex) {
        LOG.info("Checking DAG specific ACLS");
        DAGAccessControls accessControls = null;
        String suffix = "." + dagIndex;
        if (conf.get(DAG_VIEW_ACLS + suffix) != null || conf.get(DAG_MODIFY_ACLS + suffix) != null) {
            accessControls = new DAGAccessControls(conf.get(DAG_VIEW_ACLS + suffix), conf.get(DAG_MODIFY_ACLS + suffix));
        } else if (conf.get(DAG_VIEW_ACLS) != null || conf.get(DAG_MODIFY_ACLS) != null) {
            accessControls = new DAGAccessControls(conf.get(DAG_VIEW_ACLS), conf.get(DAG_MODIFY_ACLS));
        }
        if (accessControls != null) {
            LOG.info("Setting DAG specific ACLS");
            dag.setAccessControls(accessControls);
        }
    }

    private static void printUsage() {
        String options = " [-generateSplitsInClient true/<false>]";
        System.err.println("Usage: testorderedwordcount <in> <out>" + options);
        System.err.println("Usage (In Session Mode): testorderedwordcount <in1> <out1> ... <inN> <outN>" + options);
        ToolRunner.printGenericCommandUsage((PrintStream)System.err);
    }

    public int run(String[] args) throws Exception {
        boolean generateSplitsInClient;
        Configuration conf = this.getConf();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        SplitsInClientOptionParser splitCmdLineParser = new SplitsInClientOptionParser();
        try {
            generateSplitsInClient = splitCmdLineParser.parse(otherArgs, false);
            otherArgs = splitCmdLineParser.getRemainingArgs();
        }
        catch (ParseException e1) {
            System.err.println("Invalid options");
            TestOrderedWordCount.printUsage();
            return 2;
        }
        boolean useTezSession = conf.getBoolean("USE_TEZ_SESSION", true);
        long interJobSleepTimeout = conf.getInt("INTER_JOB_SLEEP_INTERVAL", 0) * 1000;
        boolean retainStagingDir = conf.getBoolean("RETAIN_STAGING_DIR", false);
        boolean useMRSettings = conf.getBoolean("USE_MR_CONFIGS", true);
        int intermediateNumReduceTasks = conf.getInt("IREDUCE_NUM_TASKS", 2);
        int maxDataLengthThroughIPC = conf.getInt(MAX_IPC_DATA_LENGTH, -1);
        int exceedDataLimit = conf.getInt(EXCEED_IPC_DATA_LIMIT, 3);
        if (maxDataLengthThroughIPC > 0) {
            conf.setInt("ipc.maximum.data.length", maxDataLengthThroughIPC * 1024 * 1024);
        }
        if (otherArgs.length % 2 != 0 || !useTezSession && otherArgs.length != 2) {
            TestOrderedWordCount.printUsage();
            return 2;
        }
        ArrayList<String> inputPaths = new ArrayList<String>();
        ArrayList<String> outputPaths = new ArrayList<String>();
        TezConfiguration tezConf = new TezConfiguration(conf);
        for (int i = 0; i < otherArgs.length; i += 2) {
            FileSystem inputPathFs = new Path(otherArgs[i]).getFileSystem((Configuration)tezConf);
            inputPaths.add(inputPathFs.makeQualified(new Path(otherArgs[i])).toString());
            FileSystem outputPathFs = new Path(otherArgs[i + 1]).getFileSystem((Configuration)tezConf);
            outputPaths.add(outputPathFs.makeQualified(new Path(otherArgs[i + 1])).toString());
        }
        UserGroupInformation.setConfiguration((Configuration)conf);
        HadoopShim hadoopShim = new HadoopShimsLoader((Configuration)tezConf).getHadoopShim();
        TestOrderedWordCount instance = new TestOrderedWordCount();
        FileSystem fs = FileSystem.get((Configuration)conf);
        String stagingDirStr = conf.get("tez.staging-dir", TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT) + "/" + Long.toString(System.currentTimeMillis());
        Path stagingDir = new Path(stagingDirStr);
        FileSystem pathFs = stagingDir.getFileSystem((Configuration)tezConf);
        pathFs.mkdirs(new Path(stagingDirStr));
        tezConf.set("tez.staging-dir", stagingDirStr);
        stagingDir = pathFs.makeQualified(new Path(stagingDirStr));
        TokenCache.obtainTokensForNamenodes((Credentials)instance.credentials, (Path[])new Path[]{stagingDir}, (Configuration)conf);
        TezClientUtils.ensureStagingDirExists((Configuration)tezConf, (Path)stagingDir);
        if (useTezSession) {
            LOG.info("Creating Tez Session");
            tezConf.setBoolean("tez.am.mode.session", true);
        } else {
            tezConf.setBoolean("tez.am.mode.session", false);
        }
        TezClient tezSession = TezClient.create((String)"OrderedWordCountSession", (TezConfiguration)tezConf, null, (Credentials)instance.credentials);
        tezSession.start();
        if (tezSession.getAppMasterApplicationId() != null) {
            TezUtilsInternal.setHadoopCallerContext((HadoopShim)hadoopShim, (ApplicationId)tezSession.getAppMasterApplicationId());
        }
        DAGStatus dagStatus = null;
        DAGClient dagClient = null;
        String[] vNames = new String[]{"initialmap", "intermediate_reducer", "finalreduce"};
        EnumSet<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
        try {
            for (int dagIndex = 1; dagIndex <= inputPaths.size(); ++dagIndex) {
                if (dagIndex != 1 && interJobSleepTimeout > 0L) {
                    try {
                        LOG.info("Sleeping between jobs, sleepInterval=" + interJobSleepTimeout / 1000L);
                        Thread.sleep(interJobSleepTimeout);
                    }
                    catch (InterruptedException e) {
                        LOG.info("Main thread interrupted. Breaking out of job loop");
                        break;
                    }
                }
                String inputPath = (String)inputPaths.get(dagIndex - 1);
                String outputPath = (String)outputPaths.get(dagIndex - 1);
                if (fs.exists(new Path(outputPath))) {
                    throw new FileAlreadyExistsException("Output directory " + outputPath + " already exists");
                }
                LOG.info("Running OrderedWordCount DAG, dagIndex=" + dagIndex + ", inputPath=" + inputPath + ", outputPath=" + outputPath);
                TreeMap<String, LocalResource> localResources = new TreeMap<String, LocalResource>();
                DAG dag = instance.createDAG(fs, (Configuration)tezConf, localResources, stagingDir, dagIndex, inputPath, outputPath, generateSplitsInClient, useMRSettings, intermediateNumReduceTasks, maxDataLengthThroughIPC, exceedDataLimit);
                String callerType = "TestOrderedWordCount";
                String callerId = tezSession.getAppMasterApplicationId() == null ? "UnknownApp_" + System.currentTimeMillis() + dagIndex : tezSession.getAppMasterApplicationId().toString() + "_" + dagIndex;
                dag.setCallerContext(CallerContext.create((String)"Tez", (String)callerId, (String)callerType, (String)"TestOrderedWordCount Job"));
                boolean doPreWarm = dagIndex == 1 && useTezSession && conf.getBoolean("PRE_WARM_SESSION", true);
                int preWarmNumContainers = 0;
                if (doPreWarm && (preWarmNumContainers = conf.getInt("PRE_WARM_NUM_CONTAINERS", 0)) <= 0) {
                    doPreWarm = false;
                }
                if (doPreWarm) {
                    LOG.info("Pre-warming Session");
                    PreWarmVertex preWarmVertex = PreWarmVertex.create((String)"PreWarm", (int)preWarmNumContainers, (Resource)dag.getVertex("initialmap").getTaskResource());
                    preWarmVertex.addTaskLocalFiles(dag.getVertex("initialmap").getTaskLocalFiles());
                    preWarmVertex.setTaskEnvironment(dag.getVertex("initialmap").getTaskEnvironment());
                    preWarmVertex.setTaskLaunchCmdOpts(dag.getVertex("initialmap").getTaskLaunchCmdOpts());
                    tezSession.preWarm(preWarmVertex);
                }
                if (useTezSession) {
                    LOG.info("Waiting for TezSession to get into ready state");
                    TestOrderedWordCount.waitForTezSessionReady(tezSession);
                    LOG.info("Submitting DAG to Tez Session, dagIndex=" + dagIndex);
                    dagClient = tezSession.submitDAG(dag);
                    LOG.info("Submitted DAG to Tez Session, dagIndex=" + dagIndex);
                } else {
                    LOG.info("Submitting DAG as a new Tez Application");
                    dagClient = tezSession.submitDAG(dag);
                }
                while ((dagStatus = dagClient.getDAGStatus(statusGetOpts)).getState() != DAGStatus.State.RUNNING && dagStatus.getState() != DAGStatus.State.SUCCEEDED && dagStatus.getState() != DAGStatus.State.FAILED && dagStatus.getState() != DAGStatus.State.KILLED && dagStatus.getState() != DAGStatus.State.ERROR) {
                    try {
                        Thread.sleep(500L);
                    }
                    catch (InterruptedException preWarmVertex) {}
                }
                while (dagStatus.getState() != DAGStatus.State.SUCCEEDED && dagStatus.getState() != DAGStatus.State.FAILED && dagStatus.getState() != DAGStatus.State.KILLED && dagStatus.getState() != DAGStatus.State.ERROR) {
                    if (dagStatus.getState() == DAGStatus.State.RUNNING) {
                        ExampleDriver.printDAGStatus(dagClient, vNames);
                    }
                    try {
                        try {
                            Thread.sleep(1000L);
                        }
                        catch (InterruptedException preWarmVertex) {
                            // empty catch block
                        }
                        dagStatus = dagClient.getDAGStatus(statusGetOpts);
                    }
                    catch (TezException e) {
                        LOG.error("Failed to get application progress. Exiting");
                        int n = -1;
                        if (!retainStagingDir) {
                            pathFs.delete(stagingDir, true);
                        }
                        LOG.info("Shutting down session");
                        tezSession.stop();
                        return n;
                    }
                }
                ExampleDriver.printDAGStatus(dagClient, vNames, true, true);
                LOG.info("DAG " + dagIndex + " completed. " + "FinalState=" + dagStatus.getState());
                if (dagStatus.getState() == DAGStatus.State.SUCCEEDED) continue;
                LOG.info("DAG " + dagIndex + " diagnostics: " + dagStatus.getDiagnostics());
            }
        }
        catch (Exception e) {
            LOG.error("Error occurred when submitting/running DAGs", (Throwable)e);
            throw e;
        }
        finally {
            if (!retainStagingDir) {
                pathFs.delete(stagingDir, true);
            }
            LOG.info("Shutting down session");
            tezSession.stop();
        }
        if (!useTezSession) {
            ExampleDriver.printDAGStatus(dagClient, vNames);
            LOG.info("Application completed. FinalState=" + dagStatus.getState());
        }
        return dagStatus.getState() == DAGStatus.State.SUCCEEDED ? 0 : 1;
    }

    private static void waitForTezSessionReady(TezClient tezSession) throws IOException, TezException, InterruptedException {
        tezSession.waitTillReady();
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run((Configuration)new TezConfiguration(), (Tool)new TestOrderedWordCount(), (String[])args);
        System.exit(res);
    }

    public static class MyOrderByNoOpReducer
    extends Reducer<IntWritable, Text, Text, IntWritable> {
        public void reduce(IntWritable key, Iterable<Text> values, Reducer.Context context) throws IOException, InterruptedException {
            for (Text word : values) {
                context.write((Object)word, (Object)key);
            }
        }
    }

    public static class IntSumReducer
    extends Reducer<Text, IntWritable, IntWritable, Text> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Reducer.Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            this.result.set(sum);
            context.write((Object)this.result, (Object)key);
        }
    }

    public static class TokenizerMapper
    extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void setup(Mapper.Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            if (conf.getBoolean(TestOrderedWordCount.IS_MAX_IPC_DATA_SET_BY_USER, false)) {
                LOG.info("Max IPC Data Length set : " + conf.getInt(TestOrderedWordCount.MAX_IPC_DATA_LENGTH, -1) + " MB," + " Exceed the Max IPC Data Length : " + conf.getInt(TestOrderedWordCount.EXCEED_IPC_DATA_LIMIT, 3) + " MB," + " Total Dag Payload sent through IPC : " + (conf.getInt(TestOrderedWordCount.MAX_IPC_DATA_LENGTH, -1) + conf.getInt(TestOrderedWordCount.EXCEED_IPC_DATA_LIMIT, 3)) + " MB," + " Each Vertex Processor payload : " + (conf.getInt(TestOrderedWordCount.MAX_IPC_DATA_LENGTH, -1) + conf.getInt(TestOrderedWordCount.EXCEED_IPC_DATA_LIMIT, 3)) / 3 + " MB");
            }
        }

        public void map(Object key, Text value, Mapper.Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write((Object)this.word, (Object)one);
            }
        }
    }
}

