/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.work.foreman.rm;

import java.util.concurrent.TimeUnit;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.DistributedSemaphore;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.exec.work.foreman.rm.QueryQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DistributedQueryQueue
implements QueryQueue {
    private static final Logger logger = LoggerFactory.getLogger(DistributedQueryQueue.class);
    private long memoryPerNode;
    private final SystemOptionManager optionManager;
    private ConfigSet configSet;
    private final ClusterCoordinator clusterCoordinator;
    private long nextRefreshTime;
    private long memoryPerSmallQuery;
    private long memoryPerLargeQuery;
    private final StatusAdapter statusAdapter;

    public DistributedQueryQueue(DrillbitContext context, StatusAdapter adapter) {
        this.statusAdapter = adapter;
        this.optionManager = context.getOptionManager();
        this.clusterCoordinator = context.getClusterCoordinator();
    }

    @Override
    public void setMemoryPerNode(long memoryPerNode) {
        this.memoryPerNode = memoryPerNode;
        this.refreshConfig();
    }

    @Override
    public long defaultQueryMemoryPerNode(double cost) {
        return cost < (double)this.configSet.queueThreshold ? this.memoryPerSmallQuery : this.memoryPerLargeQuery;
    }

    @Override
    public long minimumOperatorMemory() {
        return this.configSet.minimumOperatorMemory;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public QueryQueue.QueueLease enqueue(UserBitShared.QueryId queryId, double cost) throws QueryQueue.QueryQueueException, QueryQueue.QueueTimeoutException {
        long queryMemory;
        String queueName;
        DistributedSemaphore.DistributedLease lease = null;
        try {
            DistributedSemaphore distributedSemaphore;
            DistributedQueryQueue distributedQueryQueue = this;
            synchronized (distributedQueryQueue) {
                this.refreshConfig();
                if (cost >= (double)this.configSet.queueThreshold) {
                    distributedSemaphore = this.clusterCoordinator.getSemaphore("query.large", this.configSet.largeQueueSize);
                    queueName = "large";
                    queryMemory = this.memoryPerLargeQuery;
                } else {
                    distributedSemaphore = this.clusterCoordinator.getSemaphore("query.small", this.configSet.smallQueueSize);
                    queueName = "small";
                    queryMemory = this.memoryPerSmallQuery;
                }
            }
            logger.debug("Query {} with cost {} placed into the {} queue.", new Object[]{QueryIdHelper.getQueryId(queryId), cost, queueName});
            lease = distributedSemaphore.acquire(this.configSet.queueTimeout, TimeUnit.MILLISECONDS);
        }
        catch (Exception e) {
            logger.error("Unable to acquire slot for query " + QueryIdHelper.getQueryId(queryId), (Throwable)e);
            throw new QueryQueue.QueryQueueException("Unable to acquire slot for query.", e);
        }
        if (lease == null) {
            int timeoutSecs = (int)Math.round((double)this.configSet.queueTimeout / 1000.0);
            logger.warn("Queue timeout: {} after {} ms. ({} seconds)", new Object[]{queueName, String.format("%,d", this.configSet.queueTimeout), timeoutSecs});
            throw new QueryQueue.QueueTimeoutException(queryId, queueName, this.configSet.queueTimeout);
        }
        return new DistributedQueueLease(queryId, queueName, lease, queryMemory);
    }

    private synchronized void refreshConfig() {
        long now = System.currentTimeMillis();
        if (now < this.nextRefreshTime) {
            return;
        }
        this.nextRefreshTime = now + 5000L;
        ConfigSet newSet = new ConfigSet(this.optionManager);
        if (newSet.isSameAs(this.configSet)) {
            return;
        }
        this.configSet = newSet;
        double totalUnits = this.configSet.largeToSmallRatio * (double)this.configSet.largeQueueSize + (double)this.configSet.smallQueueSize;
        double availableMemory = Math.round((double)this.memoryPerNode * (1.0 - this.configSet.reserveMemoryRatio));
        double memoryUnit = availableMemory / totalUnits;
        this.memoryPerLargeQuery = Math.round(memoryUnit * this.configSet.largeToSmallRatio);
        this.memoryPerSmallQuery = Math.round(memoryUnit);
        logger.debug("Memory config: total memory per node = {}, available: {},  large/small memory ratio = {}", new Object[]{this.memoryPerNode, availableMemory, this.configSet.largeToSmallRatio});
        logger.debug("Reserve memory ratio: {}, bytes: {}", (Object)this.configSet.reserveMemoryRatio, (Object)((double)this.memoryPerNode - availableMemory));
        logger.debug("Minimum operator memory: {}", (Object)this.configSet.minimumOperatorMemory);
        logger.debug("Small queue: {} slots, {} bytes per slot", (Object)this.configSet.smallQueueSize, (Object)this.memoryPerSmallQuery);
        logger.debug("Large queue: {} slots, {} bytes per slot", (Object)this.configSet.largeQueueSize, (Object)this.memoryPerLargeQuery);
        logger.debug("Cost threshold: {}, timeout: {} ms.", (Object)this.configSet.queueThreshold, (Object)this.configSet.queueTimeout);
    }

    @Override
    public boolean enabled() {
        return true;
    }

    public synchronized ZKQueueInfo getInfo() {
        this.refreshConfig();
        return new ZKQueueInfo(this);
    }

    private void release(QueryQueue.QueueLease lease) {
        DistributedQueueLease theLease = (DistributedQueueLease)lease;
        while (true) {
            try {
                theLease.lease.close();
                theLease.lease = null;
                break;
            }
            catch (InterruptedException interruptedException) {
            }
            catch (Exception e) {
                logger.warn("Failure while releasing lease.", (Throwable)e);
                break;
            }
            if (!this.inShutdown()) continue;
            logger.warn("In shutdown mode: abandoning attempt to release lease");
        }
    }

    private boolean inShutdown() {
        if (this.statusAdapter == null) {
            return false;
        }
        return this.statusAdapter.inShutDown();
    }

    @Override
    public void close() {
    }

    private static class ConfigSet {
        private final long queueThreshold;
        private final int queueTimeout;
        private final int largeQueueSize;
        private final int smallQueueSize;
        private final double largeToSmallRatio;
        private final double reserveMemoryRatio;
        private final long minimumOperatorMemory;

        public ConfigSet(SystemOptionManager optionManager) {
            this.queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE);
            this.queueTimeout = (int)optionManager.getOption(ExecConstants.QUEUE_TIMEOUT);
            this.largeQueueSize = (int)optionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE);
            this.smallQueueSize = (int)optionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE);
            this.largeToSmallRatio = optionManager.getOption(ExecConstants.QUEUE_MEMORY_RATIO);
            this.reserveMemoryRatio = optionManager.getOption(ExecConstants.QUEUE_MEMORY_RESERVE);
            this.minimumOperatorMemory = optionManager.getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP);
        }

        public boolean isSameAs(ConfigSet otherSet) {
            if (otherSet == null) {
                return false;
            }
            return this.queueThreshold == otherSet.queueThreshold && this.queueTimeout == otherSet.queueTimeout && this.largeQueueSize == otherSet.largeQueueSize && this.smallQueueSize == otherSet.smallQueueSize && this.largeToSmallRatio == otherSet.largeToSmallRatio && this.reserveMemoryRatio == otherSet.reserveMemoryRatio && this.minimumOperatorMemory == otherSet.minimumOperatorMemory;
        }
    }

    public static interface StatusAdapter {
        public boolean inShutDown();
    }

    private class DistributedQueueLease
    implements QueryQueue.QueueLease {
        private final UserBitShared.QueryId queryId;
        private DistributedSemaphore.DistributedLease lease;
        private final String queueName;
        private final long queryMemory;

        public DistributedQueueLease(UserBitShared.QueryId queryId, String queueName, DistributedSemaphore.DistributedLease lease, long queryMemory) {
            this.queryId = queryId;
            this.queueName = queueName;
            this.lease = lease;
            this.queryMemory = queryMemory;
        }

        public String toString() {
            return String.format("Lease for %s queue to query %s", this.queueName, QueryIdHelper.getQueryId(this.queryId));
        }

        @Override
        public long queryMemoryPerNode() {
            return this.queryMemory;
        }

        @Override
        public void release() {
            DistributedQueryQueue.this.release(this);
        }

        @Override
        public String queueName() {
            return this.queueName;
        }
    }

    @XmlRootElement
    public static class ZKQueueInfo {
        public final int smallQueueSize;
        public final int largeQueueSize;
        public final double queueThreshold;
        public final long memoryPerNode;
        public final long memoryPerSmallQuery;
        public final long memoryPerLargeQuery;

        public ZKQueueInfo(DistributedQueryQueue queue) {
            this.smallQueueSize = queue.configSet.smallQueueSize;
            this.largeQueueSize = queue.configSet.largeQueueSize;
            this.queueThreshold = queue.configSet.queueThreshold;
            this.memoryPerNode = queue.memoryPerNode;
            this.memoryPerSmallQuery = queue.memoryPerSmallQuery;
            this.memoryPerLargeQuery = queue.memoryPerLargeQuery;
        }
    }
}

