/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.kafka;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.kafka.KafkaLogTestUtils;
import org.apache.paimon.flink.kafka.KafkaTableTestBase;
import org.apache.paimon.flink.util.ReadWriteTableTestUtil;
import org.apache.paimon.utils.BlockingIterator;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Integration tests that write changelog records into tables created through
 * {@link KafkaLogTestUtils#createTableWithKafkaLog} and verify what streaming and batch queries
 * return for them.
 *
 * <p>Rows are written as {@code changelogRow(kind, fields...)} where {@code kind} is one of
 * {@code +I}, {@code -U}, {@code +U} or {@code -D}.
 */
public class StreamingReadWriteTableWithKafkaLogITCase
extends KafkaTableTestBase {

    /** Partition list handed to the temporary source table of the partitioned tests. */
    private static final String PARTITION_LIST = "dt:2022-01-01;dt:2022-01-02";

    /** Initializes {@link ReadWriteTableTestUtil} with a freshly registered temporary path. */
    @BeforeEach
    public void setUp() {
        ReadWriteTableTestUtil.init(this.createAndRegisterTempFile("").toString());
    }

    /** Writes a partitioned changelog with primary key, then checks streaming and batch reads. */
    @Test
    public void testReadWriteWithPartitionedRecordsWithPk() throws Exception {
        List<Row> initialRecords = partitionedChangelogRecords();
        String temporaryTable = createPartitionedPkChangelogSource(initialRecords);
        String table = createPartitionedPkTable(false);
        ReadWriteTableTestUtil.insertIntoFromTable(temporaryTable, table);
        ReadWriteTableTestUtil.checkFileStorePath(
                table, Arrays.asList("dt=2022-01-01", "dt=2022-01-02"));

        String streamingQuery =
                ReadWriteTableTestUtil.buildQuery(
                        table,
                        "*",
                        "WHERE dt >= '2022-01-01' AND dt <= '2022-01-03' OR currency = 'HK Dollar'");
        // try-with-resources: the iterator is closed even when one of the assertions below fails.
        try (BlockingIterator<Row, Row> streamItr =
                ReadWriteTableTestUtil.testStreamingRead(
                        streamingQuery,
                        Arrays.asList(
                                changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                                changelogRow("+I", "Euro", 119L, "2022-01-02")))) {
            ReadWriteTableTestUtil.insertIntoPartition(
                    table,
                    "PARTITION (dt = '2022-01-03')",
                    new String[] {"('HK Dollar', 100)", "('Yen', 20)"});
            ReadWriteTableTestUtil.insertIntoPartition(
                    table, "PARTITION (dt = '2022-01-04')", new String[] {"('Yen', 20)"});
            // Only the 2022-01-03 rows are expected; the 2022-01-04 row is not matched by the filter.
            ReadWriteTableTestUtil.validateStreamingReadResult(
                    streamItr,
                    Arrays.asList(
                            changelogRow("+I", "HK Dollar", 100L, "2022-01-03"),
                            changelogRow("+I", "Yen", 20L, "2022-01-03")));
            ReadWriteTableTestUtil.insertOverwritePartition(
                    table,
                    "PARTITION (dt = '2022-01-02')",
                    new String[] {"('Euro', 100)", "('Yen', 1)"});
            ReadWriteTableTestUtil.assertNoMoreRecords(streamItr);
        }

        ReadWriteTableTestUtil.testBatchRead(
                ReadWriteTableTestUtil.buildSimpleQuery(table),
                Arrays.asList(
                        changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                        changelogRow("+I", "Euro", 100L, "2022-01-02"),
                        changelogRow("+I", "Yen", 1L, "2022-01-02"),
                        changelogRow("+I", "HK Dollar", 100L, "2022-01-03"),
                        changelogRow("+I", "Yen", 20L, "2022-01-03"),
                        changelogRow("+I", "Yen", 20L, "2022-01-04")));

        // Filter on the partition column.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE dt = '2022-01-01'"),
                        Collections.singletonList(
                                changelogRow("+I", "US Dollar", 102L, "2022-01-01")))
                .close();
        // Filter on a primary-key column.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(
                                table, "*", "WHERE currency = 'US Dollar'"),
                        Collections.singletonList(
                                changelogRow("+I", "US Dollar", 102L, "2022-01-01")))
                .close();
        // Filter combining the partition column and a value column.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(
                                table, "*", "WHERE dt = '2022-01-01' AND rate = 1"),
                        Collections.emptyList())
                .close();
        // Reordered projection together with a filter.
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(
                                table,
                                "rate, dt, currency",
                                "WHERE dt = '2022-01-02' AND currency = 'Euro'"),
                        Collections.singletonList(changelogRow("+I", 100L, "2022-01-02", "Euro")))
                .close();
    }

    /** Writes a non-partitioned changelog with primary key, then checks streaming reads. */
    @Test
    public void testSReadWriteWithNonPartitionedRecordsWithPk() throws Exception {
        List<Row> initialRecords = nonPartitionedChangelogRecords();
        String temporaryTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        fieldsWithoutDt(),
                        Collections.singletonList("currency"),
                        Collections.emptyList(),
                        initialRecords,
                        null,
                        false,
                        "I, UA, D");
        String table = createNonPartitionedPkTable(false);
        ReadWriteTableTestUtil.insertIntoFromTable(temporaryTable, table);
        ReadWriteTableTestUtil.checkFileStorePath(table, Collections.emptyList());

        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildSimpleQuery(table),
                        Arrays.asList(
                                changelogRow("+I", "US Dollar", 102L),
                                changelogRow("+I", "Euro", 119L)))
                .close();
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(table, "*", "WHERE currency = 'Yen'"),
                        Collections.emptyList())
                .close();
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(table, "currency", ""),
                        Arrays.asList(
                                changelogRow("+I", "US Dollar"), changelogRow("+I", "Euro")))
                .close();
        ReadWriteTableTestUtil.testStreamingRead(
                        ReadWriteTableTestUtil.buildQuery(table, "currency", "WHERE rate = 102"),
                        Collections.singletonList(changelogRow("+I", "US Dollar")))
                .close();
    }

    /** Reads with {@code SCAN_LATEST} options from partitioned tables with primary key. */
    @Test
    public void testReadLatestChangelogOfPartitionedRecordsWithPk() throws Exception {
        List<Row> initialRecords = partitionedChangelogRecords();
        String temporaryTable = createPartitionedPkChangelogSource(initialRecords);

        String table = createPartitionedPkTable(true);
        // try-with-resources: the iterator is closed even when one of the assertions below fails.
        try (BlockingIterator<Row, Row> streamItr =
                ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        latestQuery(table, "*", ""),
                        Arrays.asList(
                                changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                                changelogRow("+I", "Euro", 114L, "2022-01-01"),
                                changelogRow("+I", "Yen", 1L, "2022-01-01"),
                                changelogRow("-U", "Euro", 114L, "2022-01-01"),
                                changelogRow("+U", "Euro", 116L, "2022-01-01"),
                                changelogRow("-D", "Yen", 1L, "2022-01-01"),
                                changelogRow("-D", "Euro", 116L, "2022-01-01"),
                                changelogRow("+I", "Euro", 119L, "2022-01-02")))) {
            ReadWriteTableTestUtil.insertInto(
                    table,
                    new String[] {
                        "('US Dollar', 104, '2022-01-01')", "('Euro', 100, '2022-01-02')"
                    });
            ReadWriteTableTestUtil.validateStreamingReadResult(
                    streamItr,
                    Arrays.asList(
                            changelogRow("-U", "US Dollar", 102L, "2022-01-01"),
                            changelogRow("+U", "US Dollar", 104L, "2022-01-01"),
                            changelogRow("-U", "Euro", 119L, "2022-01-02"),
                            changelogRow("+U", "Euro", 100L, "2022-01-02")));
            ReadWriteTableTestUtil.assertNoMoreRecords(streamItr);
        }

        // Each following scenario uses a newly created table.

        // Filter on the partition column.
        table = createPartitionedPkTable(true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        latestQuery(table, "*", "WHERE dt = '2022-01-01'"),
                        Arrays.asList(
                                changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                                changelogRow("+I", "Euro", 114L, "2022-01-01"),
                                changelogRow("+I", "Yen", 1L, "2022-01-01"),
                                changelogRow("-U", "Euro", 114L, "2022-01-01"),
                                changelogRow("+U", "Euro", 116L, "2022-01-01"),
                                changelogRow("-D", "Yen", 1L, "2022-01-01"),
                                changelogRow("-D", "Euro", 116L, "2022-01-01")))
                .close();

        // Filter on a primary-key column.
        table = createPartitionedPkTable(true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        latestQuery(table, "*", "WHERE currency = 'Yen'"),
                        Arrays.asList(
                                changelogRow("+I", "Yen", 1L, "2022-01-01"),
                                changelogRow("-D", "Yen", 1L, "2022-01-01")))
                .close();

        // Filter on a value column.
        table = createPartitionedPkTable(true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        latestQuery(table, "*", "WHERE rate = 114"),
                        Arrays.asList(
                                changelogRow("+I", "Euro", 114L, "2022-01-01"),
                                changelogRow("-U", "Euro", 114L, "2022-01-01")))
                .close();

        // Filter combining a value column and the partition column.
        table = createPartitionedPkTable(true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        latestQuery(table, "*", "WHERE rate = 114 AND dt = '2022-01-02'"),
                        Collections.emptyList())
                .close();

        // Projection only.
        table = createPartitionedPkTable(true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        latestQuery(table, "rate", ""),
                        Arrays.asList(
                                changelogRow("+I", 102L),
                                changelogRow("+I", 114L),
                                changelogRow("+I", 1L),
                                changelogRow("-U", 114L),
                                changelogRow("+U", 116L),
                                changelogRow("-D", 1L),
                                changelogRow("-D", 116L),
                                changelogRow("+I", 119L)))
                .close();

        // Projection together with a partition filter.
        table = createPartitionedPkTable(true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        latestQuery(table, "rate", "WHERE dt = '2022-01-02'"),
                        Collections.singletonList(changelogRow("+I", 119L)))
                .close();
    }

    /** Reads with {@code SCAN_LATEST} options from non-partitioned tables with primary key. */
    @Test
    public void testReadLatestChangelogOfNonPartitionedRecordsWithPk() throws Exception {
        List<Row> initialRecords = nonPartitionedChangelogRecords();
        String temporaryTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        fieldsWithoutDt(),
                        Collections.singletonList("currency"),
                        Collections.emptyList(),
                        initialRecords,
                        null,
                        false,
                        "I,UA,D");

        // Each scenario uses a newly created table.
        String table = createNonPartitionedPkTable(true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        latestQuery(table, "*", ""),
                        Arrays.asList(
                                changelogRow("+I", "US Dollar", 102L),
                                changelogRow("+I", "Euro", 114L),
                                changelogRow("+I", "Yen", 1L),
                                changelogRow("-U", "Euro", 114L),
                                changelogRow("+U", "Euro", 116L),
                                changelogRow("-D", "Euro", 116L),
                                changelogRow("+I", "Euro", 119L),
                                changelogRow("-D", "Yen", 1L)))
                .close();

        // Filter on the primary key.
        table = createNonPartitionedPkTable(true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        latestQuery(table, "*", "WHERE currency = 'Euro'"),
                        Arrays.asList(
                                changelogRow("+I", "Euro", 114L),
                                changelogRow("-U", "Euro", 114L),
                                changelogRow("+U", "Euro", 116L),
                                changelogRow("-D", "Euro", 116L),
                                changelogRow("+I", "Euro", 119L)))
                .close();

        // Projection onto the primary key.
        table = createNonPartitionedPkTable(true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        latestQuery(table, "currency", ""),
                        Arrays.asList(
                                changelogRow("+I", "US Dollar"),
                                changelogRow("+I", "Euro"),
                                changelogRow("+I", "Yen"),
                                changelogRow("-D", "Euro"),
                                changelogRow("+I", "Euro"),
                                changelogRow("-D", "Yen")))
                .close();

        // Projection onto the value column together with a primary-key filter.
        table = createNonPartitionedPkTable(true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        latestQuery(table, "rate", "WHERE currency = 'Euro'"),
                        Arrays.asList(
                                changelogRow("+I", 114L),
                                changelogRow("-U", 114L),
                                changelogRow("+U", 116L),
                                changelogRow("-D", 116L),
                                changelogRow("+I", 119L)))
                .close();
    }

    /** Reads with {@code SCAN_LATEST} options when the source emits insert-only records. */
    @Test
    public void testReadLatestChangelogOfInsertOnlyRecords() throws Exception {
        List<Row> initialRecords = insertOnlyRecords();

        // Without primary key the records are expected back unchanged.
        String temporaryTable = createInsertOnlySource(Collections.emptyList(), initialRecords);
        String table = createNonPartitionedTableWithoutPk(true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable, table, latestQuery(table, "*", ""), initialRecords)
                .close();

        // With primary key; each scenario uses a newly created table.
        temporaryTable = createInsertOnlySource(Collections.singletonList("currency"), initialRecords);
        table = createNonPartitionedPkTable(true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        latestQuery(table, "*", ""),
                        Arrays.asList(
                                changelogRow("+I", "US Dollar", 102L),
                                changelogRow("+I", "Euro", 114L),
                                changelogRow("+I", "Yen", 1L),
                                changelogRow("-U", "Euro", 114L),
                                changelogRow("+U", "Euro", 119L)))
                .close();

        table = createNonPartitionedPkTable(true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        latestQuery(table, "*", "WHERE rate = 114"),
                        Arrays.asList(
                                changelogRow("+I", "Euro", 114L),
                                changelogRow("-U", "Euro", 114L)))
                .close();

        table = createNonPartitionedPkTable(true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        latestQuery(table, "rate", ""),
                        Arrays.asList(
                                changelogRow("+I", 102L),
                                changelogRow("+I", 114L),
                                changelogRow("+I", 1L),
                                changelogRow("-U", 114L),
                                changelogRow("+U", 119L)))
                .close();

        table = createNonPartitionedPkTable(true);
        ReadWriteTableTestUtil.testStreamingReadWithReadFirst(
                        temporaryTable,
                        table,
                        latestQuery(table, "currency", "WHERE rate = 114"),
                        Arrays.asList(changelogRow("+I", "Euro"), changelogRow("-U", "Euro")))
                .close();
    }

    /** Reads insert-only data with the from-timestamp options built for timestamp {@code 0}. */
    @Test
    public void testReadInsertOnlyChangelogFromTimestamp() throws Exception {
        List<Row> initialRecords0 =
                Arrays.asList(
                        changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                        changelogRow("+I", "Euro", 114L, "2022-01-01"),
                        changelogRow("+I", "Yen", 1L, "2022-01-01"),
                        changelogRow("+I", "Euro", 114L, "2022-01-01"),
                        changelogRow("+I", "US Dollar", 114L, "2022-01-01"),
                        changelogRow("+I", "Euro", 119L, "2022-01-02"));

        // Partitioned, without primary key.
        String temporaryTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        fieldsWithDt(),
                        Collections.emptyList(),
                        Collections.singletonList("dt"),
                        initialRecords0,
                        PARTITION_LIST,
                        true,
                        "I");
        String table =
                KafkaLogTestUtils.createTableWithKafkaLog(
                        fieldsWithDt(),
                        Collections.emptyList(),
                        Collections.singletonList("dt"),
                        false);
        ReadWriteTableTestUtil.insertIntoFromTable(temporaryTable, table);
        ReadWriteTableTestUtil.testStreamingRead(fromTimestampQuery(table, 0L), initialRecords0)
                .close();

        // Partitioned, with primary key.
        temporaryTable =
                ReadWriteTableTestUtil.createTemporaryTable(
                        fieldsWithDt(),
                        Arrays.asList("currency", "dt"),
                        Collections.singletonList("dt"),
                        initialRecords0,
                        PARTITION_LIST,
                        true,
                        "I");
        table = createPartitionedPkTable(false);
        ReadWriteTableTestUtil.insertIntoFromTable(temporaryTable, table);
        ReadWriteTableTestUtil.testStreamingRead(
                        fromTimestampQuery(table, 0L),
                        Arrays.asList(
                                changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                                changelogRow("+I", "Yen", 1L, "2022-01-01"),
                                changelogRow("+I", "Euro", 114L, "2022-01-01"),
                                changelogRow("-U", "US Dollar", 102L, "2022-01-01"),
                                changelogRow("+U", "US Dollar", 114L, "2022-01-01"),
                                changelogRow("+I", "Euro", 119L, "2022-01-02")))
                .close();

        List<Row> initialRecords1 = insertOnlyRecords();

        // Non-partitioned, with primary key.
        temporaryTable = createInsertOnlySource(Collections.singletonList("currency"), initialRecords1);
        table = createNonPartitionedPkTable(false);
        ReadWriteTableTestUtil.insertIntoFromTable(temporaryTable, table);
        ReadWriteTableTestUtil.testStreamingRead(
                        fromTimestampQuery(table, 0L),
                        Arrays.asList(
                                changelogRow("+I", "US Dollar", 102L),
                                changelogRow("+I", "Euro", 114L),
                                changelogRow("+I", "Yen", 1L),
                                changelogRow("-U", "Euro", 114L),
                                changelogRow("+U", "Euro", 119L)))
                .close();

        // Non-partitioned, without primary key.
        temporaryTable = createInsertOnlySource(Collections.emptyList(), initialRecords1);
        table = createNonPartitionedTableWithoutPk(false);
        ReadWriteTableTestUtil.insertIntoFromTable(temporaryTable, table);
        ReadWriteTableTestUtil.testStreamingRead(fromTimestampQuery(table, 0L), initialRecords1)
                .close();
    }

    /** Expects no rows when the scan timestamp is {@code Long.MAX_VALUE - 1}. */
    @Test
    public void testReadInsertOnlyChangelogFromEnormousTimestamp() throws Exception {
        List<Row> initialRecords = insertOnlyRecords();
        String temporaryTable = createInsertOnlySource(Collections.emptyList(), initialRecords);
        String table = createNonPartitionedTableWithoutPk(false);
        ReadWriteTableTestUtil.insertIntoFromTable(temporaryTable, table);
        ReadWriteTableTestUtil.testStreamingRead(
                        fromTimestampQuery(table, Long.MAX_VALUE - 1), Collections.emptyList())
                .close();
    }

    /**
     * Builds the table options selecting {@link CoreOptions.StartupMode#FROM_TIMESTAMP} together
     * with the given scan timestamp.
     *
     * @param timeStampMillis value stored under {@link CoreOptions#SCAN_TIMESTAMP_MILLIS} after
     *     conversion with {@link String#valueOf(Object)}
     * @return a new mutable map holding exactly those two options
     */
    private Map<String, String> scanFromTimeStampMillis(final Long timeStampMillis) {
        // A plain HashMap: double-brace initialization would create an anonymous subclass that
        // keeps a reference to this test instance.
        Map<String, String> options = new HashMap<>();
        options.put(CoreOptions.SCAN_MODE.key(), CoreOptions.StartupMode.FROM_TIMESTAMP.toString());
        options.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), String.valueOf(timeStampMillis));
        return options;
    }

    /** Builds a {@code SELECT *} query on {@code table} carrying the from-timestamp options. */
    private String fromTimestampQuery(String table, Long timeStampMillis) {
        return ReadWriteTableTestUtil.buildQueryWithTableOptions(
                table, "*", "", this.scanFromTimeStampMillis(timeStampMillis));
    }

    /** Builds a query on {@code table} carrying {@link ReadWriteTableTestUtil#SCAN_LATEST}. */
    private static String latestQuery(String table, String projection, String filter) {
        return ReadWriteTableTestUtil.buildQueryWithTableOptions(
                table, projection, filter, ReadWriteTableTestUtil.SCAN_LATEST);
    }

    /** Shorthand for {@link TestValuesTableFactory#changelogRow}. */
    private static Row changelogRow(String kind, Object... fields) {
        return TestValuesTableFactory.changelogRow(kind, fields);
    }

    /** Returns a new field list for the schema with the {@code dt} column. */
    private static List<String> fieldsWithDt() {
        return Arrays.asList("currency STRING", "rate BIGINT", "dt STRING");
    }

    /** Returns a new field list for the schema without the {@code dt} column. */
    private static List<String> fieldsWithoutDt() {
        return Arrays.asList("currency STRING", "rate BIGINT");
    }

    /** Returns a new copy of the changelog fixture used by the partitioned primary-key tests. */
    private static List<Row> partitionedChangelogRecords() {
        return Arrays.asList(
                changelogRow("+I", "US Dollar", 102L, "2022-01-01"),
                changelogRow("+I", "Euro", 114L, "2022-01-01"),
                changelogRow("+I", "Yen", 1L, "2022-01-01"),
                changelogRow("+U", "Euro", 116L, "2022-01-01"),
                changelogRow("-D", "Yen", 1L, "2022-01-01"),
                changelogRow("-D", "Euro", 116L, "2022-01-01"),
                changelogRow("+I", "Euro", 119L, "2022-01-02"),
                changelogRow("+U", "Euro", 119L, "2022-01-02"));
    }

    /** Returns a new copy of the changelog fixture used by the non-partitioned primary-key tests. */
    private static List<Row> nonPartitionedChangelogRecords() {
        return Arrays.asList(
                changelogRow("+I", "US Dollar", 102L),
                changelogRow("+I", "Euro", 114L),
                changelogRow("+I", "Yen", 1L),
                changelogRow("+U", "Euro", 116L),
                changelogRow("-D", "Euro", 116L),
                changelogRow("+I", "Euro", 119L),
                changelogRow("+U", "Euro", 119L),
                changelogRow("-D", "Yen", 1L));
    }

    /** Returns a new copy of the insert-only fixture without the {@code dt} column. */
    private static List<Row> insertOnlyRecords() {
        return Arrays.asList(
                changelogRow("+I", "US Dollar", 102L),
                changelogRow("+I", "Euro", 114L),
                changelogRow("+I", "Yen", 1L),
                changelogRow("+I", "Euro", 114L),
                changelogRow("+I", "Euro", 119L));
    }

    /**
     * Creates the temporary source table of the partitioned primary-key tests: keys
     * {@code (currency, dt)}, partitioned by {@code dt}, changelog mode {@code "I,UA,D"}.
     */
    private static String createPartitionedPkChangelogSource(List<Row> records) throws Exception {
        return ReadWriteTableTestUtil.createTemporaryTable(
                fieldsWithDt(),
                Arrays.asList("currency", "dt"),
                Collections.singletonList("dt"),
                records,
                PARTITION_LIST,
                false,
                "I,UA,D");
    }

    /**
     * Creates a non-partitioned temporary source table with changelog mode {@code "I"} and the
     * given primary keys.
     */
    private static String createInsertOnlySource(List<String> primaryKeys, List<Row> records)
            throws Exception {
        return ReadWriteTableTestUtil.createTemporaryTable(
                fieldsWithoutDt(), primaryKeys, Collections.emptyList(), records, null, true, "I");
    }

    /**
     * Creates a table keyed by {@code (currency, dt)} and partitioned by {@code dt}.
     *
     * @param flag passed through unchanged as the last argument of
     *     {@link KafkaLogTestUtils#createTableWithKafkaLog}
     */
    private static String createPartitionedPkTable(boolean flag) throws Exception {
        return KafkaLogTestUtils.createTableWithKafkaLog(
                fieldsWithDt(),
                Arrays.asList("currency", "dt"),
                Collections.singletonList("dt"),
                flag);
    }

    /**
     * Creates a non-partitioned table keyed by {@code currency}.
     *
     * @param flag passed through unchanged as the last argument of
     *     {@link KafkaLogTestUtils#createTableWithKafkaLog}
     */
    private static String createNonPartitionedPkTable(boolean flag) throws Exception {
        return KafkaLogTestUtils.createTableWithKafkaLog(
                fieldsWithoutDt(),
                Collections.singletonList("currency"),
                Collections.emptyList(),
                flag);
    }

    /**
     * Creates a non-partitioned table without primary key.
     *
     * @param flag passed through unchanged as the last argument of
     *     {@link KafkaLogTestUtils#createTableWithKafkaLog}
     */
    private static String createNonPartitionedTableWithoutPk(boolean flag) throws Exception {
        return KafkaLogTestUtils.createTableWithKafkaLog(
                fieldsWithoutDt(), Collections.emptyList(), Collections.emptyList(), flag);
    }
}

