/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.mapreduce.examples;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValuesWriter;
import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
import org.apache.tez.runtime.library.output.UnorderedKVOutput;
import org.apache.tez.runtime.library.processor.SimpleProcessor;

public class BroadcastAndOneToOneExample
extends Configured
implements Tool {
    // Command-line flag recognised by run(String[]): when passed as the sole argument it turns the locality check off.
    // It is also echoed in the usage message printed by printUsage().
    static String skipLocalityCheck = "-skipLocalityCheck";

    /**
     * Builds the example DAG consisting of three vertices and two edges:
     * <ul>
     *   <li>"Broadcast" - runs {@link InputProcessor} with 2 tasks and no user payload;</li>
     *   <li>"Input" - runs {@link InputProcessor} with the shared payload and one task per 1-1 slot;</li>
     *   <li>"OneToOne" - runs {@link OneToOneProcessor} with the shared payload, managed by
     *       {@link InputReadyVertexManager}; it is fed by "Input" over a one-to-one edge and by
     *       "Broadcast" over a broadcast edge.</li>
     * </ul>
     * When {@code doLocalityCheck} is set, the number of 1-1 tasks is derived from the number of
     * RUNNING node managers reported by YARN; otherwise 3 tasks are used.
     *
     * @param fs              not used by this method
     * @param tezConf         configuration used for the YARN client and the edge configuration
     * @param stagingDir      not used by this method
     * @param doLocalityCheck whether the processors should verify producer/consumer co-location
     * @return the assembled DAG
     * @throws IOException   if querying YARN fails
     * @throws YarnException if querying YARN fails
     */
    private DAG createDAG(FileSystem fs, TezConfiguration tezConf, Path stagingDir, boolean doLocalityCheck) throws IOException, YarnException {
        int numBroadcastTasks = 2;
        int numOneToOneTasks = 3;
        if (doLocalityCheck) {
            int numNMs;
            YarnClient yarnClient = YarnClient.createYarnClient();
            try {
                yarnClient.init(tezConf);
                yarnClient.start();
                numNMs = yarnClient.getNodeReports(new NodeState[]{NodeState.RUNNING}).size();
            } finally {
                // Always release the client, even when the node query fails.
                yarnClient.stop();
            }
            // NOTE(review): presumably leaves room for the broadcast tasks plus one extra
            // container (likely the AM) - confirm. Never go below a single 1-1 task.
            numOneToOneTasks = Math.max(1, numNMs - numBroadcastTasks - 1);
        }

        // Payload layout read by the processors: byte 0 = locality-check flag,
        // byte 1 = broadcast sum expected by OneToOneProcessor (1 here; with two broadcast
        // tasks writing their task indices 0 and 1, the values add up to 1).
        byte[] procByte = new byte[]{(byte)(doLocalityCheck ? 1 : 0), 1};
        UserPayload procPayload = UserPayload.create(ByteBuffer.wrap(procByte));
        System.out.println("Using " + numOneToOneTasks + " 1-1 tasks");

        Vertex broadcastVertex = Vertex.create(
                "Broadcast",
                ProcessorDescriptor.create(InputProcessor.class.getName()),
                numBroadcastTasks);
        Vertex inputVertex = Vertex.create(
                "Input",
                (ProcessorDescriptor)ProcessorDescriptor.create(InputProcessor.class.getName()).setUserPayload(procPayload),
                numOneToOneTasks);
        // No explicit parallelism is passed for the 1-1 consumer vertex.
        Vertex oneToOneVertex = Vertex.create(
                "OneToOne",
                (ProcessorDescriptor)ProcessorDescriptor.create(OneToOneProcessor.class.getName()).setUserPayload(procPayload));
        oneToOneVertex.setVertexManagerPlugin(
                VertexManagerPluginDescriptor.create(InputReadyVertexManager.class.getName()));

        UnorderedKVEdgeConfig edgeConf = UnorderedKVEdgeConfig
                .newBuilder(Text.class.getName(), IntWritable.class.getName())
                .setFromConfiguration(tezConf)
                .build();

        DAG dag = DAG.create("BroadcastAndOneToOneExample");
        dag.addVertex(inputVertex)
                .addVertex(broadcastVertex)
                .addVertex(oneToOneVertex)
                .addEdge(Edge.create(inputVertex, oneToOneVertex, edgeConf.createDefaultOneToOneEdgeProperty()))
                .addEdge(Edge.create(broadcastVertex, oneToOneVertex, edgeConf.createDefaultBroadcastEdgeProperty()));
        return dag;
    }

    /**
     * Runs the example end to end: prepares a Tez configuration with container reuse enabled,
     * points the staging directory at a unique per-run sub-directory, starts a Tez session,
     * submits the DAG built by {@code createDAG} and waits for it to finish.
     * <p>
     * The staging directory is deleted and the session is stopped on every exit path; the
     * session is stopped even if deleting the staging directory fails.
     *
     * @param conf            base configuration, or {@code null} to use a default {@link TezConfiguration}
     * @param doLocalityCheck whether the DAG should verify producer/consumer co-location
     * @return {@code true} if the DAG finished in state {@code SUCCEEDED}, {@code false} otherwise
     * @throws Exception on any failure while setting up, submitting or monitoring the DAG
     */
    public boolean run(Configuration conf, boolean doLocalityCheck) throws Exception {
        System.out.println("Running BroadcastAndOneToOneExample");
        TezConfiguration tezConf = conf != null ? new TezConfiguration(conf) : new TezConfiguration();
        tezConf.setBoolean("tez.am.container.reuse.enabled", true);
        UserGroupInformation.setConfiguration(tezConf);
        // The resolved user is not needed here; the lookup is kept because the original code
        // performed it at this point. NOTE(review): confirm whether it can be dropped.
        UserGroupInformation.getCurrentUser();

        FileSystem fs = FileSystem.get(tezConf);
        // Unique staging directory per run: <staging-root>/BroadcastAndOneToOneExample/<timestamp>.
        String stagingDirStr = tezConf.get("tez.staging-dir", TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT)
                + "/" + "BroadcastAndOneToOneExample"
                + "/" + System.currentTimeMillis();
        tezConf.set("tez.staging-dir", stagingDirStr);
        Path stagingDir = fs.makeQualified(new Path(stagingDirStr));

        TezClient tezSession = TezClient.create("broadcastAndOneToOneExample", tezConf);
        tezSession.start();
        try {
            DAG dag = this.createDAG(fs, tezConf, stagingDir, doLocalityCheck);
            tezSession.waitTillReady();
            DAGClient dagClient = tezSession.submitDAG(dag);
            DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(null);
            if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
                System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());
                return false;
            }
            return true;
        } finally {
            try {
                fs.delete(stagingDir, true);
            } finally {
                // Must run even when the cleanup above throws, otherwise the session leaks.
                tezSession.stop();
            }
        }
    }

    /**
     * Command-line entry point used by {@link ToolRunner}.
     * <p>
     * Accepts either no arguments or exactly one argument equal to {@code skipLocalityCheck}.
     * Any other argument list prints the usage text and fails. The locality check is also
     * switched off when the configuration has {@code tez.local.mode} enabled.
     *
     * @param args command-line arguments remaining after generic option parsing
     * @return {@code 0} if the DAG succeeded, {@code 1} otherwise
     * @throws TezException if the command line is invalid
     * @throws Exception    if running the DAG fails
     */
    public int run(String[] args) throws Exception {
        boolean localityCheck;
        switch (args.length) {
            case 0:
                localityCheck = true;
                break;
            case 1:
                if (args[0].equals(skipLocalityCheck)) {
                    localityCheck = false;
                    break;
                }
                printUsage();
                throw new TezException("Invalid command line");
            default:
                printUsage();
                throw new TezException("Invalid command line");
        }

        Configuration conf = this.getConf();
        if (localityCheck && conf.getBoolean("tez.local.mode", false)) {
            System.out.println("locality check is not valid in local mode. skipping");
            localityCheck = false;
        }

        return this.run(conf, localityCheck) ? 0 : 1;
    }

    /**
     * Writes the usage line for this example, followed by the generic Hadoop command usage,
     * to standard error.
     */
    private static void printUsage() {
        PrintStream err = System.err;
        String usageLine = "broadcastAndOneToOneExample " + skipLocalityCheck;
        err.println(usageLine);
        ToolRunner.printGenericCommandUsage(err);
    }

    /**
     * Launches the example through {@link ToolRunner} with a fresh {@link Configuration} and
     * terminates the JVM with the tool's return value as the exit code.
     *
     * @param args command-line arguments
     * @throws Exception if the tool fails
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new BroadcastAndOneToOneExample(), args);
        System.exit(exitCode);
    }

    /**
     * Consumer processor: reads every value from its "Broadcast" and "Input" inputs, adds them
     * up and checks the total against {@code payload[1] + taskIndex}. When {@code payload[0]}
     * is positive it additionally requires that the object registry holds an entry keyed by
     * this task's index whose value parses to the same index.
     */
    public static class OneToOneProcessor
    extends SimpleProcessor {
        Text word = new Text();

        public OneToOneProcessor(ProcessorContext context) {
            super(context);
        }

        /**
         * Sums both inputs, validates the sum and optionally performs the locality check.
         *
         * @throws Exception if the sum check fails, the locality check fails, or reading fails
         */
        public void run() throws Exception {
            Preconditions.checkArgument(this.inputs.size() == 2);
            KeyValueReader oneToOneReader = (KeyValueReader)((LogicalInput)this.getInputs().get("Input")).getReader();
            KeyValueReader broadcastReader = (KeyValueReader)((LogicalInput)this.getInputs().get("Broadcast")).getReader();

            // Drain the broadcast input first, then the 1-1 input.
            int total = 0;
            for (KeyValueReader reader : new KeyValueReader[]{broadcastReader, oneToOneReader}) {
                while (reader.next()) {
                    total += ((IntWritable)reader.getCurrentValue()).get();
                }
            }

            ProcessorContext ctx = this.getContext();
            // Payload layout: byte 0 = locality-check flag, byte 1 = expected broadcast sum.
            ByteBuffer payload = ctx.getUserPayload().getPayload();
            boolean localityCheckEnabled = payload.get(0) > 0;
            byte broadcastSum = payload.get(1);

            int taskIndex = ctx.getTaskIndex();
            int expectedSum = broadcastSum + taskIndex;
            System.out.println("Index: " + taskIndex + " sum: " + total + " expectedSum: " + expectedSum + " broadcastSum: " + broadcastSum);
            Preconditions.checkState(total == expectedSum, (Object)("Sum = " + total));

            if (!localityCheckEnabled) {
                return;
            }

            ObjectRegistry registry = ctx.getObjectRegistry();
            String cachedIndex = (String)registry.get(String.valueOf(taskIndex));
            boolean producerFound = cachedIndex != null && Integer.parseInt(cachedIndex) == taskIndex;
            if (!producerFound) {
                String msg = "Did not find expected local producer " + taskIndex + " in the same JVM";
                System.out.println(msg);
                throw new TezUncheckedException(msg);
            }
        }
    }

    /**
     * Producer processor: writes a single key/value pair (an empty {@link Text} key and this
     * task's index as the value) to its only output. If a user payload is present and its
     * first byte is positive, it also caches the task index (as a string, under itself as key)
     * in the object registry for the lifetime of the DAG.
     */
    public static class InputProcessor
    extends SimpleProcessor {
        Text word = new Text();

        public InputProcessor(ProcessorContext context) {
            super(context);
        }

        /**
         * Emits the task index and, when requested by the payload, registers it in the object registry.
         *
         * @throws Exception if the output count is not 1 or writing fails
         */
        public void run() throws Exception {
            Preconditions.checkArgument(this.getOutputs().size() == 1);
            UnorderedKVOutput onlyOutput = (UnorderedKVOutput)this.getOutputs().values().iterator().next();
            KeyValuesWriter writer = onlyOutput.getWriter();
            writer.write(this.word, new IntWritable(this.getContext().getTaskIndex()));

            // Vertices created without a payload skip the registry step entirely.
            if (this.getContext().getUserPayload() == null) {
                return;
            }
            ByteBuffer payload = this.getContext().getUserPayload().getPayload();
            if (payload == null) {
                return;
            }

            // Byte 0 of the payload is the locality-check flag.
            if (payload.get(0) > 0) {
                String entry = String.valueOf(this.getContext().getTaskIndex());
                this.getContext().getObjectRegistry().cacheForDAG(entry, entry);
            }
        }
    }
}

