/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.sources;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.RowKind;

/**
 * Static helpers for validating a {@link DynamicTableSource} against the {@link TableSchema} of
 * its table and for deriving the metadata keys and row type the source has to produce.
 *
 * <p>The class is not instantiable; all functionality is exposed through static methods.
 */
@Internal
public final class DynamicSourceUtils {

    /**
     * Validates the given source against the schema of {@code table} and pushes the required
     * metadata into it.
     *
     * <p>Metadata columns are validated and applied first. If the source is a {@link
     * ScanTableSource}, its changelog mode, boundedness and the declared watermarks are validated
     * afterwards, depending on the execution mode.
     *
     * @param sourceIdentifier identifier of the table, used in error messages
     * @param table catalog table whose schema is validated
     * @param source source to validate and prepare
     * @param isStreamingMode {@code true} for streaming validation, {@code false} for batch
     * @param config table configuration consulted for the CDC-duplicate option
     * @throws ValidationException or {@link TableException} if any validation fails
     */
    public static void prepareDynamicSource(ObjectIdentifier sourceIdentifier, CatalogTable table, DynamicTableSource source, boolean isStreamingMode, TableConfig config) {
        TableSchema schema = table.getSchema();
        validateAndApplyMetadata(sourceIdentifier, schema, source);
        if (source instanceof ScanTableSource) {
            validateScanSource(sourceIdentifier, schema, (ScanTableSource) source, isStreamingMode, config);
        }
    }

    /**
     * Returns the metadata keys that the schema requires from the source.
     *
     * <p>A key is required if a metadata column of the schema refers to it (by alias, or by column
     * name when no alias is declared). The result follows the iteration order of the source's
     * readable-metadata map, not the column order of the schema; keys that the source does not
     * list are dropped.
     *
     * @param schema schema declaring the metadata columns
     * @param source source whose readable metadata is consulted
     * @return required metadata keys in the order the source lists them
     */
    public static List<String> createRequiredMetadataKeys(TableSchema schema, DynamicTableSource source) {
        Set<String> requiredMetadataKeys = extractMetadataColumns(schema).stream()
                .map(DynamicSourceUtils::metadataKeyOf)
                .collect(Collectors.toSet());
        return extractMetadataMap(source).keySet().stream()
                .filter(requiredMetadataKeys::contains)
                .collect(Collectors.toList());
    }

    /**
     * Builds the row type the source is expected to produce: all physical columns of the schema
     * (in schema order) followed by one field per required metadata key (see {@link
     * #createRequiredMetadataKeys}), typed as the source declares it.
     *
     * @param schema schema of the table
     * @param source source whose readable metadata provides the metadata field types
     * @return row type created with {@code isNullable = false}
     */
    public static RowType createProducedType(TableSchema schema, DynamicTableSource source) {
        Map<String, DataType> metadataMap = extractMetadataMap(source);
        Stream<RowType.RowField> physicalFields = schema.getTableColumns().stream()
                .filter(TableColumn::isPhysical)
                .map(c -> new RowType.RowField(c.getName(), c.getType().getLogicalType()));
        // Required keys are a subset of metadataMap's keys, so the lookup below never misses.
        Stream<RowType.RowField> metadataFields = createRequiredMetadataKeys(schema, source).stream()
                .map(k -> new RowType.RowField(k, metadataMap.get(k).getLogicalType()));
        List<RowType.RowField> rowFields = Stream.concat(physicalFields, metadataFields).collect(Collectors.toList());
        return new RowType(false, rowFields);
    }

