/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.types.logical.RowType;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.PredicateConverter;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.PartitionPredicateVisitor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Common base for Paimon table sources exposed to Flink.
 *
 * <p>Holds the state pushed down into the source (filter predicate, projection, limit), converts
 * Flink filter expressions into Paimon {@link Predicate}s, and infers a scan parallelism from the
 * table options and the planned splits. The Flink-facing behaviour (runtime providers, changelog
 * mode, statistics, ...) is left to subclasses through the abstract methods.
 *
 * <p>Instances are mutable: the {@code push*} methods overwrite the corresponding fields.
 */
public abstract class FlinkTableSource {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkTableSource.class);

    /**
     * Key looked up in the execution environment configuration to override {@link
     * FlinkConnectorOptions#INFER_SCAN_PARALLELISM}: the connector option key prefixed with
     * {@code "paimon."}.
     */
    protected static final String FLINK_INFER_SCAN_PARALLELISM =
            "paimon." + FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key();

    protected final Table table;

    /** Conjunction of all filters that could be converted by {@link #pushFilters}, if any. */
    @Nullable
    protected Predicate predicate;

    @Nullable
    protected int[][] projectFields;

    @Nullable
    protected Long limit;

    /** Lazily computed by {@link #scanSplitsForInference()}; {@code null} until then. */
    protected SplitStatistics splitStatistics;

    /**
     * Creates a source without any pushed-down predicate, projection or limit.
     *
     * @param table the Paimon table to read
     */
    public FlinkTableSource(Table table) {
        this(table, null, null, null);
    }

    /**
     * Creates a source with the given pushed-down state.
     *
     * @param table the Paimon table to read
     * @param predicate filter to apply when planning splits, or {@code null}
     * @param projectFields pushed-down projection, or {@code null}
     * @param limit pushed-down row limit, or {@code null}
     */
    public FlinkTableSource(
            Table table,
            @Nullable Predicate predicate,
            @Nullable int[][] projectFields,
            @Nullable Long limit) {
        this.table = table;
        this.predicate = predicate;
        this.projectFields = projectFields;
        this.limit = limit;
    }

    /**
     * Converts the given Flink filters to Paimon predicates and stores their conjunction as the
     * source predicate.
     *
     * <p>Every filter that converts successfully contributes to the stored predicate. A filter is
     * only treated as consumed (i.e. left out of the returned list) when the source is not
     * streaming and the {@link PartitionPredicateVisitor} built from the table's partition keys
     * accepts its predicate; all other filters are returned so the caller can still evaluate them.
     *
     * @param filters the filter expressions offered for push-down
     * @return the filters that were not consumed by this source
     */
    public List<ResolvedExpression> pushFilters(List<ResolvedExpression> filters) {
        List<String> partitionKeys = table.partitionKeys();
        RowType rowType = LogicalTypeConversion.toLogicalType(table.rowType());

        List<ResolvedExpression> unConsumedFilters = new ArrayList<>();
        List<ResolvedExpression> consumedFilters = new ArrayList<>();
        List<Predicate> converted = new ArrayList<>();
        PartitionPredicateVisitor visitor = new PartitionPredicateVisitor(partitionKeys);

        for (ResolvedExpression filter : filters) {
            Optional<Predicate> predicateOptional = PredicateConverter.convert(rowType, filter);
            if (!predicateOptional.isPresent()) {
                // Not expressible as a Paimon predicate: the caller must keep evaluating it.
                unConsumedFilters.add(filter);
                continue;
            }
            Predicate p = predicateOptional.get();
            if (isStreaming() || !p.visit(visitor)) {
                unConsumedFilters.add(filter);
            } else {
                consumedFilters.add(filter);
            }
            // Pushed to the scan even when reported as unconsumed.
            converted.add(p);
        }

        predicate = converted.isEmpty() ? null : PredicateBuilder.and(converted);
        // Split statistics are planned with the predicate (see scanSplitsForInference), so any
        // statistics cached under the previous predicate no longer describe this source.
        splitStatistics = null;
        LOG.info("Consumed filters: {} of {}", consumedFilters, filters);
        return unConsumedFilters;
    }

    /**
     * Records the pushed-down projection.
     *
     * @param projectedFields the projection to store
     */
    public void pushProjection(int[][] projectedFields) {
        this.projectFields = projectedFields;
    }

    /**
     * Records the pushed-down row limit.
     *
     * @param limit the limit to store
     */
    public void pushLimit(long limit) {
        this.limit = limit;
    }

    /** Returns the changelog mode of this source; defined by subclasses. */
    public abstract ChangelogMode getChangelogMode();

    /** Returns the scan runtime provider for the given context; defined by subclasses. */
    public abstract ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(
            ScanTableSource.ScanContext scanContext);

    /** Accepts a pushed-down watermark strategy; defined by subclasses. */
    public abstract void pushWatermark(WatermarkStrategy<RowData> watermarkStrategy);

    /** Returns the lookup runtime provider for the given context; defined by subclasses. */
    public abstract LookupTableSource.LookupRuntimeProvider getLookupRuntimeProvider(
            LookupTableSource.LookupContext lookupContext);

    /** Returns table statistics for this source; defined by subclasses. */
    public abstract TableStats reportStatistics();

    /** Returns a copy of this source; defined by subclasses. */
    public abstract FlinkTableSource copy();

    /** Returns a summary string describing this source; defined by subclasses. */
    public abstract String asSummaryString();

    /** Returns the field names accepted for dynamic filtering; defined by subclasses. */
    public abstract List<String> listAcceptedFilterFields();

    /**
     * Applies dynamic filtering on the given fields; defined by subclasses.
     *
     * <p>NOTE(review): presumably mirrors Flink's {@code
     * SupportsDynamicFiltering#applyDynamicFiltering} - confirm against the implementations.
     */
    public abstract void applyDynamicFiltering(List<String> candidateFilterFields);

    /** Returns whether this source runs in streaming mode; defined by subclasses. */
    public abstract boolean isStreaming();

    /**
     * Determines the parallelism to use for the scan.
     *
     * <p>An explicit {@link FlinkConnectorOptions#SCAN_PARALLELISM} always wins. Otherwise, when
     * inference is enabled (table option, optionally overridden by {@link
     * #FLINK_INFER_SCAN_PARALLELISM} in the environment configuration):
     *
     * <ul>
     *   <li>streaming: the table's bucket option, but at least 1;
     *   <li>otherwise: the number of planned splits, capped by a positive {@link #limit}, raised
     *       to at least 1, and finally capped by {@link
     *       FlinkConnectorOptions#INFER_SCAN_MAX_PARALLELISM}.
     * </ul>
     *
     * @param env the execution environment whose configuration may override the inference switch
     * @return the parallelism, or {@code null} when none is configured and inference is disabled
     */
    @Nullable
    protected Integer inferSourceParallelism(StreamExecutionEnvironment env) {
        Options options = Options.fromMap(table.options());
        Configuration envConfig = (Configuration) env.getConfiguration();
        if (envConfig.containsKey(FLINK_INFER_SCAN_PARALLELISM)) {
            options.set(
                    FlinkConnectorOptions.INFER_SCAN_PARALLELISM,
                    Boolean.parseBoolean(envConfig.toMap().get(FLINK_INFER_SCAN_PARALLELISM)));
        }

        Integer configured = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
        if (configured != null || !options.get(FlinkConnectorOptions.INFER_SCAN_PARALLELISM)) {
            // Either explicitly configured, or nothing configured and inference switched off.
            return configured;
        }

        if (isStreaming()) {
            return Math.max(1, options.get(CoreOptions.BUCKET));
        }

        scanSplitsForInference();
        int inferred = splitStatistics.splitNumber();
        if (limit != null && limit > 0L) {
            // Clamp the long limit into int range before comparing.
            int limitCount = limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : limit.intValue();
            inferred = Math.min(inferred, limitCount);
        }
        inferred = Math.max(1, inferred);
        return Math.min(inferred, options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
    }

    /**
     * Plans the table scan with the current {@link #predicate} and caches the resulting {@link
     * SplitStatistics}. Does nothing when statistics are already cached.
     */
    protected void scanSplitsForInference() {
        if (splitStatistics != null) {
            return;
        }
        List<Split> splits =
                table.newReadBuilder().withFilter(predicate).newScan().plan().splits();
        splitStatistics = new SplitStatistics(splits);
    }

    /** Returns the Paimon table this source reads from. */
    public Table getTable() {
        return table;
    }

    /** Number of splits and summed row count of a planned scan. */
    protected static class SplitStatistics {
        private final int splitNumber;
        private final long totalRowCount;

        /**
         * Computes the statistics from the given splits.
         *
         * @param splits the planned splits
         */
        protected SplitStatistics(List<Split> splits) {
            this.splitNumber = splits.size();
            this.totalRowCount = splits.stream().mapToLong(Split::rowCount).sum();
        }

        /** Returns the number of splits. */
        public int splitNumber() {
            return splitNumber;
        }

        /** Returns the sum of {@code rowCount()} over all splits. */
        public long totalRowCount() {
            return totalRowCount;
        }
    }
}

