/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.over;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.AggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.over.ProcTimeRangeBoundedPrecedingFunction;
import org.apache.flink.table.runtime.operators.over.SumAggsHandleFunction;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.Test;

public class ProcTimeRangeBoundedPrecedingFunctionTest {
    private static GeneratedAggsHandleFunction aggsHandleFunction = new GeneratedAggsHandleFunction("Function", "", new Object[0]){

        public AggsHandleFunction newInstance(ClassLoader classLoader) {
            return new SumAggsHandleFunction(1);
        }
    };
    private LogicalType[] inputFieldTypes = new LogicalType[]{VarCharType.STRING_TYPE, new BigIntType()};
    private LogicalType[] accTypes = new LogicalType[]{new BigIntType()};
    private RowDataKeySelector keySelector = HandwrittenSelectorUtil.getRowDataSelector(new int[]{0}, this.inputFieldTypes);
    private TypeInformation<RowData> keyType = this.keySelector.getProducedType();

    @Test
    public void testStateCleanup() throws Exception {
        ProcTimeRangeBoundedPrecedingFunction function = new ProcTimeRangeBoundedPrecedingFunction(aggsHandleFunction, this.accTypes, this.inputFieldTypes, 2000L);
        KeyedProcessOperator operator = new KeyedProcessOperator((KeyedProcessFunction)function);
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createTestHarness((KeyedProcessOperator<RowData, RowData, RowData>)operator);
        testHarness.open();
        AbstractKeyedStateBackend stateBackend = (AbstractKeyedStateBackend)operator.getKeyedStateBackend();
        ((AbstractIntegerAssert)Assertions.assertThat((int)stateBackend.numKeyValueStateEntries()).as("Initial state is not empty", new Object[0])).isEqualTo(0);
        testHarness.setProcessingTime(100L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key", 1L));
        testHarness.processElement(StreamRecordUtils.insertRecord("key", 1L));
        testHarness.setProcessingTime(500L);
        testHarness.processElement(StreamRecordUtils.insertRecord("key", 1L));
        testHarness.setProcessingTime(1000L);
        testHarness.setProcessingTime(4000L);
        ((AbstractIntegerAssert)Assertions.assertThat((int)stateBackend.numKeyValueStateEntries()).as("State has not been cleaned up", new Object[0])).isEqualTo(0);
    }

    private OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(KeyedProcessOperator<RowData, RowData, RowData> operator) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness(operator, (KeySelector)this.keySelector, this.keyType);
    }
}

