/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.iterative.io;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.AbstractPagedInputView;
import org.apache.flink.runtime.memory.AbstractPagedOutputView;

/**
 * Paged output view that collects written memory segments either in memory or, when
 * only few empty segments remain, in a spill file written through the {@link IOManager}.
 *
 * <p>Every segment starts with a {@link #HEADER_LENGTH}-byte header. The int at offset 0
 * records how far the segment was filled; the {@link ReadEnd} uses that value as the read
 * limit of the segment.
 *
 * <p>{@link #switchBuffers()} hands everything written so far to a new {@link ReadEnd} and
 * restarts writing on a fresh segment. {@link #close()} collects all memory segments again.
 */
public class SerializedUpdateBuffer extends AbstractPagedOutputView {
    /** Number of bytes reserved at the start of every segment for the fill-position header. */
    private static final int HEADER_LENGTH = 4;
    /** Fraction of the buffers that may be in use before written segments are spilled. */
    private static final float SPILL_THRESHOLD = 0.95f;

    /** Segments that are free to be written to (also handed to the spill writer on creation). */
    private final LinkedBlockingQueue<MemorySegment> emptyBuffers;
    /** Completed segments held in memory, in write order. */
    private ArrayDeque<MemorySegment> fullBuffers;
    /** Spill writer of the current write phase; {@code null} until the first spill. */
    private BlockChannelWriter<MemorySegment> currentWriter;
    private final IOManager ioManager;
    private final FileIOChannel.Enumerator channelEnumerator;
    /** Spilling starts once no more than this many empty segments are left. */
    private final int numSegmentsSpillingThreshold;
    /** Number of segments written to {@link #currentWriter} in the current write phase. */
    private int numBuffersSpilled;
    private final int minBuffersForWriteEnd;
    private final int minBuffersForSpilledReadEnd;
    /** Read ends handed out by {@link #switchBuffers()} that have not been disposed yet. */
    private final List<ReadEnd> readEnds;
    /** Total number of segments managed by this buffer, including the one being written. */
    private final int totalNumBuffers;

    /**
     * Creates an instance without any memory, queues or I/O manager: all reference fields
     * are {@code null}, all sizes are {@code -1} and the list of read ends is empty.
     */
    public SerializedUpdateBuffer() {
        super(-1, HEADER_LENGTH);
        this.emptyBuffers = null;
        this.fullBuffers = null;
        this.ioManager = null;
        this.channelEnumerator = null;
        this.numSegmentsSpillingThreshold = -1;
        this.minBuffersForWriteEnd = -1;
        this.minBuffersForSpilledReadEnd = -1;
        this.totalNumBuffers = -1;
        this.readEnds = Collections.emptyList();
    }

    /**
     * Creates a buffer over the given memory segments.
     *
     * @param memSegments the segments to use; the last one is removed from the list and
     *     becomes the first write segment, the remaining ones become the empty buffers
     * @param segmentSize the segment size passed on to the output view
     * @param ioManager the I/O manager used to create spill channels, writers and readers
     * @throws IllegalArgumentException if fewer than 3 segments are supplied, or if the
     *     minimum buffer counts for write end and spilled read end exceed the total
     */
    public SerializedUpdateBuffer(List<MemorySegment> memSegments, int segmentSize, IOManager ioManager) {
        super(memSegments.remove(memSegments.size() - 1), segmentSize, HEADER_LENGTH);
        // +1 accounts for the segment that was just removed and handed to the super class.
        this.totalNumBuffers = memSegments.size() + 1;
        if (this.totalNumBuffers < 3) {
            throw new IllegalArgumentException("SerializedUpdateBuffer needs at least 3 memory segments.");
        }
        this.emptyBuffers = new LinkedBlockingQueue<MemorySegment>(this.totalNumBuffers);
        this.fullBuffers = new ArrayDeque<MemorySegment>(64);
        this.emptyBuffers.addAll(memSegments);

        int threshold = (int) ((1.0f - SPILL_THRESHOLD) * (float) this.totalNumBuffers);
        this.numSegmentsSpillingThreshold = Math.max(threshold, 0);
        this.minBuffersForWriteEnd = Math.max(2, Math.min(16, this.totalNumBuffers / 2));
        this.minBuffersForSpilledReadEnd = Math.max(1, Math.min(16, this.totalNumBuffers / 4));
        if (this.minBuffersForSpilledReadEnd + this.minBuffersForWriteEnd > this.totalNumBuffers) {
            throw new IllegalArgumentException("BUG: Unfulfillable memory assignment.");
        }

        this.ioManager = ioManager;
        this.channelEnumerator = ioManager.createChannelEnumerator();
        this.readEnds = new ArrayList<ReadEnd>();
    }