    /**
     * Tells whether the source is an upsert source: a {@link ScanTableSource} whose changelog
     * contains {@code UPDATE_AFTER} but no {@code UPDATE_BEFORE}, on a table that declares a
     * primary key.
     *
     * @param catalogTable table providing the schema (primary key)
     * @param tableSource source to inspect
     * @return {@code true} if the source is an upsert source, {@code false} otherwise
     */
    public static boolean isUpsertSource(CatalogTable catalogTable, DynamicTableSource tableSource) {
        if (!(tableSource instanceof ScanTableSource)) {
            return false;
        }
        ChangelogMode mode = ((ScanTableSource) tableSource).getChangelogMode();
        boolean isUpsertMode = mode.contains(RowKind.UPDATE_AFTER) && !mode.contains(RowKind.UPDATE_BEFORE);
        boolean hasPrimaryKey = catalogTable.getSchema().getPrimaryKey().isPresent();
        return isUpsertMode && hasPrimaryKey;
    }

    /**
     * Tells whether change events of the source may be duplicated: the source is a {@link
     * ScanTableSource} that is neither insert-only nor an upsert source, the option {@link
     * ExecutionConfigOptions#TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE} is enabled, and the table
     * declares a primary key.
     *
     * @param catalogTable table providing the schema (primary key)
     * @param tableSource source to inspect
     * @param config table configuration holding the option
     * @return {@code true} if all of the above conditions hold
     */
    public static boolean isSourceChangeEventsDuplicate(CatalogTable catalogTable, DynamicTableSource tableSource, TableConfig config) {
        if (!(tableSource instanceof ScanTableSource)) {
            return false;
        }
        ChangelogMode mode = ((ScanTableSource) tableSource).getChangelogMode();
        boolean isCDCSource = !mode.containsOnly(RowKind.INSERT) && !isUpsertSource(catalogTable, tableSource);
        boolean changeEventsDuplicate = config.getConfiguration().getBoolean(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE);
        boolean hasPrimaryKey = catalogTable.getSchema().getPrimaryKey().isPresent();
        return isCDCSource && changeEventsDuplicate && hasPrimaryKey;
    }

    /**
     * Returns the readable metadata of the source, or an empty map if the source does not
     * implement {@link SupportsReadingMetadata}.
     */
    private static Map<String, DataType> extractMetadataMap(DynamicTableSource source) {
        if (source instanceof SupportsReadingMetadata) {
            return ((SupportsReadingMetadata) source).listReadableMetadata();
        }
        return Collections.emptyMap();
    }

    /** Returns all metadata columns of the schema, in schema order. */
    private static List<TableColumn.MetadataColumn> extractMetadataColumns(TableSchema schema) {
        return schema.getTableColumns().stream()
                .filter(TableColumn.MetadataColumn.class::isInstance)
                .map(TableColumn.MetadataColumn.class::cast)
                .collect(Collectors.toList());
    }

    /** Returns the metadata key a column refers to: its alias if declared, otherwise its name. */
    private static String metadataKeyOf(TableColumn.MetadataColumn column) {
        return column.getMetadataAlias().orElse(column.getName());
    }

    /**
     * Validates every metadata column of the schema against the metadata the source can read and,
     * if all columns are valid, tells the source which keys and which produced type to use.
     *
     * <p>Does nothing if the schema declares no metadata columns.
     *
     * @throws ValidationException if the source cannot read metadata, a key is unknown to the
     *     source, or the source's metadata type cannot be explicitly cast to the declared column
     *     type
     */
    private static void validateAndApplyMetadata(ObjectIdentifier sourceIdentifier, TableSchema schema, DynamicTableSource source) {
        List<TableColumn.MetadataColumn> metadataColumns = extractMetadataColumns(schema);
        if (metadataColumns.isEmpty()) {
            return;
        }
        if (!(source instanceof SupportsReadingMetadata)) {
            // The message talks about the table, so report the table identifier rather than
            // the summary string of the connector.
            throw new ValidationException(String.format("Table '%s' declares metadata columns, but the underlying %s doesn't implement the %s interface. Therefore, metadata cannot be read from the given source.", sourceIdentifier.asSummaryString(), DynamicTableSource.class.getSimpleName(), SupportsReadingMetadata.class.getSimpleName()));
        }
        SupportsReadingMetadata metadataSource = (SupportsReadingMetadata) source;
        Map<String, DataType> metadataMap = metadataSource.listReadableMetadata();
        for (TableColumn.MetadataColumn column : metadataColumns) {
            validateMetadataColumn(sourceIdentifier, source, metadataMap, column);
        }
        metadataSource.applyReadableMetadata(createRequiredMetadataKeys(schema, source), TypeConversions.fromLogicalToDataType(createProducedType(schema, source)));
    }

