/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.jsonplan;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.TestData;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.types.Row;
import org.junit.Test;

/**
 * Tests that copy the {@code users} source table into {@code user_sink} via
 * {@code compileSqlAndExecutePlan} and compare the sink contents with a fixed set of rows.
 *
 * <p>Two source flavours are covered: one declared with changelog mode {@code I,UA,UB,D} and
 * one declared with changelog mode {@code I,UA,D} plus a primary key on {@code user_id}.
 */
public class ChangelogSourceJsonPlanITCase extends JsonPlanTestBase {

    /** Name of the sink table that both tests write into and read results from. */
    private static final String SINK_TABLE = "user_sink";

    /** Statement executed by both tests. */
    private static final String INSERT_USERS_INTO_SINK = "INSERT INTO user_sink SELECT * FROM users";

    /** Runs the insert against the source registered with changelog mode {@code I,UA,UB,D}. */
    @Test
    public void testChangelogSource() throws Exception {
        registerChangelogSource();
        declareUserSink();

        compileSqlAndExecutePlan(INSERT_USERS_INTO_SINK).await();

        assertResult(expectedSinkRows(), TestValuesTableFactory.getResults(SINK_TABLE));
    }

    /** Runs the insert against the source registered with changelog mode {@code I,UA,D}. */
    @Test
    public void testToUpsertSource() throws Exception {
        registerUpsertSource();
        declareUserSink();

        compileSqlAndExecutePlan(INSERT_USERS_INTO_SINK).await();

        assertResult(expectedSinkRows(), TestValuesTableFactory.getResults(SINK_TABLE));
    }

    /**
     * Registers the {@code users} source table backed by {@code TestData.userChangelog()} with
     * the {@code changelog-mode} property set to {@code I,UA,UB,D}.
     */
    public void registerChangelogSource() {
        HashMap<String, String> options = new HashMap<>();
        options.put("changelog-mode", "I,UA,UB,D");

        String[] schema = {
            "user_id STRING",
            "user_name STRING",
            "email STRING",
            "balance DECIMAL(18,2)",
            "balance2 AS balance * 2"
        };
        List<Row> rows = (List<Row>) JavaScalaConversionUtil.toJava(TestData.userChangelog());

        createTestValuesSourceTable("users", rows, schema, options);
    }

    /**
     * Registers the {@code users} source table backed by {@code TestData.userUpsertlog()} with
     * the {@code changelog-mode} property set to {@code I,UA,D}; {@code user_id} is declared as
     * a non-enforced primary key.
     */
    public void registerUpsertSource() {
        HashMap<String, String> options = new HashMap<>();
        options.put("changelog-mode", "I,UA,D");

        String[] schema = {
            "user_id STRING PRIMARY KEY NOT ENFORCED",
            "user_name STRING",
            "email STRING",
            "balance DECIMAL(18,2)",
            "balance2 AS balance * 2"
        };
        List<Row> rows = (List<Row>) JavaScalaConversionUtil.toJava(TestData.userUpsertlog());

        createTestValuesSourceTable("users", rows, schema, options);
    }

    /** Declares the sink table shared by both tests, keyed on {@code user_id}. */
    private void declareUserSink() {
        createTestNonInsertOnlyValuesSinkTable(
                SINK_TABLE,
                "user_id STRING PRIMARY KEY NOT ENFORCED",
                "user_name STRING",
                "email STRING",
                "balance DECIMAL(18,2)",
                "balance2 DECIMAL(18,2)");
    }

    /**
     * Builds the rows both tests expect to find in the sink.
     *
     * <p>A new list is created on every call so that each test hands its own instance to
     * {@code assertResult} (NOTE(review): that method is not visible here and may reorder its
     * arguments in place).
     */
    private List<String> expectedSinkRows() {
        return Arrays.asList(
                "+I[user1, Tom, tom123@gmail.com, 8.10, 16.20]",
                "+I[user3, Bailey, bailey@qq.com, 9.99, 19.98]",
                "+I[user4, Tina, tina@gmail.com, 11.30, 22.60]");
    }
}