    /**
     * Finalizes the given segment and returns the next segment to write to.
     *
     * <p>The fill position is stored in the segment header. While more empty segments than
     * the spilling threshold are available, the segment is kept in memory; otherwise all
     * in-memory segments and the given one are written to the spill writer.
     *
     * @param current the segment that has been filled
     * @param positionInCurrent the write position reached in {@code current}
     * @return the next empty segment, taken (blocking) from the empty-buffer queue
     * @throws IOException if creating the writer or writing a block fails
     * @throws RuntimeException if the thread is interrupted while waiting for a segment
     */
    @Override
    protected MemorySegment nextSegment(MemorySegment current, int positionInCurrent) throws IOException {
        current.putInt(0, positionInCurrent);
        if (this.emptyBuffers.size() > this.numSegmentsSpillingThreshold) {
            this.fullBuffers.addLast(current);
        } else {
            if (this.currentWriter == null) {
                // The empty-buffer queue is passed to the writer; presumably written
                // segments are returned through it (confirm against IOManager).
                this.currentWriter = this.ioManager.createBlockChannelWriter(this.channelEnumerator.next(), this.emptyBuffers);
            }
            // Spill the in-memory segments first so the write order is preserved.
            this.numBuffersSpilled += this.fullBuffers.size();
            while (!this.fullBuffers.isEmpty()) {
                this.currentWriter.writeBlock(this.fullBuffers.removeFirst());
            }
            this.currentWriter.writeBlock(current);
            ++this.numBuffersSpilled;
        }
        try {
            return this.emptyBuffers.take();
        } catch (InterruptedException iex) {
            // Keep the cause so the interruption is visible in the stack trace.
            throw new RuntimeException("Spilling Fifo Queue was interrupted while waiting for next buffer.", iex);
        }
    }

    /**
     * Advances the output view to the next segment.
     *
     * @throws IOException if advancing fails
     */
    public void flush() throws IOException {
        this.advance();
    }

    /**
     * Hands all data written so far to a new {@link ReadEnd} and restarts writing on a
     * fresh segment.
     *
     * <p>If nothing was spilled and at least {@code minBuffersForWriteEnd} empty segments
     * are available, the read end works purely on the in-memory segments. Otherwise enough
     * segments are spilled to satisfy the minimum buffer counts, the writer is closed, and
     * the read end reads the spilled blocks back before consuming the remaining in-memory
     * segments.
     *
     * @return the read end over the data written since the previous switch
     * @throws IOException if spilling, closing the writer or reading a block fails
     * @throws RuntimeException if the thread is interrupted while waiting for a segment
     */
    public ReadEnd switchBuffers() throws IOException {
        // Drop read ends that are fully consumed; iterate backwards so removal is index-safe.
        for (int i = this.readEnds.size() - 1; i >= 0; --i) {
            if (this.readEnds.get(i).disposeIfDone()) {
                this.readEnds.remove(i);
            }
        }

        // Finalize the segment currently being written and queue it with the full ones.
        MemorySegment current = this.getCurrentSegment();
        current.putInt(0, this.getCurrentPositionInSegment());
        this.fullBuffers.addLast(current);

        ReadEnd readEnd;
        if (this.numBuffersSpilled == 0 && this.emptyBuffers.size() >= this.minBuffersForWriteEnd) {
            readEnd = new ReadEnd(this.fullBuffers.removeFirst(), this.emptyBuffers, this.fullBuffers, null, null, 0);
        } else {
            int toSpill = Math.min(
                    this.minBuffersForSpilledReadEnd + this.minBuffersForWriteEnd - this.emptyBuffers.size(),
                    this.fullBuffers.size());
            if (toSpill > 0) {
                if (this.currentWriter == null) {
                    this.currentWriter = this.ioManager.createBlockChannelWriter(this.channelEnumerator.next(), this.emptyBuffers);
                }
                for (int i = 0; i < toSpill; ++i) {
                    this.currentWriter.writeBlock(this.fullBuffers.removeFirst());
                }
                this.numBuffersSpilled += toSpill;
            }
            this.currentWriter.close();

            BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(this.currentWriter.getChannelID());
            List<MemorySegment> readSegments = new ArrayList<MemorySegment>();
            try {
                while (readSegments.size() < this.minBuffersForSpilledReadEnd) {
                    readSegments.add(this.emptyBuffers.take());
                }
                // Read the first spilled block right away; it becomes the read end's first segment.
                MemorySegment firstSeg = readSegments.remove(readSegments.size() - 1);
                reader.readBlock(firstSeg);
                firstSeg = reader.getReturnQueue().take();
                // One spilled block has already been read, hence "- 1".
                readEnd = new ReadEnd(firstSeg, this.emptyBuffers, this.fullBuffers, reader, readSegments, this.numBuffersSpilled - 1);
            } catch (InterruptedException e) {
                throw new RuntimeException("SerializedUpdateBuffer was interrupted while reclaiming memory by spilling.", e);
            }
        }

        // The old deque now belongs to the read end; start a new write phase.
        this.fullBuffers = new ArrayDeque<MemorySegment>(64);
        this.currentWriter = null;
        this.numBuffersSpilled = 0;
        try {
            this.seekOutput(this.emptyBuffers.take(), HEADER_LENGTH);
        } catch (InterruptedException e) {
            throw new RuntimeException("SerializedUpdateBuffer was interrupted while reclaiming memory by spilling.", e);
        }
        this.readEnds.add(readEnd);
        return readEnd;
    }

