/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hama.graph;

import com.google.common.base.Preconditions;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.graph.AbstractAggregator;
import org.apache.hama.graph.Aggregator;
import org.apache.hama.graph.GraphJobMessage;
import org.apache.hama.graph.GraphJobRunner;

public final class AggregationRunner<V extends WritableComparable, E extends Writable, M extends Writable> {
    private Aggregator<M>[] aggregators;
    private Writable[] globalAggregatorResult;
    private IntWritable[] globalAggregatorIncrement;
    private boolean[] isAbstractAggregator;
    private String[] aggregatorClassNames;
    private Text[] aggregatorValueFlag;
    private Text[] aggregatorIncrementFlag;
    private Aggregator<M>[] masterAggregator;
    private boolean enabled = false;
    private Configuration conf;

    public void setupAggregators(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
        this.conf = peer.getConfiguration();
        String aggregatorClasses = peer.getConfiguration().get("hama.graph.aggregator.class");
        if (aggregatorClasses != null) {
            this.enabled = true;
            this.aggregatorClassNames = aggregatorClasses.split(";");
            this.aggregators = new Aggregator[this.aggregatorClassNames.length];
            this.globalAggregatorResult = new Writable[this.aggregatorClassNames.length];
            this.globalAggregatorIncrement = new IntWritable[this.aggregatorClassNames.length];
            this.isAbstractAggregator = new boolean[this.aggregatorClassNames.length];
            this.aggregatorValueFlag = new Text[this.aggregatorClassNames.length];
            this.aggregatorIncrementFlag = new Text[this.aggregatorClassNames.length];
            if (GraphJobRunner.isMasterTask(peer)) {
                this.masterAggregator = new Aggregator[this.aggregatorClassNames.length];
            }
            for (int i = 0; i < this.aggregatorClassNames.length; ++i) {
                this.aggregators[i] = this.getNewAggregator(this.aggregatorClassNames[i]);
                this.aggregatorValueFlag[i] = new Text("hama.1;" + i);
                this.aggregatorIncrementFlag[i] = new Text("hama.2;" + i);
                if (this.aggregators[i] instanceof AbstractAggregator) {
                    this.isAbstractAggregator[i] = true;
                }
                if (!GraphJobRunner.isMasterTask(peer)) continue;
                this.masterAggregator[i] = this.getNewAggregator(this.aggregatorClassNames[i]);
            }
        }
    }

    public void sendAggregatorValues(BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer, int activeVertices, int changedVertexCnt) throws IOException {
        MapWritable updatedCnt = new MapWritable();
        updatedCnt.put((Writable)GraphJobRunner.FLAG_MESSAGE_COUNTS, (Writable)new IntWritable(activeVertices));
        updatedCnt.put((Writable)GraphJobRunner.FLAG_VERTEX_ALTER_COUNTER, (Writable)new LongWritable((long)changedVertexCnt));
        if (this.aggregators != null) {
            int i;
            for (i = 0; i < this.aggregators.length; ++i) {
                updatedCnt.put((Writable)this.aggregatorValueFlag[i], this.aggregators[i].getValue());
                if (!this.isAbstractAggregator[i]) continue;
                updatedCnt.put((Writable)this.aggregatorIncrementFlag[i], (Writable)((AbstractAggregator)this.aggregators[i]).getTimesAggregated());
            }
            for (i = 0; i < this.aggregators.length; ++i) {
                this.aggregators[i] = this.getNewAggregator(this.aggregatorClassNames[i]);
                if (!GraphJobRunner.isMasterTask(peer)) continue;
                this.masterAggregator[i] = this.getNewAggregator(this.aggregatorClassNames[i]);
            }
        }
        peer.send(GraphJobRunner.getMasterTask(peer), (Writable)new GraphJobMessage(updatedCnt));
    }

    public void aggregateVertex(int index, M lastValue, M value) {
        if (this.isEnabled()) {
            Aggregator<M> aggregator = this.aggregators[index];
            aggregator.aggregate(value);
            if (this.isAbstractAggregator[index]) {
                AbstractAggregator intern = (AbstractAggregator)aggregator;
                intern.aggregate(lastValue, value);
                intern.aggregateInternal();
            }
        }
    }

    public void doMasterAggregation(MapWritable updatedCnt) {
        if (this.isEnabled()) {
            for (int i = 0; i < this.masterAggregator.length; ++i) {
                M lastAggregatedValue = this.masterAggregator[i].getValue();
                if (this.isAbstractAggregator[i]) {
                    AbstractAggregator intern = (AbstractAggregator)this.masterAggregator[i];
                    Object finalizeAggregation = intern.finalizeAggregation();
                    if (intern.finalizeAggregation() != null) {
                        lastAggregatedValue = finalizeAggregation;
                    }
                    updatedCnt.put((Writable)this.aggregatorIncrementFlag[i], (Writable)intern.getTimesAggregated());
                }
                updatedCnt.put((Writable)this.aggregatorValueFlag[i], lastAggregatedValue);
            }
        }
    }

    public boolean receiveAggregatedValues(MapWritable updatedValues, long iteration) throws IOException, SyncException, InterruptedException {
        for (int i = 0; i < this.aggregators.length; ++i) {
            this.globalAggregatorResult[i] = updatedValues.get((Object)this.aggregatorValueFlag[i]);
            this.globalAggregatorIncrement[i] = (IntWritable)updatedValues.get((Object)this.aggregatorIncrementFlag[i]);
        }
        IntWritable count = (IntWritable)updatedValues.get((Object)GraphJobRunner.FLAG_MESSAGE_COUNTS);
        return count == null || count.get() != Integer.MIN_VALUE;
    }

    public boolean isEnabled() {
        return this.enabled;
    }

    public void masterReadAggregatedValue(Text textIndex, M value) {
        int index = Integer.parseInt(textIndex.toString().split(";")[1]);
        this.masterAggregator[index].aggregate(value);
    }

    public void masterReadAggregatedIncrementalValue(Text textIndex, M value) {
        int index = Integer.parseInt(textIndex.toString().split(";")[1]);
        if (this.isAbstractAggregator[index]) {
            ((AbstractAggregator)this.masterAggregator[index]).addTimesAggregated(((IntWritable)value).get());
        }
    }

    private Aggregator<M> getNewAggregator(String clsName) {
        try {
            return (Aggregator)ReflectionUtils.newInstance((Class)this.conf.getClassByName(clsName), (Configuration)this.conf);
        }
        catch (ClassNotFoundException e) {
            e.printStackTrace();
            throw new IllegalArgumentException("Aggregator class " + clsName + " could not be found or instantiated!");
        }
    }

    public final Writable getLastAggregatedValue(int index) {
        return this.globalAggregatorResult[Preconditions.checkPositionIndex((int)index, (int)this.globalAggregatorResult.length)];
    }

    public final IntWritable getNumLastAggregatedVertices(int index) {
        return this.globalAggregatorIncrement[Preconditions.checkPositionIndex((int)index, (int)this.globalAggregatorIncrement.length)];
    }
}

