/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Before;
import org.junit.Test;

public class TemporalJoinJsonPlanTest
extends TableTestBase {
    private StreamTableTestUtil util;
    private TableEnvironment tEnv;

    @Before
    public void setup() {
        this.util = this.streamTestUtil(TableConfig.getDefault());
        this.tEnv = this.util.getTableEnv();
        this.tEnv.executeSql("CREATE TABLE Orders (\n amount INT,\n currency STRING,\n rowtime TIMESTAMP(3),\n proctime AS PROCTIME(),\n WATERMARK FOR rowtime AS rowtime\n) WITH (\n 'connector' = 'values'\n)");
        this.tEnv.executeSql("CREATE TABLE RatesHistory (\n currency STRING,\n rate INT,\n rowtime TIMESTAMP(3),\n WATERMARK FOR rowtime AS rowtime,\n PRIMARY KEY(currency) NOT ENFORCED\n) WITH (\n 'connector' = 'values'\n)");
        TemporalTableFunction ratesHistory = this.tEnv.from("RatesHistory").createTemporalTableFunction((Expression)Expressions.$((String)"rowtime"), (Expression)Expressions.$((String)"currency"));
        this.tEnv.createTemporarySystemFunction("Rates", (UserDefinedFunction)ratesHistory);
    }

    @Test
    public void testJoinTemporalFunction() {
        String sinkTableDdl = "CREATE TABLE MySink (\n  a int\n) with (\n  'connector' = 'values',\n  'table-sink-class' = 'DEFAULT')";
        this.tEnv.executeSql(sinkTableDdl);
        this.util.verifyJsonPlan("INSERT INTO MySink SELECT amount * r.rate FROM Orders AS o,  LATERAL TABLE (Rates(o.rowtime)) AS r WHERE o.currency = r.currency ");
    }

    @Test
    public void testTemporalTableJoin() {
        String sinkTableDdl = "CREATE TABLE MySink (\n  a int\n) with (\n  'connector' = 'values',\n  'table-sink-class' = 'DEFAULT')";
        this.tEnv.executeSql(sinkTableDdl);
        this.util.verifyJsonPlan("INSERT INTO MySink SELECT amount * r.rate FROM Orders AS o  JOIN RatesHistory  FOR SYSTEM_TIME AS OF o.rowtime AS r ON o.currency = r.currency ");
    }
}