    /**
     * Closes the buffer and collects all of its memory segments.
     *
     * <p>The spill writer (if any) is closed and deleted, all read ends are forcibly
     * disposed, and empty segments are taken (blocking) until {@code totalNumBuffers}
     * segments have been collected.
     *
     * @return the collected memory segments
     * @throws RuntimeException if the thread is interrupted while waiting for segments
     */
    public List<MemorySegment> close() {
        if (this.currentWriter != null) {
            try {
                this.currentWriter.closeAndDelete();
            } catch (Throwable ignored) {
                // Best-effort cleanup: a failure to delete the spill file must not
                // prevent the memory from being collected below.
            }
        }

        List<MemorySegment> freeMem = new ArrayList<MemorySegment>(64);
        freeMem.add(this.getCurrentSegment());
        this.clear();
        freeMem.addAll(this.fullBuffers);
        this.fullBuffers = null;
        try {
            for (int i = this.readEnds.size() - 1; i >= 0; --i) {
                ReadEnd re = this.readEnds.remove(i);
                re.forceDispose(freeMem);
            }
            // Whatever is still missing is expected to arrive in the empty-buffer queue.
            while (freeMem.size() < this.totalNumBuffers) {
                freeMem.add(this.emptyBuffers.take());
            }
        } catch (InterruptedException e) {
            throw new RuntimeException("Retrieving memory back from asynchronous I/O was interrupted.", e);
        }
        return freeMem;
    }

    /**
     * Input view over the data of one write phase. Spilled blocks (if any) are consumed
     * first, followed by the segments that stayed in memory. Consumed segments are handed
     * back either to the reader as targets for further read requests or to the
     * empty-buffer queue.
     */
    private static final class ReadEnd extends AbstractPagedInputView {
        /** Queue that consumed segments are returned to. */
        private final LinkedBlockingQueue<MemorySegment> emptyBufferTarget;
        /** In-memory segments still to be read, in write order. */
        private final Deque<MemorySegment> fullBufferSource;
        /** Reader over the spill file; {@code null} for a purely in-memory read end. */
        private final BlockChannelReader<MemorySegment> spilledBufferSource;
        /** Spilled blocks that have not been consumed yet. */
        private int spilledBuffersRemaining;
        /** Spilled blocks for which no read request has been issued yet. */
        private int requestsRemaining;

        /**
         * Creates a read end and immediately issues read requests for as many spilled
         * blocks as there are spare segments in {@code emptyBuffers}.
         *
         * @param firstMemSegment the first segment to read; its header holds its limit
         * @param emptyBufferTarget queue that consumed segments are returned to
         * @param fullBufferSource in-memory segments to read after the spilled blocks
         * @param spilledBufferSource reader over the spill file, or {@code null}
         * @param emptyBuffers spare segments for initial read requests; only accessed
         *     when {@code numBuffersSpilled > 0}, so it may be {@code null} otherwise
         * @param numBuffersSpilled number of spilled blocks still to be read
         * @throws IOException if issuing a read request fails
         */
        private ReadEnd(MemorySegment firstMemSegment, LinkedBlockingQueue<MemorySegment> emptyBufferTarget, Deque<MemorySegment> fullBufferSource, BlockChannelReader<MemorySegment> spilledBufferSource, List<MemorySegment> emptyBuffers, int numBuffersSpilled) throws IOException {
            super(firstMemSegment, firstMemSegment.getInt(0), HEADER_LENGTH);
            this.emptyBufferTarget = emptyBufferTarget;
            this.fullBufferSource = fullBufferSource;
            this.spilledBufferSource = spilledBufferSource;
            this.requestsRemaining = numBuffersSpilled;
            this.spilledBuffersRemaining = numBuffersSpilled;
            while (this.requestsRemaining > 0 && !emptyBuffers.isEmpty()) {
                this.spilledBufferSource.readBlock(emptyBuffers.remove(emptyBuffers.size() - 1));
                --this.requestsRemaining;
            }
        }