    /**
     * Checks a single metadata column: its key must be listed by the source and the source's type
     * for that key must be explicitly castable to the type declared on the column.
     */
    private static void validateMetadataColumn(ObjectIdentifier sourceIdentifier, DynamicTableSource source, Map<String, DataType> metadataMap, TableColumn.MetadataColumn column) {
        String metadataKey = metadataKeyOf(column);
        LogicalType declaredType = column.getType().getLogicalType();
        DataType sourceMetadataDataType = metadataMap.get(metadataKey);
        if (sourceMetadataDataType == null) {
            throw new ValidationException(String.format("Invalid metadata key '%s' in column '%s' of table '%s'. The %s class '%s' supports the following metadata keys for reading:\n%s", metadataKey, column.getName(), sourceIdentifier.asSummaryString(), DynamicTableSource.class.getSimpleName(), source.getClass().getName(), String.join("\n", metadataMap.keySet())));
        }
        LogicalType sourceMetadataType = sourceMetadataDataType.getLogicalType();
        // Cast direction: from the type the source produces to the type declared on the column.
        if (supportsCast(sourceMetadataType, declaredType)) {
            return;
        }
        // In both messages the first type is the declared column type and the second one is the
        // metadata type offered by the source, matching the wording of the message.
        if (metadataKey.equals(column.getName())) {
            throw new ValidationException(String.format("Invalid data type for metadata column '%s' of table '%s'. The column cannot be declared as '%s' because the type must be castable from metadata type '%s'.", column.getName(), sourceIdentifier.asSummaryString(), declaredType, sourceMetadataType));
        }
        throw new ValidationException(String.format("Invalid data type for metadata column '%s' with metadata key '%s' of table '%s'. The column cannot be declared as '%s' because the type must be castable from metadata type '%s'.", column.getName(), metadataKey, sourceIdentifier.asSummaryString(), declaredType, sourceMetadataType));
    }

    /** Thin wrapper that names the cast direction at the call site. */
    private static boolean supportsCast(LogicalType from, LogicalType to) {
        return LogicalTypeCasts.supportsExplicitCast(from, to);
    }

    /**
     * Validates a scan source: watermark declarations first, then the mode-specific rules for
     * streaming or batch execution.
     */
    private static void validateScanSource(ObjectIdentifier sourceIdentifier, TableSchema schema, ScanTableSource scanSource, boolean isStreamingMode, TableConfig config) {
        // The runtime provider is requested in both modes (unchanged behavior); only the batch
        // validation consumes it.
        ScanTableSource.ScanRuntimeProvider provider = scanSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
        ChangelogMode changelogMode = scanSource.getChangelogMode();
        validateWatermarks(sourceIdentifier, schema);
        if (isStreamingMode) {
            validateScanSourceForStreaming(sourceIdentifier, schema, scanSource, changelogMode, config);
        } else {
            validateScanSourceForBatch(sourceIdentifier, changelogMode, provider);
        }
    }

