/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.rules.logical;

import java.util.Arrays;
import java.util.Collections;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rex.RexLiteral;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.plan.stats.TableStats;
import org.apache.flink.table.planner.plan.abilities.source.LimitPushDownSpec;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilityContext;
import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort;
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalTableSourceScan;
import org.apache.flink.table.planner.plan.schema.TableSourceTable;
import org.apache.flink.table.planner.plan.stats.FlinkStatistic;

/**
 * Planner rule that matches a {@link FlinkLogicalSort} sitting directly on top of a
 * {@link FlinkLogicalTableSourceScan} and pushes the row limit into the scanned table source.
 *
 * <p>The rule only fires when the sort has no collation fields and a non-null {@code fetch}, the
 * underlying source implements {@link SupportsLimitPushDown}, and no {@link LimitPushDownSpec} has
 * been recorded on the table yet. The original sort node is kept on top of the rewritten scan.
 */
public class PushLimitIntoTableSourceScanRule
extends RelOptRule {
    public static final PushLimitIntoTableSourceScanRule INSTANCE = new PushLimitIntoTableSourceScanRule();

    /** Creates the rule with the operand pattern {@code FlinkLogicalSort -> FlinkLogicalTableSourceScan}. */
    public PushLimitIntoTableSourceScanRule() {
        super(PushLimitIntoTableSourceScanRule.operand(FlinkLogicalSort.class, PushLimitIntoTableSourceScanRule.operand(FlinkLogicalTableSourceScan.class, PushLimitIntoTableSourceScanRule.none()), new RelOptRuleOperand[0]), "PushLimitIntoTableSourceScanRule");
    }

    /**
     * Decides whether the limit of the matched sort can be pushed into the matched scan.
     *
     * @param call the rule call; {@code rel(0)} is the sort and {@code rel(1)} is the scan
     * @return {@code true} if the sort is a pure limit (no collation fields, {@code fetch} set),
     *     the scan's table is a {@link TableSourceTable} whose source implements
     *     {@link SupportsLimitPushDown}, and the table carries no {@link LimitPushDownSpec} yet
     */
    @Override
    public boolean matches(RelOptRuleCall call) {
        Sort sort = (Sort)call.rel(0);
        TableSourceTable tableSourceTable = call.rel(1).getTable().unwrap(TableSourceTable.class);
        boolean onlyLimit = sort.getCollation().getFieldCollations().isEmpty() && sort.fetch != null;
        if (!onlyLimit || tableSourceTable == null) {
            return false;
        }
        if (!(tableSourceTable.tableSource() instanceof SupportsLimitPushDown)) {
            return false;
        }
        // An existing LimitPushDownSpec means a limit was already pushed into this table;
        // skipping here keeps the rule from applying to the same scan again.
        return Arrays.stream(tableSourceTable.abilitySpecs()).noneMatch(spec -> spec instanceof LimitPushDownSpec);
    }

    /**
     * Replaces the matched scan with one whose table source has the limit applied and re-attaches
     * the original sort on top of it.
     *
     * @param call the rule call; {@code rel(0)} is the sort and {@code rel(1)} is the scan
     */
    @Override
    public void onMatch(RelOptRuleCall call) {
        Sort sort = (Sort)call.rel(0);
        FlinkLogicalTableSourceScan scan = (FlinkLogicalTableSourceScan)call.rel(1);
        // The sort stays in the plan and still applies the offset, so the source has to be asked
        // for offset + fetch rows. The sum is computed as a long: two int values can add up to
        // more than Integer.MAX_VALUE, and an int addition would silently wrap around.
        long offset = sort.offset == null ? 0L : (long)RexLiteral.intValue(sort.offset);
        long limit = offset + (long)RexLiteral.intValue(sort.fetch);
        TableSourceTable newTableSourceTable = this.applyLimit(limit, scan);
        FlinkLogicalTableSourceScan newScan = FlinkLogicalTableSourceScan.create(scan.getCluster(), scan.getHints(), newTableSourceTable);
        RelNode newSort = sort.copy(sort.getTraitSet(), Collections.singletonList(newScan));
        call.transformTo(newSort);
    }

    /**
     * Builds a copy of the scan's table whose source has {@code limit} applied.
     *
     * <p>The table source is copied before the {@link LimitPushDownSpec} is applied to it, and the
     * returned table carries a statistic whose row count is capped at {@code limit}, an extra
     * digest of the form {@code limit=[n]}, and the new ability spec.
     *
     * @param limit the number of rows to request from the source
     * @param scan the scan whose table is rewritten
     * @return the copied table with the limit applied
     */
    private TableSourceTable applyLimit(long limit, FlinkLogicalTableSourceScan scan) {
        TableSourceTable oldTableSourceTable = scan.getTable().unwrap(TableSourceTable.class);
        DynamicTableSource newTableSource = oldTableSourceTable.tableSource().copy();
        LimitPushDownSpec limitPushDownSpec = new LimitPushDownSpec(limit);
        limitPushDownSpec.apply(newTableSource, SourceAbilityContext.from(scan));
        FlinkStatistic statistic = oldTableSourceTable.getStatistic();
        // The new row count is the limit itself, or the previously known row count if smaller.
        Double knownRowCount = statistic.getRowCount();
        long newRowCount = knownRowCount != null ? Math.min(limit, knownRowCount.longValue()) : limit;
        TableStats newTableStats = new TableStats(newRowCount);
        FlinkStatistic newStatistic = FlinkStatistic.builder().statistic(statistic).tableStats(newTableStats).build();
        return oldTableSourceTable.copy(newTableSource, newStatistic, new String[]{"limit=[" + limit + "]"}, new SourceAbilitySpec[]{limitPushDownSpec});
    }
}

