/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.physical.impl.orderedpartitioner;

import com.sun.codemodel.JConditional;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.Order;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.cache.CachedVectorContainer;
import org.apache.drill.exec.cache.Counter;
import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.cache.DistributedMap;
import org.apache.drill.exec.cache.DistributedMultiMap;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.ValueVectorReadExpression;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.OrderedPartitionSender;
import org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionProjector;
import org.apache.drill.exec.physical.impl.orderedpartitioner.SampleCopier;
import org.apache.drill.exec.physical.impl.sort.SortBatch;
import org.apache.drill.exec.physical.impl.sort.SortRecordBatchBuilder;
import org.apache.drill.exec.physical.impl.sort.Sorter;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OrderedPartitionRecordBatch
extends AbstractRecordBatch<OrderedPartitionSender> {
    static final Logger logger = LoggerFactory.getLogger(OrderedPartitionRecordBatch.class);
    public static final DistributedCache.CacheConfig<String, CachedVectorContainer> SINGLE_CACHE_CONFIG = DistributedCache.CacheConfig.newBuilder(CachedVectorContainer.class).name("SINGLE-" + CachedVectorContainer.class.getSimpleName()).mode(DistributedCache.SerializationMode.DRILL_SERIALIZIABLE).build();
    public static final DistributedCache.CacheConfig<String, CachedVectorContainer> MULTI_CACHE_CONFIG = DistributedCache.CacheConfig.newBuilder(CachedVectorContainer.class).name("MULTI-" + CachedVectorContainer.class.getSimpleName()).mode(DistributedCache.SerializationMode.DRILL_SERIALIZIABLE).build();
    private final MappingSet mainMapping = new MappingSet((String)null, null, ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
    private final MappingSet incomingMapping = new MappingSet("inIndex", null, "incoming", null, ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
    private final MappingSet partitionMapping = new MappingSet("partitionIndex", null, "partitionVectors", null, ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
    private final int recordsToSample;
    private final int samplingFactor;
    private final float completionFactor;
    protected final RecordBatch incoming;
    private boolean first = true;
    private OrderedPartitionProjector projector;
    private final VectorContainer partitionVectors = new VectorContainer();
    private final int partitions;
    private Queue<VectorContainer> batchQueue;
    private int recordsSampled;
    private final int sendingMajorFragmentWidth;
    private boolean startedUnsampledBatches;
    private boolean upstreamNone;
    private int recordCount;
    private final IntVector partitionKeyVector;
    private final DistributedMap<String, CachedVectorContainer> tableMap;
    private final Counter minorFragmentSampleCount;
    private final DistributedMultiMap<String, CachedVectorContainer> mmap;
    private final String mapKey;
    private List<VectorContainer> sampledIncomingBatches;

    public OrderedPartitionRecordBatch(OrderedPartitionSender pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
        super(pop, context);
        this.incoming = incoming;
        this.partitions = pop.getDestinations().size();
        this.sendingMajorFragmentWidth = pop.getSendingWidth();
        this.recordsToSample = pop.getRecordsToSample();
        this.samplingFactor = pop.getSamplingFactor();
        this.completionFactor = pop.getCompletionFactor();
        DistributedCache cache = null;
        this.mmap = cache.getMultiMap(MULTI_CACHE_CONFIG);
        this.tableMap = cache.getMap(SINGLE_CACHE_CONFIG);
        Preconditions.checkNotNull(this.tableMap);
        this.mapKey = String.format("%s_%d", context.getHandle().getQueryId(), context.getHandle().getMajorFragmentId());
        this.minorFragmentSampleCount = cache.getCounter(this.mapKey);
        FieldReference outputPath = ((OrderedPartitionSender)this.popConfig).getRef();
        MaterializedField outputField = MaterializedField.create(outputPath.getAsNamePart().getName(), Types.required(TypeProtos.MinorType.INT));
        this.partitionKeyVector = (IntVector)TypeHelper.getNewVector(outputField, this.oContext.getAllocator());
    }

    @Override
    public void close() {
        super.close();
        this.partitionVectors.clear();
        this.partitionKeyVector.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean saveSamples() {
        this.recordsSampled = 0;
        SortRecordBatchBuilder builder = new SortRecordBatchBuilder(this.oContext.getAllocator());
        WritableBatch batch = null;
        CachedVectorContainer sampleToSave = null;
        VectorContainer containerToCache = new VectorContainer();
        try {
            builder.add(this.incoming);
            this.recordsSampled += this.incoming.getRecordCount();
            block8: while (this.recordsSampled < this.recordsToSample) {
                RecordBatch.IterOutcome upstream = this.next(this.incoming);
                switch (upstream) {
                    case NONE: 
                    case NOT_YET: {
                        this.upstreamNone = true;
                        break block8;
                    }
                    default: {
                        builder.add(this.incoming);
                        this.recordsSampled += this.incoming.getRecordCount();
                        if (upstream != RecordBatch.IterOutcome.NONE) continue block8;
                        break block8;
                    }
                }
            }
            VectorContainer sortedSamples = new VectorContainer();
            builder.build(sortedSamples);
            Sorter sorter = SortBatch.createNewSorter(this.context, ((OrderedPartitionSender)this.popConfig).getOrderings(), sortedSamples);
            SelectionVector4 sv4 = builder.getSv4();
            try {
                sorter.setup(this.context, sv4, sortedSamples);
            }
            catch (SchemaChangeException e) {
                throw this.schemaChangeException(e, logger);
            }
            sorter.sort(sv4, sortedSamples);
            ArrayList<ValueVector> localAllocationVectors = Lists.newArrayList();
            SampleCopier copier = this.getCopier(sv4, sortedSamples, containerToCache, ((OrderedPartitionSender)this.popConfig).getOrderings(), localAllocationVectors);
            int allocationSize = 50;
            while (true) {
                for (ValueVector vv : localAllocationVectors) {
                    AllocationHelper.allocate(vv, this.samplingFactor * this.partitions, allocationSize);
                }
                if (copier.copyRecords(this.recordsSampled / (this.samplingFactor * this.partitions), 0, this.samplingFactor * this.partitions)) break;
                containerToCache.zeroVectors();
                allocationSize *= 2;
            }
            containerToCache.setValueCount(copier.getOutputRecords());
            batch = WritableBatch.getBatchNoHVWrap(containerToCache.getRecordCount(), containerToCache, false);
            sampleToSave = new CachedVectorContainer(batch, this.context.getAllocator());
            this.mmap.put(this.mapKey, sampleToSave);
            this.sampledIncomingBatches = builder.getHeldRecordBatches();
        }
        finally {
            builder.clear();
            builder.close();
            if (batch != null) {
                batch.clear();
            }
            containerToCache.clear();
            if (sampleToSave != null) {
                sampleToSave.clear();
            }
        }
        return true;
    }

    private void waitUntilTimeOut(long timeout) {
        while (true) {
            try {
                while (true) {
                    Thread.sleep(timeout);
                }
            }
            catch (InterruptedException e) {
                this.checkContinue();
                continue;
            }
            break;
        }
    }

    private void getPartitionVectors() {
        this.saveSamples();
        CachedVectorContainer finalTable = null;
        long val = this.minorFragmentSampleCount.incrementAndGet();
        logger.debug("Incremented mfsc, got {}", (Object)val);
        long fragmentsBeforeProceed = (long)Math.ceil((float)this.sendingMajorFragmentWidth * this.completionFactor);
        String finalTableKey = this.mapKey + "final";
        if (val == fragmentsBeforeProceed) {
            this.buildTable();
            finalTable = this.tableMap.get(finalTableKey);
        } else {
            if (val < fragmentsBeforeProceed) {
                this.waitUntilTimeOut(10L);
            }
            for (int i = 0; i < 100 && finalTable == null && (finalTable = this.tableMap.get(finalTableKey)) == null; ++i) {
                this.waitUntilTimeOut(10L);
            }
            if (finalTable == null) {
                this.buildTable();
            }
            finalTable = this.tableMap.get(finalTableKey);
        }
        Preconditions.checkState(finalTable != null);
        for (VectorWrapper w : finalTable.get()) {
            this.partitionVectors.add((ValueVector)w.getValueVector());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void buildTable() {
        SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(this.context.getAllocator());
        VectorContainer allSamplesContainer = new VectorContainer();
        VectorContainer candidatePartitionTable = new VectorContainer();
        CachedVectorContainer wrap = null;
        try {
            for (CachedVectorContainer w : this.mmap.get(this.mapKey)) {
                containerBuilder.add(w.get());
            }
            containerBuilder.build(allSamplesContainer);
            ArrayList<Order.Ordering> orderDefs = Lists.newArrayList();
            int i = 0;
            for (Order.Ordering od : ((OrderedPartitionSender)this.popConfig).getOrderings()) {
                SchemaPath sp = SchemaPath.getSimplePath("f" + i++);
                orderDefs.add(new Order.Ordering(od.getDirection(), new FieldReference(sp)));
            }
            SelectionVector4 newSv4 = containerBuilder.getSv4();
            Sorter sorter = SortBatch.createNewSorter(this.context, orderDefs, allSamplesContainer);
            try {
                sorter.setup(this.context, newSv4, allSamplesContainer);
            }
            catch (SchemaChangeException e) {
                throw this.schemaChangeException(e, logger);
            }
            sorter.sort(newSv4, allSamplesContainer);
            SampleCopier copier = null;
            ArrayList<ValueVector> localAllocationVectors = Lists.newArrayList();
            copier = this.getCopier(newSv4, allSamplesContainer, candidatePartitionTable, orderDefs, localAllocationVectors);
            int allocationSize = 50;
            while (true) {
                for (ValueVector vv : localAllocationVectors) {
                    AllocationHelper.allocate(vv, this.samplingFactor * this.partitions, allocationSize);
                }
                int skipRecords = containerBuilder.getSv4().getTotalCount() / this.partitions;
                if (copier.copyRecords(skipRecords, skipRecords, this.partitions - 1)) {
                    assert (copier.getOutputRecords() == this.partitions - 1) : String.format("output records: %d partitions: %d", copier.getOutputRecords(), this.partitions);
                    break;
                }
                candidatePartitionTable.zeroVectors();
                allocationSize *= 2;
            }
            candidatePartitionTable.setValueCount(copier.getOutputRecords());
            candidatePartitionTable.setRecordCount(copier.getOutputRecords());
            WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false);
            wrap = new CachedVectorContainer(batch, this.context.getAllocator());
            this.tableMap.putIfAbsent(this.mapKey + "final", wrap, 1L, TimeUnit.MINUTES);
        }
        finally {
            candidatePartitionTable.clear();
            allSamplesContainer.clear();
            containerBuilder.clear();
            containerBuilder.close();
            if (wrap != null) {
                wrap.clear();
            }
        }
    }

    private SampleCopier getCopier(SelectionVector4 sv4, VectorContainer incoming, VectorContainer outgoing, List<Order.Ordering> orderings, List<ValueVector> localAllocationVectors) {
        ErrorCollectorImpl collector = new ErrorCollectorImpl();
        ClassGenerator<SampleCopier> cg = CodeGenerator.getRoot(SampleCopier.TEMPLATE_DEFINITION, this.context.getOptions());
        int i = 0;
        for (Order.Ordering od : orderings) {
            LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), incoming, collector, this.context.getFunctionRegistry());
            TypeProtos.MajorType.Builder builder = TypeProtos.MajorType.newBuilder().mergeFrom(expr.getMajorType()).clearMode().setMode(TypeProtos.DataMode.REQUIRED);
            TypeProtos.MajorType newType = builder.build();
            MaterializedField outputField = MaterializedField.create("f" + i++, newType);
            collector.reportErrors(logger);
            ValueVector vector = TypeHelper.getNewVector(outputField, this.oContext.getAllocator());
            localAllocationVectors.add(vector);
            TypedFieldId fid = outgoing.add(vector);
            ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
            ClassGenerator.HoldingContainer hc = cg.addExpr(write);
            cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit((int)0)))._then()._return(JExpr.FALSE);
        }
        cg.rotateBlock();
        cg.getEvalBlock()._return(JExpr.TRUE);
        outgoing.buildSchema(BatchSchema.SelectionVectorMode.NONE);
        try {
            SampleCopier sampleCopier = this.context.getImplementationClass(cg);
            sampleCopier.setupCopier(this.context, sv4, incoming, outgoing);
            return sampleCopier;
        }
        catch (SchemaChangeException e) {
            throw this.schemaChangeException(e, logger);
        }
    }

    @Override
    protected void cancelIncoming() {
        this.incoming.cancel();
    }

    @Override
    public RecordBatch.IterOutcome innerNext() {
        this.recordCount = 0;
        this.container.zeroVectors();
        if (this.upstreamNone && (this.batchQueue == null || this.batchQueue.size() == 0)) {
            return RecordBatch.IterOutcome.NONE;
        }
        if (this.batchQueue != null && this.batchQueue.size() > 0) {
            VectorContainer vc = this.batchQueue.poll();
            this.recordCount = vc.getRecordCount();
            this.setupNewSchema(vc);
            this.doWork(vc);
            vc.zeroVectors();
            return RecordBatch.IterOutcome.OK_NEW_SCHEMA;
        }
        RecordBatch.IterOutcome upstream = this.next(this.incoming);
        if (this.first && upstream == RecordBatch.IterOutcome.OK) {
            throw new RuntimeException("Invalid state: First batch should have OK_NEW_SCHEMA");
        }
        if (this.first && upstream == RecordBatch.IterOutcome.OK_NEW_SCHEMA) {
            this.getPartitionVectors();
            this.batchQueue = new LinkedBlockingQueue<VectorContainer>(this.sampledIncomingBatches);
            this.first = false;
            VectorContainer vc = this.batchQueue.poll();
            this.setupNewSchema(vc);
            this.doWork(vc);
            vc.zeroVectors();
            this.recordCount = vc.getRecordCount();
            return RecordBatch.IterOutcome.OK_NEW_SCHEMA;
        }
        if (!this.startedUnsampledBatches) {
            this.startedUnsampledBatches = true;
            if (upstream == RecordBatch.IterOutcome.OK) {
                upstream = RecordBatch.IterOutcome.OK_NEW_SCHEMA;
            }
        }
        switch (upstream) {
            case NONE: 
            case NOT_YET: {
                this.close();
                this.recordCount = 0;
                return upstream;
            }
            case OK_NEW_SCHEMA: {
                this.setupNewSchema(this.incoming);
            }
            case OK: {
                this.doWork(this.incoming);
                this.recordCount = this.incoming.getRecordCount();
                return upstream;
            }
        }
        throw new UnsupportedOperationException();
    }

    @Override
    public int getRecordCount() {
        return this.recordCount;
    }

    protected void doWork(VectorAccessible batch) {
        int recordCount = batch.getRecordCount();
        AllocationHelper.allocate(this.partitionKeyVector, recordCount, 50);
        this.projector.projectRecords(recordCount, 0);
        this.container.setValueCount(recordCount);
    }

    protected void setupNewSchema(VectorAccessible batch) {
        this.container.clear();
        ErrorCollectorImpl collector = new ErrorCollectorImpl();
        ArrayList<TransferPair> transfers = Lists.newArrayList();
        ClassGenerator<OrderedPartitionProjector> cg = CodeGenerator.getRoot(OrderedPartitionProjector.TEMPLATE_DEFINITION, this.context.getOptions());
        for (Object vw : batch) {
            TransferPair tp = vw.getValueVector().getTransferPair(this.oContext.getAllocator());
            transfers.add(tp);
            this.container.add(tp.getTo());
        }
        cg.setMappingSet(this.mainMapping);
        int count = 0;
        for (Order.Ordering od : ((OrderedPartitionSender)this.popConfig).getOrderings()) {
            LogicalExpression expr = ExpressionTreeMaterializer.materialize(od.getExpr(), batch, collector, this.context.getFunctionRegistry());
            collector.reportErrors(logger);
            cg.setMappingSet(this.incomingMapping);
            ClassGenerator.HoldingContainer left = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
            cg.setMappingSet(this.partitionMapping);
            TypedFieldId fieldId = new TypedFieldId.Builder().finalType(expr.getMajorType()).addId(count++).build();
            ClassGenerator.HoldingContainer right = cg.addExpr(new ValueVectorReadExpression(fieldId), ClassGenerator.BlkCreateMode.FALSE);
            cg.setMappingSet(this.mainMapping);
            LogicalExpression fh = FunctionGenerationHelper.getOrderingComparator(od.nullsSortHigh(), left, right, this.context.getFunctionRegistry());
            ClassGenerator.HoldingContainer out = cg.addExpr(fh, ClassGenerator.BlkCreateMode.FALSE);
            JConditional jc = cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit((int)0)));
            if (od.getDirection() == RelFieldCollation.Direction.ASCENDING) {
                jc._then()._return((JExpression)out.getValue());
                continue;
            }
            jc._then()._return(out.getValue().minus());
        }
        cg.getEvalBlock()._return(JExpr.lit((int)0));
        this.container.add(this.partitionKeyVector);
        this.container.buildSchema(batch.getSchema().getSelectionVectorMode());
        this.projector = this.context.getImplementationClass(cg);
        try {
            this.projector.setup(this.context, batch, this, transfers, this.partitionVectors, this.partitions, ((OrderedPartitionSender)this.popConfig).getRef());
        }
        catch (SchemaChangeException e) {
            throw UserException.schemaChangeError(e).addContext("Unexpected schema change in the Ordered Partitioner").build(logger);
        }
    }

    @Override
    public void dump() {
        logger.error("OrderedPartitionRecordBatch[container={}, popConfig={}, partitionVectors={}, partitions={}, recordsSampled={}, recordCount={}]", new Object[]{this.container, this.popConfig, this.partitionVectors, this.partitions, this.recordsSampled, this.recordCount});
    }
}

