/*
 * Decompiled with CFR 0.152.
 */
package oadd.org.apache.drill.exec.rpc.user;

import java.sql.SQLTimeoutException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import oadd.org.apache.drill.common.exceptions.UserException;
import oadd.org.apache.drill.exec.proto.UserBitShared;
import oadd.org.apache.drill.exec.proto.helper.QueryIdHelper;
import oadd.org.apache.drill.exec.rpc.ConnectionThrottle;
import oadd.org.apache.drill.exec.rpc.user.QueryDataBatch;
import oadd.org.apache.drill.exec.rpc.user.UserResultsListener;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.shaded.guava.com.google.common.collect.Queues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link UserResultsListener} that buffers incoming {@link QueryDataBatch}es in a
 * queue so that a consumer can pull them one at a time with {@link #getNext()}.
 *
 * <p>When the queue grows beyond {@code batchQueueThrottlingThreshold} the listener
 * switches auto-read off on the {@link ConnectionThrottle} handed to
 * {@link #dataArrived}; it switches auto-read back on once the consumer has drained
 * the queue below half the threshold (or emptied it), or when the listener is closed.
 *
 * <p>The listener callbacks and the consumer-side methods ({@link #getNext()},
 * {@link #close()}) may run on different threads (NOTE(review): presumably the RPC
 * thread and the client thread respectively; confirm against callers). State shared
 * between them is kept in volatile fields, a concurrent queue, and a small lock that
 * guards the throttling state.
 */
public class BlockingResultsListener
implements UserResultsListener {
    private static final Logger logger = LoggerFactory.getLogger(BlockingResultsListener.class);
    private static final AtomicInteger NEXT_INSTANCE_ID = new AtomicInteger(1);
    /** How long a single queue poll in {@link #getNext()} waits before re-checking state. */
    private static final long POLL_INTERVAL_MILLIS = 50L;

    /** Sequence number of this listener; used only to tag log lines. */
    private final int instanceId;
    private final int batchQueueThrottlingThreshold;
    private volatile UserBitShared.QueryId queryId;
    // Counters used only for logging; each is touched by a single method.
    private int lastReceivedBatchNumber;
    private int lastDequeuedBatchNumber;
    private volatile UserException executionFailureException;
    private volatile boolean completed;
    /** Guards the pair ({@link #throttled}, {@link #throttle}) and the auto-read toggle. */
    private final Object throttleLock = new Object();
    private final AtomicBoolean throttled = new AtomicBoolean(false);
    private volatile ConnectionThrottle throttle;
    private volatile boolean closed;
    private final CountDownLatch firstMessageReceived = new CountDownLatch(1);
    private final LinkedBlockingDeque<QueryDataBatch> batchQueue = Queues.newLinkedBlockingDeque();
    private final Supplier<Stopwatch> elapsedTimer;
    private final Supplier<Long> timeoutInMilliseconds;

    /**
     * Creates a listener.
     *
     * @param elapsedTimer supplies the stopwatch whose elapsed time is compared with the timeout
     * @param timeoutInMilliseconds supplies the timeout in milliseconds; a value
     *        {@code <= 0} means "no timeout"
     * @param batchQueueThrottlingThreshold queue size above which throttling starts
     */
    public BlockingResultsListener(Supplier<Stopwatch> elapsedTimer, Supplier<Long> timeoutInMilliseconds, int batchQueueThrottlingThreshold) {
        this.elapsedTimer = elapsedTimer;
        this.timeoutInMilliseconds = timeoutInMilliseconds;
        this.instanceId = NEXT_INSTANCE_ID.getAndIncrement();
        this.batchQueueThrottlingThreshold = batchQueueThrottlingThreshold;
        logger.debug("[#{}] Query listener created.", this.instanceId);
    }

    /**
     * Starts throttling unless it is already active.
     *
     * <p>The flag flip, the reference assignment and the auto-read change happen under
     * {@link #throttleLock} so that a concurrent {@link #stopThrottlingIfSo()} can never
     * observe the flag set while the reference is still {@code null}, nor re-enable
     * auto-read just before this method disables it.
     *
     * @param throttle the connection throttle to switch auto-read off on
     * @return {@code true} if this call started throttling
     */
    private boolean startThrottlingIfNot(ConnectionThrottle throttle) {
        synchronized (this.throttleLock) {
            if (!this.throttled.compareAndSet(false, true)) {
                return false;
            }
            this.throttle = throttle;
            throttle.setAutoRead(false);
            return true;
        }
    }

    /**
     * Stops throttling if it is currently active, re-enabling auto-read on the
     * throttle recorded by {@link #startThrottlingIfNot}.
     *
     * @return {@code true} if this call stopped throttling
     */
    private boolean stopThrottlingIfSo() {
        synchronized (this.throttleLock) {
            if (!this.throttled.compareAndSet(true, false)) {
                return false;
            }
            this.throttle.setAutoRead(true);
            this.throttle = null;
            return true;
        }
    }

    /**
     * Blocks until the first message (data batch, completion, or close) has been
     * seen, or until the query timeout expires.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws SQLTimeoutException if a positive timeout is configured and it has
     *         already elapsed or elapses while waiting
     */
    public void awaitFirstMessage() throws InterruptedException, SQLTimeoutException {
        // Read the supplier once so the check, the wait and the message agree.
        long timeoutMillis = this.timeoutInMilliseconds.get();
        if (timeoutMillis <= 0L) {
            this.firstMessageReceived.await();
            return;
        }
        long timeToTimeout = timeoutMillis - this.elapsedTimer.get().elapsed(TimeUnit.MILLISECONDS);
        if (timeToTimeout <= 0L || !this.firstMessageReceived.await(timeToTimeout, TimeUnit.MILLISECONDS)) {
            throw new SQLTimeoutException("Query timed out in " + TimeUnit.MILLISECONDS.toSeconds(timeoutMillis) + " seconds");
        }
    }

    /** Releases any thread blocked in {@link #awaitFirstMessage()}; safe to call repeatedly. */
    private void releaseIfFirst() {
        this.firstMessageReceived.countDown();
    }

    /**
     * Releases the data of every batch still in the queue. Each batch is removed with
     * an atomic {@code poll()}, so concurrent callers never release the same batch twice.
     */
    private void releaseQueuedBatches() {
        QueryDataBatch qdb;
        while ((qdb = this.batchQueue.poll()) != null) {
            if (qdb.getData() != null) {
                qdb.getData().release();
            }
        }
    }

    /** Records the query ID once it is known. */
    @Override
    public void queryIdArrived(UserBitShared.QueryId queryId) {
        logger.debug("[#{}] Received query ID: {}.", this.instanceId, QueryIdHelper.getQueryId(queryId));
        this.queryId = queryId;
    }

    /**
     * Records the failure so that {@link #getNext()} rethrows it, then closes the
     * listener (which releases queued batches and unblocks waiters).
     */
    @Override
    public void submissionFailed(UserException ex) {
        logger.debug("Received query failure: {} {}", this.instanceId, ex);
        this.executionFailureException = ex;
        this.completed = true;
        this.close();
        logger.info("[#{}] Query failed: ", this.instanceId, ex);
    }

    /**
     * Enqueues an incoming batch, starting throttling if the queue has grown past the
     * threshold. If the listener is already closed the batch is released immediately.
     *
     * @param result the batch that arrived
     * @param throttle handle used to pause/resume reads while the queue is too full
     */
    @Override
    public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) {
        ++this.lastReceivedBatchNumber;
        logger.debug("[#{}] Received query data batch #{}: {}.", this.instanceId, this.lastReceivedBatchNumber, result);
        if (this.closed) {
            result.release();
            this.completed = true;
            return;
        }
        this.batchQueue.add(result);
        if (this.batchQueue.size() > this.batchQueueThrottlingThreshold && this.startThrottlingIfNot(throttle)) {
            logger.debug("[#{}] Throttling started at queue size {}.", this.instanceId, this.batchQueue.size());
        }
        if (this.closed) {
            // close() ran between the check above and the enqueue/throttle: its drain
            // may have missed this batch and its un-throttle may have preceded ours,
            // so undo both here rather than leak the buffer or leave auto-read off.
            this.stopThrottlingIfSo();
            this.releaseQueuedBatches();
            this.completed = true;
        }
        this.releaseIfFirst();
    }

    /** Marks the query as completed and unblocks {@link #awaitFirstMessage()}. */
    @Override
    public void queryCompleted(UserBitShared.QueryResult.QueryState state) {
        logger.debug("[#{}] Received query completion: {}.", this.instanceId, state);
        this.releaseIfFirst();
        this.completed = true;
    }

    /** @return the query ID, or {@code null} if {@link #queryIdArrived} has not been called yet */
    public UserBitShared.QueryId getQueryId() {
        return this.queryId;
    }

    /**
     * Returns the next batch, waiting for one to arrive if necessary.
     *
     * <p>The queue is polled in short intervals so that failure, completion and
     * timeout are noticed even when no batch arrives.
     *
     * @return the next batch, or {@code null} once the query is completed and the
     *         queue is empty
     * @throws UserException the failure recorded by {@link #submissionFailed}, if any
     * @throws InterruptedException if interrupted while waiting for a batch
     * @throws SQLTimeoutException if a positive timeout is configured and has elapsed
     */
    public QueryDataBatch getNext() throws UserException, InterruptedException, SQLTimeoutException {
        while (true) {
            UserException failure = this.executionFailureException;
            if (failure != null) {
                logger.debug("[#{}] Dequeued query failure exception: {}.", this.instanceId, failure);
                throw failure;
            }
            if (this.completed && this.batchQueue.isEmpty()) {
                return null;
            }
            QueryDataBatch qdb = this.batchQueue.poll(POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
            if (qdb != null) {
                ++this.lastDequeuedBatchNumber;
                logger.debug("[#{}] Dequeued query data batch #{}: {}.", this.instanceId, this.lastDequeuedBatchNumber, qdb);
                int remaining = this.batchQueue.size();
                // The explicit empty check covers thresholds of 0 or 1, where
                // "below half the threshold" can never be true.
                if ((remaining < this.batchQueueThrottlingThreshold / 2 || remaining == 0) && this.stopThrottlingIfSo()) {
                    logger.debug("[#{}] Throttling stopped at queue size {}.", this.instanceId, this.batchQueue.size());
                }
                return qdb;
            }
            // Nothing arrived within the poll interval: give up only if a timeout is
            // configured and it has been reached; otherwise loop and re-check state.
            long timeoutMillis = this.timeoutInMilliseconds.get();
            if (timeoutMillis > 0L && this.elapsedTimer.get().elapsed(TimeUnit.MILLISECONDS) >= timeoutMillis) {
                throw new SQLTimeoutException("Query timed out in " + TimeUnit.MILLISECONDS.toSeconds(timeoutMillis) + " seconds");
            }
        }
    }

    /**
     * Closes the listener: marks it closed so later batches are released on arrival,
     * stops throttling, releases every queued batch, unblocks
     * {@link #awaitFirstMessage()} and marks the query completed.
     */
    public void close() {
        logger.debug("[#{}] Query listener closing.", this.instanceId);
        // Set first so that dataArrived() stops enqueueing (and re-checks after enqueue).
        this.closed = true;
        if (this.stopThrottlingIfSo()) {
            logger.debug("[#{}] Throttling stopped at close() (at queue size {}).", this.instanceId, this.batchQueue.size());
        }
        this.releaseQueuedBatches();
        this.releaseIfFirst();
        this.completed = true;
    }

    /** @return {@code true} once the query has completed, failed, or the listener was closed */
    public boolean isCompleted() {
        return this.completed;
    }
}

