/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.rank.window.processors;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordComparator;
import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
import org.apache.flink.table.runtime.operators.rank.TopNBuffer;
import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowProcessor;
import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl;
import org.apache.flink.table.runtime.operators.window.state.WindowMapState;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.types.RowKind;

public final class WindowRankProcessor
implements SlicingWindowProcessor<Long> {
    private static final long serialVersionUID = 1L;
    private final GeneratedRecordComparator generatedSortKeyComparator;
    private Comparator<RowData> sortKeyComparator;
    private final TypeSerializer<RowData> sortKeySerializer;
    private final WindowBuffer.Factory bufferFactory;
    private final TypeSerializer<RowData> inputSerializer;
    private final long rankStart;
    private final long rankEnd;
    private final boolean outputRankNumber;
    private final int windowEndIndex;
    private final ZoneId shiftTimeZone;
    private transient long currentProgress;
    private transient SlicingWindowProcessor.Context<Long> ctx;
    private transient WindowTimerService<Long> windowTimerService;
    private transient WindowBuffer windowBuffer;
    private transient WindowMapState<Long, List<RowData>> windowState;
    private transient JoinedRowData reuseOutput;
    private transient GenericRowData reuseRankRow;

    public WindowRankProcessor(TypeSerializer<RowData> inputSerializer, GeneratedRecordComparator genSortKeyComparator, TypeSerializer<RowData> sortKeySerializer, WindowBuffer.Factory bufferFactory, long rankStart, long rankEnd, boolean outputRankNumber, int windowEndIndex, ZoneId shiftTimeZone) {
        this.inputSerializer = inputSerializer;
        this.generatedSortKeyComparator = genSortKeyComparator;
        this.sortKeySerializer = sortKeySerializer;
        this.bufferFactory = bufferFactory;
        this.rankStart = rankStart;
        this.rankEnd = rankEnd;
        this.outputRankNumber = outputRankNumber;
        this.windowEndIndex = windowEndIndex;
        this.shiftTimeZone = shiftTimeZone;
    }

    @Override
    public void open(SlicingWindowProcessor.Context<Long> context) throws Exception {
        this.ctx = context;
        this.sortKeyComparator = (Comparator)this.generatedSortKeyComparator.newInstance(this.ctx.getRuntimeContext().getUserCodeClassLoader());
        LongSerializer namespaceSerializer = LongSerializer.INSTANCE;
        ListSerializer listSerializer = new ListSerializer(this.inputSerializer);
        MapStateDescriptor mapStateDescriptor = new MapStateDescriptor("window_rank", this.sortKeySerializer, (TypeSerializer)listSerializer);
        MapState state = (MapState)this.ctx.getKeyedStateBackend().getOrCreateKeyedState((TypeSerializer)namespaceSerializer, (StateDescriptor)mapStateDescriptor);
        this.windowTimerService = new WindowTimerServiceImpl(this.ctx.getTimerService(), this.shiftTimeZone);
        this.windowState = new WindowMapState((InternalMapState)state);
        this.windowBuffer = this.bufferFactory.create(this.ctx.getOperatorOwner(), this.ctx.getMemoryManager(), this.ctx.getMemorySize(), this.ctx.getRuntimeContext(), this.windowTimerService, this.ctx.getKeyedStateBackend(), this.windowState, true, this.shiftTimeZone);
        this.reuseOutput = new JoinedRowData();
        this.reuseRankRow = new GenericRowData(1);
        this.currentProgress = Long.MIN_VALUE;
    }

    @Override
    public void initializeWatermark(long watermark) {
        this.currentProgress = watermark;
    }

    @Override
    public boolean processElement(RowData key, RowData element) throws Exception {
        long sliceEnd = element.getLong(this.windowEndIndex);
        if (TimeWindowUtil.isWindowFired(sliceEnd, this.currentProgress, this.shiftTimeZone)) {
            return true;
        }
        this.windowBuffer.addElement(key, sliceEnd, element);
        return false;
    }

    @Override
    public void advanceProgress(long progress) throws Exception {
        if (progress > this.currentProgress) {
            this.currentProgress = progress;
            this.windowBuffer.advanceProgress(this.currentProgress);
        }
    }

    @Override
    public void prepareCheckpoint() throws Exception {
        this.windowBuffer.flush();
    }

    @Override
    public void clearWindow(Long windowEnd) throws Exception {
        this.windowState.clear(windowEnd);
    }

    @Override
    public void close() throws Exception {
        if (this.windowBuffer != null) {
            this.windowBuffer.close();
        }
    }

    @Override
    public TypeSerializer<Long> createWindowSerializer() {
        return LongSerializer.INSTANCE;
    }

    @Override
    public void fireWindow(Long windowEnd) throws Exception {
        TopNBuffer buffer = new TopNBuffer(this.sortKeyComparator, ArrayList::new);
        Iterator<Map.Entry<RowData, List<RowData>>> stateIterator = this.windowState.iterator(windowEnd);
        while (stateIterator.hasNext()) {
            Map.Entry<RowData, List<RowData>> entry = stateIterator.next();
            RowData sortKey = entry.getKey();
            if (!buffer.checkSortKeyInBufferRange(sortKey, this.rankEnd)) continue;
            buffer.putAll(sortKey, (Collection<RowData>)entry.getValue());
        }
        Iterator<Map.Entry<RowData, Collection<RowData>>> bufferItr = buffer.entrySet().iterator();
        long currentRank = 1L;
        while (bufferItr.hasNext() && currentRank <= this.rankEnd) {
            Map.Entry<RowData, Collection<RowData>> entry = bufferItr.next();
            Collection<RowData> records = entry.getValue();
            Iterator<RowData> recordsIter = records.iterator();
            while (recordsIter.hasNext() && currentRank <= this.rankEnd) {
                RowData rowData = recordsIter.next();
                if (currentRank >= this.rankStart && currentRank <= this.rankEnd) {
                    this.ctx.output(this.createOutputRow(rowData, currentRank));
                }
                ++currentRank;
            }
        }
    }

    private RowData createOutputRow(RowData inputRow, long rank) {
        if (this.outputRankNumber) {
            this.reuseRankRow.setField(0, (Object)rank);
            this.reuseOutput.replace(inputRow, (RowData)this.reuseRankRow);
            this.reuseOutput.setRowKind(RowKind.INSERT);
            return this.reuseOutput;
        }
        return inputRow;
    }
}

