/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.sources;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sources.DefinedFieldMapping;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.FieldsDataType;
import org.apache.flink.table.types.logical.LegacyTypeInformationType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.table.types.utils.DataTypeDefaultVisitor;
import org.apache.flink.table.types.utils.TypeConversions;

@Internal
public class TableSourceValidation {
    public static void validateTableSource(TableSource<?> tableSource) {
        TableSchema schema = tableSource.getTableSchema();
        List<RowtimeAttributeDescriptor> rowtimeAttributes = TableSourceValidation.getRowtimeAttributes(tableSource);
        Optional<String> proctimeAttribute = TableSourceValidation.getProctimeAttribute(tableSource);
        TableSourceValidation.validateSingleRowtimeAttribute(rowtimeAttributes);
        TableSourceValidation.validateRowtimeAttributesExistInSchema(rowtimeAttributes, schema);
        TableSourceValidation.validateProctimeAttributesExistInSchema(proctimeAttribute, schema);
        TableSourceValidation.validateLogicalToPhysicalMapping(tableSource, schema, rowtimeAttributes, proctimeAttribute);
        TableSourceValidation.validateTimestampExtractorArguments(rowtimeAttributes, tableSource);
        TableSourceValidation.validateNotOverlapping(rowtimeAttributes, proctimeAttribute);
    }

    public static boolean hasRowtimeAttribute(TableSource<?> tableSource) {
        return !TableSourceValidation.getRowtimeAttributes(tableSource).isEmpty();
    }

    private static void validateSingleRowtimeAttribute(List<RowtimeAttributeDescriptor> rowtimeAttributes) {
        if (rowtimeAttributes.size() > 1) {
            throw new ValidationException("Currently, only a single rowtime attribute is supported. Please remove all but one RowtimeAttributeDescriptor.");
        }
    }

    private static void validateRowtimeAttributesExistInSchema(List<RowtimeAttributeDescriptor> rowtimeAttributes, TableSchema tableSchema) {
        rowtimeAttributes.forEach(r -> {
            if (!tableSchema.getFieldDataType(r.getAttributeName()).isPresent()) {
                throw new ValidationException(String.format("Found a rowtime attribute for field '%s' but it does not exist in the Table. TableSchema: %s", r.getAttributeName(), tableSchema));
            }
        });
    }

    private static void validateProctimeAttributesExistInSchema(Optional<String> proctimeAttribute, TableSchema tableSchema) {
        proctimeAttribute.ifPresent(r -> {
            if (!tableSchema.getFieldDataType((String)r).isPresent()) {
                throw new ValidationException(String.format("Found a proctime attribute for field '%s' but it does not exist in the Table. TableSchema: %s", r, tableSchema));
            }
        });
    }

    private static void validateNotOverlapping(List<RowtimeAttributeDescriptor> rowtimeAttributes, Optional<String> proctimeAttribute) {
        proctimeAttribute.ifPresent(proctime -> {
            if (rowtimeAttributes.stream().anyMatch(rowtimeAttribute -> rowtimeAttribute.getAttributeName().equals(proctime))) {
                throw new ValidationException(String.format("Field '%s' must not be processing time and rowtime attribute at the same time.", proctime));
            }
        });
    }

    private static void validateLogicalToPhysicalMapping(TableSource<?> tableSource, TableSchema schema, List<RowtimeAttributeDescriptor> rowtimeAttributes, Optional<String> proctimeAttribute) {
        int mappedFieldCnt = 0;
        for (int i = 0; i < schema.getFieldCount(); ++i) {
            DataType fieldType = schema.getFieldDataType(i).get();
            LogicalType logicalFieldType = fieldType.getLogicalType();
            String fieldName = schema.getFieldName(i).get();
            if (proctimeAttribute.map(p -> p.equals(fieldName)).orElse(false).booleanValue()) {
                if (LogicalTypeChecks.hasFamily(logicalFieldType, LogicalTypeFamily.TIMESTAMP)) continue;
                throw new ValidationException(String.format("Processing time field '%s' has invalid type %s. Processing time attributes must be of type SQL_TIMESTAMP.", fieldName, logicalFieldType));
            }
            if (rowtimeAttributes.stream().anyMatch(p -> p.getAttributeName().equals(fieldName))) {
                if (LogicalTypeChecks.hasFamily(logicalFieldType, LogicalTypeFamily.TIMESTAMP)) continue;
                throw new ValidationException(String.format("Rowtime time field '%s' has invalid type %s. Rowtime time attributes must be of type SQL_TIMESTAMP.", fieldName, logicalFieldType));
            }
            TableSourceValidation.validateLogicalTypeEqualsPhysical(fieldName, fieldType, tableSource);
            ++mappedFieldCnt;
        }
        DataType producedDataType = tableSource.getProducedDataType();
        if (!TableSourceValidation.isCompositeType(producedDataType) && mappedFieldCnt > 1) {
            throw new ValidationException(String.format("More than one table field matched to atomic input type %s.", producedDataType));
        }
    }

