/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.slf4j.Logger;

public class ExecutionJobVertex {
    private static final Logger LOG = ExecutionGraph.LOG;
    private final Object stateMonitor = new Object();
    private final ExecutionGraph graph;
    private final AbstractJobVertex jobVertex;
    private final ExecutionVertex[] taskVertices;
    private final IntermediateResult[] producedDataSets;
    private final List<IntermediateResult> inputs;
    private final int parallelism;
    private final boolean[] finishedSubtasks;
    private volatile int numSubtasksInFinalState;
    private final SlotSharingGroup slotSharingGroup;
    private final CoLocationGroup coLocationGroup;
    private final InputSplit[] inputSplits;
    private InputSplitAssigner splitAssigner;

    /**
     * Convenience constructor that delegates to the four-argument constructor, passing
     * {@link System#currentTimeMillis()} as the creation timestamp.
     *
     * @param graph the execution graph forwarded to the main constructor
     * @param jobVertex the job graph vertex forwarded to the main constructor
     * @param defaultParallelism the fallback parallelism forwarded to the main constructor
     * @throws JobException if the main constructor fails
     */
    public ExecutionJobVertex(
            ExecutionGraph graph,
            AbstractJobVertex jobVertex,
            int defaultParallelism) throws JobException {
        this(graph, jobVertex, defaultParallelism, System.currentTimeMillis());
    }

    /**
     * Builds the job vertex together with its parallel subtasks and its produced intermediate
     * results. If the job vertex provides an input split source, the input splits and a split
     * assigner are created as well.
     *
     * @param graph the execution graph stored in this vertex; must not be {@code null}
     * @param jobVertex the job graph vertex this instance is created from; must not be {@code null}
     * @param defaultParallelism parallelism used when the job vertex's own parallelism is not positive
     * @param createTimestamp timestamp handed to every created {@link ExecutionVertex}
     * @throws NullPointerException if {@code graph} or {@code jobVertex} is {@code null}
     * @throws JobException if a co-location group is set without a slot sharing group, or if
     *         creating the input splits or their assigner fails
     */
    public ExecutionJobVertex(ExecutionGraph graph, AbstractJobVertex jobVertex, int defaultParallelism, long createTimestamp) throws JobException {
        if (graph == null) {
            throw new NullPointerException("graph");
        }
        if (jobVertex == null) {
            throw new NullPointerException("jobVertex");
        }
        this.graph = graph;
        this.jobVertex = jobVertex;

        // The vertex's own parallelism wins when it is positive; otherwise use the default.
        int vertexParallelism = jobVertex.getParallelism();
        int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
        this.parallelism = numTaskVertices;
        this.taskVertices = new ExecutionVertex[numTaskVertices];

        // Filled later by connectToPredecessors(...); only pre-sized here.
        this.inputs = new ArrayList<IntermediateResult>(jobVertex.getInputs().size());

        this.slotSharingGroup = jobVertex.getSlotSharingGroup();
        this.coLocationGroup = jobVertex.getCoLocationGroup();
        if (this.coLocationGroup != null && this.slotSharingGroup == null) {
            throw new JobException("Vertex uses a co-location constraint without using slot sharing");
        }

        // One IntermediateResult per data set produced by the job vertex.
        this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
        for (int i = 0; i < jobVertex.getProducedDataSets().size(); ++i) {
            IntermediateDataSet set = jobVertex.getProducedDataSets().get(i);
            this.producedDataSets[i] = new IntermediateResult(set.getId(), this, numTaskVertices);
        }

        // One ExecutionVertex per parallel subtask.
        for (int i = 0; i < numTaskVertices; ++i) {
            this.taskVertices[i] = new ExecutionVertex(this, i, this.producedDataSets, createTimestamp);
        }

        // Sanity check: every result must report exactly one assigned partition per subtask.
        // NOTE(review): presumably the ExecutionVertex constructor registers the partitions - confirm.
        for (IntermediateResult ir : this.producedDataSets) {
            if (ir.getNumberOfAssignedPartitions() != this.parallelism) {
                throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");
            }
        }

        try {
            // The job vertex exposes the source with an unknown split type. Narrowing it to
            // InputSplit is required so that the InputSplit[] created below can be passed back
            // into getInputSplitAssigner(...); with a wildcard type that call does not type-check.
            @SuppressWarnings("unchecked")
            InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
            if (splitSource != null) {
                this.inputSplits = splitSource.createInputSplits(numTaskVertices);
                this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
            } else {
                this.inputSplits = null;
                this.splitAssigner = null;
            }
        }
        catch (Throwable t) {
            throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
        }

        this.finishedSubtasks = new boolean[this.parallelism];
    }

    /**
     * Returns the {@link ExecutionGraph} that was handed to the constructor.
     *
     * @return the execution graph stored in this vertex
     */
    public ExecutionGraph getGraph() {
        return graph;
    }

    /**
     * Returns the job graph vertex that was handed to the constructor.
     *
     * @return the job vertex stored in this instance
     */
    public AbstractJobVertex getJobVertex() {
        return jobVertex;
    }

