/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;

@RunWith(value=Parameterized.class)
@ScalaSignature(bytes="\u0006\u0001!4A!\u0001\u0002\u0001'\t9r+\u001b8e_^$U\rZ;qY&\u001c\u0017\r^3J)\u000e\u000b7/\u001a\u0006\u0003\u0007\u0011\t1a]9m\u0015\t)a!\u0001\u0004tiJ,\u0017-\u001c\u0006\u0003\u000f!\tqA];oi&lWM\u0003\u0002\n\u0015\u00059\u0001\u000f\\1o]\u0016\u0014(BA\u0006\r\u0003\u0015!\u0018M\u00197f\u0015\tia\"A\u0003gY&t7N\u0003\u0002\u0010!\u00051\u0011\r]1dQ\u0016T\u0011!E\u0001\u0004_J<7\u0001A\n\u0003\u0001Q\u0001\"!\u0006\r\u000e\u0003YQ!a\u0006\u0004\u0002\u000bU$\u0018\u000e\\:\n\u0005e1\"AG*ue\u0016\fW.\u001b8h/&$\bn\u0015;bi\u0016$Vm\u001d;CCN,\u0007\u0002C\u000e\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u000f\u0002\t5|G-\u001a\t\u0003;Er!AH\u0018\u000f\u0005}qcB\u0001\u0011.\u001d\t\tCF\u0004\u0002#W9\u00111E\u000b\b\u0003I%r!!\n\u0015\u000e\u0003\u0019R!a\n\n\u0002\rq\u0012xn\u001c;?\u0013\u0005\t\u0012BA\b\u0011\u0013\tia\"\u0003\u0002\f\u0019%\u0011\u0011BC\u0005\u0003\u000f!I!a\u0006\u0004\n\u0005A2\u0012AG*ue\u0016\fW.\u001b8h/&$\bn\u0015;bi\u0016$Vm\u001d;CCN,\u0017B\u0001\u001a4\u0005A\u0019F/\u0019;f\u0005\u0006\u001c7.\u001a8e\u001b>$WM\u0003\u00021-!)Q\u0007\u0001C\u0001m\u00051A(\u001b8jiz\"\"aN\u001d\u0011\u0005a\u0002Q\"\u0001\u0002\t\u000bm!\u0004\u0019\u0001\u000f\t\u000bm\u0002A\u0011\t\u001f\u0002\r\t,gm\u001c:f)\u0005i\u0004C\u0001 B\u001b\u0005y$\"\u0001!\u0002\u000bM\u001c\u0017\r\\1\n\u0005\t{$\u0001B+oSRD#A\u000f#\u0011\u0005\u0015CU\"\u0001$\u000b\u0005\u001d\u0003\u0012!\u00026v]&$\u0018BA%G\u0005\u0019\u0011UMZ8sK\")1\n\u0001C\u0001y\u0005YB/Z:u)Vl'\r\\3XS:$wn^&fKBd\u0015m\u001d;S_^D#AS'\u0011\u0005\u0015s\u0015BA(G\u0005\u0011!Vm\u001d;\t\u000bE\u0003A\u0011\u0001\u001f\u00029Q,7\u000f\u001e+v[\ndWmV5oI><8*Z3q\r&\u00148\u000f\u001e*po\"\u0012\u0001+\u0014\u0005\u0006)\u0002!\t\u0001P\u0001$i\u0016\u001cH\u000fV;nE2,w+\u001b8e_^\\U-\u001a9MCN$(k\\<XSRD7)\u00197dQ\t\u0019V\nC\u0003X\u0001\u0011\u0005A(A\u000fuKN$8)^7vY\u0006$XmV5oI><8*Z3q\u0019\u0006\u001cHOU8xQ\t1V\n\u000b\u0003\u00015\u0002\f\u0007CA._\u001b\u0005a&BA/G\u0003\u0019\u0011XO\u001c8fe&\u0011q\f\u0018\u0002\b%Vtw+\u001b;i\u0003\u00151\u0018\r\\;fG\u0005\u0011\u0007CA2g\u001b\u0005!'BA3G\u0003\u001d\u0011XO\u001c8feNL!a\u001a3\u0003\u001bA\u000b'/Y7fi\u0016\u0014\u0018N_3e\u0001")
public class WindowDeduplicateITCase
extends StreamingWithStateTestBase {
    @Override
    @Before
    public void before() {
        super.before();
        this.env().enableCheckpointing(100L, CheckpointingMode.EXACTLY_ONCE);
        this.env().setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
        FailingCollectionSource.reset();
        String dataId = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithTimestamp());
        this.tEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringBuilder(697).append("\n                       |CREATE TABLE T1 (\n                       | `ts` STRING,\n                       | `int` INT,\n                       | `double` DOUBLE,\n                       | `float` FLOAT,\n                       | `bigdec` DECIMAL(10, 2),\n                       | `string` STRING,\n                       | `name` STRING,\n                       | `rowtime` AS TO_TIMESTAMP(`ts`),\n                       | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n                       |) WITH (\n                       | 'connector' = 'values',\n                       | 'data-id' = '").append(dataId).append("',\n                       | 'failing-source' = 'true'\n                       |)\n                       |").toString())).stripMargin());
        this.tEnv().createFunction("concat_distinct_agg", JavaUserDefinedAggFunctions.ConcatDistinctAggFunction.class);
    }

    @Test
    public void testTumbleWindowKeepLastRow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT\n         |  TO_TIMESTAMP(`ts`),\n         |  `int`,\n         |  `double`,\n         |  `float`,\n         |  `bigdec`,\n         |  `string`,\n         |  `name`,\n         |  CAST(`rowtime` AS STRING),\n         |  window_start,\n         |  window_end,\n         |  window_time\n         |FROM (\n         |  SELECT *,\n         |    ROW_NUMBER() OVER(\n         |      PARTITION BY window_start, window_end, `name` ORDER BY rowtime DESC) as rownum\n         |  FROM TABLE(\n         |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |)\n         |WHERE rownum <= 1\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testTumbleWindowKeepFirstRow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT\n         |  TO_TIMESTAMP(`ts`),\n         |  `int`,\n         |  `double`,\n         |  `float`,\n         |  `bigdec`,\n         |  `string`,\n         |  `name`,\n         |  CAST(`rowtime` AS STRING),\n         |  window_start,\n         |  window_end,\n         |  window_time\n         |FROM (\n         |  SELECT *,\n         |    ROW_NUMBER() OVER(\n         |      PARTITION BY window_start, window_end, `name` ORDER BY rowtime) as rownum\n         |  FROM TABLE(\n         |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n         |)\n         |WHERE rownum <= 1\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:01,1,1.0,1.0,1.11,Hi,a,2020-10-10 00:00:01.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:06,6,6.0,6.0,6.66,Hi,b,2020-10-10 00:00:06.000,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testTumbleWindowKeepLastRowWithCalc() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `int`,\n        |  `string`,\n        |  `name`,\n        |  window_start,\n        |  window_end,\n        |  window_time\n        |FROM (\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end, `name` ORDER BY rowtime DESC) as rownum\n        |  FROM TABLE(\n        |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |)\n        |WHERE rownum <= 1\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"5,null,a,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "3,Comment#2,a,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "3,Hello,b,2020-10-10T00:00:05,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "4,Hi,b,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999", "7,null,null,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "1,Comment#3,b,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    @Test
    public void testCumulateWindowKeepLastRow() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n         |SELECT\n         |  TO_TIMESTAMP(`ts`),\n         |  `int`,\n         |  `double`,\n         |  `float`,\n         |  `bigdec`,\n         |  `string`,\n         |  `name`,\n         |  CAST(`rowtime` AS STRING),\n         |  window_start,\n         |  window_end,\n         |  window_time\n         |FROM (\n         |  SELECT *,\n         |    ROW_NUMBER() OVER(\n         |      PARTITION BY window_start, window_end, `name` ORDER BY rowtime DESC) as rownum\n         |  FROM TABLE(\n         |      CUMULATE(\n         |        TABLE T1,\n         |        DESCRIPTOR(rowtime),\n         |        INTERVAL '5' SECOND,\n         |        INTERVAL '15' SECOND))\n         |)\n         |WHERE rownum <= 1\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"2020-10-10T00:00:04,5,5.0,5.0,5.55,null,a,2020-10-10 00:00:04.000,2020-10-10T00:00,2020-10-10T00:00:05,2020-10-10T00:00:04.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:08,3,null,3.0,3.33,Comment#2,a,2020-10-10 00:00:08.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00,2020-10-10T00:00:10,2020-10-10T00:00:09.999", "2020-10-10T00:00:07,3,3.0,3.0,null,Hello,b,2020-10-10 00:00:07.000,2020-10-10T00:00,2020-10-10T00:00:15,2020-10-10T00:00:14.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:20,2020-10-10T00:00:19.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:25,2020-10-10T00:00:24.999", "2020-10-10T00:00:16,4,4.0,4.0,4.44,Hi,b,2020-10-10 00:00:16.000,2020-10-10T00:00:15,2020-10-10T00:00:30,2020-10-10T00:00:29.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999", "2020-10-10T00:00:32,7,7.0,7.0,7.77,null,null,2020-10-10 00:00:32.000,2020-10-10T00:00:30,2020-10-10T00:00:45,2020-10-10T00:00:44.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:35,2020-10-10T00:00:34.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:40,2020-10-10T00:00:39.999", "2020-10-10T00:00:34,1,3.0,3.0,3.33,Comment#3,b,2020-10-10 00:00:34.000,2020-10-10T00:00:30,2020-10-10T00:00:45,2020-10-10T00:00:44.999"}));
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    public WindowDeduplicateITCase(StreamingWithStateTestBase.StateBackendMode mode) {
        super(mode);
    }
}

