/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.planner.fragment;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.planner.fragment.FragmentParallelizer;
import org.apache.drill.exec.planner.fragment.ParallelizationInfo;
import org.apache.drill.exec.planner.fragment.ParallelizationParameters;
import org.apache.drill.exec.planner.fragment.Stats;
import org.apache.drill.exec.planner.fragment.Wrapper;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HardAffinityFragmentParallelizer
implements FragmentParallelizer {
    private static final Logger logger = LoggerFactory.getLogger(HardAffinityFragmentParallelizer.class);
    public static final HardAffinityFragmentParallelizer INSTANCE = new HardAffinityFragmentParallelizer();
    private static String EOL = System.getProperty("line.separator");

    private HardAffinityFragmentParallelizer() {
    }

    @Override
    public void parallelizeFragment(Wrapper fragmentWrapper, ParallelizationParameters parameters, Collection<CoordinationProtos.DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
        Stats stats = fragmentWrapper.getStats();
        ParallelizationInfo pInfo = stats.getParallelizationInfo();
        int totalMaxWidth = 0;
        HashMap<CoordinationProtos.DrillbitEndpoint, EndpointAffinity> endpointPool = Maps.newHashMap();
        for (Map.Entry<CoordinationProtos.DrillbitEndpoint, EndpointAffinity> entry : pInfo.getEndpointAffinityMap().entrySet()) {
            if (!entry.getValue().isAssignmentRequired()) continue;
            endpointPool.put(entry.getKey(), entry.getValue());
            if ((totalMaxWidth += Math.min(parameters.getMaxWidthPerNode(), entry.getValue().getMaxWidth())) >= 0) continue;
            totalMaxWidth = Integer.MAX_VALUE;
        }
        int width = (int)Math.ceil(stats.getMaxCost() / (double)parameters.getSliceTarget());
        width = Math.max(endpointPool.size(), width);
        width = Math.max(1, Math.min(width, pInfo.getMaxWidth()));
        HardAffinityFragmentParallelizer.checkOrThrow(endpointPool.size() <= width, logger, "Number of mandatory endpoints ({}) that require an assignment is more than the allowed fragment max width ({}).", endpointPool.size(), pInfo.getMaxWidth());
        width = Math.max(1, Math.min(width, parameters.getMaxGlobalWidth()));
        HardAffinityFragmentParallelizer.checkOrThrow(endpointPool.size() <= width, logger, "Number of mandatory endpoints ({}) that require an assignment is more than the allowed global query width ({}).", endpointPool.size(), parameters.getMaxGlobalWidth());
        width = Math.max(1, Math.min(width, endpointPool.size() * parameters.getMaxWidthPerNode()));
        width = Math.min(totalMaxWidth, width);
        HashMap<CoordinationProtos.DrillbitEndpoint, Integer> endpoints = Maps.newHashMap();
        for (Map.Entry entry : endpointPool.entrySet()) {
            endpoints.put((CoordinationProtos.DrillbitEndpoint)entry.getKey(), 1);
        }
        int totalAssigned = endpoints.size();
        int remainingSlots = width - endpoints.size();
        while (remainingSlots > 0) {
            for (EndpointAffinity epAf : endpointPool.values()) {
                int moreAllocation = (int)Math.ceil(epAf.getAffinity() * (double)remainingSlots);
                int currentAssignments = (Integer)endpoints.get(epAf.getEndpoint());
                for (int i = 0; i < moreAllocation && totalAssigned < width && currentAssignments < parameters.getMaxWidthPerNode() && currentAssignments < epAf.getMaxWidth(); ++totalAssigned, ++currentAssignments, ++i) {
                }
                endpoints.put(epAf.getEndpoint(), currentAssignments);
            }
            int previousRemainingSlots = remainingSlots;
            if (previousRemainingSlots != (remainingSlots = width - totalAssigned)) continue;
            logger.error("Can't parallelize fragment: Every mandatory node has exhausted the maximum width per node limit." + EOL + "Endpoint pool: {}" + EOL + "Assignment so far: {}" + EOL + "Width: {}", new Object[]{endpointPool, endpoints, width});
            throw new PhysicalOperatorSetupException("Can not parallelize fragment.");
        }
        ArrayList<CoordinationProtos.DrillbitEndpoint> assignedEndpoints = Lists.newArrayList();
        for (Map.Entry entry : endpoints.entrySet()) {
            for (int i = 0; i < (Integer)entry.getValue(); ++i) {
                assignedEndpoints.add((CoordinationProtos.DrillbitEndpoint)entry.getKey());
            }
        }
        fragmentWrapper.setWidth(width);
        fragmentWrapper.assignEndpoints(assignedEndpoints);
    }

    private static void checkOrThrow(boolean expr, Logger logger, String errMsg, Object ... args) throws PhysicalOperatorSetupException {
        if (!expr) {
            logger.error(errMsg, args);
            throw new PhysicalOperatorSetupException("Can not parallelize fragment.");
        }
    }
}