    /**
     * Returns the parallelism determined at construction time, which equals the number of
     * entries in the task vertex array.
     *
     * @return the number of parallel subtasks
     */
    public int getParallelism() {
        return parallelism;
    }

    /**
     * Returns the job ID as reported by the execution graph.
     *
     * @return the result of {@code getJobID()} on the stored graph
     */
    public JobID getJobId() {
        return graph.getJobID();
    }

    /**
     * Returns the ID of the underlying job graph vertex.
     *
     * @return the result of {@code getID()} on the stored job vertex
     */
    public JobVertexID getJobVertexId() {
        return jobVertex.getID();
    }

    /**
     * Returns the array of parallel subtask vertices.
     *
     * <p>The internal array itself is returned, not a copy, so modifications made by the
     * caller are visible to this object.
     *
     * @return the internal task vertex array
     */
    public ExecutionVertex[] getTaskVertices() {
        return taskVertices;
    }

    /**
     * Returns the intermediate results created for this vertex at construction time.
     *
     * <p>The internal array itself is returned, not a copy.
     *
     * @return the internal array of produced intermediate results
     */
    public IntermediateResult[] getProducedDataSets() {
        return producedDataSets;
    }

    /**
     * Returns the current input split assigner.
     *
     * @return the split assigner, or {@code null} if the job vertex had no input split source
     *         when this object was constructed
     */
    public InputSplitAssigner getSplitAssigner() {
        return splitAssigner;
    }

    /**
     * Returns the slot sharing group obtained from the job vertex at construction time.
     *
     * @return the slot sharing group; may be {@code null}
     */
    public SlotSharingGroup getSlotSharingGroup() {
        return slotSharingGroup;
    }

    /**
     * Returns the co-location group obtained from the job vertex at construction time.
     *
     * @return the co-location group; may be {@code null}
     */
    public CoLocationGroup getCoLocationGroup() {
        return coLocationGroup;
    }

    /**
     * Returns the list of intermediate results this vertex consumes.
     *
     * <p>The list starts out empty and is appended to by
     * {@link #connectToPredecessors(Map)}. The internal list itself is returned, not a copy.
     *
     * @return the internal list of input results
     */
    public List<IntermediateResult> getInputs() {
        return inputs;
    }

    /**
     * Tells whether the number of subtasks counted as being in a final state equals the
     * parallelism of this vertex.
     *
     * @return {@code true} if every subtask has been counted as final, {@code false} otherwise
     */
    public boolean isInFinalState() {
        return parallelism == numSubtasksInFinalState;
    }

