/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.aggregate;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.aggregate.SumHashAggTestOperator;
import org.apache.flink.util.Collector;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests {@link SumHashAggTestOperator} by feeding it (int key, long value) rows and
 * checking the (key, aggregated value) pairs it emits.
 *
 * <p>Each test gets a fresh {@link IOManager} and operator from {@link #before()}; the
 * {@link MemoryManager} is created with the test instance. {@link #afterTest()} verifies
 * that the memory manager reports itself empty and then releases both managers.
 */
public class HashAggTest {
    /** Size handed to the memory manager builder (32 * 1024 * 1024). */
    private static final int MEMORY_SIZE = 32 * 1024 * 1024;

    /**
     * Value passed to the operator's constructor (40 * 32 * 1024 == 0x140000).
     * NOTE(review): presumably the operator's own memory budget, deliberately much smaller
     * than {@link #MEMORY_SIZE} - confirm against SumHashAggTestOperator.
     */
    private static final long OPERATOR_MEMORY_SIZE = 40L * 32 * 1024;

    /** Collects operator output: key column (nullable) -> aggregated value column (nullable). */
    private final Map<Integer, Long> outputMap = new HashMap<>();

    private MemoryManager memoryManager =
            MemoryManagerBuilder.newBuilder().setMemorySize(MEMORY_SIZE).build();
    private IOManager ioManager;
    private SumHashAggTestOperator operator;

    /**
     * Creates the I/O manager and an operator whose environment hooks are wired to this
     * test's managers and whose output is captured in {@link #outputMap}, then opens it.
     *
     * @throws Exception if the operator fails to open
     */
    @Before
    public void before() throws Exception {
        this.ioManager = new IOManagerAsync();
        // Outer fields are accessed via HashAggTest.this so that a same-named member of
        // the operator superclass can never shadow them.
        this.operator = new SumHashAggTestOperator(OPERATOR_MEMORY_SIZE) {

            @Override
            Object getOwner() {
                return HashAggTest.this;
            }

            @Override
            MemoryManager getMemoryManager() {
                return HashAggTest.this.memoryManager;
            }

            @Override
            Collector<StreamRecord<RowData>> getOutput() {
                return new Collector<StreamRecord<RowData>>() {

                    @Override
                    public void collect(StreamRecord<RowData> record) {
                        RowData row = record.getValue();
                        // Both columns may be null; map them to null entries rather than
                        // reading a primitive from a null field.
                        Integer key = row.isNullAt(0) ? null : Integer.valueOf(row.getInt(0));
                        Long value = row.isNullAt(1) ? null : Long.valueOf(row.getLong(1));
                        HashAggTest.this.outputMap.put(key, value);
                    }

                    @Override
                    public void close() {
                        // Nothing to release: results live in outputMap.
                    }
                };
            }

            @Override
            Configuration getConf() {
                return new Configuration();
            }

            @Override
            public IOManager getIOManager() {
                return HashAggTest.this.ioManager;
            }
        };
        this.operator.open();
    }

    /**
     * Closes the I/O manager, asserts that the memory manager reports itself empty, and
     * shuts the memory manager down.
     *
     * <p>The shutdown runs in a {@code finally} block so the memory manager is released
     * even when closing the I/O manager throws or the leak assertion fails.
     *
     * @throws Exception if closing the I/O manager fails
     */
    @After
    public void afterTest() throws Exception {
        try {
            // ioManager is null if before() failed before assigning it.
            if (this.ioManager != null) {
                this.ioManager.close();
            }
        } finally {
            if (this.memoryManager != null) {
                try {
                    Assertions.assertThat(this.memoryManager.verifyEmpty())
                            .as("Memory leak: not all segments have been returned to the memory manager.")
                            .isTrue();
                } finally {
                    this.memoryManager.shutdown();
                    this.memoryManager = null;
                }
            }
        }
    }

    /**
     * Feeds a single row to the operator.
     *
     * @param row the input row to process
     * @throws Exception if the operator fails to process the row
     */
    private void addRow(RowData row) throws Exception {
        this.operator.processElement(new StreamRecord<RowData>(row));
    }

    /**
     * Feeds a two-column (key, value) row to the operator.
     *
     * @param key the key column, or {@code null} for a null key
     * @param value the value column, or {@code null} for a null value
     * @throws Exception if the operator fails to process the row
     */
    private void addRow(Integer key, Long value) throws Exception {
        addRow(GenericRowData.of(key, value));
    }

    /**
     * Feeds a small set of rows, including null keys and null values, and checks the
     * exact per-key results emitted by the operator.
     */
    @Test
    public void testNormal() throws Exception {
        addRow(1, 1L);
        addRow(5, 2L);
        addRow(2, 3L);
        addRow(2, null);
        addRow(1, 4L);
        addRow(4, 5L);
        addRow(1, 6L);
        addRow(1, null);
        addRow(2, 8L);
        addRow(5, 9L);
        addRow(10, null);
        addRow(null, 5L);
        this.operator.endInput();
        this.operator.close();

        Map<Integer, Long> expected = new HashMap<>();
        expected.put(null, 5L);
        expected.put(1, 11L); // 1 + 4 + 6; the null value does not change the sum
        expected.put(2, 11L); // 3 + 8
        expected.put(4, 5L);
        expected.put(5, 11L); // 2 + 9
        expected.put(10, null); // only null values were seen for this key
        Assertions.assertThat(this.outputMap).isEqualTo(expected);
    }

    /**
     * Feeds 60,000 rows plus two rows with nulls and checks the number of distinct keys
     * emitted.
     *
     * <p>NOTE(review): per the test name this volume is presumably meant to make the
     * operator spill - confirm against SumHashAggTestOperator.
     */
    @Test
    public void testSpill() throws Exception {
        for (int i = 0; i < 30000; ++i) {
            addRow(i, (long) i);
            addRow(i + 1, (long) i);
        }
        addRow(1, null);
        addRow(null, 5L);
        this.operator.endInput();
        this.operator.close();
        // Keys 0..30000 inclusive (30001 keys) plus the null key.
        Assertions.assertThat(this.outputMap).hasSize(30002);
    }
}

