/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.flink.lookup.LookupStreamingReader;
import org.apache.paimon.flink.lookup.LookupTable;
import org.apache.paimon.flink.lookup.NoPrimaryKeyLookupTable;
import org.apache.paimon.flink.lookup.PrimaryKeyLookupTable;
import org.apache.paimon.flink.lookup.SecondaryIndexLookupTable;
import org.apache.paimon.lookup.BulkLoader;
import org.apache.paimon.lookup.RocksDBState;
import org.apache.paimon.lookup.RocksDBStateFactory;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.MutableObjectIterator;
import org.apache.paimon.utils.PartialRow;
import org.apache.paimon.utils.TypeUtils;
import org.apache.paimon.utils.UserDefinedSeqComparator;

public abstract class FullCacheLookupTable
implements LookupTable {
    protected final Context context;
    protected final RowType projectedType;
    @Nullable
    protected final FieldsComparator userDefinedSeqComparator;
    protected final int appendUdsFieldNumber;
    protected RocksDBStateFactory stateFactory;
    private LookupStreamingReader reader;
    private Predicate specificPartition;

    public FullCacheLookupTable(Context context) {
        this.context = context;
        FileStoreTable table = context.table;
        List<Object> sequenceFields = new ArrayList();
        if (table.primaryKeys().size() > 0) {
            sequenceFields = new CoreOptions(table.options()).sequenceField();
        }
        RowType projectedType = TypeUtils.project(table.rowType(), context.projection);
        if (sequenceFields.size() > 0) {
            RowType.Builder builder = RowType.builder();
            projectedType.getFields().forEach(f -> builder.field(f.name(), f.type()));
            RowType rowType = table.rowType();
            AtomicInteger appendUdsFieldNumber = new AtomicInteger(0);
            sequenceFields.stream().filter(projectedType::notContainsField).map(rowType::getField).forEach(f -> {
                appendUdsFieldNumber.incrementAndGet();
                builder.field(f.name(), f.type());
            });
            projectedType = builder.build();
            this.userDefinedSeqComparator = UserDefinedSeqComparator.create(projectedType, sequenceFields);
            this.appendUdsFieldNumber = appendUdsFieldNumber.get();
        } else {
            this.userDefinedSeqComparator = null;
            this.appendUdsFieldNumber = 0;
        }
        this.projectedType = projectedType;
    }

    @Override
    public void specificPartitionFilter(Predicate filter) {
        this.specificPartition = filter;
    }

    protected void openStateFactory() throws Exception {
        this.stateFactory = new RocksDBStateFactory(this.context.tempPath.toString(), this.context.table.coreOptions().toConfiguration(), null);
    }

    protected void bootstrap() throws Exception {
        Predicate scanPredicate = PredicateBuilder.andNullable(this.context.tablePredicate, this.specificPartition);
        this.reader = new LookupStreamingReader(this.context.table, this.context.projection, scanPredicate);
        BinaryExternalSortBuffer bulkLoadSorter = RocksDBState.createBulkLoadSorter(IOManager.create(this.context.tempPath.toString()), this.context.table.coreOptions());
        Predicate predicate = this.projectedPredicate();
        try (RecordReaderIterator<InternalRow> batch = new RecordReaderIterator<InternalRow>(this.reader.nextBatch(true));){
            while (batch.hasNext()) {
                InternalRow row = batch.next();
                if (predicate != null && !predicate.test(row)) continue;
                bulkLoadSorter.write(GenericRow.of(this.toKeyBytes(row), this.toValueBytes(row)));
            }
        }
        MutableObjectIterator<BinaryRow> keyIterator = bulkLoadSorter.sortedIterator();
        BinaryRow row = new BinaryRow(2);
        TableBulkLoader bulkLoader = this.createBulkLoader();
        try {
            while ((row = keyIterator.next(row)) != null) {
                bulkLoader.write(row.getBinary(0), row.getBinary(1));
            }
        }
        catch (BulkLoader.WriteException e) {
            throw new RuntimeException("Exception in bulkLoad, the most suspicious reason is that your data contains duplicates, please check your lookup table. ", e.getCause());
        }
        bulkLoader.finish();
        bulkLoadSorter.clear();
    }

    @Override
    public void refresh() throws Exception {
        while (true) {
            RecordReaderIterator<InternalRow> batch = new RecordReaderIterator<InternalRow>(this.reader.nextBatch(false));
            Throwable throwable = null;
            try {
                if (!batch.hasNext()) {
                    return;
                }
                this.refresh(batch);
                continue;
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (batch == null) continue;
                if (throwable != null) {
                    try {
                        batch.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                batch.close();
                continue;
            }
            break;
        }
    }

    @Override
    public final List<InternalRow> get(InternalRow key) throws IOException {
        List<InternalRow> values = this.innerGet(key);
        if (this.appendUdsFieldNumber == 0) {
            return values;
        }
        ArrayList<InternalRow> dropSequence = new ArrayList<InternalRow>(values.size());
        for (InternalRow matchedRow : values) {
            dropSequence.add(new PartialRow(matchedRow.getFieldCount() - this.appendUdsFieldNumber, matchedRow));
        }
        return dropSequence;
    }

    public abstract List<InternalRow> innerGet(InternalRow var1) throws IOException;

    public abstract void refresh(Iterator<InternalRow> var1) throws IOException;

    @Nullable
    public Predicate projectedPredicate() {
        return this.context.projectedPredicate;
    }

    public abstract byte[] toKeyBytes(InternalRow var1) throws IOException;

    public abstract byte[] toValueBytes(InternalRow var1) throws IOException;

    public abstract TableBulkLoader createBulkLoader();

    @Override
    public void close() throws IOException {
        this.stateFactory.close();
        FileIOUtils.deleteDirectory(this.context.tempPath);
    }

    static FullCacheLookupTable create(Context context, long lruCacheSize) {
        List<String> primaryKeys = context.table.primaryKeys();
        if (primaryKeys.isEmpty()) {
            return new NoPrimaryKeyLookupTable(context, lruCacheSize);
        }
        if (new HashSet<String>(primaryKeys).equals(new HashSet<String>(context.joinKey))) {
            return new PrimaryKeyLookupTable(context, lruCacheSize, context.joinKey);
        }
        return new SecondaryIndexLookupTable(context, lruCacheSize);
    }

    public static class Context {
        public final FileStoreTable table;
        public final int[] projection;
        @Nullable
        public final Predicate tablePredicate;
        @Nullable
        public final Predicate projectedPredicate;
        public final File tempPath;
        public final List<String> joinKey;

        public Context(FileStoreTable table, int[] projection, @Nullable Predicate tablePredicate, @Nullable Predicate projectedPredicate, File tempPath, List<String> joinKey) {
            this.table = table;
            this.projection = projection;
            this.tablePredicate = tablePredicate;
            this.projectedPredicate = projectedPredicate;
            this.tempPath = tempPath;
            this.joinKey = joinKey;
        }
    }

    public static interface TableBulkLoader {
        public void write(byte[] var1, byte[] var2) throws BulkLoader.WriteException, IOException;

        public void finish() throws IOException;
    }
}

