/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.AllocatedSlot;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.taskmanager.TaskOperationResult;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;

/**
 * A single attempt to execute an {@link ExecutionVertex}.
 *
 * <p>The life cycle of the attempt is held in the volatile {@code state} field, which is only
 * ever changed through a compare-and-set in
 * {@link #transitionState(ExecutionState, ExecutionState, Throwable)}. Methods that may race with
 * each other therefore re-read the state and retry until their transition either succeeds or is
 * no longer applicable.
 *
 * <p>For every state, the time at which the state was entered is recorded in an array indexed by
 * {@link ExecutionState#ordinal()}.
 */
public class Execution {
    private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
    private static final Logger LOG = ExecutionGraph.LOG;

    /** Number of times the cancel RPC call is attempted before the execution is failed. */
    private static final int NUM_CANCEL_CALL_TRIES = 3;

    private final ExecutionVertex vertex;
    private final ExecutionAttemptID attemptId;

    /** Timestamps of the state transitions, indexed by {@link ExecutionState#ordinal()}. */
    private final long[] stateTimestamps;
    private final int attemptNumber;

    private volatile ExecutionState state = ExecutionState.CREATED;

    /** The slot this attempt is deployed to; {@code null} until {@link #deployToSlot} assigns it. */
    private volatile AllocatedSlot assignedResource;

    /** The cause recorded when the attempt transitioned to FAILED; {@code null} otherwise. */
    private volatile Throwable failureCause;

    /**
     * Creates a new execution attempt in state CREATED with a freshly generated attempt ID.
     *
     * @param vertex the vertex this attempt belongs to; must not be {@code null}
     * @param attemptNumber the number of this attempt; must not be negative
     * @param startTimestamp the timestamp recorded for the CREATED state
     */
    public Execution(ExecutionVertex vertex, int attemptNumber, long startTimestamp) {
        Preconditions.checkNotNull(vertex);
        Preconditions.checkArgument(attemptNumber >= 0);
        this.vertex = vertex;
        this.attemptId = new ExecutionAttemptID();
        this.attemptNumber = attemptNumber;
        this.stateTimestamps = new long[ExecutionState.values().length];
        this.markTimestamp(ExecutionState.CREATED, startTimestamp);
    }

    public ExecutionVertex getVertex() {
        return this.vertex;
    }

    public ExecutionAttemptID getAttemptId() {
        return this.attemptId;
    }

    public int getAttemptNumber() {
        return this.attemptNumber;
    }

    public ExecutionState getState() {
        return this.state;
    }

    /**
     * @return the slot assigned to this attempt, or {@code null} if none has been assigned yet
     */
    public AllocatedSlot getAssignedResource() {
        return this.assignedResource;
    }

    /**
     * @return the cause recorded when this attempt failed, or {@code null} if none was recorded
     */
    public Throwable getFailureCause() {
        return this.failureCause;
    }

    /**
     * Returns the timestamps of all state transitions, indexed by
     * {@link ExecutionState#ordinal()}.
     *
     * <p>Note: this is the internal array, not a copy.
     */
    public long[] getStateTimestamps() {
        return this.stateTimestamps;
    }

    /**
     * @param state the state to look up
     * @return the timestamp recorded for the given state ({@code 0} if none was recorded)
     */
    public long getStateTimestamp(ExecutionState state) {
        return this.stateTimestamps[state.ordinal()];
    }

    /**
     * @return {@code true} if the attempt is in state FINISHED, FAILED or CANCELED
     */
    public boolean isFinished() {
        // Read the volatile field once so that all three comparisons see the same value.
        final ExecutionState current = this.state;
        return current == ExecutionState.FINISHED
                || current == ExecutionState.FAILED
                || current == ExecutionState.CANCELED;
    }

