/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.source.operator;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.source.operator.MultiTablesCompactorSourceFunction;
import org.apache.paimon.flink.source.operator.MultiTablesReadOperator;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultiTablesStreamingCompactorSourceFunction
extends MultiTablesCompactorSourceFunction {
    private static final Logger LOG = LoggerFactory.getLogger(MultiTablesStreamingCompactorSourceFunction.class);

    public MultiTablesStreamingCompactorSourceFunction(Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, long monitorInterval) {
        super(catalogLoader, includingPattern, excludingPattern, databasePattern, true, monitorInterval);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<Tuple2<Split, String>> ctx) throws Exception {
        this.ctx = ctx;
        while (this.isRunning) {
            boolean isEmpty;
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                if (!this.isRunning) {
                    return;
                }
                try {
                    this.updateTableMap();
                    ArrayList splits = new ArrayList();
                    for (Map.Entry entry : this.scansMap.entrySet()) {
                        Identifier identifier = (Identifier)entry.getKey();
                        StreamTableScan scan = (StreamTableScan)entry.getValue();
                        splits.addAll(scan.plan().splits().stream().map(split -> new Tuple2(split, (Object)identifier.getFullName())).collect(Collectors.toList()));
                    }
                    isEmpty = splits.isEmpty();
                    splits.forEach(arg_0 -> ctx.collect(arg_0));
                }
                catch (EndOfScanException esf) {
                    LOG.info("Catching EndOfStreamException, the stream is finished.");
                    return;
                }
            }
            if (!isEmpty) continue;
            Thread.sleep(this.monitorInterval);
        }
    }

    public static DataStream<RowData> buildSource(StreamExecutionEnvironment env, String name, TypeInformation<RowData> typeInfo, Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, long monitorInterval) {
        MultiTablesStreamingCompactorSourceFunction function = new MultiTablesStreamingCompactorSourceFunction(catalogLoader, includingPattern, excludingPattern, databasePattern, monitorInterval);
        StreamSource sourceOperator = new StreamSource((SourceFunction)function);
        boolean isParallel = false;
        TupleTypeInfo tupleTypeInfo = new TupleTypeInfo(new TypeInformation[]{new JavaTypeInfo<Split>(Split.class), BasicTypeInfo.STRING_TYPE_INFO});
        return new DataStreamSource(env, (TypeInformation)tupleTypeInfo, sourceOperator, isParallel, name, Boundedness.CONTINUOUS_UNBOUNDED).forceNonParallel().partitionCustom((Partitioner & Serializable)(key, numPartitions) -> key % numPartitions, (KeySelector & Serializable)split -> ((DataSplit)split.f0).bucket()).transform(name, typeInfo, (OneInputStreamOperator)new MultiTablesReadOperator(catalogLoader, true));
    }
}