        /**
         * Recycles the consumed segment and returns the next segment to read.
         *
         * @param current the segment that has been fully read
         * @return the next spilled block if any remain, otherwise the next in-memory segment
         * @throws EOFException if neither spilled blocks nor in-memory segments remain
         * @throws IOException if issuing a read request or deleting the spill file fails
         * @throws RuntimeException if the thread is interrupted while waiting for a block
         */
        @Override
        protected MemorySegment nextSegment(MemorySegment current) throws IOException {
            // Reuse the consumed segment for a pending read request, or give it back.
            if (this.requestsRemaining > 0) {
                --this.requestsRemaining;
                this.spilledBufferSource.readBlock(current);
            } else {
                this.emptyBufferTarget.add(current);
            }

            if (this.spilledBuffersRemaining > 0) {
                --this.spilledBuffersRemaining;
                try {
                    return this.spilledBufferSource.getReturnQueue().take();
                } catch (InterruptedException e) {
                    throw new RuntimeException("Read End was interrupted while waiting for spilled buffer.", e);
                }
            }
            if (!this.fullBufferSource.isEmpty()) {
                return this.fullBufferSource.removeFirst();
            }

            // Everything consumed: release the view and the spill file, then signal EOF.
            this.clear();
            if (this.spilledBufferSource != null) {
                this.spilledBufferSource.closeAndDelete();
            }
            throw new EOFException();
        }

        /** Returns the read limit of a segment, which is the int stored at offset 0. */
        @Override
        protected int getLimitForSegment(MemorySegment segment) {
            return segment.getInt(0);
        }

        /**
         * Disposes this read end if it has been fully consumed, i.e. no in-memory segments
         * and no spilled blocks remain and the current segment is either absent or read up
         * to its limit.
         *
         * @return {@code true} if the read end was disposed, {@code false} if data remains
         */
        private boolean disposeIfDone() {
            boolean currentExhausted = this.getCurrentSegment() == null
                    || this.getCurrentPositionInSegment() >= this.getCurrentSegmentLimit();
            if (!this.fullBufferSource.isEmpty() || this.spilledBuffersRemaining != 0 || !currentExhausted) {
                return false;
            }

            if (this.getCurrentSegment() != null) {
                this.emptyBufferTarget.add(this.getCurrentSegment());
                this.clear();
            }
            if (this.spilledBufferSource != null) {
                try {
                    this.spilledBufferSource.closeAndDelete();
                } catch (Throwable ignored) {
                    // Best-effort cleanup of the spill file; the read end counts as disposed.
                }
            }
            return true;
        }

        /**
         * Disposes this read end regardless of remaining data and adds its memory segments
         * to the given list.
         *
         * @param freeMemTarget list that receives the segments held by this read end
         * @throws InterruptedException if interrupted while waiting for outstanding segments
         */
        private void forceDispose(List<MemorySegment> freeMemTarget) throws InterruptedException {
            MemorySegment current = this.getCurrentSegment();
            this.clear();
            if (current != null) {
                freeMemTarget.add(current);
            }
            freeMemTarget.addAll(this.fullBufferSource);

            // spilledBuffersRemaining - requestsRemaining is the number of read requests
            // that were issued but whose blocks have not been consumed.
            // NOTE(review): these are taken from emptyBufferTarget rather than from the
            // reader's return queue - confirm that this is intended.
            for (int i = this.spilledBuffersRemaining - this.requestsRemaining; i > 0; --i) {
                freeMemTarget.add(this.emptyBufferTarget.take());
            }
            if (this.spilledBufferSource != null) {
                try {
                    this.spilledBufferSource.closeAndDelete();
                } catch (Throwable ignored) {
                    // Best-effort cleanup of the spill file during forced disposal.
                }
            }
        }
    }
}

