/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.aggregate;

import java.io.EOFException;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.binary.BinaryRowDataUtil;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.aggregate.BytesHashMapSpillMemorySegmentPool;
import org.apache.flink.table.runtime.operators.sort.BufferedKVExternalSorter;
import org.apache.flink.table.runtime.operators.sort.IntNormalizedKeyComputer;
import org.apache.flink.table.runtime.operators.sort.IntRecordComparator;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.util.KeyValueIterator;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.table.runtime.util.collections.binary.BytesHashMap;
import org.apache.flink.table.runtime.util.collections.binary.BytesMap;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;

/**
 * Test operator that hash-aggregates its input: rows are grouped by the int at field 0 and the
 * long at field 1 is summed per group (null inputs are skipped; a group with no non-null input
 * yields a null sum).
 *
 * <p>Aggregation buffers live in a {@link BytesHashMap}. When the map cannot take a new key
 * ({@link EOFException} from {@code append}), its content is handed to a
 * {@link BufferedKVExternalSorter}, the map is reset, and aggregation continues. In that case
 * {@link #endInput()} merges the sorted runs instead of reading the map.
 */
public class SumHashAggTestOperator extends AbstractStreamOperator<RowData>
        implements OneInputStreamOperator<RowData, RowData> {

    private final long memorySize;
    private final LogicalType[] keyTypes = new LogicalType[] {new IntType()};
    private final LogicalType[] aggBufferTypes =
            new LogicalType[] {new IntType(), new BigIntType()};

    /** Reused key row; rewritten through {@link #currentKeyWriter} for every input record. */
    private transient BinaryRowData currentKey;

    private transient BinaryRowWriter currentKeyWriter;

    /** Created lazily on the first spill; {@code null} means everything stayed in memory. */
    private transient BufferedKVExternalSorter sorter;

    private transient BytesHashMap aggregateMap;

    /** Template buffer (field 0 = null) appended to the map for every new key. */
    private transient BinaryRowData emptyAggBuffer;

    /**
     * @param memorySize size handed to the {@link BytesHashMap} constructor in {@link #open()}
     * @throws Exception never thrown by this body; kept in the signature for existing callers
     */
    public SumHashAggTestOperator(long memorySize) throws Exception {
        this.memorySize = memorySize;
    }

    /** Allocates the hash map and prepares the reusable key row and the empty buffer template. */
    @Override
    public void open() throws Exception {
        super.open();
        aggregateMap =
                new BytesHashMap(
                        getOwner(), getMemoryManager(), memorySize, keyTypes, aggBufferTypes);

        currentKey = new BinaryRowData(1);
        currentKeyWriter = new BinaryRowWriter(currentKey);

        // NOTE(review): the template has arity 1 while aggBufferTypes declares two types; only
        // field 0 is ever read or written in this class. Confirm the mismatch is intentional.
        emptyAggBuffer = new BinaryRowData(1);
        BinaryRowWriter emptyAggBufferWriter = new BinaryRowWriter(emptyAggBuffer);
        emptyAggBufferWriter.reset();
        emptyAggBufferWriter.setNullAt(0);
        emptyAggBufferWriter.complete();
    }

    /**
     * Folds one input row into the aggregation buffer of its key, creating the buffer (and
     * spilling the map if necessary) when the key is seen for the first time.
     *
     * @throws OutOfMemoryError if the buffer cannot be appended even after the map was spilled
     *     and reset
     */
    @Override
    public void processElement(StreamRecord<RowData> element) throws Exception {
        RowData input = element.getValue();

        // Project the grouping key (field 0) into the reusable binary key row.
        currentKeyWriter.reset();
        if (input.isNullAt(0)) {
            currentKeyWriter.setNullAt(0);
        } else {
            currentKeyWriter.writeInt(0, input.getInt(0));
        }
        currentKeyWriter.complete();

        BytesMap.LookupInfo<BinaryRowData, BinaryRowData> lookupInfo =
                aggregateMap.lookup(currentKey);
        BinaryRowData aggBuffer = lookupInfo.getValue();
        if (!lookupInfo.isFound()) {
            aggBuffer = appendEmptyBuffer(lookupInfo);
        }

        if (!input.isNullAt(1)) {
            long sumInput = input.getLong(1);
            if (aggBuffer.isNullAt(0)) {
                aggBuffer.setLong(0, sumInput);
            } else {
                aggBuffer.setLong(0, sumInput + aggBuffer.getLong(0));
            }
        }
    }

    /**
     * Emits one {@code (key, sum)} row per group. Reads the hash map directly when nothing was
     * spilled; otherwise spills the remaining map content and merges the sorted runs.
     *
     * <p>NOTE(review): this class does not declare an interface providing this method, so it is
     * presumably invoked explicitly by the test driver once input is exhausted - confirm against
     * the caller.
     */
    public void endInput() throws Exception {
        StreamRecord<RowData> outElement = new StreamRecord<>(null);
        JoinedRowData hashAggOutput = new JoinedRowData();
        GenericRowData aggValueOutput = new GenericRowData(1);

        if (sorter == null) {
            emitFromMap(outElement, hashAggOutput, aggValueOutput);
        } else {
            emitFromSorter(outElement, hashAggOutput, aggValueOutput);
        }
    }

    /**
     * Releases the hash map and the sorter. Every step runs even if an earlier one throws, so a
     * failure in {@code super.close()} or in freeing the map cannot leak the sorter.
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            try {
                // Null when open() never ran or failed before creating the map.
                // NOTE(review): on the spill path endInput() has already called free(true);
                // this relies on free() tolerating a second call, as the original code did.
                if (aggregateMap != null) {
                    aggregateMap.free();
                }
            } finally {
                if (sorter != null) {
                    sorter.close();
                }
            }
        }
    }

    /** Returns the containing task, used as the owner passed to the hash map. */
    Object getOwner() {
        return getContainingTask();
    }

    /** Returns this operator's output as a plain collector. */
    Collector<StreamRecord<RowData>> getOutput() {
        return output;
    }

    /** Returns the memory manager of the containing task's environment. */
    MemoryManager getMemoryManager() {
        return getContainingTask().getEnvironment().getMemoryManager();
    }

    /** Returns the job configuration of the containing task. */
    Configuration getConf() {
        return getContainingTask().getJobConfiguration();
    }

    /** Returns the I/O manager of the containing task's environment. */
    public IOManager getIOManager() {
        return getContainingTask().getEnvironment().getIOManager();
    }

    /**
     * Appends the empty buffer template for {@link #currentKey}. If the map is full, spills it to
     * the sorter, resets it and retries once.
     */
    private BinaryRowData appendEmptyBuffer(
            BytesMap.LookupInfo<BinaryRowData, BinaryRowData> lookupInfo) throws Exception {
        try {
            return aggregateMap.append(lookupInfo, emptyAggBuffer);
        } catch (EOFException mapFull) {
            if (sorter == null) {
                sorter =
                        new BufferedKVExternalSorter(
                                getIOManager(),
                                new BinaryRowDataSerializer(keyTypes.length),
                                new BinaryRowDataSerializer(aggBufferTypes.length),
                                new IntNormalizedKeyComputer(),
                                new IntRecordComparator(),
                                getMemoryManager().getPageSize(),
                                getConf());
            }
            spillMapToSorter();
            aggregateMap.reset();

            // The old lookup result points into the map state that was just reset.
            BytesMap.LookupInfo<BinaryRowData, BinaryRowData> retryInfo =
                    aggregateMap.lookup(currentKey);
            try {
                return aggregateMap.append(retryInfo, emptyAggBuffer);
            } catch (EOFException stillFull) {
                // OutOfMemoryError has no cause-taking constructor; attach the cause explicitly.
                OutOfMemoryError oom = new OutOfMemoryError("BytesHashMap Out of Memory.");
                oom.initCause(stillFull);
                throw oom;
            }
        }
    }

    /** Sorts the current map content and spills it through {@link #sorter}. */
    private void spillMapToSorter() throws Exception {
        sorter.sortAndSpill(
                aggregateMap.getRecordAreaMemorySegments(),
                aggregateMap.getNumElements(),
                new BytesHashMapSpillMemorySegmentPool(
                        aggregateMap.getBucketAreaMemorySegments()));
    }

    /** Emits every map entry as-is; used when no spill happened. */
    private void emitFromMap(
            StreamRecord<RowData> outElement,
            JoinedRowData hashAggOutput,
            GenericRowData aggValueOutput)
            throws Exception {
        KeyValueIterator<BinaryRowData, BinaryRowData> entries =
                aggregateMap.getEntryIterator(false);
        while (entries.advanceNext()) {
            BinaryRowData buffer = entries.getValue();
            Long sum = buffer.isNullAt(0) ? null : Long.valueOf(buffer.getLong(0));
            emit(outElement, hashAggOutput, aggValueOutput, entries.getKey(), sum);
        }
    }

    /**
     * Spills what is left in the map, then walks the sorter's key/value iterator and re-sums the
     * partial buffers of consecutive equal keys.
     */
    private void emitFromSorter(
            StreamRecord<RowData> outElement,
            JoinedRowData hashAggOutput,
            GenericRowData aggValueOutput)
            throws Exception {
        spillMapToSorter();
        aggregateMap.free(true);

        BinaryRowData lastKey = null;
        boolean sumIsNull = true;
        long sum = 0L;

        MutableObjectIterator<Tuple2<BinaryRowData, BinaryRowData>> sorted =
                sorter.getKVIterator();
        Tuple2<BinaryRowData, BinaryRowData> kv;
        while ((kv = sorted.next()) != null) {
            BinaryRowData key = kv.f0;
            BinaryRowData partial = kv.f1;

            if (lastKey == null) {
                // Copy because the iterator's key object is not retained by this method.
                lastKey = key.copy();
            } else if (!key.equals(lastKey)) {
                // Key changed: the previous group is complete. equals() compares size and
                // bytes while honouring each row's offset.
                emit(
                        outElement,
                        hashAggOutput,
                        aggValueOutput,
                        lastKey,
                        sumIsNull ? null : Long.valueOf(sum));
                lastKey = key.copy();
                sumIsNull = true;
                sum = 0L;
            }

            if (!partial.isNullAt(0)) {
                long sumInput = partial.getLong(0);
                sum = sumIsNull ? sumInput : sum + sumInput;
                sumIsNull = false;
            }
        }

        // Flush the last group; skipped if the sorter produced nothing at all.
        if (lastKey != null) {
            emit(
                    outElement,
                    hashAggOutput,
                    aggValueOutput,
                    lastKey,
                    sumIsNull ? null : Long.valueOf(sum));
        }
    }

    /** Writes {@code sum} into the reused value row, joins it with {@code key} and collects it. */
    private void emit(
            StreamRecord<RowData> outElement,
            JoinedRowData hashAggOutput,
            GenericRowData aggValueOutput,
            RowData key,
            Long sum) {
        aggValueOutput.setField(0, sum);
        hashAggOutput.replace(key, aggValueOutput);
        getOutput().collect(outElement.<RowData>replace(hashAggOutput));
    }
}

