/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Before;
import org.junit.Test;

/**
 * JSON plan tests for group-window aggregations (TUMBLE, HOP and SESSION windows, on both the
 * {@code rowtime} and {@code proctime} attributes of {@code MyTable}) that call the aggregate
 * function registered under the name {@code pyFunc}.
 *
 * <p>Each test creates a {@code MySink} table and passes an {@code INSERT INTO MySink ...}
 * statement to {@link StreamTableTestUtil#verifyJsonPlan}.
 */
public class PythonGroupWindowAggregateJsonPlanTest extends TableTestBase {

    /** DDL of the source table shared by every test, including its watermark definition. */
    private static final String SOURCE_TABLE_DDL =
            "CREATE TABLE MyTable (\n"
                    + " a INT NOT NULL,\n"
                    + " b BIGINT,\n"
                    + " c VARCHAR,\n"
                    + " `rowtime` AS TO_TIMESTAMP(c),\n"
                    + " proctime as PROCTIME(),\n"
                    + " WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n"
                    + ") WITH (\n"
                    + " 'connector' = 'values')\n";

    /** DDL of the two-column sink used by the HOP and SESSION window tests. */
    private static final String KEY_AND_RESULT_SINK_DDL =
            "CREATE TABLE MySink (\n"
                    + " b BIGINT,\n"
                    + " c BIGINT\n"
                    + ") WITH (\n"
                    + " 'connector' = 'values')\n";

    private StreamTableTestUtil util;
    private TableEnvironment tEnv;

    /** Creates the test utility, the source table and the {@code pyFunc} system function. */
    @Before
    public void setup() {
        util = streamTestUtil(TableConfig.getDefault());
        tEnv = util.getTableEnv();
        tEnv.executeSql(SOURCE_TABLE_DDL);

        UserDefinedFunction pythonAggregate =
                new JavaUserDefinedAggFunctions.TestPythonAggregateFunction();
        tEnv.createTemporarySystemFunction("pyFunc", pythonAggregate);
    }

    /** Tumbling window of 5 seconds on {@code rowtime}, emitting window start and end. */
    @Test
    public void testEventTimeTumbleWindow() {
        createSinkAndVerifyPlan(
                "CREATE TABLE MySink (\n"
                        + " b BIGINT,\n"
                        + " window_start TIMESTAMP(3),\n"
                        + " window_end TIMESTAMP(3),\n"
                        + " c BIGINT\n"
                        + ") WITH (\n"
                        + " 'connector' = 'values')\n",
                "insert into MySink select\n"
                        + "  b,\n"
                        + "  TUMBLE_START(rowtime, INTERVAL '5' SECOND) as window_start,\n"
                        + "  TUMBLE_END(rowtime, INTERVAL '5' SECOND) as window_end,\n"
                        + "  pyFunc(a, a + 1)\n"
                        + "FROM MyTable\n"
                        + "GROUP BY b, TUMBLE(rowtime, INTERVAL '5' SECOND)");
    }

    /** Tumbling window of 15 minutes on {@code proctime}, emitting the window end. */
    @Test
    public void testProcTimeTumbleWindow() {
        createSinkAndVerifyPlan(
                "CREATE TABLE MySink (\n"
                        + " b BIGINT,\n"
                        + " window_end TIMESTAMP(3),\n"
                        + " c BIGINT\n"
                        + ") WITH (\n"
                        + " 'connector' = 'values')\n",
                "insert into MySink select\n"
                        + "  b,\n"
                        + "  TUMBLE_END(proctime, INTERVAL '15' MINUTE) as window_end,\n"
                        + "  pyFunc(a, a + 1)\n"
                        + "FROM MyTable\n"
                        + "GROUP BY b, TUMBLE(proctime, INTERVAL '15' MINUTE)");
    }

    /** HOP window on {@code rowtime} with 5-second and 10-second intervals. */
    @Test
    public void testEventTimeHopWindow() {
        createSinkAndVerifyPlan(
                KEY_AND_RESULT_SINK_DDL,
                "insert into MySink select\n"
                        + "  b,\n"
                        + "  pyFunc(a, a + 1)\n"
                        + "FROM MyTable\n"
                        + "GROUP BY b, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '10' SECOND)");
    }

    /** HOP window on {@code proctime} with 5-minute and 10-minute intervals. */
    @Test
    public void testProcTimeHopWindow() {
        createSinkAndVerifyPlan(
                KEY_AND_RESULT_SINK_DDL,
                "insert into MySink select\n"
                        + "  b,\n"
                        + "  pyFunc(a, a + 1)\n"
                        + "FROM MyTable\n"
                        + "GROUP BY b, HOP(proctime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)");
    }

    /** Session window on {@code rowtime} with a 10-second interval. */
    @Test
    public void testEventTimeSessionWindow() {
        createSinkAndVerifyPlan(
                KEY_AND_RESULT_SINK_DDL,
                "insert into MySink select\n"
                        + "  b,\n"
                        + "  pyFunc(a, a + 1)\n"
                        + "FROM MyTable\n"
                        + "GROUP BY b, Session(rowtime, INTERVAL '10' SECOND)");
    }

    /** Session window on {@code proctime} with a 10-minute interval. */
    @Test
    public void testProcTimeSessionWindow() {
        createSinkAndVerifyPlan(
                KEY_AND_RESULT_SINK_DDL,
                "insert into MySink select\n"
                        + "  b,\n"
                        + "  pyFunc(a, a + 1)\n"
                        + "FROM MyTable\n"
                        + "GROUP BY b, Session(proctime, INTERVAL '10' MINUTE)");
    }

    /**
     * Executes the sink DDL on the table environment and then passes the insert statement to
     * {@code verifyJsonPlan} on the test utility.
     *
     * @param sinkTableDdl {@code CREATE TABLE} statement for the sink
     * @param insertStatement {@code INSERT INTO} statement handed to {@code verifyJsonPlan}
     */
    private void createSinkAndVerifyPlan(String sinkTableDdl, String insertStatement) {
        tEnv.executeSql(sinkTableDdl);
        util.verifyJsonPlan(insertStatement);
    }
}

