/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import org.apache.flink.util.Preconditions;

public abstract class AbstractStreamTaskNetworkInput<T, R extends RecordDeserializer<DeserializationDelegate<StreamElement>>>
implements StreamTaskInput<T> {
    protected final CheckpointedInputGate checkpointedInputGate;
    protected final DeserializationDelegate<StreamElement> deserializationDelegate;
    protected final TypeSerializer<T> inputSerializer;
    protected final Map<InputChannelInfo, R> recordDeserializers;
    protected final Map<InputChannelInfo, Integer> flattenedChannelIndices = new HashMap<InputChannelInfo, Integer>();
    protected final StatusWatermarkValve statusWatermarkValve;
    protected final int inputIndex;
    private InputChannelInfo lastChannel = null;
    private R currentRecordDeserializer = null;
    protected final StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords;

    public AbstractStreamTaskNetworkInput(CheckpointedInputGate checkpointedInputGate, TypeSerializer<T> inputSerializer, StatusWatermarkValve statusWatermarkValve, int inputIndex, Map<InputChannelInfo, R> recordDeserializers, StreamTask.CanEmitBatchOfRecordsChecker canEmitBatchOfRecords) {
        this.checkpointedInputGate = checkpointedInputGate;
        this.deserializationDelegate = new NonReusingDeserializationDelegate(new StreamElementSerializer<T>(inputSerializer));
        this.inputSerializer = inputSerializer;
        for (InputChannelInfo i : checkpointedInputGate.getChannelInfos()) {
            this.flattenedChannelIndices.put(i, this.flattenedChannelIndices.size());
        }
        this.statusWatermarkValve = (StatusWatermarkValve)Preconditions.checkNotNull((Object)statusWatermarkValve);
        this.inputIndex = inputIndex;
        this.recordDeserializers = (Map)Preconditions.checkNotNull(recordDeserializers);
        this.canEmitBatchOfRecords = (StreamTask.CanEmitBatchOfRecordsChecker)Preconditions.checkNotNull((Object)canEmitBatchOfRecords);
    }

    @Override
    public DataInputStatus emitNext(PushingAsyncDataInput.DataOutput<T> output) throws Exception {
        block8: {
            DataInputStatus status;
            while (true) {
                Optional<BufferOrEvent> bufferOrEvent;
                if (this.currentRecordDeserializer != null) {
                    RecordDeserializer.DeserializationResult result;
                    try {
                        result = this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate);
                    }
                    catch (IOException e) {
                        throw new IOException(String.format("Can't get next record for channel %s", this.lastChannel), e);
                    }
                    if (result.isBufferConsumed()) {
                        this.currentRecordDeserializer = null;
                    }
                    if (result.isFullRecord()) {
                        this.processElement((StreamElement)this.deserializationDelegate.getInstance(), output);
                        if (this.canEmitBatchOfRecords.check()) continue;
                        return DataInputStatus.MORE_AVAILABLE;
                    }
                }
                if (!(bufferOrEvent = this.checkpointedInputGate.pollNext()).isPresent()) break block8;
                if (bufferOrEvent.get().isBuffer()) {
                    this.processBuffer(bufferOrEvent.get());
                    continue;
                }
                status = this.processEvent(bufferOrEvent.get());
                if (status != DataInputStatus.MORE_AVAILABLE || !this.canEmitBatchOfRecords.check()) break;
            }
            return status;
        }
        if (this.checkpointedInputGate.isFinished()) {
            Preconditions.checkState((boolean)this.checkpointedInputGate.getAvailableFuture().isDone(), (Object)"Finished BarrierHandler should be available");
            return DataInputStatus.END_OF_INPUT;
        }
        return DataInputStatus.NOTHING_AVAILABLE;
    }

    private void processElement(StreamElement recordOrMark, PushingAsyncDataInput.DataOutput<T> output) throws Exception {
        if (recordOrMark.isRecord()) {
            output.emitRecord(recordOrMark.asRecord());
        } else if (recordOrMark.isWatermark()) {
            this.statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), this.flattenedChannelIndices.get(this.lastChannel), output);
        } else if (recordOrMark.isLatencyMarker()) {
            output.emitLatencyMarker(recordOrMark.asLatencyMarker());
        } else if (recordOrMark.isWatermarkStatus()) {
            this.statusWatermarkValve.inputWatermarkStatus(recordOrMark.asWatermarkStatus(), this.flattenedChannelIndices.get(this.lastChannel), output);
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement");
        }
    }

    protected DataInputStatus processEvent(BufferOrEvent bufferOrEvent) {
        AbstractEvent event = bufferOrEvent.getEvent();
        if (event.getClass() == EndOfData.class) {
            switch (this.checkpointedInputGate.hasReceivedEndOfData()) {
                case NOT_END_OF_DATA: {
                    break;
                }
                case DRAINED: {
                    return DataInputStatus.END_OF_DATA;
                }
                case STOPPED: {
                    return DataInputStatus.STOPPED;
                }
            }
        } else if (event.getClass() == EndOfPartitionEvent.class) {
            this.releaseDeserializer(bufferOrEvent.getChannelInfo());
            if (this.checkpointedInputGate.isFinished()) {
                return DataInputStatus.END_OF_INPUT;
            }
        } else if (event.getClass() == EndOfChannelStateEvent.class && this.checkpointedInputGate.allChannelsRecovered()) {
            return DataInputStatus.END_OF_RECOVERY;
        }
        return DataInputStatus.MORE_AVAILABLE;
    }

    protected void processBuffer(BufferOrEvent bufferOrEvent) throws IOException {
        this.lastChannel = bufferOrEvent.getChannelInfo();
        Preconditions.checkState((this.lastChannel != null ? 1 : 0) != 0);
        this.currentRecordDeserializer = this.getActiveSerializer(bufferOrEvent.getChannelInfo());
        Preconditions.checkState((this.currentRecordDeserializer != null ? 1 : 0) != 0, (Object)"currentRecordDeserializer has already been released");
        this.currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
    }

    protected R getActiveSerializer(InputChannelInfo channelInfo) {
        return (R)((RecordDeserializer)this.recordDeserializers.get(channelInfo));
    }

    @Override
    public int getInputIndex() {
        return this.inputIndex;
    }

    public CompletableFuture<?> getAvailableFuture() {
        if (this.currentRecordDeserializer != null) {
            return AVAILABLE;
        }
        return this.checkpointedInputGate.getAvailableFuture();
    }

    @Override
    public void close() throws IOException {
        for (InputChannelInfo channelInfo : new ArrayList<InputChannelInfo>(this.recordDeserializers.keySet())) {
            this.releaseDeserializer(channelInfo);
        }
    }

    protected void releaseDeserializer(InputChannelInfo channelInfo) {
        RecordDeserializer deserializer = (RecordDeserializer)this.recordDeserializers.get(channelInfo);
        if (deserializer != null) {
            deserializer.clear();
            this.recordDeserializers.remove(channelInfo);
        }
    }
}

