/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.sort;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
import org.apache.flink.runtime.io.disk.iomanager.AbstractChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.runtime.operators.sort.IndexedSortable;
import org.apache.flink.runtime.operators.sort.IndexedSorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.io.ChannelWithMeta;
import org.apache.flink.table.runtime.operators.sort.BinaryKVExternalMerger;
import org.apache.flink.table.runtime.operators.sort.BinaryKVInMemorySortBuffer;
import org.apache.flink.table.runtime.operators.sort.BinaryMergeIterator;
import org.apache.flink.table.runtime.operators.sort.SpillChannelManager;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.util.FileChannelUtil;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sorts in-memory buffers of key/value {@link BinaryRowData} records, spills each sorted buffer
 * to its own file channel, and exposes a merging iterator over everything spilled so far.
 *
 * <p>Typical use, as far as this class shows: call
 * {@link #sortAndSpill(ArrayList, long, MemorySegmentPool)} once per filled buffer, then
 * {@link #getKVIterator()} to read the merged result, and finally {@link #close()}.
 */
public class BufferedKVExternalSorter {
    private static final Logger LOG = LoggerFactory.getLogger(BufferedKVExternalSorter.class);

    /** Set by {@link #close()}; also stops the pre-merge loop in {@link #getKVIterator()}. */
    private volatile boolean closed = false;

    private final NormalizedKeyComputer nKeyComputer;
    private final RecordComparator comparator;
    private final BinaryRowDataSerializer keySerializer;
    private final BinaryRowDataSerializer valueSerializer;
    private final IndexedSorter sorter;
    private final BinaryKVExternalMerger merger;
    private final IOManager ioManager;

    /** Upper bound on the number of spill channels handed to the final merging iterator. */
    private final int maxNumFileHandles;

    private final FileIOChannel.Enumerator enumerator;

    /** One entry per successfully written spill file, in the order they were written. */
    private final List<ChannelWithMeta> channelIDs = new ArrayList<>();

    private final SpillChannelManager channelManager;
    private final int pageSize;

    // Running totals across all spills, reported in the log line emitted after each spill.
    private long numSpillFiles;
    private long spillInBytes;
    private long spillInCompressedBytes;

    private final boolean compressionEnabled;

    /** LZ4 block compression factory, or {@code null} when compression is disabled. */
    private final BlockCompressionFactory compressionCodecFactory;

    private final int compressionBlockSize;

    /**
     * Creates a sorter that spills through the given I/O manager.
     *
     * @param ioManager used to enumerate spill channels and to create the spill writers
     * @param keySerializer serializer for the key rows
     * @param valueSerializer serializer for the value rows
     * @param nKeyComputer normalized key computer passed to the in-memory sort buffer
     * @param comparator record comparator used for sorting and merging
     * @param pageSize page size passed to the spill writer and to the merger
     * @param maxNumFileHandles maximum number of spill channels merged at once
     * @param compressionEnabled whether spill files are block-compressed (LZ4)
     * @param compressionBlockSize block size passed to the compressing writer and the merger
     */
    public BufferedKVExternalSorter(IOManager ioManager, BinaryRowDataSerializer keySerializer, BinaryRowDataSerializer valueSerializer, NormalizedKeyComputer nKeyComputer, RecordComparator comparator, int pageSize, int maxNumFileHandles, boolean compressionEnabled, int compressionBlockSize) {
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        this.nKeyComputer = nKeyComputer;
        this.comparator = comparator;
        this.pageSize = pageSize;
        this.sorter = new QuickSort();
        this.maxNumFileHandles = maxNumFileHandles;
        this.compressionEnabled = compressionEnabled;
        // The codec factory is only needed (and only created) when compression is on.
        this.compressionCodecFactory = compressionEnabled
                ? BlockCompressionFactory.createBlockCompressionFactory(BlockCompressionFactory.CompressionFactoryName.LZ4.toString())
                : null;
        this.compressionBlockSize = compressionBlockSize;
        this.ioManager = ioManager;
        this.enumerator = this.ioManager.createChannelEnumerator();
        this.channelManager = new SpillChannelManager();
        this.merger = new BinaryKVExternalMerger(ioManager, pageSize, maxNumFileHandles, this.channelManager, keySerializer, valueSerializer, comparator, compressionEnabled, this.compressionCodecFactory, compressionBlockSize);
    }

    /**
     * Returns an iterator that merges all spilled runs.
     *
     * <p>While the sorter is not closed and there are more spill channels than
     * {@code maxNumFileHandles}, the channel list is first reduced via
     * {@link BinaryKVExternalMerger#mergeChannelList}. The channels opened for the returned
     * iterator are registered with the channel manager.
     *
     * @return a merging iterator over the spilled key/value pairs
     * @throws IOException if merging or opening the spill channels fails
     */
    public MutableObjectIterator<Tuple2<BinaryRowData, BinaryRowData>> getKVIterator() throws IOException {
        List<ChannelWithMeta> remaining = this.channelIDs;
        while (!this.closed && remaining.size() > this.maxNumFileHandles) {
            remaining = this.merger.mergeChannelList(remaining);
        }
        List<FileIOChannel> openChannels = new ArrayList<>();
        BinaryMergeIterator<Tuple2<BinaryRowData, BinaryRowData>> iterator = this.merger.getMergingIterator(remaining, openChannels);
        this.channelManager.addOpenChannels(openChannels);
        return iterator;
    }

    /**
     * Sorts the records held in the given segments and writes them to a new spill channel.
     *
     * <p>On success the channel, its block count and the byte count of its last block are
     * recorded so that {@link #getKVIterator()} can read it back. If writing fails with an
     * {@link IOException}, the writer is closed on a best-effort basis, the partially written
     * channel is deleted, and the original exception is rethrown.
     *
     * @param recordBufferSegments memory segments containing the serialized records
     * @param numElements number of records contained in {@code recordBufferSegments}
     * @param pool segment pool handed to the in-memory sort buffer
     * @throws IOException if creating or writing the spill file fails
     */
    public void sortAndSpill(ArrayList<MemorySegment> recordBufferSegments, long numElements, MemorySegmentPool pool) throws IOException {
        int blockCount;
        int bytesInLastBuffer;
        BinaryKVInMemorySortBuffer buffer = BinaryKVInMemorySortBuffer.createBuffer(this.nKeyComputer, this.keySerializer, this.valueSerializer, this.comparator, recordBufferSegments, numElements, pool);
        this.sorter.sort(buffer);
        FileIOChannel.ID channel = this.enumerator.next();
        this.channelManager.addChannel(channel);
        AbstractChannelWriterOutputView output = null;
        try {
            ++this.numSpillFiles;
            output = FileChannelUtil.createOutputView(this.ioManager, channel, this.compressionEnabled, this.compressionCodecFactory, this.compressionBlockSize, this.pageSize);
            buffer.writeToOutput(output);
            this.spillInBytes += output.getNumBytes();
            this.spillInCompressedBytes += output.getNumCompressedBytes();
            bytesInLastBuffer = output.close();
            blockCount = output.getBlockCount();
            LOG.info("here spill the {}th kv external buffer data with {} bytes and {} compressed bytes", this.numSpillFiles, this.spillInBytes, this.spillInCompressedBytes);
        }
        catch (IOException e) {
            if (output != null) {
                try {
                    output.close();
                }
                catch (IOException | RuntimeException closeFailure) {
                    // Keep the original failure as the primary exception. The writer may
                    // rethrow the very same instance, which must not be added to itself.
                    if (closeFailure != e) {
                        e.addSuppressed(closeFailure);
                    }
                }
                finally {
                    // Always remove the partial spill file, even if closing failed.
                    output.getChannel().deleteChannel();
                }
            }
            throw e;
        }
        this.channelIDs.add(new ChannelWithMeta(channel, blockCount, bytesInLastBuffer));
    }

    /**
     * Marks this sorter as closed and closes the merger and the spill channel manager.
     * Calls after the first one return immediately.
     */
    public void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.merger.close();
        this.channelManager.close();
    }
}