    /**
     * Streaming rules for the changelog mode of a scan source.
     *
     * <ul>
     *   <li>{@code UPDATE_AFTER} without {@code UPDATE_BEFORE} requires a primary key.
     *   <li>{@code UPDATE_BEFORE} without {@code UPDATE_AFTER} is rejected.
     *   <li>Any other non-insert-only changelog requires a primary key if {@link
     *       ExecutionConfigOptions#TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE} is enabled.
     * </ul>
     *
     * @throws TableException if a required primary key is missing
     * @throws ValidationException if the changelog mode itself is unsupported
     */
    private static void validateScanSourceForStreaming(ObjectIdentifier sourceIdentifier, TableSchema schema, ScanTableSource scanSource, ChangelogMode changelogMode, TableConfig config) {
        boolean hasUpdateBefore = changelogMode.contains(RowKind.UPDATE_BEFORE);
        boolean hasUpdateAfter = changelogMode.contains(RowKind.UPDATE_AFTER);
        if (hasUpdateAfter && !hasUpdateBefore) {
            if (!schema.getPrimaryKey().isPresent()) {
                throw new TableException(String.format("Table '%s' produces a changelog stream contains UPDATE_AFTER, no UPDATE_BEFORE. This requires to define primary key constraint on the table.", sourceIdentifier.asSummaryString()));
            }
            return;
        }
        if (hasUpdateBefore && !hasUpdateAfter) {
            throw new ValidationException(String.format("Invalid source for table '%s'. A %s doesn't support a changelog which contains UPDATE_BEFORE but no UPDATE_AFTER. Please adapt the implementation of class '%s'.", sourceIdentifier.asSummaryString(), ScanTableSource.class.getSimpleName(), scanSource.getClass().getName()));
        }
        if (changelogMode.containsOnly(RowKind.INSERT)) {
            return;
        }
        boolean changeEventsDuplicate = config.getConfiguration().getBoolean(ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE);
        if (changeEventsDuplicate && !schema.getPrimaryKey().isPresent()) {
            throw new TableException(String.format("Configuration '%s' is enabled which requires the changelog sources to define a PRIMARY KEY. However, table '%s' doesn't have a primary key.", ExecutionConfigOptions.TABLE_EXEC_SOURCE_CDC_EVENTS_DUPLICATE.key(), sourceIdentifier.asSummaryString()));
        }
    }

    /**
     * Batch rules for a scan source: the runtime provider must be bounded and the changelog must
     * be insert-only.
     *
     * @throws ValidationException if the provider is unbounded
     * @throws TableException if the changelog contains anything other than {@code INSERT}
     */
    private static void validateScanSourceForBatch(ObjectIdentifier sourceIdentifier, ChangelogMode changelogMode, ScanTableSource.ScanRuntimeProvider provider) {
        if (!provider.isBounded()) {
            throw new ValidationException(String.format("Querying an unbounded table '%s' in batch mode is not allowed. The table source is unbounded.", sourceIdentifier.asSummaryString()));
        }
        if (!changelogMode.containsOnly(RowKind.INSERT)) {
            throw new TableException(String.format("Querying a table in batch mode is currently only possible for INSERT-only table sources. But the source for table '%s' produces other changelog messages than just INSERT.", sourceIdentifier.asSummaryString()));
        }
    }

    /**
     * Validates the watermark declarations of the schema: at most one is allowed and its rowtime
     * attribute must not contain a {@code '.'} (i.e. must not be a nested field).
     *
     * @throws TableException if either rule is violated
     */
    private static void validateWatermarks(ObjectIdentifier sourceIdentifier, TableSchema schema) {
        List<WatermarkSpec> watermarkSpecs = schema.getWatermarkSpecs();
        if (watermarkSpecs.isEmpty()) {
            return;
        }
        if (watermarkSpecs.size() > 1) {
            throw new TableException(String.format("Currently only at most one WATERMARK declaration is supported for table '%s'.", sourceIdentifier.asSummaryString()));
        }
        String rowtimeAttribute = watermarkSpecs.get(0).getRowtimeAttribute();
        if (rowtimeAttribute.contains(".")) {
            throw new TableException(String.format("A nested field '%s' cannot be declared as rowtime attribute for table '%s' right now.", rowtimeAttribute, sourceIdentifier.asSummaryString()));
        }
    }

    /** Utility class; not meant to be instantiated. */
    private DynamicSourceUtils() {
    }
}

