/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.hive.migrate;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryWriter;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.hive.HiveTypeUtils;
import org.apache.paimon.migrate.FileMetaUtils;
import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HiveMigrator
implements Migrator {
    private static final Logger LOG = LoggerFactory.getLogger(HiveMigrator.class);
    private static final Predicate<FileStatus> HIDDEN_PATH_FILTER = p -> !p.getPath().getName().startsWith("_") && !p.getPath().getName().startsWith(".");
    private static final String PAIMON_SUFFIX = "_paimon_";
    private final FileIO fileIO;
    private final HiveCatalog hiveCatalog;
    private final IMetaStoreClient client;
    private final String sourceDatabase;
    private final String sourceTable;
    private final String targetDatabase;
    private final String targetTable;
    private final Map<String, String> options;

    public HiveMigrator(HiveCatalog hiveCatalog, String sourceDatabase, String sourceTable, String targetDatabase, String targetTable, Map<String, String> options) {
        this.hiveCatalog = hiveCatalog;
        this.fileIO = hiveCatalog.fileIO();
        this.client = hiveCatalog.getHmsClient();
        this.sourceDatabase = sourceDatabase;
        this.sourceTable = sourceTable;
        this.targetDatabase = targetDatabase;
        this.targetTable = targetTable;
        this.options = options;
    }

    public static List<Migrator> databaseMigrators(HiveCatalog hiveCatalog, String sourceDatabase, Map<String, String> options) {
        IMetaStoreClient client = hiveCatalog.getHmsClient();
        try {
            return client.getAllTables(sourceDatabase).stream().map(sourceTable -> new HiveMigrator(hiveCatalog, sourceDatabase, (String)sourceTable, sourceDatabase, sourceTable + PAIMON_SUFFIX, options)).collect(Collectors.toList());
        }
        catch (TException e) {
            throw new RuntimeException(e);
        }
    }

    public void executeMigrate() throws Exception {
        if (!this.client.tableExists(this.sourceDatabase, this.sourceTable)) {
            throw new RuntimeException("Source hive table does not exist");
        }
        Table sourceHiveTable = this.client.getTable(this.sourceDatabase, this.sourceTable);
        HashMap<String, String> properties = new HashMap<String, String>(sourceHiveTable.getParameters());
        this.checkPrimaryKey();
        Identifier identifier = Identifier.create((String)this.targetDatabase, (String)this.targetTable);
        boolean alreadyExist = this.hiveCatalog.tableExists(identifier);
        if (!alreadyExist) {
            Schema schema = this.from(this.client.getSchema(this.sourceDatabase, this.sourceTable), sourceHiveTable.getPartitionKeys(), properties);
            this.hiveCatalog.createTable(identifier, schema, false);
        }
        try {
            FileStoreTable paimonTable = (FileStoreTable)this.hiveCatalog.getTable(identifier);
            this.checkPaimonTable(paimonTable);
            List partitionsNames = this.client.listPartitionNames(this.sourceDatabase, this.sourceTable, (short)Short.MAX_VALUE);
            this.checkCompatible(sourceHiveTable, paimonTable);
            ArrayList<MigrateTask> tasks = new ArrayList<MigrateTask>();
            ConcurrentHashMap<Path, Path> rollBack = new ConcurrentHashMap<Path, Path>();
            if (partitionsNames.isEmpty()) {
                tasks.add(this.importUnPartitionedTableTask(this.fileIO, sourceHiveTable, paimonTable, rollBack));
            } else {
                tasks.addAll(this.importPartitionedTableTask(this.client, this.fileIO, partitionsNames, sourceHiveTable, paimonTable, rollBack));
            }
            List<Future> futures = tasks.stream().map(FileUtils.COMMON_IO_FORK_JOIN_POOL::submit).collect(Collectors.toList());
            ArrayList commitMessages = new ArrayList();
            try {
                for (Future future : futures) {
                    commitMessages.add(future.get());
                }
            }
            catch (Exception e) {
                futures.forEach(f -> f.cancel(true));
                for (Future future : futures) {
                    while (!future.isDone()) {
                        Thread.sleep(100L);
                    }
                }
                for (Map.Entry entry : rollBack.entrySet()) {
                    Path newPath = (Path)entry.getKey();
                    Path origin = (Path)entry.getValue();
                    if (!this.fileIO.exists(newPath)) continue;
                    this.fileIO.rename(newPath, origin);
                }
                throw new RuntimeException("Migrating failed because exception happens", e);
            }
            try (BatchTableCommit commit = paimonTable.newBatchWriteBuilder().newCommit();){
                commit.commit(new ArrayList(commitMessages));
            }
        }
        catch (Exception e) {
            if (!alreadyExist) {
                this.hiveCatalog.dropTable(identifier, true);
            }
            throw new RuntimeException("Migrating failed", e);
        }
        this.client.dropTable(this.sourceDatabase, this.sourceTable, true, true);
    }

    public void renameTable(boolean ignoreIfNotExists) throws Exception {
        Identifier targetTableId = Identifier.create((String)this.targetDatabase, (String)this.targetTable);
        Identifier sourceTableId = Identifier.create((String)this.sourceDatabase, (String)this.sourceTable);
        LOG.info("Last step: rename {} to {}.", (Object)targetTableId, (Object)sourceTableId);
        this.hiveCatalog.renameTable(targetTableId, sourceTableId, ignoreIfNotExists);
    }

    private void checkPrimaryKey() throws Exception {
        PrimaryKeysRequest primaryKeysRequest = new PrimaryKeysRequest(this.sourceDatabase, this.sourceTable);
        if (!this.client.getPrimaryKeys(primaryKeysRequest).isEmpty()) {
            throw new IllegalArgumentException("Can't migrate primary key table yet.");
        }
    }

    private void checkPaimonTable(FileStoreTable paimonTable) {
        if (paimonTable.primaryKeys().size() > 0) {
            throw new IllegalArgumentException("Hive migrator only support append only table target table");
        }
        if (paimonTable.store().bucketMode() != BucketMode.UNAWARE) {
            throw new IllegalArgumentException("Hive migrator only support unaware-bucket target table");
        }
    }

    public Schema from(List<FieldSchema> fields, List<FieldSchema> partitionFields, Map<String, String> hiveTableOptions) {
        HashMap<String, String> paimonOptions = new HashMap<String, String>(this.options);
        paimonOptions.put(CoreOptions.BUCKET.key(), "-1");
        if (hiveTableOptions.get("comment") != null) {
            paimonOptions.put("hive.comment", hiveTableOptions.get("comment"));
        }
        Schema.Builder schemaBuilder = Schema.newBuilder().comment(hiveTableOptions.get("comment")).options(paimonOptions).partitionKeys(partitionFields.stream().map(FieldSchema::getName).collect(Collectors.toList()));
        fields.forEach(field -> schemaBuilder.column(field.getName(), HiveTypeUtils.toPaimonType(field.getType()), field.getComment()));
        return schemaBuilder.build();
    }

    private List<MigrateTask> importPartitionedTableTask(IMetaStoreClient client, FileIO fileIO, List<String> partitionNames, Table sourceTable, FileStoreTable paimonTable, Map<Path, Path> rollback) throws Exception {
        ArrayList<MigrateTask> migrateTasks = new ArrayList<MigrateTask>();
        ArrayList valueSetters = new ArrayList();
        RowType partitionRowType = paimonTable.schema().projectedLogicalRowType(paimonTable.schema().partitionKeys());
        partitionRowType.getFieldTypes().forEach(type -> valueSetters.add(BinaryWriter.createValueSetter((DataType)type)));
        for (String partitionName : partitionNames) {
            Partition partition = client.getPartition(sourceTable.getDbName(), sourceTable.getTableName(), partitionName);
            Map values = client.partitionNameToSpec(partitionName);
            String format = this.parseFormat(partition.getSd().getSerdeInfo().toString());
            String location = partition.getSd().getLocation();
            BinaryRow partitionRow = FileMetaUtils.writePartitionValue((RowType)partitionRowType, (Map)values, valueSetters);
            Path path = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
            migrateTasks.add(new MigrateTask(fileIO, format, location, paimonTable, partitionRow, path, rollback));
        }
        return migrateTasks;
    }

    public MigrateTask importUnPartitionedTableTask(FileIO fileIO, Table sourceTable, FileStoreTable paimonTable, Map<Path, Path> rollback) {
        String format = this.parseFormat(sourceTable.getSd().getSerdeInfo().toString());
        String location = sourceTable.getSd().getLocation();
        Path path = paimonTable.store().pathFactory().bucketPath(BinaryRow.EMPTY_ROW, 0);
        return new MigrateTask(fileIO, format, location, paimonTable, BinaryRow.EMPTY_ROW, path, rollback);
    }

    private void checkCompatible(Table sourceHiveTable, FileStoreTable paimonTable) {
        ArrayList<FieldSchema> sourceFields = new ArrayList<FieldSchema>(sourceHiveTable.getPartitionKeys());
        ArrayList<DataField> targetFields = new ArrayList<DataField>(paimonTable.schema().projectedLogicalRowType(paimonTable.partitionKeys()).getFields());
        if (sourceFields.size() != targetFields.size()) {
            throw new RuntimeException("Source table partition keys not match target table partition keys.");
        }
        sourceFields.sort(Comparator.comparing(FieldSchema::getName));
        targetFields.sort(Comparator.comparing(DataField::name));
        for (int i = 0; i < sourceFields.size(); ++i) {
            FieldSchema s = (FieldSchema)sourceFields.get(i);
            DataField t = (DataField)targetFields.get(i);
            if (s.getName().equals(t.name()) && s.getType().equalsIgnoreCase(t.type().asSQLString())) continue;
            throw new RuntimeException("Source table partition keys not match target table partition keys, please checkCompatible.");
        }
    }

    private String parseFormat(String serder) {
        if (serder.contains("avro")) {
            return "avro";
        }
        if (serder.contains("parquet")) {
            return "parquet";
        }
        if (serder.contains("orc")) {
            return "orc";
        }
        throw new UnsupportedOperationException("Unknown partition format: " + serder);
    }

    public static class MigrateTask
    implements Callable<CommitMessage> {
        private final FileIO fileIO;
        private final String format;
        private final String location;
        private final FileStoreTable paimonTable;
        private final BinaryRow partitionRow;
        private final Path newDir;
        private final Map<Path, Path> rollback;

        public MigrateTask(FileIO fileIO, String format, String location, FileStoreTable paimonTable, BinaryRow partitionRow, Path newDir, Map<Path, Path> rollback) {
            this.fileIO = fileIO;
            this.format = format;
            this.location = location;
            this.paimonTable = paimonTable;
            this.partitionRow = partitionRow;
            this.newDir = newDir;
            this.rollback = rollback;
        }

        @Override
        public CommitMessage call() throws Exception {
            if (!this.fileIO.exists(this.newDir)) {
                this.fileIO.mkdirs(this.newDir);
            }
            List fileMetas = FileMetaUtils.construct((FileIO)this.fileIO, (String)this.format, (String)this.location, (org.apache.paimon.table.Table)this.paimonTable, (Predicate)HIDDEN_PATH_FILTER, (Path)this.newDir, this.rollback);
            return FileMetaUtils.commitFile((BinaryRow)this.partitionRow, (List)fileMetas);
        }
    }
}

