/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.work.foreman.rm;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.fragment.QueryParallelizer;
import org.apache.drill.exec.planner.fragment.QueueQueryParallelizer;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.foreman.rm.AbstractResourceManager;
import org.apache.drill.exec.work.foreman.rm.QueryQueue;
import org.apache.drill.exec.work.foreman.rm.QueryResourceAllocator;
import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
import org.apache.drill.shaded.guava.com.google.common.collect.Multimap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThrottledResourceManager
extends AbstractResourceManager {
    private static final Logger logger = LoggerFactory.getLogger(ThrottledResourceManager.class);
    private final QueryQueue queue;

    public ThrottledResourceManager(DrillbitContext drillbitContext, QueryQueue queue) {
        super(drillbitContext);
        this.queue = queue;
        queue.setMemoryPerNode(this.memoryPerNode());
    }

    public long minimumOperatorMemory() {
        return this.queue.minimumOperatorMemory();
    }

    public long defaultQueryMemoryPerNode(double cost) {
        return this.queue.defaultQueryMemoryPerNode(cost);
    }

    public QueryQueue queue() {
        return this.queue;
    }

    @Override
    public QueryResourceAllocator newResourceAllocator(QueryContext queryContext) {
        return new QueuedResourceAllocator(this, queryContext);
    }

    @Override
    public QueryResourceManager newQueryRM(Foreman foreman) {
        return new QueuedQueryResourceManager(this, foreman);
    }

    @Override
    public void close() {
        this.queue.close();
    }

    public static class QueuedResourceAllocator
    implements QueryResourceAllocator {
        protected final ThrottledResourceManager rm;
        protected QueryContext queryContext;
        protected PhysicalPlan plan;
        protected QueryWorkUnit work;
        protected double queryCost;

        protected QueuedResourceAllocator(ThrottledResourceManager rm, QueryContext queryContext) {
            this.rm = rm;
            this.queryContext = queryContext;
        }

        @Override
        public void visitAbstractPlan(PhysicalPlan plan) {
            this.plan = plan;
            this.queryCost = plan.totalCost();
        }

        @Override
        public void visitPhysicalPlan(QueryWorkUnit work) {
            this.work = work;
            this.planMemory();
        }

        private void planMemory() {
            if (this.plan.getProperties().hasResourcePlan) {
                logger.debug("Memory already planned.");
                return;
            }
            Map<String, Collection<PhysicalOperator>> nodeMap = this.buildBufferedOpMap();
            int width = this.countBufferingOperators(nodeMap);
            for (Map.Entry<String, Collection<PhysicalOperator>> entry : nodeMap.entrySet()) {
                this.planNodeMemory(entry.getKey(), entry.getValue(), width);
            }
        }

        public QueryContext getQueryContext() {
            return this.queryContext;
        }

        private int countBufferingOperators(Map<String, Collection<PhysicalOperator>> nodeMap) {
            int width = 0;
            for (Collection<PhysicalOperator> fragSorts : nodeMap.values()) {
                width = Math.max(width, fragSorts.size());
            }
            return width;
        }

        private void planNodeMemory(String nodeAddr, Collection<PhysicalOperator> bufferedOps, int width) {
            long perOpMemory;
            if (bufferedOps.isEmpty()) {
                return;
            }
            long nodeMemory = this.queryMemoryPerNode();
            long preferredOpMemory = nodeMemory / (long)width;
            if (preferredOpMemory < (perOpMemory = Math.max(preferredOpMemory, this.rm.minimumOperatorMemory()))) {
                logger.warn("Preferred per-operator memory: {}, actual amount: {}", (Object)preferredOpMemory, (Object)perOpMemory);
            }
            logger.debug("Query: {}, Node: {}, allocating {} bytes each for {} buffered operator(s).", new Object[]{QueryIdHelper.getQueryId(this.queryContext.getQueryId()), nodeAddr, perOpMemory, width});
            for (PhysicalOperator op : bufferedOps) {
                long alloc = Math.min(perOpMemory, op.getMaxAllocation());
                if ((alloc = Math.max(alloc, op.getInitialAllocation())) > preferredOpMemory && alloc != perOpMemory) {
                    logger.warn("Allocated memory of {} for {} exceeds available memory of {} due to operator minimum", new Object[]{alloc, op.getClass().getSimpleName(), preferredOpMemory});
                } else if (alloc < preferredOpMemory) {
                    logger.warn("Allocated memory of {} for {} is less than available memory of {} due to operator limit", new Object[]{alloc, op.getClass().getSimpleName(), preferredOpMemory});
                }
                op.setMaxAllocation(alloc);
            }
        }

        protected long queryMemoryPerNode() {
            return this.rm.defaultQueryMemoryPerNode(this.plan.totalCost());
        }

        private Map<String, Collection<PhysicalOperator>> buildBufferedOpMap() {
            ArrayListMultimap<String, PhysicalOperator> map = ArrayListMultimap.create();
            this.getBufferedOps(map, this.work.getRootFragmentDefn());
            for (QueryWorkUnit.MinorFragmentDefn defn : this.work.getMinorFragmentDefns()) {
                this.getBufferedOps(map, defn);
            }
            return map.asMap();
        }

        private void getBufferedOps(Multimap<String, PhysicalOperator> map, QueryWorkUnit.MinorFragmentDefn defn) {
            List<PhysicalOperator> bufferedOps = this.getBufferedOps(defn.root());
            if (!bufferedOps.isEmpty()) {
                map.putAll(defn.fragment().getAssignment().getAddress(), bufferedOps);
            }
        }

        private List<PhysicalOperator> getBufferedOps(FragmentRoot root) {
            ArrayList<PhysicalOperator> bufferedOps = new ArrayList<PhysicalOperator>();
            BufferedOpFinder finder = new BufferedOpFinder();
            root.accept(finder, bufferedOps);
            return bufferedOps;
        }

        protected static class BufferedOpFinder
        extends AbstractPhysicalVisitor<Void, List<PhysicalOperator>, RuntimeException> {
            protected BufferedOpFinder() {
            }

            @Override
            public Void visitOp(PhysicalOperator op, List<PhysicalOperator> value) throws RuntimeException {
                if (op.isBufferedOperator(null)) {
                    value.add(op);
                }
                this.visitChildren(op, value);
                return null;
            }
        }
    }

    public static class QueuedQueryResourceManager
    extends QueuedResourceAllocator
    implements QueryResourceManager {
        private final Foreman foreman;
        private QueryQueue.QueueLease lease;

        public QueuedQueryResourceManager(ThrottledResourceManager rm, Foreman foreman) {
            super(rm, foreman.getQueryContext());
            this.foreman = foreman;
        }

        @Override
        public void setCost(double cost) {
            this.queryCost = cost;
        }

        @Override
        public QueryParallelizer getParallelizer(boolean planHasMemory) {
            return new QueueQueryParallelizer(true, this.getQueryContext());
        }

        @Override
        public void admit() throws QueryQueue.QueueTimeoutException, QueryQueue.QueryQueueException {
            this.lease = this.rm.queue().enqueue(this.foreman.getQueryId(), this.queryCost);
        }

        @Override
        protected long queryMemoryPerNode() {
            if (this.lease == null) {
                return super.queryMemoryPerNode();
            }
            return this.lease.queryMemoryPerNode();
        }

        @Override
        public void exit() {
            if (this.lease != null) {
                this.lease.release();
            }
            this.lease = null;
        }

        @Override
        public boolean hasQueue() {
            return true;
        }

        @Override
        public String queueName() {
            return this.lease == null ? null : this.lease.queueName();
        }
    }
}

