/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocator;
import org.apache.flink.runtime.scheduler.ExecutionVertexSchedulingRequirements;
import org.apache.flink.runtime.scheduler.InputsLocationsRetriever;
import org.apache.flink.runtime.scheduler.SlotExecutionVertexAssignment;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultExecutionSlotAllocator
implements ExecutionSlotAllocator {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutionSlotAllocator.class);
    private final Map<ExecutionVertexID, SlotExecutionVertexAssignment> pendingSlotAssignments;
    private final SlotProvider slotProvider;
    private final InputsLocationsRetriever inputsLocationsRetriever;
    private final Time allocationTimeout;

    public DefaultExecutionSlotAllocator(SlotProvider slotProvider, InputsLocationsRetriever inputsLocationsRetriever, Time allocationTimeout) {
        this.slotProvider = (SlotProvider)Preconditions.checkNotNull((Object)slotProvider);
        this.inputsLocationsRetriever = (InputsLocationsRetriever)Preconditions.checkNotNull((Object)inputsLocationsRetriever);
        this.allocationTimeout = (Time)Preconditions.checkNotNull((Object)allocationTimeout);
        this.pendingSlotAssignments = new HashMap<ExecutionVertexID, SlotExecutionVertexAssignment>();
    }

    @Override
    public Collection<SlotExecutionVertexAssignment> allocateSlotsFor(Collection<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
        ArrayList<SlotExecutionVertexAssignment> slotExecutionVertexAssignments = new ArrayList<SlotExecutionVertexAssignment>(executionVertexSchedulingRequirements.size());
        Set<AllocationID> allPreviousAllocationIds = DefaultExecutionSlotAllocator.computeAllPriorAllocationIds(executionVertexSchedulingRequirements);
        for (ExecutionVertexSchedulingRequirements schedulingRequirements : executionVertexSchedulingRequirements) {
            ExecutionVertexID executionVertexId = schedulingRequirements.getExecutionVertexId();
            SlotRequestId slotRequestId = new SlotRequestId();
            SlotSharingGroupId slotSharingGroupId = schedulingRequirements.getSlotSharingGroupId();
            LOG.debug("Allocate slot with id {} for execution {}", (Object)slotRequestId, (Object)executionVertexId);
            CompletionStage slotFuture = DefaultExecutionSlotAllocator.calculatePreferredLocations(executionVertexId, schedulingRequirements.getPreferredLocations(), this.inputsLocationsRetriever).thenCompose(preferredLocations -> this.slotProvider.allocateSlot(slotRequestId, new ScheduledUnit(executionVertexId.getJobVertexId(), slotSharingGroupId, schedulingRequirements.getCoLocationConstraint()), new SlotProfile(schedulingRequirements.getResourceProfile(), (Collection<TaskManagerLocation>)preferredLocations, (Collection<AllocationID>)Arrays.asList(schedulingRequirements.getPreviousAllocationId()), allPreviousAllocationIds), true, this.allocationTimeout));
            SlotExecutionVertexAssignment slotExecutionVertexAssignment = new SlotExecutionVertexAssignment(executionVertexId, (CompletableFuture<LogicalSlot>)slotFuture);
            this.pendingSlotAssignments.put(executionVertexId, slotExecutionVertexAssignment);
            ((CompletableFuture)slotFuture).whenComplete((ignored, throwable) -> {
                this.pendingSlotAssignments.remove(executionVertexId);
                if (throwable != null) {
                    this.slotProvider.cancelSlotRequest(slotRequestId, slotSharingGroupId, (Throwable)throwable);
                }
            });
            slotExecutionVertexAssignments.add(slotExecutionVertexAssignment);
        }
        return slotExecutionVertexAssignments;
    }

    @Override
    public void cancel(ExecutionVertexID executionVertexId) {
        SlotExecutionVertexAssignment slotExecutionVertexAssignment = this.pendingSlotAssignments.get(executionVertexId);
        if (slotExecutionVertexAssignment != null) {
            slotExecutionVertexAssignment.getLogicalSlotFuture().cancel(false);
        }
    }

    @Override
    public CompletableFuture<Void> stop() {
        ArrayList<ExecutionVertexID> executionVertexIds = new ArrayList<ExecutionVertexID>(this.pendingSlotAssignments.keySet());
        executionVertexIds.forEach(this::cancel);
        return CompletableFuture.completedFuture(null);
    }

    private static CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(ExecutionVertexID executionVertexId, Collection<TaskManagerLocation> preferredLocationsBasedOnState, InputsLocationsRetriever inputsLocationsRetriever) {
        if (!preferredLocationsBasedOnState.isEmpty()) {
            return CompletableFuture.completedFuture(preferredLocationsBasedOnState);
        }
        return DefaultExecutionSlotAllocator.getPreferredLocationsBasedOnInputs(executionVertexId, inputsLocationsRetriever);
    }

    @VisibleForTesting
    static CompletableFuture<Collection<TaskManagerLocation>> getPreferredLocationsBasedOnInputs(ExecutionVertexID executionVertexId, InputsLocationsRetriever inputsLocationsRetriever) {
        CompletionStage<Collection<TaskManagerLocation>> preferredLocations = CompletableFuture.completedFuture(Collections.emptyList());
        ArrayList locationsFutures = new ArrayList();
        Collection<Collection<ExecutionVertexID>> allProducers = inputsLocationsRetriever.getConsumedResultPartitionsProducers(executionVertexId);
        for (Collection<ExecutionVertexID> producers : allProducers) {
            for (ExecutionVertexID producer : producers) {
                Optional<CompletableFuture<TaskManagerLocation>> optionalLocationFuture = inputsLocationsRetriever.getTaskManagerLocation(producer);
                optionalLocationFuture.ifPresent(locationsFutures::add);
                if (locationsFutures.size() <= 8) continue;
                locationsFutures.clear();
                break;
            }
            CompletionStage uniqueLocationsFuture = FutureUtils.combineAll(locationsFutures).thenApply(HashSet::new);
            preferredLocations = preferredLocations.thenCombine(uniqueLocationsFuture, (locationsOnOneEdge, locationsOnAnotherEdge) -> {
                if (!locationsOnOneEdge.isEmpty() && locationsOnAnotherEdge.size() > locationsOnOneEdge.size() || locationsOnAnotherEdge.isEmpty()) {
                    return locationsOnOneEdge;
                }
                return locationsOnAnotherEdge;
            });
            locationsFutures.clear();
        }
        return preferredLocations;
    }

    @VisibleForTesting
    static Set<AllocationID> computeAllPriorAllocationIds(Collection<ExecutionVertexSchedulingRequirements> executionVertexSchedulingRequirements) {
        return executionVertexSchedulingRequirements.stream().map(ExecutionVertexSchedulingRequirements::getPreviousAllocationId).filter(Objects::nonNull).collect(Collectors.toSet());
    }

    @VisibleForTesting
    int getNumberOfPendingSlotAssignments() {
        return this.pendingSlotAssignments.size();
    }
}

