/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.assigners;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.table.connector.source.DynamicFilteringData;
import org.apache.flink.table.connector.source.DynamicFilteringEvent;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
import org.apache.paimon.table.source.DataSplit;
import org.slf4j.Logger;

public class DynamicPartitionPruningAssigner
implements SplitAssigner {
    private final SplitAssigner innerAssigner;
    private final Projection partitionRowProjection;
    private final DynamicFilteringData dynamicFilteringData;

    public DynamicPartitionPruningAssigner(SplitAssigner innerAssigner, Projection partitionRowProjection, DynamicFilteringData dynamicFilteringData) {
        this.innerAssigner = innerAssigner;
        this.partitionRowProjection = partitionRowProjection;
        this.dynamicFilteringData = dynamicFilteringData;
    }

    @Override
    public List<FileStoreSourceSplit> getNext(int subtask, @Nullable String hostname) {
        List<FileStoreSourceSplit> sourceSplits = this.innerAssigner.getNext(subtask, hostname);
        while (!sourceSplits.isEmpty()) {
            List<FileStoreSourceSplit> filtered = sourceSplits.stream().filter(this::filter).collect(Collectors.toList());
            if (!filtered.isEmpty()) {
                return filtered;
            }
            sourceSplits = this.innerAssigner.getNext(subtask, hostname);
        }
        return Collections.emptyList();
    }

    @Override
    public void addSplit(int suggestedTask, FileStoreSourceSplit splits) {
        if (this.filter(splits)) {
            this.innerAssigner.addSplit(suggestedTask, splits);
        }
    }

    @Override
    public void addSplitsBack(int subtask, List<FileStoreSourceSplit> splits) {
        this.innerAssigner.addSplitsBack(subtask, splits);
    }

    @Override
    public Collection<FileStoreSourceSplit> remainingSplits() {
        return this.innerAssigner.remainingSplits().stream().filter(this::filter).collect(Collectors.toList());
    }

    public static SplitAssigner createDynamicPartitionPruningAssignerIfNeeded(int subtaskId, SplitAssigner oriAssigner, Projection partitionRowProjection, SourceEvent sourceEvent, Logger logger) {
        DynamicFilteringData dynamicFilteringData = ((DynamicFilteringEvent)sourceEvent).getData();
        logger.info("Received DynamicFilteringEvent: {}, is filtering: {}.", (Object)subtaskId, (Object)dynamicFilteringData.isFiltering());
        return dynamicFilteringData.isFiltering() ? new DynamicPartitionPruningAssigner(oriAssigner, partitionRowProjection, dynamicFilteringData) : oriAssigner;
    }

    @Override
    public Optional<Long> getNextSnapshotId(int subtask) {
        return this.innerAssigner.getNextSnapshotId(subtask);
    }

    @Override
    public int numberOfRemainingSplits() {
        return this.innerAssigner.numberOfRemainingSplits();
    }

    private boolean filter(FileStoreSourceSplit sourceSplit) {
        DataSplit dataSplit = (DataSplit)sourceSplit.split();
        BinaryRow partition = dataSplit.partition();
        FlinkRowData projected = new FlinkRowData(this.partitionRowProjection.apply(partition));
        return this.dynamicFilteringData.contains((RowData)projected);
    }
}

