/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.planner.fragment;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.util.DrillStringUtils;
import org.apache.drill.common.util.function.CheckedConsumer;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Receiver;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.Materializer;
import org.apache.drill.exec.planner.fragment.PlanningSet;
import org.apache.drill.exec.planner.fragment.QueryParallelizer;
import org.apache.drill.exec.planner.fragment.StatsCollector;
import org.apache.drill.exec.planner.fragment.Wrapper;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.server.options.QueryOptionManager;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class SimpleParallelizer
implements QueryParallelizer {
    static final Logger logger = LoggerFactory.getLogger(SimpleParallelizer.class);
    private final long parallelizationThreshold;
    private final int maxWidthPerNode;
    private final int maxGlobalWidth;
    private final double affinityFactor;
    private boolean enableDynamicFC;

    protected SimpleParallelizer(QueryContext context) {
        QueryOptionManager optionManager = context.getOptions();
        long sliceTarget = optionManager.getOption(ExecConstants.SLICE_TARGET_OPTION);
        this.parallelizationThreshold = sliceTarget > 0L ? sliceTarget : 1L;
        double cpu_load_average = optionManager.getOption(ExecConstants.CPU_LOAD_AVERAGE);
        long maxWidth = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE);
        this.maxWidthPerNode = ExecConstants.MAX_WIDTH_PER_NODE.computeMaxWidth(cpu_load_average, maxWidth);
        this.maxGlobalWidth = optionManager.getOption((String)"planner.width.max_per_query").num_val.intValue();
        this.affinityFactor = optionManager.getOption((String)"planner.affinity_factor").float_val.intValue();
        this.enableDynamicFC = optionManager.getBoolean("exec.enable_dynamic_fc");
    }

    protected SimpleParallelizer(long parallelizationThreshold, int maxWidthPerNode, int maxGlobalWidth, double affinityFactor) {
        this.parallelizationThreshold = parallelizationThreshold;
        this.maxWidthPerNode = maxWidthPerNode;
        this.maxGlobalWidth = maxGlobalWidth;
        this.affinityFactor = affinityFactor;
    }

    @Override
    public long getSliceTarget() {
        return this.parallelizationThreshold;
    }

    @Override
    public int getMaxWidthPerNode() {
        return this.maxWidthPerNode;
    }

    @Override
    public int getMaxGlobalWidth() {
        return this.maxGlobalWidth;
    }

    @Override
    public double getAffinityFactor() {
        return this.affinityFactor;
    }

    public Set<Wrapper> getRootFragments(PlanningSet planningSet) {
        HashSet<Wrapper> roots = Sets.newHashSet();
        for (Wrapper w : planningSet) {
            roots.add(w);
        }
        for (Wrapper wrapper : planningSet) {
            List<Wrapper> fragmentDependencies = wrapper.getFragmentDependencies();
            if (fragmentDependencies == null || fragmentDependencies.size() <= 0) continue;
            for (Wrapper dependency : fragmentDependencies) {
                if (!roots.contains(dependency)) continue;
                roots.remove(dependency);
            }
        }
        return roots;
    }

    public PlanningSet prepareFragmentTree(Fragment rootFragment) {
        PlanningSet planningSet = new PlanningSet();
        this.initFragmentWrappers(rootFragment, planningSet);
        this.constructFragmentDependencyGraph(rootFragment, planningSet);
        return planningSet;
    }

    public void collectStatsAndParallelizeFragments(PlanningSet planningSet, Set<Wrapper> roots, Collection<CoordinationProtos.DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
        for (Wrapper wrapper : roots) {
            this.traverse(wrapper, CheckedConsumer.throwingConsumerWrapper(fragmentWrapper -> {
                if (fragmentWrapper.isEndpointsAssignmentDone()) {
                    return;
                }
                fragmentWrapper.getNode().getRoot().accept(new StatsCollector(planningSet), fragmentWrapper);
                fragmentWrapper.getStats().getDistributionAffinity().getFragmentParallelizer().parallelizeFragment((Wrapper)fragmentWrapper, this, activeEndpoints);
                fragmentWrapper.computeCpuResources();
            }));
        }
    }

    public abstract void adjustMemory(PlanningSet var1, Set<Wrapper> var2, Collection<CoordinationProtos.DrillbitEndpoint> var3) throws PhysicalOperatorSetupException;

    @Override
    public final QueryWorkUnit generateWorkUnit(OptionList options, CoordinationProtos.DrillbitEndpoint foremanNode, UserBitShared.QueryId queryId, Collection<CoordinationProtos.DrillbitEndpoint> activeEndpoints, Fragment rootFragment, UserSession session, BitControl.QueryContextInformation queryContextInfo) throws ExecutionSetupException {
        PlanningSet planningSet = this.prepareFragmentTree(rootFragment);
        Set<Wrapper> rootFragments = this.getRootFragments(planningSet);
        this.collectStatsAndParallelizeFragments(planningSet, rootFragments, activeEndpoints);
        this.adjustMemory(planningSet, rootFragments, activeEndpoints);
        return this.generateWorkUnit(options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo);
    }

    public List<QueryWorkUnit> getSplitFragments(OptionList options, CoordinationProtos.DrillbitEndpoint foremanNode, UserBitShared.QueryId queryId, Collection<CoordinationProtos.DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootFragment, UserSession session, BitControl.QueryContextInformation queryContextInfo) throws ExecutionSetupException {
        throw new UnsupportedOperationException("Use children classes");
    }

    @VisibleForTesting
    public void initFragmentWrappers(Fragment rootFragment, PlanningSet planningSet) {
        planningSet.get(rootFragment);
        for (Fragment.ExchangeFragmentPair fragmentPair : rootFragment) {
            this.initFragmentWrappers(fragmentPair.getNode(), planningSet);
        }
    }

    private void constructFragmentDependencyGraph(Fragment rootFragment, PlanningSet planningSet) {
        for (Wrapper currentFragment : planningSet) {
            Fragment.ExchangeFragmentPair sendingXchgForCurrFrag = currentFragment.getNode().getSendingExchangePair();
            if (sendingXchgForCurrFrag == null) continue;
            Exchange.ParallelizationDependency dependency = sendingXchgForCurrFrag.getExchange().getParallelizationDependency();
            Wrapper receivingFragmentWrapper = planningSet.get(sendingXchgForCurrFrag.getNode());
            if (dependency == Exchange.ParallelizationDependency.RECEIVER_DEPENDS_ON_SENDER) {
                receivingFragmentWrapper.addFragmentDependency(currentFragment);
                continue;
            }
            if (dependency != Exchange.ParallelizationDependency.SENDER_DEPENDS_ON_RECEIVER) continue;
            currentFragment.addFragmentDependency(receivingFragmentWrapper);
        }
        planningSet.findRootWrapper(rootFragment);
    }

    protected void traverse(Wrapper fragmentWrapper, Consumer<Wrapper> operation) throws PhysicalOperatorSetupException {
        List<Wrapper> fragmentDependencies = fragmentWrapper.getFragmentDependencies();
        if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
            for (Wrapper dependency : fragmentDependencies) {
                this.traverse(dependency, operation);
            }
        }
        operation.accept(fragmentWrapper);
    }

    protected abstract BiFunction<CoordinationProtos.DrillbitEndpoint, PhysicalOperator, Long> getMemory();

    protected QueryWorkUnit generateWorkUnit(OptionList options, CoordinationProtos.DrillbitEndpoint foremanNode, UserBitShared.QueryId queryId, Fragment rootNode, PlanningSet planningSet, UserSession session, BitControl.QueryContextInformation queryContextInfo) throws ExecutionSetupException {
        ArrayList<QueryWorkUnit.MinorFragmentDefn> fragmentDefns = new ArrayList<QueryWorkUnit.MinorFragmentDefn>();
        QueryWorkUnit.MinorFragmentDefn rootFragmentDefn = null;
        FragmentRoot rootOperator = null;
        for (Wrapper wrapper : planningSet) {
            boolean isRootNode;
            Fragment node = wrapper.getNode();
            PhysicalOperator physicalOperatorRoot = node.getRoot();
            boolean bl = isRootNode = rootNode == node;
            if (isRootNode && wrapper.getWidth() != 1) {
                throw new ForemanSetupException(String.format("Failure while trying to setup fragment. The root fragment must always have parallelization one. In the current case, the width was set to %d.", wrapper.getWidth()));
            }
            boolean isLeafFragment = node.getReceivingExchangePairs().size() == 0;
            for (int minorFragmentId = 0; minorFragmentId < wrapper.getWidth(); ++minorFragmentId) {
                Materializer.IndexedFragmentNode iNode = new Materializer.IndexedFragmentNode(minorFragmentId, wrapper, (fragmentWrapper, minorFragment) -> fragmentWrapper.getAssignedEndpoint((int)minorFragment), this.getMemory());
                wrapper.resetAllocation();
                PhysicalOperator op = physicalOperatorRoot.accept(Materializer.INSTANCE, iNode);
                Preconditions.checkArgument(op instanceof FragmentRoot);
                FragmentRoot root = (FragmentRoot)op;
                ExecProtos.FragmentHandle handle = ExecProtos.FragmentHandle.newBuilder().setMajorFragmentId(wrapper.getMajorFragmentId()).setMinorFragmentId(minorFragmentId).setQueryId(queryId).build();
                BitControl.PlanFragment fragment = BitControl.PlanFragment.newBuilder().setForeman(foremanNode).setHandle(handle).setAssignment(wrapper.getAssignedEndpoint(minorFragmentId)).setLeafFragment(isLeafFragment).setContext(queryContextInfo).setMemInitial(wrapper.getInitialAllocation()).setMemMax(wrapper.getMaxAllocation()).setCredentials(session.getCredentials()).addAllCollector(CountRequiredFragments.getCollectors(root, this.enableDynamicFC)).build();
                QueryWorkUnit.MinorFragmentDefn fragmentDefn = new QueryWorkUnit.MinorFragmentDefn(fragment, root, options);
                if (isRootNode) {
                    logger.debug("Root fragment:\n {}", (Object)DrillStringUtils.unescapeJava(fragment.toString()));
                    rootFragmentDefn = fragmentDefn;
                    rootOperator = root;
                    continue;
                }
                logger.debug("Remote fragment:\n {}", (Object)DrillStringUtils.unescapeJava(fragment.toString()));
                fragmentDefns.add(fragmentDefn);
            }
        }
        Wrapper rootWrapper = planningSet.getRootWrapper();
        return new QueryWorkUnit(rootOperator, rootFragmentDefn, fragmentDefns, rootWrapper);
    }

    protected static class CountRequiredFragments
    extends AbstractPhysicalVisitor<Void, List<BitControl.Collector>, RuntimeException> {
        private boolean enableDynamicFC;

        CountRequiredFragments(boolean enableDynamicFC) {
            this.enableDynamicFC = enableDynamicFC;
        }

        public static List<BitControl.Collector> getCollectors(PhysicalOperator root, boolean enableDynamicFC) {
            ArrayList<BitControl.Collector> collectors = Lists.newArrayList();
            CountRequiredFragments countRequiredFragments = new CountRequiredFragments(enableDynamicFC);
            root.accept(countRequiredFragments, collectors);
            return collectors;
        }

        @Override
        public Void visitReceiver(Receiver receiver, List<BitControl.Collector> collectors) throws RuntimeException {
            List<MinorFragmentEndpoint> endpoints = receiver.getProvidingEndpoints();
            ArrayList<Integer> list = new ArrayList<Integer>(endpoints.size());
            for (MinorFragmentEndpoint ep : endpoints) {
                list.add(ep.getId());
            }
            collectors.add(BitControl.Collector.newBuilder().setIsSpooling(receiver.isSpooling()).setOppositeMajorFragmentId(receiver.getOppositeMajorFragmentId()).setSupportsOutOfOrder(receiver.supportsOutOfOrderExchange()).setEnableDynamicFc(this.enableDynamicFC).addAllIncomingMinorFragment(list).build());
            return null;
        }

        @Override
        public Void visitOp(PhysicalOperator op, List<BitControl.Collector> collectors) throws RuntimeException {
            for (PhysicalOperator o : op) {
                o.accept(this, collectors);
            }
            return null;
        }
    }
}

