/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.physical.impl.join;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.fn.impl.ValueVectorHashHelper;
import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.base.AbstractBase;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.impl.aggregate.SpilledRecordBatch;
import org.apache.drill.exec.physical.impl.common.AbstractSpilledPartitionMetadata;
import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
import org.apache.drill.exec.physical.impl.common.HashPartition;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
import org.apache.drill.exec.physical.impl.common.HashTableStats;
import org.apache.drill.exec.physical.impl.common.SpilledState;
import org.apache.drill.exec.physical.impl.join.HashJoinMechanicalMemoryCalculator;
import org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculator;
import org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculatorImpl;
import org.apache.drill.exec.physical.impl.join.Probe;
import org.apache.drill.exec.physical.impl.join.RowKeyJoin;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.record.AbstractBinaryRecordBatch;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.JoinBatchMemoryManager;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessibleUtilities;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.apache.drill.exec.work.filter.BloomFilter;
import org.apache.drill.exec.work.filter.BloomFilterDef;
import org.apache.drill.exec.work.filter.RuntimeFilterDef;
import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractHashBinaryRecordBatch<T extends PhysicalOperator>
extends AbstractBinaryRecordBatch<T> {
    // Logger named after the concrete runtime class (resolved through getClass()).
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    // --- Join-shape flags (none of these is assigned in the visible part of this class) ---
    // When true, setupOutputContainerSchema() leaves the build-side columns out of the output.
    protected boolean semiJoin;
    // When true, REQUIRED non-MAP build-side columns are exposed as OPTIONAL in the output schema.
    protected boolean joinIsLeftOrFull;
    // When true, the hash table is still built even if the probe side starts out as NONE (see buildSchema()).
    protected boolean joinIsRightOrFull;
    // When true, innerNext() re-fetches the left input once the build phase has completed.
    protected boolean isRowKeyJoin;

    // --- Runtime (bloom) filter configuration ---
    // Switched off by setupHash64() when a build-side key field cannot be resolved.
    protected boolean enableRuntimeFilter;
    protected RuntimeFilterDef runtimeFilterDef = null;
    protected List<NamedExpression> rightExpr = Lists.newArrayList();
    // Handed to the memory calculator in executeBuildPhase() / partitionNumTuning().
    protected Set<String> buildJoinColumns = Sets.newHashSet();
    // Computed in buildSchema(): probe side is NONE and the join is not right/full.
    protected boolean skipHashTableBuild;
    // Final; assigned outside the visible part of this class.
    protected final int RECORDS_PER_BATCH;
    protected RowKeyJoin.RowKeyJoinState rkJoinState = RowKeyJoin.RowKeyJoinState.INITIAL;
    // Created in buildSchema() through createProbe().
    protected Probe probe = null;
    // Starts at 1; set by partitionNumTuning() and reset to 1 by disableSpilling().
    protected int numPartitions = 1;
    protected ChainedHashTable baseHashTable;
    // Set by prefetchFirstBatch() when the corresponding side reports NONE.
    protected final MutableBoolean buildSideIsEmpty = new MutableBoolean(false);
    protected final MutableBoolean probeSideIsEmpty = new MutableBoolean(false);
    // Cleared by disableSpilling().
    protected boolean canSpill = true;
    // Checked at the top of innerNext(); when set the operator cleans up and returns NONE.
    protected boolean wasKilled;
    // Allocated in delayedSetup(), populated in initializeBuild().
    protected HashPartition[] partitions;
    // Row count of the last output batch; returned by getRecordCount().
    protected int outputRecords;
    protected BatchSchema buildSchema;
    protected BatchSchema probeSchema;
    protected boolean buildComplete;
    protected boolean firstOutputBatch = true;
    // Number of columns in the right container at schema-build time (see buildSchema()).
    protected int rightHVColPosition;
    protected final BufferAllocator allocator;
    // Current build/probe inputs; innerNext() swaps in SpilledRecordBatch instances for spilled partitions.
    protected RecordBatch buildBatch;
    protected RecordBatch probeBatch;
    // Guard flags so prefetchFirstBatch() runs at most once per side per cycle.
    protected final MutableBoolean prefetchedBuild = new MutableBoolean(false);
    protected final MutableBoolean prefetchedProbe = new MutableBoolean(false);
    protected final SpillSet spillSet;
    // Package-private operator config; executeBuildPhase() reads its operator id.
    T popConfig;
    protected final int originalPartition = -1;
    // Package-private; on non-first cycles holds the last vector of the build container (stored hash values).
    IntVector read_right_HV_vector;
    // 0 selects HashJoinMemoryCalculatorImpl, any other value the mechanical calculator (see getCalculatorImpl()).
    protected final int maxBatchesInMemory;
    protected final List<String> probeFields = new ArrayList<String>();
    protected RuntimeFilterReporter runtimeFilterReporter;
    protected ValueVectorHashHelper.Hash64 hash64;
    // Bloom filter -> build-side field id, filled by initializeRuntimeFilter().
    protected final Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<BloomFilter, Integer>();
    // Bloom filter definition -> build-side field id, filled by setupHash64().
    protected final Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<BloomFilterDef, Integer>();
    protected final List<BloomFilter> bloomFilters = new ArrayList<BloomFilter>();
    // Makes initializeRuntimeFilter() a one-shot operation.
    protected boolean bloomFiltersGenerated;
    // NOTE(review): raw-type constructor call is a decompiler artifact; the declared type is SpilledState<SpilledPartition>.
    protected final SpilledState<SpilledPartition> spilledState = new SpilledState();
    private final Updater spilledStateUpdater = new Updater();
    // One slot per partition; executeBuildPhase() records the spilled build partitions here.
    protected SpilledPartition[] spilledInners;

    /**
     * Reports how many rows the most recent output batch holds.
     *
     * @return the current value of {@code outputRecords}
     */
    @Override
    public int getRecordCount() {
        return outputRecords;
    }

    /**
     * Builds the output schema of this operator.
     *
     * <p>The first batch of both inputs is prefetched. When that yields a valid
     * schema, the probe/build schemas are captured from whichever side reported
     * {@code OK_NEW_SCHEMA}, the hash table is prepared for the build side and
     * the probe is created. The output container schema is then built
     * unconditionally, so it is built even when no valid schema was obtained.
     */
    @Override
    protected void buildSchema() {
        if (prefetchFirstBatchFromBothSides()) {
            state = AbstractRecordBatch.BatchState.BUILD_SCHEMA;

            if (RecordBatch.IterOutcome.OK_NEW_SCHEMA == leftUpstream) {
                probeSchema = left.getSchema();
            }

            if (RecordBatch.IterOutcome.OK_NEW_SCHEMA == rightUpstream) {
                buildSchema = right.getSchema();
                // Column count of the right container, remembered as a position.
                rightHVColPosition = right.getContainer().getNumberOfColumns();
                // No hash table is needed when the probe side is already
                // exhausted and the join is not right/full.
                final boolean probeExhausted = RecordBatch.IterOutcome.NONE == leftUpstream;
                skipHashTableBuild = probeExhausted && !joinIsRightOrFull;
                setupHashTable();
            }

            probe = createProbe();
        }

        // Runs with or without a valid upstream schema.
        setupOutputContainerSchema();
        container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
        container.setEmpty();
    }

    /**
     * Prefetches the first data-carrying batch of the build (right, input
     * index 1) side and stores the resulting outcome in {@code rightUpstream}.
     */
    private void prefetchFirstBuildBatch() {
        // Memory-manager update and stats logging for the right input; run by
        // prefetchFirstBatch() only during the first spill cycle.
        final Runnable recordBuildSideStats = () -> {
            batchMemoryManager.update(1, 0, true);
            RecordBatchStats.logRecordBatchStats(
                    RecordBatchStats.RecordBatchIOType.INPUT_RIGHT,
                    batchMemoryManager.getRecordBatchSizer(1),
                    getRecordBatchStatsContext());
        };
        rightUpstream = prefetchFirstBatch(rightUpstream, prefetchedBuild, buildSideIsEmpty, 1, buildBatch, recordBuildSideStats);
    }

    /**
     * Prefetches the first data-carrying batch of the probe (left, input
     * index 0) side and stores the resulting outcome in {@code leftUpstream}.
     */
    private void prefetchFirstProbeBatch() {
        // Memory-manager update and stats logging for the left input; run by
        // prefetchFirstBatch() only during the first spill cycle.
        final Runnable recordProbeSideStats = () -> {
            batchMemoryManager.update(0, 0);
            RecordBatchStats.logRecordBatchStats(
                    RecordBatchStats.RecordBatchIOType.INPUT_LEFT,
                    batchMemoryManager.getRecordBatchSizer(0),
                    getRecordBatchStatsContext());
        };
        leftUpstream = prefetchFirstBatch(leftUpstream, prefetchedProbe, probeSideIsEmpty, 0, probeBatch, recordProbeSideStats);
    }

    /**
     * Fetches, at most once per cycle, the first non-empty batch of one input.
     *
     * @param outcome             the side's current upstream outcome
     * @param prefetched          guard flag; when already set the method returns {@code outcome} untouched
     * @param isEmpty             receives {@code true} when the side ends up at {@code NONE}
     * @param index               input index handed to {@code sniffNonEmptyBatch}
     * @param batch               the side's incoming batch
     * @param memoryManagerUpdate callback run only while the spilled state is in its first cycle
     * @return the outcome after skipping empty batches
     */
    private RecordBatch.IterOutcome prefetchFirstBatch(RecordBatch.IterOutcome outcome, MutableBoolean prefetched, MutableBoolean isEmpty, int index, RecordBatch batch, Runnable memoryManagerUpdate) {
        if (prefetched.booleanValue()) {
            // Already done for this cycle.
            return outcome;
        }
        prefetched.setValue(true);

        RecordBatch.IterOutcome result = outcome;
        if (RecordBatch.IterOutcome.NONE != result) {
            // Data may still arrive: skip over empty batches.
            result = sniffNonEmptyBatch(result, index, batch);
        }

        final boolean sideHasNoData = RecordBatch.IterOutcome.NONE == result;
        isEmpty.setValue(sideHasNoData);

        if (spilledState.isFirstCycle()) {
            memoryManagerUpdate.run();
        }
        state = AbstractRecordBatch.BatchState.FIRST;
        return result;
    }

    /**
     * Pulls batches from one input until a batch with at least one record is
     * available or the input reports an outcome other than {@code OK} or
     * {@code NOT_YET}.
     *
     * @param curr        the outcome that accompanied the batch currently held
     * @param inputIndex  index of the input to pull from
     * @param recordBatch the incoming batch to inspect and advance
     * @return {@code curr} unchanged when the current batch already has rows,
     *         otherwise the outcome of the last {@code next} call
     * @throws UnsupportedOperationException when the input reports {@code EMIT}
     */
    private RecordBatch.IterOutcome sniffNonEmptyBatch(RecordBatch.IterOutcome curr, int inputIndex, RecordBatch recordBatch) {
        while (recordBatch.getRecordCount() == 0) {
            curr = next(inputIndex, recordBatch);
            switch (curr) {
                case OK:
                case NOT_YET:
                    // Re-check the record count on the next iteration.
                    break;
                case EMIT:
                    throw new UnsupportedOperationException("We do not support " + RecordBatch.IterOutcome.EMIT);
                default:
                    // Every other outcome ends the search.
                    return curr;
            }
        }
        return curr;
    }

    /**
     * Creates the memory calculator used to plan the build phase.
     *
     * @return a {@link HashJoinMechanicalMemoryCalculator} when
     *         {@code maxBatchesInMemory} is non-zero; otherwise a
     *         {@link HashJoinMemoryCalculatorImpl} configured from the
     *         {@code exec.hashjoin.*} options
     */
    public HashJoinMemoryCalculator getCalculatorImpl() {
        if (maxBatchesInMemory != 0) {
            return new HashJoinMechanicalMemoryCalculator(maxBatchesInMemory);
        }

        final double safety = context.getOptions().getDouble("exec.hashjoin.safety_factor");
        final double fragmentation = context.getOptions().getDouble("exec.hashjoin.fragmentation_factor");
        final double hashTableDoubling = context.getOptions().getDouble("exec.hashjoin.hash_double_factor");
        final String hashTableCalcType = context.getOptions().getString("exec.hashjoin.hash_table_calc_type");
        return new HashJoinMemoryCalculatorImpl(safety, fragmentation, hashTableDoubling, hashTableCalcType, semiJoin);
    }

    /*
     * Produces the next output batch of the join.
     *
     * NOTE(review): this body is raw CFR output that the decompiler could not
     * structure. It is NOT valid Java as it stands: it contains "** GOTO"
     * pseudo-statements, several distinct jump targets that are all printed as
     * "lbl-1000", locals that are never declared (isExistException, var2_2,
     * v0 ...), catch clauses nested inside a try body, and calls to synthetic
     * accessors (SpilledPartition.access$NNN). The repeated
     * "isReleaseBuildBatch / isReleaseProbeBatch ... cancel()" groups before
     * each return look like one finally block that the decompiler inlined at
     * every exit -- TODO confirm against the original source.
     *
     * Flow, as far as it can be read from the statements below:
     *   1. If wasKilled is set: cleanup() and return NONE.
     *   2. Prefetch the first build batch; return rightUpstream if it is an error.
     *   3. In state FIRST: run executeBuildPhase(); a non-null result is returned
     *      as-is, otherwise buildComplete is set, a row-key join re-fetches the
     *      left input, and updateStats() is called.
     *   4. Prefetch the first probe batch (apparently skipped when the build side
     *      is empty and the join is not left/full) and return leftUpstream when
     *      it is an error, or when it is NONE and the join is not right/full.
     *   5. Probe and project into the output container; return OK when rows were
     *      produced or the state is still FIRST.
     *   6. Otherwise clean up the partitions and, while spilled partitions remain,
     *      re-point buildBatch/probeBatch at SpilledRecordBatch instances and
     *      call innerNext() again.
     *   7. Finally set state DONE, cleanup() and return NONE.
     *
     * isExistException is set to true only in the OutOfMemoryException and
     * generic Exception handlers; the cancel() calls on spilled batches are all
     * conditional on it.
     *
     * Original decompiler diagnostics:
     * WARNING - Removed back jump from a try to a catch block - possible behaviour change.
     * Unable to fully structure code
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Override
    public RecordBatch.IterOutcome innerNext() {
        // A kill request was recorded earlier: release resources and stop.
        if (this.wasKilled) {
            this.cleanup();
            return RecordBatch.IterOutcome.NONE;
        }
        this.prefetchFirstBuildBatch();
        // Propagate an error outcome seen while prefetching the build side.
        if (this.rightUpstream.isError()) {
            return this.rightUpstream;
        }
        isExistException = false;
        try {
            // The build phase only runs while the operator is in state FIRST.
            if (this.state != AbstractRecordBatch.BatchState.FIRST) ** GOTO lbl48
            var2_2 = this.executeBuildPhase();
            if (var2_2 == null) ** GOTO lbl-1000
            var3_13 = var2_2;
            if (this.buildBatch != null) {
            }
            ** GOTO lbl-1000
        }
        catch (SchemaChangeException var2_10) {
            try {
                throw UserException.schemaChangeError(var2_10).build(this.logger);
                catch (OutOfMemoryException var2_11) {
                    isExistException = true;
                    throw UserException.memoryError(var2_11).build(this.logger);
                }
                catch (Exception var2_12) {
                    isExistException = true;
                    throw UserException.executionError(var2_12).build(this.logger);
                }
            }
            catch (Throwable var6_25) {
                // Exceptional exit: cancel spilled batches if an OOM/generic exception was flagged, then rethrow.
                isReleaseBuildBatch = this.buildBatch != null && this.buildBatch instanceof SpilledRecordBatch != false;
                v0 = isReleaseProbeBatch = this.probeBatch != null && this.probeBatch instanceof SpilledRecordBatch != false;
                if (isExistException && isReleaseBuildBatch) {
                    this.buildBatch.cancel();
                }
                if (isExistException && isReleaseProbeBatch) {
                    this.probeBatch.cancel();
                }
                throw var6_25;
            }
        }
        if (this.buildBatch instanceof SpilledRecordBatch) {
            v1 = true;
        } else lbl-1000:
        // 2 sources

        {
            v1 = false;
        }
        isReleaseBuildBatch = v1;
        v2 = isReleaseProbeBatch = this.probeBatch != null && this.probeBatch instanceof SpilledRecordBatch != false;
        if (isExistException && isReleaseBuildBatch) {
            this.buildBatch.cancel();
        }
        if (isExistException && isReleaseProbeBatch) {
            this.probeBatch.cancel();
        }
        // Exit carrying the non-null outcome of executeBuildPhase().
        return var3_13;
lbl-1000:
        // 1 sources

        {
            // Build phase finished without a termination outcome.
            this.buildComplete = true;
            if (this.isRowKeyJoin) {
                this.leftUpstream = this.next(this.left);
            }
            this.updateStats();
lbl48:
            // 2 sources

            if (this.buildSideIsEmpty.booleanValue() && !this.joinIsLeftOrFull) ** GOTO lbl-1000
            this.prefetchFirstProbeBatch();
            if (!this.leftUpstream.isError() && (this.leftUpstream != RecordBatch.IterOutcome.NONE || this.joinIsRightOrFull)) ** GOTO lbl-1000
            var2_3 = this.leftUpstream;
        }
        isReleaseBuildBatch = this.buildBatch != null && this.buildBatch instanceof SpilledRecordBatch != false;
        v3 = isReleaseProbeBatch = this.probeBatch != null && this.probeBatch instanceof SpilledRecordBatch != false;
        if (isExistException && isReleaseBuildBatch) {
            this.buildBatch.cancel();
        }
        if (isExistException && isReleaseProbeBatch) {
            this.probeBatch.cancel();
        }
        // Exit carrying the probe-side outcome (error, or NONE for a non right/full join).
        return var2_3;
lbl-1000:
        // 1 sources

        {
            if (this.buildSideIsEmpty.booleanValue() && this.probeSideIsEmpty.booleanValue()) ** GOTO lbl-1000
            if (this.state == AbstractRecordBatch.BatchState.FIRST) {
                this.setupProbe();
            }
            // Allocate output vectors, probe, and publish the produced row count.
            this.batchMemoryManager.allocateVectors(this.container);
            this.probe.setTargetOutputCount(this.batchMemoryManager.getOutputRowCount());
            this.outputRecords = this.probe.probeAndProject();
            this.container.setValueCount(this.outputRecords);
            this.batchMemoryManager.updateOutgoingStats(this.outputRecords);
            RecordBatchStats.logRecordBatchStats(RecordBatchStats.RecordBatchIOType.OUTPUT, this, this.getRecordBatchStatsContext());
            if (this.outputRecords <= 0 && this.state != AbstractRecordBatch.BatchState.FIRST) ** GOTO lbl-1000
            this.state = AbstractRecordBatch.BatchState.NOT_FIRST;
            var2_4 = RecordBatch.IterOutcome.OK;
        }
        isReleaseBuildBatch = this.buildBatch != null && this.buildBatch instanceof SpilledRecordBatch != false;
        v4 = isReleaseProbeBatch = this.probeBatch != null && this.probeBatch instanceof SpilledRecordBatch != false;
        if (isExistException && isReleaseBuildBatch) {
            this.buildBatch.cancel();
        }
        if (isExistException && isReleaseProbeBatch) {
            this.probeBatch.cancel();
        }
        return var2_4;
lbl-1000:
        // 3 sources

        {
            // Release in-memory partition state; the argument to cleanup() is false here.
            for (HashPartition partn : this.partitions) {
                partn.cleanup(false);
            }
            if (this.buildSideIsEmpty.booleanValue()) ** GOTO lbl109
            while (!this.spilledState.isEmpty()) {
                var2_7 = this.spilledState.getNextSpilledPartition();
                // access$NNN are synthetic accessors for private SpilledPartition members.
                // Judging by how they are passed to SpilledRecordBatch, access$200/access$300
                // look like the build-side spill file and batch count, and access$400/access$100
                // the probe-side ones -- TODO confirm against SpilledPartition.
                if (SpilledPartition.access$100(var2_7) == 0 && !this.joinIsRightOrFull) continue;
                this.buildBatch = new SpilledRecordBatch(SpilledPartition.access$200(var2_7), SpilledPartition.access$300(var2_7), this.context, this.buildSchema, this.oContext, this.spillSet);
                this.rightUpstream = ((SpilledRecordBatch)this.buildBatch).getInitialOutcome();
                if (SpilledPartition.access$100(var2_7) > 0) {
                    this.probeBatch = new SpilledRecordBatch(SpilledPartition.access$400(var2_7), SpilledPartition.access$100(var2_7), this.context, this.probeSchema, this.oContext, this.spillSet);
                    this.leftUpstream = ((SpilledRecordBatch)this.probeBatch).getInitialOutcome();
                } else {
                    this.probeBatch = this.left;
                    this.leftUpstream = RecordBatch.IterOutcome.NONE;
                    this.probe.changeToFinalProbeState();
                }
                // Start a new cycle for this spilled partition and process it via a nested innerNext() call.
                this.spilledState.updateCycle(this.stats, var2_7, this.spilledStateUpdater);
                this.state = AbstractRecordBatch.BatchState.FIRST;
                this.prefetchedBuild.setValue(false);
                this.prefetchedProbe.setValue(false);
                isReleaseBuildBatch = this.innerNext();
            }
            ** GOTO lbl109
        }
        {
            isReleaseBuildBatch = this.buildBatch != null && this.buildBatch instanceof SpilledRecordBatch != false;
            v5 = isReleaseProbeBatch = this.probeBatch != null && this.probeBatch instanceof SpilledRecordBatch != false;
            if (isExistException && isReleaseBuildBatch) {
                this.buildBatch.cancel();
            }
            if (isExistException && isReleaseProbeBatch) {
                this.probeBatch.cancel();
            }
            return isReleaseBuildBatch;
            break;
        }
lbl-1000:
        // 1 sources

        {
            this.killAndDrainLeftUpstream();
lbl109:
            // 3 sources

            // Nothing more to emit: mark the operator DONE and release resources.
            this.state = AbstractRecordBatch.BatchState.DONE;
            this.cleanup();
            var2_9 = RecordBatch.IterOutcome.NONE;
        }
        isReleaseBuildBatch = this.buildBatch != null && this.buildBatch instanceof SpilledRecordBatch != false ? 1 : 0;
        v6 = isReleaseProbeBatch = this.probeBatch != null && this.probeBatch instanceof SpilledRecordBatch != false ? 1 : 0;
        if (isExistException && isReleaseBuildBatch != 0) {
            this.buildBatch.cancel();
        }
        if (isExistException && isReleaseProbeBatch != 0) {
            this.probeBatch.cancel();
        }
        return var2_9;
    }

    /**
     * Cancels an input and then keeps pulling from it, clearing every batch,
     * for as long as it still reports {@code OK} or {@code OK_NEW_SCHEMA}.
     *
     * @param batch    the input to cancel and drain
     * @param upstream the input's current outcome
     * @param isLeft   {@code true} to pull through input index 0, {@code false} for index 1
     */
    private void killAndDrainUpstream(RecordBatch batch, RecordBatch.IterOutcome upstream, boolean isLeft) {
        batch.cancel();

        final int inputIndex = isLeft ? 0 : 1;
        RecordBatch.IterOutcome outcome = upstream;
        while (RecordBatch.IterOutcome.OK_NEW_SCHEMA == outcome || RecordBatch.IterOutcome.OK == outcome) {
            VectorAccessibleUtilities.clear(batch);
            outcome = next(inputIndex, batch);
        }
    }

    /** Cancels and drains the probe-side input ({@code probeBatch} / {@code leftUpstream}). */
    private void killAndDrainLeftUpstream() {
        killAndDrainUpstream(probeBatch, leftUpstream, true);
    }

    /** Cancels and drains the build-side input ({@code buildBatch} / {@code rightUpstream}). */
    private void killAndDrainRightUpstream() {
        killAndDrainUpstream(buildBatch, rightUpstream, false);
    }

    /**
     * Creates the base hash table from the operator's hash-table configuration
     * and, when the runtime filter is enabled, prepares the 64-bit hash helper.
     * Does nothing when {@code skipHashTableBuild} is set.
     */
    private void setupHashTable() {
        if (!skipHashTableBuild) {
            final HashTableConfig config = buildHashTableConfig();
            baseHashTable = new ChainedHashTable(config, context, allocator, buildBatch, probeBatch, null);
            if (enableRuntimeFilter) {
                setupHash64(config);
            }
        }
    }

    /**
     * Prepares the 64-bit hash function used to fill the runtime bloom filters.
     *
     * <p>The build-side key expressions are materialized and resolved to field
     * ids, and every bloom filter definition's build field is mapped to its
     * field id in {@code bloomFilterDef2buildId}. If any field cannot be
     * resolved on the build batch, the runtime filter is disabled and the
     * method returns without creating the hash function.
     *
     * @param htConfig hash-table configuration supplying the build-side key expressions
     */
    private void setupHash64(HashTableConfig htConfig) {
        // Step 1: materialize the build-side key expressions; null results are skipped.
        final LogicalExpression[] materializedKeys = new LogicalExpression[htConfig.getKeyExprsBuild().size()];
        final ErrorCollectorImpl errors = new ErrorCollectorImpl();
        int materializedCount = 0;
        for (NamedExpression keyExpr : htConfig.getKeyExprsBuild()) {
            final LogicalExpression materialized = ExpressionTreeMaterializer.materialize(keyExpr.getExpr(), buildBatch, errors, context.getFunctionRegistry());
            errors.reportErrors(logger);
            if (materialized != null) {
                materializedKeys[materializedCount] = materialized;
                materializedCount++;
            }
        }

        // Step 2: resolve each key expression to a field id on the build batch.
        final TypedFieldId[] keyFieldIds = new TypedFieldId[materializedKeys.length];
        int resolvedCount = 0;
        for (NamedExpression keyExpr : htConfig.getKeyExprsBuild()) {
            final SchemaPath keyPath = (SchemaPath) keyExpr.getExpr();
            final TypedFieldId keyFieldId = buildBatch.getValueVectorId(keyPath);
            if (keyFieldId == null) {
                logger.info("As some build side key fields not found, runtime filter was disabled");
                enableRuntimeFilter = false;
                return;
            }
            keyFieldIds[resolvedCount] = keyFieldId;
            resolvedCount++;
        }

        // Step 3: remember the build-side field id of every bloom filter definition.
        for (BloomFilterDef filterDef : runtimeFilterDef.getBloomFilterDefs()) {
            final SchemaPath buildFieldPath = new SchemaPath(new PathSegment.NameSegment(filterDef.getBuildField()), ExpressionPosition.UNKNOWN);
            final TypedFieldId buildFieldId = buildBatch.getValueVectorId(buildFieldPath);
            if (buildFieldId == null) {
                logger.info("As some build side key fields not found, runtime filter was disabled");
                enableRuntimeFilter = false;
                return;
            }
            bloomFilterDef2buildId.put(filterDef, buildFieldId.getFieldIds()[0]);
        }

        // Step 4: build the hash function over the resolved keys.
        final ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(buildBatch, context);
        try {
            hash64 = hashHelper.getHash64(materializedKeys, keyFieldIds);
        }
        catch (Exception e) {
            throw UserException.internalError(e).message("Failed to construct a field's hash64 dynamic codes", new Object[0]).build(logger);
        }
    }

    /**
     * Performs the setup that depends on the final partition count:
     * initializes the spilled state and allocates the partition array.
     */
    private void delayedSetup() {
        final int partitionCount = numPartitions;
        spilledState.initialize(partitionCount);
        partitions = new HashPartition[partitionCount];
    }

    /**
     * Prepares a build cycle: points the base hash table at the current
     * incoming batches, creates one {@link HashPartition} per partition slot
     * and allocates a fresh {@code spilledInners} array.
     */
    private void initializeBuild() {
        baseHashTable.updateIncoming(buildBatch, probeBatch);

        int partitionIndex = 0;
        while (partitionIndex < numPartitions) {
            partitions[partitionIndex] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch, semiJoin, RECORDS_PER_BATCH, spillSet, partitionIndex, spilledState.getCycle(), numPartitions);
            partitionIndex++;
        }

        spilledInners = new SpilledPartition[numPartitions];
    }

    /**
     * Creates the runtime-filter reporter and one bloom filter per bloom filter
     * definition. Runs at most once; it is a no-op when the runtime filter is
     * disabled or the filters were already generated.
     */
    private void initializeRuntimeFilter() {
        if (bloomFiltersGenerated || !enableRuntimeFilter) {
            return;
        }

        runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context);

        if (runtimeFilterDef != null) {
            for (BloomFilterDef filterDef : runtimeFilterDef.getBloomFilterDefs()) {
                // Field id recorded for this definition by setupHash64().
                final int buildFieldId = bloomFilterDef2buildId.get(filterDef);
                final int filterSizeBytes = filterDef.getNumBytes();
                probeFields.add(filterDef.getProbeField());

                final BloomFilter filter = new BloomFilter(filterSizeBytes, context.getAllocator());
                bloomFilters.add(filter);
                bloomFilter2buildId.put(filter, buildFieldId);
            }
        }

        bloomFiltersGenerated = true;
    }

    /**
     * Adopts the partition count chosen by the build-side calculator and, when
     * even that count needs more memory than the allocator limit allows, falls
     * back to a non-calculating calculator and disables spilling.
     *
     * @param maxBatchSize maximum batch size handed to the replacement calculator
     * @param buildCalc    the calculator that proposed the partition count
     * @return {@code buildCalc} itself when its reserved memory fits the
     *         allocator limit, otherwise the replacement calculator
     */
    private HashJoinMemoryCalculator.BuildSidePartitioning partitionNumTuning(int maxBatchSize, HashJoinMemoryCalculator.BuildSidePartitioning buildCalc) {
        numPartitions = buildCalc.getNumPartitions();

        if (logger.isDebugEnabled()) {
            logger.debug(buildCalc.makeDebugString());
        }

        final boolean exceedsLimit = buildCalc.getMaxReservedMemory() > allocator.getLimit();
        if (!exceedsLimit) {
            return buildCalc;
        }

        final String requiredMemory = FileUtils.byteCountToDisplaySize((long) buildCalc.getMaxReservedMemory());
        final String availableMemory = FileUtils.byteCountToDisplaySize((long) allocator.getLimit());
        logger.warn(String.format("When using the minimum number of partitions %d we require %s memory but only have %s available. Forcing legacy behavior of using unbounded memory in order to prevent regressions.", numPartitions, requiredMemory, availableMemory));

        // Re-create the calculator with memory calculation switched off.
        final HashJoinMemoryCalculator fallbackCalc = getCalculatorImpl();
        fallbackCalc.initialize(false);
        final HashJoinMemoryCalculator.BuildSidePartitioning fallbackBuildCalc = (HashJoinMemoryCalculator.BuildSidePartitioning) fallbackCalc.next();
        fallbackBuildCalc.initialize(true, true, buildBatch, probeBatch, buildJoinColumns, leftUpstream == RecordBatch.IterOutcome.NONE, allocator.getLimit(), numPartitions, RECORDS_PER_BATCH, RECORDS_PER_BATCH, maxBatchSize, maxBatchSize, batchMemoryManager.getOutputBatchSize(), 0.75);

        // A null reason selects the "not enough memory" path of disableSpilling().
        disableSpilling(null);
        return fallbackBuildCalc;
    }

    /**
     * Turns spilling off: the operator continues with a single partition and
     * the allocator limit is raised to {@code AbstractBase.MAX_ALLOCATION}.
     *
     * @param reason message to log as a warning; {@code null} means spilling is
     *               being disabled for lack of memory, which is only permitted
     *               when the {@code drill.exec.hashjoin.fallback.enabled} option is on
     * @throws UserException (resource error) when {@code reason} is {@code null}
     *                       and the fallback option is off
     */
    private void disableSpilling(String reason) {
        if (reason != null) {
            logger.warn(reason);
        } else {
            final boolean fallbackAllowed = context.getOptions().getBoolean("drill.exec.hashjoin.fallback.enabled");
            if (!fallbackAllowed) {
                throw UserException.resourceError().message(String.format("Not enough memory for internal partitioning and fallback mechanism for HashJoin to use unbounded memory is disabled.\nEither enable fallback option %s using ALTER SESSION/SYSTEM command or increase the memory limit for the Drillbit", "drill.exec.hashjoin.fallback.enabled"), new Object[0]).build(logger);
            }
            logger.warn("Spilling is disabled - not enough memory available for internal partitioning. Falling back to use unbounded memory");
        }

        numPartitions = 1;
        canSpill = false;
        allocator.setLimit(AbstractBase.MAX_ALLOCATION);
    }

    /**
     * Runs the build phase of the join: reads the whole build (right) side,
     * distributes its rows over the hash partitions, optionally fills and sends
     * the runtime bloom filters, and finally either builds a hash table for
     * each in-memory partition or spills it.
     *
     * @return {@code null} when the build phase completed or was skipped
     *         (build side empty, or {@code skipHashTableBuild} set); otherwise
     *         the error outcome observed while prefetching the first probe batch
     * @throws SchemaChangeException if the build side reports {@code OK_NEW_SCHEMA}
     *                               with a schema different from {@code buildSchema}
     * @throws OutOfMemoryException  if building a partition's hash table or
     *                               spilling it runs out of memory; the message
     *                               carries the partition index and debug strings
     * @throws IllegalStateException if the build side reports an outcome this
     *                               loop does not handle
     */
    public RecordBatch.IterOutcome executeBuildPhase() throws SchemaChangeException {
        if (this.buildSideIsEmpty.booleanValue()) {
            return null;
        }
        if (this.skipHashTableBuild) {
            // The hash table is not needed: discard the build side entirely.
            this.killAndDrainRightUpstream();
            return null;
        }

        // NOTE(review): 65536 looks like an inlined "max rows per batch" constant -- confirm.
        int maxBatchSize = this.spilledState.isFirstCycle() ? 65536 : this.RECORDS_PER_BATCH;
        boolean doMemoryCalculation = this.canSpill && !this.probeSideIsEmpty.booleanValue();

        // Same declared type as in partitionNumTuning(); the decompiler had
        // mis-typed this local as HashPartition[], which cannot compile.
        HashJoinMemoryCalculator calc = this.getCalculatorImpl();
        calc.initialize(doMemoryCalculation);
        HashJoinMemoryCalculator.BuildSidePartitioning buildCalc = (HashJoinMemoryCalculator.BuildSidePartitioning)calc.next();
        buildCalc.initialize(this.spilledState.isFirstCycle(), true, this.buildBatch, this.probeBatch, this.buildJoinColumns, this.probeSideIsEmpty.booleanValue(), this.allocator.getLimit(), this.numPartitions, this.RECORDS_PER_BATCH, this.RECORDS_PER_BATCH, maxBatchSize, maxBatchSize, this.batchMemoryManager.getOutputBatchSize(), 0.75);
        if (this.spilledState.isFirstCycle() && doMemoryCalculation) {
            buildCalc = this.partitionNumTuning(maxBatchSize, buildCalc);
        }
        if (this.spilledState.isFirstCycle()) {
            // The partition count is final only now; allocate the partition array.
            this.delayedSetup();
        }
        this.initializeBuild();
        this.initializeRuntimeFilter();
        HashJoinMemoryCalculator.PartitionStatSet partitionStatSet = new HashJoinMemoryCalculator.PartitionStatSet(this.partitions);
        buildCalc.setPartitionStatSet(partitionStatSet);

        // Consume the build side batch by batch.
        boolean moreData = true;
        while (moreData) {
            switch (this.rightUpstream) {
                case NOT_YET:
                case NONE: {
                    // End of input: leave the loop without fetching another batch.
                    moreData = false;
                    continue;
                }
                case OK_NEW_SCHEMA: {
                    if (!this.buildSchema.equals(this.buildBatch.getSchema())) {
                        throw SchemaChangeException.schemaChanged(this.getClass().getSimpleName() + " does not support schema changes in build side.", this.buildSchema, this.buildBatch.getSchema());
                    }
                    for (HashPartition partn : this.partitions) {
                        partn.updateBatches();
                    }
                    // Intentional fall-through: a new-schema batch also carries data.
                }
                case OK: {
                    this.batchMemoryManager.update(this.buildBatch, 1, 0, true);
                    int currentRecordCount = this.buildBatch.getRecordCount();
                    if (this.spilledState.isFirstCycle() && this.enableRuntimeFilter) {
                        // Feed every row's 64-bit hash into each bloom filter.
                        for (Map.Entry<BloomFilter, Integer> filterEntry : this.bloomFilter2buildId.entrySet()) {
                            BloomFilter bloomFilter = filterEntry.getKey();
                            int fieldId = filterEntry.getValue();
                            for (int ind = 0; ind < currentRecordCount; ++ind) {
                                long hashCode = this.hash64.hash64Code(ind, 0, fieldId);
                                bloomFilter.insert(hashCode);
                            }
                        }
                    }
                    if (this.numPartitions == 1) {
                        // Single partition: take the whole batch without per-row routing.
                        this.partitions[0].appendBatch(this.buildBatch);
                        break;
                    }
                    if (!this.spilledState.isFirstCycle()) {
                        // Later cycles read the hash values from the container's last vector.
                        this.read_right_HV_vector = (IntVector)this.buildBatch.getContainer().getLast();
                    }
                    for (int ind = 0; ind < currentRecordCount; ++ind) {
                        int hashCode = this.spilledState.isFirstCycle() ? this.partitions[0].getBuildHashCode(ind) : this.read_right_HV_vector.getAccessor().get(ind);
                        // The masked bits select the partition; the remaining bits travel with the row.
                        int currPart = hashCode & this.spilledState.getPartitionMask();
                        this.partitions[currPart].appendInnerRow(this.buildBatch.getContainer(), ind, hashCode >>> this.spilledState.getBitsInMask(), buildCalc);
                    }
                    if (this.read_right_HV_vector == null) break;
                    this.read_right_HV_vector.clear();
                    this.read_right_HV_vector = null;
                    break;
                }
                default: {
                    throw new IllegalStateException(this.rightUpstream.name());
                }
            }
            this.rightUpstream = this.next(1, this.buildBatch);
        }

        // Publish the bloom filters once the build side has been fully read.
        if (this.spilledState.isFirstCycle() && this.enableRuntimeFilter && this.bloomFilter2buildId.size() > 0) {
            int hashJoinOpId = this.popConfig.getOperatorId();
            this.runtimeFilterReporter.sendOut(this.bloomFilters, this.probeFields, this.runtimeFilterDef, hashJoinOpId);
        }
        if (this.numPartitions > 1) {
            for (HashPartition partn : this.partitions) {
                partn.completeAnInnerBatch(false, partn.isSpilled());
            }
        }

        this.prefetchFirstProbeBatch();
        if (this.leftUpstream.isError()) {
            // Hand the probe-side error back to the caller.
            return this.leftUpstream;
        }

        // Decide per in-memory partition whether to spill it or build its hash table.
        HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = (HashJoinMemoryCalculator.PostBuildCalculations)buildCalc.next();
        postBuildCalc.initialize(this.probeSideIsEmpty.booleanValue());
        for (int index = 0; index < this.partitions.length; ++index) {
            HashPartition partn = this.partitions[index];
            if (partn.isSpilled()) continue;
            try {
                if (postBuildCalc.shouldSpill()) {
                    partn.spillThisPartition();
                    continue;
                }
                partn.buildContainersHashTableAndHelper();
                continue;
            }
            catch (OutOfMemoryException e) {
                String message = "Failed building hash table on partition " + index + ":\n" + this.makeDebugString() + "\n" + postBuildCalc.makeDebugString();
                throw new OutOfMemoryException(message, e);
            }
        }
        if (this.logger.isDebugEnabled()) {
            this.logger.debug(postBuildCalc.makeDebugString());
        }

        // Register every spilled partition so a later cycle can process it.
        for (HashPartition partn : this.partitions) {
            if (!partn.isSpilled()) continue;
            SpilledPartition sp = new SpilledPartition(this.spilledState.getCycle(), partn.getPartitionNum(), -1, partn.getPartitionBatchesCount(), partn.getSpillFile());
            this.spilledState.addPartition(sp);
            this.spilledInners[partn.getPartitionNum()] = sp;
            partn.closeWriter();
            partn.updateProbeRecordsPerBatch(postBuildCalc.getProbeRecordsPerBatch());
        }
        return null;
    }

    /**
     * Declares the vectors of the outgoing container from the two input schemas.
     *
     * <p>Build-side fields are added first (skipped entirely when {@code buildSchema} is
     * {@code null} or this is a semi join), followed by one vector per probe-side column
     * (skipped when {@code probeSchema} is {@code null}). On each side a REQUIRED,
     * non-MAP column is re-declared as OPTIONAL when the corresponding outer-join flag
     * is set; see {@link #joinOutputType}.
     */
    private void setupOutputContainerSchema() {
        if (this.buildSchema != null && !this.semiJoin) {
            for (MaterializedField field : this.buildSchema) {
                TypeProtos.MajorType outputType = joinOutputType(field.getType(), this.joinIsLeftOrFull);
                // Re-type the existing field (rather than creating one by name) before adding it.
                MaterializedField projected = field.withType(outputType);
                this.container.addOrGet(projected);
            }
        }
        if (this.probeSchema != null) {
            for (VectorWrapper<?> vv : this.probeBatch) {
                TypeProtos.MajorType outputType = joinOutputType(vv.getField().getType(), this.joinIsRightOrFull);
                // Typed as ValueVector (not Object) so that clear() below resolves.
                ValueVector v = this.container.addOrGet(MaterializedField.create(vv.getField().getName(), outputType));
                if (v instanceof AbstractContainerVector) {
                    // NOTE(review): the returned transfer pair is discarded; presumably the call is
                    // made only for its effect on the target container vector - confirm.
                    vv.getValueVector().makeTransferPair(v);
                    v.clear();
                }
            }
        }
    }

    /**
     * Picks the output type for a join column.
     *
     * @param inputType type of the incoming column
     * @param widenRequired whether REQUIRED columns on this side must become OPTIONAL
     * @return {@code inputType} overridden to OPTIONAL when {@code widenRequired} is set and the
     *         column is REQUIRED and not a MAP; otherwise {@code inputType} unchanged
     */
    private static TypeProtos.MajorType joinOutputType(TypeProtos.MajorType inputType, boolean widenRequired) {
        boolean widen = widenRequired
            && inputType.getMode() == TypeProtos.DataMode.REQUIRED
            && inputType.getMinorType() != TypeProtos.MinorType.MAP;
        return widen ? Types.overrideMode(inputType, TypeProtos.DataMode.OPTIONAL) : inputType;
    }

    /**
     * Tells whether an inner-side spill record is registered for a partition.
     *
     * @param part index into the {@code spilledInners} array
     * @return {@code false} when {@code spilledInners} has not been allocated or holds no
     *         entry at {@code part}; {@code true} otherwise
     */
    public boolean isSpilledInner(int part) {
        return this.spilledInners != null && this.spilledInners[part] != null;
    }

    /**
     * Wires up the two inputs and reads the hash-join tuning options from the fragment context.
     *
     * <p>The right input is used as the build side and the left input as the probe side.
     *
     * @param popConfig operator configuration; also handed to the {@code SpillSet}
     * @param context fragment context supplying the option values read below
     * @param left probe-side input
     * @param right build-side input
     * @throws OutOfMemoryException declared by this constructor; the visible body does not throw it directly
     */
    public AbstractHashBinaryRecordBatch(T popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
        super(popConfig, context, true, left, right);
        this.buildBatch = right;
        this.probeBatch = left;
        this.popConfig = popConfig;
        this.allocator = this.oContext.getAllocator();
        this.numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
        // A configured value of exactly 1 turns spilling off; this check must precede the rounding below.
        if (this.numPartitions == 1) {
            this.disableSpilling("Spilling is disabled due to configuration setting of num_partitions to 1");
        }
        this.numPartitions = BaseAllocator.nextPowerOfTwo(this.numPartitions);
        // A non-zero option value overrides the allocator's limit; zero leaves the limit untouched.
        long memLimit = context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);
        if (memLimit != 0L) {
            this.allocator.setLimit(memLimit);
        }
        this.RECORDS_PER_BATCH = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR);
        this.maxBatchesInMemory = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR);
        // NOTE(review): the argument is already a display size with its own unit, so the
        // trailing " bytes" in the message looks redundant - confirm before changing the text.
        this.logger.info("Memory limit {} bytes", (Object)FileUtils.byteCountToDisplaySize((long)this.allocator.getLimit()));
        this.spillSet = new SpillSet(context, (PhysicalOperator)popConfig);
        // Starts as an empty array; it is replaced elsewhere in the class.
        this.partitions = new HashPartition[0];
        int configuredBatchSize = (int)context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
        double avail_mem_factor = context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR);
        // Output batch size: the configured size, capped by the highest one-bit of (allocator limit * factor).
        int outputBatchSize = Math.min(configuredBatchSize, Integer.highestOneBit((int)((double)this.allocator.getLimit() * avail_mem_factor)));
        RecordBatchStats.logRecordBatchStats(this.getRecordBatchStatsContext(), "configured output batch size: %d, allocated memory %d, avail mem factor %f, output batch size: %d", configuredBatchSize, this.allocator.getLimit(), avail_mem_factor, outputBatchSize);
        this.batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right, new HashSet<String>());
        RecordBatchStats.printConfiguredBatchSize(this.getRecordBatchStatsContext(), configuredBatchSize);
    }

    /**
     * Releases partition resources and removes spill files.
     *
     * <p>Does nothing when the build side is flagged empty. Otherwise it records the spilled
     * volume (in MB, rounded) in the {@code SPILL_MB} metric when anything was written, closes
     * every non-null partition, deletes the inner and (if present) outer spill file of every
     * partition still queued in {@code spilledState}, and finally closes the spill set.
     * A failed file deletion is logged, with its cause, and does not abort the cleanup.
     */
    private void cleanup() {
        if (this.buildSideIsEmpty.booleanValue()) {
            return;
        }
        long spilledBytes = this.spillSet.getWriteBytes();
        if (spilledBytes > 0L) {
            // setLongStat takes a long, so the rounded value is passed without narrowing to int.
            this.stats.setLongStat(Metric.SPILL_MB, Math.round((double)spilledBytes / 1024.0 / 1024.0));
        }
        for (HashPartition partn : this.partitions) {
            if (partn != null) {
                partn.close();
            }
        }
        while (!this.spilledState.isEmpty()) {
            SpilledPartition sp = this.spilledState.getNextSpilledPartition();
            this.deleteSpillFileLogged(sp.innerSpillFile);
            if (sp.outerSpillFile != null) {
                this.deleteSpillFileLogged(sp.outerSpillFile);
            }
        }
        this.spillSet.close();
    }

    /**
     * Deletes one spill file through the spill set; an {@link IOException} is logged at WARN
     * level together with its stack trace instead of being propagated.
     *
     * @param spillFile path of the spill file to delete
     */
    private void deleteSpillFileLogged(String spillFile) {
        try {
            this.spillSet.delete(spillFile);
        }
        catch (IOException e) {
            // Trailing Throwable argument: SLF4J logs it as the exception, not as a placeholder value.
            this.logger.warn("Cleanup: Failed to delete spill file {}", spillFile, e);
        }
    }

    /**
     * Builds a multi-line description of all partitions.
     *
     * @return one line per entry of {@code partitions}, each of the form
     *         {@code "Partition <index>: <partition debug string>"} terminated by a newline
     */
    public String makeDebugString() {
        StringBuilder text = new StringBuilder();
        int position = 0;
        for (HashPartition current : this.partitions) {
            text.append("Partition ")
                .append(position++)
                .append(": ")
                .append(current.makeDebugString())
                .append("\n");
        }
        return text.toString();
    }

    /**
     * Publishes hash-table and partitioning metrics to the operator stats.
     *
     * <p>Skipped when the build side is flagged empty or when the spill state is no longer in
     * its first cycle. Otherwise the per-partition hash-table stats are summed and written out
     * together with the partition count, the current spill cycle and the number of spilled
     * partitions.
     */
    private void updateStats() {
        if (this.buildSideIsEmpty.booleanValue() || !this.spilledState.isFirstCycle()) {
            return;
        }
        HashTableStats totals = new HashTableStats();
        long spilledCount = 0L;
        // Single scratch object, refilled by every partition and then folded into the totals.
        HashTableStats scratch = new HashTableStats();
        HashPartition[] all = this.partitions;
        for (int i = 0; i < all.length; ++i) {
            HashPartition current = all[i];
            spilledCount += current.isSpilled() ? 1L : 0L;
            current.getStats(scratch);
            totals.addStats(scratch);
        }
        // Hash-table figures.
        this.stats.setLongStat(Metric.NUM_BUCKETS, totals.numBuckets);
        this.stats.setLongStat(Metric.NUM_ENTRIES, totals.numEntries);
        this.stats.setLongStat(Metric.NUM_RESIZING, totals.numResizing);
        this.stats.setLongStat(Metric.RESIZING_TIME_MS, totals.resizingTime);
        // Partitioning and spill figures.
        this.stats.setLongStat(Metric.NUM_PARTITIONS, this.numPartitions);
        this.stats.setLongStat(Metric.SPILL_CYCLE, this.spilledState.getCycle());
        this.stats.setLongStat(Metric.SPILLED_PARTITIONS, spilledCount);
    }

    /**
     * Marks this operator as killed and forwards the cancellation to both inputs,
     * probe side first, then build side.
     */
    @Override
    protected void cancelIncoming() {
        // Set the flag before touching the inputs.
        this.wasKilled = true;
        this.probeBatch.cancel();
        this.buildBatch.cancel();
    }

    /**
     * Copies the batch memory manager's running counters into the operator stats:
     * four figures for input index 0 (reported as LEFT), four for input index 1
     * (reported as RIGHT), and four for the output.
     */
    public void updateMetrics() {
        final int leftInput = 0;
        final int rightInput = 1;

        // Left input.
        this.stats.setLongStat(Metric.LEFT_INPUT_BATCH_COUNT, this.batchMemoryManager.getNumIncomingBatches(leftInput));
        this.stats.setLongStat(Metric.LEFT_AVG_INPUT_BATCH_BYTES, this.batchMemoryManager.getAvgInputBatchSize(leftInput));
        this.stats.setLongStat(Metric.LEFT_AVG_INPUT_ROW_BYTES, this.batchMemoryManager.getAvgInputRowWidth(leftInput));
        this.stats.setLongStat(Metric.LEFT_INPUT_RECORD_COUNT, this.batchMemoryManager.getTotalInputRecords(leftInput));

        // Right input.
        this.stats.setLongStat(Metric.RIGHT_INPUT_BATCH_COUNT, this.batchMemoryManager.getNumIncomingBatches(rightInput));
        this.stats.setLongStat(Metric.RIGHT_AVG_INPUT_BATCH_BYTES, this.batchMemoryManager.getAvgInputBatchSize(rightInput));
        this.stats.setLongStat(Metric.RIGHT_AVG_INPUT_ROW_BYTES, this.batchMemoryManager.getAvgInputRowWidth(rightInput));
        this.stats.setLongStat(Metric.RIGHT_INPUT_RECORD_COUNT, this.batchMemoryManager.getTotalInputRecords(rightInput));

        // Output.
        this.stats.setLongStat(Metric.OUTPUT_BATCH_COUNT, this.batchMemoryManager.getNumOutgoingBatches());
        this.stats.setLongStat(Metric.AVG_OUTPUT_BATCH_BYTES, this.batchMemoryManager.getAvgOutputBatchSize());
        this.stats.setLongStat(Metric.AVG_OUTPUT_ROW_BYTES, this.batchMemoryManager.getAvgOutputRowWidth());
        this.stats.setLongStat(Metric.OUTPUT_RECORD_COUNT, this.batchMemoryManager.getTotalOutputRecords());
    }

    /**
     * Publishes the final metrics, logs the aggregate input/output batch statistics, then
     * runs {@code cleanup()} and closes the superclass.
     *
     * <p>{@code cleanup()} and {@code super.close()} sit in {@code finally} blocks so that a
     * failure while reporting metrics or logging cannot leave partitions, spill files or the
     * superclass's resources unreleased; likewise a failure in {@code cleanup()} does not
     * prevent {@code super.close()}.
     */
    @Override
    public void close() {
        try {
            this.updateMetrics();
            RecordBatchStats.logRecordBatchStats(this.getRecordBatchStatsContext(), "incoming aggregate left: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d", this.batchMemoryManager.getNumIncomingBatches(0), this.batchMemoryManager.getAvgInputBatchSize(0), this.batchMemoryManager.getAvgInputRowWidth(0), this.batchMemoryManager.getTotalInputRecords(0));
            RecordBatchStats.logRecordBatchStats(this.getRecordBatchStatsContext(), "incoming aggregate right: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d", this.batchMemoryManager.getNumIncomingBatches(1), this.batchMemoryManager.getAvgInputBatchSize(1), this.batchMemoryManager.getAvgInputRowWidth(1), this.batchMemoryManager.getTotalInputRecords(1));
            RecordBatchStats.logRecordBatchStats(this.getRecordBatchStatsContext(), "outgoing aggregate: batch count : %d, avg bytes : %d,  avg row bytes : %d, record count : %d", this.batchMemoryManager.getNumOutgoingBatches(), this.batchMemoryManager.getAvgOutputBatchSize(), this.batchMemoryManager.getAvgOutputRowWidth(), this.batchMemoryManager.getTotalOutputRecords());
        }
        finally {
            try {
                this.cleanup();
            }
            finally {
                super.close();
            }
        }
    }

    /**
     * Supplies the {@code Probe} implementation for the concrete operator.
     *
     * @return a probe instance; NOTE(review): whether a fresh instance is expected on every
     *         call is not visible here - confirm against the callers
     */
    public abstract Probe createProbe();

    /**
     * Prepares the probe for use; the concrete steps are left to the subclass.
     *
     * @throws SchemaChangeException if the subclass's setup reports a schema change
     */
    public abstract void setupProbe() throws SchemaChangeException;

    /**
     * Produces the hash-table configuration for the concrete operator.
     *
     * @return the {@code HashTableConfig} chosen by the subclass
     */
    protected abstract HashTableConfig buildHashTableConfig();

    /**
     * Metadata describing one spilled partition: the batch count and spill file of the inner
     * side (fixed at construction) and those of the outer side (supplied later, exactly once,
     * through {@link #updateOuter}).
     */
    public static class SpilledPartition
    extends AbstractSpilledPartitionMetadata {
        /** Number of inner-side batches written to {@link #innerSpillFile}. */
        private final int innerSpilledBatches;
        /** Spill file holding the inner-side batches. */
        private final String innerSpillFile;
        /** Number of outer-side batches; meaningful only once {@link #updatedOuter} is set. */
        private int outerSpilledBatches;
        /** Spill file of the outer side; meaningful only once {@link #updatedOuter} is set. */
        private String outerSpillFile;
        /** Flips to {@code true} on the single permitted call to {@link #updateOuter}. */
        private boolean updatedOuter;

        /**
         * @param cycle value forwarded to the superclass
         * @param originPartition value forwarded to the superclass
         * @param prevOriginPartition value forwarded to the superclass
         * @param innerSpilledBatches number of inner-side batches spilled
         * @param innerSpillFile spill file of the inner side
         */
        public SpilledPartition(int cycle, int originPartition, int prevOriginPartition, int innerSpilledBatches, String innerSpillFile) {
            super(cycle, originPartition, prevOriginPartition);
            this.innerSpillFile = innerSpillFile;
            this.innerSpilledBatches = innerSpilledBatches;
        }

        /** @return the inner-side batch count given at construction */
        public int getInnerSpilledBatches() {
            return innerSpilledBatches;
        }

        /** @return the inner-side spill file given at construction */
        public String getInnerSpillFile() {
            return innerSpillFile;
        }

        /**
         * @return the outer-side batch count
         * @throws IllegalStateException if {@link #updateOuter} has not been called yet
         */
        public int getOuterSpilledBatches() {
            Preconditions.checkState(updatedOuter);
            return outerSpilledBatches;
        }

        /**
         * @return the outer-side spill file
         * @throws IllegalStateException if {@link #updateOuter} has not been called yet
         */
        public String getOuterSpillFile() {
            Preconditions.checkState(updatedOuter);
            return outerSpillFile;
        }

        /**
         * Records the outer-side spill details; may be called only once.
         *
         * @param outerSpilledBatches number of outer-side batches spilled
         * @param outerSpillFile spill file of the outer side
         * @throws IllegalStateException if the outer side was already recorded
         */
        public void updateOuter(int outerSpilledBatches, String outerSpillFile) {
            Preconditions.checkState(!updatedOuter);
            this.outerSpilledBatches = outerSpilledBatches;
            this.outerSpillFile = outerSpillFile;
            updatedOuter = true;
        }

        /**
         * Describes this partition for log output. The outer batch count is read directly
         * from the field, so no check is made that the outer side has been recorded.
         */
        @Override
        public String makeDebugString() {
            return String.format(
                "Start reading spilled partition %d (prev %d) from cycle %d (with %d-%d batches).",
                getOriginPartition(),
                getPrevOriginPartition(),
                getCycle(),
                outerSpilledBatches,
                innerSpilledBatches);
        }

        // Compiler-generated accessors surfaced by the decompiler; kept because code outside
        // this class may reference them.
        static /* synthetic */ int access$100(SpilledPartition x0) {
            return x0.outerSpilledBatches;
        }

        static /* synthetic */ int access$300(SpilledPartition x0) {
            return x0.innerSpilledBatches;
        }
    }

    /**
     * {@code SpilledState.Updater} callbacks bound to the enclosing record batch.
     */
    public class Updater
    implements SpilledState.Updater {
        /** Delegates to the enclosing batch's own {@code cleanup()}. */
        @Override
        public void cleanup() {
            // Must stay qualified: an unqualified call would resolve to this method.
            AbstractHashBinaryRecordBatch.this.cleanup();
        }

        /** @return a message prefixed with this updater's runtime class name */
        @Override
        public String getFailureMessage() {
            String reporter = getClass().getName();
            return reporter + " can not partition the inner data any further (probably due to too many join-key duplicates).";
        }

        /** @return the current limit of the enclosing batch's allocator */
        @Override
        public long getMemLimit() {
            BufferAllocator owner = AbstractHashBinaryRecordBatch.this.allocator;
            return owner.getLimit();
        }

        /** @return always {@code true} */
        @Override
        public boolean hasPartitionLimit() {
            return true;
        }
    }

    /**
     * Operator metrics reported by this record batch.
     *
     * <p>The metric id is the constant's declaration position (see {@link #metricId()}), so
     * constants must not be reordered and new ones belong at the end.
     */
    public enum Metric implements MetricDef {
        // Hash table.
        NUM_BUCKETS,
        NUM_ENTRIES,
        NUM_RESIZING,
        RESIZING_TIME_MS,

        // Partitioning and spilling.
        NUM_PARTITIONS,
        SPILLED_PARTITIONS,
        SPILL_MB,
        SPILL_CYCLE,

        // Left input.
        LEFT_INPUT_BATCH_COUNT,
        LEFT_AVG_INPUT_BATCH_BYTES,
        LEFT_AVG_INPUT_ROW_BYTES,
        LEFT_INPUT_RECORD_COUNT,

        // Right input.
        RIGHT_INPUT_BATCH_COUNT,
        RIGHT_AVG_INPUT_BATCH_BYTES,
        RIGHT_AVG_INPUT_ROW_BYTES,
        RIGHT_INPUT_RECORD_COUNT,

        // Output.
        OUTPUT_BATCH_COUNT,
        AVG_OUTPUT_BATCH_BYTES,
        AVG_OUTPUT_ROW_BYTES,
        OUTPUT_RECORD_COUNT;

        /** @return this constant's ordinal, used as the metric id */
        @Override
        public int metricId() {
            return ordinal();
        }
    }
}