    private static boolean isCompositeType(DataType producedDataType) {
        LogicalType logicalType = producedDataType.getLogicalType();
        return producedDataType instanceof FieldsDataType || logicalType instanceof LegacyTypeInformationType && ((LegacyTypeInformationType)logicalType).getTypeInformation() instanceof CompositeType;
    }

    private static void validateLogicalTypeEqualsPhysical(String fieldName, DataType logicalType, TableSource<?> tableSource) {
        ResolvedField resolvedField = TableSourceValidation.resolveField(fieldName, tableSource);
        if (!resolvedField.getType().equals(logicalType)) {
            throw new ValidationException(String.format("Type %s of table field '%s' does not match with type '%s; of the field '%s' of the TableSource return type.", logicalType, resolvedField.getType(), fieldName, resolvedField.getType()));
        }
    }

    private static void validateTimestampExtractorArguments(List<RowtimeAttributeDescriptor> descriptors, TableSource<?> tableSource) {
        if (descriptors.size() == 1) {
            RowtimeAttributeDescriptor descriptor = descriptors.get(0);
            String[] extractorInputFields = descriptor.getTimestampExtractor().getArgumentFields();
            TypeInformation[] physicalTypes = (TypeInformation[])Arrays.stream(extractorInputFields).map(fieldName -> TableSourceValidation.resolveField(fieldName, tableSource)).map(resolvedField -> TypeConversions.fromDataTypeToLegacyInfo(resolvedField.getType())).toArray(TypeInformation[]::new);
            descriptor.getTimestampExtractor().validateArgumentFields(physicalTypes);
        }
    }

    private static ResolvedField resolveField(String fieldName, TableSource<?> tableSource) {
        Map<String, String> fieldMapping;
        DataType producedDataType = tableSource.getProducedDataType();
        if (tableSource instanceof DefinedFieldMapping && (fieldMapping = ((DefinedFieldMapping)((Object)tableSource)).getFieldMapping()) != null) {
            String resolvedFieldName = fieldMapping.get(fieldName);
            if (resolvedFieldName == null) {
                throw new ValidationException(String.format("Field '%s' could not be resolved by the field mapping.", fieldName));
            }
            return new ResolvedField(resolvedFieldName, TableSourceValidation.lookupFieldType(producedDataType, resolvedFieldName, String.format("Table field '%s' was resolved to TableSource return type field '%s', but field '%s' was not found in the return type %s of the TableSource. Please verify the field mapping of the TableSource.", fieldName, resolvedFieldName, resolvedFieldName, producedDataType)));
        }
        return new ResolvedField(fieldName, TableSourceValidation.lookupFieldType(producedDataType, fieldName, String.format("Table field '%s' was not found in the return type %s of the TableSource.", fieldName, producedDataType)));
    }

    private static DataType lookupFieldType(DataType inputType, String fieldName, String failMsg) {
        return inputType.accept(new TypeExtractor(fieldName)).orElseThrow(() -> new ValidationException(failMsg));
    }

    private static List<RowtimeAttributeDescriptor> getRowtimeAttributes(TableSource<?> tableSource) {
        if (tableSource instanceof DefinedRowtimeAttributes) {
            return ((DefinedRowtimeAttributes)((Object)tableSource)).getRowtimeAttributeDescriptors();
        }
        return Collections.emptyList();
    }

    private static Optional<String> getProctimeAttribute(TableSource<?> tableSource) {
        if (tableSource instanceof DefinedProctimeAttribute) {
            return Optional.ofNullable(((DefinedProctimeAttribute)((Object)tableSource)).getProctimeAttribute());
        }
        return Optional.empty();
    }

    private TableSourceValidation() {
    }

    private static class TypeExtractor
    extends DataTypeDefaultVisitor<Optional<DataType>> {
        private final String fieldName;

        TypeExtractor(String fieldName) {
            this.fieldName = fieldName;
        }

        @Override
        public Optional<DataType> visit(AtomicDataType atomicDataType) {
            LegacyTypeInformationType legacyTypeInformationType;
            TypeInformation typeInformation;
            LogicalType logicalType = atomicDataType.getLogicalType();
            if (logicalType instanceof LegacyTypeInformationType && (typeInformation = (legacyTypeInformationType = (LegacyTypeInformationType)logicalType).getTypeInformation()) instanceof CompositeType) {
                CompositeType compositeType = (CompositeType)typeInformation;
                return Optional.of(TypeConversions.fromLegacyInfoToDataType(compositeType.getTypeAt(this.fieldName)));
            }
            return Optional.of(atomicDataType);
        }

        @Override
        public Optional<DataType> visit(FieldsDataType fieldsDataType) {
            return Optional.ofNullable(fieldsDataType.getFieldDataTypes().get(this.fieldName));
        }

        @Override
        protected Optional<DataType> defaultMethod(DataType dataType) {
            return Optional.of(dataType);
        }
    }

    private static class ResolvedField {
        private final String name;
        private final DataType type;

        private ResolvedField(String name, DataType type) {
            this.type = type;
            this.name = name;
        }

        public DataType getType() {
            return this.type;
        }

        public String getName() {
            return this.name;
        }
    }
}