    /**
     * Wires every input edge of the job vertex to the intermediate result it consumes.
     *
     * <p>For each edge the matching result is looked up by the edge's source ID, appended to
     * the input list, registered as having one more consumer, and then connected to every
     * subtask vertex.
     *
     * @param intermediateDataSets lookup table from data set ID to the already created result
     * @throws JobException if no result is found for the source ID of an edge
     */
    public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
        final List<JobEdge> jobEdges = jobVertex.getInputs();

        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", jobVertex.getID(), jobVertex.getName(), jobEdges.size()));
        }

        for (int inputIndex = 0; inputIndex < jobEdges.size(); ++inputIndex) {
            final JobEdge edge = jobEdges.get(inputIndex);

            if (LOG.isDebugEnabled()) {
                // An edge either carries its source data set directly or only references it by ID.
                final String message;
                if (edge.getSource() != null) {
                    message = String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).", inputIndex, jobVertex.getID(), jobVertex.getName(), edge.getSource().getProducer().getID(), edge.getSource().getProducer().getName());
                } else {
                    message = String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.", inputIndex, jobVertex.getID(), jobVertex.getName(), edge.getSourceId());
                }
                LOG.debug(message);
            }

            final IntermediateResult result = intermediateDataSets.get(edge.getSourceId());
            if (result == null) {
                throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID " + edge.getSourceId());
            }

            inputs.add(result);
            final int consumerIndex = result.registerConsumer();

            // taskVertices holds exactly 'parallelism' entries, so this visits every subtask.
            for (ExecutionVertex subtask : taskVertices) {
                subtask.connectSource(inputIndex, result, edge, consumerIndex);
            }
        }
    }

    /**
     * Asks every subtask vertex, in array order, to schedule itself for execution.
     *
     * @param scheduler the scheduler passed on to each subtask
     * @param queued flag passed on unchanged to each subtask
     * @throws NoResourceAvailableException if thrown by a subtask; remaining subtasks are not scheduled
     */
    public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
        final ExecutionVertex[] vertices = getTaskVertices();
        for (int i = 0; i < vertices.length; i++) {
            vertices[i].scheduleForExecution(scheduler, queued);
        }
    }

    /**
     * Calls {@code cancel()} on every subtask vertex, in array order.
     */
    public void cancel() {
        final ExecutionVertex[] vertices = getTaskVertices();
        for (int i = 0; i < vertices.length; i++) {
            vertices[i].cancel();
        }
    }

    /**
     * Calls {@code fail(t)} on every subtask vertex, in array order.
     *
     * @param t the failure cause passed on to each subtask
     */
    public void fail(Throwable t) {
        final ExecutionVertex[] vertices = getTaskVertices();
        for (int i = 0; i < vertices.length; i++) {
            vertices[i].fail(t);
        }
    }

    /**
     * Blocks the calling thread until the number of subtasks counted as final is no longer
     * smaller than the parallelism.
     *
     * <p>Waits on the internal state monitor; the waiting thread is woken by the
     * {@code notifyAll()} issued when the last subtask is counted as final.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void waitForAllVerticesToReachFinishingState() throws InterruptedException {
        synchronized (stateMonitor) {
            // Loop guards against spurious wake-ups.
            while (numSubtasksInFinalState < parallelism) {
                stateMonitor.wait();
            }
        }
    }

    /**
     * Resets this vertex and all of its subtasks so that they can be executed again.
     *
     * <p>The reset is only permitted when either no subtask or every subtask has been counted
     * as final. It clears the slot sharing and co-location state (if present), resets each
     * subtask vertex, clears the per-subtask finished flags, and re-creates the input split
     * assigner from the splits computed at construction time.
     *
     * <p>The precondition is evaluated while holding the state monitor and from a single read
     * of the counter, so it cannot interleave with a concurrent subtask state transition.
     *
     * @throws IllegalStateException if some, but not all, subtasks are in a final state
     * @throws RuntimeException if the final-state counter is non-zero after the reset, or if
     *         re-creating the input split assigner fails
     */
    public void resetForNewExecution() {
        synchronized (stateMonitor) {
            final int inFinalState = numSubtasksInFinalState;
            if (inFinalState != 0 && inFinalState != parallelism) {
                throw new IllegalStateException("Cannot reset vertex that is not in final state");
            }

            if (slotSharingGroup != null) {
                slotSharingGroup.clearTaskAssignment();
            }
            if (coLocationGroup != null) {
                coLocationGroup.resetConstraints();
            }

            // Reset every subtask and undo its contribution to the final-state counter.
            for (int i = 0; i < parallelism; ++i) {
                taskVertices[i].resetForNewExecution();
                if (finishedSubtasks[i]) {
                    finishedSubtasks[i] = false;
                    --numSubtasksInFinalState;
                }
            }
            if (numSubtasksInFinalState != 0) {
                throw new RuntimeException("Bug: resetting the execution job vertex failed.");
            }

            try {
                if (inputSplits != null) {
                    // The source is exposed with an unknown split type; narrow it so the stored
                    // InputSplit[] can be passed to getInputSplitAssigner(...).
                    @SuppressWarnings("unchecked")
                    InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
                    splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
                }
            }
            catch (Throwable t) {
                throw new RuntimeException("Re-creating the input split assigner failed: " + t.getMessage(), t);
            }
        }
    }

    /**
     * Records that the given subtask has finished by counting it as being in a final state.
     *
     * @param subtask index of the subtask within this vertex
     */
    void vertexFinished(int subtask) {
        subtaskInFinalState(subtask);
    }

    /**
     * Records that the given subtask was cancelled by counting it as being in a final state.
     *
     * @param subtask index of the subtask within this vertex
     */
    void vertexCancelled(int subtask) {
        subtaskInFinalState(subtask);
    }

    /**
     * Records that the given subtask has failed by counting it as being in a final state.
     *
     * @param subtask index of the subtask within this vertex
     * @param error the failure cause; not used by this method
     */
    void vertexFailed(int subtask, Throwable error) {
        subtaskInFinalState(subtask);
    }

    /**
     * Marks one subtask as being in a final state; repeated calls for the same subtask are
     * ignored.
     *
     * <p>When the call completes the last outstanding subtask, the job vertex is finalized on
     * the master (a failure there fails the graph), the counter is incremented, waiting
     * threads are notified, and the graph is told that this job vertex is in its final state.
     *
     * <p>NOTE(review): all of these callbacks run while the state monitor is held.
     *
     * @param subtask index of the subtask within this vertex
     */
    private void subtaskInFinalState(int subtask) {
        synchronized (stateMonitor) {
            if (finishedSubtasks[subtask]) {
                // Already counted; nothing to do.
                return;
            }
            finishedSubtasks[subtask] = true;

            final boolean completesVertex = numSubtasksInFinalState + 1 == parallelism;
            if (!completesVertex) {
                ++numSubtasksInFinalState;
                return;
            }

            try {
                getJobVertex().finalizeOnMaster(getGraph().getUserClassLoader());
            }
            catch (Throwable t) {
                getGraph().fail(t);
            }

            // The counter is only bumped after finalization, so isInFinalState() stays false
            // while finalizeOnMaster is running.
            ++numSubtasksInFinalState;
            stateMonitor.notifyAll();
            graph.jobVertexInFinalState(this);
        }
    }

    /**
     * Hands the given action to the execution graph by calling its {@code execute} method.
     *
     * @param action the action passed on to the graph
     */
    public void execute(Runnable action) {
        graph.execute(action);
    }
}

