/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.physical.impl.partitionsender;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import javax.inject.Named;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.compile.sig.RuntimeOverridden;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.AccountingDataTunnel;
import org.apache.drill.exec.ops.ExchangeFragmentContext;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.impl.common.CodeGenMemberInjector;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionOutgoingBatch;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
import org.apache.drill.exec.physical.impl.partitionsender.Partitioner;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class PartitionerTemplate
implements Partitioner {
    static final Logger logger = LoggerFactory.getLogger(PartitionerTemplate.class);

    /** Row capacity used for each outgoing batch unless {@code setup} lowers it. */
    private static final int DEFAULT_RECORD_BATCH_SIZE = 1023;

    /** Two-byte selection vector of the incoming batch; assigned in {@code setup} only for TWO_BYTE mode. */
    private SelectionVector2 sv2;

    /** Four-byte selection vector of the incoming batch; assigned in {@code setup} only for FOUR_BYTE mode. */
    private SelectionVector4 sv4;

    /** Upstream batch whose schema is used to build every outgoing batch. */
    private RecordBatch incoming;

    /** Operator statistics handed in through {@code setup}. */
    private OperatorStats stats;

    /** Class generator passed to the member injector for each new outgoing batch. */
    protected ClassGenerator<?> cg;

    /** Fragment context captured in {@code setup}. */
    protected FragmentContext context;

    /** Inclusive lower bound of the destination indexes served by this partitioner. */
    private int start;

    /** Exclusive upper bound of the destination indexes served by this partitioner. */
    private int end;

    /** Outgoing batches for destinations {@code start..end-1}; element {@code i} serves destination {@code start + i}. */
    private final List<OutgoingRecordBatch> outgoingBatches = Lists.newArrayList();

    /** Number of rows an outgoing batch accumulates before it is flushed automatically. */
    private int outgoingRecordBatchSize = DEFAULT_RECORD_BATCH_SIZE;

    /**
     * Exposes the outgoing batches owned by this partitioner.
     *
     * @return the live internal list (not a copy), ordered by destination index starting at {@code start}
     */
    @Override
    public List<? extends PartitionOutgoingBatch> getOutgoingBatches() {
        return outgoingBatches;
    }

    /**
     * Looks up the outgoing batch for a destination index.
     *
     * @param index destination index, expressed in the same numbering as {@code start}/{@code end}
     * @return the matching batch, or {@code null} when {@code index} lies outside {@code [start, end)}
     */
    @Override
    public PartitionOutgoingBatch getOutgoingBatch(int index) {
        boolean outsideRange = index < this.start || index >= this.end;
        // The list is offset by start, so translate the destination index into a list position.
        return outsideRange ? null : this.outgoingBatches.get(index - this.start);
    }

    /**
     * Prepares this partitioner: stores its collaborators, sizes the outgoing batches, creates one
     * outgoing batch per destination whose position falls in {@code [start, end)}, and captures the
     * selection vector of the incoming batch.
     *
     * <p>Batch sizing: when the {@code exec.partition.mem_throttle} option is positive and the number
     * of destinations has reached it, the batch size is reduced (never below 255 rows, always a power
     * of two minus one). Otherwise, with more than 1000 destinations the default size is halved.
     *
     * @param context   fragment context used to obtain data tunnels and passed to generated code
     * @param incoming  upstream batch; its schema drives the outgoing vector layout
     * @param popConfig sender configuration providing the destination list
     * @param stats     operator statistics shared with every outgoing batch
     * @param oContext  operator context supplying options and the buffer allocator
     * @param cg        class generator used when injecting members into outgoing batches
     * @param start     first destination position (inclusive) handled by this partitioner
     * @param end       last destination position (exclusive) handled by this partitioner
     * @throws SchemaChangeException if {@link #doSetup} rejects the incoming schema
     * @throws UnsupportedOperationException if the incoming selection vector mode is not recognised
     */
    @Override
    public final void setup(ExchangeFragmentContext context, RecordBatch incoming, HashPartitionSender popConfig, OperatorStats stats, OperatorContext oContext, ClassGenerator<?> cg, int start, int end) throws SchemaChangeException {
        this.incoming = incoming;
        this.stats = stats;
        this.context = context;
        this.cg = cg;
        this.start = start;
        this.end = end;
        // The outgoing array argument is not used by this template; only context and incoming are supplied.
        this.doSetup(context, incoming, null);

        int destinationCount = popConfig.getDestinations().size();
        int reductionCutoff = oContext.getFragmentContext().getOptions().getInt("exec.partition.mem_throttle");
        if (reductionCutoff > 0 && destinationCount >= reductionCutoff) {
            // destinationCount == reductionCutoff passes the guard above, so the raw difference can be 0.
            // Clamp to 1 to avoid an ArithmeticException; a divisor of 1 yields the default batch size.
            int divisor = Math.max(1, destinationCount - reductionCutoff);
            int reducedBatchSize = Math.max(256, (DEFAULT_RECORD_BATCH_SIZE + 1) / divisor);
            // Keep the size at (2^x) - 1.
            this.outgoingRecordBatchSize = BaseAllocator.nextPowerOfTwo(reducedBatchSize) - 1;
            logger.info("{} is set to {}: {} receivers, reduced send buffer size from {} to {} rows", new Object[]{"exec.partition.mem_throttle", reductionCutoff, destinationCount, DEFAULT_RECORD_BATCH_SIZE, this.outgoingRecordBatchSize});
        } else if (destinationCount > 1000) {
            // Halve the default while staying at (2^x) - 1: (1023 + 1) / 2 - 1 == 511.
            this.outgoingRecordBatchSize = (DEFAULT_RECORD_BATCH_SIZE + 1) / 2 - 1;
        }

        // Walk every destination but only materialise batches for positions in [start, end).
        int fieldId = 0;
        for (MinorFragmentEndpoint destination : popConfig.getDestinations()) {
            if (fieldId >= start && fieldId < end) {
                logger.debug("start: {}, end: {}, fieldId: {}", new Object[]{start, end, fieldId});
                this.outgoingBatches.add(this.newOutgoingRecordBatch(stats, popConfig, context.getDataTunnel(destination.getEndpoint()), context, oContext.getAllocator(), destination.getId()));
            }
            ++fieldId;
        }
        for (OutgoingRecordBatch outgoingRecordBatch : this.outgoingBatches) {
            outgoingRecordBatch.initializeBatch();
        }

        // Remember the selection vector now; partitionBatch reads it from these fields.
        BatchSchema.SelectionVectorMode svMode = incoming.getSchema().getSelectionVectorMode();
        switch (svMode) {
            case FOUR_BYTE: {
                this.sv4 = incoming.getSelectionVector4();
                break;
            }
            case TWO_BYTE: {
                this.sv2 = incoming.getSelectionVector2();
                break;
            }
            case NONE: {
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unknown selection vector mode: " + svMode.toString());
            }
        }
    }

    /**
     * Creates an outgoing batch for one receiver and runs member injection on it.
     *
     * @param stats                   operator statistics the batch reports to
     * @param operator                sender configuration
     * @param tunnel                  data tunnel to the receiving endpoint
     * @param context                 fragment context
     * @param allocator               allocator backing the batch's vector container
     * @param oppositeMinorFragmentId minor fragment id of the receiver
     * @return the new batch, after {@link #injectMembers} has processed it
     */
    protected OutgoingRecordBatch newOutgoingRecordBatch(OperatorStats stats, HashPartitionSender operator, AccountingDataTunnel tunnel, FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
        OutgoingRecordBatch created = new OutgoingRecordBatch(stats, operator, tunnel, context, allocator, oppositeMinorFragmentId);
        return this.injectMembers(created);
    }

    /**
     * Hands the batch to {@code CodeGenMemberInjector} together with this partitioner's class
     * generator and fragment context.
     *
     * @param outgoingRecordBatch batch to process
     * @return the same instance that was passed in
     */
    protected OutgoingRecordBatch injectMembers(OutgoingRecordBatch outgoingRecordBatch) {
        CodeGenMemberInjector.injectMembers(cg, outgoingRecordBatch, context);
        return outgoingRecordBatch;
    }

    /**
     * @return the operator statistics object stored by {@code setup}
     */
    @Override
    public OperatorStats getStats() {
        return stats;
    }

    /**
     * Flushes every outgoing batch owned by this partitioner.
     *
     * @param isLastBatch   when {@code true}, each batch is marked as last before it is flushed
     * @param schemaChanged when {@code true}, each batch is reset and rebuilt from the incoming
     *                      schema right after its flush
     * @throws IOException if flushing a batch fails
     */
    @Override
    public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException {
        for (OutgoingRecordBatch outgoing : this.outgoingBatches) {
            logger.debug("Attempting to flush all outgoing batches");
            if (isLastBatch) {
                outgoing.setIsLast();
            }
            outgoing.flush(schemaChanged);
            if (schemaChanged) {
                // flush() skips its own re-allocation on a schema change, so rebuild the batch here.
                outgoing.resetBatch();
                outgoing.initializeBatch();
            }
        }
    }

    /**
     * Routes every row of {@code incoming} to its outgoing batch.
     *
     * <p>The selection vector mode and record count are read from the argument, while the selection
     * vectors themselves are the ones captured by {@code setup}.
     *
     * @param incoming batch to partition
     * @throws IOException if copying a row triggers a flush that fails
     * @throws UnsupportedOperationException if the selection vector mode is not recognised
     */
    @Override
    public void partitionBatch(RecordBatch incoming) throws IOException {
        BatchSchema.SelectionVectorMode mode = incoming.getSchema().getSelectionVectorMode();
        switch (mode) {
            case NONE: {
                // No indirection: row positions are used directly.
                int row = 0;
                while (row < incoming.getRecordCount()) {
                    this.doCopy(row);
                    ++row;
                }
                break;
            }
            case TWO_BYTE: {
                int row = 0;
                while (row < incoming.getRecordCount()) {
                    char selected = this.sv2.getIndex(row);
                    this.doCopy(selected);
                    ++row;
                }
                break;
            }
            case FOUR_BYTE: {
                int row = 0;
                while (row < incoming.getRecordCount()) {
                    this.doCopy(this.sv4.get(row));
                    ++row;
                }
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unknown selection vector mode: " + mode.toString());
            }
        }
    }

    /**
     * Evaluates the target partition for one row and, if that partition belongs to this
     * partitioner, copies the row into the corresponding outgoing batch.
     *
     * @param svIndex index of the row in the incoming batch
     * @throws IOException if the copy triggers a flush that fails
     * @throws UnsupportedOperationException wrapping any {@code SchemaChangeException} from {@link #doEval}
     */
    private void doCopy(int svIndex) throws IOException {
        int partition;
        try {
            partition = this.doEval(svIndex);
        }
        catch (SchemaChangeException schemaChange) {
            throw new UnsupportedOperationException(schemaChange);
        }
        // Rows whose partition falls outside [start, end) are silently skipped here.
        if (partition < this.start || partition >= this.end) {
            return;
        }
        this.outgoingBatches.get(partition - this.start).copy(svIndex);
    }

    /**
     * Intentionally does nothing; all preparation for this template happens in {@code setup}.
     */
    @Override
    public void initialize() {
        // no-op
    }

    /**
     * Clears every outgoing batch held by this partitioner. The batches themselves stay in the list.
     */
    @Override
    public void clear() {
        for (int i = 0; i < this.outgoingBatches.size(); ++i) {
            this.outgoingBatches.get(i).clear();
        }
    }

    /**
     * Setup hook invoked once from {@code setup} before any outgoing batch is created.
     * NOTE(review): the implementation is presumably supplied by runtime code generation - confirm.
     *
     * @param var1 fragment context (named "context")
     * @param var2 incoming record batch (named "incoming")
     * @param var3 outgoing batches (named "outgoing"); {@code setup} in this class always passes {@code null}
     * @throws SchemaChangeException declared for implementations; {@code setup} propagates it to its caller
     */
    public abstract void doSetup(@Named(value="context") FragmentContext var1, @Named(value="incoming") RecordBatch var2, @Named(value="outgoing") OutgoingRecordBatch[] var3) throws SchemaChangeException;

    /**
     * Computes the destination partition for one incoming row. The caller compares the result with
     * {@code [start, end)} and uses it, minus {@code start}, as the position in the outgoing batch list.
     * NOTE(review): the implementation is presumably supplied by runtime code generation - confirm.
     *
     * @param var1 index of the row in the incoming batch (named "inIndex")
     * @return the destination partition index for that row
     * @throws SchemaChangeException declared for implementations; the caller rethrows it wrapped in
     *         an {@code UnsupportedOperationException}
     */
    public abstract int doEval(@Named(value="inIndex") int var1) throws SchemaChangeException;

    /**
     * Accumulates rows bound for a single receiving minor fragment and sends them through that
     * receiver's data tunnel, either when the buffer reaches the enclosing partitioner's batch size
     * or when a flush is requested explicitly.
     *
     * <p>This is an inner (non-static) class: it reads the incoming batch and the configured batch
     * size from the enclosing {@code PartitionerTemplate} instance.
     */
    public class OutgoingRecordBatch
    implements PartitionOutgoingBatch,
    VectorAccessible {
        private final AccountingDataTunnel tunnel;
        private final HashPartitionSender operator;
        private final FragmentContext context;
        private final VectorContainer vectorContainer;
        private final int oppositeMinorFragmentId;
        private final OperatorStats stats;
        /** Set by {@link #setIsLast()}; makes the next flush send a batch flagged as last. */
        private boolean isLast;
        /** Once true, {@link #flush(boolean)} discards buffered rows instead of sending them. */
        private boolean dropAll;
        /** Rows currently buffered and not yet sent. */
        private int recordCount;
        /**
         * Rows copied into this batch over its lifetime. Kept as a {@code long} so the counter
         * cannot wrap past {@code Integer.MAX_VALUE}; {@link #getTotalRecords()} reports a long.
         */
        private long totalRecords;

        /**
         * @param stats                   operator statistics updated on every send
         * @param operator                sender configuration (supplies the opposite major fragment id)
         * @param tunnel                  data tunnel used to send batches
         * @param context                 fragment context (supplies the fragment handle)
         * @param allocator               allocator for the backing vector container
         * @param oppositeMinorFragmentId minor fragment id of the receiver
         */
        public OutgoingRecordBatch(OperatorStats stats, HashPartitionSender operator, AccountingDataTunnel tunnel, FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
            this.context = context;
            this.operator = operator;
            this.tunnel = tunnel;
            this.stats = stats;
            this.oppositeMinorFragmentId = oppositeMinorFragmentId;
            this.vectorContainer = new VectorContainer(allocator);
        }

        /**
         * Appends one incoming row to this batch and flushes when the batch becomes full.
         *
         * @param inIndex index of the row in the incoming batch
         * @throws IOException if the automatic flush fails
         * @throws UnsupportedOperationException wrapping any {@code SchemaChangeException} from the copy
         */
        protected void copy(int inIndex) throws IOException {
            try {
                this.doEval(inIndex, this.recordCount);
            }
            catch (SchemaChangeException e) {
                throw new UnsupportedOperationException(e);
            }
            ++this.recordCount;
            ++this.totalRecords;
            if (this.recordCount == PartitionerTemplate.this.outgoingRecordBatchSize) {
                this.flush(false);
            }
        }

        /**
         * Puts this batch into drop-all mode: subsequent flushes discard buffered rows.
         */
        @Override
        public void terminate() {
            this.dropAll = true;
        }

        /**
         * Setup hook called from {@link #initializeBatch()}; empty here and marked as overridden at runtime.
         *
         * @param incoming batch rows are copied from
         * @param outgoing container rows are copied into
         * @throws SchemaChangeException declared for runtime implementations
         */
        @RuntimeOverridden
        protected void doSetup(@Named(value="incoming") RecordBatch incoming, @Named(value="outgoing") VectorAccessible outgoing) throws SchemaChangeException {
        }

        /**
         * Row-copy hook called from {@link #copy(int)}; empty here and marked as overridden at runtime.
         *
         * @param inIndex  row index in the incoming batch
         * @param outIndex row index in this outgoing batch
         * @throws SchemaChangeException declared for runtime implementations
         */
        @RuntimeOverridden
        protected void doEval(@Named(value="inIndex") int inIndex, @Named(value="outIndex") int outIndex) throws SchemaChangeException {
        }

        /**
         * Sends the buffered rows to the receiver, unless there is nothing to send or this batch is
         * in drop-all mode.
         *
         * <p>A batch is flagged as last when {@link #setIsLast()} was called or the current thread
         * is interrupted; a last batch is sent even when it holds no rows, after which this batch
         * switches to drop-all mode.
         *
         * @param schemaChanged when {@code false}, the buffer is emptied and re-allocated after the
         *                      send; when {@code true}, that is left to the caller
         * @throws IOException if sending through the tunnel fails
         */
        public void flush(boolean schemaChanged) throws IOException {
            if (this.dropAll) {
                // Discard what was buffered; the existing vectors are simply overwritten from row 0.
                this.recordCount = 0;
                return;
            }
            ExecProtos.FragmentHandle handle = this.context.getHandle();
            final boolean isLastBatch = this.isLast || Thread.currentThread().isInterrupted();
            if (!isLastBatch && this.recordCount == 0) {
                return;
            }
            this.vectorContainer.setValueCount(this.recordCount);
            FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLastBatch, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), this.operator.getOppositeMajorFragmentId(), this.oppositeMinorFragmentId, this.getWritableBatch());
            this.updateStats(writableBatch);
            // Time spent inside the tunnel is accounted as wait time, even if the send throws.
            this.stats.startWait();
            try {
                this.tunnel.sendRecordBatch(writableBatch);
            }
            finally {
                this.stats.stopWait();
            }
            if (isLastBatch) {
                this.dropAll = true;
            }
            if (!schemaChanged) {
                this.recordCount = 0;
                this.vectorContainer.zeroVectors();
                this.allocateOutgoingRecordBatch();
            }
        }

        /** Allocates the container's vectors for the enclosing partitioner's current batch size. */
        private void allocateOutgoingRecordBatch() {
            this.vectorContainer.allocate(PartitionerTemplate.this.outgoingRecordBatchSize);
        }

        /**
         * Adds the byte count, one batch, and the record count of {@code writableBatch} to the
         * sender metrics.
         *
         * @param writableBatch batch about to be sent
         */
        public void updateStats(FragmentWritableBatch writableBatch) {
            this.stats.addLongStat(PartitionSenderRootExec.Metric.BYTES_SENT, writableBatch.getByteCount());
            this.stats.addLongStat(PartitionSenderRootExec.Metric.BATCHES_SENT, 1L);
            this.stats.addLongStat(PartitionSenderRootExec.Metric.RECORDS_SENT, writableBatch.getHeader().getDef().getRecordCount());
        }

        /**
         * Builds the vector container from the incoming schema, allocates it, and runs
         * {@link #doSetup(RecordBatch, VectorAccessible)}.
         *
         * @throws UnsupportedOperationException wrapping any {@code SchemaChangeException} from setup
         */
        public void initializeBatch() {
            this.vectorContainer.buildFrom(PartitionerTemplate.this.incoming.getSchema());
            this.allocateOutgoingRecordBatch();
            try {
                this.doSetup(PartitionerTemplate.this.incoming, this.vectorContainer);
            }
            catch (SchemaChangeException e) {
                throw new UnsupportedOperationException(e);
            }
        }

        /**
         * Clears the last-batch flag, the buffered row count and the vector container.
         * The drop-all flag and the lifetime row total are left untouched.
         */
        public void resetBatch() {
            this.isLast = false;
            this.recordCount = 0;
            this.vectorContainer.clear();
        }

        /** Marks the next flushed batch as the last one for this receiver. */
        public void setIsLast() {
            this.isLast = true;
        }

        /**
         * @return the schema of the enclosing partitioner's incoming batch
         */
        @Override
        public BatchSchema getSchema() {
            return PartitionerTemplate.this.incoming.getSchema();
        }

        /**
         * @return the number of rows currently buffered and not yet sent
         */
        @Override
        public int getRecordCount() {
            return this.recordCount;
        }

        /**
         * @return the number of rows copied into this batch since it was created
         */
        @Override
        public long getTotalRecords() {
            return this.totalRecords;
        }

        /** Delegates to the backing vector container. */
        @Override
        public TypedFieldId getValueVectorId(SchemaPath path) {
            return this.vectorContainer.getValueVectorId(path);
        }

        /** Delegates to the backing vector container. */
        @Override
        public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int ... fieldIds) {
            return this.vectorContainer.getValueAccessorById(clazz, fieldIds);
        }

        /** Iterates over the vectors of the backing container. */
        @Override
        public Iterator<VectorWrapper<?>> iterator() {
            return this.vectorContainer.iterator();
        }

        /**
         * Not supported by outgoing batches.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public SelectionVector2 getSelectionVector2() {
            throw new UnsupportedOperationException();
        }

        /**
         * Not supported by outgoing batches.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public SelectionVector4 getSelectionVector4() {
            throw new UnsupportedOperationException();
        }

        /**
         * @return a writable batch built from this batch's vectors and its current buffered row count
         */
        public WritableBatch getWritableBatch() {
            return WritableBatch.getBatchNoHVWrap(this.recordCount, this, false);
        }

        /** Clears the backing vector container. */
        public void clear() {
            this.vectorContainer.clear();
        }
    }
}