    /**
     * Moves the attempt from CREATED to SCHEDULED and requests a slot from the scheduler. If the
     * attempt is not in state CREATED, the call does nothing.
     *
     * <p>With {@code queued == true} the slot is requested via
     * {@link Scheduler#scheduleQueued(ScheduledUnit)} and deployment happens in the future's
     * action; otherwise the slot is requested via
     * {@link Scheduler#scheduleImmediately(ScheduledUnit)} and deployment is triggered from the
     * calling thread. In both cases a failing deployment releases the slot and marks the attempt
     * as failed rather than propagating the error.
     *
     * @param scheduler the scheduler to request the slot from; must not be {@code null}
     * @param queued whether the slot request may be queued
     * @throws NullPointerException if {@code scheduler} is {@code null}
     * @throws RuntimeException if the vertex has a co-location constraint but no slot sharing group
     * @throws NoResourceAvailableException declared for the scheduler's slot request
     */
    public void scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
        if (scheduler == null) {
            throw new NullPointerException();
        }
        final SlotSharingGroup sharingGroup = this.vertex.getJobVertex().getSlotSharingGroup();
        final CoLocationConstraint locationConstraint = this.vertex.getLocationConstraint();
        if (locationConstraint != null && sharingGroup == null) {
            throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing allowed.");
        }

        if (!this.transitionState(ExecutionState.CREATED, ExecutionState.SCHEDULED)) {
            // Not in CREATED any more: some other call already moved this attempt on.
            return;
        }

        final ScheduledUnit toSchedule = locationConstraint == null
                ? new ScheduledUnit(this, sharingGroup)
                : new ScheduledUnit(this, sharingGroup, locationConstraint);

