/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.physical.impl.common;

import com.carrotsearch.hppc.IntArrayList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.exceptions.RetryAfterSpillException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.cache.VectorSerializer;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableStats;
import org.apache.drill.exec.physical.impl.common.IndexPointer;
import org.apache.drill.exec.physical.impl.join.HashJoinHelper;
import org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculator;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ObjectVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A single partition of the data handled by a hash join.
 *
 * <p>Rows are appended into a "current" batch together with a parallel vector of
 * their hash values. Completed batches are collected in an in-memory list and may
 * be written to a spill file through the {@link SpillSet}. For a partition that was
 * not spilled, {@link #buildContainersHashTableAndHelper()} inserts the collected
 * batches into the partition's {@link HashTable} (and, unless this is a semi join,
 * registers them with the {@link HashJoinHelper}).
 *
 * <p>The class also reports its in-memory footprint through
 * {@link HashJoinMemoryCalculator.PartitionStat}.
 */
public class HashPartition
implements HashJoinMemoryCalculator.PartitionStat {
    static final Logger logger = LoggerFactory.getLogger(HashPartition.class);

    /** Name of the extra column that carries each row's hash value. */
    public static final String HASH_VALUE_COLUMN_NAME = "$Hash_Values$";

    private int partitionNum = -1;

    /** Bytes reserved per value when pre-allocating a variable-width vector. */
    private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8;
    private final int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE;

    /** Type of the hash-value column: a required (non-nullable) INT. */
    public static final TypeProtos.MajorType HVtype = TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.INT).setMode(TypeProtos.DataMode.REQUIRED).build();

    // Batches that were inserted into the hash table (set by buildContainersHashTableAndHelper).
    private ArrayList<VectorContainer> containers;
    // Completed batches waiting to be either spilled or inserted into the hash table.
    private final List<VectorContainer> tmpBatchesList;
    // Batch currently being filled row by row, and the hash values of its rows.
    private VectorContainer currentBatch;
    private IntVector currHVVector;

    private HashJoinHelper hjHelper;
    private HashTable hashTable;

    private VectorSerializer.Writer writer;
    private int partitionBatchesCount;
    private String spillFile;

    private final BufferAllocator allocator;
    private int recordsPerBatch;
    private final SpillSet spillSet;

    private boolean isSpilled;
    // False while the build (inner) side is processed; set by closeWriter().
    private boolean processingOuter;
    // Set once the hash table was built in memory; new current batches are then not allocated.
    private boolean outerBatchAllocNotNeeded;
    private final RecordBatch buildBatch;
    private final RecordBatch probeBatch;
    private final int cycleNum;
    private final int numPartitions;

    private final List<HashJoinMemoryCalculator.BatchStat> inMemoryBatchStats = Lists.newArrayList();
    private long partitionInMemorySize;
    private long numInMemoryRecords;
    private boolean updatedRecordsPerBatch;
    private final boolean semiJoin;

    /**
     * Creates the partition, its hash table and (for non-semi joins) its join helper.
     * When there is more than one partition, the first current batch and hash-value
     * vector are allocated right away.
     *
     * @param context         fragment context handed to the {@link HashJoinHelper}
     * @param allocator       allocator used for every vector owned by this partition
     * @param baseHashTable   factory used to create this partition's hash table
     * @param buildBatch      build-side incoming batch (template for inner batches)
     * @param probeBatch      probe-side incoming batch (template for outer batches)
     * @param semiJoin        when true no {@link HashJoinHelper} is created
     * @param recordsPerBatch number of rows after which a current batch is completed
     * @param spillSet        provider of spill files and writers
     * @param partNum         number of this partition
     * @param cycleNum        spill cycle this partition belongs to
     * @param numPartitions   total number of partitions
     * @throws UserException         on code generation, IO or memory allocation failure
     * @throws IllegalStateException on an unexpected schema change
     */
    public HashPartition(FragmentContext context, BufferAllocator allocator, ChainedHashTable baseHashTable, RecordBatch buildBatch, RecordBatch probeBatch, boolean semiJoin, int recordsPerBatch, SpillSet spillSet, int partNum, int cycleNum, int numPartitions) {
        this.allocator = allocator;
        this.buildBatch = buildBatch;
        this.probeBatch = probeBatch;
        this.recordsPerBatch = recordsPerBatch;
        this.spillSet = spillSet;
        this.partitionNum = partNum;
        this.cycleNum = cycleNum;
        this.numPartitions = numPartitions;
        this.semiJoin = semiJoin;
        // Must exist before anything below can fail: the out-of-memory handler calls
        // close(), which walks this list. Creating it inside the try block would turn
        // an allocation failure in the hash table setup into a NullPointerException.
        this.tmpBatchesList = new ArrayList<>();
        try {
            this.hashTable = baseHashTable.createAndSetupHashTable(null);
            this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
            if (numPartitions > 1) {
                this.allocateNewCurrentBatchAndHV();
            }
        } catch (ClassTransformationException e) {
            throw UserException.unsupportedError(e).message("Code generation error - likely an error in the code.").build(logger);
        } catch (IOException e) {
            throw UserException.resourceError(e).message("IO Error while creating a hash table.").build(logger);
        } catch (SchemaChangeException sce) {
            throw new IllegalStateException("Unexpected Schema Change while creating a hash table", sce);
        } catch (OutOfMemoryException oom) {
            // Release whatever was allocated so far before reporting the failure.
            this.close();
            throw UserException.memoryError(oom).message("Failed to allocate hash partition.").build(logger);
        }
    }

    /**
     * Changes the number of rows per batch used for the probe side.
     *
     * @param newRecordsPerBatch new batch size, must be positive
     * @throws IllegalArgumentException if {@code newRecordsPerBatch} is not positive
     * @throws IllegalStateException    if the partition is not processing the outer side
     */
    public void updateProbeRecordsPerBatch(int newRecordsPerBatch) {
        Preconditions.checkArgument(newRecordsPerBatch > 0);
        // NOTE(review): updatedRecordsPerBatch is never set to true anywhere in this class,
        // so this check cannot fail. Left as is; confirm with callers before enforcing
        // a "once only" rule.
        Preconditions.checkState(!this.updatedRecordsPerBatch);
        Preconditions.checkState(this.processingOuter);
        this.recordsPerBatch = newRecordsPerBatch;
    }

    /**
     * Builds an empty container with one freshly allocated vector per column of the
     * given batch, each sized for {@code recordsPerBatch} rows.
     *
     * <p>When {@code cycleNum > 0} the last column of the incoming batch is skipped
     * (presumably it is the hash-value column of a batch read back from a spill file,
     * which is kept separately in {@code currHVVector} - TODO confirm).
     *
     * @param rb batch whose columns serve as the template
     * @return the new container, with its record count set to zero
     */
    private VectorContainer allocateNewVectorContainer(RecordBatch rb) {
        VectorContainer newVC = new VectorContainer();
        Iterator<VectorWrapper<?>> columns = rb.getContainer().iterator();
        boolean success = false;
        try {
            while (columns.hasNext()) {
                VectorWrapper<?> vw = columns.next();
                if (this.cycleNum > 0 && !columns.hasNext()) {
                    break;
                }
                ValueVector vv = vw.getValueVector();
                ValueVector newVV = TypeHelper.getNewVector(vv.getField(), this.allocator);
                // Added before allocating so that a failed allocation is still cleaned up below.
                newVC.add(newVV);
                if (newVV instanceof FixedWidthVector) {
                    ((FixedWidthVector)newVV).allocateNew(this.recordsPerBatch);
                } else if (newVV instanceof VariableWidthVector) {
                    ((VariableWidthVector)newVV).allocateNew(this.maxColumnWidth * this.recordsPerBatch, this.recordsPerBatch);
                } else if (newVV instanceof ObjectVector) {
                    ((ObjectVector)newVV).allocateNew(this.recordsPerBatch);
                } else {
                    newVV.allocateNew();
                }
            }
            newVC.setRecordCount(0);
            success = true;
        } finally {
            if (!success) {
                newVC.clear();
            }
        }
        return newVC;
    }

    /**
     * Allocates a new current batch (modelled on the probe batch when processing the
     * outer side, on the build batch otherwise) and a new hash-value vector.
     * Does nothing once the in-memory hash table was built.
     */
    public void allocateNewCurrentBatchAndHV() {
        if (this.outerBatchAllocNotNeeded) {
            return;
        }
        this.currentBatch = this.allocateNewVectorContainer(this.processingOuter ? this.probeBatch : this.buildBatch);
        this.currHVVector = new IntVector(MaterializedField.create(HASH_VALUE_COLUMN_NAME, HVtype), this.allocator);
        this.currHVVector.allocateNew(this.recordsPerBatch);
    }

    /**
     * Appends one build-side row and its hash value to the current batch. When the
     * batch becomes full it is completed and, if this partition is already spilled
     * or the calculator asks for it, spilled.
     *
     * @param buildContainer container holding the row
     * @param ind            index of the row in {@code buildContainer}
     * @param hashCode       hash value of the row
     * @param calc           memory calculator consulted on whether to spill
     */
    public void appendInnerRow(VectorContainer buildContainer, int ind, int hashCode, HashJoinMemoryCalculator.BuildSidePartitioning calc) {
        // appendRow's result is used as the row count after the append,
        // hence the new row sits at pos - 1.
        int pos = this.currentBatch.appendRow(buildContainer, ind);
        this.currHVVector.getMutator().set(pos - 1, hashCode);
        if (pos == this.recordsPerBatch) {
            boolean needsSpill = this.isSpilled || calc.shouldSpill();
            this.completeAnInnerBatch(true, needsSpill);
        }
    }

    /**
     * Appends one probe-side row and its hash value to the current batch; a full
     * batch is completed (and spilled).
     *
     * @param hashCode         hash value of the row
     * @param recordsProcessed index of the row in the probe batch
     */
    public void appendOuterRow(int hashCode, int recordsProcessed) {
        int pos = this.currentBatch.appendRow(this.probeBatch.getContainer(), recordsProcessed);
        this.currHVVector.getMutator().set(pos - 1, hashCode);
        if (pos == this.recordsPerBatch) {
            this.completeAnOuterBatch(true);
        }
    }

    /**
     * Completes the current outer batch; outer batches are always spilled.
     *
     * @param toInitialize whether to allocate a new current batch afterwards
     */
    public void completeAnOuterBatch(boolean toInitialize) {
        this.completeABatch(toInitialize, true);
    }

    /**
     * Completes the current inner batch.
     *
     * @param toInitialize whether to allocate a new current batch afterwards
     * @param needsSpill   whether to spill the partition's in-memory batches
     */
    public void completeAnInnerBatch(boolean toInitialize, boolean needsSpill) {
        this.completeABatch(toInitialize, needsSpill);
    }

    /**
     * Moves a non-empty current batch (with its hash-value column attached) to the
     * list of completed batches and updates the memory statistics; an empty current
     * batch is freed instead. Optionally spills the partition and prepares a new
     * current batch.
     */
    private void completeABatch(boolean toInitialize, boolean needsSpill) {
        if (this.currentBatch.hasRecordCount() && this.currentBatch.getRecordCount() > 0) {
            // The hash values become the last column of the batch.
            this.currentBatch.add(this.currHVVector);
            this.currentBatch.buildSchema(BatchSchema.SelectionVectorMode.NONE);
            this.tmpBatchesList.add(this.currentBatch);
            ++this.partitionBatchesCount;
            int rowCount = this.currentBatch.getRecordCount();
            long batchSize = new RecordBatchSizer(this.currentBatch).getActualSize();
            this.inMemoryBatchStats.add(new HashJoinMemoryCalculator.BatchStat(rowCount, batchSize));
            this.partitionInMemorySize += batchSize;
            this.numInMemoryRecords += (long)rowCount;
        } else {
            this.freeCurrentBatchAndHVVector();
        }
        if (needsSpill) {
            this.spillThisPartition();
        }
        if (toInitialize) {
            this.allocateNewCurrentBatchAndHV();
        } else {
            this.currentBatch = null;
            this.currHVVector = null;
        }
    }

    /**
     * Takes over a whole incoming batch as one completed batch of this partition:
     * computes the build hash value of every row, transfers the batch's vectors into
     * a new container and appends the hash values as its last column. Asserted to be
     * used only when there is a single partition.
     *
     * @param batch the incoming batch whose vectors are transferred
     * @throws IllegalStateException if hashing fails with a schema change
     */
    public void appendBatch(VectorAccessible batch) {
        assert (this.numPartitions == 1);
        int recordCount = batch.getRecordCount();
        this.currHVVector = new IntVector(MaterializedField.create(HASH_VALUE_COLUMN_NAME, HVtype), this.allocator);
        this.currHVVector.allocateNew(recordCount);
        try {
            for (int ind = 0; ind < recordCount; ++ind) {
                int hashCode = this.getBuildHashCode(ind);
                this.currHVVector.getMutator().set(ind, hashCode);
            }
        } catch (SchemaChangeException sce) {
            // Continuing would attach a partially filled hash-value column to the batch,
            // so release the vector and fail the same way the constructor does.
            this.currHVVector.clear();
            this.currHVVector = null;
            throw new IllegalStateException("Unexpected Schema Change while computing hash values of a build batch", sce);
        }
        VectorContainer container = new VectorContainer();
        ArrayList<ValueVector> vectors = Lists.newArrayList();
        for (VectorWrapper<?> v : batch) {
            TransferPair tp = v.getValueVector().getTransferPair(this.allocator);
            tp.transfer();
            vectors.add(tp.getTo());
        }
        container.addCollection(vectors);
        container.add(this.currHVVector);
        container.setRecordCount(recordCount);
        container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
        this.tmpBatchesList.add(container);
        ++this.partitionBatchesCount;
        this.currHVVector = null;
        this.numInMemoryRecords += (long)recordCount;
    }

    /**
     * Writes all completed in-memory batches to this partition's spill file, opening
     * the file on first use, and resets the in-memory statistics. Does nothing when
     * there are no completed batches.
     *
     * @throws UserException if the spill file cannot be opened or written
     */
    public void spillThisPartition() {
        if (this.tmpBatchesList.isEmpty()) {
            return;
        }
        logger.debug("HashJoin: Spilling partition {}, current cycle {}, part size {} batches", this.partitionNum, this.cycleNum, this.tmpBatchesList.size());
        if (this.writer == null) {
            String side = this.processingOuter ? "outer" : "inner";
            String suffix = this.cycleNum > 0 ? side + "_" + Integer.toString(this.cycleNum) : side;
            this.spillFile = this.spillSet.getNextSpillFile(suffix);
            try {
                this.writer = this.spillSet.writer(this.spillFile);
            } catch (IOException ioe) {
                throw UserException.resourceError(ioe).message("Hash Join failed to open spill file: " + this.spillFile).build(logger);
            }
            this.isSpilled = true;
        }
        this.partitionInMemorySize = 0L;
        this.numInMemoryRecords = 0L;
        this.inMemoryBatchStats.clear();
        // Batches are removed one at a time so that a failed write leaves only the
        // not-yet-written batches in the list.
        while (!this.tmpBatchesList.isEmpty()) {
            VectorContainer vc = this.tmpBatchesList.remove(0);
            int numRecords = vc.getRecordCount();
            vc.setValueCount(numRecords);
            WritableBatch wBatch = WritableBatch.getBatchNoHVWrap(numRecords, vc, false);
            try {
                this.writer.write(wBatch, null);
            } catch (IOException ioe) {
                throw UserException.dataWriteError(ioe).message("Hash Join failed to write to output file: " + this.spillFile).build(logger);
            } finally {
                wBatch.clear();
            }
            vc.zeroVectors();
            logger.trace("HASH JOIN: Took {} us to spill {} records", this.writer.time(TimeUnit.MICROSECONDS), numRecords);
        }
    }

    /** Delegates to {@link HashTable#probeForKey(int, int)}. */
    public int probeForKey(int recordsProcessed, int hashCode) throws SchemaChangeException {
        return this.hashTable.probeForKey(recordsProcessed, hashCode);
    }

    /** Delegates to the hash table's {@code getRecordNumForKey}. */
    public int getRecordNumForKey(int currentIndex) {
        return this.hashTable.getRecordNumForKey(currentIndex);
    }

    /** Delegates to the hash table's {@code setRecordNumForKey}. */
    public void setRecordNumForKey(int currentIndex, int num) {
        this.hashTable.setRecordNumForKey(currentIndex, num);
    }

    /** Delegates to the hash table's {@code decreaseRecordNumForKey}. */
    public void decreaseRecordNumForKey(int currentIndex) {
        this.hashTable.decreaseRecordNumForKey(currentIndex);
    }

    /**
     * Looks up the helper's start index for a probe index and marks that record as matched.
     *
     * @param probeIndex index handed to the join helper
     * @return the composite index paired with the value returned by
     *         {@code HashJoinHelper.setRecordMatched} for it
     */
    public Pair<Integer, Boolean> getStartIndex(int probeIndex) {
        int compositeIndex = this.hjHelper.getStartIndex(probeIndex);
        boolean matchExists = this.hjHelper.setRecordMatched(compositeIndex);
        return Pair.of(compositeIndex, matchExists);
    }

    /** Delegates to the join helper's {@code getNextIndex}. */
    public int getNextIndex(int compositeIndex) {
        return this.hjHelper.getNextIndex(compositeIndex);
    }

    /** Delegates to the join helper's {@code setRecordMatched}. */
    public boolean setRecordMatched(int compositeIndex) {
        return this.hjHelper.setRecordMatched(compositeIndex);
    }

    /** Delegates to the join helper's {@code getNextUnmatchedIndex}. */
    public IntArrayList getNextUnmatchedIndex() {
        return this.hjHelper.getNextUnmatchedIndex();
    }

    /** Delegates to the hash table's {@code getBuildHashCode}. */
    public int getBuildHashCode(int ind) throws SchemaChangeException {
        return this.hashTable.getBuildHashCode(ind);
    }

    /** Delegates to the hash table's {@code getProbeHashCode}. */
    public int getProbeHashCode(int ind) throws SchemaChangeException {
        return this.hashTable.getProbeHashCode(ind);
    }

    /**
     * @return the batches that were inserted into the hash table, or {@code null} if
     *         {@link #buildContainersHashTableAndHelper()} has not built them
     */
    public ArrayList<VectorContainer> getContainers() {
        return this.containers;
    }

    /** Delegates to the hash table's {@code updateBatches}. */
    public void updateBatches() throws SchemaChangeException {
        this.hashTable.updateBatches();
    }

    /** Delegates to the hash table's {@code nextBatch}. */
    public Pair<VectorContainer, Integer> nextBatch() {
        return this.hashTable.nextBatch();
    }

    @Override
    public List<HashJoinMemoryCalculator.BatchStat> getInMemoryBatches() {
        return this.inMemoryBatchStats;
    }

    @Override
    public int getNumInMemoryBatches() {
        return this.inMemoryBatchStats.size();
    }

    @Override
    public boolean isSpilled() {
        return this.isSpilled;
    }

    @Override
    public long getNumInMemoryRecords() {
        return this.numInMemoryRecords;
    }

    @Override
    public long getInMemorySize() {
        return this.partitionInMemorySize;
    }

    /** @return the current spill file name, or {@code null} when none is open */
    public String getSpillFile() {
        return this.spillFile;
    }

    /** @return number of batches completed since the writer was last closed */
    public int getPartitionBatchesCount() {
        return this.partitionBatchesCount;
    }

    /** @return the number of this partition */
    public int getPartitionNum() {
        return this.partitionNum;
    }

    /**
     * Closes the spill writer (keeping the file) and switches this partition to
     * processing the outer side.
     */
    public void closeWriter() {
        this.closeWriterInternal(false);
        this.processingOuter = true;
    }

    /**
     * Closes the spill writer if one is open, optionally deletes the spill file, and
     * resets the spill file name, the writer and the batch counter.
     *
     * @param doDeleteFile whether to delete the spill file as well
     * @throws UserException if closing or deleting fails
     */
    private void closeWriterInternal(boolean doDeleteFile) {
        try {
            if (this.writer != null) {
                this.spillSet.close(this.writer);
            }
            if (doDeleteFile && this.spillFile != null) {
                this.spillSet.delete(this.spillFile);
            }
        } catch (IOException ioe) {
            throw UserException.resourceError(ioe).message("IO Error while closing %s spill file %s", doDeleteFile ? "and deleting" : "", this.spillFile).build(logger);
        }
        this.spillFile = null;
        this.writer = null;
        this.partitionBatchesCount = 0;
    }

    /**
     * Inserts every completed in-memory batch into the hash table, using the hash
     * values stored in each batch's last column, and (unless this is a semi join)
     * records each row's position in the join helper. Does nothing for a spilled
     * partition.
     *
     * @throws SchemaChangeException propagated from the hash table
     * @throws OutOfMemoryException  if the hash table asks for a retry after spill
     */
    public void buildContainersHashTableAndHelper() throws SchemaChangeException {
        if (this.isSpilled) {
            return;
        }
        this.containers = new ArrayList<>();
        this.hashTable.updateInitialCapacity((int)this.getNumInMemoryRecords());
        for (int curr = 0; curr < this.partitionBatchesCount; ++curr) {
            VectorContainer nextBatch = this.tmpBatchesList.get(curr);
            assert (nextBatch != null);
            assert (this.probeBatch != null);
            int currentRecordCount = nextBatch.getRecordCount();
            if (!this.semiJoin) {
                this.hjHelper.addNewBatch(currentRecordCount);
            }
            IndexPointer htIndex = new IndexPointer();
            this.hashTable.updateIncoming(nextBatch, this.probeBatch);
            // The hash values were attached as the last column when the batch was completed.
            IntVector hvVector = (IntVector)nextBatch.getLast();
            for (int recInd = 0; recInd < currentRecordCount; ++recInd) {
                int hashCode = hvVector.getAccessor().get(recInd);
                try {
                    // NOTE(review): 65536 is passed through as in the original code;
                    // presumably the hash table's batch size - confirm against HashTable.
                    this.hashTable.put(recInd, htIndex, hashCode, 65536);
                } catch (RetryAfterSpillException retry) {
                    // No retry is attempted here; report it as out of memory, keeping the cause.
                    throw new OutOfMemoryException("HT put", retry);
                }
                if (!this.semiJoin) {
                    this.hjHelper.setCurrentIndex(htIndex.value, curr, recInd);
                }
            }
            this.containers.add(nextBatch);
        }
        this.outerBatchAllocNotNeeded = true;
    }

    /** Delegates to the hash table's {@code getStats}. */
    public void getStats(HashTableStats newStats) {
        this.hashTable.getStats(newStats);
    }

    /** Clears and drops the hash table and the join helper, if present. */
    private void clearHashTableAndHelper() {
        if (this.hashTable != null) {
            this.hashTable.clear();
            this.hashTable = null;
        }
        if (this.hjHelper != null) {
            this.hjHelper.clear();
            this.hjHelper = null;
        }
    }

    /** Clears and drops the current batch and the current hash-value vector, if present. */
    private void freeCurrentBatchAndHVVector() {
        if (this.currentBatch != null) {
            this.currentBatch.clear();
            this.currentBatch = null;
        }
        if (this.currHVVector != null) {
            this.currHVVector.clear();
            this.currHVVector = null;
        }
    }

    /**
     * Releases everything held by this partition: the current batch, all completed
     * batches, the spill writer, the hash table and the join helper.
     *
     * @param deleteFile whether to delete the spill file as well
     */
    public void cleanup(boolean deleteFile) {
        this.freeCurrentBatchAndHVVector();
        if (this.containers != null) {
            for (VectorContainer vc : this.containers) {
                vc.clear();
            }
        }
        for (VectorContainer vc : this.tmpBatchesList) {
            vc.clear();
        }
        this.tmpBatchesList.clear();
        this.closeWriterInternal(deleteFile);
        this.clearHashTableAndHelper();
        if (this.containers != null) {
            this.containers.clear();
        }
    }

    /** Releases all resources and deletes the spill file. */
    public void close() {
        this.cleanup(true);
    }

    /** @return a debug description of the hash table, or "None" when it was released */
    public String makeDebugString() {
        return String.format("[hashTable = %s]", this.hashTable == null ? "None" : this.hashTable.makeDebugString());
    }
}

