/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hama.graph;

import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.directmemory.DirectMemory;
import org.apache.directmemory.cache.CacheService;
import org.apache.directmemory.serialization.Serializer;
import org.apache.directmemory.serialization.kryo.KryoSerializer;
import org.apache.directmemory.utils.CacheValuesIterable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.graph.GraphJobRunner;
import org.apache.hama.graph.IDSkippingIterator;
import org.apache.hama.graph.Vertex;
import org.apache.hama.graph.VerticesInfo;
import org.apache.hama.util.ReflectionUtils;

/**
 * {@link VerticesInfo} implementation that keeps the vertices in a
 * DirectMemory {@link CacheService} keyed by vertex ID.
 *
 * <p>The cache is configured in {@link #init} from the {@code dm.*}
 * configuration keys declared as constants on this class. Vertex removal is
 * not supported: {@link #removeVertex} and {@link #finishRemovals} always
 * throw {@link UnsupportedOperationException}.
 *
 * @param <V> vertex ID type
 * @param <E> edge value type
 * @param <M> message/vertex value type
 */
public class OffHeapVerticesInfo<V extends WritableComparable<?>, E extends Writable, M extends Writable>
implements VerticesInfo<V, E, M> {
    /** Boolean key passed to the values iterable as its "strict" flag (default {@code true}). */
    public static final String DM_STRICT_ITERATOR = "dm.iterator.strict";
    /** Integer key for the number of DirectMemory buffers (default 100). */
    public static final String DM_BUFFERS = "dm.buffers";
    /** Integer key for the DirectMemory size setting (default 102400). */
    public static final String DM_SIZE = "dm.size";
    /** Integer key for the initial capacity, used only when {@link #DM_SORTED} is false (default 1000). */
    public static final String DM_CAPACITY = "dm.capacity";
    /** Integer key for the concurrency level, used only when {@link #DM_SORTED} is false (default 10). */
    public static final String DM_CONCURRENCY = "dm.concurrency";
    /** Integer key for the DirectMemory disposal time (default 3600000). */
    public static final String DM_DISPOSAL_TIME = "dm.disposal.time";
    /** Class key for the {@link Serializer} implementation (default {@link KryoSerializer}). */
    public static final String DM_SERIALIZER = "dm.serializer";
    /** Boolean key; when true the cache is backed by a {@link ConcurrentSkipListMap} (default {@code true}). */
    public static final String DM_SORTED = "dm.sorted";

    private CacheService<V, Vertex<V, E, M>> vertices;
    private boolean strict;
    private GraphJobRunner<V, E, M> runner;

    /**
     * Stores the runner and builds the backing cache from {@code conf}.
     *
     * @param runner  runner later attached to vertices handed out by the iterator
     * @param conf    configuration the {@code dm.*} settings are read from
     * @param attempt task attempt; not used by this implementation
     * @throws IOException declared by the interface; not thrown directly here
     */
    @Override
    public void init(GraphJobRunner<V, E, M> runner, Configuration conf, TaskAttemptID attempt) throws IOException {
        this.runner = runner;
        this.strict = conf.getBoolean(DM_STRICT_ITERATOR, true);
        DirectMemory dm = new DirectMemory()
            .setNumberOfBuffers(conf.getInt(DM_BUFFERS, 100))
            .setSize(conf.getInt(DM_SIZE, 102400))
            .setSerializer((Serializer)ReflectionUtils.newInstance((Class)conf.getClass(DM_SERIALIZER, KryoSerializer.class, Serializer.class)))
            .setDisposalTime((long)conf.getInt(DM_DISPOSAL_TIME, 3600000));
        if (conf.getBoolean(DM_SORTED, true)) {
            // Sorted mode: a skip-list map replaces the default map, so the
            // capacity/concurrency settings are not applied.
            dm.setMap(new ConcurrentSkipListMap());
        } else {
            dm.setInitialCapacity(conf.getInt(DM_CAPACITY, 1000)).setConcurrencyLevel(conf.getInt(DM_CONCURRENCY, 10));
        }
        this.vertices = dm.newCacheService();
    }

    /**
     * Invokes {@code dump()} on the backing cache.
     *
     * @param conf    not used by this implementation
     * @param attempt not used by this implementation
     */
    @Override
    public void cleanup(Configuration conf, TaskAttemptID attempt) throws IOException {
        this.vertices.dump();
    }

    /** Puts {@code vertex} into the cache under its vertex ID. */
    @Override
    public void addVertex(Vertex<V, E, M> vertex) {
        this.vertices.put(vertex.getVertexID(), vertex);
    }

    /** No-op for this implementation. */
    @Override
    public void finishAdditions() {
    }

    /** No-op for this implementation. */
    @Override
    public void startSuperstep() throws IOException {
    }

    /** No-op for this implementation. */
    @Override
    public void finishSuperstep() throws IOException {
    }

    /**
     * Writes the (possibly modified) vertex back into the cache under its
     * vertex ID.
     */
    @Override
    public void finishVertexComputation(Vertex<V, E, M> vertex) throws IOException {
        this.vertices.put(vertex.getVertexID(), vertex);
    }

    /** Clears the backing cache. */
    public void clear() {
        this.vertices.clear();
    }

    /** Returns the cache's entry count, narrowed to {@code int}. */
    @Override
    public int size() {
        return (int)this.vertices.entries();
    }

    /**
     * Returns an iterator over the cached vertices that skips every vertex the
     * supplied strategy does not accept.
     *
     * <p>Each call to {@code hasNext} consumes elements from the underlying
     * values iterator until one is accepted; {@code next} then returns that
     * vertex. Callers are expected to pair each successful {@code hasNext}
     * with one {@code next}.
     */
    @Override
    public IDSkippingIterator<V, E, M> skippingIterator() {
        final Iterator vertexIterator = new CacheValuesIterable(this.vertices, this.strict).iterator();
        return new IDSkippingIterator<V, E, M>(){
            // Number of vertices consumed so far (skipped or returned).
            int currentIndex = 0;
            // Vertex selected by the last successful hasNext call.
            Vertex<V, E, M> currentVertex = null;

            @Override
            public boolean hasNext(V e, IDSkippingIterator.Strategy strat) {
                // Fetch a fresh vertex for every rejection. Re-testing the
                // same vertex would spin forever once the strategy rejects it.
                while ((long)this.currentIndex < OffHeapVerticesInfo.this.vertices.entries()
                        && vertexIterator.hasNext()) {
                    Vertex next = (Vertex)vertexIterator.next();
                    if (strat.accept(next, (WritableComparable)e)) {
                        this.currentVertex = next;
                        return true;
                    }
                    // Rejected vertices count as consumed; accepted ones are
                    // counted in next().
                    ++this.currentIndex;
                }
                return false;
            }

            @Override
            public Vertex<V, E, M> next() {
                ++this.currentIndex;
                // Attach the runner only when the vertex does not already
                // report one.
                if (this.currentVertex.getRunner() == null) {
                    this.currentVertex.setRunner(OffHeapVerticesInfo.this.runner);
                }
                return this.currentVertex;
            }
        };
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void removeVertex(V vertexID) {
        throw new UnsupportedOperationException("Not yet implemented");
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void finishRemovals() {
        throw new UnsupportedOperationException("Not yet implemented");
    }
}

