/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.store.parquet.columnreaders;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
import org.apache.drill.exec.store.parquet.columnreaders.VarLengthColumn;
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchOverflow;
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;

public class VarLenBinaryReader {
    final ParquetRecordReader parentReader;
    final RecordBatchSizerManager batchSizer;
    final List<VarLengthColumn<? extends ValueVector>> columns;
    final List<VLColumnContainer> orderedColumns;
    private final Comparator<VLColumnContainer> comparator = new VLColumnComparator();
    final boolean useAsyncTasks;
    private final boolean useBulkReader;

    public VarLenBinaryReader(ParquetRecordReader parentReader, List<VarLengthColumn<? extends ValueVector>> columns) {
        this.parentReader = parentReader;
        this.batchSizer = parentReader.getBatchSizesMgr();
        this.columns = columns;
        this.orderedColumns = this.populateOrderedColumns();
        this.useAsyncTasks = parentReader.useAsyncColReader;
        this.useBulkReader = parentReader.useBulkReader();
    }

    public long readFields(long recordsToReadInThisPass) throws IOException {
        for (VarLengthColumn<? extends ValueVector> columnReader : this.columns) {
            columnReader.reset();
        }
        Stopwatch timer = Stopwatch.createStarted();
        recordsToReadInThisPass = Math.min(recordsToReadInThisPass, (long)this.batchSizer.getCurrentRecordsPerBatch());
        long recordsReadInCurrentPass = 0L;
        if (!this.useBulkReader) {
            recordsReadInCurrentPass = this.determineSizesSerial(recordsToReadInThisPass);
            if (this.useAsyncTasks) {
                this.readRecordsParallel(recordsReadInCurrentPass);
            } else {
                this.readRecordsSerial(recordsReadInCurrentPass);
            }
        } else {
            recordsReadInCurrentPass = this.readRecordsInBulk((int)recordsToReadInThisPass);
        }
        this.parentReader.getReadState().setValuesReadInCurrentPass((int)recordsReadInCurrentPass);
        this.parentReader.parquetReaderStats.timeVarColumnRead.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
        return recordsReadInCurrentPass;
    }

    private int readRecordsInBulk(int recordsToReadInThisPass) throws IOException {
        int batchNumRecords = recordsToReadInThisPass;
        ArrayList<RecordBatchSizerManager.VarLenColumnBatchStats> columnStats = new ArrayList<RecordBatchSizerManager.VarLenColumnBatchStats>(this.columns.size());
        int prevReadColumns = -1;
        boolean overflowCondition = false;
        for (VLColumnContainer vLColumnContainer : this.orderedColumns) {
            VarLengthColumn columnReader = vLColumnContainer.column;
            int readColumns = columnReader.readRecordsInBulk(batchNumRecords);
            Preconditions.checkState(readColumns <= batchNumRecords, "Reader cannot return more values than requested..");
            if (!overflowCondition) {
                if (prevReadColumns >= 0 && prevReadColumns != readColumns) {
                    overflowCondition = true;
                } else {
                    prevReadColumns = readColumns;
                }
            }
            columnStats.add(new RecordBatchSizerManager.VarLenColumnBatchStats(columnReader.valueVec, readColumns));
            if (batchNumRecords <= readColumns) continue;
            batchNumRecords = readColumns;
            ++vLColumnContainer.numCausedOverflows;
        }
        for (VarLengthColumn varLengthColumn : this.columns) {
            varLengthColumn.valuesReadInCurrentPass = batchNumRecords;
        }
        this.publishBatchStats(columnStats, batchNumRecords);
        if (overflowCondition) {
            this.handleColumnOverflow(columnStats, batchNumRecords);
        }
        return batchNumRecords;
    }

    private void handleColumnOverflow(List<RecordBatchSizerManager.VarLenColumnBatchStats> columnStats, int batchNumRecords) {
        RecordBatchOverflow.Builder builder = null;
        for (RecordBatchSizerManager.VarLenColumnBatchStats columnStat : columnStats) {
            if (columnStat.numValuesRead <= batchNumRecords || this.fieldHasAlreadyOverflowData(columnStat.vector.getField().getName())) continue;
            columnStat.vector.getMutator().setValueCount(batchNumRecords);
            if (builder == null) {
                builder = RecordBatchOverflow.newBuilder(this.parentReader.getOperatorContext().getAllocator(), this.batchSizer.getBatchStatsContext());
            }
            int numOverflowValues = columnStat.numValuesRead - batchNumRecords;
            builder.addFieldOverflow(columnStat.vector, batchNumRecords, numOverflowValues);
        }
        if (builder != null) {
            Map<String, RecordBatchSizerManager.FieldOverflowStateContainer> overflowContainerMap = this.parentReader.getBatchSizesMgr().getFieldOverflowMap();
            Map<String, RecordBatchOverflow.FieldOverflowDefinition> overflowDefMap = builder.build().getRecordOverflowDefinition().getFieldOverflowDefs();
            for (Map.Entry<String, RecordBatchOverflow.FieldOverflowDefinition> entry : overflowDefMap.entrySet()) {
                RecordBatchSizerManager.FieldOverflowStateContainer overflowStateContainer = new RecordBatchSizerManager.FieldOverflowStateContainer(entry.getValue(), null);
                overflowContainerMap.put(entry.getKey(), overflowStateContainer);
            }
        }
        this.reorderVLColumns();
    }

