/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.physical.impl.partitionsender;

import com.carrotsearch.hppc.IntArrayList;
import com.sun.codemodel.JCodeModel;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
import com.sun.codemodel.JType;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.ops.AccountingDataTunnel;
import org.apache.drill.exec.ops.ExchangeFragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.ops.RootFragmentContext;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.impl.BaseRootExec;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionOutgoingBatch;
import org.apache.drill.exec.physical.impl.partitionsender.Partitioner;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.vector.CopyUtil;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionSenderRootExec
extends BaseRootExec {
    private static final Logger logger = LoggerFactory.getLogger(PartitionSenderRootExec.class);
    // Upstream batch that innerNext() pulls from and partitions.
    private final RecordBatch incoming;
    // Sender configuration; the constructor assigns the same instance to popConfig below.
    private final HashPartitionSender operator;
    // Built by createClassInstances(); stays null until the first schema has been seen.
    private PartitionerDecorator partitioner;
    private final ExchangeFragmentContext context;
    // Number of destinations declared by the operator (one outgoing batch per receiver).
    private final int outGoingBatchCount;
    private final HashPartitionSender popConfig;
    // Output row count estimate of the operator's child, used to size the sender thread count.
    private final double cost;
    // One slot per receiver, indexed by minor fragment id: 0 = still receiving, 1 = reported finished.
    private final AtomicIntegerArray remainingReceivers;
    // Number of receivers that have not reported finished yet (identifier spelling is historical).
    private final AtomicInteger remaingReceiverCount;
    // Set once every receiver has reported finished; innerNext() then cancels the incoming batch.
    // NOTE(review): written in receivingFragmentFinished() and read in innerNext() without being
    // volatile - confirm both are always invoked on the same thread.
    private boolean done;
    // True until the first OK/OK_NEW_SCHEMA batch has been handled; triggers the initial empty batch.
    private boolean first = true;
    // When true, close() also closes the incoming batch (cast to CloseableRecordBatch).
    private final boolean closeIncoming;
    // Smallest / largest per-receiver record totals, accumulated by updateAggregateStats().
    long minReceiverRecordCount = Long.MAX_VALUE;
    long maxReceiverRecordCount = Long.MIN_VALUE;
    // Preliminary sender thread count derived from cost and planner options in the constructor.
    protected final int numberPartitions;
    // numberPartitions capped at outGoingBatchCount; number of sub-partitioners actually created.
    protected final int actualPartitions;
    // Receiver ids that reported finished before a partitioner existed; replayed in createClassInstances().
    private final IntArrayList terminations = new IntArrayList();

    /**
     * Creates a partition sender that does not close the incoming batch when the
     * sender itself is closed. Delegates to the four-argument constructor with
     * {@code closeIncoming} set to {@code false}.
     *
     * @param context  fragment context this sender runs in
     * @param incoming upstream batch whose records are partitioned
     * @param operator hash partition sender configuration
     * @throws OutOfMemoryException declared by the delegated constructor
     */
    public PartitionSenderRootExec(
            RootFragmentContext context,
            RecordBatch incoming,
            HashPartitionSender operator) throws OutOfMemoryException {
        this(context, incoming, operator, false);
    }

    /**
     * Creates a partition sender and works out how many sender threads (sub-partitioners) to use.
     *
     * <p>The preliminary thread count is {@code cost / slice_target / receivers / threadFactor},
     * rounded and kept within {@code [1, Integer.MAX_VALUE]}. If the "set threads" option is
     * positive it wins outright; otherwise the estimate is capped by the "max threads" option.
     * The final count never exceeds the number of receivers.
     *
     * @param context       fragment context this sender runs in
     * @param incoming      upstream batch whose records are partitioned
     * @param operator      hash partition sender configuration
     * @param closeIncoming whether {@link #close()} should also close {@code incoming}
     * @throws OutOfMemoryException declared by the operator-context creation
     */
    public PartitionSenderRootExec(RootFragmentContext context, RecordBatch incoming, HashPartitionSender operator, boolean closeIncoming) throws OutOfMemoryException {
        super(context, context.newOperatorContext(operator, null), operator);
        this.incoming = incoming;
        this.operator = operator;
        this.closeIncoming = closeIncoming;
        this.context = context;
        this.outGoingBatchCount = operator.getDestinations().size();
        this.popConfig = operator;
        this.remainingReceivers = new AtomicIntegerArray(this.outGoingBatchCount);
        this.remaingReceiverCount = new AtomicInteger(this.outGoingBatchCount);
        this.stats.setLongStat(Metric.N_RECEIVERS, this.outGoingBatchCount);
        this.cost = operator.getChild().getCost().getOutputRowCount();

        OptionManager optMgr = context.getOptions();
        long sliceTarget = optMgr.getOption("planner.slice_target").num_val;
        int threadFactor = optMgr.getOption(PlannerSettings.PARTITION_SENDER_THREADS_FACTOR.getOptionName()).num_val.intValue();

        int tmpParts = 1;
        if (sliceTarget != 0L && this.outGoingBatchCount != 0) {
            long rounded = Math.round(this.cost / (double) sliceTarget / (double) this.outGoingBatchCount / (double) threadFactor);
            // Clamp before narrowing: a plain (int) cast would wrap for estimates above
            // Integer.MAX_VALUE and could yield a negative or arbitrary thread count.
            tmpParts = (int) Math.max(1L, Math.min(rounded, (long) Integer.MAX_VALUE));
        }

        int imposedThreads = optMgr.getOption(PlannerSettings.PARTITION_SENDER_SET_THREADS.getOptionName()).num_val.intValue();
        if (imposedThreads > 0) {
            // An explicitly imposed thread count overrides the cost-based estimate.
            this.numberPartitions = imposedThreads;
        } else {
            int maxThreads = optMgr.getOption(PlannerSettings.PARTITION_SENDER_MAX_THREADS.getOptionName()).num_val.intValue();
            this.numberPartitions = Math.min(tmpParts, maxThreads);
        }
        logger.info("Preliminary number of sending threads is: {}", this.numberPartitions);

        // Never use more sender threads than there are receivers.
        this.actualPartitions = Math.min(this.outGoingBatchCount, this.numberPartitions);
        this.stats.setLongStat(Metric.SENDING_THREADS_COUNT, this.actualPartitions);
        this.stats.setDoubleStat(Metric.COST, this.cost);
    }

    /**
     * Pulls the next batch from upstream and routes its records to the receivers.
     *
     * <ul>
     *   <li>{@code NONE}: flushes pending outgoing batches (or sends a final empty batch when no
     *       partitioner was ever created) and reports end of data.</li>
     *   <li>{@code OK_NEW_SCHEMA}: flushes and clears the existing partitioner, builds a new one,
     *       sends an empty schema batch the first time, then partitions the batch.</li>
     *   <li>{@code OK}: partitions the batch. The very first {@code OK} is treated as
     *       {@code OK_NEW_SCHEMA} so that a partitioner exists.</li>
     * </ul>
     *
     * Once all receivers have finished ({@code done}), the incoming batch is cancelled and the
     * call is handled as {@code NONE}.
     *
     * @return {@code true} if a batch was processed and more may follow, {@code false} at end of data
     * @throws IllegalStateException if upstream returns an outcome other than the three above
     */
    @Override
    public boolean innerNext() {
        RecordBatch.IterOutcome out;
        if (this.done) {
            // Every receiver has finished: stop upstream and wind down as if input ended.
            this.incoming.cancel();
            out = RecordBatch.IterOutcome.NONE;
        } else {
            out = this.next(this.incoming);
        }
        logger.debug("Partitioner.next(): got next record batch with status {}", out);
        if (this.first && out == RecordBatch.IterOutcome.OK) {
            // No partitioner exists yet, so the first data batch must go through schema setup.
            out = RecordBatch.IterOutcome.OK_NEW_SCHEMA;
        }
        switch (out) {
            case NONE: {
                try {
                    if (this.partitioner != null) {
                        this.partitioner.flushOutgoingBatches(true, false);
                    } else {
                        this.sendEmptyBatch(true);
                    }
                }
                catch (ExecutionException e) {
                    throw UserException.dataWriteError(e).addContext("Error while creating partitioning sender or flushing outgoing batches").build(logger);
                }
                return false;
            }
            case OK_NEW_SCHEMA: {
                try {
                    if (this.partitioner != null) {
                        // Send out everything buffered under the old schema before replacing it.
                        this.partitioner.flushOutgoingBatches(false, true);
                        this.partitioner.clear();
                    }
                    try {
                        this.stats.startSetup();
                        this.createPartitioner();
                    }
                    finally {
                        this.stats.stopSetup();
                    }
                    if (this.first) {
                        this.first = false;
                        this.sendEmptyBatch(false);
                    }
                }
                catch (ExecutionException e) {
                    throw UserException.dataWriteError(e).addContext("Error while flushing outgoing batches").build(logger);
                }
                // Intentional fall through: the batch that carried the new schema is partitioned too.
            }
            case OK: {
                try {
                    this.partitioner.partitionBatch(this.incoming);
                }
                catch (ExecutionException e) {
                    throw UserException.dataWriteError(e).addContext("Error while partitioning outgoing batches").build(logger);
                }
                VectorAccessibleUtilities.clear(this.incoming);
                return true;
            }
            default: {
                throw new IllegalStateException("Unexpected outcome from incoming batch: " + out);
            }
        }
    }

    /**
     * Builds the partitioner for the current incoming schema, using
     * {@code actualPartitions} sub-partitioners. The list returned by
     * {@code createClassInstances} is not used here.
     */
    @VisibleForTesting
    protected void createPartitioner() {
        createClassInstances(actualPartitions);
    }

    /**
     * Generates and sets up the sub-partitioners, then wraps them in a new
     * {@link PartitionerDecorator} stored in {@code partitioner}.
     *
     * <p>The generated evaluation code computes {@code bucket = expr % outGoingBatchCount} and
     * returns {@code Math.abs(bucket)}. The receivers are split into {@code actualPartitions}
     * contiguous index ranges: each range gets {@code outGoingBatchCount / actualPartitions}
     * receivers (at least one), the first {@code outGoingBatchCount % actualPartitions} ranges
     * get one extra, and the last range always ends at {@code outGoingBatchCount}.
     *
     * <p>Receivers recorded in {@code terminations} are terminated on the new partitioner and
     * the list is then cleared. If setup does not complete, every sub-partitioner is cleared
     * before the exception propagates.
     *
     * @param actualPartitions number of sub-partitioners to create
     * @return the sub-partitioners that were set up
     */
    private List<Partitioner> createClassInstances(int actualPartitions) {
        LogicalExpression expr = this.operator.getExpr();
        ErrorCollectorImpl collector = new ErrorCollectorImpl();
        ClassGenerator<Partitioner> cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, this.context.getOptions());
        cg.getCodeGenerator().plainJavaCapable(true);
        ClassGenerator<Partitioner> cgInner = cg.getInnerGenerator("OutgoingRecordBatch");
        LogicalExpression materializedExpr = ExpressionTreeMaterializer.materialize(expr, this.incoming, collector, this.context.getFunctionRegistry());
        collector.reportErrors(logger);

        // Generated code: int bucket = <expr> % outGoingBatchCount; return Math.abs(bucket);
        JExpression bucket = JExpr.direct("bucket");
        ClassGenerator.HoldingContainer exprHolder = cg.addExpr(materializedExpr);
        cg.getEvalBlock().decl(JType.parse(cg.getModel(), "int"), "bucket", exprHolder.getValue().mod(JExpr.lit(this.outGoingBatchCount)));
        cg.getEvalBlock()._return(cg.getModel().ref(Math.class).staticInvoke("abs").arg(bucket));
        CopyUtil.generateCopies(cgInner, this.incoming, this.incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.FOUR_BYTE);

        List<Partitioner> subPartitioners = this.context.getImplementationClass(cg, actualPartitions);
        int divisor = Math.max(1, this.outGoingBatchCount / actualPartitions);
        int longTail = this.outGoingBatchCount % actualPartitions;
        int endIndex = 0;
        boolean success = false;
        try {
            for (int i = 0; i < actualPartitions; ++i) {
                // Each range starts where the previous one ended.
                int startIndex = endIndex;
                endIndex = i < actualPartitions - 1 ? startIndex + divisor : this.outGoingBatchCount;
                if (i < longTail) {
                    // Spread the remainder over the leading ranges, one extra receiver each.
                    ++endIndex;
                }
                OperatorStats partitionStats = new OperatorStats(this.stats, true);
                subPartitioners.get(i).setup(this.context, this.incoming, this.popConfig, partitionStats, this.oContext, cgInner, startIndex, endIndex);
            }
            this.partitioner = new PartitionerDecorator(subPartitioners, this.stats, this.context);
            // Replay terminations that arrived before this partitioner existed.
            for (int index = 0; index < this.terminations.size(); ++index) {
                this.partitioner.getOutgoingBatches(this.terminations.get(index)).terminate();
            }
            this.terminations.clear();
            success = true;
        }
        catch (SchemaChangeException e) {
            throw AbstractRecordBatch.schemaChangeException(e, "Partition Sender", logger);
        }
        finally {
            if (!success) {
                for (Partitioner p : subPartitioners) {
                    p.clear();
                }
            }
        }
        return subPartitioners;
    }

    /**
     * Folds the total record count of every outgoing batch into the running
     * {@code minReceiverRecordCount} / {@code maxReceiverRecordCount} fields and
     * publishes both as the {@code MIN_RECORDS} and {@code MAX_RECORDS} stats.
     */
    private void updateAggregateStats() {
        for (Partitioner subPartitioner : this.partitioner.getPartitioners()) {
            for (PartitionOutgoingBatch outgoing : subPartitioner.getOutgoingBatches()) {
                final long sentRecords = outgoing.getTotalRecords();
                if (sentRecords < this.minReceiverRecordCount) {
                    this.minReceiverRecordCount = sentRecords;
                }
                if (sentRecords > this.maxReceiverRecordCount) {
                    this.maxReceiverRecordCount = sentRecords;
                }
            }
        }
        this.stats.setLongStat(Metric.MIN_RECORDS, this.minReceiverRecordCount);
        this.stats.setLongStat(Metric.MAX_RECORDS, this.maxReceiverRecordCount);
    }

    /**
     * Records that the receiver identified by the handle's minor fragment id has finished.
     *
     * <p>Only the first notification per receiver has any effect (its slot flips from 0 to 1).
     * The receiver's outgoing batch is terminated right away when a partitioner exists;
     * otherwise the id is queued in {@code terminations}. When the last receiver has
     * finished, {@code done} is set.
     *
     * @param handle handle of the receiving fragment that finished
     */
    @Override
    public void receivingFragmentFinished(ExecProtos.FragmentHandle handle) {
        final int receiverId = handle.getMinorFragmentId();
        if (!this.remainingReceivers.compareAndSet(receiverId, 0, 1)) {
            // This receiver was already marked finished; nothing more to do.
            return;
        }
        if (this.partitioner != null) {
            this.partitioner.getOutgoingBatches(receiverId).terminate();
        } else {
            this.terminations.add(receiverId);
        }
        if (this.remaingReceiverCount.decrementAndGet() == 0) {
            this.done = true;
        }
    }

    /**
     * Shuts the sender down: closes the base operator, publishes the aggregate receiver
     * stats, clears the partitioner and, if requested at construction, closes the incoming batch.
     *
     * <p>The steps run in the same order as before, but each later step now runs even when an
     * earlier one throws, so partitioner buffers and the incoming batch are still released.
     * If more than one step throws, the exception from the last failing step propagates.
     *
     * @throws Exception if any of the close steps fails
     */
    @Override
    public void close() throws Exception {
        logger.debug("Partition sender stopping.");
        try {
            super.close();
        } finally {
            try {
                if (this.partitioner != null) {
                    try {
                        this.updateAggregateStats();
                    } finally {
                        this.partitioner.clear();
                    }
                }
            } finally {
                if (this.closeIncoming) {
                    ((CloseableRecordBatch) this.incoming).close();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Sends one empty batch carrying the incoming schema to every destination.
     * When the incoming batch has no schema, an empty schema is built instead.
     * Time spent in each send is recorded as wait time, and {@code BATCHES_SENT}
     * is incremented by one per call.
     *
     * @param isLast passed through to the empty batch as its last-batch flag
     */
    private void sendEmptyBatch(boolean isLast) {
        BatchSchema outgoingSchema = this.incoming.getSchema();
        if (outgoingSchema == null) {
            outgoingSchema = BatchSchema.newBuilder().build();
        }
        final ExecProtos.FragmentHandle self = this.context.getHandle();
        for (MinorFragmentEndpoint receiver : this.popConfig.getDestinations()) {
            final AccountingDataTunnel tunnel = this.context.getDataTunnel(receiver.getEndpoint());
            final FragmentWritableBatch emptyBatch = FragmentWritableBatch.getEmptyBatchWithSchema(
                    isLast,
                    self.getQueryId(),
                    self.getMajorFragmentId(),
                    self.getMinorFragmentId(),
                    this.operator.getOppositeMajorFragmentId(),
                    receiver.getId(),
                    outgoingSchema);
            this.stats.startWait();
            try {
                tunnel.sendRecordBatch(emptyBatch);
            }
            finally {
                this.stats.stopWait();
            }
        }
        this.stats.addLongStat(Metric.BATCHES_SENT, 1L);
    }

    /**
     * Returns the current partitioner.
     *
     * @return the partitioner, or {@code null} if none has been created yet
     */
    @VisibleForTesting
    protected PartitionerDecorator getPartitioner() {
        return partitioner;
    }

    /**
     * Operator metrics reported by the partition sender. The metric id is the
     * constant's ordinal, so the declaration order below must not change.
     */
    public enum Metric implements MetricDef {
        /** Incremented by one each time an empty batch round is sent. */
        BATCHES_SENT,
        RECORDS_SENT,
        /** Smallest per-receiver record total seen by the aggregate stats update. */
        MIN_RECORDS,
        /** Largest per-receiver record total seen by the aggregate stats update. */
        MAX_RECORDS,
        /** Number of destinations, set at construction. */
        N_RECEIVERS,
        BYTES_SENT,
        /** Number of sub-partitioners actually used, set at construction. */
        SENDING_THREADS_COUNT,
        /** Output row count estimate of the child operator, set at construction. */
        COST;

        /** Returns this metric's id, which is its ordinal position in the enum. */
        @Override
        public int metricId() {
            return ordinal();
        }
    }
}

