/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.ActionITCaseBase;
import org.apache.paimon.flink.action.cdc.SynchronizationActionBase;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CdcActionITCaseBase
extends ActionITCaseBase {
    /** Logger shared by the helper methods of this test base class. */
    private static final Logger LOG = LoggerFactory.getLogger(CdcActionITCaseBase.class);
    /** Execution environment used to run actions; assigned in {@code setEnv()} and closed in {@code closeEnv()}. */
    protected StreamExecutionEnvironment env;

    /**
     * Creates a fresh execution environment before each test: streaming mode, parallelism 2 and a
     * checkpoint interval of 1000 ms.
     */
    @BeforeEach
    public void setEnv() {
        this.env =
                streamExecutionEnvironmentBuilder()
                        .streamingMode()
                        .parallelism(2)
                        .checkpointIntervalMs(1000)
                        .build();
    }

    /**
     * Closes the execution environment created by {@code setEnv()} after each test.
     *
     * @throws Exception if closing the environment fails
     */
    @AfterEach
    public void closeEnv() throws Exception {
        // env is still null when setup failed before the assignment in setEnv(); skipping the
        // close then keeps a NullPointerException from being stacked on top of the real failure.
        if (this.env != null) {
            this.env.close();
        }
    }

    /**
     * Varargs convenience overload: blocks until all given tables are listed in the test database.
     *
     * @param tables names of the tables to wait for
     * @throws Exception if listing tables fails or the wait is interrupted
     */
    protected void waitingTables(String ... tables) throws Exception {
        List<String> tableList = Arrays.asList(tables);
        waitingTables(tableList);
    }

    /**
     * Blocks until the catalog lists every one of the given tables in the test database.
     *
     * <p>The catalog is polled every 100 ms. There is no timeout: if a table never appears, this
     * method does not return.
     *
     * @param tables names of the tables to wait for
     * @throws Exception if listing tables fails or the wait is interrupted
     */
    protected void waitingTables(List<String> tables) throws Exception {
        LOG.info("Waiting for tables '{}'", tables);
        while (!this.catalog.listTables(this.database).containsAll(tables)) {
            Thread.sleep(100L);
        }
    }

    /**
     * List-based overload of the exact-table-set assertion; converts the list to an array and
     * delegates to the varargs overload.
     *
     * @param tableNames the complete set of table names expected in the test database
     * @throws Exception if listing tables fails
     */
    protected void assertExactlyExistTables(List<String> tableNames) throws Exception {
        String[] names = tableNames.toArray(new String[0]);
        assertExactlyExistTables(names);
    }

    /**
     * Asserts that the test database contains exactly the given tables, in any order.
     *
     * @param tableNames the complete set of table names expected in the test database
     * @throws Exception if listing tables fails
     */
    protected void assertExactlyExistTables(String ... tableNames) throws Exception {
        // No raw casts: keeps AssertJ's element type so the compiler checks the arguments.
        Assertions.assertThat(this.catalog.listTables(this.database))
                .containsExactlyInAnyOrder(tableNames);
    }

    /**
     * List-based overload of the table-absence assertion; converts the list to an array and
     * delegates to the varargs overload.
     *
     * @param tableNames table names that must not be present in the test database
     * @throws Exception if listing tables fails
     */
    protected void assertTableNotExists(List<String> tableNames) throws Exception {
        String[] names = tableNames.toArray(new String[0]);
        assertTableNotExists(names);
    }

    /**
     * Asserts that none of the given tables is listed in the test database.
     *
     * @param tableNames table names that must not be present in the test database
     * @throws Exception if listing tables fails
     */
    protected void assertTableNotExists(String ... tableNames) throws Exception {
        // No raw casts: keeps AssertJ's element type so the compiler checks the arguments.
        Assertions.assertThat(this.catalog.listTables(this.database)).doesNotContain(tableNames);
    }

    /**
     * Waits until the table has the expected schema and then until its content equals the expected
     * rows.
     *
     * <p>The primary keys are asserted once, up front, against the table as passed in. The schema
     * and the data are then each polled once per second. There is no timeout: if the schema or the
     * data never match, this method does not return.
     *
     * @param expected expected rows in their string form; order is irrelevant because both sides
     *     are sorted before comparison
     * @param table table to read; reloaded with its latest schema while waiting for the schema
     * @param rowType expected field names and types, in order
     * @param primaryKeys expected primary keys of the table
     * @throws Exception if the primary keys differ, reading fails, or the wait is interrupted
     */
    protected void waitForResult(List<String> expected, FileStoreTable table, RowType rowType, List<String> primaryKeys) throws Exception {
        Assertions.assertThat(table.schema().primaryKeys()).isEqualTo(primaryKeys);

        // Phase 1: wait for the table schema to become the expected one.
        while (!this.hasExpectedSchema(table, rowType)) {
            table = table.copyWithLatestSchema();
            Thread.sleep(1000L);
        }

        // Phase 2: wait for the table content to become the expected rows.
        List<String> sortedExpected = new ArrayList<>(expected);
        Collections.sort(sortedExpected);
        while (true) {
            ReadBuilder readBuilder = table.newReadBuilder();
            TableScan.Plan plan = readBuilder.newScan().plan();
            List<String> result = this.getResult(readBuilder.newRead(), plan == null ? Collections.emptyList() : plan.splits(), rowType);
            List<String> sortedActual = new ArrayList<>(result);
            Collections.sort(sortedActual);
            if (sortedExpected.equals(sortedActual)) {
                return;
            }
            Thread.sleep(1000L);
        }
    }

    /** Returns whether the table's fields match {@code rowType} in count, order, name and type. */
    private boolean hasExpectedSchema(FileStoreTable table, RowType rowType) {
        List<DataField> fields = table.schema().fields();
        if (rowType.getFieldCount() != fields.size()) {
            return false;
        }
        for (int i = 0; i < fields.size(); i++) {
            DataField field = fields.get(i);
            boolean sameName = field.name().equals(rowType.getFieldNames().get(i));
            boolean sameType = field.type().equals(rowType.getFieldTypes().get(i));
            if (!sameName || !sameType) {
                return false;
            }
        }
        return true;
    }

    /**
     * Builds a table configuration with randomized {@code bucket} and {@code sink.parallelism}
     * options, each a random integer between 1 and 3 inclusive.
     *
     * @return a new mutable map holding the two options
     */
    protected Map<String, String> getBasicTableConfig() {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        String bucket = String.valueOf(rnd.nextInt(3) + 1);
        String sinkParallelism = String.valueOf(rnd.nextInt(3) + 1);

        Map<String, String> tableConf = new HashMap<>();
        tableConf.put("bucket", bucket);
        tableConf.put("sink.parallelism", sinkParallelism);
        return tableConf;
    }

    /**
     * Expands a map into repeated command-line arguments: for every entry the result contains
     * {@code argKey} followed by {@code key=value}.
     *
     * @param argKey the argument name to repeat, e.g. {@code --table-conf}
     * @param map the entries to render
     * @return a new mutable list with two elements per map entry, in the map's iteration order
     */
    protected List<String> mapToArgs(String argKey, Map<String, String> map) {
        List<String> rendered = new ArrayList<>();
        map.forEach(
                (key, value) -> {
                    rendered.add(argKey);
                    rendered.add(String.format("%s=%s", key, value));
                });
        return rendered;
    }

    /**
     * Renders a list as a single comma-joined command-line argument.
     *
     * @param argKey the argument name, e.g. {@code --primary-keys}
     * @param list the values to join with {@code ,}
     * @return an empty list when {@code list} is empty, otherwise {@code [argKey, joinedValues]}
     */
    protected List<String> listToArgs(String argKey, List<String> list) {
        return list.isEmpty()
                ? Collections.<String>emptyList()
                : Arrays.asList(argKey, String.join(",", list));
    }

    /**
     * Renders a list as repeated command-line arguments: each value is preceded by its own
     * {@code argKey}.
     *
     * @param argKey the argument name to repeat, e.g. {@code --computed-column}
     * @param list the values, one per argument occurrence
     * @return a new mutable list with two elements per value, in list order
     */
    protected List<String> listToMultiArgs(String argKey, List<String> list) {
        List<String> rendered = new ArrayList<>(list.size() * 2);
        for (int i = 0; i < list.size(); i++) {
            rendered.add(argKey);
            rendered.add(list.get(i));
        }
        return rendered;
    }

    /**
     * Renders an optional value as a command-line argument pair.
     *
     * @param argKey the argument name, e.g. {@code --mode}
     * @param nullable the value, or {@code null} when the argument should be omitted
     * @return {@code [argKey, value.toString()]} when a value is present, otherwise an empty list
     */
    protected <T> List<String> nullableToArgs(String argKey, @Nullable T nullable) {
        if (nullable != null) {
            return Arrays.asList(argKey, String.valueOf(nullable));
        }
        return Collections.emptyList();
    }

    /**
     * Builds the action on this test's execution environment, submits the job asynchronously and
     * waits until it is reported as running.
     *
     * @param action the action to build and run
     * @return the client of the submitted job
     * @throws Exception if building, submitting or waiting for the job fails
     */
    public JobClient runActionWithDefaultEnv(ActionBase action) throws Exception {
        action.withStreamExecutionEnvironment(env).build();
        JobClient jobClient = env.executeAsync();
        waitJobRunning(jobClient);
        return jobClient;
    }

    /**
     * Blocks until the job reports {@link JobStatus#RUNNING}, polling its status once per second.
     *
     * <p>If the job reaches a globally terminal state (for example failed, canceled or finished)
     * before it is observed running, it can never become {@code RUNNING}; the wait is aborted with
     * an exception instead of polling forever.
     *
     * @param client client of the job to wait for
     * @throws IllegalStateException if the job reaches a globally terminal state first
     * @throws Exception if the status cannot be retrieved or the wait is interrupted
     */
    protected void waitJobRunning(JobClient client) throws Exception {
        while (true) {
            JobStatus status = client.getJobStatus().get();
            if (status == JobStatus.RUNNING) {
                return;
            }
            if (status.isGloballyTerminalState()) {
                throw new IllegalStateException(
                        "Job " + client.getJobID() + " reached terminal state " + status + " before RUNNING");
            }
            Thread.sleep(1000L);
        }
    }

    /**
     * Maps a sync action class to its command-line action name, keyed by the class's simple name.
     *
     * @param clazz the action class
     * @return the action name, e.g. {@code mysql_sync_table}
     * @throws UnsupportedOperationException if the class is not one of the known sync actions
     */
    private <T> String getActionName(Class<T> clazz) {
        String simpleName = clazz.getSimpleName();
        switch (simpleName) {
            case "MySqlSyncTableAction":
                return "mysql_sync_table";
            case "MySqlSyncDatabaseAction":
                return "mysql_sync_database";
            case "KafkaSyncTableAction":
                return "kafka_sync_table";
            case "KafkaSyncDatabaseAction":
                return "kafka_sync_database";
            case "MongoDBSyncTableAction":
                return "mongodb_sync_table";
            case "MongoDBSyncDatabaseAction":
                return "mongodb_sync_database";
            case "PulsarSyncTableAction":
                return "pulsar_sync_table";
            case "PulsarSyncDatabaseAction":
                return "pulsar_sync_database";
            case "PostgresSyncTableAction":
                return "postgres_sync_table";
            default:
                throw new UnsupportedOperationException("Unknown sync action: " + simpleName);
        }
    }

    /**
     * Maps a sync action class to the command-line key that carries its source configuration,
     * keyed by the class's simple name.
     *
     * @param clazz the action class
     * @return the configuration argument key, e.g. {@code --mysql_conf}
     * @throws UnsupportedOperationException if the class is not one of the known sync actions
     */
    private <T> String getConfKey(Class<T> clazz) {
        String simpleName = clazz.getSimpleName();
        if ("MySqlSyncTableAction".equals(simpleName) || "MySqlSyncDatabaseAction".equals(simpleName)) {
            return "--mysql_conf";
        }
        if ("KafkaSyncTableAction".equals(simpleName) || "KafkaSyncDatabaseAction".equals(simpleName)) {
            return "--kafka_conf";
        }
        if ("MongoDBSyncTableAction".equals(simpleName) || "MongoDBSyncDatabaseAction".equals(simpleName)) {
            return "--mongodb_conf";
        }
        if ("PulsarSyncTableAction".equals(simpleName) || "PulsarSyncDatabaseAction".equals(simpleName)) {
            return "--pulsar_conf";
        }
        if ("PostgresSyncTableAction".equals(simpleName)) {
            return "--postgres_conf";
        }
        throw new UnsupportedOperationException("Unknown sync action: " + simpleName);
    }

    /**
     * Fluent builder that assembles the command-line arguments of a database synchronization
     * action and creates the action from them.
     *
     * <p>Optional settings that were never set are left out of the argument list.
     *
     * @param <T> the concrete action type created by {@link #build()}
     */
    protected abstract class SyncDatabaseActionBuilder<T extends SynchronizationActionBase> {
        private final Class<T> clazz;
        private final Map<String, String> sourceConfig;
        private Map<String, String> catalogConfig = Collections.emptyMap();
        private Map<String, String> tableConfig = Collections.emptyMap();
        @Nullable
        private Boolean ignoreIncompatible;
        @Nullable
        private Boolean mergeShards;
        @Nullable
        private String tablePrefix;
        @Nullable
        private String tableSuffix;
        @Nullable
        private String includingTables;
        @Nullable
        private String excludingTables;
        @Nullable
        private String mode;
        private final List<String> typeMappingModes = new ArrayList<>();
        private final List<String> metadataColumn = new ArrayList<>();

        /**
         * @param clazz the action class to create; also selects the action name and the source
         *     configuration key
         * @param sourceConfig source connection options, rendered as {@code key=value} arguments
         */
        public SyncDatabaseActionBuilder(Class<T> clazz, Map<String, String> sourceConfig) {
            this.clazz = clazz;
            this.sourceConfig = sourceConfig;
        }

        /** Sets the options passed as {@code --catalog-conf key=value}. */
        public SyncDatabaseActionBuilder<T> withCatalogConfig(Map<String, String> catalogConfig) {
            this.catalogConfig = catalogConfig;
            return this;
        }

        /** Sets the options passed as {@code --table-conf key=value}. */
        public SyncDatabaseActionBuilder<T> withTableConfig(Map<String, String> tableConfig) {
            this.tableConfig = tableConfig;
            return this;
        }

        /** Sets the value of {@code --ignore-incompatible}. */
        public SyncDatabaseActionBuilder<T> ignoreIncompatible(boolean ignoreIncompatible) {
            this.ignoreIncompatible = ignoreIncompatible;
            return this;
        }

        /** Sets the value of {@code --merge-shards}. */
        public SyncDatabaseActionBuilder<T> mergeShards(boolean mergeShards) {
            this.mergeShards = mergeShards;
            return this;
        }

        /** Sets the value of {@code --table-prefix}. */
        public SyncDatabaseActionBuilder<T> withTablePrefix(String tablePrefix) {
            this.tablePrefix = tablePrefix;
            return this;
        }

        /** Sets the value of {@code --table-suffix}. */
        public SyncDatabaseActionBuilder<T> withTableSuffix(String tableSuffix) {
            this.tableSuffix = tableSuffix;
            return this;
        }

        /** Sets the value of {@code --including-tables}. */
        public SyncDatabaseActionBuilder<T> includingTables(String includingTables) {
            this.includingTables = includingTables;
            return this;
        }

        /** Sets the value of {@code --excluding-tables}. */
        public SyncDatabaseActionBuilder<T> excludingTables(String excludingTables) {
            this.excludingTables = excludingTables;
            return this;
        }

        /** Sets the value of {@code --mode}. */
        public SyncDatabaseActionBuilder<T> withMode(String mode) {
            this.mode = mode;
            return this;
        }

        /** Appends type mapping modes; all modes are passed comma-joined via {@code --type-mapping}. */
        public SyncDatabaseActionBuilder<T> withTypeMappingModes(String ... typeMappingModes) {
            this.typeMappingModes.addAll(Arrays.asList(typeMappingModes));
            return this;
        }

        /** Appends metadata columns; all columns are passed comma-joined via {@code --metadata-column}. */
        public SyncDatabaseActionBuilder<T> withMetadataColumn(List<String> metadataColumn) {
            this.metadataColumn.addAll(metadataColumn);
            return this;
        }

        /**
         * Assembles the argument list from the enclosing test's warehouse and database plus the
         * settings collected so far, and creates the action.
         *
         * @return the created action
         */
        public T build() {
            List<String> args = new ArrayList<>(Arrays.asList(getActionName(clazz), "--warehouse", warehouse, "--database", database));
            args.addAll(mapToArgs(getConfKey(clazz), sourceConfig));
            args.addAll(mapToArgs("--catalog-conf", catalogConfig));
            args.addAll(mapToArgs("--table-conf", tableConfig));
            args.addAll(nullableToArgs("--ignore-incompatible", ignoreIncompatible));
            args.addAll(nullableToArgs("--merge-shards", mergeShards));
            args.addAll(nullableToArgs("--table-prefix", tablePrefix));
            args.addAll(nullableToArgs("--table-suffix", tableSuffix));
            args.addAll(nullableToArgs("--including-tables", includingTables));
            args.addAll(nullableToArgs("--excluding-tables", excludingTables));
            args.addAll(nullableToArgs("--mode", mode));
            args.addAll(listToArgs("--type-mapping", typeMappingModes));
            args.addAll(listToArgs("--metadata-column", metadataColumn));
            // Class.cast replaces an unchecked (T) cast: the result is verified against clazz.
            return clazz.cast(createAction(clazz, args));
        }
    }

    /**
     * Fluent builder that assembles the command-line arguments of a table synchronization action
     * and creates the action from them.
     *
     * <p>List-valued settings that stay empty are left out of the argument list.
     *
     * @param <T> the concrete action type created by {@link #build()}
     */
    protected abstract class SyncTableActionBuilder<T extends SynchronizationActionBase> {
        private final Class<T> clazz;
        private final Map<String, String> sourceConfig;
        private Map<String, String> catalogConfig = Collections.emptyMap();
        private Map<String, String> tableConfig = Collections.emptyMap();
        private final List<String> partitionKeys = new ArrayList<>();
        private final List<String> primaryKeys = new ArrayList<>();
        private final List<String> computedColumnArgs = new ArrayList<>();
        private final List<String> typeMappingModes = new ArrayList<>();
        private final List<String> metadataColumns = new ArrayList<>();

        /**
         * @param clazz the action class to create; also selects the action name and the source
         *     configuration key
         * @param sourceConfig source connection options, rendered as {@code key=value} arguments
         */
        public SyncTableActionBuilder(Class<T> clazz, Map<String, String> sourceConfig) {
            this.clazz = clazz;
            this.sourceConfig = sourceConfig;
        }

        /** Sets the options passed as {@code --catalog-conf key=value}. */
        public SyncTableActionBuilder<T> withCatalogConfig(Map<String, String> catalogConfig) {
            this.catalogConfig = catalogConfig;
            return this;
        }

        /** Sets the options passed as {@code --table-conf key=value}. */
        public SyncTableActionBuilder<T> withTableConfig(Map<String, String> tableConfig) {
            this.tableConfig = tableConfig;
            return this;
        }

        /** Appends partition keys; all keys are passed comma-joined via {@code --partition-keys}. */
        public SyncTableActionBuilder<T> withPartitionKeys(String ... partitionKeys) {
            this.partitionKeys.addAll(Arrays.asList(partitionKeys));
            return this;
        }

        /** Appends primary keys; all keys are passed comma-joined via {@code --primary-keys}. */
        public SyncTableActionBuilder<T> withPrimaryKeys(String ... primaryKeys) {
            this.primaryKeys.addAll(Arrays.asList(primaryKeys));
            return this;
        }

        /** Varargs overload of {@link #withComputedColumnArgs(List)}. */
        public SyncTableActionBuilder<T> withComputedColumnArgs(String ... computedColumnArgs) {
            return withComputedColumnArgs(Arrays.asList(computedColumnArgs));
        }

        /** Appends computed column definitions; each is passed as its own {@code --computed-column}. */
        public SyncTableActionBuilder<T> withComputedColumnArgs(List<String> computedColumnArgs) {
            this.computedColumnArgs.addAll(computedColumnArgs);
            return this;
        }

        /** Appends type mapping modes; all modes are passed comma-joined via {@code --type-mapping}. */
        public SyncTableActionBuilder<T> withTypeMappingModes(String ... typeMappingModes) {
            this.typeMappingModes.addAll(Arrays.asList(typeMappingModes));
            return this;
        }

        /** Appends metadata columns; each is passed as its own {@code --metadata-column}. */
        public SyncTableActionBuilder<T> withMetadataColumns(String ... metadataColumns) {
            this.metadataColumns.addAll(Arrays.asList(metadataColumns));
            return this;
        }

        /**
         * Assembles the argument list from the enclosing test's warehouse, database and table name
         * plus the settings collected so far, and creates the action.
         *
         * @return the created action
         */
        public T build() {
            List<String> args = new ArrayList<>(Arrays.asList(getActionName(clazz), "--warehouse", warehouse, "--database", database, "--table", tableName));
            args.addAll(mapToArgs(getConfKey(clazz), sourceConfig));
            args.addAll(mapToArgs("--catalog-conf", catalogConfig));
            args.addAll(mapToArgs("--table-conf", tableConfig));
            args.addAll(listToArgs("--partition-keys", partitionKeys));
            args.addAll(listToArgs("--primary-keys", primaryKeys));
            args.addAll(listToArgs("--type-mapping", typeMappingModes));
            args.addAll(listToMultiArgs("--computed-column", computedColumnArgs));
            args.addAll(listToMultiArgs("--metadata-column", metadataColumns));
            // Class.cast replaces an unchecked (T) cast: the result is verified against clazz.
            return clazz.cast(createAction(clazz, args));
        }
    }
}

