/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.store.parquet.columnreaders;

import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.DrillBuf;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.store.parquet.columnreaders.ColumnReader;
import org.apache.drill.exec.store.parquet.columnreaders.PageReader;
import org.apache.drill.exec.util.concurrent.ExecutorServiceUtil;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.PageType;
import org.apache.parquet.format.Util;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class AsyncPageReader
extends PageReader {
    /** Logger shared by this reader and its nested read task. */
    static final Logger logger = LoggerFactory.getLogger(AsyncPageReader.class);
    /** Executor that runs the page-read tasks; assigned from the operator context's scan executor in the constructor. */
    private ExecutorService threadPool;
    /** Capacity given to {@code pageQueue}; copied from the parent reader's {@code readQueueSize}. */
    private long queueSize;
    /** Bounded queue through which the read tasks hand pages over to this reader. */
    private LinkedBlockingQueue<ReadStatus> pageQueue;
    /** Futures of the read tasks submitted so far, oldest first. */
    private ConcurrentLinkedQueue<Future<Void>> asyncPageRead;
    /** Running total of values in the data pages read by the tasks; dictionary pages are not added to it. */
    private long totalPageValuesRead = 0L;
    /** Monitor held while a page is put on / taken from {@code pageQueue} and a follow-up read task is submitted. */
    private final Object pageQueueSyncronize = new Object();

    /**
     * Creates a page reader whose pages are fetched by background tasks.
     *
     * <p>The executor and the queue capacity are both taken from the parent
     * record reader; the page queue is bounded to that capacity.
     *
     * @param parentStatus column reader this page reader serves
     * @param fs file system passed through to the base reader
     * @param path file path passed through to the base reader
     * @throws ExecutionSetupException if the base reader cannot be set up
     */
    AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path) throws ExecutionSetupException {
        super(parentStatus, fs, path);
        this.queueSize = this.parentColumnReader.parentReader.readQueueSize;
        this.threadPool = this.parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
        this.asyncPageRead = new ConcurrentLinkedQueue<>();
        this.pageQueue = new LinkedBlockingQueue<>((int) this.queueSize);
    }

    /**
     * Runs the base initialisation and then kicks off the first background
     * page read, unless the parent column reader is already shutting down.
     *
     * @throws IOException if the base initialisation fails
     */
    @Override
    protected void init() throws IOException {
        super.init();
        if (this.parentColumnReader.isShuttingDown) {
            return;
        }
        AsyncPageReaderTask firstRead = new AsyncPageReaderTask(this.debugName, this.pageQueue);
        this.asyncPageRead.offer(ExecutorServiceUtil.submit(this.threadPool, firstRead));
    }

    /**
     * Builds the column dictionary from a dictionary page delivered by a read task.
     *
     * <p>The page bytes are used as-is for uncompressed columns and run through
     * {@code decompressPageV1} otherwise. Sizes and encoding are read from the
     * current {@code pageHeader} field, which the caller is expected to have set
     * before invoking this method.
     *
     * @param readStatus the dictionary page taken from the page queue
     * @throws IOException if decompression or dictionary initialisation fails
     */
    protected void loadDictionary(ReadStatus readStatus) throws IOException {
        assert (readStatus.isDictionaryPage());
        assert (this.dictionary == null);
        if (this.codecName == CompressionCodecName.UNCOMPRESSED) {
            this.dictData = readStatus.getPageData();
        } else {
            this.dictData = this.decompressPageV1(readStatus);
        }
        int uncompressedSize = this.pageHeader.uncompressed_page_size;
        DictionaryPage dictPage = new DictionaryPage(
                AsyncPageReader.asBytesInput(this.dictData, 0, uncompressedSize),
                uncompressedSize,
                this.pageHeader.dictionary_page_header.num_values,
                Encoding.valueOf(this.pageHeader.dictionary_page_header.encoding.name()));
        this.dictionary = dictPage.getEncoding().initDictionary(this.columnDescriptor, dictPage);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Decompresses a page whose whole payload is compressed (V1 data pages and
     * dictionary pages) into a freshly allocated buffer.
     *
     * <p>The compressed input buffer is always released and detached from
     * {@code readStatus}, whether or not decompression succeeds. If anything
     * fails after the output buffer has been allocated, that buffer is released
     * too, so a failed decompression does not leak direct memory.
     *
     * @param readStatus page taken from the queue; its page data is consumed
     * @return a buffer of {@code uncompressed_page_size} bytes holding the page;
     *         the caller owns it
     * @throws IOException if reading the stream position or decompressing fails
     */
    protected DrillBuf decompressPageV1(ReadStatus readStatus) throws IOException {
        Stopwatch timer = Stopwatch.createUnstarted();
        PageHeader pageHeader = readStatus.getPageHeader();
        int inputSize = pageHeader.getCompressed_page_size();
        int outputSize = pageHeader.getUncompressed_page_size();
        DrillBuf inputPageData = readStatus.getPageData();
        DrillBuf outputPageData = null;
        boolean succeeded = false;
        try {
            // Everything that can throw lives inside the try so the input buffer
            // is released on every path, including a failed allocation.
            long start = this.dataReader.getPos();
            outputPageData = this.allocator.buffer(outputSize);
            timer.start();
            CompressionCodecName codec = this.columnChunkMetaData.getCodec();
            CompressionCodecFactory.BytesInputDecompressor decomp = this.codecFactory.getDecompressor(codec);
            ByteBuffer input = inputPageData.nioBuffer(0, inputSize);
            ByteBuffer output = outputPageData.nioBuffer(0, outputSize);
            decomp.decompress(input, inputSize, output, outputSize);
            // The decompressor wrote through the NIO view, so the writer index
            // has to be advanced by hand.
            outputPageData.writerIndex(outputSize);
            long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
            if (logger.isTraceEnabled()) {
                logger.trace("Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}", new Object[]{this.columnChunkMetaData.toString(), this.dataReader.getPos(), outputSize, ByteBufUtil.hexDump(outputPageData)});
            }
            this.updateStats(pageHeader, "Decompress", start, timeToRead, inputSize, outputSize);
            succeeded = true;
        }
        finally {
            readStatus.setPageData(null);
            if (inputPageData != null) {
                inputPageData.release();
            }
            // On failure nobody else holds a reference to the output buffer.
            if (!succeeded && outputPageData != null) {
                outputPageData.release();
            }
        }
        return outputPageData;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Decompresses a V2 data page into a freshly allocated buffer.
     *
     * <p>The first {@code repetition_levels_byte_length +
     * definition_levels_byte_length} bytes of the input are copied verbatim to
     * the start of the output; only the remainder is passed to the decompressor.
     *
     * <p>The compressed input buffer is always released and detached from
     * {@code readStatus}, whether or not decompression succeeds. If anything
     * fails after the output buffer has been allocated, that buffer is released
     * too, so a failed decompression does not leak direct memory.
     *
     * @param readStatus page taken from the queue; its page data is consumed
     * @return a buffer of {@code uncompressed_page_size} bytes holding the page;
     *         the caller owns it
     * @throws IOException if reading the stream position or decompressing fails
     */
    protected DrillBuf decompressPageV2(ReadStatus readStatus) throws IOException {
        Stopwatch timer = Stopwatch.createUnstarted();
        PageHeader pageHeader = readStatus.getPageHeader();
        int inputSize = pageHeader.getCompressed_page_size();
        int repLevelSize = pageHeader.data_page_header_v2.getRepetition_levels_byte_length();
        int defLevelSize = pageHeader.data_page_header_v2.getDefinition_levels_byte_length();
        int compDataOffset = repLevelSize + defLevelSize;
        int outputSize = pageHeader.uncompressed_page_size;
        DrillBuf inputPageData = readStatus.getPageData();
        DrillBuf outputPageData = null;
        boolean succeeded = false;
        try {
            // Everything that can throw lives inside the try so the input buffer
            // is released on every path, including a failed allocation.
            long start = this.dataReader.getPos();
            outputPageData = this.allocator.buffer(outputSize);
            timer.start();
            // Level bytes are copied unchanged ahead of the decompressed values.
            outputPageData.setBytes(0, inputPageData, compDataOffset);
            CompressionCodecName codec = this.columnChunkMetaData.getCodec();
            CompressionCodecFactory.BytesInputDecompressor decomp = this.codecFactory.getDecompressor(codec);
            ByteBuffer input = inputPageData.nioBuffer(compDataOffset, inputSize - compDataOffset);
            ByteBuffer output = outputPageData.nioBuffer(compDataOffset, outputSize - compDataOffset);
            decomp.decompress(input, inputSize - compDataOffset, output, outputSize - compDataOffset);
            // The decompressor wrote through the NIO view, so the writer index
            // has to be advanced by hand.
            outputPageData.writerIndex(outputSize);
            long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
            if (logger.isTraceEnabled()) {
                logger.trace("Col: {}  readPos: {}  Uncompressed_size: {}  pageData: {}", new Object[]{this.columnChunkMetaData.toString(), this.dataReader.getPos(), outputSize, ByteBufUtil.hexDump(outputPageData)});
            }
            this.updateStats(pageHeader, "Decompress", start, timeToRead, inputSize, outputSize);
            succeeded = true;
        }
        finally {
            readStatus.setPageData(null);
            if (inputPageData != null) {
                inputPageData.release();
            }
            // On failure nobody else holds a reference to the output buffer.
            if (!succeeded && outputPageData != null) {
                outputPageData.release();
            }
        }
        return outputPageData;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Blocks until the oldest outstanding read task has finished, then takes the
     * next page from the page queue and records wait/scan statistics for it.
     *
     * <p>If the queue was full just before the page was taken, and the parent
     * column reader is not shutting down, a new read task is submitted while the
     * queue monitor is still held. The time spent here is reported to the
     * operator stats as wait time.
     *
     * @return the next page; never the {@code ReadStatus.EMPTY} sentinel
     * @throws InterruptedException if interrupted while waiting for the task or the queue
     * @throws ExecutionException if the awaited read task failed
     * @throws DrillRuntimeException if the end-of-data sentinel is taken from the queue
     */
    private ReadStatus nextPageFromQueue() throws InterruptedException, ExecutionException {
        Stopwatch waitTimer = Stopwatch.createStarted();
        OperatorStats operatorStats = this.parentColumnReader.parentReader.getOperatorContext().getStats();
        ReadStatus page;
        operatorStats.startWait();
        try {
            this.waitForExecutionResult();
            synchronized (this.pageQueueSyncronize) {
                // Sample fullness before taking: a full queue means the producer
                // chain stopped resubmitting itself and must be restarted here.
                boolean wasFull = this.pageQueue.remainingCapacity() == 0;
                page = this.pageQueue.take();
                if (page == ReadStatus.EMPTY) {
                    throw new DrillRuntimeException("Unexpected end of data");
                }
                if (wasFull && !this.parentColumnReader.isShuttingDown) {
                    AsyncPageReaderTask followUp = new AsyncPageReaderTask(this.debugName, this.pageQueue);
                    this.asyncPageRead.offer(ExecutorServiceUtil.submit(this.threadPool, followUp));
                }
            }
        }
        finally {
            operatorStats.stopWait();
        }
        long waitNanos = waitTimer.elapsed(TimeUnit.NANOSECONDS);
        long scanNanos = page.getDiskScanTime();
        long loadNanos = waitNanos + scanNanos;
        this.stats.timeDiskScanWait.addAndGet(waitNanos);
        this.stats.timeDiskScan.addAndGet(scanNanos);
        if (page.isDictionaryPage) {
            this.stats.numDictPageLoads.incrementAndGet();
            this.stats.timeDictPageLoads.addAndGet(loadNanos);
        } else {
            this.stats.numDataPageLoads.incrementAndGet();
            this.stats.timeDataPageLoads.addAndGet(loadNanos);
        }
        return page;
    }

    /**
     * Loads the next page delivered by the background read tasks.
     *
     * <p>Pages with an uncompressed size of zero, and pages of a type other than
     * dictionary / data V1 / data V2, are skipped and their buffer (if any) is
     * released. Dictionary pages initialise the dictionary; data pages become
     * the current {@code pageData}, decompressed first unless the column is
     * uncompressed.
     *
     * <p>If the thread is interrupted while waiting, the interrupt flag is
     * restored and the method returns without loading a page. Any other failure
     * is passed to {@code throwUserException}.
     *
     * @throws IOException declared by the base class contract
     */
    @Override
    protected void nextInternal() throws IOException {
        try {
            ReadStatus page = this.nextPageFromQueue();
            this.pageHeader = page.getPageHeader();
            if (this.pageHeader.uncompressed_page_size == 0) {
                logger.info("skipping a {} of size {} because its uncompressed size is 0 bytes.", this.pageHeader.getType(), this.pageHeader.compressed_page_size);
                this.skip(this.pageHeader.compressed_page_size);
                DrillBuf unused = page.getPageData();
                if (unused != null) {
                    unused.release();
                }
                return;
            }
            switch (this.pageHeader.getType()) {
                case DICTIONARY_PAGE: {
                    this.loadDictionary(page);
                    break;
                }
                case DATA_PAGE: {
                    if (this.codecName == CompressionCodecName.UNCOMPRESSED) {
                        this.pageData = page.getPageData();
                    } else {
                        this.pageData = this.decompressPageV1(page);
                    }
                    break;
                }
                case DATA_PAGE_V2: {
                    if (this.codecName == CompressionCodecName.UNCOMPRESSED) {
                        this.pageData = page.getPageData();
                    } else {
                        this.pageData = this.decompressPageV2(page);
                    }
                    break;
                }
                default: {
                    logger.warn("skipping page of type {} of size {}", this.pageHeader.getType(), this.pageHeader.compressed_page_size);
                    this.skip(this.pageHeader.compressed_page_size);
                    DrillBuf unused = page.getPageData();
                    if (unused != null) {
                        unused.release();
                    }
                    break;
                }
            }
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
        }
        catch (RuntimeException e) {
            this.throwUserException(e, "Error reading page data");
        }
        catch (Exception e) {
            this.throwUserException(e, "Error reading page data");
        }
    }

    /**
     * Waits for the oldest outstanding read task to complete and, once it has
     * completed normally, removes its future from the head of the queue.
     *
     * <p>The future is only peeked at before waiting, so if the wait throws it
     * is still at the head of the queue.
     *
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException if the read task threw
     */
    private void waitForExecutionResult() throws InterruptedException, ExecutionException {
        Future<Void> oldest = this.asyncPageRead.peek();
        oldest.get();
        this.asyncPageRead.poll();
    }

    /**
     * Cancels outstanding read tasks, releases every page buffer still sitting
     * in the page queue, and then clears the base reader.
     *
     * <p>Task cancellation is best effort: a task that is still running is
     * cancelled with interruption, a finished one is only queried for its
     * result, and any failure it reports is deliberately ignored because the
     * reader is being torn down anyway.
     */
    @Override
    public void clear() {
        while (this.asyncPageRead != null && !this.asyncPageRead.isEmpty()) {
            Future<Void> pending = this.asyncPageRead.poll();
            if (pending == null) {
                // Another thread emptied the queue between isEmpty() and poll().
                continue;
            }
            try {
                if (!pending.isDone() && !pending.isCancelled()) {
                    pending.cancel(true);
                } else {
                    pending.get(1L, TimeUnit.MILLISECONDS);
                }
            }
            catch (Exception ignored) {
                // Best effort: a failed or cancelled read is irrelevant during teardown.
            }
        }
        // Drain the whole queue. The end-of-data sentinel carries no buffer, so
        // it is simply skipped; every real page has its buffer released.
        while (!this.pageQueue.isEmpty()) {
            ReadStatus r = null;
            try {
                r = this.pageQueue.poll();
            }
            catch (Exception e) {
                // Not expected from a non-blocking poll; keep the stack trace if it happens.
                logger.error("Unexpected error while draining the page queue", e);
            }
            if (r != null && r.pageData != null) {
                r.pageData.release();
            }
        }
        super.clear();
    }

    /**
     * Background task that reads one page (header plus raw bytes) from the
     * column chunk and hands it to the owning reader through the page queue.
     *
     * <p>After queueing a page the task submits a successor while the queue
     * still has room; once all values of the column chunk have been read it
     * queues the {@code ReadStatus.EMPTY} sentinel instead.
     */
    private class AsyncPageReaderTask
    implements Callable<Void> {
        private final AsyncPageReader parent;
        private final LinkedBlockingQueue<ReadStatus> queue;
        private final String name;

        /**
         * @param name label used in trace messages
         * @param queue queue that receives the page read by this task
         */
        public AsyncPageReaderTask(String name, LinkedBlockingQueue<ReadStatus> queue) {
            this.parent = AsyncPageReader.this;
            this.name = name;
            this.queue = queue;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Reads the next page and queues it.
         *
         * <p>The page buffer belongs to this task until it has been put on the
         * queue; from then on the consumer owns it. If the task is interrupted,
         * or fails before the hand-over, the buffer is released here.
         *
         * @return always {@code null}
         * @throws IOException if the page header cannot be read
         */
        @Override
        public Void call() throws IOException {
            ReadStatus readStatus = new ReadStatus();
            long valuesRead = 0L;
            final long totalValuesRead = this.parent.totalPageValuesRead;
            Stopwatch timer = Stopwatch.createStarted();
            final long totalValuesCount = this.parent.columnChunkMetaData.getValueCount();
            logger.trace("[{}]: Total Values COUNT {}  Total Values READ {} ", this.name, totalValuesCount, totalValuesRead);

            // Nothing left to read: queue the end marker and close the stream.
            if (totalValuesRead >= totalValuesCount) {
                try {
                    this.queue.put(ReadStatus.EMPTY);
                    try {
                        this.parent.inputStream.close();
                    }
                    catch (IOException e) {
                        logger.trace("[{}]: Failure while closing InputStream", this.name, e);
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return null;
            }

            // reset() leaves the stopwatch stopped, so it must be started again;
            // otherwise elapsed() below would always report zero disk-scan time.
            timer.reset();
            timer.start();
            PageHeader pageHeader = Util.readPageHeader((InputStream)this.parent.dataReader);
            int compressedSize = pageHeader.getCompressed_page_size();
            if (this.parent.parentColumnReader.isShuttingDown) {
                return null;
            }

            DrillBuf pageData = null;
            try {
                pageData = this.parent.dataReader.getNext(compressedSize);
                long bytesRead = compressedSize;
                synchronized (this.parent) {
                    // A header without a type is treated as a V1 data page.
                    PageType type = pageHeader.getType() == null ? PageType.DATA_PAGE : pageHeader.getType();
                    switch (type) {
                        case DICTIONARY_PAGE: {
                            readStatus.setIsDictionaryPage(true);
                            valuesRead += (long)pageHeader.getDictionary_page_header().getNum_values();
                            break;
                        }
                        case DATA_PAGE_V2: {
                            valuesRead += (long)pageHeader.getData_page_header_v2().getNum_values();
                            this.parent.totalPageValuesRead += valuesRead;
                            break;
                        }
                        case DATA_PAGE: {
                            valuesRead += (long)pageHeader.getData_page_header().getNum_values();
                            this.parent.totalPageValuesRead += valuesRead;
                            break;
                        }
                        default: {
                            throw UserException.unsupportedError().message("Page type is not supported yet: " + type, new Object[0]).build(logger);
                        }
                    }
                    long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
                    readStatus.setPageHeader(pageHeader);
                    readStatus.setPageData(pageData);
                    readStatus.setBytesRead(bytesRead);
                    readStatus.setValuesRead(valuesRead);
                    readStatus.setDiskScanTime(timeToRead);
                    assert (totalValuesRead <= totalValuesCount);
                }
                synchronized (this.parent.pageQueueSyncronize) {
                    this.queue.put(readStatus);
                    // The consumer owns the buffer now; forget it here so a
                    // failure while resubmitting cannot release it a second time.
                    pageData = null;
                    if (!this.parent.parentColumnReader.isShuttingDown && this.queue.remainingCapacity() > 0) {
                        this.parent.asyncPageRead.offer(ExecutorServiceUtil.submit(this.parent.threadPool, new AsyncPageReaderTask(this.parent.debugName, this.queue)));
                    }
                }
            }
            catch (InterruptedException e) {
                if (pageData != null) {
                    pageData.release();
                }
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                if (pageData != null) {
                    pageData.release();
                }
                this.parent.throwUserException(e, "Exception occurred while reading from disk.");
            }
            return null;
        }
    }

    /**
     * Carrier for one page read by a background task: the page header, the raw
     * page bytes and a few counters. All accessors are synchronized on the
     * instance.
     */
    public static class ReadStatus {
        /** Sentinel instance queued to signal that no more pages will follow. */
        public static final ReadStatus EMPTY = new ReadStatus();

        private PageHeader pageHeader;
        private DrillBuf pageData;
        private boolean isDictionaryPage;
        private long bytesRead;
        private long valuesRead;
        private long diskScanTime;

        /** @return the header of the page, or {@code null} if none was set */
        public synchronized PageHeader getPageHeader() {
            return pageHeader;
        }

        /** @param pageHeader header of the page that was read */
        public synchronized void setPageHeader(PageHeader pageHeader) {
            this.pageHeader = pageHeader;
        }

        /** @return the buffer holding the page bytes, or {@code null} if none is attached */
        public synchronized DrillBuf getPageData() {
            return pageData;
        }

        /** @param pageData buffer holding the page bytes; {@code null} detaches the current one */
        public synchronized void setPageData(DrillBuf pageData) {
            this.pageData = pageData;
        }

        /** @return {@code true} if this page was flagged as a dictionary page */
        public synchronized boolean isDictionaryPage() {
            return isDictionaryPage;
        }

        /** @param isDictionaryPage whether this page is a dictionary page */
        public synchronized void setIsDictionaryPage(boolean isDictionaryPage) {
            this.isDictionaryPage = isDictionaryPage;
        }

        /** @return the recorded number of bytes read for this page */
        public synchronized long getBytesRead() {
            return bytesRead;
        }

        /** @param bytesRead number of bytes read for this page */
        public synchronized void setBytesRead(long bytesRead) {
            this.bytesRead = bytesRead;
        }

        /** @return the recorded number of values in this page */
        public synchronized long getValuesRead() {
            return valuesRead;
        }

        /** @param valuesRead number of values in this page */
        public synchronized void setValuesRead(long valuesRead) {
            this.valuesRead = valuesRead;
        }

        /** @return the recorded disk-scan time for this page */
        public synchronized long getDiskScanTime() {
            return diskScanTime;
        }

        /** @param diskScanTime disk-scan time to record for this page */
        public synchronized void setDiskScanTime(long diskScanTime) {
            this.diskScanTime = diskScanTime;
        }
    }
}

