/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.batch.sql.join;

import java.io.Serializable;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.SerializedLambda;
import java.util.Collection;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.SqlParserException;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.plan.batch.sql.join.LookupJoinTest$;
import org.apache.flink.table.planner.plan.optimize.program.BatchOptimizeContext;
import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram$;
import org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram;
import org.apache.flink.table.planner.plan.stream.sql.join.TestTemporalTable$;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions;
import org.apache.flink.table.planner.utils.BatchTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Function1;
import scala.Predef$;
import scala.Symbol;
import scala.Tuple3;
import scala.Tuple4;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.LambdaDeserialize;
import scala.runtime.RichInt$;
import scala.runtime.SymbolLiteral;
import scala.runtime.java8.JFunction1;

@RunWith(value=Parameterized.class)
@ScalaSignature(bytes="\u0006\u0001\u00055f\u0001B\u0001\u0003\u0001U\u0011a\u0002T8pWV\u0004(j\\5o)\u0016\u001cHO\u0003\u0002\u0004\t\u0005!!n\\5o\u0015\t)a!A\u0002tc2T!a\u0002\u0005\u0002\u000b\t\fGo\u00195\u000b\u0005%Q\u0011\u0001\u00029mC:T!a\u0003\u0007\u0002\u000fAd\u0017M\u001c8fe*\u0011QBD\u0001\u0006i\u0006\u0014G.\u001a\u0006\u0003\u001fA\tQA\u001a7j].T!!\u0005\n\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\u0019\u0012aA8sO\u000e\u00011C\u0001\u0001\u0017!\t9\"$D\u0001\u0019\u0015\tI\"\"A\u0003vi&d7/\u0003\u0002\u001c1\tiA+\u00192mKR+7\u000f\u001e\"bg\u0016D\u0001\"\b\u0001\u0003\u0002\u0003\u0006IAH\u0001\u0012Y\u0016<\u0017mY=UC\ndWmU8ve\u000e,\u0007CA\u0010#\u001b\u0005\u0001#\"A\u0011\u0002\u000bM\u001c\u0017\r\\1\n\u0005\r\u0002#a\u0002\"p_2,\u0017M\u001c\u0005\u0006K\u0001!\tAJ\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0005\u001dJ\u0003C\u0001\u0015\u0001\u001b\u0005\u0011\u0001\"B\u000f%\u0001\u0004q\u0002bB\u0016\u0001\u0005\u0004%I\u0001L\u0001\ti\u0016\u001cH/\u0016;jYV\tQ\u0006\u0005\u0002\u0018]%\u0011q\u0006\u0007\u0002\u0013\u0005\u0006$8\r\u001b+bE2,G+Z:u+RLG\u000e\u0003\u00042\u0001\u0001\u0006I!L\u0001\ni\u0016\u001cH/\u0016;jY\u0002BQa\r\u0001\u0005\u0002Q\naAY3g_J,G#A\u001b\u0011\u0005}1\u0014BA\u001c!\u0005\u0011)f.\u001b;)\u0005IJ\u0004C\u0001\u001e>\u001b\u0005Y$B\u0001\u001f\u0013\u0003\u0015QWO\\5u\u0013\tq4H\u0001\u0004CK\u001a|'/\u001a\u0005\u0006\u0001\u0002!\t\u0001N\u0001!i\u0016\u001cHOS8j]&sg/\u00197jI*{\u0017N\u001c+f[B|'/\u00197UC\ndW\r\u000b\u0002@\u0005B\u0011!hQ\u0005\u0003\tn\u0012A\u0001V3ti\")a\t\u0001C\u0001i\u0005\u0011C/Z:u\u001d>$H)[:uS:\u001cGO\u0012:p[&s'j\\5o\u0007>tG-\u001b;j_:D#!\u0012\"\t\u000b%\u0003A\u0011\u0001\u001b\u00029Q,7\u000f\u001e)zi\"|g.\u0016#G\u0013:Tu.\u001b8D_:$\u0017\u000e^5p]\"\u0012\u0001J\u0011\u0005\u0006\u0019\u0002!\t\u0001N\u0001\u0010i\u0016\u001cH\u000fT8hS\u000e\fG\u000e\u00157b]\"\u00121J\u0011\u0005\u0006\u001f\u0002!\t\u0001N\u0001$i\u0016\u001cH\u000fT8hS\u000e\fG\u000e\u00157b]^KG\u000f[%na2L7-\u001b;UsB,7)Y:uQ\tq%\tC\u0003S\u0001\u0011\u0005A'A\u000buKN$(j\\5o)\u0016l\u0007o\u001c:bYR\u000b'\r\\3)\u0005E\u0013\u0005\"B+\u0001\t\u0003!\u0014!\u0007;fgRdUM\u001a;K_&tG+Z7q_J\fG\u000eV1cY\u0016D#\u0001\u0016\"\t\u000ba\u0003A\u0011\u0001\u001b\u0002IQ,7\u000f\u001e&pS:$V-\u001c9pe\u0006dG+\u00192mK^KG\u000f\u001b(fgR,G-U;fefD#a\u0016\"\t\u000bm\u0003A\u0011\u0001\u001b\u0002WQ,7\u000f\u001e&pS:$V-\u001c9pe\u0006dG+\u00192mK^KG\u000f\u001b)s_*,7\r^5p]B+8\u000f\u001b#po:D#A\u0017\"\t\u000by\u0003A\u0011\u0001\u001b\u0002OQ,7\u000f\u001e&pS:$V-\u001c9pe\u0006dG+\u00192mK^KG\u000f\u001b$jYR,'\u000fU;tQ\u0012{wO\u001c\u0015\u0003;\nCQ!\u0019\u0001\u0005\u0002Q\n!\u0004^3ti\u00063x.\u001b3BO\u001e\u0014XmZ1uKB+8\u000f\u001b#po:D#\u0001\u0019\"\t\u000b\u0011\u0004A\u0011\u0001\u001b\u0002MQ,7\u000f\u001e&pS:$V-\u001c9pe\u0006dG+\u00192mK^KG\u000f\u001b+sk\u0016\u001cuN\u001c3ji&|g\u000e\u000b\u0002d\u0005\")q\r\u0001C\u0001i\u00059C/Z:u\u0015>Lg\u000eV3na>\u0014\u0018\r\u001c+bE2,w+\u001b;i\u0007>l\u0007/\u001e;fI\u000e{G.^7oQ\t1'\tC\u0003k\u0001\u0011\u0005A'\u0001\u001auKN$(j\\5o)\u0016l\u0007o\u001c:bYR\u000b'\r\\3XSRD7i\\7qkR,GmQ8mk6t\u0017I\u001c3QkNDGi\\<oQ\tI'\tC\u0003n\u0001\u0011\u0005A'A\u0006uKN$(+Z;tS:<\u0007F\u00017C\u0011\u0015\u0001\b\u0001\"\u0003r\u0003U)\u0007\u0010]3di\u0016C8-\u001a9uS>tG\u000b\u001b:po:$R!\u000e:\u007f\u0003\u0003AQ!B8A\u0002M\u0004\"\u0001^>\u000f\u0005UL\bC\u0001<!\u001b\u00059(B\u0001=\u0015\u0003\u0019a$o\\8u}%\u0011!\u0010I\u0001\u0007!J,G-\u001a4\n\u0005ql(AB*ue&twM\u0003\u0002{A!)qp\u001ca\u0001g\u0006A1.Z=x_J$7\u000fC\u0005\u0002\u0004=\u0004\n\u00111\u0001\u0002\u0006\u0005)1\r\\1{uB\"\u0011qAA\t!\u0015!\u0018\u0011BA\u0007\u0013\r\tY! \u0002\u0006\u00072\f7o\u001d\t\u0005\u0003\u001f\t\t\u0002\u0004\u0001\u0005\u0019\u0005M\u0011\u0011AA\u0001\u0002\u0003\u0015\t!!\u0006\u0003\u0007}#\u0013'\u0005\u0003\u0002\u0018\u0005u\u0001cA\u0010\u0002\u001a%\u0019\u00111\u0004\u0011\u0003\u000f9{G\u000f[5oOB!\u0011qDA\u0015\u001d\u0011\t\t#!\n\u000f\u0007Y\f\u0019#C\u0001\"\u0013\r\t9\u0003I\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\tY#!\f\u0003\u0013QC'o\\<bE2,'bAA\u0014A!I\u0011\u0011\u0007\u0001\u0012\u0002\u0013%\u00111G\u0001 Kb\u0004Xm\u0019;Fq\u000e,\u0007\u000f^5p]RC'o\\<oI\u0011,g-Y;mi\u0012\u001aTCAA\u001ba\u0011\t9$a\u000f\u0011\u000bQ\fI!!\u000f\u0011\t\u0005=\u00111\b\u0003\r\u0003'\ty#!A\u0001\u0002\u000b\u0005\u0011Q\u0003\u0015\b\u0001\u0005}\u00121JA'!\u0011\t\t%a\u0012\u000e\u0005\u0005\r#bAA#w\u00051!/\u001e8oKJLA!!\u0013\u0002D\t9!+\u001e8XSRD\u0017!\u0002<bYV,7EAA(!\u0011\t\t&a\u0016\u000e\u0005\u0005M#bAA+w\u00059!/\u001e8oKJ\u001c\u0018\u0002BA-\u0003'\u0012Q\u0002U1sC6,G/\u001a:ju\u0016$waBA/\u0005!\u0005\u0011qL\u0001\u000f\u0019>|7.\u001e9K_&tG+Z:u!\rA\u0013\u0011\r\u0004\u0007\u0003\tA\t!a\u0019\u0014\t\u0005\u0005\u0014Q\r\t\u0004?\u0005\u001d\u0014bAA5A\t1\u0011I\\=SK\u001aDq!JA1\t\u0003\ti\u0007\u0006\u0002\u0002`!A\u0011\u0011OA1\t\u0003\t\u0019(\u0001\u0006qCJ\fW.\u001a;feN$\"!!\u001e\u0011\r\u0005]\u0014\u0011QAC\u001b\t\tIH\u0003\u0003\u0002|\u0005u\u0014\u0001B;uS2T!!a \u0002\t)\fg/Y\u0005\u0005\u0003\u0007\u000bIH\u0001\u0006D_2dWm\u0019;j_:\u0004RaHAD\u0003\u0017K1!!#!\u0005\u0015\t%O]1z!\u0011\ti)a%\u000e\u0005\u0005=%\u0002BAI\u0003{\nA\u0001\\1oO&!\u0011QSAH\u0005\u0019y%M[3di\"B\u0011qNAM\u0003O\u000bI\u000b\u0005\u0003\u0002\u001c\u0006\u0005f\u0002BA)\u0003;KA!a(\u0002T\u0005i\u0001+\u0019:b[\u0016$XM]5{K\u0012LA!a)\u0002&\nQ\u0001+\u0019:b[\u0016$XM]:\u000b\t\u0005}\u00151K\u0001\u0005]\u0006lW-\t\u0002\u0002,\u0006)B*Z4bGf$\u0016M\u00197f'>,(oY3>wBj\b")
public class LookupJoinTest
extends TableTestBase {
    private final boolean legacyTableSource;
    private final BatchTableTestUtil testUtil;

    @Parameterized.Parameters(name="LegacyTableSource={0}")
    public static Collection<Object[]> parameters() {
        return LookupJoinTest$.MODULE$.parameters();
    }

    private BatchTableTestUtil testUtil() {
        return this.testUtil;
    }

    @Before
    public void before() {
        this.testUtil().addDataStream("T0", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c"))}), new CaseClassTypeInfo<Tuple3<Object, String, Object>>(null){

            public /* synthetic */ TypeInformation[] protected$types($anon$4 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple3<Object, String, Object>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$1[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                ScalaCaseClassSerializer<Tuple3<Object, String, Object>> unused = new ScalaCaseClassSerializer<Tuple3<Object, String, Object>>(this, fieldSerializers){

                    public Tuple3<Object, String, Object> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)((String)fields[1]), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[2])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$1(org.apache.flink.table.planner.plan.batch.sql.join.LookupJoinTest$$anon$4 org.apache.flink.api.common.ExecutionConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        this.testUtil().addDataStream("T1", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "a")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "b")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "c")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "d"))}), new CaseClassTypeInfo<Tuple4<Object, String, Object, Object>>(null){

            public /* synthetic */ TypeInformation[] protected$types($anon$5 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple4<Object, String, Object, Object>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$2[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                ScalaCaseClassSerializer<Tuple4<Object, String, Object, Object>> unused = new ScalaCaseClassSerializer<Tuple4<Object, String, Object, Object>>(this, fieldSerializers){

                    public Tuple4<Object, String, Object, Object> createInstance(Object[] fields) {
                        return new Tuple4((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)((String)fields[1]), (Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[2])), (Object)BoxesRunTime.boxToDouble((double)BoxesRunTime.unboxToDouble((Object)fields[3])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$2(org.apache.flink.table.planner.plan.batch.sql.join.LookupJoinTest$$anon$5 org.apache.flink.api.common.ExecutionConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        this.testUtil().addDataStream("nonTemporal", (Seq<Expression>)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "id")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "name")), package$.MODULE$.symbol2FieldExpression((Symbol)SymbolLiteral.bootstrap("apply", "age"))}), new CaseClassTypeInfo<Tuple3<Object, String, Object>>(null){

            public /* synthetic */ TypeInformation[] protected$types($anon$6 x$1) {
                return x$1.types;
            }

            public TypeSerializer<Tuple3<Object, String, Object>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
                    fieldSerializers$3[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                ScalaCaseClassSerializer<Tuple3<Object, String, Object>> unused = new ScalaCaseClassSerializer<Tuple3<Object, String, Object>>(this, fieldSerializers){

                    public Tuple3<Object, String, Object> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[0])), (Object)((String)fields[1]), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[2])));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }

            private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
                return LambdaDeserialize.bootstrap("lambdaDeserialize", new MethodHandle[]{$anonfun$createSerializer$3(org.apache.flink.table.planner.plan.batch.sql.join.LookupJoinTest$$anon$6 org.apache.flink.api.common.ExecutionConfig org.apache.flink.api.common.typeutils.TypeSerializer[] int )}, serializedLambda);
            }
        });
        Table myTable = this.testUtil().tableEnv().sqlQuery("SELECT *, PROCTIME() as proctime FROM T0");
        this.testUtil().tableEnv().createTemporaryView("MyTable", myTable);
        if (this.legacyTableSource) {
            TestTemporalTable$.MODULE$.createTemporaryTable(this.testUtil().tableEnv(), "LookupTable", true, TestTemporalTable$.MODULE$.createTemporaryTable$default$4());
        } else {
            this.testUtil().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                          |CREATE TABLE LookupTable (\n                          |  `id` INT,\n                          |  `name` STRING,\n                          |  `age` INT\n                          |) WITH (\n                          |  'connector' = 'values',\n                          |  'bounded' = 'true'\n                          |)\n                          |")).stripMargin());
            this.testUtil().addTable(new StringOps(Predef$.MODULE$.augmentString("\n                          |CREATE TABLE LookupTableWithComputedColumn (\n                          |  `id` INT,\n                          |  `name` STRING,\n                          |  `age` INT,\n                          |  `nominal_age` as age + 1\n                          |) WITH (\n                          |  'connector' = 'values',\n                          |  'bounded' = 'true'\n                          |)\n                          |")).stripMargin());
        }
    }

    @Test
    public void testJoinInvalidJoinTemporalTable() {
        this.expectExceptionThrown("SELECT * FROM MyTable AS T JOIN LookupTable T.proc AS D ON T.a = D.id", "SQL parse failed", SqlParserException.class);
        this.expectExceptionThrown("SELECT * FROM LookupTable FOR SYSTEM_TIME AS OF TIMESTAMP '2017-08-09 14:36:11'", "Temporal table can only be used in temporal join and only supports 'FOR SYSTEM_TIME AS OF' left table's time attribute field.\nQuerying a temporal table using 'FOR SYSTEM TIME AS OF' syntax with a constant timestamp '2017-08-09 14:36:11' is not supported yet", AssertionError.class);
        this.expectExceptionThrown("SELECT * FROM MyTable AS T RIGHT JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id", null, AssertionError.class);
        this.expectExceptionThrown("SELECT * FROM MyTable AS T LEFT JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a + 1 = D.id + 1", "Temporal table join requires an equality condition on fields of table [default_catalog.default_database.LookupTable].", TableException.class);
    }

    @Test
    public void testNotDistinctFromInJoinCondition() {
        this.expectExceptionThrown("SELECT * FROM MyTable AS T LEFT JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a IS NOT  DISTINCT FROM D.id", "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or alternative '(a = b) or (a IS NULL AND b IS NULL)')", TableException.class);
        this.expectExceptionThrown("SELECT * FROM MyTable AS T LEFT JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id OR (T.a IS NULL AND D.id IS NULL)", "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or alternative '(a = b) or (a IS NULL AND b IS NULL)')", TableException.class);
    }

    @Test
    public void testPythonUDFInJoinCondition() {
        this.thrown().expect(TableException.class);
        this.thrown().expectMessage("Only inner join condition with equality predicates supports the Python UDF taking the inputs from the left table and the right table at the same time, e.g., ON T1.id = T2.id && pythonUdf(T1.a, T2.b)");
        this.testUtil().addFunction("pyFunc", new JavaUserDefinedScalarFunctions.PythonScalarFunction("pyFunc"));
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM MyTable AS T\n        |LEFT OUTER JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n        |ON T.a = D.id AND D.age = 10 AND pyFunc(D.age, T.a) > 100\n      ")).stripMargin();
        this.testUtil().verifyExecPlan(sql);
    }

    @Test
    public void testLogicalPlan() {
        String sql1 = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT b, a, sum(c) c, sum(d) d, PROCTIME() as proctime\n        |FROM T1\n        |GROUP BY a, b\n      ")).stripMargin();
        String sql2 = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(157).append("\n         |SELECT T.* FROM (").append(sql1).append(") AS T\n         |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n         |ON T.a = D.id\n         |WHERE D.age > 10\n      ").toString())).stripMargin();
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(96).append("\n         |SELECT b, count(a), sum(c), sum(d)\n         |FROM (").append(sql2).append(") AS T\n         |GROUP BY b\n      ").toString())).stripMargin();
        FlinkChainedProgram programs = FlinkBatchProgram$.MODULE$.buildProgram((ReadableConfig)this.testUtil().tableEnv().getConfig());
        programs.remove(FlinkBatchProgram$.MODULE$.PHYSICAL());
        this.testUtil().replaceBatchProgram((FlinkChainedProgram<BatchOptimizeContext>)programs);
        this.testUtil().verifyRelPlan(sql);
    }

    @Test
    public void testLogicalPlanWithImplicitTypeCast() {
        FlinkChainedProgram programs = FlinkBatchProgram$.MODULE$.buildProgram((ReadableConfig)this.testUtil().tableEnv().getConfig());
        programs.remove(FlinkBatchProgram$.MODULE$.PHYSICAL());
        this.testUtil().replaceBatchProgram((FlinkChainedProgram<BatchOptimizeContext>)programs);
        this.thrown().expect(TableException.class);
        this.thrown().expectMessage("implicit type conversion between VARCHAR(2147483647) and INTEGER is not supported on join's condition now");
        this.testUtil().verifyRelPlan("SELECT * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.b = D.id");
    }

    @Test
    public void testJoinTemporalTable() {
        String sql = "SELECT * FROM MyTable AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id";
        this.testUtil().verifyExecPlan(sql);
    }

    @Test
    public void testLeftJoinTemporalTable() {
        String sql = "SELECT * FROM MyTable AS T LEFT JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id";
        this.testUtil().verifyExecPlan(sql);
    }

    @Test
    public void testJoinTemporalTableWithNestedQuery() {
        String sql = "SELECT * FROM (SELECT a, b, proctime FROM MyTable WHERE c > 1000) AS T JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id";
        this.testUtil().verifyExecPlan(sql);
    }

    @Test
    public void testJoinTemporalTableWithProjectionPushDown() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT T.*, D.id\n        |FROM MyTable AS T\n        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n        |ON T.a = D.id\n      ")).stripMargin();
        this.testUtil().verifyExecPlan(sql);
    }

    @Test
    public void testJoinTemporalTableWithFilterPushDown() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM MyTable AS T\n        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n        |ON T.a = D.id AND D.age = 10\n        |WHERE T.c > 1000\n      ")).stripMargin();
        this.testUtil().verifyExecPlan(sql);
    }

    @Test
    public void testAvoidAggregatePushDown() {
        String sql1 = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT b, a, sum(c) c, sum(d) d, PROCTIME() as proctime\n        |FROM T1\n        |GROUP BY a, b\n      ")).stripMargin();
        String sql2 = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(157).append("\n         |SELECT T.* FROM (").append(sql1).append(") AS T\n         |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n         |ON T.a = D.id\n         |WHERE D.age > 10\n      ").toString())).stripMargin();
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(96).append("\n         |SELECT b, count(a), sum(c), sum(d)\n         |FROM (").append(sql2).append(") AS T\n         |GROUP BY b\n      ").toString())).stripMargin();
        this.testUtil().verifyExecPlan(sql);
    }

    @Test
    public void testJoinTemporalTableWithTrueCondition() {
        this.thrown().expect(TableException.class);
        this.thrown().expectMessage("Temporal table join requires an equality condition on fields of table [default_catalog.default_database.LookupTable]");
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM MyTable AS T\n        |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n        |ON true\n        |WHERE T.c > 1000\n      ")).stripMargin();
        this.testUtil().verifyExplain(sql);
    }

    @Test
    public void testJoinTemporalTableWithComputedColumn() {
        Assume.assumeFalse((boolean)this.legacyTableSource);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  T.a, T.b, T.c, D.name, D.age, D.nominal_age\n        |FROM\n        |  MyTable AS T JOIN LookupTableWithComputedColumn FOR SYSTEM_TIME AS OF T.proctime AS D\n        |  ON T.a = D.id\n      ")).stripMargin();
        this.testUtil().verifyExecPlan(sql);
    }

    @Test
    public void testJoinTemporalTableWithComputedColumnAndPushDown() {
        Assume.assumeFalse((boolean)this.legacyTableSource);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  T.a, T.b, T.c, D.name, D.age, D.nominal_age\n        |FROM\n        |  MyTable AS T JOIN LookupTableWithComputedColumn FOR SYSTEM_TIME AS OF T.proctime AS D\n        |  ON T.a = D.id and D.nominal_age > 12\n      ")).stripMargin();
        this.testUtil().verifyExecPlan(sql);
    }

    @Test
    public void testReusing() {
        this.testUtil().tableEnv().getConfig().set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, (Object)BoxesRunTime.boxToBoolean((boolean)true));
        String sql1 = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT b, a, sum(c) c, sum(d) d, PROCTIME() as proctime\n        |FROM T1\n        |GROUP BY a, b\n      ")).stripMargin();
        String sql2 = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(155).append("\n         |SELECT * FROM (").append(sql1).append(") AS T\n         |JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n         |ON T.a = D.id\n         |WHERE D.age > 10\n      ").toString())).stripMargin();
        String sql3 = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(49).append("\n         |SELECT id as a, b FROM (").append(sql2).append(") AS T\n       ").toString())).stripMargin();
        String sql = new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(153).append("\n         |SELECT count(T1.a), count(T1.id), sum(T2.a)\n         |FROM (").append(sql2).append(") AS T1, (").append(sql3).append(") AS T2\n         |WHERE T1.a = T2.a\n         |GROUP BY T1.b, T2.b\n      ").toString())).stripMargin();
        this.testUtil().verifyExecPlan(sql);
    }

    private void expectExceptionThrown(String sql, String keywords, Class<? extends Throwable> clazz) {
        try {
            this.testUtil().verifyExplain(sql);
            Assert.fail((String)new StringBuilder(40).append("Expected a ").append(clazz).append(", but no exception is thrown.").toString());
        }
        catch (Throwable throwable) {
            Throwable throwable2 = throwable;
            Class<?> clazz2 = throwable2.getClass();
            Class<? extends Throwable> clazz3 = clazz;
            if (!(clazz2 != null ? !clazz2.equals(clazz3) : clazz3 != null)) {
                BoxedUnit boxedUnit;
                if (keywords != null) {
                    Assert.assertTrue((String)new StringBuilder(31).append("The actual exception message \n").append(throwable2.getMessage()).append("\n").append(new StringBuilder(35).append("doesn't contain expected keyword \n").append(keywords).append("\n").toString()).toString(), (boolean)throwable2.getMessage().contains(keywords));
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    boxedUnit = BoxedUnit.UNIT;
                }
                BoxedUnit boxedUnit2 = boxedUnit;
            }
            if (throwable2 != null) {
                Throwable throwable3 = throwable2;
                throwable3.printStackTrace();
                Assert.fail((String)new StringBuilder(25).append("Expected throw ").append(clazz.getSimpleName()).append(", but is ").append(throwable3).append(".").toString());
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            }
            throw throwable;
        }
    }

    private Class<? extends Throwable> expectExceptionThrown$default$3() {
        return ValidationException.class;
    }

    public LookupJoinTest(boolean legacyTableSource) {
        this.legacyTableSource = legacyTableSource;
        this.testUtil = this.batchTestUtil(this.batchTestUtil$default$1());
    }
}

