/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.flink.source.ConsumerProgressCalculator;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator;
import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
import org.apache.paimon.flink.source.ReaderConsumeProgressEvent;
import org.apache.paimon.flink.source.assigners.FIFOSplitAssigner;
import org.apache.paimon.flink.source.assigners.PreAssignSplitAssigner;
import org.apache.paimon.flink.source.assigners.SplitAssigner;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.SnapshotNotExistPlan;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SplitEnumerator} that keeps planning a {@link StreamTableScan} and hands the
 * resulting {@link FileStoreSourceSplit}s to the readers that asked for work.
 *
 * <p>Scanning is scheduled through {@link SplitEnumeratorContext#callAsync}: once periodically
 * from {@link #start()} and additionally on demand from {@link #handleSplitRequest} when a
 * requesting reader could not be served from the splits already pending in the
 * {@link SplitAssigner}.
 */
public class ContinuousFileSplitEnumerator
        implements SplitEnumerator<FileStoreSourceSplit, PendingSplitsCheckpoint> {
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileSplitEnumerator.class);

    protected final SplitEnumeratorContext<FileStoreSourceSplit> context;

    /** Period handed to {@code callAsync} for the recurring scan; validated to be positive. */
    protected final long discoveryInterval;

    /** Subtask ids that requested a split and have not been served yet, in request order. */
    protected final Set<Integer> readersAwaitingSplit;

    protected final FileStoreSourceSplitGenerator splitGenerator;
    protected final StreamTableScan scan;
    protected final SplitAssigner splitAssigner;
    protected final ConsumerProgressCalculator consumerProgressCalculator;

    /**
     * Threshold on the number of splits pending in the assigner; while the assigner holds at
     * least this many, {@link #scanNextSnapshot()} skips planning.
     */
    private final int splitMaxNum;

    /** Snapshot id reported by {@code scan.checkpoint()} after the most recently processed plan. */
    @Nullable
    protected Long nextSnapshotId;

    /** Set once the scan signalled its end through an {@link EndOfScanException}. */
    protected boolean finished = false;

    /** When {@code true}, {@link #handleSplitRequest} does not schedule an extra on-demand scan. */
    private boolean stopTriggerScan = false;

    /**
     * Creates the enumerator and registers the restored splits with a freshly created assigner.
     *
     * <p>Note: this constructor invokes the overridable {@link #createSplitAssigner},
     * {@link #addSplits} and (through it) {@link #assignSuggestedTask}; overrides must only rely
     * on state that is already initialised at that point ({@code context}, {@code scan}, ...).
     *
     * @param context the Flink enumerator context; must not be {@code null}
     * @param remainSplits splits to hand to the assigner up front (e.g. restored ones)
     * @param nextSnapshotId initial value for {@link #nextSnapshotId}, may be {@code null}
     * @param discoveryInterval period of the recurring scan; must be {@code > 0}
     * @param scan the scan that produces plans
     * @param bucketMode selects the assigner implementation, see {@link #createSplitAssigner}
     * @param splitMaxPerTask multiplied by the current parallelism to form the pending-split
     *     threshold
     */
    public ContinuousFileSplitEnumerator(SplitEnumeratorContext<FileStoreSourceSplit> context, Collection<FileStoreSourceSplit> remainSplits, @Nullable Long nextSnapshotId, long discoveryInterval, StreamTableScan scan, BucketMode bucketMode, int splitMaxPerTask) {
        Preconditions.checkArgument(discoveryInterval > 0L);
        this.context = Preconditions.checkNotNull(context);
        this.nextSnapshotId = nextSnapshotId;
        this.discoveryInterval = discoveryInterval;
        this.readersAwaitingSplit = new LinkedHashSet<>();
        this.splitGenerator = new FileStoreSourceSplitGenerator();
        this.scan = scan;
        this.splitAssigner = createSplitAssigner(bucketMode);
        // A plain int multiplication can wrap to a negative value for a large splitMaxPerTask
        // (e.g. 2 * Integer.MAX_VALUE == -2). The "remaining >= splitMaxNum" guard in
        // scanNextSnapshot() would then always hold and scanning would silently stop.
        this.splitMaxNum = multiplySaturated(context.currentParallelism(), splitMaxPerTask);
        addSplits(remainSplits);
        this.consumerProgressCalculator = new ConsumerProgressCalculator(context.currentParallelism());
    }

    /** Multiplies two ints, clamping the result to the {@code int} range instead of wrapping. */
    private static int multiplySaturated(int left, int right) {
        long product = (long) left * (long) right;
        if (product > Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        }
        if (product < Integer.MIN_VALUE) {
            return Integer.MIN_VALUE;
        }
        return (int) product;
    }

    /** Clears the flag that suppresses on-demand scans in {@link #handleSplitRequest}. */
    @VisibleForTesting
    void enableTriggerScan() {
        stopTriggerScan = false;
    }

    /** Registers every given split with the assigner under its suggested task. */
    protected void addSplits(Collection<FileStoreSourceSplit> splits) {
        splits.forEach(this::addSplit);
    }

    private void addSplit(FileStoreSourceSplit split) {
        splitAssigner.addSplit(assignSuggestedTask(split), split);
    }

    /**
     * Schedules the recurring scan: {@link #scanNextSnapshot()} is submitted through
     * {@code callAsync} with an initial delay of {@code 0} and {@link #discoveryInterval} as the
     * period, and its outcome is passed to {@link #processDiscoveredSplits}.
     */
    @Override
    public void start() {
        context.callAsync(this::scanNextSnapshot, this::processDiscoveredSplits, 0L, discoveryInterval);
    }

    /** Nothing to release. */
    @Override
    public void close() throws IOException {
    }

    /** No action is taken when a reader registers; splits are only handed out on request. */
    @Override
    public void addReader(int subtaskId) {
    }

    /**
     * Marks the reader as waiting and tries to serve it from the pending splits. If it is still
     * waiting afterwards, one extra scan is scheduled immediately, unless such scans are
     * currently suppressed by {@code stopTriggerScan}.
     *
     * @param subtaskId the requesting reader
     * @param requesterHostname not used by this implementation
     */
    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        readersAwaitingSplit.add(subtaskId);
        assignSplits();

        boolean stillWaiting = readersAwaitingSplit.contains(subtaskId);
        if (!stillWaiting || stopTriggerScan) {
            return;
        }
        // Raise the flag before scheduling so that further requests do not pile up additional
        // scans; processDiscoveredSplits() lowers it again once a real plan has arrived.
        stopTriggerScan = true;
        context.callAsync(this::scanNextSnapshot, this::processDiscoveredSplits);
    }

    /**
     * Forwards {@link ReaderConsumeProgressEvent}s to the progress calculator; any other event
     * type is logged as an error and otherwise ignored.
     */
    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof ReaderConsumeProgressEvent) {
            consumerProgressCalculator.updateConsumeProgress(subtaskId, (ReaderConsumeProgressEvent) sourceEvent);
        } else {
            LOG.error("Received unrecognized event: {}", sourceEvent);
        }
    }

    /** Returns the given splits of {@code subtaskId} to the assigner. */
    @Override
    public void addSplitsBack(List<FileStoreSourceSplit> splits, int subtaskId) {
        LOG.debug("File Source Enumerator adds splits back: {}", splits);
        splitAssigner.addSplitsBack(subtaskId, splits);
    }

    /**
     * Builds the checkpoint from a copy of the assigner's remaining splits plus the current
     * {@link #nextSnapshotId}, and informs the progress calculator about the checkpoint.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return the enumerator state to persist
     */
    @Override
    public PendingSplitsCheckpoint snapshotState(long checkpointId) throws Exception {
        List<FileStoreSourceSplit> splits = new ArrayList<>(splitAssigner.remainingSplits());
        PendingSplitsCheckpoint checkpoint = new PendingSplitsCheckpoint(splits, nextSnapshotId);
        // Per subtask, use the assigner's next snapshot id if it has one, else the global one.
        consumerProgressCalculator.notifySnapshotState(checkpointId, readersAwaitingSplit, subtask -> splitAssigner.getNextSnapshotId((int) subtask).orElse(nextSnapshotId), context.currentParallelism());
        LOG.debug("Source Checkpoint is {}", checkpoint);
        return checkpoint;
    }

    /**
     * Asks the progress calculator for the value associated with the completed checkpoint and,
     * if one is present, passes it on to {@code scan.notifyCheckpointComplete}.
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        consumerProgressCalculator.notifyCheckpointComplete(checkpointId).ifPresent(scan::notifyCheckpointComplete);
    }

    /**
     * Plans the scan once and captures the snapshot id the scan reports afterwards.
     *
     * @return the plan together with {@code scan.checkpoint()}, or {@link Optional#empty()} when
     *     the assigner already holds at least {@code splitMaxNum} splits and planning is skipped
     */
    protected synchronized Optional<PlanWithNextSnapshotId> scanNextSnapshot() {
        if (splitAssigner.numberOfRemainingSplits() >= splitMaxNum) {
            return Optional.empty();
        }
        // Order matters: checkpoint() is read only after plan() has run.
        TableScan.Plan plan = scan.plan();
        Long snapshotIdAfterPlan = scan.checkpoint();
        return Optional.of(new PlanWithNextSnapshotId(plan, snapshotIdAfterPlan));
    }

    /**
     * Handles the outcome of {@link #scanNextSnapshot()}.
     *
     * <ul>
     *   <li>An {@link EndOfScanException} marks the enumerator as finished and triggers a final
     *       assignment round; any other error is logged and rethrown wrapped in a
     *       {@link RuntimeException}.
     *   <li>An empty result (planning was skipped) is ignored.
     *   <li>{@link SnapshotNotExistPlan#INSTANCE} updates {@link #nextSnapshotId} and suppresses
     *       on-demand scans.
     *   <li>Any other plan updates {@link #nextSnapshotId}, re-enables on-demand scans and, if it
     *       contains splits, registers and assigns them.
     * </ul>
     *
     * @param planWithNextSnapshotIdOptional result of the scan, empty if planning was skipped
     * @param error failure raised by the scan, or {@code null} on success
     */
    protected void processDiscoveredSplits(Optional<PlanWithNextSnapshotId> planWithNextSnapshotIdOptional, Throwable error) {
        if (error != null) {
            if (error instanceof EndOfScanException) {
                LOG.debug("Catching EndOfScanException, the stream is finished.");
                finished = true;
                assignSplits();
                return;
            }
            LOG.error("Failed to enumerate files", error);
            throw new RuntimeException(error);
        }

        if (!planWithNextSnapshotIdOptional.isPresent()) {
            return;
        }
        PlanWithNextSnapshotId discovered = planWithNextSnapshotIdOptional.get();
        this.nextSnapshotId = discovered.nextSnapshotId;
        TableScan.Plan plan = discovered.plan;

        if (plan.equals(SnapshotNotExistPlan.INSTANCE)) {
            stopTriggerScan = true;
            return;
        }
        stopTriggerScan = false;

        if (plan.splits().isEmpty()) {
            return;
        }
        addSplits(splitGenerator.createSplits(plan));
        assignSplits();
    }

    /**
     * Serves the waiting readers from the assigner.
     *
     * <p>Readers that are no longer registered with the context are dropped from the waiting
     * set. When {@link #noMoreSplits()} holds, every waiting reader that received nothing in this
     * round is signalled "no more splits". The collected assignment is always sent to the
     * context, even when it is empty.
     */
    protected synchronized void assignSplits() {
        HashMap<Integer, List<FileStoreSourceSplit>> assignment = new HashMap<>();
        Set<Integer> registeredSubtasks = context.registeredReaders().keySet();

        Iterator<Integer> waiting = readersAwaitingSplit.iterator();
        while (waiting.hasNext()) {
            Integer task = waiting.next();
            if (!registeredSubtasks.contains(task)) {
                waiting.remove();
                continue;
            }
            List<FileStoreSourceSplit> splits = splitAssigner.getNext(task, null);
            if (splits.isEmpty()) {
                continue;
            }
            assignment.put(task, splits);
            consumerProgressCalculator.updateAssignInformation(task, splits.get(0));
        }

        if (noMoreSplits()) {
            Iterator<Integer> unserved = readersAwaitingSplit.iterator();
            while (unserved.hasNext()) {
                Integer reader = unserved.next();
                if (assignment.containsKey(reader)) {
                    continue;
                }
                context.signalNoMoreSplits(reader);
                unserved.remove();
            }
        }

        assignment.keySet().forEach(readersAwaitingSplit::remove);
        context.assignSplits(new SplitsAssignment<>(assignment));
    }

    /**
     * Suggests the subtask for a split as {@code bucket % currentParallelism}.
     *
     * @param split a split whose underlying split is a {@link DataSplit}
     */
    protected int assignSuggestedTask(FileStoreSourceSplit split) {
        DataSplit dataSplit = (DataSplit) split.split();
        return dataSplit.bucket() % context.currentParallelism();
    }

    /**
     * Chooses the assigner: a {@link FIFOSplitAssigner} for {@link BucketMode#UNAWARE}, otherwise
     * a {@link PreAssignSplitAssigner}. Both start without any splits.
     */
    protected SplitAssigner createSplitAssigner(BucketMode bucketMode) {
        if (bucketMode == BucketMode.UNAWARE) {
            return new FIFOSplitAssigner(Collections.emptyList());
        }
        return new PreAssignSplitAssigner(1, context, Collections.emptyList());
    }

    /** Whether waiting readers may be told that no further splits will come. */
    protected boolean noMoreSplits() {
        return finished;
    }

    /** Pairs a scan plan with the snapshot id the scan reported right after producing it. */
    protected static class PlanWithNextSnapshotId {
        private final TableScan.Plan plan;
        private final Long nextSnapshotId;

        public PlanWithNextSnapshotId(TableScan.Plan plan, Long nextSnapshotId) {
            this.plan = plan;
            this.nextSnapshotId = nextSnapshotId;
        }

        public TableScan.Plan plan() {
            return plan;
        }

        public Long nextSnapshotId() {
            return nextSnapshotId;
        }
    }
}