    private void reorderVLColumns() {
        Collections.sort(this.orderedColumns, this.comparator);
        if (this.batchSizer.getBatchStatsContext().isEnableBatchSzLogging()) {
            boolean isFirstValue = true;
            StringBuilder msg = new StringBuilder();
            msg.append(": Dumping the variable length columns read order: ");
            for (VLColumnContainer container : this.orderedColumns) {
                if (!isFirstValue) {
                    msg.append(", ");
                } else {
                    isFirstValue = false;
                }
                msg.append(((VLColumnContainer)container).column.valueVec.getField().getName());
            }
            msg.append('.');
            RecordBatchStats.logRecordBatchStats(msg.toString(), this.batchSizer.getBatchStatsContext());
        }
    }

    private boolean fieldHasAlreadyOverflowData(String field) {
        RecordBatchSizerManager.FieldOverflowStateContainer container = this.parentReader.getBatchSizesMgr().getFieldOverflowContainer(field);
        if (container == null) {
            return false;
        }
        if (container.overflowState == null || container.overflowState.isOverflowDataFullyConsumed()) {
            this.parentReader.getBatchSizesMgr().releaseFieldOverflowContainer(field);
            return false;
        }
        return true;
    }

    private void publishBatchStats(List<RecordBatchSizerManager.VarLenColumnBatchStats> stats, int batchNumRecords) {
        Map<String, RecordBatchSizerManager.FieldOverflowStateContainer> overflowMap = this.parentReader.getBatchSizesMgr().getFieldOverflowMap();
        for (RecordBatchSizerManager.FieldOverflowStateContainer container : overflowMap.values()) {
            RecordBatchSizerManager.FieldOverflowState overflowState = container.overflowState;
            if (overflowState == null) continue;
            overflowState.onNewBatchValuesConsumed(batchNumRecords);
        }
        this.parentReader.getBatchSizesMgr().onEndOfBatch(batchNumRecords, stats);
    }

    private long determineSizesSerial(long recordsToReadInThisPass) throws IOException {
        int recordsReadInCurrentPass = 0;
        block0: do {
            for (VarLengthColumn<? extends ValueVector> columnReader : this.columns) {
                if (!columnReader.determineSize(recordsReadInCurrentPass)) continue;
                break block0;
            }
            for (VarLengthColumn<? extends ValueVector> columnReader : this.columns) {
                columnReader.updateReadyToReadPosition();
                columnReader.currDefLevel = -1;
            }
        } while ((long)(++recordsReadInCurrentPass) < recordsToReadInThisPass);
        return recordsReadInCurrentPass;
    }

    private void readRecordsSerial(long recordsReadInCurrentPass) {
        for (VarLengthColumn<? extends ValueVector> columnReader : this.columns) {
            columnReader.readRecords(columnReader.pageReader.valuesReadyToRead);
        }
        for (VarLengthColumn<? extends ValueVector> columnReader : this.columns) {
            columnReader.valueVec.getMutator().setValueCount((int)recordsReadInCurrentPass);
        }
    }

    private void readRecordsParallel(long recordsReadInCurrentPass) {
        ArrayList<Future<Integer>> futures = Lists.newArrayList();
        for (VarLengthColumn<? extends ValueVector> columnReader : this.columns) {
            Future<Integer> future = columnReader.readRecordsAsync(columnReader.pageReader.valuesReadyToRead);
            if (future == null) continue;
            futures.add(future);
        }
        Exception exception = null;
        for (Future future : futures) {
            if (exception != null) {
                future.cancel(true);
                continue;
            }
            try {
                future.get();
            }
            catch (Exception e) {
                future.cancel(true);
                exception = e;
            }
        }
        for (VarLengthColumn varLengthColumn : this.columns) {
            varLengthColumn.valueVec.getMutator().setValueCount((int)recordsReadInCurrentPass);
        }
    }

    protected void handleAndRaise(String s, Exception e) {
        String message = "Error in parquet record reader.\nMessage: " + s;
        throw new DrillRuntimeException(message, e);
    }

    private List<VLColumnContainer> populateOrderedColumns() {
        ArrayList<VLColumnContainer> result = new ArrayList<VLColumnContainer>(this.columns.size());
        for (VarLengthColumn<? extends ValueVector> column : this.columns) {
            result.add(new VLColumnContainer(column));
        }
        Collections.sort(result, this.comparator);
        return result;
    }

    private static final class VLColumnComparator
    implements Comparator<VLColumnContainer> {
        private VLColumnComparator() {
        }

        @Override
        public int compare(VLColumnContainer o1, VLColumnContainer o2) {
            assert (o1 != null && o2 != null);
            if (o1.numCausedOverflows == o2.numCausedOverflows) {
                return 0;
            }
            if (o1.numCausedOverflows < o2.numCausedOverflows) {
                return 1;
            }
            return -1;
        }
    }

    private static final class VLColumnContainer {
        private final VarLengthColumn<? extends ValueVector> column;
        private int numCausedOverflows;

        private VLColumnContainer(VarLengthColumn<? extends ValueVector> column) {
            this.column = column;
        }
    }
}