        if (queued) {
            SlotAllocationFuture future = scheduler.scheduleQueued(toSchedule);
            future.setFutureAction(new SlotAllocationFutureAction() {

                @Override
                public void slotAllocated(AllocatedSlot slot) {
                    Execution.this.deployToSlotOrFail(slot);
                }
            });
        } else {
            this.deployToSlotOrFail(scheduler.scheduleImmediately(toSchedule));
        }
    }

    /**
     * Deploys this attempt to the given slot. Any error releases the slot and then marks the
     * attempt as failed; the deployment error itself is not propagated.
     */
    private void deployToSlotOrFail(AllocatedSlot slot) {
        try {
            this.deployToSlot(slot);
        }
        catch (Throwable t) {
            try {
                slot.releaseSlot();
            }
            finally {
                this.markFailed(t);
            }
        }
    }

    /**
     * Moves the attempt to DEPLOYING, assigns it to the given slot and hands the task submission
     * to {@code vertex.execute(Runnable)}. The submitted action switches the attempt to RUNNING
     * when the task manager reports success, and marks it as failed otherwise.
     *
     * <p>If the state changes away from DEPLOYING while the slot is being assigned, the slot is
     * released again and the method returns without submitting the task.
     *
     * @param slot the slot to deploy to; must not be {@code null} and must be alive
     * @throws NullPointerException if {@code slot} is {@code null}
     * @throws JobException if the slot is not alive
     * @throws IllegalStateException if the attempt is not in state CREATED or SCHEDULED, or if a
     *         concurrent call won the transition to DEPLOYING
     */
    public void deployToSlot(final AllocatedSlot slot) throws JobException {
        if (slot == null) {
            throw new NullPointerException();
        }
        if (!slot.isAlive()) {
            throw new JobException("Traget slot for deployment is not alive.");
        }

        final ExecutionState previous = this.state;
        if (previous != ExecutionState.SCHEDULED && previous != ExecutionState.CREATED) {
            throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
        }
        if (!this.transitionState(previous, ExecutionState.DEPLOYING)) {
            throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
        }

        try {
            if (!slot.setExecutedVertex(this)) {
                throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
            }
            this.assignedResource = slot;

            // Re-check after publishing the slot: a concurrent cancel/fail that ran before the
            // assignment could not see the slot, so it has to be released here.
            if (this.state != ExecutionState.DEPLOYING) {
                slot.releaseSlot();
                return;
            }

            if (LOG.isInfoEnabled()) {
                LOG.info(String.format("Deploying %s (attempt #%d) to %s", this.vertex.getSimpleName(), this.attemptNumber, slot.getInstance().getInstanceConnectionInfo().getHostname()));
            }

            final TaskDeploymentDescriptor deployment = this.vertex.createDeploymentDescriptor(this.attemptId, slot);
            this.vertex.getExecutionGraph().registerExecution(this);

            Runnable deployAction = new Runnable() {

                @Override
                public void run() {
                    try {
                        Instance instance = slot.getInstance();
                        TaskOperationResult result = instance.getTaskManagerProxy().submitTask(deployment);
                        if (result == null) {
                            Execution.this.markFailed(new Exception("Failed to deploy the task to slot " + slot + ": TaskOperationResult was null"));
                        } else if (!result.getExecutionId().equals(Execution.this.attemptId)) {
                            Execution.this.markFailed(new Exception("Answer execution id does not match the request execution id."));
                        } else if (result.isSuccess()) {
                            Execution.this.switchToRunning();
                        } else {
                            Execution.this.markFailed(new Exception("Failed to deploy the task " + Execution.this.getVertexWithAttempt() + " to slot " + slot + ": " + result.getDescription()));
                        }
                    }
                    catch (Throwable t) {
                        Execution.this.markFailed(t);
                    }
                }
            };
            this.vertex.execute(deployAction);
        }
        catch (Throwable t) {
            this.markFailed(t);
            ExceptionUtils.rethrow(t);
        }
    }

    /**
     * Cancels this attempt.
     *
     * <ul>
     *   <li>CANCELING, CANCELED, FINISHED, FAILED: nothing to do.</li>
     *   <li>RUNNING, DEPLOYING: moves to CANCELING and sends the cancel call.</li>
     *   <li>CREATED, SCHEDULED: moves directly to CANCELED, notifies the vertex, deregisters the
     *       execution and releases the slot if one is assigned.</li>
     * </ul>
     *
     * @throws IllegalStateException if the attempt is in any other state
     */
    public void cancel() {
        // Retry loop: a failed compare-and-set means the state changed concurrently, so the
        // decision is re-evaluated against the new state.
        while (true) {
            final ExecutionState current = this.state;

            if (current == ExecutionState.CANCELING || current == ExecutionState.CANCELED) {
                return;
            }
            else if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
                if (this.transitionState(current, ExecutionState.CANCELING)) {
                    this.sendCancelRpcCall();
                    return;
                }
            }
            else if (current == ExecutionState.FINISHED || current == ExecutionState.FAILED) {
                return;
            }
            else if (current == ExecutionState.CREATED || current == ExecutionState.SCHEDULED) {
                if (this.transitionState(current, ExecutionState.CANCELED)) {
                    // CANCELING was skipped; give it the same timestamp as CANCELED.
                    this.markTimestamp(ExecutionState.CANCELING, this.getStateTimestamp(ExecutionState.CANCELED));
                    try {
                        this.vertex.executionCanceled();
                    }
                    finally {
                        this.vertex.getExecutionGraph().deregisterExecution(this);
                        this.releaseAssignedResource();
                    }
                    return;
                }
            }
            else {
                throw new IllegalStateException(current.name());
            }
        }
    }

    /**
     * Fails this attempt with the given cause. If the attempt was RUNNING or DEPLOYING, a cancel
     * call is additionally sent for it.
     *
     * @param t the cause of the failure
     */
    public void fail(Throwable t) {
        this.processFail(t, false);
    }

    /**
     * Marks this attempt as failed with the given cause, without sending a cancel call.
     *
     * @param t the cause of the failure
     */
    void markFailed(Throwable t) {
        this.processFail(t, true);
    }

    /**
     * Handles the notification that the task finished.
     *
     * <p>From RUNNING or DEPLOYING the attempt moves to FINISHED, releases its slot (if one is
     * assigned), deregisters itself and notifies the vertex. From CANCELING the call is treated
     * as completion of the cancellation. In CANCELED or FAILED the notification is ignored. Any
     * other state causes the attempt to be marked as failed.
     */
    void markFinished() {
        while (true) {
            final ExecutionState current = this.state;

            if (current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING) {
                if (this.transitionState(current, ExecutionState.FINISHED)) {
                    try {
                        // The slot may not have been published yet while DEPLOYING; guarding
                        // against null keeps the deregistration below from being skipped.
                        this.releaseAssignedResource();
                        this.vertex.getExecutionGraph().deregisterExecution(this);
                    }
                    finally {
                        this.vertex.executionFinished();
                    }
                    return;
                }
                // Lost the race for the transition: re-evaluate against the new state.
            }
            else if (current == ExecutionState.CANCELING) {
                this.cancelingComplete();
                return;
            }
            else if (current == ExecutionState.CANCELED || current == ExecutionState.FAILED) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Task FINISHED, but concurrently went to state " + this.state);
                }
                return;
            }
            else {
                this.markFailed(new Exception("Vertex received FINISHED message while being in state " + this.state));
                return;
            }
        }
    }

    /**
     * Completes a cancellation: moves the attempt from CANCELING, RUNNING or DEPLOYING to
     * CANCELED, releases its slot (if one is assigned), deregisters itself and notifies the
     * vertex.
     *
     * <p>If the attempt is already CANCELED or FAILED nothing happens. Any other state is
     * logged as an error and fails the whole execution graph.
     */
    void cancelingComplete() {
        while (true) {
            final ExecutionState current = this.state;

            if (current == ExecutionState.CANCELED) {
                return;
            }
            else if (current == ExecutionState.CANCELING
                    || current == ExecutionState.RUNNING
                    || current == ExecutionState.DEPLOYING) {
                if (this.transitionState(current, ExecutionState.CANCELED)) {
                    try {
                        // See markFinished(): the slot can still be unassigned at this point.
                        this.releaseAssignedResource();
                        this.vertex.getExecutionGraph().deregisterExecution(this);
                    }
                    finally {
                        this.vertex.executionCanceled();
                    }
                    return;
                }
                // Lost the race for the transition: re-evaluate against the new state.
            }
            else {
                if (current != ExecutionState.FAILED) {
                    String message = String.format("Asynchronous race: Found state %s after successful cancel call.", this.state);
                    LOG.error(message);
                    this.vertex.getExecutionGraph().fail(new Exception(message));
                }
                return;
            }
        }
    }

    /**
     * Moves the attempt to FAILED, records the cause, releases the slot (if one is assigned),
     * deregisters the execution and notifies the vertex.
     *
     * @param t the cause of the failure
     * @param isCallback if {@code false} and the attempt was RUNNING or DEPLOYING, a cancel call
     *        is sent in addition
     * @return {@code true} if this call performed the transition to FAILED; {@code false} if the
     *         attempt was already FAILED or CANCELED
     */
    private boolean processFail(Throwable t, boolean isCallback) {
        ExecutionState current;
        while (true) {
            current = this.state;

            if (current == ExecutionState.FAILED) {
                return false;
            }
            if (current == ExecutionState.CANCELED) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Ignoring transition of vertex %s to %s while being %s", this.getVertexWithAttempt(), ExecutionState.FAILED, ExecutionState.CANCELED));
                }
                return false;
            }
            if (this.transitionState(current, ExecutionState.FAILED, t)) {
                break;
            }
        }

        this.failureCause = t;
        try {
            this.releaseAssignedResource();
            this.vertex.getExecutionGraph().deregisterExecution(this);
        }
        finally {
            this.vertex.executionFailed(t);
        }

        final boolean wasDeployedOrDeploying =
                current == ExecutionState.RUNNING || current == ExecutionState.DEPLOYING;
        if (!isCallback && wasDeployedOrDeploying) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Sending out cancel request, to remove task execution from TaskManager.");
            }
            try {
                if (this.assignedResource != null) {
                    this.sendCancelRpcCall();
                }
            }
            catch (Throwable tt) {
                // Best effort only: the attempt is already FAILED at this point.
                LOG.error("Error triggering cancel call while marking task as failed.", tt);
            }
        }
        return true;
    }

    /**
     * Moves the attempt from DEPLOYING to RUNNING.
     *
     * <p>If the attempt is no longer DEPLOYING and is neither FINISHED nor CANCELED, a cancel call
     * is sent for the task that was just deployed; for states other than CANCELING and FAILED the
     * attempt is additionally marked as failed.
     *
     * @return {@code true} if the attempt is now RUNNING because of this call
     */
    private boolean switchToRunning() {
        if (this.transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
            return true;
        }

        final ExecutionState currentState = this.state;
        if (currentState == ExecutionState.FINISHED || currentState == ExecutionState.CANCELED) {
            return false;
        }

        if (currentState == ExecutionState.CANCELING || currentState == ExecutionState.FAILED) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(String.format("Concurrent canceling/failing of %s while deployment was in progress.", this.getVertexWithAttempt()));
            }
            this.sendCancelRpcCall();
        } else {
            String message = String.format("Concurrent unexpected state transition of task %s to %s while deployment was in progress.", this.getVertexWithAttempt(), currentState);
            if (LOG.isDebugEnabled()) {
                LOG.debug(message);
            }
            this.sendCancelRpcCall();
            this.markFailed(new Exception(message));
        }
        return false;
    }

    /**
     * Hands a cancel call for this attempt to {@code vertex.execute(Runnable)}. The call is
     * attempted up to {@link #NUM_CANCEL_CALL_TRIES} times; if every attempt throws, the
     * execution is failed with the first error as the cause. Does nothing if no slot is assigned.
     */
    private void sendCancelRpcCall() {
        final AllocatedSlot slot = this.assignedResource;
        if (slot == null) {
            return;
        }

        Runnable cancelAction = new Runnable() {

            @Override
            public void run() {
                Throwable exception = null;
                for (int triesLeft = NUM_CANCEL_CALL_TRIES; triesLeft > 0; --triesLeft) {
                    try {
                        TaskOperationResult result = slot.getInstance().getTaskManagerProxy().cancelTask(Execution.this.attemptId);
                        if (!result.isSuccess() && LOG.isDebugEnabled()) {
                            LOG.debug("Cancel task call did not find task. Probably RPC call race.");
                        }
                        return;
                    }
                    catch (Throwable t) {
                        // Keep the first error as the cause reported on final failure.
                        if (exception == null) {
                            exception = t;
                        }
                        // The current attempt has just been used up, so report what remains.
                        int remaining = triesLeft - 1;
                        LOG.error("Canceling vertex " + Execution.this.getVertexWithAttempt() + " failed (" + remaining + " tries left): " + t.getMessage(), t);
                    }
                }
                Execution.this.fail(new Exception("Task could not be canceled.", exception));
            }
        };
        this.vertex.execute(cancelAction);
    }

    /** Releases the assigned slot, if any. The volatile field is read exactly once. */
    private void releaseAssignedResource() {
        final AllocatedSlot slot = this.assignedResource;
        if (slot != null) {
            slot.releaseSlot();
        }
    }

    private boolean transitionState(ExecutionState currentState, ExecutionState targetState) {
        return this.transitionState(currentState, targetState, null);
    }

    /**
     * Atomically switches the state from {@code currentState} to {@code targetState}. On success
     * the transition time is recorded and the vertex is notified; errors thrown by the
     * notification are logged and do not undo the transition.
     *
     * @param currentState the state the attempt is expected to be in
     * @param targetState the state to switch to
     * @param error the error passed along with the notification; may be {@code null}
     * @return {@code true} if the state was switched, {@code false} if the attempt was not in
     *         {@code currentState}
     */
    private boolean transitionState(ExecutionState currentState, ExecutionState targetState, Throwable error) {
        if (!STATE_UPDATER.compareAndSet(this, currentState, targetState)) {
            return false;
        }
        this.markTimestamp(targetState);
        try {
            this.vertex.notifyStateTransition(this.attemptId, targetState, error);
        }
        catch (Throwable t) {
            LOG.error("Error while notifying execution graph of execution state trnsition.", t);
        }
        return true;
    }

    private void markTimestamp(ExecutionState state) {
        this.markTimestamp(state, System.currentTimeMillis());
    }

    private void markTimestamp(ExecutionState state, long timestamp) {
        this.stateTimestamps[state.ordinal()] = timestamp;
    }

    /**
     * @return the simple name of the vertex followed by the number of this attempt
     */
    public String getVertexWithAttempt() {
        return this.vertex.getSimpleName() + " - execution #" + this.attemptNumber;
    }

    @Override
    public String toString() {
        // Single read of the volatile field so the null check and the use agree.
        final AllocatedSlot slot = this.assignedResource;
        final String location = slot == null ? "(unassigned)" : slot.toString();
        return String.format("Attempt #%d (%s) @ %s - [%s]", this.attemptNumber, this.vertex.getSimpleName(), location, this.state);
    }
}

