/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.ResultPartitionAvailabilityChecker;
import org.apache.flink.runtime.io.network.partition.PartitionException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ConsumerVertexGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingPipelinedRegion;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;

public class RestartPipelinedRegionFailoverStrategy
implements FailoverStrategy {
    private final SchedulingTopology topology;
    private final RegionFailoverResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker;

    @VisibleForTesting
    public RestartPipelinedRegionFailoverStrategy(SchedulingTopology topology) {
        this(topology, resultPartitionID -> true);
    }

    public RestartPipelinedRegionFailoverStrategy(SchedulingTopology topology, ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker) {
        this.topology = (SchedulingTopology)Preconditions.checkNotNull((Object)topology);
        this.resultPartitionAvailabilityChecker = new RegionFailoverResultPartitionAvailabilityChecker(resultPartitionAvailabilityChecker, intermediateResultPartitionID -> topology.getResultPartition((IntermediateResultPartitionID)intermediateResultPartitionID).getResultType());
    }

    @Override
    public Set<ExecutionVertexID> getTasksNeedingRestart(ExecutionVertexID executionVertexId, Throwable cause) {
        SchedulingPipelinedRegion failedRegion = (SchedulingPipelinedRegion)this.topology.getPipelinedRegionOfVertex(executionVertexId);
        if (failedRegion == null) {
            throw new IllegalStateException("Can not find the failover region for task " + String.valueOf(executionVertexId), cause);
        }
        Optional dataConsumptionException = ExceptionUtils.findThrowable((Throwable)cause, PartitionException.class);
        if (dataConsumptionException.isPresent()) {
            this.resultPartitionAvailabilityChecker.markResultPartitionFailed(((PartitionException)dataConsumptionException.get()).getPartitionId().getPartitionId());
        }
        HashSet<ExecutionVertexID> tasksToRestart = new HashSet<ExecutionVertexID>();
        for (SchedulingPipelinedRegion region : this.getRegionsToRestart(failedRegion)) {
            for (SchedulingExecutionVertex vertex : region.getVertices()) {
                if (vertex.getState() == ExecutionState.CREATED) continue;
                tasksToRestart.add((ExecutionVertexID)vertex.getId());
            }
        }
        if (dataConsumptionException.isPresent()) {
            this.resultPartitionAvailabilityChecker.removeResultPartitionFromFailedState(((PartitionException)dataConsumptionException.get()).getPartitionId().getPartitionId());
        }
        return tasksToRestart;
    }

    private Set<SchedulingPipelinedRegion> getRegionsToRestart(SchedulingPipelinedRegion failedRegion) {
        Set<SchedulingPipelinedRegion> regionsToRestart = Collections.newSetFromMap(new IdentityHashMap());
        Set visitedRegions = Collections.newSetFromMap(new IdentityHashMap());
        Set<ConsumedPartitionGroup> visitedConsumedResultGroups = Collections.newSetFromMap(new IdentityHashMap());
        Set<ConsumerVertexGroup> visitedConsumerVertexGroups = Collections.newSetFromMap(new IdentityHashMap());
        ArrayDeque<SchedulingPipelinedRegion> regionsToVisit = new ArrayDeque<SchedulingPipelinedRegion>();
        visitedRegions.add(failedRegion);
        regionsToVisit.add(failedRegion);
        while (!regionsToVisit.isEmpty()) {
            SchedulingPipelinedRegion regionToRestart = (SchedulingPipelinedRegion)regionsToVisit.poll();
            regionsToRestart.add(regionToRestart);
            for (IntermediateResultPartitionID consumedPartitionId : this.getConsumedPartitionsToVisit(regionToRestart, visitedConsumedResultGroups)) {
                SchedulingResultPartition consumedPartition;
                SchedulingPipelinedRegion producerRegion;
                if (this.resultPartitionAvailabilityChecker.isAvailable(consumedPartitionId) || visitedRegions.contains(producerRegion = (SchedulingPipelinedRegion)this.topology.getPipelinedRegionOfVertex((ExecutionVertexID)((SchedulingExecutionVertex)(consumedPartition = this.topology.getResultPartition(consumedPartitionId)).getProducer()).getId()))) continue;
                visitedRegions.add(producerRegion);
                regionsToVisit.add(producerRegion);
            }
            for (ExecutionVertexID consumerVertexId : this.getConsumerVerticesToVisit(regionToRestart, visitedConsumerVertexGroups)) {
                SchedulingPipelinedRegion consumerRegion = (SchedulingPipelinedRegion)this.topology.getPipelinedRegionOfVertex(consumerVertexId);
                if (visitedRegions.contains(consumerRegion)) continue;
                visitedRegions.add(consumerRegion);
                regionsToVisit.add(consumerRegion);
            }
        }
        return regionsToRestart;
    }

    private Iterable<IntermediateResultPartitionID> getConsumedPartitionsToVisit(SchedulingPipelinedRegion regionToRestart, Set<ConsumedPartitionGroup> visitedConsumedResultGroups) {
        ArrayList<ConsumedPartitionGroup> consumedPartitionGroupsToVisit = new ArrayList<ConsumedPartitionGroup>();
        for (SchedulingExecutionVertex vertex : regionToRestart.getVertices()) {
            for (ConsumedPartitionGroup consumedPartitionGroup : vertex.getConsumedPartitionGroups()) {
                if (visitedConsumedResultGroups.contains(consumedPartitionGroup)) continue;
                visitedConsumedResultGroups.add(consumedPartitionGroup);
                consumedPartitionGroupsToVisit.add(consumedPartitionGroup);
            }
        }
        return IterableUtils.flatMap(consumedPartitionGroupsToVisit, Function.identity());
    }

    private Iterable<ExecutionVertexID> getConsumerVerticesToVisit(SchedulingPipelinedRegion regionToRestart, Set<ConsumerVertexGroup> visitedConsumerVertexGroups) {
        ArrayList<ConsumerVertexGroup> consumerVertexGroupsToVisit = new ArrayList<ConsumerVertexGroup>();
        for (SchedulingExecutionVertex vertex : regionToRestart.getVertices()) {
            for (SchedulingResultPartition producedPartition : vertex.getProducedResults()) {
                for (ConsumerVertexGroup consumerVertexGroup : producedPartition.getConsumerVertexGroups()) {
                    if (visitedConsumerVertexGroups.contains(consumerVertexGroup)) continue;
                    visitedConsumerVertexGroups.add(consumerVertexGroup);
                    consumerVertexGroupsToVisit.add(consumerVertexGroup);
                }
            }
        }
        return IterableUtils.flatMap(consumerVertexGroupsToVisit, Function.identity());
    }

    @VisibleForTesting
    public SchedulingPipelinedRegion getFailoverRegion(ExecutionVertexID vertexID) {
        return (SchedulingPipelinedRegion)this.topology.getPipelinedRegionOfVertex(vertexID);
    }

    public static class Factory
    implements FailoverStrategy.Factory {
        @Override
        public FailoverStrategy create(SchedulingTopology topology, ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker) {
            return new RestartPipelinedRegionFailoverStrategy(topology, resultPartitionAvailabilityChecker);
        }
    }

    private static class RegionFailoverResultPartitionAvailabilityChecker
    implements ResultPartitionAvailabilityChecker {
        private final ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker;
        private final HashSet<IntermediateResultPartitionID> failedPartitions;
        private final Function<IntermediateResultPartitionID, ResultPartitionType> resultPartitionTypeRetriever;

        RegionFailoverResultPartitionAvailabilityChecker(ResultPartitionAvailabilityChecker checker, Function<IntermediateResultPartitionID, ResultPartitionType> resultPartitionTypeRetriever) {
            this.resultPartitionAvailabilityChecker = (ResultPartitionAvailabilityChecker)Preconditions.checkNotNull((Object)checker);
            this.failedPartitions = new HashSet();
            this.resultPartitionTypeRetriever = (Function)Preconditions.checkNotNull(resultPartitionTypeRetriever);
        }

        @Override
        public boolean isAvailable(IntermediateResultPartitionID resultPartitionID) {
            return !this.failedPartitions.contains(resultPartitionID) && this.resultPartitionAvailabilityChecker.isAvailable(resultPartitionID) && this.isResultPartitionIsReConsumableOrPipelinedApproximate(resultPartitionID);
        }

        public void markResultPartitionFailed(IntermediateResultPartitionID resultPartitionID) {
            this.failedPartitions.add(resultPartitionID);
        }

        public void removeResultPartitionFromFailedState(IntermediateResultPartitionID resultPartitionID) {
            this.failedPartitions.remove(resultPartitionID);
        }

        private boolean isResultPartitionIsReConsumableOrPipelinedApproximate(IntermediateResultPartitionID resultPartitionID) {
            ResultPartitionType resultPartitionType = this.resultPartitionTypeRetriever.apply(resultPartitionID);
            return resultPartitionType.isReconsumable() || resultPartitionType == ResultPartitionType.PIPELINED_APPROXIMATE;
        }
    }
}

