/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hama.graph;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.Partitioner;
import org.apache.hama.bsp.PartitioningRunner;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.commons.util.KeyValuePair;
import org.apache.hama.graph.AggregationRunner;
import org.apache.hama.graph.Edge;
import org.apache.hama.graph.GraphJobMessage;
import org.apache.hama.graph.IDSkippingIterator;
import org.apache.hama.graph.ListVerticesInfo;
import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VertexMessageIterable;
import org.apache.hama.graph.VertexOutputWriter;
import org.apache.hama.graph.VerticesInfo;

public final class GraphJobRunner<V extends WritableComparable, E extends Writable, M extends Writable>
extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage> {
    /** Logger used for vertex loading and dynamic add/remove diagnostics. */
    private static final Log LOG = LogFactory.getLog(GraphJobRunner.class);

    // ---------------------------------------------------------------------
    // Reserved keys of the map messages exchanged between peers. The String
    // variants carry the same literal values as the Text constants below.
    // ---------------------------------------------------------------------

    /** Key of the update-count entry; see {@link #FLAG_MESSAGE_COUNTS}. */
    public static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
    /** Key prefix: entries starting with it are forwarded to the aggregation runner as aggregated values. */
    public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
    /** Key prefix: entries starting with it are forwarded to the aggregation runner as incremental aggregated values. */
    public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
    /** Key of an entry carrying a vertex to add; see {@link #FLAG_VERTEX_INCREASE}. */
    public static final String S_FLAG_VERTEX_INCREASE = "hama.3";
    /** Key of an entry carrying the ID of a vertex to remove; see {@link #FLAG_VERTEX_DECREASE}. */
    public static final String S_FLAG_VERTEX_DECREASE = "hama.4";
    /** Key of an entry carrying a delta for the INPUT_VERTICES counter; see {@link #FLAG_VERTEX_ALTER_COUNTER}. */
    public static final String S_FLAG_VERTEX_ALTER_COUNTER = "hama.5";
    /** Key of an entry carrying the total vertex count; see {@link #FLAG_VERTEX_TOTAL_VERTICES}. */
    public static final String S_FLAG_VERTEX_TOTAL_VERTICES = "hama.6";
    /**
     * Map key for update counts. A value of {@code Integer.MIN_VALUE} is the
     * master's signal that no updates happened and clears the {@code updated} flag;
     * any other value is added to {@code globalUpdateCounts}.
     */
    public static final Text FLAG_MESSAGE_COUNTS = new Text("hama.0");
    /** Map key whose value is a vertex that is added to the local vertex store. */
    public static final Text FLAG_VERTEX_INCREASE = new Text("hama.3");
    /** Map key whose value is the ID of a vertex that is removed from the local vertex store. */
    public static final Text FLAG_VERTEX_DECREASE = new Text("hama.4");
    /** Map key whose value is added to the INPUT_VERTICES counter; only accepted on the master task. */
    public static final Text FLAG_VERTEX_ALTER_COUNTER = new Text("hama.5");
    /** Map key whose value (sent by the master) overwrites the locally known total vertex count. */
    public static final Text FLAG_VERTEX_TOTAL_VERTICES = new Text("hama.6");
    /** Configuration key from which {@code initClasses} reads the vertex implementation class. */
    public static final String VERTEX_CLASS_KEY = "hama.graph.vertex.class";
    /** Job configuration, taken from the peer in {@code setupFields}. */
    private HamaConfiguration conf;
    /** Partitioner instantiated from "bsp.input.partitioner.class" (default: HashPartitioner). */
    private Partitioner<V, M> partitioner;
    // The following static class references are populated by initClasses(Configuration).
    // VERTEX_CLASS and vertexClass are both assigned from the same configuration value.
    public static Class<?> VERTEX_CLASS;
    public static Class<? extends WritableComparable> VERTEX_ID_CLASS;
    public static Class<? extends Writable> VERTEX_VALUE_CLASS;
    public static Class<? extends Writable> EDGE_VALUE_CLASS;
    public static Class<Vertex<?, ?, ?>> vertexClass;
    /** Storage of the vertices owned by this task. */
    private VerticesInfo<V, E, M> vertices;
    /** Loop flag of {@code bsp}; cleared when the master signals "no updates" or the aggregators report no change. */
    private boolean updated = true;
    /** Sum of the FLAG_MESSAGE_COUNTS values parsed in the current superstep; reset at the start of each loop pass. */
    private int globalUpdateCounts = 0;
    /** Reset to zero at the start of each superstep and handed to the aggregation runner at its end. */
    private int changedVertexCnt = 0;
    /** Global number of vertices as known to this task. */
    private long numberVertices = 0L;
    /** Value of "hama.graph.max.iteration"; values <= 0 disable the iteration limit. */
    private int maxIteration = -1;
    /** Incremented once after the initial superstep and once after every regular superstep. */
    private long iteration;
    /** Handles aggregator setup and the exchange of aggregated values. */
    private AggregationRunner<V, E, M> aggregationRunner;
    /** Writer used in {@code cleanup} to emit every vertex. */
    private VertexOutputWriter<Writable, Writable, V, E, M> vertexOutputWriter;
    /** Peer this runner was set up with. */
    private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;

    /**
     * Prepares this task for graph processing. In order: initialises the
     * runner's fields from the peer's configuration, loads the local vertices
     * from the input, exchanges the vertex counts with all peers and runs the
     * initial superstep.
     *
     * @param peer the BSP peer this task runs on
     * @throws IOException          if reading input or talking to the peer fails
     * @throws SyncException        if a barrier synchronisation fails
     * @throws InterruptedException if the task is interrupted while synchronising
     */
    public final void setup(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException {
        setupFields(peer);
        loadVertices(peer);
        countGlobalVertexCount(peer);
        doInitialSuperstep(peer);
    }

    /**
     * Main superstep loop. Keeps iterating while updates were reported and the
     * configured iteration limit (if positive) has not been exceeded.
     *
     * @param peer the BSP peer this task runs on
     * @throws IOException          if messaging fails
     * @throws SyncException        if a barrier synchronisation fails
     * @throws InterruptedException if the task is interrupted while synchronising
     */
    public final void bsp(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException {
        while (updated) {
            // A non-positive maxIteration means "no limit".
            boolean limitExceeded = maxIteration > 0 && iteration > (long) maxIteration;
            if (limitExceeded) {
                break;
            }

            // The update counter is rebuilt from the messages of every superstep.
            globalUpdateCounts = 0;
            peer.sync();

            // Control messages are consumed first; what remains is the first vertex message.
            GraphJobMessage firstVertexMessage = parseMessages(peer);
            firstVertexMessage = doAggregationUpdates(firstVertexMessage, peer);

            // The aggregation step may have cleared the flag.
            if (!updated) {
                break;
            }

            doSuperstep(firstVertexMessage, peer);

            if (GraphJobRunner.isMasterTask(peer)) {
                peer.getCounter((Enum) GraphJobCounter.ITERATIONS).increment(1L);
            }
        }
    }

    /**
     * Writes every local vertex through the configured vertex output writer
     * and then lets the vertex store clean up after itself.
     *
     * @param peer the BSP peer this task runs on
     * @throws IOException if writing a vertex or the store cleanup fails
     */
    public final void cleanup(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException {
        vertexOutputWriter.setup((Configuration) conf);
        for (IDSkippingIterator<V, E, M> it = vertices.skippingIterator(); it.hasNext();) {
            vertexOutputWriter.write(it.next(), peer);
        }
        vertices.cleanup(conf, peer.getTaskId());
    }

    /**
     * Exchanges the per-superstep control information.
     * <p>
     * On the master task a map message is sent to every peer. It always carries
     * the INPUT_VERTICES counter value under {@link #FLAG_VERTEX_TOTAL_VERTICES};
     * in addition it carries either the "no updates" marker
     * ({@code Integer.MIN_VALUE} under {@link #FLAG_MESSAGE_COUNTS}) or whatever
     * the aggregation runner adds in its master aggregation.
     * <p>
     * When aggregators are enabled, every still-unread message (including the
     * given first vertex message) is re-sent to this peer itself, an extra
     * barrier is passed, and the first message after the barrier is handed to
     * the aggregation runner as the aggregated values map.
     *
     * @param firstVertexMessage first vertex message of the current superstep, may be {@code null}
     * @param peer               the BSP peer this task runs on
     * @return the first vertex message to process in this superstep: the unchanged
     *         argument when aggregators are disabled, otherwise the message that
     *         follows the aggregated values map after the extra barrier
     * @throws IOException          if messaging fails
     * @throws SyncException        if the barrier synchronisation fails
     * @throws InterruptedException if the task is interrupted while synchronising
     */
    private GraphJobMessage doAggregationUpdates(GraphJobMessage firstVertexMessage, BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException {
        if (GraphJobRunner.isMasterTask(peer)) {
            MapWritable masterUpdate = new MapWritable();
            long totalVertices = peer.getCounter((Enum) GraphJobCounter.INPUT_VERTICES).getCounter();
            masterUpdate.put((Writable) FLAG_VERTEX_TOTAL_VERTICES, (Writable) new LongWritable(totalVertices));

            if (globalUpdateCounts != 0) {
                getAggregationRunner().doMasterAggregation(masterUpdate);
            } else {
                // Nothing changed anywhere: tell every peer to stop.
                masterUpdate.put((Writable) FLAG_MESSAGE_COUNTS, (Writable) new IntWritable(Integer.MIN_VALUE));
            }

            for (String target : peer.getAllPeerNames()) {
                peer.send(target, (Writable) new GraphJobMessage(masterUpdate));
            }
        }

        if (!getAggregationRunner().isEnabled()) {
            return firstVertexMessage;
        }

        // Re-queue everything that has not been processed yet so that it
        // survives the additional barrier below.
        if (firstVertexMessage != null) {
            peer.send(peer.getPeerName(), (Writable) firstVertexMessage);
        }
        for (GraphJobMessage pending = (GraphJobMessage) peer.getCurrentMessage();
                pending != null;
                pending = (GraphJobMessage) peer.getCurrentMessage()) {
            peer.send(peer.getPeerName(), (Writable) pending);
        }
        peer.sync();

        // The first message after the barrier is treated as the map of aggregated values.
        GraphJobMessage aggregated = (GraphJobMessage) peer.getCurrentMessage();
        updated = getAggregationRunner().receiveAggregatedValues(aggregated.getMap(), iteration);

        // The next message takes over the role of the first vertex message.
        return (GraphJobMessage) peer.getCurrentMessage();
    }

    /**
     * Runs one regular superstep over all local vertices.
     * <p>
     * Vertices are visited in iterator order. For each vertex the messages
     * addressed to it (if any) are collected; a halted vertex that received
     * messages is re-activated; every non-halted vertex is computed, either with
     * its messages or with an empty list. After the loop the aggregator values
     * are sent and the iteration counter is advanced.
     *
     * @param currentMessage first vertex message of this superstep, or {@code null}
     *                       if no vertex message was received
     * @param peer           the BSP peer this task runs on
     * @throws IOException if vertex storage or messaging fails
     */
    @SuppressWarnings("unchecked")
    private void doSuperstep(GraphJobMessage currentMessage, BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException {
        int activeVertices = 0;
        this.changedVertexCnt = 0;
        this.vertices.startSuperstep();
        IDSkippingIterator<V, E, M> iterator = this.vertices.skippingIterator();
        while (iterator.hasNext(currentMessage == null ? null : (V) currentMessage.getVertexId(), IDSkippingIterator.Strategy.ALL)) {
            Vertex<V, E, M> vertex = iterator.next();

            // The message iterable is scoped to a single vertex. It must start out
            // as null for every vertex: otherwise, once the message queue is
            // exhausted (currentMessage == null), the already consumed iterable of
            // an earlier vertex would leak into all following vertices and
            // wrongly re-activate halted ones.
            VertexMessageIterable<V, M> iterable = null;
            if (currentMessage != null) {
                iterable = this.iterate(currentMessage, (V) currentMessage.getVertexId(), vertex, peer);
            }

            // Receiving messages wakes up a halted vertex.
            if (iterable != null && vertex.isHalted()) {
                vertex.setActive();
            }

            if (!vertex.isHalted()) {
                if (iterable == null) {
                    vertex.compute(Collections.<M>emptyList());
                } else {
                    vertex.compute(iterable);
                    // The first message that did not belong to this vertex (or null
                    // when the queue is empty) drives the next loop pass.
                    currentMessage = iterable.getOverflowMessage();
                }
                ++activeVertices;
            }
            this.vertices.finishVertexComputation(vertex);
        }
        this.vertices.finishSuperstep();
        this.getAggregationRunner().sendAggregatorValues(peer, activeVertices, this.changedVertexCnt);
        ++this.iteration;
    }

    /**
     * Determines the messages addressed to the given vertex.
     * <p>
     * If the first pending message targets an ID that sorts before the vertex,
     * the destination vertex does not exist locally. With
     * "hama.check.missing.vertex" enabled (the default) this is reported as an
     * error; otherwise the messages for such IDs are skipped.
     *
     * @param currentMessage first pending vertex message
     * @param firstMessageId destination ID of {@code currentMessage}
     * @param vertex         the vertex that is about to be computed
     * @param peer           the BSP peer this task runs on
     * @return an iterable over the messages for {@code vertex}, or {@code null} if
     *         the next pending message is addressed to a different vertex or no
     *         message is left after skipping
     * @throws IllegalArgumentException if the check is enabled and a message
     *                                  targets an ID that sorts before the vertex
     */
    @SuppressWarnings("unchecked")
    private VertexMessageIterable<V, M> iterate(GraphJobMessage currentMessage, V firstMessageId, Vertex<V, E, M> vertex, BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
        int comparision = firstMessageId.compareTo(vertex.getVertexID());
        if (this.conf.getBoolean("hama.check.missing.vertex", true)) {
            if (comparision < 0) {
                throw new IllegalArgumentException("A message has recieved with a destination ID: " + firstMessageId + " that does not exist! (Vertex iterator is at" + vertex.getVertexID() + " ID)");
            }
        } else {
            while (comparision < 0) {
                // Drop all messages of the non-existent destination and move on
                // to the first message addressed to another ID.
                VertexMessageIterable<V, M> skipped = new VertexMessageIterable<V, M>(currentMessage, firstMessageId, peer);
                currentMessage = skipped.getOverflowMessage();
                if (currentMessage == null) {
                    // The skipped messages were the last ones in the queue, so
                    // nothing is left for this vertex. Without this guard the
                    // next line would fail with a NullPointerException.
                    return null;
                }
                firstMessageId = (V) currentMessage.getVertexId();
                comparision = firstMessageId.compareTo(vertex.getVertexID());
            }
        }
        if (comparision == 0) {
            return new VertexMessageIterable<V, M>(currentMessage, vertex.getVertexID(), peer);
        }
        return null;
    }

    /**
     * Runs the very first superstep: every local vertex is set up and computed
     * once with its own value as the only message. Afterwards the aggregator
     * values are sent (with an active-vertex count of 1) and the iteration
     * counter is advanced.
     *
     * @param peer the BSP peer this task runs on
     * @throws IOException if vertex storage or messaging fails
     */
    private void doInitialSuperstep(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException {
        vertices.startSuperstep();
        changedVertexCnt = 0;

        for (IDSkippingIterator<V, E, M> it = vertices.skippingIterator(); it.hasNext();) {
            Vertex<V, E, M> current = it.next();
            current.setup(conf);
            // The vertex receives its own value as the single initial message.
            current.compute(Collections.singleton(current.getValue()));
            vertices.finishVertexComputation(current);
        }

        vertices.finishSuperstep();
        getAggregationRunner().sendAggregatorValues(peer, 1, changedVertexCnt);
        iteration++;
    }

    /**
     * Initialises the runner's state from the peer and its configuration:
     * iteration limit, static class references, partitioner, vertex output
     * writer, aggregation runner and the vertex store.
     *
     * @param peer the BSP peer this task runs on
     * @throws IOException if the aggregators or the vertex store cannot be initialised
     */
    private void setupFields(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException {
        this.peer = peer;
        conf = peer.getConfiguration();
        maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration", -1);
        GraphJobRunner.initClasses((Configuration) conf);

        // Partitioner: configurable, hash partitioning by default.
        Class partitionerClass = conf.getClass("bsp.input.partitioner.class", HashPartitioner.class);
        partitioner = (Partitioner) ReflectionUtils.newInstance((Class) partitionerClass, (Configuration) conf);

        // Writer used in cleanup() to emit the final vertices.
        Class writerClass = conf.getClass("hama.graph.vertex.output.writer.class", VertexOutputWriter.class);
        vertexOutputWriter = (VertexOutputWriter) org.apache.hama.util.ReflectionUtils.newInstance((Class) writerClass);

        setAggregationRunner(new AggregationRunner());
        getAggregationRunner().setupAggregators(peer);

        // Vertex store: configurable, list-backed by default.
        Class storeClass = conf.getClass("hama.graph.vertices.info", ListVerticesInfo.class, VerticesInfo.class);
        vertices = (VerticesInfo) org.apache.hama.util.ReflectionUtils.newInstance((Class) storeClass);
        vertices.init(this, conf, peer.getTaskId());
    }

    /**
     * Resolves the vertex, vertex ID, vertex value and edge value classes from
     * the configuration and publishes them in the static class fields.
     * Defaults: {@code Text} for the ID, {@code IntWritable} for vertex and edge
     * values, {@code Vertex} for the vertex class.
     *
     * @param conf configuration to read the class names from
     */
    public static <V extends WritableComparable<? super V>, E extends Writable, M extends Writable> void initClasses(Configuration conf) {
        // All lookups happen before any static field is touched.
        Class idClass = conf.getClass("hama.graph.vertex.id.class", Text.class, Writable.class);
        Class valueClass = conf.getClass("hama.graph.vertex.value.class", IntWritable.class, Writable.class);
        Class edgeClass = conf.getClass("hama.graph.vertex.edge.value.class", IntWritable.class, Writable.class);
        vertexClass = conf.getClass(VERTEX_CLASS_KEY, Vertex.class);

        VERTEX_ID_CLASS = idClass;
        VERTEX_VALUE_CLASS = valueClass;
        VERTEX_CLASS = vertexClass;
        EDGE_VALUE_CLASS = edgeClass;
    }

    /**
     * Reads all input records of this peer, converts them to vertices and adds
     * them to the vertex store.
     * <p>
     * Consecutive records with the same vertex ID are merged by appending the
     * edges of the later record to the earlier vertex. The input must be ordered
     * by vertex ID. If "hama.graph.self.ref" is set, each vertex additionally
     * gets an edge to itself (with a {@code null} edge value).
     * <p>
     * An empty input results in an empty vertex store; no placeholder vertex is
     * added in that case.
     *
     * @param peer the BSP peer to read the input from
     * @throws IOException          if reading fails or the records are not ordered by vertex ID
     * @throws SyncException        declared for callers; not raised by the visible code
     * @throws InterruptedException declared for callers; not raised by the visible code
     */
    @SuppressWarnings("unchecked")
    private void loadVertices(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException {
        final boolean selfReference = this.conf.getBoolean("hama.graph.self.ref", false);
        PartitioningRunner.RecordConverter converter = (PartitioningRunner.RecordConverter)ReflectionUtils.newInstance((Class)this.conf.getClass("bsp.runtime.partition.recordconverter", PartitioningRunner.DefaultRecordConverter.class, PartitioningRunner.RecordConverter.class), (Configuration)this.conf);

        // Placeholder whose null ID marks "no vertex read yet".
        Vertex<V, E, M> vertex = GraphJobRunner.<V, E, M>newVertexInstance(VERTEX_CLASS);

        KeyValuePair<Writable, Writable> record;
        while ((record = peer.readNext()) != null) {
            KeyValuePair<Writable, Writable> converted = converter.convertRecord(record, (Configuration)this.conf);
            Vertex<V, E, M> currentVertex = (Vertex<V, E, M>)converted.getValue();

            if (vertex.getVertexID() == null) {
                // First record: it becomes the pending vertex.
                vertex = currentVertex;
                continue;
            }
            if (vertex.getVertexID().equals(currentVertex.getVertexID())) {
                // Same vertex spread over several records: merge the edges.
                for (Edge<V, E> edge : currentVertex.getEdges()) {
                    vertex.addEdge(edge);
                }
                continue;
            }
            if (vertex.compareTo(currentVertex) > 0) {
                throw new IOException("The records of split aren't in order by vertex ID.");
            }

            // A new ID starts: the pending vertex is complete and can be stored.
            if (selfReference) {
                vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
            }
            this.vertices.addVertex(vertex);
            vertex = currentVertex;
        }

        // Flush the last pending vertex. With an empty input the placeholder still
        // has no ID and must not end up in the vertex store.
        if (vertex.getVertexID() != null) {
            if (selfReference) {
                vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
            }
            this.vertices.addVertex(vertex);
        }

        this.vertices.finishAdditions();
        this.vertices.finishSuperstep();
        LOG.info((Object)(this.vertices.size() + " vertices are loaded into " + peer.getPeerName()));
        LOG.debug((Object)"Starting Vertex processing!");
    }

    /**
     * Adds a vertex that arrived at runtime: binds it to this runner, sets it
     * up, optionally gives it a self edge ("hama.graph.self.ref") and stores it.
     *
     * @param vertex the vertex to add
     * @throws IOException if the vertex store rejects the vertex
     */
    private void addVertex(Vertex<V, E, M> vertex) throws IOException {
        vertex.setRunner(this);
        vertex.setup(conf);

        boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
        if (selfReference) {
            vertex.addEdge(new Edge<V, Object>(vertex.getVertexID(), null));
        }

        String logLine = "Added VertexID: " + vertex.getVertexID() + " in peer " + peer.getPeerName();
        LOG.debug((Object) logLine);
        vertices.addVertex(vertex);
    }

    /**
     * Removes the vertex with the given ID from the local vertex store and
     * logs the removal at debug level.
     *
     * @param vertexID ID of the vertex to remove
     */
    private void removeVertex(V vertexID) {
        vertices.removeVertex(vertexID);
        String logLine = "Removed VertexID: " + vertexID + " in peer " + peer.getPeerName();
        LOG.debug((Object) logLine);
    }

    /**
     * Tells the vertex store that a batch of additions is complete and closes
     * the store's current superstep.
     *
     * @throws IOException if the vertex store fails
     */
    private void finishAdditions() throws IOException {
        vertices.finishAdditions();
        vertices.finishSuperstep();
    }

    /**
     * Tells the vertex store that a batch of removals is complete and closes
     * the store's current superstep.
     *
     * @throws IOException if the vertex store fails
     */
    private void finishRemovals() throws IOException {
        vertices.finishRemovals();
        vertices.finishSuperstep();
    }

    /**
     * Computes the global number of vertices: every peer sends its local vertex
     * count to all peers, and after the barrier each peer adds up the counts it
     * received. The master task additionally adds the total to the
     * INPUT_VERTICES counter.
     *
     * @param peer the BSP peer this task runs on
     * @throws IOException          if messaging fails
     * @throws SyncException        if the barrier synchronisation fails
     * @throws InterruptedException if the task is interrupted while synchronising
     */
    private void countGlobalVertexCount(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException {
        for (String target : peer.getAllPeerNames()) {
            IntWritable localCount = new IntWritable(vertices.size());
            peer.send(target, (Writable) new GraphJobMessage(localCount));
        }
        peer.sync();

        // Messages of any other kind are read and ignored here.
        for (GraphJobMessage received = (GraphJobMessage) peer.getCurrentMessage();
                received != null;
                received = (GraphJobMessage) peer.getCurrentMessage()) {
            if (received.isVerticesSizeMessage()) {
                numberVertices += (long) received.getVerticesSize().get();
            }
        }

        if (GraphJobRunner.isMasterTask(peer)) {
            peer.getCounter((Enum) GraphJobCounter.INPUT_VERTICES).increment(numberVertices);
        }
    }

    /**
     * Consumes the control (map) messages at the head of the message queue and
     * applies their entries, stopping at the first vertex message.
     * <p>
     * Recognised entries, checked in this order:
     * <ul>
     * <li>{@link #FLAG_MESSAGE_COUNTS}: {@code Integer.MIN_VALUE} clears the
     * {@code updated} flag, any other value is added to the global update count;</li>
     * <li>keys starting with {@link #S_FLAG_AGGREGATOR_VALUE} or
     * {@link #S_FLAG_AGGREGATOR_INCREMENT} (only when aggregators are enabled):
     * handed to the aggregation runner;</li>
     * <li>{@link #FLAG_VERTEX_INCREASE} / {@link #FLAG_VERTEX_DECREASE}: add or
     * remove a vertex;</li>
     * <li>{@link #FLAG_VERTEX_TOTAL_VERTICES}: overwrites the known vertex total;</li>
     * <li>{@link #FLAG_VERTEX_ALTER_COUNTER}: adjusts the INPUT_VERTICES counter,
     * allowed on the master task only.</li>
     * </ul>
     * Entries with any other key are ignored.
     *
     * @param peer the BSP peer this task runs on
     * @return the first vertex message, or {@code null} if the queue ran empty
     * @throws IOException                   if messaging or the vertex store fails
     * @throws SyncException                 declared for callers; not raised by the visible code
     * @throws InterruptedException          declared for callers; not raised by the visible code
     * @throws UnsupportedOperationException if a non-map, non-vertex message is found or a
     *                                       counter-alter entry reaches a non-master task
     */
    private GraphJobMessage parseMessages(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) throws IOException, SyncException, InterruptedException {
        boolean verticesAdded = false;
        boolean verticesRemoved = false;

        GraphJobMessage msg = (GraphJobMessage) peer.getCurrentMessage();
        while (msg != null && !msg.isVertexMessage()) {
            if (!msg.isMapMessage()) {
                throw new UnsupportedOperationException("Unknown message type: " + msg);
            }

            for (Map.Entry entry : msg.getMap().entrySet()) {
                Text key = (Text) entry.getKey();

                if (FLAG_MESSAGE_COUNTS.equals((Object) key)) {
                    int count = ((IntWritable) entry.getValue()).get();
                    if (count == Integer.MIN_VALUE) {
                        // Master's signal that nothing was updated.
                        updated = false;
                    } else {
                        globalUpdateCounts += count;
                    }
                } else if (getAggregationRunner().isEnabled() && key.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
                    getAggregationRunner().masterReadAggregatedValue(key, (Writable) entry.getValue());
                } else if (getAggregationRunner().isEnabled() && key.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
                    getAggregationRunner().masterReadAggregatedIncrementalValue(key, (Writable) entry.getValue());
                } else if (FLAG_VERTEX_INCREASE.equals((Object) key)) {
                    verticesAdded = true;
                    addVertex((Vertex) entry.getValue());
                } else if (FLAG_VERTEX_DECREASE.equals((Object) key)) {
                    verticesRemoved = true;
                    removeVertex((WritableComparable) entry.getValue());
                } else if (FLAG_VERTEX_TOTAL_VERTICES.equals((Object) key)) {
                    numberVertices = ((LongWritable) entry.getValue()).get();
                } else if (FLAG_VERTEX_ALTER_COUNTER.equals((Object) key)) {
                    if (!GraphJobRunner.isMasterTask(peer)) {
                        throw new UnsupportedOperationException("A message to increase vertex count is in a wrong place: " + peer);
                    }
                    peer.getCounter((Enum) GraphJobCounter.INPUT_VERTICES).increment(((LongWritable) entry.getValue()).get());
                }
                // Any other key is silently skipped.
            }

            msg = (GraphJobMessage) peer.getCurrentMessage();
        }

        // Finalise structural changes once, after all control messages were read.
        if (verticesAdded) {
            finishAdditions();
        }
        if (verticesRemoved) {
            finishRemovals();
        }
        return msg;
    }

    /**
     * @return the global number of vertices as currently known to this task
     */
    public final long getNumberVertices() {
        return numberVertices;
    }

    /**
     * @return the current value of the iteration counter, which is advanced
     *         after the initial superstep and after every regular superstep
     */
    public final long getNumberIterations() {
        return iteration;
    }

    /**
     * @return the configured iteration limit ("hama.graph.max.iteration");
     *         {@code -1} if none was configured
     */
    public final int getMaxIteration() {
        return maxIteration;
    }

    /**
     * @return the partitioner instantiated during setup
     */
    public final Partitioner<V, M> getPartitioner() {
        return partitioner;
    }

    /**
     * Delegates to the aggregation runner.
     *
     * @param index index of the aggregator
     * @return the last aggregated value the aggregation runner reports for that index
     */
    public final Writable getLastAggregatedValue(int index) {
        AggregationRunner<V, E, M> runner = getAggregationRunner();
        return runner.getLastAggregatedValue(index);
    }

    /**
     * Delegates to the aggregation runner.
     *
     * @param index index of the aggregator
     * @return the number of last aggregated vertices the aggregation runner
     *         reports for that index
     */
    public final IntWritable getNumLastAggregatedVertices(int index) {
        AggregationRunner<V, E, M> runner = getAggregationRunner();
        return runner.getNumLastAggregatedVertices(index);
    }

    /**
     * @return the peer this runner was set up with
     */
    public final BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> getPeer() {
        return peer;
    }

    /**
     * Checks whether the given peer is the master task.
     *
     * @param peer the peer to check
     * @return {@code true} if the peer's own name equals the master task's name
     */
    public static boolean isMasterTask(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
        String ownName = peer.getPeerName();
        String masterName = GraphJobRunner.getMasterTask(peer);
        return ownName.equals(masterName);
    }

    /**
     * Returns the name of the master task, which is the peer at index 0.
     *
     * @param peer any peer of the job
     * @return the name of the peer at index 0
     */
    public static String getMasterTask(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
        final int masterIndex = 0;
        return peer.getPeerName(masterIndex);
    }

    /**
     * Creates a new vertex object of the given class via reflection.
     *
     * @param vertexClass class to instantiate
     * @return the new instance, cast to {@code Vertex}
     */
    public static <V extends WritableComparable, E extends Writable, M extends Writable> Vertex<V, E, M> newVertexInstance(Class<?> vertexClass) {
        Object instance = org.apache.hama.util.ReflectionUtils.newInstance(vertexClass);
        return (Vertex) instance;
    }

    /**
     * Creates a new object of the vertex ID class held in {@link #VERTEX_ID_CLASS}.
     *
     * @return the new instance, cast to the caller's expected type
     */
    public static <X extends Writable> X createVertexIDObject() {
        Writable instance = (Writable) org.apache.hama.util.ReflectionUtils.newInstance(VERTEX_ID_CLASS);
        return (X) instance;
    }

    /**
     * Creates a new object of the vertex value class held in {@link #VERTEX_VALUE_CLASS}.
     *
     * @return the new instance, cast to the caller's expected type
     */
    public static <X extends Writable> X createVertexValue() {
        Writable instance = (Writable) org.apache.hama.util.ReflectionUtils.newInstance(VERTEX_VALUE_CLASS);
        return (X) instance;
    }

    /**
     * Creates a new object of the edge value class held in {@link #EDGE_VALUE_CLASS}.
     *
     * @return the new instance, cast to the caller's expected type
     */
    public static <X extends Writable> X createEdgeCostObject() {
        Writable instance = (Writable) org.apache.hama.util.ReflectionUtils.newInstance(EDGE_VALUE_CLASS);
        return (X) instance;
    }

    /**
     * @return the changed-vertex count of the current superstep; the runner
     *         resets it to zero at the start of every superstep
     */
    public int getChangedVertexCnt() {
        return changedVertexCnt;
    }

    /**
     * Overwrites the changed-vertex count. The value is handed to the
     * aggregation runner at the end of the current superstep and reset to zero
     * when the next superstep starts.
     *
     * @param changedVertexCnt the new count
     */
    public void setChangedVertexCnt(int changedVertexCnt) {
        this.changedVertexCnt = changedVertexCnt;
    }

    /**
     * @return the aggregation runner of this job runner; it is created during
     *         setup
     */
    public AggregationRunner<V, E, M> getAggregationRunner() {
        return aggregationRunner;
    }

    /**
     * Replaces the aggregation runner. Package-private; within this class it is
     * called once during field setup.
     *
     * @param aggregationRunner the runner to use from now on
     */
    void setAggregationRunner(AggregationRunner<V, E, M> aggregationRunner) {
        this.aggregationRunner = aggregationRunner;
    }

    /**
     * Job counters maintained by the graph runner.
     */
    public static enum GraphJobCounter {
        // Not referenced in this class.
        MULTISTEP_PARTITIONING,
        // Incremented by the master task after every regular superstep.
        ITERATIONS,
        // Incremented by the master task with the global vertex count after loading,
        // and adjusted by FLAG_VERTEX_ALTER_COUNTER entries.
        INPUT_VERTICES,
        // Not referenced in this class.
        AGGREGATE_VERTICES;

    }
}

